# GraFlag Method Integration Guide

Complete guide for integrating new graph anomaly detection methods into GraFlag.

---

## Overview

GraFlag supports two integration patterns:

- **Pattern A** (`--pass-env-args`): For methods with their own training script using argparse. The runner converts `_`-prefixed env vars to CLI arguments.
- **Pattern B** (direct env): For library-based methods (e.g., PyGOD via `graflag_bond`). The method reads env vars directly.

Integrating a new method involves creating a standardized directory structure with 2-3 key files:

1. **`.env`** - Method configuration and parameters (required)
2. **`Dockerfile`** - Container environment setup (required)
3. **`train_graflag.py`** - GraFlag integration wrapper (Pattern A only)

---

## Directory Structure

```
methods/
+-- your_method_name/          # Lowercase, alphanumeric + underscore
    +-- .env                   # Method configuration (REQUIRED)
    +-- Dockerfile             # Container setup (REQUIRED)
    +-- train_graflag.py       # Integration wrapper (Pattern A)
    +-- src/                   # Original source code (optional)
    +-- *.py                   # Additional helper files (optional)
```

**Example: Pattern A method (generaldyg)**

```
methods/generaldyg/
+-- .env                       # Configuration with _* parameters
+-- Dockerfile                 # CUDA 12.1 + PyTorch 2.1.2
+-- train_graflag.py           # Training wrapper with ResultWriter
+-- dataset_all.py             # Helper for loading full dataset
+-- src/                       # Cloned from GitHub at build time
```

**Example: Pattern B method (bond_dominant)**

```
methods/bond_dominant/
+-- .env                       # Configuration with _* parameters
+-- Dockerfile                 # CUDA + PyGOD + graflag_bond
```

---

## Step 1: Create `.env` Configuration File

The `.env` file defines method metadata and configurable parameters.

### Template (Pattern A)

```bash
METHOD_NAME=your_method_name
DESCRIPTION=Brief description of the method
SOURCE_CODE=https://github.com/author/repo
SUPPORTED_DATASETS=dataset1,dataset2
COMMAND=python3 train_graflag.py

# Method-specific parameters (prefix with underscore)
# These will be:
# 1. Available as environment variables in container
# 2. Auto-extracted to CLI args if --pass-env-args is used
# 3. Listed in GUI for user configuration
_BATCH_SIZE=128
_N_EPOCHS=200
_LEARNING_RATE=0.0001
_HIDDEN_DIM=256
_DROPOUT=0.4
_SEED=42
```
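To make the `--pass-env-args` behavior concrete, here is an illustrative sketch of the conversion rule described in the comments above. The real conversion lives inside `graflag_runner`; this is not its actual code, just the documented contract:

```python
# Illustrative sketch only -- not graflag_runner's actual implementation.
# Documented rule: each _-prefixed env var becomes a lowercased CLI flag.
import os

def env_to_cli_args(environ=os.environ):
    """Render _-prefixed variables as argparse-style CLI arguments."""
    args = []
    for key, value in environ.items():
        if key.startswith("_"):
            args.extend([f"--{key[1:].lower()}", str(value)])
    return args

# {"_BATCH_SIZE": "128", "_N_EPOCHS": "200"}
#   -> ["--batch_size", "128", "--n_epochs", "200"]
```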
### Template (Pattern B -- bond_* methods)

```bash
METHOD_NAME=bond_dominant
DESCRIPTION=Deep Anomaly Detection on Attributed Networks
SOURCE_CODE=https://github.com/pygod-team/pygod
SUPPORTED_DATASETS=bond_*
COMMAND=python3 -m graflag_bond.train

_HID_DIM=64
_NUM_LAYERS=4
_DROPOUT=0
_WEIGHT_DECAY=0
_LR=0.004
_EPOCH=100
_GPU=0
_BATCH_SIZE=0
```

### Key Fields

| Field | Required | Description | Example |
|-------|----------|-------------|---------|
| `METHOD_NAME` | Yes | Unique method identifier (lowercase) | `generaldyg` |
| `DESCRIPTION` | Yes | Short description | `A Generalizable Anomaly Detection Method` |
| `SOURCE_CODE` | Yes | GitHub repo or paper link | `https://github.com/...` |
| `COMMAND` | Yes | Entry point command | `python3 train_graflag.py` |
| `SUPPORTED_DATASETS` | No | Compatible datasets (comma-separated, wildcards ok) | `bond_*` |
| `_PARAMETER` | No | User-configurable parameters (prefix with `_`) | `_BATCH_SIZE=128` |

### Parameter Naming Convention

- **Prefix with `_`**: All configurable parameters must start with an underscore
- **Uppercase**: Use uppercase for consistency: `_LEARNING_RATE`, `_BATCH_SIZE`
- **Reserved names**: These are set by the orchestrator and cannot be overridden: `DATA`, `EXP`, `METHOD_NAME`, `COMMAND`, `MONITOR_INTERVAL`

---

## Step 2: Create Dockerfile

The Dockerfile defines the containerized execution environment.

### Template (Pattern A)

```dockerfile
FROM nvidia/cuda:12.1.0-runtime-ubuntu22.04

ENV DEBIAN_FRONTEND=noninteractive

# Install Python and system dependencies
RUN apt-get update && apt-get install -y \
    python3 python3-pip git \
    && rm -rf /var/lib/apt/lists/*

RUN pip install --no-cache-dir --upgrade pip

# Install Python dependencies specific to your method
RUN pip install --no-cache-dir \
    numpy scipy scikit-learn networkx pandas tqdm

# Install PyTorch with CUDA support (adjust version as needed)
RUN pip install --no-cache-dir \
    torch torchvision torchaudio \
    --index-url https://download.pytorch.org/whl/cu121

# Install PyTorch Geometric (if needed)
# RUN pip install --no-cache-dir torch-geometric
# RUN pip install --no-cache-dir \
#     torch-scatter torch-sparse \
#     -f https://data.pyg.org/whl/torch-2.1.0+cu121.html

WORKDIR /app

# Clone source code from GitHub
RUN git clone https://github.com/your/repo src

# Copy GraFlag integration files
COPY methods/your_method/train_graflag.py ./train_graflag.py

# Copy and install graflag_runner library
COPY libs/ ./libs/
RUN pip install --no-cache-dir ./libs/graflag_runner

# Entry point: --pass-env-args converts _* env vars to CLI args
CMD ["python3", "-m", "graflag_runner", "--pass-env-args"]
```

### Template (Pattern B -- bond_* methods)

```dockerfile
FROM nvidia/cuda:12.1.0-runtime-ubuntu22.04

ENV DEBIAN_FRONTEND=noninteractive

RUN apt-get update && apt-get install -y \
    python3 python3-pip git \
    && rm -rf /var/lib/apt/lists/*

RUN pip install --no-cache-dir --upgrade pip

RUN pip install --no-cache-dir \
    torch torchvision torchaudio \
    --index-url https://download.pytorch.org/whl/cu121

RUN pip install --no-cache-dir torch-geometric pygod

WORKDIR /app

# Install GraFlag libraries (runner + bond wrapper)
COPY libs/ ./libs/
RUN pip install --no-cache-dir ./libs/graflag_runner
RUN pip install --no-cache-dir ./libs/graflag_bond

# No --pass-env-args: graflag_bond reads env vars directly
CMD ["python3", "-m", "graflag_runner"]
```
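For Pattern B there is no wrapper script to write: the library parses the `_*` variables itself. As a hedged sketch of what that contract looks like from the method's side (parameter names taken from the `bond_dominant` template above; `graflag_bond`'s real parsing code lives in `libs/graflag_bond/`):

```python
# Sketch of the Pattern B contract, not graflag_bond's actual code:
# parameters arrive as _-prefixed environment variables, with the
# defaults shown in the .env template above.
import os

hid_dim      = int(os.environ.get("_HID_DIM", "64"))
num_layers   = int(os.environ.get("_NUM_LAYERS", "4"))
dropout      = float(os.environ.get("_DROPOUT", "0"))
weight_decay = float(os.environ.get("_WEIGHT_DECAY", "0"))
lr           = float(os.environ.get("_LR", "0.004"))
epoch        = int(os.environ.get("_EPOCH", "100"))
```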
### Key Components

1. **Base Image**: Choose an appropriate CUDA version for your method's requirements
   - `nvidia/cuda:12.1.0-runtime-ubuntu22.04` (CUDA 12.1, newer methods)
   - `nvidia/cuda:11.1.1-runtime-ubuntu20.04` (CUDA 11.1, older methods)
2. **Dependencies**: Install all required Python packages
3. **Source Code**: Either clone from GitHub or copy local files
4. **GraFlag Libraries**: Always copy and install `graflag_runner`
   ```dockerfile
   COPY libs/ ./libs/
   RUN pip install --no-cache-dir ./libs/graflag_runner
   ```
5. **Entry Point**: Use the `graflag_runner` wrapper for automatic monitoring
   - Pattern A: `CMD ["python3", "-m", "graflag_runner", "--pass-env-args"]`
   - Pattern B: `CMD ["python3", "-m", "graflag_runner"]`

**Note**: The build context is the entire `graflag-shared/` directory, so `COPY libs/` and `COPY methods/` paths are resolved relative to it.

---

## Step 3: Create Training Wrapper (`train_graflag.py`)

This is only needed for **Pattern A** methods. Pattern B methods use `graflag_bond.train` directly.

### Core Requirements

1. **Import GraFlag utilities**
   ```python
   from graflag_runner import ResultWriter, info, warning, error
   ```
2. **Access environment variables**
   ```python
   import os

   data_dir = os.environ.get("DATA")            # Input dataset path
   exp_dir = os.environ.get("EXP")              # Output directory
   method_name = os.environ.get("METHOD_NAME")
   ```
3. **Initialize ResultWriter**
   ```python
   writer = ResultWriter()
   ```
4. **Add metadata** (optional but recommended)
   ```python
   writer.add_metadata(
       method_name="your_method",
       dataset=dataset_name,
       learning_rate=0.001,
       epochs=100
   )
   ```
5. **Save results using the standardized format**
   ```python
   writer.save_scores(
       result_type="NODE_ANOMALY_SCORES",  # or TEMPORAL_*, EDGE_*, STREAM_*
       scores=anomaly_scores,              # List or numpy array
       ground_truth=labels                 # Always include ground truth
   )
   writer.finalize()
   ```
6. **Track training progress** (optional)
   ```python
   writer.spot("training", epoch=i, loss=loss, time_sec=epoch_time)
   ```

### Result Types

Choose the appropriate result type for your method:

| Result Type | Description | Data Format |
|------------|-------------|-------------|
| `NODE_ANOMALY_SCORES` | Static graph, node-level | 1D array: `[0.1, 0.2, ...]` |
| `EDGE_ANOMALY_SCORES` | Static graph, edge-level | 1D array, one score per edge |
| `GRAPH_ANOMALY_SCORES` | Graph classification | 1D array, one score per graph |
| `TEMPORAL_NODE_ANOMALY_SCORES` | Dynamic graphs, node snapshots | 2D array: `[[t0_scores], [t1_scores]]` |
| `TEMPORAL_EDGE_ANOMALY_SCORES` | Dynamic graphs, edge snapshots | 2D array, one row per timestamp |
| `TEMPORAL_GRAPH_ANOMALY_SCORES` | Temporal graph classification | 2D array |
| `NODE_STREAM_ANOMALY_SCORES` | Streaming nodes | 1D array with timestamps |
| `EDGE_STREAM_ANOMALY_SCORES` | Streaming edges | 1D array with timestamps |
| `GRAPH_STREAM_ANOMALY_SCORES` | Streaming graphs | 1D array with timestamps |
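For the temporal types, scores are nested per snapshot. A minimal sketch, assuming `writer` is the `ResultWriter` initialized above and that `ground_truth` mirrors the 2D shape of `scores` (see the RESULTS_STANDARD reference for the authoritative format):

```python
import numpy as np

# Hypothetical dynamic-graph output: 3 snapshots, 5 nodes each
num_snapshots, num_nodes = 3, 5
temporal_scores = [np.random.rand(num_nodes).tolist() for _ in range(num_snapshots)]
temporal_labels = [[0, 0, 1, 0, 0] for _ in range(num_snapshots)]

writer.save_scores(
    result_type="TEMPORAL_NODE_ANOMALY_SCORES",
    scores=temporal_scores,        # 2D: [[t0_scores], [t1_scores], [t2_scores]]
    ground_truth=temporal_labels,  # assumed to follow the same 2D layout
)
```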
""" import sys import os import time from pathlib import Path # Add your method's source to Python path sys.path.insert(0, os.path.join(os.path.dirname(__file__), "src")) import numpy as np # Import your method's classes/functions # from your_module import YourModel # GraFlag integration from graflag_runner import ResultWriter, info, warning, error def load_dataset(): """Load and preprocess dataset from DATA environment variable.""" data_dir = Path(os.environ.get("DATA")) info(f"Loading dataset from: {data_dir}") # Load your data format # Example: edges, features, labels = load_data(data_dir) return data def train_model(data, config): """Train the model and return predictions.""" info(f"Training {config['method_name']}...") # Initialize your model model = YourModel(**config) # Train start_time = time.time() model.fit(data) training_time = time.time() - start_time info(f"Training completed in {training_time:.2f}s") # Get predictions anomaly_scores = model.predict(data) return anomaly_scores, training_time def main(): """Main execution function.""" # Get environment variables data_dir = os.environ.get("DATA") method_name = os.environ.get("METHOD_NAME") info("=" * 60) info(f"{method_name.upper()} Training") info("=" * 60) info(f"Data: {data_dir}") # Initialize ResultWriter writer = ResultWriter() # Load dataset data = load_dataset() # Configuration from environment variables config = { 'method_name': method_name, 'learning_rate': float(os.environ.get("_LEARNING_RATE", "0.001")), 'epochs': int(os.environ.get("_EPOCHS", "100")), # Add more parameters as needed } # Add metadata writer.add_metadata(**config) # Train model anomaly_scores, training_time = train_model(data, config) # Save results info("Saving results...") writer.save_scores( result_type="NODE_ANOMALY_SCORES", # Choose appropriate type scores=anomaly_scores.tolist(), ground_truth=labels.tolist() # Always include ground truth ) # Track training metrics writer.spot("training", time_sec=training_time) # Finalize writer.finalize() info("=" * 60) info(f"{method_name.upper()} completed successfully!") info("=" * 60) if __name__ == "__main__": try: main() except Exception as e: error(f"Execution failed: {e}") import traceback traceback.print_exc() sys.exit(1) ``` --- ## Step 4: Test Integration Locally Before deploying to the cluster, test your integration: ### 1. Build Docker Image ```bash cd /path/to/graflag-shared docker build -f methods/your_method/Dockerfile -t your_method:latest . ``` ### 2. Test Run Locally ```bash docker run --rm \ -v $(pwd):/shared \ -e DATA=/shared/datasets/your_dataset \ -e EXP=/shared/experiments/test_exp \ -e METHOD_NAME=your_method \ -e COMMAND="python3 train_graflag.py" \ -e _BATCH_SIZE=64 \ -e _EPOCHS=10 \ your_method:latest ``` ### 3. Verify Output Check that `results.json` is created correctly: ```bash cat experiments/test_exp/results.json ``` Expected structure: ```json { "result_type": "NODE_ANOMALY_SCORES", "scores": [0.1, 0.2, 0.3], "ground_truth": [0, 0, 1], "metadata": { "method_name": "your_method" } } ``` --- ## Step 5: Deploy to GraFlag Cluster Once tested, deploy to the GraFlag cluster: ### 1. Sync Method to Shared Directory ```bash graflag sync --path methods/your_method ``` ### 2. Build and Run ```bash graflag run -m your_method -d your_dataset --build ``` ### 3. Run with Custom Parameters ```bash graflag run -m your_method -d your_dataset --params EPOCHS=50 BATCH_SIZE=64 ``` ### 4. 
---

## Step 5: Deploy to GraFlag Cluster

Once tested, deploy to the GraFlag cluster:

### 1. Sync Method to Shared Directory

```bash
graflag sync --path methods/your_method
```

### 2. Build and Run

```bash
graflag run -m your_method -d your_dataset --build
```

### 3. Run with Custom Parameters

```bash
graflag run -m your_method -d your_dataset --params EPOCHS=50 BATCH_SIZE=64
```

### 4. Monitor and Evaluate

```bash
# Follow logs in real time
graflag logs -e exp__your_method__your_dataset__TIMESTAMP -f

# Evaluate results
graflag evaluate -e exp__your_method__your_dataset__TIMESTAMP
```

---

## Advanced Features

### 1. Streaming Large Results

For methods that produce very large score arrays, stream results to avoid holding everything in memory:

```python
from graflag_runner import ResultWriter, StreamableArray

def generate_scores():
    """Generator that yields scores incrementally."""
    for batch in large_dataset:
        scores = model.predict(batch)
        yield scores

writer = ResultWriter()
writer.save_scores(
    result_type="NODE_ANOMALY_SCORES",
    scores=StreamableArray(generate_scores())  # Wrap the generator
)
writer.finalize()
```

### 2. Progress Tracking with `spot()`

Track arbitrary metrics during execution:

```python
# Training metrics (creates training.csv)
writer.spot("training", epoch=i, loss=loss, accuracy=acc, time_sec=t)

# Validation metrics (creates validation.csv)
writer.spot("validation", epoch=i, val_loss=val_loss, val_auc=auc)

# Custom metrics (creates preprocessing.csv)
writer.spot("preprocessing", num_nodes=n, num_edges=e)
```

The schema is locked after the first call -- subsequent calls must provide the same metric keys.

### 3. Environment Variable Extraction

Use the `--pass-env-args` flag to automatically convert `_*` env vars to CLI arguments:

```dockerfile
# In the Dockerfile
CMD ["python3", "-m", "graflag_runner", "--pass-env-args"]
```

This converts:

- `_BATCH_SIZE=128` -> `--batch_size 128`
- `_LEARNING_RATE=0.001` -> `--learning_rate 0.001`

Parameter names are **lowercased** by the runner. If the original method uses mixed-case arguments, add lowercase aliases:

```python
parser.add_argument('--lr_g', '--lr_G', type=float, default=0.0001)
```

---

## Troubleshooting

### Common Issues

**1. Import Errors**

```
ModuleNotFoundError: No module named 'your_module'
```

**Solution**: Ensure the `sys.path.insert(0, "src")` call runs before the imports, or install the package in the Dockerfile.

**2. CUDA/GPU Issues**

```
RuntimeError: CUDA out of memory
```

**Solution**: Reduce the batch size via the `_BATCH_SIZE` parameter, or disable the GPU with the `--no-gpu` flag.

**3. Dataset Not Found**

```
FileNotFoundError: Dataset not found
```

**Solution**: Check that the dataset name matches a directory in `datasets/`; use lowercase.

**4. Results Not Saving**

```
results.json empty or missing
```

**Solution**: Ensure `writer.finalize()` is called, and check permissions on the experiment directory.

**5. Container Crashes Silently**

**Solution**: Check the logs with:

```bash
graflag logs -e exp__your_method__dataset__TIMESTAMP -f
```

**6. Boolean Argument Errors**

```
unrecognized arguments: True
```

**Solution**: Use a `str2bool` helper instead of `action='store_true'`:

```python
import argparse

def str2bool(v):
    if isinstance(v, bool):
        return v
    if v.lower() in ('yes', 'true', 't', 'y', '1'):
        return True
    elif v.lower() in ('no', 'false', 'f', 'n', '0'):
        return False
    raise argparse.ArgumentTypeError('Boolean value expected.')

parser.add_argument('--flag', type=str2bool, nargs='?', const=True, default=False)
```

---

## Best Practices

1. **Version Control**: Pin all dependency versions in the Dockerfile
2. **Logging**: Use `info()`, `warning()`, `error()` from graflag_runner for consistent logging
3. **Resource Tracking**: Let graflag_runner handle monitoring; don't implement custom tracking
4. **Error Handling**: Wrap `main()` in try-except and call `sys.exit(1)` on failure
5. **Testing**: Always test locally before deploying to the cluster
6. **Reproducibility**: Set random seeds and include them in the metadata (see the sketch after this list)
7. **Ground Truth**: Always include `ground_truth` in `save_scores()` for evaluation to work
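For item 6, a minimal seeding helper, assuming a `_SEED` parameter is declared in `.env` as in the Step 1 template and that `writer` is your `ResultWriter`:

```python
import os
import random

import numpy as np
import torch

def set_seed(seed: int):
    """Seed the common RNG sources for reproducible runs."""
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)

seed = int(os.environ.get("_SEED", "42"))
set_seed(seed)
writer.add_metadata(seed=seed)  # record the seed alongside other metadata
```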
---

## Quick Reference

### Environment Variables (Automatically Set)

| Variable | Description | Example |
|----------|-------------|---------|
| `DATA` | Input dataset directory | `/shared/datasets/uci` |
| `EXP` | Experiment output directory | `/shared/experiments/exp__...` |
| `METHOD_NAME` | Method identifier | `generaldyg` |
| `COMMAND` | Command from .env | `python3 train_graflag.py` |

### graflag_runner API

```python
from graflag_runner import ResultWriter, info, warning, error

# Logging
info("Message")
warning("Warning message")
error("Error message")

# Result writing
writer = ResultWriter()
writer.save_scores(result_type="...", scores=[...], ground_truth=[...])
writer.add_metadata(method_name="...", dataset="...", ...)
writer.add_resource_metrics(exec_time_ms=1234.5, peak_memory_mb=512.3, peak_gpu_mb=2048.0)
writer.spot("training", epoch=1, loss=0.5, auc=0.85)
writer.finalize()
```
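One `spot()` detail worth restating from Advanced Features: the schema for a category is locked by the first call, so keep the metric keys constant:

```python
writer.spot("training", epoch=1, loss=0.9, auc=0.70)  # locks keys for "training"
writer.spot("training", epoch=2, loss=0.5, auc=0.85)  # OK: same keys
# writer.spot("training", epoch=3, loss=0.3)          # would violate the locked schema
```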
== "__main__": main() ``` **Test:** ```bash docker build -f methods/mymethod/Dockerfile -t mymethod:latest . docker run --rm -v $(pwd):/shared \ -e DATA=/shared/datasets/test \ -e EXP=/shared/experiments/test \ -e METHOD_NAME=mymethod \ -e COMMAND="python3 train_graflag.py" \ mymethod:latest ``` --- ## Resources - **Result Types Reference**: See [RESULTS_STANDARD](RESULTS_STANDARD.md) - **Agent Integration Guide**: See [AGENT_METHOD_INTEGRATION](AGENT_METHOD_INTEGRATION.md) for AI-assisted integration - **Example Methods**: `methods/generaldyg/`, `methods/taddy/`, `methods/bond_*/` - **graflag_runner Source**: `graflag-shared/libs/graflag_runner/` - **graflag_bond Source**: `graflag-shared/libs/graflag_bond/` (for PyGOD integration)