| |
| """Unified benchmarking script for ReCall, ZeroSearch, and R1‑Searcher |
| with optional multi‑threaded execution. |
| |
| Example usage (single‑threaded) |
| ------------------------------- |
| ```bash |
| python eval_benchmark.py \ |
| --dataset frames \ |
| --agent r1-searcher \ |
| --model-url http://0.0.0.0:1233 \ |
| --out /tmp/evals \ |
| --mode single |
| ``` |
| |
| Example usage (multi‑threaded, 128 workers) |
| ------------------------------------------ |
| ```bash |
| python eval_benchmark.py \ |
| --dataset frames \ |
| --agent recall \ |
| --model-url http://0.0.0.0:1231 \ |
| --out /tmp/evals \ |
| --mode multi \ |
| --workers 128 |
| ``` |
| The script will: |
| 1. Load the specified dataset JSONL file that contains objects with keys |
| `id`, `question` and `answer`. |
| 2. Build the chosen agent wrapper (`recall`, `zerosearch`, or `r1-searcher`). |
| 3. Stream one JSONL line per example with *all* details needed for analysis. |
| 4. Optionally run the evaluation loop in parallel using a configurable number |
| of worker threads. |
| 5. Automatically construct the output path as: |
| ``` |
| {out}/{agent}/{dataset}-{name}.jsonl |
| ``` |
| where `agent` is the `--agent` value and `name` is the optional `--name` |
| suffix (empty by default). |
| """ |
| from __future__ import annotations |
|
|
| import argparse |
| import json |
| import logging |
| import os |
| import pathlib |
| import re |
| import threading |
| import time |
| from concurrent.futures import ThreadPoolExecutor, as_completed |
| from typing import Dict, List |
|
|
| import unicodedata |
| from openai import OpenAI, APIStatusError |
| from tqdm import tqdm |
|
|
| |
| |
| |
| from re_call import ReCall |
| |
| |
| |
| from pathlib import Path |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
# Import line handed to the agent as its `env` argument (see evaluate_example).
search_env = "from search_api import search_urls, open_url, search_and_parse_query, query_url"

# JSON-schema style descriptions of the tools, passed to the agent as `func_schemas`.
search_schemas = [
    # Tool 1: web search.
    {
        "name": "search_urls",
        "description": "Google search and return links to web-pages with a brief snippet given a text query",
        "parameters": {
            "type": "object",
            "properties": {
                "query": {"type": "string"},
                "top_k": {"type": "integer", "default": 10},
            },
            "required": ["query"],
        },
    },
    # Tool 2: goal-directed page visit.
    {
        "name": "query_url",
        "description": "Visit webpage and return evidence based retrival for the provided goal",
        "parameters": {
            "type": "object",
            "properties": {
                "url": {
                    "type": "string",
                    "description": "The URL of the webpage to visit. Must be a single URL",
                },
                "goal": {
                    "type": "string",
                    "description": "The specific information goal for visiting webpage",
                },
            },
            "required": ["url", "goal"],
        },
    },
]
|
|
# Endpoint passed to ReCall as `executor_url`; the lookup raises KeyError at
# import time when HOST_SERPER_URL is not set.
EXECUTOR_URL = os.environ["HOST_SERPER_URL"]
# Directory holding the `{dataset}.jsonl` files read by main().
DATA_ROOT = pathlib.Path("./eval_datasets")
# Caps the number of judge requests that may be in flight at once.
SEM = threading.Semaphore(3)
# Model name sent with every judge request.
JUDGE_MODEL = "gpt-4.1-mini"

# `__file__` is undefined in some interactive contexts; use the working
# directory there instead of the script's own directory.
base = Path(__file__).resolve().parent if "__file__" in globals() else Path.cwd()

TOKENIZER_DIR = (base / "tokenizer-info").resolve()
|
|
| |
# Load the tokenizer from TOKENIZER_DIR at import time; any failure (missing
# package, missing files, ...) terminates the process with a readable message.
try:
    from transformers import AutoTokenizer

    tokenizer = AutoTokenizer.from_pretrained(TOKENIZER_DIR, trust_remote_code=True)
except Exception as load_error:
    raise SystemExit(f"❌ Could not load Qwen3 tokenizer: {load_error}")
|
|
| import hashlib |
|
|
def get_uid(sample: dict) -> str:
    """Return the SHA-256 hex digest of the sample's whitespace-stripped question."""
    question_bytes = sample["question"].strip().encode("utf-8")
    hasher = hashlib.sha256()
    hasher.update(question_bytes)
    return hasher.hexdigest()
|
|
| |
| |
| |
def extract_answer_tagged(text: str) -> str:
    """Return the content of the last ``<answer>...</answer>`` pair in *text*.

    The content is stripped and lower-cased. When no tag pair is present a
    notice is printed and the final 200 characters of *text* are returned
    unchanged.
    """
    tagged = re.findall(r"<answer>(.*?)</answer>", text, flags=re.S)
    if not tagged:
        print("No answer tags found")
        return text[-200:]
    return tagged[-1].strip().lower()
|
|
def extract_answer_boxed(response):
    """Return the content of the last ``\\boxed`` expression in *response*.

    Two forms are recognised:

    * ``\\boxed <answer>`` (space-delimited): everything after the last
      ``\\boxed `` up to the next ``$`` (or the end of the text).
    * ``\\boxed{<answer>}``: the balanced-brace group that follows the last
      ``\\boxed``.

    Raises:
        ValueError: if no boxed expression is found, its braces never close,
            or the located expression is not of the form ``\\boxed{...}``
            (this includes ``\\fbox{...}``, which is located but not unwrapped).
            Explicit raises are used instead of ``assert`` so the checks
            survive ``python -O``.
    """
    spaced = "\\boxed "
    braced = "\\boxed{"

    def last_boxed_only_string(string):
        # Space-delimited form takes precedence whenever it appears anywhere.
        if spaced in string:
            return spaced + string.split(spaced)[-1].split("$")[0]

        idx = string.rfind("\\boxed")
        if idx < 0:
            idx = string.rfind("\\fbox")
            if idx < 0:
                return None

        # Walk forward until the brace opened after the macro is closed again.
        depth = 0
        for i in range(idx, len(string)):
            ch = string[i]
            if ch == "{":
                depth += 1
            elif ch == "}":
                depth -= 1
                if depth == 0:
                    return string[idx:i + 1]
        return None

    def remove_boxed(s):
        if s.startswith(spaced):
            return s[len(spaced):]
        if not (s.startswith(braced) and s.endswith("}")):
            raise ValueError(f"not a \\boxed{{...}} expression: {s!r}")
        return s[len(braced):-1]

    boxed = last_boxed_only_string(response)
    if boxed is None:
        raise ValueError("no complete \\boxed expression found in response")
    return remove_boxed(boxed)
|
|
|
|
|
|
# Prompt template for the LLM judge. `judge` fills the {question},
# {ground_truth} and {model_answer} placeholders with str.format for the user
# message, and also sends this template unformatted as the system message.
JUDGE_SYS = """
You are an impartial judge evaluating the correctness of a model's answer against a ground-truth answer for a given question. Your task is to:
1. Compare the model's answer to the ground-truth answer.
2. Determine if the model's answer is correct or incorrect.

**Input Format:**
- Question: {question}
- Ground Truth: {ground_truth}
- Model Answer: {model_answer}

**Output Format:**
correct/incorrect/unknown

**Guidelines:**
- The model's answer is correct if it matches the ground-truth answer in meaning and content it is case-insensitive, ignore minor punctuation or formatting differences.
- If the model's answer contains additional information, it is still correct as long as the core answer matches the ground truth.
- Be precise output a single word correct / incorrect / unknown and **nothing else**
- For MCQ questions match the option ID A. B. C. or D. if its correct the answer is correct.
"""
| |
|
|
|
|
| |
|
|
def _oa() -> OpenAI:
    """Return the OpenAI client cached on the current thread, creating it on first use."""
    current = threading.current_thread()
    try:
        return current._oa
    except AttributeError:
        # First call on this thread: build a client and remember it on the
        # thread object itself.
        current._oa = OpenAI()
        return current._oa
|
|
|
|
def judge(q: str, gt: str, pred: str) -> str:
    """Ask the judge model whether *pred* answers *q* correctly given *gt*.

    Args:
        q: The question text.
        gt: The ground-truth answer.
        pred: The model's answer; an empty string short-circuits to "unknown".

    Returns:
        The judge's reply, stripped and lower-cased, or "unknown" when the
        prediction is empty, the API reports a status error, or the reply
        carries no text.
    """
    if pred == "":
        return "unknown"

    prompt = JUDGE_SYS.format(question=q, ground_truth=gt, model_answer=pred)
    try:
        # SEM bounds how many judge requests run concurrently across threads.
        with SEM:
            resp = _oa().chat.completions.create(
                model=JUDGE_MODEL,
                messages=[
                    {"role": "system", "content": JUDGE_SYS},
                    {"role": "user", "content": prompt},
                ],
                temperature=0.0,
                max_tokens=100,
            )
    except APIStatusError:
        return "unknown"

    # The reply may have no choices or a null `content`; treat both as
    # "unknown" instead of crashing on indexing / `.strip()`.
    if not resp.choices:
        return "unknown"
    content = resp.choices[0].message.content
    if content is None:
        return "unknown"
    return content.strip().lower()
|
|
|
|
| |
| |
| |
def build_agent(kind: str, model_url: str):
    """Instantiate the agent wrapper named by *kind* (case-insensitive).

    Only "recall" is handled here; every other value raises ``ValueError``.
    *model_url* is accepted but not used by this function. The lower-cased
    kind is printed before dispatch.
    """
    kind = kind.lower()
    print(kind)
    if kind != "recall":
        raise ValueError(f"Unknown agent kind: {kind}")
    return ReCall(executor_url=EXECUTOR_URL)
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
|
|
| |
| |
| |
def evaluate_example(example: Dict[str, str], agent_kind: str, model_url: str) -> Dict[str, str]:
    """Run one example through the agent and the judge and return its result row.

    Args:
        example: Dataset record with ``question`` and ``answer`` keys. An
            ``id`` key is used when present; otherwise the id falls back to
            ``get_uid(example)``.
        agent_kind: Agent name forwarded to ``build_agent``.
        model_url: Model endpoint (or model name) forwarded to the agent.

    Returns:
        A dict with keys ``id``, ``question``, ``answer_gt``, ``model_answer``,
        ``judge``, ``tool_calls``, ``transcript`` and ``chat``. ``chat`` is
        ``None`` for the code paths whose agent call does not return one.
    """
    question = example["question"].strip()
    answer_gt = example["answer"].strip()
    # Tolerate records without an id (or with a non-string id).
    raw_id = example.get("id")
    idx = get_uid(example) if raw_id is None else str(raw_id).strip()

    agent = build_agent(agent_kind, model_url=model_url)

    # Only `agent.run` on the recall path returns a chat log; default it so the
    # result row can always be built.
    chat = None
    if agent_kind == "recall" and model_url == "deepseek-ai/DeepSeek-R1":
        transcript, tool_calls = agent.run_deepseek(
            env=search_env,
            func_schemas=search_schemas,
            question=question,
            model_name="deepseek-ai/DeepSeek-R1",
            temperature=0.6,
            max_tokens=40960,
        )
    elif agent_kind == "recall":
        transcript, tool_calls, chat = agent.run(
            env=search_env,
            func_schemas=search_schemas,
            question=question,
            model_url=model_url,
            temperature=0.6,
            max_new_tokens=40960,
            tokenizer=tokenizer,
        )
    else:
        transcript, tool_calls = agent.run(question)

    # Tag-based agents use <answer>…</answer>; every other agent is parsed for
    # \boxed{…}. Previously the tagged result was always overwritten by the
    # boxed branch because the two checks were independent `if`s.
    if agent_kind in ("r1-searcher", "zerosearch"):
        pred = extract_answer_tagged(transcript)
    else:
        try:
            pred = extract_answer_boxed(transcript)
        except Exception:
            print("falling to last string")
            pred = transcript[-200:]

    verdict = judge(question, answer_gt.lower(), pred.lower())

    return {
        "id": idx,
        "question": question,
        "answer_gt": answer_gt,
        "model_answer": pred,
        "judge": verdict,
        "tool_calls": tool_calls,
        "transcript": transcript,
        "chat": chat,
    }
|
|
| |
| |
| |
def build_output_path(out_base, agent, dataset, name) -> pathlib.Path:
    """Return ``{out_base}/{agent}/{dataset}-{name}.jsonl``.

    *out_base* must support the ``/`` path operator; the other parts are
    interpolated as text.
    """
    agent_dir = out_base / f"{agent}"
    return agent_dir / f"{dataset}-{name}.jsonl"
|
|
def normalize(s: str) -> str:
    """Strip and lower-case *s*, then apply Unicode NFKD normalisation."""
    lowered = s.strip().lower()
    return unicodedata.normalize("NFKD", lowered)
|
|
def load_existing_results(path: pathlib.Path) -> tuple[list[dict], set[str]]:
    """Read previously written result rows from *path*.

    Returns ``(rows, ids)``: the rows whose ``model_answer`` is not the empty
    string, and the set of their ``id`` values. A missing file yields two
    empty collections. Lines that fail to parse or lack the expected keys are
    skipped.
    """
    kept_rows: list[dict] = []
    seen_ids: set[str] = set()
    if path.exists():
        with open(path, "r", encoding="utf-8") as handle:
            for raw_line in handle:
                try:
                    record = json.loads(raw_line)
                    if record["model_answer"] == "":
                        continue
                    kept_rows.append(record)
                    seen_ids.add(record["id"])
                except Exception:
                    continue
    return kept_rows, seen_ids
|
|
def main():
    """Parse CLI arguments, evaluate the dataset, and print summary statistics.

    The dataset is read from ``DATA_ROOT/{dataset}.jsonl`` (blank lines are
    ignored) and one JSON line per evaluated example is written to the path
    returned by ``build_output_path``. Single mode truncates the output file
    and lets evaluation errors propagate; multi mode appends to it and logs
    and skips examples whose evaluation raises.

    Raises:
        FileNotFoundError: if the dataset file does not exist.
    """
    parser = argparse.ArgumentParser(description="Benchmark QA agents on a dataset (single or multi‑threaded)")
    parser.add_argument("--dataset", required=True, help="dataset name (frames, …)")
    parser.add_argument("--agent", required=True, choices=["recall", "zerosearch", "r1-searcher", "o1-search", "SDS", "deepseek-r1"], help="agent wrapper")
    # `--out-base` is the spelling used in the module docstring's examples;
    # accept it as an alias so those commands work.
    parser.add_argument("--out", "--out-base", dest="out", required=True, help="base directory for outputs")
    parser.add_argument("--model-url", required=False, help="URL of the model server")
    parser.add_argument("--limit", type=int, default=0, help="optional cap on number of questions")
    parser.add_argument("--mode", choices=["single", "multi"], default="single", help="execution mode")
    parser.add_argument("--workers", type=int, default=8, help="number of worker threads for multi‑mode")
    parser.add_argument("--name", type=str, default="", help="suffix for save dir")
    args = parser.parse_args()

    ds_path = DATA_ROOT / f"{args.dataset}.jsonl"
    if not ds_path.exists():
        raise FileNotFoundError(ds_path)

    # Skip blank lines (e.g. a trailing newline) instead of failing in json.loads.
    with ds_path.open(encoding="utf-8") as f:
        data = [json.loads(line) for line in f if line.strip()]

    out_base = pathlib.Path(args.out).expanduser().resolve()
    out_path = build_output_path(out_base, args.agent, args.dataset, args.name)
    print(out_path)
    out_path.parent.mkdir(parents=True, exist_ok=True)

    if args.limit:
        data = data[: args.limit]

    def write_row(fout, row) -> int:
        """Tag *row* with run metadata, append it to *fout*, return 1 if judged correct."""
        row.update({"agent": args.agent, "dataset": args.dataset})
        fout.write(json.dumps(row, ensure_ascii=False) + "\n")
        fout.flush()  # keep partial results on disk if the run is interrupted
        return 1 if row["judge"] == "correct" else 0

    correct = 0
    start_time = time.perf_counter()

    if args.mode == "single":
        with open(out_path, "w", encoding="utf-8") as fout:
            for ex in tqdm(data, desc="QA loop (single)"):
                row = evaluate_example(ex, args.agent, args.model_url)
                correct += write_row(fout, row)
    else:
        workers = max(1, args.workers)
        logging.info("Running in multi‑threaded mode with %d workers", workers)
        with ThreadPoolExecutor(max_workers=workers) as executor, open(out_path, "a", encoding="utf-8") as fout:
            futures = {executor.submit(evaluate_example, ex, args.agent, args.model_url): ex for ex in data}
            for fut in tqdm(as_completed(futures), total=len(futures), desc="QA loop (multi)"):
                try:
                    row = fut.result()
                except Exception as exc:
                    logging.exception("Evaluation failed: %s", exc)
                    continue
                correct += write_row(fout, row)

    elapsed = time.perf_counter() - start_time
    total = len(data)
    # Guard both ratios: an empty dataset used to raise ZeroDivisionError in
    # the per-example timing.
    accuracy = correct / total if total else 0.0
    per_example = elapsed / total if total else 0.0
    print(f"Accuracy: {correct}/{total} = {accuracy:.1%}")
    print(f"Elapsed time: {elapsed:.2f}s ({per_example:.2f}s per example)")


if __name__ == "__main__":
    main()