Graph Builder API リファレンス (プレビュー)

sentinel_graph クラスは、Microsoft Sentinel グラフと対話する方法を提供します。これにより、グラフ スキーマの定義、Microsoft Sentinel データ レイクからノードとエッジへのデータの変換、グラフの公開、グラフのクエリ、高度なグラフ アルゴリズムの実行を行うことができます。 このクラスは、Microsoft Sentinel Spark コンピューティングで実行されている Jupyter Notebook の Spark セッションを操作するように設計されています。

GraphSpecBuilder

GraphSpecBuilder クラスは、データ パイプラインとスキーマ統合を使用してグラフ仕様を作成するための fluent ビルダーを提供します。

重要

このクラスの GraphBuilder エイリアスは非推奨となり、今後のバージョンで削除される予定です。 すべての新しいコードで GraphSpecBuilder を使用します。

# Deprecated — emits DeprecationWarning
from sentinel_graph.builders.graph_builder import GraphBuilder

# Recommended
from sentinel_graph import GraphSpecBuilder

コンストラクター

GraphSpecBuilder(context: ExecutionContext)

パラメーター:

  • context (ExecutionContext): Spark セッションと構成を含む実行コンテキスト

発生 させます:

  • ValueError: コンテキストが None の場合、またはグラフ名を決定できない場合

静的メソッド

start

GraphSpecBuilder.start(context: Optional[ExecutionContext] = None) -> GraphSpecBuilder

新しい Fluent グラフ ビルダーを定義します。

パラメーター:

  • context (ExecutionContext、省略可能): ExecutionContext インスタンス。 None の場合は、既定のコンテキストを使用します。

返します:

  • GraphSpecBuilder: 新しいビルダー インスタンス

例:

builder = GraphSpecBuilder.start(context=context)

インスタンス メソッド

add_node

def add_node(alias: str) -> NodeBuilderInitial

ノード定義の作成を開始します。

パラメーター:

  • alias (str): グラフ内のこのノードの一意識別子

返します:

  • NodeBuilderInitial: 初期状態のノード ビルダー

例:

builder.add_node("user")

add_edge

def add_edge(alias: str) -> EdgeBuilderInitial

エッジ定義の作成を開始します。

パラメーター:

  • alias (str): グラフ内のこのエッジの識別子 (複数のエッジ間で共有できます)

返します:

  • EdgeBuilderInitial: 初期状態のエッジ ビルダー

例:

builder.add_edge("accessed")

done

def done() -> GraphSpec

グラフの仕様を確定し、GraphSpec インスタンスを返します。

返します:

  • GraphSpec: データ パイプラインとスキーマを使用した完全なグラフ仕様

発生 させます:

  • ValueError: グラフにノードまたはエッジがない場合、または検証が失敗した場合

例:

graph_spec = builder.done()

GraphSpec

データ パイプライン、スキーマ、および表示機能を備えたグラフの仕様。

コンストラクター

GraphSpec(
    name: str,
    context: ExecutionContext,
    graph_schema: GraphSchema,
    etl_pipeline: Optional[ETLPipeline] = None
)

パラメーター:

  • name (str): グラフ名
  • context (ExecutionContext): 実行コンテキスト
  • graph_schema (GraphSchema): グラフ スキーマ定義
  • etl_pipeline (ETLPipeline、省略可能): グラフ準備用のデータ パイプライン

プロパティ

nodes

def nodes() -> DataFrame

ノード DataFrame (遅延、キャッシュ) を取得します。 データ パイプラインまたはレイク テーブルからソースを自動的に決定します。

返します:

  • DataFrame: すべてのノードを含む Spark DataFrame

発生 させます:

  • ValueError: コンテキストが見つからない場合、または DataFrames を読み込めなかった場合

edges

def edges() -> DataFrame

エッジ DataFrame (遅延、キャッシュ) を取得します。 データ パイプラインまたはレイク テーブルからソースを自動的に決定します。

返します:

  • DataFrame: すべてのエッジを含む Spark DataFrame

発生 させます:

  • ValueError: コンテキストが見つからない場合、または DataFrames を読み込めなかった場合

メソッド

build_graph_with_data

注:

build_graph_with_data は非推奨であり、将来のバージョンでは削除される予定です。 代わりに Graph.build(spec) を使用します。

def build_graph_with_data() -> Dict[str, Any]

データ パイプラインを実行し、グラフを発行します。 内部的に Graph.build(self)を呼び出し、返された Graphを隠し、下位互換性のあるディクショナリを返します。

返します:

  • Dict[str, Any]: を含むディクショナリ:
    • etl_result: データ準備の結果
    • api_result: 結果を発行する (成功した場合)
    • api_error: エラー文字列 (発行に失敗した場合)
    • instance_name: Graph インスタンス名
    • status: "published" または "prepared"

例:

graph = Graph.build(spec)
print(f"Status: {graph.build_status.status}")

get_schema

def get_schema() -> GraphSchema

グラフ スキーマを取得します。

返します:

  • GraphSchema: グラフ スキーマ定義

get_pipeline

注:

このメソッドは非推奨であり、今後のバージョンでは削除される予定です。 データ パイプラインは内部実装の詳細であり、直接アクセスしないでください。

def get_pipeline() -> Optional[ETLPipeline]

データ パイプラインを取得します (既存のグラフの場合はなし)。

返します:

  • ETLPipeline または None: 使用可能な場合はデータ パイプライン

to_graphframe

def to_graphframe(column_mapping: Optional[Dict[str, str]] = None) -> GraphFrame

グラフ アルゴリズムを実行するために、グラフ全体を GraphFrame に変換します。 ローカル データのみを操作します (データ パイプラインまたはレイク テーブルから)。

パラメーター:

  • column_mapping (Dict[str, str], optional): キーを使用したカスタム列マッピング:
    • "id": 頂点 ID 列名
    • "source_id": エッジ ソース ID 列名
    • "target_id": Edge ターゲット ID 列名

返します:

  • GraphFrame: すべての頂点とエッジを持つ GraphFrame オブジェクト

発生 させます:

  • ValueError: ExecutionContext が使用できない場合

例:

gf = graph_spec.to_graphframe()
pagerank = gf.pageRank(resetProbability=0.15, maxIter=10)

show

def show(limit: int = 100, viz_format: str = "visual") -> None

グラフ データをさまざまな形式で表示します。

パラメーター:

  • limit (int,default=100): 表示する最大ノード/エッジ
  • viz_format (str, default="visual"): 出力形式
    • "table": 完全な DataFrame テーブル (すべての列)
    • "visual": 対話型グラフの視覚化
    • "all": すべての形式を表示

発生 させます:

  • ValueError: フォーマットがサポートされている値の 1 つでない場合

例:

graph_spec.show(limit=50, viz_format="table")

show_schema

def show_schema() -> None

グラフ スキーマを対話型のグラフ視覚化として表示します。

例:

spec.show_schema()

Graph

クエリ可能なグラフ インスタンス。 Graph.get() (既存のグラフ) または (GraphSpecから) Graph.build()を使用して作成されます。

コンストラクター

Graph(
    name: str,
    context: ExecutionContext,
    spec: Optional[GraphSpec] = None,
    build_status: Optional[BuildStatus] = None,
)

パラメーター:

  • name (str): グラフ名
  • context (ExecutionContext): 実行コンテキスト
  • spec (GraphSpec、省略可能): 添付グラフの仕様 ( Graph.build()によって設定)
  • build_status (BuildStatus、省略可能): 結果メタデータのビルド ( Graph.build()によって設定)

発生 させます:

  • ValueError: ExecutionContext が None の場合

静的メソッド

get

Graph.get(name: str, context: Optional[ExecutionContext] = None) -> Graph

既存のグラフからグラフ インスタンスを取得します。 返された Graph には、 spec=Nonebuild_status=Noneがあります。

パラメーター:

  • name (str): Graph インスタンス名
  • context (ExecutionContext、省略可能): 実行コンテキスト (既定値は ExecutionContext.default())

返します:

  • Graph: Graph インスタンス

発生 させます:

  • ValueError: グラフ名が空の場合、またはグラフ インスタンスが存在しない場合

例:

graph = Graph.get("my_graph", context=context)
graph.query("MATCH (n) RETURN n")

prepare

Graph.prepare(spec: GraphSpec) -> Graph

GraphSpecのデータ準備ステージを発行なしで実行します。 後 publish() 使用してグラフを登録し、クエリ可能にします。

パラメーター:

  • spec (GraphSpec): 準備するグラフの仕様

返します:

  • Graph: spec がアタッチされた Graph インスタンスと build_status.status == "prepared"

発生 させます:

  • ValueError: spec にデータ パイプラインがない場合、または実行コンテキストがない場合
  • RuntimeError: データ パイプラインの実行が失敗した場合

例:

spec = GraphSpecBuilder.start(context=ctx).add_node(...).done()
graph = Graph.prepare(spec)
# Inspect results before publishing
graph.nodes.show()
graph.publish()
graph.query("MATCH (n) RETURN n")

build

Graph.build(spec: GraphSpec) -> Graph

データを準備して公開することで、 GraphSpec からグラフを作成します。 内部的に Graph.prepare(spec) を呼び出し、 graph.publish()を試行します。 これら 2 つのメソッドを個別に呼び出すのとは異なり、発行エラーがキャッチされます。返されるグラフは、発生する代わりに build_status.status == "prepared"build_status.api_error 設定されます。

パラメーター:

  • spec (GraphSpec): ビルド元のグラフ仕様

返します:

  • Graph: spec がアタッチされ、 build_status が設定された Graph インスタンス

発生 させます:

  • ValueError: spec にデータ パイプラインがない場合、または実行コンテキストがない場合
  • RuntimeError: データ パイプラインの実行が失敗した場合

例:

spec = GraphSpecBuilder.start(context=ctx).add_node(...).done()
graph = Graph.build(spec)
print(graph.build_status.status)  # "published" or "prepared" (None if neither ran)
graph.query("MATCH (n) RETURN n")

プロパティ

nodes

def nodes() -> Optional[DataFrame]

ノード DataFrame を取得します。 仕様がアタッチされたときに self.spec.nodes を委任します。それ以外の場合は None を返します。

edges

def edges() -> Optional[DataFrame]

エッジ DataFrame を取得します。 仕様がアタッチされたときに self.spec.edges を委任します。それ以外の場合は None を返します。

schema

def schema() -> Optional[GraphSchema]

グラフ スキーマを取得します。 仕様がアタッチされたときに self.spec.get_schema() を委任します。それ以外の場合は None を返します。

メソッド

query

def query(query_string: str, query_language: str = "GQL") -> QueryResult

GQL を使用してグラフ インスタンスに対してクエリを実行します。

パラメーター:

  • query_string (str): Graph クエリ文字列 (GQL 言語)
  • query_language (str, default="GQL"): クエリ言語

返します:

  • QueryResult: ノード、エッジ、およびメタデータを含むオブジェクト

発生 させます:

  • ValueError: ExecutionContext または Spark セッションが見つからない場合
  • RuntimeError: クライアントの初期化またはクエリの実行が失敗した場合

例:

result = graph.query("MATCH (u:user) WHERE u.age > 30 RETURN u")
result.show()

reachability

def reachability(
    *,
    source_property_value: str = None,
    target_property_value: str = None,
    source_property: Optional[str] = None,
    participating_source_node_labels: Optional[List[str]] = None,
    target_property: Optional[str] = None,
    participating_target_node_labels: Optional[List[str]] = None,
    participating_edge_labels: Optional[List[str]] = None,
    is_directional: bool = True,
    min_hop_count: int = 1,
    max_hop_count: int = 4,
    shortest_path: bool = False,
    max_results: int = 500
) -> QueryResult

[!注] reachability(query_input=ReachabilityQueryInput(...)) は引き続き受け入れられますが、 DeprecationWarning 出力され、今後のバージョンで削除されます。

ソース ノードとターゲット ノードの間で到達可能性分析を実行します。

パラメーター:

  • source_property_value (str): ソース プロパティに一致する値 (実行時に検証されます)。 query_input) を使用しない場合は指定する必要があります。
  • target_property_value (str): ターゲット プロパティに一致する値 (実行時に検証されます)。 query_input) を使用しない場合は指定する必要があります。
  • source_property (省略可能[str]): ソース ノードをフィルター処理するプロパティ名
  • participating_source_node_labels (省略可能[List[str]]): ソースとして考慮するノード ラベル
  • target_property (省略可能[str]): ターゲット ノードをフィルター処理するプロパティ名
  • participating_target_node_labels (省略可能[List[str]]): ターゲットとして考慮するノード ラベル
  • participating_edge_labels (省略可能[List[str]]): 走査するエッジ ラベル
  • is_directional (bool): エッジが方向であるかどうか (既定値: True)
  • min_hop_count (int): 最小ホップ (既定値: 1)
  • max_hop_count (int): 最大ホップ (既定値: 4)
  • shortest_path (bool): 最短パスのみを返します (既定値: False)
  • max_results (int): 最大結果 (既定値: 500)

発生 させます:

  • ValueError: source_property_value または target_property_value が見つからない場合は、 min_hop_count < 1max_hop_count < min_hop_count、または max_results < 1
  • RuntimeError: クライアントの初期化またはクエリの実行が失敗した場合

返します:

  • QueryResult: 到達可能性パスを含む

例:

result = graph.reachability(
    source_property_value="user-001",
    target_property_value="device-003")
result.show()

k_hop

def k_hop(
    *,
    source_property: Optional[str] = None,
    source_property_value: Optional[str] = None,
    participating_source_node_labels: Optional[List[str]] = None,
    target_property: Optional[str] = None,
    target_property_value: Optional[str] = None,
    participating_target_node_labels: Optional[List[str]] = None,
    participating_edge_labels: Optional[List[str]] = None,
    is_directional: bool = True,
    min_hop_count: int = 1,
    max_hop_count: int = 4,
    shortest_path: bool = False,
    max_results: int = 500
) -> QueryResult

注:

k_hop(query_input=K_HopQueryInput(...)) は引き続き受け入れられますが、 DeprecationWarning 出力され、将来のバージョンで削除されます。

特定のソース ノードから k ホップ分析を実行します。

パラメーター:

検証:

  • 少なくとも 1 つの source_property_value または target_property_value を指定する必要があります

発生 させます:

  • ValueError: source_property_valuetarget_property_value も指定されていない場合、または数値制約に違反した場合 ( reachabilityと同じ)
  • RuntimeError: クライアントの初期化またはクエリの実行が失敗した場合

返します:

  • QueryResult: k ホップの結果を含む

例:

result = graph.k_hop(source_property_value="user-001")
result.show()

blast_radius

def blast_radius(
    *,
    source_property_value: str = None,
    target_property_value: str = None,
    source_property: Optional[str] = None,
    participating_source_node_labels: Optional[List[str]] = None,
    target_property: Optional[str] = None,
    participating_target_node_labels: Optional[List[str]] = None,
    participating_edge_labels: Optional[List[str]] = None,
    is_directional: bool = True,
    min_hop_count: int = 1,
    max_hop_count: int = 4,
    shortest_path: bool = False,
    max_results: int = 500
) -> QueryResult

注:

blast_radius(query_input=BlastRadiusQueryInput(...)) は引き続き受け入れられますが、 DeprecationWarning 出力され、将来のバージョンで削除されます。

ソース ノードからターゲット ノードへのブラスト半径分析を実行します。

パラメーター:

  • source_property_value (str): ソース ノードを識別する値 (実行時に検証されます)。 query_input) を使用しない場合は指定する必要があります。
  • target_property_value (str): ターゲット ノードを識別する値 (実行時に検証されます)。 query_input) を使用しない場合は指定する必要があります。
  • その他のパラメーター: と同じ reachability

発生 させます:

  • ValueError: source_property_value または target_property_value が見つからない場合、または数値制約に違反した場合 ( reachabilityと同じ)
  • RuntimeError: クライアントの初期化またはクエリの実行が失敗した場合

返します:

  • QueryResult:爆発半径の結果を含む

例:

result = graph.blast_radius(
    source_property_value="user-003",
    target_property_value="device-003",
    min_hop_count=1)
result.show()

centrality

def centrality(
    *,
    participating_source_node_labels: Optional[List[str]] = None,
    participating_target_node_labels: Optional[List[str]] = None,
    participating_edge_labels: Optional[List[str]] = None,
    threshold: int = 3,
    centrality_type: CentralityType = None,
    max_paths: int = 1000000,
    is_directional: bool = True,
    min_hop_count: int = 1,
    max_hop_count: int = 4,
    shortest_path: bool = False,
    max_results: int = 500
) -> QueryResult

注:

centrality(query_input=CentralityQueryInput(...)) は引き続き受け入れられますが、 DeprecationWarning 出力され、将来のバージョンで削除されます。

グラフで一元性分析を実行します。

パラメーター:

  • participating_source_node_labels (省略可能[List[str]]):ソース ノード ラベル
  • participating_target_node_labels (省略可能[List[str]]): ターゲット ノード ラベル
  • participating_edge_labels (省略可能[List[str]]): 走査するエッジ ラベル
  • threshold (int): 最小中心性スコア (既定値: 3)、負以外にする必要があります
  • centrality_type (CentralityType): CentralityType.Node または CentralityType.Edge (既定値: NoneCentralityType.Nodeにフォールバック)
  • max_paths (int): 考慮する最大パス (既定値: 1000000; 0 = すべてのパス);負以外のパスにする必要があります
  • is_directional (bool): エッジが方向であるかどうか (既定値: True)
  • min_hop_count (int): 最小ホップ (既定値: 1)、≥ 1 にする必要があります
  • max_hop_count (int): 最大ホップ (既定値: 4);≥する必要があります min_hop_count
  • shortest_path (bool): 最短パスのみを返します (既定値: False)
  • max_results (int): 最大結果 (既定値: 500)、≥ 1 にする必要があります

発生 させます:

  • ValueError: threshold < 0max_paths < 0min_hop_count < 1max_hop_count < min_hop_count、または max_results < 1
  • RuntimeError: クライアントの初期化またはクエリの実行が失敗した場合

返します:

  • QueryResult: 一元性メトリックを含む

例:

result = graph.centrality(
    participating_source_node_labels=["user", "device"],
    participating_target_node_labels=["device", "user"],
    participating_edge_labels=["sign_in"],
    is_directional=False)
result.show()

ranked

def ranked(
    *,
    rank_property_name: str = None,
    threshold: int = 0,
    max_paths: int = 1000000,
    decay_factor: float = 1,
    is_directional: bool = True,
    min_hop_count: int = 1,
    max_hop_count: int = 4,
    shortest_path: bool = False,
    max_results: int = 500
) -> QueryResult

注:

ranked(query_input=RankedQueryInput(...)) は引き続き受け入れられますが、 DeprecationWarning 出力され、将来のバージョンで削除されます。

グラフでランク付けされた分析を実行します。

パラメーター:

  • rank_property_name (str): ランク付けに使用するプロパティ名 (実行時に検証されます)。 query_input) を使用しない場合は指定する必要があります。
  • threshold (int): この重みを超える戻りパスのみ (既定値: 0)、負以外にする必要があります
  • max_paths (int): 考慮する最大パス (既定値: 1000000; 0 = すべてのパス);負以外のパスにする必要があります
  • decay_factor (float): ステップごとのランクの減衰。2 は半減を意味します (既定値: 1)。負以外にする必要があります
  • is_directional (bool): エッジが方向であるかどうか (既定値: True)
  • min_hop_count (int): 最小ホップ (既定値: 1)、≥ 1 にする必要があります
  • max_hop_count (int): 最大ホップ (既定値: 4);≥する必要があります min_hop_count
  • shortest_path (bool): 最短パスのみを返します (既定値: False)
  • max_results (int): 最大結果 (既定値: 500)、≥ 1 にする必要があります

発生 させます:

  • ValueError: rank_property_name が見つからない場合は、 threshold < 0max_paths < 0decay_factor < 0min_hop_count < 1max_hop_count < min_hop_count、または max_results < 1
  • RuntimeError: クライアントの初期化またはクエリの実行が失敗した場合

返します:

  • QueryResult: ランク付けされたノード/エッジを含む

例:

result = graph.ranked(
    rank_property_name="risk_score",
    threshold=5,
    decay_factor=2)
result.show()

to_graphframe

def to_graphframe(column_mapping: Optional[Dict[str, str]] = None) -> GraphFrame

グラフ全体を GraphFrame に変換します。 使用可能な場合はスペック データを使用します。それ以外の場合は、レイク テーブルから読み取ります。

パラメーター:

  • column_mapping (Dict[str, str], optional): カスタム列マッピング

返します:

  • GraphFrame: すべての頂点とエッジを持つ GraphFrame オブジェクト

例:

gf = graph.to_graphframe()

show

def show() -> None

グラフ情報を表示します。 仕様がアタッチされている場合は、リッチ ディスプレイの spec.show() に委任します。それ以外の場合は最小限の情報を出力します。

show_schema

def show_schema() -> None

グラフ スキーマを表示します。 仕様がアタッチされたときに spec.show_schema() を委任します。それ以外の場合はスキーマが使用できないというメッセージを出力します。

publish (v0.3.3 の新機能)

def publish() -> Graph

グラフを API に登録し、クエリ可能にします。 これを Graph.prepare() (またはスペックがアタッチされている任意の Graph ) の後で呼び出して、グラフ インスタンスを発行します。

返します:

  • Graph: メソッドチェーンの自己

発生 させます:

  • ValueError: スペックがアタッチされていないか、コンテキストが見つからない場合
  • RuntimeError: 発行が失敗した場合

例:

graph = Graph.prepare(spec)
graph.publish()
# Now the graph is queryable
graph.query("MATCH (n) RETURN n")

BuildStatus

Graph.build()操作からのメタデータを保持するデータクラス。

フィールド

フィールド 種類 説明
etl_result Any prepare ステージからの結果 (データ パイプラインの実行)
api_result Optional[Dict] 発行ステージからの結果 (発行に失敗した場合None )
api_error Optional[str] 発行に失敗した場合のエラー メッセージ (発行が成功した場合None )
instance_name str グラフ インスタンスの名前
status Optional[BuildStatusKind] None"published"、または "prepared"

コンストラクション パス

GraphSpecBuilder.start(...).done()  →  GraphSpec             (spec only, no graph yet)
Graph.get(name, context)            →  Graph (spec=None, build_status=None)
Graph.prepare(spec)                 →  Graph (spec=spec, build_status.status="prepared")
graph.publish()                     →  Graph (build_status.status="published")
Graph.build(spec)                   →  Graph (prepare + publish in one step)

例:

graph = Graph.build(spec)
if graph.build_status.status == "published":
    print("Graph prepared and published successfully")
elif graph.build_status.status == "prepared":
    print(f"Prepare succeeded but publish failed: {graph.build_status.api_error}")
elif graph.build_status.status is None:
    print("Neither prepare nor publish has run")

ノード ビルダー

NodeBuilderInitial

ノード ビルダーの初期状態: 使用可能なデータ ソース メソッドのみ。

コンストラクター

NodeBuilderInitial(alias: str, graph_builder: GraphSpecBuilder)

メモ: 通常、直接インスタンス化されるのではなく、 GraphSpecBuilder.add_node()を介して作成されます。

メソッド

注:

カスタム グラフでノード、エッジ、またはプロパティに名前を付ける場合に _ アンダースコアの使用はサポートされていません。 アンダースコアを使用すると、無効な要求エラーが返されます。

from_table
def from_table(table_name: str, database: Optional[str] = None) -> NodeBuilderSourceSet

インテリジェントなデータベース解決を使用して、テーブルをデータ ソースとして設定します。

パラメーター:

  • table_name (str): テーブルの名前 (必須)
  • database (str、省略可能): 明示的なデータベース名 (コンテキストの既定値よりも優先されます)

返します:

  • NodeBuilderSourceSet: 追加の構成のためのビルダー

発生 させます:

  • ValueError: テーブルが見つからない場合、または複数の競合するテーブルが見つかった場合

データベース解決順序:

  1. 明示的な database パラメーター (優先順位が最も高い)
  2. ExecutionContext.default_database
  3. すべてのデータベースを検索する (競合検出あり)

例:

builder.add_node("user").from_table("SigninLogs", database="security_db")
from_dataframe
def from_dataframe(dataframe: DataFrame) -> NodeBuilderSourceSet

Spark DataFrame をデータ ソースとして設定します。

パラメーター:

  • dataframe (DataFrame): Spark DataFrame

返します:

  • NodeBuilderSourceSet: 追加の構成のためのビルダー

例:

df = spark.read.table("users")
builder.add_node("user").from_dataframe(df)

NodeBuilderSourceSet

データ ソースが設定された後のノード ビルダー: 構成メソッドを使用できます。

コンストラクター

NodeBuilderSourceSet(alias: str, graph_builder: GraphSpecBuilder, source_step: DataInputETLStep)

メモ: NodeBuilderInitial ソース メソッドによって内部的に作成されます。

メソッド

with_time_range
def with_time_range(
    time_column: str,
    start_time: Optional[Union[str, datetime]] = None,
    end_time: Optional[Union[str, datetime]] = None,
    lookback_hours: Optional[float] = None
) -> NodeBuilderSourceSet

ノードのデータ ソースに時間範囲フィルター処理を適用します。

パラメーター:

  • time_column (str): タイムスタンプ データを含む列名 (必須)
  • start_time (str または datetime、省略可能): 開始日 ('10/20/25'、'2025-10-20'、または datetime オブジェクト)
  • end_time (str または datetime、省略可能): 終了日 (start_timeと同じ形式)
  • lookback_hours (float、省略可能): 今から振り返る時間

返します:

  • NodeBuilderSourceSet: メソッドチェーンの自己

発生 させます:

  • ValueError: ソース スキーマに時刻列が見つからない場合

時間範囲ロジック:

  1. start_timeとend_timeが指定されている場合は、それらを直接使用します
  2. 指定lookback_hours場合: end=now、start=now-lookback_hours
  3. 何も指定されていない場合:時間フィルタリングなし
  4. start/end AND lookback_hours: start/end が優先される場合

例:

# Explicit date range
builder.add_node("user").from_table("SigninLogs") \
    .with_time_range(time_column="TimeGenerated", start_time="2025-01-01", end_time="2025-01-31")

# Lookback window
builder.add_node("user").from_table("SigninLogs") \
    .with_time_range(time_column="TimeGenerated", lookback_hours=24)
with_label
def with_label(label: str) -> NodeBuilderSourceSet

ノード ラベルを設定します (呼び出されない場合は、既定値は alias になります)。

パラメーター:

  • label (str): ノード ラベル

返します:

  • NodeBuilderSourceSet: メソッドチェーンの自己

発生 させます:

  • ValueError: ラベルが既に設定されている場合

例:

builder.add_node("u").from_table("Users").with_label("user")
with_columns
def with_columns(
    *columns: str,
    key: str,
    display: str
) -> NodeBuilderSourceSet

必要なキーと表示の指定を使用して列を構成します。

パラメーター:

  • *columns (str): 含める列名 (少なくとも 1 つ必要)
  • key (str): キーとしてマークする列名 (必須、列に含める必要があります)
  • display (str): 表示値としてマークする列名 (必須、列に含める必要があります、キーと同じにすることができます)

返します:

  • NodeBuilderSourceSet: メソッドチェーンの自己

発生 させます:

  • ValueError: 検証が失敗した場合 (重複する列、キー/表示が見つからないなど)

注:

  • プロパティは列型から自動的に構築されます

  • 指定した場合、時間フィルター列が自動的に追加されます

  • プロパティ型は、ソース スキーマから自動的に推論されます

  • 「制限事項」を参照してください

例:

builder.add_node("user").from_table("Users") \
    .with_columns("id", "name", "email", "created_at", key="id", display="name")
add_node
def add_node(alias: str) -> NodeBuilderInitial

このノードを終了し、別のノードの構築を開始します。

パラメーター:

  • alias (str): 新しいノードのエイリアス

返します:

  • NodeBuilderInitial: 新しいノード ビルダー

例:

builder.add_node("user").from_table("Users") \
    .with_columns("id", "name", key="id", display="name") \
    .add_node("device")
add_edge
def add_edge(alias: str) -> EdgeBuilderInitial

このノードを終了し、エッジの構築を開始します。

パラメーター:

  • alias (str): エッジのエイリアス

返します:

  • EdgeBuilderInitial: 新しいエッジ ビルダー

例:

builder.add_node("user").from_table("Users") \
    .with_columns("id", "name", key="id", display="name") \
    .add_edge("accessed")
done
def done() -> GraphSpec

このノードを完成させ、グラフの仕様を完了します。

返します:

  • GraphSpec: 完全なグラフ仕様

例:

graph_spec = builder.add_node("user").from_table("Users") \
    .with_columns("id", "name", key="id", display="name") \
    .done()

エッジ ビルダー

EdgeBuilderInitial

エッジ ビルダーの初期状態: 使用可能なデータ ソース メソッドのみ。

コンストラクター

EdgeBuilderInitial(alias: str, graph_builder: GraphSpecBuilder)

メモ: 通常、直接インスタンス化されるのではなく、 GraphSpecBuilder.add_edge()を介して作成されます。

メソッド

注:

カスタム グラフでノード、エッジ、またはプロパティに名前を付ける場合に _ アンダースコアの使用はサポートされていません。 アンダースコアを使用すると、無効な要求エラーが返されます。

from_table
def from_table(table_name: str, database: Optional[str] = None) -> EdgeBuilderSourceSet

インテリジェントなデータベース解決を使用して、テーブルをデータ ソースとして設定します。

パラメーター:

  • table_name (str): テーブルの名前 (必須)
  • database (str、省略可能): 明示的なデータベース名

返します:

  • EdgeBuilderSourceSet: 追加の構成のためのビルダー

発生 させます:

  • ValueError: テーブルが見つからない場合、または複数の競合するテーブルが見つかった場合

例:

builder.add_edge("accessed").from_table("AccessLogs")
from_dataframe
def from_dataframe(dataframe: DataFrame) -> EdgeBuilderSourceSet

Spark DataFrame をデータ ソースとして設定します。

パラメーター:

  • dataframe (DataFrame): Spark DataFrame

返します:

  • EdgeBuilderSourceSet: 追加の構成のためのビルダー

例:

df = spark.read.table("access_logs")
builder.add_edge("accessed").from_dataframe(df)

EdgeBuilderSourceSet

データ ソースが設定された後のエッジ ビルダー: 使用可能な構成方法。

コンストラクター

EdgeBuilderSourceSet(alias: str, graph_builder: GraphSpecBuilder, source_step: DataInputETLStep)

メモ: EdgeBuilderInitial ソース メソッドによって内部的に作成されます。

メソッド

with_label
def with_label(label: str) -> EdgeBuilderSourceSet

エッジ リレーションシップの種類/ラベルを設定します (呼び出されていない場合、既定値はエイリアスになります)。

パラメーター:

  • label (str): エッジ ラベル

返します:

  • EdgeBuilderSourceSet: メソッドチェーンの自己

発生 させます:

  • ValueError: ラベルが既に設定されている場合

例:

builder.add_edge("rel").from_table("AccessLogs").with_label("ACCESSED")
edge_label

注:

代わりに with_label() を使用します。 このメソッドは、将来のバージョンで削除されます。

def edge_label(label: str) -> EdgeBuilderSourceSet

エッジ リレーションシップの種類/ラベルを設定します (呼び出されていない場合、既定値はエイリアスになります)。

パラメーター:

  • label (str): エッジ ラベル

返します:

  • EdgeBuilderSourceSet: メソッドチェーンの自己

発生 させます:

  • ValueError: ラベルが既に設定されている場合

例:

builder.add_edge("acc").from_table("AccessLogs").edge_label("accessed")
source
def source(id_column: str, node_type: str) -> EdgeBuilderSourceSet

ID 列とラベルを使用してソース ノードを設定します。

パラメーター:

  • id_column (str): ソース ノード ID を含む列名
  • node_type (str): ソース ノード ラベル

返します:

  • EdgeBuilderSourceSet: メソッドチェーンの自己

発生 させます:

  • ValueError: ソースが既に設定されている場合

例:

builder.add_edge("accessed").from_table("AccessLogs") \
    .source(id_column="user_id", node_type="user")
target
def target(id_column: str, node_type: str) -> EdgeBuilderSourceSet

ID 列とラベルを使用してターゲット ノードを設定します。

パラメーター:

  • id_column (str): ターゲット ノード ID を含む列名
  • node_type (str): ターゲット ノード ラベル

返します:

  • EdgeBuilderSourceSet: メソッドチェーンの自己

発生 させます:

  • ValueError: ターゲットが既に設定されている場合

例:

builder.add_edge("accessed").from_table("AccessLogs") \
    .source(id_column="user_id", node_type="user") \
    .target(id_column="device_id", node_type="device")
with_time_range
def with_time_range(
    time_column: str,
    start_time: Optional[Union[str, datetime]] = None,
    end_time: Optional[Union[str, datetime]] = None,
    lookback_hours: Optional[float] = None
) -> EdgeBuilderSourceSet

エッジのデータ ソースに時間範囲フィルター処理を適用します。

パラメーター:

  • time_column (str): タイムスタンプ データを含む列名 (必須)
  • start_time (str または datetime、省略可能): 開始日
  • end_time (str または datetime、省略可能): 終了日
  • lookback_hours (float、省略可能): 今から振り返る時間

返します:

  • EdgeBuilderSourceSet: メソッドチェーンの自己

発生 させます:

  • ValueError: ソース スキーマに時刻列が見つからない場合

例:

builder.add_edge("accessed").from_table("AccessLogs") \
    .with_time_range(time_column="TimeGenerated", lookback_hours=48)
with_columns
def with_columns(
    *columns: str,
    key: str,
    display: str
) -> EdgeBuilderSourceSet

必要なキーと表示の指定を使用して列を構成します。

パラメーター:

  • *columns (str): 含める列名 (少なくとも 1 つ必要)
  • key (str): キーとしてマークする列名 (必須、列に含める必要があります)
  • display (str): 表示値としてマークする列名 (必須、列に含める必要があります)

返します:

  • EdgeBuilderSourceSet: メソッドチェーンの自己

発生 させます:

  • ValueError: 検証が失敗した場合

例:

builder.add_edge("accessed").from_table("AccessLogs") \
    .source(id_column="user_id", node_type="user") \
    .target(id_column="device_id", node_type="device") \
    .with_columns("id", "location", "status", key="id", display="location")
add_node
def add_node(alias: str) -> NodeBuilderInitial

このエッジを終了し、ノードの構築を開始します。

パラメーター:

  • alias (str): 新しいノードのエイリアス

返します:

  • NodeBuilderInitial: 新しいノード ビルダー
add_edge
def add_edge(alias: str) -> EdgeBuilderInitial

このエッジを終了し、別のエッジの構築を開始します。

パラメーター:

  • alias (str): 新しいエッジのエイリアス

返します:

  • EdgeBuilderInitial: 新しいエッジ ビルダー

例:

builder.add_edge("accessed").from_table("AccessLogs") \
    .source(id_column="user_id", node_type="user") \
    .target(id_column="device_id", node_type="device") \
    .with_columns("id", "location", key="id", display="location") \
    .add_edge("connected_to")
done
def done() -> GraphSpec

このエッジを完成させ、グラフの仕様を完了します。

返します:

  • GraphSpec: 完全なグラフ仕様

スキーマ クラス

GraphDefinitionReference

名前とバージョンを含むグラフ定義への参照。

コンストラクター

GraphDefinitionReference(
    fully_qualified_name: str,
    version: str
)

パラメーター:

  • fully_qualified_name (str): 参照されるグラフの完全修飾名
  • version (str): 参照されるグラフのバージョン

発生 させます:

  • ValueError: fully_qualified_nameまたはバージョンが空の場合

メソッド

to_dict
def to_dict() -> Dict[str, Any]

ディクショナリにシリアル化します。

返します:

  • Dict[str, Any]: シリアル化された参照

プロパティ

型セーフ インターフェイスを使用したプロパティ定義。

コンストラクター

Property(
    name: str,
    property_type: PropertyType,
    is_non_null: bool = False,
    description: str = "",
    is_key: bool = False,
    is_display_value: bool = False,
    is_internal: bool = False
)

パラメーター:

  • name (str): プロパティ名
  • property_type (PropertyType): プロパティ データ型
  • is_non_null (bool、default=False): プロパティが必要かどうか
  • description (str, default=""): プロパティの説明
  • is_key (bool、default=False): プロパティがキーかどうか
  • is_display_value (bool、default=False): プロパティが表示値かどうか
  • is_internal (bool、default=False): プロパティが内部かどうか

発生 させます:

  • ValueError: 名前が空の場合、または検証が失敗した場合

クラス メソッド

key
@classmethod
Property.key(
    name: str,
    property_type: PropertyType,
    description: str = "",
    is_non_null: bool = False
) -> Property

共通設定を使用してキー プロパティを作成します (is_key=True、is_display_value=True)。

display
@classmethod
Property.display(
    name: str,
    property_type: PropertyType,
    description: str = "",
    is_non_null: bool = False
) -> Property

表示値プロパティを作成します (is_display_value=True)。

メソッド

describe
def describe(text: str) -> Property

説明を fluently に追加します。

パラメーター:

  • text (str): 説明テキスト

返します:

  • Property: メソッドチェーンの自己
to_dict
def to_dict() -> Dict[str, Any]

@プレフィックス付き注釈キーを使用して、プロパティをディクショナリにシリアル化します。

返します:

  • Dict[str, Any]: シリアル化されたプロパティ
to_gql
def to_gql() -> str

GQL プロパティ定義を生成します。

返します:

  • str: GQL 文字列表現

EdgeNode

エッジ定義で使用されるノード参照。

コンストラクター

EdgeNode(
    alias: Optional[str] = None,
    labels: List[str] = []
)

パラメーター:

  • alias (str、省略可能): ノード エイリアス (None または空の場合は最初のラベルに自動設定)
  • labels (List[str]): ノード ラベル (少なくとも 1 つ必要)

発生 させます:

  • ValueError: ラベルリストが空の場合
  • TypeError: ラベルが文字列でない場合

自動ミューテーション:

  • alias が None または空の場合は、最初のラベルに設定されます

メソッド

to_dict
def to_dict() -> Dict[str, Any]

ディクショナリにシリアル化します。

返します:

  • Dict[str, Any]: シリアル化されたエッジ ノード参照

ノード

タイプ セーフ インターフェイスを持つノード定義。

コンストラクター

Node(
    alias: str = "",
    labels: List[str] = [],
    implies_labels: List[str] = [],
    properties: List[Property] = [],
    description: str = "",
    entity_group: str = "",
    dynamic_labels: bool = False,
    abstract_edge_aliases: bool = False
)

パラメーター:

  • alias (str, default=""): ノード エイリアス (空の場合は最初のラベルに自動設定)
  • labels (List[str]): ノード ラベル (少なくとも 1 つ必要)
  • implies_labels (List[str], default=[]): 暗黙的なラベル
  • properties (List[Property], default=[]): Node プロパティ
  • description (str, default=""): ノードの説明
  • entity_group (str, default=""): エンティティ グループ名
  • dynamic_labels (bool、default=False): ノードに動的ラベルがあるかどうか
  • abstract_edge_aliases (bool、default=False): ノードが抽象エッジ エイリアスを使用するかどうか

発生 させます:

  • ValueError: 検証が失敗した場合 (ラベルなし、キー プロパティなし、表示プロパティなしなど)

自動ミューテーション:

  • エイリアスが空の場合は、最初のラベルに設定されます
  • entity_groupが空の場合は、プライマリ ラベルに設定されます

メソッド

get_primary_label
def get_primary_label() -> Optional[str]

プライマリ (最初) ラベルを取得します。

返します:

  • str または None: プライマリ ラベル、ラベルがない場合はなし
get_entity_group_name
def get_entity_group_name() -> str

エンティティ グループ名またはプライマリ ラベルへのフォールバックを取得します。

返します:

  • str: エンティティ グループ名
get_primary_key_property_name
def get_primary_key_property_name() -> Optional[str]

主キー プロパティの名前を取得します。

返します:

  • str または None: 主キー プロパティ名
get_properties
def get_properties() -> Dict[str, Property]

簡単にアクセスできるように、ディクショナリとしてプロパティを取得します。

返します:

  • Dict[str, Property]: 名前でキー付けされたプロパティ
get_property
def get_property(name: str) -> Optional[Property]

名前で特定のプロパティを取得します。

パラメーター:

  • name (str): プロパティ名

返します:

  • Property または None: 見つかった場合はプロパティ
add_property
def add_property(prop: Property) -> None

このノードにプロパティを追加します。

パラメーター:

  • prop (プロパティ): 追加するプロパティ

発生 させます:

  • ValueError: プロパティ名が重複している場合
is_dynamically_labeled
def is_dynamically_labeled() -> bool

ノードに動的ラベルがあるかどうかを確認します。

返します:

  • bool: 動的ラベルが有効な場合は True
is_abstract_edge_node_aliases
def is_abstract_edge_node_aliases() -> bool

ノードで抽象エッジ ノードエイリアスが使用されているかどうかを確認します。

返します:

  • bool: 抽象エッジ エイリアスが有効な場合は True
describe
def describe(text: str) -> Node

説明を fluently に追加します。

パラメーター:

  • text (str): 説明テキスト

返します:

  • Node: メソッドチェーンの自己
to_dict
def to_dict() -> Dict[str, Any]

ノードをディクショナリにシリアル化します。

返します:

  • Dict[str, Any]: シリアル化されたノード
to_gql
def to_gql() -> str

GQL ノード定義を生成します。

返します:

  • str: GQL 文字列表現

発生 させます:

  • ValueError: ノードに GQL の必須フィールドがない場合

クラス メソッド

create
@classmethod
Node.create(
    alias: str,
    labels: List[str],
    properties: List[Property],
    description: str = "",
    entity_group: str = "",
    **kwargs
) -> Node

すべての必須フィールドを含むノードを作成します。

パラメーター:

  • alias (str): ノード エイリアス
  • labels (List[str]): ノード ラベル
  • properties (List[プロパティ]): Node プロパティ
  • description (str, default=""): ノードの説明
  • entity_group (str, default=""): エンティティ グループ名

返します:

  • Node: 新しいノード インスタンス

Edge

型セーフ インターフェイスを使用したエッジ定義。

コンストラクター

Edge(
    relationship_type: str,
    source_node_label: str,
    target_node_label: str,
    direction: EdgeDirection = EdgeDirection.DIRECTED_RIGHT,
    properties: List[Property] = [],
    description: str = "",
    entity_group: str = "",
    dynamic_type: bool = False
)

パラメーター:

  • relationship_type (str): Edge リレーションシップの種類 ("FOLLOWS"、"OWNS"など)
  • source_node_label (str): ソース ノード ラベル
  • target_node_label (str): ターゲット ノード ラベル
  • direction (EdgeDirection、default=DIRECTED_RIGHT): エッジ方向
  • properties (List[Property], default=[]): Edge プロパティ
  • description (str, default=""): エッジの説明
  • entity_group (str, default=""): エンティティ グループ名
  • dynamic_type (bool、default=False): エッジに動的な型があるかどうか

発生 させます:

  • ValueError: 検証が失敗した場合

自動ミューテーション:

  • labels list に自動的に設定されます。 [relationship_type]
  • entity_groupが空の場合は、relationship_typeに設定されます

プロパティ

edge_type
def edge_type() -> str

relationship_typeの下位互換性エイリアス。

返します:

  • str: リレーションシップの種類

メソッド

get_entity_group_name
def get_entity_group_name() -> str

エンティティ グループ名を取得するか、リレーションシップの種類にフォールバックします。

返します:

  • str: エンティティ グループ名
is_dynamic_type
def is_dynamic_type() -> bool

エッジに動的な型があるかどうかを確認します。

返します:

  • bool: 動的型の場合は True
add_property
def add_property(edge_property: Property) -> None

このエッジにプロパティを追加します。

パラメーター:

  • edge_property (プロパティ): 追加するプロパティ
describe
def describe(text: str) -> Edge

説明を fluently に追加します。

パラメーター:

  • text (str): 説明テキスト

返します:

  • Edge: メソッドチェーンの自己
to_dict
def to_dict() -> Dict[str, Any]

エッジをディクショナリにシリアル化します。

返します:

  • Dict[str, Any]: シリアル化されたエッジ
to_gql
def to_gql() -> str

GQL エッジ定義を生成します。

返します:

  • str: GQL 文字列表現

クラス メソッド

create
Edge.create(
    relationship_type: str,
    source_node_label: str,
    target_node_label: str,
    properties: List[Property] = None,
    description: str = "",
    entity_group: str = "",
    **kwargs
) -> Edge

すべての必須フィールドを含むエッジを作成します。

パラメーター:

  • relationship_type (str): エッジ リレーションシップの種類
  • source_node_label (str): ソース ノード ラベル
  • target_node_label (str): ターゲット ノード ラベル
  • properties (List[Property], optional): Edge プロパティ
  • description (str, default=""): エッジの説明
  • entity_group (str, default=""): エンティティ グループ名

返します:

  • Edge: 新しいエッジ インスタンス

GraphSchema

型セーフ インターフェイスを使用したグラフ スキーマ定義。

コンストラクター

GraphSchema(
    name: str,
    nodes: List[Node] = [],
    edges: List[Edge] = [],
    base_graphs: List[GraphSchema] = [],
    description: str = "",
    version: str = "1.0",
    fully_qualified_name: str = "",
    namespace: str = ""
)

パラメーター:

  • name (str): グラフ スキーマ名
  • nodes (List[Node], default=[]): ノード定義
  • edges (List[Edge], default=[]): エッジ定義
  • base_graphs (List[GraphSchema], default=[]): 基本グラフ スキーマ
  • description (str, default=""): スキーマの説明
  • version (str, default="1.0"): スキーマ バージョン
  • fully_qualified_name (str, default=""): 完全修飾名
  • namespace (str, default=""): 名前空間

発生 させます:

  • ValueError: 検証が失敗した場合 (重複するエイリアス、エッジが存在しないノードを参照するなど)

メソッド

get_fully_qualified_name
def get_fully_qualified_name() -> str

完全修飾名を取得します。

返します:

  • str: 完全修飾名
get_namespace
def get_namespace() -> str

完全修飾名から名前空間を取得するか、既定値を返します。

返します:

  • str:名前 空間
get_version
def get_version() -> str

バージョンを取得します。

返します:

  • str: バージョン文字列
get_node
def get_node(label_or_alias: str) -> Optional[Node]

ラベルまたはエイリアスでノードを取得します。

パラメーター:

  • label_or_alias (str): ノード ラベルまたはエイリアス

返します:

  • Node または None: 見つかった場合はノード
get_edge
def get_edge(name: str) -> Optional[Edge]

名前/型でエッジを取得します。

パラメーター:

  • name (str): エッジ リレーションシップの種類

返します:

  • Edge または None: 見つかった場合は Edge
add_node
def add_node(node: Node) -> None

このグラフにノードを追加します。

パラメーター:

  • node (ノード): 追加するノード

発生 させます:

  • ValueError: ノードエイリアスが重複している場合
add_edge
def add_edge(edge: Edge) -> None

このグラフにエッジを追加します。

パラメーター:

  • edge (Edge): 追加するエッジ

発生 させます:

  • ValueError: エッジの種類が重複している場合
include_graph
def include_graph(fully_qualified_name: str, version: str) -> GraphSchema

グラフ インクルード (fluent API) を追加します。

パラメーター:

  • fully_qualified_name (str): 含めるグラフの完全修飾名
  • version (str): 含めるグラフのバージョン

返します:

  • GraphSchema: メソッドチェーンの自己
get_included_graph_references
def get_included_graph_references() -> List[GraphDefinitionReference]

含まれているグラフ参照の一覧を取得します。

返します:

  • List[GraphDefinitionReference]: グラフ定義参照の一覧
describe
def describe(text: str) -> GraphSchema

説明を fluently に追加します。

パラメーター:

  • text (str): 説明テキスト

返します:

  • GraphSchema: メソッドチェーンの自己
to_dict
def to_dict() -> Dict[str, Any]

スキーマをディクショナリにシリアル化します。

返します:

  • Dict[str, Any]: シリアル化されたスキーマ
to_json
def to_json(indent: int = 2) -> str

JSON 表現を生成します。

パラメーター:

  • indent (int,default=2): JSON インデント レベル

返します:

  • str: JSON 文字列
to_gql
def to_gql() -> str

GQL スキーマ定義を生成します。

返します:

  • str: GQL 文字列表現

クラス メソッド

create
@classmethod
GraphSchema.create(
    name: str,
    nodes: List[Node] = None,
    edges: List[Edge] = None,
    description: str = "",
    version: str = "1.0",
    **kwargs
) -> GraphSchema

すべての必須フィールドを含むグラフ スキーマを作成します。

パラメーター:

  • name (str): グラフ スキーマ名
  • nodes (List[Node], optional): ノード定義
  • edges (List[Edge], optional): Edge 定義
  • description (str, default=""): スキーマの説明
  • version (str, default="1.0"): スキーマ バージョン

返します:

  • GraphSchema: 新しいグラフ スキーマ インスタンス

クエリ入力クラス

定義済みのグラフ クエリの入力パラメーターを表すデータクラス。

注:

QueryInput オブジェクトGraphクエリ メソッドに直接渡すことは非推奨となり、今後のバージョンでは削除される予定です。 代わりにキーワード (keyword)引数を使用します。 Graph メソッド (reachabilityk_hopblast_radiuscentralityranked) は、すべてのパラメーターをキーワード (keyword)引数として受け入れ、内部的に入力オブジェクトを構築します。 これらのクラスは今のところコードベースに残りますが、新しいコードでは使用しないでください。

QueryInputBase

すべてのクエリ入力パラメーターの基底クラス。

メソッド

to_json_payload
def to_json_payload() -> Dict[str, Any]

入力パラメーターを API 送信用のディクショナリに変換します。

返します:

  • Dict[str, Any]: 入力パラメーターのディクショナリ表現
validate
def validate() -> None

入力パラメーターを検証します。

発生 させます:

  • ValueError: 入力パラメーターが無効な場合

ReachabilityQueryInput

ソース ノードとターゲット ノード間の到達可能性クエリの入力パラメーター。 QueryInputBaseから継承するReachabilityQueryInputBaseから継承します。

フィールド

フィールド 種類 既定値 説明
source_property_value str (必須) source プロパティに一致する値
target_property_value str (必須) ターゲット プロパティに一致する値
source_property Optional[str] None ソース ノードをフィルター処理するプロパティ名
participating_source_node_labels Optional[List[str]] None ソース ノードとして考慮するノード ラベル
target_property Optional[str] None ターゲット ノードをフィルター処理するプロパティ名
participating_target_node_labels Optional[List[str]] None ターゲット ノードとして考慮するノード ラベル
participating_edge_labels Optional[List[str]] None パス内で走査するエッジ ラベル
is_directional Optional[bool] True エッジが方向であるかどうか
min_hop_count Optional[int] 1 パス内のホップの最小数
max_hop_count Optional[int] 4 パス内のホップの最大数
shortest_path Optional[bool] False 最短パスのみを検索するかどうか
max_results Optional[int] 500 返す結果の最大数

検証:

  • source_property_value が必要です
  • target_property_value が必要です

例:

# Preferred: keyword arguments (no import needed)
result = graph.reachability(
    source_property="UserId",
    source_property_value="user123",
    target_property="DeviceId",
    target_property_value="device456",
    participating_edge_labels=["accessed", "connected_to"],
    shortest_path=True
)

# DEPRECATED — will be removed in a future version. Use keyword arguments above.
from sentinel_graph.builders.query_input import ReachabilityQueryInput
result = graph.reachability(query_input=ReachabilityQueryInput(
    source_property_value="user123",
    target_property_value="device456"
))

K_HopQueryInput

特定のソース ノードからの k ホップ クエリの入力パラメーター。 ReachabilityQueryInputBaseから継承します。

ReachabilityQueryInputからすべてのフィールドを継承します。

検証:

  • 少なくとも 1 つの source_property_value または target_property_value を指定する必要があります

例:

# Preferred: keyword arguments
result = graph.k_hop(
    source_property_value="user123",
    max_hop_count=3,
    participating_edge_labels=["accessed"]
)

# DEPRECATED — will be removed in a future version. Use keyword arguments above.
from sentinel_graph.builders.query_input import K_HopQueryInput
result = graph.k_hop(query_input=K_HopQueryInput(source_property_value="user123"))

BlastRadiusQueryInput

ソースノードからターゲット ノードへのブラスト半径クエリの入力パラメーター。 ReachabilityQueryInputBaseから継承します。

次の必須フィールドを使用して、 ReachabilityQueryInputからすべてのフィールドを継承します。

フィールド 種類 必須 説明
source_property_value str はい ソース ノードを識別する値
target_property_value str はい ターゲット ノードを識別する値

検証:

  • source_property_value が必要です
  • target_property_value が必要です

例:

# Preferred: keyword arguments
result = graph.blast_radius(
    source_property_value="user123",
    target_property_value="device456",
    participating_edge_labels=["accessed", "connected_to"]
)

# DEPRECATED — will be removed in a future version. Use keyword arguments above.
from sentinel_graph.builders.query_input import BlastRadiusQueryInput
result = graph.blast_radius(query_input=BlastRadiusQueryInput(
    source_property_value="user123",
    target_property_value="device456"
))

CentralityQueryInput

中央集中型分析クエリの入力パラメーター。 QueryInputBaseから継承します。

CentralityType 列挙型

説明
CentralityType.Node コンピューティング ノードの一元性
CentralityType.Edge コンピューティング エッジの一元性

フィールド

フィールド 種類 既定値 説明
threshold Optional[int] 3 考慮する最小集中度スコア
centrality_type CentralityType CentralityType.Node コンピューティングする一元性の種類
max_paths Optional[int] 1000000 考慮する最大パス (0 = すべて)
participating_source_node_labels Optional[List[str]] None ソース ノード ラベル
participating_target_node_labels Optional[List[str]] None ターゲット ノード ラベル
participating_edge_labels Optional[List[str]] None 走査するエッジ ラベル
is_directional Optional[bool] True エッジが方向であるかどうか
min_hop_count Optional[int] 1 最小ホップ数
max_hop_count Optional[int] 4 最大ホップ数
shortest_path Optional[bool] False 最短パスのみ
max_results Optional[int] 500 最大結果

例:

# Preferred: keyword arguments (works for all centrality types)
result = graph.centrality(
    centrality_type=CentralityType.Edge,  # or CentralityType.Node (default)
    participating_edge_labels=["accessed", "connected_to"],
    threshold=5,
    max_results=100
)

# DEPRECATED — will be removed in a future version. Use keyword arguments above.
from sentinel_graph.builders.query_input import CentralityQueryInput, CentralityType
result = graph.centrality(query_input=CentralityQueryInput(
    centrality_type=CentralityType.Edge,
    participating_edge_labels=["accessed"]
))

RankedQueryInput

ランク付けされた分析クエリの入力パラメーター。 QueryInputBaseから継承します。

フィールド

フィールド 種類 既定値 説明
rank_property_name str (必須) ランク付けパスに使用するプロパティ名
threshold Optional[int] 0 この値を超える重みを持つ戻りパスのみ
max_paths Optional[int] 1000000 考慮する最大パス (0 = すべて)
decay_factor Optional[float] 1 各グラフ ステップのランクの削減量 (2 = 各ステップの半分)
is_directional Optional[bool] True エッジが方向であるかどうか
min_hop_count Optional[int] 1 最小ホップ数
max_hop_count Optional[int] 4 最大ホップ数
shortest_path Optional[bool] False 最短パスのみ
max_results Optional[int] 500 最大結果

例:

# Preferred: keyword arguments
result = graph.ranked(
    rank_property_name="risk_score",
    threshold=5,
    decay_factor=2,
    max_results=50
)

# DEPRECATED — will be removed in a future version. Use keyword arguments above.
from sentinel_graph.builders.query_input import RankedQueryInput
result = graph.ranked(query_input=RankedQueryInput(
    rank_property_name="risk_score",
    threshold=5
))

クエリ結果

QueryResult

遅延 DataFrame アクセスを持つグラフ クエリの結果。

コンストラクター

QueryResult(raw_response: Dict[str, Any], graph: Graph)

パラメーター:

  • raw_response (Dict[str, Any]): Raw API 応答ディクショナリ
  • graph (グラフ): 親グラフへの参照

メモ: 通常、直接インスタンス化されるのではなく、 Graph.query()によって作成されます。

メソッド

to_dataframe
def to_dataframe() -> DataFrame

クエリ結果を Spark DataFrame に変換します。

返します:

  • DataFrame: Spark DataFrame としてのクエリ結果

発生 させます:

  • ValueError: 変換が失敗した場合

例:

result = graph.query("MATCH (u:user) RETURN u")
df = result.to_dataframe()
df.show()
get_raw_data
def get_raw_data() -> Dict[str, Any]

応答から RawData セクションを取得します。

返します:

  • Dict[str, Any]: 生のメタデータを含むディクショナリ、または存在しない場合は空の dict

例:

result = graph.query("MATCH (u:user) RETURN u")
metadata = result.get_raw_data()
show
def show(format: str = "visual") -> None

クエリ結果をさまざまな形式で表示します。

パラメーター:

  • format (str, default="visual"): 出力形式
    • "table": 完全な DataFrame テーブル (すべての列)
    • "visual": VSC プラグインを使用した対話型グラフの視覚化
    • "all": すべての形式を表示

発生 させます:

  • ValueError: フォーマットがサポートされている値の 1 つでない場合

例:

result = graph.query("MATCH (u:user)-[r:accessed]->(d:device) RETURN u, r, d")
result.show()  # Visual by default
result.show(format="table")  # Table format

# 0. Imports
from sentinel_graph import GraphSpecBuilder, Graph

# 1. Define graph specification
spec = (
    GraphSpecBuilder.start()
    
    .add_node("User")
        .from_dataframe(user_nodes)  # native Spark DF from groupBy → no .df
            .with_columns(
                "UserId", "UserDisplayName", "UserPrincipalName",
                "DistinctLocationCount", "DistinctIPCount", "DistinctAppCount",
                "TotalSignIns", "RiskySignInCount", "ImpossibleTravelFlag",
                key="UserId", display="UserDisplayName"
            )
    
    .add_node("IPAddress")
        .from_dataframe(ip_nodes)  # native Spark DF from groupBy → no .df
            .with_columns(
                "IPAddress", "UniqueUsers", "UniqueLocations",
                "SignInCount", "RiskySignInCount", "SharedIPFlag",
                key="IPAddress", display="IPAddress"
            )
    
    .add_edge("UsedIP")
        .from_dataframe(edge_used_ip)  # native Spark DF → no .df
            .source(id_column="UserId", node_type="User")
            .target(id_column="IPAddress", node_type="IPAddress")
            .with_columns(
                "SignInCount", "FirstSeen", "LastSeen", "EdgeKey",
                key="EdgeKey", display="EdgeKey"
            )
   
    .done()
)

# 2. Inspect schema before building (GraphSpec owns this)
spec.show_schema()

# 3. Build: prepares data + publishes graph → returns Graph
graph = Graph.build(spec)
print(f"Build status: {graph.build_status.status}")

# 4. Query the graph (query lives on Graph)
result = graph.query("MATCH (u:user)-[used:UsedIP]->(ip:IPAddress) RETURN * LIMIT 100")
result.show()

# 5. Access data via delegation
df = result.to_dataframe()
df.printSchema()

# 6. Graph algorithms
gf = graph.to_graphframe()
pagerank_result = gf.pageRank(resetProbability=0.15, maxIter=10)
pagerank_result.vertices.select("id", "pagerank").show()

# 7. Fetch an existing graph (no spec needed)
graph = Graph.get("my_existing_graph", context=context)
graph.query("MATCH (n) RETURN n LIMIT 10").show()

設計パターンに関する注意事項

Fluent API

すべてのビルダーは、読み取り可能な宣言型グラフ定義のメソッド チェーンをサポートします。

builder.add_node("user") \
    .from_table("Users") \
    .with_columns("id", "name", key="id", display="name") \
    .add_edge("follows")

共用体スキーマ

同じエイリアスを持つ複数のエッジが、マージされたプロパティと自動的に結合されます。

# Both edges use alias "sign_in" - they will be merged into one schema edge
builder.add_edge("sign_in") \
    .from_table("AzureSignins") \
    .source(id_column="UserId", node_type="AZuser") \
    .target(id_column="DeviceId", node_type="device")

builder.add_edge("sign_in") \
    .from_table("EntraSignins") \
    .source(id_column="UserId", node_type="EntraUser") \
    .target(id_column="DeviceId", node_type="device")

自動

多くのフィールドには、適切な既定値があります。

  • ノード/エッジ ラベルは既定でエイリアスに設定されます
  • プロパティはソース スキーマから自動推論されます
  • エンティティ グループは、既定でプライマリ ラベル/リレーションシップの種類に設定されます

遅延評価

DataFrame とリソースは、遅延して読み込まれ、キャッシュされます。

  • graph_spec.nodesgraph_spec.edges は最初のアクセス時に読み込まれます
  • クエリ結果は、要求された場合にのみ DataFrames を作成します