GraFlag Method Integration Guide

Complete guide for integrating new graph anomaly detection methods into GraFlag.


Overview

GraFlag supports two integration patterns:

  • Pattern A (--pass-env-args): For methods that ship their own argparse-based training script. The runner converts _-prefixed env vars into CLI arguments.

  • Pattern B (direct env): For library-based methods (e.g., PyGOD via graflag_bond). The method reads env vars directly.
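The conversion Pattern A relies on can be sketched in a few lines. This is an illustrative re-implementation of the idea, not the actual graflag_runner source:

```python
def env_to_cli_args(environ):
    """Illustrative sketch of the --pass-env-args conversion: every
    _-prefixed variable becomes a lowercase CLI flag plus its value.
    (Not the actual graflag_runner implementation.)"""
    args = []
    for key, value in environ.items():
        if key.startswith("_"):
            # _BATCH_SIZE=128 -> --batch_size 128
            args.extend([f"--{key[1:].lower()}", value])
    return args

# Reserved variables like DATA pass through untouched
print(env_to_cli_args({"_BATCH_SIZE": "128", "DATA": "/shared/datasets/uci"}))
# -> ['--batch_size', '128']
```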

Integrating a new method involves creating a standardized directory structure with 2-3 key files:

  1. .env - Method configuration and parameters (required)

  2. Dockerfile - Container environment setup (required)

  3. train_graflag.py - GraFlag integration wrapper (Pattern A only)


Directory Structure

methods/
+-- your_method_name/            # Lowercase, alphanumeric + underscore
    +-- .env                     # Method configuration (REQUIRED)
    +-- Dockerfile               # Container setup (REQUIRED)
    +-- train_graflag.py         # Integration wrapper (Pattern A)
    +-- src/                     # Original source code (optional)
    +-- *.py                     # Additional helper files (optional)

Example: Pattern A method (generaldyg)

methods/generaldyg/
+-- .env                         # Configuration with _* parameters
+-- Dockerfile                   # CUDA 12.1 + PyTorch 2.1.2
+-- train_graflag.py             # Training wrapper with ResultWriter
+-- dataset_all.py               # Helper for loading full dataset
+-- src/                         # Cloned from GitHub at build time

Example: Pattern B method (bond_dominant)

methods/bond_dominant/
+-- .env                         # Configuration with _* parameters
+-- Dockerfile                   # CUDA + PyGOD + graflag_bond

Step 1: Create .env Configuration File

The .env file defines method metadata and configurable parameters.

Template (Pattern A)

METHOD_NAME=your_method_name
DESCRIPTION=Brief description of the method
SOURCE_CODE=https://github.com/author/repo
SUPPORTED_DATASETS=dataset1,dataset2

COMMAND=python3 train_graflag.py

# Method-specific parameters (prefix with underscore)
# These will be:
# 1. Available as environment variables in container
# 2. Auto-extracted to CLI args if --pass-env-args is used
# 3. Listed in GUI for user configuration

_BATCH_SIZE=128
_N_EPOCHS=200
_LEARNING_RATE=0.0001
_HIDDEN_DIM=256
_DROPOUT=0.4
_SEED=42

Template (Pattern B – bond_* methods)

METHOD_NAME=bond_dominant
DESCRIPTION=Deep Anomaly Detection on Attributed Networks
SOURCE_CODE=https://github.com/pygod-team/pygod
SUPPORTED_DATASETS=bond_*

COMMAND=python3 -m graflag_bond.train

_HID_DIM=64
_NUM_LAYERS=4
_DROPOUT=0
_WEIGHT_DECAY=0
_LR=0.004
_EPOCH=100
_GPU=0
_BATCH_SIZE=0

Key Fields

| Field              | Required | Description                                         | Example                                  |
|--------------------|----------|-----------------------------------------------------|------------------------------------------|
| METHOD_NAME        | Yes      | Unique method identifier (lowercase)                | generaldyg                               |
| DESCRIPTION        | Yes      | Short description                                   | A Generalizable Anomaly Detection Method |
| SOURCE_CODE        | Yes      | GitHub repo or paper link                           | https://github.com/...                   |
| COMMAND            | Yes      | Entry point command                                 | python3 train_graflag.py                 |
| SUPPORTED_DATASETS | No       | Compatible datasets (comma-separated, wildcards ok) | bond_*                                   |
| _PARAMETER         | No       | User-configurable parameters (prefix with _)        | _BATCH_SIZE=128                          |
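Wildcard patterns like bond_* can be understood as shell-style globs. A sketch of such matching, assuming fnmatch-style semantics (the orchestrator's actual matching logic may differ):

```python
from fnmatch import fnmatch

def dataset_supported(dataset, supported):
    """Match a dataset name against a comma-separated SUPPORTED_DATASETS
    value, treating each entry as a shell-style wildcard pattern."""
    patterns = [p.strip() for p in supported.split(",")]
    return any(fnmatch(dataset, p) for p in patterns)

print(dataset_supported("bond_cora", "bond_*"))       # True
print(dataset_supported("uci", "dataset1,dataset2"))  # False
```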

Parameter Naming Convention

  • Prefix with _: All configurable parameters must start with underscore

  • Uppercase: Use uppercase for consistency: _LEARNING_RATE, _BATCH_SIZE

  • Reserved names: These are set by the orchestrator and cannot be overridden: DATA, EXP, METHOD_NAME, COMMAND, MONITOR_INTERVAL


Step 2: Create Dockerfile

The Dockerfile defines the containerized execution environment.

Template (Pattern A)

FROM nvidia/cuda:12.1.0-runtime-ubuntu22.04

ENV DEBIAN_FRONTEND=noninteractive

# Install Python and system dependencies
RUN apt-get update && apt-get install -y \
    python3 python3-pip git \
    && rm -rf /var/lib/apt/lists/*

RUN pip install --no-cache-dir --upgrade pip

# Install Python dependencies specific to your method
RUN pip install --no-cache-dir \
    numpy scipy scikit-learn networkx pandas tqdm

# Install PyTorch with CUDA support (adjust version as needed)
RUN pip install --no-cache-dir \
    torch torchvision torchaudio \
    --index-url https://download.pytorch.org/whl/cu121

# Install PyTorch Geometric (if needed)
# RUN pip install --no-cache-dir torch-geometric
# RUN pip install --no-cache-dir \
#     torch-scatter torch-sparse \
#     -f https://data.pyg.org/whl/torch-2.1.0+cu121.html

WORKDIR /app

# Clone source code from GitHub
RUN git clone https://github.com/your/repo src

# Copy GraFlag integration files
COPY methods/your_method/train_graflag.py ./train_graflag.py

# Copy and install graflag_runner library
COPY libs/ ./libs/
RUN pip install --no-cache-dir ./libs/graflag_runner

# Entry point: --pass-env-args converts _* env vars to CLI args
CMD ["python3", "-m", "graflag_runner", "--pass-env-args"]

Template (Pattern B – bond_* methods)

FROM nvidia/cuda:12.1.0-runtime-ubuntu22.04

ENV DEBIAN_FRONTEND=noninteractive

RUN apt-get update && apt-get install -y \
    python3 python3-pip git \
    && rm -rf /var/lib/apt/lists/*

RUN pip install --no-cache-dir --upgrade pip

RUN pip install --no-cache-dir \
    torch torchvision torchaudio \
    --index-url https://download.pytorch.org/whl/cu121

RUN pip install --no-cache-dir torch-geometric pygod

WORKDIR /app

# Install GraFlag libraries (runner + bond wrapper)
COPY libs/ ./libs/
RUN pip install --no-cache-dir ./libs/graflag_runner
RUN pip install --no-cache-dir ./libs/graflag_bond

# No --pass-env-args: graflag_bond reads env vars directly
CMD ["python3", "-m", "graflag_runner"]

Key Components

  1. Base Image: Choose appropriate CUDA version for your method’s requirements

    • nvidia/cuda:12.1.0-runtime-ubuntu22.04 (CUDA 12.1, newer methods)

    • nvidia/cuda:11.1.1-runtime-ubuntu20.04 (CUDA 11.1, older methods)

  2. Dependencies: Install all required Python packages

  3. Source Code: Either clone from GitHub or copy local files

  4. GraFlag Libraries: Always copy and install graflag_runner

    COPY libs/ ./libs/
    RUN pip install --no-cache-dir ./libs/graflag_runner
    
  5. Entry Point: Use graflag_runner wrapper for automatic monitoring

    • Pattern A: CMD ["python3", "-m", "graflag_runner", "--pass-env-args"]

    • Pattern B: CMD ["python3", "-m", "graflag_runner"]

Note: The build context is the entire graflag-shared/ directory, so COPY libs/ and COPY methods/ paths work relative to it.


Step 3: Create Training Wrapper (train_graflag.py)

This is only needed for Pattern A methods. Pattern B methods use graflag_bond.train directly.

Core Requirements

  1. Import GraFlag utilities

    from graflag_runner import ResultWriter, info, warning, error
    
  2. Access environment variables

    import os
    data_dir = os.environ.get("DATA")      # Input dataset path
    exp_dir = os.environ.get("EXP")        # Output directory
    method_name = os.environ.get("METHOD_NAME")
    
  3. Initialize ResultWriter

    writer = ResultWriter()
    
  4. Add metadata (optional but recommended)

    writer.add_metadata(
        method_name="your_method",
        dataset=dataset_name,
        learning_rate=0.001,
        epochs=100
    )
    
  5. Save results using standardized format

    writer.save_scores(
        result_type="NODE_ANOMALY_SCORES",  # or TEMPORAL_*, EDGE_*, STREAM_*
        scores=anomaly_scores,              # List or numpy array
        ground_truth=labels                 # Always include ground truth
    )
    writer.finalize()
    
  6. Track training progress (optional)

    writer.spot("training", epoch=i, loss=loss, time_sec=epoch_time)
    

Result Types

Choose the appropriate result type for your method:

| Result Type                   | Description                    | Data Format                          |
|-------------------------------|--------------------------------|--------------------------------------|
| NODE_ANOMALY_SCORES           | Static graph, node-level       | 1D array: [0.1, 0.2, ...]            |
| EDGE_ANOMALY_SCORES           | Static graph, edge-level       | 1D array per edge                    |
| GRAPH_ANOMALY_SCORES          | Graph classification           | 1D array per graph                   |
| TEMPORAL_NODE_ANOMALY_SCORES  | Dynamic graphs, node snapshots | 2D array: [[t0_scores], [t1_scores]] |
| TEMPORAL_EDGE_ANOMALY_SCORES  | Dynamic graphs, edge snapshots | 2D array per timestamp               |
| TEMPORAL_GRAPH_ANOMALY_SCORES | Temporal graph classification  | 2D array                             |
| NODE_STREAM_ANOMALY_SCORES    | Streaming nodes                | 1D array with timestamps             |
| EDGE_STREAM_ANOMALY_SCORES    | Streaming edges                | 1D array with timestamps             |
| GRAPH_STREAM_ANOMALY_SCORES   | Streaming graphs               | 1D array with timestamps             |
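The 1D vs. 2D distinction can be made concrete with numpy (the sizes below are illustrative; actual per-snapshot lengths depend on your dataset):

```python
import numpy as np

num_nodes, num_snapshots = 5, 3

# NODE_ANOMALY_SCORES: one score per node (1D)
node_scores = np.random.rand(num_nodes)

# TEMPORAL_NODE_ANOMALY_SCORES: one row of node scores per snapshot (2D)
temporal_scores = np.random.rand(num_snapshots, num_nodes)

# The save_scores() examples in this guide pass plain lists, so convert first
print(node_scores.shape)      # (5,)
print(temporal_scores.shape)  # (3, 5)
```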

Template

"""
GraFlag-integrated training script for YourMethod.
"""
import sys
import os
import time
from pathlib import Path

# Add your method's source to Python path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "src"))

import numpy as np
# Import your method's classes/functions
# from your_module import YourModel

# GraFlag integration
from graflag_runner import ResultWriter, info, warning, error


def load_dataset():
    """Load and preprocess dataset from the DATA environment variable."""
    data_dir = Path(os.environ.get("DATA"))
    info(f"Loading dataset from: {data_dir}")

    # Load your data format
    # Example: data, labels = load_data(data_dir)

    return data, labels


def train_model(data, config):
    """Train the model and return predictions."""
    info(f"Training {config['method_name']}...")

    # Initialize your model
    model = YourModel(**config)

    # Train
    start_time = time.time()
    model.fit(data)
    training_time = time.time() - start_time

    info(f"Training completed in {training_time:.2f}s")

    # Get predictions
    anomaly_scores = model.predict(data)

    return anomaly_scores, training_time


def main():
    """Main execution function."""
    # Get environment variables
    data_dir = os.environ.get("DATA")
    method_name = os.environ.get("METHOD_NAME")

    info("=" * 60)
    info(f"{method_name.upper()} Training")
    info("=" * 60)
    info(f"Data: {data_dir}")

    # Initialize ResultWriter
    writer = ResultWriter()

    # Load dataset and ground-truth labels
    data, labels = load_dataset()

    # Configuration from environment variables
    config = {
        'method_name': method_name,
        'learning_rate': float(os.environ.get("_LEARNING_RATE", "0.001")),
        'epochs': int(os.environ.get("_EPOCHS", "100")),
        # Add more parameters as needed
    }

    # Add metadata
    writer.add_metadata(**config)

    # Train model
    anomaly_scores, training_time = train_model(data, config)

    # Save results
    info("Saving results...")
    writer.save_scores(
        result_type="NODE_ANOMALY_SCORES",  # Choose appropriate type
        scores=anomaly_scores.tolist(),
        ground_truth=labels.tolist()        # Always include ground truth
    )

    # Track training metrics
    writer.spot("training", time_sec=training_time)

    # Finalize
    writer.finalize()

    info("=" * 60)
    info(f"{method_name.upper()} completed successfully!")
    info("=" * 60)


if __name__ == "__main__":
    try:
        main()
    except Exception as e:
        error(f"Execution failed: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)

Step 4: Test Integration Locally

Before deploying to the cluster, test your integration:

1. Build Docker Image

cd /path/to/graflag-shared
docker build -f methods/your_method/Dockerfile -t your_method:latest .

2. Test Run Locally

docker run --rm \
  -v $(pwd):/shared \
  -e DATA=/shared/datasets/your_dataset \
  -e EXP=/shared/experiments/test_exp \
  -e METHOD_NAME=your_method \
  -e COMMAND="python3 train_graflag.py" \
  -e _BATCH_SIZE=64 \
  -e _EPOCHS=10 \
  your_method:latest

3. Verify Output

Check that results.json is created correctly:

cat experiments/test_exp/results.json

Expected structure:

{
  "result_type": "NODE_ANOMALY_SCORES",
  "scores": [0.1, 0.2, 0.3],
  "ground_truth": [0, 0, 1],
  "metadata": {
    "method_name": "your_method"
  }
}
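A quick sanity check over the output can be scripted. The field names follow the expected structure shown above; check_results is a hypothetical helper, not part of graflag_runner:

```python
import json

def check_results(path):
    """Minimal sanity check for a results.json file: required fields
    exist and scores align with ground truth."""
    with open(path) as f:
        results = json.load(f)
    assert "result_type" in results, "missing result_type"
    assert "scores" in results, "missing scores"
    assert len(results["scores"]) == len(results["ground_truth"]), \
        "scores and ground_truth lengths differ"
    return results["result_type"]

# Usage: check_results("experiments/test_exp/results.json")
```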

Step 5: Deploy to GraFlag Cluster

Once tested, deploy to the GraFlag cluster:

1. Sync Method to Shared Directory

graflag sync --path methods/your_method

2. Build and Run

graflag run -m your_method -d your_dataset --build

3. Run with Custom Parameters

graflag run -m your_method -d your_dataset --params EPOCHS=50 BATCH_SIZE=64

4. Monitor and Evaluate

# Follow logs in real-time
graflag logs -e exp__your_method__your_dataset__TIMESTAMP -f

# Evaluate results
graflag evaluate -e exp__your_method__your_dataset__TIMESTAMP

Advanced Features

1. Streaming Large Results

For methods producing massive datasets, use streaming to avoid memory issues:

from graflag_runner import ResultWriter, StreamableArray

def generate_scores():
    """Generator that yields scores incrementally."""
    for batch in large_dataset:
        scores = model.predict(batch)
        yield scores

writer = ResultWriter()
writer.save_scores(
    result_type="NODE_ANOMALY_SCORES",
    scores=StreamableArray(generate_scores())  # Wrap generator
)
writer.finalize()

2. Progress Tracking with spot()

Track arbitrary metrics during execution:

# Training metrics (creates training.csv)
writer.spot("training", epoch=i, loss=loss, accuracy=acc, time_sec=t)

# Validation metrics (creates validation.csv)
writer.spot("validation", epoch=i, val_loss=val_loss, val_auc=auc)

# Custom metrics (creates preprocessing.csv)
writer.spot("preprocessing", num_nodes=n, num_edges=e)

Schema is locked after the first call – subsequent calls must provide the same metric keys.
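The schema-lock contract can be illustrated with a toy re-implementation (this is a sketch of the behavior described above, not the graflag_runner source):

```python
class SpotTracker:
    """Toy model of the spot() schema lock: the first call on a channel
    fixes the metric keys; later calls must use the same keys."""

    def __init__(self):
        self._schemas = {}

    def spot(self, channel, **metrics):
        keys = tuple(sorted(metrics))
        if channel not in self._schemas:
            self._schemas[channel] = keys         # first call locks the schema
        elif self._schemas[channel] != keys:
            raise ValueError(f"schema mismatch for channel '{channel}'")

tracker = SpotTracker()
tracker.spot("training", epoch=1, loss=0.9)  # locks schema to (epoch, loss)
tracker.spot("training", epoch=2, loss=0.7)  # OK: same keys
# tracker.spot("training", epoch=3)          # would raise ValueError
```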

3. Environment Variable Extraction

Use --pass-env-args flag to automatically convert _* env vars to CLI arguments:

# In Dockerfile
CMD ["python3", "-m", "graflag_runner", "--pass-env-args"]

This converts:

  • _BATCH_SIZE=128 -> --batch_size 128

  • _LEARNING_RATE=0.001 -> --learning_rate 0.001

Parameter names are lowercased by the runner. If the original method uses mixed-case arguments, add lowercase aliases:

parser.add_argument('--lr_g', '--lr_G', type=float, default=0.0001)

Troubleshooting

Common Issues

1. Import Errors

ModuleNotFoundError: No module named 'your_module'

Solution: Ensure sys.path.insert(0, "src") is before imports, or install package in Dockerfile

2. CUDA/GPU Issues

RuntimeError: CUDA out of memory

Solution: Reduce batch size, use _BATCH_SIZE parameter, or disable GPU with --no-gpu flag

3. Dataset Not Found

FileNotFoundError: Dataset not found

Solution: Check dataset name matches directory in datasets/, use lowercase

4. Results Not Saving

results.json empty or missing

Solution: Ensure writer.finalize() is called, check permissions on experiment directory

5. Container Crashes Silently

Solution: Check logs with:

graflag logs -e exp__your_method__dataset__TIMESTAMP -f

6. Boolean Argument Errors

unrecognized arguments: True

Solution: Use str2bool helper instead of action='store_true':

import argparse

def str2bool(v):
    if isinstance(v, bool):
        return v
    if v.lower() in ('yes', 'true', 't', 'y', '1', ''):
        return True
    elif v.lower() in ('no', 'false', 'f', 'n', '0'):
        return False
    raise argparse.ArgumentTypeError('Boolean value expected.')

parser.add_argument('--flag', type=str2bool, nargs='?', const=True, default=False)

Best Practices

  1. Version Control: Pin all dependency versions in Dockerfile

  2. Logging: Use info(), warning(), error() from graflag_runner for consistent logging

  3. Resource Tracking: Let graflag_runner handle monitoring, don’t implement custom tracking

  4. Error Handling: Wrap main() in try-except and call sys.exit(1) on failure

  5. Testing: Always test locally before deploying to cluster

  6. Reproducibility: Set random seeds, include in metadata

  7. Ground Truth: Always include ground_truth in save_scores() for evaluation to work
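For the reproducibility point, a typical seeding block might look like the following. _SEED is the parameter name from the Pattern A template; the torch lines are commented out since not every method uses PyTorch:

```python
import os
import random

import numpy as np

# _SEED comes from the .env template; the default mirrors it
seed = int(os.environ.get("_SEED", "42"))
random.seed(seed)
np.random.seed(seed)

# If your method uses PyTorch, also seed it:
# import torch
# torch.manual_seed(seed)
# torch.cuda.manual_seed_all(seed)

# Record the seed so the run can be reproduced:
# writer.add_metadata(seed=seed)
```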


Quick Reference

Environment Variables (Automatically Set)

| Variable    | Description                 | Example                      |
|-------------|-----------------------------|------------------------------|
| DATA        | Input dataset directory     | /shared/datasets/uci         |
| EXP         | Experiment output directory | /shared/experiments/exp__... |
| METHOD_NAME | Method identifier           | generaldyg                   |
| COMMAND     | Command from .env           | python3 train_graflag.py     |

graflag_runner API

from graflag_runner import ResultWriter, info, warning, error

# Logging
info("Message")
warning("Warning message")
error("Error message")

# Result writing
writer = ResultWriter()
writer.save_scores(result_type="...", scores=[...], ground_truth=[...])
writer.add_metadata(method_name="...", dataset="...", ...)
writer.add_resource_metrics(exec_time_ms=1234.5, peak_memory_mb=512.3, peak_gpu_mb=2048.0)
writer.spot("training", epoch=1, loss=0.5, auc=0.85)
writer.finalize()

CLI Commands

# Build and run
graflag run -m your_method -d your_dataset --build

# Run with custom parameters
graflag run -m your_method -d your_dataset --params EPOCHS=50 BATCH_SIZE=64

# Replay from saved config
graflag run --from-config service_config.json

# Check logs
graflag logs -e exp__your_method__dataset__timestamp -f

# Stop running experiment
graflag stop -e exp__your_method__dataset__timestamp

# Evaluate results
graflag evaluate -e exp__your_method__dataset__timestamp

Complete Checklist

  • [ ] Created methods/your_method/ directory

  • [ ] Created .env with METHOD_NAME, DESCRIPTION, SOURCE_CODE, COMMAND

  • [ ] Added SUPPORTED_DATASETS if applicable

  • [ ] Added configurable parameters with _ prefix

  • [ ] Created Dockerfile with correct base image and dependencies

  • [ ] Installed graflag_runner library in Dockerfile

  • [ ] Created train_graflag.py integration wrapper (Pattern A) or installed graflag_bond (Pattern B)

  • [ ] Loaded data from DATA environment variable

  • [ ] Saved results with appropriate result_type and ground_truth

  • [ ] Called writer.finalize()

  • [ ] Tested locally with Docker

  • [ ] Verified results.json format

  • [ ] Deployed to cluster and tested full pipeline

  • [ ] Evaluated results with graflag evaluate


Example: Complete Integration (Minimal, Pattern A)

File: methods/mymethod/.env

METHOD_NAME=mymethod
DESCRIPTION=My Custom Graph Anomaly Detector
SOURCE_CODE=https://github.com/me/mymethod

COMMAND=python3 train_graflag.py

_LEARNING_RATE=0.001
_EPOCHS=100

File: methods/mymethod/Dockerfile

FROM nvidia/cuda:12.1.0-runtime-ubuntu22.04
ENV DEBIAN_FRONTEND=noninteractive

RUN apt-get update && apt-get install -y python3 python3-pip git && \
    rm -rf /var/lib/apt/lists/*

RUN pip install --no-cache-dir numpy scipy scikit-learn torch

WORKDIR /app
COPY methods/mymethod/train_graflag.py ./train_graflag.py
COPY libs/ ./libs/
RUN pip install --no-cache-dir ./libs/graflag_runner

CMD ["python3", "-m", "graflag_runner", "--pass-env-args"]

File: methods/mymethod/train_graflag.py

import os
import numpy as np
from pathlib import Path
from graflag_runner import ResultWriter, info

def main():
    data_dir = Path(os.environ.get("DATA"))
    writer = ResultWriter()

    info("Computing anomaly scores...")
    scores = np.random.rand(100).tolist()
    labels = np.random.randint(0, 2, 100).tolist()

    writer.save_scores(
        result_type="NODE_ANOMALY_SCORES",
        scores=scores,
        ground_truth=labels
    )
    writer.finalize()
    info("[OK] Done!")

if __name__ == "__main__":
    main()

Test:

docker build -f methods/mymethod/Dockerfile -t mymethod:latest .
docker run --rm -v $(pwd):/shared \
  -e DATA=/shared/datasets/test \
  -e EXP=/shared/experiments/test \
  -e METHOD_NAME=mymethod \
  -e COMMAND="python3 train_graflag.py" \
  mymethod:latest

Resources

  • Result Types Reference: See RESULTS_STANDARD

  • Agent Integration Guide: See AGENT_METHOD_INTEGRATION for AI-assisted integration

  • Example Methods: methods/generaldyg/, methods/taddy/, methods/bond_*/

  • graflag_runner Source: graflag-shared/libs/graflag_runner/

  • graflag_bond Source: graflag-shared/libs/graflag_bond/ (for PyGOD integration)