Spaces:
Running
Running
| """ | |
| Questionnaire RAG Module | |
| ------------------------ | |
| Retrieves survey questions from Pinecone vectorstore. | |
| Metadata filtering first, semantic search fallback. | |
| Returns raw data only - no synthesis. | |
| """ | |
import os
import json
from dataclasses import dataclass
from pathlib import Path
from typing import List, Dict, Any, Optional

from langchain_openai import OpenAIEmbeddings
from langchain_pinecone import PineconeVectorStore
from pinecone import Pinecone

try:
    from dotenv import load_dotenv
    load_dotenv()
except ImportError:
    # python-dotenv is optional; environment variables may be set externally.
    pass
@dataclass
class QuestionInfo:
    """Structured question information for cross-pipeline coordination.

    A lightweight record describing one survey question's identity and
    when it was fielded. Converted to a dataclass for free ``__repr__``
    and ``__eq__``; the constructor signature is unchanged.
    """
    # Variable name is the only required field; the rest default to None
    # so partially-known questions can still be represented.
    variable_name: str
    year: Optional[int] = None
    month: Optional[str] = None
    poll_date: Optional[str] = None
    question_id: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        """Return the question info as a plain (JSON-serializable) dict."""
        return {
            "variable_name": self.variable_name,
            "year": self.year,
            "month": self.month,
            "poll_date": self.poll_date,
            "question_id": self.question_id
        }
class QuestionnaireRAG:
    """Questionnaire RAG with metadata-first filtering.

    Retrieves survey questions from a Pinecone vectorstore. A metadata
    filter built from user-supplied constraints is tried first; if it
    returns nothing, the query falls back to plain semantic search.
    Returns raw question data only -- no synthesis.
    """

    def __init__(
        self,
        openai_api_key: str,
        pinecone_api_key: str,
        persist_directory: str = "./questionnaire_vectorstores",
        verbose: bool = False
    ):
        """Connect to Pinecone and load the local poll catalog/question index.

        Args:
            openai_api_key: OpenAI API key (stored; embeddings client reads
                credentials from the environment).
            pinecone_api_key: Pinecone API key used to open the index.
            persist_directory: Directory holding ``poll_catalog.json`` and
                ``questions_index.json``.
            verbose: When True, print progress/diagnostic messages.
        """
        self.openai_api_key = openai_api_key
        self.pinecone_api_key = pinecone_api_key
        self.persist_directory = persist_directory
        self.verbose = verbose
        # Initialize embeddings (model overridable via OPENAI_EMBED_MODEL)
        self.embeddings = OpenAIEmbeddings(
            model=os.getenv("OPENAI_EMBED_MODEL", "text-embedding-3-small")
        )
        # Connect to Pinecone; empty PINECONE_NAMESPACE collapses to None
        index_name = os.getenv("PINECONE_INDEX_NAME", "poll-questionnaire-index")
        namespace = os.getenv("PINECONE_NAMESPACE") or None
        pc = Pinecone(api_key=self.pinecone_api_key)
        self.index = pc.Index(index_name)
        self.vectorstore = PineconeVectorStore(
            index=self.index,
            embedding=self.embeddings,
            namespace=namespace
        )
        # Load catalog and questions from local JSON sidecar files
        self.poll_catalog = self._load_catalog()
        self.questions_by_id = self._load_questions_index()
        if self.verbose:
            print(f"β Loaded {len(self.questions_by_id)} questions from {len(self.poll_catalog)} polls")

    def _load_json_resource(self, filename: str) -> Dict[str, Dict]:
        """Load a JSON resource from the persist directory.

        Falls back to a sibling ``questionnaire_vectorstores`` directory under
        the parent of ``persist_directory``; returns {} when the file is not
        found in either location.
        """
        resource_path = Path(self.persist_directory) / filename
        if not resource_path.exists():
            # Try parent directory if not found
            parent_path = Path(self.persist_directory).parent / "questionnaire_vectorstores" / filename
            if parent_path.exists():
                resource_path = parent_path
            else:
                return {}
        # Explicit encoding: JSON sidecars are UTF-8 regardless of platform locale
        with open(resource_path, 'r', encoding='utf-8') as f:
            return json.load(f)

    def _load_catalog(self) -> Dict[str, Dict]:
        """Load poll catalog"""
        return self._load_json_resource("poll_catalog.json")

    def _load_questions_index(self) -> Dict[str, Dict]:
        """Load questions index"""
        return self._load_json_resource("questions_index.json")

    def _fuzzy_match_survey_name(self, requested_name: str) -> Optional[str]:
        """Fuzzy match a requested survey name against catalog names.

        Matching order per stored name: normalized equality, then substring
        containment either way; as a last resort, the requested words must be
        a subset of a stored name's words. Returns the stored (original-case)
        name, or None when nothing matches.
        """
        normalized_requested = requested_name.lower().replace("_", " ").replace("-", " ")
        if not normalized_requested.strip():
            # Guard: an empty request would trivially "match" every stored
            # name via the substring/subset checks below.
            return None
        available_names = {info["survey_name"] for info in self.poll_catalog.values()}
        for stored_name in available_names:
            normalized_stored = stored_name.lower().replace("_", " ").replace("-", " ")
            if normalized_requested == normalized_stored:
                return stored_name
            if normalized_requested in normalized_stored or normalized_stored in normalized_requested:
                return stored_name
        requested_words = set(normalized_requested.split())
        for stored_name in available_names:
            normalized_stored = stored_name.lower().replace("_", " ").replace("-", " ")
            stored_words = set(normalized_stored.split())
            if requested_words.issubset(stored_words):
                return stored_name
        return None

    def _build_pinecone_filter(self, filters: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Build a Pinecone metadata filter from loose user-supplied filters.

        Returns None when no usable conditions exist, the single condition
        dict when there is exactly one, or an ``{"$and": [...]}`` combination
        otherwise.
        """
        if not filters:
            return None
        filter_conditions = []
        if "year" in filters and filters["year"] is not None:
            # Year may arrive as a string; stored metadata is numeric.
            year = int(filters["year"]) if isinstance(filters["year"], str) else filters["year"]
            filter_conditions.append({"year": {"$eq": year}})
        if "month" in filters and filters["month"] is not None:
            # Stored month names are capitalized ("May", "November").
            month = filters["month"].capitalize()
            filter_conditions.append({"month": {"$eq": month}})
        if "poll_date" in filters and filters["poll_date"] is not None:
            filter_conditions.append({"poll_date": {"$eq": filters["poll_date"]}})
        if "survey_name" in filters and filters["survey_name"] is not None:
            # Only add the condition if the fuzzy match found a real name;
            # otherwise we would filter everything out.
            matched_name = self._fuzzy_match_survey_name(filters["survey_name"])
            if matched_name:
                filter_conditions.append({"survey_name": {"$eq": matched_name}})
        if "question_ids" in filters and filters["question_ids"]:
            question_ids = filters["question_ids"]
            if isinstance(question_ids, list) and len(question_ids) > 0:
                if len(question_ids) == 1:
                    filter_conditions.append({"question_id": {"$eq": question_ids[0]}})
                else:
                    filter_conditions.append({"question_id": {"$in": question_ids}})
        if "topic" in filters and filters["topic"]:
            topic = filters["topic"].lower()
            # "topics" metadata is list-valued; $in matches any element.
            filter_conditions.append({"topics": {"$in": [topic]}})
        if len(filter_conditions) == 0:
            return None
        elif len(filter_conditions) == 1:
            return filter_conditions[0]
        else:
            return {"$and": filter_conditions}

    def retrieve_raw_data(
        self,
        question: str,
        filters: Optional[Dict[str, Any]] = None,
        k: int = 20
    ) -> Dict[str, Any]:
        """
        Retrieve raw questionnaire data.
        Metadata filtering first, semantic search fallback.

        Args:
            question: Natural-language query to embed and search with.
            filters: Optional loose filters (year, month, poll_date,
                survey_name, question_ids, topic).
            k: Number of documents to request (doubled on the semantic
                fallback to widen the net).

        Returns:
            Dict with 'source_questions', 'num_sources', 'filters_applied', 'question_info'
        """
        if self.verbose:
            print(f"\nπ [Questionnaire] Query: {question}")
            if filters:
                print(f"π Filters: {filters}")
        # Build Pinecone filter
        pinecone_filter = self._build_pinecone_filter(filters or {})
        # Try metadata filtering first
        docs = []
        if pinecone_filter:
            if self.verbose:
                print(f"π§ Using metadata filter: {pinecone_filter}")
            retriever = self.vectorstore.as_retriever(
                search_kwargs={"k": k, "filter": pinecone_filter}
            )
            docs = retriever.invoke(question)
            if self.verbose:
                print(f"π₯ Retrieved {len(docs)} documents with metadata filter")
        # Fallback to semantic search if no results (or no filter built)
        if not docs:
            if self.verbose:
                print(f"β οΈ No results with metadata filter, falling back to semantic search")
            retriever = self.vectorstore.as_retriever(search_kwargs={"k": k * 2})
            docs = retriever.invoke(question)
            if self.verbose:
                print(f"π₯ Retrieved {len(docs)} documents with semantic search")
        if not docs:
            return {
                "source_questions": [],
                "num_sources": 0,
                "filters_applied": filters or {},
                "question_info": []
            }
        # Reconstruct full questions (deduplicated by question_id) and
        # extract question_info for cross-pipeline coordination.
        full_questions = []
        seen_ids = set()
        question_info_list = []
        for doc in docs:
            q_id = doc.metadata.get('question_id')
            if q_id and q_id not in seen_ids:
                # Only ids present in the local index can be reconstructed.
                if q_id in self.questions_by_id:
                    q_data = self.questions_by_id[q_id]
                    full_questions.append(q_data)
                    seen_ids.add(q_id)
                    question_info_list.append(QuestionInfo(
                        variable_name=q_data.get("variable_name", ""),
                        year=q_data.get("year"),
                        month=q_data.get("month", ""),
                        poll_date=q_data.get("poll_date", ""),
                        question_id=q_id
                    ))
        # Sort by poll date, then questionnaire position
        full_questions.sort(key=lambda q: (q.get('poll_date', ''), q.get('position', 0)))
        if self.verbose:
            print(f"β Extracted {len(question_info_list)} question info entries")
        return {
            'source_questions': full_questions,
            'num_sources': len(full_questions),
            'filters_applied': filters or {},
            'question_info': [q.to_dict() for q in question_info_list]
        }

    def get_available_survey_names(self) -> List[str]:
        """Get list of unique survey names"""
        survey_names = set()
        for info in self.poll_catalog.values():
            survey_names.add(info["survey_name"])
        return sorted(survey_names)

    def get_available_polls(self) -> List[Dict[str, Any]]:
        """Get list of all available polls"""
        return [
            {
                "poll_date": poll_date,
                "survey_name": info["survey_name"],
                "year": info["year"],
                "month": info.get("month", ""),
                "num_questions": info["num_questions"]
            }
            for poll_date, info in sorted(self.poll_catalog.items())
        ]