Implementation:Dagster io Dagster OpenAI Resource Wrapper

From Leeroopedia


Field | Value
Implementation Name | OpenAI Resource Wrapper
Source | python_modules/libraries/dagster-openai/dagster_openai/resources.py:L162
Repository | dagster-io/dagster
Domains | AI, API_Integration

Overview

Concrete resource wrapper for the OpenAI API provided by the dagster-openai integration library.

Description

The OpenAIResource class extends ConfigurableResource to wrap the openai Python library. It exposes the OpenAI API key and three optional parameters (organization, project, base URL) as Dagster configuration. Its get_client() context manager yields an openai.Client instance. The Completions, Chat, and Embeddings endpoints of that client are wrapped automatically so that usage metadata (token counts and call counts) is logged to the asset metadata.
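The wrapping behaviour described above can be illustrated with a minimal, dependency-free sketch. This is not the dagster-openai implementation: FakeCompletions, with_usage_tracking, get_tracked_client, the metadata key names, and the canned token counts are all hypothetical stand-ins chosen for illustration. It only shows the general pattern of a context manager that yields a client whose endpoint method has been wrapped to accumulate usage.

```python
from contextlib import contextmanager
from functools import wraps
from types import SimpleNamespace


class FakeCompletions:
    """Hypothetical stand-in for an OpenAI endpoint; returns a canned usage report."""

    def create(self, **kwargs):
        usage = SimpleNamespace(prompt_tokens=5, completion_tokens=7, total_tokens=12)
        return SimpleNamespace(usage=usage)


def with_usage_tracking(func, metadata):
    """Wrap an endpoint method so each call adds its usage to `metadata`."""

    @wraps(func)
    def wrapper(*args, **kwargs):
        response = func(*args, **kwargs)
        # Count the call and accumulate the tokens reported by the response.
        metadata["openai.calls"] = metadata.get("openai.calls", 0) + 1
        metadata["openai.total_tokens"] = (
            metadata.get("openai.total_tokens", 0) + response.usage.total_tokens
        )
        return response

    return wrapper


@contextmanager
def get_tracked_client(metadata):
    """Yield a client whose `completions.create` records usage into `metadata`."""
    client = SimpleNamespace(completions=FakeCompletions())
    client.completions.create = with_usage_tracking(
        client.completions.create, metadata
    )
    yield client


usage_metadata = {}
with get_tracked_client(usage_metadata) as client:
    client.completions.create(model="example-model", prompt="Say this is a test")
    client.completions.create(model="example-model", prompt="Say it again")
```

In this sketch the caller's code inside the with block is unchanged by the tracking; the counts accumulate in usage_metadata as a side effect of each call, which is the property that lets a resource attach usage figures to an asset without extra work in the asset body.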

Usage

Use OpenAIResource when your pipeline needs to interact with the OpenAI API. The resource handles authentication, client initialization, and optional usage tracking. Configure it once in your Definitions and inject it into any asset that needs OpenAI access.

Code Reference

Source Location

python_modules/libraries/dagster-openai/dagster_openai/resources.py:L162

Signature

class OpenAIResource(ConfigurableResource):
    """Wrapper over the openai library."""

    api_key: str = Field(description="OpenAI API key. See https://platform.openai.com/api-keys")
    organization: Optional[str] = Field(default=None)
    project: Optional[str] = Field(default=None)
    base_url: Optional[str] = Field(default=None)

    def setup_for_execution(self, context: InitResourceContext) -> None:
        self._client = Client(
            api_key=self.api_key,
            organization=self.organization,
            project=self.project,
            base_url=self.base_url,
        )

    @contextmanager
    def get_client(
        self, context: Union[AssetExecutionContext, AssetCheckExecutionContext, OpExecutionContext]
    ) -> Generator[Client, None, None]:
        """Yields an openai.Client for interacting with the OpenAI API."""
        ...

Import

from dagster_openai import OpenAIResource

I/O Contract

Direction | Name | Type | Description
Input | api_key | str (via EnvVar) | OpenAI API key for authentication
Input | organization | Optional[str] | OpenAI organization ID (optional)
Input | project | Optional[str] | OpenAI project ID (optional)
Input | base_url | Optional[str] | Custom base URL for the OpenAI API (optional)
Output | Client | openai.Client | Configured OpenAI client yielded via the get_client() context manager

Usage Examples

Basic Configuration

import dagster as dg
from dagster_openai import OpenAIResource

defs = dg.Definitions(
    resources={
        "openai": OpenAIResource(api_key=dg.EnvVar("OPENAI_API_KEY")),
    },
)

Asset Using OpenAI

from dagster import AssetExecutionContext, asset
from dagster_openai import OpenAIResource

@asset(compute_kind="OpenAI")
def openai_asset(context: AssetExecutionContext, openai: OpenAIResource):
    # get_client() yields an openai.Client configured from the resource.
    with openai.get_client(context) as client:
        # The response is discarded here; this call only demonstrates the request.
        client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": "Say this is a test"}],
        )

Embeddings Example

import dagster as dg
from dagster_openai import OpenAIResource

@dg.asset(kinds={"openai"})
def embeddings(context: dg.AssetExecutionContext, openai: OpenAIResource):
    with openai.get_client(context) as client:
        response = client.embeddings.create(
            model="text-embedding-3-small",
            input=["Hello world"],
        )
    return dg.MaterializeResult(
        metadata={"dimensions": len(response.data[0].embedding)}
    )
