""" Simple data loader for OpenHands Index leaderboard. Loads JSONL files from local directory or GitHub repository. """ import os import pandas as pd import json from pathlib import Path class SimpleLeaderboardViewer: """Simple replacement for agent-eval's LeaderboardViewer.""" def __init__(self, data_dir: str, config: str, split: str): """ Args: data_dir: Path to data directory config: Config name (e.g., "1.0.0-dev1") split: Split name (e.g., "validation" or "test") """ self.data_dir = Path(data_dir) self.config = config self.split = split self.config_path = self.data_dir / config # Load suite configuration config_file = self.config_path / "agenteval.json" if config_file.exists(): with open(config_file) as f: suite_config = json.load(f) self.suite_config = suite_config["suite_config"] else: self.suite_config = { "name": "openhands-index", "version": config, "splits": [] } # Build tag map from config - organize benchmarks by category self.tag_map = {} self.benchmark_to_categories = {} # Maps benchmark name to its categories # Try loading from config first config_has_mappings = False for split_config in self.suite_config.get("splits", []): if split_config["name"] == split: for task in split_config.get("tasks", []): task_name = task["name"] # Store which categories this benchmark belongs to self.benchmark_to_categories[task_name] = [] for tag in task.get("tags", []): # Skip "Overall" and the benchmark's own name if tag != "Overall" and tag != task_name: # This is a category tag if tag not in self.tag_map: self.tag_map[tag] = [] if task_name not in self.tag_map[tag]: self.tag_map[tag].append(task_name) self.benchmark_to_categories[task_name].append(tag) config_has_mappings = True # FALLBACK: If no mappings loaded from config, use hard-coded category mappings if not config_has_mappings: print("[DATA_LOADER] No agenteval.json found, using fallback category mappings") fallback_mappings = { 'swe-bench': ['Bug Fixing'], 'swe-bench-multimodal': ['Frontend Development'], 'commit0': ['App Creation'], 'multi-swe-bench': ['Bug Fixing'], 'swt-bench': ['Test Generation'], 'gaia': ['Information Gathering'], } for benchmark, categories in fallback_mappings.items(): self.benchmark_to_categories[benchmark] = categories for category in categories: if category not in self.tag_map: self.tag_map[category] = [] if benchmark not in self.tag_map[category]: self.tag_map[category].append(benchmark) def _load_from_agent_dirs(self): """Load data from new agent-centric directory structure (results/YYYYMMDD_model/).""" results_dir = self.config_path / "results" if not results_dir.exists(): return None # Fall back to old format all_records = [] # Iterate through each agent directory for agent_dir in results_dir.iterdir(): if not agent_dir.is_dir(): continue metadata_file = agent_dir / "metadata.json" scores_file = agent_dir / "scores.json" if not metadata_file.exists() or not scores_file.exists(): continue # Load metadata and scores with open(metadata_file) as f: metadata = json.load(f) with open(scores_file) as f: scores = json.load(f) # Create one record per benchmark (mimicking old JSONL format) for score_entry in scores: record = { 'agent_version': metadata.get('agent_version', 'Unknown'), 'llm_base': metadata.get('model', 'unknown'), 'openness': metadata.get('openness', 'unknown'), 'submission_time': metadata.get('submission_time', ''), 'score': score_entry.get('score'), 'metric': score_entry.get('metric', 'unknown'), 'total_cost': score_entry.get('total_cost'), 'total_runtime': score_entry.get('total_runtime'), 'tags': [score_entry.get('benchmark')], } all_records.append(record) if not all_records: return None # Fall back to old format return pd.DataFrame(all_records) def _load(self): """Load data from agent-centric directories and return DataFrame and tag map.""" df = self._load_from_agent_dirs() if df is None: # Return empty dataframe with error message return pd.DataFrame({ "Message": [f"No data found for split '{self.split}' in results directory"] }), {} # Process the dataframe try: # Transform to expected format for leaderboard # Group by agent to aggregate results across datasets transformed_records = [] for agent_version in df['agent_version'].unique(): agent_records = df[df['agent_version'] == agent_version] # Build a single record for this agent first_record = agent_records.iloc[0] # Normalize openness to "open" or "closed" from aliases import OPENNESS_MAPPING raw_openness = first_record['openness'] normalized_openness = OPENNESS_MAPPING.get(raw_openness, raw_openness) record = { # Core agent info - use final display names 'Openhands version': agent_version, # Will become "OpenHands Version" 'Language model': first_record['llm_base'], # Will become "Language Model" 'openness': normalized_openness, # Will become "Openness" (simplified to "open" or "closed") 'date': first_record['submission_time'], # Will become "Date" # Additional columns expected by the transformer 'id': first_record.get('id', agent_version), # Will become "Id" 'source': first_record.get('source', ''), # Will become "Source" 'logs': first_record.get('logs', ''), # Will become "Logs" } # Add per-dataset scores and costs dataset_scores = [] dataset_costs = [] # Track category-level data for aggregation category_data = {} # {category: {'scores': [...], 'costs': [...]}} for _, row in agent_records.iterrows(): tags = row['tags'] if isinstance(row['tags'], list) else [row['tags']] for tag in tags: # Add columns for this specific dataset/benchmark record[f'{tag} score'] = row['score'] record[f'{tag} cost'] = row['total_cost'] dataset_scores.append(row['score']) dataset_costs.append(row['total_cost']) # Track category-level data for aggregation if tag in self.benchmark_to_categories: for category in self.benchmark_to_categories[tag]: if category not in category_data: category_data[category] = {'scores': [], 'costs': []} category_data[category]['scores'].append(row['score']) category_data[category]['costs'].append(row['total_cost']) # Calculate category-level aggregates category_avg_scores = [] category_avg_costs = [] for category, data in category_data.items(): if data['scores']: avg_score = sum(data['scores']) / len(data['scores']) record[f'{category} score'] = avg_score category_avg_scores.append(avg_score) if data['costs']: avg_cost = sum(data['costs']) / len(data['costs']) record[f'{category} cost'] = avg_cost category_avg_costs.append(avg_cost) # Calculate overall score and cost as macro-average of category averages # This ensures each category contributes equally regardless of benchmark count if category_avg_scores: record['overall score'] = sum(category_avg_scores) / len(category_avg_scores) else: record['overall score'] = None if category_avg_costs: record['overall cost'] = sum(category_avg_costs) / len(category_avg_costs) else: record['overall cost'] = None transformed_records.append(record) transformed_df = pd.DataFrame(transformed_records) # Build tag map if not already built if not self.tag_map: # Create simple tag map from the data all_tags = set() for _, row in df.iterrows(): tags = row['tags'] if isinstance(row['tags'], list) else [row['tags']] all_tags.update(tags) # Simple mapping: each tag maps to itself self.tag_map = {tag: [tag] for tag in sorted(all_tags)} # DEBUG: Print sample of loaded data print(f"[DATA_LOADER] Loaded {len(transformed_df)} agents") if len(transformed_df) > 0: sample_cols = ['agent_name', 'overall_score', 'overall_cost'] available_cols = [c for c in sample_cols if c in transformed_df.columns] print(f"[DATA_LOADER] Sample row: {transformed_df[available_cols].iloc[0].to_dict()}") return transformed_df, self.tag_map except Exception as e: import traceback traceback.print_exc() return pd.DataFrame({ "Message": [f"Error loading data: {e}"] }), {} def get_dataframe(self): """Get the raw dataframe.""" df, _ = self._load() return df def load_mock_data_locally(data_dir: str = "mock_results"): """ Load mock data from local directory for testing. Args: data_dir: Path to mock results directory Returns: Dictionary mapping split names to SimpleLeaderboardViewer instances """ viewers = {} data_path = Path(data_dir) if not data_path.exists(): print(f"Warning: Mock data directory '{data_dir}' not found") return viewers # Find all config directories for config_dir in data_path.iterdir(): if config_dir.is_dir(): config_name = config_dir.name # Find all JSONL files (each represents a split) for jsonl_file in config_dir.glob("*.jsonl"): split_name = jsonl_file.stem viewer = SimpleLeaderboardViewer( data_dir=str(data_path), config=config_name, split=split_name ) viewers[split_name] = viewer return viewers