Implementation:Mlfoundations Open flamingo All gather json dump
Overview
User-defined pattern combining PyTorch distributed gathering with JSON serialization for aggregating multi-benchmark evaluation results.
Description
This is a Pattern Doc. The result aggregation pattern has five steps:
1. Each rank calls torch.distributed.all_gather_object() to share its predictions with every other rank.
2. On rank 0, the gathered predictions are merged and de-duplicated by question_id (VQA) or image_id (captioning). Duplicates can appear because a distributed sampler that keeps every example pads the dataset so that all ranks receive the same number of samples.
3. Rank 0 writes the merged predictions to a temporary JSON file.
4. Metric functions (compute_cider, compute_vqa_accuracy) read that JSON file and compute scores.
5. Final results are structured as {dataset: [{shots, trials: [scores], mean, stddev}]} and saved to args.results_file.
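Why de-duplication is needed can be reproduced without a process group. PyTorch's DistributedSampler, when not dropping the last incomplete batch, pads the index list by repeating leading indices and then gives each rank every world_size-th index, so a few examples are evaluated twice. The sketch below imitates that behavior in plain Python (assuming no shuffling), flattens the per-rank lists the way the all_gather_object output list would be flattened, and keeps the first prediction per ID. The helper names shard_indices and merge_predictions are hypothetical.

```python
from typing import Any, Dict, List


def shard_indices(num_samples: int, world_size: int, rank: int) -> List[int]:
    """Imitate a non-shuffled DistributedSampler with drop_last=False:
    pad by repeating leading indices, then take every world_size-th index."""
    indices = list(range(num_samples))
    total_size = -(-num_samples // world_size) * world_size  # round up to a multiple
    padding = total_size - num_samples
    if padding:
        repeats = -(-padding // num_samples)  # enough copies to cover the padding
        indices += (indices * repeats)[:padding]
    return indices[rank:total_size:world_size]


def merge_predictions(all_predictions: List[List[Dict[str, Any]]],
                      key: str = "question_id") -> List[Dict[str, Any]]:
    """Flatten the per-rank lists and keep the first prediction for each ID."""
    unique: Dict[Any, Dict[str, Any]] = {}
    for rank_preds in all_predictions:
        for pred in rank_preds:
            unique.setdefault(pred[key], pred)
    return list(unique.values())


# 10 examples over 4 ranks: the sampler pads to 12, so 2 examples repeat
world_size = 4
per_rank = [
    [{"question_id": i, "answer": f"a{i}"} for i in shard_indices(10, world_size, r)]
    for r in range(world_size)
]
flat = [p for preds in per_rank for p in preds]
merged = merge_predictions(per_rank)
print(len(flat), len(merged))  # 12 10
```

Keeping the first copy is an arbitrary but deterministic choice; padded duplicates are re-evaluations of the same example, so any copy is normally acceptable.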
Usage
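Steps 1-4 run inside each evaluate_* function once distributed prediction generation has finished; step 5 (aggregating scores across trials and writing args.results_file) runs in the main evaluation orchestrator that calls those functions.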
Code Reference
Source: Repository https://github.com/mlfoundations/open_flamingo, file open_flamingo/eval/evaluate.py, lines 393-725 (main evaluation orchestrator)
Interface pattern:
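```python
import json
import numpy as np
import torch.distributed

# Step 1: gather predictions from all ranks; every rank must make this call,
# and afterwards all_predictions holds one list per rank
all_predictions = [None] * args.world_size
torch.distributed.all_gather_object(all_predictions, local_predictions)
# Step 2: merge and de-duplicate on rank 0 (key on image_id for captioning)
if args.rank == 0:
    merged = [p for rank_preds in all_predictions for p in rank_preds]
    unique = {}
    for p in merged:
        unique.setdefault(p["question_id"], p)  # first copy of each ID wins
    merged = list(unique.values())
    # Step 3: save to JSON and compute metrics (results_path, annotations_path
    # and compute_metric are placeholders for the benchmark-specific ones)
    with open(results_path, "w") as f:
        json.dump(merged, f)
    score = compute_metric(results_path, annotations_path)
# Step 4 (in the orchestrator, once all trials have run):
# results is assumed to be a collections.defaultdict(list)
if args.rank == 0:
    results[dataset].append({"shots": num_shots, "trials": trial_scores,
                             "mean": np.mean(trial_scores),
                             "stddev": np.std(trial_scores)})
    with open(args.results_file, "w") as f:
        json.dump(results, f)
```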
Imports: import json, import numpy as np (for the trial statistics), import torch.distributed
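The save-then-score step hands predictions to the metric through a file because the metric functions read their input from a predictions JSON. A minimal sketch of that hand-off using the standard library's tempfile module; exact_match_accuracy is a hypothetical stand-in for compute_vqa_accuracy or compute_cider, and the {question_id: answer} annotation layout is an assumption made for illustration.

```python
import json
import os
import tempfile
from typing import Any, Dict, List


def exact_match_accuracy(predictions_path: str, annotations: Dict[Any, str]) -> float:
    """Hypothetical metric: read predictions from JSON, score exact matches."""
    with open(predictions_path) as f:
        predictions = json.load(f)
    hits = sum(p["answer"] == annotations[p["question_id"]] for p in predictions)
    return 100.0 * hits / len(predictions) if predictions else 0.0


def score_via_temp_file(merged: List[Dict[str, Any]],
                        annotations: Dict[Any, str]) -> float:
    """Write merged predictions to a temporary JSON file, score it, then clean up."""
    fd, path = tempfile.mkstemp(suffix=".json")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(merged, f)
        return exact_match_accuracy(path, annotations)
    finally:
        os.remove(path)  # the file is only needed for the duration of scoring


preds = [{"question_id": 1, "answer": "cat"}, {"question_id": 2, "answer": "dog"}]
print(score_via_temp_file(preds, {1: "cat", 2: "bird"}))  # 50.0
```

Storing predictions as a list of records (rather than a dict keyed by ID) keeps integer IDs intact through the JSON round trip, since JSON object keys are always strings.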
I/O Contract
Inputs:
| Name | Type | Required | Description |
|---|---|---|---|
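| local_predictions | List[dict] | Yes | This rank's predictions, one dict per example with a unique question_id or image_id |
| args.world_size | int | Yes | Number of distributed ranks |
| args.rank | int | Yes | This process's rank; only rank 0 merges, scores, and writes files |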
| args.results_file | str | Yes | Path for final results JSON |
Outputs:
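Results JSON file at args.results_file, written by rank 0, mapping each benchmark to a list of {shots, trials, mean, stddev} entries, where mean and stddev are taken over the per-trial scores.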
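The per-benchmark results structure ({dataset: [{shots, trials, mean, stddev}]}) can be built with the standard library alone. This sketch assumes one score per trial and uses statistics.pstdev, which, like np.std with its default ddof=0, computes the population standard deviation. The helper add_benchmark_result and the toy benchmark name and scores are hypothetical.

```python
import json
import statistics
from collections import defaultdict
from typing import Dict, List


def add_benchmark_result(results: Dict[str, List[dict]], dataset: str,
                         shots: int, trial_scores: List[float]) -> None:
    """Append one {shots, trials, mean, stddev} entry for a dataset."""
    results[dataset].append({
        "shots": shots,
        "trials": trial_scores,
        "mean": statistics.fmean(trial_scores),
        # population standard deviation (ddof=0), matching np.std's default
        "stddev": statistics.pstdev(trial_scores),
    })


results: Dict[str, List[dict]] = defaultdict(list)
add_benchmark_result(results, "toy_benchmark", shots=0, trial_scores=[1.0, 2.0, 3.0])
add_benchmark_result(results, "toy_benchmark", shots=4, trial_scores=[4.0, 4.0])
print(json.dumps(results))  # defaultdict is a dict subclass, so it serializes directly
```

Using a defaultdict(list) lets each shot setting append its own entry without first checking whether the dataset key exists.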
Usage Examples
The gather-merge-save pattern in practice:
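```python
import json
import torch
import torch.distributed

# Each rank produces predictions for its shard of the data. model.generate,
# dataloader and the batch["id"] field are placeholders; generate is assumed
# to return one answer per example in the batch.
local_predictions = []
for batch in dataloader:
    answers = model.generate(batch)
    for qid, answer in zip(batch["id"], answers):
        # Store plain Python IDs: tensors are not JSON-serializable and hash by identity
        qid = qid.item() if torch.is_tensor(qid) else qid
        local_predictions.append({"question_id": qid, "answer": answer})
# Gather from all ranks (a collective: every rank must make this call)
all_predictions = [None] * args.world_size
torch.distributed.all_gather_object(all_predictions, local_predictions)
# Rank 0 merges, de-duplicates, saves, and computes metrics
# (results_path, annotations_path and compute_metric are placeholders)
if args.rank == 0:
    merged = [p for rank_preds in all_predictions for p in rank_preds]
    seen_ids = set()
    unique = []
    for p in merged:
        if p["question_id"] not in seen_ids:
            seen_ids.add(p["question_id"])
            unique.append(p)
    with open(results_path, "w") as f:
        json.dump(unique, f)
    score = compute_metric(results_path, annotations_path)
```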
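An alternative design keys each rank's predictions by ID in a dict instead of a list; merging the gathered dicts then de-duplicates implicitly, because a repeated ID simply overwrites the earlier entry. A minimal sketch, with a plain list standing in for the output of all_gather_object; the image_id/caption layout and the helper merge_keyed_predictions are assumptions. Unlike the first-seen loop in the example above, this keeps the last copy, which only matters if duplicate copies could differ.

```python
from typing import Any, Dict, List


def merge_keyed_predictions(gathered: List[Dict[Any, Any]]) -> Dict[Any, Any]:
    """Merge per-rank {id: prediction} dicts; later ranks overwrite earlier ones."""
    merged: Dict[Any, Any] = {}
    for rank_preds in gathered:
        merged.update(rank_preds)
    return merged


# Stand-in for the all_gather_object output on a 2-rank run,
# where image 3 was processed twice because of sampler padding
gathered = [
    {1: {"caption": "a dog"}, 3: {"caption": "a cat"}},
    {2: {"caption": "a bus"}, 3: {"caption": "a cat"}},
]
merged = merge_keyed_predictions(gathered)
# Convert to a list of records before writing JSON: JSON object keys are
# always strings, so this keeps the integer image IDs intact
records = [{"image_id": k, **v} for k, v in sorted(merged.items())]
print(records)
```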
Related Pages
Principle:Mlfoundations_Open_flamingo_Distributed_Result_Aggregation