refactor: convert all task-intensive functions to async.

chore: rename gui.py to cli.py
fix: internal downloader can resume download now.
feat: add verify_game, verify_from_pkg_version, clear_cache to installer.py.
feat: add clear_cache to patcher.py.
fix: linux now check for pkexec before executing it.
fix: add get_name to voicepack.py, latest.py, diff.py to get name from path (since the developer didn't set a name to these files in the sdk url)
chore: remove deprecation message in read_version_from_config in installer.py
misc: use chunk from self._download_chunk instead of being hardcoded to 8192.
fix: is_telemetry_blocked will only wait 15s for a connection.
chore: move appdirs to constants.py

This commit refactor almost all functions to be compatible with asyncio, also restructured CLI to use asyncio.run on main function instead of executing it randomly.
Also prioritize the use of asyncio.gather, sometimes making tasks faster
This commit is contained in:
tretrauit 2022-06-25 01:13:47 +07:00
parent 8b2d0cad8f
commit a5659f7ff3
Signed by: tretrauit
GPG Key ID: CDDE1C97EE305DAF
14 changed files with 515 additions and 406 deletions

View File

@ -1,3 +1,5 @@
# worthless-launcher
A worthless CLI launcher written in Python.
A worthless CLI launcher written in Python.
Check out its website at https://tretrauit.gitlab.io/worthless-launcher for more information.

View File

@ -1,4 +1,3 @@
aiohttp==3.8.1
appdirs~=1.4.4
aiopath~=0.6.10
xdelta3~=0.0.5
aiopath~=0.6.10

View File

@ -9,7 +9,7 @@ README = (HERE / "README.md").read_text()
setup(
name='worthless',
version='1.3.1-2',
version='2.0.0',
packages=['worthless', 'worthless.classes', 'worthless.classes.launcher', 'worthless.classes.installer'],
url='https://git.froggi.es/tretrauit/worthless-launcher',
license='MIT License',

View File

@ -1,6 +1,6 @@
#!/usr/bin/python3
import asyncio
from worthless import cli
from worthless import gui
if __name__ == '__main__':
gui.main()
if __name__ == "__main__":
asyncio.run(cli.main())

View File

@ -12,6 +12,9 @@ class Diff:
self.voice_packs = voice_packs
self.raw = raw
def get_name(self):
return self.path.split("/")[-1]
@staticmethod
def from_dict(data):
voice_packs = []

View File

@ -14,6 +14,9 @@ class Latest:
self.segments = segments
self.raw = raw
def get_name(self):
return self.path.split("/")[-1]
@staticmethod
def from_dict(data):
voice_packs = []

View File

@ -7,6 +7,9 @@ class Voicepack:
self.md5 = md5
self.raw = raw
def get_name(self):
return self.path.split("/")[-1]
@staticmethod
def from_dict(data):
return Voicepack(data["language"], data["name"], data["path"], data["size"], data["md5"], data)

View File

@ -22,145 +22,146 @@ class UI:
def _ask(self, question):
if self._noconfirm:
# Fake dialog
print(question + " (y/n): y")
print(question + " [Y/n]:")
return True
answer = ""
while answer.lower() not in ['y', 'n']:
while answer.lower() not in ['y', 'n', '']:
if answer != "":
print("Invalid choice, please try again.")
answer = input(question + " (y/n): ")
return answer.lower() == 'y'
answer = input(question + " [Y/n]: ")
return answer.lower() == 'y' or answer == ''
def override_game_version(self, version: str):
self._installer._version = version
def get_game_version(self):
print(self._installer.get_game_version())
async def get_game_version(self):
print(await self._installer.get_game_version())
def block_telemetry(self):
async def block_telemetry(self):
print("Checking for available telemetry to block...")
try:
asyncio.run(self._patcher.block_telemetry())
await self._patcher.block_telemetry()
except ValueError:
print("No telemetry to block.")
else:
print("Telemetry blocked.")
def check_telemetry(self):
block_status = asyncio.run(self._patcher.is_telemetry_blocked())
async def check_telemetry(self):
block_status = await self._patcher.is_telemetry_blocked()
if not block_status:
print("Telemetry is blocked.")
else:
print("Telemetry is not blocked, you need to block these hosts below.")
for block in block_status:
print(block)
for hosts in block_status:
print(hosts)
def _update_from_archive(self, filepath):
async def _update_from_archive(self, filepath):
print("Reverting patches if patched...")
self._patcher.revert_patch(True)
print("Updating game from archive (this may takes some time)...")
asyncio.run(self._installer.update_game(filepath))
await self._patcher.revert_patch(True)
print("Updating game from archive (This may takes some time)...")
await self._installer.update_game(filepath)
self._installer.set_version_config()
def _install_from_archive(self, filepath, force_reinstall):
print("Installing game from archive (this may takes some time)...")
self._installer.install_game(filepath, force_reinstall)
async def _install_from_archive(self, filepath, force_reinstall):
print("Installing game from archive (This may takes some time)...")
print(filepath)
await self._installer.install_game(filepath, force_reinstall)
self._installer.set_version_config()
def _apply_voiceover_from_archive(self, filepath):
print("Applying voiceover from archive (this may takes some time)...")
self._installer.apply_voiceover(filepath)
async def _apply_voiceover_from_archive(self, filepath):
print("Applying voiceover from archive (This may takes some time)...")
print("Voiceover archive:", filepath)
await self._installer.apply_voiceover(filepath)
def install_voiceover_from_file(self, filepath):
async def install_voiceover_from_file(self, filepath):
print("Archive voiceover language: {} ({})".format(
self._installer.get_voiceover_archive_language(filepath),
"Full archive" if self._installer.get_voiceover_archive_type(filepath) else "Update archive"))
await self._installer.get_voiceover_archive_language(filepath),
"Full archive" if await self._installer.get_voiceover_archive_type(filepath) else "Update archive"))
if not self._ask("Do you want to apply this voiceover pack? ({})".format(filepath)):
print("Aborting apply process.")
return
self._apply_voiceover_from_archive(filepath)
await self._apply_voiceover_from_archive(filepath)
print("Voiceover applied successfully.")
def revert_patch(self):
async def revert_patch(self):
print("Reverting patches...")
self._patcher.revert_patch(True)
await self._patcher.revert_patch(True)
print("Patches reverted.")
def patch_game(self, login_fix: bool = False):
async def patch_game(self, login_fix: bool = False):
print("NOTE: Hereby you are violating the game's Terms of Service!")
print("Do not patch the game if you don't know what you are doing!")
if not self._ask("Do you want to patch the game? (This will overwrite your game files!)"):
print("Aborting patch process.")
return
self.block_telemetry()
await self.block_telemetry()
print("Updating patches...")
asyncio.run(self._patcher.download_patch())
await self._patcher.download_patch()
print("Patching game...")
self._patcher.apply_patch(login_fix)
await self._patcher.apply_patch(login_fix)
print("Game patched.")
print("Please refrain from sharing this project to public, thank you.")
def install_from_file(self, filepath):
gamever = self._installer.get_game_version()
print("Archive game version: " + self._installer.get_game_archive_version(filepath))
async def install_from_file(self, filepath):
gamever = await self._installer.get_game_version()
print("Archive game version:", await self._installer.get_game_archive_version(filepath))
if gamever:
print("Current game installation detected. ({})".format(self._installer.get_game_version()))
print("Current game installation detected. ({})".format(await self._installer.get_game_version()))
if not self._ask("Do you want to update the game? ({})".format(filepath)):
print("Aborting update process.")
return
self._update_from_archive(filepath)
await self._update_from_archive(filepath)
print("Game updated successfully.")
else:
print("No game installation detected.")
if not self._ask("Do you want to install the game? ({})".format(filepath)):
print("Aborting installation process.")
return
self._install_from_archive(filepath, False)
await self._install_from_archive(filepath, False)
print("Game installed successfully.")
def download_patch(self):
async def download_patch(self):
print("Downloading patches...")
asyncio.run(self._patcher.download_patch())
await self._patcher.download_patch()
def download_game(self):
async def download_game(self):
print("Downloading full game (This will take a long time)...")
asyncio.run(self._installer.download_full_game())
await self._installer.download_full_game()
def download_game_update(self):
async def download_game_update(self):
print("Downloading game update (This will take a long time)...")
asyncio.run(self._installer.download_game_update())
await self._installer.download_game_update()
def download_voiceover(self, languages: str):
res_info = asyncio.run(self._launcher.get_resource_info())
async def download_voiceover(self, languages: str):
res_info = await self._launcher.get_resource_info()
for lng in languages.split(" "):
for vo in res_info.game.latest.voice_packs:
if not self._installer.voiceover_lang_translate(lng) == vo.language:
continue
print("Downloading voiceover pack for {} (This will take a long time)...".format(lng))
asyncio.run(self._installer.download_full_voiceover(lng))
await self._installer.download_full_voiceover(lng)
def download_voiceover_update(self, languages: str):
res_info = asyncio.run(self._launcher.get_resource_info())
async def download_voiceover_update(self, languages: str):
res_info = await self._launcher.get_resource_info()
for lng in languages.split(" "):
for vo in res_info.game.latest.voice_packs:
if not self._installer.voiceover_lang_translate(lng) == vo.language:
continue
print("Downloading voiceover update pack for {} (This will take a long time)...".format(lng))
asyncio.run(self._installer.download_voiceover_update(lng))
await self._installer.download_voiceover_update(lng)
def install_game(self, forced: bool = False):
res_info = asyncio.run(self._launcher.get_resource_info())
async def install_game(self, forced: bool = False):
res_info = await self._launcher.get_resource_info()
print("Latest game version: {}".format(res_info.game.latest.version))
if not self._ask("Do you want to install the game?"):
print("Aborting game installation process.")
return
self.download_game()
print("Installing game...")
self._install_from_archive(self._installer.temp_path.joinpath(res_info.game.latest.name), forced)
await self.download_game()
print("Game archive:", res_info.game.latest.get_name())
await self._install_from_archive(self._installer.temp_path.joinpath(res_info.game.latest.get_name()), forced)
def install_voiceover(self, languages: str):
res_info = asyncio.run(self._launcher.get_resource_info())
print("Latest game version: {}".format(res_info.game.latest.version))
async def install_voiceover(self, languages: str):
res_info = await self._launcher.get_resource_info()
for lng in languages.split(" "):
for vo in res_info.game.latest.voice_packs:
if not self._installer.voiceover_lang_translate(lng) == vo.language:
@ -169,19 +170,20 @@ class UI:
print("Aborting voiceover installation process.")
return
print("Downloading voiceover pack (This will take a long time)...")
asyncio.run(self._installer.download_full_voiceover(lng))
print("Installing voiceover pack...")
self._apply_voiceover_from_archive(self._installer.temp_path.joinpath(res_info.game.latest.name))
await self._installer.download_full_voiceover(lng)
await self._apply_voiceover_from_archive(
self._installer.temp_path.joinpath(vo.get_name())
)
break
def update_game(self):
game_ver = self._installer.get_game_version()
async def update_game(self):
game_ver = await self._installer.get_game_version()
if not game_ver:
self.install_game()
await self.install_game()
return
print("Current game installation detected. ({})".format(game_ver))
diff_archive = asyncio.run(self._installer.get_game_diff_archive())
res_info = asyncio.run(self._launcher.get_resource_info())
print("Current game installation detected: {}".format(game_ver))
diff_archive = await self._installer.get_game_diff_archive()
res_info = await self._launcher.get_resource_info()
if not diff_archive:
print("No game updates available.")
return
@ -190,19 +192,24 @@ class UI:
print("Aborting game update process.")
return
print("Downloading game update (This will take a long time)...")
asyncio.run(self._installer.download_game_update())
await self._installer.download_game_update()
print("Installing game update...")
self.install_from_file(self._installer.temp_path.joinpath(res_info.game.latest.name))
await self.install_from_file(self._installer.temp_path.joinpath(res_info.game.latest.name))
def update_voiceover(self, languages: str):
game_ver = self._installer.get_game_version()
async def update_voiceover(self, languages: str | list):
if isinstance(languages, str):
languages = languages.split(" ")
game_ver = await self._installer.get_game_version()
if not game_ver:
self.install_voiceover(languages)
print("Couldn't detect current game installation, is game installed?")
return
print("Current game installation detected. ({})".format(game_ver))
for lng in languages.split(" "):
diff_archive = asyncio.run(self._installer.get_voiceover_diff_archive(lng))
# res_info = asyncio.run(self._launcher.get_resource_info())
installed_voiceovers = await self._installer.get_installed_voiceovers()
print(f"Installed voiceovers: {None if installed_voiceovers == [] else ', '.join(installed_voiceovers)}")
for lng in languages:
if self._installer.voiceover_lang_translate(lng, "locale") not in installed_voiceovers:
await self.install_voiceover(lng)
continue
diff_archive = await self._installer.get_voiceover_diff_archive(lng)
if not diff_archive:
print("No voiceover updates available for {}.".format(lng))
continue
@ -210,19 +217,40 @@ class UI:
print("Aborting this voiceover language update process.")
continue
print("Downloading voiceover update (This may takes some time)...")
asyncio.run(self._installer.download_voiceover_update(lng))
await self._installer.download_voiceover_update(lng)
print("Installing voiceover update for {}...".format(lng))
self._apply_voiceover_from_archive(self._installer.temp_path.joinpath(diff_archive.name))
await self._apply_voiceover_from_archive(self._installer.temp_path.joinpath(diff_archive.get_name()))
def update_game_voiceover(self, languages: str):
self.update_game()
self.update_voiceover(languages)
async def update_game_voiceover(self, languages: str):
await self.update_game()
await self.update_voiceover(languages)
def interactive_ui(self):
raise NotImplementedError()
async def update_all(self):
await self.update_game()
await self.update_voiceover(await self._installer.get_installed_voiceovers())
async def verify_game(self):
game_ver = await self._installer.get_game_version()
if not game_ver:
print("Couldn't detect current game installation, is game installed?")
return
print("Verifying game contents... (This may takes a long time)")
failed_files = await self._installer.verify_game(ignore_mismatch=True)
if not failed_files:
print("All good.")
return
print("Some game files got corrupted (mismatch md5), uh oh.")
for file in failed_files:
print("{}: expected {}, actual {}".format(file[0], file[1], file[2]))
async def clear_cache(self):
if self._ask("Do you want to clear Installer cache (contains downloaded game files, etc)"):
await self._installer.clear_cache()
if self._ask("Do you want to clear Patcher cache (contains files used to patch)"):
await self._patcher.clear_cache()
def main():
async def main():
default_dirs = appdirs.AppDirs(constants.APP_NAME, constants.APP_AUTHOR)
parser = argparse.ArgumentParser(prog="worthless", description="A worthless launcher written in Python.")
parser.add_argument("-D", "--dir", action="store", type=Path, default=Path.cwd(),
@ -259,19 +287,15 @@ def main():
parser.add_argument("-Rs", "--remove", action="store_true", help="Remove the game (if installed)")
parser.add_argument("-Rp", "--remove-patch", action="store_true", help="Revert the game patch (if patched)")
parser.add_argument("-Rv", "--remove-voiceover", action="store_true", help="Remove a Voiceover pack (if installed)")
parser.add_argument("-V", "--verify", action="store_true", help="Verify the game installation")
parser.add_argument("--get-game-version", action="store_true", help="Get the current game version")
parser.add_argument("--no-overseas", action="store_true", help="Don't use overseas server")
parser.add_argument("--check-telemetry", action="store_true", help="Check for the telemetry information")
parser.add_argument("--clear-cache", action="store_true", help="Clear cache used by worthless")
parser.add_argument("--from-ver", action="store", help="Override the detected game version", type=str, default=None)
parser.add_argument("--noconfirm", action="store_true",
help="Do not ask any for confirmation. (Ignored in interactive mode)")
args = parser.parse_args()
interactive_mode = not args.install and not args.install_from_file and not args.patch and not args.update and not \
args.remove and not args.remove_patch and not args.remove_voiceover and not args.get_game_version and not \
args.install_voiceover_from_file and not args.update_voiceover and not args.download_game and not \
args.download_voiceover and not args.download_game_update and not args.download_voiceover_update and not \
args.install_voiceover_from_file and not args.update_all and not args.login_fix and not args.check_telemetry\
and not args.from_ver
if args.temporary_dir:
args.temporary_dir.mkdir(parents=True, exist_ok=True)
@ -293,50 +317,62 @@ def main():
ui.override_game_version(args.from_ver)
if args.get_game_version:
ui.get_game_version()
await ui.get_game_version()
if args.check_telemetry:
ui.check_telemetry()
await ui.check_telemetry()
# Download
if args.download_game:
ui.download_game()
await ui.download_game()
if args.download_voiceover:
ui.download_voiceover(args.download_voiceover)
await ui.download_voiceover(args.download_voiceover)
if args.download_game_update:
ui.download_game_update()
await ui.download_game_update()
if args.download_voiceover_update:
ui.download_voiceover_update(args.download_voiceover_update)
await ui.download_voiceover_update(args.download_voiceover_update)
# Install
if args.install:
ui.install_game()
if args.update_all:
raise NotImplementedError() # TODO
if args.update:
ui.update_game_voiceover(args.update)
if args.update_voiceover:
ui.update_voiceover(args.update_voiceover)
await ui.install_game()
if args.install_from_file:
ui.install_from_file(args.install_from_file)
await ui.install_from_file(args.install_from_file)
if args.install_voiceover_from_file:
ui.install_voiceover_from_file(args.install_voiceover_from_file)
await ui.install_voiceover_from_file(args.install_voiceover_from_file)
# Update
if args.update_all:
await ui.update_all()
if args.update:
await ui.update_game_voiceover(args.update)
if args.update_voiceover:
await ui.update_voiceover(args.update_voiceover)
# Patch
if args.patch:
ui.patch_game(args.login_fix)
await ui.patch_game(args.login_fix)
if args.remove_patch:
ui.revert_patch()
await ui.revert_patch()
if interactive_mode:
ui.interactive_ui()
# Verify
if args.verify:
await ui.verify_game()
if args.clear_cache:
await ui.clear_cache()
if __name__ == "__main__":
main()
asyncio.run(main())

View File

@ -1,5 +1,9 @@
from appdirs import AppDirs
APP_NAME="worthless"
APP_AUTHOR="tretrauit"
APPDIRS=AppDirs(APP_NAME, APP_AUTHOR)
LAUNCHER_API_URL_OS = "https://sdk-os-static.hoyoverse.com/hk4e_global/mdk/launcher/api"
LAUNCHER_API_URL_CN = "https://sdk-static.mihoyo.com/hk4e_cn/mdk/launcher/api"
HDIFFPATCH_GIT_URL="https://github.com/sisong/HDiffPatch"

View File

@ -3,10 +3,9 @@ import re
import shutil
import platform
import aiohttp
import appdirs
import zipfile
import warnings
import json
import hashlib
from pathlib import Path
from configparser import ConfigParser
from aiopath import AsyncPath
@ -23,19 +22,21 @@ async def _download_file(file_url: str, file_name: str, file_path: Path | str, f
:param file_name:
:return:
"""
params = {}
headers = {}
file_path = AsyncPath(file_path).joinpath(file_name)
if overwrite:
await file_path.unlink(missing_ok=True)
if await file_path.exists():
cur_len = len(await file_path.read_bytes())
params |= {
cur_len = (await file_path.stat()).st_size
headers |= {
"Range": f"bytes={cur_len}-{file_len if file_len else ''}"
}
else:
await file_path.touch()
async with aiohttp.ClientSession() as session:
rsp = await session.get(file_url, params=params, timeout=None)
rsp = await session.get(file_url, headers=headers, timeout=None)
if rsp.status == 416:
return
rsp.raise_for_status()
while True:
chunk = await rsp.content.read(chunks)
@ -51,7 +52,7 @@ class HDiffPatch:
git_url = constants.HDIFFPATCH_GIT_URL
self._git_url = git_url
if not data_dir:
self._appdirs = appdirs.AppDirs(constants.APP_NAME, constants.APP_AUTHOR)
self._appdirs = constants.APPDIRS
self.temp_path = Path(self._appdirs.user_cache_dir).joinpath("HDiffPatch")
self.data_path = Path(self._appdirs.user_data_dir).joinpath("Tools/HDiffPatch")
else:
@ -98,7 +99,7 @@ class HDiffPatch:
hpatchz_name = "hpatchz" + (".exe" if platform.system() == "Windows" else "")
return self._get_hdiffpatch_exec(hpatchz_name)
async def patch_file(self, in_file, out_file, patch_file, wait=False):
async def patch_file(self, in_file, out_file, patch_file, error=False, wait=False):
hpatchz = self.get_hpatchz_executable()
if not hpatchz:
raise RuntimeError("hpatchz executable not found")
@ -106,6 +107,8 @@ class HDiffPatch:
if not wait:
return proc
await proc.wait()
if error and proc.returncode != 0:
raise RuntimeError(f"Patching failed, return code is {proc.returncode}")
return proc
def get_hdiffz_executable(self):
@ -141,91 +144,33 @@ class HDiffPatch:
await _download_file(url, name, self.temp_path, overwrite=True)
if not extract:
return
archive = zipfile.ZipFile(self.temp_path.joinpath(name))
archive.extractall(self.data_path)
archive.close()
with zipfile.ZipFile(self.temp_path.joinpath(name), 'r') as f:
await asyncio.to_thread(f.extractall, path=self.data_path)
class Installer:
def _read_version_from_config(self):
warnings.warn("This function is not reliable as upgrading game version from worthless\
doesn't write the config.", DeprecationWarning)
if not self._config_file.exists():
raise FileNotFoundError(f"Config file {self._config_file} not found")
cfg = ConfigParser()
cfg.read(str(self._config_file))
return cfg.get("General", "game_version")
@staticmethod
def read_version_from_game_file(globalgamemanagers: Path | bytes):
"""
Reads the version from the globalgamemanagers file. (Data/globalgamemanagers)
Uses `An Anime Game Launcher` method to read the version:
https://gitlab.com/KRypt0n_/an-anime-game-launcher/-/blob/main/src/ts/Game.ts#L26
:return: Game version (ex 1.0.0)
"""
if isinstance(globalgamemanagers, Path):
with globalgamemanagers.open("rb") as f:
data = f.read().decode("ascii", errors="ignore")
else:
data = globalgamemanagers.decode("ascii", errors="ignore")
result = re.search(r"([1-9]+\.[0-9]+\.[0-9]+)_[\d]+_[\d]+", data)
if not result:
raise ValueError("Could not find version in game file")
return result.group(1)
def get_game_data_name(self):
if self._overseas:
return "GenshinImpact_Data/"
else:
return "YuanShen_Data/"
def get_game_data_path(self):
return self._gamedir.joinpath(self.get_game_data_name())
def get_game_version(self):
globalgamemanagers = self.get_game_data_path().joinpath("./globalgamemanagers")
if not globalgamemanagers.exists():
return
return self.read_version_from_game_file(globalgamemanagers)
def get_installed_voiceovers(self):
"""
Returns a list of installed voiceovers.
:return: List of installed voiceovers
"""
voiceovers = []
for file in self.get_game_data_path().joinpath("StreamingAssets/Audio/GeneratedSoundBanks/Windows").iterdir():
if file.is_dir():
voiceovers.append(file.name)
return voiceovers
def __init__(self, gamedir: str | Path = Path.cwd(), overseas: bool = True, data_dir: str | Path = None):
if isinstance(gamedir, str):
gamedir = Path(gamedir)
def __init__(self, gamedir: str | Path | AsyncPath = AsyncPath.cwd(),
overseas: bool = True, data_dir: str | Path | AsyncPath = None):
if isinstance(gamedir, str | Path):
gamedir = AsyncPath(gamedir)
self._gamedir = gamedir
if not data_dir:
self._appdirs = appdirs.AppDirs(constants.APP_NAME, constants.APP_AUTHOR)
self.temp_path = Path(self._appdirs.user_cache_dir).joinpath("Installer")
self._appdirs = constants.APPDIRS
self.temp_path = AsyncPath(self._appdirs.user_cache_dir).joinpath("Installer")
else:
if not isinstance(data_dir, Path):
data_dir = Path(data_dir)
if isinstance(data_dir, str | AsyncPath):
data_dir = AsyncPath(data_dir)
self.temp_path = data_dir.joinpath("Temp/Installer/")
self.temp_path.mkdir(parents=True, exist_ok=True)
Path(self.temp_path).mkdir(parents=True, exist_ok=True)
config_file = self._gamedir.joinpath("config.ini")
self._config_file = config_file.resolve()
self._config_file = config_file
self._download_chunk = 8192
self._overseas = overseas
self._version = self.get_game_version()
self._version = None
self._launcher = Launcher(self._gamedir, overseas=self._overseas)
self._hdiffpatch = HDiffPatch(data_dir=data_dir)
self._config = LauncherConfig(self._config_file, self._version)
def set_download_chunk(self, chunk: int):
self._download_chunk = chunk
self._game_version_re = re.compile(r"([1-9]+\.[0-9]+\.[0-9]+)_\d+_\d+")
async def _download_file(self, file_url: str, file_name: str, file_len: int = None, overwrite=False):
"""
@ -234,76 +179,143 @@ class Installer:
:param file_name:
:return:
"""
await _download_file(file_url, file_name, self.temp_path, file_len=file_len, overwrite=overwrite)
await _download_file(file_url, file_name, self.temp_path, file_len=file_len, overwrite=overwrite,
chunks=self._download_chunk)
def get_game_archive_version(self, game_archive: str | Path):
if not game_archive.exists():
raise FileNotFoundError(f"Game archive {game_archive} not found")
archive = zipfile.ZipFile(game_archive, 'r')
return self.read_version_from_game_file(archive.read(self.get_game_data_name() + "globalgamemanagers"))
async def read_version_from_config(self):
if not await self._config_file.exists():
raise FileNotFoundError(f"Config file {self._config_file} not found")
cfg = ConfigParser()
await asyncio.to_thread(cfg.read, str(self._config_file))
return cfg.get("General", "game_version")
async def read_version_from_game_file(self, globalgamemanagers: AsyncPath | Path | bytes) -> str:
"""
Reads the version from the globalgamemanagers file. (Data/globalgamemanagers)
Uses `An Anime Game Launcher` method to read the version:
https://gitlab.com/KRypt0n_/an-anime-game-launcher/-/blob/main/src/ts/Game.ts#L26
:return: Game version (ex 1.0.0)
"""
if isinstance(globalgamemanagers, Path | AsyncPath):
globalgamemanagers = AsyncPath(globalgamemanagers)
data = await globalgamemanagers.read_text("ascii", errors="ignore")
else:
data = globalgamemanagers.decode("ascii", errors="ignore")
result = self._game_version_re.search(data)
if not result:
raise ValueError("Could not find version in game file")
return result.group(1)
@staticmethod
def voiceover_lang_translate(lang: str):
def voiceover_lang_translate(lang: str, base_language="game") -> str:
"""
Translates the voiceover language to the language code used by the game.
:param lang: Language to translate
:param base_language: Base language type (game/locale/both)
:return: Language code
"""
match lang:
case "English(US)":
return "en-us"
case "Japanese":
return "ja-jp"
case "Chinese":
return "zh-cn"
case "Korean":
return "ko-kr"
if base_language == "game" or base_language == "both":
match lang.lower():
case "english(us)":
return "en-us"
case "japanese":
return "ja-jp"
case "chinese":
return "zh-cn"
case "korean":
return "ko-kr"
if base_language == "locale" or base_language == "both":
match lang.lower().replace("_", "-"):
case "en-us":
return "English(US)"
case "ja-jp":
return "Japanese"
case "zh-cn":
return "Chinese"
case "ko-kr":
return "Korean"
# If nothing else matches
return lang
@staticmethod
def get_voiceover_archive_language(voiceover_archive: str | Path):
if isinstance(voiceover_archive, str):
async def get_voiceover_archive_language(voiceover_archive: str | Path | AsyncPath) -> str:
if isinstance(voiceover_archive, str | Path):
voiceover_archive = Path(voiceover_archive).resolve()
if not voiceover_archive.exists():
raise FileNotFoundError(f"Voiceover archive {voiceover_archive} not found")
archive = zipfile.ZipFile(voiceover_archive, 'r')
archive_path = zipfile.Path(archive)
for file in archive_path.iterdir():
if file.name.endswith("_pkg_version"):
return file.name.split("_")[1]
with zipfile.ZipFile(voiceover_archive, 'r') as f:
for file in zipfile.Path(f).iterdir():
if file.name.endswith("_pkg_version"):
return file.name.split("_")[1]
def get_voiceover_archive_type(self, voiceover_archive: str | Path):
vo_lang = self.get_voiceover_archive_language(voiceover_archive)
archive = zipfile.ZipFile(voiceover_archive, 'r')
archive_path = zipfile.Path(archive)
files = archive.read("Audio_{}_pkg_version".format(vo_lang)).decode().split("\n")
for file in files:
if file.strip() and not archive_path.joinpath(json.loads(file)["remoteName"]).exists():
return False
@staticmethod
async def get_voiceover_archive_type(voiceover_archive: str | Path) -> bool:
"""
Gets voiceover archive type.
:param voiceover_archive:
:return: True if this is a full archive, else False.
"""
vo_lang = Installer.get_voiceover_archive_language(voiceover_archive)
with zipfile.ZipFile(voiceover_archive, 'r') as f:
archive_path = zipfile.Path(f)
files = (await asyncio.to_thread(f.read, "Audio_{}_pkg_version".format(vo_lang))).decode().split("\n")
for file in files:
if file.strip() and not archive_path.joinpath(json.loads(file)["remoteName"]).exists():
return False
return True
def apply_voiceover(self, voiceover_archive: str | Path):
# Since Voiceover packages are unclear about diff package or full package
# we will try to extract the voiceover package and apply it to the game
# making this function universal for both cases
if not self.get_game_data_path().exists():
raise FileNotFoundError(f"Game not found in {self._gamedir}")
if isinstance(voiceover_archive, str):
voiceover_archive = Path(voiceover_archive).resolve()
if not voiceover_archive.exists():
raise FileNotFoundError(f"Voiceover archive {voiceover_archive} not found")
archive = zipfile.ZipFile(voiceover_archive, 'r')
archive.extractall(self._gamedir)
archive.close()
def set_download_chunk(self, chunk: int):
self._download_chunk = chunk
async def update_game(self, game_archive: str | Path):
if not self.get_game_data_path().exists():
def get_game_data_name(self):
if self._overseas:
return "GenshinImpact_Data/"
else:
return "YuanShen_Data/"
def get_game_data_path(self) -> AsyncPath:
return self._gamedir.joinpath(self.get_game_data_name())
async def get_game_archive_version(self, game_archive: str | Path):
if not game_archive.exists():
raise FileNotFoundError(f"Game archive {game_archive} not found")
with zipfile.ZipFile(game_archive, 'r') as f:
return await self.read_version_from_game_file(
await asyncio.to_thread(f.read, self.get_game_data_name() + "globalgamemanagers")
)
async def get_game_version(self) -> str | None:
globalgamemanagers = self.get_game_data_path().joinpath("./globalgamemanagers")
if not await globalgamemanagers.exists():
try:
return await self.read_version_from_config()
except FileNotFoundError:
return
return await self.read_version_from_game_file(globalgamemanagers)
async def get_installed_voiceovers(self) -> list[str]:
"""
Returns a list of installed voiceovers.
:return: List of installed voiceovers
"""
voiceovers = []
async for file in self.get_game_data_path()\
.joinpath("StreamingAssets/Audio/GeneratedSoundBanks/Windows").iterdir():
if await file.is_dir():
voiceovers.append(file.name)
return voiceovers
async def update_game(self, game_archive: str | Path | AsyncPath):
if not await self.get_game_data_path().exists():
raise FileNotFoundError(f"Game not found in {self._gamedir}")
if isinstance(game_archive, str):
if isinstance(game_archive, str | Path):
game_archive = Path(game_archive).resolve()
if not game_archive.exists():
raise FileNotFoundError(f"Update archive {game_archive} not found")
archive = zipfile.ZipFile(game_archive, 'r')
if not self._hdiffpatch.get_hpatchz_executable():
@ -320,13 +332,13 @@ class Installer:
# hdiffpatch implementation
hdifffiles = []
for x in archive.read("hdifffiles.txt").decode().split("\n"):
for x in (await asyncio.to_thread(archive.read, "hdifffiles.txt")).decode().split("\n"):
if x:
hdifffiles.append(json.loads(x)["remoteName"])
patch_jobs = []
for file in hdifffiles:
current_game_file = self._gamedir.joinpath(file)
if not current_game_file.exists():
if not await current_game_file.exists():
# Not patching since we don't have the file
continue
@ -341,10 +353,11 @@ class Installer:
patch_path, wait=True)
patch_path.unlink()
if proc.returncode != 0:
# Let the game redownload the file.
# Let the game download the file.
old_file.rename(old_file.with_suffix(old_suffix))
return
old_file.unlink()
files.remove(patch_file)
patch_jobs.append(extract_and_patch(current_game_file, patch_file))
@ -353,15 +366,16 @@ class Installer:
deletefiles = archive.read("deletefiles.txt").decode().split("\n")
for file in deletefiles:
current_game_file = self._gamedir.joinpath(file)
if not current_game_file.exists():
if not await current_game_file.exists():
continue
if current_game_file.is_file():
current_game_file.unlink(missing_ok=True)
archive.extractall(self._gamedir, members=files)
await asyncio.to_thread(archive.extractall, self._gamedir, members=files)
archive.close()
# Update game version on local variable.
self._version = self.get_game_version()
self._version = await self.get_game_version()
self.set_version_config()
def set_version_config(self, version: str = None):
if not version:
@ -370,11 +384,11 @@ class Installer:
self._config.save()
async def download_full_game(self):
archive = await self._launcher.get_resource_info()
if archive is None:
resource = await self._launcher.get_resource_info()
if resource is None:
raise RuntimeError("Failed to fetch game resource info.")
archive_name = archive.game.latest.path.split("/")[-1]
await self._download_file(archive.game.latest.path, archive_name, archive.game.latest.size)
archive_name = resource.game.latest.path.split("/")[-1]
await self._download_file(resource.game.latest.path, archive_name, resource.game.latest.size)
async def download_full_voiceover(self, language: str):
archive = await self._launcher.get_resource_info()
@ -383,19 +397,57 @@ class Installer:
translated_lang = self.voiceover_lang_translate(language)
for vo in archive.game.latest.voice_packs:
if vo.language == translated_lang:
await self._download_file(vo.path, vo.name, vo.size)
await self._download_file(vo.path, vo.get_name(), vo.size)
async def download_game_update(self, from_version: str = None):
async def uninstall_game(self):
await asyncio.to_thread(shutil.rmtree, self._gamedir, ignore_errors=True)
async def _extract_game_file(self, archive: str | Path | AsyncPath):
if isinstance(archive, str | AsyncPath):
archive = Path(archive).resolve()
if not archive.exists():
raise FileNotFoundError(f"'{archive}' not found")
with zipfile.ZipFile(archive, 'r') as f:
await asyncio.to_thread(f.extractall, path=self._gamedir)
async def apply_voiceover(self, voiceover_archive: str | Path):
# Since Voiceover packages are unclear about diff package or full package
# we will try to extract the voiceover package and apply it to the game
# making this function universal for both cases
if not await self.get_game_data_path().exists():
raise FileNotFoundError(f"Game not found in {self._gamedir}")
await self._extract_game_file(voiceover_archive)
async def install_game(self, game_archive: str | Path | AsyncPath, force_reinstall: bool = False):
"""Installs the game to the current directory
If `force_reinstall` is True, the game will be uninstalled then reinstalled.
"""
if await self.get_game_data_path().exists():
if not force_reinstall:
raise ValueError(f"Game is already installed in {self._gamedir}")
await self.uninstall_game()
await self._gamedir.mkdir(parents=True, exist_ok=True)
await self._extract_game_file(game_archive)
self._version = await self.get_game_version()
self.set_version_config()
async def _get_game_resource(self, from_version: str = None):
if not from_version:
if self._version:
from_version = self._version
else:
from_version = self._version = self.get_game_version()
from_version = self._version = await self.get_game_version()
if not from_version:
raise ValueError("No game version found")
version_info = await self._launcher.get_resource_info()
if version_info is None:
raise RuntimeError("Failed to fetch game resource info.")
game_resource = await self._launcher.get_resource_info()
if not game_resource:
raise ValueError("Could not fetch game resource")
return game_resource
async def download_game_update(self, from_version: str = None):
version_info = await self._get_game_resource()
if self._version == version_info.game.latest.version:
raise ValueError("Game is already up to date.")
diff_archive = await self.get_game_diff_archive(from_version)
@ -404,56 +456,17 @@ class Installer:
await self._download_file(diff_archive.path, diff_archive.name, diff_archive.size)
async def download_voiceover_update(self, language: str, from_version: str = None):
if not from_version:
if self._version:
from_version = self._version
else:
from_version = self._version = self.get_game_version()
if not from_version:
raise ValueError("No game version found")
version_info = await self._launcher.get_resource_info()
if version_info is None:
raise RuntimeError("Failed to fetch game resource info.")
diff_archive = await self.get_voiceover_diff_archive(language, from_version)
if diff_archive is None:
raise ValueError("Voiceover diff archive is not available for this version, please reinstall.")
await self._download_file(diff_archive.path, diff_archive.name, diff_archive.size)
def uninstall_game(self):
shutil.rmtree(self._gamedir)
def install_game(self, game_archive: str | Path, force_reinstall: bool = False):
"""Installs the game to the current directory
If `force_reinstall` is True, the game will be uninstalled then reinstalled.
"""
if self.get_game_data_path().exists():
if not force_reinstall:
raise ValueError(f"Game is already installed in {self._gamedir}")
self.uninstall_game()
self._gamedir.mkdir(parents=True, exist_ok=True)
if isinstance(game_archive, str):
game_archive = Path(game_archive).resolve()
if not game_archive.exists():
raise FileNotFoundError(f"Install archive {game_archive} not found")
archive = zipfile.ZipFile(game_archive, 'r')
archive.extractall(self._gamedir)
archive.close()
async def get_voiceover_diff_archive(self, lang: str, from_version: str = None):
"""Gets a diff archive from `from_version` to the latest one
If from_version is not specified, it will be taken from the game version.
"""
if not from_version:
if self._version:
from_version = self._version
else:
from_version = self._version = self.get_game_version()
if not from_version:
raise ValueError("No game version found")
game_resource = await self._launcher.get_resource_info()
game_resource = await self._get_game_resource()
if not game_resource:
raise ValueError("Could not fetch game resource")
translated_lang = self.voiceover_lang_translate(lang)
@ -470,16 +483,48 @@ class Installer:
If from_version is not specified, it will be taken from the game version.
"""
if not from_version:
if self._version:
from_version = self._version
else:
from_version = self._version = self.get_game_version()
if not from_version:
raise ValueError("No game version found")
game_resource = await self._launcher.get_resource_info()
if not game_resource:
raise ValueError("Could not fetch game resource")
game_resource = await self._get_game_resource()
for v in game_resource.game.diffs:
if v.version == from_version:
return v
async def verify_from_pkg_version(self, pkg_version: AsyncPath, ignore_mismatch=False):
contents = await pkg_version.read_text()
async def calculate_md5(file_to_calculate):
async with AsyncPath(file_to_calculate).open("rb") as f:
file_hash = hashlib.md5()
while chunk := await f.read(self._download_chunk):
file_hash.update(chunk)
return file_hash.hexdigest()
async def verify_file(file_to_verify, md5):
file_md5 = await calculate_md5(file_to_verify)
if file_md5 == md5:
return None
if ignore_mismatch:
return file_to_verify, md5, file_md5
raise ValueError(f"MD5 does not match for {file_to_verify}, expected md5: {md5}, actual md5: {file_md5}")
verify_jobs = []
for content in contents.split("\r\n"):
if not content.strip():
continue
info = json.loads(content)
verify_jobs.append(verify_file(self._gamedir.joinpath(info["remoteName"]), info["md5"]))
verify_result = await asyncio.gather(*verify_jobs)
failed_files = []
for file in verify_result:
if file is not None:
failed_files.append(file)
return None if not failed_files else failed_files
async def verify_game(self, pkg_version: str | Path | AsyncPath = None, ignore_mismatch=False):
if pkg_version is None:
pkg_version = self._gamedir.joinpath("pkg_version")
return await self.verify_from_pkg_version(pkg_version, ignore_mismatch)
async def clear_cache(self):
await asyncio.to_thread(shutil.rmtree, self.temp_path, ignore_errors=True)

View File

@ -1,5 +1,8 @@
import aiohttp
import locale
from aiopath import AsyncPath
from worthless import constants
from pathlib import Path
from worthless.classes import launcher, installer
@ -60,8 +63,8 @@ class Launcher:
"launcher_id": "18",
"channel_id": "1"
}
self._lang = "zh-cn" # Use chinese language because this is Pooh version
if isinstance(gamedir, str):
self._lang = "zh-cn" # Use chinese language because this is chinese version
if isinstance(gamedir, str | AsyncPath):
gamedir = Path(gamedir)
self._gamedir = gamedir.resolve()

View File

@ -1,6 +1,8 @@
from configparser import ConfigParser
from pathlib import Path
from aiopath import AsyncPath
class LauncherConfig:
"""
@ -23,8 +25,8 @@ class LauncherConfig:
return config
def __init__(self, config_path, game_version=None, overseas=True):
if isinstance(config_path, str):
self.config_path = Path(config_path)
if isinstance(config_path, str | AsyncPath):
config_path = Path(config_path)
if not game_version:
game_version = "0.0.0"
self.config_path = config_path

View File

@ -1,5 +1,6 @@
import asyncio
from pathlib import Path
from aiopath import AsyncPath
class LinuxUtils:
@ -8,9 +9,12 @@ class LinuxUtils:
def __init__(self):
pass
async def _exec_command(self, args):
@staticmethod
async def _exec_command(args):
"""Execute a command using pkexec (friendly gui)
"""
if not await AsyncPath("/usr/bin/pkexec").exists():
raise FileNotFoundError("pkexec not found.")
rsp = await asyncio.create_subprocess_shell(args)
await rsp.wait()
match rsp.returncode:
@ -21,14 +25,14 @@ class LinuxUtils:
return rsp
async def write_text_to_file(self, text, file_path: str | Path):
async def write_text_to_file(self, text, file_path: str | Path | AsyncPath):
"""Write text to a file using pkexec (friendly gui)
"""
if isinstance(file_path, Path):
if isinstance(file_path, Path | AsyncPath):
file_path = str(file_path)
await self._exec_command('echo -e "{}" | pkexec tee {}'.format(text, file_path))
async def append_text_to_file(self, text, file_path: str | Path):
async def append_text_to_file(self, text, file_path: str | Path | AsyncPath):
"""Append text to a file using pkexec (friendly gui)
"""
if isinstance(file_path, Path):

View File

@ -1,11 +1,13 @@
import os
import platform
import tarfile
import appdirs
from pathlib import Path
import shutil
import aiohttp
import asyncio
from aiopath import AsyncPath
from worthless import constants
from worthless.launcher import Launcher
from worthless.installer import Installer
@ -26,16 +28,19 @@ except ImportError:
class Patcher:
def __init__(self, gamedir=Path.cwd(), data_dir: str | Path = None, patch_url: str = None, overseas=True):
def __init__(self, gamedir: Path | AsyncPath | str = AsyncPath.cwd(), data_dir: str | Path | AsyncPath = None,
patch_url: str = None, overseas=True):
if isinstance(gamedir, str | Path):
gamedir = AsyncPath(gamedir)
self._gamedir = gamedir
self._patch_url = (patch_url if patch_url else constants.PATCH_GIT_URL).replace('http://', 'https://')
if not data_dir:
self._appdirs = appdirs.AppDirs(constants.APP_NAME, constants.APP_AUTHOR)
self._patch_path = Path(self._appdirs.user_data_dir).joinpath("Patch")
self._temp_path = Path(self._appdirs.user_cache_dir).joinpath("Patcher")
self._appdirs = constants.APPDIRS
self._patch_path = AsyncPath(self._appdirs.user_data_dir).joinpath("Patch")
self._temp_path = AsyncPath(self._appdirs.user_cache_dir).joinpath("Patcher")
else:
if not isinstance(data_dir, Path):
data_dir = Path(data_dir)
if isinstance(data_dir, str | Path):
data_dir = AsyncPath(data_dir)
self._patch_path = data_dir.joinpath("Patch")
self._temp_path = data_dir.joinpath("Temp/Patcher")
self._overseas = overseas
@ -43,7 +48,7 @@ class Patcher:
self._launcher = Launcher(self._gamedir, overseas=overseas)
match platform.system():
case "Linux":
self._linuxutils = linux.LinuxUtils()
self._linux = linux.LinuxUtils()
@staticmethod
async def _get(url, **kwargs) -> aiohttp.ClientResponse:
@ -76,15 +81,15 @@ class Patcher:
else:
return await archive.read()
async def _download_repo(self):
if shutil.which("git"):
if not self._patch_path.exists() or not self._patch_path.is_dir() \
or not self._patch_path.joinpath(".git").exists():
async def _download_repo(self, fallback=False):
if shutil.which("git") and not fallback:
if not await self._patch_path.is_dir() or not await self._patch_path.joinpath(".git").exists():
proc = await asyncio.create_subprocess_exec("git", "clone", self._patch_url, str(self._patch_path))
await proc.wait()
else:
proc = await asyncio.create_subprocess_exec("git", "pull", cwd=str(self._patch_path))
await proc.wait()
await proc.wait()
if proc.returncode != 0:
raise RuntimeError("Cannot download patch repository through git.")
else:
archive = await self._get_git_archive()
if not archive:
@ -120,24 +125,18 @@ class Patcher:
telemetry_url = constants.TELEMETRY_URL_LIST
else:
telemetry_url = constants.TELEMETRY_URL_CN_LIST
if optional:
telemetry_url |= constants.TELEMETRY_OPTIONAL_URL_LIST
unblocked_list = []
async with aiohttp.ClientSession() as session:
for url in telemetry_url:
try:
await session.get("https://" + url)
except (aiohttp.ClientResponseError, aiohttp.ClientConnectorError):
await session.get("https://" + url, timeout=15)
except (aiohttp.ClientResponseError, aiohttp.ClientConnectorError, asyncio.exceptions.TimeoutError):
continue
else:
unblocked_list.append(url)
if optional:
for url in constants.TELEMETRY_OPTIONAL_URL_LIST:
try:
await session.get("https://" + url)
except (aiohttp.ClientResponseError, aiohttp.ClientConnectorError):
continue
else:
unblocked_list.append(url)
return None if unblocked_list == [] else unblocked_list
return None if not unblocked_list else unblocked_list
async def block_telemetry(self, optional=False):
telemetry = await self.is_telemetry_blocked(optional)
@ -148,20 +147,15 @@ class Patcher:
telemetry_hosts += "0.0.0.0 " + url + "\n"
match platform.system():
case "Linux":
await self._linuxutils.append_text_to_file(telemetry_hosts, "/etc/hosts")
await self._linux.append_text_to_file(telemetry_hosts, "/etc/hosts")
return
# TODO: Windows and macOS
raise NotImplementedError("Platform not implemented.")
async def _patch_unityplayer_fallback(self):
# xdelta3-python doesn't work because it's outdated.
if self._overseas:
patch = "unityplayer_patch_os.vcdiff"
else:
patch = "unityplayer_patch_cn.vcdiff"
gamever = "".join(self._installer.get_game_version().split("."))
async def _patch_unityplayer_fallback(self, patch):
gamever = "".join((await self._installer.get_game_version()).split("."))
unity_path = self._gamedir.joinpath("UnityPlayer.dll")
unity_path.rename(self._gamedir.joinpath("UnityPlayer.dll.bak"))
await unity_path.rename(self._gamedir.joinpath("UnityPlayer.dll.bak"))
proc = await asyncio.create_subprocess_exec("xdelta3", "-d", "-s",
str(self._gamedir.joinpath("UnityPlayer.dll.bak")),
str(self._patch_path.joinpath(
@ -169,16 +163,11 @@ class Patcher:
str(self._gamedir.joinpath("UnityPlayer.dll")), cwd=self._gamedir)
await proc.wait()
async def _patch_xlua_fallback(self):
# xdelta3-python doesn't work becuase it's outdated.
if self._overseas:
patch = "unityplayer_patch_os.vcdiff"
else:
patch = "unityplayer_patch_cn.vcdiff"
gamever = "".join(self._installer.get_game_version().split("."))
async def _patch_xlua_fallback(self, patch):
gamever = "".join((await self._installer.get_game_version()).split("."))
data_name = self._installer.get_game_data_name()
xlua_path = self._gamedir.joinpath("{}/Plugins/xlua.dll".format(data_name))
xlua_path.rename(self._gamedir.joinpath("{}/Plugins/xlua.dll.bak".format(data_name)))
await xlua_path.rename(self._gamedir.joinpath("{}/Plugins/xlua.dll.bak".format(data_name)))
proc = await asyncio.create_subprocess_exec("xdelta3", "-d", "-s",
str(self._gamedir.joinpath(
"{}/Plugins/xlua.dll.bak".format(data_name))),
@ -189,12 +178,8 @@ class Patcher:
cwd=self._gamedir)
await proc.wait()
def _patch_unityplayer(self):
if self._overseas:
patch = "unityplayer_patch_os.vcdiff"
else:
patch = "unityplayer_patch_cn.vcdiff"
gamever = "".join(self._installer.get_game_version().split("."))
async def _patch_unityplayer(self, patch):
gamever = "".join((await self._installer.get_game_version()).split("."))
unity_path = self._gamedir.joinpath("UnityPlayer.dll")
patch_bytes = self._patch_path.joinpath("{}/patch_files/{}".format(gamever, patch)).read_bytes()
patched_unity_bytes = xdelta3.decode(unity_path.read_bytes(), patch_bytes)
@ -202,9 +187,8 @@ class Patcher:
with Path(self._gamedir.joinpath("UnityPlayer.dll")).open("wb") as f:
f.write(patched_unity_bytes)
def _patch_xlua(self):
patch = "xlua_patch.vcdiff"
gamever = "".join(self._installer.get_game_version().split("."))
async def _patch_xlua(self, patch):
gamever = "".join((await self._installer.get_game_version()).split("."))
data_name = self._installer.get_game_data_name()
xlua_path = self._gamedir.joinpath("{}/Plugins/xlua.dll".format(data_name))
patch_bytes = self._patch_path.joinpath("{}/patch_files/{}".format(gamever, patch)).read_bytes()
@ -213,13 +197,17 @@ class Patcher:
with Path(self._gamedir.joinpath("{}/Plugins/xlua.dll".format(data_name))).open("wb") as f:
f.write(patched_xlua_bytes)
def apply_xlua_patch(self, fallback=True):
async def apply_xlua_patch(self, fallback=True):
if self._overseas:
patch = "xlua_patch_os.vcdiff"
else:
patch = "xlua_patch_cn.vcdiff"
if NO_XDELTA3_MODULE or fallback:
asyncio.run(self._patch_xlua_fallback())
await self._patch_xlua_fallback(patch)
return
self._patch_xlua()
await self._patch_xlua(patch)
def apply_patch(self, crash_fix=False, fallback=True) -> None:
async def apply_patch(self, crash_fix=False, fallback=True) -> None:
"""
Patch the game (and optionally patch xLua if specified)
@ -228,25 +216,36 @@ class Patcher:
:return: None
"""
# Patch UnityPlayer.dll
if NO_XDELTA3_MODULE or fallback:
asyncio.run(self._patch_unityplayer_fallback())
# xdelta3-python doesn't work because it's outdated.
if self._overseas:
patch = "unityplayer_patch_os.vcdiff"
else:
self._patch_unityplayer()
patch = "unityplayer_patch_cn.vcdiff"
patch_jobs = []
if NO_XDELTA3_MODULE or fallback:
patch_jobs.append(self._patch_unityplayer_fallback(patch))
else:
patch_jobs.append(self._patch_unityplayer(patch))
# Patch xLua.dll
if crash_fix:
self.apply_xlua_patch(fallback=fallback)
patch_jobs.append(self.apply_xlua_patch(fallback=fallback))
# Disable crash reporters
disable_files = [
self._installer.get_game_data_name() + "upload_crash.exe",
self._installer.get_game_data_name() + "Plugins/crashreport.exe",
]
for file in disable_files:
file_path = Path(file).resolve()
if file_path.exists():
file_path.rename(str(file_path) + ".bak")
async def disable_crashreporters():
disable_files = [
self._installer.get_game_data_name() + "upload_crash.exe",
self._installer.get_game_data_name() + "Plugins/crashreport.exe",
]
for file in disable_files:
file_path = Path(file).resolve()
if file_path.exists():
file_path.rename(str(file_path) + ".bak")
patch_jobs.append(disable_crashreporters())
await asyncio.gather(*patch_jobs)
@staticmethod
def _creation_date(file_path: Path):
async def _creation_date(file_path: AsyncPath):
"""
Try to get the date that a file was created, falling back to when it was
last modified if that isn't possible.
@ -255,7 +254,7 @@ class Patcher:
if platform.system() == 'Windows':
return os.path.getctime(file_path)
else:
stat = file_path.stat()
stat = await file_path.stat()
try:
return stat.st_birthtime
except AttributeError:
@ -263,11 +262,11 @@ class Patcher:
# so we'll settle for when its content was last modified.
return stat.st_mtime
def _revert_file(self, original_file: str, base_file: Path, ignore_error=False):
original_path = self._gamedir.joinpath(original_file + ".bak").resolve()
target_file = self._gamedir.joinpath(original_file).resolve()
if original_path.exists():
if abs(self._creation_date(base_file) - self._creation_date(original_path)) > 86400: # 24 hours
async def _revert_file(self, original_file: str, base_file: AsyncPath, ignore_error=False):
original_path = await self._gamedir.joinpath(original_file + ".bak").resolve()
target_file = await self._gamedir.joinpath(original_file).resolve()
if await original_path.exists():
if abs(await self._creation_date(base_file) - await self._creation_date(original_path)) > 86400: # 24 hours
if not ignore_error:
raise RuntimeError("{} is not for this game version.".format(original_path.name))
original_path.unlink(missing_ok=True)
@ -275,30 +274,36 @@ class Patcher:
target_file.unlink(missing_ok=True)
original_path.rename(target_file)
def revert_patch(self, ignore_errors=True) -> None:
async def revert_patch(self, ignore_errors=True) -> None:
"""
Revert the patch (and revert the login door crash fix if patched)
Revert the patch (and revert the xLua patch if patched)
:return: None
"""
game_exec = self._gamedir.joinpath(asyncio.run(self._launcher.get_resource_info()).game.latest.entry)
game_exec = self._gamedir.joinpath((await self._launcher.get_resource_info()).game.latest.entry)
revert_files = [
"UnityPlayer.dll",
self._installer.get_game_data_name() + "upload_crash.exe",
self._installer.get_game_data_name() + "Plugins/crashreport.exe",
self._installer.get_game_data_name() + "Plugins/xlua.dll",
]
revert_job = []
for file in revert_files:
self._revert_file(file, game_exec, ignore_errors)
revert_job.append(self._revert_file(file, game_exec, ignore_errors))
for file in ["launcher.bat", "mhyprot2_running.reg"]:
self._gamedir.joinpath(file).unlink(missing_ok=True)
revert_job.append(self._gamedir.joinpath(file).unlink(missing_ok=True))
def get_files(extensions):
async def get_files(extensions):
all_files = []
for ext in extensions:
all_files.extend(self._gamedir.glob(ext))
all_files.extend(await self._gamedir.glob(ext))
return all_files
files = get_files(('*.dxvk-cache', '*_d3d9.log', '*_d3d11.log', '*_dxgi.log'))
files = await get_files(('*.dxvk-cache', '*_d3d9.log', '*_d3d11.log', '*_dxgi.log'))
for file in files:
file.unlink(missing_ok=True)
revert_job.append(file.unlink(missing_ok=True))
await asyncio.gather(*revert_job)
async def clear_cache(self):
await asyncio.to_thread(shutil.rmtree, self._temp_path, ignore_errors=True)