Multi-node training

Traditional multi-node distributed training requires manual coordination—discovering node IPs, assigning ranks, configuring network communication, and setting up shared storage. Flow eliminates this complexity through automatic coordination.

Instead of managing infrastructure, you focus on your training code:

@app.function(
    gpu="h100:8", 
    num_instances=4,
    distributed_mode="auto"
)
def distributed_training():
    # Flow automatically sets: RANK, LOCAL_RANK, WORLD_SIZE, MASTER_ADDR, MASTER_PORT
    dist.init_process_group(backend="nccl")
    # Your training code here...

Flow handles node discovery, rank assignment, environment setup, and shared storage, and it provides debugging tools for when runs misbehave. Here's how each piece works.


Automatic Coordination

Flow's rendezvous system eliminates manual node coordination. When you specify num_instances > 1, Flow automatically handles node discovery, rank assignment, and environment setup.

From Single-Node to Multi-Node

Convert single-node training to multi-node by adding two parameters:

from flow import FlowApp
import os
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP

app = FlowApp()

@app.function(
    gpu="h100:8", 
    num_instances=4,                    # Flow coordinates 4 nodes automatically
    distributed_mode="auto",            # Enable automatic rendezvous
    image="nvcr.io/nvidia/pytorch:23.10-py3"
)
def distributed_training():
    """32-GPU distributed training with automatic coordination."""
    
    # Flow automatically sets these environment variables:
    node_rank = int(os.environ["FLOW_NODE_RANK"])    # 0, 1, 2, 3
    num_nodes = int(os.environ["FLOW_NUM_NODES"])    # 4  
    main_ip = os.environ["FLOW_MAIN_IP"]             # Master node IP
    
    # Standard PyTorch distributed variables (also set by Flow):
    # RANK, LOCAL_RANK, WORLD_SIZE, MASTER_ADDR, MASTER_PORT
    
    print(f"Node {node_rank}/{num_nodes} - Master: {main_ip}")
    
    # Standard PyTorch distributed setup—no changes needed
    dist.init_process_group(backend="nccl")
    local_rank = int(os.environ["LOCAL_RANK"])
    torch.cuda.set_device(local_rank)
    
    # Your existing model training code
    model = create_model().cuda()
    model = DDP(model, device_ids=[local_rank])
    
    for epoch in range(10):
        loss = train_epoch(model)
        if dist.get_rank() == 0:  # Only master logs
            print(f"Epoch {epoch}: loss={loss:.4f}")
    
    return {"node": node_rank, "status": "complete"}

# Launch 32 GPUs with automatic coordination
result = distributed_training.remote()

How Flow's Rendezvous Works

When you submit the task, Flow's coordination system:

  1. Discovers all instances via the provider API

  2. Assigns deterministic ranks based on instance IDs

  3. Elects a leader (rank 0) and resolves its private IP

  4. Sets environment variables for PyTorch distributed training

  5. Synchronizes startup so all nodes begin together

Your training code uses standard PyTorch distributed patterns—dist.init_process_group(), DDP, dist.barrier()—without any Flow-specific modifications.
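
If you want to confirm that the rendezvous actually succeeded before committing to a full training run, a single collective is enough. The helper below is a minimal sketch (the verify_rendezvous name is ours, not part of Flow's API); call it right after dist.init_process_group() and torch.cuda.set_device(local_rank):

import torch
import torch.distributed as dist

def verify_rendezvous():
    """Sanity-check the process group: every process contributes a 1."""
    ones = torch.ones(1, device="cuda")
    dist.all_reduce(ones)                      # Default op is SUM
    assert int(ones.item()) == dist.get_world_size()
    if dist.get_rank() == 0:
        print(f"Rendezvous OK: {dist.get_world_size()} processes reachable")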

Environment Variables Set by Flow

Flow automatically configures the distributed training environment:

# Flow-specific coordination
FLOW_NODE_RANK=0        # This node's rank (0, 1, 2, 3)
FLOW_NUM_NODES=4        # Total number of nodes  
FLOW_MAIN_IP=10.0.1.5   # Private IP of rank 0 node

# PyTorch distributed training
RANK=0                  # Global process rank (0-31 for 32 GPUs)
LOCAL_RANK=0            # GPU index within this node (0-7)
WORLD_SIZE=32           # Total number of processes/GPUs
MASTER_ADDR=10.0.1.5    # Address for process group coordination
MASTER_PORT=29500       # Port for process group coordination

No manual configuration required—Flow handles the entire coordination process automatically.
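
The two sets of variables are consistent with each other under the usual one-process-per-GPU layout. The check below is purely illustrative; the 8-GPUs-per-node figure and the rank arithmetic are assumptions based on the example above, not a guarantee about Flow's internal numbering:

import os

gpus_per_node = 8                                   # Assumed: h100:8 nodes
node_rank  = int(os.environ["FLOW_NODE_RANK"])
local_rank = int(os.environ["LOCAL_RANK"])

# Standard torchrun-style layout: global rank counts GPUs node by node
assert int(os.environ["RANK"]) == node_rank * gpus_per_node + local_rank
assert int(os.environ["WORLD_SIZE"]) == int(os.environ["FLOW_NUM_NODES"]) * gpus_per_node
assert os.environ["MASTER_ADDR"] == os.environ["FLOW_MAIN_IP"]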


Shared Storage

Multi-node training requires storage that all nodes can access simultaneously—shared datasets, model checkpoints, and training logs. Flow lets you specify volumes that automatically mount across all nodes in your deployment.

File Storage for Multi-Node Access

When you specify interface="file" in your volume configuration, Flow ensures the storage is accessible across all nodes:

@app.function(
    gpu="h100:8",
    num_instances=4,
    volumes={
        "/shared/datasets": {
            "name": "training-data",
            "size_gb": 1000,
            "interface": "file"        # Shared access across nodes
        },
        "/shared/checkpoints": {
            "name": "model-checkpoints", 
            "size_gb": 500,
            "interface": "file"        # All nodes can read/write
        }
    }
)
def training_with_shared_storage():
    """Distributed training with shared datasets and checkpoints."""
    
    import os
    from pathlib import Path
    
    node_rank = int(os.environ["FLOW_NODE_RANK"])
    
    # All nodes see the same files
    datasets_path = Path("/shared/datasets")
    checkpoints_path = Path("/shared/checkpoints") 
    
    dist.init_process_group(backend="nccl")
    local_rank = int(os.environ["LOCAL_RANK"])
    torch.cuda.set_device(local_rank)
    
    # Load dataset (accessible from all nodes)
    dataset = load_dataset_from_path(datasets_path)
    model = create_model().cuda()
    model = DDP(model, device_ids=[local_rank])
    
    # Coordinated checkpointing
    for epoch in range(10):
        loss = train_epoch(model, dataset)
        
        # Only global rank 0 writes the checkpoint (avoids concurrent writes)
        if dist.get_rank() == 0:
            checkpoint_path = checkpoints_path / f"epoch_{epoch}.pt" 
            torch.save(model.state_dict(), checkpoint_path)
            print(f"Checkpoint saved: {checkpoint_path}")
        
        # All nodes wait for checkpoint save
        dist.barrier()
    
    return {"epochs_completed": 10, "checkpoints_saved": True}

result = training_with_shared_storage.remote()

S3 Dataset Integration

For read-only datasets, you can mount S3 buckets that appear on all nodes:

@app.function(
    gpu="h100:8", 
    num_instances=4,
    distributed_mode="auto"
)
def training_with_s3_data():
    """All nodes get identical S3 mount access."""
    
    # S3 data available at /data on all nodes
    dataset_path = "/data"
    
    dist.init_process_group(backend="nccl")
    local_rank = int(os.environ["LOCAL_RANK"])
    torch.cuda.set_device(local_rank)
    
    # Load dataset from S3 mount
    dataset = load_dataset_from_s3_mount(dataset_path)
    model = create_model().cuda()
    model = DDP(model, device_ids=[local_rank])
    
    # Train with shared S3 data
    for epoch in range(10):
        loss = train_epoch(model, dataset)
    
    return {"training": "complete"}

# Submit with S3 mount (automatically available on all nodes)
from flow import Flow
flow = Flow()
task = flow.run(
    training_with_s3_data._build_task_config(""),
    mounts={"/data": "s3://ml-datasets/imagenet"}
)

Volume Configuration

Flow supports different storage configurations:

  • File storage (interface="file"): Shared access across all nodes. Use for datasets, checkpoints, and logs that need multi-node read/write access.

  • S3 mounts: Read-only access to S3 buckets, mounted on all nodes. Useful for large datasets without persistent storage costs.

When you specify volumes in a multi-node function, Flow ensures they appear simultaneously on every node, eliminating manual synchronization between nodes.
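
Because every node sees the same checkpoint volume, resuming after an interruption needs no copying between machines: rank 0 writes, every rank reads. The helper below is a sketch under the assumptions of the shared-storage example above (checkpoints named epoch_N.pt, saved from the DDP-wrapped model); the load_latest_checkpoint name is ours:

import os
from pathlib import Path

import torch
import torch.distributed as dist

def load_latest_checkpoint(model, checkpoints_dir="/shared/checkpoints"):
    """Resume from the newest epoch_*.pt on the shared volume, if one exists."""
    checkpoints = sorted(Path(checkpoints_dir).glob("epoch_*.pt"),
                         key=lambda p: int(p.stem.split("_")[1]))
    if not checkpoints:
        return 0                                   # Nothing saved yet

    latest = checkpoints[-1]
    # Every rank reads the same file; map it onto this rank's GPU
    state = torch.load(latest, map_location=f"cuda:{os.environ['LOCAL_RANK']}")
    model.load_state_dict(state)

    dist.barrier()                                 # Wait until all ranks have loaded
    return int(latest.stem.split("_")[1]) + 1      # Next epoch to run

Call it after wrapping the model in DDP; the barrier keeps fast readers from starting an epoch while slower nodes are still loading.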


Debugging Multi-Node Training

When distributed training fails, you need to inspect individual nodes—check GPU utilization, NCCL communication, memory usage, and process status. Flow provides node-specific SSH access and log aggregation to simplify multi-node debugging.

Node-Specific SSH Access

Flow lets you SSH directly to any node in your multi-node deployment:

# SSH to specific nodes in your cluster
flow ssh training-task-xyz --node 0    # Master node
flow ssh training-task-xyz --node 1    # Worker node 1
flow ssh training-task-xyz --node 2    # Worker node 2  
flow ssh training-task-xyz --node 3    # Worker node 3

# Run commands on specific nodes
flow ssh training-task-xyz --node 1 --command "nvidia-smi"
flow ssh training-task-xyz --node 2 --command "ps aux | grep python"

Debugging Workflows

Check GPU utilization across all nodes to identify performance issues:

# Monitor GPU usage across all nodes
for node in {0..3}; do
    echo "=== Node $node GPU Status ==="
    flow ssh training-task-xyz --node $node --command "nvidia-smi --query-gpu=utilization.gpu --format=csv,noheader"
done

# Check NCCL communication logs
for node in {0..3}; do
    echo "=== Node $node NCCL Status ==="
    flow ssh training-task-xyz --node $node --command "grep 'NCCL' /var/log/training.log | tail -5"
done

Log Aggregation

Flow aggregates logs from all nodes with automatic node identification:

# Get logs from all nodes, timestamped and labeled
task = flow.get_task("training-task-xyz")
logs = task.logs()

# Example output:
# [Node 0] [10:30:15] NCCL: Connected to all peers successfully
# [Node 1] [10:30:16] NCCL: Connection timeout to 10.0.1.5:29500  
# [Node 2] [10:30:16] NCCL: Connected to all peers successfully
# [Node 3] [10:30:16] NCCL: Connected to all peers successfully

# Get logs from specific nodes
node_1_logs = task.logs(node=1)  # Debug problematic node
print("Node 1 specific logs:", node_1_logs)

# Stream logs in real-time
for line in task.logs(follow=True):
    print(line)

Common Debugging Scenarios

NCCL Communication Issues: Use node-specific SSH to check network connectivity and NCCL environment variables on individual nodes.
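
For a first pass at connectivity problems, it is often enough to print each node's NCCL settings and check that the master's rendezvous port is reachable. The snippet below is a small, Flow-agnostic diagnostic built only on the environment variables listed earlier and the Python standard library (the nccl_diagnostics name is ours):

import os
import socket

def nccl_diagnostics(timeout=5):
    """Print NCCL settings and test reachability of the master port."""
    for key in sorted(os.environ):
        if key.startswith("NCCL_"):
            print(f"{key}={os.environ[key]}")

    addr, port = os.environ["MASTER_ADDR"], int(os.environ["MASTER_PORT"])
    try:
        with socket.create_connection((addr, port), timeout=timeout):
            print(f"TCP connection to {addr}:{port} OK")
    except OSError as err:
        print(f"Cannot reach {addr}:{port}: {err}")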

Uneven GPU Utilization: SSH to underutilized nodes to check process status, memory usage, and data loading bottlenecks.

Hanging Training: Check if specific nodes are stuck in barriers or collective operations using per-node process inspection.
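
For hangs, the most useful artifact is a Python stack trace from the stuck process. One Flow-agnostic approach is to register the standard library's faulthandler against a signal at the top of your training function; you can then SSH to the suspect node and send that signal (for example kill -USR1 <pid>) to dump every thread's stack to the logs without interrupting training:

import faulthandler
import signal

# Register once at the start of the training function. Sending SIGUSR1 to the
# process later dumps all thread stacks to stderr, showing which barrier or
# collective each rank is blocked in.
faulthandler.register(signal.SIGUSR1, all_threads=True)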

Flow's node-specific access eliminates the need to manage separate SSH connections or correlate logs manually across multiple instances.


Production Configuration

Production multi-node training requires cost controls and performance optimization. Flow provides built-in safeguards and configuration options for production workloads.

Cost Protection

Flow includes automatic cost protection mechanisms for large-scale training:

@app.function(
    gpu="h100-80gb.sxm.8x",            # 8x H100 per node
    num_instances=8,                    # 64 GPUs total
    
    # Cost protection
    priority="med",                     # Balanced pricing tier
    max_price_per_hour=50.0,           # $50 per instance (total: 8 × $50 = $400/hour)
    max_run_time_hours=12,             # Auto-terminate after 12 hours
    
    # Production image and environment
    image="nvcr.io/nvidia/pytorch:24.01-py3",
    environment={
        "NCCL_DEBUG": "WARN",
        "NCCL_TREE_THRESHOLD": "0",    # Force ring algorithms for large clusters
        "CUDA_VISIBLE_DEVICES": "0,1,2,3,4,5,6,7"
    }
)
def production_training(learning_rate: float = 1e-4, batch_size: int = 32):
    # Training loop here (sets epoch_loss and world_size)...
    return {"final_loss": epoch_loss, "epochs": 100, "gpus_used": world_size}

# Monitor costs during training
task = production_training.spawn(learning_rate=5e-5)
print(f"Current cost: ${task.estimated_cost:.2f}")
print(f"Cost cap: ${task.max_price_per_hour * task.num_instances:.2f}/hour")

Performance Optimization

Flow automatically provisions high-speed networking for multi-node workloads. For explicit control, specify interconnect requirements:

@app.function(
    gpu="h100:8",
    num_instances=16,                        # 128 GPUs
    internode_interconnect="InfiniBand",     # Force InfiniBand networking
    intranode_interconnect="SXM5",           # Force SXM5 within nodes
    environment={
        "NCCL_IB_DISABLE": "0",              # Enable InfiniBand for NCCL
        "NCCL_NET_GDR_LEVEL": "3"            # GPU Direct RDMA
    }
)
def high_performance_training():
    """Optimized for maximum multi-node performance."""
    pass
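
To confirm the interconnect is actually delivering, a short all_reduce micro-benchmark run from inside the training function is usually enough. The sketch below uses standard PyTorch collectives; the 1 GiB buffer and iteration count are arbitrary choices, and the bus-bandwidth formula is the usual ring all_reduce estimate:

import time
import torch
import torch.distributed as dist

def allreduce_benchmark(size_mb=1024, iters=20):
    """Rough bus-bandwidth estimate for the current process group."""
    world_size = dist.get_world_size()
    tensor = torch.ones(size_mb * 1024 * 1024 // 4, device="cuda")  # float32

    for _ in range(5):                     # Warm-up so NCCL opens its channels
        dist.all_reduce(tensor)
    torch.cuda.synchronize()

    start = time.perf_counter()
    for _ in range(iters):
        dist.all_reduce(tensor)
    torch.cuda.synchronize()
    elapsed = (time.perf_counter() - start) / iters

    # Ring all_reduce moves 2 * (N - 1) / N of the buffer per GPU
    size_gb = tensor.numel() * 4 / 1e9
    busbw = size_gb * 2 * (world_size - 1) / world_size / elapsed
    if dist.get_rank() == 0:
        print(f"all_reduce {size_gb:.1f} GB: {elapsed * 1e3:.1f} ms/iter, ~{busbw:.0f} GB/s bus bandwidth")

A number far below what the interconnect should sustain is a sign that the NCCL settings above are not taking effect on every node.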

The complete source code for this guide and all examples is available in the Flow repository on GitHub.
