diff --git a/README.md b/README.md index e7d5a4d..3f095e3 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,5 @@ # worthless-launcher -A worthless CLI launcher written in Python. \ No newline at end of file +A worthless CLI launcher written in Python. + +Check out its website at https://tretrauit.gitlab.io/worthless-launcher for more information. \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 55feafa..0787310 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,3 @@ aiohttp==3.8.1 appdirs~=1.4.4 -aiopath~=0.6.10 -xdelta3~=0.0.5 \ No newline at end of file +aiopath~=0.6.10 \ No newline at end of file diff --git a/setup.py b/setup.py index 061595b..d726aa5 100644 --- a/setup.py +++ b/setup.py @@ -9,7 +9,7 @@ README = (HERE / "README.md").read_text() setup( name='worthless', - version='1.3.1-2', + version='2.0.0', packages=['worthless', 'worthless.classes', 'worthless.classes.launcher', 'worthless.classes.installer'], url='https://git.froggi.es/tretrauit/worthless-launcher', license='MIT License', diff --git a/worthless/__main__.py b/worthless/__main__.py index c3a60a9..2760c3f 100755 --- a/worthless/__main__.py +++ b/worthless/__main__.py @@ -1,6 +1,6 @@ #!/usr/bin/python3 +import asyncio +from worthless import cli -from worthless import gui - -if __name__ == '__main__': - gui.main() +if __name__ == "__main__": + asyncio.run(cli.main()) diff --git a/worthless/classes/installer/diff.py b/worthless/classes/installer/diff.py index 55b27e8..921b1fc 100644 --- a/worthless/classes/installer/diff.py +++ b/worthless/classes/installer/diff.py @@ -12,6 +12,9 @@ class Diff: self.voice_packs = voice_packs self.raw = raw + def get_name(self): + return self.path.split("/")[-1] + @staticmethod def from_dict(data): voice_packs = [] diff --git a/worthless/classes/installer/latest.py b/worthless/classes/installer/latest.py index fdae997..5cace04 100644 --- a/worthless/classes/installer/latest.py +++ b/worthless/classes/installer/latest.py @@ -14,6 +14,9 @@ class Latest: self.segments = segments self.raw = raw + def get_name(self): + return self.path.split("/")[-1] + @staticmethod def from_dict(data): voice_packs = [] diff --git a/worthless/classes/installer/voicepack.py b/worthless/classes/installer/voicepack.py index 26c5383..1c325da 100644 --- a/worthless/classes/installer/voicepack.py +++ b/worthless/classes/installer/voicepack.py @@ -7,6 +7,9 @@ class Voicepack: self.md5 = md5 self.raw = raw + def get_name(self): + return self.path.split("/")[-1] + @staticmethod def from_dict(data): return Voicepack(data["language"], data["name"], data["path"], data["size"], data["md5"], data) diff --git a/worthless/gui.py b/worthless/cli.py similarity index 59% rename from worthless/gui.py rename to worthless/cli.py index 04dda37..db5e624 100755 --- a/worthless/gui.py +++ b/worthless/cli.py @@ -22,145 +22,146 @@ class UI: def _ask(self, question): if self._noconfirm: # Fake dialog - print(question + " (y/n): y") + print(question + " [Y/n]:") return True answer = "" - while answer.lower() not in ['y', 'n']: + while answer.lower() not in ['y', 'n', '']: if answer != "": print("Invalid choice, please try again.") - answer = input(question + " (y/n): ") - return answer.lower() == 'y' + answer = input(question + " [Y/n]: ") + return answer.lower() == 'y' or answer == '' def override_game_version(self, version: str): self._installer._version = version - def get_game_version(self): - print(self._installer.get_game_version()) + async def get_game_version(self): + print(await self._installer.get_game_version()) - def block_telemetry(self): + async def block_telemetry(self): print("Checking for available telemetry to block...") try: - asyncio.run(self._patcher.block_telemetry()) + await self._patcher.block_telemetry() except ValueError: print("No telemetry to block.") else: print("Telemetry blocked.") - def check_telemetry(self): - block_status = asyncio.run(self._patcher.is_telemetry_blocked()) + async def check_telemetry(self): + block_status = await self._patcher.is_telemetry_blocked() if not block_status: print("Telemetry is blocked.") else: print("Telemetry is not blocked, you need to block these hosts below.") - for block in block_status: - print(block) + for hosts in block_status: + print(hosts) - def _update_from_archive(self, filepath): + async def _update_from_archive(self, filepath): print("Reverting patches if patched...") - self._patcher.revert_patch(True) - print("Updating game from archive (this may takes some time)...") - asyncio.run(self._installer.update_game(filepath)) + await self._patcher.revert_patch(True) + print("Updating game from archive (This may takes some time)...") + await self._installer.update_game(filepath) self._installer.set_version_config() - def _install_from_archive(self, filepath, force_reinstall): - print("Installing game from archive (this may takes some time)...") - self._installer.install_game(filepath, force_reinstall) + async def _install_from_archive(self, filepath, force_reinstall): + print("Installing game from archive (This may takes some time)...") + print(filepath) + await self._installer.install_game(filepath, force_reinstall) self._installer.set_version_config() - def _apply_voiceover_from_archive(self, filepath): - print("Applying voiceover from archive (this may takes some time)...") - self._installer.apply_voiceover(filepath) + async def _apply_voiceover_from_archive(self, filepath): + print("Applying voiceover from archive (This may takes some time)...") + print("Voiceover archive:", filepath) + await self._installer.apply_voiceover(filepath) - def install_voiceover_from_file(self, filepath): + async def install_voiceover_from_file(self, filepath): print("Archive voiceover language: {} ({})".format( - self._installer.get_voiceover_archive_language(filepath), - "Full archive" if self._installer.get_voiceover_archive_type(filepath) else "Update archive")) + await self._installer.get_voiceover_archive_language(filepath), + "Full archive" if await self._installer.get_voiceover_archive_type(filepath) else "Update archive")) if not self._ask("Do you want to apply this voiceover pack? ({})".format(filepath)): print("Aborting apply process.") return - self._apply_voiceover_from_archive(filepath) + await self._apply_voiceover_from_archive(filepath) print("Voiceover applied successfully.") - def revert_patch(self): + async def revert_patch(self): print("Reverting patches...") - self._patcher.revert_patch(True) + await self._patcher.revert_patch(True) print("Patches reverted.") - def patch_game(self, login_fix: bool = False): + async def patch_game(self, login_fix: bool = False): print("NOTE: Hereby you are violating the game's Terms of Service!") print("Do not patch the game if you don't know what you are doing!") if not self._ask("Do you want to patch the game? (This will overwrite your game files!)"): print("Aborting patch process.") return - self.block_telemetry() + await self.block_telemetry() print("Updating patches...") - asyncio.run(self._patcher.download_patch()) + await self._patcher.download_patch() print("Patching game...") - self._patcher.apply_patch(login_fix) + await self._patcher.apply_patch(login_fix) print("Game patched.") print("Please refrain from sharing this project to public, thank you.") - def install_from_file(self, filepath): - gamever = self._installer.get_game_version() - print("Archive game version: " + self._installer.get_game_archive_version(filepath)) + async def install_from_file(self, filepath): + gamever = await self._installer.get_game_version() + print("Archive game version:", await self._installer.get_game_archive_version(filepath)) if gamever: - print("Current game installation detected. ({})".format(self._installer.get_game_version())) + print("Current game installation detected. ({})".format(await self._installer.get_game_version())) if not self._ask("Do you want to update the game? ({})".format(filepath)): print("Aborting update process.") return - self._update_from_archive(filepath) + await self._update_from_archive(filepath) print("Game updated successfully.") else: print("No game installation detected.") if not self._ask("Do you want to install the game? ({})".format(filepath)): print("Aborting installation process.") return - self._install_from_archive(filepath, False) + await self._install_from_archive(filepath, False) print("Game installed successfully.") - def download_patch(self): + async def download_patch(self): print("Downloading patches...") - asyncio.run(self._patcher.download_patch()) + await self._patcher.download_patch() - def download_game(self): + async def download_game(self): print("Downloading full game (This will take a long time)...") - asyncio.run(self._installer.download_full_game()) + await self._installer.download_full_game() - def download_game_update(self): + async def download_game_update(self): print("Downloading game update (This will take a long time)...") - asyncio.run(self._installer.download_game_update()) + await self._installer.download_game_update() - def download_voiceover(self, languages: str): - res_info = asyncio.run(self._launcher.get_resource_info()) + async def download_voiceover(self, languages: str): + res_info = await self._launcher.get_resource_info() for lng in languages.split(" "): for vo in res_info.game.latest.voice_packs: if not self._installer.voiceover_lang_translate(lng) == vo.language: continue print("Downloading voiceover pack for {} (This will take a long time)...".format(lng)) - asyncio.run(self._installer.download_full_voiceover(lng)) + await self._installer.download_full_voiceover(lng) - def download_voiceover_update(self, languages: str): - res_info = asyncio.run(self._launcher.get_resource_info()) + async def download_voiceover_update(self, languages: str): + res_info = await self._launcher.get_resource_info() for lng in languages.split(" "): for vo in res_info.game.latest.voice_packs: if not self._installer.voiceover_lang_translate(lng) == vo.language: continue print("Downloading voiceover update pack for {} (This will take a long time)...".format(lng)) - asyncio.run(self._installer.download_voiceover_update(lng)) + await self._installer.download_voiceover_update(lng) - def install_game(self, forced: bool = False): - res_info = asyncio.run(self._launcher.get_resource_info()) + async def install_game(self, forced: bool = False): + res_info = await self._launcher.get_resource_info() print("Latest game version: {}".format(res_info.game.latest.version)) if not self._ask("Do you want to install the game?"): print("Aborting game installation process.") return - self.download_game() - print("Installing game...") - self._install_from_archive(self._installer.temp_path.joinpath(res_info.game.latest.name), forced) + await self.download_game() + print("Game archive:", res_info.game.latest.get_name()) + await self._install_from_archive(self._installer.temp_path.joinpath(res_info.game.latest.get_name()), forced) - def install_voiceover(self, languages: str): - res_info = asyncio.run(self._launcher.get_resource_info()) - print("Latest game version: {}".format(res_info.game.latest.version)) + async def install_voiceover(self, languages: str): + res_info = await self._launcher.get_resource_info() for lng in languages.split(" "): for vo in res_info.game.latest.voice_packs: if not self._installer.voiceover_lang_translate(lng) == vo.language: @@ -169,19 +170,20 @@ class UI: print("Aborting voiceover installation process.") return print("Downloading voiceover pack (This will take a long time)...") - asyncio.run(self._installer.download_full_voiceover(lng)) - print("Installing voiceover pack...") - self._apply_voiceover_from_archive(self._installer.temp_path.joinpath(res_info.game.latest.name)) + await self._installer.download_full_voiceover(lng) + await self._apply_voiceover_from_archive( + self._installer.temp_path.joinpath(vo.get_name()) + ) break - def update_game(self): - game_ver = self._installer.get_game_version() + async def update_game(self): + game_ver = await self._installer.get_game_version() if not game_ver: - self.install_game() + await self.install_game() return - print("Current game installation detected. ({})".format(game_ver)) - diff_archive = asyncio.run(self._installer.get_game_diff_archive()) - res_info = asyncio.run(self._launcher.get_resource_info()) + print("Current game installation detected: {}".format(game_ver)) + diff_archive = await self._installer.get_game_diff_archive() + res_info = await self._launcher.get_resource_info() if not diff_archive: print("No game updates available.") return @@ -190,19 +192,24 @@ class UI: print("Aborting game update process.") return print("Downloading game update (This will take a long time)...") - asyncio.run(self._installer.download_game_update()) + await self._installer.download_game_update() print("Installing game update...") - self.install_from_file(self._installer.temp_path.joinpath(res_info.game.latest.name)) + await self.install_from_file(self._installer.temp_path.joinpath(res_info.game.latest.name)) - def update_voiceover(self, languages: str): - game_ver = self._installer.get_game_version() + async def update_voiceover(self, languages: str | list): + if isinstance(languages, str): + languages = languages.split(" ") + game_ver = await self._installer.get_game_version() if not game_ver: - self.install_voiceover(languages) + print("Couldn't detect current game installation, is game installed?") return - print("Current game installation detected. ({})".format(game_ver)) - for lng in languages.split(" "): - diff_archive = asyncio.run(self._installer.get_voiceover_diff_archive(lng)) - # res_info = asyncio.run(self._launcher.get_resource_info()) + installed_voiceovers = await self._installer.get_installed_voiceovers() + print(f"Installed voiceovers: {None if installed_voiceovers == [] else ', '.join(installed_voiceovers)}") + for lng in languages: + if self._installer.voiceover_lang_translate(lng, "locale") not in installed_voiceovers: + await self.install_voiceover(lng) + continue + diff_archive = await self._installer.get_voiceover_diff_archive(lng) if not diff_archive: print("No voiceover updates available for {}.".format(lng)) continue @@ -210,19 +217,40 @@ class UI: print("Aborting this voiceover language update process.") continue print("Downloading voiceover update (This may takes some time)...") - asyncio.run(self._installer.download_voiceover_update(lng)) + await self._installer.download_voiceover_update(lng) print("Installing voiceover update for {}...".format(lng)) - self._apply_voiceover_from_archive(self._installer.temp_path.joinpath(diff_archive.name)) + await self._apply_voiceover_from_archive(self._installer.temp_path.joinpath(diff_archive.get_name())) - def update_game_voiceover(self, languages: str): - self.update_game() - self.update_voiceover(languages) + async def update_game_voiceover(self, languages: str): + await self.update_game() + await self.update_voiceover(languages) - def interactive_ui(self): - raise NotImplementedError() + async def update_all(self): + await self.update_game() + await self.update_voiceover(await self._installer.get_installed_voiceovers()) + + async def verify_game(self): + game_ver = await self._installer.get_game_version() + if not game_ver: + print("Couldn't detect current game installation, is game installed?") + return + print("Verifying game contents... (This may takes a long time)") + failed_files = await self._installer.verify_game(ignore_mismatch=True) + if not failed_files: + print("All good.") + return + print("Some game files got corrupted (mismatch md5), uh oh.") + for file in failed_files: + print("{}: expected {}, actual {}".format(file[0], file[1], file[2])) + + async def clear_cache(self): + if self._ask("Do you want to clear Installer cache (contains downloaded game files, etc)"): + await self._installer.clear_cache() + if self._ask("Do you want to clear Patcher cache (contains files used to patch)"): + await self._patcher.clear_cache() -def main(): +async def main(): default_dirs = appdirs.AppDirs(constants.APP_NAME, constants.APP_AUTHOR) parser = argparse.ArgumentParser(prog="worthless", description="A worthless launcher written in Python.") parser.add_argument("-D", "--dir", action="store", type=Path, default=Path.cwd(), @@ -259,19 +287,15 @@ def main(): parser.add_argument("-Rs", "--remove", action="store_true", help="Remove the game (if installed)") parser.add_argument("-Rp", "--remove-patch", action="store_true", help="Revert the game patch (if patched)") parser.add_argument("-Rv", "--remove-voiceover", action="store_true", help="Remove a Voiceover pack (if installed)") + parser.add_argument("-V", "--verify", action="store_true", help="Verify the game installation") parser.add_argument("--get-game-version", action="store_true", help="Get the current game version") parser.add_argument("--no-overseas", action="store_true", help="Don't use overseas server") parser.add_argument("--check-telemetry", action="store_true", help="Check for the telemetry information") + parser.add_argument("--clear-cache", action="store_true", help="Clear cache used by worthless") parser.add_argument("--from-ver", action="store", help="Override the detected game version", type=str, default=None) parser.add_argument("--noconfirm", action="store_true", help="Do not ask any for confirmation. (Ignored in interactive mode)") args = parser.parse_args() - interactive_mode = not args.install and not args.install_from_file and not args.patch and not args.update and not \ - args.remove and not args.remove_patch and not args.remove_voiceover and not args.get_game_version and not \ - args.install_voiceover_from_file and not args.update_voiceover and not args.download_game and not \ - args.download_voiceover and not args.download_game_update and not args.download_voiceover_update and not \ - args.install_voiceover_from_file and not args.update_all and not args.login_fix and not args.check_telemetry\ - and not args.from_ver if args.temporary_dir: args.temporary_dir.mkdir(parents=True, exist_ok=True) @@ -293,50 +317,62 @@ def main(): ui.override_game_version(args.from_ver) if args.get_game_version: - ui.get_game_version() + await ui.get_game_version() if args.check_telemetry: - ui.check_telemetry() + await ui.check_telemetry() + + # Download if args.download_game: - ui.download_game() + await ui.download_game() if args.download_voiceover: - ui.download_voiceover(args.download_voiceover) + await ui.download_voiceover(args.download_voiceover) if args.download_game_update: - ui.download_game_update() + await ui.download_game_update() if args.download_voiceover_update: - ui.download_voiceover_update(args.download_voiceover_update) + await ui.download_voiceover_update(args.download_voiceover_update) + + # Install if args.install: - ui.install_game() - - if args.update_all: - raise NotImplementedError() # TODO - - if args.update: - ui.update_game_voiceover(args.update) - - if args.update_voiceover: - ui.update_voiceover(args.update_voiceover) + await ui.install_game() if args.install_from_file: - ui.install_from_file(args.install_from_file) + await ui.install_from_file(args.install_from_file) if args.install_voiceover_from_file: - ui.install_voiceover_from_file(args.install_voiceover_from_file) + await ui.install_voiceover_from_file(args.install_voiceover_from_file) + + # Update + + if args.update_all: + await ui.update_all() + + if args.update: + await ui.update_game_voiceover(args.update) + + if args.update_voiceover: + await ui.update_voiceover(args.update_voiceover) + + # Patch if args.patch: - ui.patch_game(args.login_fix) + await ui.patch_game(args.login_fix) if args.remove_patch: - ui.revert_patch() + await ui.revert_patch() - if interactive_mode: - ui.interactive_ui() + # Verify + if args.verify: + await ui.verify_game() + + if args.clear_cache: + await ui.clear_cache() if __name__ == "__main__": - main() + asyncio.run(main()) diff --git a/worthless/constants.py b/worthless/constants.py index c72fd88..a5376d4 100644 --- a/worthless/constants.py +++ b/worthless/constants.py @@ -1,5 +1,9 @@ +from appdirs import AppDirs + + APP_NAME="worthless" APP_AUTHOR="tretrauit" +APPDIRS=AppDirs(APP_NAME, APP_AUTHOR) LAUNCHER_API_URL_OS = "https://sdk-os-static.hoyoverse.com/hk4e_global/mdk/launcher/api" LAUNCHER_API_URL_CN = "https://sdk-static.mihoyo.com/hk4e_cn/mdk/launcher/api" HDIFFPATCH_GIT_URL="https://github.com/sisong/HDiffPatch" diff --git a/worthless/installer.py b/worthless/installer.py index e4336bc..17213ba 100644 --- a/worthless/installer.py +++ b/worthless/installer.py @@ -3,10 +3,9 @@ import re import shutil import platform import aiohttp -import appdirs import zipfile -import warnings import json +import hashlib from pathlib import Path from configparser import ConfigParser from aiopath import AsyncPath @@ -23,19 +22,21 @@ async def _download_file(file_url: str, file_name: str, file_path: Path | str, f :param file_name: :return: """ - params = {} + headers = {} file_path = AsyncPath(file_path).joinpath(file_name) if overwrite: await file_path.unlink(missing_ok=True) if await file_path.exists(): - cur_len = len(await file_path.read_bytes()) - params |= { + cur_len = (await file_path.stat()).st_size + headers |= { "Range": f"bytes={cur_len}-{file_len if file_len else ''}" } else: await file_path.touch() async with aiohttp.ClientSession() as session: - rsp = await session.get(file_url, params=params, timeout=None) + rsp = await session.get(file_url, headers=headers, timeout=None) + if rsp.status == 416: + return rsp.raise_for_status() while True: chunk = await rsp.content.read(chunks) @@ -51,7 +52,7 @@ class HDiffPatch: git_url = constants.HDIFFPATCH_GIT_URL self._git_url = git_url if not data_dir: - self._appdirs = appdirs.AppDirs(constants.APP_NAME, constants.APP_AUTHOR) + self._appdirs = constants.APPDIRS self.temp_path = Path(self._appdirs.user_cache_dir).joinpath("HDiffPatch") self.data_path = Path(self._appdirs.user_data_dir).joinpath("Tools/HDiffPatch") else: @@ -98,7 +99,7 @@ class HDiffPatch: hpatchz_name = "hpatchz" + (".exe" if platform.system() == "Windows" else "") return self._get_hdiffpatch_exec(hpatchz_name) - async def patch_file(self, in_file, out_file, patch_file, wait=False): + async def patch_file(self, in_file, out_file, patch_file, error=False, wait=False): hpatchz = self.get_hpatchz_executable() if not hpatchz: raise RuntimeError("hpatchz executable not found") @@ -106,6 +107,8 @@ class HDiffPatch: if not wait: return proc await proc.wait() + if error and proc.returncode != 0: + raise RuntimeError(f"Patching failed, return code is {proc.returncode}") return proc def get_hdiffz_executable(self): @@ -141,91 +144,33 @@ class HDiffPatch: await _download_file(url, name, self.temp_path, overwrite=True) if not extract: return - archive = zipfile.ZipFile(self.temp_path.joinpath(name)) - archive.extractall(self.data_path) - archive.close() + with zipfile.ZipFile(self.temp_path.joinpath(name), 'r') as f: + await asyncio.to_thread(f.extractall, path=self.data_path) class Installer: - def _read_version_from_config(self): - warnings.warn("This function is not reliable as upgrading game version from worthless\ - doesn't write the config.", DeprecationWarning) - if not self._config_file.exists(): - raise FileNotFoundError(f"Config file {self._config_file} not found") - cfg = ConfigParser() - cfg.read(str(self._config_file)) - return cfg.get("General", "game_version") - - @staticmethod - def read_version_from_game_file(globalgamemanagers: Path | bytes): - """ - Reads the version from the globalgamemanagers file. (Data/globalgamemanagers) - - Uses `An Anime Game Launcher` method to read the version: - https://gitlab.com/KRypt0n_/an-anime-game-launcher/-/blob/main/src/ts/Game.ts#L26 - - :return: Game version (ex 1.0.0) - """ - if isinstance(globalgamemanagers, Path): - with globalgamemanagers.open("rb") as f: - data = f.read().decode("ascii", errors="ignore") - else: - data = globalgamemanagers.decode("ascii", errors="ignore") - result = re.search(r"([1-9]+\.[0-9]+\.[0-9]+)_[\d]+_[\d]+", data) - if not result: - raise ValueError("Could not find version in game file") - return result.group(1) - - def get_game_data_name(self): - if self._overseas: - return "GenshinImpact_Data/" - else: - return "YuanShen_Data/" - - def get_game_data_path(self): - return self._gamedir.joinpath(self.get_game_data_name()) - - def get_game_version(self): - globalgamemanagers = self.get_game_data_path().joinpath("./globalgamemanagers") - if not globalgamemanagers.exists(): - return - return self.read_version_from_game_file(globalgamemanagers) - - def get_installed_voiceovers(self): - """ - Returns a list of installed voiceovers. - - :return: List of installed voiceovers - """ - voiceovers = [] - for file in self.get_game_data_path().joinpath("StreamingAssets/Audio/GeneratedSoundBanks/Windows").iterdir(): - if file.is_dir(): - voiceovers.append(file.name) - return voiceovers - - def __init__(self, gamedir: str | Path = Path.cwd(), overseas: bool = True, data_dir: str | Path = None): - if isinstance(gamedir, str): - gamedir = Path(gamedir) + def __init__(self, gamedir: str | Path | AsyncPath = AsyncPath.cwd(), + overseas: bool = True, data_dir: str | Path | AsyncPath = None): + if isinstance(gamedir, str | Path): + gamedir = AsyncPath(gamedir) self._gamedir = gamedir if not data_dir: - self._appdirs = appdirs.AppDirs(constants.APP_NAME, constants.APP_AUTHOR) - self.temp_path = Path(self._appdirs.user_cache_dir).joinpath("Installer") + self._appdirs = constants.APPDIRS + self.temp_path = AsyncPath(self._appdirs.user_cache_dir).joinpath("Installer") else: - if not isinstance(data_dir, Path): - data_dir = Path(data_dir) + if isinstance(data_dir, str | AsyncPath): + data_dir = AsyncPath(data_dir) self.temp_path = data_dir.joinpath("Temp/Installer/") - self.temp_path.mkdir(parents=True, exist_ok=True) + Path(self.temp_path).mkdir(parents=True, exist_ok=True) config_file = self._gamedir.joinpath("config.ini") - self._config_file = config_file.resolve() + self._config_file = config_file self._download_chunk = 8192 self._overseas = overseas - self._version = self.get_game_version() + self._version = None self._launcher = Launcher(self._gamedir, overseas=self._overseas) self._hdiffpatch = HDiffPatch(data_dir=data_dir) self._config = LauncherConfig(self._config_file, self._version) - - def set_download_chunk(self, chunk: int): - self._download_chunk = chunk + self._game_version_re = re.compile(r"([1-9]+\.[0-9]+\.[0-9]+)_\d+_\d+") async def _download_file(self, file_url: str, file_name: str, file_len: int = None, overwrite=False): """ @@ -234,76 +179,143 @@ class Installer: :param file_name: :return: """ - await _download_file(file_url, file_name, self.temp_path, file_len=file_len, overwrite=overwrite) + await _download_file(file_url, file_name, self.temp_path, file_len=file_len, overwrite=overwrite, + chunks=self._download_chunk) - def get_game_archive_version(self, game_archive: str | Path): - if not game_archive.exists(): - raise FileNotFoundError(f"Game archive {game_archive} not found") - archive = zipfile.ZipFile(game_archive, 'r') - return self.read_version_from_game_file(archive.read(self.get_game_data_name() + "globalgamemanagers")) + async def read_version_from_config(self): + if not await self._config_file.exists(): + raise FileNotFoundError(f"Config file {self._config_file} not found") + cfg = ConfigParser() + await asyncio.to_thread(cfg.read, str(self._config_file)) + return cfg.get("General", "game_version") + + async def read_version_from_game_file(self, globalgamemanagers: AsyncPath | Path | bytes) -> str: + """ + Reads the version from the globalgamemanagers file. (Data/globalgamemanagers) + + Uses `An Anime Game Launcher` method to read the version: + https://gitlab.com/KRypt0n_/an-anime-game-launcher/-/blob/main/src/ts/Game.ts#L26 + + :return: Game version (ex 1.0.0) + """ + if isinstance(globalgamemanagers, Path | AsyncPath): + globalgamemanagers = AsyncPath(globalgamemanagers) + data = await globalgamemanagers.read_text("ascii", errors="ignore") + else: + data = globalgamemanagers.decode("ascii", errors="ignore") + result = self._game_version_re.search(data) + if not result: + raise ValueError("Could not find version in game file") + return result.group(1) @staticmethod - def voiceover_lang_translate(lang: str): + def voiceover_lang_translate(lang: str, base_language="game") -> str: """ Translates the voiceover language to the language code used by the game. - :param lang: Language to translate + :param base_language: Base language type (game/locale/both) :return: Language code """ - match lang: - case "English(US)": - return "en-us" - case "Japanese": - return "ja-jp" - case "Chinese": - return "zh-cn" - case "Korean": - return "ko-kr" + if base_language == "game" or base_language == "both": + match lang.lower(): + case "english(us)": + return "en-us" + case "japanese": + return "ja-jp" + case "chinese": + return "zh-cn" + case "korean": + return "ko-kr" + if base_language == "locale" or base_language == "both": + match lang.lower().replace("_", "-"): + case "en-us": + return "English(US)" + case "ja-jp": + return "Japanese" + case "zh-cn": + return "Chinese" + case "ko-kr": + return "Korean" + # If nothing else matches return lang @staticmethod - def get_voiceover_archive_language(voiceover_archive: str | Path): - if isinstance(voiceover_archive, str): + async def get_voiceover_archive_language(voiceover_archive: str | Path | AsyncPath) -> str: + if isinstance(voiceover_archive, str | Path): voiceover_archive = Path(voiceover_archive).resolve() if not voiceover_archive.exists(): raise FileNotFoundError(f"Voiceover archive {voiceover_archive} not found") - archive = zipfile.ZipFile(voiceover_archive, 'r') - archive_path = zipfile.Path(archive) - for file in archive_path.iterdir(): - if file.name.endswith("_pkg_version"): - return file.name.split("_")[1] + with zipfile.ZipFile(voiceover_archive, 'r') as f: + for file in zipfile.Path(f).iterdir(): + if file.name.endswith("_pkg_version"): + return file.name.split("_")[1] - def get_voiceover_archive_type(self, voiceover_archive: str | Path): - vo_lang = self.get_voiceover_archive_language(voiceover_archive) - archive = zipfile.ZipFile(voiceover_archive, 'r') - archive_path = zipfile.Path(archive) - files = archive.read("Audio_{}_pkg_version".format(vo_lang)).decode().split("\n") - for file in files: - if file.strip() and not archive_path.joinpath(json.loads(file)["remoteName"]).exists(): - return False + @staticmethod + async def get_voiceover_archive_type(voiceover_archive: str | Path) -> bool: + """ + Gets voiceover archive type. + :param voiceover_archive: + :return: True if this is a full archive, else False. + """ + vo_lang = Installer.get_voiceover_archive_language(voiceover_archive) + with zipfile.ZipFile(voiceover_archive, 'r') as f: + archive_path = zipfile.Path(f) + files = (await asyncio.to_thread(f.read, "Audio_{}_pkg_version".format(vo_lang))).decode().split("\n") + for file in files: + if file.strip() and not archive_path.joinpath(json.loads(file)["remoteName"]).exists(): + return False return True - def apply_voiceover(self, voiceover_archive: str | Path): - # Since Voiceover packages are unclear about diff package or full package - # we will try to extract the voiceover package and apply it to the game - # making this function universal for both cases - if not self.get_game_data_path().exists(): - raise FileNotFoundError(f"Game not found in {self._gamedir}") - if isinstance(voiceover_archive, str): - voiceover_archive = Path(voiceover_archive).resolve() - if not voiceover_archive.exists(): - raise FileNotFoundError(f"Voiceover archive {voiceover_archive} not found") - archive = zipfile.ZipFile(voiceover_archive, 'r') - archive.extractall(self._gamedir) - archive.close() + def set_download_chunk(self, chunk: int): + self._download_chunk = chunk - async def update_game(self, game_archive: str | Path): - if not self.get_game_data_path().exists(): + def get_game_data_name(self): + if self._overseas: + return "GenshinImpact_Data/" + else: + return "YuanShen_Data/" + + def get_game_data_path(self) -> AsyncPath: + return self._gamedir.joinpath(self.get_game_data_name()) + + async def get_game_archive_version(self, game_archive: str | Path): + if not game_archive.exists(): + raise FileNotFoundError(f"Game archive {game_archive} not found") + with zipfile.ZipFile(game_archive, 'r') as f: + return await self.read_version_from_game_file( + await asyncio.to_thread(f.read, self.get_game_data_name() + "globalgamemanagers") + ) + + async def get_game_version(self) -> str | None: + globalgamemanagers = self.get_game_data_path().joinpath("./globalgamemanagers") + if not await globalgamemanagers.exists(): + try: + return await self.read_version_from_config() + except FileNotFoundError: + return + return await self.read_version_from_game_file(globalgamemanagers) + + async def get_installed_voiceovers(self) -> list[str]: + """ + Returns a list of installed voiceovers. + + :return: List of installed voiceovers + """ + voiceovers = [] + async for file in self.get_game_data_path()\ + .joinpath("StreamingAssets/Audio/GeneratedSoundBanks/Windows").iterdir(): + if await file.is_dir(): + voiceovers.append(file.name) + return voiceovers + + async def update_game(self, game_archive: str | Path | AsyncPath): + if not await self.get_game_data_path().exists(): raise FileNotFoundError(f"Game not found in {self._gamedir}") - if isinstance(game_archive, str): + if isinstance(game_archive, str | Path): game_archive = Path(game_archive).resolve() if not game_archive.exists(): raise FileNotFoundError(f"Update archive {game_archive} not found") + archive = zipfile.ZipFile(game_archive, 'r') if not self._hdiffpatch.get_hpatchz_executable(): @@ -320,13 +332,13 @@ class Installer: # hdiffpatch implementation hdifffiles = [] - for x in archive.read("hdifffiles.txt").decode().split("\n"): + for x in (await asyncio.to_thread(archive.read, "hdifffiles.txt")).decode().split("\n"): if x: hdifffiles.append(json.loads(x)["remoteName"]) patch_jobs = [] for file in hdifffiles: current_game_file = self._gamedir.joinpath(file) - if not current_game_file.exists(): + if not await current_game_file.exists(): # Not patching since we don't have the file continue @@ -341,10 +353,11 @@ class Installer: patch_path, wait=True) patch_path.unlink() if proc.returncode != 0: - # Let the game redownload the file. + # Let the game download the file. old_file.rename(old_file.with_suffix(old_suffix)) return old_file.unlink() + files.remove(patch_file) patch_jobs.append(extract_and_patch(current_game_file, patch_file)) @@ -353,15 +366,16 @@ class Installer: deletefiles = archive.read("deletefiles.txt").decode().split("\n") for file in deletefiles: current_game_file = self._gamedir.joinpath(file) - if not current_game_file.exists(): + if not await current_game_file.exists(): continue if current_game_file.is_file(): current_game_file.unlink(missing_ok=True) - archive.extractall(self._gamedir, members=files) + await asyncio.to_thread(archive.extractall, self._gamedir, members=files) archive.close() # Update game version on local variable. - self._version = self.get_game_version() + self._version = await self.get_game_version() + self.set_version_config() def set_version_config(self, version: str = None): if not version: @@ -370,11 +384,11 @@ class Installer: self._config.save() async def download_full_game(self): - archive = await self._launcher.get_resource_info() - if archive is None: + resource = await self._launcher.get_resource_info() + if resource is None: raise RuntimeError("Failed to fetch game resource info.") - archive_name = archive.game.latest.path.split("/")[-1] - await self._download_file(archive.game.latest.path, archive_name, archive.game.latest.size) + archive_name = resource.game.latest.path.split("/")[-1] + await self._download_file(resource.game.latest.path, archive_name, resource.game.latest.size) async def download_full_voiceover(self, language: str): archive = await self._launcher.get_resource_info() @@ -383,19 +397,57 @@ class Installer: translated_lang = self.voiceover_lang_translate(language) for vo in archive.game.latest.voice_packs: if vo.language == translated_lang: - await self._download_file(vo.path, vo.name, vo.size) + await self._download_file(vo.path, vo.get_name(), vo.size) - async def download_game_update(self, from_version: str = None): + async def uninstall_game(self): + await asyncio.to_thread(shutil.rmtree, self._gamedir, ignore_errors=True) + + async def _extract_game_file(self, archive: str | Path | AsyncPath): + if isinstance(archive, str | AsyncPath): + archive = Path(archive).resolve() + if not archive.exists(): + raise FileNotFoundError(f"'{archive}' not found") + with zipfile.ZipFile(archive, 'r') as f: + await asyncio.to_thread(f.extractall, path=self._gamedir) + + async def apply_voiceover(self, voiceover_archive: str | Path): + # Since Voiceover packages are unclear about diff package or full package + # we will try to extract the voiceover package and apply it to the game + # making this function universal for both cases + if not await self.get_game_data_path().exists(): + raise FileNotFoundError(f"Game not found in {self._gamedir}") + await self._extract_game_file(voiceover_archive) + + async def install_game(self, game_archive: str | Path | AsyncPath, force_reinstall: bool = False): + """Installs the game to the current directory + + If `force_reinstall` is True, the game will be uninstalled then reinstalled. + """ + if await self.get_game_data_path().exists(): + if not force_reinstall: + raise ValueError(f"Game is already installed in {self._gamedir}") + await self.uninstall_game() + + await self._gamedir.mkdir(parents=True, exist_ok=True) + await self._extract_game_file(game_archive) + self._version = await self.get_game_version() + self.set_version_config() + + async def _get_game_resource(self, from_version: str = None): if not from_version: if self._version: from_version = self._version else: - from_version = self._version = self.get_game_version() + from_version = self._version = await self.get_game_version() if not from_version: raise ValueError("No game version found") - version_info = await self._launcher.get_resource_info() - if version_info is None: - raise RuntimeError("Failed to fetch game resource info.") + game_resource = await self._launcher.get_resource_info() + if not game_resource: + raise ValueError("Could not fetch game resource") + return game_resource + + async def download_game_update(self, from_version: str = None): + version_info = await self._get_game_resource() if self._version == version_info.game.latest.version: raise ValueError("Game is already up to date.") diff_archive = await self.get_game_diff_archive(from_version) @@ -404,56 +456,17 @@ class Installer: await self._download_file(diff_archive.path, diff_archive.name, diff_archive.size) async def download_voiceover_update(self, language: str, from_version: str = None): - if not from_version: - if self._version: - from_version = self._version - else: - from_version = self._version = self.get_game_version() - if not from_version: - raise ValueError("No game version found") - version_info = await self._launcher.get_resource_info() - if version_info is None: - raise RuntimeError("Failed to fetch game resource info.") diff_archive = await self.get_voiceover_diff_archive(language, from_version) if diff_archive is None: raise ValueError("Voiceover diff archive is not available for this version, please reinstall.") await self._download_file(diff_archive.path, diff_archive.name, diff_archive.size) - def uninstall_game(self): - shutil.rmtree(self._gamedir) - - def install_game(self, game_archive: str | Path, force_reinstall: bool = False): - """Installs the game to the current directory - - If `force_reinstall` is True, the game will be uninstalled then reinstalled. - """ - if self.get_game_data_path().exists(): - if not force_reinstall: - raise ValueError(f"Game is already installed in {self._gamedir}") - self.uninstall_game() - - self._gamedir.mkdir(parents=True, exist_ok=True) - if isinstance(game_archive, str): - game_archive = Path(game_archive).resolve() - if not game_archive.exists(): - raise FileNotFoundError(f"Install archive {game_archive} not found") - archive = zipfile.ZipFile(game_archive, 'r') - archive.extractall(self._gamedir) - archive.close() - async def get_voiceover_diff_archive(self, lang: str, from_version: str = None): """Gets a diff archive from `from_version` to the latest one If from_version is not specified, it will be taken from the game version. """ - if not from_version: - if self._version: - from_version = self._version - else: - from_version = self._version = self.get_game_version() - if not from_version: - raise ValueError("No game version found") - game_resource = await self._launcher.get_resource_info() + game_resource = await self._get_game_resource() if not game_resource: raise ValueError("Could not fetch game resource") translated_lang = self.voiceover_lang_translate(lang) @@ -470,16 +483,48 @@ class Installer: If from_version is not specified, it will be taken from the game version. """ - if not from_version: - if self._version: - from_version = self._version - else: - from_version = self._version = self.get_game_version() - if not from_version: - raise ValueError("No game version found") - game_resource = await self._launcher.get_resource_info() - if not game_resource: - raise ValueError("Could not fetch game resource") + game_resource = await self._get_game_resource() for v in game_resource.game.diffs: if v.version == from_version: return v + + async def verify_from_pkg_version(self, pkg_version: AsyncPath, ignore_mismatch=False): + contents = await pkg_version.read_text() + + async def calculate_md5(file_to_calculate): + async with AsyncPath(file_to_calculate).open("rb") as f: + file_hash = hashlib.md5() + while chunk := await f.read(self._download_chunk): + file_hash.update(chunk) + return file_hash.hexdigest() + + async def verify_file(file_to_verify, md5): + file_md5 = await calculate_md5(file_to_verify) + if file_md5 == md5: + return None + if ignore_mismatch: + return file_to_verify, md5, file_md5 + raise ValueError(f"MD5 does not match for {file_to_verify}, expected md5: {md5}, actual md5: {file_md5}") + + verify_jobs = [] + for content in contents.split("\r\n"): + if not content.strip(): + continue + info = json.loads(content) + verify_jobs.append(verify_file(self._gamedir.joinpath(info["remoteName"]), info["md5"])) + + verify_result = await asyncio.gather(*verify_jobs) + failed_files = [] + for file in verify_result: + if file is not None: + failed_files.append(file) + + return None if not failed_files else failed_files + + async def verify_game(self, pkg_version: str | Path | AsyncPath = None, ignore_mismatch=False): + if pkg_version is None: + pkg_version = self._gamedir.joinpath("pkg_version") + return await self.verify_from_pkg_version(pkg_version, ignore_mismatch) + + async def clear_cache(self): + await asyncio.to_thread(shutil.rmtree, self.temp_path, ignore_errors=True) diff --git a/worthless/launcher.py b/worthless/launcher.py index a320e90..ffb948f 100644 --- a/worthless/launcher.py +++ b/worthless/launcher.py @@ -1,5 +1,8 @@ import aiohttp import locale + +from aiopath import AsyncPath + from worthless import constants from pathlib import Path from worthless.classes import launcher, installer @@ -60,8 +63,8 @@ class Launcher: "launcher_id": "18", "channel_id": "1" } - self._lang = "zh-cn" # Use chinese language because this is Pooh version - if isinstance(gamedir, str): + self._lang = "zh-cn" # Use chinese language because this is chinese version + if isinstance(gamedir, str | AsyncPath): gamedir = Path(gamedir) self._gamedir = gamedir.resolve() diff --git a/worthless/launcherconfig.py b/worthless/launcherconfig.py index 5446bd9..bc02e4b 100644 --- a/worthless/launcherconfig.py +++ b/worthless/launcherconfig.py @@ -1,6 +1,8 @@ from configparser import ConfigParser from pathlib import Path +from aiopath import AsyncPath + class LauncherConfig: """ @@ -23,8 +25,8 @@ class LauncherConfig: return config def __init__(self, config_path, game_version=None, overseas=True): - if isinstance(config_path, str): - self.config_path = Path(config_path) + if isinstance(config_path, str | AsyncPath): + config_path = Path(config_path) if not game_version: game_version = "0.0.0" self.config_path = config_path diff --git a/worthless/linux.py b/worthless/linux.py index 61c7b28..18dd613 100644 --- a/worthless/linux.py +++ b/worthless/linux.py @@ -1,5 +1,6 @@ import asyncio from pathlib import Path +from aiopath import AsyncPath class LinuxUtils: @@ -8,9 +9,12 @@ class LinuxUtils: def __init__(self): pass - async def _exec_command(self, args): + @staticmethod + async def _exec_command(args): """Execute a command using pkexec (friendly gui) """ + if not await AsyncPath("/usr/bin/pkexec").exists(): + raise FileNotFoundError("pkexec not found.") rsp = await asyncio.create_subprocess_shell(args) await rsp.wait() match rsp.returncode: @@ -21,14 +25,14 @@ class LinuxUtils: return rsp - async def write_text_to_file(self, text, file_path: str | Path): + async def write_text_to_file(self, text, file_path: str | Path | AsyncPath): """Write text to a file using pkexec (friendly gui) """ - if isinstance(file_path, Path): + if isinstance(file_path, Path | AsyncPath): file_path = str(file_path) await self._exec_command('echo -e "{}" | pkexec tee {}'.format(text, file_path)) - async def append_text_to_file(self, text, file_path: str | Path): + async def append_text_to_file(self, text, file_path: str | Path | AsyncPath): """Append text to a file using pkexec (friendly gui) """ if isinstance(file_path, Path): diff --git a/worthless/patcher.py b/worthless/patcher.py index ba37f64..7766036 100644 --- a/worthless/patcher.py +++ b/worthless/patcher.py @@ -1,11 +1,13 @@ import os import platform import tarfile -import appdirs from pathlib import Path import shutil import aiohttp import asyncio + +from aiopath import AsyncPath + from worthless import constants from worthless.launcher import Launcher from worthless.installer import Installer @@ -26,16 +28,19 @@ except ImportError: class Patcher: - def __init__(self, gamedir=Path.cwd(), data_dir: str | Path = None, patch_url: str = None, overseas=True): + def __init__(self, gamedir: Path | AsyncPath | str = AsyncPath.cwd(), data_dir: str | Path | AsyncPath = None, + patch_url: str = None, overseas=True): + if isinstance(gamedir, str | Path): + gamedir = AsyncPath(gamedir) self._gamedir = gamedir self._patch_url = (patch_url if patch_url else constants.PATCH_GIT_URL).replace('http://', 'https://') if not data_dir: - self._appdirs = appdirs.AppDirs(constants.APP_NAME, constants.APP_AUTHOR) - self._patch_path = Path(self._appdirs.user_data_dir).joinpath("Patch") - self._temp_path = Path(self._appdirs.user_cache_dir).joinpath("Patcher") + self._appdirs = constants.APPDIRS + self._patch_path = AsyncPath(self._appdirs.user_data_dir).joinpath("Patch") + self._temp_path = AsyncPath(self._appdirs.user_cache_dir).joinpath("Patcher") else: - if not isinstance(data_dir, Path): - data_dir = Path(data_dir) + if isinstance(data_dir, str | Path): + data_dir = AsyncPath(data_dir) self._patch_path = data_dir.joinpath("Patch") self._temp_path = data_dir.joinpath("Temp/Patcher") self._overseas = overseas @@ -43,7 +48,7 @@ class Patcher: self._launcher = Launcher(self._gamedir, overseas=overseas) match platform.system(): case "Linux": - self._linuxutils = linux.LinuxUtils() + self._linux = linux.LinuxUtils() @staticmethod async def _get(url, **kwargs) -> aiohttp.ClientResponse: @@ -76,15 +81,15 @@ class Patcher: else: return await archive.read() - async def _download_repo(self): - if shutil.which("git"): - if not self._patch_path.exists() or not self._patch_path.is_dir() \ - or not self._patch_path.joinpath(".git").exists(): + async def _download_repo(self, fallback=False): + if shutil.which("git") and not fallback: + if not await self._patch_path.is_dir() or not await self._patch_path.joinpath(".git").exists(): proc = await asyncio.create_subprocess_exec("git", "clone", self._patch_url, str(self._patch_path)) - await proc.wait() else: proc = await asyncio.create_subprocess_exec("git", "pull", cwd=str(self._patch_path)) - await proc.wait() + await proc.wait() + if proc.returncode != 0: + raise RuntimeError("Cannot download patch repository through git.") else: archive = await self._get_git_archive() if not archive: @@ -120,24 +125,18 @@ class Patcher: telemetry_url = constants.TELEMETRY_URL_LIST else: telemetry_url = constants.TELEMETRY_URL_CN_LIST + if optional: + telemetry_url |= constants.TELEMETRY_OPTIONAL_URL_LIST unblocked_list = [] async with aiohttp.ClientSession() as session: for url in telemetry_url: try: - await session.get("https://" + url) - except (aiohttp.ClientResponseError, aiohttp.ClientConnectorError): + await session.get("https://" + url, timeout=15) + except (aiohttp.ClientResponseError, aiohttp.ClientConnectorError, asyncio.exceptions.TimeoutError): continue else: unblocked_list.append(url) - if optional: - for url in constants.TELEMETRY_OPTIONAL_URL_LIST: - try: - await session.get("https://" + url) - except (aiohttp.ClientResponseError, aiohttp.ClientConnectorError): - continue - else: - unblocked_list.append(url) - return None if unblocked_list == [] else unblocked_list + return None if not unblocked_list else unblocked_list async def block_telemetry(self, optional=False): telemetry = await self.is_telemetry_blocked(optional) @@ -148,20 +147,15 @@ class Patcher: telemetry_hosts += "0.0.0.0 " + url + "\n" match platform.system(): case "Linux": - await self._linuxutils.append_text_to_file(telemetry_hosts, "/etc/hosts") + await self._linux.append_text_to_file(telemetry_hosts, "/etc/hosts") return # TODO: Windows and macOS raise NotImplementedError("Platform not implemented.") - async def _patch_unityplayer_fallback(self): - # xdelta3-python doesn't work because it's outdated. - if self._overseas: - patch = "unityplayer_patch_os.vcdiff" - else: - patch = "unityplayer_patch_cn.vcdiff" - gamever = "".join(self._installer.get_game_version().split(".")) + async def _patch_unityplayer_fallback(self, patch): + gamever = "".join((await self._installer.get_game_version()).split(".")) unity_path = self._gamedir.joinpath("UnityPlayer.dll") - unity_path.rename(self._gamedir.joinpath("UnityPlayer.dll.bak")) + await unity_path.rename(self._gamedir.joinpath("UnityPlayer.dll.bak")) proc = await asyncio.create_subprocess_exec("xdelta3", "-d", "-s", str(self._gamedir.joinpath("UnityPlayer.dll.bak")), str(self._patch_path.joinpath( @@ -169,16 +163,11 @@ class Patcher: str(self._gamedir.joinpath("UnityPlayer.dll")), cwd=self._gamedir) await proc.wait() - async def _patch_xlua_fallback(self): - # xdelta3-python doesn't work becuase it's outdated. - if self._overseas: - patch = "unityplayer_patch_os.vcdiff" - else: - patch = "unityplayer_patch_cn.vcdiff" - gamever = "".join(self._installer.get_game_version().split(".")) + async def _patch_xlua_fallback(self, patch): + gamever = "".join((await self._installer.get_game_version()).split(".")) data_name = self._installer.get_game_data_name() xlua_path = self._gamedir.joinpath("{}/Plugins/xlua.dll".format(data_name)) - xlua_path.rename(self._gamedir.joinpath("{}/Plugins/xlua.dll.bak".format(data_name))) + await xlua_path.rename(self._gamedir.joinpath("{}/Plugins/xlua.dll.bak".format(data_name))) proc = await asyncio.create_subprocess_exec("xdelta3", "-d", "-s", str(self._gamedir.joinpath( "{}/Plugins/xlua.dll.bak".format(data_name))), @@ -189,12 +178,8 @@ class Patcher: cwd=self._gamedir) await proc.wait() - def _patch_unityplayer(self): - if self._overseas: - patch = "unityplayer_patch_os.vcdiff" - else: - patch = "unityplayer_patch_cn.vcdiff" - gamever = "".join(self._installer.get_game_version().split(".")) + async def _patch_unityplayer(self, patch): + gamever = "".join((await self._installer.get_game_version()).split(".")) unity_path = self._gamedir.joinpath("UnityPlayer.dll") patch_bytes = self._patch_path.joinpath("{}/patch_files/{}".format(gamever, patch)).read_bytes() patched_unity_bytes = xdelta3.decode(unity_path.read_bytes(), patch_bytes) @@ -202,9 +187,8 @@ class Patcher: with Path(self._gamedir.joinpath("UnityPlayer.dll")).open("wb") as f: f.write(patched_unity_bytes) - def _patch_xlua(self): - patch = "xlua_patch.vcdiff" - gamever = "".join(self._installer.get_game_version().split(".")) + async def _patch_xlua(self, patch): + gamever = "".join((await self._installer.get_game_version()).split(".")) data_name = self._installer.get_game_data_name() xlua_path = self._gamedir.joinpath("{}/Plugins/xlua.dll".format(data_name)) patch_bytes = self._patch_path.joinpath("{}/patch_files/{}".format(gamever, patch)).read_bytes() @@ -213,13 +197,17 @@ class Patcher: with Path(self._gamedir.joinpath("{}/Plugins/xlua.dll".format(data_name))).open("wb") as f: f.write(patched_xlua_bytes) - def apply_xlua_patch(self, fallback=True): + async def apply_xlua_patch(self, fallback=True): + if self._overseas: + patch = "xlua_patch_os.vcdiff" + else: + patch = "xlua_patch_cn.vcdiff" if NO_XDELTA3_MODULE or fallback: - asyncio.run(self._patch_xlua_fallback()) + await self._patch_xlua_fallback(patch) return - self._patch_xlua() + await self._patch_xlua(patch) - def apply_patch(self, crash_fix=False, fallback=True) -> None: + async def apply_patch(self, crash_fix=False, fallback=True) -> None: """ Patch the game (and optionally patch xLua if specified) @@ -228,25 +216,36 @@ class Patcher: :return: None """ # Patch UnityPlayer.dll - if NO_XDELTA3_MODULE or fallback: - asyncio.run(self._patch_unityplayer_fallback()) + # xdelta3-python doesn't work because it's outdated. + if self._overseas: + patch = "unityplayer_patch_os.vcdiff" else: - self._patch_unityplayer() + patch = "unityplayer_patch_cn.vcdiff" + patch_jobs = [] + if NO_XDELTA3_MODULE or fallback: + patch_jobs.append(self._patch_unityplayer_fallback(patch)) + else: + patch_jobs.append(self._patch_unityplayer(patch)) # Patch xLua.dll if crash_fix: - self.apply_xlua_patch(fallback=fallback) + patch_jobs.append(self.apply_xlua_patch(fallback=fallback)) # Disable crash reporters - disable_files = [ - self._installer.get_game_data_name() + "upload_crash.exe", - self._installer.get_game_data_name() + "Plugins/crashreport.exe", - ] - for file in disable_files: - file_path = Path(file).resolve() - if file_path.exists(): - file_path.rename(str(file_path) + ".bak") + + async def disable_crashreporters(): + disable_files = [ + self._installer.get_game_data_name() + "upload_crash.exe", + self._installer.get_game_data_name() + "Plugins/crashreport.exe", + ] + for file in disable_files: + file_path = Path(file).resolve() + if file_path.exists(): + file_path.rename(str(file_path) + ".bak") + + patch_jobs.append(disable_crashreporters()) + await asyncio.gather(*patch_jobs) @staticmethod - def _creation_date(file_path: Path): + async def _creation_date(file_path: AsyncPath): """ Try to get the date that a file was created, falling back to when it was last modified if that isn't possible. @@ -255,7 +254,7 @@ class Patcher: if platform.system() == 'Windows': return os.path.getctime(file_path) else: - stat = file_path.stat() + stat = await file_path.stat() try: return stat.st_birthtime except AttributeError: @@ -263,11 +262,11 @@ class Patcher: # so we'll settle for when its content was last modified. return stat.st_mtime - def _revert_file(self, original_file: str, base_file: Path, ignore_error=False): - original_path = self._gamedir.joinpath(original_file + ".bak").resolve() - target_file = self._gamedir.joinpath(original_file).resolve() - if original_path.exists(): - if abs(self._creation_date(base_file) - self._creation_date(original_path)) > 86400: # 24 hours + async def _revert_file(self, original_file: str, base_file: AsyncPath, ignore_error=False): + original_path = await self._gamedir.joinpath(original_file + ".bak").resolve() + target_file = await self._gamedir.joinpath(original_file).resolve() + if await original_path.exists(): + if abs(await self._creation_date(base_file) - await self._creation_date(original_path)) > 86400: # 24 hours if not ignore_error: raise RuntimeError("{} is not for this game version.".format(original_path.name)) original_path.unlink(missing_ok=True) @@ -275,30 +274,36 @@ class Patcher: target_file.unlink(missing_ok=True) original_path.rename(target_file) - def revert_patch(self, ignore_errors=True) -> None: + async def revert_patch(self, ignore_errors=True) -> None: """ - Revert the patch (and revert the login door crash fix if patched) + Revert the patch (and revert the xLua patch if patched) :return: None """ - game_exec = self._gamedir.joinpath(asyncio.run(self._launcher.get_resource_info()).game.latest.entry) + game_exec = self._gamedir.joinpath((await self._launcher.get_resource_info()).game.latest.entry) revert_files = [ "UnityPlayer.dll", self._installer.get_game_data_name() + "upload_crash.exe", self._installer.get_game_data_name() + "Plugins/crashreport.exe", self._installer.get_game_data_name() + "Plugins/xlua.dll", ] + revert_job = [] for file in revert_files: - self._revert_file(file, game_exec, ignore_errors) + revert_job.append(self._revert_file(file, game_exec, ignore_errors)) for file in ["launcher.bat", "mhyprot2_running.reg"]: - self._gamedir.joinpath(file).unlink(missing_ok=True) + revert_job.append(self._gamedir.joinpath(file).unlink(missing_ok=True)) - def get_files(extensions): + async def get_files(extensions): all_files = [] for ext in extensions: - all_files.extend(self._gamedir.glob(ext)) + all_files.extend(await self._gamedir.glob(ext)) return all_files - files = get_files(('*.dxvk-cache', '*_d3d9.log', '*_d3d11.log', '*_dxgi.log')) + files = await get_files(('*.dxvk-cache', '*_d3d9.log', '*_d3d11.log', '*_dxgi.log')) for file in files: - file.unlink(missing_ok=True) + revert_job.append(file.unlink(missing_ok=True)) + + await asyncio.gather(*revert_job) + + async def clear_cache(self): + await asyncio.to_thread(shutil.rmtree, self._temp_path, ignore_errors=True)