graflag package
GraFlag - Graph Anomaly Detection Benchmarking Tool
A tool for benchmarking Graph Anomaly Detection methods using Docker Swarm across multiple nodes with shared NFS storage.
- class graflag.GraFlag(config_file: str = '.env')
Bases: object
Main GraFlag orchestration class.
Public methods return structured data and do not print to stdout. The exceptions are follow_logs, which streams logs in real time, and show_logs, which prints them.
- status() -> ClusterInfo
Get cluster status.
- Returns:
ClusterInfo with nodes, services, and shared directory info.
- run(method_name: str, dataset: str, tag: str = 'latest', build: bool = False, gpu: bool = True, method_params: dict = None) -> str
Run an experiment.
- Returns:
Experiment name.
- Raises:
GraFlagError – If run fails.
- register_metric(result_type: str, metric_func: Callable, experiment: str = None)
Register a custom metric as a plugin file on the cluster.
The function source is extracted via inspect.getsource and written to a .py plugin file that the evaluator loads at runtime.
- Parameters:
result_type – Result type the metric applies to (e.g. "EDGE_STREAM_ANOMALY_SCORES").
metric_func – A function with signature (scores, ground_truth, **kwargs) -> Dict[str, float].
experiment – If given, the plugin is scoped to that experiment (custom_metrics/ inside the experiment directory). Otherwise it is saved to the global plugins directory.
- Raises:
GraFlagError – If the function source cannot be extracted or the file cannot be written.
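A metric function matching the documented signature could look like the sketch below. The reference does not state the concrete types of scores and ground_truth or which keyword arguments the evaluator passes, so this example assumes flat sequences of floats and 0/1 labels and reads a hypothetical k keyword with a default. Because only the function source is written to the plugin file, it is probably safest to keep the function free of module-level helpers.

```python
from typing import Dict, Sequence


def precision_at_k(
    scores: Sequence[float], ground_truth: Sequence[int], **kwargs
) -> Dict[str, float]:
    """Fraction of true anomalies among the k highest-scored items.

    Assumptions (not confirmed by the reference): `scores` and
    `ground_truth` are equally long flat sequences, labels are 0/1,
    and `k` is a hypothetical keyword argument.
    """
    k = int(kwargs.get("k", 10))
    # Clamp k to the valid range so short inputs do not fail.
    k = max(1, min(k, len(scores)))
    # Indices sorted by descending anomaly score.
    ranked = sorted(range(len(scores)), key=lambda i: scores[i], reverse=True)
    hits = sum(1 for i in ranked[:k] if ground_truth[i] == 1)
    return {"precision_at_k": hits / k}
```

Such a function would then be passed as metric_func, for example with result_type "EDGE_STREAM_ANOMALY_SCORES", and optionally an experiment name to scope the plugin to one experiment.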
- evaluate(experiment_name: str)
Evaluate an experiment: compute metrics and generate plots.
- Raises:
GraFlagError – If evaluation fails.
- list_methods() -> List[MethodInfo]
List available methods.
- Returns:
List of MethodInfo objects.
- list_datasets() -> List[DatasetInfo]
List available datasets.
- Returns:
List of DatasetInfo objects.
- list_experiments(limit: int = 50) -> List[ExperimentInfo]
List recent experiments.
- Returns:
List of ExperimentInfo (most recent first).
- list_services() -> List[Dict]
List running Docker services.
- Returns:
List of service dicts with name, replicas, image, status.
- get_logs(experiment_name: str, tail: int = 100) -> List[str]
Get experiment logs (non-streaming).
Tries Docker service logs first, then falls back to method_output.txt.
- Returns:
List of log lines.
- follow_logs(experiment_name: str, tee_file: str = None)
Follow logs for an experiment (streams to stdout).
Shows the build log (if it exists) followed by the service logs. Falls back to method_output.txt if the service is gone.
- show_logs(experiment_name: str, tee_file: str = None)
Show logs without following; prints to stdout.
- stop(experiment_name: str, remove: bool = False)
Stop a running experiment/service.
- Parameters:
experiment_name – Name of the experiment
remove – If True, also delete the experiment directory
- get_experiment_results(experiment_name: str) -> ExperimentResults | None
Get experiment results from results.json.
- get_evaluation_results(experiment_name: str) -> EvaluationResults | None
Get evaluation results from eval/evaluation.json.
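The methods above can be chained into a run, evaluate, read-metrics sequence. The following is only a sketch: run_and_evaluate is a hypothetical helper, it accepts any object exposing the documented run, evaluate, and get_evaluation_results methods, and it assumes that run returns only after the experiment has produced results, which this reference does not state.

```python
from typing import Any, Dict, Optional


def run_and_evaluate(
    gf: Any,
    method_name: str,
    dataset: str,
    method_params: Optional[dict] = None,
) -> Dict[str, float]:
    """Run one experiment, evaluate it, and return its metrics.

    `gf` is expected to behave like graflag.GraFlag. Assumption: the
    experiment is finished when run() returns; if it is not, evaluate()
    may raise GraFlagError and a polling step would be needed here.
    """
    experiment = gf.run(method_name, dataset, method_params=method_params)
    gf.evaluate(experiment)
    evaluation = gf.get_evaluation_results(experiment)
    # get_evaluation_results is documented to return None when no
    # evaluation is available.
    return dict(evaluation.metrics) if evaluation is not None else {}
```

Taking the orchestrator as a parameter keeps the helper testable without a cluster; in real use the argument would be a GraFlag instance.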
- class graflag.GraflagConfig(config_file: str = '.env')
Bases: object
Handle configuration loading and validation for GraFlag.
- property manager_ip: str
- property ssh_port: str
- property ssh_key: str | None
- property nfs_port: str
- property hosts_file: str | None
- class graflag.ClusterInfo(manager_ip: str, is_connected: bool, swarm_initialized: bool, worker_nodes: List[Dict[str, str]] = <factory>, shared_dir: str = '', shared_contents: List[str] = <factory>, services: List[Dict] = <factory>, error: str | None = None)
Bases: object
Cluster status information.
- manager_ip: str
- is_connected: bool
- swarm_initialized: bool
- worker_nodes: List[Dict[str, str]]
- services: List[Dict]
- error: str | None = None
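The ClusterInfo fields lend themselves to a quick readiness check before submitting work. The helper below is a hypothetical sketch, not part of the package; it relies only on the attribute names listed above and interprets them by their names.

```python
from typing import Any, List


def cluster_problems(info: Any) -> List[str]:
    """Return human-readable problems found in a ClusterInfo-like object.

    An empty list means no problem was detected by these simple checks.
    """
    problems: List[str] = []
    if not info.is_connected:
        problems.append(f"cannot reach manager {info.manager_ip}")
    if not info.swarm_initialized:
        problems.append("Docker Swarm is not initialized")
    if info.error:
        problems.append(f"reported error: {info.error}")
    return problems
```

A typical call would pass the value returned by status().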
- class graflag.MethodInfo(name: str, description: str = '', source_code: str = '', supported_data: str = '', parameters: Dict[str, Any] = <factory>, has_dockerfile: bool = False, has_env: bool = False)
Bases: object
Method metadata.
- name: str
- description: str = ''
- source_code: str = ''
- supported_data: str = ''
- parameters: Dict[str, Any]
- has_dockerfile: bool = False
- has_env: bool = False
- class graflag.DatasetInfo(name: str, path: str = '', size_mb: float = 0.0, file_count: int = 0)
Bases: object
Dataset metadata.
- name: str
- path: str = ''
- size_mb: float = 0.0
- file_count: int = 0
- class graflag.ExperimentInfo(name: str, method: str, dataset: str, timestamp: str, status: str, has_results: bool = False, has_evaluation: bool = False, results_path: str | None = None, evaluation_path: str | None = None, service_name: str | None = None)
Bases: object
Experiment metadata and status.
- name: str
- method: str
- dataset: str
- timestamp: str
- status: str
- has_results: bool = False
- has_evaluation: bool = False
- results_path: str | None = None
- evaluation_path: str | None = None
- service_name: str | None = None
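The has_results and has_evaluation flags make it straightforward to find experiments that finished but were never evaluated. The function below is a hypothetical helper that works on any objects carrying the ExperimentInfo attributes listed above, such as the list returned by list_experiments.

```python
from typing import Any, Iterable, List


def pending_evaluation(experiments: Iterable[Any]) -> List[str]:
    """Names of experiments that have results but no evaluation yet.

    The input order is preserved, so a most-recent-first list stays
    most-recent-first.
    """
    return [
        e.name
        for e in experiments
        if e.has_results and not e.has_evaluation
    ]
```

Each returned name could then be passed to evaluate.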
- class graflag.ExperimentResults(experiment_name: str, method_name: str, dataset: str, metadata: Dict[str, Any] = <factory>, execution_time_ms: float | None = None, peak_memory_mb: float | None = None, peak_gpu_memory_mb: float | None = None, result_type: str | None = None, scores_available: bool = False)
Bases: object
Parsed experiment results.
- experiment_name: str
- method_name: str
- dataset: str
- metadata: Dict[str, Any]
- execution_time_ms: float | None = None
- peak_memory_mb: float | None = None
- peak_gpu_memory_mb: float | None = None
- result_type: str | None = None
- scores_available: bool = False
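Several ExperimentResults fields are optional, so code that reports them needs to handle None. A minimal sketch of a one-line summary follows; summarize and _fmt are hypothetical names, and the only facts taken from the reference are the attribute names and their units (milliseconds and megabytes).

```python
from typing import Any, Optional


def _fmt(value: Optional[float], scale: float, unit: str) -> str:
    """Format an optional measurement, or 'n/a' when it is missing."""
    return "n/a" if value is None else f"{value * scale:.2f} {unit}"


def summarize(results: Any) -> str:
    """One-line resource summary for an ExperimentResults-like object."""
    return (
        f"{results.method_name} on {results.dataset}: "
        # execution_time_ms is in milliseconds; report seconds.
        f"time={_fmt(results.execution_time_ms, 0.001, 's')}, "
        f"mem={_fmt(results.peak_memory_mb, 1.0, 'MB')}, "
        f"gpu={_fmt(results.peak_gpu_memory_mb, 1.0, 'MB')}"
    )
```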
- class graflag.EvaluationResults(experiment_name: str, metrics: Dict[str, float] = <factory>, plots_available: List[str] = <factory>, evaluation_path: str | None = None)
Bases: object
Parsed evaluation results.
- experiment_name: str
- metrics: Dict[str, float]
- plots_available: List[str]
- evaluation_path: str | None = None
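Since metrics is a plain mapping from metric name to value, evaluations of several experiments can be compared directly. The sketch below picks the experiment with the highest value of one metric; best_by_metric is a hypothetical helper, it assumes that higher is better for the chosen metric, and the available metric names depend on the evaluator and are not listed in this reference.

```python
from typing import Any, Iterable, Optional


def best_by_metric(evaluations: Iterable[Any], metric: str) -> Optional[str]:
    """Name of the experiment with the highest value for `metric`.

    Entries that are None (no evaluation available) or that lack the
    metric are skipped. Returns None if nothing is comparable.
    """
    scored = [
        (e.metrics[metric], e.experiment_name)
        for e in evaluations
        if e is not None and metric in e.metrics
    ]
    if not scored:
        return None
    # On ties, max() keeps the first entry with the highest value.
    return max(scored, key=lambda pair: pair[0])[1]
```

The None handling matches get_evaluation_results, which is documented to return EvaluationResults or None.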