Implementation: deepset-ai Haystack OpenAIChatGenerator
Overview
OpenAIChatGenerator is a Haystack component that completes chats using OpenAI's large language models. It is a wrapper around the OpenAI Chat Completions API, accepting a list of ChatMessage objects as input and returning generated ChatMessage replies. It supports the gpt-4 and gpt-5 series models, streaming responses, tool/function calling, structured output formats, and asynchronous execution.
Source Location
- File: `haystack/components/generators/chat/openai.py` (lines 54-300+)
- Class: `OpenAIChatGenerator`
- Component decorator: `@component`
Import
```python
from haystack.components.generators.chat import OpenAIChatGenerator
```
External Dependencies
- openai (Python package): Provides the `OpenAI`, `AsyncOpenAI`, `ChatCompletion`, `ChatCompletionChunk`, `ParsedChatCompletion`, `Stream`, and `AsyncStream` types.
- pydantic: Used for structured output response format validation via `BaseModel`.
Constructor
```python
def __init__(
    self,
    api_key: Secret = Secret.from_env_var("OPENAI_API_KEY"),
    model: str = "gpt-5-mini",
    streaming_callback: StreamingCallbackT | None = None,
    api_base_url: str | None = None,
    organization: str | None = None,
    generation_kwargs: dict[str, Any] | None = None,
    timeout: float | None = None,
    max_retries: int | None = None,
    tools: ToolsType | None = None,
    tools_strict: bool = False,
    http_client_kwargs: dict[str, Any] | None = None,
)
```
Parameters
- api_key (`Secret`): The OpenAI API key. Defaults to reading from the `OPENAI_API_KEY` environment variable.
- model (`str`): The model name to use. Defaults to `"gpt-5-mini"`.
- streaming_callback (`StreamingCallbackT | None`): Callback function invoked for each new token during streaming. Receives a `StreamingChunk` argument.
- api_base_url (`str | None`): Optional custom base URL for the API.
- organization (`str | None`): Optional OpenAI organization ID.
- generation_kwargs (`dict[str, Any] | None`): Additional parameters passed directly to the OpenAI API. Supported parameters include `max_completion_tokens`, `temperature`, `top_p`, `n`, `stop`, `presence_penalty`, `frequency_penalty`, `logit_bias`, and `response_format`.
- timeout (`float | None`): Request timeout in seconds. Defaults to the `OPENAI_TIMEOUT` environment variable or 30 seconds.
- max_retries (`int | None`): Maximum retry attempts on internal errors. Defaults to the `OPENAI_MAX_RETRIES` environment variable or 5.
- tools (`ToolsType | None`): A list of `Tool` and/or `Toolset` objects, or a single `Toolset`, that the model can call.
- tools_strict (`bool`): Whether to enable strict schema adherence for tool calls. When `True`, the model follows tool parameter schemas exactly, but latency may increase. Defaults to `False`.
- http_client_kwargs (`dict[str, Any] | None`): Keyword arguments for configuring a custom `httpx.Client` or `httpx.AsyncClient`.
Initialization Behavior
- Resolves the API key and configures timeout/max_retries from environment variables if not explicitly provided.
- Creates both a synchronous `OpenAI` client and an asynchronous `AsyncOpenAI` client.
- Validates that no duplicate tool names exist among the provided tools.
Run Method (Synchronous)
```python
@component.output_types(replies=list[ChatMessage])
def run(
    self,
    messages: list[ChatMessage],
    streaming_callback: StreamingCallbackT | None = None,
    generation_kwargs: dict[str, Any] | None = None,
    *,
    tools: ToolsType | None = None,
    tools_strict: bool | None = None,
) -> dict:  # Returns {"replies": list[ChatMessage]}
```
Parameters
- messages (`list[ChatMessage]`): The input chat messages representing the conversation history.
- streaming_callback (`StreamingCallbackT | None`): Optional runtime streaming callback.
- generation_kwargs (`dict[str, Any] | None`): Optional runtime generation parameters that override initialization parameters.
- tools (`ToolsType | None`): Optional runtime tools that override initialization tools.
- tools_strict (`bool | None`): Optional runtime strict mode override for tool calls.
Returns
- `{"replies": list[ChatMessage]}`: A dictionary containing the generated responses as `ChatMessage` objects. Each reply includes the assistant's text (and/or tool calls) and metadata such as `model`, `index`, `finish_reason`, and `usage`.
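The mapping from one API choice to a reply with those meta keys can be illustrated with plain dicts standing in for the real objects. This is a hedged sketch: `choice_to_reply` is a hypothetical name, and in the real API `usage` is reported per completion rather than per choice.

```python
def choice_to_reply(choice: dict, model: str, usage: dict) -> dict:
    # Map a Chat Completions choice (as a plain dict) to a reply dict
    # carrying the documented meta keys.
    return {
        "text": choice["message"]["content"],
        "meta": {
            "model": model,
            "index": choice["index"],
            "finish_reason": choice["finish_reason"],
            "usage": usage,
        },
    }
```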
Behavior
- Warms up tools if not already warmed up.
- Returns an empty replies list if no messages are provided.
- Merges initialization and runtime generation kwargs.
- Converts messages to OpenAI's expected format.
- Prepares tool definitions if tools are configured, optionally enforcing strict schemas.
- Selects the appropriate API endpoint: `chat.completions.parse` for structured output (non-streaming with `response_format`); `chat.completions.create` for all other cases.
- For streaming: processes chunks through the callback and assembles them into a final `ChatMessage`. Limited to `n=1`.
- For non-streaming: converts each `Choice` to a `ChatMessage`.
- Checks finish reasons and logs warnings for truncation or content filtering.
Run Method (Asynchronous)
```python
@component.output_types(replies=list[ChatMessage])
async def run_async(
    self,
    messages: list[ChatMessage],
    streaming_callback: StreamingCallbackT | None = None,
    generation_kwargs: dict[str, Any] | None = None,
    *,
    tools: ToolsType | None = None,
    tools_strict: bool | None = None,
) -> dict:  # Returns {"replies": list[ChatMessage]}
```
The asynchronous variant uses the `AsyncOpenAI` client and supports `AsyncStreamingCallbackT` callbacks. It handles `asyncio.CancelledError` gracefully by ensuring the stream is properly closed using `asyncio.shield`.
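The shielding pattern can be shown with a minimal, self-contained sketch (not the actual implementation): if the consuming task is cancelled mid-stream, wrapping `close()` in `asyncio.shield` ensures the cleanup coroutine still runs to completion before the cancellation propagates.

```python
import asyncio

class FakeAsyncStream:
    """Stand-in for an OpenAI AsyncStream with a close() coroutine."""

    def __init__(self):
        self.closed = False

    async def close(self):
        self.closed = True

async def consume_stream(stream: FakeAsyncStream):
    try:
        await asyncio.sleep(3600)  # stand-in for iterating the response stream
    except asyncio.CancelledError:
        # Shield the cleanup so the stream is closed even on cancellation.
        await asyncio.shield(stream.close())
        raise
```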
Tool Call Processing
When the model generates tool calls, OpenAIChatGenerator extracts them from the API response:
- Each tool call is converted to a `ToolCall` dataclass with `id`, `tool_name`, and `arguments`.
- Malformed JSON arguments are logged as warnings and the tool call is skipped.
- During streaming, tool calls are delivered incrementally as `ToolCallDelta` objects via the streaming callback.
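The extraction step can be sketched with plain dicts standing in for the `ToolCall` dataclass. The function name and raw-call field layout below follow the OpenAI response shape but are assumptions for illustration, not the actual source.

```python
import json
import logging

logger = logging.getLogger(__name__)

def extract_tool_calls(raw_tool_calls: list[dict]) -> list[dict]:
    # Convert each raw tool call into {id, tool_name, arguments};
    # skip (and log) any call whose arguments are not valid JSON.
    tool_calls = []
    for raw in raw_tool_calls:
        try:
            arguments = json.loads(raw["function"]["arguments"])
        except json.JSONDecodeError:
            logger.warning("Skipping tool call %s: malformed JSON arguments", raw.get("id"))
            continue
        tool_calls.append(
            {"id": raw["id"], "tool_name": raw["function"]["name"], "arguments": arguments}
        )
    return tool_calls
```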
Serialization
```python
def to_dict(self) -> dict[str, Any]

@classmethod
def from_dict(cls, data: dict[str, Any]) -> "OpenAIChatGenerator"
```
Supports full serialization and deserialization. Pydantic models used as `response_format` are converted to strict JSON schemas during serialization. Tools and streaming callbacks are serialized by their respective serialization functions.
Usage Example
```python
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage

messages = [ChatMessage.from_user("What's Natural Language Processing?")]

client = OpenAIChatGenerator()
response = client.run(messages)
print(response)
# {'replies': [ChatMessage(_role=<ChatRole.ASSISTANT: 'assistant'>,
#   _content=[TextContent(text="Natural Language Processing (NLP) is a branch of AI...")],
#   _meta={'model': 'gpt-5-mini', 'index': 0, 'finish_reason': 'stop',
#   'usage': {'prompt_tokens': 15, 'completion_tokens': 36, 'total_tokens': 51}})]}
```
API Wrapper Note
This component is a wrapper around the OpenAI Chat Completions API. It translates Haystack's `ChatMessage` objects to OpenAI's message format, handles the API call (including streaming, tool calling, and structured outputs), and converts the response back to Haystack `ChatMessage` objects with full metadata.
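The request-side translation can be illustrated with plain `(role, text)` tuples standing in for `ChatMessage` objects; the OpenAI message format is a list of `{"role": ..., "content": ...}` dicts. The function name is an assumption for illustration.

```python
def to_openai_messages(messages: list[tuple[str, str]]) -> list[dict]:
    # Map simplified (role, text) pairs to OpenAI's message dict format.
    return [{"role": role, "content": text} for role, text in messages]
```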