# audio_processor.py - FREE TTS and STT for English AND Urdu voice notes
import os
import tempfile
import logging
import time
from typing import Optional, Dict, Any
from fastapi import HTTPException, UploadFile
import uuid
import re

logger = logging.getLogger(__name__)


class AudioProcessor:
    """FREE Audio processing system for STT and TTS functionality (English + Urdu ONLY)"""

    def __init__(self):
        self.supported_languages = ["english", "urdu"]
        logger.info("🎵 FREE Audio Processor initialized - Supporting English & Urdu ONLY")

    async def speech_to_text(self, audio_file: UploadFile, language: str = "auto") -> Dict[str, Any]:
        """
        Convert speech to text using FREE STT services for English AND Urdu ONLY
        """
        try:
            logger.info(f"🎤 Converting speech to text - Language: {language}")

            # Read audio file
            audio_content = await audio_file.read()

            # Try local Whisper for multilingual support
            stt_result = await self._try_whisper_stt(audio_content, language)
            if stt_result:
                # Verify detected language is only Urdu or English
                detected_language = self._strict_detect_language_from_text(stt_result["text"])
                if detected_language not in ["english", "urdu"]:
                    logger.warning(f"⚠️ Detected non-supported language: {detected_language}, treating as English")
                    detected_language = "english"
                stt_result["language"] = detected_language
                return stt_result

            # Fallback to SpeechRecognition with Google Web API (mainly English)
            stt_result = await self._try_speech_recognition(audio_content)
            if stt_result:
                detected_language = self._strict_detect_language_from_text(stt_result["text"])
                if detected_language not in ["english", "urdu"]:
                    detected_language = "english"
                stt_result["language"] = detected_language
                return stt_result

            raise HTTPException(status_code=400, detail="No FREE STT service available")
        except HTTPException:
            # Re-raise HTTP errors as-is so the 400 above is not wrapped into a 500
            raise
        except Exception as e:
            logger.error(f"❌ STT Error: {e}")
            raise HTTPException(status_code=500, detail=f"Speech recognition failed: {str(e)}")

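    # Illustrative wiring (an assumption, not part of this file): the host FastAPI app
    # would typically forward the uploaded voice note here via the module-level
    # `audio_processor` instance defined at the bottom of this file, e.g.:
    #
    #   @app.post("/stt")
    #   async def stt_endpoint(audio: UploadFile, language: str = "auto"):
    #       return await audio_processor.speech_to_text(audio, language)
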
    async def _try_whisper_stt(self, audio_content: bytes, language: str = "auto") -> Optional[Dict[str, Any]]:
        """Try local Whisper model with strict language filtering"""
        try:
            import whisper

            # Create temporary file
            with tempfile.NamedTemporaryFile(suffix=".webm", delete=False) as temp_audio:
                temp_audio.write(audio_content)
                temp_audio_path = temp_audio.name

            try:
                logger.info("🔊 Using local Whisper (English/Urdu)...")
                model = whisper.load_model("base")

                # Set language parameter for Whisper - only allow English or Urdu
                whisper_language = None
                if language == "urdu":
                    whisper_language = "urdu"
                elif language == "english":
                    whisper_language = "english"
                # For "auto", let Whisper detect but we'll filter later

                result = model.transcribe(temp_audio_path, language=whisper_language)

                # Apply strict language detection
                detected_language = self._strict_detect_language_from_text(result["text"])

                return {
                    "text": result["text"].strip(),
                    "language": detected_language,
                    "service": "local_whisper",
                    "confidence": 0.8
                }
            finally:
                # Ensure temp file cleanup
                if os.path.exists(temp_audio_path):
                    try:
                        os.unlink(temp_audio_path)
                    except Exception as cleanup_error:
                        logger.warning(f"⚠️ Failed to cleanup temp file: {cleanup_error}")
        except ImportError:
            logger.warning("Whisper not available for local STT")
            return None
        except Exception as e:
            logger.warning(f"Local Whisper STT failed: {e}")
            return None

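    # Performance note (an assumption about the deployment, not stated in this file):
    # whisper.load_model("base") re-loads the model weights on every call. A host app
    # handling many voice notes could cache the model once, for example:
    #
    #   from functools import lru_cache
    #
    #   @lru_cache(maxsize=1)
    #   def _get_whisper_model(name: str = "base"):
    #       import whisper
    #       return whisper.load_model(name)
    #
    # and then call _get_whisper_model().transcribe(...) instead.
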
    async def _try_speech_recognition(self, audio_content: bytes) -> Optional[Dict[str, Any]]:
        """Try SpeechRecognition with Google Web API (mainly English)"""
        try:
            import speech_recognition as sr
            from pydub import AudioSegment
            import io

            # Convert webm to wav for SpeechRecognition
            audio = AudioSegment.from_file(io.BytesIO(audio_content), format="webm")
            wav_data = io.BytesIO()
            audio.export(wav_data, format="wav")
            wav_data.seek(0)

            recognizer = sr.Recognizer()
            with sr.AudioFile(wav_data) as source:
                audio_data = recognizer.record(source)
                text = recognizer.recognize_google(audio_data)

            # Apply strict language detection
            detected_language = self._strict_detect_language_from_text(text)

            return {
                "text": text,
                "language": detected_language,
                "service": "google_web_api",
                "confidence": 0.7
            }
        except ImportError:
            logger.warning("SpeechRecognition not available")
            return None
        except Exception as e:
            logger.warning(f"SpeechRecognition failed: {e}")
            return None

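    # Note (sketch, not part of the original fallback): recognize_google() defaults to
    # US English but accepts a BCP-47 language tag, so an Urdu-hinted pass could be
    # attempted before giving up, e.g.:
    #
    #   text = recognizer.recognize_google(audio_data, language="ur-PK")
    #
    # Whether the extra round-trip is worthwhile depends on how often Urdu notes
    # reach this fallback instead of Whisper.
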
    def _strict_detect_language_from_text(self, text: str) -> str:
        """
        Strict language detection that only identifies Urdu or English
        Specifically excludes Hindi, Arabic, and other languages
        """
        try:
            text = text.strip()
            if not text:
                return "english"  # Default to English for empty text

            # === STRICT URDU DETECTION ===
            # Urdu-specific character ranges (excluding Arabic and Hindi overlaps)
            urdu_specific_ranges = [
                r'[\u0679-\u0679]',  # Tteh
                r'[\u067E-\u067E]',  # Peh
                r'[\u0686-\u0686]',  # Cheh
                r'[\u0688-\u0688]',  # Ddal
                r'[\u0691-\u0691]',  # Rreh
                r'[\u0698-\u0698]',  # Jeh
                r'[\u06A9-\u06A9]',  # Keheh
                r'[\u06AF-\u06AF]',  # Gaf
                r'[\u06BA-\u06BA]',  # Noon Ghunna
                r'[\u06BE-\u06BE]',  # Heh Doachashmee
                r'[\u06C1-\u06C1]',  # Heh Goal
                r'[\u06C2-\u06C2]',  # Heh Goal with Hamza Above
                r'[\u06CC-\u06CC]',  # Farsi Yeh
                r'[\u06D2-\u06D2]',  # Yeh Barree
            ]

            # Common Urdu words that are distinct from Hindi/Arabic
            urdu_specific_words = [
                'ہے', 'ہیں', 'ہوں', 'کیا', 'کے', 'کو', 'سے', 'پر', 'میں',
                'اور', 'لیکن', 'اگر', 'تو', 'بھی', 'ہی', 'تھا', 'تھی',
                'تھے', 'ہو', 'رہا', 'رہی', 'رہے', 'دیں', 'دی', 'دو', 'دیجیے',
                'برائے', 'کےلیے', 'کےساتھ', 'کےبعد', 'کےپاس', 'کےنیچے'
            ]

            # Check for Urdu-specific characters
            urdu_char_count = 0
            for pattern in urdu_specific_ranges:
                urdu_char_count += len(re.findall(pattern, text))

            # Check for Urdu-specific words
            urdu_word_count = sum(1 for word in urdu_specific_words if word in text)

            # Check for common Urdu sentence structures
            urdu_indicators = [
                ' کا ', ' کی ', ' کے ', ' کو ', ' سے ', ' پر ', ' میں ', ' نے ',
                ' ہی ', ' بھی ', ' تو ', ' اگر ', ' لیکن ', ' اور ', ' یا '
            ]
            urdu_structure_count = sum(1 for indicator in urdu_indicators if indicator in text)

            # === HINDI EXCLUSION ===
            # Hindi-specific characters and words to exclude
            hindi_specific_chars = r'[\u0900-\u097F]'  # Devanagari range
            hindi_char_count = len(re.findall(hindi_specific_chars, text))

            hindi_specific_words = ['है', 'हो', 'की', 'के', 'को', 'से', 'में', 'ना', 'नी', 'ने']
            hindi_word_count = sum(1 for word in hindi_specific_words if word in text)

            # === ARABIC EXCLUSION ===
            # Arabic-specific characters (excluding common Urdu-Arabic overlaps)
            arabic_specific_chars = r'[\uFE70-\uFEFF]'  # Arabic presentation forms
            arabic_char_count = len(re.findall(arabic_specific_chars, text))

            # === ENGLISH DETECTION ===
            english_words = [
                'the', 'and', 'you', 'that', 'was', 'for', 'are', 'with', 'his', 'they',
                'this', 'have', 'from', 'one', 'had', 'word', 'but', 'not', 'what', 'all',
                'were', 'when', 'your', 'can', 'said', 'there', 'each', 'which', 'she', 'do',
                'how', 'their', 'will', 'other', 'about', 'out', 'many', 'then', 'them', 'these'
            ]
            text_lower = text.lower()
            english_score = sum(1 for word in english_words if word in text_lower)

            # === LANGUAGE DECISION LOGIC ===
            # First, exclude Hindi and Arabic
            if hindi_char_count > 2 or hindi_word_count > 1:
                logger.info("🔍 Hindi detected, treating as English")
                return "english"

            if arabic_char_count > 2:
                logger.info("🔍 Arabic detected, treating as English")
                return "english"

            # Then detect Urdu with high confidence
            urdu_confidence_score = (
                urdu_char_count * 2 +
                urdu_word_count * 3 +
                urdu_structure_count * 1.5
            )

            # Strong Urdu detection thresholds
            if urdu_confidence_score >= 5:
                logger.info(f"🔍 Urdu detected (confidence: {urdu_confidence_score})")
                return "urdu"

            # English detection
            if english_score >= 3 or len(text.split()) >= 4:
                logger.info(f"🔍 English detected (score: {english_score})")
                return "english"

            # If we have some Urdu indicators but not enough for confident detection
            if urdu_confidence_score >= 2:
                logger.info(f"🔍 Weak Urdu signals, treating as Urdu (confidence: {urdu_confidence_score})")
                return "urdu"

            # Default to English
            logger.info("🔍 Defaulting to English")
            return "english"
        except Exception as e:
            logger.error(f"❌ Language detection error: {e}")
            return "english"  # Safe default

    def _detect_language_from_text(self, text: str) -> str:
        """Legacy method for backward compatibility"""
        return self._strict_detect_language_from_text(text)

    async def text_to_speech(self, text: str, language: str = "english") -> Optional[Dict[str, Any]]:
        """
        Convert text to speech using FREE TTS services
        NOTE: Keeping TTS for potential future use, but currently disabled for responses
        """
        try:
            # Since we're only returning text responses now, TTS is optional
            # But keeping the function for potential future use
            logger.info(f"🔊 TTS requested for {language}: {text[:50]}...")
            return None  # Disable TTS for now
        except Exception as e:
            logger.error(f"❌ TTS Error: {e}")
            return None

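    # If TTS is re-enabled later, one possible free option (an assumption, not a
    # decision made in this file) is gTTS, which exposes English and Urdu voices
    # through Google Translate, e.g.:
    #
    #   from gtts import gTTS
    #   gTTS(text=text, lang="ur" if language == "urdu" else "en").save(output_path)
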
    async def cleanup_old_audio_files(self, max_age_hours: int = 1):
        """Clean up audio files older than specified hours"""
        try:
            audio_dir = os.path.join("static", "audio")
            if not os.path.exists(audio_dir):
                return

            current_time = time.time()
            deleted_count = 0

            for filename in os.listdir(audio_dir):
                if filename.startswith("tts_") and (filename.endswith(".mp3") or filename.endswith(".wav")):
                    file_path = os.path.join(audio_dir, filename)
                    if os.path.isfile(file_path):
                        # Delete files older than max_age_hours
                        file_age_hours = (current_time - os.path.getctime(file_path)) / 3600
                        if file_age_hours > max_age_hours:
                            try:
                                os.remove(file_path)
                                deleted_count += 1
                                logger.info(f"🧹 Cleaned up old audio file: {filename}")
                            except Exception as cleanup_error:
                                logger.warning(f"⚠️ Failed to cleanup audio file {filename}: {cleanup_error}")

            if deleted_count > 0:
                logger.info(f"🧹 Cleaned up {deleted_count} old audio file(s)")
        except Exception as e:
            logger.error(f"Error cleaning up audio files: {e}")

# Global audio processor instance
audio_processor = AudioProcessor()
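

if __name__ == "__main__":
    # Illustrative smoke test (not part of the original module): the strict language
    # detector is pure string analysis, so it can be exercised without any audio
    # files or optional STT backends installed.
    logging.basicConfig(level=logging.INFO)
    samples = [
        "How are you doing today my friend?",  # expected: english
        "آپ کیسے ہیں، کیا آپ ٹھیک ہیں؟",  # expected: urdu
    ]
    for sample in samples:
        print(sample, "->", audio_processor._strict_detect_language_from_text(sample))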