Implementation:Dagster io Dagster OpenAI Resource Wrapper
| Field | Value |
|---|---|
| Implementation Name | OpenAI Resource Wrapper |
| Source | python_modules/libraries/dagster-openai/dagster_openai/resources.py:L162
|
| Repository | dagster-io/dagster |
| Domains | AI, API_Integration |
Overview
Concrete resource wrapper for the OpenAI API provided by the dagster-openai integration library.
Description
The OpenAIResource class extends ConfigurableResource to wrap the openai Python library. It provides Dagster-compatible configuration for the OpenAI API key and optional parameters (organization, project, base URL). The resource yields an openai.Client instance via its get_client() context manager and automatically wraps API endpoints (Completions, Chat, Embeddings) to log usage metadata (token counts, call counts) in the asset metadata.
Usage
Use OpenAIResource when your pipeline needs to interact with the OpenAI API. The resource handles authentication, client initialization, and optional usage tracking. Configure it once in your Definitions and inject it into any asset that needs OpenAI access.
Code Reference
Source Location
python_modules/libraries/dagster-openai/dagster_openai/resources.py:L162
Signature
class OpenAIResource(ConfigurableResource):
"""Wrapper over the openai library."""
api_key: str = Field(description="OpenAI API key. See https://platform.openai.com/api-keys")
organization: Optional[str] = Field(default=None)
project: Optional[str] = Field(default=None)
base_url: Optional[str] = Field(default=None)
def setup_for_execution(self, context: InitResourceContext) -> None:
self._client = Client(
api_key=self.api_key,
organization=self.organization,
project=self.project,
base_url=self.base_url,
)
@contextmanager
def get_client(
self, context: Union[AssetExecutionContext, AssetCheckExecutionContext, OpExecutionContext]
) -> Generator[Client, None, None]:
"""Yields an openai.Client for interacting with the OpenAI API."""
...
Import
from dagster_openai import OpenAIResource
I/O Contract
| Direction | Name | Type | Description |
|---|---|---|---|
| Input | api_key | str (via EnvVar) |
OpenAI API key for authentication |
| Input | organization | Optional[str] |
OpenAI organization ID (optional) |
| Input | project | Optional[str] |
OpenAI project ID (optional) |
| Input | base_url | Optional[str] |
Custom base URL for the OpenAI API (optional) |
| Output | Client | openai.Client |
Configured OpenAI client yielded via get_client() context manager
|
Usage Examples
Basic Configuration
import dagster as dg
from dagster_openai import OpenAIResource
defs = dg.Definitions(
resources={
"openai": OpenAIResource(api_key=dg.EnvVar("OPENAI_API_KEY")),
},
)
Asset Using OpenAI
from dagster import AssetExecutionContext, asset
from dagster_openai import OpenAIResource
@asset(compute_kind="OpenAI")
def openai_asset(context: AssetExecutionContext, openai: OpenAIResource):
with openai.get_client(context) as client:
client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Say this is a test"}],
)
Embeddings Example
import dagster as dg
from dagster_openai import OpenAIResource
@dg.asset(kinds={"openai"})
def embeddings(context: dg.AssetExecutionContext, openai: OpenAIResource):
with openai.get_client(context) as client:
response = client.embeddings.create(
model="text-embedding-3-small",
input=["Hello world"],
)
return dg.MaterializeResult(
metadata={"dimensions": len(response.data[0].embedding)}
)