#!/usr/bin/env python # -*- coding: utf-8 -*- """ TF5 Mixer Telegram Bot - Con salvataggio conversazioni """ import json import time import sys import os from typing import Dict, List, Optional from pathlib import Path from urllib.request import urlopen, Request from urllib.error import URLError, HTTPError from urllib.parse import urlencode from google import genai from google.genai import types from dotenv import load_dotenv from mixer_controller import TF5MixerController import re load_dotenv() from config import * class ConversationManager: """Gestisce il salvataggio delle conversazioni su file JSON.""" def __init__(self, conversations_dir: str = "conversations"): self.conversations_dir = Path(conversations_dir) self.conversations_dir.mkdir(exist_ok=True) def _get_filepath(self, user_id: int) -> Path: """Restituisce il percorso del file per un utente.""" return self.conversations_dir / f"{user_id}.json" def save_conversation(self, user_id: int, history: List[Dict], username: str = "Unknown"): """Salva la conversazione di un utente.""" try: filepath = self._get_filepath(user_id) data = { "user_id": user_id, "username": username, "last_updated": time.time(), "last_updated_str": time.strftime("%Y-%m-%d %H:%M:%S"), "message_count": len(history), "history": history } with open(filepath, 'w', encoding='utf-8') as f: json.dump(data, f, ensure_ascii=False, indent=2) return True except Exception as e: print(f"❌ Errore nel salvataggio conversazione {user_id}: {e}") return False def load_conversation(self, user_id: int) -> Optional[List[Dict]]: """Carica la conversazione di un utente.""" try: filepath = self._get_filepath(user_id) if not filepath.exists(): return None with open(filepath, 'r', encoding='utf-8') as f: data = json.load(f) return data.get("history", []) except Exception as e: print(f"❌ Errore nel caricamento conversazione {user_id}: {e}") return None def delete_conversation(self, user_id: int) -> bool: """Elimina la conversazione di un utente.""" try: filepath = self._get_filepath(user_id) if filepath.exists(): filepath.unlink() return True return False except Exception as e: print(f"❌ Errore nell'eliminazione conversazione {user_id}: {e}") return False def get_all_users(self) -> List[Dict]: """Restituisce la lista di tutti gli utenti con conversazioni salvate.""" users = [] for filepath in self.conversations_dir.glob("*.json"): try: with open(filepath, 'r', encoding='utf-8') as f: data = json.load(f) users.append({ "user_id": data["user_id"], "username": data.get("username", "Unknown"), "message_count": data.get("message_count", 0), "last_updated": data.get("last_updated_str", "Unknown") }) except Exception as e: print(f"❌ Errore lettura {filepath}: {e}") return users class TelegramBot: """Bot Telegram base con long polling.""" def __init__(self, token: str): self.token = token self.base_url = f"https://api.telegram.org/bot{token}" self.offset = 0 def _make_request(self, method: str, params: Optional[Dict] = None) -> Dict: """Esegue una richiesta all'API di Telegram.""" url = f"{self.base_url}/{method}" try: if params: data = json.dumps(params).encode('utf-8') headers = {'Content-Type': 'application/json'} req = Request(url, data=data, headers=headers) else: req = Request(url) with urlopen(req, timeout=30) as response: return json.loads(response.read().decode('utf-8')) except HTTPError as e: error_body = e.read().decode('utf-8') print(f"❌ HTTP Error {e.code}: {error_body}") return {"ok": False, "description": error_body} except URLError as e: print(f"❌ URL Error: {e.reason}") return {"ok": False, "description": str(e.reason)} except Exception as e: print(f"❌ Error: {e}") return {"ok": False, "description": str(e)} def get_updates(self, timeout: int = 30) -> List[Dict]: """Ottiene gli aggiornamenti usando long polling.""" params = { "offset": self.offset, "timeout": timeout, "allowed_updates": ["message"] } result = self._make_request("getUpdates", params) if result.get("ok"): updates = result.get("result", []) if updates: self.offset = updates[-1]["update_id"] + 1 return updates return [] def escape_markdown_v2(self, text: str) -> str: """Escape dei caratteri speciali per MarkdownV2.""" # Caratteri da escapare in MarkdownV2 escape_chars = r'_*[]()~`>#+-=|{}.!' return re.sub(f'([{re.escape(escape_chars)}])', r'\\\1', text) def send_message(self, chat_id: int, text: str, parse_mode: Optional[str] = None) -> Dict: """Invia un messaggio a una chat.""" params = { "chat_id": chat_id, "text": text } if parse_mode: # Se usi MarkdownV2, applica l'escape automatico if parse_mode == "MarkdownV2": params["text"] = self.escape_markdown_v2(text) params["parse_mode"] = parse_mode return self._make_request("sendMessage", params) def send_chat_action(self, chat_id: int, action: str = "typing") -> Dict: """Invia un'azione (es. "typing").""" params = { "chat_id": chat_id, "action": action } return self._make_request("sendChatAction", params) class TF5TelegramBot: """Bot Telegram per controllare il mixer TF5.""" def __init__(self, telegram_token: str, mixer_host: str = DEFAULT_HOST, mixer_port: int = DEFAULT_PORT, conversations_dir: str = "conversations"): self.bot = TelegramBot(telegram_token) self.controller = TF5MixerController(mixer_host, mixer_port) self.conversation_manager = ConversationManager(conversations_dir) # Configura Gemini api_key = os.getenv("GEMINI_API_KEY") if not api_key: raise ValueError("GEMINI_API_KEY non trovata") self.client = genai.Client(api_key=api_key) # Configurazione Gemini con function calling self.config = types.GenerateContentConfig( tools=[ self.controller.recall_scene, self.controller.set_channel_level, self.controller.set_channel_on_off, self.controller.set_channel_pan, self.controller.set_mix_level, self.controller.set_mix_on_off, self.controller.mute_multiple_channels, self.controller.unmute_multiple_channels, self.controller.get_channel_info, self.controller.get_mix_info, self.controller.search_channels_by_name, self.controller.search_mixes_by_name, self.controller.get_all_channels_summary, self.controller.get_all_mixes_summary, self.controller.refresh_cache, self.controller.set_channel_to_mix_level, self.controller.set_channel_to_mix_on_off, self.controller.get_channel_to_mix_info, ], temperature=0, ) # Storia delle conversazioni per ogni utente (in memoria) self.conversations: Dict[int, List[Dict]] = {} # Mappa user_id -> username per il salvataggio self.usernames: Dict[int, str] = {} # System instruction self.system_instruction = """Sei un assistente per il controllo del mixer audio Yamaha TF5. Parli in modo semplice e diretto, come un tecnico del suono esperto che aiuta i musicisti sul palco. Il mixer ha: - 40 canali (microfoni, strumenti, ecc.) - 20 mix/aux (monitor, effetti, ecc.) - Ogni canale ha volume (da silenzio a +10 dB), acceso/spento, e bilanciamento sinistra/destra - Scene memorizzate nei banchi A e B (da 0 a 99) IMPORTANTE - Sistema di Cache: - Le info sui canali e mix sono salvate per 60 minuti per non sovraccaricare il mixer - Usa search_channels_by_name e search_mixes_by_name per cercare velocemente - Usa get_all_channels_summary e get_all_mixes_summary per vedere tutto - Quando modifichi un canale/mix, la cache viene aggiornata automaticamente - Quando si carica una scena, i dati vengono invalidati e aggiornati alla prossima richiesta - Puoi fare refresh_cache solo se l'utente lo chiede esplicitamente Come interpretare le richieste: VOLUME/LIVELLO: - "alza/abbassa/aumenta/diminuisci" → cambia il volume - "più/meno forte/volume" → cambia il volume - "al massimo" → +10 dB - "un po' più alto" → +3 dB circa - "metti a zero" o "unity" → 0 dB - "abbassa di poco" → -3 dB - "metti basso" → -20 dB - "silenzio/muto" → spegni il canale ON/OFF: - "accendi/attiva/apri" → canale/mix ON - "spegni/muta/chiudi/stacca" → canale/mix OFF - "muto" può significare sia spegnere che abbassare molto BILANCIAMENTO (PAN): - "a sinistra/left" → pan -63 - "a destra/right" → pan +63 - "al centro" → pan 0 - "un po' a sinistra" → pan -30 circa IDENTIFICAZIONE CANALI E MIX: - Accetta sia numeri ("canale 5", "mix 3") che nomi ("il microfono del cantante", "monitor palco") - Se non trovi un canale/mix per nome, cerca usando search_channels_by_name o search_mixes_by_name - "il mio mic/microfono" → cerca tra i canali chi è sul palco - "le chitarre/i vox/le tastiere" → cerca per strumento - "tutti i mic/tutte le chitarre" → cerca e gestisci multipli - "il monitor" / "l'aux 2" → cerca tra i mix SEND AI MIX/AUX: - "alza il gelato nell'aux 2" → usa set_channel_to_mix_level - "manda più chitarra nel monitor" → aumenta il send - "togli la voce dal mix 3" → spegni il send o abbassa molto - i mix 9/10, 11/12, 13/14, 15/16, 17/18, 19/20 sono stereo, fai le operazioni su entrambi SCENE: - "carica/richiama/vai alla scena X" → recall_scene - Accetta "A5", "scena A 5", "la cinque del banco A", ecc. GRUPPI DI CANALI: - "i canali dal 3 al 7" → canali 3,4,5,6,7 - "spegni tutto tranne..." → muta tutti gli altri - "solo i microfoni" → attiva solo quelli, spegni il resto CASI PARTICOLARI: - Se la richiesta è ambigua, chiedi chiarimenti in modo colloquiale - Se serve cercare un canale/mix, usa prima la cache (search_channels_by_name / search_mixes_by_name) - Conferma sempre cosa hai fatto con un messaggio breve e chiaro - Usa emoji occasionalmente per rendere le risposte più amichevoli (✅ ❌ 🎤 🎸 🔊) - Se qualcosa non funziona, spiega il problema in modo semplice Rispondi sempre in modo: - Diretto e colloquiale - Senza troppi tecnicismi - Confermando chiaramente l'azione eseguita - Suggerendo alternative se qualcosa non è possibile Ricorda: chi ti parla è spesso sul palco, con le mani occupate da uno strumento. Devi essere veloce, chiaro e capire anche richieste approssimative. """ def _get_file_path(self, file_id: str) -> str: """Richiede a Telegram il percorso di un file.""" result = self.bot._make_request("getFile", {"file_id": file_id}) if result.get("ok"): return result["result"]["file_path"] raise Exception("Impossibile ottenere il file da Telegram") def _download_file(self, url: str) -> bytes: """Scarica un file da Telegram e restituisce i bytes.""" req = Request(url) with urlopen(req, timeout=30) as response: return response.read() def _transcribe_audio(self, audio_bytes: bytes) -> Optional[str]: """Trascrive un file audio in testo usando Gemini.""" try: response = self.client.models.generate_content( model="gemini-2.5-flash", contents=[ "Trascrivi in testo chiaro ciò che dice questa registrazione:", types.Part.from_bytes( data=audio_bytes, mime_type="audio/ogg" # Telegram voice è in OGG/Opus ), ] ) return response.text.strip() except Exception as e: print(f"❌ Errore nella trascrizione: {e}") return None def get_conversation_history(self, user_id: int) -> List[Dict]: """Ottiene la storia della conversazione per un utente.""" if user_id not in self.conversations: # Prova a caricare dal file history = self.conversation_manager.load_conversation(user_id) self.conversations[user_id] = history if history else [] return self.conversations[user_id] def add_to_history(self, user_id: int, role: str, content: str): """Aggiunge un messaggio alla storia e salva.""" history = self.get_conversation_history(user_id) history.append({ "role": role, "content": content, "timestamp": time.time(), "timestamp_str": time.strftime("%Y-%m-%d %H:%M:%S") }) # Mantieni solo gli ultimi 20 messaggi per evitare context overflow if len(history) > 20: self.conversations[user_id] = history[-20:] # Salva su file username = self.usernames.get(user_id, "Unknown") self.conversation_manager.save_conversation(user_id, history, username) def clear_history(self, user_id: int): """Cancella la storia di un utente.""" self.conversations[user_id] = [] self.conversation_manager.delete_conversation(user_id) def process_message(self, user_id: int, username: str, text: str) -> str: """Elabora un messaggio dell'utente.""" try: # Salva username self.usernames[user_id] = username # Comandi speciali if text.lower() in ['/start', '/help']: return self._get_help_message() if text.lower() == '/reset': self.clear_history(user_id) return "🔄 Storia della conversazione cancellata. Ricominciamo da capo!" if text.lower() == '/status': return self._get_status_message() if text.lower() == '/export': return self._export_conversation(user_id) # Aggiungi il messaggio alla storia self.add_to_history(user_id, "user", text) # Prepara il prompt con la storia history = self.get_conversation_history(user_id) conversation_context = "\n".join([ f"{'Utente' if msg['role'] == 'user' else 'Assistente'}: {msg['content']}" for msg in history[:-1] # Escludi l'ultimo (già incluso sotto) ]) full_prompt = f"{self.system_instruction}\n\n" if conversation_context: full_prompt += f"Conversazione precedente:\n{conversation_context}\n\n" full_prompt += f"Utente: {text}" # Genera risposta con Gemini response = self.client.models.generate_content( model="gemini-2.5-flash", contents=full_prompt, config=self.config, ) assistant_response = response.text # Aggiungi la risposta alla storia self.add_to_history(user_id, "assistant", assistant_response) return assistant_response except Exception as e: return f"❌ Errore nell'elaborazione: {e}" def _export_conversation(self, user_id: int) -> str: """Genera un riepilogo della conversazione.""" history = self.get_conversation_history(user_id) if not history: return "📭 Non ci sono messaggi nella conversazione." username = self.usernames.get(user_id, "Unknown") filepath = self.conversation_manager._get_filepath(user_id) msg = f"💾 **Conversazione salvata**\n\n" msg += f"👤 Utente: @{username} (ID: {user_id})\n" msg += f"💬 Messaggi: {len(history)}\n" msg += f"📁 File: {filepath}\n\n" msg += "La conversazione viene salvata automaticamente dopo ogni messaggio." return msg def _get_help_message(self) -> str: """Restituisce il messaggio di aiuto.""" return """🎛️ **TF5 Mixer Bot** Controlla il mixer Yamaha TF5 con comandi in linguaggio naturale! **Esempi di comandi:** • "Alza il canale 5 a -10 dB" • "Spegni i canali dal 1 al 5" • "Richiama la scena A10" • "Muta i canali 2, 4 e 6" • "Alza il mix 3 di 5 dB" • "Mostrami lo stato del canale 12" • "Cerca i monitor" **Comandi speciali:** /help - Mostra questo messaggio /reset - Cancella la storia della conversazione /status - Mostra stato del sistema /export - Info sul file della conversazione Scrivi semplicemente cosa vuoi fare e ci penso io! 🎵 💾 Ogni conversazione viene salvata automaticamente.""" def _get_status_message(self) -> str: """Restituisce lo stato del sistema.""" cache_age = time.time() - self.controller._cache.get("timestamp", 0) cache_valid = self.controller._is_cache_valid() status = "📊 **Status del Sistema**\n\n" status += f"🎛️ Mixer: Connesso a {self.controller.host}:{self.controller.port}\n" if cache_valid: channels_count = len(self.controller._cache.get("channels", {})) mixes_count = len(self.controller._cache.get("mixes", {})) status += f"💾 Cache: Valida (età: {int(cache_age)}s)\n" status += f"📊 Dati: {channels_count} canali, {mixes_count} mix\n" else: status += "💾 Cache: Non disponibile\n" users_count = len(self.conversations) total_messages = sum(len(hist) for hist in self.conversations.values()) status += f"👥 Utenti attivi: {users_count}\n" status += f"💬 Messaggi totali: {total_messages}\n\n" # Lista utenti con conversazioni salvate all_users = self.conversation_manager.get_all_users() if all_users: status += "📁 **Conversazioni salvate:**\n" for user in all_users[:5]: # Mostra solo i primi 5 status += f"• @{user['username']}: {user['message_count']} msg ({user['last_updated']})\n" if len(all_users) > 5: status += f"... e altri {len(all_users) - 5} utenti" return status def handle_update(self, update: Dict): """Gestisce un aggiornamento di Telegram.""" if "message" not in update: return message = update["message"] chat_id = message["chat"]["id"] user_id = message["from"]["id"] username = message["from"].get("username", "Unknown") # 🎤 Se è un messaggio vocale o audio if "voice" in message or "audio" in message: self.bot.send_chat_action(chat_id, "typing") file_info = message.get("voice") or message.get("audio") file_id = file_info["file_id"] # Ottieni l'URL del file Telegram file_path = self._get_file_path(file_id) file_url = f"https://api.telegram.org/file/bot{self.bot.token}/{file_path}" # Scarica il file audio_bytes = self._download_file(file_url) # Trascrivi con Gemini transcript = self._transcribe_audio(audio_bytes) if not transcript: self.bot.send_message(chat_id, "❌ Non sono riuscito a capire l’audio.") return # Conferma trascrizione self.bot.send_message(chat_id, f"🗣️ Hai detto:\n> {transcript}") # Processa il testo come se fosse un messaggio response = self.process_message(user_id, username, transcript) self.bot.send_message(chat_id, response, parse_mode='MarkdownV2') return # 🎯 Messaggi testuali classici if "text" not in message: return text = message["text"] print(f"📨 Messaggio da @{username}: {text}") self.bot.send_chat_action(chat_id, "typing") response = self.process_message(user_id, username, text) self.bot.send_message(chat_id, response, parse_mode='MarkdownV2') def run(self): """Avvia il bot con long polling.""" print("=" * 70) print("🤖 TF5 Mixer Telegram Bot") print("=" * 70) # Verifica connessione me = self.bot._make_request("getMe") if me.get("ok"): bot_info = me["result"] print(f"\n✅ Bot avviato: @{bot_info['username']}") print(f" Nome: {bot_info['first_name']}") print(f" ID: {bot_info['id']}") else: print("❌ Errore nella verifica del bot") return # Mostra stato cache cache_age = time.time() - self.controller._cache.get("timestamp", 0) if self.controller._is_cache_valid(): channels_count = len(self.controller._cache.get("channels", {})) mixes_count = len(self.controller._cache.get("mixes", {})) print(f"\n💾 Cache disponibile (età: {int(cache_age)}s)") print(f" 📊 {channels_count} canali, {mixes_count} mix") else: print("\n💾 Cache non disponibile, verrà creata al primo utilizzo") # Mostra conversazioni salvate saved_convs = self.conversation_manager.get_all_users() if saved_convs: print(f"\n📁 Conversazioni salvate: {len(saved_convs)}") print(f" Directory: {self.conversation_manager.conversations_dir}") print("\n🔄 In attesa di messaggi... (Ctrl+C per terminare)\n") try: while True: try: updates = self.bot.get_updates() for update in updates: try: self.handle_update(update) except Exception as e: print(f"❌ Errore nell'elaborazione update: {e}") time.sleep(0.1) # Piccola pausa per non sovraccaricare except KeyboardInterrupt: raise except Exception as e: print(f"❌ Errore nel polling: {e}") time.sleep(5) # Attendi prima di riprovare except KeyboardInterrupt: print("\n\n👋 Bot terminato") print(f"💾 Conversazioni salvate in: {self.conversation_manager.conversations_dir}") def close(self): """Chiude le connessioni.""" self.controller.close() def main(): """Funzione principale.""" import argparse parser = argparse.ArgumentParser( description='TF5 Mixer Telegram Bot' ) parser.add_argument('--mixer-host', default=DEFAULT_HOST, help=f'IP del mixer (default: {DEFAULT_HOST})') parser.add_argument('--mixer-port', type=int, default=DEFAULT_PORT, help=f'Porta mixer (default: {DEFAULT_PORT})') parser.add_argument('--conversations-dir', default='conversations', help='Directory per salvare le conversazioni (default: conversations)') args = parser.parse_args() # Verifica variabili d'ambiente telegram_token = os.getenv("TELEGRAM_BOT_TOKEN") if not telegram_token: print("❌ Errore: TELEGRAM_BOT_TOKEN non trovata") print("\nPer impostare il token:") print(" export TELEGRAM_BOT_TOKEN='il-tuo-token'") print("\nOttieni un token da @BotFather su Telegram") sys.exit(1) gemini_key = os.getenv("GEMINI_API_KEY") if not gemini_key: print("❌ Errore: GEMINI_API_KEY non trovata") print("\nPer impostare la chiave API:") print(" export GEMINI_API_KEY='la-tua-chiave-api'") print("\nOttieni una chiave su: https://aistudio.google.com/apikey") sys.exit(1) try: bot = TF5TelegramBot( telegram_token=telegram_token, mixer_host=args.mixer_host, mixer_port=args.mixer_port, conversations_dir=args.conversations_dir ) bot.run() except Exception as e: print(f"❌ Errore fatale: {e}") sys.exit(1) finally: bot.close() if __name__ == '__main__': main()