Implementation: Groq Python Completions.create (Streaming)
| Knowledge Sources | |
|---|---|
| Domains | NLP, Streaming |
| Last Updated | 2026-02-15 16:00 GMT |
Overview
Concrete tool in the Groq Python SDK for executing streaming chat completion requests, delivering tokens incrementally via SSE.
Description
When stream=True is passed to Completions.create(), the method returns a Stream[ChatCompletionChunk] (sync) or AsyncStream[ChatCompletionChunk] (async) instead of a ChatCompletion object. The Stream wraps the SSE byte stream from the HTTP response and yields parsed ChatCompletionChunk objects as they arrive.
The method uses @overload decorators to provide distinct type signatures for stream=True vs stream=False, so static type checkers infer the correct return type from the literal value of the stream argument.
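The overload pattern can be sketched in isolation; the stand-in ChatCompletion and Stream classes below are minimal placeholders, not the SDK's actual types:

```python
from typing import Literal, Union, overload

# Minimal stand-in types, used only to illustrate the typing pattern
class ChatCompletion: ...
class Stream: ...

@overload
def create(*, stream: Literal[True]) -> Stream: ...
@overload
def create(*, stream: Literal[False] = False) -> ChatCompletion: ...

def create(*, stream: bool = False) -> Union[ChatCompletion, Stream]:
    # The overloads above affect only static type checking;
    # this single runtime implementation handles both cases.
    return Stream() if stream else ChatCompletion()
```

With this shape, a type checker narrows `create(stream=True)` to `Stream` and `create()` to `ChatCompletion` without any runtime cast.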
Usage
Use this when you need real-time token delivery. Access via client.chat.completions.create(stream=True). Iterate over the returned Stream object using a for loop (sync) or async for (async).
Code Reference
Source Location
- Repository: groq-python
- File: src/groq/resources/chat/completions.py
- Lines: L115-176 (stream=True overload), L241-509 (implementation)
Signature
```python
class Completions(SyncAPIResource):
    @overload
    def create(
        self,
        *,
        messages: Iterable[ChatCompletionMessageParam],
        model: Union[str, Literal[...]],
        stream: Literal[True],
        temperature: Optional[float] | Omit = omit,
        max_completion_tokens: Optional[int] | Omit = omit,
        # ... other optional parameters
    ) -> Stream[ChatCompletionChunk]:
        ...
```
Import
```python
from groq import Groq
# Access via: client.chat.completions.create(stream=True)
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| messages | Iterable[ChatCompletionMessageParam] | Yes | The conversation messages |
| model | str or Literal[...] | Yes | Model identifier |
| stream | Literal[True] | Yes | Must be True to enable streaming |
| temperature | Optional[float] | No | Sampling temperature 0-2 |
| max_completion_tokens | Optional[int] | No | Max tokens to generate |
Outputs
| Name | Type | Description |
|---|---|---|
| (return) | Stream[ChatCompletionChunk] | Iterable stream of chunk objects; use for loop to iterate |
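As a sketch of how a consumer might fold the chunk stream back into a complete reply. The Delta/Choice/Chunk dataclasses below are simplified stand-ins for the SDK's chunk types, reduced to only the fields used in the examples (choices[0].delta.content and finish_reason):

```python
from dataclasses import dataclass, field
from typing import Iterable, List, Optional

@dataclass
class Delta:
    content: Optional[str] = None  # token text; None on the final chunk

@dataclass
class Choice:
    delta: Delta
    finish_reason: Optional[str] = None  # e.g. "stop" on the last chunk

@dataclass
class Chunk:
    choices: List[Choice] = field(default_factory=list)

def collect_text(stream: Iterable[Chunk]) -> str:
    """Concatenate every chunk's delta.content into the full reply."""
    parts = []
    for chunk in stream:
        content = chunk.choices[0].delta.content
        if content:  # skip chunks whose delta carries no text
            parts.append(content)
    return "".join(parts)
```

The same loop works unchanged against a real Stream[ChatCompletionChunk], since it touches only the fields shown in the examples below.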
Usage Examples
Basic Streaming
```python
from groq import Groq

client = Groq()

stream = client.chat.completions.create(
    messages=[
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "Explain quantum computing"},
    ],
    model="llama-3.3-70b-versatile",
    stream=True,
)

for chunk in stream:
    content = chunk.choices[0].delta.content
    if content:
        print(content, end="")
```
Async Streaming
```python
import asyncio

from groq import AsyncGroq

async def main():
    client = AsyncGroq()
    stream = await client.chat.completions.create(
        messages=[{"role": "user", "content": "Write a haiku"}],
        model="llama-3.3-70b-versatile",
        stream=True,
    )
    async for chunk in stream:
        content = chunk.choices[0].delta.content
        if content:
            print(content, end="")

asyncio.run(main())
```
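The async iteration pattern above can be exercised without a network call by substituting a stand-in async generator for the AsyncStream. The helper names below (`_chunk`, `_fake_stream`, `collect`) are hypothetical and exist only for this sketch:

```python
import asyncio
from types import SimpleNamespace
from typing import AsyncIterator

def _chunk(text):
    # Stand-in object mirroring chunk.choices[0].delta.content
    return SimpleNamespace(
        choices=[SimpleNamespace(delta=SimpleNamespace(content=text))]
    )

async def _fake_stream() -> AsyncIterator:
    # Stand-in for the AsyncStream returned by create(stream=True);
    # the trailing None mimics a final chunk with no content.
    for text in ["Hello", ", ", "world", None]:
        yield _chunk(text)

async def collect(stream) -> str:
    """Consume an async chunk stream exactly as in the example above."""
    parts = []
    async for chunk in stream:
        content = chunk.choices[0].delta.content
        if content:
            parts.append(content)
    return "".join(parts)

result = asyncio.run(collect(_fake_stream()))
```

This makes the consuming code testable in isolation; only the construction of the stream differs from the real async example.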