ircradio/ircradio/models.py

285 lines
8.8 KiB
Python
Raw Normal View History

2021-06-17 00:27:35 +01:00
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2021, dsc@xmr.pm
import functools
2021-06-17 00:27:35 +01:00
import os
import logging
import json
2021-06-17 00:27:35 +01:00
import re
from typing import Optional, List
from datetime import datetime, timedelta
from dataclasses import dataclass
2021-06-17 00:27:35 +01:00
import mutagen
from mutagen.oggvorbis import OggVorbisInfo, OggVCommentDict
import aiofiles
2021-06-17 00:27:35 +01:00
from peewee import SqliteDatabase, SQL
import peewee as pw
from quart import current_app
2021-06-17 00:27:35 +01:00
from ircradio.youtube import YouTube
import settings
# Module-wide SQLite handle; the database file is located at
# "<settings.cwd>/data/db.sqlite3". The models in this module bind to it
# through their inner ``Meta.database`` attribute.
db = SqliteDatabase(f"{settings.cwd}/data/db.sqlite3")
class Ban(pw.Model):
    """Ban record keyed by a single string column.

    As the column name indicates, ``utube_id_or_nick`` holds either a
    YouTube id or a nickname; both kinds of ban share this one table.
    """

    # auto-incrementing primary key
    id = pw.AutoField()
    # indexed so a ban lookup by value does not scan the whole table
    utube_id_or_nick = pw.CharField(index=True)

    class Meta:
        # bind the model to the module-level SQLite handle
        database = db
2022-03-18 08:25:02 +00:00
@dataclass
class SongMeta:
    """Loose bag of per-file metadata; every field defaults to ``None``.

    Field order is part of the positional constructor signature and must
    not be changed.
    """

    # --- descriptive tags ---
    title: Optional[str] = None
    description: Optional[str] = None
    artist: Optional[str] = None
    album: Optional[str] = None
    album_cover: Optional[str] = None  # path to image

    # --- stream properties ---
    bitrate: Optional[int] = None
    length: Optional[float] = None
    sample_rate: Optional[int] = None
    channels: Optional[int] = None
    mime: Optional[str] = None
2021-06-17 00:27:35 +01:00
class Song(pw.Model):
    """A track known to the radio, stored in the ``db`` SQLite database.

    The peewee fields below are persisted. ``meta``, ``path`` and
    ``remaining`` are plain Python attributes: they are *not* columns and
    are only filled in at runtime (``meta``/``path`` by :meth:`scan`).
    """

    id = pw.AutoField()
    date_added: datetime = pw.DateTimeField(default=datetime.now)
    title: str = pw.CharField(index=True)
    utube_id: str = pw.CharField(index=True, unique=True)
    added_by: str = pw.CharField(index=True, constraints=[SQL('COLLATE NOCASE')])  # ILIKE index
    duration: int = pw.IntegerField()  # seconds
    karma: int = pw.IntegerField(default=5, index=True)
    banned: bool = pw.BooleanField(default=False)

    meta: SongMeta = None  # directly from file (exif) or metadata json
    path: Optional[str] = None
    remaining: int = None  # liquidsoap playing status in seconds

    @property
    def to_json(self):
        """Return the persisted, user-facing fields as a plain dict.

        NOTE: this is a property (not a method) for backward compatibility
        with existing callers; access it as ``song.to_json``.
        """
        return {
            "title": self.title,
            "utube_id": self.utube_id,
            "added_by": self.added_by,
            "duration": self.duration,
            "karma": self.karma,
            "banned": self.banned
        }

    @staticmethod
    def delete_song(utube_id: str) -> bool:
        """Delete the DB row for ``utube_id`` and remove its ``.ogg`` file.

        Returns:
            ``True`` when both the row deletion and the file removal ran
            without raising, ``False`` otherwise (the error is logged on
            the application logger).
        """
        # imported lazily, as in the original code (presumably to avoid a
        # circular import with the app factory -- TODO confirm)
        from ircradio.factory import app
        try:
            fn = f"{settings.dir_music}/{utube_id}.ogg"
            Song.delete().where(Song.utube_id == utube_id).execute()
            os.remove(fn)
        except Exception as ex:
            app.logger.error(f"{ex}")
            return False
        # previously fell off the end and returned None on success, which
        # made success indistinguishable from failure in a truthiness test
        return True

    @staticmethod
    def search(needle: str, min_chars=3) -> List['Song']:
        """Find songs by exact YouTube id or by title substring.

        ``%`` characters are stripped from ``needle`` before use. If the
        needle is a valid YouTube id and a song with that id exists, only
        that song is returned; otherwise a ``%needle%`` pattern match on
        the title is performed.

        Raises:
            Exception: when the cleaned needle is shorter than ``min_chars``.

        Returns:
            A (possibly empty) list of matching songs. Query errors are
            logged and yield an empty list rather than propagating.
        """
        needle = needle.replace("%", "")
        if len(needle) < min_chars:
            raise Exception("Search too short. Wow. More typing plz. Much effort.")

        if YouTube.is_valid_uid(needle):
            try:
                song = Song.select().filter(Song.utube_id == needle).get()
                return [song]
            except Song.DoesNotExist:
                # not an id we know about: fall through to the title search
                pass
            except Exception as ex:
                logging.error(f"song lookup by id failed, {ex}")

        try:
            q = Song.select().filter(Song.title ** f"%{needle}%")
            return list(q)
        except Exception as ex:
            logging.error(f"song search failed, {ex}")
        return []

    @staticmethod
    def by_uid(uid: str) -> Optional['Song']:
        """Return the song whose ``utube_id`` equals ``uid``, or ``None``."""
        try:
            return Song.select().filter(Song.utube_id == uid).get()
        except Song.DoesNotExist:
            return None
        except Exception as ex:
            # keep the original best-effort contract: never raise from here
            logging.error(f"song lookup by id failed, {ex}")
            return None

    @classmethod
    async def from_filepath(cls, path: str) -> 'Song':
        """Build a song object for the file at ``path`` and scan its metadata.

        When the file's base name (without extension) is a valid YouTube id
        and a row with that id exists, that row is used; in every other
        case a fresh, unsaved instance is created.

        Raises:
            Exception: when ``path`` does not exist.
        """
        if not os.path.exists(path):
            raise Exception("filepath does not exist")

        # try to detect youtube id in filename
        basename = os.path.splitext(os.path.basename(path))[0]

        song = None
        if YouTube.is_valid_uid(basename):
            try:
                song = cls.select().where(cls.utube_id == basename).get()
            except Exception:
                # unknown id (or query failure): fall back to a blank song
                song = None
        if song is None:
            song = cls()

        # scan for metadata
        await song.scan(path)
        return song

    async def scan(self, path: str = None):
        """Populate ``self.meta`` and ``self.path`` from the file at ``path``.

        Args:
            path: file to inspect; defaults to :attr:`filepath`.

        Raises:
            Exception: when the file does not exist.

        Returns:
            ``self``, to allow chaining.
        """
        # update with metadata, etc.
        if path is None:
            path = self.filepath
        if not os.path.exists(path):
            raise Exception(f"filepath {path} does not exist")

        basename = os.path.splitext(os.path.basename(path))[0]
        self.meta = SongMeta()
        self.path = path

        # EXIF direct
        from ircradio.utils import mutagen_file
        _m = await mutagen_file(path)
        if _m:
            if _m.info:
                self.meta.channels = _m.info.channels
                self.meta.length = int(_m.info.length)
                if hasattr(_m.info, 'bitrate'):
                    self.meta.bitrate = _m.info.bitrate
                if hasattr(_m.info, 'sample_rate'):
                    self.meta.sample_rate = _m.info.sample_rate
            if _m.tags:
                if hasattr(_m.tags, 'as_dict'):
                    _tags = _m.tags.as_dict()
                else:
                    try:
                        _tags = dict(_m.tags.items())
                    except Exception:
                        _tags = {}
                for k, v in _tags.items():
                    if isinstance(v, list):
                        if not v:
                            # empty multi-value tag: nothing to adopt
                            continue
                        v = v[0]
                    elif not isinstance(v, (str, int, float)):
                        continue
                    # only tags that SongMeta actually declares are stored
                    if k in ("title", "description", "language", "date", "purl", "artist") \
                            and hasattr(self.meta, k):
                        setattr(self.meta, k, v)

        # yt-dlp metadata json file
        fn_utube_meta = os.path.join(settings.dir_meta, f"{basename}.info.json")
        if os.path.exists(fn_utube_meta):
            async with aiofiles.open(fn_utube_meta, mode="r") as f:
                try:
                    # utube_meta file does not have anything we care about;
                    # it is parsed only so that a corrupt file gets logged
                    json.loads(await f.read())
                except Exception as ex:
                    logging.error(f"could not parse {fn_utube_meta}, {ex}")

        if not self.title and not self.meta.title:
            # just adopt filename
            self.title = os.path.basename(self.path)
        return self

    @property
    def title_for_real_nocap(self):
        """Best available title: DB title, then scanned title, then ``"unknown!"``."""
        if self.title:
            return self.title
        # meta stays None until scan() has run
        if self.meta and self.meta.title:
            return self.meta.title
        return "unknown!"

    @property
    def title_cleaned(self):
        """Title with "(official)", "(official <word>)" and " - Topic - " removed."""
        _title = self.title_for_real_nocap
        _title = re.sub(r"\(official\)", "", _title, flags=re.IGNORECASE)
        _title = re.sub(r"\(official \w+\)", "", _title, flags=re.IGNORECASE)
        _title = _title.replace(" - Topic - ", "")
        return _title

    @property
    def filepath(self):
        """Absolute"""
        return os.path.join(settings.dir_music, f"{self.utube_id}.ogg")

    @property
    def filepath_noext(self):
        """Absolute filepath without extension ... maybe"""
        try:
            return os.path.splitext(self.filepath)[0]
        except Exception:
            return self.filepath

    def image(self) -> Optional[str]:
        """Return the base name of a thumbnail image for this song, if any.

        Looks for a ``*.info.json`` file next to ``self.path`` and in
        ``settings.dir_meta``, picks the thumbnail entry with the highest
        numeric ``id`` and probes for a matching image file on disk.

        Returns:
            The image file's base name, or ``None`` when ``self.path`` is
            unset or no image could be found.
        """
        if not self.path:
            # scan() has not run; there is nothing to look next to
            return None

        dirname = os.path.dirname(self.path)
        name, ext = os.path.splitext(os.path.basename(self.path))

        for _dirname in [dirname, settings.dir_meta]:
            for _ext in ["", ext]:
                stem = os.path.join(_dirname, name + _ext)
                meta_json = stem + ".info.json"
                if not os.path.exists(meta_json):
                    continue
                try:
                    with open(meta_json, "r") as f:
                        blob = json.loads(f.read())
                    thumbnails = blob.get("thumbnails") if isinstance(blob, dict) else None
                    if not isinstance(thumbnails, list) or not thumbnails:
                        continue
                    # highest numeric id wins
                    image_id = max(thumbnails, key=lambda k: int(k['id']))['id']
                    for sep in ['.', "_"]:
                        _fn = f"{stem}{sep}{image_id}"
                        for img_ext in ['webp', 'jpg', 'jpeg', 'png']:
                            _fn_full = f"{_fn}.{img_ext}"
                            if os.path.exists(_fn_full):
                                return os.path.basename(_fn_full)
                except Exception as ex:
                    logging.error(f"could not parse {meta_json}, {ex}")
        return None

    def time_status(self) -> Optional[tuple]:
        """Return ``(elapsed, total)`` as ``timedelta`` objects, or ``None``.

        ``None`` is returned when ``remaining`` is unset/zero or when no
        duration is known from either the DB field or the scanned metadata.
        """
        if not self.remaining:
            return None

        duration = self.duration
        # fall back to the scanned length; meta is None until scan() has run
        if not duration and self.meta and self.meta.length:
            duration = int(self.meta.length)
        if not duration:
            return None

        elapsed = timedelta(seconds=duration - self.remaining)
        total = timedelta(seconds=duration)
        return elapsed, total

    def time_status_str(self) -> Optional[str]:
        """Format :meth:`time_status` as ``"(elapsed/total)"``.

        The leading ``"0:"`` hours component is dropped from both values
        when both start with it.
        """
        _status = self.time_status()
        if not _status:
            return None

        a, b = map(str, _status)
        if a.startswith("0:") and b.startswith("0:"):
            a = a[2:]
            b = b[2:]
        return f"({a}/{b})"

    class Meta:
        database = db