# rag-slack / app.py
# Last commit by NeerajCodz: cbde178 "debug3" (verified)
import os
import io
import re
import logging
import asyncio
from typing import List, Dict, Any
from datetime import datetime, timezone
# Core Web Framework and Async Handlers
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse, RedirectResponse
from uvicorn import run as uvicorn_run
# Slack Libraries (Async versions)
from slack_bolt.async_app import AsyncApp
from slack_bolt.adapter.fastapi.async_handler import AsyncSlackRequestHandler
from slack_bolt.oauth.async_oauth_settings import AsyncOAuthSettings
from slack_sdk.oauth.installation_store.async_installation_store import AsyncInstallationStore
from slack_sdk.oauth.installation_store.models import Installation, Bot
from slack_sdk.oauth.state_store import FileOAuthStateStore
from slack_sdk.web.async_client import AsyncWebClient
from slack_bolt.authorization import AuthorizeResult
# RAG/ML Libraries
from sentence_transformers import SentenceTransformer
from transformers import pipeline
import torch
from huggingface_hub import login
# Data Handling Libraries
from supabase import create_client, Client
import pypdf
from docx import Document
import requests
# --- Configuration and Initialization ---
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Load secrets from environment variables. SLACK_BOT_TOKEN and HF_TOKEN are
# optional; the others are required and checked below.
SUPABASE_URL = os.environ.get("SUPABASE_URL")
SUPABASE_KEY = os.environ.get("SUPABASE_KEY")
SLACK_BOT_TOKEN = os.environ.get("SLACK_BOT_TOKEN")
SLACK_SIGNING_SECRET = os.environ.get("SLACK_SIGNING_SECRET")
SLACK_CLIENT_ID = os.environ.get("SLACK_CLIENT_ID")
SLACK_CLIENT_SECRET = os.environ.get("SLACK_CLIENT_SECRET")
HF_TOKEN = os.environ.get("HF_TOKEN")

# Fail fast when a required variable is unset or empty. The names are kept in
# parallel with the values so the error can say *which* variables are
# missing; the values of unset variables are None, which str.join cannot
# format.
REQUIRED_ENV_VAR_NAMES = (
    "SUPABASE_URL",
    "SUPABASE_KEY",
    "SLACK_CLIENT_ID",
    "SLACK_CLIENT_SECRET",
    "SLACK_SIGNING_SECRET",
)
required_vars = [SUPABASE_URL, SUPABASE_KEY, SLACK_CLIENT_ID, SLACK_CLIENT_SECRET, SLACK_SIGNING_SECRET]
if not all(required_vars):
    missing = [name for name, value in zip(REQUIRED_ENV_VAR_NAMES, required_vars) if not value]
    raise ValueError(f"Missing required environment variables: {', '.join(missing)}")
# Validate the optional bot token: refuse to start if a user token (xoxp-)
# was supplied where a bot token (xoxb-) is expected.
# NOTE(review): SLACK_BOT_TOKEN is only validated here; it is not passed to
# AsyncApp, which authorizes requests through the OAuth installation store.
if SLACK_BOT_TOKEN:
    if SLACK_BOT_TOKEN.startswith("xoxp-"):
        raise ValueError(
            "❌ SLACK_BOT_TOKEN is a User OAuth Token (xoxp-).\n"
            "You need the Bot User OAuth Token (xoxb-)"
        )
    logger.info("✓ Bot token provided")
# Authenticate with the Hugging Face Hub only when HF_TOKEN is set. A login
# failure is logged as a warning and start-up carries on.
if HF_TOKEN:
    try:
        login(token=HF_TOKEN, add_to_git_credential=False)
    except Exception as hf_error:
        logger.warning(f"Failed to log into Hugging Face Hub: {hf_error}")
    else:
        logger.info("Logged into Hugging Face Hub successfully.")

# Shared Supabase client: used by the installation store, document storage,
# vector search and the health/debug endpoints.
supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)
# --- Supabase Installation Store with Token Rotation Support ---
class SupabaseAsyncInstallationStore(AsyncInstallationStore):
    """Slack installation store backed by the Supabase ``installations`` table.

    One row is kept per workspace: saves upsert on ``team_id``, and lookups and
    deletes filter on ``team_id`` only (``enterprise_id`` / ``user_id`` are
    stored but not used as keys). When a looked-up installation has a refresh
    token and its bot token expires within ``rotation_threshold_seconds``, the
    token is rotated through ``oauth.v2.access`` and the result is saved.
    """

    def __init__(self, supabase_client, rotation_threshold_seconds: int = 3600):
        """Create the store.

        Args:
            supabase_client: Supabase client used for all table operations.
            rotation_threshold_seconds: rotate the bot token once it expires in
                fewer than this many seconds (default 3600, i.e. one hour).
        """
        self.supabase = supabase_client
        self.rotation_threshold_seconds = rotation_threshold_seconds
        # Token-less client: the refresh call authenticates with the app's
        # client id/secret, which are passed as call arguments.
        self._client = AsyncWebClient()

    async def async_save(self, installation: Installation):
        """Bolt-facing save hook; delegates to :meth:`save`."""
        return await self.save(installation)

    async def save(self, installation: Installation) -> None:
        """Upsert ``installation`` into the ``installations`` table.

        The refresh token and expiry are stored so the bot token can be rotated
        later. ``installed_at`` is reset to the current UTC time on every save,
        including saves made after a token rotation.

        Raises:
            Exception: any Supabase error is logged and re-raised.
        """
        data = {
            "team_id": installation.team_id,
            "enterprise_id": installation.enterprise_id,
            "bot_token": installation.bot_token,
            "bot_user_id": installation.bot_user_id,
            "bot_scopes": ",".join(installation.bot_scopes) if installation.bot_scopes else None,
            "app_id": installation.app_id,
            "bot_refresh_token": installation.bot_refresh_token,
            "bot_token_expires_at": installation.bot_token_expires_at,
            "user_id": installation.user_id,
            "installed_at": datetime.now(timezone.utc).isoformat(),
        }
        try:
            # The Supabase client is synchronous; run it in a worker thread.
            await asyncio.to_thread(
                lambda: self.supabase.table("installations").upsert(data, on_conflict="team_id").execute()
            )
        except Exception as e:
            logger.error(f"Failed to save installation for team {installation.team_id}: {e}")
            raise
        logger.info(f"✓ Saved installation for team: {installation.team_id}")
        if installation.bot_token_expires_at:
            expires_dt = datetime.fromtimestamp(installation.bot_token_expires_at, tz=timezone.utc)
            logger.info(f" Token expires at: {expires_dt}")

    async def async_find_installation(
        self,
        *,
        enterprise_id: str | None,
        team_id: str | None,
        user_id: str | None = None,
        is_enterprise_install: bool | None = None,
    ) -> Installation | None:
        """Return the workspace's installation, rotating its bot token if needed.

        Returns None when ``team_id`` is empty or no installation is stored.
        """
        if not team_id:
            return None
        installation = await self.fetch_installation(
            team_id=team_id,
            enterprise_id=enterprise_id,
            user_id=user_id,
        )
        if installation is None:
            return None
        if self._needs_rotation(installation):
            logger.info(f"🔄 Token expiring soon for team {team_id}, rotating...")
            installation = await self._rotate_token(installation)
        return installation

    async def async_find_bot(
        self,
        *,
        enterprise_id: str | None,
        team_id: str | None,
        is_enterprise_install: bool | None = False,
    ) -> Bot | None:
        """Return the workspace's bot credentials, or None if not installed.

        Implements the ``async_find_bot`` lookup from ``AsyncInstallationStore``
        on top of :meth:`async_find_installation`, so token rotation applies
        here as well.
        """
        installation = await self.async_find_installation(
            enterprise_id=enterprise_id,
            team_id=team_id,
            is_enterprise_install=is_enterprise_install,
        )
        return installation.to_bot() if installation is not None else None

    def _needs_rotation(self, installation: Installation) -> bool:
        """True when the bot token is refreshable and expires within the threshold."""
        if not (installation.bot_token_expires_at and installation.bot_refresh_token):
            return False
        now = datetime.now(timezone.utc).timestamp()
        return installation.bot_token_expires_at - now < self.rotation_threshold_seconds

    async def _rotate_token(self, installation: Installation) -> Installation:
        """Exchange the refresh token for a new bot token and persist it.

        Best-effort: any failure is logged and ``installation`` is returned
        anyway. If only the final save failed, the returned object already
        carries the new tokens.
        """
        try:
            response = await self._client.oauth_v2_access(
                client_id=SLACK_CLIENT_ID,
                client_secret=SLACK_CLIENT_SECRET,
                grant_type="refresh_token",
                refresh_token=installation.bot_refresh_token,
            )
            if not response["ok"]:
                logger.error(f"Token rotation failed: {response.get('error')}")
                return installation
            installation.bot_token = response["access_token"]
            installation.bot_refresh_token = response.get("refresh_token", installation.bot_refresh_token)
            # Without `expires_in` the new expiry is unknown. Store None rather
            # than "now", which would make every later lookup rotate again.
            expires_in = response.get("expires_in")
            installation.bot_token_expires_at = (
                int(datetime.now(timezone.utc).timestamp()) + int(expires_in) if expires_in else None
            )
            await self.save(installation)
            logger.info(f"✓ Token rotated successfully for team {installation.team_id}")
            return installation
        except Exception as e:
            logger.error(f"Error rotating token: {e}")
            return installation

    async def fetch_installation(
        self,
        team_id: str,
        *,
        enterprise_id: str | None = None,
        user_id: str | None = None,
    ) -> Installation | None:
        """Load the installation row for ``team_id`` from Supabase.

        ``enterprise_id`` and ``user_id`` are accepted for interface parity but
        are not used in the query. Returns None when no row exists or when the
        query fails (the error is logged).
        """
        try:
            result = await asyncio.to_thread(
                lambda: self.supabase.table("installations").select("*").eq("team_id", team_id).execute()
            )
            if not result.data:
                return None
            data = result.data[0]
            return Installation(
                app_id=data.get("app_id"),
                enterprise_id=data.get("enterprise_id"),
                team_id=team_id,
                user_id=data.get("user_id"),
                bot_token=data.get("bot_token"),
                bot_user_id=data.get("bot_user_id"),
                # Scopes are persisted as a comma-separated string.
                bot_scopes=data.get("bot_scopes", "").split(',') if data.get("bot_scopes") else [],
                bot_refresh_token=data.get("bot_refresh_token"),
                bot_token_expires_at=data.get("bot_token_expires_at"),
            )
        except Exception as e:
            logger.error(f"Failed to fetch installation for team {team_id}: {e}")
            return None

    async def async_delete_installation(
        self,
        *,
        enterprise_id: str | None,
        team_id: str | None,
        user_id: str | None = None,
    ) -> None:
        """Bolt-facing delete hook; a no-op when ``team_id`` is empty."""
        if team_id:
            await self.delete(team_id=team_id, enterprise_id=enterprise_id, user_id=user_id)

    async def delete(
        self,
        team_id: str,
        *,
        enterprise_id: str | None = None,
        user_id: str | None = None,
    ) -> None:
        """Delete every ``installations`` row for ``team_id``.

        Raises:
            Exception: any Supabase error is logged and re-raised.
        """
        try:
            await asyncio.to_thread(
                lambda: self.supabase.table("installations").delete().eq("team_id", team_id).execute()
            )
            logger.info(f"Deleted installation for team: {team_id}")
        except Exception as e:
            logger.error(f"Failed to delete installation for team {team_id}: {e}")
            raise
# Initialize installation store
installation_store = SupabaseAsyncInstallationStore(supabase)

# Initialize Bolt Async App with OAuth.
# Bolt authorizes each request through `installation_store`; the store's
# async_find_installation also rotates expiring bot tokens.
# Do NOT also pass `authorize=` here - it conflicts with `oauth_settings`.
app = AsyncApp(
    signing_secret=SLACK_SIGNING_SECRET,
    installation_store=installation_store,
    oauth_settings=AsyncOAuthSettings(
        client_id=SLACK_CLIENT_ID,
        client_secret=SLACK_CLIENT_SECRET,
        scopes=[
            # Core required scopes
            "app_mentions:read",  # Listen when bot is @mentioned
            "chat:write",         # Send messages as the bot
            "files:read",         # Read uploaded files
            "channels:read",      # View basic channel info
            # DM support (recommended)
            "im:read",            # View DM info
            "im:write",           # Send DMs
            "im:history",         # Read DM messages
            # Optional: for private channels
            # "groups:read",      # View private channel info
            # "channels:history", # Read channel message history
        ],
        install_path="/slack/install",
        redirect_uri_path="/slack/oauth_redirect",
        state_store=FileOAuthStateStore(expiration_seconds=600, base_dir="/tmp/slack_states"),
    ),
    # This is a long-running uvicorn server, not a FaaS function. Let Bolt
    # acknowledge each event immediately and run listeners afterwards.
    # With True, slow listeners (file download + embedding, QA inference)
    # delay the HTTP response past Slack's 3-second limit, and Slack's
    # retries then re-process the same event (e.g. storing a file's chunks
    # twice).
    process_before_response=False,
)

api = FastAPI()
# --- RAG Model Loading ---
device = 'cuda' if torch.cuda.is_available() else 'cpu'
logger.info(f"Using device: {device}")

# Both models are required. Any load failure aborts start-up, so the None
# defaults below are only observable if this block is changed to be lenient.
embedding_model = None
qa_pipeline = None
try:
    logger.info("Loading embedding model...")
    embedding_model = SentenceTransformer('all-MiniLM-L6-v2', device=device)
    logger.info("Loading QA model...")
    qa_pipeline = pipeline(
        "question-answering",
        model="deepset/roberta-base-squad2",
        # transformers pipelines take a device index: 0 = first GPU, -1 = CPU.
        device=0 if device == 'cuda' else -1,
    )
    logger.info("Models loaded successfully!")
except Exception as e:
    logger.error(f"Error loading models: {e}")
    # Chain the original error so the real cause appears in the traceback.
    raise RuntimeError("Failed to load required ML models.") from e
# --- Utility Functions ---
def download_slack_file(url: str, token: str) -> bytes:
    """Fetch a private Slack file and return its raw body.

    The token is sent as a Bearer ``Authorization`` header, with a 30-second
    timeout. Network failures and HTTP error statuses are logged and re-raised
    as ``requests.RequestException``.
    """
    auth_headers = {"Authorization": f"Bearer {token}"}
    try:
        resp = requests.get(url, headers=auth_headers, stream=True, timeout=30)
        resp.raise_for_status()
        body = resp.content
    except requests.RequestException as err:
        logger.error(f"Failed to download Slack file: {err}")
        raise
    return body
def extract_text_from_pdf(file_content: bytes) -> str:
    """Extract the text of every page of a PDF.

    Args:
        file_content: raw PDF bytes.

    Returns:
        The text of each page that yielded any, each followed by a newline;
        an empty string when no page has extractable text.

    Raises:
        Exception: whatever pypdf raises for unreadable input (logged first).
    """
    try:
        pdf_reader = pypdf.PdfReader(io.BytesIO(file_content))
        page_texts = [page.extract_text() for page in pdf_reader.pages]
        # Join once instead of growing a string with `+=` page by page, which
        # may copy the accumulated text on every iteration.
        return "".join(text + "\n" for text in page_texts if text)
    except Exception as e:
        logger.error(f"Failed to extract PDF text: {e}")
        raise
def extract_text_from_docx(file_content: bytes) -> str:
    """Return a DOCX document's paragraph text, one paragraph per line.

    Only ``document.paragraphs`` is read, so text inside tables is not
    included. Parsing errors are logged and re-raised.
    """
    try:
        document = Document(io.BytesIO(file_content))
        paragraph_texts = (paragraph.text for paragraph in document.paragraphs)
        return "\n".join(paragraph_texts)
    except Exception as exc:
        logger.error(f"Failed to extract DOCX text: {exc}")
        raise
def chunk_text(text: str, chunk_size: int = 300, overlap: int = 0) -> List[str]:
    """Split ``text`` into chunks of at most ``chunk_size`` whitespace-separated words.

    Args:
        text: input text; any run of whitespace counts as one separator.
        chunk_size: maximum number of words per chunk (must be > 0).
        overlap: number of words each chunk shares with the previous one
            (0 <= overlap < chunk_size). The default 0 gives disjoint chunks.

    Returns:
        The chunks in order, each a space-joined run of words; an empty list
        for empty or whitespace-only text.

    Raises:
        ValueError: if ``chunk_size`` or ``overlap`` is out of range.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")
    words = text.split()
    step = chunk_size - overlap
    chunks = []
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start:start + chunk_size]))
        # Stop once a chunk reaches the end; with overlap, further windows
        # would only repeat words already covered.
        if start + chunk_size >= len(words):
            break
    return chunks
def embed_text(text: str) -> List[float]:
    """Encode ``text`` with the module-level sentence-embedding model.

    The vector is converted with ``tolist()`` so it can be stored in Supabase
    as a plain list. Raises RuntimeError when the model is not loaded.
    """
    if embedding_model:
        vector = embedding_model.encode(text, convert_to_tensor=False)
        return vector.tolist()
    raise RuntimeError("Embedding model not loaded.")
async def store_embeddings(chunks: List[str]) -> int:
    """Embed each chunk and insert it, with its embedding, into ``documents``.

    Chunks are handled one at a time. A failure on one chunk is logged and the
    remaining chunks are still attempted.

    Returns:
        The number of chunks that were embedded and inserted successfully.
    """
    stored = 0
    for chunk in chunks:
        try:
            # Model inference is blocking; run it in a worker thread so the
            # event loop keeps serving other Slack requests meanwhile.
            embedding = await asyncio.to_thread(embed_text, chunk)
            await asyncio.to_thread(
                lambda c=chunk, emb=embedding: supabase.table("documents").insert({
                    "content": c,
                    "embedding": emb
                }).execute()
            )
        except Exception as e:
            logger.error(f"Failed to insert chunk: {e}")
        else:
            stored += 1
    return stored
async def is_table_empty() -> bool:
    """Return True if the ``documents`` table has no rows.

    Fetches at most one row id instead of requesting an exact row count, which
    would count the entire table on every question. Best-effort: on any
    database error the error is logged and True is returned.
    """
    try:
        result = await asyncio.to_thread(
            lambda: supabase.table("documents").select("id").limit(1).execute()
        )
    except Exception as e:
        logger.error(f"Error checking table: {e}")
        return True
    return not result.data
async def search_documents(query: str, match_count: int = 5) -> List[Dict[str, Any]]:
    """Return up to ``match_count`` rows most similar to ``query``.

    The query is embedded and passed to the ``match_documents`` Supabase RPC.
    Rows are returned as the RPC produces them; the callers in this file read
    their ``content`` field.

    Raises:
        RuntimeError: if the embedding model is not loaded.

    Any error during embedding or the RPC is logged and yields ``[]``.
    """
    if not embedding_model:
        raise RuntimeError("Embedding model not loaded.")
    try:
        # Encoding is blocking model inference; keep it off the event loop.
        query_embedding = await asyncio.to_thread(embed_text, query)
        result = await asyncio.to_thread(
            lambda: supabase.rpc("match_documents", {
                "query_embedding": query_embedding,
                "match_count": match_count
            }).execute()
        )
        # Normalize a missing payload to an empty list to honor the return type.
        return result.data or []
    except Exception as e:
        logger.error(f"Error searching documents: {e}")
        return []
async def answer_question(question: str, context: str, max_context_chars: int | None = 4096) -> str:
    """Extract an answer to ``question`` from ``context`` with the QA pipeline.

    Args:
        question: the user's question.
        context: retrieved document text to search for the answer.
        max_context_chars: only the first this-many characters of ``context``
            are given to the model (default 4096); ``None`` passes the whole
            context.

    Returns:
        The pipeline's ``answer`` field, "No relevant documents found." for a
        blank context, or "Error generating answer." if inference fails (the
        error is logged).

    Raises:
        RuntimeError: if the QA pipeline is not loaded.
    """
    if not qa_pipeline:
        raise RuntimeError("QA pipeline not loaded.")
    if not context.strip():
        return "No relevant documents found."
    try:
        # Slicing with None keeps the full string.
        context_slice = context[:max_context_chars]
        # Inference is blocking; run it in a worker thread.
        result = await asyncio.to_thread(
            lambda: qa_pipeline(question=question, context=context_slice)
        )
        return result['answer']
    except Exception as e:
        logger.error(f"Error in QA pipeline: {e}")
        return "Error generating answer."
# --- Slack Event Handlers ---
@app.event("file_shared")
async def handle_file_shared(event, say, client):
    """Add a shared PDF or DOCX file to the knowledge base.

    Looks the file up with ``files.info``, downloads it with the bot token,
    extracts its text, splits it into chunks and stores their embeddings.
    The outcome is reported via ``say``; any error is logged and reported too.
    """
    file_id = event["file_id"]
    try:
        file_info = await client.files_info(file=file_id)
        file_data = file_info["file"]
        file_type = file_data.get("mimetype", "")
        file_url = file_data.get("url_private_download")
        if not file_url:
            await say("No download URL available.")
            return

        # Pick the extractor before downloading so unsupported files are never
        # fetched. Word files are accepted only as .docx (Office Open XML):
        # python-docx cannot open legacy .doc files ("application/msword").
        if "pdf" in file_type:
            extract = extract_text_from_pdf
        elif "wordprocessingml" in file_type:
            extract = extract_text_from_docx
        else:
            await say("❌ Unsupported file type. Please upload PDF or DOCX files.")
            return

        # Download and parsing are blocking; run them in worker threads.
        file_content = await asyncio.to_thread(download_slack_file, file_url, client.token)
        text = await asyncio.to_thread(extract, file_content)
        if not text.strip():
            await say("❌ No text could be extracted from the file.")
            return

        chunks = chunk_text(text)
        await store_embeddings(chunks)
        # Slack mrkdwn marks bold with single asterisks.
        await say(f"✅ File processed! Added *{len(chunks)}* chunks to knowledge base.")
    except Exception as e:
        logger.error(f"Error processing file {file_id}: {e}")
        await say(f"❌ Error processing file: {str(e)}")
@app.event("app_mention")
async def handle_mention(event, say):
    """Answer a question asked by @-mentioning the bot.

    Mentions are stripped from the message text, and the rest is answered from
    the document store. Replies, including errors, are sent via ``say``.
    """
    text = event["text"]
    # Remove user mentions such as "<@U012AB3CD>" or "<@U012AB3CD|name>" so
    # only the question remains.
    user_query = re.sub(r'<@[A-Z0-9]+(?:\|[^>]*)?>', '', text).strip()
    if not user_query:
        await say("Please ask me a question!")
        return
    try:
        if await is_table_empty():
            await say("📚 My knowledge base is empty. Please share PDF or DOCX files first!")
            return
        results = await search_documents(user_query, match_count=5)
        if not results:
            await say("🔍 I couldn't find relevant information. Try uploading more documents.")
            return
        context = " ".join([doc["content"] for doc in results])
        answer = await answer_question(user_query, context)
        # Slack mrkdwn: *bold*, _italic_.
        await say(f"💡 *Answer:* {answer}\n\n_Found from {len(results)} document chunks._")
    except Exception as e:
        logger.error(f"Error answering query '{user_query}': {e}")
        await say(f"❌ Error: {str(e)}")
@app.event("message")
async def handle_message(event, say):
    """Answer questions sent to the bot in direct messages.

    Messages outside DMs, messages carrying a ``bot_id`` and empty messages
    are ignored. Replies, including errors, are sent via ``say``.
    """
    # Only respond to DMs (not channel messages).
    if event.get("channel_type") != "im":
        return
    # `text` may be absent or explicitly null on some message events.
    text = (event.get("text") or "").strip()
    if not text:
        return
    # Don't respond to bot messages (including our own replies).
    if event.get("bot_id"):
        return
    try:
        if await is_table_empty():
            await say("📚 My knowledge base is empty. Please share PDF or DOCX files in a channel where I'm present!")
            return
        results = await search_documents(text, match_count=5)
        if not results:
            await say("🔍 I couldn't find relevant information. Try asking about uploaded documents.")
            return
        context = " ".join([doc["content"] for doc in results])
        answer = await answer_question(text, context)
        # Slack mrkdwn: *bold*, _italic_.
        await say(f"💡 *Answer:* {answer}\n\n_Found from {len(results)} document chunks._")
    except Exception as e:
        logger.error(f"Error in DM handler: {e}")
        await say(f"❌ Error: {str(e)}")
# --- FastAPI Routes ---
# Bolt's FastAPI adapter. Each Slack-facing route below forwards the raw
# request to it and returns whatever response Bolt produces.
handler = AsyncSlackRequestHandler(app)


async def _forward_to_bolt(request: Request):
    """Hand a FastAPI request to the Bolt handler and return its response."""
    response = await handler.handle(request)
    return response


@api.post("/slack/events")
async def slack_events(request: Request):
    """Slack Events API endpoint."""
    return await _forward_to_bolt(request)


@api.get("/slack/install")
async def slack_install(request: Request):
    """Start the OAuth installation flow."""
    return await _forward_to_bolt(request)


@api.get("/slack/oauth_redirect")
async def oauth_redirect(request: Request):
    """OAuth redirect (callback) endpoint."""
    return await _forward_to_bolt(request)
@api.get("/")
async def root():
    """Service banner: running status, install path and feature list."""
    return {
        "status": "✅ Slack RAG Bot Running",
        "install_url": "/slack/install",
        "features": ["OAuth2 with token rotation", "RAG with vector search", "PDF/DOCX support"],
    }
@api.get("/health")
async def health():
    """Report database reachability and whether both ML models are loaded.

    ``status`` is "ok" only when the probe query on ``documents`` succeeds and
    both models are present. ``oauth`` and ``token_rotation`` are fixed
    strings.
    """
    try:
        await asyncio.to_thread(
            lambda: supabase.table("documents").select("id").limit(1).execute()
        )
    except Exception as probe_error:
        logger.error(f"DB health check failed: {probe_error}")
        database_state = "error"
    else:
        database_state = "ok"

    models_state = "ok" if embedding_model and qa_pipeline else "error"
    overall = "ok" if database_state == "ok" and models_state == "ok" else "error"
    return {
        "status": overall,
        "database": database_state,
        "models": models_state,
        "oauth": "enabled",
        "token_rotation": "enabled",
    }
@api.get("/debug/installations")
async def debug_installations():
    """List stored installations (without tokens) along with their expiry info.

    Rows with a ``bot_token_expires_at`` value gain ``expires_at_formatted``
    (ISO-8601, UTC) and ``expires_in_hours`` (rounded to 2 decimals). Any
    error is returned as ``{"error": ...}``.

    NOTE(review): this route has no authentication, so anyone who can reach
    the server can list installed team IDs and bot user IDs. Consider
    protecting or disabling it in production.
    """
    def _with_expiry(row):
        expiry = row.get("bot_token_expires_at")
        if expiry:
            row["expires_at_formatted"] = datetime.fromtimestamp(expiry, tz=timezone.utc).isoformat()
            seconds_left = expiry - datetime.now(timezone.utc).timestamp()
            row["expires_in_hours"] = round(seconds_left / 3600, 2)
        return row

    try:
        response = await asyncio.to_thread(
            lambda: supabase.table("installations")
            .select("team_id", "bot_user_id", "bot_token_expires_at", "installed_at")
            .execute()
        )
        rows = [_with_expiry(row) for row in response.data]
        return {"installations": rows, "count": len(rows)}
    except Exception as exc:
        return {"error": str(exc)}
if __name__ == "__main__":
    # PORT defaults to 7860 when not set in the environment.
    port = int(os.environ.get("PORT", 7860))
    logger.info(f"🚀 Starting server on port {port}")
    logger.info("🔐 OAuth2 with token rotation enabled")
    uvicorn_run(api, host="0.0.0.0", port=port)