"""
Toplines RAG Module
-------------------
Retrieves topline response frequency data from Pinecone vectorstore.
Uses question_info for precise metadata filtering.
Returns raw data only - no synthesis.
"""

import os
from pathlib import Path
from typing import Any, Dict, List, Optional

from dotenv import load_dotenv
from langchain_openai import OpenAIEmbeddings
from langchain_pinecone import PineconeVectorStore
from pinecone import Pinecone

load_dotenv()


class ToplinesRAG:
    """Toplines RAG with question_info-based metadata filtering."""

    # Keys of a caller-supplied ``filters`` dict that are turned into Pinecone
    # metadata conditions; any other key (e.g. poll_date) is ignored.
    _VALID_FILTER_FIELDS = frozenset({"year", "month", "survey_name"})

    def __init__(
        self,
        index_name: Optional[str] = None,
        llm_model: str = "gpt-4-turbo",
        verbose: bool = False
    ):
        """
        Connect to the Pinecone index and set up the embedding model.

        Configuration is read from the environment (after ``load_dotenv()``):
        ``PINECONE_INDEX_NAME_TOPLINES``, ``PINECONE_NAMESPACE``,
        ``OPENAI_API_KEY``, ``PINECONE_API_KEY`` and ``OPENAI_EMBED_MODEL``.

        Args:
            index_name: Pinecone index to query. Defaults to
                ``PINECONE_INDEX_NAME_TOPLINES`` or "toplines-index".
            llm_model: Accepted for interface compatibility; this class does
                not use it (it only retrieves, it never synthesizes).
            verbose: Print progress information while retrieving.

        Raises:
            ValueError: If ``OPENAI_API_KEY`` or ``PINECONE_API_KEY`` is not set.
        """
        self.index_name = index_name or os.getenv("PINECONE_INDEX_NAME_TOPLINES", "toplines-index")
        # An empty PINECONE_NAMESPACE is treated the same as an unset one.
        self.namespace = os.getenv("PINECONE_NAMESPACE") or None
        self.verbose = verbose

        self.openai_api_key = os.getenv("OPENAI_API_KEY")
        if not self.openai_api_key:
            raise ValueError("OPENAI_API_KEY not set")

        pinecone_api_key = os.getenv("PINECONE_API_KEY")
        if not pinecone_api_key:
            raise ValueError("PINECONE_API_KEY not set")

        self.embeddings = OpenAIEmbeddings(
            model=os.getenv("OPENAI_EMBED_MODEL", "text-embedding-3-small")
        )

        self.pc = Pinecone(api_key=pinecone_api_key)
        self.index = self.pc.Index(self.index_name)

        self.vector_store = PineconeVectorStore(
            index=self.index,
            embedding=self.embeddings,
            namespace=self.namespace
        )

    @staticmethod
    def _clean(value: Any) -> Any:
        """Strip surrounding whitespace from strings; blank strings become None."""
        if isinstance(value, str):
            return value.strip() or None
        return value

    @staticmethod
    def _year_clause(year: Any) -> Dict[str, Any]:
        """Build the year condition (the index is expected to hold year as an integer)."""
        return {"year": {"$eq": int(year)}}

    @staticmethod
    def _month_clause(month: Any) -> Dict[str, Any]:
        """Build the month condition (the index is expected to hold names like "March")."""
        return {"month": {"$eq": str(month).capitalize()}}

    @staticmethod
    def _combine(clauses: List[Dict[str, Any]], operator: str) -> Optional[Dict]:
        """Join clauses under ``operator``; a single clause is returned unwrapped."""
        if not clauses:
            return None
        if len(clauses) == 1:
            return clauses[0]
        return {operator: clauses}

    def _build_filter_from_question_info(self, question_info_list: List[Dict[str, Any]]) -> Optional[Dict]:
        """
        Build Pinecone filter from question_info list.

        Each entry contributes the AND of its variable, year and month
        conditions (poll_date is not used); the entries are then OR-ed.
        Missing or blank fields are skipped.

        Args:
            question_info_list: Dicts that may carry ``variable_name``,
                ``year`` and ``month``.

        Returns:
            The filter dict, or None when no entry yields any condition.

        Raises:
            ValueError: If a non-blank ``year`` cannot be converted to int.
        """
        if not question_info_list:
            return None

        per_question: List[Dict[str, Any]] = []
        for q_info in question_info_list:
            conditions: List[Dict[str, Any]] = []

            var_name = self._clean(q_info.get("variable_name"))
            if var_name:
                # The short code (e.g. "VAND5") is expected in the "variable"
                # field; "variable_name" is checked as a fallback.
                conditions.append({
                    "$or": [
                        {"variable": {"$eq": var_name}},
                        {"variable_name": {"$eq": var_name}},
                    ]
                })

            year = self._clean(q_info.get("year"))
            if year:
                conditions.append(self._year_clause(year))

            month = self._clean(q_info.get("month"))
            if month:
                conditions.append(self._month_clause(month))

            clause = self._combine(conditions, "$and")
            # Skip empty entries and exact duplicates of an earlier entry.
            if clause is not None and clause not in per_question:
                per_question.append(clause)

        return self._combine(per_question, "$or")

    def _build_filter_from_filters(self, filters: Dict[str, Any]) -> Optional[Dict]:
        """
        Build Pinecone filter from filters dict (for direct queries without question_info).

        Only ``year``, ``month`` and ``survey_name`` are honoured (no
        poll_date). Values that are None or blank strings are skipped, so an
        empty form field neither raises nor produces a match-nothing clause.

        Args:
            filters: Field name to requested value.

        Returns:
            The AND of all usable conditions, or None when there are none.

        Raises:
            ValueError: If a non-blank ``year`` cannot be converted to int.
        """
        if not filters:
            return None

        clauses: List[Dict[str, Any]] = []
        # Iterate the caller's dict so the clause order follows its key order.
        for field, raw_value in filters.items():
            if field not in self._VALID_FILTER_FIELDS:
                continue
            value = self._clean(raw_value)
            if value is None:
                continue

            if field == "year":
                clauses.append(self._year_clause(value))
            elif field == "month":
                clauses.append(self._month_clause(value))
            else:
                # survey_name is matched verbatim (not stripped or re-cased).
                clauses.append({field: {"$eq": str(raw_value)}})

        return self._combine(clauses, "$and")

    def retrieve_raw_data(
        self,
        query: str,
        question_info: Optional[List[Dict[str, Any]]] = None,
        source_questions: Optional[List[Dict[str, Any]]] = None,
        filters: Optional[Dict[str, Any]] = None,
        top_k: int = 10
    ) -> Dict[str, Any]:
        """
        Retrieve raw topline data.

        Uses question_info for metadata filtering if it yields any condition,
        otherwise uses filters. Falls back to an unfiltered semantic search
        (with ``top_k * 2`` results) if no filter could be built or the
        filtered search returns nothing.

        Args:
            query: User's query (embedded for the similarity search)
            question_info: List of question info dicts with variable_name, year, month
                (a poll_date key may be present but is ignored)
            source_questions: Optional list of full question dicts from previous stage
                (passed through untouched, for reference)
            filters: Optional filters dict (used if question_info gives no filter)
            top_k: Number of results to retrieve

        Returns:
            Dict with 'retrieved_docs', 'num_sources', 'filters_applied'
            (the caller's ``filters`` echoed back), 'question_info_used',
            'source_questions' and 'metadata_filter' (the Pinecone filter that
            actually produced the documents, or None if they are unfiltered).
        """
        if self.verbose:
            print(f"\n📊 [Toplines] Query: {query}")
            if question_info:
                print(f"🔍 Question info: {len(question_info)} question(s)")
            if filters:
                print(f"🔍 Filters: {filters}")

        # Build filter from question_info (preferred) or filters
        pinecone_filter = None
        if question_info:
            pinecone_filter = self._build_filter_from_question_info(question_info)
        if pinecone_filter is None and filters:
            # Also reached when question_info carried no usable field.
            pinecone_filter = self._build_filter_from_filters(filters)

        # Try metadata filtering first
        docs = []
        if pinecone_filter:
            if self.verbose:
                print(f"🔧 Using metadata filter: {pinecone_filter}")
            docs = self.vector_store.similarity_search(query, k=top_k, filter=pinecone_filter)
            if self.verbose:
                print(f"📄 Retrieved {len(docs)} documents with metadata filter")

        # Fallback to semantic search if no results
        used_fallback = not docs
        if used_fallback:
            if self.verbose:
                if pinecone_filter:
                    print("⚠️ No results with metadata filter, falling back to semantic search")
                else:
                    print("⚠️ No metadata filter available, using semantic search")
            docs = self.vector_store.similarity_search(query, k=top_k * 2)
            if self.verbose:
                print(f"📄 Retrieved {len(docs)} documents with semantic search")

        return {
            "retrieved_docs": docs,
            "num_sources": len(docs),
            "filters_applied": filters or {},
            "question_info_used": question_info or [],
            "source_questions": source_questions or [],
            # Lets callers tell filtered results from unfiltered fallback results.
            "metadata_filter": None if used_fallback else pinecone_filter,
        }