openhands-index / simple_data_loader.py
"""
Simple data loader for OpenHands Index leaderboard.
Loads JSONL files from local directory or GitHub repository.
"""
import json
from pathlib import Path

import pandas as pd

class SimpleLeaderboardViewer:
"""Simple replacement for agent-eval's LeaderboardViewer."""
def __init__(self, data_dir: str, config: str, split: str):
"""
Args:
data_dir: Path to data directory
config: Config name (e.g., "1.0.0-dev1")
split: Split name (e.g., "validation" or "test")
"""
self.data_dir = Path(data_dir)
self.config = config
self.split = split
self.config_path = self.data_dir / config
# Load suite configuration
config_file = self.config_path / "agenteval.json"
if config_file.exists():
with open(config_file) as f:
suite_config = json.load(f)
self.suite_config = suite_config["suite_config"]
else:
self.suite_config = {
"name": "openhands-index",
"version": config,
"splits": []
}
# Build tag map from config - organize benchmarks by category
self.tag_map = {}
self.benchmark_to_categories = {} # Maps benchmark name to its categories
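        # After the loop below these look roughly like (names are placeholders):
        #   self.tag_map                 -> {"<category>": ["<benchmark>", ...], ...}
        #   self.benchmark_to_categories -> {"<benchmark>": ["<category>", ...], ...}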
for split_config in self.suite_config.get("splits", []):
if split_config["name"] == split:
for task in split_config.get("tasks", []):
task_name = task["name"]
# Store which categories this benchmark belongs to
self.benchmark_to_categories[task_name] = []
for tag in task.get("tags", []):
# Skip "Overall" and the benchmark's own name
if tag != "Overall" and tag != task_name:
# This is a category tag
if tag not in self.tag_map:
self.tag_map[tag] = []
if task_name not in self.tag_map[tag]:
self.tag_map[tag].append(task_name)
self.benchmark_to_categories[task_name].append(tag)
def _load_from_agent_dirs(self):
"""Load data from new agent-centric directory structure (results/YYYYMMDD_model/)."""
results_dir = self.config_path / "results"
if not results_dir.exists():
            return None  # No results directory; caller will report "no data found"
all_records = []
# Iterate through each agent directory
for agent_dir in results_dir.iterdir():
if not agent_dir.is_dir():
continue
metadata_file = agent_dir / "metadata.json"
scores_file = agent_dir / "scores.json"
if not metadata_file.exists() or not scores_file.exists():
continue
# Load metadata and scores
with open(metadata_file) as f:
metadata = json.load(f)
with open(scores_file) as f:
scores = json.load(f)
# Create one record per benchmark (mimicking old JSONL format)
for score_entry in scores:
record = {
'agent_version': metadata.get('agent_version', 'Unknown'),
'llm_base': metadata.get('model', 'unknown'),
'openness': metadata.get('openness', 'unknown'),
'submission_time': metadata.get('submission_time', ''),
'score': score_entry.get('score'),
'metric': score_entry.get('metric', 'unknown'),
'total_cost': score_entry.get('total_cost'),
'total_runtime': score_entry.get('total_runtime'),
'tags': [score_entry.get('benchmark')],
}
all_records.append(record)
        if not all_records:
            return None  # No usable agent directories; caller will report "no data found"
return pd.DataFrame(all_records)
def _load(self):
"""Load data from agent-centric directories and return DataFrame and tag map."""
df = self._load_from_agent_dirs()
if df is None:
            # No agent results found; return a placeholder dataframe carrying the message
return pd.DataFrame({
"Message": [f"No data found for split '{self.split}' in results directory"]
}), {}
# Process the dataframe
try:
# Transform to expected format for leaderboard
# Group by agent to aggregate results across datasets
            transformed_records = []
            # Mapping used to normalize the "openness" field; imported once, outside the per-agent loop
            from aliases import OPENNESS_MAPPING
            for agent_version in df['agent_version'].unique():
agent_records = df[df['agent_version'] == agent_version]
# Build a single record for this agent
first_record = agent_records.iloc[0]
                # Normalize openness to "open" or "closed"
                raw_openness = first_record['openness']
                normalized_openness = OPENNESS_MAPPING.get(raw_openness, raw_openness)
record = {
# Core agent info - use final display names
'Openhands version': agent_version, # Will become "OpenHands Version"
'Language model': first_record['llm_base'], # Will become "Language Model"
'openness': normalized_openness, # Will become "Openness" (simplified to "open" or "closed")
'date': first_record['submission_time'], # Will become "Date"
# Additional columns expected by the transformer
'id': first_record.get('id', agent_version), # Will become "Id"
'source': first_record.get('source', ''), # Will become "Source"
'logs': first_record.get('logs', ''), # Will become "Logs"
}
                # Add per-dataset score/cost columns and track category-level data for aggregation
                category_data = {}  # {category: {'scores': [...], 'costs': [...]}}
for _, row in agent_records.iterrows():
tags = row['tags'] if isinstance(row['tags'], list) else [row['tags']]
for tag in tags:
                        # Add columns for this specific dataset/benchmark
                        record[f'{tag} score'] = row['score']
                        record[f'{tag} cost'] = row['total_cost']
# Track category-level data for aggregation
if tag in self.benchmark_to_categories:
for category in self.benchmark_to_categories[tag]:
if category not in category_data:
category_data[category] = {'scores': [], 'costs': []}
category_data[category]['scores'].append(row['score'])
category_data[category]['costs'].append(row['total_cost'])
# Calculate category-level aggregates
category_avg_scores = []
category_avg_costs = []
                for category, data in category_data.items():
                    # Skip missing (None) values so a single absent score/cost does not break the aggregate
                    scores = [s for s in data['scores'] if s is not None]
                    costs = [c for c in data['costs'] if c is not None]
                    if scores:
                        avg_score = sum(scores) / len(scores)
                        record[f'{category} score'] = avg_score
                        category_avg_scores.append(avg_score)
                    if costs:
                        avg_cost = sum(costs) / len(costs)
                        record[f'{category} cost'] = avg_cost
                        category_avg_costs.append(avg_cost)
# Calculate overall score and cost as macro-average of category averages
# This ensures each category contributes equally regardless of benchmark count
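                # Illustrative example (hypothetical numbers): category A with benchmark
                # scores {0.50, 0.70} and category B with {0.90} give category averages
                # 0.60 and 0.90, so the overall score is (0.60 + 0.90) / 2 = 0.75 rather
                # than the per-benchmark mean (0.50 + 0.70 + 0.90) / 3 = 0.70.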
if category_avg_scores:
record['overall score'] = sum(category_avg_scores) / len(category_avg_scores)
else:
record['overall score'] = None
if category_avg_costs:
record['overall cost'] = sum(category_avg_costs) / len(category_avg_costs)
else:
record['overall cost'] = None
transformed_records.append(record)
transformed_df = pd.DataFrame(transformed_records)
# Build tag map if not already built
if not self.tag_map:
# Create simple tag map from the data
all_tags = set()
for _, row in df.iterrows():
tags = row['tags'] if isinstance(row['tags'], list) else [row['tags']]
all_tags.update(tags)
# Simple mapping: each tag maps to itself
self.tag_map = {tag: [tag] for tag in sorted(all_tags)}
return transformed_df, self.tag_map
except Exception as e:
import traceback
traceback.print_exc()
return pd.DataFrame({
"Message": [f"Error loading data: {e}"]
}), {}
def get_dataframe(self):
"""Get the raw dataframe."""
df, _ = self._load()
return df
def load_mock_data_locally(data_dir: str = "mock_results"):
"""
Load mock data from local directory for testing.
Args:
data_dir: Path to mock results directory
Returns:
Dictionary mapping split names to SimpleLeaderboardViewer instances
"""
viewers = {}
data_path = Path(data_dir)
if not data_path.exists():
print(f"Warning: Mock data directory '{data_dir}' not found")
return viewers
# Find all config directories
for config_dir in data_path.iterdir():
if config_dir.is_dir():
config_name = config_dir.name
# Find all JSONL files (each represents a split)
for jsonl_file in config_dir.glob("*.jsonl"):
split_name = jsonl_file.stem
viewer = SimpleLeaderboardViewer(
data_dir=str(data_path),
config=config_name,
split=split_name
)
viewers[split_name] = viewer
return viewers
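

# Minimal usage sketch (the data_dir path below is hypothetical; the config and
# split names follow the examples in the __init__ docstring):
if __name__ == "__main__":
    viewer = SimpleLeaderboardViewer(data_dir="data", config="1.0.0-dev1", split="validation")
    leaderboard_df = viewer.get_dataframe()
    print(leaderboard_df.head())

    # Mock data for local testing, if a mock_results/ directory is present
    for split_name, mock_viewer in load_mock_data_locally("mock_results").items():
        print(split_name, mock_viewer.get_dataframe().shape)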