Implementation:Datajuicer Data juicer AnnotationMapper
| Knowledge Sources | |
|---|---|
| Domains | Annotation, Human-in-the-Loop, Label Studio Integration |
| Last Updated | 2026-02-14 16:00 GMT |
Overview
Defines the base classes BaseAnnotationMapper and LabelStudioAnnotationMapper for human annotation operations that integrate with Label Studio, providing event-driven annotation task management with notification support.
Description
This module provides core infrastructure for human-in-the-loop annotation workflows within the Data-Juicer pipeline.
BaseAnnotationMapper is an abstract base class that extends Mapper with EventDrivenMixin and NotificationMixin. It manages the full lifecycle of annotation tasks:
- Task creation -- Formats samples into annotation tasks and creates them in batches on the annotation platform
- Batch processing -- Groups samples into tasks of samples_per_task samples each and caps every batch at max_tasks_per_batch tasks
- Polling and waiting -- Optionally waits for annotations with configurable timeout and poll intervals
- Notification events -- Sends notifications (email, Slack, DingTalk) at various stages including task creation, batch creation, annotation completion, and errors
- Event-driven architecture -- Registers handlers for annotation lifecycle events (TASK_CREATED, BATCH_CREATED, ANNOTATION_COMPLETED, BATCH_ANNOTATION_COMPLETED, ERROR_OCCURRED)
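The task creation and batch processing steps above can be sketched in isolation. This is a minimal illustration, not the module's actual code: the helpers chunk and plan_batches are hypothetical names, and only the parameter names samples_per_task and max_tasks_per_batch come from the signature documented on this page.

```python
from typing import Dict, List


def chunk(items: List, size: int) -> List[List]:
    """Split items into consecutive chunks of at most `size` elements."""
    if size < 1:
        raise ValueError("size must be at least 1")
    return [items[i:i + size] for i in range(0, len(items), size)]


def plan_batches(
    samples: List[Dict],
    samples_per_task: int = 1,
    max_tasks_per_batch: int = 100,
) -> List[List[List[Dict]]]:
    """Group samples into tasks, then group tasks into batches (hypothetical helper)."""
    tasks = chunk(samples, samples_per_task)
    return chunk(tasks, max_tasks_per_batch)


samples = [{"text": f"sample {i}"} for i in range(7)]
batches = plan_batches(samples, samples_per_task=2, max_tasks_per_batch=3)
# 7 samples -> 4 tasks (2, 2, 2, 1 samples) -> 2 batches (3 tasks, then 1 task)
print([len(batch) for batch in batches])
```

Keeping the two grouping levels separate means the per-task sample count and the per-batch task count can be tuned independently.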
LabelStudioAnnotationMapper is a concrete implementation that integrates with Label Studio via its SDK. It:
- Creates and manages Label Studio projects
- Imports tasks in batches using the Label Studio API
- Polls for completed annotations using filtered task queries
- Provides project and task URLs in notifications
- Supports serialization/deserialization (pickling) by dropping the Label Studio client when pickled and reconnecting it when unpickled
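The reconnect-on-unpickle behavior in the last item is commonly implemented with Python's __getstate__ and __setstate__ hooks. The sketch below assumes that approach; FakeClient and ReconnectingMapper are hypothetical stand-ins, not the Label Studio SDK or the class documented here.

```python
import pickle


class FakeClient:
    """Hypothetical stand-in for a platform SDK client (not the Label Studio SDK)."""

    def __init__(self, api_url: str, api_key: str):
        self.api_url = api_url
        self.api_key = api_key


class ReconnectingMapper:
    """Hypothetical class that drops its client when pickled and rebuilds it afterwards."""

    def __init__(self, api_url: str, api_key: str):
        self.api_url = api_url
        self.api_key = api_key
        self.client = self._connect()

    def _connect(self) -> FakeClient:
        return FakeClient(self.api_url, self.api_key)

    def __getstate__(self):
        # Copy the instance state and leave out the live client connection.
        state = self.__dict__.copy()
        state.pop("client", None)
        return state

    def __setstate__(self, state):
        # Restore the plain attributes, then create a fresh client.
        self.__dict__.update(state)
        self.client = self._connect()


mapper = ReconnectingMapper("http://localhost:8080", "your-api-key")
restored = pickle.loads(pickle.dumps(mapper))
print(restored.client is not mapper.client)
```

Excluding the client from the pickled state matters when an operator is shipped to worker processes, where a live connection object usually cannot be serialized.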
Both classes operate as batched operators, processing column-oriented sample dictionaries.
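To make "column-oriented" concrete: a batch maps each field name to a list of values, one entry per sample. The sketch below converts such a batch to per-sample rows and back. The helper names columns_to_rows and rows_to_columns are hypothetical, and the label field is a placeholder for whatever an annotation step would add.

```python
from typing import Dict, List


def columns_to_rows(batch: Dict[str, List]) -> List[Dict]:
    """Turn {"field": [v0, v1, ...]} into [{"field": v0}, {"field": v1}, ...]."""
    keys = list(batch.keys())
    return [dict(zip(keys, values)) for values in zip(*batch.values())]


def rows_to_columns(rows: List[Dict]) -> Dict[str, List]:
    """Inverse of columns_to_rows; assumes every row has the same keys."""
    if not rows:
        return {}
    return {key: [row[key] for row in rows] for key in rows[0]}


batch = {"text": ["good", "bad"], "id": [1, 2]}
rows = columns_to_rows(batch)
for row in rows:
    row["label"] = None  # placeholder for a future annotation result
restored = rows_to_columns(rows)
print(restored)
```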
Usage
Use these classes when building data processing pipelines that require human annotation as a processing step. Extend BaseAnnotationMapper for custom annotation platforms, or use LabelStudioAnnotationMapper directly for Label Studio-based workflows.
Code Reference
Source Location
- Repository: Datajuicer_Data_juicer
- File: data_juicer/ops/mapper/annotation/annotation_mapper.py
- Lines: 1-771
Signature
class BaseAnnotationMapper(EventDrivenMixin, NotificationMixin, Mapper, ABC):
    _batched_op = True

    def __init__(
        self,
        project_name_prefix: str = "DataJuicer_Annotation",
        wait_for_annotations: bool = False,
        timeout: int = 3600,
        poll_interval: int = 60,
        samples_per_task: int = 1,
        max_tasks_per_batch: int = 100,
        project_id: Optional[int] = None,
        notification_config: Optional[Dict] = None,
        notification_events: Optional[Dict[str, bool]] = None,
        **kwargs,
    ):

class LabelStudioAnnotationMapper(BaseAnnotationMapper, ABC):
    def __init__(
        self,
        api_url: str = None,
        api_key: str = None,
        label_config: Optional[str] = None,
        **kwargs,
    ):
Import
from data_juicer.ops.mapper.annotation.annotation_mapper import BaseAnnotationMapper
from data_juicer.ops.mapper.annotation.annotation_mapper import LabelStudioAnnotationMapper
I/O Contract
Inputs (BaseAnnotationMapper)
| Name | Type | Required | Description |
|---|---|---|---|
| project_name_prefix | str | No | Prefix for the project name. Default: "DataJuicer_Annotation" |
| wait_for_annotations | bool | No | Whether to wait for annotations to complete. Default: False |
| timeout | int | No | Maximum time to wait for annotations in seconds. Default: 3600 |
| poll_interval | int | No | Time between annotation status checks in seconds. Default: 60 |
| samples_per_task | int | No | Number of samples in each annotation task. Default: 1 |
| max_tasks_per_batch | int | No | Maximum tasks in a single batch. Default: 100 |
| project_id | int | No | ID of an existing project. If None, a new project is created. Default: None |
| notification_config | Dict | No | Configuration for notifications (email, Slack, DingTalk) |
| notification_events | Dict[str, bool] | No | Events that trigger notifications |
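The interaction between timeout and poll_interval can be illustrated with a generic polling loop. This is a sketch under stated assumptions rather than the mapper's actual loop: wait_for_result and fake_check are hypothetical names, and returning None on timeout is a choice made for this example.

```python
import time
from typing import Callable, Optional


def wait_for_result(
    check: Callable[[], Optional[dict]],
    timeout: float = 3600,
    poll_interval: float = 60,
) -> Optional[dict]:
    """Call check() until it returns a result or the timeout elapses."""
    deadline = time.monotonic() + timeout
    while True:
        result = check()
        if result is not None:
            return result
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return None  # timed out without a result
        # Never sleep past the deadline.
        time.sleep(min(poll_interval, remaining))


attempts = {"count": 0}


def fake_check() -> Optional[dict]:
    """Hypothetical status check that succeeds on the third call."""
    attempts["count"] += 1
    return {"result": "positive"} if attempts["count"] >= 3 else None


outcome = wait_for_result(fake_check, timeout=1.0, poll_interval=0.01)
print(outcome, attempts["count"])
```

Using a monotonic clock keeps the deadline stable even if the system time changes while the loop is waiting.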
Inputs (LabelStudioAnnotationMapper)
| Name | Type | Required | Description |
|---|---|---|---|
| api_url | str | No | Base URL for Label Studio API |
| api_key | str | No | API key for authentication |
| label_config | str | No | XML configuration for the labeling interface |
Outputs
| Name | Type | Description |
|---|---|---|
| samples | Dict | Column-oriented dictionary with original data plus any annotation results added by _process_annotation_result |
Usage Examples
from data_juicer.ops.mapper.annotation.annotation_mapper import LabelStudioAnnotationMapper


# Subclass LabelStudioAnnotationMapper for a custom annotation task
class MyAnnotationMapper(LabelStudioAnnotationMapper):
    def _format_task(self, samples):
        # Build one task from the first sample (samples_per_task defaults to 1)
        return {"data": {"text": samples[0]["text"]}}

    def _process_annotation_result(self, annotation, sample):
        # Attach the annotation result to the sample
        sample["label"] = annotation["result"]
        return sample


# Initialize with Label Studio connection
mapper = MyAnnotationMapper(
    api_url="http://localhost:8080",
    api_key="your-api-key",
    label_config="<View>...</View>",
    wait_for_annotations=True,
    timeout=7200,
    notification_config={"email": {"to": "team@example.com"}},
)