diff --git a/worthless/cli.py b/worthless/cli.py index d7b0835..336e825 100755 --- a/worthless/cli.py +++ b/worthless/cli.py @@ -1,592 +1,2 @@ -#!/usr/bin/python3 - -import argparse -import asyncio - -import appdirs -from pathlib import Path -from worthless.launcher import Launcher -from worthless.game import Game as Installer -from worthless.patcher import Patcher -import worthless.constants as constants - - -class UI: - def __init__( - self, - gamedir: str, - noconfirm: bool, - tempdir: str | Path = None, - pre_download=False, - ) -> None: - self._vo_version = None - self._noconfirm = noconfirm - self._gamedir = gamedir - self._launcher = Launcher(gamedir) - self._installer = Installer(gamedir, data_dir=tempdir) - self._patcher = Patcher(gamedir, data_dir=tempdir) - self._pre_download = pre_download - if self._pre_download: - print("Pre-download is enabled, use at your own risk!") - - def _ask(self, question): - if self._noconfirm: - # Fake dialog - print(question + " [Y/n]:") - return True - - # random text - answer = "kurumi3" - while answer.lower() not in ["y", "n", ""]: - answer = input(question + " [Y/n]: ") - return answer.lower() == "y" or answer == "" - - def override_game_version(self, version: str): - self._installer._version = version - - def override_voiceover_version(self, version: str): - self._vo_version = version - - async def get_game_version(self): - print(await self._installer.get_game_version()) - - async def block_telemetry(self): - print("Checking for available telemetry to block...") - try: - await self._patcher.block_telemetry() - except ValueError: - print("No telemetry to block.") - else: - print("Telemetry blocked.") - - async def check_telemetry(self): - block_status = await self._patcher.is_telemetry_blocked() - if not block_status: - print("Telemetry is blocked.") - else: - print("Telemetry is not blocked, you need to block these hosts below.") - for hosts in block_status: - print(hosts) - - async def _update_from_archive(self, filepath): - print("Reverting patches if patched...") - await self._patcher.revert_patch(True) - print("Updating game from archive (This may takes some time)...") - await self._installer.update_game(filepath) - self._installer.set_version_config() - - async def _install_from_archive(self, filepath, force_reinstall): - print("Installing game from archive (This may takes some time)...") - print(filepath) - await self._installer.install_game(filepath, force_reinstall) - self._installer.set_version_config() - - async def _apply_voiceover_from_archive(self, filepath): - print("Applying voiceover from archive (This may takes some time)...") - print("Voiceover archive:", filepath) - await self._installer.apply_voiceover(filepath) - - async def install_voiceover_from_file(self, filepath): - print( - "Archive voiceover language: {} ({})".format( - await self._installer.get_voiceover_archive_language(filepath), - "Full archive" - if await self._installer.get_voiceover_archive_type(filepath) - else "Update archive", - ) - ) - if not self._ask( - "Do you want to apply this voiceover pack? ({})".format(filepath) - ): - print("Aborting apply process.") - return - await self._apply_voiceover_from_archive(filepath) - print("Voiceover applied successfully.") - - async def revert_patch(self): - print("Reverting patches...") - await self._patcher.revert_patch(True) - print("Patches reverted.") - - async def patch_game(self, login_fix: bool = False): - print("NOTE: Hereby you are violating the game's Terms of Service!") - print("Do not patch the game if you don't know what you are doing!") - if not self._ask( - "Do you want to patch the game? (This will overwrite your game files!)" - ): - print("Aborting patch process.") - return - await self.block_telemetry() - print("Updating patches...") - await self._patcher.download_patch() - print("Patching game...") - await self._patcher.apply_patch(login_fix) - print("Game patched.") - print("Please refrain from sharing this project to public, thank you.") - - async def install_from_file(self, filepath): - gamever = await self._installer.get_game_version() - print( - "Archive game version:", - await self._installer.get_game_archive_version(filepath), - ) - if gamever: - print( - "Current game installation detected. ({})".format( - await self._installer.get_game_version() - ) - ) - if not self._ask("Do you want to update the game? ({})".format(filepath)): - print("Aborting update process.") - return - await self._update_from_archive(filepath) - print("Game updated successfully.") - else: - print("No game installation detected.") - if not self._ask("Do you want to install the game? ({})".format(filepath)): - print("Aborting installation process.") - return - await self._install_from_archive(filepath, False) - print("Game installed successfully.") - - async def download_patch(self): - print("Downloading patches...") - await self._patcher.download_patch() - - async def download_game(self): - print("Downloading full game (This will take a long time)...") - await self._installer.download_full_game(self._pre_download) - - async def download_game_update(self): - print("Downloading game update (This will take a long time)...") - await self._installer.download_game_update(pre_download=self._pre_download) - - async def download_voiceover(self, languages: str): - res_info = await self._launcher.get_resource_info() - for lng in languages.split(" "): - for vo in res_info.game.latest.voice_packs: - if not self._installer.voiceover_lang_translate(lng) == vo.language: - continue - print( - "Downloading voiceover pack for {} (This will take a long time)...".format( - lng - ) - ) - await self._installer.download_full_voiceover( - lng, pre_download=self._pre_download - ) - - async def download_voiceover_update(self, languages: str): - res_info = await self._launcher.get_resource_info() - for lng in languages.split(" "): - for vo in res_info.game.latest.voice_packs: - if not self._installer.voiceover_lang_translate(lng) == vo.language: - continue - print( - "Downloading voiceover update pack for {} (This will take a long time)...".format( - lng - ) - ) - await self._installer.download_voiceover_update( - lng, pre_download=self._pre_download - ) - - async def install_game(self, forced: bool = False): - res_info = await self._launcher.get_resource_info() - game = res_info.game - if self._pre_download: - game = res_info.pre_download_game - print("Latest game version: {}".format(game.latest.version)) - if not self._ask("Do you want to install the game?"): - print("Aborting game installation process.") - return - await self.download_game() - print("Game archive:", game.latest.get_name()) - await self._install_from_archive( - self._installer.temp_path.joinpath(game.latest.get_name()), forced - ) - - async def install_voiceover(self, languages: str): - res_info = await self._launcher.get_resource_info() - for lng in languages.split(" "): - for vo in res_info.game.latest.voice_packs: - if not self._installer.voiceover_lang_translate(lng) == vo.language: - continue - if not self._ask( - "Do you want to install this voiceover pack? ({})".format(lng) - ): - print("Aborting voiceover installation process.") - break - print("Downloading voiceover pack (This will take a long time)...") - await self._installer.download_full_voiceover( - lng, pre_download=self._pre_download - ) - await self._apply_voiceover_from_archive( - self._installer.temp_path.joinpath(vo.get_name()) - ) - - async def update_game(self): - game_ver = await self._installer.get_game_version() - if not game_ver: - await self.install_game() - return - print("Current game installation detected: {}".format(game_ver)) - diff_archive = await self._installer.get_game_diff_archive() - res_info = await self._launcher.get_resource_info() - game = res_info.game - if self._pre_download: - game = res_info.pre_download_game - if not diff_archive: - print("No game updates available.") - return - print("Latest game version: {}".format(game.latest.version)) - if not self._ask("Do you want to update the game?"): - print("Aborting game update process.") - return - print("Downloading game update (This will take a long time)...") - await self._installer.download_game_update(pre_download=self._pre_download) - print("Installing game update...") - await self.install_from_file( - self._installer.temp_path.joinpath(diff_archive.get_name()) - ) - - async def update_voiceover(self, languages: str | list): - if isinstance(languages, str): - languages = languages.split(" ") - game_ver = self._vo_version or await self._installer.get_game_version() - if not game_ver: - print("Couldn't detect current game installation, is game installed?") - return - installed_voiceovers = await self._installer.get_installed_voiceovers() - print( - f"Installed voiceovers: {None if installed_voiceovers == [] else ', '.join(installed_voiceovers)}" - ) - for lng in languages: - if ( - self._installer.voiceover_lang_translate(lng, "locale") - not in installed_voiceovers - ): - await self.install_voiceover(lng) - continue - diff_archive = await self._installer.get_voiceover_diff_archive( - lng, pre_download=self._pre_download - ) - if not diff_archive: - print("No voiceover updates available for {}.".format(lng)) - continue - if not self._ask("Do you want to update this voiceover? ({})".format(lng)): - print("Aborting this voiceover language update process.") - continue - print("Downloading voiceover update (This may takes some time)...") - await self._installer.download_voiceover_update( - lng, pre_download=self._pre_download - ) - print("Installing voiceover update for {}...".format(lng)) - await self._apply_voiceover_from_archive( - self._installer.temp_path.joinpath(diff_archive.get_name()) - ) - - async def update_game_voiceover(self, languages: str): - await self.update_game() - await self.update_voiceover(languages) - - async def update_voiceover_all(self): - await self.update_voiceover(await self._installer.get_installed_voiceovers()) - - async def update_all(self): - await self.update_game() - await self.update_voiceover(await self._installer.get_installed_voiceovers()) - - async def verify_game(self): - game_ver = await self._installer.get_game_version() - if not game_ver: - print("Couldn't detect current game installation, is game installed?") - return - print("Verifying game contents... (This may takes a long time)") - failed_files = await self._installer.verify_game(ignore_mismatch=True) - if not failed_files: - print("All good.") - return - print("Some game files got corrupted (mismatch md5), uh oh.") - for file in failed_files: - print("{}: expected {}, actual {}".format(file[0], file[1], file[2])) - - async def clear_cache(self): - if self._ask( - "Do you want to clear Installer cache (contains downloaded game files, etc)?" - ): - await self._installer.clear_cache() - if self._ask( - "Do you want to clear Patcher cache (contains files used to patch)?" - ): - await self._patcher.clear_cache() - - -async def main_async(): - default_dirs = appdirs.AppDirs(constants.APP_NAME, constants.APP_AUTHOR) - parser = argparse.ArgumentParser( - prog="worthless", description="A worthless launcher written in Python." - ) - parser.add_argument( - "-D", - "--dir", - action="store", - type=Path, - default=Path.cwd(), - help="Specify the game directory (default current working directory)", - ) - parser.add_argument( - "-W", - "--temporary-dir", - action="store", - type=Path, - default=None, - help="Specify the temporary directory (default {} and {})".format( - default_dirs.user_data_dir, default_dirs.user_cache_dir - ), - ) - parser.add_argument( - "-S", - "--install", - action="store_true", - help="Install/update the game (if not already installed, else do nothing)", - ) - parser.add_argument( - "-U", - "--install-from-file", - action="store", - type=Path, - default=None, - help="Install the game from an archive (if not already installed, \ - else update from archive)", - ) - parser.add_argument( - "-Uv", - "--install-voiceover-from-file", - action="store", - type=Path, - default=None, - help="Install the voiceover from an archive (if not already installed, \ - else update from archive)", - ) - parser.add_argument( - "-Sp", - "--patch", - action="store_true", - help="Patch the game (if not already patched, else do nothing)", - ) - parser.add_argument( - "--login-fix", - action="store_true", - help="Patch the game to fix login issues (if not already patched, else do nothing)", - ) - parser.add_argument( - "-Sy", - "--update", - action="store", - type=str, - default="", - help="Update the game and specified voiceover pack only (or install if not found)", - ) - parser.add_argument( - "-Sw", - "--download-game", - action="store_true", - help="Download the full game to the temporary directory", - ) - parser.add_argument( - "-Swp", - "--download-patch", - action="store_true", - help="Download/Update the game patch to the temporary directory", - ) - parser.add_argument( - "-Swv", - "--download-voiceover", - action="store", - type=str, - help="Download the full voiceover to the temporary directory", - ) - parser.add_argument( - "-Syw", - "--download-game-update", - action="store", - type=str, - default="", - help="Download the game and the voiceover update to the temporary directory", - ) - parser.add_argument( - "-Sywv", - "--download-voiceover-update", - action="store", - type=str, - help="Download the voiceover update to the temporary directory", - ) - parser.add_argument( - "-Sv", - "--update-voiceover", - action="store", - type=str, - help="Update the voiceover pack only (or install if not found)", - ) - parser.add_argument( - "-Syu", - "--update-all", - action="store_true", - help="Update the game and all installed voiceover packs (or install if not found)", - ) - parser.add_argument( - "-Scc", - "--clear-cache", - action="store_true", - help="Clear cache used by worthless", - ) - parser.add_argument( - "-Rs", "--remove", action="store_true", help="Remove the game (if installed)" - ) - parser.add_argument( - "-Rp", - "--remove-patch", - action="store_true", - help="Revert the game patch (if patched)", - ) - parser.add_argument( - "-Rv", - "--remove-voiceover", - action="store_true", - help="Remove a Voiceover pack (if installed)", - ) - parser.add_argument( - "-V", "--verify", action="store_true", help="Verify the game installation" - ) - parser.add_argument( - "--predownload", - action="store_true", - help="Download the game for the next update", - default=False, - ) - parser.add_argument( - "--get-game-version", action="store_true", help="Get the current game version" - ) - parser.add_argument( - "--no-overseas", action="store_true", help="Don't use overseas server" - ) - parser.add_argument( - "--check-telemetry", - action="store_true", - help="Check for the telemetry information", - ) - parser.add_argument( - "--from-ver", - action="store", - help="Override the detected game version", - type=str, - default=None, - ) - parser.add_argument( - "--from-vo-ver", - action="store", - help="Override the detected game version for voiceover " "detection", - type=str, - default=None, - ) - parser.add_argument( - "--noconfirm", - action="store_true", - help="Do not ask any for confirmation. (Ignored in interactive mode)", - ) - args = parser.parse_args() - if args.temporary_dir: - args.temporary_dir.mkdir(parents=True, exist_ok=True) - - ui = UI(args.dir, args.noconfirm, args.temporary_dir, args.predownload) - - if args.install and args.update: - raise ValueError("Cannot specify both --install and --update arguments.") - - if args.install_from_file and args.update: - raise ValueError( - "Cannot specify both --install-from-file and --update arguments." - ) - - if args.install_voiceover_from_file and args.update: - raise ValueError( - "Cannot specify both --install-voiceover-from-file and --update arguments." - ) - - if args.install_from_file and args.install: - raise ValueError( - "Cannot specify both --install-from-file and --install arguments." - ) - - if args.from_ver: - ui.override_game_version(args.from_ver) - - if args.get_game_version: - await ui.get_game_version() - - if args.check_telemetry: - await ui.check_telemetry() - - if args.clear_cache: - await ui.clear_cache() - - # Download - - if args.download_game: - await ui.download_game() - - if args.download_patch: - await ui.download_patch() - - if args.download_voiceover: - await ui.download_voiceover(args.download_voiceover) - - if args.download_game_update: - await ui.download_game_update() - - if args.download_voiceover_update: - await ui.download_voiceover_update(args.download_voiceover_update) - - # Install - - if args.install: - await ui.install_game() - - if args.install_from_file: - await ui.install_from_file(args.install_from_file) - - if args.install_voiceover_from_file: - await ui.install_voiceover_from_file(args.install_voiceover_from_file) - - # Update - - if args.update_all: - await ui.update_all() - - if args.update: - await ui.update_game_voiceover(args.update) - - if args.update_voiceover: - await ui.update_voiceover(args.update_voiceover) - - # Patch - - if args.patch: - await ui.patch_game(args.login_fix) - - if args.remove_patch: - await ui.revert_patch() - - # Verify - - if args.verify: - await ui.verify_game() - - def main(): - asyncio.run(main_async()) - - -if __name__ == "__main__": - main() + pass diff --git a/worthless/game/__init__.py b/worthless/game/__init__.py index 32a4888..2ccc1fe 100644 --- a/worthless/game/__init__.py +++ b/worthless/game/__init__.py @@ -1,3 +1,3 @@ -from worthless.game.game import * -from worthless.game.gamehelper import Helper -from worthless.game import gameenums +from . import gameenums as enums +from ._game import * +from .gamehelper import Helper diff --git a/worthless/game/game.py b/worthless/game/_game.py similarity index 91% rename from worthless/game/game.py rename to worthless/game/_game.py index 703150c..3806d5c 100644 --- a/worthless/game/game.py +++ b/worthless/game/_game.py @@ -1,14 +1,14 @@ import asyncio +import hashlib +import json +import logging import re import shutil import zipfile -import json -import hashlib -import logging from os import PathLike from pathlib import Path -from worthless import constants +from worthless import constants, launcher from worthless.classes import installer from worthless.game.gameenums import ( Variant, @@ -19,10 +19,9 @@ from worthless.game.gameenums import ( ) from worthless.game.hdiffpatch import HDiffPatch from worthless.game.launcherconfig import LauncherConfig -# Workaround for ImportError from partially initialized module -import worthless.launcher as launcher _logger = logging.getLogger("worthless.Game") +_game_version_regex = re.compile(r"([1-9]+\.[0-9]+\.[0-9]+)_\d+_\d+") def _calculate_md5(file: str | bytes | PathLike): @@ -36,6 +35,26 @@ def _calculate_md5(file: str | bytes | PathLike): return file_hash.hexdigest() +def _read_version_from_game_file(globalgamemanagers: Path | bytes) -> str | None: + """ + Reads the version from the globalgamemanagers file. (Data/globalgamemanagers) + + This function uses `An Anime Game Launcher` method to read the version: + https://github.com/an-anime-team/an-anime-game-launcher/blob/main/src/ts/Game.ts#L26 + + Returns: + Version as string "x.x.x" or None if not found. + """ + if isinstance(globalgamemanagers, Path): + data = globalgamemanagers.read_text(encoding="ascii", errors="ignore") + else: + data = globalgamemanagers.decode(encoding="ascii", errors="ignore") + result = _game_version_regex.search(data) + if not result: + return + return result.group(1) + + def parse_pkg_version(pkg_version: str | bytes | PathLike) -> dict: """ Parse a pkg_version file @@ -64,7 +83,7 @@ def voicepack_lang_translate(lang: str, base_language="game") -> str | None: lang: Language to translate base_language: Base language type (game/locale/both) Returns: - Translated language code such as `en-us` or English(US) + Translated language code such as "en-us" or "English(US)" """ if base_language == "game" or base_language == "both": match lang.lower(): @@ -86,8 +105,6 @@ def voicepack_lang_translate(lang: str, base_language="game") -> str | None: return "Chinese" case "ko-kr": return "Korean" - # If nothing else matches - return def get_voicepack_archive_language( @@ -139,7 +156,7 @@ def verify_game_file( game_dir: str | bytes | PathLike, pkg_version: dict[str, str] | str | bytes | PathLike, ignore_mismatch=True, -): +) -> (str, str, str) | None: """ Verifies a game file @@ -148,9 +165,9 @@ def verify_game_file( to save resource and verify faster. Args: - file: The file to verify (e.g. GenshinImpact_Data/sharedassets0.assets.resS) - game_dir: The game directory - pkg_version: The pkg_version file to read MD5 from, if not specified it'll use + file: File to verify (e.g. GenshinImpact_Data/sharedassets0.assets.resS) + game_dir: Game directory + pkg_version: pkg_version file to read MD5 from, if not specified it'll use the file from the current game installation ignore_mismatch: Do not raise exception if a file has mismatch MD5, instead return as a (file, expected md5, actual md5) tuple @@ -253,7 +270,6 @@ class Game: self._game_config = LauncherConfig( self._game_config_file, game_version=self.version ) - self._game_version_re = re.compile(r"([1-9]+\.[0-9]+\.[0-9]+)_\d+_\d+") def _read_version_from_config(self) -> str | None: """ @@ -266,39 +282,20 @@ class Game: raise FileNotFoundError(f"Config file {self._game_config_file} not found") return self._game_config.get("General", "game_version", fallback=None) - def _read_version_from_game_file( - self, globalgamemanagers: Path | bytes - ) -> str | None: + async def get_resource_info(self) -> installer.Resource: """ - Reads the version from the globalgamemanagers file. (Data/globalgamemanagers) - - This function uses `An Anime Game Launcher` method to read the version: - https://github.com/an-anime-team/an-anime-game-launcher/blob/main/src/ts/Game.ts#L26 - - Returns: - Version as string "x.x.x" or None if not found. + Alias for `launcher.get_resource_info()` """ - if isinstance(globalgamemanagers, Path): - data = globalgamemanagers.read_text(encoding="ascii", errors="ignore") - else: - data = globalgamemanagers.decode(encoding="ascii", errors="ignore") - result = self._game_version_re.search(data) - if not result: - return - return result.group(1) + return await self._launcher.get_resource_info() - async def _get_game_resource(self): - game_resource = await self._launcher.get_resource_info() - if not game_resource: - raise ValueError("Could not fetch game resource") - return game_resource - - async def _get_game(self, pre_download=False) -> installer.Game: - game_resource = await self._get_game_resource() + async def get_server_game_info(self, pre_download: bool = False) -> installer.Game: + game_resource = await self.get_resource_info() game = game_resource.game - if pre_download: - game = game_resource.pre_download_game - return game + if not pre_download: + return game + game = game_resource.pre_download_game + if game is None: + raise RuntimeError("Pre-download version is not available.") @property def version(self) -> str | None: @@ -327,7 +324,7 @@ class Game: return self._variant @property - def cache_path(self): + def cache_path(self) -> Path: return self._cache_path @staticmethod @@ -392,7 +389,7 @@ class Game: if not game_archive.is_file(): raise FileNotFoundError(f"Game archive {game_archive} not found") with zipfile.ZipFile(game_archive, "r") as f: - return self._read_version_from_game_file( + return _read_version_from_game_file( f.read(self.get_game_data_name() + "globalgamemanagers") ) @@ -426,7 +423,7 @@ class Game: return self._read_version_from_config() except FileNotFoundError: return - return self._read_version_from_game_file(globalgamemanagers) + return _read_version_from_game_file(globalgamemanagers) def get_installed_voicepacks(self) -> list[str]: """ @@ -593,16 +590,17 @@ class Game: self._extract_game_file(game_archive) self._version = self.get_current_game_version() self.set_version_config() + call(InstallStatus.COMPLETED) async def get_voicepack_diff_archive( self, lang: str, from_version: str = None, pre_download=False ) -> installer.Diff: """Gets a diff archive from `from_version` to the latest one - If from_version is not specified, it will be taken from the game version. + If from_version is not specified, it will be taken from the current game version. """ from_version = from_version if from_version else self.version - game = await self._get_game(pre_download=pre_download) + game = await self.get_server_game_info(pre_download=pre_download) translated_lang = voicepack_lang_translate(lang) for v in game.diffs: if v.version != from_version: @@ -616,10 +614,10 @@ class Game: ) -> installer.Diff: """Gets a diff archive from `from_version` to the latest one - If from_version is not specified, it will be taken from the game version. + If from_version is not specified, it will be taken from the current game version. """ from_version = from_version if from_version else self.version - game = await self._get_game(pre_download=pre_download) + game = await self.get_server_game_info(pre_download=pre_download) for v in game.diffs: if v.version == from_version: return v @@ -691,6 +689,6 @@ class Game: def clear_cache(self): """ - Clears the GameManager cache (e.g. downloaded game files) + Clears the cache (e.g. downloaded game files) """ shutil.rmtree(self._cache_path, ignore_errors=True) diff --git a/worthless/game/gamehelper.py b/worthless/game/gamehelper.py index 045420c..c7f0685 100644 --- a/worthless/game/gamehelper.py +++ b/worthless/game/gamehelper.py @@ -1,34 +1,20 @@ -from os import PathLike from pathlib import Path + from worthless import helper -from worthless.game import Game, Variant +from worthless.game import Game -class Helper(Game): +class Helper: """ Quick and dirty extra functions for Game Since this is quick and dirty, you are recommended to write your own method instead. Args: - game: A worthless.game.Game instance, if not specified it'll initialize a new one - using super().__init__ and use the following arguments for that instance creation - game_dir: Game directory - variant: Game variant - cache_dir: Cache directory, if not specified it'll automatically detect your - system cache directory and use that. + game: A worthless.game.Game instance """ - def __init__( - self, - game: Game = None, - game_dir: PathLike = None, - variant: Variant = None, - cache_dir: PathLike = None, - ): - if not game: - super().__init__(game_dir, variant, cache_dir) - game = self + def __init__(self, game: Game): self._download_chunk = 8192 self._game = game @@ -41,8 +27,8 @@ class Helper(Game): This function is a wrapper for helper.download_file Args: - file_url: The file url to download - file_name: The file name to download into + file_url: File url to download + file_name: File name to download into Returns: A Path object containing downloaded file """ @@ -55,34 +41,41 @@ class Helper(Game): chunks=self._download_chunk, ) - def set_download_chunk(self, chunk: int): + @property + def download_chunk(self): + return self._download_chunk + + @download_chunk.setter + def download_chunk(self, chunk: int) -> None: """ Sets the download chunk for the internal download function Args: - chunk: The chunk to set into + chunk: Chunk to set into """ self._download_chunk = chunk - async def download_full_game(self, pre_download=False) -> Path: - game = await self._game._get_game(pre_download) + async def download_full_game(self, pre_download: bool = False) -> Path: + game = await self._game.get_server_game_info(pre_download) archive_name = game.latest.path.split("/")[-1] return await self._download_file( game.latest.path, archive_name, game.latest.size ) - async def download_full_voicepack(self, language: str, pre_download=False) -> Path: - game = await self._game._get_game(pre_download) + async def download_full_voicepack( + self, language: str, pre_download: bool = False + ) -> Path: + game = await self._game.get_server_game_info(pre_download) translated_lang = self._game.voicepack_lang_translate(language) for vo in game.latest.voice_packs: if vo.language == translated_lang: return await self._download_file(vo.path, vo.get_name(), vo.size) async def download_game_update( - self, from_version: str = None, pre_download=False + self, from_version: str = None, pre_download: bool = False ) -> Path: - from_version = from_version if from_version else self.version - game = await self._game._get_game(pre_download=pre_download) + from_version = from_version if from_version else self._game.version + game = await self._game.get_server_game_info(pre_download=pre_download) if self._game.version == game.latest.version: raise ValueError("Game is already up to date.") diff_archive = await self._game.get_game_diff_archive( @@ -97,9 +90,9 @@ class Helper(Game): ) async def download_voicepack_update( - self, language: str, from_version: str = None, pre_download=False + self, language: str, from_version: str = None, pre_download: bool = False ) -> Path: - from_version = from_version if from_version else self.version + from_version = from_version if from_version else self._game.version diff_archive = await self._game.get_voicepack_diff_archive( language, from_version, pre_download ) diff --git a/worthless/helper.py b/worthless/helper.py index a6ce9fc..d2c8543 100644 --- a/worthless/helper.py +++ b/worthless/helper.py @@ -1,11 +1,12 @@ -import secrets -import math import asyncio -import aiohttp import logging +import math +import secrets from pathlib import Path -logger = logging.getLogger("worthless.helper") +import aiohttp + +_logger = logging.getLogger("worthless.helper") async def download_file( @@ -13,28 +14,32 @@ async def download_file( file_name: str, file_path: Path | str, file_len: int = None, - overwrite=False, + overwrite: bool = False, chunks: int = None, threads_num: int = None, ) -> Path: """ - Download file name to a file_path. + Download file name to file_path. + You should implement your own download method instead of using this. - :param file_url: the url to download the file from - :param file_name: the file name to download into - :param file_path: the path to download file into - :param file_len: file length, to support threaded downloading - :param overwrite: whether overwrite existing file or not - :param chunks: chunks to write file into memory before writing to disk - :param threads_num: number of download threads - :return: Downloaded file as a Path object + Args: + file_url: Url to download the file from + file_name: File name to download into + file_path: Path to download file into + file_len: File length to support threaded downloading + overwrite: Whether overwrite existing file or not + chunks: Chunks to write file into memory before writing to disk + threads_num: Number of download threads + Return: + Downloaded file as a Path object """ + logger = _logger.getChild("download_file") if not chunks: chunks = 8192 if not threads_num: threads_num = 8 - logger.debug("Download chunks {} with {} thread".format(chunks, threads_num)) + logger.debug("Downloading chunks {} with {} thread".format(chunks, threads_num)) file_path = Path(file_path).joinpath(file_name) async def _download( diff --git a/worthless/launcher.py b/worthless/launcher.py index 30b8310..1f3b859 100644 --- a/worthless/launcher.py +++ b/worthless/launcher.py @@ -1,5 +1,7 @@ -import aiohttp import locale + +import aiohttp + from worthless import constants from worthless.classes import launcher, installer from worthless.game.gameenums import Variant @@ -33,6 +35,8 @@ class Launcher: """Initialize the launcher API""" if not variant: variant = Variant.INTERNATIONAL + if not language: + language = _get_system_language() self._variant = variant match variant: case Variant.INTERNATIONAL: @@ -41,11 +45,7 @@ class Launcher: "key": "gcStgarh", "launcher_id": "10", } - self._lang = ( - language.lower().replace("_", "-") - if language - else _get_system_language() - ) + self._lang = language.lower().replace("_", "-") case Variant.CHINESE: self._api = constants.LAUNCHER_API_URL_CN self._params = { @@ -54,7 +54,7 @@ class Launcher: "channel_id": "1", } self._lang = ( - "zh-cn" # Use chinese language because this is chinese version + "zh-cn" # Uses Chinese language because this is Chinese version ) case Variant.BILIBILI: self._api = constants.LAUNCHER_API_URL_CN @@ -64,21 +64,23 @@ class Launcher: "channel_id": "14", } self._lang = ( - "zh-cn" # Use chinese language because this is chinese version + "zh-cn" # Use Chinese language because this is Chinese version ) + # There is one for beta version too, but I don't have access to that version self._session = aiohttp.ClientSession() async def _get(self, url, **kwargs) -> dict: - # Workaround because miHoYo uses retcode for their API instead of HTTP status code + # Workaround because miHoYo uses retcode for their API instead of HTTP status code (always 200) rsp = await self._session.get(url, **kwargs) rsp_json = await rsp.json() if rsp_json["retcode"] != 0: # TODO: Add more information to the error message raise aiohttp.ClientResponseError( - code=rsp_json["retcode"], message=rsp_json["message"], history=rsp.history, request_info=rsp.request_info, + headers=rsp.headers, + status=rsp_json["retcode"], ) return rsp_json @@ -97,7 +99,9 @@ class Launcher: @lang.setter def lang(self, language: str) -> None: - """Overrides system detected language with another language. + """Sets current language to another language in xx-xx format. + + This method will automatically convert language with xx_XX to xx-xx (e.g. en_US to en-us) Args: language (str): Language to override with. @@ -142,15 +146,3 @@ class Launcher: """ return await self._get_launcher_info(adv=False) - - async def get_launcher_background_url(self) -> str: - """Gets launcher background image url from the server. - - Returns: - Background image url. - Raises: - aiohttp.ClientResponseError: An error occurred while fetching the background image. - """ - - rsp = await self.get_launcher_info() - return rsp.background.background diff --git a/worthless/patcher.py b/worthless/patcher.py index 34d407b..6f33871 100644 --- a/worthless/patcher.py +++ b/worthless/patcher.py @@ -1,14 +1,15 @@ +import asyncio import os import platform -import tarfile import shutil -import aiohttp -import asyncio - +import tarfile from pathlib import Path + +import aiohttp + from worthless import constants -from worthless.launcher import Launcher from worthless.game import Game as Installer +from worthless.launcher import Launcher match platform.system(): case "Linux": @@ -28,14 +29,14 @@ except ImportError: class Patcher: def __init__( self, - gamedir: Path | AsyncPath | str = AsyncPath.cwd(), - data_dir: str | Path | AsyncPath = None, + gamedir: Path | str = None, + data_dir: str | Path = None, patch_url: str = None, overseas=True, patch_provider="Krock", ): if isinstance(gamedir, str | Path): - gamedir = AsyncPath(gamedir) + gamedir = Path(gamedir) self._gamedir = gamedir if not patch_url: patch_url = constants.PATCH_LIST[patch_provider].replace( @@ -44,13 +45,11 @@ class Patcher: self._patch_url = patch_url if not data_dir: self._appdirs = constants.APPDIRS - self._patch_path = AsyncPath(self._appdirs.user_data_dir).joinpath("Patch") - self._temp_path = AsyncPath(self._appdirs.user_cache_dir).joinpath( - "Patcher" - ) + self._patch_path = Path(self._appdirs.user_data_dir).joinpath("Patch") + self._temp_path = Path(self._appdirs.user_cache_dir).joinpath("Patcher") else: if isinstance(data_dir, str | Path): - data_dir = AsyncPath(data_dir) + data_dir = Path(data_dir) self._patch_path = data_dir.joinpath("Patch") self._temp_path = data_dir.joinpath("Temp/Patcher") self._overseas = overseas @@ -275,20 +274,20 @@ class Patcher: for file in disable_files: file_path = Path(self._gamedir.joinpath(file)).resolve() if file_path.exists(): - await AsyncPath(file_path).rename(str(file_path) + ".bak") + await Path(file_path).rename(str(file_path) + ".bak") patch_jobs.append(disable_crashreporters()) await asyncio.gather(*patch_jobs) @staticmethod - async def _creation_date(file_path: str | Path | AsyncPath): + async def _creation_date(file_path: str | Path | Path): """ Try to get the date that a file was created, falling back to when it was last modified if that isn't possible. See http://stackoverflow.com/a/39501288/1709587 for explanation. """ if isinstance(file_path, str | Path): - file_path = AsyncPath(file_path) + file_path = Path(file_path) if platform.system() == "Windows": return os.path.getctime(file_path) else: @@ -301,7 +300,7 @@ class Patcher: return stat.st_mtime async def _revert_file( - self, original_file: str, base_file: AsyncPath, ignore_error=False + self, original_file: str, base_file: Path, ignore_error=False ): original_path = Path(self._gamedir.joinpath(original_file + ".bak")).resolve() target_file = Path(self._gamedir.joinpath(original_file)).resolve() @@ -351,7 +350,7 @@ class Patcher: files = get_files(("*.dxvk-cache", "*_d3d9.log", "*_d3d11.log", "*_dxgi.log")) for file in files: - revert_job.append(AsyncPath(file).unlink(missing_ok=True)) + revert_job.append(Path(file).unlink(missing_ok=True)) await asyncio.gather(*revert_job)