mirror of https://git.wownero.com/dsc/ircradio.git
285 lines
8.8 KiB
Python
285 lines
8.8 KiB
Python
# SPDX-License-Identifier: BSD-3-Clause
|
|
# Copyright (c) 2021, dsc@xmr.pm
|
|
|
|
import functools
|
|
import os
|
|
import logging
|
|
import json
|
|
import re
|
|
from typing import Optional, List
|
|
from datetime import datetime, timedelta
|
|
from dataclasses import dataclass
|
|
|
|
import mutagen
|
|
from mutagen.oggvorbis import OggVorbisInfo, OggVCommentDict
|
|
import aiofiles
|
|
from peewee import SqliteDatabase, SQL
|
|
import peewee as pw
|
|
from quart import current_app
|
|
|
|
from ircradio.youtube import YouTube
|
|
import settings
|
|
|
|
db = SqliteDatabase(f"{settings.cwd}/data/db.sqlite3")
|
|
|
|
|
|
class Ban(pw.Model):
|
|
id = pw.AutoField()
|
|
utube_id_or_nick = pw.CharField(index=True)
|
|
|
|
class Meta:
|
|
database = db
|
|
|
|
|
|
@dataclass
|
|
class SongMeta:
|
|
title: Optional[str] = None
|
|
description: Optional[str] = None
|
|
artist: Optional[str] = None
|
|
album: Optional[str] = None
|
|
album_cover: Optional[str] = None # path to image
|
|
bitrate: Optional[int] = None
|
|
length: Optional[float] = None
|
|
sample_rate: Optional[int] = None
|
|
channels: Optional[int] = None
|
|
mime: Optional[str] = None
|
|
|
|
|
|
class Song(pw.Model):
|
|
id = pw.AutoField()
|
|
date_added: datetime = pw.DateTimeField(default=datetime.now)
|
|
|
|
title: str = pw.CharField(index=True)
|
|
utube_id: str = pw.CharField(index=True, unique=True)
|
|
added_by: str = pw.CharField(index=True, constraints=[SQL('COLLATE NOCASE')]) # ILIKE index
|
|
duration: int = pw.IntegerField() # seconds
|
|
karma: int = pw.IntegerField(default=5, index=True)
|
|
banned: bool = pw.BooleanField(default=False)
|
|
|
|
meta: SongMeta = None # directly from file (exif) or metadata json
|
|
path: Optional[str] = None
|
|
remaining: int = None # liquidsoap playing status in seconds
|
|
|
|
@property
|
|
def to_json(self):
|
|
return {
|
|
"title": self.title,
|
|
"utube_id": self.utube_id,
|
|
"added_by": self.added_by,
|
|
"duration": self.duration,
|
|
"karma": self.karma,
|
|
"banned": self.banned
|
|
}
|
|
|
|
@staticmethod
|
|
def delete_song(utube_id: str) -> bool:
|
|
from ircradio.factory import app
|
|
try:
|
|
fn = f"{settings.dir_music}/{utube_id}.ogg"
|
|
Song.delete().where(Song.utube_id == utube_id).execute()
|
|
os.remove(fn)
|
|
except Exception as ex:
|
|
app.logger.error(f"{ex}")
|
|
return False
|
|
|
|
@staticmethod
|
|
def search(needle: str, min_chars=3) -> List['Song']:
|
|
needle = needle.replace("%", "")
|
|
if len(needle) < min_chars:
|
|
raise Exception("Search too short. Wow. More typing plz. Much effort.")
|
|
|
|
if YouTube.is_valid_uid(needle):
|
|
try:
|
|
song = Song.select().filter(Song.utube_id == needle).get()
|
|
return [song]
|
|
except:
|
|
pass
|
|
|
|
try:
|
|
q = Song.select().filter(Song.title ** f"%{needle}%")
|
|
return [s for s in q]
|
|
except:
|
|
pass
|
|
|
|
return []
|
|
|
|
@staticmethod
|
|
def by_uid(uid: str) -> Optional['Song']:
|
|
try:
|
|
return Song.select().filter(Song.utube_id == uid).get()
|
|
except:
|
|
pass
|
|
|
|
@classmethod
|
|
async def from_filepath(cls, path: str) -> 'Song':
|
|
if not os.path.exists(path):
|
|
raise Exception("filepath does not exist")
|
|
|
|
# try to detect youtube id in filename
|
|
basename = os.path.splitext(os.path.basename(path))[0]
|
|
|
|
if YouTube.is_valid_uid(basename):
|
|
try:
|
|
song = cls.select().where(Song.utube_id == basename).get()
|
|
except Exception as ex:
|
|
song = Song()
|
|
else:
|
|
song = Song()
|
|
|
|
# scan for metadata
|
|
await song.scan(path)
|
|
return song
|
|
|
|
async def scan(self, path: str = None):
|
|
# update with metadata, etc.
|
|
if path is None:
|
|
path = self.filepath
|
|
|
|
if not os.path.exists(path):
|
|
raise Exception(f"filepath {path} does not exist")
|
|
basename = os.path.splitext(os.path.basename(path))[0]
|
|
|
|
self.meta = SongMeta()
|
|
self.path = path
|
|
|
|
# EXIF direct
|
|
from ircradio.utils import mutagen_file
|
|
_m = await mutagen_file(path)
|
|
|
|
if _m:
|
|
if _m.info:
|
|
self.meta.channels = _m.info.channels
|
|
self.meta.length = int(_m.info.length)
|
|
if hasattr(_m.info, 'bitrate'):
|
|
self.meta.bitrate = _m.info.bitrate
|
|
if hasattr(_m.info, 'sample_rate'):
|
|
self.meta.sample_rate = _m.info.sample_rate
|
|
if _m.tags:
|
|
if hasattr(_m.tags, 'as_dict'):
|
|
_tags = _m.tags.as_dict()
|
|
else:
|
|
try:
|
|
_tags = {k: v for k, v in _m.tags.items()}
|
|
except:
|
|
_tags = {}
|
|
|
|
for k, v in _tags.items():
|
|
if isinstance(v, list):
|
|
v = v[0]
|
|
elif isinstance(v, (str, int, float)):
|
|
pass
|
|
else:
|
|
continue
|
|
|
|
if k in ["title", "description", "language", "date", "purl", "artist"]:
|
|
if hasattr(self.meta, k):
|
|
setattr(self.meta, k, v)
|
|
|
|
# yt-dlp metadata json file
|
|
fn_utube_meta = os.path.join(settings.dir_meta, f"{basename}.info.json")
|
|
utube_meta = {}
|
|
if os.path.exists(fn_utube_meta):
|
|
async with aiofiles.open(fn_utube_meta, mode="r") as f:
|
|
try:
|
|
utube_meta = json.loads(await f.read())
|
|
except Exception as ex:
|
|
logging.error(f"could not parse {fn_utube_meta}, {ex}")
|
|
|
|
if utube_meta:
|
|
# utube_meta file does not have anything we care about
|
|
pass
|
|
|
|
if not self.title and not self.meta.title:
|
|
# just adopt filename
|
|
self.title = os.path.basename(self.path)
|
|
|
|
return self
|
|
|
|
@property
|
|
def title_for_real_nocap(self):
|
|
if self.title:
|
|
return self.title
|
|
if self.meta.title:
|
|
return self.meta.title
|
|
return "unknown!"
|
|
|
|
@property
|
|
def title_cleaned(self):
|
|
_title = self.title_for_real_nocap
|
|
_title = re.sub(r"\(official\)", "", _title, flags=re.IGNORECASE)
|
|
_title = re.sub(r"\(official \w+\)", "", _title, flags=re.IGNORECASE)
|
|
_title = _title.replace(" - Topic - ", "")
|
|
return _title
|
|
|
|
@property
|
|
def filepath(self):
|
|
"""Absolute"""
|
|
return os.path.join(settings.dir_music, f"{self.utube_id}.ogg")
|
|
|
|
@property
|
|
def filepath_noext(self):
|
|
"""Absolute filepath without extension ... maybe"""
|
|
try:
|
|
return os.path.splitext(self.filepath)[0]
|
|
except:
|
|
return self.filepath
|
|
|
|
def image(self) -> Optional[str]:
|
|
dirname = os.path.dirname(self.path)
|
|
basename = os.path.basename(self.path)
|
|
name, ext = os.path.splitext(basename)
|
|
|
|
for _dirname in [dirname, settings.dir_meta]:
|
|
for _ext in ["", ext]:
|
|
meta_json = os.path.join(_dirname, name + _ext + ".info.json")
|
|
if os.path.exists(meta_json):
|
|
try:
|
|
f = open(meta_json, "r")
|
|
data = f.read()
|
|
f.close()
|
|
|
|
blob = json.loads(data)
|
|
if "thumbnails" in blob and isinstance(blob['thumbnails'], list):
|
|
thumbs = list(sorted(blob['thumbnails'], key=lambda k: int(k['id']), reverse=True))
|
|
image_id = thumbs[0]['id']
|
|
|
|
for sep in ['.', "_"]:
|
|
_fn = os.path.join(_dirname, name + _ext + f"{sep}{image_id}")
|
|
for img_ext in ['webp', 'jpg', 'jpeg', 'png']:
|
|
_fn_full = f"{_fn}.{img_ext}"
|
|
if os.path.exists(_fn_full):
|
|
return os.path.basename(_fn_full)
|
|
except Exception as ex:
|
|
logging.error(f"could not parse {meta_json}, {ex}")
|
|
|
|
def time_status(self) -> Optional[tuple]:
|
|
if not self.remaining:
|
|
return
|
|
|
|
duration = self.duration
|
|
if not duration:
|
|
if self.meta.length:
|
|
duration = int(self.meta.length)
|
|
if not duration:
|
|
return
|
|
|
|
_ = lambda k: timedelta(seconds=k)
|
|
a = _(duration - self.remaining)
|
|
b = _(duration)
|
|
return a, b
|
|
|
|
def time_status_str(self) -> Optional[str]:
|
|
_status = self.time_status()
|
|
if not _status:
|
|
return
|
|
|
|
a, b = map(str, _status)
|
|
if a.startswith("0:") and \
|
|
b.startswith("0:"):
|
|
a = a[2:]
|
|
b = b[2:]
|
|
return f"({a}/{b})"
|
|
|
|
class Meta:
|
|
database = db
|