564 lines
30 KiB
Python
564 lines
30 KiB
Python
# mixer_controller.py
|
|
|
|
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
import socket
|
|
import sys
|
|
import os
|
|
import json
|
|
import time
|
|
import math
|
|
import threading
|
|
import queue
|
|
from pathlib import Path
|
|
from typing import List, Optional
|
|
from config import *
|
|
|
|
class TF5MixerController:
|
|
"""
|
|
Controllore per mixer Yamaha TF5 con connessione socket persistente e
|
|
gestione dei messaggi NOTIFY per l'aggiornamento della cache in tempo reale.
|
|
"""
|
|
def __init__(self, host=DEFAULT_HOST, port=DEFAULT_PORT):
|
|
self.host = host
|
|
self.port = port
|
|
self.socket = None
|
|
self._ensure_cache_dir()
|
|
|
|
self._cache_lock = threading.Lock()
|
|
self._cache = self._load_cache()
|
|
|
|
self._command_lock = threading.Lock()
|
|
self._response_queue = queue.Queue(maxsize=1)
|
|
self._reader_thread = None
|
|
self._is_running = threading.Event()
|
|
self._is_connected = threading.Event()
|
|
|
|
self.start()
|
|
|
|
def start(self):
|
|
if self._is_running.is_set():
|
|
print("Controller già in esecuzione.")
|
|
return
|
|
|
|
print("🚀 Avvio del Mixer Controller...")
|
|
self._is_running.set()
|
|
self._reader_thread = threading.Thread(target=self._connection_manager)
|
|
self._reader_thread.daemon = True
|
|
self._reader_thread.start()
|
|
|
|
if not self._is_connected.wait(timeout=5):
|
|
print("⚠️ Impossibile connettersi al mixer all'avvio.")
|
|
else:
|
|
print("✅ Controller avviato e connesso.")
|
|
|
|
def _connection_manager(self):
|
|
while self._is_running.is_set():
|
|
try:
|
|
print(f"🔌 Tentativo di connessione a {self.host}:{self.port}...")
|
|
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
self.socket.settimeout(10)
|
|
self.socket.connect((self.host, self.port))
|
|
self.socket.settimeout(1)
|
|
self._is_connected.set()
|
|
print(f"🔗 Connesso a {self.host}:{self.port}")
|
|
|
|
with self._cache_lock:
|
|
is_cache_empty = not self._cache.get("channels")
|
|
|
|
if is_cache_empty:
|
|
print("Cache iniziale vuota. Avvio refresh completo...")
|
|
self.refresh_cache()
|
|
|
|
self._socket_reader()
|
|
except socket.error as e:
|
|
print(f"🔥 Errore di connessione: {e}. Riprovo tra 5 secondi...")
|
|
finally:
|
|
self._is_connected.clear()
|
|
if self.socket:
|
|
self.socket.close()
|
|
self.socket = None
|
|
if self._is_running.is_set():
|
|
time.sleep(5)
|
|
print("🛑 Gestore connessioni terminato.")
|
|
|
|
def _socket_reader(self):
|
|
buffer = ""
|
|
while self._is_running.is_set():
|
|
try:
|
|
data = self.socket.recv(4096)
|
|
if not data:
|
|
print("💔 Connessione chiusa dal mixer.")
|
|
break
|
|
|
|
buffer += data.decode('utf-8', errors='ignore')
|
|
while '\n' in buffer:
|
|
line, buffer = buffer.split('\n', 1)
|
|
line = line.strip()
|
|
if not line:
|
|
continue
|
|
|
|
if line.startswith("NOTIFY"):
|
|
self._handle_notify(line)
|
|
elif line.startswith("OK") or line.startswith("ERROR"):
|
|
try:
|
|
self._response_queue.put_nowait(line)
|
|
except queue.Full:
|
|
print(f"⚠️ Coda risposte piena, scartato: {line}")
|
|
else:
|
|
print(f"🤔 Messaggio non gestito: {line}")
|
|
|
|
except socket.timeout:
|
|
continue
|
|
except socket.error as e:
|
|
print(f"🔥 Errore di lettura dal socket: {e}")
|
|
break
|
|
|
|
def _handle_notify(self, message: str):
|
|
"""
|
|
Analizza un messaggio NOTIFY e aggiorna la cache in modo robusto.
|
|
Versione che gestisce il formato esatto 'NOTIFY set ...' con parametri extra.
|
|
"""
|
|
print(f"RECV NOTIFY: {message}")
|
|
|
|
parts = message.split()
|
|
if len(parts) < 2: return
|
|
|
|
path = None
|
|
path_index = -1
|
|
for i, part in enumerate(parts):
|
|
if part.startswith("MIXER:"):
|
|
path = part
|
|
path_index = i
|
|
break
|
|
|
|
if path is None:
|
|
print(f" -> ATTENZIONE: Nessun path valido (che inizia con 'MIXER:') trovato nel messaggio.")
|
|
return
|
|
|
|
print(f" -> Tentativo di acquisire il lock per l'aggiornamento (Path: {path})...")
|
|
with self._cache_lock:
|
|
print(f" -> LOCK ACQUISITO. Elaborazione...")
|
|
try:
|
|
updated = False
|
|
|
|
# --- LOGICA DI PARSING SPECIFICA E ROBUSTA ---
|
|
# Per 'set' commands, i parametri sono: ch_idx, 0, value, [optional_string]
|
|
# Indice ch: path_index + 1
|
|
# Indice valore: path_index + 3
|
|
|
|
# Gestione NOME Canale (usa un parsing diverso)
|
|
if "InCh/Label/Name" in path:
|
|
params = parts[path_index + 1:]
|
|
if len(params) >= 2:
|
|
ch_idx, name = int(params[0]), " ".join(params[1:]).strip('"')
|
|
ch_key = str(ch_idx + 1)
|
|
|
|
channel_data = self._cache.setdefault("channels", {}).setdefault(ch_key, {"channel": int(ch_key)})
|
|
channel_data["name"] = name
|
|
print(f" -> SUCCESS: Ch {ch_key} nome -> '{name}'")
|
|
updated = True
|
|
|
|
# Gestione ON/OFF Canale
|
|
elif "InCh/Fader/On" in path:
|
|
if len(parts) > path_index + 3:
|
|
ch_idx = int(parts[path_index + 1])
|
|
state_val = parts[path_index + 3]
|
|
state = state_val == "1"
|
|
ch_key = str(ch_idx + 1)
|
|
|
|
channel_data = self._cache.setdefault("channels", {}).setdefault(ch_key, {"channel": int(ch_key)})
|
|
channel_data["on"] = state
|
|
print(f" -> SUCCESS: Ch {ch_key} stato -> {'ON' if state else 'OFF'}")
|
|
updated = True
|
|
|
|
# Gestione LIVELLO Canale
|
|
elif "InCh/Fader/Level" in path:
|
|
if len(parts) > path_index + 3:
|
|
ch_idx = int(parts[path_index + 1])
|
|
level_int_str = parts[path_index + 3]
|
|
level_int = int(level_int_str)
|
|
ch_key = str(ch_idx + 1)
|
|
level_db = level_int / 100.0 if level_int > -32768 else float('-inf')
|
|
|
|
channel_data = self._cache.setdefault("channels", {}).setdefault(ch_key, {"channel": int(ch_key)})
|
|
channel_data["level_db"] = level_db
|
|
print(f" -> SUCCESS: Ch {ch_key} level -> {level_db:.2f} dB")
|
|
updated = True
|
|
|
|
if updated:
|
|
self._cache['timestamp'] = time.time()
|
|
else:
|
|
print(f" -> ATTENZIONE: Nessuna condizione di aggiornamento trovata per il path '{path}'")
|
|
|
|
except (ValueError, IndexError) as e:
|
|
print(f" -> ERRORE CRITICO durante l'elaborazione: {e}")
|
|
finally:
|
|
print(f" -> LOCK RILASCIATO.")
|
|
|
|
def _send_command(self, command: str) -> str:
|
|
"""Invia un comando al mixer e attende la risposta."""
|
|
if not self._is_connected.wait(timeout=5):
|
|
return "ERROR connection - Mixer non connesso."
|
|
|
|
with self._command_lock:
|
|
try:
|
|
while not self._response_queue.empty():
|
|
self._response_queue.get_nowait()
|
|
|
|
print(f"SEND: {command}")
|
|
self.socket.sendall((command + '\n').encode('utf-8'))
|
|
|
|
response = self._response_queue.get(timeout=5)
|
|
print(f"RECV RESP: {response}")
|
|
return response
|
|
|
|
except queue.Empty:
|
|
return "ERROR timeout - Nessuna risposta dal mixer."
|
|
except socket.error as e:
|
|
self._is_connected.clear()
|
|
return f"ERROR connection - Errore socket: {e}"
|
|
|
|
def close(self):
|
|
"""Ferma il controller e chiude la connessione."""
|
|
if not self._is_running.is_set():
|
|
return
|
|
print("🛑 Chiusura del Mixer Controller...")
|
|
self._is_running.clear()
|
|
if self.socket:
|
|
try:
|
|
self.socket.shutdown(socket.SHUT_RDWR)
|
|
self.socket.close()
|
|
except socket.error:
|
|
pass
|
|
if self._reader_thread and self._reader_thread.is_alive():
|
|
self._reader_thread.join(timeout=2)
|
|
print("✅ Controller fermato.")
|
|
self._save_cache()
|
|
|
|
def __enter__(self):
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
|
self.close()
|
|
|
|
# --- Metodi di gestione della cache e di utilità (invariati) ---
|
|
def _sanitize_value(self, value):
|
|
if value is None: return -120.0
|
|
if value == float('-inf') or (isinstance(value, float) and math.isinf(value) and value < 0): return -120.0
|
|
elif value == float('inf') or (isinstance(value, float) and math.isinf(value) and value > 0): return 10.0
|
|
elif isinstance(value, float) and math.isnan(value): return 0.0
|
|
return value
|
|
|
|
def _ensure_cache_dir(self):
|
|
CACHE_DIR.mkdir(parents=True, exist_ok=True)
|
|
|
|
def _load_cache(self) -> dict:
|
|
with self._cache_lock:
|
|
if CACHE_FILE.exists():
|
|
try:
|
|
with open(CACHE_FILE, 'r', encoding='utf-8') as f:
|
|
data = json.load(f)
|
|
# Assicura che le chiavi principali esistano
|
|
if "channels" not in data: data["channels"] = {}
|
|
if "mixes" not in data: data["mixes"] = {}
|
|
return data
|
|
except Exception as e:
|
|
print(f"⚠️ Errore nel caricamento della cache: {e}")
|
|
return {"channels": {}, "mixes": {}, "timestamp": 0}
|
|
|
|
def _save_cache(self):
|
|
with self._cache_lock:
|
|
try:
|
|
cache_to_save = self._prepare_cache_for_json(self._cache)
|
|
with open(CACHE_FILE, 'w', encoding='utf-8') as f:
|
|
json.dump(cache_to_save, f, indent=2, ensure_ascii=False)
|
|
except Exception as e:
|
|
print(f"⚠️ Errore nel salvataggio della cache: {e}")
|
|
|
|
def _prepare_cache_for_json(self, obj):
|
|
if isinstance(obj, dict): return {k: self._prepare_cache_for_json(v) for k, v in obj.items()}
|
|
elif isinstance(obj, list): return [self._prepare_cache_for_json(item) for item in obj]
|
|
elif isinstance(obj, float):
|
|
if obj == float('-inf'): return None
|
|
return obj
|
|
else: return obj
|
|
|
|
def _is_cache_valid(self) -> bool:
|
|
with self._cache_lock:
|
|
if not self._cache.get("channels"): return False
|
|
cache_age = time.time() - self._cache.get("timestamp", 0)
|
|
return cache_age < CACHE_DURATION
|
|
|
|
def _normalize_level_from_cache(self, level_value):
|
|
if level_value is None: return float('-inf')
|
|
return level_value
|
|
|
|
def _parse_name(self, response: str) -> str:
|
|
try:
|
|
first_line = response.split('\n')[0]
|
|
start = first_line.find('"') + 1
|
|
end = first_line.rfind('"')
|
|
if start > 0 and end > start: return first_line[start:end]
|
|
return "Sconosciuto"
|
|
except: return "Errore"
|
|
|
|
def _parse_value(self, response: str) -> str:
|
|
first_line = response.split('\n')[0]
|
|
parts = first_line.split()
|
|
if len(parts) > 0 and parts[0] == "OK": return parts[-1]
|
|
return "N/A"
|
|
|
|
# --- API Pubblica del Controller (invariata) ---
|
|
# ... tutti gli altri metodi da refresh_cache in poi rimangono identici ...
|
|
def refresh_cache(self) -> dict:
|
|
print("🔄 Aggiornamento cache completo in corso...")
|
|
channels_data = {}
|
|
mixes_data = {}
|
|
|
|
for ch in range(1, TF5_INPUT_CHANNELS + 1):
|
|
ch_idx = ch - 1
|
|
resp_name = self._send_command(f"get MIXER:Current/InCh/Label/Name {ch_idx} 0")
|
|
name = self._parse_name(resp_name)
|
|
resp_on = self._send_command(f"get MIXER:Current/InCh/Fader/On {ch_idx} 0")
|
|
is_on = self._parse_value(resp_on) == "1"
|
|
resp_level = self._send_command(f"get MIXER:Current/InCh/Fader/Level {ch_idx} 0")
|
|
level_raw = self._parse_value(resp_level)
|
|
try: level_db = int(level_raw) / 100.0 if int(level_raw) > -32768 else float('-inf')
|
|
except: level_db = None
|
|
resp_pan = self._send_command(f"get MIXER:Current/InCh/ToSt/Pan {ch_idx} 0")
|
|
pan_raw = self._parse_value(resp_pan)
|
|
try: pan_value = int(pan_raw)
|
|
except: pan_value = None
|
|
channels_data[str(ch)] = {"channel": ch, "name": name, "on": is_on, "level_db": level_db, "pan": pan_value}
|
|
time.sleep(0.01) # Riduci il delay, la connessione persistente è veloce
|
|
|
|
for mix in range(1, TF5_MIX_BUSSES + 1):
|
|
mix_idx = mix - 1
|
|
resp_name = self._send_command(f"get MIXER:Current/Mix/Label/Name {mix_idx} 0")
|
|
name = self._parse_name(resp_name)
|
|
resp_on = self._send_command(f"get MIXER:Current/Mix/Fader/On {mix_idx} 0")
|
|
is_on = self._parse_value(resp_on) == "1"
|
|
resp_level = self._send_command(f"get MIXER:Current/Mix/Fader/Level {mix_idx} 0")
|
|
level_raw = self._parse_value(resp_level)
|
|
try: level_db = int(level_raw) / 100.0 if int(level_raw) > -32768 else float('-inf')
|
|
except: level_db = None
|
|
mixes_data[str(mix)] = {"mix": mix, "name": name, "on": is_on, "level_db": level_db}
|
|
time.sleep(0.01)
|
|
|
|
with self._cache_lock:
|
|
self._cache = {"channels": channels_data, "mixes": mixes_data, "timestamp": time.time()}
|
|
self._save_cache()
|
|
|
|
msg = f"Cache aggiornata con {len(channels_data)} canali e {len(mixes_data)} mix"
|
|
print(f"✅ {msg}")
|
|
return {"status": "success", "message": msg, "channels_count": len(channels_data), "mixes_count": len(mixes_data)}
|
|
|
|
# ... TUTTI GLI ALTRI METODI PUBBLICI RESTANO UGUALI ...
|
|
def recall_scene(self, bank: str, scene_number: int) -> dict:
|
|
if bank.lower() not in ['a', 'b']: return {"status": "error", "message": "Il banco deve essere 'a' o 'b'"}
|
|
if not 0 <= scene_number <= 99: return {"status": "error", "message": "Il numero scena deve essere tra 0 e 99"}
|
|
|
|
command = f"ssrecall_ex scene_{bank.lower()} {scene_number}"
|
|
response = self._send_command(command)
|
|
|
|
if "OK" in response:
|
|
print("Scene richiamata. La cache si aggiornerà tramite NOTIFY. Schedulato un refresh completo in 5s per sicurezza.")
|
|
threading.Timer(5.0, self.refresh_cache).start()
|
|
|
|
return {"status": "success" if "OK" in response else "error", "message": f"Scena {bank.upper()}{scene_number} richiamata.", "response": response}
|
|
|
|
def set_channel_level(self, channel: int, level_db: float) -> dict:
|
|
if not 1 <= channel <= TF5_INPUT_CHANNELS: return {"status": "error", "message": f"Il canale deve essere tra 1 e {TF5_INPUT_CHANNELS}"}
|
|
internal_value = -32768 if level_db <= -138 else int(level_db * 100)
|
|
command = f"set MIXER:Current/InCh/Fader/Level {channel-1} 0 {internal_value}"
|
|
response = self._send_command(command)
|
|
return {"status": "success" if "OK" in response else "error", "message": f"Canale {channel} impostato a {level_db:+.1f} dB", "response": response}
|
|
|
|
def set_channel_on_off(self, channel: int, state: bool) -> dict:
|
|
if not 1 <= channel <= TF5_INPUT_CHANNELS: return {"status": "error", "message": f"Il canale deve essere tra 1 e {TF5_INPUT_CHANNELS}"}
|
|
command = f"set MIXER:Current/InCh/Fader/On {channel-1} 0 {1 if state else 0}"
|
|
response = self._send_command(command)
|
|
return {"status": "success" if "OK" in response else "error", "message": f"Canale {channel} {'acceso' if state else 'spento'}", "response": response}
|
|
|
|
def set_channel_pan(self, channel: int, pan_value: int) -> dict:
|
|
if not 1 <= channel <= TF5_INPUT_CHANNELS: return {"status": "error", "message": f"Il canale deve essere tra 1 e {TF5_INPUT_CHANNELS}"}
|
|
if not -63 <= pan_value <= 63: return {"status": "error", "message": "Il pan deve essere tra -63 e +63"}
|
|
command = f"set MIXER:Current/InCh/ToSt/Pan {channel-1} 0 {pan_value}"
|
|
response = self._send_command(command)
|
|
pan_desc = "centro"
|
|
if pan_value < 0: pan_desc = f"sinistra {abs(pan_value)}"
|
|
elif pan_value > 0: pan_desc = f"destra {pan_value}"
|
|
return {"status": "success" if "OK" in response else "error", "message": f"Canale {channel} pan impostato a {pan_desc}", "response": response}
|
|
|
|
def set_mix_level(self, mix_number: int, level_db: float) -> dict:
|
|
if not 1 <= mix_number <= TF5_MIX_BUSSES: return {"status": "error", "message": f"Il mix deve essere tra 1 e {TF5_MIX_BUSSES}"}
|
|
internal_value = -32768 if level_db <= -138 else int(level_db * 100)
|
|
command = f"set MIXER:Current/Mix/Fader/Level {mix_number-1} 0 {internal_value}"
|
|
response = self._send_command(command)
|
|
return {"status": "success" if "OK" in response else "error", "message": f"Mix {mix_number} impostato a {level_db:+.1f} dB", "response": response}
|
|
|
|
def set_mix_on_off(self, mix_number: int, state: bool) -> dict:
|
|
if not 1 <= mix_number <= TF5_MIX_BUSSES: return {"status": "error", "message": f"Il mix deve essere tra 1 e {TF5_MIX_BUSSES}"}
|
|
command = f"set MIXER:Current/Mix/Fader/On {mix_number-1} 0 {1 if state else 0}"
|
|
response = self._send_command(command)
|
|
return {"status": "success" if "OK" in response else "error", "message": f"Mix {mix_number} {'acceso' if state else 'spento'}", "response": response}
|
|
|
|
def mute_multiple_channels(self, channels: List[int]) -> dict:
|
|
results = [self.set_channel_on_off(ch, False) for ch in channels]
|
|
success_count = sum(1 for r in results if r["status"] == "success")
|
|
return {"status": "success" if success_count == len(channels) else "partial", "message": f"Mutati {success_count}/{len(channels)} canali: {channels}", "details": results}
|
|
|
|
def unmute_multiple_channels(self, channels: List[int]) -> dict:
|
|
results = [self.set_channel_on_off(ch, True) for ch in channels]
|
|
success_count = sum(1 for r in results if r["status"] == "success")
|
|
return {"status": "success" if success_count == len(channels) else "partial", "message": f"Riattivati {success_count}/{len(channels)} canali: {channels}", "details": results}
|
|
|
|
def get_channel_info(self, channel: int, force_refresh: bool = False) -> dict:
|
|
if not 1 <= channel <= TF5_INPUT_CHANNELS: return {"status": "error", "message": f"Il canale deve essere tra 1 e {TF5_INPUT_CHANNELS}"}
|
|
if not force_refresh and self._is_cache_valid():
|
|
with self._cache_lock:
|
|
cached_data = self._cache.get("channels", {}).get(str(channel))
|
|
if cached_data:
|
|
sanitized_level_db = self._sanitize_value(self._normalize_level_from_cache(cached_data.get("level_db")))
|
|
return {"status": "success", "source": "cache", "channel": cached_data["channel"], "name": cached_data["name"], "on": cached_data["on"], "level_db": sanitized_level_db, "pan": cached_data.get("pan")}
|
|
|
|
ch_idx = channel - 1
|
|
resp_name = self._send_command(f"get MIXER:Current/InCh/Label/Name {ch_idx} 0")
|
|
name = self._parse_name(resp_name)
|
|
resp_on = self._send_command(f"get MIXER:Current/InCh/Fader/On {ch_idx} 0")
|
|
is_on = self._parse_value(resp_on) == "1"
|
|
resp_level = self._send_command(f"get MIXER:Current/InCh/Fader/Level {ch_idx} 0")
|
|
level_raw = self._parse_value(resp_level)
|
|
try: level_db = int(level_raw) / 100.0 if int(level_raw) > -32768 else float('-inf')
|
|
except: level_db = None
|
|
resp_pan = self._send_command(f"get MIXER:Current/InCh/ToSt/Pan {ch_idx} 0")
|
|
pan_raw = self._parse_value(resp_pan)
|
|
try: pan_value = int(pan_raw)
|
|
except: pan_value = None
|
|
|
|
with self._cache_lock:
|
|
if "channels" not in self._cache: self._cache["channels"] = {}
|
|
self._cache["channels"][str(channel)] = {"channel": channel, "name": name, "on": is_on, "level_db": level_db, "pan": pan_value}
|
|
self._cache['timestamp'] = time.time()
|
|
|
|
return {"status": "success", "source": "mixer", "channel": channel, "name": name, "on": is_on, "level_db": self._sanitize_value(level_db), "pan": pan_value}
|
|
|
|
def get_mix_info(self, mix_number: int, force_refresh: bool = False) -> dict:
|
|
if not 1 <= mix_number <= TF5_MIX_BUSSES: return {"status": "error", "message": f"Il mix deve essere tra 1 e {TF5_MIX_BUSSES}"}
|
|
if not force_refresh and self._is_cache_valid():
|
|
with self._cache_lock:
|
|
cached_data = self._cache.get("mixes", {}).get(str(mix_number))
|
|
if cached_data:
|
|
sanitized_level_db = self._sanitize_value(self._normalize_level_from_cache(cached_data.get("level_db")))
|
|
return {"status": "success", "source": "cache", "mix": cached_data["mix"], "name": cached_data["name"], "on": cached_data["on"], "level_db": sanitized_level_db}
|
|
|
|
mix_idx = mix_number - 1
|
|
resp_name = self._send_command(f"get MIXER:Current/Mix/Label/Name {mix_idx} 0")
|
|
name = self._parse_name(resp_name)
|
|
resp_on = self._send_command(f"get MIXER:Current/Mix/Fader/On {mix_idx} 0")
|
|
is_on = self._parse_value(resp_on) == "1"
|
|
resp_level = self._send_command(f"get MIXER:Current/Mix/Fader/Level {mix_idx} 0")
|
|
level_raw = self._parse_value(resp_level)
|
|
try: level_db = int(level_raw) / 100.0 if int(level_raw) > -32768 else float('-inf')
|
|
except: level_db = None
|
|
|
|
with self._cache_lock:
|
|
if "mixes" not in self._cache: self._cache["mixes"] = {}
|
|
self._cache["mixes"][str(mix_number)] = {"mix": mix_number, "name": name, "on": is_on, "level_db": level_db}
|
|
self._cache['timestamp'] = time.time()
|
|
|
|
return {"status": "success", "source": "mixer", "mix": mix_number, "name": name, "on": is_on, "level_db": self._sanitize_value(level_db)}
|
|
|
|
def search_channels_by_name(self, search_term: str) -> dict:
|
|
if not self._is_cache_valid(): self.refresh_cache()
|
|
search_lower = search_term.lower()
|
|
found_channels = []
|
|
with self._cache_lock:
|
|
channels_copy = list(self._cache.get("channels", {}).values())
|
|
for info in channels_copy:
|
|
if search_lower in info.get("name", "").lower():
|
|
sanitized_level_db = self._sanitize_value(self._normalize_level_from_cache(info.get("level_db")))
|
|
found_channels.append({"channel": info["channel"], "name": info["name"], "on": info["on"], "level_db": sanitized_level_db})
|
|
found_channels.sort(key=lambda x: x["channel"])
|
|
return {"status": "success", "search_term": search_term, "found_count": len(found_channels), "channels": found_channels, "message": f"Trovati {len(found_channels)} canali contenenti '{search_term}'"}
|
|
|
|
def search_mixes_by_name(self, search_term: str) -> dict:
|
|
if not self._is_cache_valid(): self.refresh_cache()
|
|
search_lower = search_term.lower()
|
|
found_mixes = []
|
|
with self._cache_lock:
|
|
mixes_copy = list(self._cache.get("mixes", {}).values())
|
|
for info in mixes_copy:
|
|
if search_lower in info.get("name", "").lower():
|
|
sanitized_level_db = self._sanitize_value(self._normalize_level_from_cache(info.get("level_db")))
|
|
found_mixes.append({"mix": info["mix"], "name": info["name"], "on": info["on"], "level_db": sanitized_level_db})
|
|
found_mixes.sort(key=lambda x: x["mix"])
|
|
return {"status": "success", "search_term": search_term, "found_count": len(found_mixes), "mixes": found_mixes, "message": f"Trovati {len(found_mixes)} mix contenenti '{search_term}'"}
|
|
|
|
def get_all_channels_summary(self) -> dict:
|
|
if not self._is_cache_valid(): self.refresh_cache()
|
|
channels = []
|
|
with self._cache_lock:
|
|
channels_copy = list(self._cache.get("channels", {}).values())
|
|
cache_age = time.time() - self._cache.get("timestamp", 0)
|
|
for info in channels_copy:
|
|
sanitized_level_db = self._sanitize_value(self._normalize_level_from_cache(info.get("level_db")))
|
|
channels.append({"channel": info["channel"], "name": info["name"], "on": info["on"], "level_db": sanitized_level_db})
|
|
channels.sort(key=lambda x: x["channel"])
|
|
return {"status": "success", "total_channels": len(channels), "channels": channels, "cache_age_seconds": int(cache_age), "message": f"Riepilogo di {len(channels)} canali (cache: {int(cache_age)}s fa)"}
|
|
|
|
def get_all_mixes_summary(self) -> dict:
|
|
if not self._is_cache_valid(): self.refresh_cache()
|
|
mixes = []
|
|
with self._cache_lock:
|
|
mixes_copy = list(self._cache.get("mixes", {}).values())
|
|
cache_age = time.time() - self._cache.get("timestamp", 0)
|
|
for info in mixes_copy:
|
|
sanitized_level_db = self._sanitize_value(self._normalize_level_from_cache(info.get("level_db")))
|
|
mixes.append({"mix": info["mix"], "name": info["name"], "on": info["on"], "level_db": sanitized_level_db})
|
|
mixes.sort(key=lambda x: x["mix"])
|
|
return {"status": "success", "total_mixes": len(mixes), "mixes": mixes, "cache_age_seconds": int(cache_age), "message": f"Riepilogo di {len(mixes)} mix (cache: {int(cache_age)}s fa)"}
|
|
|
|
def set_channel_to_mix_level(self, channel: int, mix_number: int, level_db: float) -> dict:
|
|
if not 1 <= channel <= TF5_INPUT_CHANNELS: return {"status": "error", "message": f"Il canale deve essere tra 1 e {TF5_INPUT_CHANNELS}"}
|
|
if not 1 <= mix_number <= TF5_MIX_BUSSES: return {"status": "error", "message": f"Il mix deve essere tra 1 e {TF5_MIX_BUSSES}"}
|
|
internal_value = -32768 if level_db <= -138 else int(level_db * 100)
|
|
command = f"set MIXER:Current/InCh/ToMix/Level {channel-1} {mix_number-1} {internal_value}"
|
|
response = self._send_command(command)
|
|
return {"status": "success" if "OK" in response else "error", "message": f"Canale {channel} → Mix {mix_number} impostato a {level_db:+.1f} dB", "response": response}
|
|
|
|
def set_channel_to_mix_on_off(self, channel: int, mix_number: int, state: bool) -> dict:
|
|
if not 1 <= channel <= TF5_INPUT_CHANNELS: return {"status": "error", "message": f"Il canale deve essere tra 1 e {TF5_INPUT_CHANNELS}"}
|
|
if not 1 <= mix_number <= TF5_MIX_BUSSES: return {"status": "error", "message": f"Il mix deve essere tra 1 e {TF5_MIX_BUSSES}"}
|
|
command = f"set MIXER:Current/InCh/ToMix/On {channel-1} {mix_number-1} {1 if state else 0}"
|
|
response = self._send_command(command)
|
|
return {"status": "success" if "OK" in response else "error", "message": f"Invio canale {channel} → Mix {mix_number} {'acceso' if state else 'spento'}", "response": response}
|
|
|
|
def get_channel_to_mix_info(self, channel: int, mix_number: int) -> dict:
|
|
if not 1 <= channel <= TF5_INPUT_CHANNELS: return {"status": "error", "message": f"Il canale deve essere tra 1 e {TF5_INPUT_CHANNELS}"}
|
|
if not 1 <= mix_number <= TF5_MIX_BUSSES: return {"status": "error", "message": f"Il mix deve essere tra 1 e {TF5_MIX_BUSSES}"}
|
|
ch_idx, mix_idx = channel - 1, mix_number - 1
|
|
resp_level = self._send_command(f"get MIXER:Current/InCh/ToMix/Level {ch_idx} {mix_idx}")
|
|
level_raw = self._parse_value(resp_level)
|
|
try: level_db = int(level_raw) / 100.0 if int(level_raw) > -32768 else float('-inf')
|
|
except: level_db = None
|
|
resp_on = self._send_command(f"get MIXER:Current/InCh/ToMix/On {ch_idx} {mix_idx}")
|
|
is_on = self._parse_value(resp_on) == "1"
|
|
sanitized_level_db = self._sanitize_value(level_db)
|
|
return {"status": "success", "channel": channel, "mix": mix_number, "on": is_on, "send_level_db": sanitized_level_db, "message": f"Canale {channel} → Mix {mix_number}: {sanitized_level_db:+.1f} dB ({'ON' if is_on else 'OFF'})"}
|
|
|
|
def get_full_mix_details(self, mix_number: int) -> dict:
|
|
if not 1 <= mix_number <= TF5_MIX_BUSSES: return {"status": "error", "message": f"Il mix deve essere tra 1 e {TF5_MIX_BUSSES}"}
|
|
if not self._is_cache_valid(): self.refresh_cache()
|
|
mix_info = self.get_mix_info(mix_number)
|
|
if mix_info["status"] != "success": return mix_info
|
|
sends = []
|
|
for channel in range(1, TF5_INPUT_CHANNELS + 1):
|
|
send_info = self.get_channel_to_mix_info(channel, mix_number)
|
|
if send_info["status"] == "success" and send_info["send_level_db"] > -120.0:
|
|
with self._cache_lock:
|
|
channel_cache = self._cache.get("channels", {}).get(str(channel), {})
|
|
sends.append({"channel": channel, "channel_name": channel_cache.get("name", "Sconosciuto"), "channel_is_on": channel_cache.get("on", False), "send_level_db": send_info["send_level_db"], "send_is_on": send_info["on"]})
|
|
sends.sort(key=lambda x: x["send_level_db"], reverse=True)
|
|
return {"status": "success", "mix_details": mix_info, "active_sends": sends, "message": f"Dettagli completi per il Mix {mix_number} recuperati."} |