Implementation: anthropic-sdk-python Messages.create()
| Knowledge Sources | |
|---|---|
| Domains | API_Client, LLM |
| Last Updated | 2026-02-15 00:00 GMT |
Overview
This page documents the Messages.create() method, which is the primary entry point for sending messages to the Anthropic API. It handles parameter validation, timeout calculation, request transformation, HTTP dispatch, and response parsing for both streaming and non-streaming modes.
API Signature
```python
class Messages(SyncAPIResource):
    @required_args(["max_tokens", "messages", "model"], ["max_tokens", "messages", "model", "stream"])
    def create(
        self,
        *,
        max_tokens: int,
        messages: Iterable[MessageParam],
        model: ModelParam,
        inference_geo: Optional[str] | Omit = omit,
        metadata: MetadataParam | Omit = omit,
        output_config: OutputConfigParam | Omit = omit,
        service_tier: Literal["auto", "standard_only"] | Omit = omit,
        stop_sequences: SequenceNotStr[str] | Omit = omit,
        stream: Literal[False] | Literal[True] | Omit = omit,
        system: Union[str, Iterable[TextBlockParam]] | Omit = omit,
        temperature: float | Omit = omit,
        thinking: ThinkingConfigParam | Omit = omit,
        tool_choice: ToolChoiceParam | Omit = omit,
        tools: Iterable[ToolUnionParam] | Omit = omit,
        top_k: int | Omit = omit,
        top_p: float | Omit = omit,
        extra_headers: Headers | None = None,
        extra_query: Query | None = None,
        extra_body: Body | None = None,
        timeout: float | httpx.Timeout | None | NotGiven = not_given,
    ) -> Message | Stream[RawMessageStreamEvent]
```
Source Location
- File: `src/anthropic/resources/messages/messages.py`
- Overloads: Lines 101-924 (three `@overload` signatures for non-streaming, streaming, and bool-typed `stream`)
- Implementation: Lines 926-1003
- Async variant: `AsyncMessages.create` in the same file
Import
```python
from anthropic import Anthropic

client = Anthropic()

# Access via the messages resource
response = client.messages.create(...)
```
Parameters
Required Parameters
| Parameter | Type | Description |
|---|---|---|
| `max_tokens` | `int` | Maximum tokens to generate. Models may stop earlier. |
| `messages` | `Iterable[MessageParam]` | Conversation history as alternating user/assistant messages. |
| `model` | `ModelParam` | Model identifier (e.g., `"claude-sonnet-4-20250514"`). |
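The `@required_args` decorator shown in the signature enforces these at call time (one variant without `stream`, one with). A simplified toy version of the idea, not the real helper from `anthropic._utils`, just a sketch of the mechanism:

```python
import functools


def required_args(*variants):
    """Toy sketch of the required_args idea: accept the call if every
    keyword in at least one variant is present, else raise TypeError."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapped(*args, **kwargs):
            if not any(all(name in kwargs for name in variant) for variant in variants):
                raise TypeError(f"missing required arguments; expected one of {variants}")
            return fn(*args, **kwargs)
        return wrapped
    return decorator


@required_args(["max_tokens", "messages", "model"])
def create(**kwargs):
    # Stand-in for the real method body
    return kwargs["model"]
```

A call supplying all three keywords goes through; omitting any of them raises `TypeError` before the body runs.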
Optional Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| `stream` | `Literal[True] \| Omit` | `omit` (False) | Enables SSE streaming response. |
| `system` | `Union[str, Iterable[TextBlockParam]] \| Omit` | `omit` | System prompt for context and instructions. |
| `temperature` | `float \| Omit` | `omit` (API default: 1.0) | Randomness control (0.0-1.0). |
| `thinking` | `ThinkingConfigParam \| Omit` | `omit` | Extended thinking configuration. |
| `tools` | `Iterable[ToolUnionParam] \| Omit` | `omit` | Tool definitions for tool use. |
| `tool_choice` | `ToolChoiceParam \| Omit` | `omit` | How the model should use tools. |
| `stop_sequences` | `SequenceNotStr[str] \| Omit` | `omit` | Custom sequences that halt generation. |
| `top_k` | `int \| Omit` | `omit` | Top-K sampling parameter. |
| `top_p` | `float \| Omit` | `omit` | Nucleus sampling parameter. |
| `metadata` | `MetadataParam \| Omit` | `omit` | Request metadata (user_id, etc.). |
| `timeout` | `float \| httpx.Timeout \| None \| NotGiven` | `not_given` | Per-request timeout override. |
Output
- Non-streaming (`stream=False` or omitted): returns a `Message` Pydantic model
- Streaming (`stream=True`): returns a `Stream[RawMessageStreamEvent]` iterable
Implementation Logic
```python
@required_args(["max_tokens", "messages", "model"], ["max_tokens", "messages", "model", "stream"])
def create(self, *, max_tokens, messages, model, stream=omit, ...):
    # 1. Auto-adjust timeout for non-streaming requests
    if not stream and not is_given(timeout) and self._client.timeout == DEFAULT_TIMEOUT:
        timeout = self._client._calculate_nonstreaming_timeout(
            max_tokens, MODEL_NONSTREAMING_TOKENS.get(model, None)
        )

    # 2. Warn about deprecated models
    if model in DEPRECATED_MODELS:
        warnings.warn(
            f"The model '{model}' is deprecated and will reach end-of-life on "
            f"{DEPRECATED_MODELS[model]}...",
            DeprecationWarning,
            stacklevel=3,
        )

    # 3. Warn about thinking configuration for specific models
    if model in MODELS_TO_WARN_WITH_THINKING_ENABLED and thinking and thinking["type"] == "enabled":
        warnings.warn("Use 'thinking.type=adaptive' instead...", UserWarning, stacklevel=3)

    # 4. Transform parameters and POST to /v1/messages
    return self._post(
        "/v1/messages",
        body=maybe_transform(
            {"max_tokens": max_tokens, "messages": messages, "model": model, ...},
            MessageCreateParamsStreaming if stream else MessageCreateParamsNonStreaming,
        ),
        options=make_request_options(
            extra_headers=extra_headers,
            extra_query=extra_query,
            extra_body=extra_body,
            timeout=timeout,
        ),
        cast_to=Message,
        stream=stream or False,
        stream_cls=Stream[RawMessageStreamEvent],
    )
```
Overload Signatures
The method provides three @overload signatures for precise return-type narrowing:
```python
# Overload 1: Non-streaming (default)
@overload
def create(self, *, ..., stream: Literal[False] | Omit = omit, ...) -> Message: ...

# Overload 2: Streaming
@overload
def create(self, *, ..., stream: Literal[True], ...) -> Stream[RawMessageStreamEvent]: ...

# Overload 3: Dynamic stream parameter
@overload
def create(self, *, ..., stream: bool, ...) -> Message | Stream[RawMessageStreamEvent]: ...
```
Async Variant
AsyncMessages.create() provides the identical interface but returns coroutines:
```python
import asyncio

from anthropic import AsyncAnthropic


async def main():
    client = AsyncAnthropic()
    message = await client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=1024,
        messages=[{"role": "user", "content": "Hello!"}],
    )
    print(message.content[0].text)


asyncio.run(main())
```
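The async variant pays off when several requests run concurrently via `asyncio.gather`. A toy sketch with a stub coroutine standing in for `client.messages.create` (no network involved, the reply strings are invented):

```python
import asyncio


async def fake_create(prompt: str) -> str:
    # Stub standing in for: await client.messages.create(...)
    await asyncio.sleep(0)
    return f"reply to {prompt!r}"


async def main() -> list[str]:
    # Dispatch three "requests" concurrently; gather preserves input order
    return await asyncio.gather(*(fake_create(p) for p in ["a", "b", "c"]))


replies = asyncio.run(main())
```

Swapping `fake_create` for real `client.messages.create(...)` calls gives concurrent API requests with the same structure.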
Usage Examples
```python
from anthropic import Anthropic

client = Anthropic()

# Basic non-streaming request
message = client.messages.create(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[
        {"role": "user", "content": "Hello, Claude!"}
    ],
)
print(message.content[0].text)

# With system prompt and temperature
message = client.messages.create(
    model="claude-sonnet-4-20250514",
    max_tokens=2048,
    system="You are a helpful coding assistant.",
    temperature=0.3,
    messages=[
        {"role": "user", "content": "Write a Python fibonacci function."}
    ],
)

# Streaming request
with client.messages.create(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    stream=True,
    messages=[{"role": "user", "content": "Tell me a story."}],
) as stream:
    for event in stream:
        # Process each RawMessageStreamEvent
        pass
```
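Because the API is stateless, multi-turn conversations are expressed by appending the assistant's prior reply to `messages` before the next user turn. A data-only sketch (the assistant text here is invented for illustration; in real code it comes from `message.content`):

```python
# First user turn
messages = [{"role": "user", "content": "Name a prime number."}]

# Pretend the API answered; in real code this is the returned message content
assistant_text = "7 is a prime number."
messages.append({"role": "assistant", "content": assistant_text})

# Next user turn continues the same conversation
messages.append({"role": "user", "content": "Name a larger one."})

roles = [m["role"] for m in messages]
```

The resulting list alternates user/assistant roles, which is the shape the required `messages` parameter expects.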
Dependencies
- httpx -- HTTP transport (`httpx.Timeout`, request execution)
- pydantic -- Response model parsing (`Message`)
- anthropic._streaming -- `Stream`, `AsyncStream` wrappers
- anthropic._base_client -- `make_request_options`, `SyncAPIResource._post`
- anthropic._utils -- `maybe_transform`, `required_args`