graflag.docker_ops

Docker operations for GraFlag using Docker SDK.

class graflag.docker_ops.ReservedEnvVars(*values)

Bases: Enum

Reserved environment variable names that should not be overridden by method parameters.

DATA = 'DATA'
EXP = 'EXP'
METHOD_NAME = 'METHOD_NAME'
COMMAND = 'COMMAND'
MONITOR_INTERVAL = 'MONITOR_INTERVAL'
classmethod get_names()
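
The listing gives the member names but not what `get_names()` returns or how the reserved names are enforced. The following is a minimal sketch, assuming `get_names()` returns the member names as a set; `drop_reserved` is a hypothetical helper, not part of GraFlag, showing one way method parameters could be filtered against that set.

```python
from enum import Enum


class ReservedEnvVars(Enum):
    """Member names copied from the listing above."""

    DATA = "DATA"
    EXP = "EXP"
    METHOD_NAME = "METHOD_NAME"
    COMMAND = "COMMAND"
    MONITOR_INTERVAL = "MONITOR_INTERVAL"

    @classmethod
    def get_names(cls):
        # Assumption: the real method returns the member names;
        # the concrete return type is not documented.
        return {member.name for member in cls}


def drop_reserved(method_params):
    """Hypothetical helper: discard parameters whose key is a reserved name."""
    reserved = ReservedEnvVars.get_names()
    return {key: value for key, value in method_params.items() if key not in reserved}
```

With this sketch, `drop_reserved({"lr": 0.01, "DATA": "/tmp"})` keeps only the `lr` entry.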

class graflag.docker_ops.DockerManager(ssh_manager, config, hosts_file: str = 'hosts.yml')

Bases: object

Handle Docker Swarm operations via Docker SDK with SSH tunnel.

property client: DockerClient

Lazy-initialize Docker client via SSH tunnel.
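
Lazy initialization here means the client is only built on first access and then reused. The sketch below illustrates that pattern in isolation. `LazyClientHolder` and `client_factory` are hypothetical names; how GraFlag actually opens the SSH tunnel and constructs the `DockerClient` is not shown in this reference.

```python
class LazyClientHolder:
    """Illustrative holder that creates its client on first use."""

    def __init__(self, client_factory):
        # client_factory is any zero-argument callable returning a client
        # object that has a close() method (an assumption of this sketch).
        self._client_factory = client_factory
        self._client = None

    @property
    def client(self):
        # Build the client only when it is first requested, then cache it.
        if self._client is None:
            self._client = self._client_factory()
        return self._client

    def close(self):
        # Closing is a no-op if the client was never created.
        if self._client is not None:
            self._client.close()
            self._client = None
```

The benefit of this design is that constructing the manager stays cheap: no connection is opened until an operation actually needs the client.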

close()

Close Docker client and SSH tunnel.

setup_swarm_manager()

Initialize Docker Swarm on the manager node.

get_swarm_token() → str

Get Docker Swarm worker join token.
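
For orientation, the Docker SDK for Python exposes the swarm's inspect data on `client.swarm.attrs`, where the worker token sits under `JoinTokens`. The sketch below reads it from any client-like object. `get_worker_join_token` is a hypothetical function name, and whether GraFlag reads the token this way is an assumption.

```python
def get_worker_join_token(client):
    """Return the worker join token of an already initialized swarm.

    `client` is expected to behave like docker.DockerClient: it needs a
    `swarm` attribute with `reload()` and `attrs`.
    """
    # Refresh the cached swarm inspect data before reading from it.
    client.swarm.reload()
    return client.swarm.attrs["JoinTokens"]["Worker"]
```

A worker can then join with `docker swarm join --token <token> <manager-address>:2377`, where 2377 is Docker's default swarm management port.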

setup_workers(token: str)

Set up worker nodes to join the swarm (requires SSH access to each worker).

get_nodes() → List[Dict]

Get list of swarm nodes.
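
The keys of the returned dictionaries are not documented here. As a rough illustration only, the sketch below flattens Docker's node inspect data (available as `node.attrs` on each item of `client.nodes.list()` in the Docker SDK for Python) into small dictionaries. `summarize_nodes` and its output keys are hypothetical, not GraFlag's actual format.

```python
def summarize_nodes(node_attrs):
    """Flatten node inspect dictionaries into small summaries.

    `node_attrs` is an iterable of dictionaries shaped like Docker's
    node inspect output.
    """
    summaries = []
    for attrs in node_attrs:
        spec = attrs.get("Spec", {})
        summaries.append(
            {
                "id": attrs.get("ID"),
                "hostname": attrs.get("Description", {}).get("Hostname"),
                "role": spec.get("Role"),
                "availability": spec.get("Availability"),
                "state": attrs.get("Status", {}).get("State"),
            }
        )
    return summaries
```

With a real client this could be called as `summarize_nodes(node.attrs for node in client.nodes.list())`.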

setup_local_registry()

Set up a local Docker registry service on the manager node.

build_method_image(method_name: str, tag: str = 'latest') → str

Build method Docker image and push to local registry.

Uses SSH because the build context resides on the remote host.

Returns:

Combined build and push log output.

build_evaluator_image() → str

Build graflag-evaluator image and push to registry.

Returns:

Registry image path.

create_service(exp_name: str, method_name: str, dataset: str, tag: str = 'latest', gpu_required: bool = True, method_params: dict = None) → str

Create Docker service for experiment.

create_evaluation_service(experiment_name: str) → str

Create Docker service to run evaluation.

list_services() → List[Dict]

List all Docker services.

get_service_names() → set

Get set of all service names.

stop_service(service_name: str)

Stop and remove a service.

cleanup_finished_service(service_name: str)

Remove a finished service (safe if it doesn’t exist).

remove_evaluation_service(experiment_name: str)

Remove evaluation service for an experiment.

get_service_logs(service_name: str, tail: int = 100) → List[str]

Get recent logs for a service.

Uses SSH + Docker CLI because the Docker SDK log streaming is unreliable for swarm services.
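
The SSH plus Docker CLI approach can be sketched as building a `docker service logs` command and running it on the manager over `ssh`. Both function names and the `host` argument are hypothetical, and the exact flags GraFlag passes are an assumption. Only `--tail` and `--follow`, which the Docker CLI does provide, are used.

```python
import shlex
import subprocess


def build_service_logs_command(service_name, tail=100, follow=False):
    """Build a shell-safe `docker service logs` command string."""
    args = ["docker", "service", "logs", "--tail", str(tail)]
    if follow:
        args.append("--follow")
    args.append(service_name)
    # Quote every argument so the remote shell sees each one as a single token.
    return " ".join(shlex.quote(arg) for arg in args)


def fetch_service_logs(host, service_name, tail=100):
    """Run the command on `host` over ssh and return its output lines."""
    command = build_service_logs_command(service_name, tail=tail)
    result = subprocess.run(
        ["ssh", host, command],
        capture_output=True,
        text=True,
        check=False,
    )
    # `docker service logs` may write to both streams, so keep both.
    return (result.stdout + result.stderr).splitlines()
```

Quoting the service name matters because the command string is interpreted by a shell on the remote side.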

follow_service_logs(service_name: str)

Follow service logs in real-time until task finishes.

Uses SSH + Docker CLI subprocess because the Docker SDK’s follow mode does not stream reliably for swarm services.
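
Streaming from a CLI subprocess generally means reading its output line by line while it runs. The sketch below shows that pattern with the standard library. `stream_lines` is a hypothetical helper, and how GraFlag detects that the task has finished is not documented here, so this version simply stops when the subprocess exits.

```python
import subprocess


def stream_lines(argv):
    """Yield output lines from a subprocess as they are produced."""
    with subprocess.Popen(
        argv,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # merge stderr so no log lines are lost
        text=True,
        bufsize=1,  # line-buffered reads in text mode
    ) as proc:
        for line in proc.stdout:
            yield line.rstrip("\n")
```

For a remote service, `argv` could be something like `["ssh", "manager-host", "docker service logs --follow my-service"]`, where the host and service names are placeholders.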

service_exists(service_name: str) → bool

Check if a Docker service exists.

is_service_failed(service_name: str) → bool

Check if a service exists but all its tasks have failed.
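
The "all tasks have failed" condition can be expressed over the task dictionaries that the Docker SDK for Python returns from `service.tasks()`, each of which carries its state under `Status.State`. In the sketch below, `all_tasks_failed` is a hypothetical function and the choice of which states count as failed is an assumption. A service with no tasks yet is not treated as failed.

```python
# Assumption: these task states are the ones treated as failures.
FAILED_STATES = {"failed", "rejected"}


def all_tasks_failed(tasks):
    """Return True only if there is at least one task and every task failed."""
    states = [task.get("Status", {}).get("State") for task in tasks]
    return bool(states) and all(state in FAILED_STATES for state in states)
```

Requiring at least one task avoids reporting a freshly created service as failed before the scheduler has assigned any task to it.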

get_cluster_status() → Dict

Get Docker Swarm cluster status.