Implementation:Datajuicer Data juicer AnnotationMapper

From Leeroopedia
Knowledge Sources
Domains: Annotation, Human-in-the-Loop, Label Studio Integration
Last Updated: 2026-02-14 16:00 GMT

Overview

Defines the base classes BaseAnnotationMapper and LabelStudioAnnotationMapper for human annotation operations that integrate with Label Studio, providing event-driven annotation task management with notification support.

Description

This module provides core infrastructure for human-in-the-loop annotation workflows within the Data-Juicer pipeline.

BaseAnnotationMapper is an abstract base class that extends Mapper with EventDrivenMixin and NotificationMixin. It manages the full lifecycle of annotation tasks:

  • Task creation -- Formats samples into annotation tasks and creates them in batches on the annotation platform
  • Batch processing -- Groups samples into tasks of samples_per_task samples each and submits at most max_tasks_per_batch tasks per batch
  • Polling and waiting -- Optionally waits for annotations with configurable timeout and poll intervals
  • Notification events -- Sends notifications (email, Slack, DingTalk) at various stages including task creation, batch creation, annotation completion, and errors
  • Event-driven architecture -- Registers handlers for annotation lifecycle events (TASK_CREATED, BATCH_CREATED, ANNOTATION_COMPLETED, BATCH_ANNOTATION_COMPLETED, ERROR_OCCURRED)
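
The task creation, batching, and event steps above can be sketched as follows. This is a minimal illustration, not Data-Juicer's implementation: TinyEventBus, chunk, and create_batches are hypothetical names, and only the event names and the samples_per_task / max_tasks_per_batch parameters come from this page.

```python
from collections import defaultdict
from typing import Callable, Dict, List

# Event names are taken from the list above; everything else is hypothetical.
TASK_CREATED = "TASK_CREATED"
BATCH_CREATED = "BATCH_CREATED"


class TinyEventBus:
    """Hypothetical stand-in for an event-driven mixin."""

    def __init__(self) -> None:
        self._handlers: Dict[str, List[Callable[[dict], None]]] = defaultdict(list)

    def register(self, event: str, handler: Callable[[dict], None]) -> None:
        self._handlers[event].append(handler)

    def emit(self, event: str, payload: dict) -> None:
        # Call every handler registered for this event, in registration order.
        for handler in self._handlers[event]:
            handler(payload)


def chunk(items: list, size: int) -> List[list]:
    """Split items into consecutive chunks of at most `size` elements."""
    if size < 1:
        raise ValueError("size must be >= 1")
    return [items[i:i + size] for i in range(0, len(items), size)]


def create_batches(samples: list, samples_per_task: int,
                   max_tasks_per_batch: int, bus: TinyEventBus) -> List[list]:
    """Group samples into tasks, tasks into batches, and emit lifecycle events."""
    tasks = chunk(samples, samples_per_task)
    batches = chunk(tasks, max_tasks_per_batch)
    for batch_index, batch in enumerate(batches):
        for task in batch:
            bus.emit(TASK_CREATED, {"num_samples": len(task)})
        bus.emit(BATCH_CREATED, {"batch_index": batch_index, "num_tasks": len(batch)})
    return batches


events = []
bus = TinyEventBus()
bus.register(TASK_CREATED, lambda payload: events.append((TASK_CREATED, payload)))
bus.register(BATCH_CREATED, lambda payload: events.append((BATCH_CREATED, payload)))

samples = [{"text": f"sample {i}"} for i in range(7)]
batches = create_batches(samples, samples_per_task=2, max_tasks_per_batch=3, bus=bus)
```

With 7 samples, 2 samples per task, and 3 tasks per batch, the sketch produces 4 tasks split across 2 batches, and a notification handler could be registered on the same events.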

LabelStudioAnnotationMapper is a concrete implementation that integrates with Label Studio via its SDK. It:

  • Creates and manages Label Studio projects
  • Imports tasks in batches using the Label Studio API
  • Polls for completed annotations using filtered task queries
  • Provides project and task URLs in notifications
  • Handles serialization/deserialization (pickling) by reconnecting the Label Studio client on unpickle
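
The reconnect-on-unpickle behaviour in the last bullet can be illustrated with Python's standard __getstate__ / __setstate__ hooks. The sketch below is an assumption about the general pattern, not the module's actual code: FakeClient and ReconnectingMapper are hypothetical, and FakeClient is not the Label Studio SDK client.

```python
import pickle


class FakeClient:
    """Hypothetical stand-in for a platform SDK client (not the Label Studio SDK)."""

    def __init__(self, api_url: str, api_key: str) -> None:
        self.api_url = api_url
        self.api_key = api_key


class ReconnectingMapper:
    """Hypothetical operator that drops its client when pickled."""

    def __init__(self, api_url: str, api_key: str) -> None:
        self.api_url = api_url
        self.api_key = api_key
        self.client = self._connect()

    def _connect(self) -> FakeClient:
        return FakeClient(self.api_url, self.api_key)

    def __getstate__(self) -> dict:
        # Live connections are usually not picklable, so leave the client out.
        state = self.__dict__.copy()
        state.pop("client", None)
        return state

    def __setstate__(self, state: dict) -> None:
        # Restore plain attributes, then rebuild the client from them.
        self.__dict__.update(state)
        self.client = self._connect()


original = ReconnectingMapper("http://localhost:8080", "your-api-key")
restored = pickle.loads(pickle.dumps(original))
```

This pattern matters when operators are shipped to worker processes, since only the connection settings travel with the pickled object.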

Both classes operate as batched operators, processing column-oriented sample dictionaries.
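
For readers unfamiliar with the column-oriented layout, the sketch below shows one way to convert such a batch into per-sample rows and back. The helper names columns_to_rows and rows_to_columns are hypothetical and are not taken from Data-Juicer.

```python
from typing import Any, Dict, List


def columns_to_rows(batch: Dict[str, List[Any]]) -> List[Dict[str, Any]]:
    """Turn {"col": [v0, v1, ...]} into [{"col": v0}, {"col": v1}, ...]."""
    keys = list(batch)
    if not keys:
        return []
    num_rows = len(batch[keys[0]])
    return [{key: batch[key][i] for key in keys} for i in range(num_rows)]


def rows_to_columns(rows: List[Dict[str, Any]]) -> Dict[str, List[Any]]:
    """Inverse of columns_to_rows; assumes every row has the same keys."""
    if not rows:
        return {}
    return {key: [row[key] for row in rows] for key in rows[0]}


batch = {"text": ["good", "bad"], "id": [1, 2]}
rows = columns_to_rows(batch)
for row in rows:
    row["label"] = None  # placeholder where an annotation result would go
result = rows_to_columns(rows)
```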

Usage

Use these classes when building data processing pipelines that require human annotation as a processing step. Extend BaseAnnotationMapper for custom annotation platforms, or use LabelStudioAnnotationMapper directly for Label Studio-based workflows.

Code Reference

Source Location

Signature

class BaseAnnotationMapper(EventDrivenMixin, NotificationMixin, Mapper, ABC):
    _batched_op = True

    def __init__(
        self,
        project_name_prefix: str = "DataJuicer_Annotation",
        wait_for_annotations: bool = False,
        timeout: int = 3600,
        poll_interval: int = 60,
        samples_per_task: int = 1,
        max_tasks_per_batch: int = 100,
        project_id: Optional[int] = None,
        notification_config: Optional[Dict] = None,
        notification_events: Optional[Dict[str, bool]] = None,
        **kwargs,
    ):

class LabelStudioAnnotationMapper(BaseAnnotationMapper, ABC):
    def __init__(
        self,
        api_url: str = None,
        api_key: str = None,
        label_config: Optional[str] = None,
        **kwargs,
    ):

Import

from data_juicer.ops.mapper.annotation.annotation_mapper import BaseAnnotationMapper
from data_juicer.ops.mapper.annotation.annotation_mapper import LabelStudioAnnotationMapper

I/O Contract

Inputs (BaseAnnotationMapper)

Name | Type | Required | Description
project_name_prefix | str | No | Prefix for the project name. Default: "DataJuicer_Annotation"
wait_for_annotations | bool | No | Whether to wait for annotations to complete. Default: False
timeout | int | No | Maximum time to wait for annotations in seconds. Default: 3600
poll_interval | int | No | Time between annotation status checks in seconds. Default: 60
samples_per_task | int | No | Number of samples in each annotation task. Default: 1
max_tasks_per_batch | int | No | Maximum tasks in a single batch. Default: 100
project_id | int | No | ID of existing project. If None, creates a new project
notification_config | Dict | No | Configuration for notifications (email, Slack, DingTalk)
notification_events | Dict[str, bool] | No | Events that trigger notifications
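
How timeout and poll_interval interact when wait_for_annotations is enabled can be sketched with a generic polling loop. The function wait_until and the annotations_ready callback are hypothetical; the actual waiting logic in the module may differ.

```python
import time
from typing import Callable


def wait_until(is_done: Callable[[], bool], timeout: float, poll_interval: float) -> bool:
    """Poll is_done() until it returns True or `timeout` seconds have elapsed."""
    deadline = time.monotonic() + timeout
    while True:
        if is_done():
            return True
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return False
        # Never sleep past the deadline.
        time.sleep(min(poll_interval, remaining))


checks = {"count": 0}


def annotations_ready() -> bool:
    # Pretend the annotations are complete on the third status check.
    checks["count"] += 1
    return checks["count"] >= 3


finished = wait_until(annotations_ready, timeout=1.0, poll_interval=0.01)
timed_out = not wait_until(lambda: False, timeout=0.05, poll_interval=0.01)
```

A shorter poll_interval detects completion sooner at the cost of more status requests; timeout bounds the total wait either way.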

Inputs (LabelStudioAnnotationMapper)

Name | Type | Required | Description
api_url | str | No | Base URL for Label Studio API
api_key | str | No | API key for authentication
label_config | str | No | XML configuration for the labeling interface
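
As an illustration of what a label_config value might look like, the sketch below defines a small Label Studio labeling configuration for single-choice text classification and checks that it is well-formed XML. The specific tag names and choices (sentiment, Positive, Negative) are assumptions chosen for the example; $text refers to a "text" key in each task's data.

```python
import xml.etree.ElementTree as ET

# Example labeling interface: show the text, then ask for one sentiment choice.
LABEL_CONFIG = """
<View>
  <Text name="text" value="$text"/>
  <Choices name="sentiment" toName="text" choice="single">
    <Choice value="Positive"/>
    <Choice value="Negative"/>
  </Choices>
</View>
"""

# Parsing only verifies well-formed XML, not that Label Studio accepts the config.
root = ET.fromstring(LABEL_CONFIG.strip())
choices = [choice.get("value") for choice in root.iter("Choice")]
```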

Outputs

Name | Type | Description
samples | Dict | Column-oriented dictionary with original data plus any annotation results added by _process_annotation_result

Usage Examples

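from data_juicer.ops.mapper.annotation.annotation_mapper import LabelStudioAnnotationMapper


# Subclass LabelStudioAnnotationMapper for a custom annotation task
class MyAnnotationMapper(LabelStudioAnnotationMapper):
    def _format_task(self, samples):
        # Build one task from the samples grouped into it; only the first
        # sample is used here, which matches samples_per_task=1 (the default).
        return {"data": {"text": samples[0]["text"]}}

    def _process_annotation_result(self, annotation, sample):
        # Copy the annotation result onto the sample under a new "label" key.
        sample["label"] = annotation["result"]
        return sample


# Initialize with Label Studio connection
mapper = MyAnnotationMapper(
    api_url="http://localhost:8080",
    api_key="your-api-key",
    label_config="<View>...</View>",
    wait_for_annotations=True,  # wait for annotators to finish
    timeout=7200,  # wait at most 2 hours
    notification_config={"email": {"to": "team@example.com"}},
)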
