graflag package

GraFlag - Graph Anomaly Detection Benchmarking Tool

A tool for benchmarking Graph Anomaly Detection methods using Docker Swarm across multiple nodes with shared NFS storage.

class graflag.GraFlag(config_file: str = '.env')[source]

Bases: object

Main GraFlag orchestration class.

All public methods return structured data. Nothing is printed directly to stdout, except follow_logs and show_logs, which stream and print log output respectively.

setup()[source]

Set up the GraFlag cluster: initialize the swarm and set up the worker nodes.

status() ClusterInfo[source]

Get cluster status.

Returns:

ClusterInfo with nodes, services, and shared directory info.

run(method_name: str, dataset: str, tag: str = 'latest', build: bool = False, gpu: bool = True, method_params: dict = None) str[source]

Run experiment.

Returns:

Experiment name.

Raises:

GraFlagError – If run fails.

register_metric(result_type: str, metric_func: Callable, experiment: str = None)[source]

Register a custom metric as a plugin file on the cluster.

The function source is extracted via inspect.getsource and written to a .py plugin file that the evaluator loads at runtime.

Parameters:
  • result_type – Result type the metric applies to (e.g. "EDGE_STREAM_ANOMALY_SCORES").

  • metric_func – A function with signature (scores, ground_truth, **kwargs) -> Dict[str, float].

  • experiment – If given, the plugin is scoped to that experiment (custom_metrics/ inside the experiment directory). Otherwise it is saved to the global plugins directory.

Raises:

GraFlagError – If the function source cannot be extracted or the file cannot be written.
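A metric plugin is just a function with the documented signature. The sketch below is illustrative only; the metric name and logic are not part of GraFlag. Note that because register_metric extracts the source with inspect.getsource, the function must be defined in a real module file (inspect.getsource cannot recover source for functions typed into a plain interactive session).

```python
from typing import Dict

def precision_at_100(scores, ground_truth, **kwargs) -> Dict[str, float]:
    """Illustrative metric: precision among the 100 highest-scoring items."""
    k = min(100, len(scores))
    if k == 0:
        return {"precision_at_100": 0.0}
    # Indices of the k largest anomaly scores.
    top_k = sorted(range(len(scores)), key=lambda i: scores[i], reverse=True)[:k]
    hits = sum(1 for i in top_k if ground_truth[i] == 1)
    return {"precision_at_100": hits / k}
```

It could then be registered with, for example, gf.register_metric("EDGE_STREAM_ANOMALY_SCORES", precision_at_100).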

evaluate(experiment_name: str)[source]

Evaluate an experiment: compute metrics and generate plots.

Raises:

GraFlagError – If evaluation fails.
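Taken together, setup, run, evaluate, and the result getters support a simple end-to-end loop. Because this requires a configured .env and a reachable cluster, the sketch below is wrapped in a function rather than executed inline; the method and dataset arguments are placeholders supplied by the caller:

```python
def run_and_evaluate(method: str, dataset: str) -> dict:
    """Illustrative workflow: run one experiment and collect its metrics."""
    from graflag import GraFlag, GraFlagError

    gf = GraFlag(config_file=".env")
    gf.setup()  # initialize the swarm and set up workers
    try:
        name = gf.run(method, dataset, gpu=False)  # returns the experiment name
    except GraFlagError as err:
        raise SystemExit(f"run failed: {err}")
    gf.evaluate(name)  # compute metrics and generate plots
    evaluation = gf.get_evaluation_results(name)
    return evaluation.metrics if evaluation else {}
```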

list_methods() List[MethodInfo][source]

List available methods.

Returns:

List of MethodInfo objects.

list_datasets() List[DatasetInfo][source]

List available datasets.

Returns:

List of DatasetInfo objects.

list_experiments(limit: int = 50) List[ExperimentInfo][source]

List recent experiments.

Returns:

List of ExperimentInfo (most recent first).

list_services() List[Dict][source]

List running Docker services.

Returns:

List of service dicts with name, replicas, image, status.

get_logs(experiment_name: str, tail: int = 100) List[str][source]

Get experiment logs (non-streaming).

Tries Docker service logs first, then falls back to method_output.txt.

Returns:

List of log lines.

follow_logs(experiment_name: str, tee_file: str = None)[source]

Follow logs for an experiment (streams to stdout).

Shows the build log (if one exists) followed by the service logs. Falls back to method_output.txt if the service is gone.

show_logs(experiment_name: str, tee_file: str = None)[source]

Show logs (non-follow mode); prints to stdout.

stop(experiment_name: str, remove: bool = False)[source]

Stop a running experiment/service.

Parameters:
  • experiment_name – Name of the experiment.

  • remove – If True, also delete the experiment directory.

get_experiment_results(experiment_name: str) ExperimentResults | None[source]

Get experiment results from results.json.

get_evaluation_results(experiment_name: str) EvaluationResults | None[source]

Get evaluation results from eval/evaluation.json.

copy_files(source_paths, dest_path: str, recursive: bool = False, from_remote: bool = False)[source]

Copy files/directories between the local machine and remote shared storage. If from_remote is True, copy from remote to local; otherwise from local to remote.

mount_nfs(shared_dir: str)[source]

Mount NFS share on local machine.

sync(local_path: str, is_lib: bool = False)[source]

Sync a local method or library directory to remote shared storage.

exception graflag.GraFlagError[source]

Bases: Exception

Custom exception for GraFlag errors.

class graflag.GraflagConfig(config_file: str = '.env')[source]

Bases: object

Handle configuration loading and validation for GraFlag.

get(key: str, default: str | None = None) str | None[source]

Get configuration value.

property remote_shared_dir: str
property manager_ip: str
property ssh_port: str
property ssh_key: str | None
property nfs_port: str
property hosts_file: str | None
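GraflagConfig reads its values from a dotenv-style file. The parser below is a standalone sketch of that file format, not GraFlag's own loader, and the key names in the sample are assumptions about what the properties above might map to:

```python
def parse_env(text: str) -> dict:
    """Minimal .env-style parser: KEY=VALUE lines, # comments, blank lines."""
    values = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue  # skip blanks, comments, and malformed lines
        key, _, value = line.partition("=")
        values[key.strip()] = value.strip()
    return values

sample = """
# cluster settings (key names are illustrative, not confirmed)
MANAGER_IP=10.0.0.5
SSH_PORT=22
"""
config = parse_env(sample)
```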
class graflag.ClusterInfo(manager_ip: str, is_connected: bool, swarm_initialized: bool, worker_nodes: List[Dict[str, str]] = <factory>, shared_dir: str = '', shared_contents: List[str] = <factory>, services: List[Dict] = <factory>, error: str | None = None)[source]

Bases: object

Cluster status information.

manager_ip: str
is_connected: bool
swarm_initialized: bool
worker_nodes: List[Dict[str, str]]
shared_dir: str = ''
shared_contents: List[str]
services: List[Dict]
error: str | None = None
to_dict() dict[source]
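Each dataclass exposes to_dict() for serialization. The package's exact implementation is not shown here, but it presumably behaves like dataclasses.asdict, as the standalone sketch below illustrates with a simplified stand-in (not the real ClusterInfo):

```python
from dataclasses import dataclass, field, asdict
from typing import Dict, List

@dataclass
class MiniClusterInfo:
    """Simplified stand-in for graflag.ClusterInfo, for illustration only."""
    manager_ip: str
    is_connected: bool
    worker_nodes: List[Dict[str, str]] = field(default_factory=list)

    def to_dict(self) -> dict:
        # asdict recurses into nested dataclasses, lists, and dicts.
        return asdict(self)

info = MiniClusterInfo("10.0.0.5", True, [{"hostname": "worker-1", "status": "ready"}])
```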
class graflag.MethodInfo(name: str, description: str = '', source_code: str = '', supported_data: str = '', parameters: Dict[str, Any] = <factory>, has_dockerfile: bool = False, has_env: bool = False)[source]

Bases: object

Method metadata.

name: str
description: str = ''
source_code: str = ''
supported_data: str = ''
parameters: Dict[str, Any]
has_dockerfile: bool = False
has_env: bool = False
to_dict() dict[source]
class graflag.DatasetInfo(name: str, path: str = '', size_mb: float = 0.0, file_count: int = 0)[source]

Bases: object

Dataset metadata.

name: str
path: str = ''
size_mb: float = 0.0
file_count: int = 0
to_dict() dict[source]
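Figures like size_mb and file_count can be recomputed locally for any dataset directory. The sketch below uses only the standard library and is not GraFlag's own implementation:

```python
import os
import tempfile

def dataset_stats(path: str) -> tuple:
    """Return (size_mb, file_count) for a directory tree."""
    total_bytes, file_count = 0, 0
    for root, _dirs, files in os.walk(path):
        for name in files:
            total_bytes += os.path.getsize(os.path.join(root, name))
            file_count += 1
    return round(total_bytes / (1024 * 1024), 2), file_count

# Example on a throwaway directory with one tiny file:
tmp = tempfile.mkdtemp()
with open(os.path.join(tmp, "edges.csv"), "w") as fh:
    fh.write("src,dst,label\n")
stats = dataset_stats(tmp)
```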
class graflag.ExperimentInfo(name: str, method: str, dataset: str, timestamp: str, status: str, has_results: bool = False, has_evaluation: bool = False, results_path: str | None = None, evaluation_path: str | None = None, service_name: str | None = None)[source]

Bases: object

Experiment metadata and status.

name: str
method: str
dataset: str
timestamp: str
status: str
has_results: bool = False
has_evaluation: bool = False
results_path: str | None = None
evaluation_path: str | None = None
service_name: str | None = None
to_dict() dict[source]
class graflag.ExperimentResults(experiment_name: str, method_name: str, dataset: str, metadata: Dict[str, Any] = <factory>, execution_time_ms: float | None = None, peak_memory_mb: float | None = None, peak_gpu_memory_mb: float | None = None, result_type: str | None = None, scores_available: bool = False)[source]

Bases: object

Parsed experiment results.

experiment_name: str
method_name: str
dataset: str
metadata: Dict[str, Any]
execution_time_ms: float | None = None
peak_memory_mb: float | None = None
peak_gpu_memory_mb: float | None = None
result_type: str | None = None
scores_available: bool = False
to_dict() dict[source]
class graflag.EvaluationResults(experiment_name: str, metrics: Dict[str, float] = <factory>, plots_available: List[str] = <factory>, evaluation_path: str | None = None)[source]

Bases: object

Parsed evaluation results.

experiment_name: str
metrics: Dict[str, float]
plots_available: List[str]
evaluation_path: str | None = None
to_dict() dict[source]
class graflag.RunProgress(experiment_name: str, status: str, message: str = '', log_lines: List[str] = <factory>)[source]

Bases: object

Progress information for run execution.

experiment_name: str
status: str
message: str = ''
log_lines: List[str]
to_dict() dict[source]