Implementation: FlagOpen FlagEmbedding MLVU Anomaly Reco Data
| Knowledge Sources | |
|---|---|
| Domains | Video Understanding, Anomaly Detection, Surveillance |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
Benchmark dataset for detecting and recognizing anomalies in surveillance video footage.
Description
The MLVU Anomaly Recognition dataset contains 2602 questions focused on identifying abnormal events in surveillance videos. Questions ask whether anomalies exist in the footage and, if so, what type of anomaly occurred. This tests models' ability to distinguish normal from abnormal behavior and classify specific types of anomalous events such as fighting, theft, vandalism, accidents, and other security-relevant incidents.
The dataset simulates real-world surveillance scenarios where models must:
- Detect presence of anomalous events
- Classify the type of anomaly
- Handle varied surveillance contexts
- Process extended video sequences typical of security footage
Anomaly types include: Fighting, Shoplifting, Robbery, Assault, RoadAccidents, Shooting, Vandalism, Abuse, and Stealing.
Usage
Use this dataset for evaluating anomaly detection in surveillance systems, benchmarking security-related video understanding, or training models for automated incident recognition.
Code Reference
Source Location
- Repository: FlagOpen_FlagEmbedding
- File: research/MLVU/data/6_anomaly_reco.json
Data Structure
{
"video": str, # Surveillance video filename (e.g., "surveil_20.mp4")
"duration": float, # Video duration in seconds
"question": str, # Question about anomaly presence/type
"candidates": List[str], # Four anomaly types or "None"
"answer": str, # Correct anomaly type
"question_type": str # Always "anomaly_reco"
}
Import
import json
# Load anomaly recognition dataset.
# NOTE(review): this parses the file as JSON Lines (one JSON object per line)
# despite the .json extension — confirm the file is JSONL, not a single JSON array.
with open("research/MLVU/data/6_anomaly_reco.json", "r") as f:
    anomaly_data = [json.loads(line) for line in f]
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| file_path | str | Yes | Path to the anomaly dataset JSON file |
Outputs
| Field | Type | Description |
|---|---|---|
| video | str | Surveillance video filename |
| duration | float | Video duration in seconds |
| question | str | Question about anomaly detection |
| candidates | List[str] | Four possible anomaly types |
| answer | str | Correct anomaly type |
| question_type | str | Type identifier ("anomaly_reco") |
Usage Examples
import json
from typing import List, Dict
from collections import Counter
# Load anomaly dataset
def load_anomaly_data(file_path: str) -> List[Dict]:
    """Read the anomaly-recognition dataset from *file_path*.

    The file is treated as JSON Lines: each line is parsed as one
    JSON object and the resulting dicts are returned in file order.
    """
    records: List[Dict] = []
    with open(file_path, "r") as handle:
        for raw_line in handle:
            records.append(json.loads(raw_line))
    return records
# Load the full dataset and inspect the first record.
data = load_anomaly_data("research/MLVU/data/6_anomaly_reco.json")
# Example entry
example = data[0]
print(f"Video: {example['video']}")
print(f"Duration: {example['duration']:.2f}s")
print(f"Question: {example['question']}")
print(f"Candidates: {example['candidates']}")
print(f"Answer: {example['answer']}")
# Output:
# Video: surveil_20.mp4
# Duration: 485.17s
# Question: Does this surveillance footage contain any anomalies?
# If yes, which kind of anomaly?
# Candidates: ['RoadAccidents', 'Shooting', 'Shoplifting', 'Assault']
# Answer: Shoplifting
# Evaluate anomaly detection
def evaluate_anomaly_detection(model, data: List[Dict]) -> Dict[str, float]:
    """Score *model* on the anomaly-recognition task.

    For each item the model is asked to pick one of the candidate anomaly
    types for the video; a prediction counts as correct when it exactly
    matches the ground-truth ``answer`` string.

    Args:
        model: Object exposing ``detect_anomaly(video_path, question,
            candidates)`` and returning one of the candidate strings.
        data: Dataset records as produced by ``load_anomaly_data``.

    Returns:
        Mapping with ``"overall_acc"`` plus one per-anomaly-type accuracy
        entry keyed by the ground-truth type. For empty input, returns
        ``{"overall_acc": 0.0}`` instead of raising ZeroDivisionError.
    """
    correct = 0
    # Per-anomaly-type accuracy bookkeeping.
    per_type_correct: Dict[str, int] = {}
    per_type_total: Dict[str, int] = {}
    for item in data:
        video_path = f"videos/{item['video']}"
        # Model prediction
        predicted_anomaly = model.detect_anomaly(
            video_path,
            item['question'],
            item['candidates'],
        )
        anomaly_type = item['answer']
        per_type_total[anomaly_type] = per_type_total.get(anomaly_type, 0) + 1
        if predicted_anomaly == anomaly_type:
            correct += 1
            per_type_correct[anomaly_type] = per_type_correct.get(anomaly_type, 0) + 1
    # Calculate per-type accuracy (denominator is that type's count, not the total).
    per_type_acc = {
        anom_type: per_type_correct.get(anom_type, 0) / total_count
        for anom_type, total_count in per_type_total.items()
    }
    # Guard: an empty dataset previously crashed with ZeroDivisionError.
    overall = correct / len(data) if data else 0.0
    return {
        "overall_acc": overall,
        **per_type_acc,
    }
# Analyze anomaly distribution
def analyze_anomaly_types(data: List[Dict]) -> Dict[str, int]:
    """Count occurrences of each ground-truth anomaly type.

    Returns a dict ordered from the most to the least frequent type.
    """
    tally = Counter(entry['answer'] for entry in data)
    return {anomaly: count for anomaly, count in tally.most_common()}
# Report the class distribution across the whole dataset.
anomaly_dist = analyze_anomaly_types(data)
print("Anomaly type distribution:", anomaly_dist)
# Filter by severity level
def categorize_by_severity(data: List[Dict]) -> Dict[str, List[Dict]]:
    """Bucket dataset entries into high/medium/low severity groups.

    The severity of an entry is determined by its ground-truth anomaly
    type. Entries whose type is not in the lookup table (e.g. "None")
    are omitted from every bucket.
    """
    severity_of = {
        "Shooting": "high",
        "Assault": "high",
        "RoadAccidents": "high",
        "Robbery": "high",
        "Fighting": "medium",
        "Vandalism": "medium",
        "Abuse": "medium",
        "Shoplifting": "low",
        "Stealing": "low",
    }
    buckets: Dict[str, List[Dict]] = {"high": [], "medium": [], "low": []}
    for entry in data:
        level = severity_of.get(entry['answer'])
        if level is not None:
            buckets[level].append(entry)
    return buckets
# Print how many entries fall into each severity bucket.
severity_groups = categorize_by_severity(data)
print(f"High severity: {len(severity_groups['high'])}")
print(f"Medium severity: {len(severity_groups['medium'])}")
print(f"Low severity: {len(severity_groups['low'])}")
# Analyze video duration for different anomaly types
def analyze_duration_by_type(data: List[Dict]) -> Dict[str, float]:
    """Compute the mean video duration (seconds) per anomaly type."""
    duration_sums: Counter = Counter()
    entry_counts: Counter = Counter()
    for entry in data:
        label = entry['answer']
        duration_sums[label] += entry['duration']
        entry_counts[label] += 1
    return {
        label: duration_sums[label] / entry_counts[label]
        for label in duration_sums
    }
# Report mean footage length per anomaly type.
avg_durations = analyze_duration_by_type(data)
print("Average duration by anomaly type:", avg_durations)