Jump to content

Connect Leeroopedia MCP: Equip your AI agents to search best practices, build plans, verify code, diagnose failures, and look up hyperparameter defaults.

Implementation:FlagOpen FlagEmbedding MLVU Anomaly Reco Data

From Leeroopedia


Knowledge Sources
Domains Video Understanding, Anomaly Detection, Surveillance
Last Updated 2026-02-09 00:00 GMT

Overview

Benchmark dataset for detecting and recognizing anomalies in surveillance video footage.

Description

The MLVU Anomaly Recognition dataset contains 2602 questions focused on identifying abnormal events in surveillance videos. Questions ask whether anomalies exist in the footage and, if so, what type of anomaly occurred. This tests models' ability to distinguish normal from abnormal behavior and classify specific types of anomalous events such as fighting, theft, vandalism, accidents, and other security-relevant incidents.

The dataset simulates real-world surveillance scenarios where models must:

  • Detect presence of anomalous events
  • Classify the type of anomaly
  • Handle varied surveillance contexts
  • Process extended video sequences typical of security footage

Anomaly types include: Fighting, Shoplifting, Robbery, Assault, RoadAccidents, Shooting, Vandalism, Abuse, and Stealing.

Usage

Use this dataset for evaluating anomaly detection in surveillance systems, benchmarking security-related video understanding, or training models for automated incident recognition.

Code Reference

Source Location

Data Structure

{
    "video": str,              # Surveillance video filename (e.g., "surveil_20.mp4")
    "duration": float,         # Video duration in seconds
    "question": str,           # Question about anomaly presence/type
    "candidates": List[str],   # Four anomaly types or "None"
    "answer": str,             # Correct anomaly type
    "question_type": str       # Always "anomaly_reco"
}

Import

import json

# Load anomaly recognition dataset
with open("research/MLVU/data/6_anomaly_reco.json", "r") as f:
    anomaly_data = [json.loads(line) for line in f]

I/O Contract

Inputs

Name Type Required Description
file_path str Yes Path to the anomaly dataset JSON file

Outputs

Field Type Description
video str Surveillance video filename
duration float Video duration in seconds
question str Question about anomaly detection
candidates List[str] Four possible anomaly types
answer str Correct anomaly type
question_type str Type identifier ("anomaly_reco")

Usage Examples

import json
from typing import List, Dict
from collections import Counter

# Load anomaly dataset
def load_anomaly_data(file_path: str) -> List[Dict]:
    with open(file_path, "r") as f:
        return [json.loads(line) for line in f]

data = load_anomaly_data("research/MLVU/data/6_anomaly_reco.json")

# Example entry
example = data[0]
print(f"Video: {example['video']}")
print(f"Duration: {example['duration']:.2f}s")
print(f"Question: {example['question']}")
print(f"Candidates: {example['candidates']}")
print(f"Answer: {example['answer']}")

# Output:
# Video: surveil_20.mp4
# Duration: 485.17s
# Question: Does this surveillance footage contain any anomalies?
#           If yes, which kind of anomaly?
# Candidates: ['RoadAccidents', 'Shooting', 'Shoplifting', 'Assault']
# Answer: Shoplifting

# Evaluate anomaly detection
def evaluate_anomaly_detection(model, data: List[Dict]) -> Dict[str, float]:
    correct = 0
    total = len(data)

    # Per-anomaly-type accuracy
    per_type_correct = {}
    per_type_total = {}

    for item in data:
        video_path = f"videos/{item['video']}"

        # Model prediction
        predicted_anomaly = model.detect_anomaly(
            video_path,
            item['question'],
            item['candidates']
        )

        is_correct = (predicted_anomaly == item['answer'])
        correct += is_correct

        # Track per anomaly type
        anomaly_type = item['answer']
        per_type_total[anomaly_type] = per_type_total.get(anomaly_type, 0) + 1
        if is_correct:
            per_type_correct[anomaly_type] = per_type_correct.get(anomaly_type, 0) + 1

    # Calculate per-type accuracy
    per_type_acc = {
        anom_type: per_type_correct.get(anom_type, 0) / total_count
        for anom_type, total_count in per_type_total.items()
    }

    return {
        "overall_acc": correct / total,
        **per_type_acc
    }

# Analyze anomaly distribution
def analyze_anomaly_types(data: List[Dict]) -> Dict[str, int]:
    anomaly_counts = Counter(item['answer'] for item in data)
    return dict(anomaly_counts.most_common())

anomaly_dist = analyze_anomaly_types(data)
print("Anomaly type distribution:", anomaly_dist)

# Filter by severity level
def categorize_by_severity(data: List[Dict]) -> Dict[str, List[Dict]]:
    high_severity = ["Shooting", "Assault", "RoadAccidents", "Robbery"]
    medium_severity = ["Fighting", "Vandalism", "Abuse"]
    low_severity = ["Shoplifting", "Stealing"]

    categorized = {
        "high": [],
        "medium": [],
        "low": []
    }

    for item in data:
        anomaly = item['answer']
        if anomaly in high_severity:
            categorized["high"].append(item)
        elif anomaly in medium_severity:
            categorized["medium"].append(item)
        elif anomaly in low_severity:
            categorized["low"].append(item)

    return categorized

severity_groups = categorize_by_severity(data)
print(f"High severity: {len(severity_groups['high'])}")
print(f"Medium severity: {len(severity_groups['medium'])}")
print(f"Low severity: {len(severity_groups['low'])}")

# Analyze video duration for different anomaly types
def analyze_duration_by_type(data: List[Dict]) -> Dict[str, float]:
    type_durations = {}
    type_counts = {}

    for item in data:
        anomaly_type = item['answer']
        duration = item['duration']

        type_durations[anomaly_type] = type_durations.get(anomaly_type, 0) + duration
        type_counts[anomaly_type] = type_counts.get(anomaly_type, 0) + 1

    avg_durations = {
        anom_type: type_durations[anom_type] / type_counts[anom_type]
        for anom_type in type_durations
    }

    return avg_durations

avg_durations = analyze_duration_by_type(data)
print("Average duration by anomaly type:", avg_durations)

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment