Implementation: Data-Juicer RayVLLMEnginePipeline Config
| Knowledge Sources | |
|---|---|
| Domains | NLP, Distributed_Computing, LLM |
| Last Updated | 2026-02-14 17:00 GMT |
Overview
A concrete operator in the Data-Juicer framework for configuring Ray-based vLLM inference pipelines used in LLM-powered data generation.
Description
RayVLLMEnginePipeline is the base class for Ray-based LLM inference pipelines; LLMRayVLLMEnginePipeline extends it for text LLM inference, and VLMRayVLLMEnginePipeline for vision-language models. These classes build a vLLMEngineProcessorConfig from ray.data.llm with the model source, engine kwargs, and sampling parameters. They handle preprocessing (building chat messages from dataset columns), inference (via Ray Data's build_llm_processor), and postprocessing (extracting responses back into dataset columns).
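The pre- and postprocessing stages can be illustrated with plain functions. This is a minimal sketch, not Data-Juicer's actual hooks: `build_messages` and `extract_response` are hypothetical names, and the `query`/`response` column names follow the I/O contract documented below; the intermediate `generated_text` key is an assumption.

```python
# Hedged sketch of the pipeline's pre/postprocess stages (hypothetical helpers).

def build_messages(row, system_prompt=""):
    """Preprocess: turn a row's 'query' column into an OpenAI-style chat message list."""
    messages = []
    if system_prompt:
        messages.append({"role": "system", "content": system_prompt})
    messages.append({"role": "user", "content": row["query"]})
    return {**row, "messages": messages}

def extract_response(row):
    """Postprocess: move the generated text (assumed key) into a 'response' column."""
    out = dict(row)
    out["response"] = out.pop("generated_text", "")
    return out

row = {"query": "What is Ray Data?"}
pre = build_messages(row, system_prompt="You are a helpful assistant.")
post = extract_response({**pre, "generated_text": "A distributed data library."})
```

In the real pipeline, functions of this shape are passed to Ray Data's build_llm_processor, which applies them around the vLLM inference stage.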
Usage
Use as a pipeline operator in YAML configs. Configure with a model name, engine kwargs (e.g. tensor_parallel_size, max_model_len), and sampling params (e.g. temperature, top_p).
Code Reference
Source Location
- Repository: data-juicer
- File: data_juicer/ops/pipeline/ray_vllm_pipeline.py (base), data_juicer/ops/pipeline/llm_inference_with_ray_vllm_pipeline.py (LLM), data_juicer/ops/pipeline/vlm_inference_with_ray_vllm_pipeline.py (VLM)
- Lines: ray_vllm_pipeline.py:L10-43, llm_inference_with_ray_vllm_pipeline.py:L17-185
Signature
class RayVLLMEnginePipeline(Pipeline):
    def __init__(self, accelerator_type=None, *args, **kwargs):
        """
        Base class for Ray vLLM inference pipelines.

        Args:
            accelerator_type: GPU type (e.g. 'A100', 'H100').
        """

class LLMRayVLLMEnginePipeline(RayVLLMEnginePipeline):
    def __init__(
        self,
        api_or_hf_model: str = None,
        is_hf_model: bool = True,
        system_prompt: str = '',
        sampling_params: dict = None,
        engine_kwargs: dict = None,
        *args, **kwargs
    ):
        """
        Args:
            api_or_hf_model: Model name/path (HuggingFace or API).
            is_hf_model: True for HuggingFace models, False for API.
            system_prompt: System message for chat.
            sampling_params: Generation parameters (temperature, top_p, etc.).
            engine_kwargs: vLLM engine parameters (tensor_parallel_size, etc.).
        """

    def run(self, dataset, *, exporter=None, tracer=None, reduce=True):
        """
        Run inference on a Ray dataset.

        Args:
            dataset: Ray dataset with a query column.
            exporter: Optional exporter.
            tracer: Optional tracer.
            reduce: Whether to reduce results.

        Returns:
            Processed Ray dataset with a response column.
        """
Import
from data_juicer.ops.pipeline.llm_inference_with_ray_vllm_pipeline import LLMRayVLLMEnginePipeline
from data_juicer.ops.pipeline.vlm_inference_with_ray_vllm_pipeline import VLMRayVLLMEnginePipeline
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| api_or_hf_model | str | Yes | Model name/path |
| is_hf_model | bool | No | HuggingFace (True) or API (False) |
| engine_kwargs | dict | No | vLLM engine configuration |
| sampling_params | dict | No | Generation parameters |
| dataset | RayDataset | Yes (run) | Dataset with query column |
Outputs
| Name | Type | Description |
|---|---|---|
| dataset | RayDataset | Dataset with response column added |
Usage Examples
YAML Pipeline Configuration
# pipeline_with_llm.yaml
executor_type: ray
process:
  - llm_inference_with_ray_vllm_pipeline:
      api_or_hf_model: Qwen/Qwen2.5-7B-Instruct
      is_hf_model: true
      system_prompt: "You are a helpful assistant."
      engine_kwargs:
        tensor_parallel_size: 2
        max_model_len: 4096
      sampling_params:
        temperature: 0.7
        top_p: 0.9
        max_tokens: 512
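When building configs programmatically rather than in YAML, the same pipeline step can be expressed as a Python dict whose keys mirror the YAML above. This is a sketch of the config structure, not a Data-Juicer API.

```python
# Programmatic equivalent of the YAML step above (keys mirror the YAML config).
llm_step = {
    "llm_inference_with_ray_vllm_pipeline": {
        "api_or_hf_model": "Qwen/Qwen2.5-7B-Instruct",
        "is_hf_model": True,
        "system_prompt": "You are a helpful assistant.",
        "engine_kwargs": {"tensor_parallel_size": 2, "max_model_len": 4096},
        "sampling_params": {"temperature": 0.7, "top_p": 0.9, "max_tokens": 512},
    }
}
config = {"executor_type": "ray", "process": [llm_step]}
```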