Spaces:
Build error
Build error
# Standard library
import io
import json
import os
import re
import tempfile
import unicodedata

# Third-party (kept in the file's original relative order)
import gradio as gr
import pdfplumber
import requests
import together
from sentence_transformers import SentenceTransformer
import faiss
import numpy as np
from dotenv import load_dotenv
from flask import jsonify
# Pull settings from a local .env file (if any) into the process environment.
load_dotenv()

# Base URL of the backend that stores PDFs, the FAISS index and its metadata.
# It can be overridden with the API_URL environment variable; the default is
# the address that used to be hard-coded here.
API_URL = os.getenv("API_URL", "https://f152-105-161-25-67.ngrok-free.app").rstrip("/")
API_URL_FILES = f"{API_URL}/file"
API_URL_EMBEDDINGS = f"{API_URL}/embeddings"
API_URL_METADATA = f"{API_URL}/metadata"

# FAISS index setup
DIM = 768  # Adjust based on the embedding model

# Fail fast when the Together.AI key is missing. An explicit raise is used
# instead of `assert` because assertions are removed when Python runs with -O.
# NOTE(review): the key is only checked here, never passed to `together`
# explicitly -- presumably the client reads TOGETHER_API_KEY itself; confirm.
if not os.getenv("TOGETHER_API_KEY"):
    raise RuntimeError("api key missing")

# Sentence-transformer used for both document and query embeddings.
# NOTE(security): trust_remote_code=True lets the model repository execute its
# own Python code at load time; only keep it for repositories you trust.
embedding_model = SentenceTransformer(
    "togethercomputer/m2-bert-80M-8k-retrieval",
    trust_remote_code=True,  # Allow remote code execution
)
embedding_dim = 768  # Adjust according to model
def store_document_data(PDF_FILE):
    """Embed an uploaded PDF, build a FAISS index for it and upload the index.

    The PDF text is extracted with ``extract_text_from_pdf``, encoded as a
    single vector with ``embedding_model``, added to a fresh ``IndexFlatL2``
    and the serialized index is POSTed to ``API_URL_EMBEDDINGS``.

    Args:
        PDF_FILE: The uploaded PDF (whatever the Gradio ``File`` input
            provides); a falsy value means nothing was uploaded.

    Returns:
        * ``int`` -- position of the document inside the index that was
          built (``ntotal - 1``; a new single-vector index is created on
          every call, so this is 0).
        * ``str`` -- a human-readable message when no file was given or no
          text could be extracted.
        * ``dict`` -- ``{"error": <message>}`` when the upload fails
          (connection problem, timeout, or a 4xx/5xx answer).

    Raises:
        ValueError: If the FAISS index is empty after adding the embedding.
    """
    print(" Storing document...")
    if not PDF_FILE:
        return "No PDF file provided."

    # Extract text from the PDF
    text = extract_text_from_pdf(PDF_FILE)
    if not text:
        return "Could not extract any text from the PDF."

    # One embedding for the whole document; FAISS requires float32.
    embedding = embedding_model.encode([text]).astype(np.float32)
    print("Embeddings generated")
    print("Embedding shape:", embedding.shape)

    index = faiss.IndexFlatL2(embedding.shape[1])
    index.add(embedding)
    print(index, index.ntotal)
    if index.ntotal == 0:
        raise ValueError("FAISS index is empty. No embeddings added.")
    doc_index = index.ntotal - 1

    print(f"sending to {API_URL_EMBEDDINGS}")
    # Serialize into a private temporary directory so that concurrent requests
    # never overwrite each other's "index.bin" in the working directory.
    with tempfile.TemporaryDirectory() as workdir:
        index_path = os.path.join(workdir, "index.bin")
        faiss.write_index(index, index_path)

        # Sanity check: the file we are about to upload can be read back.
        reloaded = faiss.read_index(index_path)
        print("FAISS index loaded successfully. Number of vectors:", reloaded.ntotal)

        try:
            with open(index_path, "rb") as f:
                response = requests.post(
                    API_URL_EMBEDDINGS,
                    files={"file": ("index.bin", f, "application/octet-stream")},
                    timeout=60,  # never hang the UI on an unreachable backend
                )
            # Treat 4xx/5xx answers as failures instead of reporting success.
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            return {"error": str(e)}

    # Log the raw body: a successful upload must not fail just because the
    # server answered with something that is not JSON.
    print("sent", response.status_code, response.text)
    return doc_index
def retrieve_document(query):
    """Return the cleaned text of the stored PDF closest to ``query``.

    Steps: download the FAISS index and the metadata from the backend, embed
    the query with ``embedding_model``, look up the single nearest vector,
    map its position to a file name through the metadata, download that PDF
    and extract its text with ``extract_text_from_pdf``.

    Args:
        query: The user's question (text that will be embedded).

    Returns:
        The extracted document text, or ``None`` when any step fails
        (backend unreachable, non-200 answer, malformed metadata, unreadable
        index, no matching document, or text extraction failure).
    """
    timeout = 30  # seconds; keeps the UI from hanging on a dead backend

    print(f"Retrieving document based on:\n{query}")
    try:
        embeddings_ = requests.get(API_URL_EMBEDDINGS, timeout=timeout)
        metadata_ = requests.get(API_URL_METADATA, timeout=timeout)
    except requests.exceptions.RequestException as e:
        # Connection problems are reported like every other failure: None.
        print(f"Error contacting document store: {e}")
        return None

    # Check for errors before parsing the bodies
    if embeddings_.status_code != 200:
        print(f"Error fetching embeddings: {embeddings_.status_code} - {embeddings_.text}")
        return None
    if metadata_.status_code != 200:
        print(f"Error fetching metadata: {metadata_.status_code} - {metadata_.text}")
        return None

    try:
        # NOTE(review): used below as a mapping from stringified index
        # position to file name -- confirm against the backend.
        metadata = metadata_.json()['metadata_file']
        print(metadata)
    except requests.exceptions.JSONDecodeError as e:
        print(f"Error decoding metadata JSON: {e}")
        return None
    except (KeyError, TypeError) as e:
        print(f"Metadata response has no 'metadata_file' entry: {e}")
        return None

    try:
        print("Response content length:", len(embeddings_.content))  # Debugging
        if len(embeddings_.content) == 0:
            raise ValueError("Received empty FAISS index file")
        # faiss.read_index needs a path, so go through a private temporary
        # file instead of a shared name in the working directory.
        with tempfile.TemporaryDirectory() as workdir:
            index_path = os.path.join(workdir, "downloaded_index.bin")
            with open(index_path, "wb") as f:
                f.write(embeddings_.content)
            index = faiss.read_index(index_path)
        print(f"✅ Successfully loaded FAISS index with {index.ntotal} vectors.")
    except Exception as e:
        print(f"Error loading FAISS index: {e}")
        return None
    print(index, metadata)

    # Generate query embedding
    query_embedding = embedding_model.encode([query]).astype(np.float32)

    # Search for the closest document in the FAISS index
    _, closest_idx = index.search(query_embedding, 1)
    best = closest_idx[0][0]

    # Check if a relevant document was found (-1 means "no neighbour")
    if best == -1 or str(best) not in metadata:
        print("No relevant document found")
        return None

    # Retrieve the document itself
    filename = metadata[str(best)]
    print(filename)
    try:
        response = requests.get(API_URL_FILES, params={"file": filename}, timeout=timeout)
    except requests.exceptions.RequestException as e:
        print(f"Error downloading {filename}: {e}")
        return None

    if response.status_code != 200:
        # Use .text: an error body is not guaranteed to be JSON.
        print(f"Error: {response.status_code}, {response.text}")
        return None

    with tempfile.TemporaryDirectory() as workdir:
        pdf_path = os.path.join(workdir, "document.pdf")
        with open(pdf_path, "wb") as f:
            f.write(response.content)
        prompt_doc = extract_text_from_pdf(pdf_path)
    print(f"PDF received successfully: {filename}")
    return prompt_doc
def clean_text(text):
    """Clean extracted text for better processing by the model.

    Applies, in order: Unicode NFKC normalization, replacement of every
    character outside ``a-z A-Z 0-9 . , ! ? ; : \\ " ( ) -`` with a space,
    removal of ``page <number>`` markers (case-insensitive), and finally
    whitespace collapsing.

    Whitespace is collapsed last because the two substitutions before it
    introduce new blanks; doing it first would leave runs of spaces and
    leading/trailing blanks in the result.

    Args:
        text: Raw text (``str``) as extracted from a PDF page.

    Returns:
        The cleaned text with single spaces between tokens and no leading or
        trailing whitespace.
    """
    print("cleaning")
    text = unicodedata.normalize("NFKC", text)  # Normalize Unicode characters
    text = re.sub(r'[^a-zA-Z0-9.,!?;:\\"()\-]', ' ', text)  # Keep basic punctuation
    text = re.sub(r'(?i)(page\s*\d+)', '', text)  # Remove page numbers
    return re.sub(r'\s+', ' ', text).strip()  # Remove extra spaces and newlines
def extract_text_from_pdf(pdf_file):
    """Read a PDF with pdfplumber and return its cleaned text.

    Pages for which pdfplumber yields no text are skipped; the remaining
    pages are passed through ``clean_text`` and joined with single spaces.

    Args:
        pdf_file: Whatever ``pdfplumber.open`` is given by the callers in
            this file (an uploaded file or a path).

    Returns:
        The joined text, or ``None`` if anything goes wrong while opening or
        reading the PDF (the error is printed).
    """
    print("extracting")
    try:
        cleaned_pages = []
        with pdfplumber.open(pdf_file) as document:
            for page in document.pages:
                raw = page.extract_text()
                if raw:
                    cleaned_pages.append(clean_text(raw))
        return " ".join(cleaned_pages)
    except Exception as e:
        print(f"Error extracting text: {e}{pdf_file}")
        return None
def split_text(text, chunk_size=500):
    """Cut ``text`` into consecutive pieces of at most ``chunk_size`` characters.

    Args:
        text: The string to split.
        chunk_size: Maximum length of each piece (default 500).

    Returns:
        A list of substrings in original order; the last one may be shorter.
        An empty ``text`` gives an empty list.
    """
    print("splitting")
    pieces = []
    for start in range(0, len(text), chunk_size):
        stop = start + chunk_size
        pieces.append(text[start:stop])
    return pieces
def chatbot(user_question, doc_presence):
    """Answer ``user_question``, optionally grounded in a retrieved document.

    Args:
        user_question: The question typed by the user.
        doc_presence: When truthy, ``retrieve_document`` is asked for a
            document relevant to the question.

    Returns:
        The text of the first completion choice, or a string starting with
        ``"Error generating response:"`` if the completion call fails.
    """
    print("chatbot start")

    # Start from the bare question; it is only replaced by a document-grounded
    # prompt when retrieval was requested and actually produced text.
    prompt = user_question
    doc = retrieve_document(user_question) if doc_presence else None
    if doc:
        print(f"found doc:\n{doc}\n")
        # Only the first chunk is sent (to optimize token usage)
        first_chunk = split_text(doc)[0]
        prompt = f"Based on this document, answer the question:\n\nDocument:\n{first_chunk}\n\nQuestion: {user_question}"
        print(f"prompt:\n{prompt}")

    try:
        print("asking", prompt)
        # Send to Together.AI (Mistral-7B)
        request = {
            "model": "mistralai/Mistral-7B-Instruct-v0.1",
            "prompt": prompt,
            "max_tokens": 200,
            "temperature": 0.7,
        }
        response = together.Completion.create(**request)
        # Return chatbot's response
        return response.choices[0].text
    except Exception as e:
        return f"Error generating response: {e}"
def helloWorld(text):
    """Return ``text`` followed by the fixed suffix `` : hello world``."""
    return "{} : hello world".format(text)
# Gradio Interface: three tabs, built one by one and then combined.

# Tab 1 -- question answering, optionally grounded in a stored document.
chat_tab = gr.Interface(
    fn=chatbot,
    inputs=[gr.Textbox(label="Ask a Question"), gr.Checkbox(label="Document Present?")],
    outputs=gr.Textbox(label="Answer"),
    title="PDF Q&A Chatbot (Powered by Together.AI)",
)

# Tab 2 -- plain text-in/text-out wrapper around helloWorld.
hello_tab = gr.Interface(
    fn=helloWorld,
    inputs="text",
    outputs="text",
)

# Tab 3 -- upload a PDF so that it is embedded and stored.
upload_tab = gr.Interface(
    fn=store_document_data,
    inputs=[gr.File(label="PDF_FILE")],
    outputs=gr.Textbox(label="Answer"),
    title="pdf file, metadata, index parsing and storing",
)

iface = gr.TabbedInterface([chat_tab, hello_tab, upload_tab])

# Launch Gradio app
iface.launch(show_error=True)