import os
from typing import List, Optional

import sqlalchemy as sa
from sqlalchemy.orm import Session, joinedload

from . import models, schemas


def get_celebrity(db: Session, celebrity_id: int) -> Optional[models.Celebrity]:
    """Return the celebrity with the given id, or None if no row matches.

    The profile image, images and aliases relationships are eager-loaded
    with joinedload so that reading them does not trigger N+1 queries.
    """
    return (
        db.query(models.Celebrity)
        .options(
            joinedload(models.Celebrity.profile_image),
            joinedload(models.Celebrity.images),
            joinedload(models.Celebrity.aliases),  # also load the aliases
        )
        .filter(models.Celebrity.id == celebrity_id)
        .first()
    )


def get_celebrities(
    db: Session,
    skip: int = 0,
    limit: int = 100,
    search: Optional[str] = None,
) -> List[models.Celebrity]:
    """Return one page of celebrities, optionally filtered by a search string.

    Args:
        db: Session used to run the query.
        skip: Number of rows to skip (OFFSET).
        limit: Maximum number of rows to return (LIMIT).
        search: When non-empty, keep only celebrities whose name or one of
            whose aliases contains this text (case-insensitive ILIKE match).

    Returns:
        The matching celebrities ordered by id, with the profile image
        relationship eager-loaded.
    """
    query = db.query(models.Celebrity).options(
        joinedload(models.Celebrity.profile_image)
    )

    if search:
        search_term = f"%{search}%"
        # Join the alias table and filter on either the name or an alias.
        # distinct() avoids duplicate rows when a celebrity has several
        # aliases that match.
        query = (
            query.outerjoin(models.Celebrity.aliases)
            .filter(
                sa.or_(
                    models.Celebrity.name.ilike(search_term),
                    models.CelebrityAlias.alias_name.ilike(search_term),
                )
            )
            .distinct()
        )

    # OFFSET/LIMIT without an ORDER BY gives no guarantee about which rows
    # land on which page; order by the primary key to keep pages stable.
    return query.order_by(models.Celebrity.id).offset(skip).limit(limit).all()


def create_celebrity(
    db: Session, celebrity: schemas.CelebrityCreate
) -> models.Celebrity:
    """Insert a new celebrity built from the schema's fields and return it.

    The row is committed and refreshed before being returned.
    """
    db_celebrity = models.Celebrity(**celebrity.model_dump())
    db.add(db_celebrity)
    db.commit()
    db.refresh(db_celebrity)
    return db_celebrity


def update_celebrity(
    db: Session, celebrity_id: int, celebrity_update: schemas.CelebrityUpdate
) -> Optional[models.Celebrity]:
    """Apply a partial update to a celebrity.

    Only the fields explicitly set on ``celebrity_update`` are written
    (``exclude_unset=True``).

    Returns:
        The refreshed celebrity, or None if ``celebrity_id`` does not exist.
    """
    db_celebrity = get_celebrity(db, celebrity_id)
    if not db_celebrity:
        return None

    update_data = celebrity_update.model_dump(exclude_unset=True)
    for key, value in update_data.items():
        setattr(db_celebrity, key, value)

    db.commit()
    db.refresh(db_celebrity)
    return db_celebrity


def delete_celebrity(db: Session, celebrity_id: int) -> Optional[models.Celebrity]:
    """Delete a celebrity and return the deleted object.

    Returns None if ``celebrity_id`` does not exist.
    """
    db_celebrity = get_celebrity(db, celebrity_id)
    if not db_celebrity:
        return None

    db.delete(db_celebrity)
    db.commit()
    return db_celebrity


# --- CRUD functions for aliases ---


def create_celebrity_alias(
    db: Session, celebrity_id: int, alias: schemas.CelebrityAliasCreate
) -> models.CelebrityAlias:
    """Create an alias for the given celebrity id and return the new row.

    NOTE(review): the existence of ``celebrity_id`` is not checked here;
    presumably the caller or a foreign-key constraint enforces it - confirm.
    """
    db_alias = models.CelebrityAlias(
        celebrity_id=celebrity_id, alias_name=alias.alias_name
    )
    db.add(db_alias)
    db.commit()
    db.refresh(db_alias)
    return db_alias


def delete_celebrity_alias(
    db: Session, alias_id: int
) -> Optional[models.CelebrityAlias]:
    """Delete an alias by id and return it, or None if it does not exist."""
    db_alias = (
        db.query(models.CelebrityAlias)
        .filter(models.CelebrityAlias.id == alias_id)
        .first()
    )
    if not db_alias:
        return None

    db.delete(db_alias)
    db.commit()
    return db_alias


# --- CRUD functions for images ---


def get_image(db: Session, image_id: int) -> Optional[models.Image]:
    """Return the image with the given id, or None if no row matches."""
    return db.query(models.Image).filter(models.Image.id == image_id).first()


def create_celebrity_image(
    db: Session, celebrity_id: int, file_path: str
) -> models.Image:
    """Store an image record for a celebrity and return the refreshed row."""
    db_image = models.Image(celebrity_id=celebrity_id, file_path=file_path)
    db.add(db_image)
    db.commit()
    db.refresh(db_image)
    return db_image


def set_profile_image(
    db: Session, celebrity_id: int, image_id: int
) -> Optional[models.Celebrity]:
    """Make ``image_id`` the profile image of ``celebrity_id``.

    Returns:
        The celebrity re-fetched with its relationships, or None when the
        celebrity or the image does not exist, or when the image belongs to
        a different celebrity.
    """
    db_celebrity = get_celebrity(db, celebrity_id)
    db_image = get_image(db, image_id)
    if not db_celebrity or not db_image or db_image.celebrity_id != celebrity_id:
        return None

    db_celebrity.profile_image_id = image_id
    db.commit()
    db.refresh(db_celebrity)
    # Re-fetch so the returned object carries the updated relationships.
    return get_celebrity(db, celebrity_id)


def delete_image(db: Session, image_id: int) -> Optional[models.Image]:
    """Delete an image record and its file under ``uploads/``.

    If the image is its celebrity's current profile image, that reference
    is cleared in the same commit. Removing the file is best-effort: a
    missing file is ignored and any other OS error is printed, not raised.

    Returns:
        The deleted image object, or None if ``image_id`` does not exist.
    """
    db_image = get_image(db, image_id)
    if not db_image:
        return None

    # Compute the path before the row is deleted so it does not depend on
    # the ORM object's state after the commit.
    file_on_disk = os.path.join("uploads", db_image.file_path)

    db_celebrity = get_celebrity(db, db_image.celebrity_id)
    if db_celebrity and db_celebrity.profile_image_id == image_id:
        db_celebrity.profile_image_id = None

    db.delete(db_image)
    db.commit()

    # Touch the filesystem only after the commit succeeded: if the commit
    # raises, the row survives and must still point at an existing file.
    try:
        os.remove(file_on_disk)
    except FileNotFoundError:
        # Already gone; nothing to clean up.
        pass
    except OSError as e:
        print(f"Error deleting file {file_on_disk}: {e}")

    return db_image