Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:Togethercomputer Together python Batches Get Batch

From Leeroopedia
Attribute Value
Type Implementation
Domains Batch_Processing, Inference, API_Client
Repository togethercomputer/together-python
Source src/together/resources/batch.py:L42-88
Last Updated 2026-02-15 16:00 GMT

Overview

Implementation of batch job monitoring via three methods on the Batches class: get_batch(), list_batches(), and cancel_batch(). These enable polling for job status, listing all jobs, and cancelling running jobs.

Code Reference

get_batch

def get_batch(self, batch_job_id: str) -> BatchJob:
    requestor = api_requestor.APIRequestor(
        client=self._client,
    )

    response, _, _ = requestor.request(
        options=TogetherRequest(
            method="GET",
            url=f"batches/{batch_job_id}",
        ),
        stream=False,
    )

    assert isinstance(response, TogetherResponse)
    return BatchJob(**response.data)

Source: src/together/resources/batch.py:L42-56

list_batches

def list_batches(self) -> List[BatchJob]:
    requestor = api_requestor.APIRequestor(
        client=self._client,
    )

    response, _, _ = requestor.request(
        options=TogetherRequest(
            method="GET",
            url="batches",
        ),
        stream=False,
    )

    assert isinstance(response, TogetherResponse)
    jobs = response.data or []
    return [BatchJob(**job) for job in jobs]

Source: src/together/resources/batch.py:L58-73

cancel_batch

def cancel_batch(self, batch_job_id: str) -> BatchJob:
    requestor = api_requestor.APIRequestor(
        client=self._client,
    )

    response, _, _ = requestor.request(
        options=TogetherRequest(
            method="POST",
            url=f"batches/{batch_job_id}/cancel",
        ),
        stream=False,
    )

    return BatchJob(**response.data)

Source: src/together/resources/batch.py:L75-88

API Signatures

Batches.get_batch(batch_job_id: str) -> BatchJob
Batches.list_batches() -> List[BatchJob]
Batches.cancel_batch(batch_job_id: str) -> BatchJob
Method Parameters HTTP Method Endpoint Return Type
get_batch batch_job_id: str GET batches/{batch_job_id} BatchJob
list_batches (none) GET batches List[BatchJob]
cancel_batch batch_job_id: str POST batches/{batch_job_id}/cancel BatchJob

I/O Contract

Input (get_batch):

  • batch_job_id -- The unique identifier of the batch job to retrieve.

Input (cancel_batch):

  • batch_job_id -- The unique identifier of the batch job to cancel.

Output: A BatchJob object (or list of BatchJob objects for list_batches) with the following key fields:

Field Type Description
id string Unique batch job identifier
status BatchJobStatus Current job status
progress float Completion progress (0.0 to 1.0)
output_file_id string or None Output file ID (available when COMPLETED)
error_file_id string or None Error file ID (if errors occurred)
error string or None Error message (if the job failed)
completed_at datetime or None Completion timestamp
input_file_id string ID of the input file
endpoint string The target endpoint
created_at datetime Job creation timestamp
job_deadline datetime Deadline for job completion

Key Behavior

Response parsing differences:

  • get_batch() parses the response directly: BatchJob(**response.data).
  • list_batches() handles an empty response gracefully: response.data or [], then iterates to construct each BatchJob.
  • cancel_batch() returns the updated BatchJob with transitional status (typically CANCELING).

No pagination: The list_batches() method does not accept pagination parameters. It returns all batch jobs for the account in a single response.

Usage Examples

Polling for Batch Job Completion

import time
from together import Together

client = Together()

batch_job_id = "batch-abc123"

while True:
    batch_job = client.batches.get_batch(batch_job_id)
    print(f"Status: {batch_job.status}, Progress: {batch_job.progress:.1%}")

    if batch_job.status == "COMPLETED":
        print(f"Output file ID: {batch_job.output_file_id}")
        break
    elif batch_job.status in ("FAILED", "EXPIRED", "CANCELLED"):
        print(f"Job ended with status: {batch_job.status}")
        if batch_job.error:
            print(f"Error: {batch_job.error}")
        break

    time.sleep(60)  # Poll every 60 seconds

Listing All Batch Jobs

from together import Together

client = Together()

jobs = client.batches.list_batches()
for job in jobs:
    print(f"Job {job.id}: status={job.status}, progress={job.progress:.1%}")

Cancelling a Batch Job

from together import Together

client = Together()

cancelled_job = client.batches.cancel_batch("batch-abc123")
print(f"Job status after cancel request: {cancelled_job.status}")

Related

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment