Files
Automixbot/main.py
T
2026-04-20 20:17:46 +02:00

249 lines
11 KiB
Python

# main.py
import asyncio
import json
import time
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
import uvicorn
# Importa la tua classe
# Assicurati che il nome file sia corretto, lo rinomino per chiarezza
from mixer_controller import TF5MixerController
app = FastAPI(title="Yamaha TF5 Web Interface")
app.mount("/static", StaticFiles(directory="static"), name="static")
mixer_controller = None
class ConnectionManager:
def __init__(self):
self.active_connections: list[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
async def broadcast(self, message: str):
for connection in self.active_connections:
try:
await connection.send_text(message)
except Exception:
pass
manager = ConnectionManager()
@app.on_event("startup")
async def startup_event():
global mixer_controller
mixer_controller = TF5MixerController(host="192.168.1.57")
asyncio.create_task(broadcast_mixer_state())
# === NUOVA RIGA ===
asyncio.create_task(broadcast_meter_data()) # Avvia il broadcast dei meter
@app.on_event("shutdown")
def shutdown_event():
if mixer_controller:
mixer_controller.close()
# NUOVA FUNZIONE PER I METER
async def broadcast_meter_data():
"""Invia i dati dei meter in tempo reale via WebSocket."""
while True:
if mixer_controller and mixer_controller._is_connected.is_set() and manager.active_connections:
try:
# Esegui la funzione bloccante in un thread separato
snapshot = await asyncio.to_thread(mixer_controller.get_meter_snapshot)
await manager.broadcast(json.dumps({"type": "meter_update", "data": snapshot}))
except Exception as e:
print(f"Errore durante il broadcast dei meter: {e}")
await asyncio.sleep(0.1) # Aggiorna 10 volte al secondo
async def broadcast_mixer_state():
"""Controlla se la cache del mixer è cambiata e invia lo stato al WebSocket."""
last_timestamp = 0
while True:
if mixer_controller and mixer_controller._is_connected.is_set():
with mixer_controller._cache_lock:
current_timestamp = mixer_controller._cache.get("timestamp", 0)
if current_timestamp > last_timestamp:
state = {
"channels": mixer_controller.get_all_channels_summary().get("channels", []),
"steinch": mixer_controller.get_all_steinch_summary().get("channels", []),
"mixes": mixer_controller.get_all_mixes_summary().get("mixes", []),
"dcas": mixer_controller.get_all_dca_summary().get("dcas", []),
"fxrtn": mixer_controller.get_all_fxrtn_summary().get("channels", []),
"stereo": mixer_controller.get_stereo_info(1).get("level_db", -120),
"stereo_on": mixer_controller.get_stereo_info(1).get("on", False)
}
await manager.broadcast(json.dumps({"type": "state_update", "data": state}))
last_timestamp = current_timestamp
await asyncio.sleep(0.1)
@app.get("/")
async def get():
with open("static/index.html", "r", encoding="utf-8") as f:
return HTMLResponse(f.read())
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await manager.connect(websocket)
# Invia lo stato iniziale appena si connette
with mixer_controller._cache_lock:
mixer_controller._cache['timestamp'] = time.time() # Forza l'invio
try:
while True:
data = await websocket.receive_text()
cmd = json.loads(data)
await handle_command(cmd)
except WebSocketDisconnect:
manager.disconnect(websocket)
def get_actual_level(response_str: str, fallback_db: float) -> float:
try:
parts = response_str.strip().split()
for part in reversed(parts):
try:
val_int = int(part)
return val_int / 100.0 if val_int > -32768 else -120.0 # <--- SOSTITUITO float('-inf') CON -120.0
except ValueError:
continue
except Exception:
pass
return fallback_db
async def handle_command(cmd: dict):
action = cmd.get("action")
target_type = cmd.get("type")
value = cmd.get("value")
raw_id = cmd.get("id", 1)
try:
ch_id = int(raw_id)
except ValueError:
ch_id = 1
# ==========================================
# 1. GESTIONE LIVELLI (MAIN FADERS)
# ==========================================
if action == "set_level":
val_float = float(value)
res = {}
if target_type == "channel": res = await asyncio.to_thread(mixer_controller.set_channel_level, ch_id, val_float)
elif target_type == "steinch": res = await asyncio.to_thread(mixer_controller.set_steinch_level, ch_id, val_float)
elif target_type == "mix": res = await asyncio.to_thread(mixer_controller.set_mix_level, ch_id, val_float)
elif target_type == "dca": res = await asyncio.to_thread(mixer_controller.set_dca_level, ch_id, val_float)
elif target_type == "fxrtn": res = await asyncio.to_thread(mixer_controller.set_fxrtn_level, ch_id, val_float)
elif target_type == "stereo": res = await asyncio.to_thread(mixer_controller.set_stereo_level, 1, val_float)
actual_db = get_actual_level(res.get("response", ""), val_float)
with mixer_controller._cache_lock:
cache_key = "channels" if target_type == "channel" else target_type
if target_type == "mix": cache_key = "mixes"
if target_type == "dca": cache_key = "dcas"
str_id = str(ch_id)
if target_type == "stereo":
mixer_controller._cache.setdefault("stereo", {}).setdefault("1", {})["level_db"] = actual_db
elif cache_key in mixer_controller._cache and str_id in mixer_controller._cache[cache_key]:
mixer_controller._cache[cache_key][str_id]["level_db"] = actual_db
mixer_controller._cache['timestamp'] = time.time()
# ==========================================
# 2. GESTIONE ON/OFF (MAIN FADERS)
# ==========================================
elif action == "set_on":
val_bool = bool(value)
if target_type == "channel": await asyncio.to_thread(mixer_controller.set_channel_on_off, ch_id, val_bool)
elif target_type == "steinch": await asyncio.to_thread(mixer_controller.set_steinch_on_off, ch_id, val_bool)
elif target_type == "mix": await asyncio.to_thread(mixer_controller.set_mix_on_off, ch_id, val_bool)
elif target_type == "dca": await asyncio.to_thread(mixer_controller.set_dca_on_off, ch_id, val_bool)
elif target_type == "fxrtn": await asyncio.to_thread(mixer_controller.set_fxrtn_on_off, ch_id, val_bool)
elif target_type == "stereo": await asyncio.to_thread(mixer_controller.set_stereo_on_off, 1, val_bool)
with mixer_controller._cache_lock:
cache_key = "channels" if target_type == "channel" else target_type
if target_type == "mix": cache_key = "mixes"
if target_type == "dca": cache_key = "dcas"
str_id = str(ch_id)
if target_type == "stereo":
mixer_controller._cache.setdefault("stereo", {}).setdefault("1", {})["on"] = val_bool
elif cache_key in mixer_controller._cache and str_id in mixer_controller._cache[cache_key]:
mixer_controller._cache[cache_key][str_id]["on"] = val_bool
mixer_controller._cache['timestamp'] = time.time()
# ==========================================
# 3. GESTIONE "SENDS ON FADERS" (NUOVO!)
# ==========================================
elif action == "get_mix_sends":
target_mix = int(cmd.get("mix", 1))
# Funzione per interrogare velocemente tutti i 40 canali per un mix specifico
def fetch_sends():
sends = []
for ch in range(1, 41): # 40 Input Channels
try:
lvl_cmd = f"get MIXER:Current/InCh/ToMix/Level {ch-1} {target_mix-1}"
on_cmd = f"get MIXER:Current/InCh/ToMix/On {ch-1} {target_mix-1}"
lvl_raw = mixer_controller._send_command(lvl_cmd)
on_raw = mixer_controller._send_command(on_cmd)
lvl_db = mixer_controller._internal_to_level(mixer_controller._parse_value(lvl_raw))
# --- LA CORREZIONE È QUI ---
# Usa il sanitizzatore per convertire -Infinity in -120.0
lvl_db = mixer_controller._sanitize_value(lvl_db)
is_on = mixer_controller._parse_value(on_raw) == "1"
# Prendi il nome dal canale master nella cache
name = f"CH {ch}"
with mixer_controller._cache_lock:
if str(ch) in mixer_controller._cache.get("channels", {}):
name = mixer_controller._cache["channels"][str(ch)]["name"]
sends.append({
"id": ch,
"name": name,
"level_db": lvl_db,
"on": is_on
})
except Exception:
pass
return sends
# Esegui in background per non bloccare FastAPI
sends_data = await asyncio.to_thread(fetch_sends)
await manager.broadcast(json.dumps({"type": "mix_sends_data", "mix": target_mix, "data": sends_data}))
elif action == "set_send_level":
target_mix = int(cmd.get("mix", 1))
val_float = float(value)
# Invia e recupera la risposta (per i log e l'allineamento)
res = await asyncio.to_thread(mixer_controller.set_channel_to_mix_level, ch_id, target_mix, val_float)
# Logghiamo l'effettivo valore applicato dal mixer (OKm)
actual_db = get_actual_level(res.get("response", ""), val_float)
print(f"DEBUG: Mandata CH{ch_id} -> MIX{target_mix} impostata a {actual_db} dB")
elif action == "set_send_on":
target_mix = int(cmd.get("mix", 1))
val_bool = bool(value)
await asyncio.to_thread(mixer_controller.set_channel_to_mix_on_off, ch_id, target_mix, val_bool)
print(f"DEBUG: Mandata CH{ch_id} -> MIX{target_mix} {'ACCESA' if val_bool else 'SPENTA'}")
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)