87 lines
2.8 KiB
Python
87 lines
2.8 KiB
Python
import os
|
|
from sqlalchemy.orm import Session, joinedload
|
|
from . import models, schemas
|
|
|
|
def get_celebrity(db: Session, celebrity_id: int):
    """Fetch one celebrity by primary key, or ``None`` when no row matches.

    The ``profile_image`` and ``images`` relationships are loaded up front
    with ``joinedload`` so that reading them afterwards does not trigger
    extra lazy-load queries (the N+1 problem).
    """
    eager_relations = (
        joinedload(models.Celebrity.profile_image),
        joinedload(models.Celebrity.images),
    )
    query = db.query(models.Celebrity).options(*eager_relations)
    return query.filter(models.Celebrity.id == celebrity_id).first()
|
|
|
|
def get_celebrities(db: Session, skip: int = 0, limit: int = 100):
    """Return one page of celebrities, ordered by primary key.

    Args:
        db: Active SQLAlchemy session.
        skip: Number of rows to skip from the start of the result set.
        limit: Maximum number of rows to return.

    Returns:
        A list of ``models.Celebrity`` objects; empty when the page lies
        past the end of the table.
    """
    # OFFSET/LIMIT without ORDER BY leaves row order up to the database, so
    # consecutive pages could repeat or skip rows. Sorting on the primary key
    # makes pagination deterministic.
    return (
        db.query(models.Celebrity)
        .order_by(models.Celebrity.id)
        .offset(skip)
        .limit(limit)
        .all()
    )
|
|
|
|
def create_celebrity(db: Session, celebrity: schemas.CelebrityCreate):
    """Insert a new celebrity built from the schema's fields and return it.

    The row is committed and then refreshed, so the returned object holds
    the values as reloaded from the database.
    """
    field_values = celebrity.model_dump()
    new_row = models.Celebrity(**field_values)

    db.add(new_row)
    db.commit()
    db.refresh(new_row)
    return new_row
|
|
|
|
def update_celebrity(db: Session, celebrity_id: int, celebrity_update: schemas.CelebrityUpdate):
    """Apply a partial update to an existing celebrity.

    Only the fields explicitly set on ``celebrity_update`` are written to
    the stored object. Returns the refreshed celebrity, or ``None`` when no
    celebrity has the given id.
    """
    target = get_celebrity(db, celebrity_id)
    if not target:
        return None

    # exclude_unset keeps fields the caller never supplied out of the update.
    changes = celebrity_update.model_dump(exclude_unset=True)
    for field_name, new_value in changes.items():
        setattr(target, field_name, new_value)

    db.commit()
    db.refresh(target)
    return target
|
|
|
|
def delete_celebrity(db: Session, celebrity_id: int):
    """Delete a celebrity by id.

    Returns the deleted object, or ``None`` when no celebrity has that id.
    """
    target = get_celebrity(db, celebrity_id)
    if target:
        db.delete(target)
        db.commit()
        return target
    return None
|
|
|
|
# --- Funzioni CRUD per le Immagini ---
|
|
|
|
def get_image(db: Session, image_id: int):
    """Look up a single image by primary key; ``None`` when not found."""
    matching = db.query(models.Image).filter(models.Image.id == image_id)
    return matching.first()
|
|
|
|
def create_celebrity_image(db: Session, celebrity_id: int, file_path: str):
    """Persist an image record linked to a celebrity and return it.

    The record is committed and refreshed before being returned.
    """
    attributes = {"celebrity_id": celebrity_id, "file_path": file_path}
    new_image = models.Image(**attributes)

    db.add(new_image)
    db.commit()
    db.refresh(new_image)
    return new_image
|
|
|
|
def set_profile_image(db: Session, celebrity_id: int, image_id: int):
    """Make an existing image the profile image of a celebrity.

    Returns ``None`` if the celebrity or the image does not exist, or if the
    image belongs to a different celebrity. Otherwise returns the celebrity
    fetched again through ``get_celebrity`` so that its eagerly loaded
    relationships reflect the new profile image.
    """
    owner = get_celebrity(db, celebrity_id)
    candidate = get_image(db, image_id)

    if not owner:
        return None
    if not candidate:
        return None
    if candidate.celebrity_id != celebrity_id:
        # The image exists but is attached to another celebrity.
        return None

    owner.profile_image_id = image_id
    db.commit()
    db.refresh(owner)

    # Re-query so the returned object carries the updated relationships.
    return get_celebrity(db, celebrity_id)
|
|
|
|
def delete_image(db: Session, image_id: int):
    """Delete an image record and its file under ``uploads``.

    If the image is currently the profile image of its celebrity, that
    reference is cleared in the same transaction. The database change is
    committed before the file is removed, so a failed commit never leaves a
    row pointing at a file that has already been deleted.

    Args:
        db: Active SQLAlchemy session.
        image_id: Primary key of the image to delete.

    Returns:
        The deleted image object, or ``None`` if no image has that id.
    """
    db_image = get_image(db, image_id)
    if not db_image:
        return None

    db_celebrity = get_celebrity(db, db_image.celebrity_id)
    if db_celebrity and db_celebrity.profile_image_id == image_id:
        # Drop the profile reference so it does not point at a deleted image.
        db_celebrity.profile_image_id = None
        db.add(db_celebrity)

    # Resolve the path now: the object's attributes may not be readable once
    # the row has been deleted and the transaction committed.
    file_on_disk = os.path.join("uploads", db_image.file_path)

    db.delete(db_image)
    db.commit()

    # Best-effort cleanup after the commit. A leftover file is harmless,
    # whereas removing it before a failed commit would lose the data.
    try:
        os.remove(file_on_disk)
    except FileNotFoundError:
        # Already gone; nothing to clean up.
        pass
    except OSError as e:
        print(f"Error deleting file {file_on_disk}: {e}")

    return db_image