"""
Conversation Relevance Checker
-------------------------------
Determines if current question is related to previous conversation
and identifies what data can be reused to minimize redundant API calls.
"""

import os
import re
from typing import List, Dict, Any, Optional, Literal
from pathlib import Path

from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from pydantic import BaseModel, Field


def _load_prompt_file(filename: str) -> str:
    """Load a prompt file from the ``prompts`` directory next to this module.

    Args:
        filename: Name of the file inside the ``prompts`` directory.

    Returns:
        The file contents decoded as UTF-8.

    Raises:
        FileNotFoundError: If the prompt file does not exist.
    """
    prompt_dir = Path(__file__).parent / "prompts"
    prompt_path = prompt_dir / filename

    if not prompt_path.exists():
        raise FileNotFoundError(f"Prompt file not found: {prompt_path}")

    return prompt_path.read_text(encoding="utf-8")


class ReusableData(BaseModel):
    """Indicates what data can be reused from previous conversation"""

    # Each flag defaults to False, i.e. "nothing is reusable".
    questions: bool = False
    toplines: bool = False
    crosstabs: bool = False


class RelevanceResult(BaseModel):
    """Structured relevance assessment result"""

    is_related: bool
    # Closed set of labels; these are the same labels described in the prompt.
    relation_type: Literal[
        "same_topic_different_demo",
        "same_topic_different_time",
        "trend_analysis",
        "new_topic"
    ]
    reusable_data: ReusableData
    time_period_changed: bool
    reasoning: str


class ConversationRelevanceChecker:
    """
    Checks relevance between current question and conversation history.
    Uses LLM to determine if previous data can be reused.
    """

    def __init__(self, llm, verbose: bool = False):
        """
        Initialize relevance checker.

        Args:
            llm: LangChain LLM instance (ChatOpenAI); it must provide
                ``with_structured_output``, which ``check_relevance`` calls.
            verbose: Whether to print debug information
        """
        self.llm = llm
        self.verbose = verbose

        # Load relevance check prompt
        try:
            self.prompt_template = _load_prompt_file("relevance_check_prompt.txt")
        except FileNotFoundError:
            # Fallback to inline prompt if file doesn't exist yet
            self.prompt_template = self._get_default_prompt()

    def _get_default_prompt(self) -> str:
        """Fallback prompt template if file doesn't exist.

        The template contains the placeholders ``{conversation_summary}``,
        ``{previous_data_summary}`` and ``{current_question}``, which
        ``check_relevance`` substitutes.
        """
        return """You are analyzing conversation continuity in a multi-turn survey data analysis system.

Your task: Determine if the current question is related to previous conversation and what data can be reused.

## CONVERSATION HISTORY
{conversation_summary}

## PREVIOUSLY RETRIEVED DATA
{previous_data_summary}

## CURRENT QUESTION
{current_question}

## ANALYSIS REQUIRED

1. **Is the current question related to the previous conversation?**
   - YES if: Same topic, same questions, same time period (even if different demographic)
   - YES if: Asking for trend/analysis of already-shown data
   - NO if: Completely different topic
   - NO if: Same topic but different time period (e.g., June 2025 → February 2025)

2. **Relation Type** (if related):
   - `same_topic_different_demo`: Same topic/questions, asking for different demographic breakdown
   - `trend_analysis`: Asking for analysis/trends from already-retrieved data
   - `same_topic_different_time`: Same topic but different time period
   - `new_topic`: Completely different topic

3. **Reusable Data**:
   - `questions`: true if same questions can be reused (same topic, same time period)
   - `toplines`: true if overall frequencies already retrieved and still relevant
   - `crosstabs`: true if demographic breakdowns already retrieved and still relevant

4. **Time Period Changed**:
   - true if current question asks about different year/month than previous
   - false if time period is same or not specified

Respond with structured output."""

    def _build_conversation_summary(self, conversation_history: List) -> str:
        """Build a summary of conversation history for the prompt.

        Human messages are included in full; AI messages are cut to their
        first 300 characters. Messages of any other type are skipped.

        Args:
            conversation_history: Previous messages (HumanMessage, AIMessage).

        Returns:
            One ``USER:``/``ASSISTANT:`` line per message joined with
            newlines, or ``"No previous conversation"`` when nothing was
            collected.
        """
        summary_lines = []

        for msg in conversation_history:
            if isinstance(msg, HumanMessage):
                summary_lines.append(f"USER: {msg.content}")
            elif isinstance(msg, AIMessage):
                content = msg.content
                # Message content is not guaranteed to be a plain string;
                # coerce it so the limit counts characters and the
                # concatenation below cannot raise TypeError.
                if not isinstance(content, str):
                    content = str(content)
                # Truncate long AI responses
                if len(content) > 300:
                    content = content[:300] + "... (truncated)"
                summary_lines.append(f"ASSISTANT: {content}")

        return "\n".join(summary_lines) if summary_lines else "No previous conversation"

    def _build_previous_data_summary(self, previous_stage_results: List) -> str:
        """Build a summary of previously retrieved data.

        Args:
            previous_stage_results: Stage result objects from previous turns.
                Each must expose ``questionnaire_results``,
                ``toplines_results`` and ``crosstabs_results``; non-empty
                values are read with dict-style access.

        Returns:
            A multi-line text summary with one ``Stage N:`` section per
            result, or ``"No previous data retrieved"`` for empty input.
        """
        if not previous_stage_results:
            return "No previous data retrieved"

        summary_lines = []

        for i, stage_result in enumerate(previous_stage_results, 1):
            summary_lines.append(f"Stage {i}:")

            # Questionnaire results
            if stage_result.questionnaire_results:
                q_res = stage_result.questionnaire_results
                num_questions = len(q_res.get("source_questions", []))
                question_info = q_res.get("question_info", [])

                if question_info:
                    # Show at most three variable names. str() keeps the
                    # join safe if a name is missing or not a string.
                    sample_vars = [
                        str(q.get("variable_name") or "unknown")
                        for q in question_info[:3]
                    ]
                    sample_vars_str = ", ".join(sample_vars)
                    if len(question_info) > 3:
                        sample_vars_str += f" ... and {len(question_info) - 3} more"

                    # Extract time period info from the first question only.
                    first_question = question_info[0]
                    time_info = []
                    if first_question.get("year"):
                        time_info.append(str(first_question["year"]))
                    if first_question.get("month"):
                        # Stringify like the year: a numeric month would
                        # otherwise make the join below raise TypeError.
                        time_info.append(str(first_question["month"]))

                    time_str = " ".join(time_info) if time_info else "unspecified time"

                    summary_lines.append(f"  - Retrieved {num_questions} question(s) from {time_str}")
                    summary_lines.append(f"  - Variables: {sample_vars_str}")

            # Toplines results
            if stage_result.toplines_results:
                t_res = stage_result.toplines_results
                num_docs = len(t_res.get("retrieved_docs", []))
                summary_lines.append(f"  - Retrieved {num_docs} topline document(s)")

            # Crosstabs results
            if stage_result.crosstabs_results:
                c_res = stage_result.crosstabs_results
                if "crosstab_docs_by_variable" in c_res:
                    num_vars = len(c_res["crosstab_docs_by_variable"])
                    summary_lines.append(f"  - Retrieved crosstabs for {num_vars} variable(s)")

        return "\n".join(summary_lines) if summary_lines else "No data summary available"

    def check_relevance(
        self,
        current_question: str,
        conversation_history: List,
        previous_stage_results: List
    ) -> Dict[str, Any]:
        """
        Check relevance of current question to previous conversation.

        Args:
            current_question: The current user question
            conversation_history: List of previous messages (HumanMessage, AIMessage)
            previous_stage_results: List of StageResult objects from previous turns

        Returns:
            Dict with keys ``is_related``, ``relation_type``,
            ``reusable_data`` (a dict with ``questions``, ``toplines`` and
            ``crosstabs``), ``time_period_changed`` and ``reasoning``.
            If the LLM call fails, a "new_topic" result with nothing
            reusable is returned and ``reasoning`` carries the error text.
        """
        if self.verbose:
            print("\n🔍 Checking conversation relevance...")

        # Build prompt inputs
        conversation_summary = self._build_conversation_summary(conversation_history)
        previous_data_summary = self._build_previous_data_summary(previous_stage_results)

        # Substitute placeholders without str.format() so other curly braces
        # in the template are left alone. One regex pass is used instead of
        # chained .replace() calls so that placeholder-looking text inside an
        # inserted value (e.g. a user typing "{current_question}") is never
        # substituted again.
        replacements = {
            "{conversation_summary}": conversation_summary,
            "{previous_data_summary}": previous_data_summary,
            "{current_question}": current_question,
        }
        placeholder_pattern = re.compile("|".join(re.escape(key) for key in replacements))
        prompt = placeholder_pattern.sub(
            lambda match: replacements[match.group(0)],
            self.prompt_template,
        )

        # Get structured output from LLM
        try:
            relevance_checker = self.llm.with_structured_output(RelevanceResult)
            result = relevance_checker.invoke([
                SystemMessage(content="You are a conversation continuity analyzer for survey data systems."),
                HumanMessage(content=prompt)
            ])

            if self.verbose:
                print(f"   Related: {result.is_related}")
                print(f"   Type: {result.relation_type}")
                print(f"   Reusable: questions={result.reusable_data.questions}, "
                      f"toplines={result.reusable_data.toplines}, "
                      f"crosstabs={result.reusable_data.crosstabs}")
                print(f"   Time changed: {result.time_period_changed}")
                print(f"   Reasoning: {result.reasoning}")

            return {
                "is_related": result.is_related,
                "relation_type": result.relation_type,
                "reusable_data": {
                    "questions": result.reusable_data.questions,
                    "toplines": result.reusable_data.toplines,
                    "crosstabs": result.reusable_data.crosstabs
                },
                "time_period_changed": result.time_period_changed,
                "reasoning": result.reasoning
            }

        except Exception as e:
            # Deliberate best-effort boundary: any failure degrades to
            # "nothing reusable" rather than propagating to the caller.
            if self.verbose:
                print(f"   ⚠️ Error checking relevance: {e}")

            # Return safe default (treat as new topic)
            return {
                "is_related": False,
                "relation_type": "new_topic",
                "reusable_data": {
                    "questions": False,
                    "toplines": False,
                    "crosstabs": False
                },
                "time_period_changed": False,
                "reasoning": f"Error during relevance check: {str(e)}"
            }