ircradio/ircradio/utils.py

254 lines
7.7 KiB
Python

# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2021, dsc@xmr.pm
from typing import List, Optional, Union
import re
import shutil
import os
import sys
import random
import time
import asyncio
from asyncio.subprocess import Process
from io import TextIOWrapper
from dataclasses import dataclass
import mutagen
import aiofiles
import aiohttp
import jinja2
from aiocache import cached, Cache
from aiocache.serializers import PickleSerializer
from jinja2 import Environment, PackageLoader, select_autoescape
from quart import current_app
import settings
class AsyncSubProcess(object):
    """Run a child process and relay its output line by line.

    Every line the child writes to stdout/stderr is logged through the
    application logger, remembered in ``buffer`` and broadcast through
    ``wow.websockets.broadcast``.
    """

    def __init__(self, *args, **kwargs):
        # Positional and keyword arguments are accepted but not used.
        self.proc: Process = None
        # Trim threshold for ``buffer``: once an append makes the buffer reach
        # this length the oldest entry is dropped, so at most
        # ``max_buffer - 1`` entries are retained.
        self.max_buffer: int = 1000
        # Entries are ``(unix_timestamp, stream, text)`` tuples where stream
        # is 0 for stdout and 1 for stderr.
        self.buffer = []

    @property
    async def is_running(self) -> bool:
        """Tell whether a process was started and has not exited yet.

        This is an *async* property: the attribute access yields a coroutine,
        so callers must ``await obj.is_running``.
        """
        return self.proc is not None and self.proc.returncode is None

    async def run(self, args: List[str], ws_type_prefix: str):
        """Spawn ``args`` and relay its output until the process exits.

        :param args: program and arguments handed to
            ``asyncio.create_subprocess_exec``.
        :param ws_type_prefix: prefix of the websocket message type; the
            stream name (``stdout``/``stderr``) is appended to it.
        """
        read_stdout, write_stdout = os.pipe()
        read_stderr, write_stderr = os.pipe()
        try:
            self.proc = await asyncio.create_subprocess_exec(
                *args,
                stdin=asyncio.subprocess.PIPE,
                stdout=write_stdout,
                stderr=write_stderr,
                cwd=settings.cwd
            )
        except BaseException:
            # Spawning failed: nobody will ever read from the pipes, so the
            # read ends have to be released here to avoid leaking them.
            os.close(read_stdout)
            os.close(read_stderr)
            raise
        finally:
            # The parent never writes to the pipes; closing its copies of the
            # write ends lets the readers see EOF once the child exits.
            os.close(write_stdout)
            os.close(write_stderr)

        f_stdout = os.fdopen(read_stdout, "r")
        f_stderr = os.fdopen(read_stderr, "r")
        try:
            await asyncio.gather(
                self.consume(fd=f_stdout, _type='stdout', _type_prefix=ws_type_prefix),
                self.consume(fd=f_stderr, _type='stderr', _type_prefix=ws_type_prefix),
                self.proc.communicate()
            )
        finally:
            f_stdout.close()
            f_stderr.close()

    async def consume(self, fd: TextIOWrapper, _type: str, _type_prefix: str):
        """Read lines from ``fd`` and log, buffer and broadcast each of them.

        :param fd: file object wrapping the read end of one output pipe.
        :param _type: ``"stdout"`` or anything else, which is treated as
            stderr.
        :param _type_prefix: prefix of the websocket message type.
        """
        # Imported here rather than at module level, as in the original code.
        from ircradio.factory import app
        import wow.websockets as websockets

        _type_int = 0 if _type == "stdout" else 1

        reader = asyncio.StreamReader()
        loop = asyncio.get_event_loop()
        await loop.connect_read_pipe(
            lambda: asyncio.StreamReaderProtocol(reader),
            fd
        )

        async for line in reader:
            line = line.strip()
            msg = line.decode(errors="ignore")

            # stdout goes to the info log, stderr to the error log.
            _logger = app.logger.info if _type_int == 0 else app.logger.error
            _logger(msg)

            self.buffer.append((int(time.time()), _type_int, msg))
            if len(self.buffer) >= self.max_buffer:
                self.buffer.pop(0)

            # NOTE(review): the stripped raw bytes are broadcast, not the
            # decoded text that is logged and buffered - confirm that the
            # websocket layer expects bytes.
            await websockets.broadcast(
                message=line,
                message_type=f"{_type_prefix}_{_type}",
            )
async def loopyloop(secs: int, func, after_func=None):
    """Call ``func`` forever, pausing ``secs`` seconds between rounds.

    :param secs: pause between two rounds, in seconds.
    :param func: coroutine function invoked without arguments.
    :param after_func: optional coroutine function that receives the value
        produced by ``func`` after every round.
    """
    while True:
        outcome = await func()
        if not after_func:
            await asyncio.sleep(secs)
            continue
        await after_func(outcome)
        await asyncio.sleep(secs)
def jinja2_render(template_name: str, **data):
    """Render a template found below ``settings.cwd`` and return the text.

    Templates are looked up in ``<cwd>/utils`` first and in
    ``<cwd>/ircradio/templates`` second.

    :param template_name: file name of the template to load.
    :param data: variables made available to the template.
    """
    search_dirs = [
        os.path.join(settings.cwd, "utils"),
        os.path.join(settings.cwd, "ircradio/templates")
    ]
    environment = jinja2.Environment(
        loader=jinja2.FileSystemLoader(searchpath=search_dirs),
        autoescape=select_autoescape()
    )
    return environment.get_template(template_name).render(**data)
async def write_file(fn: str, data: Union[str, bytes], mode="w"):
    """Write ``data`` to the file ``fn`` without blocking the event loop.

    :param fn: path of the file to write.
    :param data: content to write; pass a binary ``mode`` (e.g. ``"wb"``)
        when handing in ``bytes``.
    :param mode: mode string forwarded to ``aiofiles.open``.
    """
    async with aiofiles.open(fn, mode=mode) as f:
        # aiofiles file methods are coroutines; without the await the write
        # is never actually executed.
        await f.write(data)
def write_file_sync(fn: str, data: bytes):
    """Write ``data`` to the file ``fn``, replacing any previous content.

    :param fn: path of the file to write.
    :param data: raw bytes to store.
    """
    # The context manager closes the handle even when the write fails.
    with open(fn, "wb") as f:
        f.write(data)
async def executeSQL(sql: str, params: tuple = None):
    """Run ``sql`` inside a transaction and return the fetched rows.

    :param sql: SQL statement to execute.
    :param params: values for the statement's placeholders, or ``None``
        when the statement has none.
    :return: whatever ``connection.fetch`` yields for the statement.
    """
    from ircradio.factory import db

    # NOTE(review): the pool/transaction/fetch calls look like the asyncpg
    # API, where query arguments are passed as separate positional values -
    # confirm against ircradio.factory.db.
    query_args = params or ()
    async with db.pool.acquire() as connection:
        async with connection.transaction():
            # The query has to be awaited while the connection is still
            # held; returning the bare coroutine would hand the caller an
            # un-run query on a connection that is already released.
            result = await connection.fetch(sql, *query_args)
            return result
def systemd_servicefile(
        name: str, description: str, user: str, group: str,
        path_executable: str, args_executable: str, env: str = None
) -> bytes:
    """Render the ``acme.service.jinja2`` template and return it as bytes.

    All parameters are handed to the template under their own names.
    """
    context = {
        "name": name,
        "description": description,
        "user": user,
        "group": group,
        "env": env,
        "path_executable": path_executable,
        "args_executable": args_executable,
    }
    rendered = jinja2_render("acme.service.jinja2", **context)
    return rendered.encode()
async def httpget(url: str, json=True, timeout: int = 5, raise_for_status=True, verify_tls=True):
    """Fetch ``url`` with a GET request and return the response body.

    :param url: address to request.
    :param json: parse the body as JSON when true, return it as text
        otherwise.
    :param timeout: total request timeout in seconds.
    :param raise_for_status: call ``raise_for_status()`` on the response
        before reading the body.
    :param verify_tls: forwarded to aiohttp as the ``ssl`` argument.
    :raises Exception: when the body is ``None`` or an empty string.
    """
    request_headers = {"User-Agent": random_agent()}
    client_timeout = aiohttp.ClientTimeout(total=timeout)

    async with aiohttp.ClientSession(timeout=client_timeout) as session:
        async with session.get(url, headers=request_headers, ssl=verify_tls) as response:
            if raise_for_status:
                response.raise_for_status()

            if json:
                payload = await response.json()
            else:
                payload = await response.text()

            if payload is None or (isinstance(payload, str) and payload == ''):
                raise Exception("empty response from request")
            return payload
def random_agent():
    """Return one randomly chosen entry of ``ircradio.user_agents``."""
    from ircradio import user_agents as known_agents
    picked = random.choice(known_agents)
    return picked
def print_banner():
    """Print the start-up banner, wrapped in ANSI colour escape codes."""
    banner = """\033[91m ▪ ▄▄▄ ▄▄· ▄▄▄ ▄▄▄· ·▄▄▄▄ ▪
██ ▀▄ █·▐█ ▌▪▀▄ █·▐█ ▀█ ██▪ ██ ██ ▪
▐█·▐▀▀▄ ██ ▄▄▐▀▀▄ ▄█▀▀█ ▐█· ▐█▌▐█· ▄█▀▄
▐█▌▐█•█▌▐███▌▐█•█▌▐█ ▪▐▌██. ██ ▐█▌▐█▌.▐▌
▀▀▀.▀ ▀·▀▀▀ .▀ ▀ ▀ ▀ ▀▀▀▀▀• ▀▀▀ ▀█▄▀▪\033[0m
""".strip()
    print(banner)
async def radio_update_task_run_forever():
    """Run ``radio_update_task`` in an endless loop.

    Each round starts from a 15 second pause; the task may return a
    different pause. Exceptions are logged on the application logger and
    the loop carries on after sleeping.
    """
    default_pause = 15
    while True:
        pause = default_pause
        try:
            pause = await radio_update_task(pause)
            await asyncio.sleep(pause)
        except Exception as err:
            # Keep the loop alive: report the problem and retry later.
            current_app.logger.error(err)
            await asyncio.sleep(pause)
async def radio_update_task(sleep_secs) -> int:
# Refresh the song/listener state of every station in settings.radio_stations
# and publish the stations on websocket_status_bus.
# Returns the number of seconds the caller should wait before the next run:
# the value passed in, or 4 when the bus has at least one subscriber.
# NOTE(review): indentation was lost in this copy of the file, so the exact
# nesting of the statements below cannot be confirmed from here.
from ircradio.factory import websocket_status_bus
from ircradio.station import SongDataclass
# Radio is imported but not referenced anywhere in this function.
from ircradio.radio import Radio
# Somebody is listening on the status bus: ask for a shorter interval.
if len(websocket_status_bus.subscribers) >= 1:
sleep_secs = 4
# Filled with one entry per station, keyed by the station id.
blob = {}
radio_stations = list(settings.radio_stations.values())
# radio_stations = radio_stations[1:]
# radio_stations = radio_stations[:2]
for radio_station in radio_stations:
# Forget the previous song before querying the station again.
radio_station.song = None
# Default values for the song fields; overwritten below when available.
data = {
'added_by': 'system',
'image': None,
'duration': None,
'progress': None,
}
# presumably "np" is the now-playing song object - verify against station.np()
np = await radio_station.np()
if np:
listeners = await radio_station.get_listeners()
# Only overwrite the listener count when a value was returned.
if listeners is not None:
radio_station.listeners = listeners
data['title'] = np.title_cleaned
data['karma'] = np.karma
data['utube_id'] = np.utube_id
data['image'] = np.image()
data['duration'] = np.duration
data['added_by'] = np.added_by
time_status = np.time_status()
if time_status:
# time_status unpacks into two values that expose ``.seconds``
# (presumably elapsed and total time - confirm).
a, b = time_status
pct = percentage(a.seconds, b.seconds)
# Cap the progress at 100 percent.
if pct >= 100:
pct = 100
data['progress'] = int(pct)
data['progress_str'] = " / ".join(map(str, time_status))
radio_station.song = SongDataclass(**data)
blob[radio_station.id] = radio_station
# Publish only when at least one station was collected.
if blob:
await websocket_status_bus.put(blob)
return sleep_secs
@cached(ttl=3600, cache=Cache.MEMORY,
        key_builder=lambda *args, **kw: f"mutagen_file_{args[1]}",
        serializer=PickleSerializer())
async def mutagen_file(path):
    """Open ``path`` with ``mutagen.File``; results are cached for an hour.

    When a Quart application is active the blocking call is wrapped with
    ``current_app.sync_to_async``; otherwise it is executed directly.
    """
    from quart import current_app

    if not current_app:
        return mutagen.File(path)

    opener = current_app.sync_to_async(mutagen.File)
    return await opener(path)
def percentage(part, whole):
    """Return ``part`` expressed as a percentage of ``whole``.

    :param part: numerator, anything accepted by ``float()``.
    :param whole: denominator, anything accepted by ``float()``.
    :return: ``100 * part / whole`` as a float, or ``0.0`` when ``whole``
        is zero (instead of raising ``ZeroDivisionError``).
    """
    total = float(whole)
    if total == 0:
        # Nothing to relate ``part`` to; report no progress.
        return 0.0
    return 100 * float(part) / total