Files
Automixbot/config.py
2026-02-08 20:22:50 +01:00

252 lines
8.2 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import socket
import ipaddress
from pathlib import Path
from typing import Optional
import json
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
# Configurazione mixer
DEFAULT_PORT = 49280
TF5_INPUT_CHANNELS = 40
TF5_MIX_BUSSES = 20
# Configurazione cache
CACHE_DIR = Path(".tf5_mixer_cache")
CACHE_FILE = CACHE_DIR / "channels_cache.json"
IP_CACHE_FILE = CACHE_DIR / "mixer_ip.json"
CACHE_DURATION = 3600 # 60 minuti in secondi
IP_CACHE_DURATION = 300 # 5 minuti per l'IP
def log(message: str, level: str = "INFO"):
"""Stampa log formattato con timestamp"""
timestamp = time.strftime("%H:%M:%S")
symbols = {
"INFO": "",
"SUCCESS": "",
"ERROR": "",
"WARNING": "⚠️",
"SEARCH": "🔍",
"NETWORK": "🌐",
"CACHE": "📍",
"TIME": "⏱️"
}
symbol = symbols.get(level, "")
print(f"[{timestamp}] {symbol} {message}")
def get_local_network() -> str:
"""Ottiene la rete locale del sistema"""
log("Rilevamento rete locale in corso...", "NETWORK")
try:
# Crea un socket per ottenere l'IP locale
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
local_ip = s.getsockname()[0]
s.close()
# Converte in network (es: 192.168.1.0/24)
ip_parts = local_ip.split('.')
network = f"{ip_parts[0]}.{ip_parts[1]}.{ip_parts[2]}.0/24"
log(f"IP locale rilevato: {local_ip}", "INFO")
log(f"Rete da scansionare: {network}", "NETWORK")
return network
except Exception as e:
log(f"Errore nel rilevare la rete locale: {e}", "ERROR")
network = "192.168.1.0/24"
log(f"Uso rete di fallback: {network}", "WARNING")
return network
def check_port(ip: str, port: int, timeout: float = 0.5) -> bool:
"""Controlla se una porta è aperta su un IP"""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout)
result = sock.connect_ex((ip, port))
sock.close()
return result == 0
except Exception as e:
# Log solo per debug, commentabile se troppo verbose
# log(f"Errore su {ip}: {e}", "ERROR")
return False
def scan_network_for_mixer(port: int = DEFAULT_PORT, max_workers: int = 50) -> Optional[str]:
"""Scansiona la rete locale per trovare il mixer"""
log(f"Ricerca mixer sulla porta {port}...", "SEARCH")
start_time = time.time()
network = get_local_network()
# Genera lista di IP da scansionare
ip_network = ipaddress.ip_network(network, strict=False)
total_hosts = sum(1 for _ in ip_network.hosts())
log(f"Indirizzi da scansionare: {total_hosts}", "INFO")
log(f"Thread paralleli: {max_workers}", "INFO")
found_ip = None
scanned = 0
# Scansione parallela per velocità
with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_ip = {
executor.submit(check_port, str(ip), port): str(ip)
for ip in ip_network.hosts()
}
log("Scansione in corso...", "SEARCH")
for future in as_completed(future_to_ip):
ip = future_to_ip[future]
scanned += 1
# Log di progresso ogni 50 IP
if scanned % 50 == 0:
log(f"Progresso: {scanned}/{total_hosts} IP scansionati", "INFO")
try:
if future.result():
found_ip = ip
elapsed = time.time() - start_time
log(f"MIXER TROVATO su {ip}:{port}", "SUCCESS")
log(f"Tempo di scansione: {elapsed:.2f}s", "TIME")
log(f"IP scansionati: {scanned}/{total_hosts}", "INFO")
# Cancella i task rimanenti
executor.shutdown(wait=False, cancel_futures=True)
break
except Exception as e:
log(f"Errore durante scansione di {ip}: {e}", "ERROR")
if not found_ip:
elapsed = time.time() - start_time
log(f"Nessun mixer trovato sulla porta {port}", "ERROR")
log(f"Tempo totale di scansione: {elapsed:.2f}s", "TIME")
log(f"IP scansionati: {scanned}/{total_hosts}", "INFO")
return found_ip
def load_cached_ip() -> Optional[str]:
"""Carica l'IP dalla cache se valido"""
log("Controllo cache IP...", "CACHE")
if not IP_CACHE_FILE.exists():
log("Nessuna cache trovata", "INFO")
return None
try:
with open(IP_CACHE_FILE, 'r') as f:
data = json.load(f)
cached_ip = data.get('ip')
cache_time = data.get('timestamp', 0)
age = time.time() - cache_time
log(f"Cache trovata: {cached_ip} (età: {int(age)}s)", "CACHE")
# Controlla se la cache è ancora valida
if age < IP_CACHE_DURATION:
log(f"Verifica raggiungibilità di {cached_ip}...", "CACHE")
# Verifica che l'IP sia ancora valido
if cached_ip and check_port(cached_ip, DEFAULT_PORT, timeout=1.0):
log(f"IP dalla cache valido: {cached_ip}", "SUCCESS")
return cached_ip
else:
log(f"IP dalla cache non raggiungibile", "WARNING")
else:
log(f"Cache scaduta (max {IP_CACHE_DURATION}s)", "WARNING")
except Exception as e:
log(f"Errore nel leggere cache IP: {e}", "ERROR")
return None
def save_ip_to_cache(ip: str):
"""Salva l'IP nella cache"""
log(f"Salvataggio IP in cache: {ip}", "CACHE")
CACHE_DIR.mkdir(exist_ok=True)
try:
with open(IP_CACHE_FILE, 'w') as f:
json.dump({
'ip': ip,
'timestamp': time.time()
}, f, indent=2)
log(f"Cache salvata: {IP_CACHE_FILE}", "SUCCESS")
except Exception as e:
log(f"Errore nel salvare cache IP: {e}", "ERROR")
def get_mixer_ip() -> str:
"""
Ottiene l'IP del mixer con auto-discovery:
1. Controlla cache
2. Scansiona la rete
3. Fallback su IP predefinito
"""
log("=== INIZIO RICERCA MIXER ===", "INFO")
# Prova con la cache
cached_ip = load_cached_ip()
if cached_ip:
log("=== RICERCA COMPLETATA (CACHE) ===", "SUCCESS")
return cached_ip
# Scansiona la rete
log("Avvio scansione di rete...", "SEARCH")
discovered_ip = scan_network_for_mixer()
if discovered_ip:
save_ip_to_cache(discovered_ip)
log("=== RICERCA COMPLETATA (DISCOVERY) ===", "SUCCESS")
return discovered_ip
# Fallback
fallback_ip = "192.168.1.57"
log(f"Uso IP predefinito di fallback: {fallback_ip}", "WARNING")
log("=== RICERCA COMPLETATA (FALLBACK) ===", "WARNING")
return fallback_ip
# Variabile globale per l'host (si aggiorna automaticamente)
DEFAULT_HOST = None
def get_host() -> str:
"""Ottiene l'host del mixer (con lazy loading)"""
global DEFAULT_HOST
if DEFAULT_HOST is None:
log("Host mixer non inizializzato, avvio discovery...", "INFO")
DEFAULT_HOST = get_mixer_ip()
else:
log(f"Host mixer già inizializzato: {DEFAULT_HOST}", "INFO")
return DEFAULT_HOST
def refresh_mixer_ip():
"""Forza un refresh dell'IP del mixer"""
global DEFAULT_HOST
log("=== REFRESH FORZATO IP MIXER ===", "INFO")
log("Cancellazione cache esistente...", "CACHE")
# Rimuovi cache
if IP_CACHE_FILE.exists():
try:
IP_CACHE_FILE.unlink()
log("Cache rimossa", "SUCCESS")
except Exception as e:
log(f"Errore rimozione cache: {e}", "ERROR")
# Nuova scansione
DEFAULT_HOST = scan_network_for_mixer()
if DEFAULT_HOST:
save_ip_to_cache(DEFAULT_HOST)
log(f"Nuovo IP impostato: {DEFAULT_HOST}", "SUCCESS")
else:
log("Refresh fallito, mantengo IP precedente", "ERROR")
log("=== REFRESH COMPLETATO ===", "INFO")
return DEFAULT_HOST
# Test rapido
if __name__ == "__main__":
log("=== TEST AUTO-DISCOVERY MIXER ===", "INFO")
host = get_host()
log(f"Host finale: {host}", "SUCCESS")
# Test refresh
# log("\n=== TEST REFRESH ===", "INFO")
# refresh_mixer_ip()