

Implementation:NVIDIA NeMo Curator BaseSyntheticStage

From Leeroopedia
Knowledge Sources
Domains: Synthetic Data, LLM Integration, Async Processing
Last Updated: 2026-02-14 00:00 GMT

Overview

BaseSyntheticStage is the base processing stage for LLM-based synthetic data generation in the Nemotron-CC pipeline, handling prompt formatting, synchronous and asynchronous LLM client interaction, and response processing.

Description

BaseSyntheticStage is a Python dataclass that extends ProcessingStage[DocumentBatch, DocumentBatch]. It provides the core logic for generating synthetic text: it sends formatted prompts to an LLM and collects the responses into a DataFrame column.

The stage is configured with a system_prompt, a prompt template (in which the {document} placeholder is replaced by the value of the input field), an input_field to read from, and an output_field to write results to. It supports both synchronous (LLMClient) and asynchronous (AsyncLLMClient) clients; in __post_init__ it checks the client type and sets the is_async_client flag accordingly.
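A minimal sketch of the client-type check, assuming it is a plain isinstance test (the source only says the client type is detected). The two client classes here are hypothetical empty stand-ins, not the real NeMo Curator classes, and ClientAwareStage is an illustrative name:

```python
from __future__ import annotations

from dataclasses import dataclass, field


# Hypothetical stand-ins for the real client classes; only the type matters here.
class LLMClient:
    pass


class AsyncLLMClient:
    pass


@dataclass
class ClientAwareStage:
    client: AsyncLLMClient | LLMClient | None = None
    # Derived flag: excluded from __init__ and filled in by __post_init__.
    is_async_client: bool = field(init=False, default=False)

    def __post_init__(self) -> None:
        # Assumption: detection is a plain isinstance check against the async class.
        self.is_async_client = isinstance(self.client, AsyncLLMClient)
```

Computing the flag once at construction time means the per-batch code only has to branch on a boolean.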

The process() method converts the batch to a pandas DataFrame, generates LLM responses for each row (either sequentially or concurrently), stores the results in the output field, and returns a new DocumentBatch.
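The same flow can be sketched without pandas or NeMo Curator. In this hypothetical version, rows are plain dicts instead of a DataFrame, `generate` stands in for the LLM call, and process_rows is an illustrative name rather than part of the library:

```python
from __future__ import annotations

from typing import Callable


def process_rows(
    rows: list[dict],
    prompt: str,
    input_field: str,
    output_field: str,
    generate: Callable[[str], str],
) -> list[dict]:
    # 1. Format the prompt template for each row and call the model.
    responses = [generate(prompt.format(document=row[input_field])) for row in rows]
    # 2. Build new rows that carry the original fields plus the output field,
    #    analogous to process() returning a new DocumentBatch.
    return [{**row, output_field: response} for row, response in zip(rows, responses)]


rows = [{"text": "a"}, {"text": "b"}]
result = process_rows(rows, "Echo: {document}", "text", "echo", str.upper)
```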

For async processing, the stage handles the case where an event loop is already running in the current thread (e.g., inside Ray async actors), where asyncio.run cannot be called, by running the coroutine on a separate thread with its own event loop.

Usage

Use BaseSyntheticStage as the parent class when creating new LLM-based synthetic data generation stages. Subclasses only need to specify the system_prompt, prompt template, input_field, and output_field. Existing subclasses include WikipediaParaphrasingStage, DiverseQAStage, DistillStage, ExtractKnowledgeStage, and KnowledgeListStage.
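Because the stage is a dataclass, a subclass changes defaults simply by redeclaring fields, and callers can still override any of them at construction time. A sketch with a simplified, hypothetical BaseStage standing in for BaseSyntheticStage (client and generation fields omitted):

```python
from __future__ import annotations

from dataclasses import dataclass


# Simplified stand-in for BaseSyntheticStage: every field defaults to None.
@dataclass
class BaseStage:
    system_prompt: str | None = None
    prompt: str | None = None
    input_field: str | None = None
    output_field: str | None = None
    name: str = "BaseStage"


# A subclass only redeclares the fields whose defaults it wants to change.
@dataclass
class SummarizeStage(BaseStage):
    prompt: str = "Summarize:\n\n{document}"
    input_field: str = "text"
    output_field: str = "summary"
    name: str = "SummarizeStage"
```

Since every base field already has a default, redeclared fields in the subclass never trigger the "non-default argument follows default argument" error.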

Code Reference

Source Location

  • Repository: NeMo-Curator
  • File: nemo_curator/stages/synthetic/nemotron_cc/base.py
  • Lines: 1-157

Signature

@dataclass
class BaseSyntheticStage(ProcessingStage[DocumentBatch, DocumentBatch]):
    system_prompt: str = None
    prompt: str = None
    input_field: str = None
    output_field: str = None
    client: AsyncLLMClient | LLMClient = None
    model_name: str = None
    generation_config: GenerationConfig | None = None
    name: str = "NemotronCCBaseStage"

    def __post_init__(self) -> None: ...
    def inputs(self) -> tuple[list[str], list[str]]: ...
    def outputs(self) -> tuple[list[str], list[str]]: ...
    def setup(self, _: WorkerMetadata | None = None) -> None: ...
    def process(self, batch: DocumentBatch) -> DocumentBatch: ...
    def _process_llm_prompt(self, sample: dict) -> str: ...
    def _process_llm_response(self, response: list[str]) -> str: ...
    def _process_sync(self, df: pd.DataFrame) -> list[str]: ...
    def _process_async(self, df: pd.DataFrame) -> list[str]: ...
    async def _generate_responses_async(self, df: pd.DataFrame) -> list[str]: ...

Import

from nemo_curator.stages.synthetic.nemotron_cc.base import BaseSyntheticStage

I/O Contract

Inputs

  • system_prompt (str, optional): System prompt sent to the LLM. If None or empty, only a user message is sent.
  • prompt (str, required): Prompt template with a {document} placeholder that is replaced with the input field value.
  • input_field (str, required): Name of the DataFrame column to read source text from.
  • output_field (str, required): Name of the DataFrame column to write generated text to.
  • client (AsyncLLMClient or LLMClient, required): The LLM client instance used to query the model.
  • model_name (str, required): Name of the LLM model to query.
  • generation_config (GenerationConfig, optional): Configuration for generation parameters (temperature, max tokens, etc.).
  • batch (DocumentBatch, required): The input document batch passed to process(); it must contain source text in the input field.

Outputs

  • result (DocumentBatch): A new document batch containing the original data plus the output field, which holds the LLM-generated responses.

Key Implementation Details

Prompt Formatting

The _process_llm_prompt method formats the prompt template by substituting the input field value:

def _process_llm_prompt(self, sample: dict) -> str:
    if self.input_field not in sample:
        msg = f"Expected input field '{self.input_field}' in sample."
        raise KeyError(msg)
    return self.prompt.format(document=sample[self.input_field])
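One consequence of using str.format: any literal braces in the template (for example, a JSON example in the instructions) must be doubled as {{ and }}, otherwise format() treats them as placeholders and will usually fail with a KeyError. A standalone sketch of the same logic, where format_prompt is a hypothetical free-function version of _process_llm_prompt:

```python
def format_prompt(template: str, sample: dict, input_field: str) -> str:
    # Same check as _process_llm_prompt: fail loudly if the column is missing.
    if input_field not in sample:
        raise KeyError(f"Expected input field '{input_field}' in sample.")
    return template.format(document=sample[input_field])


# Doubled braces survive formatting as literal { and }.
json_template = 'Reply as {{"summary": "..."}} for:\n\n{document}'
prompt = format_prompt(json_template, {"text": "Cats sleep a lot."}, "text")
```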

Synchronous vs Asynchronous Processing

The stage routes processing based on the detected client type:

def process(self, batch: DocumentBatch) -> DocumentBatch:
    df = batch.to_pandas()
    responses = self._process_async(df) if self.is_async_client else self._process_sync(df)
    df[self.output_field] = responses
    return DocumentBatch(...)

Synchronous processing iterates row-by-row with df.apply(). Asynchronous processing uses asyncio.gather to submit all requests concurrently.
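A sketch of the two strategies, with a hypothetical fake_async_generate coroutine in place of a real AsyncLLMClient call. It also shows why the async path can assign results straight back to the DataFrame column: asyncio.gather returns results in argument order, regardless of which request finishes first.

```python
import asyncio
import random


async def fake_async_generate(prompt: str) -> str:
    # Hypothetical stand-in for an async LLM call; random latency means
    # requests complete out of order.
    await asyncio.sleep(random.uniform(0, 0.01))
    return prompt.upper()


def generate_sync(prompts: list[str]) -> list[str]:
    # Synchronous path: one request at a time, like df.apply().
    return [prompt.upper() for prompt in prompts]


async def generate_async(prompts: list[str]) -> list[str]:
    # Asynchronous path: every request is in flight at once. gather() keeps
    # results in the order the coroutines were passed, so they line up with rows.
    return list(await asyncio.gather(*(fake_async_generate(p) for p in prompts)))
```

Submitting everything at once is simple, but for large batches a provider's rate limits may make it worth capping in-flight requests (for example with asyncio.Semaphore); the source does not say whether the stage does this.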

Nested Event Loop Handling

The async path detects whether a running event loop already exists (e.g., inside Ray async actors) and falls back to a thread-based approach:

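import asyncio
import concurrent.futures

def _process_async(self, df: pd.DataFrame) -> list[str]:
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No event loop is running in this thread, so asyncio.run can create one
        return asyncio.run(self._generate_responses_async(df))
    # A loop is already running (asyncio.run would raise here), so run the
    # coroutine in a separate thread with its own event loop and wait for it
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        future = executor.submit(asyncio.run, self._generate_responses_async(df))
        return future.result()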
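The same pattern, reduced to a self-contained sketch with a hypothetical `work` coroutine and an illustrative run_coroutine_blocking helper. Calling the helper from inside a running loop exercises the thread fallback:

```python
import asyncio
import concurrent.futures


async def work(x: int) -> int:
    # Hypothetical async job standing in for _generate_responses_async.
    await asyncio.sleep(0)
    return x * 2


def run_coroutine_blocking(x: int) -> int:
    """Run work(x) to completion from synchronous code, loop or no loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop in this thread: asyncio.run can own one.
        return asyncio.run(work(x))
    # A loop is already running here, so give the coroutine a fresh loop
    # on a worker thread and block until it finishes.
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        return executor.submit(asyncio.run, work(x)).result()


async def caller_inside_loop() -> int:
    # Simulates synchronous stage code invoked from inside an async actor.
    return run_coroutine_blocking(21)
```

The trade-off: while the worker thread runs, the outer event loop's thread is blocked on .result(), so other tasks on that loop make no progress until the batch finishes.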

Message Construction

When a system_prompt is provided, messages include both system and user roles. Otherwise, only a user message is sent:

if self.system_prompt:
    messages = [
        {"role": "system", "content": self.system_prompt},
        {"role": "user", "content": prompt},
    ]
else:
    messages = [{"role": "user", "content": prompt}]
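Because the check is `if self.system_prompt:`, an empty string behaves the same as None. A standalone sketch of the branch, where build_messages is a hypothetical helper name:

```python
from __future__ import annotations


def build_messages(prompt: str, system_prompt: str | None = None) -> list[dict[str, str]]:
    # Any falsy system_prompt (None or "") produces a user-only conversation.
    if system_prompt:
        return [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": prompt},
        ]
    return [{"role": "user", "content": prompt}]
```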

Usage Examples

Creating a Custom Synthetic Stage

from dataclasses import dataclass
from nemo_curator.stages.synthetic.nemotron_cc.base import BaseSyntheticStage

@dataclass
class MySummarizationStage(BaseSyntheticStage):
    system_prompt: str = "You are a helpful summarizer."
    prompt: str = "Summarize the following text:\n\n{document}"
    input_field: str = "text"
    output_field: str = "summary"

Using with an Async Client

from nemo_curator.stages.synthetic.nemotron_cc.base import BaseSyntheticStage
from nemo_curator.models.client.llm_client import AsyncLLMClient

stage = BaseSyntheticStage(
    system_prompt="You are a helpful assistant.",
    prompt="Rewrite this text:\n\n{document}",
    input_field="text",
    output_field="rewritten",
    client=my_async_client,  # an AsyncLLMClient instance created elsewhere
    model_name="nemotron-4-340b",
)
