graflag.core

Core GraFlag functionality.

exception graflag.core.GraFlagError[source]

Bases: Exception

Custom exception for GraFlag errors.

class graflag.core.GraFlag(config_file: str = '.env')[source]

Bases: object

Main GraFlag orchestration class.

All public methods return structured data. No direct printing to stdout (except follow_logs which streams in real time).

setup()[source]

Setup GraFlag cluster: initialize swarm and setup workers.

status() ClusterInfo[source]

Get cluster status.

Returns:

ClusterInfo with nodes, services, and shared directory info.

run(method_name: str, dataset: str, tag: str = 'latest', build: bool = False, gpu: bool = True, method_params: dict = None) str[source]

Run experiment.

Returns:

Experiment name.

Raises:

GraFlagError – If run fails.

register_metric(result_type: str, metric_func: Callable, experiment: str = None)[source]

Register a custom metric as a plugin file on the cluster.

The function source is extracted via inspect.getsource and written to a .py plugin file that the evaluator loads at runtime.

Parameters:
  • result_type – Result type the metric applies to (e.g. "EDGE_STREAM_ANOMALY_SCORES").

  • metric_func – A function with signature (scores, ground_truth, **kwargs) -> Dict[str, float].

  • experiment – If given, the plugin is scoped to that experiment (custom_metrics/ inside the experiment directory). Otherwise it is saved to the global plugins directory.

Raises:

GraFlagError – If the function source cannot be extracted or the file cannot be written.

evaluate(experiment_name: str)[source]

Evaluate an experiment: compute metrics and generate plots.

Raises:

GraFlagError – If evaluation fails.

list_methods() List[MethodInfo][source]

List available methods.

Returns:

List of MethodInfo objects.

list_datasets() List[DatasetInfo][source]

List available datasets.

Returns:

List of DatasetInfo objects.

list_experiments(limit: int = 50) List[ExperimentInfo][source]

List recent experiments.

Returns:

List of ExperimentInfo (most recent first).

list_services() List[Dict][source]

List running Docker services.

Returns:

List of service dicts with name, replicas, image, status.

get_logs(experiment_name: str, tail: int = 100) List[str][source]

Get experiment logs (non-streaming).

Tries Docker service logs first, then falls back to method_output.txt.

Returns:

List of log lines.

follow_logs(experiment_name: str, tee_file: str = None)[source]

Follow logs for an experiment (streams to stdout).

Shows build log (if exists) + service logs. Falls back to method_output.txt if the service is gone.

show_logs(experiment_name: str, tee_file: str = None)[source]

Show logs (non-follow mode) — prints to stdout.

stop(experiment_name: str, remove: bool = False)[source]

Stop a running experiment/service.

Parameters:
  • experiment_name – Name of the experiment

  • remove – If True, also delete the experiment directory

get_experiment_results(experiment_name: str) ExperimentResults | None[source]

Get experiment results from results.json.

get_evaluation_results(experiment_name: str) EvaluationResults | None[source]

Get evaluation results from eval/evaluation.json.

copy_files(source_paths, dest_path: str, recursive: bool = False, from_remote: bool = False)[source]

Copy files/directories bidirectionally.

mount_nfs(shared_dir: str)[source]

Mount NFS share on local machine.

sync(local_path: str, is_lib: bool = False)[source]

Sync a local method or library directory to remote shared storage.