flow.sdk.client

Unified GPU workload orchestration.

A concise, explicit interface for submitting, monitoring, and managing GPU workloads across providers. Designed for the common path with clear escape hatches.

CatalogEntry

Instance catalog entry dictionary.

Client

Clean SDK facade for Flow: simple for 90% of use cases, powerful for the remaining 10%.

This client provides a simplified interface to Flow's functionality, abstracting away complexity while maintaining power user access.

Client.cancel

cancel(self, task_id: str, force: bool = False) -> bool

Cancel a running task.

Parameters:

  • task_id: Task to cancel

  • force: Force termination

Returns:

True if cancelled successfully

Client.provider_port

provider_port(self)

Get direct access to the provider port (for power users).

Returns:

Provider port for advanced operations

Client.reserve

reserve(
    self,
    gpu: str,
    duration_hours: int = 1,
    auto_renew: bool = False
) -> str

Reserve GPU capacity.

Parameters:

  • gpu: GPU type to reserve

  • duration_hours: Reservation duration

  • auto_renew: Auto-renew reservation

Returns:

Reservation ID

Client.run

run(
    self,
    command: str | list[str] | TaskSpec | TaskConfig,
    gpu: Optional[str] = None,
    cpus: Optional[int] = None,
    memory_gb: Optional[int] = None,
    wait: bool = False,
    **kwargs
) -> Task

Run a task with specified resources.

Parameters:

  • command: Command to run or task specification

  • gpu: GPU type (e.g., "a100", "v100")

  • cpus: Number of CPUs

  • memory_gb: Memory in GB

  • wait: Wait for task completion

  • **kwargs: Additional task configuration

Returns:

Task handle for monitoring and management
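Since a runnable usage example would require a live provider, here is a minimal sketch of how the resource parameters above might be assembled into a task payload. The helper name and payload keys are illustrative assumptions, not SDK code:

```python
from typing import Any, Optional

def build_task_payload(
    command: str,
    gpu: Optional[str] = None,
    cpus: Optional[int] = None,
    memory_gb: Optional[int] = None,
    **kwargs: Any,
) -> dict:
    """Assemble a task payload, dropping unset resource fields."""
    payload: dict = {"command": command}
    resources = {"gpu": gpu, "cpus": cpus, "memory_gb": memory_gb}
    # Only include resources the caller actually specified.
    payload.update({k: v for k, v in resources.items() if v is not None})
    # Remaining keyword arguments pass through as extra task configuration.
    payload.update(kwargs)
    return payload

payload = build_task_payload("python train.py", gpu="a100", cpus=8, env={"SEED": "1"})
```

This mirrors the documented behavior: unset optional resources are omitted rather than sent as nulls, and `**kwargs` flows through as additional task configuration.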

Client.ssh

ssh(self, task_id: str) -> str

Get SSH command for task.

Parameters:

  • task_id: Task to connect to

Returns:

SSH command string

Client.status

status(self, task_id: Optional[str] = None) -> list[Task]

Get status of task(s).

Parameters:

  • task_id: Specific task ID or None for all tasks

Returns:

List of tasks with current status

Client.volumes

volumes(self) -> list[Volume]

List available volumes.

Returns:

List of volumes

Flow

Primary client for submitting and managing GPU jobs.

  • Simple for 90% of use cases (one obvious way to run a task)

  • Explicit configuration via TaskConfig when needed

  • Clean access to logs, SSH, volumes, and instance discovery

Flow.cancel

cancel(self, task_id: str) -> None

Request cancellation of a running or pending task.

Flow.close

close(self) -> None

Release provider resources (idempotent).

Flow.create_platform_ssh_key

create_platform_ssh_key(self, name: str, public_key: str) -> dict[str, str]

Create a platform SSH key via provider (preferred).

Falls back to the SSH key manager's ensure path when available.

Flow.create_volume

create_volume(
    self,
    size_gb: int,
    name: str | None = None,
    interface: Literal['block', 'file'] = 'block',
    region: str | None = None
) -> Volume

Create a persistent volume.

Parameters:

  • size_gb: Capacity in GB.

  • name: Optional display name (used in volume://name).

  • interface: "block" (exclusive attach) or "file" (multi-attach).

  • region: Optional region to create the volume in. When omitted, the provider's configured/default region is used.

Returns:

Volume.

Examples:

Create and attach volumes to a task:
    >>> data = flow.create_volume(500, name="datasets")
    >>> ckpt = flow.create_volume(100, name="checkpoints")
    >>> cfg = TaskConfig(
    ...     name="train",
    ...     instance_type="a100",
    ...     command="python train.py",
    ...     volumes=[
    ...         {"volume_id": data.volume_id, "mount_path": "/data"},
    ...         {"volume_id": ckpt.volume_id, "mount_path": "/ckpts"},
    ...     ],
    ... )
    >>> task = flow.run(cfg)

Flow.delete_platform_ssh_key

delete_platform_ssh_key(self, key_id: str) -> bool

Delete a platform SSH key via the provider (preferred), falling back to the SSH key manager.

Flow.delete_volume

delete_volume(self, volume_id: str) -> None

Delete a volume permanently (no recovery).

Examples:

>>> flow.delete_volume("vol_abc123")

Flow.dev_context

dev_context(self, auto_stop: bool = False) -> DevEnvironment

Context manager for the dev VM.

Parameters:

  • auto_stop: Stop the VM on context exit.

Returns:

DevEnvironment.

Flow.ensure_default_ssh_key

ensure_default_ssh_key(self) -> str | None

Ensure a default SSH key exists for the current project when supported.

Returns the key id when created or already present; None when unsupported.

Flow.find_instances

find_instances(self, requirements: InstanceRequirements, limit: int = 10) -> list[AvailableInstance]

Return available instances that match the given constraints.

Examples:

>>> flow.find_instances({"gpu_type": "a100", "max_price": 8.0}, limit=5)

Flow.get_provider_init

get_provider_init(self) -> IProviderInit

Return the provider's initialization interface.

Flow.get_remote_operations

get_remote_operations(self) -> object

Return the provider's remote operations interface.

Flow.get_reservation

get_reservation(self, reservation_id: str) -> Any | None

Get a reservation if supported by the provider.

Returns None when unsupported.

Flow.get_ssh_key_manager

get_ssh_key_manager(self)

Return the provider's SSH key manager interface.

Flow.get_ssh_tunnel_manager

get_ssh_tunnel_manager(self) -> object

Return the provider's SSH tunnel manager.

Flow.get_task

get_task(self, task_id: str) -> Task

Return a Task handle for an existing job.

Examples:

>>> t = flow.get_task(task_id)
>>> print(t.status)

Flow.get_task_ssh_connection_info

get_task_ssh_connection_info(self, task_id: str) -> tuple[Path | None, str]

Return (ssh_key_path, error_message) for a task, if provider supports it.

Flow.get_user

get_user(self, user_id: str) -> Any | None

Get user info if provider exposes it; otherwise None.

Falls back to raw HTTP via provider.http when available.

Flow.get_user_teammates

get_user_teammates(self, user_id: str) -> Any

Return teammates for a given user when supported; else [].

Flow.list_platform_ssh_keys

list_platform_ssh_keys(self) -> list[dict[str, str]]

List platform SSH keys via provider (preferred).

Falls back to the provider init interface if the provider does not implement the SSH key methods. If the current provider lacks SSH key support (e.g., local), falls back to calling the Mithril API directly using the configured auth token and base URL. This keeps SSH key management available regardless of the active compute provider.

Flow.list_projects

list_projects(self) -> list[dict[str, str]]

List provider projects accessible to the current credentials.

Flow.list_reservations

list_reservations(self, params: dict[str, Any] | None = None) -> list[Any]

List reservations if supported by the provider.

Facet-first; falls back to provider method when available. Returns an empty list when unsupported.
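The facet-first fallback described above can be sketched generically. This is an illustration of the pattern only; the facet/provider types and the `NotImplementedError` convention are assumptions, not the SDK's actual internals:

```python
from typing import Any

def facet_first(facet: Any, provider: Any, method: str, *args: Any) -> list:
    """Try the facet implementation first, then the provider; else return []."""
    for target in (facet, provider):
        fn = getattr(target, method, None)
        if callable(fn):
            try:
                return fn(*args)
            except NotImplementedError:
                continue  # treat as unsupported and keep falling back
    return []  # unsupported everywhere

class _Facet:  # stub: a facet that does not implement reservations
    pass

class _Provider:  # stub: a provider that does
    def list_reservations(self, params=None):
        return [{"id": "res_1"}]

result = facet_first(_Facet(), _Provider(), "list_reservations", None)
```

The key property is graceful degradation: callers always get a list back, and an empty list signals "unsupported" rather than raising.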

Flow.list_ssh_keys

list_ssh_keys(self, project_id: str | None = None) -> list[dict[str, str]]

List SSH keys (optionally filtered by project).

Flow.list_tasks

list_tasks(
    self,
    status: TaskStatus | list[TaskStatus] | None = None,
    limit: int = 10,
    force_refresh: bool = False
) -> list[Task]

List recent tasks, optionally filtered by status.

Examples:

List running tasks and print their names:
    >>> from flow.sdk.models import TaskStatus
    >>> for t in flow.list_tasks(status=TaskStatus.RUNNING):
    ...     print(t.name)

Flow.list_volumes

list_volumes(self, limit: int = 100) -> list[Volume]

List volumes for the current project (newest first).

Examples:

>>> for v in flow.list_volumes():
...     print(v.name, v.size_gb)

Flow.logs

logs(
    self,
    task_id: str,
    follow: bool = False,
    tail: int = 100,
    stderr: bool = False,
    source: str | None = None,
    stream: str | None = None
) -> str | Iterator[str]

Return recent logs or stream them in real time.

Parameters:

  • task_id: The task to read logs from.

  • follow: If True, stream logs until the task completes.

  • tail: Number of trailing lines to fetch when follow is False.

  • stderr: If True, select stderr (may be merged by some providers).

Returns:

str | Iterator[str]: A string (when follow=False) or an iterator of lines.

Examples:

Fetch and print the last 50 lines:
    >>> print(flow.logs(task_id, tail=50))
    Stream logs and stop after an error:
    >>> for line in flow.logs(task_id, follow=True):
    ...     if "ERROR" in line:
    ...         break

Flow.mount_volume

mount_volume(
    self,
    volume_id: str,
    task_id: str,
    mount_point: str | None = None
) -> bool

Attach a volume to a task's configuration at an optional mount point.

Examples:

Mount by name to the default path:
    >>> flow.mount_volume("datasets", task_id)
    Mount by ID to a custom path:
    >>> flow.mount_volume("vol_abc123", task_id, mount_point="/volumes/inputs")

Flow.normalize_instance_request

normalize_instance_request(self, gpu_count: int, gpu_type: str | None = None) -> tuple[str, int, str | None]

Normalize a GPU request to an instance_type and instance count.

  • Provider-first: attempts provider.normalize_instance_request(). Accepts both legacy tuple shape (instance_type, num_instances, warning) and dict shape {"instance_type": str, "num_instances": int, ...}.

  • Robust fallback: uses a simple heuristic (2x/4x/8x) when unsupported or when the provider returns an unexpected shape.

Flow.reservation_availability

reservation_availability(self, params: dict[str, Any]) -> list[dict[str, Any]]

Return reservation availability if supported by the provider.

Parameters:

  • params: Keys include instance_type, region, earliest_start_time, latest_end_time

Returns:

A list of slots (as dicts). Empty list when unsupported or on error.
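A params dict using the keys listed above might look like this (all values are illustrative):

```python
# Illustrative availability query; key names follow the parameter list above.
params = {
    "instance_type": "8xa100",
    "region": "us-east-1",
    "earliest_start_time": "2025-01-10T00:00:00Z",
    "latest_end_time": "2025-01-17T00:00:00Z",
}
# slots = flow.reservation_availability(params)  # requires a live provider
```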

Flow.resolve_ssh_endpoint

resolve_ssh_endpoint(self, task_id: str, node: int | None = None) -> tuple[str, int]

Resolve SSH endpoint (host, port) for a task (provider fallback).

Flow.run

run(
    self,
    task: TaskConfig | str | Path,
    wait: bool = False,
    mounts: str | dict[str, str] | None = None
) -> Task

Submit a task.

Parameters:

  • task: TaskConfig, or a path to a YAML task file (str or Path).

  • wait: If True, block until the task is running before returning.

  • mounts: Optional data mounts; string or mapping of target->source.

Returns:

Task: Handle for status, logs, SSH, cancel, etc.

Examples:

Command as a string with an explicit instance type:
    >>> task = flow.run("python train.py --epochs 10", instance_type="a100", wait=True)
    Full TaskConfig with volumes and limit price:
    >>> from flow.sdk.models import TaskConfig, VolumeSpec
    >>> cfg = TaskConfig(
    ...     name="train",
    ...     instance_type="4xa100",
    ...     command=["python", "-m", "torch.distributed.run", "--nproc_per_node=4", "train.py"],
    ...     volumes=[VolumeSpec(size_gb=200, mount_path="/data", name="train-data")],
    ...     max_price_per_hour=12.0,
    ... )
    >>> task = flow.run(cfg)
    Capability-based selection (cheapest GPU with >= 40GB):
    >>> cfg = TaskConfig(name="infer", min_gpu_memory_gb=40, command="python serve.py")
    >>> task = flow.run(cfg)

Flow.shell

shell(
    self,
    task_id: str,
    command: str | None = None,
    node: int | None = None,
    progress_context = None,
    record: bool = False
) -> None

Open an interactive shell or run a command on the task instance.

Examples:

Open a shell:
    >>> flow.shell(task_id)
    Run a one-off command:
    >>> flow.shell(task_id, command="nvidia-smi")

Flow.status

status(self, task_id: str) -> str

Return the task status string (pending, running, completed, failed, cancelled).
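Because status returns a plain string, a simple polling loop suffices. A provider-agnostic sketch: the terminal-state set comes from the list above, while the poll interval and the injected `get_status` callable (which would wrap `flow.status(task_id)` in practice, here faked for illustration) are assumptions:

```python
import time
from typing import Callable

TERMINAL = {"completed", "failed", "cancelled"}

def wait_for_terminal(get_status: Callable[[], str],
                      poll_seconds: float = 5.0,
                      max_polls: int = 1000) -> str:
    """Poll a status callable until it reports a terminal state."""
    for _ in range(max_polls):
        state = get_status()
        if state in TERMINAL:
            return state
        time.sleep(poll_seconds)
    raise TimeoutError("task did not reach a terminal state")

# Faked status source for illustration; in practice use
# lambda: flow.status(task_id).
states = iter(["pending", "running", "completed"])
final = wait_for_terminal(lambda: next(states), poll_seconds=0.0)
```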

Flow.submit

submit(
    self,
    command: str,
    gpu: str | None = None,
    mounts: str | dict[str, str] | None = None,
    instance_type: str | None = None,
    wait: bool = False
) -> Task

Submit a shell command with minimal configuration.

Parameters:

  • command: Passed to the container shell.

  • gpu: e.g. "a100", "a100:4", or "gpu:40gb". Ignored if instance_type is set.

  • mounts: Optional data mounts (string or mapping of target->source).

  • instance_type: Explicit override of the instance type.

  • wait: If True, block until the task completes.

Returns:

Task.

Examples:

Quick usage with GPU shorthand:
    >>> task = flow.submit("python train.py", gpu="a100")
    Multiple mounts:
    >>> task = flow.submit(
    ...     "torchrun --nproc_per_node=4 train.py",
    ...     gpu="a100:4",
    ...     mounts={
    ...         "/data": "volume://datasets",
    ...         "/models": "s3://bucket/pretrained/",
    ...     },
    ...     wait=True,
    ... )

Flow.upload_code_to_task

upload_code_to_task(
    self,
    task_id: str,
    source_dir: Path | None = None,
    timeout: int = 600,
    console: object | None = None,
    target_dir: str = '/workspace',
    progress_reporter: object | None = None,
    git_incremental: bool | None = None
) -> object

Upload code to an existing task (facet/provider fallback).

Prefers provider's richer upload (with progress). Falls back to storage facet upload_directory if available.

Flow.wait_for_ssh

wait_for_ssh(
    self,
    task_id: str,
    timeout: int = 600,
    show_progress: bool = True,
    progress_adapter: object | None = None
) -> Task

Block until SSH is ready for the task or time out.

GPUInstanceDict

GPU instance dictionary returned by _find_gpus_by_memory().

InstanceRequirements

Instance requirements dictionary for find_instances().

TaskDict

Task dictionary returned by the list() method.
