GraFlag Method Integration Guide
Complete guide for integrating new graph anomaly detection methods into GraFlag.
Overview
GraFlag supports two integration patterns:
- Pattern A (--pass-env-args): For methods with their own training script using argparse. The runner converts _-prefixed env vars to CLI arguments.
- Pattern B (direct env): For library-based methods (e.g., PyGOD via graflag_bond). The method reads env vars directly.
Integrating a new method involves creating a standardized directory structure with 2-3 key files:
- .env - Method configuration and parameters (required)
- Dockerfile - Container environment setup (required)
- train_graflag.py - GraFlag integration wrapper (Pattern A only)
Directory Structure
methods/
+-- your_method_name/ # Lowercase, alphanumeric + underscore
+-- .env # Method configuration (REQUIRED)
+-- Dockerfile # Container setup (REQUIRED)
+-- train_graflag.py # Integration wrapper (Pattern A)
+-- src/ # Original source code (optional)
+-- *.py # Additional helper files (optional)
Example: Pattern A method (generaldyg)
methods/generaldyg/
+-- .env # Configuration with _* parameters
+-- Dockerfile # CUDA 12.1 + PyTorch 2.1.2
+-- train_graflag.py # Training wrapper with ResultWriter
+-- dataset_all.py # Helper for loading full dataset
+-- src/ # Cloned from GitHub at build time
Example: Pattern B method (bond_dominant)
methods/bond_dominant/
+-- .env # Configuration with _* parameters
+-- Dockerfile # CUDA + PyGOD + graflag_bond
Step 1: Create .env Configuration File
The .env file defines method metadata and configurable parameters.
Template (Pattern A)
METHOD_NAME=your_method_name
DESCRIPTION=Brief description of the method
SOURCE_CODE=https://github.com/author/repo
SUPPORTED_DATASETS=dataset1,dataset2
COMMAND=python3 train_graflag.py
# Method-specific parameters (prefix with underscore)
# These will be:
# 1. Available as environment variables in container
# 2. Auto-extracted to CLI args if --pass-env-args is used
# 3. Listed in GUI for user configuration
_BATCH_SIZE=128
_N_EPOCHS=200
_LEARNING_RATE=0.0001
_HIDDEN_DIM=256
_DROPOUT=0.4
_SEED=42
Template (Pattern B – bond_* methods)
METHOD_NAME=bond_dominant
DESCRIPTION=Deep Anomaly Detection on Attributed Networks
SOURCE_CODE=https://github.com/pygod-team/pygod
SUPPORTED_DATASETS=bond_*
COMMAND=python3 -m graflag_bond.train
_HID_DIM=64
_NUM_LAYERS=4
_DROPOUT=0
_WEIGHT_DECAY=0
_LR=0.004
_EPOCH=100
_GPU=0
_BATCH_SIZE=0
Key Fields
| Field | Required | Description | Example |
|---|---|---|---|
| METHOD_NAME | Yes | Unique method identifier (lowercase) | your_method_name |
| DESCRIPTION | Yes | Short description | Brief description of the method |
| SOURCE_CODE | Yes | GitHub repo or paper link | https://github.com/author/repo |
| COMMAND | Yes | Entry point command | python3 train_graflag.py |
| SUPPORTED_DATASETS | No | Compatible datasets (comma-separated, wildcards ok) | bond_* |
| _* parameters | No | User-configurable parameters (prefix with _) | _LEARNING_RATE=0.0001 |
Parameter Naming Convention
- Prefix with _: All configurable parameters must start with an underscore (read at runtime as shown in the sketch after this list)
- Uppercase: Use uppercase for consistency: _LEARNING_RATE, _BATCH_SIZE
- Reserved names: These are set by the orchestrator and cannot be overridden: DATA, EXP, METHOD_NAME, COMMAND, MONITOR_INTERVAL
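Inside the container, each of these parameters is available as an environment variable with the same name. A minimal sketch of how a Pattern B method (or a Pattern A wrapper that bypasses CLI arguments) might read the _HID_DIM and _LR values from the template above:
import os

# _*-prefixed values from .env arrive as environment variables in the container;
# read them with the same defaults as the .env so the method also runs standalone.
hid_dim = int(os.environ.get("_HID_DIM", "64"))
lr = float(os.environ.get("_LR", "0.004"))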
Step 2: Create Dockerfile
The Dockerfile defines the containerized execution environment.
Template (Pattern A)
FROM nvidia/cuda:12.1.0-runtime-ubuntu22.04
ENV DEBIAN_FRONTEND=noninteractive
# Install Python and system dependencies
RUN apt-get update && apt-get install -y \
python3 python3-pip git \
&& rm -rf /var/lib/apt/lists/*
RUN pip install --no-cache-dir --upgrade pip
# Install Python dependencies specific to your method
RUN pip install --no-cache-dir \
numpy scipy scikit-learn networkx pandas tqdm
# Install PyTorch with CUDA support (adjust version as needed)
RUN pip install --no-cache-dir \
torch torchvision torchaudio \
--index-url https://download.pytorch.org/whl/cu121
# Install PyTorch Geometric (if needed)
# RUN pip install --no-cache-dir torch-geometric
# RUN pip install --no-cache-dir \
# torch-scatter torch-sparse \
# -f https://data.pyg.org/whl/torch-2.1.0+cu121.html
WORKDIR /app
# Clone source code from GitHub
RUN git clone https://github.com/your/repo src
# Copy GraFlag integration files
COPY methods/your_method/train_graflag.py ./train_graflag.py
# Copy and install graflag_runner library
COPY libs/ ./libs/
RUN pip install --no-cache-dir ./libs/graflag_runner
# Entry point: --pass-env-args converts _* env vars to CLI args
CMD ["python3", "-m", "graflag_runner", "--pass-env-args"]
Template (Pattern B – bond_* methods)
FROM nvidia/cuda:12.1.0-runtime-ubuntu22.04
ENV DEBIAN_FRONTEND=noninteractive
RUN apt-get update && apt-get install -y \
python3 python3-pip git \
&& rm -rf /var/lib/apt/lists/*
RUN pip install --no-cache-dir --upgrade pip
RUN pip install --no-cache-dir \
torch torchvision torchaudio \
--index-url https://download.pytorch.org/whl/cu121
RUN pip install --no-cache-dir torch-geometric pygod
WORKDIR /app
# Install GraFlag libraries (runner + bond wrapper)
COPY libs/ ./libs/
RUN pip install --no-cache-dir ./libs/graflag_runner
RUN pip install --no-cache-dir ./libs/graflag_bond
# No --pass-env-args: graflag_bond reads env vars directly
CMD ["python3", "-m", "graflag_runner"]
Key Components
- Base Image: Choose an appropriate CUDA version for your method's requirements
  - nvidia/cuda:12.1.0-runtime-ubuntu22.04 (CUDA 12.1, newer methods)
  - nvidia/cuda:11.1.1-runtime-ubuntu20.04 (CUDA 11.1, older methods)
- Dependencies: Install all required Python packages
- Source Code: Either clone from GitHub or copy local files
- GraFlag Libraries: Always copy and install graflag_runner:
  COPY libs/ ./libs/
  RUN pip install --no-cache-dir ./libs/graflag_runner
- Entry Point: Use the graflag_runner wrapper for automatic monitoring
  - Pattern A: CMD ["python3", "-m", "graflag_runner", "--pass-env-args"]
  - Pattern B: CMD ["python3", "-m", "graflag_runner"]
Note: The build context is the entire graflag-shared/ directory, so COPY libs/ and COPY methods/ paths work relative to it.
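For orientation, an illustrative sketch of the build context (only directories referenced in this guide are shown; your checkout may contain more):
graflag-shared/
+-- methods/
|   +-- your_method/        # .env, Dockerfile, train_graflag.py
+-- libs/
|   +-- graflag_runner/     # copied and pip-installed in every Dockerfile
|   +-- graflag_bond/       # Pattern B methods only
+-- datasets/                # mounted and exposed as DATA at run time
+-- experiments/             # mounted and exposed as EXP at run time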
Step 3: Create Training Wrapper (train_graflag.py)
This is only needed for Pattern A methods. Pattern B methods use graflag_bond.train directly.
Core Requirements
Import GraFlag utilities
from graflag_runner import ResultWriter, info, warning, error
Access environment variables
import os

data_dir = os.environ.get("DATA")            # Input dataset path
exp_dir = os.environ.get("EXP")              # Output directory
method_name = os.environ.get("METHOD_NAME")
Initialize ResultWriter
writer = ResultWriter()
Add metadata (optional but recommended)
writer.add_metadata(
    method_name="your_method",
    dataset=dataset_name,
    learning_rate=0.001,
    epochs=100
)
Save results using standardized format
writer.save_scores(
    result_type="NODE_ANOMALY_SCORES",  # or TEMPORAL_*, EDGE_*, STREAM_*
    scores=anomaly_scores,              # List or numpy array
    ground_truth=labels                 # Always include ground truth
)
writer.finalize()
Track training progress (optional)
writer.spot("training", epoch=i, loss=loss, time_sec=epoch_time)
Result Types
Choose the appropriate result type for your method:
| Result Type | Description | Data Format |
|---|---|---|
| NODE_ANOMALY_SCORES | Static graph, node-level | 1D array: one score per node |
| EDGE_ANOMALY_SCORES | Static graph, edge-level | 1D array per edge |
| GRAPH_ANOMALY_SCORES | Graph classification | 1D array per graph |
| TEMPORAL_NODE_ANOMALY_SCORES | Dynamic graphs, node snapshots | 2D array: timestamps x nodes |
| TEMPORAL_EDGE_ANOMALY_SCORES | Dynamic graphs, edge snapshots | 2D array per timestamp |
| TEMPORAL_GRAPH_ANOMALY_SCORES | Temporal graph classification | 2D array |
| STREAM_NODE_ANOMALY_SCORES | Streaming nodes | 1D array with timestamps |
| STREAM_EDGE_ANOMALY_SCORES | Streaming edges | 1D array with timestamps |
| STREAM_GRAPH_ANOMALY_SCORES | Streaming graphs | 1D array with timestamps |
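For a non-static method, the only change is the result type and the shape of the score array. A minimal sketch for a dynamic-graph, node-level method, assuming a result type following the TEMPORAL_* naming above and a score matrix of shape (num_timestamps, num_nodes); check the Result Types Reference (RESULTS_STANDARD) for the authoritative names:
import numpy as np
from graflag_runner import ResultWriter

# Illustrative only: scores[t, i] is the anomaly score of node i at snapshot t;
# labels has the same shape with 0/1 ground truth.
scores = np.random.rand(5, 100)
labels = np.random.randint(0, 2, size=(5, 100))

writer = ResultWriter()
writer.save_scores(
    result_type="TEMPORAL_NODE_ANOMALY_SCORES",  # assumed name; see RESULTS_STANDARD
    scores=scores.tolist(),
    ground_truth=labels.tolist()
)
writer.finalize()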
Template
"""
GraFlag-integrated training script for YourMethod.
"""
import sys
import os
import time
from pathlib import Path
# Add your method's source to Python path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "src"))
import numpy as np
# Import your method's classes/functions
# from your_module import YourModel
# GraFlag integration
from graflag_runner import ResultWriter, info, warning, error
def load_dataset():
    """Load and preprocess the dataset from the DATA environment variable."""
    data_dir = Path(os.environ.get("DATA"))
    info(f"Loading dataset from: {data_dir}")

    # Load your data format, for example:
    # edges, features, labels = load_data(data_dir)
    return data, labels


def train_model(data, config):
    """Train the model and return predictions."""
    info(f"Training {config['method_name']}...")

    # Initialize your model
    model = YourModel(**config)

    # Train
    start_time = time.time()
    model.fit(data)
    training_time = time.time() - start_time
    info(f"Training completed in {training_time:.2f}s")

    # Get predictions
    anomaly_scores = model.predict(data)
    return anomaly_scores, training_time


def main():
    """Main execution function."""
    # Get environment variables
    data_dir = os.environ.get("DATA")
    method_name = os.environ.get("METHOD_NAME")

    info("=" * 60)
    info(f"{method_name.upper()} Training")
    info("=" * 60)
    info(f"Data: {data_dir}")

    # Initialize ResultWriter
    writer = ResultWriter()

    # Load dataset
    data, labels = load_dataset()

    # Configuration from environment variables
    config = {
        'method_name': method_name,
        'learning_rate': float(os.environ.get("_LEARNING_RATE", "0.001")),
        'epochs': int(os.environ.get("_EPOCHS", "100")),
        # Add more parameters as needed
    }

    # Add metadata
    writer.add_metadata(**config)

    # Train model
    anomaly_scores, training_time = train_model(data, config)

    # Save results
    info("Saving results...")
    writer.save_scores(
        result_type="NODE_ANOMALY_SCORES",  # Choose appropriate type
        scores=anomaly_scores.tolist(),
        ground_truth=labels.tolist()        # Always include ground truth
    )

    # Track training metrics
    writer.spot("training", time_sec=training_time)

    # Finalize
    writer.finalize()

    info("=" * 60)
    info(f"{method_name.upper()} completed successfully!")
    info("=" * 60)


if __name__ == "__main__":
    try:
        main()
    except Exception as e:
        error(f"Execution failed: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)
Step 4: Test Integration Locally
Before deploying to the cluster, test your integration:
1. Build Docker Image
cd /path/to/graflag-shared
docker build -f methods/your_method/Dockerfile -t your_method:latest .
2. Test Run Locally
docker run --rm \
-v $(pwd):/shared \
-e DATA=/shared/datasets/your_dataset \
-e EXP=/shared/experiments/test_exp \
-e METHOD_NAME=your_method \
-e COMMAND="python3 train_graflag.py" \
-e _BATCH_SIZE=64 \
-e _EPOCHS=10 \
your_method:latest
3. Verify Output
Check that results.json is created correctly:
cat experiments/test_exp/results.json
Expected structure:
{
  "result_type": "NODE_ANOMALY_SCORES",
  "scores": [0.1, 0.2, 0.3],
  "ground_truth": [0, 0, 1],
  "metadata": {
    "method_name": "your_method"
  }
}
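If you prefer a quick programmatic sanity check over eyeballing the file, a minimal sketch (the exact set of top-level keys may vary with the result type and the metadata you added):
import json

with open("experiments/test_exp/results.json") as f:
    results = json.load(f)

# Basic shape checks on the standardized output.
assert "result_type" in results and "scores" in results
assert len(results["scores"]) == len(results["ground_truth"])
print(results["result_type"], len(results["scores"]), "scores")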
Step 5: Deploy to GraFlag Cluster
Once tested, deploy to the GraFlag cluster:
2. Build and Run
graflag run -m your_method -d your_dataset --build
3. Run with Custom Parameters
graflag run -m your_method -d your_dataset --params EPOCHS=50 BATCH_SIZE=64
4. Monitor and Evaluate
# Follow logs in real-time
graflag logs -e exp__your_method__your_dataset__TIMESTAMP -f
# Evaluate results
graflag evaluate -e exp__your_method__your_dataset__TIMESTAMP
Advanced Features
1. Streaming Large Results
For methods that produce very large result arrays, use streaming to avoid memory issues:
from graflag_runner import ResultWriter, StreamableArray
def generate_scores():
    """Generator that yields scores incrementally."""
    for batch in large_dataset:
        scores = model.predict(batch)
        yield scores

writer = ResultWriter()
writer.save_scores(
    result_type="NODE_ANOMALY_SCORES",
    scores=StreamableArray(generate_scores())  # Wrap generator
)
writer.finalize()
2. Progress Tracking with spot()
Track arbitrary metrics during execution:
# Training metrics (creates training.csv)
writer.spot("training", epoch=i, loss=loss, accuracy=acc, time_sec=t)
# Validation metrics (creates validation.csv)
writer.spot("validation", epoch=i, val_loss=val_loss, val_auc=auc)
# Custom metrics (creates preprocessing.csv)
writer.spot("preprocessing", num_nodes=n, num_edges=e)
Schema is locked after the first call – subsequent calls must provide the same metric keys.
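For instance, once the first spot("training", ...) call defines the columns, every later call to the same label must pass the same keys; a minimal sketch:
# First call fixes the schema of training.csv to (epoch, loss, time_sec).
writer.spot("training", epoch=0, loss=1.23, time_sec=4.1)

# OK: same keys on every subsequent call.
writer.spot("training", epoch=1, loss=0.98, time_sec=3.9)

# Not OK: adding or dropping a key (e.g., passing accuracy only from epoch 2 on)
# would violate the locked schema, so include every metric from the first call.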
3. Environment Variable Extraction
Use --pass-env-args flag to automatically convert _* env vars to CLI arguments:
# In Dockerfile
CMD ["python3", "-m", "graflag_runner", "--pass-env-args"]
This converts:
- _BATCH_SIZE=128 -> --batch_size 128
- _LEARNING_RATE=0.001 -> --learning_rate 0.001
Parameter names are lowercased by the runner. If the original method uses mixed-case arguments, add lowercase aliases:
parser.add_argument('--lr_g', '--lr_G', type=float, default=0.0001)
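On the receiving side, the method's argparse parser simply declares the matching lowercase arguments. A minimal sketch of a Pattern A entry point, assuming the _BATCH_SIZE and _LEARNING_RATE parameters from the template .env:
import argparse

parser = argparse.ArgumentParser()
# Names match the lowercased _*-prefixed parameters from .env:
# _BATCH_SIZE -> --batch_size, _LEARNING_RATE -> --learning_rate
parser.add_argument('--batch_size', type=int, default=128)
parser.add_argument('--learning_rate', type=float, default=0.0001)
args = parser.parse_args()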
Troubleshooting
Common Issues
1. Import Errors
ModuleNotFoundError: No module named 'your_module'
Solution: Ensure sys.path.insert(0, "src") is before imports, or install package in Dockerfile
2. CUDA/GPU Issues
RuntimeError: CUDA out of memory
Solution: Reduce batch size, use _BATCH_SIZE parameter, or disable GPU with --no-gpu flag
3. Dataset Not Found
FileNotFoundError: Dataset not found
Solution: Check dataset name matches directory in datasets/, use lowercase
4. Results Not Saving
results.json empty or missing
Solution: Ensure writer.finalize() is called, check permissions on experiment directory
5. Container Crashes Silently Solution: Check logs with:
graflag logs -e exp__your_method__dataset__TIMESTAMP -f
6. Boolean Argument Errors
unrecognized arguments: True
Solution: Use str2bool helper instead of action='store_true':
import argparse

def str2bool(v):
    if isinstance(v, bool):
        return v
    if v.lower() in ('yes', 'true', 't', 'y', '1', ''):
        return True
    elif v.lower() in ('no', 'false', 'f', 'n', '0'):
        return False
    raise argparse.ArgumentTypeError('Boolean value expected.')

parser.add_argument('--flag', type=str2bool, nargs='?', const=True, default=False)
Best Practices
- Version Control: Pin all dependency versions in the Dockerfile
- Logging: Use info(), warning(), error() from graflag_runner for consistent logging
- Resource Tracking: Let graflag_runner handle monitoring; don't implement custom tracking
- Error Handling: Wrap main() in try-except and call sys.exit(1) on failure
- Testing: Always test locally before deploying to the cluster
- Reproducibility: Set random seeds and include them in the metadata (see the sketch after this list)
- Ground Truth: Always include ground_truth in save_scores() for evaluation to work
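A minimal sketch of the reproducibility point, assuming a PyTorch-based method and the _SEED parameter from the Pattern A template:
import os
import random

import numpy as np
import torch

# Read the seed from the _SEED parameter and apply it everywhere it matters.
seed = int(os.environ.get("_SEED", "42"))
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(seed)

# Record the seed alongside the other hyperparameters
# (writer is the ResultWriter created earlier in the script).
writer.add_metadata(seed=seed)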
Quick Reference
Environment Variables (Automatically Set)
| Variable | Description | Example |
|---|---|---|
| DATA | Input dataset directory | /shared/datasets/your_dataset |
| EXP | Experiment output directory | /shared/experiments/test_exp |
| METHOD_NAME | Method identifier | your_method |
| COMMAND | Command from .env | python3 train_graflag.py |
graflag_runner API
from graflag_runner import ResultWriter, info, warning, error
# Logging
info("Message")
warning("Warning message")
error("Error message")
# Result writing
writer = ResultWriter()
writer.save_scores(result_type="...", scores=[...], ground_truth=[...])
writer.add_metadata(method_name="...", dataset="...", ...)
writer.add_resource_metrics(exec_time_ms=1234.5, peak_memory_mb=512.3, peak_gpu_mb=2048.0)
writer.spot("training", epoch=1, loss=0.5, auc=0.85)
writer.finalize()
CLI Commands
# Build and run
graflag run -m your_method -d your_dataset --build
# Run with custom parameters
graflag run -m your_method -d your_dataset --params EPOCHS=50 BATCH_SIZE=64
# Replay from saved config
graflag run --from-config service_config.json
# Check logs
graflag logs -e exp__your_method__dataset__timestamp -f
# Stop running experiment
graflag stop -e exp__your_method__dataset__timestamp
# Evaluate results
graflag evaluate -e exp__your_method__dataset__timestamp
Complete Checklist
- [ ] Created methods/your_method/ directory
- [ ] Created .env with METHOD_NAME, DESCRIPTION, SOURCE_CODE, COMMAND
- [ ] Added SUPPORTED_DATASETS if applicable
- [ ] Added configurable parameters with _ prefix
- [ ] Created Dockerfile with correct base image and dependencies
- [ ] Installed graflag_runner library in Dockerfile
- [ ] Created train_graflag.py integration wrapper (Pattern A) or installed graflag_bond (Pattern B)
- [ ] Loaded data from DATA environment variable
- [ ] Saved results with appropriate result_type and ground_truth
- [ ] Called writer.finalize()
- [ ] Tested locally with Docker
- [ ] Verified results.json format
- [ ] Deployed to cluster and tested full pipeline
- [ ] Evaluated results with graflag evaluate
Example: Complete Integration (Minimal, Pattern A)
File: methods/mymethod/.env
METHOD_NAME=mymethod
DESCRIPTION=My Custom Graph Anomaly Detector
SOURCE_CODE=https://github.com/me/mymethod
COMMAND=python3 train_graflag.py
_LEARNING_RATE=0.001
_EPOCHS=100
File: methods/mymethod/Dockerfile
FROM nvidia/cuda:12.1.0-runtime-ubuntu22.04
ENV DEBIAN_FRONTEND=noninteractive
RUN apt-get update && apt-get install -y python3 python3-pip git && \
rm -rf /var/lib/apt/lists/*
RUN pip install --no-cache-dir numpy scipy scikit-learn torch
WORKDIR /app
COPY methods/mymethod/train_graflag.py ./train_graflag.py
COPY libs/ ./libs/
RUN pip install --no-cache-dir ./libs/graflag_runner
CMD ["python3", "-m", "graflag_runner", "--pass-env-args"]
File: methods/mymethod/train_graflag.py
import os
import numpy as np
from pathlib import Path
from graflag_runner import ResultWriter, info
def main():
    data_dir = Path(os.environ.get("DATA"))
    writer = ResultWriter()
    info("Computing anomaly scores...")
    scores = np.random.rand(100).tolist()
    labels = np.random.randint(0, 2, 100).tolist()
    writer.save_scores(
        result_type="NODE_ANOMALY_SCORES",
        scores=scores,
        ground_truth=labels
    )
    writer.finalize()
    info("[OK] Done!")

if __name__ == "__main__":
    main()
Test:
docker build -f methods/mymethod/Dockerfile -t mymethod:latest .
docker run --rm -v $(pwd):/shared \
-e DATA=/shared/datasets/test \
-e EXP=/shared/experiments/test \
-e METHOD_NAME=mymethod \
-e COMMAND="python3 train_graflag.py" \
mymethod:latest
Resources
- Result Types Reference: See RESULTS_STANDARD
- Agent Integration Guide: See AGENT_METHOD_INTEGRATION for AI-assisted integration
- Example Methods: methods/generaldyg/, methods/taddy/, methods/bond_*/
- graflag_runner Source: graflag-shared/libs/graflag_runner/
- graflag_bond Source: graflag-shared/libs/graflag_bond/ (for PyGOD integration)