Spaces:
Sleeping
Sleeping
| import os | |
| import io | |
| import re | |
| import logging | |
| import asyncio | |
| from typing import List, Dict, Any | |
| from datetime import datetime, timezone | |
| # Core Web Framework and Async Handlers | |
| from fastapi import FastAPI, Request | |
| from fastapi.responses import HTMLResponse, RedirectResponse | |
| from uvicorn import run as uvicorn_run | |
| # Slack Libraries (Async versions) | |
| from slack_bolt.async_app import AsyncApp | |
| from slack_bolt.adapter.fastapi.async_handler import AsyncSlackRequestHandler | |
| from slack_bolt.oauth.async_oauth_settings import AsyncOAuthSettings | |
| from slack_sdk.oauth.installation_store.async_installation_store import AsyncInstallationStore | |
| from slack_sdk.oauth.installation_store.models import Installation, Bot | |
| from slack_sdk.oauth.state_store import FileOAuthStateStore | |
| from slack_sdk.web.async_client import AsyncWebClient | |
| from slack_bolt.authorization import AuthorizeResult | |
| # RAG/ML Libraries | |
| from sentence_transformers import SentenceTransformer | |
| from transformers import pipeline | |
| import torch | |
| from huggingface_hub import login | |
| # Data Handling Libraries | |
| from supabase import create_client, Client | |
| import pypdf | |
| from docx import Document | |
| import requests | |
# --- Configuration and Initialization ---
# Root logging at INFO, plus a logger named after this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Settings are read from the process environment; a name is None when the
# corresponding variable is unset.
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_KEY = os.getenv("SUPABASE_KEY")
SLACK_BOT_TOKEN = os.getenv("SLACK_BOT_TOKEN")
SLACK_SIGNING_SECRET = os.getenv("SLACK_SIGNING_SECRET")
SLACK_CLIENT_ID = os.getenv("SLACK_CLIENT_ID")
SLACK_CLIENT_SECRET = os.getenv("SLACK_CLIENT_SECRET")
HF_TOKEN = os.getenv("HF_TOKEN")
# Check for required environment variables.
# The variable NAMES are kept next to their values: a missing variable's value
# is None, so the values alone cannot be used to build the error message
# (joining them would raise TypeError instead of the intended ValueError).
_required_env = {
    "SUPABASE_URL": SUPABASE_URL,
    "SUPABASE_KEY": SUPABASE_KEY,
    "SLACK_CLIENT_ID": SLACK_CLIENT_ID,
    "SLACK_CLIENT_SECRET": SLACK_CLIENT_SECRET,
    "SLACK_SIGNING_SECRET": SLACK_SIGNING_SECRET,
}
# Kept as a list of values so any existing reader of `required_vars` still works.
required_vars = list(_required_env.values())
missing = [name for name, value in _required_env.items() if not value]
if missing:
    raise ValueError(f"Missing required environment variables: {', '.join(missing)}")
# Validate bot token if provided.
# A token starting with "xoxp-" is rejected at startup; any other non-empty
# value is accepted and only logged as present.
_bot_token_given = bool(SLACK_BOT_TOKEN)
if _bot_token_given and SLACK_BOT_TOKEN.startswith("xoxp-"):
    raise ValueError(
        "β SLACK_BOT_TOKEN is a User OAuth Token (xoxp-).\n"
        "You need the Bot User OAuth Token (xoxb-)"
    )
if _bot_token_given:
    logger.info("β Bot token provided")
# Log into the Hugging Face Hub when a token is configured. A failed login is
# only logged as a warning; startup continues either way.
if HF_TOKEN:
    try:
        login(token=HF_TOKEN, add_to_git_credential=False)
    except Exception as login_error:
        logger.warning(f"Failed to log into Hugging Face Hub: {login_error}")
    else:
        logger.info("Logged into Hugging Face Hub successfully.")

# Module-wide Supabase client shared by the installation store and RAG helpers.
supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)
# --- Supabase Installation Store with Token Rotation Support ---
class SupabaseAsyncInstallationStore(AsyncInstallationStore):
    """Installation store backed by the Supabase ``installations`` table.

    Rows are looked up, upserted and deleted by ``team_id`` only. Every
    Supabase call is executed through ``asyncio.to_thread``. When a stored bot
    token has less than one hour left and a refresh token is available,
    ``async_find_installation`` exchanges the refresh token for a new token
    and saves the result.
    """

    def __init__(self, supabase_client):
        # Slack client created without a token; used for the refresh exchange.
        self._client = AsyncWebClient()
        self.supabase = supabase_client

    async def async_save(self, installation: Installation):
        """Persist ``installation``; delegates to :meth:`save`."""
        return await self.save(installation)

    async def save(self, installation: Installation) -> None:
        """Upsert the installation row, including refresh token and expiry.

        ``installed_at`` is set to the current UTC time on every call.

        Raises:
            Exception: whatever the Supabase call raises, re-raised after
                being logged.
        """
        scopes = installation.bot_scopes
        row = {
            "team_id": installation.team_id,
            "enterprise_id": installation.enterprise_id,
            "bot_token": installation.bot_token,
            "bot_user_id": installation.bot_user_id,
            # Scopes are stored as one comma-separated string (or NULL).
            "bot_scopes": ",".join(scopes) if scopes else None,
            "app_id": installation.app_id,
            "bot_refresh_token": installation.bot_refresh_token,
            "bot_token_expires_at": installation.bot_token_expires_at,
            "user_id": installation.user_id,
            "installed_at": datetime.now(timezone.utc).isoformat(),
        }

        def _upsert():
            table = self.supabase.table("installations")
            return table.upsert(row, on_conflict="team_id").execute()

        try:
            await asyncio.to_thread(_upsert)
            logger.info(f"β Saved installation for team: {installation.team_id}")
            expiry = installation.bot_token_expires_at
            if expiry:
                expiry_utc = datetime.fromtimestamp(expiry, tz=timezone.utc)
                logger.info(f" Token expires at: {expiry_utc}")
        except Exception as save_error:
            logger.error(f"Failed to save installation for team {installation.team_id}: {save_error}")
            raise

    async def async_find_installation(
        self,
        *,
        enterprise_id: str | None,
        team_id: str | None,
        user_id: str | None = None,
        is_enterprise_install: bool | None = None,
    ) -> Installation | None:
        """Return the stored installation for ``team_id``, rotating its token if needed.

        Returns ``None`` when ``team_id`` is falsy or no row is found.
        ``is_enterprise_install`` is accepted but not used.
        """
        if not team_id:
            return None

        installation = await self.fetch_installation(
            team_id,
            enterprise_id=enterprise_id,
            user_id=user_id,
        )
        if not installation:
            return None

        # Rotation is only possible with both an expiry and a refresh token.
        expires_at = installation.bot_token_expires_at
        if not (expires_at and installation.bot_refresh_token):
            return installation

        # Refresh when fewer than 3600 seconds (one hour) remain.
        seconds_left = expires_at - datetime.now(timezone.utc).timestamp()
        if seconds_left < 3600:
            logger.info(f"π Token expiring soon for team {team_id}, rotating...")
            installation = await self._rotate_token(installation)
        return installation

    async def _rotate_token(self, installation: Installation) -> Installation:
        """Exchange the refresh token for a new bot token and save it.

        Never raises: on any failure the error is logged and the installation
        object is returned as it stands at that point.
        """
        try:
            response = await self._client.oauth_v2_access(
                client_id=SLACK_CLIENT_ID,
                client_secret=SLACK_CLIENT_SECRET,
                grant_type="refresh_token",
                refresh_token=installation.bot_refresh_token,
            )
            if not response["ok"]:
                logger.error(f"Token rotation failed: {response.get('error')}")
                return installation

            # Copy the new credentials onto the installation, then persist.
            installation.bot_token = response["access_token"]
            installation.bot_refresh_token = response.get(
                "refresh_token", installation.bot_refresh_token
            )
            issued_at = int(datetime.now(timezone.utc).timestamp())
            installation.bot_token_expires_at = response.get("expires_in", 0) + issued_at
            await self.save(installation)
            logger.info(f"β Token rotated successfully for team {installation.team_id}")
        except Exception as rotate_error:
            logger.error(f"Error rotating token: {rotate_error}")
        return installation

    async def fetch_installation(
        self,
        team_id: str,
        *,
        enterprise_id: str | None = None,
        user_id: str | None = None
    ) -> Installation | None:
        """Load the first ``installations`` row matching ``team_id``.

        ``enterprise_id`` and ``user_id`` are accepted but do not filter the
        query. Returns ``None`` when no row exists or when the lookup fails
        (the failure is logged).
        """
        def _select():
            table = self.supabase.table("installations")
            return table.select("*").eq("team_id", team_id).execute()

        try:
            result = await asyncio.to_thread(_select)
            rows = result.data
            if not rows:
                return None
            row = rows[0]
            # Stored scopes are a comma-separated string; rebuild the list.
            raw_scopes = row.get("bot_scopes")
            return Installation(
                app_id=row.get("app_id"),
                enterprise_id=row.get("enterprise_id"),
                team_id=team_id,
                user_id=row.get("user_id"),
                bot_token=row.get("bot_token"),
                bot_user_id=row.get("bot_user_id"),
                bot_scopes=raw_scopes.split(",") if raw_scopes else [],
                bot_refresh_token=row.get("bot_refresh_token"),
                bot_token_expires_at=row.get("bot_token_expires_at"),
            )
        except Exception as fetch_error:
            logger.error(f"Failed to fetch installation for team {team_id}: {fetch_error}")
            return None

    async def async_delete_installation(
        self,
        *,
        enterprise_id: str | None,
        team_id: str | None,
        user_id: str | None = None,
    ) -> None:
        """Delete the installation for ``team_id``; does nothing when it is falsy."""
        if not team_id:
            return
        await self.delete(team_id=team_id, enterprise_id=enterprise_id, user_id=user_id)

    async def delete(
        self,
        team_id: str,
        *,
        enterprise_id: str | None = None,
        user_id: str | None = None
    ) -> None:
        """Delete every ``installations`` row whose ``team_id`` matches.

        ``enterprise_id`` and ``user_id`` are accepted but not used.

        Raises:
            Exception: whatever the Supabase call raises, re-raised after
                being logged.
        """
        def _delete():
            table = self.supabase.table("installations")
            return table.delete().eq("team_id", team_id).execute()

        try:
            await asyncio.to_thread(_delete)
            logger.info(f"Deleted installation for team: {team_id}")
        except Exception as delete_error:
            logger.error(f"Failed to delete installation for team {team_id}: {delete_error}")
            raise
# Initialize installation store
installation_store = SupabaseAsyncInstallationStore(supabase)

# Bot scopes requested by the OAuth install flow.
_BOT_SCOPES = [
    # Core required scopes
    "app_mentions:read",  # Listen when bot is @mentioned
    "chat:write",         # Send messages as the bot
    "files:read",         # Read uploaded files
    "channels:read",      # View basic channel info
    # DM support (recommended)
    "im:read",            # View DM info
    "im:write",           # Send DMs
    "im:history",         # Read DM messages
    # Optional: for private channels
    # "groups:read",      # View private channel info
    # "channels:history", # Read channel message history
]

# OAuth install/redirect configuration; state is kept on disk for 600 seconds.
_oauth_settings = AsyncOAuthSettings(
    client_id=SLACK_CLIENT_ID,
    client_secret=SLACK_CLIENT_SECRET,
    scopes=_BOT_SCOPES,
    install_path="/slack/install",
    redirect_uri_path="/slack/oauth_redirect",
    state_store=FileOAuthStateStore(expiration_seconds=600, base_dir="/tmp/slack_states"),
)

# Initialize Bolt Async App with OAuth.
# No explicit `authorize` callback is passed: per the original author's note it
# conflicts with `oauth_settings`. Token rotation happens inside
# installation_store.async_find_installation.
# NOTE(review): process_before_response=True presumably delays the HTTP ack
# until listeners finish - confirm this is intended for the slow RAG handlers.
app = AsyncApp(
    signing_secret=SLACK_SIGNING_SECRET,
    installation_store=installation_store,
    oauth_settings=_oauth_settings,
    process_before_response=True,
)

api = FastAPI()
# --- RAG Model Loading ---
# Both models are loaded eagerly at import time; a failure aborts startup.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
logger.info(f"Using device: {device}")
print("Loading embedding model...")
embedding_model = None
qa_pipeline = None
try:
    embedding_model = SentenceTransformer('all-MiniLM-L6-v2', device=device)
    print("Loading QA model...")
    qa_pipeline = pipeline(
        "question-answering",
        model="deepset/roberta-base-squad2",
        # The pipeline is given device index 0 on CUDA and -1 otherwise.
        device=0 if device == 'cuda' else -1
    )
    print("Models loaded successfully!")
except Exception as e:
    logger.error(f"Error loading models: {e}")
    # Chain explicitly so the original failure is recorded as the cause.
    raise RuntimeError("Failed to load required ML models.") from e
# --- Utility Functions ---
def download_slack_file(url: str, token: str) -> bytes:
    """Download a file from Slack and return its raw bytes.

    Args:
        url: Download URL of the file.
        token: Token sent as ``Authorization: Bearer <token>``.

    Returns:
        The full response body.

    Raises:
        requests.RequestException: on connection errors, timeouts (30 s) or a
            non-2xx status; the error is logged and re-raised.
    """
    headers = {"Authorization": f"Bearer {token}"}
    try:
        # The whole body is needed, so streaming is not requested. The context
        # manager closes the response even when raise_for_status() raises.
        with requests.get(url, headers=headers, timeout=30) as response:
            response.raise_for_status()
            return response.content
    except requests.RequestException as e:
        logger.error(f"Failed to download Slack file: {e}")
        raise
def extract_text_from_pdf(file_content: bytes) -> str:
    """Return the text of every page of a PDF, one newline after each page.

    Pages whose extraction yields nothing are skipped. Any failure is logged
    and re-raised.
    """
    try:
        reader = pypdf.PdfReader(io.BytesIO(file_content))
        page_texts = (page.extract_text() for page in reader.pages)
        return "".join(f"{page_text}\n" for page_text in page_texts if page_text)
    except Exception as pdf_error:
        logger.error(f"Failed to extract PDF text: {pdf_error}")
        raise
def extract_text_from_docx(file_content: bytes) -> str:
    """Return the paragraph texts of a DOCX document joined by newlines.

    Any failure is logged and re-raised.
    """
    try:
        document = Document(io.BytesIO(file_content))
        return "\n".join(paragraph.text for paragraph in document.paragraphs)
    except Exception as docx_error:
        logger.error(f"Failed to extract DOCX text: {docx_error}")
        raise
def chunk_text(text: str, chunk_size: int = 300) -> List[str]:
    """Split ``text`` into chunks of at most ``chunk_size`` whitespace-separated words.

    Words inside a chunk are re-joined with single spaces, so the original
    whitespace is not preserved.

    Args:
        text: Text to split.
        chunk_size: Maximum number of words per chunk; must be positive.

    Returns:
        The chunks in order; an empty list when ``text`` contains no words.

    Raises:
        ValueError: if ``chunk_size`` is not positive.
    """
    if chunk_size <= 0:
        # A zero step makes range() fail with an unrelated message, and a
        # negative step would silently produce no chunks at all.
        raise ValueError(f"chunk_size must be a positive integer, got {chunk_size}")
    words = text.split()
    return [
        " ".join(words[start:start + chunk_size])
        for start in range(0, len(words), chunk_size)
    ]
def embed_text(text: str) -> List[float]:
    """Encode ``text`` with the module-level embedding model.

    Returns the model output converted with ``.tolist()``.

    Raises:
        RuntimeError: if ``embedding_model`` is falsy (not loaded).
    """
    if embedding_model:
        vector = embedding_model.encode(text, convert_to_tensor=False)
        return vector.tolist()
    raise RuntimeError("Embedding model not loaded.")
async def store_embeddings(chunks: List[str]) -> int:
    """Embed each chunk and insert it into the Supabase ``documents`` table.

    Storage is best-effort: a chunk that fails to embed or insert is logged
    and skipped, and the remaining chunks are still processed.

    Args:
        chunks: Text chunks to store.

    Returns:
        The number of chunks that were stored successfully.
    """
    stored = 0
    for chunk in chunks:
        try:
            # Embedding is synchronous model work; run it in a worker thread so
            # the event loop stays responsive while it computes.
            embedding = await asyncio.to_thread(embed_text, chunk)
            row = {"content": chunk, "embedding": embedding}
            await asyncio.to_thread(
                lambda r=row: supabase.table("documents").insert(r).execute()
            )
        except Exception as e:
            logger.error(f"Failed to insert chunk: {e}")
        else:
            stored += 1
    if stored != len(chunks):
        logger.warning(f"Stored {stored} of {len(chunks)} chunks.")
    return stored
async def is_table_empty() -> bool:
    """Report whether the ``documents`` table has an exact row count of zero.

    Also returns ``True`` when the check itself fails (the error is logged).
    """
    def _count_rows():
        query = supabase.table("documents").select("id", count="exact")
        return query.limit(1).execute()

    try:
        response = await asyncio.to_thread(_count_rows)
        return response.count == 0
    except Exception as count_error:
        logger.error(f"Error checking table: {count_error}")
        return True
async def search_documents(query: str, match_count: int = 5) -> List[Dict[str, Any]]:
    """Embed ``query`` and call the Supabase ``match_documents`` RPC.

    Args:
        query: Text to search for.
        match_count: Value passed to the RPC as ``match_count``.

    Returns:
        The rows returned by the RPC; an empty list when there is no data or
        when the search fails (the error is logged).

    Raises:
        RuntimeError: if ``embedding_model`` is falsy (not loaded).
    """
    if not embedding_model:
        raise RuntimeError("Embedding model not loaded.")
    try:
        # Embedding is synchronous model work; keep it off the event loop.
        query_embedding = await asyncio.to_thread(embed_text, query)
        params = {
            "query_embedding": query_embedding,
            "match_count": match_count,
        }
        result = await asyncio.to_thread(
            lambda: supabase.rpc("match_documents", params).execute()
        )
        # Normalise a missing payload so callers always receive a list.
        return result.data or []
    except Exception as e:
        logger.error(f"Error searching documents: {e}")
        return []
async def answer_question(question: str, context: str) -> str:
    """Run the module-level QA pipeline on ``question`` against ``context``.

    Only the first 4096 characters of ``context`` are passed to the pipeline.
    Returns a fixed message when the context is blank or the pipeline fails.

    Raises:
        RuntimeError: if ``qa_pipeline`` is falsy (not loaded).
    """
    if not qa_pipeline:
        raise RuntimeError("QA pipeline not loaded.")
    if not context.strip():
        return "No relevant documents found."

    try:
        truncated = context[:4096]

        def _run_pipeline():
            return qa_pipeline(question=question, context=truncated)

        prediction = await asyncio.to_thread(_run_pipeline)
        return prediction['answer']
    except Exception as qa_error:
        logger.error(f"Error in QA pipeline: {qa_error}")
        return "Error generating answer."
# --- Slack Event Handlers ---
# NOTE(review): no listener registration (decorator) is visible for this
# handler - confirm how it is attached to `app`.
async def handle_file_shared(event, say, client):
    """Download a shared PDF/DOCX file, chunk its text and store embeddings.

    Args:
        event: Event payload; ``event["file_id"]`` identifies the file.
        say: Awaitable used to post status messages.
        client: Slack client used for ``files_info``; its ``token`` is also
            used to download the file.

    Every outcome is reported through ``say``. Any exception is logged and its
    message is sent to the user.
    """
    file_id = event["file_id"]
    try:
        file_info = await client.files_info(file=file_id)
        file_data = file_info["file"]
        file_type = file_data.get("mimetype", "")
        file_url = file_data.get("url_private_download")
        if not file_url:
            await say("No download URL available.")
            return

        # Download and text extraction are blocking; run them in threads.
        file_content = await asyncio.to_thread(download_slack_file, file_url, client.token)
        if "pdf" in file_type:
            text = await asyncio.to_thread(extract_text_from_pdf, file_content)
        elif "wordprocessingml" in file_type or "msword" in file_type:
            text = await asyncio.to_thread(extract_text_from_docx, file_content)
        else:
            await say("β Unsupported file type. Please upload PDF or DOCX files.")
            return

        if not text.strip():
            await say("β No text could be extracted from the file.")
            return

        chunks = chunk_text(text)
        await store_embeddings(chunks)
        # Slack mrkdwn marks bold with single asterisks, not Markdown's "**".
        await say(f"β File processed! Added *{len(chunks)}* chunks to knowledge base.")
    except Exception as e:
        logger.error(f"Error processing file {file_id}: {e}")
        await say(f"β Error processing file: {str(e)}")
# NOTE(review): no listener registration (decorator) is visible for this
# handler - confirm how it is attached to `app`.
async def handle_mention(event, say):
    """Answer a question asked by mentioning the bot.

    All ``<@USERID>`` mention tokens are removed from ``event["text"]``; the
    remainder is the query. The query is answered from the stored document
    chunks and the reply is posted via ``say``. Any exception is logged and
    its message is sent to the user.
    """
    text = event["text"]
    user_query = re.sub(r'<@[A-Z0-9]+>', '', text).strip()
    if not user_query:
        await say("Please ask me a question!")
        return
    try:
        if await is_table_empty():
            await say("π My knowledge base is empty. Please share PDF or DOCX files first!")
            return
        results = await search_documents(user_query, match_count=5)
        if not results:
            await say("π I couldn't find relevant information. Try uploading more documents.")
            return
        context = " ".join(doc["content"] for doc in results)
        answer = await answer_question(user_query, context)
        # Slack mrkdwn marks bold with single asterisks, not Markdown's "**".
        await say(f"π‘ *Answer:* {answer}\n\n_Found from {len(results)} document chunks._")
    except Exception as e:
        logger.error(f"Error answering query '{user_query}': {e}")
        await say(f"β Error: {str(e)}")
# NOTE(review): no listener registration (decorator) is visible for this
# handler - confirm how it is attached to `app`.
async def handle_message(event, say):
    """Answer a question sent to the bot as a direct message.

    Events are ignored unless ``channel_type`` is ``"im"``, and also when they
    carry a ``bot_id`` or have no text. Otherwise the text is answered from
    the stored document chunks and the reply is posted via ``say``. Any
    exception is logged and its message is sent to the user.
    """
    # Only respond to DMs (not channel messages).
    if event.get("channel_type") != "im":
        return
    # Skip messages posted by a bot (those carrying bot_id).
    if event.get("bot_id"):
        return
    # "text" may be absent or explicitly None; treat both as empty.
    text = (event.get("text") or "").strip()
    if not text:
        return
    try:
        if await is_table_empty():
            await say("π My knowledge base is empty. Please share PDF or DOCX files in a channel where I'm present!")
            return
        results = await search_documents(text, match_count=5)
        if not results:
            await say("π I couldn't find relevant information. Try asking about uploaded documents.")
            return
        context = " ".join(doc["content"] for doc in results)
        answer = await answer_question(text, context)
        # Slack mrkdwn marks bold with single asterisks, not Markdown's "**".
        await say(f"π‘ *Answer:* {answer}\n\n_Found from {len(results)} document chunks._")
    except Exception as e:
        logger.error(f"Error in DM handler: {e}")
        await say(f"β Error: {str(e)}")
# --- FastAPI Routes ---
# Adapter that passes FastAPI requests to the Bolt app.
handler = AsyncSlackRequestHandler(app)


# NOTE(review): no route decorator is visible for this function - confirm how
# it is registered on `api`.
async def slack_events(request: Request):
    """Pass an incoming Slack events request to the Bolt handler and return its response."""
    bolt_response = await handler.handle(request)
    return bolt_response
# NOTE(review): no route decorator is visible for this function - confirm how
# it is registered on `api`.
async def slack_install(request: Request):
    """Start the OAuth installation by passing the request to the Bolt handler."""
    bolt_response = await handler.handle(request)
    return bolt_response
# NOTE(review): no route decorator is visible for this function - confirm how
# it is registered on `api`.
async def oauth_redirect(request: Request):
    """Complete the OAuth callback by passing the request to the Bolt handler."""
    bolt_response = await handler.handle(request)
    return bolt_response
async def root():
    """Return a static description of the service: status, install path, features."""
    features = [
        "OAuth2 with token rotation",
        "RAG with vector search",
        "PDF/DOCX support",
    ]
    info = {"status": "β Slack RAG Bot Running"}
    info["install_url"] = "/slack/install"
    info["features"] = features
    return info
async def health():
    """Report service health.

    The database is healthy when a one-row select on ``documents`` succeeds.
    The models are healthy when both ``embedding_model`` and ``qa_pipeline``
    are truthy. The overall status is "ok" only when both are healthy.
    """
    def _probe():
        return supabase.table("documents").select("id").limit(1).execute()

    try:
        await asyncio.to_thread(_probe)
    except Exception as db_error:
        logger.error(f"DB health check failed: {db_error}")
        db_status = "error"
    else:
        db_status = "ok"

    models_status = "ok" if embedding_model and qa_pipeline else "error"
    healthy = db_status == "ok" and models_status == "ok"
    return {
        "status": "ok" if healthy else "error",
        "database": db_status,
        "models": models_status,
        "oauth": "enabled",
        "token_rotation": "enabled"
    }
# NOTE(review): no authentication check is visible in this function - confirm
# the endpoint is not publicly reachable.
async def debug_installations():
    """List stored installations with human-readable token expiry.

    Selects ``team_id``, ``bot_user_id``, ``bot_token_expires_at`` and
    ``installed_at`` for every row. Rows with an expiry also get
    ``expires_at_formatted`` (ISO 8601, UTC) and ``expires_in_hours``.
    Returns ``{"error": <message>}`` if anything fails.
    """
    def _select():
        table = supabase.table("installations")
        return table.select("team_id", "bot_user_id", "bot_token_expires_at", "installed_at").execute()

    try:
        response = await asyncio.to_thread(_select)
        rows = []
        for row in response.data:
            expiry = row.get("bot_token_expires_at")
            if expiry:
                row["expires_at_formatted"] = datetime.fromtimestamp(expiry, tz=timezone.utc).isoformat()
                remaining_seconds = expiry - datetime.now(timezone.utc).timestamp()
                row["expires_in_hours"] = round(remaining_seconds / 3600, 2)
            rows.append(row)
        return {"installations": rows, "count": len(rows)}
    except Exception as debug_error:
        return {"error": str(debug_error)}
if __name__ == "__main__":
    # Listen on all interfaces; the port comes from PORT and defaults to 7860.
    listen_port = int(os.environ.get("PORT", 7860))
    for startup_line in (
        f"π Starting server on port {listen_port}",
        "π OAuth2 with token rotation enabled",
    ):
        logger.info(startup_line)
    uvicorn_run(api, host="0.0.0.0", port=listen_port)