refactor: remove AsyncPath
feat: experimental threaded downloading Also add very little support to Bilibili version. Currently this isn't usable, so it'll need some time to be refactored
This commit is contained in:
parent
7d4f4e7931
commit
c8823ea3b1
|
@ -1,3 +0,0 @@
|
|||
aiohttp==3.8.1
|
||||
appdirs~=1.4.4
|
||||
aiopath~=0.6.10
|
27
setup.py
27
setup.py
|
@ -1,27 +0,0 @@
|
|||
import pathlib
|
||||
from setuptools import setup
|
||||
|
||||
# The directory containing this file
|
||||
HERE = pathlib.Path(__file__).parent
|
||||
|
||||
# The text of the README file
|
||||
README = (HERE / "README.md").read_text()
|
||||
|
||||
setup(
|
||||
name='worthless',
|
||||
version='2.2.1',
|
||||
packages=['worthless', 'worthless.classes', 'worthless.classes.launcher', 'worthless.classes.installer'],
|
||||
url='https://git.froggi.es/tretrauit/worthless-launcher',
|
||||
license='MIT License',
|
||||
author='tretrauit',
|
||||
author_email='tretrauit@cachyos.org',
|
||||
description='A worthless CLI launcher written in Python.',
|
||||
long_description=README,
|
||||
long_description_content_type="text/markdown",
|
||||
classifiers=[
|
||||
"License :: OSI Approved :: MIT License",
|
||||
"Programming Language :: Python :: 3"
|
||||
],
|
||||
include_package_data=True,
|
||||
install_requires=["aiohttp", "appdirs", "aiopath"]
|
||||
)
|
|
@ -1,5 +1,4 @@
|
|||
from worthless import launcher, installer
|
||||
from worthless import launcher, gamemanager
|
||||
|
||||
Launcher = launcher.Launcher
|
||||
Installer = installer.Installer
|
||||
|
||||
GameManager = gamemanager.GameManager
|
||||
|
|
|
@ -1,8 +0,0 @@
|
|||
class mhyResponse:
|
||||
"""Simple class for wrapping miHoYo web response
|
||||
Currently not used for anything.
|
||||
"""
|
||||
def __init__(self, retcode, message, data):
|
||||
self.retcode = retcode
|
||||
self.message = message
|
||||
self.data = data
|
|
@ -6,7 +6,7 @@ import asyncio
|
|||
import appdirs
|
||||
from pathlib import Path
|
||||
from worthless.launcher import Launcher
|
||||
from worthless.installer import Installer
|
||||
from worthless.gamemanager import GameManager as Installer
|
||||
from worthless.patcher import Patcher
|
||||
import worthless.constants as constants
|
||||
|
||||
|
|
|
@ -0,0 +1,7 @@
|
|||
from enum import Enum
|
||||
|
||||
|
||||
class GameVariant(Enum):
|
||||
INTERNATIONAL = 1
|
||||
CHINESE = 2
|
||||
BILIBILI = 3
|
|
@ -6,44 +6,81 @@ import aiohttp
|
|||
import zipfile
|
||||
import json
|
||||
import hashlib
|
||||
import secrets
|
||||
import logging
|
||||
from pathlib import Path
|
||||
from configparser import ConfigParser
|
||||
from aiopath import AsyncPath
|
||||
from worthless import constants
|
||||
from worthless.launcher import Launcher
|
||||
from worthless.enums import GameVariant
|
||||
from worthless.launcherconfig import LauncherConfig
|
||||
|
||||
logger = logging.getLogger("worthless.GameManager")
|
||||
|
||||
|
||||
async def _download_file(file_url: str, file_name: str, file_path: Path | str, file_len: int = None, overwrite=False,
|
||||
chunks=8192):
|
||||
chunks=None, threads_num=None) -> Path:
|
||||
"""
|
||||
Download file name to temporary directory,
|
||||
:param file_url:
|
||||
:param file_name:
|
||||
:return:
|
||||
:param file_url: The url to download the file from
|
||||
:param file_name: The file name to download into
|
||||
:return: Downloaded file as a Path object
|
||||
"""
|
||||
headers = {}
|
||||
file_path = AsyncPath(file_path).joinpath(file_name)
|
||||
if overwrite:
|
||||
await file_path.unlink(missing_ok=True)
|
||||
if await file_path.exists():
|
||||
cur_len = (await file_path.stat()).st_size
|
||||
headers |= {
|
||||
"Range": f"bytes={cur_len}-{file_len if file_len else ''}"
|
||||
if not chunks:
|
||||
chunks = 8192
|
||||
if not threads_num:
|
||||
threads_num = 8
|
||||
file_path = Path(file_path).joinpath(file_name)
|
||||
|
||||
async def _download(session: aiohttp.ClientSession, from_bytes: int, to_bytes: int, threaded: bool = None) -> Path:
|
||||
headers = {
|
||||
"Range": f"bytes={from_bytes}-{to_bytes if to_bytes else ''}"
|
||||
}
|
||||
else:
|
||||
await file_path.touch()
|
||||
async with aiohttp.ClientSession() as session:
|
||||
if not threaded:
|
||||
p = file_path
|
||||
else:
|
||||
p = file_path.parent.joinpath(secrets.token_urlsafe(16))
|
||||
p.touch(exist_ok=True)
|
||||
rsp = await session.get(file_url, headers=headers, timeout=None)
|
||||
if rsp.status == 416:
|
||||
return
|
||||
# Not an error, so yeah.
|
||||
return p
|
||||
rsp.raise_for_status()
|
||||
while True:
|
||||
chunk = await rsp.content.read(chunks)
|
||||
if not chunk:
|
||||
break
|
||||
async with file_path.open("ab") as f:
|
||||
await f.write(chunk)
|
||||
with p.open("ab") as file:
|
||||
await asyncio.to_thread(file.write, chunk)
|
||||
return p
|
||||
if overwrite:
|
||||
file_path.unlink(missing_ok=True)
|
||||
if file_path.exists():
|
||||
cur_len = file_path.stat().st_size
|
||||
else:
|
||||
file_path.touch()
|
||||
cur_len = 0
|
||||
if not file_len or threads_num == 1:
|
||||
async with aiohttp.ClientSession() as s:
|
||||
return await _download(session=s, from_bytes=cur_len, to_bytes=file_len)
|
||||
|
||||
download_bytes = file_len - cur_len
|
||||
# if bytes * threads is smaller than file_len then we will not get the full file.
|
||||
download_bytes_t = int(download_bytes / threads_num) + 1
|
||||
download_jobs = []
|
||||
current_bytes = cur_len
|
||||
async with aiohttp.ClientSession() as s:
|
||||
for thread in range(threads_num):
|
||||
next_bytes = current_bytes + download_bytes_t
|
||||
if next_bytes > file_len:
|
||||
next_bytes = file_len
|
||||
download_jobs.append(_download(session=s, from_bytes=current_bytes, to_bytes=next_bytes, threaded=True))
|
||||
all_bytes = await asyncio.gather(*download_jobs)
|
||||
# Merge bytes into the file
|
||||
with file_path.open("ab") as f:
|
||||
for bytes_path in all_bytes:
|
||||
f.write(bytes_path.read_bytes())
|
||||
bytes_path.unlink()
|
||||
|
||||
|
||||
class HDiffPatch:
|
||||
|
@ -80,7 +117,6 @@ class HDiffPatch:
|
|||
case "Darwin":
|
||||
return "macos"
|
||||
|
||||
# Rip BSD they need to use Linux compatibility layer to run this (or use Wine if they prefer that)
|
||||
raise RuntimeError("Unsupported platform")
|
||||
|
||||
def _get_hdiffpatch_exec(self, exec_name):
|
||||
|
@ -143,33 +179,34 @@ class HDiffPatch:
|
|||
name = await self.get_latest_release_name()
|
||||
if not url:
|
||||
raise RuntimeError("Unable to find latest release")
|
||||
await _download_file(url, name, self.temp_path, overwrite=True)
|
||||
await _download_file(url, name, self.temp_path, overwrite=True, threads_num=1)
|
||||
if not extract:
|
||||
return
|
||||
with zipfile.ZipFile(self.temp_path.joinpath(name), 'r') as f:
|
||||
await asyncio.to_thread(f.extractall, path=self.data_path)
|
||||
|
||||
|
||||
class Installer:
|
||||
def __init__(self, gamedir: str | Path | AsyncPath = AsyncPath.cwd(),
|
||||
overseas: bool = True, data_dir: str | Path | AsyncPath = None):
|
||||
if isinstance(gamedir, str | Path):
|
||||
gamedir = AsyncPath(gamedir)
|
||||
self._gamedir = gamedir
|
||||
class GameManager:
|
||||
def __init__(self, game_dir: str | Path = None,
|
||||
variant: GameVariant = None, data_dir: str | Path = None):
|
||||
if not game_dir:
|
||||
game_dir = Path.cwd()
|
||||
if not isinstance(game_dir, Path):
|
||||
game_dir = Path(game_dir)
|
||||
self._game_dir = game_dir
|
||||
if not data_dir:
|
||||
self._appdirs = constants.APPDIRS
|
||||
self.temp_path = AsyncPath(self._appdirs.user_cache_dir).joinpath("Installer")
|
||||
self._cache_path = Path(constants.APPDIRS.user_cache_dir).joinpath("Installer")
|
||||
else:
|
||||
if isinstance(data_dir, str | AsyncPath):
|
||||
data_dir = AsyncPath(data_dir)
|
||||
if not isinstance(data_dir, Path):
|
||||
data_dir = Path(data_dir)
|
||||
self.temp_path = data_dir.joinpath("Temp/Installer/")
|
||||
Path(self.temp_path).mkdir(parents=True, exist_ok=True)
|
||||
config_file = self._gamedir.joinpath("config.ini")
|
||||
config_file = self._game_dir.joinpath("config.ini")
|
||||
self._config_file = config_file
|
||||
self._download_chunk = 8192
|
||||
self._overseas = overseas
|
||||
self._variant = variant
|
||||
self._version = None
|
||||
self._launcher = Launcher(self._gamedir, overseas=self._overseas)
|
||||
self._launcher = Launcher(self._game_dir, variant=variant)
|
||||
self._hdiffpatch = HDiffPatch(data_dir=data_dir)
|
||||
self._config = LauncherConfig(self._config_file, self._version)
|
||||
self._game_version_re = re.compile(r"([1-9]+\.[0-9]+\.[0-9]+)_\d+_\d+")
|
||||
|
@ -184,27 +221,26 @@ class Installer:
|
|||
await _download_file(file_url, file_name, self.temp_path, file_len=file_len, overwrite=overwrite,
|
||||
chunks=self._download_chunk)
|
||||
|
||||
async def read_version_from_config(self):
|
||||
if not await self._config_file.exists():
|
||||
def read_version_from_config(self):
|
||||
if not self._config_file.exists():
|
||||
raise FileNotFoundError(f"Config file {self._config_file} not found")
|
||||
cfg = ConfigParser()
|
||||
await asyncio.to_thread(cfg.read, str(self._config_file))
|
||||
cfg.read(str(self._config_file))
|
||||
return cfg.get("General", "game_version")
|
||||
|
||||
async def read_version_from_game_file(self, globalgamemanagers: AsyncPath | Path | bytes) -> str:
|
||||
def read_version_from_game_file(self, globalgamemanagers: Path | bytes) -> str:
|
||||
"""
|
||||
Reads the version from the globalgamemanagers file. (Data/globalgamemanagers)
|
||||
|
||||
Uses `An Anime Game Launcher` method to read the version:
|
||||
https://gitlab.com/KRypt0n_/an-anime-game-launcher/-/blob/main/src/ts/Game.ts#L26
|
||||
|
||||
:return: Game version (ex 1.0.0)
|
||||
:return: Game version (e.g. 1.0.0)
|
||||
"""
|
||||
if isinstance(globalgamemanagers, Path | AsyncPath):
|
||||
globalgamemanagers = AsyncPath(globalgamemanagers)
|
||||
data = await globalgamemanagers.read_text("ascii", errors="ignore")
|
||||
if isinstance(globalgamemanagers, Path):
|
||||
data = globalgamemanagers.read_text(encoding="ascii", errors="ignore")
|
||||
else:
|
||||
data = globalgamemanagers.decode("ascii", errors="ignore")
|
||||
data = globalgamemanagers.decode(encoding="ascii", errors="ignore")
|
||||
result = self._game_version_re.search(data)
|
||||
if not result:
|
||||
raise ValueError("Could not find version in game file")
|
||||
|
@ -242,7 +278,7 @@ class Installer:
|
|||
return lang
|
||||
|
||||
@staticmethod
|
||||
async def get_voiceover_archive_language(voiceover_archive: str | Path | AsyncPath) -> str:
|
||||
async def get_voiceover_archive_language(voiceover_archive: str | Path) -> str:
|
||||
if isinstance(voiceover_archive, str | Path):
|
||||
voiceover_archive = Path(voiceover_archive).resolve()
|
||||
if not voiceover_archive.exists():
|
||||
|
@ -253,16 +289,16 @@ class Installer:
|
|||
return file.name.split("_")[1]
|
||||
|
||||
@staticmethod
|
||||
async def get_voiceover_archive_type(voiceover_archive: str | Path) -> bool:
|
||||
def get_voiceover_archive_type(voiceover_archive: str | Path) -> bool:
|
||||
"""
|
||||
Gets voiceover archive type.
|
||||
:param voiceover_archive:
|
||||
:return: True if this is a full archive, else False.
|
||||
"""
|
||||
vo_lang = Installer.get_voiceover_archive_language(voiceover_archive)
|
||||
vo_lang = GameManager.get_voiceover_archive_language(voiceover_archive)
|
||||
with zipfile.ZipFile(voiceover_archive, 'r') as f:
|
||||
archive_path = zipfile.Path(f)
|
||||
files = (await asyncio.to_thread(f.read, "Audio_{}_pkg_version".format(vo_lang))).decode().split("\n")
|
||||
files = f.read("Audio_{}_pkg_version".format(vo_lang)).decode().split("\n")
|
||||
for file in files:
|
||||
if file.strip() and not archive_path.joinpath(json.loads(file)["remoteName"]).exists():
|
||||
return False
|
||||
|
@ -272,48 +308,51 @@ class Installer:
|
|||
self._download_chunk = chunk
|
||||
|
||||
def get_game_data_name(self):
|
||||
if self._overseas:
|
||||
return "GenshinImpact_Data/"
|
||||
else:
|
||||
return "YuanShen_Data/"
|
||||
match self._variant:
|
||||
case GameVariant.INTERNATIONAL:
|
||||
return "GenshinImpact_Data/"
|
||||
case GameVariant.CHINESE:
|
||||
return "YuanShen_Data/"
|
||||
case GameVariant.BILIBILI:
|
||||
return "YuanShen_Data/"
|
||||
|
||||
def get_game_data_path(self) -> AsyncPath:
|
||||
return self._gamedir.joinpath(self.get_game_data_name())
|
||||
def get_game_data_path(self) -> Path:
|
||||
return self._game_dir.joinpath(self.get_game_data_name())
|
||||
|
||||
async def get_game_archive_version(self, game_archive: str | Path):
|
||||
def get_game_archive_version(self, game_archive: str | Path):
|
||||
game_archive = Path(game_archive)
|
||||
if not game_archive.is_file():
|
||||
raise FileNotFoundError(f"Game archive {game_archive} not found")
|
||||
with zipfile.ZipFile(game_archive, 'r') as f:
|
||||
return await self.read_version_from_game_file(
|
||||
await asyncio.to_thread(f.read, self.get_game_data_name() + "globalgamemanagers")
|
||||
return self.read_version_from_game_file(
|
||||
f.read(self.get_game_data_name() + "globalgamemanagers")
|
||||
)
|
||||
|
||||
async def get_game_version(self) -> str | None:
|
||||
def get_game_version(self) -> str | None:
|
||||
globalgamemanagers = self.get_game_data_path().joinpath("./globalgamemanagers")
|
||||
if not await globalgamemanagers.exists():
|
||||
if not globalgamemanagers.exists():
|
||||
try:
|
||||
return await self.read_version_from_config()
|
||||
return self.read_version_from_config()
|
||||
except FileNotFoundError:
|
||||
return
|
||||
return await self.read_version_from_game_file(globalgamemanagers)
|
||||
return self.read_version_from_game_file(globalgamemanagers)
|
||||
|
||||
async def get_installed_voiceovers(self) -> list[str]:
|
||||
def get_installed_voiceovers(self) -> list[str]:
|
||||
"""
|
||||
Returns a list of installed voiceovers.
|
||||
|
||||
:return: List of installed voiceovers
|
||||
"""
|
||||
voiceovers = []
|
||||
async for file in self.get_game_data_path()\
|
||||
for file in self.get_game_data_path() \
|
||||
.joinpath("StreamingAssets/Audio/GeneratedSoundBanks/Windows").iterdir():
|
||||
if await file.is_dir():
|
||||
if file.is_dir():
|
||||
voiceovers.append(file.name)
|
||||
return voiceovers
|
||||
|
||||
async def update_game(self, game_archive: str | Path | AsyncPath):
|
||||
if not await self.get_game_data_path().exists():
|
||||
raise FileNotFoundError(f"Game not found in {self._gamedir}")
|
||||
async def update_game(self, game_archive: str | Path):
|
||||
if not self.get_game_data_path().exists():
|
||||
raise FileNotFoundError(f"Game not found in {self._game_dir}")
|
||||
if isinstance(game_archive, str | Path):
|
||||
game_archive = Path(game_archive).resolve()
|
||||
if not game_archive.exists():
|
||||
|
@ -340,8 +379,8 @@ class Installer:
|
|||
hdifffiles.append(json.loads(x)["remoteName"])
|
||||
patch_jobs = []
|
||||
for file in hdifffiles:
|
||||
current_game_file = self._gamedir.joinpath(file)
|
||||
if not await current_game_file.exists():
|
||||
current_game_file = self._game_dir.joinpath(file)
|
||||
if not current_game_file.exists():
|
||||
# Not patching since we don't have the file
|
||||
continue
|
||||
|
||||
|
@ -349,15 +388,15 @@ class Installer:
|
|||
|
||||
async def extract_and_patch(old_file, diff_file):
|
||||
diff_path = self.temp_path.joinpath(diff_file)
|
||||
if await diff_path.is_file():
|
||||
await diff_path.unlink(missing_ok=True)
|
||||
if diff_path.is_file():
|
||||
diff_path.unlink(missing_ok=True)
|
||||
await asyncio.to_thread(archive.extract, diff_file, self.temp_path)
|
||||
patch_path = self.temp_path.joinpath(diff_file)
|
||||
old_suffix = old_file.suffix
|
||||
old_file = await old_file.rename(old_file.with_suffix(".bak"))
|
||||
proc = await self._hdiffpatch.patch_file(old_file, old_file.with_suffix(old_suffix),
|
||||
patch_path, wait=True)
|
||||
await patch_path.unlink()
|
||||
patch_path.unlink()
|
||||
if proc.returncode != 0:
|
||||
# Let the game download the file.
|
||||
await old_file.rename(old_file.with_suffix(old_suffix))
|
||||
|
@ -371,16 +410,16 @@ class Installer:
|
|||
|
||||
deletefiles = archive.read("deletefiles.txt").decode().split("\n")
|
||||
for file in deletefiles:
|
||||
current_game_file = Path(self._gamedir.joinpath(file))
|
||||
current_game_file = Path(self._game_dir.joinpath(file))
|
||||
if not current_game_file.exists():
|
||||
continue
|
||||
if current_game_file.is_file():
|
||||
current_game_file.unlink(missing_ok=True)
|
||||
|
||||
await asyncio.to_thread(archive.extractall, self._gamedir, members=files)
|
||||
await asyncio.to_thread(archive.extractall, self._game_dir, members=files)
|
||||
archive.close()
|
||||
# Update game version on local variable.
|
||||
self._version = await self.get_game_version()
|
||||
self._version = self.get_game_version()
|
||||
self.set_version_config()
|
||||
|
||||
def set_version_config(self, version: str = None):
|
||||
|
@ -401,45 +440,45 @@ class Installer:
|
|||
if vo.language == translated_lang:
|
||||
await self._download_file(vo.path, vo.get_name(), vo.size)
|
||||
|
||||
async def uninstall_game(self):
|
||||
await asyncio.to_thread(shutil.rmtree, self._gamedir, ignore_errors=True)
|
||||
def uninstall_game(self):
|
||||
shutil.rmtree(self._game_dir, ignore_errors=True)
|
||||
|
||||
async def _extract_game_file(self, archive: str | Path | AsyncPath):
|
||||
if isinstance(archive, str | AsyncPath):
|
||||
def _extract_game_file(self, archive: str | Path):
|
||||
if isinstance(archive, str):
|
||||
archive = Path(archive).resolve()
|
||||
if not archive.exists():
|
||||
raise FileNotFoundError(f"'{archive}' not found")
|
||||
with zipfile.ZipFile(archive, 'r') as f:
|
||||
await asyncio.to_thread(f.extractall, path=self._gamedir)
|
||||
await asyncio.to_thread(f.extractall, path=self._game_dir)
|
||||
|
||||
async def apply_voiceover(self, voiceover_archive: str | Path):
|
||||
def apply_voiceover(self, voiceover_archive: str | Path):
|
||||
# Since Voiceover packages are unclear about diff package or full package
|
||||
# we will try to extract the voiceover package and apply it to the game
|
||||
# making this function universal for both cases
|
||||
if not await self.get_game_data_path().exists():
|
||||
raise FileNotFoundError(f"Game not found in {self._gamedir}")
|
||||
await self._extract_game_file(voiceover_archive)
|
||||
if not self.get_game_data_path().exists():
|
||||
raise FileNotFoundError(f"Game not found in {self._game_dir}")
|
||||
self._extract_game_file(voiceover_archive)
|
||||
|
||||
async def install_game(self, game_archive: str | Path | AsyncPath, force_reinstall: bool = False):
|
||||
async def install_game(self, game_archive: str | Path, force_reinstall: bool = False):
|
||||
"""Installs the game to the current directory
|
||||
|
||||
If `force_reinstall` is True, the game will be uninstalled then reinstalled.
|
||||
"""
|
||||
if await self.get_game_data_path().exists():
|
||||
if self.get_game_data_path().exists():
|
||||
if not force_reinstall:
|
||||
raise ValueError(f"Game is already installed in {self._gamedir}")
|
||||
await self.uninstall_game()
|
||||
raise ValueError(f"Game is already installed in {self._game_dir}")
|
||||
self.uninstall_game()
|
||||
|
||||
await self._gamedir.mkdir(parents=True, exist_ok=True)
|
||||
await self._extract_game_file(game_archive)
|
||||
self._version = await self.get_game_version()
|
||||
self._game_dir.mkdir(parents=True, exist_ok=True)
|
||||
self._extract_game_file(game_archive)
|
||||
self._version = self.get_game_version()
|
||||
self.set_version_config()
|
||||
|
||||
async def _get_game_version(self):
|
||||
if self._version:
|
||||
from_version = self._version
|
||||
else:
|
||||
from_version = self._version = await self.get_game_version()
|
||||
from_version = self._version = self.get_game_version()
|
||||
return from_version
|
||||
|
||||
async def _get_game_resource(self):
|
||||
|
@ -502,21 +541,21 @@ class Installer:
|
|||
if v.version == from_version:
|
||||
return v
|
||||
|
||||
async def verify_from_pkg_version(self, pkg_version: AsyncPath, ignore_mismatch=False):
|
||||
contents = await pkg_version.read_text()
|
||||
async def verify_from_pkg_version(self, pkg_version: Path, ignore_mismatch=False):
|
||||
contents = pkg_version.read_text()
|
||||
|
||||
async def calculate_md5(file_to_calculate):
|
||||
file_to_calculate = AsyncPath(file_to_calculate)
|
||||
if not await file_to_calculate.exists():
|
||||
def calculate_md5(file_to_calculate):
|
||||
file_to_calculate = Path(file_to_calculate)
|
||||
if not file_to_calculate.exists():
|
||||
return ""
|
||||
async with file_to_calculate.open("rb") as f:
|
||||
with file_to_calculate.open("rb") as f:
|
||||
file_hash = hashlib.md5()
|
||||
while chunk := await f.read(self._download_chunk):
|
||||
while chunk := f.read(self._download_chunk):
|
||||
file_hash.update(chunk)
|
||||
return file_hash.hexdigest()
|
||||
|
||||
async def verify_file(file_to_verify, md5):
|
||||
file_md5 = await calculate_md5(file_to_verify)
|
||||
def verify_file(file_to_verify, md5):
|
||||
file_md5 = calculate_md5(file_to_verify)
|
||||
if file_md5 == md5:
|
||||
return None
|
||||
if ignore_mismatch:
|
||||
|
@ -528,7 +567,7 @@ class Installer:
|
|||
if not content.strip():
|
||||
continue
|
||||
info = json.loads(content)
|
||||
verify_jobs.append(verify_file(self._gamedir.joinpath(info["remoteName"]), info["md5"]))
|
||||
verify_jobs.append(asyncio.to_thread(verify_file, self._game_dir.joinpath(info["remoteName"]), info["md5"]))
|
||||
|
||||
verify_result = await asyncio.gather(*verify_jobs)
|
||||
failed_files = []
|
||||
|
@ -538,9 +577,9 @@ class Installer:
|
|||
|
||||
return None if not failed_files else failed_files
|
||||
|
||||
async def verify_game(self, pkg_version: str | Path | AsyncPath = None, ignore_mismatch=False):
|
||||
async def verify_game(self, pkg_version: str | Path = None, ignore_mismatch=False):
|
||||
if pkg_version is None:
|
||||
pkg_version = self._gamedir.joinpath("pkg_version")
|
||||
pkg_version = self._game_dir.joinpath("pkg_version")
|
||||
return await self.verify_from_pkg_version(pkg_version, ignore_mismatch)
|
||||
|
||||
async def clear_cache(self):
|
|
@ -1,32 +1,16 @@
|
|||
import aiohttp
|
||||
import locale
|
||||
|
||||
from aiopath import AsyncPath
|
||||
|
||||
from worthless import constants
|
||||
from pathlib import Path
|
||||
from worthless.classes import launcher, installer
|
||||
|
||||
|
||||
async def _get(url, **kwargs) -> dict:
|
||||
# Workaround because miHoYo uses retcode for their API instead of HTTP status code
|
||||
async with aiohttp.ClientSession() as session:
|
||||
rsp = await session.get(url, **kwargs)
|
||||
rsp_json = await rsp.json()
|
||||
if rsp_json["retcode"] != 0:
|
||||
# TODO: Add more information to the error message
|
||||
raise aiohttp.ClientResponseError(code=rsp_json["retcode"],
|
||||
message=rsp_json["message"],
|
||||
history=rsp.history,
|
||||
request_info=rsp.request_info)
|
||||
return rsp_json
|
||||
from worthless.enums import GameVariant
|
||||
|
||||
|
||||
def _get_system_language() -> str:
|
||||
"""Gets system language compatible with server parameters.
|
||||
|
||||
Return:
|
||||
System language with format xx-xx.
|
||||
System language with format xx-xx (e.g. en-us).
|
||||
"""
|
||||
|
||||
try:
|
||||
|
@ -42,53 +26,83 @@ class Launcher:
|
|||
Contains functions to get information from server and client like the official launcher.
|
||||
"""
|
||||
|
||||
def __init__(self, gamedir: str | Path = Path.cwd(), language: str = None, overseas=True):
|
||||
def __init__(self, game_dir: str | Path = None, language: str = None, variant: GameVariant = None):
|
||||
"""Initialize the launcher API
|
||||
|
||||
Args:
|
||||
gamedir (Path): Path to the game directory.
|
||||
game_dir (Path): Path to the game directory.
|
||||
"""
|
||||
self._overseas = overseas
|
||||
if overseas:
|
||||
self._api = constants.LAUNCHER_API_URL_OS
|
||||
self._params = {
|
||||
"key": "gcStgarh",
|
||||
"launcher_id": "10",
|
||||
}
|
||||
self._lang = language.lower().replace("_", "-") if language else _get_system_language()
|
||||
else:
|
||||
self._api = constants.LAUNCHER_API_URL_CN
|
||||
self._params = {
|
||||
"key": "eYd89JmJ",
|
||||
"launcher_id": "18",
|
||||
"channel_id": "1"
|
||||
}
|
||||
self._lang = "zh-cn" # Use chinese language because this is chinese version
|
||||
if isinstance(gamedir, str | AsyncPath):
|
||||
gamedir = Path(gamedir)
|
||||
self._gamedir = gamedir.resolve()
|
||||
if not variant:
|
||||
variant = GameVariant.INTERNATIONAL
|
||||
self._variant = variant
|
||||
match variant:
|
||||
case GameVariant.INTERNATIONAL:
|
||||
self._api = constants.LAUNCHER_API_URL_OS
|
||||
self._params = {
|
||||
"key": "gcStgarh",
|
||||
"launcher_id": "10",
|
||||
}
|
||||
self._lang = language.lower().replace("_", "-") if language else _get_system_language()
|
||||
case GameVariant.CHINESE:
|
||||
self._api = constants.LAUNCHER_API_URL_CN
|
||||
self._params = {
|
||||
"key": "eYd89JmJ",
|
||||
"launcher_id": "18",
|
||||
"channel_id": "1"
|
||||
}
|
||||
self._lang = "zh-cn" # Use chinese language because this is chinese version
|
||||
case GameVariant.BILIBILI:
|
||||
raise NotImplementedError()
|
||||
if not isinstance(game_dir, Path):
|
||||
game_dir = Path(game_dir)
|
||||
self._game_dir = game_dir.resolve()
|
||||
self._session = aiohttp.ClientSession()
|
||||
|
||||
async def _get(self, url, **kwargs) -> dict:
|
||||
# Workaround because miHoYo uses retcode for their API instead of HTTP status code
|
||||
rsp = await self._session.get(url, **kwargs)
|
||||
rsp_json = await rsp.json()
|
||||
if rsp_json["retcode"] != 0:
|
||||
# TODO: Add more information to the error message
|
||||
raise aiohttp.ClientResponseError(code=rsp_json["retcode"],
|
||||
message=rsp_json["message"],
|
||||
history=rsp.history,
|
||||
request_info=rsp.request_info)
|
||||
return rsp_json
|
||||
|
||||
@property
|
||||
def game_dir(self):
|
||||
return self._game_dir
|
||||
|
||||
@property
|
||||
def lang(self):
|
||||
return self._lang
|
||||
|
||||
async def _get_launcher_info(self, adv=True) -> launcher.Info:
|
||||
params = self._params | {"filter_adv": str(adv).lower(),
|
||||
"language": self._lang}
|
||||
rsp = await _get(self._api + "/content", params=params)
|
||||
rsp = await self._get(self._api + "/content", params=params)
|
||||
if rsp["data"]["adv"] is None:
|
||||
params["language"] = "en-us"
|
||||
rsp = await _get(self._api + "/content", params=params)
|
||||
rsp = await self._get(self._api + "/content", params=params)
|
||||
lc_info = launcher.Info.from_dict(rsp["data"])
|
||||
return lc_info
|
||||
|
||||
async def override_gamedir(self, gamedir: str | Path) -> None:
|
||||
@game_dir.setter
|
||||
def game_dir(self, game_dir: str | Path) -> None:
|
||||
"""Overrides game directory with another directory.
|
||||
|
||||
Args:
|
||||
gamedir (str): New directory to override with.
|
||||
game_dir (str): New directory to override with.
|
||||
"""
|
||||
if isinstance(gamedir, str):
|
||||
gamedir = Path(gamedir).resolve()
|
||||
self._gamedir = gamedir
|
||||
if not isinstance(game_dir, Path):
|
||||
game_dir = Path(game_dir)
|
||||
if not game_dir.is_dir():
|
||||
game_dir.mkdir(parents=True, exist_ok=True)
|
||||
self._game_dir = game_dir
|
||||
|
||||
async def override_language(self, language: str) -> None:
|
||||
@lang.setter
|
||||
def lang(self, language: str) -> None:
|
||||
"""Overrides system detected language with another language.
|
||||
|
||||
Args:
|
||||
|
@ -108,7 +122,7 @@ class Launcher:
|
|||
aiohttp.ClientResponseError: An error occurred while fetching the information.
|
||||
"""
|
||||
|
||||
rsp = await _get(self._api + "/resource", params=self._params)
|
||||
rsp = await self._get(self._api + "/resource", params=self._params)
|
||||
return installer.Resource.from_dict(rsp["data"])
|
||||
|
||||
async def get_launcher_info(self) -> launcher.Info:
|
||||
|
|
|
@ -10,7 +10,7 @@ from aiopath import AsyncPath
|
|||
|
||||
from worthless import constants
|
||||
from worthless.launcher import Launcher
|
||||
from worthless.installer import Installer
|
||||
from worthless.gamemanager import GameManager as Installer
|
||||
|
||||
match platform.system():
|
||||
case "Linux":
|
||||
|
|
Loading…
Reference in New Issue