Files
Automixbot/telegram_bot.py
2025-10-27 20:55:10 +01:00

626 lines
23 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
TF5 Mixer Telegram Bot - Con salvataggio conversazioni
"""
import json
import time
import sys
import os
from typing import Dict, List, Optional
from pathlib import Path
from urllib.request import urlopen, Request
from urllib.error import URLError, HTTPError
from urllib.parse import urlencode
from google import genai
from google.genai import types
from dotenv import load_dotenv
from mixer_controller import TF5MixerController
import re
load_dotenv()
from config import *
class ConversationManager:
"""Gestisce il salvataggio delle conversazioni su file JSON."""
def __init__(self, conversations_dir: str = "conversations"):
self.conversations_dir = Path(conversations_dir)
self.conversations_dir.mkdir(exist_ok=True)
def _get_filepath(self, user_id: int) -> Path:
"""Restituisce il percorso del file per un utente."""
return self.conversations_dir / f"{user_id}.json"
def save_conversation(self, user_id: int, history: List[Dict],
username: str = "Unknown"):
"""Salva la conversazione di un utente."""
try:
filepath = self._get_filepath(user_id)
data = {
"user_id": user_id,
"username": username,
"last_updated": time.time(),
"last_updated_str": time.strftime("%Y-%m-%d %H:%M:%S"),
"message_count": len(history),
"history": history
}
with open(filepath, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
return True
except Exception as e:
print(f"❌ Errore nel salvataggio conversazione {user_id}: {e}")
return False
def load_conversation(self, user_id: int) -> Optional[List[Dict]]:
"""Carica la conversazione di un utente."""
try:
filepath = self._get_filepath(user_id)
if not filepath.exists():
return None
with open(filepath, 'r', encoding='utf-8') as f:
data = json.load(f)
return data.get("history", [])
except Exception as e:
print(f"❌ Errore nel caricamento conversazione {user_id}: {e}")
return None
def delete_conversation(self, user_id: int) -> bool:
"""Elimina la conversazione di un utente."""
try:
filepath = self._get_filepath(user_id)
if filepath.exists():
filepath.unlink()
return True
return False
except Exception as e:
print(f"❌ Errore nell'eliminazione conversazione {user_id}: {e}")
return False
def get_all_users(self) -> List[Dict]:
"""Restituisce la lista di tutti gli utenti con conversazioni salvate."""
users = []
for filepath in self.conversations_dir.glob("*.json"):
try:
with open(filepath, 'r', encoding='utf-8') as f:
data = json.load(f)
users.append({
"user_id": data["user_id"],
"username": data.get("username", "Unknown"),
"message_count": data.get("message_count", 0),
"last_updated": data.get("last_updated_str", "Unknown")
})
except Exception as e:
print(f"❌ Errore lettura {filepath}: {e}")
return users
class TelegramBot:
"""Bot Telegram base con long polling."""
def __init__(self, token: str):
self.token = token
self.base_url = f"https://api.telegram.org/bot{token}"
self.offset = 0
def _make_request(self, method: str, params: Optional[Dict] = None) -> Dict:
"""Esegue una richiesta all'API di Telegram."""
url = f"{self.base_url}/{method}"
try:
if params:
data = json.dumps(params).encode('utf-8')
headers = {'Content-Type': 'application/json'}
req = Request(url, data=data, headers=headers)
else:
req = Request(url)
with urlopen(req, timeout=30) as response:
return json.loads(response.read().decode('utf-8'))
except HTTPError as e:
error_body = e.read().decode('utf-8')
print(f"❌ HTTP Error {e.code}: {error_body}")
return {"ok": False, "description": error_body}
except URLError as e:
print(f"❌ URL Error: {e.reason}")
return {"ok": False, "description": str(e.reason)}
except Exception as e:
print(f"❌ Error: {e}")
return {"ok": False, "description": str(e)}
def get_updates(self, timeout: int = 30) -> List[Dict]:
"""Ottiene gli aggiornamenti usando long polling."""
params = {
"offset": self.offset,
"timeout": timeout,
"allowed_updates": ["message"]
}
result = self._make_request("getUpdates", params)
if result.get("ok"):
updates = result.get("result", [])
if updates:
self.offset = updates[-1]["update_id"] + 1
return updates
return []
def escape_markdown_v2(self, text: str) -> str:
"""Escape dei caratteri speciali per MarkdownV2."""
# Caratteri da escapare in MarkdownV2
escape_chars = r'_*[]()~`>#+-=|{}.!'
return re.sub(f'([{re.escape(escape_chars)}])', r'\\\1', text)
def send_message(self, chat_id: int, text: str,
parse_mode: Optional[str] = None) -> Dict:
"""Invia un messaggio a una chat."""
params = {
"chat_id": chat_id,
"text": text
}
if parse_mode:
# Se usi MarkdownV2, applica l'escape automatico
if parse_mode == "MarkdownV2":
params["text"] = self.escape_markdown_v2(text)
params["parse_mode"] = parse_mode
return self._make_request("sendMessage", params)
def send_chat_action(self, chat_id: int, action: str = "typing") -> Dict:
"""Invia un'azione (es. "typing")."""
params = {
"chat_id": chat_id,
"action": action
}
return self._make_request("sendChatAction", params)
class TF5TelegramBot:
"""Bot Telegram per controllare il mixer TF5."""
def __init__(self, telegram_token: str, mixer_host: str = DEFAULT_HOST,
mixer_port: int = DEFAULT_PORT, conversations_dir: str = "conversations"):
self.bot = TelegramBot(telegram_token)
self.controller = TF5MixerController(mixer_host, mixer_port)
self.conversation_manager = ConversationManager(conversations_dir)
# Configura Gemini
api_key = os.getenv("GEMINI_API_KEY")
if not api_key:
raise ValueError("GEMINI_API_KEY non trovata")
self.client = genai.Client(api_key=api_key)
# Configurazione Gemini con function calling
self.config = types.GenerateContentConfig(
tools=[
self.controller.recall_scene,
self.controller.set_channel_level,
self.controller.set_channel_on_off,
self.controller.set_channel_pan,
self.controller.set_mix_level,
self.controller.set_mix_on_off,
self.controller.mute_multiple_channels,
self.controller.unmute_multiple_channels,
self.controller.get_channel_info,
self.controller.get_mix_info,
self.controller.search_channels_by_name,
self.controller.search_mixes_by_name,
self.controller.get_all_channels_summary,
self.controller.get_all_mixes_summary,
self.controller.refresh_cache,
self.controller.set_channel_to_mix_level,
self.controller.set_channel_to_mix_on_off,
self.controller.get_channel_to_mix_info,
],
temperature=0,
)
# Storia delle conversazioni per ogni utente (in memoria)
self.conversations: Dict[int, List[Dict]] = {}
# Mappa user_id -> username per il salvataggio
self.usernames: Dict[int, str] = {}
# System instruction
self.system_instruction = """Sei un assistente per il controllo del mixer audio Yamaha TF5.
Parli in modo semplice e diretto, come un tecnico del suono esperto che aiuta i musicisti sul palco.
Il mixer ha:
- 40 canali (microfoni, strumenti, ecc.)
- 20 mix/aux (monitor, effetti, ecc.)
- Ogni canale ha volume (da silenzio a +10 dB), acceso/spento, e bilanciamento sinistra/destra
- Scene memorizzate nei banchi A e B (da 0 a 99)
IMPORTANTE - Sistema di Cache:
- Le info sui canali e mix sono salvate per 60 minuti per non sovraccaricare il mixer
- Usa search_channels_by_name e search_mixes_by_name per cercare velocemente
- Usa get_all_channels_summary e get_all_mixes_summary per vedere tutto
- Quando modifichi un canale/mix, la cache viene aggiornata automaticamente
- Quando si carica una scena, i dati vengono invalidati e aggiornati alla prossima richiesta
- Puoi fare refresh_cache solo se l'utente lo chiede esplicitamente
Come interpretare le richieste:
VOLUME/LIVELLO:
- "alza/abbassa/aumenta/diminuisci" → cambia il volume
- "più/meno forte/volume" → cambia il volume
- "al massimo" → +10 dB
- "un po' più alto" → +3 dB circa
- "metti a zero" o "unity" → 0 dB
- "abbassa di poco" → -3 dB
- "metti basso" → -20 dB
- "silenzio/muto" → spegni il canale
ON/OFF:
- "accendi/attiva/apri" → canale/mix ON
- "spegni/muta/chiudi/stacca" → canale/mix OFF
- "muto" può significare sia spegnere che abbassare molto
BILANCIAMENTO (PAN):
- "a sinistra/left" → pan -63
- "a destra/right" → pan +63
- "al centro" → pan 0
- "un po' a sinistra" → pan -30 circa
IDENTIFICAZIONE CANALI E MIX:
- Accetta sia numeri ("canale 5", "mix 3") che nomi ("il microfono del cantante", "monitor palco")
- Se non trovi un canale/mix per nome, cerca usando search_channels_by_name o search_mixes_by_name
- "il mio mic/microfono" → cerca tra i canali chi è sul palco
- "le chitarre/i vox/le tastiere" → cerca per strumento
- "tutti i mic/tutte le chitarre" → cerca e gestisci multipli
- "il monitor" / "l'aux 2" → cerca tra i mix
SEND AI MIX/AUX:
- "alza il gelato nell'aux 2" → usa set_channel_to_mix_level
- "manda più chitarra nel monitor" → aumenta il send
- "togli la voce dal mix 3" → spegni il send o abbassa molto
- i mix 9/10, 11/12, 13/14, 15/16, 17/18, 19/20 sono stereo, fai le operazioni su entrambi
SCENE:
- "carica/richiama/vai alla scena X" → recall_scene
- Accetta "A5", "scena A 5", "la cinque del banco A", ecc.
GRUPPI DI CANALI:
- "i canali dal 3 al 7" → canali 3,4,5,6,7
- "spegni tutto tranne..." → muta tutti gli altri
- "solo i microfoni" → attiva solo quelli, spegni il resto
CASI PARTICOLARI:
- Se la richiesta è ambigua, chiedi chiarimenti in modo colloquiale
- Se serve cercare un canale/mix, usa prima la cache (search_channels_by_name / search_mixes_by_name)
- Conferma sempre cosa hai fatto con un messaggio breve e chiaro
- Usa emoji occasionalmente per rendere le risposte più amichevoli (✅ ❌ 🎤 🎸 🔊)
- Se qualcosa non funziona, spiega il problema in modo semplice
Rispondi sempre in modo:
- Diretto e colloquiale
- Senza troppi tecnicismi
- Confermando chiaramente l'azione eseguita
- Suggerendo alternative se qualcosa non è possibile
Ricorda: chi ti parla è spesso sul palco, con le mani occupate da uno strumento.
Devi essere veloce, chiaro e capire anche richieste approssimative.
"""
def get_conversation_history(self, user_id: int) -> List[Dict]:
"""Ottiene la storia della conversazione per un utente."""
if user_id not in self.conversations:
# Prova a caricare dal file
history = self.conversation_manager.load_conversation(user_id)
self.conversations[user_id] = history if history else []
return self.conversations[user_id]
def add_to_history(self, user_id: int, role: str, content: str):
"""Aggiunge un messaggio alla storia e salva."""
history = self.get_conversation_history(user_id)
history.append({
"role": role,
"content": content,
"timestamp": time.time(),
"timestamp_str": time.strftime("%Y-%m-%d %H:%M:%S")
})
# Mantieni solo gli ultimi 20 messaggi per evitare context overflow
if len(history) > 20:
self.conversations[user_id] = history[-20:]
# Salva su file
username = self.usernames.get(user_id, "Unknown")
self.conversation_manager.save_conversation(user_id, history, username)
def clear_history(self, user_id: int):
"""Cancella la storia di un utente."""
self.conversations[user_id] = []
self.conversation_manager.delete_conversation(user_id)
def process_message(self, user_id: int, username: str, text: str) -> str:
"""Elabora un messaggio dell'utente."""
try:
# Salva username
self.usernames[user_id] = username
# Comandi speciali
if text.lower() in ['/start', '/help']:
return self._get_help_message()
if text.lower() == '/reset':
self.clear_history(user_id)
return "🔄 Storia della conversazione cancellata. Ricominciamo da capo!"
if text.lower() == '/status':
return self._get_status_message()
if text.lower() == '/export':
return self._export_conversation(user_id)
# Aggiungi il messaggio alla storia
self.add_to_history(user_id, "user", text)
# Prepara il prompt con la storia
history = self.get_conversation_history(user_id)
conversation_context = "\n".join([
f"{'Utente' if msg['role'] == 'user' else 'Assistente'}: {msg['content']}"
for msg in history[:-1] # Escludi l'ultimo (già incluso sotto)
])
full_prompt = f"{self.system_instruction}\n\n"
if conversation_context:
full_prompt += f"Conversazione precedente:\n{conversation_context}\n\n"
full_prompt += f"Utente: {text}"
# Genera risposta con Gemini
response = self.client.models.generate_content(
model="gemini-2.5-flash",
contents=full_prompt,
config=self.config,
)
assistant_response = response.text
# Aggiungi la risposta alla storia
self.add_to_history(user_id, "assistant", assistant_response)
return assistant_response
except Exception as e:
return f"❌ Errore nell'elaborazione: {e}"
def _export_conversation(self, user_id: int) -> str:
"""Genera un riepilogo della conversazione."""
history = self.get_conversation_history(user_id)
if not history:
return "📭 Non ci sono messaggi nella conversazione."
username = self.usernames.get(user_id, "Unknown")
filepath = self.conversation_manager._get_filepath(user_id)
msg = f"💾 **Conversazione salvata**\n\n"
msg += f"👤 Utente: @{username} (ID: {user_id})\n"
msg += f"💬 Messaggi: {len(history)}\n"
msg += f"📁 File: {filepath}\n\n"
msg += "La conversazione viene salvata automaticamente dopo ogni messaggio."
return msg
def _get_help_message(self) -> str:
"""Restituisce il messaggio di aiuto."""
return """🎛️ **TF5 Mixer Bot**
Controlla il mixer Yamaha TF5 con comandi in linguaggio naturale!
**Esempi di comandi:**
"Alza il canale 5 a -10 dB"
"Spegni i canali dal 1 al 5"
"Richiama la scena A10"
"Muta i canali 2, 4 e 6"
"Alza il mix 3 di 5 dB"
"Mostrami lo stato del canale 12"
"Cerca i monitor"
**Comandi speciali:**
/help - Mostra questo messaggio
/reset - Cancella la storia della conversazione
/status - Mostra stato del sistema
/export - Info sul file della conversazione
Scrivi semplicemente cosa vuoi fare e ci penso io! 🎵
💾 Ogni conversazione viene salvata automaticamente."""
def _get_status_message(self) -> str:
"""Restituisce lo stato del sistema."""
cache_age = time.time() - self.controller._cache.get("timestamp", 0)
cache_valid = self.controller._is_cache_valid()
status = "📊 **Status del Sistema**\n\n"
status += f"🎛️ Mixer: Connesso a {self.controller.host}:{self.controller.port}\n"
if cache_valid:
channels_count = len(self.controller._cache.get("channels", {}))
mixes_count = len(self.controller._cache.get("mixes", {}))
status += f"💾 Cache: Valida (età: {int(cache_age)}s)\n"
status += f"📊 Dati: {channels_count} canali, {mixes_count} mix\n"
else:
status += "💾 Cache: Non disponibile\n"
users_count = len(self.conversations)
total_messages = sum(len(hist) for hist in self.conversations.values())
status += f"👥 Utenti attivi: {users_count}\n"
status += f"💬 Messaggi totali: {total_messages}\n\n"
# Lista utenti con conversazioni salvate
all_users = self.conversation_manager.get_all_users()
if all_users:
status += "📁 **Conversazioni salvate:**\n"
for user in all_users[:5]: # Mostra solo i primi 5
status += f"• @{user['username']}: {user['message_count']} msg ({user['last_updated']})\n"
if len(all_users) > 5:
status += f"... e altri {len(all_users) - 5} utenti"
return status
def handle_update(self, update: Dict):
"""Gestisce un aggiornamento di Telegram."""
if "message" not in update:
return
message = update["message"]
# Ignora messaggi non testuali
if "text" not in message:
return
chat_id = message["chat"]["id"]
user_id = message["from"]["id"]
username = message["from"].get("username", "Unknown")
text = message["text"]
print(f"📨 Messaggio da @{username} (ID: {user_id}): {text}")
# Mostra "typing..."
self.bot.send_chat_action(chat_id, "typing")
# Elabora il messaggio
response = self.process_message(user_id, username, text)
# Invia la risposta
self.bot.send_message(chat_id, response, parse_mode='MarkdownV2')
print(f"✅ Risposta inviata a @{username}")
def run(self):
"""Avvia il bot con long polling."""
print("=" * 70)
print("🤖 TF5 Mixer Telegram Bot")
print("=" * 70)
# Verifica connessione
me = self.bot._make_request("getMe")
if me.get("ok"):
bot_info = me["result"]
print(f"\n✅ Bot avviato: @{bot_info['username']}")
print(f" Nome: {bot_info['first_name']}")
print(f" ID: {bot_info['id']}")
else:
print("❌ Errore nella verifica del bot")
return
# Mostra stato cache
cache_age = time.time() - self.controller._cache.get("timestamp", 0)
if self.controller._is_cache_valid():
channels_count = len(self.controller._cache.get("channels", {}))
mixes_count = len(self.controller._cache.get("mixes", {}))
print(f"\n💾 Cache disponibile (età: {int(cache_age)}s)")
print(f" 📊 {channels_count} canali, {mixes_count} mix")
else:
print("\n💾 Cache non disponibile, verrà creata al primo utilizzo")
# Mostra conversazioni salvate
saved_convs = self.conversation_manager.get_all_users()
if saved_convs:
print(f"\n📁 Conversazioni salvate: {len(saved_convs)}")
print(f" Directory: {self.conversation_manager.conversations_dir}")
print("\n🔄 In attesa di messaggi... (Ctrl+C per terminare)\n")
try:
while True:
try:
updates = self.bot.get_updates()
for update in updates:
try:
self.handle_update(update)
except Exception as e:
print(f"❌ Errore nell'elaborazione update: {e}")
time.sleep(0.1) # Piccola pausa per non sovraccaricare
except KeyboardInterrupt:
raise
except Exception as e:
print(f"❌ Errore nel polling: {e}")
time.sleep(5) # Attendi prima di riprovare
except KeyboardInterrupt:
print("\n\n👋 Bot terminato")
print(f"💾 Conversazioni salvate in: {self.conversation_manager.conversations_dir}")
def close(self):
"""Chiude le connessioni."""
self.controller.close()
def main():
"""Funzione principale."""
import argparse
parser = argparse.ArgumentParser(
description='TF5 Mixer Telegram Bot'
)
parser.add_argument('--mixer-host', default=DEFAULT_HOST,
help=f'IP del mixer (default: {DEFAULT_HOST})')
parser.add_argument('--mixer-port', type=int, default=DEFAULT_PORT,
help=f'Porta mixer (default: {DEFAULT_PORT})')
parser.add_argument('--conversations-dir', default='conversations',
help='Directory per salvare le conversazioni (default: conversations)')
args = parser.parse_args()
# Verifica variabili d'ambiente
telegram_token = os.getenv("TELEGRAM_BOT_TOKEN")
if not telegram_token:
print("❌ Errore: TELEGRAM_BOT_TOKEN non trovata")
print("\nPer impostare il token:")
print(" export TELEGRAM_BOT_TOKEN='il-tuo-token'")
print("\nOttieni un token da @BotFather su Telegram")
sys.exit(1)
gemini_key = os.getenv("GEMINI_API_KEY")
if not gemini_key:
print("❌ Errore: GEMINI_API_KEY non trovata")
print("\nPer impostare la chiave API:")
print(" export GEMINI_API_KEY='la-tua-chiave-api'")
print("\nOttieni una chiave su: https://aistudio.google.com/apikey")
sys.exit(1)
try:
bot = TF5TelegramBot(
telegram_token=telegram_token,
mixer_host=args.mixer_host,
mixer_port=args.mixer_port,
conversations_dir=args.conversations_dir
)
bot.run()
except Exception as e:
print(f"❌ Errore fatale: {e}")
sys.exit(1)
finally:
bot.close()
if __name__ == '__main__':
main()