Files
Automixbot/mixer_controller.py
2025-11-10 19:56:47 +01:00

564 lines
30 KiB
Python

# mixer_controller.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import socket
import sys
import os
import json
import time
import math
import threading
import queue
from pathlib import Path
from typing import List, Optional
from config import *
class TF5MixerController:
"""
Controllore per mixer Yamaha TF5 con connessione socket persistente e
gestione dei messaggi NOTIFY per l'aggiornamento della cache in tempo reale.
"""
def __init__(self, host=DEFAULT_HOST, port=DEFAULT_PORT):
self.host = host
self.port = port
self.socket = None
self._ensure_cache_dir()
self._cache_lock = threading.Lock()
self._cache = self._load_cache()
self._command_lock = threading.Lock()
self._response_queue = queue.Queue(maxsize=1)
self._reader_thread = None
self._is_running = threading.Event()
self._is_connected = threading.Event()
self.start()
def start(self):
if self._is_running.is_set():
print("Controller già in esecuzione.")
return
print("🚀 Avvio del Mixer Controller...")
self._is_running.set()
self._reader_thread = threading.Thread(target=self._connection_manager)
self._reader_thread.daemon = True
self._reader_thread.start()
if not self._is_connected.wait(timeout=5):
print("⚠️ Impossibile connettersi al mixer all'avvio.")
else:
print("✅ Controller avviato e connesso.")
def _connection_manager(self):
while self._is_running.is_set():
try:
print(f"🔌 Tentativo di connessione a {self.host}:{self.port}...")
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.settimeout(10)
self.socket.connect((self.host, self.port))
self.socket.settimeout(1)
self._is_connected.set()
print(f"🔗 Connesso a {self.host}:{self.port}")
with self._cache_lock:
is_cache_empty = not self._cache.get("channels")
if is_cache_empty:
print("Cache iniziale vuota. Avvio refresh completo...")
self.refresh_cache()
self._socket_reader()
except socket.error as e:
print(f"🔥 Errore di connessione: {e}. Riprovo tra 5 secondi...")
finally:
self._is_connected.clear()
if self.socket:
self.socket.close()
self.socket = None
if self._is_running.is_set():
time.sleep(5)
print("🛑 Gestore connessioni terminato.")
def _socket_reader(self):
buffer = ""
while self._is_running.is_set():
try:
data = self.socket.recv(4096)
if not data:
print("💔 Connessione chiusa dal mixer.")
break
buffer += data.decode('utf-8', errors='ignore')
while '\n' in buffer:
line, buffer = buffer.split('\n', 1)
line = line.strip()
if not line:
continue
if line.startswith("NOTIFY"):
self._handle_notify(line)
elif line.startswith("OK") or line.startswith("ERROR"):
try:
self._response_queue.put_nowait(line)
except queue.Full:
print(f"⚠️ Coda risposte piena, scartato: {line}")
else:
print(f"🤔 Messaggio non gestito: {line}")
except socket.timeout:
continue
except socket.error as e:
print(f"🔥 Errore di lettura dal socket: {e}")
break
def _handle_notify(self, message: str):
"""
Analizza un messaggio NOTIFY e aggiorna la cache in modo robusto.
Versione che gestisce il formato esatto 'NOTIFY set ...' con parametri extra.
"""
print(f"RECV NOTIFY: {message}")
parts = message.split()
if len(parts) < 2: return
path = None
path_index = -1
for i, part in enumerate(parts):
if part.startswith("MIXER:"):
path = part
path_index = i
break
if path is None:
print(f" -> ATTENZIONE: Nessun path valido (che inizia con 'MIXER:') trovato nel messaggio.")
return
print(f" -> Tentativo di acquisire il lock per l'aggiornamento (Path: {path})...")
with self._cache_lock:
print(f" -> LOCK ACQUISITO. Elaborazione...")
try:
updated = False
# --- LOGICA DI PARSING SPECIFICA E ROBUSTA ---
# Per 'set' commands, i parametri sono: ch_idx, 0, value, [optional_string]
# Indice ch: path_index + 1
# Indice valore: path_index + 3
# Gestione NOME Canale (usa un parsing diverso)
if "InCh/Label/Name" in path:
params = parts[path_index + 1:]
if len(params) >= 2:
ch_idx, name = int(params[0]), " ".join(params[1:]).strip('"')
ch_key = str(ch_idx + 1)
channel_data = self._cache.setdefault("channels", {}).setdefault(ch_key, {"channel": int(ch_key)})
channel_data["name"] = name
print(f" -> SUCCESS: Ch {ch_key} nome -> '{name}'")
updated = True
# Gestione ON/OFF Canale
elif "InCh/Fader/On" in path:
if len(parts) > path_index + 3:
ch_idx = int(parts[path_index + 1])
state_val = parts[path_index + 3]
state = state_val == "1"
ch_key = str(ch_idx + 1)
channel_data = self._cache.setdefault("channels", {}).setdefault(ch_key, {"channel": int(ch_key)})
channel_data["on"] = state
print(f" -> SUCCESS: Ch {ch_key} stato -> {'ON' if state else 'OFF'}")
updated = True
# Gestione LIVELLO Canale
elif "InCh/Fader/Level" in path:
if len(parts) > path_index + 3:
ch_idx = int(parts[path_index + 1])
level_int_str = parts[path_index + 3]
level_int = int(level_int_str)
ch_key = str(ch_idx + 1)
level_db = level_int / 100.0 if level_int > -32768 else float('-inf')
channel_data = self._cache.setdefault("channels", {}).setdefault(ch_key, {"channel": int(ch_key)})
channel_data["level_db"] = level_db
print(f" -> SUCCESS: Ch {ch_key} level -> {level_db:.2f} dB")
updated = True
if updated:
self._cache['timestamp'] = time.time()
else:
print(f" -> ATTENZIONE: Nessuna condizione di aggiornamento trovata per il path '{path}'")
except (ValueError, IndexError) as e:
print(f" -> ERRORE CRITICO durante l'elaborazione: {e}")
finally:
print(f" -> LOCK RILASCIATO.")
def _send_command(self, command: str) -> str:
"""Invia un comando al mixer e attende la risposta."""
if not self._is_connected.wait(timeout=5):
return "ERROR connection - Mixer non connesso."
with self._command_lock:
try:
while not self._response_queue.empty():
self._response_queue.get_nowait()
print(f"SEND: {command}")
self.socket.sendall((command + '\n').encode('utf-8'))
response = self._response_queue.get(timeout=5)
print(f"RECV RESP: {response}")
return response
except queue.Empty:
return "ERROR timeout - Nessuna risposta dal mixer."
except socket.error as e:
self._is_connected.clear()
return f"ERROR connection - Errore socket: {e}"
def close(self):
"""Ferma il controller e chiude la connessione."""
if not self._is_running.is_set():
return
print("🛑 Chiusura del Mixer Controller...")
self._is_running.clear()
if self.socket:
try:
self.socket.shutdown(socket.SHUT_RDWR)
self.socket.close()
except socket.error:
pass
if self._reader_thread and self._reader_thread.is_alive():
self._reader_thread.join(timeout=2)
print("✅ Controller fermato.")
self._save_cache()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
# --- Metodi di gestione della cache e di utilità (invariati) ---
def _sanitize_value(self, value):
if value is None: return -120.0
if value == float('-inf') or (isinstance(value, float) and math.isinf(value) and value < 0): return -120.0
elif value == float('inf') or (isinstance(value, float) and math.isinf(value) and value > 0): return 10.0
elif isinstance(value, float) and math.isnan(value): return 0.0
return value
def _ensure_cache_dir(self):
CACHE_DIR.mkdir(parents=True, exist_ok=True)
def _load_cache(self) -> dict:
with self._cache_lock:
if CACHE_FILE.exists():
try:
with open(CACHE_FILE, 'r', encoding='utf-8') as f:
data = json.load(f)
# Assicura che le chiavi principali esistano
if "channels" not in data: data["channels"] = {}
if "mixes" not in data: data["mixes"] = {}
return data
except Exception as e:
print(f"⚠️ Errore nel caricamento della cache: {e}")
return {"channels": {}, "mixes": {}, "timestamp": 0}
def _save_cache(self):
with self._cache_lock:
try:
cache_to_save = self._prepare_cache_for_json(self._cache)
with open(CACHE_FILE, 'w', encoding='utf-8') as f:
json.dump(cache_to_save, f, indent=2, ensure_ascii=False)
except Exception as e:
print(f"⚠️ Errore nel salvataggio della cache: {e}")
def _prepare_cache_for_json(self, obj):
if isinstance(obj, dict): return {k: self._prepare_cache_for_json(v) for k, v in obj.items()}
elif isinstance(obj, list): return [self._prepare_cache_for_json(item) for item in obj]
elif isinstance(obj, float):
if obj == float('-inf'): return None
return obj
else: return obj
def _is_cache_valid(self) -> bool:
with self._cache_lock:
if not self._cache.get("channels"): return False
cache_age = time.time() - self._cache.get("timestamp", 0)
return cache_age < CACHE_DURATION
def _normalize_level_from_cache(self, level_value):
if level_value is None: return float('-inf')
return level_value
def _parse_name(self, response: str) -> str:
try:
first_line = response.split('\n')[0]
start = first_line.find('"') + 1
end = first_line.rfind('"')
if start > 0 and end > start: return first_line[start:end]
return "Sconosciuto"
except: return "Errore"
def _parse_value(self, response: str) -> str:
first_line = response.split('\n')[0]
parts = first_line.split()
if len(parts) > 0 and parts[0] == "OK": return parts[-1]
return "N/A"
# --- API Pubblica del Controller (invariata) ---
# ... tutti gli altri metodi da refresh_cache in poi rimangono identici ...
def refresh_cache(self) -> dict:
print("🔄 Aggiornamento cache completo in corso...")
channels_data = {}
mixes_data = {}
for ch in range(1, TF5_INPUT_CHANNELS + 1):
ch_idx = ch - 1
resp_name = self._send_command(f"get MIXER:Current/InCh/Label/Name {ch_idx} 0")
name = self._parse_name(resp_name)
resp_on = self._send_command(f"get MIXER:Current/InCh/Fader/On {ch_idx} 0")
is_on = self._parse_value(resp_on) == "1"
resp_level = self._send_command(f"get MIXER:Current/InCh/Fader/Level {ch_idx} 0")
level_raw = self._parse_value(resp_level)
try: level_db = int(level_raw) / 100.0 if int(level_raw) > -32768 else float('-inf')
except: level_db = None
resp_pan = self._send_command(f"get MIXER:Current/InCh/ToSt/Pan {ch_idx} 0")
pan_raw = self._parse_value(resp_pan)
try: pan_value = int(pan_raw)
except: pan_value = None
channels_data[str(ch)] = {"channel": ch, "name": name, "on": is_on, "level_db": level_db, "pan": pan_value}
time.sleep(0.01) # Riduci il delay, la connessione persistente è veloce
for mix in range(1, TF5_MIX_BUSSES + 1):
mix_idx = mix - 1
resp_name = self._send_command(f"get MIXER:Current/Mix/Label/Name {mix_idx} 0")
name = self._parse_name(resp_name)
resp_on = self._send_command(f"get MIXER:Current/Mix/Fader/On {mix_idx} 0")
is_on = self._parse_value(resp_on) == "1"
resp_level = self._send_command(f"get MIXER:Current/Mix/Fader/Level {mix_idx} 0")
level_raw = self._parse_value(resp_level)
try: level_db = int(level_raw) / 100.0 if int(level_raw) > -32768 else float('-inf')
except: level_db = None
mixes_data[str(mix)] = {"mix": mix, "name": name, "on": is_on, "level_db": level_db}
time.sleep(0.01)
with self._cache_lock:
self._cache = {"channels": channels_data, "mixes": mixes_data, "timestamp": time.time()}
self._save_cache()
msg = f"Cache aggiornata con {len(channels_data)} canali e {len(mixes_data)} mix"
print(f"{msg}")
return {"status": "success", "message": msg, "channels_count": len(channels_data), "mixes_count": len(mixes_data)}
# ... TUTTI GLI ALTRI METODI PUBBLICI RESTANO UGUALI ...
def recall_scene(self, bank: str, scene_number: int) -> dict:
if bank.lower() not in ['a', 'b']: return {"status": "error", "message": "Il banco deve essere 'a' o 'b'"}
if not 0 <= scene_number <= 99: return {"status": "error", "message": "Il numero scena deve essere tra 0 e 99"}
command = f"ssrecall_ex scene_{bank.lower()} {scene_number}"
response = self._send_command(command)
if "OK" in response:
print("Scene richiamata. La cache si aggiornerà tramite NOTIFY. Schedulato un refresh completo in 5s per sicurezza.")
threading.Timer(5.0, self.refresh_cache).start()
return {"status": "success" if "OK" in response else "error", "message": f"Scena {bank.upper()}{scene_number} richiamata.", "response": response}
def set_channel_level(self, channel: int, level_db: float) -> dict:
if not 1 <= channel <= TF5_INPUT_CHANNELS: return {"status": "error", "message": f"Il canale deve essere tra 1 e {TF5_INPUT_CHANNELS}"}
internal_value = -32768 if level_db <= -138 else int(level_db * 100)
command = f"set MIXER:Current/InCh/Fader/Level {channel-1} 0 {internal_value}"
response = self._send_command(command)
return {"status": "success" if "OK" in response else "error", "message": f"Canale {channel} impostato a {level_db:+.1f} dB", "response": response}
def set_channel_on_off(self, channel: int, state: bool) -> dict:
if not 1 <= channel <= TF5_INPUT_CHANNELS: return {"status": "error", "message": f"Il canale deve essere tra 1 e {TF5_INPUT_CHANNELS}"}
command = f"set MIXER:Current/InCh/Fader/On {channel-1} 0 {1 if state else 0}"
response = self._send_command(command)
return {"status": "success" if "OK" in response else "error", "message": f"Canale {channel} {'acceso' if state else 'spento'}", "response": response}
def set_channel_pan(self, channel: int, pan_value: int) -> dict:
if not 1 <= channel <= TF5_INPUT_CHANNELS: return {"status": "error", "message": f"Il canale deve essere tra 1 e {TF5_INPUT_CHANNELS}"}
if not -63 <= pan_value <= 63: return {"status": "error", "message": "Il pan deve essere tra -63 e +63"}
command = f"set MIXER:Current/InCh/ToSt/Pan {channel-1} 0 {pan_value}"
response = self._send_command(command)
pan_desc = "centro"
if pan_value < 0: pan_desc = f"sinistra {abs(pan_value)}"
elif pan_value > 0: pan_desc = f"destra {pan_value}"
return {"status": "success" if "OK" in response else "error", "message": f"Canale {channel} pan impostato a {pan_desc}", "response": response}
def set_mix_level(self, mix_number: int, level_db: float) -> dict:
if not 1 <= mix_number <= TF5_MIX_BUSSES: return {"status": "error", "message": f"Il mix deve essere tra 1 e {TF5_MIX_BUSSES}"}
internal_value = -32768 if level_db <= -138 else int(level_db * 100)
command = f"set MIXER:Current/Mix/Fader/Level {mix_number-1} 0 {internal_value}"
response = self._send_command(command)
return {"status": "success" if "OK" in response else "error", "message": f"Mix {mix_number} impostato a {level_db:+.1f} dB", "response": response}
def set_mix_on_off(self, mix_number: int, state: bool) -> dict:
if not 1 <= mix_number <= TF5_MIX_BUSSES: return {"status": "error", "message": f"Il mix deve essere tra 1 e {TF5_MIX_BUSSES}"}
command = f"set MIXER:Current/Mix/Fader/On {mix_number-1} 0 {1 if state else 0}"
response = self._send_command(command)
return {"status": "success" if "OK" in response else "error", "message": f"Mix {mix_number} {'acceso' if state else 'spento'}", "response": response}
def mute_multiple_channels(self, channels: List[int]) -> dict:
results = [self.set_channel_on_off(ch, False) for ch in channels]
success_count = sum(1 for r in results if r["status"] == "success")
return {"status": "success" if success_count == len(channels) else "partial", "message": f"Mutati {success_count}/{len(channels)} canali: {channels}", "details": results}
def unmute_multiple_channels(self, channels: List[int]) -> dict:
results = [self.set_channel_on_off(ch, True) for ch in channels]
success_count = sum(1 for r in results if r["status"] == "success")
return {"status": "success" if success_count == len(channels) else "partial", "message": f"Riattivati {success_count}/{len(channels)} canali: {channels}", "details": results}
def get_channel_info(self, channel: int, force_refresh: bool = False) -> dict:
if not 1 <= channel <= TF5_INPUT_CHANNELS: return {"status": "error", "message": f"Il canale deve essere tra 1 e {TF5_INPUT_CHANNELS}"}
if not force_refresh and self._is_cache_valid():
with self._cache_lock:
cached_data = self._cache.get("channels", {}).get(str(channel))
if cached_data:
sanitized_level_db = self._sanitize_value(self._normalize_level_from_cache(cached_data.get("level_db")))
return {"status": "success", "source": "cache", "channel": cached_data["channel"], "name": cached_data["name"], "on": cached_data["on"], "level_db": sanitized_level_db, "pan": cached_data.get("pan")}
ch_idx = channel - 1
resp_name = self._send_command(f"get MIXER:Current/InCh/Label/Name {ch_idx} 0")
name = self._parse_name(resp_name)
resp_on = self._send_command(f"get MIXER:Current/InCh/Fader/On {ch_idx} 0")
is_on = self._parse_value(resp_on) == "1"
resp_level = self._send_command(f"get MIXER:Current/InCh/Fader/Level {ch_idx} 0")
level_raw = self._parse_value(resp_level)
try: level_db = int(level_raw) / 100.0 if int(level_raw) > -32768 else float('-inf')
except: level_db = None
resp_pan = self._send_command(f"get MIXER:Current/InCh/ToSt/Pan {ch_idx} 0")
pan_raw = self._parse_value(resp_pan)
try: pan_value = int(pan_raw)
except: pan_value = None
with self._cache_lock:
if "channels" not in self._cache: self._cache["channels"] = {}
self._cache["channels"][str(channel)] = {"channel": channel, "name": name, "on": is_on, "level_db": level_db, "pan": pan_value}
self._cache['timestamp'] = time.time()
return {"status": "success", "source": "mixer", "channel": channel, "name": name, "on": is_on, "level_db": self._sanitize_value(level_db), "pan": pan_value}
def get_mix_info(self, mix_number: int, force_refresh: bool = False) -> dict:
if not 1 <= mix_number <= TF5_MIX_BUSSES: return {"status": "error", "message": f"Il mix deve essere tra 1 e {TF5_MIX_BUSSES}"}
if not force_refresh and self._is_cache_valid():
with self._cache_lock:
cached_data = self._cache.get("mixes", {}).get(str(mix_number))
if cached_data:
sanitized_level_db = self._sanitize_value(self._normalize_level_from_cache(cached_data.get("level_db")))
return {"status": "success", "source": "cache", "mix": cached_data["mix"], "name": cached_data["name"], "on": cached_data["on"], "level_db": sanitized_level_db}
mix_idx = mix_number - 1
resp_name = self._send_command(f"get MIXER:Current/Mix/Label/Name {mix_idx} 0")
name = self._parse_name(resp_name)
resp_on = self._send_command(f"get MIXER:Current/Mix/Fader/On {mix_idx} 0")
is_on = self._parse_value(resp_on) == "1"
resp_level = self._send_command(f"get MIXER:Current/Mix/Fader/Level {mix_idx} 0")
level_raw = self._parse_value(resp_level)
try: level_db = int(level_raw) / 100.0 if int(level_raw) > -32768 else float('-inf')
except: level_db = None
with self._cache_lock:
if "mixes" not in self._cache: self._cache["mixes"] = {}
self._cache["mixes"][str(mix_number)] = {"mix": mix_number, "name": name, "on": is_on, "level_db": level_db}
self._cache['timestamp'] = time.time()
return {"status": "success", "source": "mixer", "mix": mix_number, "name": name, "on": is_on, "level_db": self._sanitize_value(level_db)}
def search_channels_by_name(self, search_term: str) -> dict:
if not self._is_cache_valid(): self.refresh_cache()
search_lower = search_term.lower()
found_channels = []
with self._cache_lock:
channels_copy = list(self._cache.get("channels", {}).values())
for info in channels_copy:
if search_lower in info.get("name", "").lower():
sanitized_level_db = self._sanitize_value(self._normalize_level_from_cache(info.get("level_db")))
found_channels.append({"channel": info["channel"], "name": info["name"], "on": info["on"], "level_db": sanitized_level_db})
found_channels.sort(key=lambda x: x["channel"])
return {"status": "success", "search_term": search_term, "found_count": len(found_channels), "channels": found_channels, "message": f"Trovati {len(found_channels)} canali contenenti '{search_term}'"}
def search_mixes_by_name(self, search_term: str) -> dict:
if not self._is_cache_valid(): self.refresh_cache()
search_lower = search_term.lower()
found_mixes = []
with self._cache_lock:
mixes_copy = list(self._cache.get("mixes", {}).values())
for info in mixes_copy:
if search_lower in info.get("name", "").lower():
sanitized_level_db = self._sanitize_value(self._normalize_level_from_cache(info.get("level_db")))
found_mixes.append({"mix": info["mix"], "name": info["name"], "on": info["on"], "level_db": sanitized_level_db})
found_mixes.sort(key=lambda x: x["mix"])
return {"status": "success", "search_term": search_term, "found_count": len(found_mixes), "mixes": found_mixes, "message": f"Trovati {len(found_mixes)} mix contenenti '{search_term}'"}
def get_all_channels_summary(self) -> dict:
if not self._is_cache_valid(): self.refresh_cache()
channels = []
with self._cache_lock:
channels_copy = list(self._cache.get("channels", {}).values())
cache_age = time.time() - self._cache.get("timestamp", 0)
for info in channels_copy:
sanitized_level_db = self._sanitize_value(self._normalize_level_from_cache(info.get("level_db")))
channels.append({"channel": info["channel"], "name": info["name"], "on": info["on"], "level_db": sanitized_level_db})
channels.sort(key=lambda x: x["channel"])
return {"status": "success", "total_channels": len(channels), "channels": channels, "cache_age_seconds": int(cache_age), "message": f"Riepilogo di {len(channels)} canali (cache: {int(cache_age)}s fa)"}
def get_all_mixes_summary(self) -> dict:
if not self._is_cache_valid(): self.refresh_cache()
mixes = []
with self._cache_lock:
mixes_copy = list(self._cache.get("mixes", {}).values())
cache_age = time.time() - self._cache.get("timestamp", 0)
for info in mixes_copy:
sanitized_level_db = self._sanitize_value(self._normalize_level_from_cache(info.get("level_db")))
mixes.append({"mix": info["mix"], "name": info["name"], "on": info["on"], "level_db": sanitized_level_db})
mixes.sort(key=lambda x: x["mix"])
return {"status": "success", "total_mixes": len(mixes), "mixes": mixes, "cache_age_seconds": int(cache_age), "message": f"Riepilogo di {len(mixes)} mix (cache: {int(cache_age)}s fa)"}
def set_channel_to_mix_level(self, channel: int, mix_number: int, level_db: float) -> dict:
if not 1 <= channel <= TF5_INPUT_CHANNELS: return {"status": "error", "message": f"Il canale deve essere tra 1 e {TF5_INPUT_CHANNELS}"}
if not 1 <= mix_number <= TF5_MIX_BUSSES: return {"status": "error", "message": f"Il mix deve essere tra 1 e {TF5_MIX_BUSSES}"}
internal_value = -32768 if level_db <= -138 else int(level_db * 100)
command = f"set MIXER:Current/InCh/ToMix/Level {channel-1} {mix_number-1} {internal_value}"
response = self._send_command(command)
return {"status": "success" if "OK" in response else "error", "message": f"Canale {channel} → Mix {mix_number} impostato a {level_db:+.1f} dB", "response": response}
def set_channel_to_mix_on_off(self, channel: int, mix_number: int, state: bool) -> dict:
if not 1 <= channel <= TF5_INPUT_CHANNELS: return {"status": "error", "message": f"Il canale deve essere tra 1 e {TF5_INPUT_CHANNELS}"}
if not 1 <= mix_number <= TF5_MIX_BUSSES: return {"status": "error", "message": f"Il mix deve essere tra 1 e {TF5_MIX_BUSSES}"}
command = f"set MIXER:Current/InCh/ToMix/On {channel-1} {mix_number-1} {1 if state else 0}"
response = self._send_command(command)
return {"status": "success" if "OK" in response else "error", "message": f"Invio canale {channel} → Mix {mix_number} {'acceso' if state else 'spento'}", "response": response}
def get_channel_to_mix_info(self, channel: int, mix_number: int) -> dict:
if not 1 <= channel <= TF5_INPUT_CHANNELS: return {"status": "error", "message": f"Il canale deve essere tra 1 e {TF5_INPUT_CHANNELS}"}
if not 1 <= mix_number <= TF5_MIX_BUSSES: return {"status": "error", "message": f"Il mix deve essere tra 1 e {TF5_MIX_BUSSES}"}
ch_idx, mix_idx = channel - 1, mix_number - 1
resp_level = self._send_command(f"get MIXER:Current/InCh/ToMix/Level {ch_idx} {mix_idx}")
level_raw = self._parse_value(resp_level)
try: level_db = int(level_raw) / 100.0 if int(level_raw) > -32768 else float('-inf')
except: level_db = None
resp_on = self._send_command(f"get MIXER:Current/InCh/ToMix/On {ch_idx} {mix_idx}")
is_on = self._parse_value(resp_on) == "1"
sanitized_level_db = self._sanitize_value(level_db)
return {"status": "success", "channel": channel, "mix": mix_number, "on": is_on, "send_level_db": sanitized_level_db, "message": f"Canale {channel} → Mix {mix_number}: {sanitized_level_db:+.1f} dB ({'ON' if is_on else 'OFF'})"}
def get_full_mix_details(self, mix_number: int) -> dict:
if not 1 <= mix_number <= TF5_MIX_BUSSES: return {"status": "error", "message": f"Il mix deve essere tra 1 e {TF5_MIX_BUSSES}"}
if not self._is_cache_valid(): self.refresh_cache()
mix_info = self.get_mix_info(mix_number)
if mix_info["status"] != "success": return mix_info
sends = []
for channel in range(1, TF5_INPUT_CHANNELS + 1):
send_info = self.get_channel_to_mix_info(channel, mix_number)
if send_info["status"] == "success" and send_info["send_level_db"] > -120.0:
with self._cache_lock:
channel_cache = self._cache.get("channels", {}).get(str(channel), {})
sends.append({"channel": channel, "channel_name": channel_cache.get("name", "Sconosciuto"), "channel_is_on": channel_cache.get("on", False), "send_level_db": send_info["send_level_db"], "send_is_on": send_info["on"]})
sends.sort(key=lambda x: x["send_level_db"], reverse=True)
return {"status": "success", "mix_details": mix_info, "active_sends": sends, "message": f"Dettagli completi per il Mix {mix_number} recuperati."}