Implementation:Norrrrrrr lyn WAInjectBench text ensemble detect
| Knowledge Sources | |
|---|---|
| Domains | Ensemble_Learning, NLP, Security |
| Last Updated | 2026-02-14 16:00 GMT |
Overview
Concrete tool for aggregating text detection results via set union across all detector outputs, provided by the WAInjectBench detector_text.ensemble module.
Description
The detect function in detector_text/ensemble.py reads all *.jsonl files in the result directory (excluding its own previous output ensemble.jsonl), groups entries by data_name, unions the detect_ids sets using defaultdict, determines malicious/benign status from the presence of "tpr"/"fpr" keys, and recomputes detection rates.
Usage
Called when --detector ensemble is passed to main_text.py. Individual detector results must already exist in the result directory.
Code Reference
Source Location
- Repository: WAInjectBench
- File: detector_text/ensemble.py (L7-60)
Signature
def detect(result_dir: str) -> List[Dict]:
"""
Ensemble detector:
- Reads all JSONL results from detectors in result_dir
- Combines detect_ids by union
- Recomputes TPR/FPR
- Returns a list of results (same format as other detectors)
"""
data_map = defaultdict(lambda: {"detect_ids": set(), "total_num": 0, "is_malicious": None})
for file in Path(result_dir).glob("*.jsonl"):
if file.name == "ensemble.jsonl":
continue
with open(file, "r", encoding="utf-8") as fin:
for line in fin:
entry = json.loads(line)
data_name = entry["data_name"]
data_map[data_name]["detect_ids"].update(entry.get("detect_ids", []))
if data_map[data_name]["total_num"] == 0:
data_map[data_name]["total_num"] = entry.get("total_num", 0)
if "tpr" in entry:
data_map[data_name]["is_malicious"] = True
elif "fpr" in entry:
data_map[data_name]["is_malicious"] = False
results = []
for data_name, info in data_map.items():
detect_ids = list(info["detect_ids"])
total_num = info["total_num"]
is_malicious = info["is_malicious"]
if total_num > 0:
if is_malicious:
rate_key, rate_value = "tpr", round(len(detect_ids) / total_num, 4)
else:
rate_key, rate_value = "fpr", round(len(detect_ids) / total_num, 4)
else:
rate_key, rate_value = "tpr", 0.0 if is_malicious else "fpr", 0.0
results.append({
"data_name": data_name,
rate_key: rate_value,
"detect_ids": detect_ids,
"total_num": total_num,
})
return results
Import
from detector_text import ensemble
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| result_dir | str | Yes | Directory containing per-detector .jsonl result files |
Outputs
| Name | Type | Description |
|---|---|---|
| results | List[Dict] | Union-aggregated results with recomputed TPR/FPR, same format as individual detectors |
Usage Examples
Running Ensemble Aggregation
from detector_text import ensemble
# Assumes individual detector results already exist in result/text/
results = ensemble.detect("result/text")
for r in results:
metric = "tpr" if "tpr" in r else "fpr"
print(f"{r['data_name']}: {metric}={r[metric]}, detected={len(r['detect_ids'])}/{r['total_num']}")