# Slack RAG bot.
#
# Files shared where the bot is present (PDF/DOCX) are downloaded, split into
# word chunks, embedded with a SentenceTransformer model and stored in the
# Supabase "documents" table. Questions asked via @mention or DM are answered
# by vector search (Supabase RPC "match_documents") followed by an extractive
# question-answering model. Slack OAuth installations, including refresh
# tokens used for token rotation, are persisted in the Supabase
# "installations" table.
import os
import io
import re
import logging
import asyncio
from typing import List, Dict, Any
from datetime import datetime, timezone

# Core Web Framework and Async Handlers
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse, RedirectResponse
from uvicorn import run as uvicorn_run

# Slack Libraries (Async versions)
from slack_bolt.async_app import AsyncApp
from slack_bolt.adapter.fastapi.async_handler import AsyncSlackRequestHandler
from slack_bolt.oauth.async_oauth_settings import AsyncOAuthSettings
from slack_sdk.oauth.installation_store.async_installation_store import AsyncInstallationStore
from slack_sdk.oauth.installation_store.models import Installation, Bot
from slack_sdk.oauth.state_store import FileOAuthStateStore
from slack_sdk.web.async_client import AsyncWebClient
from slack_bolt.authorization import AuthorizeResult

# RAG/ML Libraries
from sentence_transformers import SentenceTransformer
from transformers import pipeline
import torch
from huggingface_hub import login

# Data Handling Libraries
from supabase import create_client, Client
import pypdf
from docx import Document
import requests

# --- Configuration and Initialization ---
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Load secrets from environment variables
SUPABASE_URL = os.environ.get("SUPABASE_URL")
SUPABASE_KEY = os.environ.get("SUPABASE_KEY")
SLACK_BOT_TOKEN = os.environ.get("SLACK_BOT_TOKEN")
SLACK_SIGNING_SECRET = os.environ.get("SLACK_SIGNING_SECRET")
SLACK_CLIENT_ID = os.environ.get("SLACK_CLIENT_ID")
SLACK_CLIENT_SECRET = os.environ.get("SLACK_CLIENT_SECRET")
HF_TOKEN = os.environ.get("HF_TOKEN")

# Check for required environment variables. The values are keyed by name so
# the error can list WHICH variables are missing (a missing value is None,
# which cannot be joined into the message).
required_vars = {
    "SUPABASE_URL": SUPABASE_URL,
    "SUPABASE_KEY": SUPABASE_KEY,
    "SLACK_CLIENT_ID": SLACK_CLIENT_ID,
    "SLACK_CLIENT_SECRET": SLACK_CLIENT_SECRET,
    "SLACK_SIGNING_SECRET": SLACK_SIGNING_SECRET,
}
missing = [name for name, value in required_vars.items() if not value]
if missing:
    raise ValueError(f"Missing required environment variables: {', '.join(missing)}")

# Validate bot token if provided. It is optional and not otherwise used in
# this file: with OAuth, per-workspace tokens come from the installation store.
if SLACK_BOT_TOKEN:
    if SLACK_BOT_TOKEN.startswith("xoxp-"):
        raise ValueError(
            "❌ SLACK_BOT_TOKEN is a User OAuth Token (xoxp-).\n"
            "You need the Bot User OAuth Token (xoxb-)"
        )
    logger.info("✓ Bot token provided")

# Log into the Hugging Face Hub if a token is provided (best-effort).
if HF_TOKEN:
    try:
        login(token=HF_TOKEN, add_to_git_credential=False)
        logger.info("Logged into Hugging Face Hub successfully.")
    except Exception as e:
        logger.warning(f"Failed to log into Hugging Face Hub: {e}")

# Initialize Supabase client
supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)


# --- Supabase Installation Store with Token Rotation Support ---
class SupabaseAsyncInstallationStore(AsyncInstallationStore):
    """Bolt installation store backed by the Supabase ``installations`` table.

    Keeps one row per workspace (upserted on ``team_id``), including the bot
    refresh token and expiry timestamp so bot tokens can be rotated before
    they expire. The synchronous Supabase client is run in worker threads.
    """

    def __init__(self, supabase_client):
        self.supabase = supabase_client
        # Token-less client: oauth.v2.access authenticates with the client
        # id/secret passed as call arguments.
        self._client = AsyncWebClient()

    async def async_save(self, installation: Installation):
        """Bolt entry point for persisting an installation; delegates to ``save``."""
        return await self.save(installation)

    async def save(self, installation: Installation) -> None:
        """Upsert the installation (bot token, refresh token, expiry) for its team.

        Raises:
            Exception: whatever the Supabase client raised, after logging it.
        """
        data = {
            "team_id": installation.team_id,
            "enterprise_id": installation.enterprise_id,
            "bot_token": installation.bot_token,
            "bot_user_id": installation.bot_user_id,
            "bot_scopes": ",".join(installation.bot_scopes) if installation.bot_scopes else None,
            "app_id": installation.app_id,
            "bot_refresh_token": installation.bot_refresh_token,
            "bot_token_expires_at": installation.bot_token_expires_at,
            "user_id": installation.user_id,
            "installed_at": datetime.now(timezone.utc).isoformat(),
        }
        try:
            await asyncio.to_thread(
                lambda: self.supabase.table("installations").upsert(data, on_conflict="team_id").execute()
            )
        except Exception as e:
            logger.error(f"Failed to save installation for team {installation.team_id}: {e}")
            raise
        else:
            logger.info(f"✓ Saved installation for team: {installation.team_id}")
            if installation.bot_token_expires_at:
                expires_dt = datetime.fromtimestamp(installation.bot_token_expires_at, tz=timezone.utc)
                logger.info(f" Token expires at: {expires_dt}")

    async def async_find_installation(
        self,
        *,
        enterprise_id: str | None,
        team_id: str | None,
        user_id: str | None = None,
        is_enterprise_install: bool | None = None,
    ) -> Installation | None:
        """Find the team's installation, rotating its bot token if it expires within 1 hour.

        Returns None when ``team_id`` is missing or no installation is stored.
        """
        if not team_id:
            return None

        installation = await self.fetch_installation(
            team_id=team_id, enterprise_id=enterprise_id, user_id=user_id
        )
        if not installation:
            return None

        # Rotation is only possible when both an expiry and a refresh token exist.
        if installation.bot_token_expires_at and installation.bot_refresh_token:
            now = datetime.now(timezone.utc).timestamp()
            # Refresh if token expires in less than 1 hour
            if installation.bot_token_expires_at - now < 3600:
                logger.info(f"🔄 Token expiring soon for team {team_id}, rotating...")
                installation = await self._rotate_token(installation)

        return installation

    async def _rotate_token(self, installation: Installation) -> Installation:
        """Exchange the refresh token for a new bot token and persist it.

        Best-effort: on any failure the installation is returned as-is, so the
        caller keeps using the current (possibly still valid) token.
        """
        try:
            response = await self._client.oauth_v2_access(
                client_id=SLACK_CLIENT_ID,
                client_secret=SLACK_CLIENT_SECRET,
                grant_type="refresh_token",
                refresh_token=installation.bot_refresh_token,
            )
            if not response["ok"]:
                logger.error(f"Token rotation failed: {response.get('error')}")
                return installation

            # Update installation with new tokens
            installation.bot_token = response["access_token"]
            installation.bot_refresh_token = response.get("refresh_token", installation.bot_refresh_token)
            expires_in = response.get("expires_in")
            # Without expires_in the expiry is unknown. Storing "now" (the old
            # behaviour) would force a rotation on every lookup, so clear it.
            installation.bot_token_expires_at = (
                int(datetime.now(timezone.utc).timestamp()) + expires_in if expires_in else None
            )

            # Save updated installation
            await self.save(installation)
            logger.info(f"✓ Token rotated successfully for team {installation.team_id}")
            return installation
        except Exception as e:
            logger.error(f"Error rotating token: {e}")
            return installation

    async def fetch_installation(
        self, team_id: str, *, enterprise_id: str | None = None, user_id: str | None = None
    ) -> Installation | None:
        """Load the stored installation row for ``team_id``.

        Only ``team_id`` is used as the lookup key; ``enterprise_id`` and
        ``user_id`` are accepted for interface compatibility. Returns None if
        no row exists or the query fails (the failure is logged).
        """
        try:
            result = await asyncio.to_thread(
                lambda: self.supabase.table("installations").select("*").eq("team_id", team_id).execute()
            )
            if not result.data:
                return None

            data = result.data[0]
            return Installation(
                app_id=data.get("app_id"),
                enterprise_id=data.get("enterprise_id"),
                team_id=team_id,
                user_id=data.get("user_id"),
                bot_token=data.get("bot_token"),
                bot_user_id=data.get("bot_user_id"),
                bot_scopes=data.get("bot_scopes", "").split(',') if data.get("bot_scopes") else [],
                bot_refresh_token=data.get("bot_refresh_token"),
                bot_token_expires_at=data.get("bot_token_expires_at"),
            )
        except Exception as e:
            logger.error(f"Failed to fetch installation for team {team_id}: {e}")
            return None

    async def async_delete_installation(
        self,
        *,
        enterprise_id: str | None,
        team_id: str | None,
        user_id: str | None = None,
    ) -> None:
        """Bolt entry point for deleting an installation; no-op without ``team_id``."""
        if team_id:
            await self.delete(team_id=team_id, enterprise_id=enterprise_id, user_id=user_id)

    async def delete(
        self, team_id: str, *, enterprise_id: str | None = None, user_id: str | None = None
    ) -> None:
        """Delete the installation row(s) for ``team_id``.

        Raises:
            Exception: whatever the Supabase client raised, after logging it.
        """
        try:
            await asyncio.to_thread(
                lambda: self.supabase.table("installations").delete().eq("team_id", team_id).execute()
            )
            logger.info(f"Deleted installation for team: {team_id}")
        except Exception as e:
            logger.error(f"Failed to delete installation for team {team_id}: {e}")
            raise


# Initialize installation store
installation_store = SupabaseAsyncInstallationStore(supabase)

# Initialize Bolt Async App with OAuth
# Note: When using oauth_settings, authorization is handled automatically
# The installation_store's async_find_installation method handles token rotation
app = AsyncApp(
    signing_secret=SLACK_SIGNING_SECRET,
    installation_store=installation_store,
    # DO NOT use 'authorize' when using 'oauth_settings' - they conflict
    oauth_settings=AsyncOAuthSettings(
        client_id=SLACK_CLIENT_ID,
        client_secret=SLACK_CLIENT_SECRET,
        scopes=[
            # Core required scopes
            "app_mentions:read",  # Listen when bot is @mentioned
            "chat:write",         # Send messages as the bot
            "files:read",         # Read uploaded files
            "channels:read",      # View basic channel info
            # DM support (recommended)
            "im:read",            # View DM info
            "im:write",           # Send DMs
            "im:history",         # Read DM messages
            # Optional: for private channels
            # "groups:read",      # View private channel info
            # "channels:history", # Read channel message history
        ],
        install_path="/slack/install",
        redirect_uri_path="/slack/oauth_redirect",
        state_store=FileOAuthStateStore(expiration_seconds=600, base_dir="/tmp/slack_states"),
    ),
    # This is a long-running server (uvicorn), not a FaaS function: acknowledge
    # events immediately and run listeners afterwards. With True, slow
    # listeners (file download + embedding) delay the ack past Slack's
    # 3-second limit and Slack retries the event, duplicating stored chunks
    # and replies.
    process_before_response=False,
)

api = FastAPI()

# --- RAG Model Loading ---
device = 'cuda' if torch.cuda.is_available() else 'cpu'
logger.info(f"Using device: {device}")

embedding_model = None
qa_pipeline = None

try:
    logger.info("Loading embedding model...")
    embedding_model = SentenceTransformer('all-MiniLM-L6-v2', device=device)
    logger.info("Loading QA model...")
    qa_pipeline = pipeline(
        "question-answering",
        model="deepset/roberta-base-squad2",
        device=0 if device == 'cuda' else -1,
    )
    logger.info("Models loaded successfully!")
except Exception as e:
    logger.error(f"Error loading models: {e}")
    raise RuntimeError("Failed to load required ML models.") from e


# --- Utility Functions ---
def download_slack_file(url: str, token: str) -> bytes:
    """Download a private Slack file, authenticating with ``token`` as a Bearer token.

    Raises:
        requests.RequestException: on network errors or non-2xx responses.
    """
    headers = {"Authorization": f"Bearer {token}"}
    try:
        # The body is read fully into memory, so streaming gained nothing; the
        # context manager guarantees the connection is released.
        with requests.get(url, headers=headers, timeout=30) as response:
            response.raise_for_status()
            return response.content
    except requests.RequestException as e:
        logger.error(f"Failed to download Slack file: {e}")
        raise


def extract_text_from_pdf(file_content: bytes) -> str:
    """Return the text of every PDF page that yields text, each followed by a newline."""
    try:
        pdf_reader = pypdf.PdfReader(io.BytesIO(file_content))
        page_texts = (page.extract_text() for page in pdf_reader.pages)
        return "".join(f"{extracted}\n" for extracted in page_texts if extracted)
    except Exception as e:
        logger.error(f"Failed to extract PDF text: {e}")
        raise


def extract_text_from_docx(file_content: bytes) -> str:
    """Return the paragraphs of a .docx document joined by newlines."""
    try:
        doc = Document(io.BytesIO(file_content))
        return "\n".join(p.text for p in doc.paragraphs)
    except Exception as e:
        logger.error(f"Failed to extract DOCX text: {e}")
        raise


def chunk_text(text: str, chunk_size: int = 300) -> List[str]:
    """Split ``text`` on whitespace into chunks of at most ``chunk_size`` words.

    Raises:
        ValueError: if ``chunk_size`` is less than 1.
    """
    if chunk_size < 1:
        raise ValueError("chunk_size must be a positive integer")
    words = text.split()
    return [" ".join(words[i:i + chunk_size]) for i in range(0, len(words), chunk_size)]


def embed_text(text: str) -> List[float]:
    """Return the embedding of ``text`` as a list of floats (blocking call)."""
    if not embedding_model:
        raise RuntimeError("Embedding model not loaded.")
    return embedding_model.encode(text, convert_to_tensor=False).tolist()


async def store_embeddings(chunks: List[str], batch_size: int = 100) -> int:
    """Embed ``chunks`` and insert them into the Supabase ``documents`` table.

    All chunks are embedded in one batched ``encode`` call in a worker thread,
    so model inference does not block the event loop. Rows are inserted
    ``batch_size`` at a time; a batch whose insert fails is logged and skipped.

    Returns:
        The number of chunks actually stored.

    Raises:
        RuntimeError: if the embedding model is not loaded.
    """
    if not chunks:
        return 0
    if not embedding_model:
        raise RuntimeError("Embedding model not loaded.")

    vectors = await asyncio.to_thread(embedding_model.encode, chunks, convert_to_tensor=False)
    rows = [{"content": chunk, "embedding": vector.tolist()} for chunk, vector in zip(chunks, vectors)]

    stored = 0
    for start in range(0, len(rows), batch_size):
        batch = rows[start:start + batch_size]
        try:
            await asyncio.to_thread(
                lambda b=batch: supabase.table("documents").insert(b).execute()
            )
        except Exception as e:
            logger.error(f"Failed to insert chunks {start}-{start + len(batch) - 1}: {e}")
            continue
        stored += len(batch)
    return stored


async def is_table_empty() -> bool:
    """Return True if the documents table has no rows (also True if the check fails)."""
    try:
        result = await asyncio.to_thread(
            lambda: supabase.table("documents").select("id", count="exact").limit(1).execute()
        )
        return result.count == 0
    except Exception as e:
        logger.error(f"Error checking table: {e}")
        return True


async def search_documents(query: str, match_count: int = 5) -> List[Dict[str, Any]]:
    """Return up to ``match_count`` rows from the ``match_documents`` RPC for ``query``.

    Returns an empty list if embedding or the RPC call fails.
    """
    if not embedding_model:
        raise RuntimeError("Embedding model not loaded.")
    try:
        # Model inference is blocking; keep it off the event loop.
        query_embedding = await asyncio.to_thread(embed_text, query)
        result = await asyncio.to_thread(
            lambda: supabase.rpc("match_documents", {
                "query_embedding": query_embedding,
                "match_count": match_count,
            }).execute()
        )
        return result.data
    except Exception as e:
        logger.error(f"Error searching documents: {e}")
        return []


async def answer_question(question: str, context: str) -> str:
    """Extract an answer to ``question`` from the first 4096 characters of ``context``."""
    if not qa_pipeline:
        raise RuntimeError("QA pipeline not loaded.")
    if not context.strip():
        return "No relevant documents found."

    try:
        context_slice = context[:4096]
        result = await asyncio.to_thread(
            lambda: qa_pipeline(question=question, context=context_slice)
        )
        return result['answer']
    except Exception as e:
        logger.error(f"Error in QA pipeline: {e}")
        return "Error generating answer."


async def _answer_query(query: str, *, empty_kb_message: str, no_results_message: str) -> str:
    """Run the RAG flow for ``query`` and return the reply text to post in Slack."""
    if await is_table_empty():
        return empty_kb_message

    results = await search_documents(query, match_count=5)
    if not results:
        return no_results_message

    context = " ".join(doc["content"] for doc in results)
    answer = await answer_question(query, context)
    # Slack mrkdwn bold is *single* asterisks; **double** shows stray asterisks.
    return f"💡 *Answer:* {answer}\n\n_Found from {len(results)} document chunks._"


# --- Slack Event Handlers ---
# Matches user/bot mention tokens such as <@U123ABC> in message text.
_MENTION_RE = re.compile(r'<@[A-Z0-9]+>')


@app.event("file_shared")
async def handle_file_shared(event, say, client):
    """Download a shared PDF/DOCX file, chunk and embed its text, and report the result."""
    file_id = event["file_id"]

    try:
        file_info = await client.files_info(file=file_id)
        file_data = file_info["file"]
        file_type = file_data.get("mimetype", "")
        file_url = file_data.get("url_private_download")

        if not file_url:
            await say("No download URL available.")
            return

        # Decide on the extractor before downloading so unsupported files are
        # never fetched. Only .docx is accepted: python-docx cannot read
        # legacy binary .doc ("application/msword") files.
        if "pdf" in file_type:
            extractor = extract_text_from_pdf
        elif "wordprocessingml" in file_type:
            extractor = extract_text_from_docx
        else:
            await say("❌ Unsupported file type. Please upload PDF or DOCX files.")
            return

        file_content = await asyncio.to_thread(download_slack_file, file_url, client.token)
        text = await asyncio.to_thread(extractor, file_content)

        if not text.strip():
            await say("❌ No text could be extracted from the file.")
            return

        chunks = chunk_text(text)
        stored = await store_embeddings(chunks)

        if stored < len(chunks):
            await say(f"⚠️ File processed, but only *{stored}* of {len(chunks)} chunks were stored.")
        else:
            await say(f"✅ File processed! Added *{stored}* chunks to knowledge base.")

    except Exception as e:
        logger.error(f"Error processing file {file_id}: {e}")
        await say(f"❌ Error processing file: {str(e)}")


@app.event("app_mention")
async def handle_mention(event, say):
    """Answer a question asked by @mentioning the bot."""
    user_query = _MENTION_RE.sub('', event["text"]).strip()

    if not user_query:
        await say("Please ask me a question!")
        return

    try:
        reply = await _answer_query(
            user_query,
            empty_kb_message="📚 My knowledge base is empty. Please share PDF or DOCX files first!",
            no_results_message="🔍 I couldn't find relevant information. Try uploading more documents.",
        )
        await say(reply)
    except Exception as e:
        logger.error(f"Error answering query '{user_query}': {e}")
        await say(f"❌ Error: {str(e)}")


@app.event("message")
async def handle_message(event, say):
    """Answer questions sent to the bot in direct messages; ignore everything else."""
    # Only respond to DMs (not channel messages)
    if event.get("channel_type") != "im":
        return

    text = event.get("text", "").strip()
    if not text:
        return

    # Don't respond to bot messages (including the bot's own replies)
    if event.get("bot_id"):
        return

    try:
        reply = await _answer_query(
            text,
            empty_kb_message="📚 My knowledge base is empty. Please share PDF or DOCX files in a channel where I'm present!",
            no_results_message="🔍 I couldn't find relevant information. Try asking about uploaded documents.",
        )
        await say(reply)
    except Exception as e:
        logger.error(f"Error in DM handler: {e}")
        await say(f"❌ Error: {str(e)}")


# --- FastAPI Routes ---
handler = AsyncSlackRequestHandler(app)


@api.post("/slack/events")
async def slack_events(request: Request):
    """Slack events endpoint."""
    return await handler.handle(request)


@api.get("/slack/install")
async def slack_install(request: Request):
    """OAuth installation initiation."""
    return await handler.handle(request)


@api.get("/slack/oauth_redirect")
async def oauth_redirect(request: Request):
    """OAuth callback handler."""
    return await handler.handle(request)


@api.get("/")
async def root():
    """Root endpoint: basic status and the install path."""
    return {
        "status": "✅ Slack RAG Bot Running",
        "install_url": "/slack/install",
        "features": ["OAuth2 with token rotation", "RAG with vector search", "PDF/DOCX support"],
    }


@api.get("/health")
async def health():
    """Health check: database reachability and model availability."""
    db_status = "error"
    try:
        await asyncio.to_thread(
            lambda: supabase.table("documents").select("id").limit(1).execute()
        )
        db_status = "ok"
    except Exception as e:
        logger.error(f"DB health check failed: {e}")

    models_ok = bool(embedding_model and qa_pipeline)
    return {
        "status": "ok" if db_status == "ok" and models_ok else "error",
        "database": db_status,
        "models": "ok" if models_ok else "error",
        "oauth": "enabled",
        "token_rotation": "enabled",
    }


@api.get("/debug/installations")
async def debug_installations():
    """Debug endpoint listing installations and their token expiry.

    NOTE(review): this route has no authentication and exposes workspace IDs
    and bot user IDs to anyone who can reach the server; consider protecting
    or disabling it in production.
    """
    try:
        result = await asyncio.to_thread(
            lambda: supabase.table("installations").select(
                "team_id", "bot_user_id", "bot_token_expires_at", "installed_at"
            ).execute()
        )

        installations = []
        for inst in result.data:
            expires_at = inst.get("bot_token_expires_at")
            if expires_at:
                expires_dt = datetime.fromtimestamp(expires_at, tz=timezone.utc)
                inst["expires_at_formatted"] = expires_dt.isoformat()
                inst["expires_in_hours"] = round((expires_at - datetime.now(timezone.utc).timestamp()) / 3600, 2)
            installations.append(inst)

        return {"installations": installations, "count": len(installations)}
    except Exception as e:
        return {"error": str(e)}


if __name__ == "__main__":
    port = int(os.environ.get("PORT", 7860))
    logger.info(f"🚀 Starting server on port {port}")
    logger.info("🔐 OAuth2 with token rotation enabled")
    uvicorn_run(api, host="0.0.0.0", port=port)