Implementation:NVIDIA NeMo Curator BaseSyntheticStage
| Knowledge Sources | |
|---|---|
| Domains | Synthetic Data, LLM Integration, Async Processing |
| Last Updated | 2026-02-14 00:00 GMT |
Overview
BaseSyntheticStage is the base processing stage for LLM-based synthetic data generation in the Nemotron-CC pipeline, handling prompt formatting, synchronous and asynchronous LLM client interaction, and response processing.
Description
BaseSyntheticStage extends ProcessingStage[DocumentBatch, DocumentBatch] as a Python dataclass. It provides the core logic for generating synthetic text data by sending formatted prompts to an LLM and collecting the responses into a DataFrame column.
The stage accepts a system_prompt, a prompt template (which uses {document} as a placeholder for the input field value), an input_field to read from, and an output_field to write results to. It supports both synchronous (LLMClient) and asynchronous (AsyncLLMClient) clients. In __post_init__, it detects the client type and sets the is_async_client flag accordingly.
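The client-type detection can be illustrated with a minimal sketch. It assumes the flag comes from a plain isinstance check; the two client classes and StageSketch below are hypothetical stand-ins rather than the NeMo Curator classes, and the real __post_init__ may do more.

```python
from __future__ import annotations

from dataclasses import dataclass, field


class LLMClient:
    """Stand-in for a synchronous client (hypothetical, not the NeMo Curator class)."""


class AsyncLLMClient:
    """Stand-in for an asynchronous client (hypothetical, not the NeMo Curator class)."""


@dataclass
class StageSketch:
    client: AsyncLLMClient | LLMClient | None = None
    # Derived in __post_init__, so it is excluded from the generated __init__.
    is_async_client: bool = field(init=False, default=False)

    def __post_init__(self) -> None:
        # Assumption: the flag is derived from an isinstance check on the client.
        self.is_async_client = isinstance(self.client, AsyncLLMClient)


sync_stage = StageSketch(client=LLMClient())
async_stage = StageSketch(client=AsyncLLMClient())
print(sync_stage.is_async_client, async_stage.is_async_client)
```

Computing the flag once at construction keeps process() free of repeated type checks on every batch.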
The process() method converts the batch to a pandas DataFrame, generates LLM responses for each row (either sequentially or concurrently), stores the results in the output field, and returns a new DocumentBatch.
For async processing, the stage handles edge cases where an event loop is already running (e.g., inside Ray async actors) by spawning a separate thread with its own event loop.
Usage
Use BaseSyntheticStage as the parent class when creating new LLM-based synthetic data generation stages. Subclasses only need to specify the system_prompt, prompt template, input_field, and output_field; the client and model_name are supplied when the stage is instantiated. Existing subclasses include WikipediaParaphrasingStage, DiverseQAStage, DistillStage, ExtractKnowledgeStage, and KnowledgeListStage.
Code Reference
Source Location
- Repository: NeMo-Curator
- File: nemo_curator/stages/synthetic/nemotron_cc/base.py
- Lines: 1-157
Signature
@dataclass
class BaseSyntheticStage(ProcessingStage[DocumentBatch, DocumentBatch]):
    system_prompt: str = None
    prompt: str = None
    input_field: str = None
    output_field: str = None
    client: AsyncLLMClient | LLMClient = None
    model_name: str = None
    generation_config: GenerationConfig | None = None
    name: str = "NemotronCCBaseStage"

    def __post_init__(self) -> None: ...
    def inputs(self) -> tuple[list[str], list[str]]: ...
    def outputs(self) -> tuple[list[str], list[str]]: ...
    def setup(self, _: WorkerMetadata | None = None) -> None: ...
    def process(self, batch: DocumentBatch) -> DocumentBatch: ...
    def _process_llm_prompt(self, sample: dict) -> str: ...
    def _process_llm_response(self, response: list[str]) -> str: ...
    def _process_sync(self, df: pd.DataFrame) -> list[str]: ...
    def _process_async(self, df: pd.DataFrame) -> list[str]: ...
    async def _generate_responses_async(self, df: pd.DataFrame) -> list[str]: ...
Import
from nemo_curator.stages.synthetic.nemotron_cc.base import BaseSyntheticStage
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| system_prompt | str | No | System prompt sent to the LLM. If None, only a user message is sent. |
| prompt | str | Yes | Prompt template with a {document} placeholder that gets substituted with the input field value. |
| input_field | str | Yes | Name of the DataFrame column to read source text from. |
| output_field | str | Yes | Name of the DataFrame column to write generated text to. |
| client | AsyncLLMClient or LLMClient | Yes | The LLM client instance used for querying the model. |
| model_name | str | Yes | Name of the LLM model to query. |
| generation_config | GenerationConfig | No | Optional configuration for controlling generation parameters (temperature, max tokens, etc.). |
| batch | DocumentBatch | Yes | The input document batch containing source text in the input field. |
Outputs
| Name | Type | Description |
|---|---|---|
| result | DocumentBatch | A document batch with the original data plus a new column containing LLM-generated responses in the output field. |
Key Implementation Details
Prompt Formatting
The _process_llm_prompt method formats the prompt template by substituting the input field value:
def _process_llm_prompt(self, sample: dict) -> str:
    if self.input_field not in sample:
        msg = f"Expected input field '{self.input_field}' in sample."
        raise KeyError(msg)
    return self.prompt.format(document=sample[self.input_field])
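Because the template goes through str.format, literal braces in the template need care. The sketch below re-creates the formatting step as a standalone function (format_prompt is a hypothetical helper, not part of the library) to show three cases: braces in the document text, doubled braces in the template, and an undoubled brace in the template.

```python
def format_prompt(template: str, sample: dict, input_field: str) -> str:
    """Standalone copy of the formatting step: substitute the input field into {document}."""
    if input_field not in sample:
        msg = f"Expected input field '{input_field}' in sample."
        raise KeyError(msg)
    return template.format(document=sample[input_field])


sample = {"text": "Braces like {this} in the document are left untouched."}

# Braces inside the substituted value are not re-parsed by str.format.
plain = format_prompt("Summarize:\n\n{document}", sample, "text")

# Literal braces in the template itself must be doubled.
json_style = format_prompt('Reply as {{"summary": "..."}} for:\n\n{document}', sample, "text")

# An undoubled literal brace in the template makes str.format fail.
try:
    format_prompt('Reply as {"summary": "..."} for:\n\n{document}', sample, "text")
    unescaped_ok = True
except (KeyError, ValueError, IndexError):
    unescaped_ok = False

print(plain)
print(json_style)
print(unescaped_ok)
```

In practice this means a prompt template that shows JSON or code samples should write {{ and }} for every literal brace, leaving {document} as the only single-brace field.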
Synchronous vs Asynchronous Processing
The stage routes processing based on the detected client type:
def process(self, batch: DocumentBatch) -> DocumentBatch:
    df = batch.to_pandas()
    responses = self._process_async(df) if self.is_async_client else self._process_sync(df)
    df[self.output_field] = responses
    return DocumentBatch(...)
Synchronous processing iterates row-by-row with df.apply(). Asynchronous processing uses asyncio.gather to submit all requests concurrently.
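The concurrent path can be sketched with a fake client. FakeAsyncClient and its query method are hypothetical stand-ins, and the real client interface may differ; the point is only that asyncio.gather returns results in input order, so they line up with the DataFrame rows even when requests finish out of order.

```python
import asyncio


class FakeAsyncClient:
    """Hypothetical stand-in for an async LLM client; it just upper-cases the prompt."""

    async def query(self, prompt: str) -> str:
        # Shorter prompts sleep longer, so completion order differs from input order.
        await asyncio.sleep(0.01 * (5 - len(prompt)))
        return prompt.upper()


async def generate_all(client: FakeAsyncClient, prompts: list[str]) -> list[str]:
    # gather schedules every coroutine at once and returns results in input order,
    # regardless of which request finishes first.
    return await asyncio.gather(*(client.query(p) for p in prompts))


prompts = ["a", "bb", "ccc"]
responses = asyncio.run(generate_all(FakeAsyncClient(), prompts))
print(responses)  # ['A', 'BB', 'CCC']
```

Order preservation is what makes it safe to assign the returned list directly to the output column.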
Nested Event Loop Handling
The async path detects whether a running event loop already exists (e.g., inside Ray async actors) and falls back to a thread-based approach:
def _process_async(self, df: pd.DataFrame) -> list[str]:
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return asyncio.run(self._generate_responses_async(df))
    # Fallback: run in a separate thread with its own event loop
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        future = executor.submit(asyncio.run, self._generate_responses_async(df))
        return future.result()
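The same pattern can be exercised outside the stage. The following standalone sketch uses a trivial hypothetical coroutine, work, in place of the LLM calls, and calls the helper once from plain synchronous code and once from inside a running loop.

```python
import asyncio
import concurrent.futures


async def work() -> str:
    # Placeholder for the real async generation call.
    await asyncio.sleep(0)
    return "done"


def run_coroutine_blocking() -> str:
    """Run work() to completion whether or not a loop is already running."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop in this thread: asyncio.run can create one directly.
        return asyncio.run(work())
    # A loop is already running here, and asyncio.run would raise RuntimeError,
    # so run the coroutine on a fresh loop in a worker thread.
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        return executor.submit(asyncio.run, work()).result()


async def caller_inside_loop() -> str:
    # Simulates a caller such as an async actor: a loop is running at this point.
    return run_coroutine_blocking()


from_plain_thread = run_coroutine_blocking()
from_running_loop = asyncio.run(caller_inside_loop())
print(from_plain_thread, from_running_loop)
```

One consequence of this design is that future.result() blocks the calling thread, so an outer event loop makes no progress until the worker thread finishes.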
Message Construction
When a system_prompt is provided, messages include both system and user roles. Otherwise, only a user message is sent:
if self.system_prompt:
    messages = [
        {"role": "system", "content": self.system_prompt},
        {"role": "user", "content": prompt},
    ]
else:
    messages = [{"role": "user", "content": prompt}]
Usage Examples
Creating a Custom Synthetic Stage
from dataclasses import dataclass

from nemo_curator.stages.synthetic.nemotron_cc.base import BaseSyntheticStage


@dataclass
class MySummarizationStage(BaseSyntheticStage):
    system_prompt: str = "You are a helpful summarizer."
    prompt: str = "Summarize the following text:\n\n{document}"
    input_field: str = "text"
    output_field: str = "summary"
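The subclassing pattern relies on standard dataclass inheritance. The sketch below uses hypothetical stand-in classes (BaseStageSketch and SummarizationSketch, which only mirror the field layout and are not the NeMo Curator classes) to show that redeclared fields get new defaults while client and model_name are still passed per instance.

```python
from __future__ import annotations

from dataclasses import dataclass, fields


@dataclass
class BaseStageSketch:
    """Hypothetical stand-in mirroring the field layout of BaseSyntheticStage."""

    system_prompt: str | None = None
    prompt: str | None = None
    input_field: str | None = None
    output_field: str | None = None
    client: object | None = None
    model_name: str | None = None


@dataclass
class SummarizationSketch(BaseStageSketch):
    # Redeclaring a field in a subclass replaces its default but keeps its position.
    system_prompt: str = "You are a helpful summarizer."
    prompt: str = "Summarize the following text:\n\n{document}"
    input_field: str = "text"
    output_field: str = "summary"


# client and model_name are still supplied per instance ("example-model" is a placeholder).
stage = SummarizationSketch(client=object(), model_name="example-model")
field_names = [f.name for f in fields(stage)]
print(field_names)
```

Passing client and model_name by keyword avoids depending on the positional order of the inherited fields.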
Using with an Async Client
from nemo_curator.stages.synthetic.nemotron_cc.base import BaseSyntheticStage
from nemo_curator.models.client.llm_client import AsyncLLMClient

# my_async_client is assumed to be an AsyncLLMClient instance created elsewhere.
stage = BaseSyntheticStage(
    system_prompt="You are a helpful assistant.",
    prompt="Rewrite this text:\n\n{document}",
    input_field="text",
    output_field="rewritten",
    client=my_async_client,
    model_name="nemotron-4-340b",
)
Related Pages
- Environment:NVIDIA_NeMo_Curator_Python_Linux_Base
- NVIDIA_NeMo_Curator_NemotronCC_Stages - Concrete subclasses using this base class
- NVIDIA_NeMo_Curator_DataDesignerStage - Alternative synthetic data stage using NeMo Data Designer
- NVIDIA_NeMo_Curator_QAMultilingualSyntheticStage - Related multilingual synthetic generation stage