refactor: more cleanup

feat: initial support for callbacks
This commit is contained in:
tretrauit 2022-09-18 18:41:58 +07:00
parent f909297ae0
commit 2abbdf066b
Signed by: tretrauit
GPG Key ID: CDDE1C97EE305DAF
7 changed files with 459 additions and 406 deletions

View File

@ -2,6 +2,26 @@ from enum import Enum
class GameVariant(Enum):
INTERNATIONAL = 1
CHINESE = 2
BILIBILI = 3
INTERNATIONAL = "international"
CHINESE = "chinese"
BILIBILI = "bilibili"
class GameVersionLocation(Enum):
GAME_FILE = "globalgamemanagers"
LAUNCHER_CONFIG = "config.ini"
class VoicepackArchiveType(Enum):
FULL = 0
UPDATE = 1
class GameUpdateStatus(Enum):
DOWNLOADING_PATCHER = 0
PREPARING_PATCH = 1
PATCHING = 2
PREPARING_REMOVE_UNUSED = 3
REMOVING_UNUSED = 4
EXTRACTING = 5
COMPLETED = 6

View File

@ -1,18 +1,18 @@
import asyncio
import re
import shutil
import platform
import aiohttp
import zipfile
import json
import hashlib
import logging
from os import PathLike
from pathlib import Path
from configparser import ConfigParser
import worthless.helper as helper
from worthless import constants
from worthless.launcher import Launcher
from worthless.enums import GameVariant
from worthless.classes import installer
from worthless.enums import GameVariant, VoicepackArchiveType, GameUpdateStatus
from worthless.hdiffpatch import HDiffPatch
from worthless.launcherconfig import LauncherConfig
@ -31,23 +31,24 @@ class GameManager:
if not isinstance(game_dir, Path):
game_dir = Path(game_dir)
self._game_dir = game_dir
if not data_dir:
if data_dir:
if not isinstance(data_dir, Path):
data_dir = Path(data_dir)
self._cache_path = data_dir.joinpath("Temp/Installer/")
else:
self._cache_path = Path(constants.APPDIRS.user_cache_dir).joinpath(
"Installer"
)
else:
if not isinstance(data_dir, Path):
data_dir = Path(data_dir)
self.temp_path = data_dir.joinpath("Temp/Installer/")
Path(self.temp_path).mkdir(parents=True, exist_ok=True)
config_file = self._game_dir.joinpath("config.ini")
self._config_file = config_file
Path(self._cache_path).mkdir(parents=True, exist_ok=True)
self._download_chunk = 8192
self._variant = variant
self._version = None
self._launcher = Launcher(self._game_dir, variant=variant)
self._launcher = Launcher(variant=variant)
self._hdiffpatch = HDiffPatch(data_dir=data_dir)
self._config = LauncherConfig(self._config_file, self._version)
self._game_config_file = self._game_dir.joinpath("config.ini")
self._game_config = LauncherConfig(
self._game_config_file, game_version=self.version
)
self._game_version_re = re.compile(r"([1-9]+\.[0-9]+\.[0-9]+)_\d+_\d+")
async def _download_file(
@ -62,25 +63,30 @@ class GameManager:
await helper.download_file(
file_url,
file_name,
self.temp_path,
self._cache_path,
file_len=file_len,
overwrite=overwrite,
chunks=self._download_chunk,
)
def read_version_from_config(self):
if not self._config_file.exists():
raise FileNotFoundError(f"Config file {self._config_file} not found")
cfg = ConfigParser()
cfg.read(str(self._config_file))
return cfg.get("General", "game_version")
def _read_version_from_config(self) -> str | None:
"""
Reads the version from config.ini
def read_version_from_game_file(self, globalgamemanagers: Path | bytes) -> str:
:return:
"""
if not self._game_config_file.exists():
raise FileNotFoundError(f"Config file {self._game_config_file} not found")
return self._game_config.get("General", "game_version", fallback=None)
def _read_version_from_game_file(
self, globalgamemanagers: Path | bytes
) -> str | None:
"""
Reads the version from the globalgamemanagers file. (Data/globalgamemanagers)
Uses `An Anime Game Launcher` method to read the version:
https://gitlab.com/KRypt0n_/an-anime-game-launcher/-/blob/main/src/ts/Game.ts#L26
https://github.com/an-anime-team/an-anime-game-launcher/blob/main/src/ts/Game.ts#L26
:return: Game version (e.g. 1.0.0)
"""
@ -90,334 +96,23 @@ class GameManager:
data = globalgamemanagers.decode(encoding="ascii", errors="ignore")
result = self._game_version_re.search(data)
if not result:
raise ValueError("Could not find version in game file")
return None
return result.group(1)
@staticmethod
def voiceover_lang_translate(lang: str, base_language="game") -> str:
"""
Translates the voiceover language to the language code used by the game.
:param lang: Language to translate
:param base_language: Base language type (game/locale/both)
:return: Language code
"""
if base_language == "game" or base_language == "both":
match lang.lower():
case "english(us)":
return "en-us"
case "japanese":
return "ja-jp"
case "chinese":
return "zh-cn"
case "korean":
return "ko-kr"
if base_language == "locale" or base_language == "both":
match lang.lower().replace("_", "-"):
case "en-us":
return "English(US)"
case "ja-jp":
return "Japanese"
case "zh-cn":
return "Chinese"
case "ko-kr":
return "Korean"
# If nothing else matches
return lang
@staticmethod
async def get_voiceover_archive_language(voiceover_archive: str | Path) -> str:
if isinstance(voiceover_archive, str | Path):
voiceover_archive = Path(voiceover_archive).resolve()
if not voiceover_archive.exists():
raise FileNotFoundError(f"Voiceover archive {voiceover_archive} not found")
with zipfile.ZipFile(voiceover_archive, "r") as f:
for file in zipfile.Path(f).iterdir():
if file.name.endswith("_pkg_version"):
return file.name.split("_")[1]
@staticmethod
def get_voiceover_archive_type(voiceover_archive: str | Path) -> bool:
"""
Gets voiceover archive type.
:param voiceover_archive:
:return: True if this is a full archive, else False.
"""
vo_lang = GameManager.get_voiceover_archive_language(voiceover_archive)
with zipfile.ZipFile(voiceover_archive, "r") as f:
archive_path = zipfile.Path(f)
files = f.read("Audio_{}_pkg_version".format(vo_lang)).decode().split("\n")
for file in files:
if (
file.strip()
and not archive_path.joinpath(
json.loads(file)["remoteName"]
).exists()
):
return False
return True
def set_download_chunk(self, chunk: int):
self._download_chunk = chunk
def get_game_data_name(self):
match self._variant:
case GameVariant.INTERNATIONAL:
return "GenshinImpact_Data/"
case GameVariant.CHINESE:
return "YuanShen_Data/"
case GameVariant.BILIBILI:
return "YuanShen_Data/"
def get_game_data_path(self) -> Path:
return self._game_dir.joinpath(self.get_game_data_name())
def get_game_archive_version(self, game_archive: str | Path):
game_archive = Path(game_archive)
if not game_archive.is_file():
raise FileNotFoundError(f"Game archive {game_archive} not found")
with zipfile.ZipFile(game_archive, "r") as f:
return self.read_version_from_game_file(
f.read(self.get_game_data_name() + "globalgamemanagers")
)
def get_game_version(self) -> str | None:
globalgamemanagers = self.get_game_data_path().joinpath("./globalgamemanagers")
if not globalgamemanagers.exists():
try:
return self.read_version_from_config()
except FileNotFoundError:
return
return self.read_version_from_game_file(globalgamemanagers)
def get_installed_voiceovers(self) -> list[str]:
"""
Returns a list of installed voiceovers.
:return: List of installed voiceovers
"""
voiceovers = []
for file in (
self.get_game_data_path()
.joinpath("StreamingAssets/Audio/GeneratedSoundBanks/Windows")
.iterdir()
):
if file.is_dir():
voiceovers.append(file.name)
return voiceovers
async def update_game(self, game_archive: str | Path):
if not self.get_game_data_path().exists():
raise FileNotFoundError(f"Game not found in {self._game_dir}")
if isinstance(game_archive, str | Path):
game_archive = Path(game_archive).resolve()
if not game_archive.exists():
raise FileNotFoundError(f"Update archive {game_archive} not found")
archive = zipfile.ZipFile(game_archive, "r")
if not self._hdiffpatch.get_hpatchz_executable():
await self._hdiffpatch.download_latest_release()
files = archive.namelist()
# Don't extract these files (they're useless and if the game isn't patched then it'll
# raise 31-4xxx error ingame)
for file in ["deletefiles.txt", "hdifffiles.txt"]:
try:
files.remove(file)
except ValueError:
pass
# hdiffpatch implementation
hdifffiles = []
for x in (
(await asyncio.to_thread(archive.read, "hdifffiles.txt"))
.decode()
.split("\n")
):
if x:
hdifffiles.append(json.loads(x)["remoteName"])
patch_jobs = []
for file in hdifffiles:
current_game_file = self._game_dir.joinpath(file)
if not current_game_file.exists():
# Not patching since we don't have the file
continue
patch_file = str(file) + ".hdiff"
async def extract_and_patch(old_file, diff_file):
diff_path = self.temp_path.joinpath(diff_file)
if diff_path.is_file():
diff_path.unlink(missing_ok=True)
await asyncio.to_thread(archive.extract, diff_file, self.temp_path)
patch_path = self.temp_path.joinpath(diff_file)
old_suffix = old_file.suffix
old_file = await old_file.rename(old_file.with_suffix(".bak"))
proc = await self._hdiffpatch.patch_file(
old_file, old_file.with_suffix(old_suffix), patch_path, wait=True
)
patch_path.unlink()
if proc.returncode != 0:
# Let the game download the file.
await old_file.rename(old_file.with_suffix(old_suffix))
return
await old_file.unlink()
files.remove(patch_file)
patch_jobs.append(extract_and_patch(current_game_file, patch_file))
await asyncio.gather(*patch_jobs)
deletefiles = archive.read("deletefiles.txt").decode().split("\n")
for file in deletefiles:
current_game_file = Path(self._game_dir.joinpath(file))
if not current_game_file.exists():
continue
if current_game_file.is_file():
current_game_file.unlink(missing_ok=True)
await asyncio.to_thread(archive.extractall, self._game_dir, members=files)
archive.close()
# Update game version on local variable.
self._version = self.get_game_version()
self.set_version_config()
def set_version_config(self, version: str = None):
if not version:
version = self._version
self._config.set_game_version(version)
self._config.save()
async def download_full_game(self, pre_download=False):
game = await self._get_game(pre_download)
archive_name = game.latest.path.split("/")[-1]
await self._download_file(game.latest.path, archive_name, game.latest.size)
async def download_full_voiceover(self, language: str, pre_download=False):
game = await self._get_game(pre_download)
translated_lang = self.voiceover_lang_translate(language)
for vo in game.latest.voice_packs:
if vo.language == translated_lang:
await self._download_file(vo.path, vo.get_name(), vo.size)
def uninstall_game(self):
shutil.rmtree(self._game_dir, ignore_errors=True)
def _extract_game_file(self, archive: str | Path):
if isinstance(archive, str):
archive = Path(archive).resolve()
if not archive.exists():
raise FileNotFoundError(f"'{archive}' not found")
with zipfile.ZipFile(archive, "r") as f:
await asyncio.to_thread(f.extractall, path=self._game_dir)
def apply_voiceover(self, voiceover_archive: str | Path):
# Since Voiceover packages are unclear about diff package or full package
# we will try to extract the voiceover package and apply it to the game
# making this function universal for both cases
if not self.get_game_data_path().exists():
raise FileNotFoundError(f"Game not found in {self._game_dir}")
self._extract_game_file(voiceover_archive)
async def install_game(
self, game_archive: str | Path, force_reinstall: bool = False
):
"""Installs the game to the current directory
If `force_reinstall` is True, the game will be uninstalled then reinstalled.
"""
if self.get_game_data_path().exists():
if not force_reinstall:
raise ValueError(f"Game is already installed in {self._game_dir}")
self.uninstall_game()
self._game_dir.mkdir(parents=True, exist_ok=True)
self._extract_game_file(game_archive)
self._version = self.get_game_version()
self.set_version_config()
async def _get_game_version(self):
if self._version:
from_version = self._version
else:
from_version = self._version = self.get_game_version()
return from_version
async def _get_game_resource(self):
game_resource = await self._launcher.get_resource_info()
if not game_resource:
raise ValueError("Could not fetch game resource")
return game_resource
async def _get_game(self, pre_download=False):
async def _get_game(self, pre_download=False) -> installer.Game:
game_resource = await self._get_game_resource()
game = game_resource.game
if pre_download:
game = game_resource.pre_download_game
return game
async def download_game_update(self, from_version: str = None, pre_download=False):
if not from_version:
from_version = await self._get_game_version()
game = await self._get_game(pre_download=pre_download)
if self._version == game.latest.version:
raise ValueError("Game is already up to date.")
diff_archive = await self.get_game_diff_archive(from_version, pre_download)
if diff_archive is None:
raise ValueError(
"Game diff archive is not available for this version, please reinstall."
)
await self._download_file(
diff_archive.path, diff_archive.name, diff_archive.size
)
async def download_voiceover_update(
self, language: str, from_version: str = None, pre_download=False
):
if not from_version:
from_version = await self._get_game_version()
diff_archive = await self.get_voiceover_diff_archive(
language, from_version, pre_download
)
if diff_archive is None:
raise ValueError(
"Voiceover diff archive is not available for this version, please reinstall."
)
await self._download_file(
diff_archive.path, diff_archive.name, diff_archive.size
)
async def get_voiceover_diff_archive(
self, lang: str, from_version: str = None, pre_download=False
):
"""Gets a diff archive from `from_version` to the latest one
If from_version is not specified, it will be taken from the game version.
"""
if not from_version:
from_version = await self._get_game_version()
game = await self._get_game(pre_download=pre_download)
translated_lang = self.voiceover_lang_translate(lang)
for v in game.diffs:
if v.version != from_version:
continue
for vo in v.voice_packs:
if vo.language == translated_lang:
return vo
async def get_game_diff_archive(self, from_version: str = None, pre_download=False):
"""Gets a diff archive from `from_version` to the latest one
If from_version is not specified, it will be taken from the game version.
"""
if not from_version:
from_version = await self._get_game_version()
game = await self._get_game(pre_download=pre_download)
for v in game.diffs:
if v.version == from_version:
return v
async def verify_from_pkg_version(self, pkg_version: Path, ignore_mismatch=False):
async def _verify_from_pkg_version(self, pkg_version: Path, ignore_mismatch=False):
contents = pkg_version.read_text()
def calculate_md5(file_to_calculate):
@ -461,10 +156,333 @@ class GameManager:
return None if not failed_files else failed_files
async def verify_game(self, pkg_version: str | Path = None, ignore_mismatch=False):
@property
def version(self) -> str | None:
if not self._version:
self._version = self.get_current_game_version()
return self._version
@staticmethod
def voicepack_lang_translate(lang: str, base_language="game") -> str | None:
"""
Translates the voicepack language to the language code used by the game.
:param lang: Language to translate
:param base_language: Base language type (game/locale/both)
:return: Language code
"""
if base_language == "game" or base_language == "both":
match lang.lower():
case "english(us)":
return "en-us"
case "japanese":
return "ja-jp"
case "chinese":
return "zh-cn"
case "korean":
return "ko-kr"
if base_language == "locale" or base_language == "both":
match lang.lower().replace("_", "-"):
case "en-us":
return "English(US)"
case "ja-jp":
return "Japanese"
case "zh-cn":
return "Chinese"
case "ko-kr":
return "Korean"
# If nothing else matches
return
@staticmethod
async def get_voicepack_archive_language(archive: PathLike) -> str:
if not isinstance(archive, Path):
archive = Path(archive).resolve()
if not archive.exists():
raise FileNotFoundError(f"Voiceover archive {archive} not found")
with zipfile.ZipFile(archive, "r") as f:
for file in zipfile.Path(f).iterdir():
if file.name.startswith("Audio_") and file.name.endswith(
"_pkg_version"
):
# Audio_<language>_pkg_version
return file.name.split("_")[1]
@staticmethod
def get_voicepack_archive_type(archive: str | Path) -> VoicepackArchiveType:
"""
Gets voicepack archive type.
:param archive: The voicepack archive to get type
:return: True if this is a full archive, else False.
"""
vo_lang = GameManager.get_voicepack_archive_language(archive)
with zipfile.ZipFile(archive, "r") as f:
archive_path = zipfile.Path(f)
files = f.read("Audio_{}_pkg_version".format(vo_lang)).decode().split("\n")
for file in files:
if (
file.strip()
and not archive_path.joinpath(
json.loads(file)["remoteName"]
).exists()
):
return VoicepackArchiveType.UPDATE
return VoicepackArchiveType.FULL
def set_download_chunk(self, chunk: int):
self._download_chunk = chunk
def get_game_data_name(self):
match self._variant:
case GameVariant.INTERNATIONAL:
return "GenshinImpact_Data/"
case GameVariant.CHINESE:
return "YuanShen_Data/"
case GameVariant.BILIBILI:
return "YuanShen_Data/"
def get_game_data_path(self) -> Path:
return self._game_dir.joinpath(self.get_game_data_name())
def get_archive_game_version(self, game_archive: str | Path):
game_archive = Path(game_archive)
if not game_archive.is_file():
raise FileNotFoundError(f"Game archive {game_archive} not found")
with zipfile.ZipFile(game_archive, "r") as f:
return self._read_version_from_game_file(
f.read(self.get_game_data_name() + "globalgamemanagers")
)
def get_current_game_version(self) -> str | None:
globalgamemanagers = self.get_game_data_path().joinpath("./globalgamemanagers")
if not globalgamemanagers.exists():
try:
return self._read_version_from_config()
except FileNotFoundError:
return
return self._read_version_from_game_file(globalgamemanagers)
def get_installed_voicepacks(self) -> list[str]:
"""
Returns a list of installed voicepacks.
:return: List of installed voicepacks
"""
voicepacks = []
for file in (
self.get_game_data_path()
.joinpath("StreamingAssets/Audio/GeneratedSoundBanks/Windows")
.iterdir()
):
if file.is_dir():
voicepacks.append(file.name)
return voicepacks
async def update_game(self, game_archive: str | Path, callback=None):
if not self.get_game_data_path().exists():
raise FileNotFoundError(f"Game not found in {self._game_dir}")
if isinstance(game_archive, str | Path):
game_archive = Path(game_archive).resolve()
if not game_archive.exists():
raise FileNotFoundError(f"Update archive {game_archive} not found")
def call(status, file=None):
if not callback:
return
callback(status=status, file=file)
if not self._hdiffpatch.get_hpatchz_executable():
call(GameUpdateStatus.DOWNLOADING_PATCHER)
await self._hdiffpatch.download_latest_release()
archive = zipfile.ZipFile(game_archive, "r")
files = archive.namelist()
# Don't extract these files (they're useless and if the game isn't patched then it'll
# raise 31-4xxx error ingame)
call(GameUpdateStatus.PREPARING_PATCH)
for file in ["deletefiles.txt", "hdifffiles.txt"]:
try:
files.remove(file)
except ValueError:
pass
# hdiffpatch implementation
hdifffiles = []
for x in (
(await asyncio.to_thread(archive.read, "hdifffiles.txt"))
.decode()
.split("\n")
):
if x:
hdifffiles.append(json.loads(x)["remoteName"])
patch_jobs = []
for file in hdifffiles:
call(GameUpdateStatus.PREPARING_PATCH, file=file)
current_game_file = self._game_dir.joinpath(file)
if not current_game_file.exists():
# Not patching since we don't have the file
continue
patch_file = str(file) + ".hdiff"
async def extract_and_patch(old_file, diff_file):
call(GameUpdateStatus.PATCHING, file=old_file)
diff_path = self._cache_path.joinpath(diff_file)
if diff_path.is_file():
diff_path.unlink(missing_ok=True)
await asyncio.to_thread(archive.extract, diff_file, self._cache_path)
patch_path = self._cache_path.joinpath(diff_file)
old_suffix = old_file.suffix
old_file = await old_file.rename(old_file.with_suffix(".bak"))
proc = await self._hdiffpatch.patch_file(
old_file, old_file.with_suffix(old_suffix), patch_path, wait=True
)
patch_path.unlink()
if proc.returncode != 0:
# Let the game download the file.
await old_file.rename(old_file.with_suffix(old_suffix))
return
await old_file.unlink()
files.remove(patch_file)
patch_jobs.append(extract_and_patch(current_game_file, patch_file))
await asyncio.gather(*patch_jobs)
call(GameUpdateStatus.PREPARING_REMOVE_UNUSED)
deletefiles = archive.read("deletefiles.txt").decode().split("\n")
for file in deletefiles:
current_game_file = Path(self._game_dir.joinpath(file))
if not current_game_file.exists():
continue
if current_game_file.is_file():
call(GameUpdateStatus.REMOVING_UNUSED, file=file)
current_game_file.unlink(missing_ok=True)
call(GameUpdateStatus.EXTRACTING)
await asyncio.to_thread(archive.extractall, self._game_dir, members=files)
archive.close()
# Update game version on local variable.
self._version = self.get_current_game_version()
self.set_version_config()
call(GameUpdateStatus.COMPLETED)
def set_version_config(self, version: str = None):
if not version:
version = self.version
self._game_config.set_game_version(version)
self._game_config.save()
async def download_full_game(self, pre_download=False):
game = await self._get_game(pre_download)
archive_name = game.latest.path.split("/")[-1]
await self._download_file(game.latest.path, archive_name, game.latest.size)
async def download_full_voicepack(self, language: str, pre_download=False):
game = await self._get_game(pre_download)
translated_lang = self.voicepack_lang_translate(language)
for vo in game.latest.voice_packs:
if vo.language == translated_lang:
await self._download_file(vo.path, vo.get_name(), vo.size)
def _extract_game_file(self, archive: str | Path):
if isinstance(archive, str):
archive = Path(archive).resolve()
if not archive.exists():
raise FileNotFoundError(f"'{archive}' not found")
with zipfile.ZipFile(archive, "r") as f:
await asyncio.to_thread(f.extractall, path=self._game_dir)
def apply_voicepack(self, archive: str | Path):
# Since voicepack packages are unclear about diff package or full package
# we will try to extract the voicepack package and apply it to the game
# making this function universal for both cases
if not self.get_game_data_path().exists():
raise FileNotFoundError(f"Game not found in {self._game_dir}")
self._extract_game_file(archive)
async def install_game(
self, game_archive: str | Path, force_reinstall: bool = False
):
"""Installs the game to the current directory
If `force_reinstall` is True, the game will be uninstalled then reinstalled.
"""
if self.get_game_data_path().exists():
if not force_reinstall:
raise ValueError(f"Game is already installed in {self._game_dir}")
self.delete_game()
self._game_dir.mkdir(parents=True, exist_ok=True)
self._extract_game_file(game_archive)
self._version = self.get_current_game_version()
self.set_version_config()
async def download_game_update(self, from_version: str = None, pre_download=False):
from_version = from_version if from_version else self.version
game = await self._get_game(pre_download=pre_download)
if self.version == game.latest.version:
raise ValueError("Game is already up to date.")
diff_archive = await self.get_game_diff_archive(from_version, pre_download)
if diff_archive is None:
raise ValueError(
"Game diff archive is not available for this version, please reinstall."
)
await self._download_file(
diff_archive.path, diff_archive.name, diff_archive.size
)
async def download_voicepack_update(
self, language: str, from_version: str = None, pre_download=False
):
from_version = from_version if from_version else self.version
diff_archive = await self.get_voicepack_diff_archive(
language, from_version, pre_download
)
if diff_archive is None:
raise ValueError("Voiceover diff archive is not available for this version")
await self._download_file(
diff_archive.path, diff_archive.name, diff_archive.size
)
async def get_voicepack_diff_archive(
self, lang: str, from_version: str = None, pre_download=False
) -> installer.Diff:
"""Gets a diff archive from `from_version` to the latest one
If from_version is not specified, it will be taken from the game version.
"""
from_version = from_version if from_version else self.version
game = await self._get_game(pre_download=pre_download)
translated_lang = self.voicepack_lang_translate(lang)
for v in game.diffs:
if v.version != from_version:
continue
for vo in v.voice_packs:
if vo.language == translated_lang:
return vo
async def get_game_diff_archive(
self, from_version: str = None, pre_download=False
) -> installer.Diff:
"""Gets a diff archive from `from_version` to the latest one
If from_version is not specified, it will be taken from the game version.
"""
from_version = from_version if from_version else self.version
game = await self._get_game(pre_download=pre_download)
for v in game.diffs:
if v.version == from_version:
return v
async def verify_game(self, pkg_version: str | Path = None, ignore_mismatch=None):
if pkg_version is None:
pkg_version = self._game_dir.joinpath("pkg_version")
return await self.verify_from_pkg_version(pkg_version, ignore_mismatch)
if ignore_mismatch is None:
ignore_mismatch = True
return await self._verify_from_pkg_version(pkg_version, ignore_mismatch)
async def clear_cache(self):
await asyncio.to_thread(shutil.rmtree, self.temp_path, ignore_errors=True)
def delete_game(self):
shutil.rmtree(self._game_dir, ignore_errors=True)
def clear_cache(self):
shutil.rmtree(self._cache_path, ignore_errors=True)

View File

@ -1,6 +1,8 @@
import platform
import shutil
import asyncio
import zipfile
import aiohttp
from pathlib import Path
from worthless import constants, helper
@ -9,7 +11,10 @@ from worthless import constants, helper
class HDiffPatch:
"""
Contains legacy HDiffPatch support for worthless
You should not use this class directly, since it's automatically used by GameManager
"""
def __init__(self, git_url=None, data_dir=None):
if not git_url:
git_url = constants.HDIFFPATCH_GIT_URL
@ -97,11 +102,11 @@ class HDiffPatch:
rsp.raise_for_status()
for asset in (await rsp.json())["assets"]:
if (
asset["name"].endswith(".zip")
and "linux" not in asset["name"]
and "windows" not in asset["name"]
and "macos" not in asset["name"]
and "android" not in asset["name"]
asset["name"].endswith(".zip")
and "linux" not in asset["name"]
and "windows" not in asset["name"]
and "macos" not in asset["name"]
and "android" not in asset["name"]
):
return asset

View File

@ -5,7 +5,7 @@ import aiohttp
import logging
from pathlib import Path
logger = logging.getLogger("worthless.GameManager")
logger = logging.getLogger("worthless.helper")
async def download_file(
@ -44,10 +44,10 @@ async def download_file(
threaded: bool = None,
) -> Path:
headers = {"Range": f"bytes={from_bytes}-{to_bytes if to_bytes else ''}"}
if not threaded:
p = file_path
else:
if threaded:
p = file_path.parent.joinpath(secrets.token_urlsafe(16))
else:
p = file_path
p.touch(exist_ok=True)
rsp = await session.get(file_url, headers=headers, timeout=None)
if rsp.status == 416:

View File

@ -1,7 +1,6 @@
import aiohttp
import locale
from worthless import constants
from pathlib import Path
from worthless.classes import launcher, installer
from worthless.enums import GameVariant
@ -28,15 +27,10 @@ class Launcher:
def __init__(
self,
game_dir: str | Path = None,
language: str = None,
variant: GameVariant = None,
):
"""Initialize the launcher API
Args:
game_dir (Path): Path to the game directory.
"""
"""Initialize the launcher API"""
if not variant:
variant = GameVariant.INTERNATIONAL
self._variant = variant
@ -63,10 +57,15 @@ class Launcher:
"zh-cn" # Use chinese language because this is chinese version
)
case GameVariant.BILIBILI:
raise NotImplementedError()
if not isinstance(game_dir, Path):
game_dir = Path(game_dir)
self._game_dir = game_dir.resolve()
self._api = constants.LAUNCHER_API_URL_CN
self._params = {
"key": "KAtdSsoQ",
"launcher_id": "17",
"channel_id": "14",
}
self._lang = (
"zh-cn" # Use chinese language because this is chinese version
)
self._session = aiohttp.ClientSession()
async def _get(self, url, **kwargs) -> dict:
@ -83,10 +82,6 @@ class Launcher:
)
return rsp_json
@property
def game_dir(self):
return self._game_dir
@property
def lang(self):
return self._lang
@ -100,19 +95,6 @@ class Launcher:
lc_info = launcher.Info.from_dict(rsp["data"])
return lc_info
@game_dir.setter
def game_dir(self, game_dir: str | Path) -> None:
"""Overrides game directory with another directory.
Args:
game_dir (str): New directory to override with.
"""
if not isinstance(game_dir, Path):
game_dir = Path(game_dir)
if not game_dir.is_dir():
game_dir.mkdir(parents=True, exist_ok=True)
self._game_dir = game_dir
@lang.setter
def lang(self, language: str) -> None:
"""Overrides system detected language with another language.

View File

@ -1,7 +1,7 @@
from configparser import ConfigParser
from os import PathLike
from pathlib import Path
from aiopath import AsyncPath
from worthless.enums import GameVariant
class LauncherConfig:
@ -10,42 +10,72 @@ class LauncherConfig:
"""
@staticmethod
def create_config(game_version, overseas=True):
def create_config(game_version, variant: GameVariant):
"""
Creates config.ini
Creates `config.ini`
https://notabug.org/Krock/dawn/src/master/updater/update_gi.sh#L212
"""
sub_channel = "0" if overseas else "1"
sub_channel = 0
channel = 1
cps = "mihoyo"
match variant:
case GameVariant.INTERNATIONAL:
channel = 1
sub_channel = 0
cps = "mihoyo"
case GameVariant.CHINESE:
channel = 1
sub_channel = 1
cps = "mihoyo"
case GameVariant.BILIBILI:
channel = 14
sub_channel = 0
cps = "bilibili"
config = ConfigParser()
config.add_section("General")
config.set("General", "channel", "1")
config.set("General", "cps", "mihoyo")
config.set("General", "channel", str(channel))
config.set("General", "cps", cps)
config.set("General", "game_version", game_version)
config.set("General", "sdk_version", "")
config.set("General", "sub_channel", sub_channel)
config.set("General", "sub_channel", str(sub_channel))
return config
def __init__(self, config_path, game_version=None, overseas=True):
if isinstance(config_path, str | AsyncPath):
def __init__(
self, config_path: PathLike, game_version=None, variant: GameVariant = None
):
if not variant:
variant = GameVariant.INTERNATIONAL
if not isinstance(config_path, Path):
config_path = Path(config_path)
if not game_version:
game_version = "0.0.0"
self.config_path = config_path
self.config = ConfigParser()
if self.config_path.exists():
self.config.read(self.config_path)
self._config_path = config_path
self._config = ConfigParser()
if self._config_path.exists():
self._config.read(self._config_path)
else:
self.config = self.create_config(game_version, overseas)
self._config = self.create_config(game_version, variant)
def set_game_version(self, game_version):
self.config.set("General", "game_version", game_version)
self._config.set("General", "game_version", game_version)
def set_overseas(self, overseas=True):
sub_channel = "0" if overseas else "1"
self.config.set("General", "sub_channel", sub_channel)
def set_variant(self, variant: GameVariant):
sub_channel = 0
match variant:
case GameVariant.INTERNATIONAL:
sub_channel = 0
case GameVariant.CHINESE:
sub_channel = 1
case GameVariant.BILIBILI:
sub_channel = 0
self._config.set("General", "sub_channel", str(sub_channel))
def save(self):
"""
Saves config.ini
"""
with self.config_path.open("w") as config_file:
self.config.write(config_file)
with self._config_path.open("w") as config_file:
self._config.write(config_file)
def get(self, section: str, option: str, **kwargs) -> str:
return self._config.get(section=section, option=option, **kwargs)

View File

@ -34,7 +34,7 @@ class Patcher:
data_dir: str | Path | AsyncPath = None,
patch_url: str = None,
overseas=True,
patch_provider="y0soro",
patch_provider="Krock",
):
if isinstance(gamedir, str | Path):
gamedir = AsyncPath(gamedir)
@ -43,9 +43,7 @@ class Patcher:
patch_url = constants.PATCH_LIST[patch_provider].replace(
"http://", "https://"
)
self._patch_url = (patch_url if patch_url else constants.PATCH_GIT_URL).replace(
"http://", "https://"
)
self._patch_url = patch_url
if not data_dir:
self._appdirs = constants.APPDIRS
self._patch_path = AsyncPath(self._appdirs.user_data_dir).joinpath("Patch")