""" Questionnaire RAG Module ------------------------ Retrieves survey questions from Pinecone vectorstore. Metadata filtering first, semantic search fallback. Returns raw data only - no synthesis. """ import os import json from typing import List, Dict, Any, Optional from pathlib import Path from langchain_openai import OpenAIEmbeddings from langchain_pinecone import PineconeVectorStore from pinecone import Pinecone try: from dotenv import load_dotenv load_dotenv() except ImportError: pass class QuestionInfo: """Structured question information for cross-pipeline coordination.""" def __init__(self, variable_name: str, year: Optional[int] = None, month: Optional[str] = None, poll_date: Optional[str] = None, question_id: Optional[str] = None): self.variable_name = variable_name self.year = year self.month = month self.poll_date = poll_date self.question_id = question_id def to_dict(self) -> Dict[str, Any]: return { "variable_name": self.variable_name, "year": self.year, "month": self.month, "poll_date": self.poll_date, "question_id": self.question_id } class QuestionnaireRAG: """Questionnaire RAG with metadata-first filtering.""" def __init__( self, openai_api_key: str, pinecone_api_key: str, persist_directory: str = "./questionnaire_vectorstores", verbose: bool = False ): self.openai_api_key = openai_api_key self.pinecone_api_key = pinecone_api_key self.persist_directory = persist_directory self.verbose = verbose # Initialize embeddings self.embeddings = OpenAIEmbeddings( model=os.getenv("OPENAI_EMBED_MODEL", "text-embedding-3-small") ) # Connect to Pinecone index_name = os.getenv("PINECONE_INDEX_NAME", "poll-questionnaire-index") namespace = os.getenv("PINECONE_NAMESPACE") or None pc = Pinecone(api_key=self.pinecone_api_key) self.index = pc.Index(index_name) self.vectorstore = PineconeVectorStore( index=self.index, embedding=self.embeddings, namespace=namespace ) # Load catalog and questions self.poll_catalog = self._load_catalog() self.questions_by_id = self._load_questions_index() if self.verbose: print(f"āœ“ Loaded {len(self.questions_by_id)} questions from {len(self.poll_catalog)} polls") def _load_catalog(self) -> Dict[str, Dict]: """Load poll catalog""" catalog_path = Path(self.persist_directory) / "poll_catalog.json" if not catalog_path.exists(): # Try parent directory if not found parent_path = Path(self.persist_directory).parent / "questionnaire_vectorstores" / "poll_catalog.json" if parent_path.exists(): catalog_path = parent_path else: return {} with open(catalog_path, 'r') as f: return json.load(f) def _load_questions_index(self) -> Dict[str, Dict]: """Load questions index""" questions_path = Path(self.persist_directory) / "questions_index.json" if not questions_path.exists(): # Try parent directory if not found parent_path = Path(self.persist_directory).parent / "questionnaire_vectorstores" / "questions_index.json" if parent_path.exists(): questions_path = parent_path else: return {} with open(questions_path, 'r') as f: return json.load(f) def _fuzzy_match_survey_name(self, requested_name: str) -> Optional[str]: """Fuzzy match survey name""" available_names = set() for info in self.poll_catalog.values(): available_names.add(info["survey_name"]) normalized_requested = requested_name.lower().replace("_", " ").replace("-", " ") for stored_name in available_names: normalized_stored = stored_name.lower().replace("_", " ").replace("-", " ") if normalized_requested == normalized_stored: return stored_name if normalized_requested in normalized_stored or normalized_stored in 
    def _fuzzy_match_survey_name(self, requested_name: str) -> Optional[str]:
        """Fuzzy match survey name."""
        available_names = set()
        for info in self.poll_catalog.values():
            available_names.add(info["survey_name"])

        normalized_requested = requested_name.lower().replace("_", " ").replace("-", " ")

        for stored_name in available_names:
            normalized_stored = stored_name.lower().replace("_", " ").replace("-", " ")
            if normalized_requested == normalized_stored:
                return stored_name
            if normalized_requested in normalized_stored or normalized_stored in normalized_requested:
                return stored_name

        requested_words = set(normalized_requested.split())
        for stored_name in available_names:
            normalized_stored = stored_name.lower().replace("_", " ").replace("-", " ")
            stored_words = set(normalized_stored.split())
            if requested_words.issubset(stored_words):
                return stored_name

        return None

    def _build_pinecone_filter(self, filters: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Build Pinecone metadata filter."""
        if not filters:
            return None

        filter_conditions = []

        if "year" in filters and filters["year"] is not None:
            year = int(filters["year"]) if isinstance(filters["year"], str) else filters["year"]
            filter_conditions.append({"year": {"$eq": year}})

        if "month" in filters and filters["month"] is not None:
            month = filters["month"].capitalize()
            filter_conditions.append({"month": {"$eq": month}})

        if "poll_date" in filters and filters["poll_date"] is not None:
            filter_conditions.append({"poll_date": {"$eq": filters["poll_date"]}})

        if "survey_name" in filters and filters["survey_name"] is not None:
            matched_name = self._fuzzy_match_survey_name(filters["survey_name"])
            if matched_name:
                filter_conditions.append({"survey_name": {"$eq": matched_name}})

        if "question_ids" in filters and filters["question_ids"]:
            question_ids = filters["question_ids"]
            if isinstance(question_ids, list) and len(question_ids) > 0:
                if len(question_ids) == 1:
                    filter_conditions.append({"question_id": {"$eq": question_ids[0]}})
                else:
                    filter_conditions.append({"question_id": {"$in": question_ids}})

        if "topic" in filters and filters["topic"]:
            topic = filters["topic"].lower()
            filter_conditions.append({"topics": {"$in": [topic]}})

        if len(filter_conditions) == 0:
            return None
        elif len(filter_conditions) == 1:
            return filter_conditions[0]
        else:
            return {"$and": filter_conditions}
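    # Example of what _build_pinecone_filter produces (values are hypothetical,
    # for illustration only):
    #   filters = {"year": 2024, "month": "march", "question_ids": ["Q1", "Q2"]}
    # yields
    #   {"$and": [{"year": {"$eq": 2024}},
    #             {"month": {"$eq": "March"}},
    #             {"question_id": {"$in": ["Q1", "Q2"]}}]}
    # A single condition is returned bare (no "$and"); no usable condition returns None.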
    def retrieve_raw_data(
        self,
        question: str,
        filters: Optional[Dict[str, Any]] = None,
        k: int = 20
    ) -> Dict[str, Any]:
        """
        Retrieve raw questionnaire data.
        Metadata filtering first, semantic search fallback.

        Returns:
            Dict with 'source_questions', 'num_sources', 'filters_applied', 'question_info'
        """
        if self.verbose:
            print(f"\nšŸ“Š [Questionnaire] Query: {question}")
            if filters:
                print(f"šŸ” Filters: {filters}")

        # Build Pinecone filter
        pinecone_filter = self._build_pinecone_filter(filters or {})

        # Try metadata filtering first
        docs = []
        if pinecone_filter:
            if self.verbose:
                print(f"šŸ”§ Using metadata filter: {pinecone_filter}")
            retriever = self.vectorstore.as_retriever(
                search_kwargs={"k": k, "filter": pinecone_filter}
            )
            docs = retriever.invoke(question)
            if self.verbose:
                print(f"šŸ“„ Retrieved {len(docs)} documents with metadata filter")

        # Fall back to plain semantic search if there is no filter or it returned nothing
        if not docs:
            if self.verbose:
                if pinecone_filter:
                    print("āš ļø No results with metadata filter, falling back to semantic search")
                else:
                    print("ā„¹ļø No metadata filter built, using semantic search")
            retriever = self.vectorstore.as_retriever(search_kwargs={"k": k * 2})
            docs = retriever.invoke(question)
            if self.verbose:
                print(f"šŸ“„ Retrieved {len(docs)} documents with semantic search")

        if not docs:
            return {
                "source_questions": [],
                "num_sources": 0,
                "filters_applied": filters or {},
                "question_info": []
            }

        # Reconstruct full questions and extract question_info
        full_questions = []
        seen_ids = set()
        question_info_list = []

        for doc in docs:
            q_id = doc.metadata.get('question_id')
            if q_id and q_id not in seen_ids:
                if q_id in self.questions_by_id:
                    q_data = self.questions_by_id[q_id]
                    full_questions.append(q_data)
                    seen_ids.add(q_id)

                    # Extract question_info
                    question_info_list.append(QuestionInfo(
                        variable_name=q_data.get("variable_name", ""),
                        year=q_data.get("year"),
                        month=q_data.get("month", ""),
                        poll_date=q_data.get("poll_date", ""),
                        question_id=q_id
                    ))

        # Sort by poll date, then by position within the poll
        full_questions.sort(key=lambda q: (q.get('poll_date', ''), q.get('position', 0)))

        if self.verbose:
            print(f"āœ… Extracted {len(question_info_list)} question info entries")

        return {
            'source_questions': full_questions,
            'num_sources': len(full_questions),
            'filters_applied': filters or {},
            'question_info': [q.to_dict() for q in question_info_list]
        }

    def get_available_survey_names(self) -> List[str]:
        """Get list of unique survey names."""
        survey_names = set()
        for info in self.poll_catalog.values():
            survey_names.add(info["survey_name"])
        return sorted(survey_names)

    def get_available_polls(self) -> List[Dict[str, Any]]:
        """Get list of all available polls."""
        return [
            {
                "poll_date": poll_date,
                "survey_name": info["survey_name"],
                "year": info["year"],
                "month": info.get("month", ""),
                "num_questions": info["num_questions"]
            }
            for poll_date, info in sorted(self.poll_catalog.items())
        ]
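

# Minimal usage sketch (not part of the module API): assumes OPENAI_API_KEY and
# PINECONE_API_KEY are set in the environment and that the catalog/questions-index
# files described above already exist. Query text and filter values are hypothetical.
if __name__ == "__main__":
    rag = QuestionnaireRAG(
        openai_api_key=os.getenv("OPENAI_API_KEY", ""),
        pinecone_api_key=os.getenv("PINECONE_API_KEY", ""),
        verbose=True
    )

    # Inspect what is available before querying
    print(rag.get_available_survey_names())
    print(rag.get_available_polls()[:3])

    # Metadata-first retrieval; falls back to pure semantic search if the
    # filter yields no documents.
    result = rag.retrieve_raw_data(
        question="Questions about economic outlook",
        filters={"year": 2024, "topic": "economy"},
        k=10
    )
    print(f"Retrieved {result['num_sources']} questions")
    for info in result["question_info"]:
        print(info["variable_name"], info["poll_date"])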