# survey-analytics / questionnaire_rag.py
# (uploaded by umangchaudhry -- "Upload 20 files", commit cc2626e, verified)
"""
Questionnaire RAG Module
------------------------
Retrieves survey questions from Pinecone vectorstore.
Metadata filtering first, semantic search fallback.
Returns raw data only - no synthesis.
"""
import json
import os
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional

from langchain_openai import OpenAIEmbeddings
from langchain_pinecone import PineconeVectorStore
from pinecone import Pinecone
try:
from dotenv import load_dotenv
load_dotenv()
except ImportError:
pass
@dataclass
class QuestionInfo:
    """Structured question information for cross-pipeline coordination.

    Lightweight record mirroring the metadata stored alongside each
    survey question in the vectorstore. All fields except
    ``variable_name`` are optional and default to ``None``.
    """
    variable_name: str                 # survey variable/column name
    year: Optional[int] = None         # poll year, e.g. 2021
    month: Optional[str] = None        # poll month name, e.g. "May"
    poll_date: Optional[str] = None    # full poll date string
    question_id: Optional[str] = None  # unique question identifier

    def to_dict(self) -> Dict[str, Any]:
        """Return the record as a plain JSON-serializable dict.

        Key order matches field declaration order, identical to the
        hand-written dict this replaces.
        """
        return asdict(self)
class QuestionnaireRAG:
    """Questionnaire RAG with metadata-first filtering.

    Retrieves survey questions from a Pinecone vectorstore: a metadata
    filter built from the caller's structured filters is tried first,
    with plain semantic search as a fallback when it yields nothing.
    Returns raw question data only -- no synthesis happens here.
    """

    def __init__(
        self,
        openai_api_key: str,
        pinecone_api_key: str,
        persist_directory: str = "./questionnaire_vectorstores",
        verbose: bool = False
    ):
        """Connect to Pinecone and load the local catalog/question indexes.

        Args:
            openai_api_key: OpenAI key (stored; embeddings pick up the key
                from the environment).
            pinecone_api_key: Pinecone API key used to open the index.
            persist_directory: Directory holding ``poll_catalog.json`` and
                ``questions_index.json``.
            verbose: When True, print progress/diagnostic messages.
        """
        self.openai_api_key = openai_api_key
        self.pinecone_api_key = pinecone_api_key
        self.persist_directory = persist_directory
        self.verbose = verbose
        # Embedding model is configurable via env var; defaults to the small model.
        self.embeddings = OpenAIEmbeddings(
            model=os.getenv("OPENAI_EMBED_MODEL", "text-embedding-3-small")
        )
        # Connect to Pinecone (index name / namespace configurable via env).
        index_name = os.getenv("PINECONE_INDEX_NAME", "poll-questionnaire-index")
        namespace = os.getenv("PINECONE_NAMESPACE") or None
        pc = Pinecone(api_key=self.pinecone_api_key)
        self.index = pc.Index(index_name)
        self.vectorstore = PineconeVectorStore(
            index=self.index,
            embedding=self.embeddings,
            namespace=namespace
        )
        # Local JSON side-car files: poll_date -> poll info, question_id -> question data.
        self.poll_catalog = self._load_catalog()
        self.questions_by_id = self._load_questions_index()
        if self.verbose:
            print(f"βœ“ Loaded {len(self.questions_by_id)} questions from {len(self.poll_catalog)} polls")

    def _load_json_index(self, filename: str) -> Dict[str, Dict]:
        """Load a JSON index file from the persist directory.

        Falls back to a sibling ``questionnaire_vectorstores`` directory
        under the persist directory's parent (covers running from a
        different working directory); returns ``{}`` when neither exists.
        """
        path = Path(self.persist_directory) / filename
        if not path.exists():
            fallback = Path(self.persist_directory).parent / "questionnaire_vectorstores" / filename
            if not fallback.exists():
                return {}
            path = fallback
        with open(path, 'r', encoding='utf-8') as f:
            return json.load(f)

    def _load_catalog(self) -> Dict[str, Dict]:
        """Load poll catalog (poll_date -> poll info)."""
        return self._load_json_index("poll_catalog.json")

    def _load_questions_index(self) -> Dict[str, Dict]:
        """Load questions index (question_id -> question data)."""
        return self._load_json_index("questions_index.json")

    def _fuzzy_match_survey_name(self, requested_name: str) -> Optional[str]:
        """Fuzzy match a requested survey name against catalog names.

        Matching tiers, first hit wins: exact normalized match, substring
        containment in either direction, then word-subset match. Names are
        normalized to lowercase with '_'/'-' treated as spaces. Candidates
        are scanned in sorted order so exact matches always win and
        ambiguous requests resolve deterministically (the original scanned
        an unordered set, letting a substring hit shadow an exact match).
        Returns None when nothing matches.
        """
        def _norm(name: str) -> str:
            return name.lower().replace("_", " ").replace("-", " ")

        available = sorted({info["survey_name"] for info in self.poll_catalog.values()})
        wanted = _norm(requested_name)
        # Tier 1: exact normalized equality.
        for stored in available:
            if _norm(stored) == wanted:
                return stored
        # Tier 2: substring containment either way.
        for stored in available:
            normalized = _norm(stored)
            if wanted in normalized or normalized in wanted:
                return stored
        # Tier 3: every requested word appears in the stored name.
        wanted_words = set(wanted.split())
        for stored in available:
            if wanted_words.issubset(set(_norm(stored).split())):
                return stored
        return None

    def _build_pinecone_filter(self, filters: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Build a Pinecone metadata filter dict from structured filters.

        Recognized keys: year, month, poll_date, survey_name (fuzzy-matched
        against the catalog), question_ids, topic. Returns None when no
        usable conditions were produced, a single condition dict for one,
        or an ``$and`` of all conditions otherwise.
        """
        if not filters:
            return None
        conditions: List[Dict[str, Any]] = []
        if filters.get("year") is not None:
            # Year may arrive as a string from upstream parsing; coerce to int.
            year = int(filters["year"]) if isinstance(filters["year"], str) else filters["year"]
            conditions.append({"year": {"$eq": year}})
        if filters.get("month") is not None:
            # Stored months are capitalized ("May", "November").
            conditions.append({"month": {"$eq": filters["month"].capitalize()}})
        if filters.get("poll_date") is not None:
            conditions.append({"poll_date": {"$eq": filters["poll_date"]}})
        if filters.get("survey_name") is not None:
            matched_name = self._fuzzy_match_survey_name(filters["survey_name"])
            # Silently skip the condition when no catalog name matches.
            if matched_name:
                conditions.append({"survey_name": {"$eq": matched_name}})
        question_ids = filters.get("question_ids")
        if isinstance(question_ids, list) and question_ids:
            if len(question_ids) == 1:
                conditions.append({"question_id": {"$eq": question_ids[0]}})
            else:
                conditions.append({"question_id": {"$in": question_ids}})
        if filters.get("topic"):
            # Topics are stored lowercase as a list field.
            conditions.append({"topics": {"$in": [filters["topic"].lower()]}})
        if not conditions:
            return None
        if len(conditions) == 1:
            return conditions[0]
        return {"$and": conditions}

    def retrieve_raw_data(
        self,
        question: str,
        filters: Optional[Dict[str, Any]] = None,
        k: int = 20
    ) -> Dict[str, Any]:
        """Retrieve raw questionnaire data.

        Metadata filtering first; if it yields no documents, fall back to
        plain semantic search with a doubled ``k``.

        Args:
            question: Natural-language query to embed and search with.
            filters: Optional structured filters (see _build_pinecone_filter).
            k: Number of documents to retrieve with the metadata filter.

        Returns:
            Dict with 'source_questions' (full question records, deduplicated
            by question_id and sorted by poll_date then position),
            'num_sources', 'filters_applied', and 'question_info' (list of
            QuestionInfo dicts for cross-pipeline coordination).
        """
        if self.verbose:
            print(f"\nπŸ“Š [Questionnaire] Query: {question}")
            if filters:
                print(f"πŸ” Filters: {filters}")
        pinecone_filter = self._build_pinecone_filter(filters or {})
        # Attempt 1: metadata-filtered retrieval.
        docs = []
        if pinecone_filter:
            if self.verbose:
                print(f"πŸ”§ Using metadata filter: {pinecone_filter}")
            retriever = self.vectorstore.as_retriever(
                search_kwargs={"k": k, "filter": pinecone_filter}
            )
            docs = retriever.invoke(question)
            if self.verbose:
                print(f"πŸ“₯ Retrieved {len(docs)} documents with metadata filter")
        # Attempt 2: unfiltered semantic search (also used when no filter built).
        if not docs:
            if self.verbose:
                print(f"⚠️ No results with metadata filter, falling back to semantic search")
            retriever = self.vectorstore.as_retriever(search_kwargs={"k": k * 2})
            docs = retriever.invoke(question)
            if self.verbose:
                print(f"πŸ“₯ Retrieved {len(docs)} documents with semantic search")
        if not docs:
            return {
                "source_questions": [],
                "num_sources": 0,
                "filters_applied": filters or {},
                "question_info": []
            }
        # Reconstruct full question records from the local index, deduplicating
        # by question_id (chunked docs can repeat an id).
        full_questions = []
        seen_ids = set()
        question_info_list = []
        for doc in docs:
            q_id = doc.metadata.get('question_id')
            if q_id and q_id not in seen_ids and q_id in self.questions_by_id:
                q_data = self.questions_by_id[q_id]
                full_questions.append(q_data)
                seen_ids.add(q_id)
                question_info_list.append(QuestionInfo(
                    variable_name=q_data.get("variable_name", ""),
                    year=q_data.get("year"),
                    month=q_data.get("month", ""),
                    poll_date=q_data.get("poll_date", ""),
                    question_id=q_id
                ))
        # Stable presentation order: by poll date, then questionnaire position.
        full_questions.sort(key=lambda q: (q.get('poll_date', ''), q.get('position', 0)))
        if self.verbose:
            print(f"βœ… Extracted {len(question_info_list)} question info entries")
        return {
            'source_questions': full_questions,
            'num_sources': len(full_questions),
            'filters_applied': filters or {},
            'question_info': [q.to_dict() for q in question_info_list]
        }

    def get_available_survey_names(self) -> List[str]:
        """Get the sorted list of unique survey names in the catalog."""
        return sorted({info["survey_name"] for info in self.poll_catalog.values()})

    def get_available_polls(self) -> List[Dict[str, Any]]:
        """Get a summary of all available polls, sorted by poll date."""
        return [
            {
                "poll_date": poll_date,
                "survey_name": info["survey_name"],
                "year": info["year"],
                "month": info.get("month", ""),
                "num_questions": info["num_questions"]
            }
            for poll_date, info in sorted(self.poll_catalog.items())
        ]