Implementation:Deepset ai Haystack OpenAIChatGenerator

From Leeroopedia

Overview

OpenAIChatGenerator is a Haystack component that completes chats using OpenAI's large language models. It is a wrapper around the OpenAI Chat Completions API, accepting a list of ChatMessage objects as input and returning generated ChatMessage replies. It supports the gpt-4 and gpt-5 series models, streaming responses, tool/function calling, structured output formats, and asynchronous execution.

Source Location

  • File: haystack/components/generators/chat/openai.py (Lines 54-300+)
  • Class: OpenAIChatGenerator
  • Component decorator: @component

Import

from haystack.components.generators.chat import OpenAIChatGenerator

External Dependencies

  • openai (Python package): Provides OpenAI, AsyncOpenAI, ChatCompletion, ChatCompletionChunk, ParsedChatCompletion, Stream, and AsyncStream types.
  • pydantic: Used for structured output response format validation via BaseModel.

Constructor

def __init__(
    self,
    api_key: Secret = Secret.from_env_var("OPENAI_API_KEY"),
    model: str = "gpt-5-mini",
    streaming_callback: StreamingCallbackT | None = None,
    api_base_url: str | None = None,
    organization: str | None = None,
    generation_kwargs: dict[str, Any] | None = None,
    timeout: float | None = None,
    max_retries: int | None = None,
    tools: ToolsType | None = None,
    tools_strict: bool = False,
    http_client_kwargs: dict[str, Any] | None = None,
)

Parameters

  • api_key (Secret): The OpenAI API key. Defaults to reading from the OPENAI_API_KEY environment variable.
  • model (str): The model name to use. Defaults to "gpt-5-mini".
  • streaming_callback (StreamingCallbackT | None): Callback function invoked for each new token during streaming. Receives a StreamingChunk argument.
  • api_base_url (str | None): Optional custom base URL for the API.
  • organization (str | None): Optional OpenAI Organization ID.
  • generation_kwargs (dict[str, Any] | None): Additional parameters passed directly to the OpenAI API. Supported parameters include max_completion_tokens, temperature, top_p, n, stop, presence_penalty, frequency_penalty, logit_bias, and response_format.
  • timeout (float | None): Request timeout in seconds. Defaults to the OPENAI_TIMEOUT environment variable or 30 seconds.
  • max_retries (int | None): Maximum retry attempts on internal errors. Defaults to the OPENAI_MAX_RETRIES environment variable or 5.
  • tools (ToolsType | None): A list of Tool and/or Toolset objects, or a single Toolset, that the model can call.
  • tools_strict (bool): Whether to enable strict schema adherence for tool calls. When True, the model follows tool parameter schemas exactly, but latency may increase. Defaults to False.
  • http_client_kwargs (dict[str, Any] | None): Keyword arguments for configuring custom httpx.Client or httpx.AsyncClient.
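
A minimal constructor sketch, assuming OPENAI_API_KEY is set in the environment; the generation_kwargs values and the CityInfo Pydantic model are illustrative choices, not defaults:

from pydantic import BaseModel

from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.utils import Secret


class CityInfo(BaseModel):
    # Hypothetical schema, used only to illustrate structured output via response_format.
    city: str
    country: str


generator = OpenAIChatGenerator(
    api_key=Secret.from_env_var("OPENAI_API_KEY"),  # the default, shown explicitly here
    model="gpt-5-mini",
    generation_kwargs={
        "temperature": 0.2,
        "max_completion_tokens": 512,
        "response_format": CityInfo,  # Pydantic model for structured output
    },
    timeout=30.0,
    max_retries=3,
)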

Initialization Behavior

  • Resolves the API key and configures timeout/max_retries from environment variables if not explicitly provided.
  • Creates both a synchronous OpenAI client and an asynchronous AsyncOpenAI client.
  • Validates that no duplicate tool names exist among the provided tools.

Run Method (Synchronous)

@component.output_types(replies=list[ChatMessage])
def run(
    self,
    messages: list[ChatMessage],
    streaming_callback: StreamingCallbackT | None = None,
    generation_kwargs: dict[str, Any] | None = None,
    *,
    tools: ToolsType | None = None,
    tools_strict: bool | None = None,
) -> dict:  # Returns {"replies": list[ChatMessage]}

Parameters

  • messages (list[ChatMessage]): The input chat messages representing the conversation history.
  • streaming_callback (StreamingCallbackT | None): Optional runtime streaming callback.
  • generation_kwargs (dict[str, Any] | None): Optional runtime generation parameters that override initialization parameters.
  • tools (ToolsType | None): Optional runtime tools that override initialization tools.
  • tools_strict (bool | None): Optional runtime strict mode override for tool calls.

Returns

  • {"replies": list[ChatMessage]}: A dictionary containing the generated responses as ChatMessage objects. Each reply includes the assistant's text (and/or tool calls) and metadata such as model, index, finish_reason, and usage.

Behavior

  1. Warms up tools if not already warmed up.
  2. Returns an empty replies list if no messages are provided.
  3. Merges initialization and runtime generation kwargs.
  4. Converts messages to OpenAI's expected format.
  5. Prepares tool definitions if tools are configured, optionally enforcing strict schemas.
  6. Selects the appropriate API endpoint:
    • chat.completions.parse for structured output (non-streaming with response_format).
    • chat.completions.create for all other cases.
  7. For streaming: Processes chunks through the callback and assembles them into a final ChatMessage. Limited to n=1 (a callback sketch follows this list).
  8. For non-streaming: Converts each Choice to a ChatMessage.
  9. Checks finish reasons and logs warnings for truncation or content filtering.
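
A hedged streaming sketch; the on_chunk callback below is a hypothetical stand-in for any function that accepts a StreamingChunk:

from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage, StreamingChunk


def on_chunk(chunk: StreamingChunk) -> None:
    # Print each new piece of text as it arrives from the API.
    if chunk.content:
        print(chunk.content, end="", flush=True)


generator = OpenAIChatGenerator(streaming_callback=on_chunk)
result = generator.run(messages=[ChatMessage.from_user("Explain word embeddings briefly.")])
print()
print(result["replies"][0].text)  # the fully assembled reply is still returned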

Run Method (Asynchronous)

@component.output_types(replies=list[ChatMessage])
async def run_async(
    self,
    messages: list[ChatMessage],
    streaming_callback: StreamingCallbackT | None = None,
    generation_kwargs: dict[str, Any] | None = None,
    *,
    tools: ToolsType | None = None,
    tools_strict: bool | None = None,
) -> dict:  # Returns {"replies": list[ChatMessage]}

The asynchronous variant uses the AsyncOpenAI client and supports AsyncStreamingCallbackT callbacks. It handles asyncio.CancelledError gracefully by ensuring the stream is properly closed using asyncio.shield.
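
A minimal asynchronous sketch, assuming the script owns the event loop via asyncio.run:

import asyncio

from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage


async def main() -> None:
    generator = OpenAIChatGenerator()
    result = await generator.run_async(
        messages=[ChatMessage.from_user("What is retrieval-augmented generation?")]
    )
    print(result["replies"][0].text)


asyncio.run(main())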

Tool Call Processing

When the model generates tool calls, OpenAIChatGenerator extracts them from the API response:

  • Each tool call is converted to a ToolCall dataclass with id, tool_name, and arguments.
  • Malformed JSON arguments are logged as warnings and the tool call is skipped.
  • During streaming, tool calls are delivered incrementally as ToolCallDelta objects via the streaming callback.
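
A hedged tool-calling sketch; get_weather and its JSON schema are hypothetical and only illustrate how a Tool is declared and how tool calls are read from the reply:

from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage
from haystack.tools import Tool


def get_weather(city: str) -> str:
    # Hypothetical helper; a real tool would call a weather service.
    return f"Sunny in {city}"


weather_tool = Tool(
    name="get_weather",
    description="Look up the current weather for a city.",
    parameters={
        "type": "object",
        "properties": {"city": {"type": "string"}},
        "required": ["city"],
    },
    function=get_weather,
)

generator = OpenAIChatGenerator(tools=[weather_tool])
result = generator.run(messages=[ChatMessage.from_user("What's the weather in Paris?")])

for tool_call in result["replies"][0].tool_calls:
    print(tool_call.tool_name, tool_call.arguments)  # e.g. get_weather {'city': 'Paris'}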

Serialization

def to_dict(self) -> dict[str, Any]

@classmethod
def from_dict(cls, data: dict[str, Any]) -> "OpenAIChatGenerator"

Supports full serialization and deserialization. Pydantic models used as response_format are converted to strict JSON schemas during serialization. Tools and streaming callbacks are serialized by their respective serialization functions.
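
A brief round-trip sketch using to_dict and from_dict; the configuration values are arbitrary:

from haystack.components.generators.chat import OpenAIChatGenerator

generator = OpenAIChatGenerator(model="gpt-5-mini", generation_kwargs={"temperature": 0.2})

data = generator.to_dict()         # serializable dict; the API key stays an env-var reference
restored = OpenAIChatGenerator.from_dict(data)
print(restored.to_dict() == data)  # True: the round trip preserves the configuration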

Usage Example

from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage

messages = [ChatMessage.from_user("What's Natural Language Processing?")]

client = OpenAIChatGenerator()
response = client.run(messages)
print(response)
# {'replies': [ChatMessage(_role=<ChatRole.ASSISTANT: 'assistant'>,
#   _content=[TextContent(text="Natural Language Processing (NLP) is a branch of AI...")],
#   _meta={'model': 'gpt-5-mini', 'index': 0, 'finish_reason': 'stop',
#          'usage': {'prompt_tokens': 15, 'completion_tokens': 36, 'total_tokens': 51}})]}

API Wrapper Note

This component is a wrapper around the OpenAI Chat Completions API. It translates Haystack's ChatMessage objects to OpenAI's message format, handles the API call (including streaming, tool calling, and structured outputs), and converts the response back to Haystack ChatMessage objects with full metadata.

Related Pages

Requires Environment
