ircradio/ircradio/models.py

285 lines
8.8 KiB
Python

# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2021, dsc@xmr.pm
import functools
import os
import logging
import json
import re
from typing import Optional, List
from datetime import datetime, timedelta
from dataclasses import dataclass
import mutagen
from mutagen.oggvorbis import OggVorbisInfo, OggVCommentDict
import aiofiles
from peewee import SqliteDatabase, SQL
import peewee as pw
from quart import current_app
from ircradio.youtube import YouTube
import settings
db = SqliteDatabase(f"{settings.cwd}/data/db.sqlite3")
class Ban(pw.Model):
id = pw.AutoField()
utube_id_or_nick = pw.CharField(index=True)
class Meta:
database = db
@dataclass
class SongMeta:
title: Optional[str] = None
description: Optional[str] = None
artist: Optional[str] = None
album: Optional[str] = None
album_cover: Optional[str] = None # path to image
bitrate: Optional[int] = None
length: Optional[float] = None
sample_rate: Optional[int] = None
channels: Optional[int] = None
mime: Optional[str] = None
class Song(pw.Model):
id = pw.AutoField()
date_added: datetime = pw.DateTimeField(default=datetime.now)
title: str = pw.CharField(index=True)
utube_id: str = pw.CharField(index=True, unique=True)
added_by: str = pw.CharField(index=True, constraints=[SQL('COLLATE NOCASE')]) # ILIKE index
duration: int = pw.IntegerField() # seconds
karma: int = pw.IntegerField(default=5, index=True)
banned: bool = pw.BooleanField(default=False)
meta: SongMeta = None # directly from file (exif) or metadata json
path: Optional[str] = None
remaining: int = None # liquidsoap playing status in seconds
@property
def to_json(self):
return {
"title": self.title,
"utube_id": self.utube_id,
"added_by": self.added_by,
"duration": self.duration,
"karma": self.karma,
"banned": self.banned
}
@staticmethod
def delete_song(utube_id: str) -> bool:
from ircradio.factory import app
try:
fn = f"{settings.dir_music}/{utube_id}.ogg"
Song.delete().where(Song.utube_id == utube_id).execute()
os.remove(fn)
except Exception as ex:
app.logger.error(f"{ex}")
return False
@staticmethod
def search(needle: str, min_chars=3) -> List['Song']:
needle = needle.replace("%", "")
if len(needle) < min_chars:
raise Exception("Search too short. Wow. More typing plz. Much effort.")
if YouTube.is_valid_uid(needle):
try:
song = Song.select().filter(Song.utube_id == needle).get()
return [song]
except:
pass
try:
q = Song.select().filter(Song.title ** f"%{needle}%")
return [s for s in q]
except:
pass
return []
@staticmethod
def by_uid(uid: str) -> Optional['Song']:
try:
return Song.select().filter(Song.utube_id == uid).get()
except:
pass
@classmethod
async def from_filepath(cls, path: str) -> 'Song':
if not os.path.exists(path):
raise Exception("filepath does not exist")
# try to detect youtube id in filename
basename = os.path.splitext(os.path.basename(path))[0]
if YouTube.is_valid_uid(basename):
try:
song = cls.select().where(Song.utube_id == basename).get()
except Exception as ex:
song = Song()
else:
song = Song()
# scan for metadata
await song.scan(path)
return song
async def scan(self, path: str = None):
# update with metadata, etc.
if path is None:
path = self.filepath
if not os.path.exists(path):
raise Exception(f"filepath {path} does not exist")
basename = os.path.splitext(os.path.basename(path))[0]
self.meta = SongMeta()
self.path = path
# EXIF direct
from ircradio.utils import mutagen_file
_m = await mutagen_file(path)
if _m:
if _m.info:
self.meta.channels = _m.info.channels
self.meta.length = int(_m.info.length)
if hasattr(_m.info, 'bitrate'):
self.meta.bitrate = _m.info.bitrate
if hasattr(_m.info, 'sample_rate'):
self.meta.sample_rate = _m.info.sample_rate
if _m.tags:
if hasattr(_m.tags, 'as_dict'):
_tags = _m.tags.as_dict()
else:
try:
_tags = {k: v for k, v in _m.tags.items()}
except:
_tags = {}
for k, v in _tags.items():
if isinstance(v, list):
v = v[0]
elif isinstance(v, (str, int, float)):
pass
else:
continue
if k in ["title", "description", "language", "date", "purl", "artist"]:
if hasattr(self.meta, k):
setattr(self.meta, k, v)
# yt-dlp metadata json file
fn_utube_meta = os.path.join(settings.dir_meta, f"{basename}.info.json")
utube_meta = {}
if os.path.exists(fn_utube_meta):
async with aiofiles.open(fn_utube_meta, mode="r") as f:
try:
utube_meta = json.loads(await f.read())
except Exception as ex:
logging.error(f"could not parse {fn_utube_meta}, {ex}")
if utube_meta:
# utube_meta file does not have anything we care about
pass
if not self.title and not self.meta.title:
# just adopt filename
self.title = os.path.basename(self.path)
return self
@property
def title_for_real_nocap(self):
if self.title:
return self.title
if self.meta.title:
return self.meta.title
return "unknown!"
@property
def title_cleaned(self):
_title = self.title_for_real_nocap
_title = re.sub(r"\(official\)", "", _title, flags=re.IGNORECASE)
_title = re.sub(r"\(official \w+\)", "", _title, flags=re.IGNORECASE)
_title = _title.replace(" - Topic - ", "")
return _title
@property
def filepath(self):
"""Absolute"""
return os.path.join(settings.dir_music, f"{self.utube_id}.ogg")
@property
def filepath_noext(self):
"""Absolute filepath without extension ... maybe"""
try:
return os.path.splitext(self.filepath)[0]
except:
return self.filepath
def image(self) -> Optional[str]:
dirname = os.path.dirname(self.path)
basename = os.path.basename(self.path)
name, ext = os.path.splitext(basename)
for _dirname in [dirname, settings.dir_meta]:
for _ext in ["", ext]:
meta_json = os.path.join(_dirname, name + _ext + ".info.json")
if os.path.exists(meta_json):
try:
f = open(meta_json, "r")
data = f.read()
f.close()
blob = json.loads(data)
if "thumbnails" in blob and isinstance(blob['thumbnails'], list):
thumbs = list(sorted(blob['thumbnails'], key=lambda k: int(k['id']), reverse=True))
image_id = thumbs[0]['id']
for sep in ['.', "_"]:
_fn = os.path.join(_dirname, name + _ext + f"{sep}{image_id}")
for img_ext in ['webp', 'jpg', 'jpeg', 'png']:
_fn_full = f"{_fn}.{img_ext}"
if os.path.exists(_fn_full):
return os.path.basename(_fn_full)
except Exception as ex:
logging.error(f"could not parse {meta_json}, {ex}")
def time_status(self) -> Optional[tuple]:
if not self.remaining:
return
duration = self.duration
if not duration:
if self.meta.length:
duration = int(self.meta.length)
if not duration:
return
_ = lambda k: timedelta(seconds=k)
a = _(duration - self.remaining)
b = _(duration)
return a, b
def time_status_str(self) -> Optional[str]:
_status = self.time_status()
if not _status:
return
a, b = map(str, _status)
if a.startswith("0:") and \
b.startswith("0:"):
a = a[2:]
b = b[2:]
return f"({a}/{b})"
class Meta:
database = db