Implementation:Togethercomputer Together python Batches Get Batch
| Attribute | Value |
|---|---|
| Type | Implementation |
| Domains | Batch_Processing, Inference, API_Client |
| Repository | togethercomputer/together-python |
| Source | src/together/resources/batch.py:L42-88 |
| Last Updated | 2026-02-15 16:00 GMT |
Overview
Implementation of batch job monitoring via three methods on the Batches class: get_batch(), list_batches(), and cancel_batch(). These enable polling for job status, listing all jobs, and cancelling running jobs.
Code Reference
get_batch
def get_batch(self, batch_job_id: str) -> BatchJob:
    requestor = api_requestor.APIRequestor(
        client=self._client,
    )
    response, _, _ = requestor.request(
        options=TogetherRequest(
            method="GET",
            url=f"batches/{batch_job_id}",
        ),
        stream=False,
    )
    assert isinstance(response, TogetherResponse)
    return BatchJob(**response.data)
Source: src/together/resources/batch.py:L42-56
list_batches
def list_batches(self) -> List[BatchJob]:
    requestor = api_requestor.APIRequestor(
        client=self._client,
    )
    response, _, _ = requestor.request(
        options=TogetherRequest(
            method="GET",
            url="batches",
        ),
        stream=False,
    )
    assert isinstance(response, TogetherResponse)
    jobs = response.data or []
    return [BatchJob(**job) for job in jobs]
Source: src/together/resources/batch.py:L58-73
cancel_batch
def cancel_batch(self, batch_job_id: str) -> BatchJob:
    requestor = api_requestor.APIRequestor(
        client=self._client,
    )
    response, _, _ = requestor.request(
        options=TogetherRequest(
            method="POST",
            url=f"batches/{batch_job_id}/cancel",
        ),
        stream=False,
    )
    return BatchJob(**response.data)
Source: src/together/resources/batch.py:L75-88
API Signatures
Batches.get_batch(batch_job_id: str) -> BatchJob
Batches.list_batches() -> List[BatchJob]
Batches.cancel_batch(batch_job_id: str) -> BatchJob
| Method | Parameters | HTTP Method | Endpoint | Return Type |
|---|---|---|---|---|
| get_batch | batch_job_id: str | GET | batches/{batch_job_id} | BatchJob |
| list_batches | (none) | GET | batches | List[BatchJob] |
| cancel_batch | batch_job_id: str | POST | batches/{batch_job_id}/cancel | BatchJob |
I/O Contract
Input (get_batch):
batch_job_id -- The unique identifier of the batch job to retrieve.
Input (cancel_batch):
batch_job_id -- The unique identifier of the batch job to cancel.
Output: A BatchJob object (or list of BatchJob objects for list_batches) with the following key fields:
| Field | Type | Description |
|---|---|---|
| id | string | Unique batch job identifier |
| status | BatchJobStatus | Current job status |
| progress | float | Completion progress (0.0 to 1.0) |
| output_file_id | string or None | Output file ID (available when COMPLETED) |
| error_file_id | string or None | Error file ID (if errors occurred) |
| error | string or None | Error message (if the job failed) |
| completed_at | datetime or None | Completion timestamp |
| input_file_id | string | ID of the input file |
| endpoint | string | The target endpoint |
| created_at | datetime | Job creation timestamp |
| job_deadline | datetime | Deadline for job completion |
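As a rough illustration of how these fields might be read together, the sketch below picks which file ID is worth downloading once a job has stopped. The helper name `pick_result_file` is hypothetical and not part of the library. It relies only on the `status`, `output_file_id`, and `error_file_id` fields listed above, and assumes that `status` is either a plain string or an enum whose value is that string.

```python
from types import SimpleNamespace
from typing import Any, Optional


def pick_result_file(job: Any) -> Optional[str]:
    """Hypothetical helper: choose the file ID worth downloading for a job."""
    # Accept either an enum member or a plain string for status (assumption).
    status = getattr(job.status, "value", job.status)
    if status == "COMPLETED" and job.output_file_id:
        return job.output_file_id
    # Otherwise fall back to the error file ID, which may also be None.
    return job.error_file_id


# Stand-in object for illustration; a real BatchJob would come from get_batch().
demo = SimpleNamespace(
    status="COMPLETED", output_file_id="file-out", error_file_id=None
)
print(pick_result_file(demo))
```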
Key Behavior
Response parsing differences:
- get_batch() parses the response directly: BatchJob(**response.data).
- list_batches() handles an empty response gracefully: response.data or [], then iterates to construct each BatchJob.
- cancel_batch() returns the updated BatchJob with a transitional status (typically CANCELING).
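The two parsing styles can be reproduced in isolation. This is a minimal sketch, not the library's code: `JobStub` is a hypothetical stand-in for `BatchJob` that models only two fields, and the payloads are invented for illustration.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass
class JobStub:
    """Hypothetical stand-in for BatchJob; only two fields are modelled."""

    id: str
    status: str


def parse_one(data: Dict[str, Any]) -> JobStub:
    # Same shape as get_batch(): the payload is unpacked straight into the model.
    return JobStub(**data)


def parse_many(data: Optional[List[Dict[str, Any]]]) -> List[JobStub]:
    # Same shape as list_batches(): a None or empty payload becomes an empty list.
    jobs = data or []
    return [JobStub(**job) for job in jobs]


print(parse_many(None))
print(parse_one({"id": "batch-abc123", "status": "IN_PROGRESS"}))
```

The `or []` guard matters because unpacking or iterating over `None` would raise a `TypeError`; with the guard, callers always receive a list.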
No pagination: The list_batches() method does not accept pagination parameters. It returns all batch jobs for the account in a single response.
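Because the full list arrives in one response, any paging has to happen on the client side. The following is a small sketch of one way to do that; `chunked` is a hypothetical helper, not something the library provides.

```python
from typing import Iterator, List, Sequence, TypeVar

T = TypeVar("T")


def chunked(items: Sequence[T], size: int) -> Iterator[List[T]]:
    """Hypothetical helper: yield consecutive slices of at most `size` items."""
    if size <= 0:
        raise ValueError("size must be positive")
    for start in range(0, len(items), size):
        yield list(items[start:start + size])
```

With a real client this could be used as `for page in chunked(client.batches.list_batches(), 20): ...`, which still issues a single request and only splits the result locally.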
Usage Examples
Polling for Batch Job Completion
import time
from together import Together
client = Together()
batch_job_id = "batch-abc123"
while True:
    batch_job = client.batches.get_batch(batch_job_id)
    print(f"Status: {batch_job.status}, Progress: {batch_job.progress:.1%}")
    if batch_job.status == "COMPLETED":
        print(f"Output file ID: {batch_job.output_file_id}")
        break
    elif batch_job.status in ("FAILED", "EXPIRED", "CANCELLED"):
        print(f"Job ended with status: {batch_job.status}")
        if batch_job.error:
            print(f"Error: {batch_job.error}")
        break
    time.sleep(60)  # Poll every 60 seconds
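The loop above runs until the job reaches a final status. One possible variation, sketched here under assumptions, wraps the same logic in a function with an overall timeout. `wait_for_batch` and its parameters are hypothetical names; the fetch function is passed in so the helper does not depend on a live client, and the list of final statuses is copied from the example above rather than from the library.

```python
import time
from typing import Any, Callable

# Statuses treated as final; copied from the polling example, not from the library.
TERMINAL_STATUSES = ("COMPLETED", "FAILED", "EXPIRED", "CANCELLED")


def wait_for_batch(
    fetch: Callable[[str], Any],
    batch_job_id: str,
    poll_interval: float = 60.0,
    timeout: float = 3600.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Any:
    """Hypothetical helper: poll until the job is final or the timeout passes."""
    deadline = time.monotonic() + timeout
    while True:
        job = fetch(batch_job_id)
        if job.status in TERMINAL_STATUSES:
            return job
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"batch {batch_job_id} still {job.status} after {timeout}s"
            )
        sleep(poll_interval)
```

With a real client the call would look like `wait_for_batch(client.batches.get_batch, "batch-abc123")`. Injecting `sleep` keeps the helper easy to exercise in tests without waiting.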
Listing All Batch Jobs
from together import Together
client = Together()
jobs = client.batches.list_batches()
for job in jobs:
    print(f"Job {job.id}: status={job.status}, progress={job.progress:.1%}")
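When the list is long, a per-status tally can be easier to read than one line per job. The sketch below is one way to build it; `count_by_status` is a hypothetical helper and assumes each job exposes a `status` attribute that is a string or an enum with a string value.

```python
from collections import Counter
from typing import Any, Iterable


def count_by_status(jobs: Iterable[Any]) -> Counter:
    """Hypothetical helper: tally jobs by their status."""
    # Use the enum's value when present so the keys are plain strings.
    return Counter(getattr(job.status, "value", job.status) for job in jobs)
```

For example, `count_by_status(client.batches.list_batches())` would return a `Counter` keyed by status name.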
Cancelling a Batch Job
from together import Together
client = Together()
cancelled_job = client.batches.cancel_batch("batch-abc123")
print(f"Job status after cancel request: {cancelled_job.status}")
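A caller may prefer to skip the cancel request when the job has already finished. The sketch below shows one such guard. `cancel_if_active` is a hypothetical helper, the set of final statuses is taken from the polling example rather than from the library, and the source does not say how the API responds to cancelling a finished job, so the guard is a precaution rather than a documented requirement.

```python
from typing import Any

# Statuses assumed to be final; taken from the polling example.
FINISHED = ("COMPLETED", "FAILED", "EXPIRED", "CANCELLED")


def cancel_if_active(batches: Any, batch_job_id: str) -> Any:
    """Hypothetical helper: cancel only when the job is not already final."""
    job = batches.get_batch(batch_job_id)
    if job.status in FINISHED:
        # Nothing to cancel; return the job as fetched.
        return job
    return batches.cancel_batch(batch_job_id)
```

With a real client this would be called as `cancel_if_active(client.batches, "batch-abc123")`. Note that it costs one extra GET request, and the status can still change between the check and the cancel call.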