
Implementation: anthropics/anthropic-sdk-python Provider API Interaction

From Leeroopedia
Knowledge Sources
Domains Cloud_Deployment, LLM, Infrastructure
Last Updated 2026-02-15 00:00 GMT

Overview

This page documents how the Messages.create() and Messages.stream() methods are shared across all cloud provider clients: the same call signature works everywhere, while provider-specific request and response handling stays hidden behind the common interface.

Type: Wrapper Doc

Source: src/anthropic/resources/messages/messages.py:L927-1116

API: Messages.create()

The create() method is defined once in the shared Messages resource class and used by all provider clients:

def create(
    self,
    *,
    max_tokens: int,
    messages: Iterable[MessageParam],
    model: ModelParam,
    inference_geo: Optional[str] | Omit = omit,
    metadata: MetadataParam | Omit = omit,
    output_config: OutputConfigParam | Omit = omit,
    service_tier: Literal["auto", "standard_only"] | Omit = omit,
    stop_sequences: SequenceNotStr[str] | Omit = omit,
    stream: Literal[False] | Literal[True] | Omit = omit,
    system: Union[str, Iterable[TextBlockParam]] | Omit = omit,
    temperature: float | Omit = omit,
    thinking: ThinkingConfigParam | Omit = omit,
    tool_choice: ToolChoiceParam | Omit = omit,
    tools: Iterable[ToolUnionParam] | Omit = omit,
    top_k: int | Omit = omit,
    top_p: float | Omit = omit,
    extra_headers: Headers | None = None,
    extra_query: Query | None = None,
    extra_body: Body | None = None,
    timeout: float | httpx.Timeout | None | NotGiven = not_given,
) -> Message | Stream[RawMessageStreamEvent]:

The method always posts to /v1/messages. The provider client's _prepare_options() hook rewrites this URL before the request is dispatched.
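The rewrite can be pictured with a toy version of this hook. This is an illustrative sketch, not the SDK's actual code; the path template mirrors the shape of Bedrock's runtime invoke URLs (`/model/{modelId}/invoke` and `/model/{modelId}/invoke-with-response-stream`).

```python
from urllib.parse import quote

def prepare_options_url(url: str, model: str, stream: bool) -> str:
    """Toy sketch of a Bedrock-style _prepare_options() URL rewrite.

    The shared Messages resource always targets /v1/messages; a provider
    client swaps that path for its own endpoint before dispatch.
    """
    if url == "/v1/messages":
        action = "invoke-with-response-stream" if stream else "invoke"
        # Model IDs may contain ':' and '.', so percent-encode them.
        return f"/model/{quote(model, safe='')}/{action}"
    return url

print(prepare_options_url(
    "/v1/messages", "us.anthropic.claude-sonnet-4-20250514-v1:0", stream=False
))
```

Because the hook only sees request options, the calling code is unchanged: the same create() call lands on a provider-specific path.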

API: Messages.stream()

The stream() method provides a higher-level streaming interface:

def stream(
    self,
    *,
    max_tokens: int,
    messages: Iterable[MessageParam],
    model: ModelParam,
    inference_geo: Optional[str] | Omit = omit,
    metadata: MetadataParam | Omit = omit,
    output_config: OutputConfigParam | Omit = omit,
    output_format: None | JSONOutputFormatParam | type[ResponseFormatT] | Omit = omit,
    container: Optional[str] | Omit = omit,
    service_tier: Literal["auto", "standard_only"] | Omit = omit,
    stop_sequences: SequenceNotStr[str] | Omit = omit,
    system: Union[str, Iterable[TextBlockParam]] | Omit = omit,
    temperature: float | Omit = omit,
    top_k: int | Omit = omit,
    top_p: float | Omit = omit,
    thinking: ThinkingConfigParam | Omit = omit,
    tool_choice: ToolChoiceParam | Omit = omit,
    tools: Iterable[ToolUnionParam] | Omit = omit,
    extra_headers: Headers | None = None,
    extra_query: Query | None = None,
    extra_body: Body | None = None,
    timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN,
) -> MessageStreamManager[ResponseFormatT]:

Returns a MessageStreamManager context manager for ergonomic event iteration.
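The context-manager pattern behind this can be sketched with a stand-in class. Everything here is illustrative except the shape of the text_stream iteration, which matches the usage examples below.

```python
from contextlib import AbstractContextManager
from typing import Iterator, List

class ToyStreamManager(AbstractContextManager):
    """Illustrative stand-in for MessageStreamManager: enter the block,
    iterate text deltas, and close the underlying stream on exit."""

    def __init__(self, chunks: List[str]) -> None:
        self._chunks = chunks
        self.closed = False

    @property
    def text_stream(self) -> Iterator[str]:
        # Yield text deltas one at a time, as stream.text_stream does.
        yield from self._chunks

    def __exit__(self, *exc) -> None:
        # Cleanup runs even if the caller breaks out of iteration early.
        self.closed = True

with ToyStreamManager(["Hel", "lo"]) as stream:
    text = "".join(stream.text_stream)
print(text)
print(stream.closed)  # True: the stream was closed on exit
```

The `with` block guarantees the connection is released whether iteration completes or raises, which is why stream() returns a manager rather than a bare iterator.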

Usage Examples Across Providers

AWS Bedrock

from anthropic import AnthropicBedrock

client = AnthropicBedrock(
    aws_access_key="...",
    aws_secret_key="...",
    aws_region="us-east-1",
)

# Non-streaming
message = client.messages.create(
    model="us.anthropic.claude-sonnet-4-20250514-v1:0",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Hello from Bedrock!"}],
)
print(message.content[0].text)

# Streaming
with client.messages.stream(
    model="us.anthropic.claude-sonnet-4-20250514-v1:0",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Stream from Bedrock!"}],
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)

Google Cloud Vertex AI

from anthropic import AnthropicVertex

client = AnthropicVertex(
    region="us-east5",
    project_id="my-project",
)

# Non-streaming
message = client.messages.create(
    model="claude-sonnet-4@20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Hello from Vertex!"}],
)
print(message.content[0].text)

# Streaming
with client.messages.stream(
    model="claude-sonnet-4@20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Stream from Vertex!"}],
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)

Azure AI Foundry

from anthropic import AnthropicFoundry

client = AnthropicFoundry(
    resource="my-resource",
    api_key="...",
)

# Non-streaming
message = client.messages.create(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Hello from Azure!"}],
)
print(message.content[0].text)

# Streaming
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Stream from Azure!"}],
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)

Streaming Protocol Differences

Bedrock: AWS EventStream

Bedrock uses the AWS EventStream binary protocol instead of standard SSE. The SDK handles this via a custom AWSEventStreamDecoder (from src/anthropic/lib/bedrock/_stream_decoder.py):

class AWSEventStreamDecoder:
    def __init__(self) -> None:
        from botocore.parsers import EventStreamJSONParser
        self.parser = EventStreamJSONParser()

    def iter_bytes(self, iterator: Iterator[bytes]) -> Iterator[ServerSentEvent]:
        from botocore.eventstream import EventStreamBuffer
        event_stream_buffer = EventStreamBuffer()
        for chunk in iterator:
            event_stream_buffer.add_data(chunk)
            for event in event_stream_buffer:
                message = self._parse_message_from_event(event)
                if message:
                    yield ServerSentEvent(data=message, event="completion")

    async def aiter_bytes(self, iterator: AsyncIterator[bytes]) -> AsyncIterator[ServerSentEvent]:
        from botocore.eventstream import EventStreamBuffer
        event_stream_buffer = EventStreamBuffer()
        async for chunk in iterator:
            event_stream_buffer.add_data(chunk)
            for event in event_stream_buffer:
                message = self._parse_message_from_event(event)
                if message:
                    yield ServerSentEvent(data=message, event="completion")

The AnthropicBedrock client returns this decoder from _make_sse_decoder(), replacing the standard SSE decoder in the streaming pipeline.

Vertex AI and Foundry: Standard SSE

Both Vertex AI and Foundry use standard Server-Sent Events, which is the same protocol used by the direct Anthropic API. No custom decoder is needed.
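For contrast with the EventStream decoder above, here is a minimal sketch of SSE frame parsing: fields accumulate until a blank line terminates the frame. Real decoders also handle comments, `retry:` fields, and multi-line data; this is a simplification.

```python
from typing import Iterator, Tuple

def iter_sse(lines: Iterator[str]) -> Iterator[Tuple[str, str]]:
    """Minimal SSE sketch: collect event/data fields until a blank
    line ends the frame, then yield (event, data)."""
    event, data = "message", []
    for line in lines:
        if line == "":
            if data:
                yield event, "\n".join(data)
            event, data = "message", []  # reset for the next frame
        elif line.startswith("event:"):
            event = line[len("event:"):].strip()
        elif line.startswith("data:"):
            data.append(line[len("data:"):].strip())

frames = ["event: content_block_delta", 'data: {"type":"text_delta"}', ""]
for ev, payload in iter_sse(iter(frames)):
    print(ev, payload)
```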

Request Lifecycle

The complete request lifecycle for a provider API call:

  1. Caller invokes client.messages.create(model=..., ...).
  2. Messages.create() constructs FinalRequestOptions with URL /v1/messages and the full request body.
  3. The provider client's _prepare_options() intercepts the options:
    • Rewrites the URL path (Bedrock and Vertex).
    • Injects anthropic_version into the body (Bedrock and Vertex).
    • Injects authentication headers (Foundry).
  4. The provider client's _prepare_request() signs or authenticates the request:
    • Bedrock: SigV4 signing.
    • Vertex: Bearer token injection.
    • Foundry: Handled in _prepare_options().
  5. The HTTP request is dispatched via httpx.
  6. For streaming, the response bytes are decoded by the appropriate decoder (AWS EventStream or SSE).
  7. The response is parsed into Message or Stream[RawMessageStreamEvent] objects.
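The steps above can be condensed into a toy pipeline. Function names, paths, and version strings here are placeholders for illustration, not the SDK's internals.

```python
def dispatch(model: str, provider: str) -> dict:
    """Toy sketch of the request lifecycle: build shared options, let the
    provider hook rewrite them, authenticate, then hand off for dispatch."""
    # Step 2: the shared resource builds provider-agnostic options.
    options = {"url": "/v1/messages", "headers": {}, "body": {"model": model}}

    # Step 3: _prepare_options() rewrites the URL (Bedrock, Vertex) and
    # injects anthropic_version into the body. Placeholder values shown.
    if provider in ("bedrock", "vertex"):
        options["url"] = f"/providers/{provider}/messages"
        options["body"]["anthropic_version"] = f"{provider}-api-version"

    # Step 4: _prepare_request() attaches provider-specific credentials.
    auth = {"bedrock": "SigV4 ...", "vertex": "Bearer ...", "foundry": "api-key ..."}
    options["headers"]["Authorization"] = auth[provider]

    # Step 5 would dispatch via httpx; return the prepared options instead.
    return options

print(dispatch("claude-sonnet-4", "bedrock")["url"])
```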

Provider Feature Support Matrix

Feature                   Direct API   Bedrock   Vertex   Foundry
messages.create()         Yes          Yes       Yes      Yes
messages.stream()         Yes          Yes       Yes      Yes
messages.count_tokens()   Yes          No        Yes      Yes
Message Batches           Yes          No        No       No
Models endpoint           Yes          N/A       N/A      No (returns None)
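Code that targets multiple providers can guard calls with a small capability map transcribed from the matrix above. This is an application-side sketch, not SDK behavior: the SDK itself raises or returns None for unsupported features.

```python
# Capability map transcribed from the support matrix above.
SUPPORT = {
    "bedrock": {"create", "stream"},
    "vertex":  {"create", "stream", "count_tokens"},
    "foundry": {"create", "stream", "count_tokens"},
    "direct":  {"create", "stream", "count_tokens", "batches", "models"},
}

def supports(provider: str, feature: str) -> bool:
    """Return True if the matrix marks this feature supported."""
    return feature in SUPPORT[provider]

print(supports("bedrock", "count_tokens"))  # False per the matrix
print(supports("vertex", "batches"))        # False: batches are direct-API only
```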
