# flow\.sdk.client

Unified GPU workload orchestration.

A concise, explicit interface for submitting, monitoring, and managing GPU workloads across providers. Designed for the common path with clear escape hatches.

#### Client

Simplified SDK facade for Flow: simple for the common 90% of use cases, with full control available for the remaining 10%.

This client wraps Flow's functionality in a smaller interface that hides most of the complexity while still giving power users direct access to the provider.

**Client.cancel**

```python
cancel(self, task_id: str, force: bool = False) -> bool
```

Cancel a running task.

**Parameters:**

* **task\_id**: Task to cancel
* **force**: Force termination

**Returns:**

True if cancelled successfully

**Client.provider\_port**

```python
provider_port(self)
```

Get direct access to provider port (power users).

**Returns:**

Provider port for advanced operations

**Client.reserve**

```python
reserve(
    self,
    gpu: str,
    duration_hours: int = 1,
    auto_renew: bool = False
) -> str
```

Reserve GPU capacity.

**Parameters:**

* **gpu**: GPU type to reserve
* **duration\_hours**: Reservation duration
* **auto\_renew**: Auto-renew reservation

**Returns:**

Reservation ID

**Client.run**

```python
run(
    self,
    command: str | list[str] | TaskSpec | TaskConfig,
    gpu: Optional[str] = None,
    cpus: Optional[int] = None,
    memory_gb: Optional[int] = None,
    wait: bool = False,
    **kwargs
) -> Task
```

Run a task with specified resources.

**Parameters:**

* **command**: Command to run or task specification
* **gpu**: GPU type (e.g., "a100", "v100")
* **cpus**: Number of CPUs
* **memory\_gb**: Memory in GB
* **wait**: Wait for task completion
* \*\***kwargs**: Additional task configuration

**Returns:**

Task handle for monitoring and management

**Client.ssh**

```python
ssh(self, task_id: str) -> str
```

Get SSH command for task.

**Parameters:**

* **task\_id**: Task to connect to

**Returns:**

SSH command string

**Client.status**

```python
status(self, task_id: Optional[str] = None) -> list[Task]
```

Get status of task(s).

**Parameters:**

* **task\_id**: Specific task ID or None for all tasks

**Returns:**

List of tasks with current status

**Client.volumes**

```python
volumes(self) -> list[Volume]
```

List available volumes.

**Returns:**

List of volumes

#### Flow

Primary client for submitting and managing GPU jobs.

* Simple for 90% of use cases (one obvious way to run a task)
* Explicit configuration via `TaskConfig` when needed
* Clean access to logs, SSH, volumes, and instance discovery

**Flow\.cancel**

```python
cancel(self, task_id: str) -> None
```

Request cancellation of a running or pending task.

**Flow\.close**

```python
close(self) -> None
```

Release provider resources (idempotent).
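Because `close` is idempotent, it pairs naturally with a context manager. The following is a minimal sketch using the standard library's `contextlib.closing`; `StubFlow` is a hypothetical stand-in for a real `Flow` instance so that the snippet runs without provider credentials.

```python
from contextlib import closing


class StubFlow:
    """Hypothetical stand-in for Flow; it only models an idempotent close()."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        # Safe to call repeatedly: later calls change nothing.
        self.closed = True


with closing(StubFlow()) as flow:
    pass  # submit and monitor tasks here

flow.close()  # a second call is harmless because close() is idempotent
print(flow.closed)
```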

**Flow\.create\_platform\_ssh\_key**

```python
create_platform_ssh_key(self, name: str, public_key: str) -> dict[str, str]
```

Create a platform SSH key via provider (preferred).

Falls back to the SSH key manager's ensure path when available.

**Flow\.create\_volume**

```python
create_volume(
    self,
    size_gb: int,
    name: str | None = None,
    interface: Literal['block', 'file'] = 'block',
    region: str | None = None
) -> Volume
```

Create a persistent volume.

**Parameters:**

* **size\_gb**: Capacity in GB.
* **name**: Optional display name (used in `volume://name`).
* **interface**: "block" (exclusive attach) or "file" (multi-attach).
* **region**: Optional region to create the volume in. When omitted, the provider's configured/default region is used.

**Returns:**

Volume.

**Examples:**

```python
# Create two volumes and attach them to a task
from flow.sdk.models import TaskConfig

data = flow.create_volume(500, name="datasets")
ckpt = flow.create_volume(100, name="checkpoints")
cfg = TaskConfig(
    name="train",
    instance_type="a100",
    command="python train.py",
    volumes=[
        {"volume_id": data.volume_id, "mount_path": "/data"},
        {"volume_id": ckpt.volume_id, "mount_path": "/ckpts"},
    ],
)
task = flow.run(cfg)
```

**Flow\.delete\_platform\_ssh\_key**

```python
delete_platform_ssh_key(self, key_id: str) -> bool
```

Delete a platform SSH key via provider (preferred), else manager.

**Flow\.delete\_volume**

```python
delete_volume(self, volume_id: str) -> None
```

Delete a volume permanently (no recovery).

**Examples:**

```python
flow.delete_volume("vol_abc123")
```

**Flow\.dev\_context**

```python
dev_context(self, auto_stop: bool = False) -> DevEnvironment
```

Context manager for the dev VM.

**Parameters:**

* **auto\_stop**: Stop the VM on context exit.

**Returns:**

DevEnvironment.

**Flow\.ensure\_default\_ssh\_key**

```python
ensure_default_ssh_key(self) -> str | None
```

Ensure a default SSH key exists for the current project when supported.

Returns the key id when created or already present; None when unsupported.

**Flow\.find\_instances**

```python
find_instances(self, requirements: InstanceRequirements, limit: int = 10) -> list[AvailableInstance]
```

Return available instances that match the given constraints.

**Examples:**

```python
flow.find_instances({"gpu_type": "a100", "max_price": 8.0}, limit=5)
```

**Flow\.get\_provider\_init**

```python
get_provider_init(self) -> IProviderInit
```

Return the provider's initialization interface.

**Flow\.get\_remote\_operations**

```python
get_remote_operations(self) -> object
```

Return the provider's remote operations interface.

**Flow\.get\_reservation**

```python
get_reservation(self, reservation_id: str) -> Any | None
```

Get a reservation if supported by the provider.

Returns None when unsupported.

**Flow\.get\_ssh\_key\_manager**

```python
get_ssh_key_manager(self)
```

Return the provider's SSH key manager interface.

**Flow\.get\_ssh\_tunnel\_manager**

```python
get_ssh_tunnel_manager(self) -> object
```

Return the provider's SSH tunnel manager.

**Flow\.get\_task**

```python
get_task(self, task_id: str) -> Task
```

Return a `Task` handle for an existing job.

**Examples:**

```python
t = flow.get_task(task_id)
print(t.status)
```

**Flow\.get\_task\_ssh\_connection\_info**

```python
get_task_ssh_connection_info(self, task_id: str) -> tuple[Path | None, str]
```

Return (ssh\_key\_path, error\_message) for a task, if provider supports it.

**Flow\.get\_user**

```python
get_user(self, user_id: str) -> Any | None
```

Get user info if provider exposes it; otherwise None.

Falls back to raw HTTP via provider.http when available.

**Flow\.get\_user\_teammates**

```python
get_user_teammates(self, user_id: str) -> Any
```

Return teammates for a given user when supported; otherwise return an empty list.

**Flow\.list\_platform\_ssh\_keys**

```python
list_platform_ssh_keys(self) -> list[dict[str, str]]
```

List platform SSH keys via provider (preferred).

Falls back to the provider init interface if the provider does not implement the SSH key methods. If the current provider lacks SSH key support (e.g., local), falls back to calling the Mithril API directly using the configured auth token and base URL. This keeps SSH key management available regardless of the active compute provider.

**Flow\.list\_projects**

```python
list_projects(self) -> list[dict[str, str]]
```

List provider projects accessible to the current credentials.

**Flow\.list\_reservations**

```python
list_reservations(self, params: dict[str, Any] | None = None) -> list[Any]
```

List reservations if supported by the provider.

Facet-first; falls back to provider method when available. Returns an empty list when unsupported.
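Since `get_reservation` returns `None` and `list_reservations` returns an empty list when the provider lacks reservation support, callers can branch on the return value instead of catching exceptions. A minimal sketch of that pattern follows; `StubFlow` and `describe_reservation` are hypothetical and stand in for a real client.

```python
from typing import Any, Optional


class StubFlow:
    """Hypothetical stand-in that mimics a provider without reservations."""

    def get_reservation(self, reservation_id: str) -> Optional[Any]:
        return None  # documented result when unsupported

    def list_reservations(self, params: Optional[dict[str, Any]] = None) -> list[Any]:
        return []  # documented result when unsupported


def describe_reservation(flow: Any, reservation_id: str) -> str:
    """Summarize a reservation lookup without relying on exceptions."""
    reservation = flow.get_reservation(reservation_id)
    if reservation is None:
        return f"{reservation_id}: unavailable"
    return f"{reservation_id}: {reservation!r}"


flow = StubFlow()
print(describe_reservation(flow, "res_123"))
print(len(flow.list_reservations()))
```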

**Flow\.list\_ssh\_keys**

```python
list_ssh_keys(self, project_id: str | None = None) -> list[dict[str, str]]
```

List SSH keys (optionally filtered by project).

**Flow\.list\_tasks**

```python
list_tasks(
    self,
    status: TaskStatus | list[TaskStatus] | None = None,
    limit: int = 10,
    force_refresh: bool = False
) -> list[Task]
```

List recent tasks, optionally filtered by status.

**Examples:**

```python
# List running tasks and print their names
from flow.sdk.models import TaskStatus

for t in flow.list_tasks(status=TaskStatus.RUNNING):
    print(t.name)
```

**Flow\.list\_volumes**

```python
list_volumes(self, limit: int = 100) -> list[Volume]
```

List volumes for the current project (newest first).

**Examples:**

```python
for v in flow.list_volumes():
    print(v.name, v.size_gb)
```

**Flow\.logs**

```python
logs(
    self,
    task_id: str,
    follow: bool = False,
    tail: int = 100,
    stderr: bool = False,
    source: str | None = None,
    stream: str | None = None
) -> str | Iterator[str]
```

Return recent logs or stream them in real time.

**Parameters:**

* **task\_id**: The task to read logs from.
* **follow**: If True, stream logs until the task completes.
* **tail**: Number of trailing lines to fetch when `follow` is False.
* **stderr**: If True, select stderr (may be merged by some providers).

**Returns:**

str | Iterator\[str]: A string (when `follow=False`) or an iterator of lines.

**Examples:**

```python
# Fetch and print the last 50 lines
print(flow.logs(task_id, tail=50))
```

```python
# Stream logs and stop after an error
for line in flow.logs(task_id, follow=True):
    if "ERROR" in line:
        break
```
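When `follow=True` the return value is an iterator of lines, so it composes with ordinary generator helpers. The sketch below wraps such an iterator so that consumption stops at the first matching line; `lines_until` is a hypothetical helper, and a plain list stands in for `flow.logs(task_id, follow=True)`, which would need a live task.

```python
from collections.abc import Iterable, Iterator


def lines_until(lines: Iterable[str], marker: str = "ERROR") -> Iterator[str]:
    """Yield lines up to and including the first one containing `marker`."""
    for line in lines:
        yield line
        if marker in line:
            break


# Stand-in for flow.logs(task_id, follow=True).
fake_stream = iter(["step 1 ok", "step 2 ok", "ERROR: out of memory", "step 3 ok"])
seen = list(lines_until(fake_stream))
print(seen)
```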

**Flow\.mount\_volume**

```python
mount_volume(
    self,
    volume_id: str,
    task_id: str,
    mount_point: str | None = None
) -> bool
```

Attach a volume to a task's configuration at an optional mount point.

**Examples:**

```python
# Mount by name to the default path
flow.mount_volume("datasets", task_id)
```

```python
# Mount by ID to a custom path
flow.mount_volume("vol_abc123", task_id, mount_point="/volumes/inputs")
```

**Flow\.normalize\_instance\_request**

```python
normalize_instance_request(self, gpu_count: int, gpu_type: str | None = None) -> tuple[str, int, str | None]
```

Normalize a GPU request to an instance\_type and instance count.

* Provider-first: attempts provider.normalize\_instance\_request(). Accepts both legacy tuple shape `(instance_type, num_instances, warning)` and dict shape `{"instance_type": str, "num_instances": int, ...}`.
* Robust fallback: uses a simple heuristic (2x/4x/8x) when unsupported or when provider returns an unexpected shape.
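The two behaviours above can be sketched as follows. `coerce_provider_result` handles the two documented return shapes; the `"warning"` key read from the dict shape is an assumption, because the reference elides the remaining keys. `fallback_heuristic` is a hypothetical illustration of a 2x/4x/8x packing rule and may not match the SDK's actual logic.

```python
from typing import Any, Optional

Normalized = tuple[str, int, Optional[str]]


def coerce_provider_result(result: Any) -> Optional[Normalized]:
    """Map either documented shape to (instance_type, num_instances, warning)."""
    if isinstance(result, tuple) and len(result) == 3:
        instance_type, num_instances, warning = result
        return str(instance_type), int(num_instances), warning
    if isinstance(result, dict) and "instance_type" in result and "num_instances" in result:
        # Assumption: an optional "warning" key mirrors the tuple's third item.
        return (
            str(result["instance_type"]),
            int(result["num_instances"]),
            result.get("warning"),
        )
    return None  # unexpected shape: the caller should use the fallback


def fallback_heuristic(gpu_count: int, gpu_type: Optional[str] = None) -> Normalized:
    """Hypothetical packing rule: prefer the largest of 8x/4x/2x that divides evenly."""
    gpu = gpu_type or "gpu"
    for size in (8, 4, 2):
        if gpu_count >= size and gpu_count % size == 0:
            return f"{size}x{gpu}", gpu_count // size, None
    # Otherwise request single-GPU instances.
    return gpu, max(gpu_count, 1), None


print(coerce_provider_result({"instance_type": "4xa100", "num_instances": 1}))
print(fallback_heuristic(16, "a100"))
```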

**Flow\.reservation\_availability**

```python
reservation_availability(self, params: dict[str, Any]) -> list[dict[str, Any]]
```

Return reservation availability if supported by the provider.

**Parameters:**

* **params**: Keys include instance\_type, region, earliest\_start\_time, latest\_end\_time

**Returns:**

A list of slots (as dicts). Empty list when unsupported or on error.

**Flow\.resolve\_ssh\_endpoint**

```python
resolve_ssh_endpoint(self, task_id: str, node: int | None = None) -> tuple[str, int]
```

Resolve SSH endpoint (host, port) for a task (provider fallback).
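The `(host, port)` pair can be combined with the key path from `get_task_ssh_connection_info` to assemble an OpenSSH command line. A minimal sketch, assuming a login user of `ubuntu` (this reference does not specify one); `build_ssh_argv` is a hypothetical helper and the endpoint values are placeholders.

```python
from pathlib import Path
from typing import Optional


def build_ssh_argv(
    host: str,
    port: int,
    key_path: Optional[Path] = None,
    user: str = "ubuntu",  # assumption: the login user is not documented here
) -> list[str]:
    """Assemble an OpenSSH argument list from an endpoint and optional key."""
    argv = ["ssh", "-p", str(port)]
    if key_path is not None:
        argv += ["-i", str(key_path)]
    argv.append(f"{user}@{host}")
    return argv


# With a real client these values would come from
# flow.resolve_ssh_endpoint(task_id) and flow.get_task_ssh_connection_info(task_id).
host, port = "203.0.113.7", 22
print(build_ssh_argv(host, port))
```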

**Flow\.run**

```python
run(
    self,
    task: TaskConfig | str | Path,
    wait: bool = False,
    mounts: str | dict[str, str] | None = None
) -> Task
```

Submit a task.

**Parameters:**

* **task**: A `TaskConfig`, or a path to a YAML task file (as a `Path` or a string).
* **wait**: If True, block until the task is running before returning.
* **mounts**: Optional data mounts; string or mapping of target->source.

**Returns:**

Task: Handle for status, logs, SSH, cancel, etc.

**Examples:**

```python
# Command as a string with an explicit instance type
task = flow.run("python train.py --epochs 10", instance_type="a100", wait=True)
```

```python
# Full TaskConfig with volumes and limit price
from flow.sdk.models import TaskConfig, VolumeSpec

cfg = TaskConfig(
    name="train",
    instance_type="4xa100",
    command=["python", "-m", "torch.distributed.run", "--nproc_per_node=4", "train.py"],
    volumes=[VolumeSpec(size_gb=200, mount_path="/data", name="train-data")],
    max_price_per_hour=12.0,
)
task = flow.run(cfg)
```

```python
# Capability-based selection (cheapest GPU with >= 40GB)
cfg = TaskConfig(name="infer", min_gpu_memory_gb=40, command="python serve.py")
task = flow.run(cfg)
```

**Flow\.shell**

```python
shell(
    self,
    task_id: str,
    command: str | None = None,
    node: int | None = None,
    progress_context = None,
    record: bool = False
) -> None
```

Open an interactive shell or run a command on the task instance.

**Examples:**

```python
# Open a shell
flow.shell(task_id)
```

```python
# Run a one-off command
flow.shell(task_id, command="nvidia-smi")
```

**Flow\.status**

```python
status(self, task_id: str) -> str
```

Return the task status string (pending, running, completed, failed, cancelled).
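Of the five status strings, `completed`, `failed`, and `cancelled` are terminal, so a simple polling loop can wait on them. The sketch below is one way to do that; `wait_for_terminal` is a hypothetical helper, and a canned sequence stands in for repeated `flow.status(task_id)` calls.

```python
import time
from collections.abc import Callable

TERMINAL = {"completed", "failed", "cancelled"}


def wait_for_terminal(
    get_status: Callable[[str], str],
    task_id: str,
    interval_s: float = 5.0,
    timeout_s: float = 3600.0,
) -> str:
    """Poll get_status(task_id) until a terminal status or the timeout."""
    deadline = time.monotonic() + timeout_s
    while True:
        status = get_status(task_id)
        if status in TERMINAL:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"task {task_id} still {status!r} after {timeout_s}s")
        time.sleep(interval_s)


# Stand-in for flow.status: replays a fixed sequence of statuses.
canned = iter(["pending", "running", "running", "completed"])
final = wait_for_terminal(lambda _task_id: next(canned), "task_123", interval_s=0.0)
print(final)
```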

**Flow\.submit**

```python
submit(
    self,
    command: str,
    gpu: str | None = None,
    mounts: str | dict[str, str] | None = None,
    instance_type: str | None = None,
    wait: bool = False
) -> Task
```

Submit a shell command with minimal configuration.

**Parameters:**

* **command**: Passed to the container shell.
* **gpu**: e.g. "a100", "a100:4", or "gpu:40gb". Ignored if `instance_type` is set.
* **mounts**: Optional data mounts (string or mapping of target->source).
* **instance\_type**: Explicit override of the instance type.
* **wait**: If True, block until the task completes.

**Returns:**

Task.

**Examples:**

```python
# Quick usage with GPU shorthand
task = flow.submit("python train.py", gpu="a100")
```

```python
# Multiple mounts
task = flow.submit(
    "torchrun --nproc_per_node=4 train.py",
    gpu="a100:4",
    mounts={
        "/data": "volume://datasets",
        "/models": "s3://bucket/pretrained/",
    },
    wait=True,
)
```
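To make the `gpu` shorthand concrete, here is a sketch of how the three documented forms (`"a100"`, `"a100:4"`, `"gpu:40gb"`) might be interpreted. `GpuRequest` and `parse_gpu` are hypothetical and not part of the SDK; reading `"gpu:40gb"` as a minimum-memory constraint is an assumption based on the capability-based selection described for `Flow.run`.

```python
import re
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class GpuRequest:
    """Hypothetical parsed form of the gpu shorthand."""

    gpu_type: Optional[str] = None
    count: int = 1
    min_memory_gb: Optional[int] = None


def parse_gpu(spec: str) -> GpuRequest:
    """Parse "a100", "a100:4", or "gpu:40gb" (assumed semantics)."""
    name, _, qualifier = spec.strip().lower().partition(":")
    if not name:
        raise ValueError(f"empty GPU spec: {spec!r}")
    if not qualifier:
        return GpuRequest(gpu_type=name)
    memory = re.fullmatch(r"(\d+)gb", qualifier)
    if name == "gpu" and memory:
        # Assumption: "gpu:<N>gb" means any GPU with at least N GB of memory.
        return GpuRequest(min_memory_gb=int(memory.group(1)))
    if re.fullmatch(r"[1-9]\d*", qualifier):
        return GpuRequest(gpu_type=name, count=int(qualifier))
    raise ValueError(f"unrecognized GPU spec: {spec!r}")


print(parse_gpu("a100:4"))
```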

**Flow\.upload\_code\_to\_task**

```python
upload_code_to_task(
    self,
    task_id: str,
    source_dir: Path | None = None,
    timeout: int = 600,
    console: object | None = None,
    target_dir: str = '/workspace',
    progress_reporter: object | None = None,
    git_incremental: bool | None = None
) -> object
```

Upload code to an existing task (facet/provider fallback).

Prefers provider's richer upload (with progress). Falls back to storage facet `upload_directory` if available.

**Flow\.wait\_for\_ssh**

```python
wait_for_ssh(
    self,
    task_id: str,
    timeout: int = 600,
    show_progress: bool = True,
    progress_adapter: object | None = None
) -> Task
```

Block until SSH is ready for the task or time out.

