import xdelta3 import tarfile import appdirs from pathlib import Path import shutil import aiohttp import asyncio from worthless import constants from worthless.launcher import Launcher from worthless.installer import Installer class Patcher: def __init__(self, gamedir=Path.cwd(), data_dir: str | Path = None, patch_url: str = None, overseas=True): self._gamedir = gamedir self._patch_url = (patch_url if patch_url else constants.PATCH_GIT_URL).replace('http://', 'https://') if not data_dir: self._appdirs = appdirs.AppDirs(constants.APP_NAME, constants.APP_AUTHOR) self._patch_path = Path(self._appdirs.user_data_dir).joinpath("Patch") self._temp_path = Path(self._appdirs.user_cache_dir).joinpath("Patcher") else: if not isinstance(data_dir, Path): data_dir = Path(data_dir) self._patch_path = data_dir.joinpath("Patch") self._temp_path = data_dir.joinpath("Temp/Patcher") self._overseas = overseas self._installer = Installer(self._gamedir, overseas=overseas, data_dir=self._temp_path) self._launcher = Launcher(self._gamedir, overseas=overseas) @staticmethod async def _get(url, **kwargs) -> aiohttp.ClientResponse: async with aiohttp.ClientSession() as session: rsp = await session.get(url, **kwargs) rsp.raise_for_status() return rsp async def _get_git_archive(self, archive_format="tar.gz", branch="master"): """ Get the git archive of the patch repository. This supports Gitea API and also introduce workaround for https://notabug.org :return: Archive file in bytes """ # Replace http with https if self._patch_url.startswith('https://notabug.org'): archive_url = self._patch_url + '/archive/master.{}'.format(archive_format) return await (await self._get(archive_url)).read() try: url_split = self._patch_url.split('//') git_server = url_split[0] git_owner, git_repo = url_split[1].split('/') archive_url = git_server + '/api/v1/repos/{}/{}/archive/{}.{}'.format( git_owner, git_repo, branch, archive_format ) archive = await self._get(archive_url) except aiohttp.ClientResponseError: pass else: return await archive.read() async def _download_repo(self): if shutil.which("git"): if not self._patch_path.exists() or not self._patch_path.is_dir() \ or not self._patch_path.joinpath(".git").exists(): await asyncio.create_subprocess_exec("git", "clone", self._patch_url, str(self._patch_path)) else: await asyncio.create_subprocess_exec("git", "pull", cwd=str(self._patch_path)) else: archive = await self._get_git_archive() if not archive: raise RuntimeError("Cannot download patch repository") with tarfile.open(archive) as tar: tar.extractall(self._patch_path) def override_patch_url(self, url) -> None: """ Override the patch url. :param url: Patch repository url, the url must be a valid git repository. :return: None """ self._patch_url = url async def download_patch(self) -> None: """ If `git` exists, this will clone the patch git url and save it to a temporary directory. Else, this will download the patch from the patch url and save it to a temporary directory. (Not reliable) :return: None """ await self._download_repo() def _patch_unityplayer(self): if self._overseas: patch = "unityplayer_patch_os.vcdiff" else: patch = "unityplayer_patch_cn.vcdiff" gamever = "".join(self._installer.get_game_version().split(".")) unity_path = self._gamedir.joinpath("UnityPlayer.dll") patch_bytes = self._patch_path.joinpath("{}/patch_files/{}".format(gamever, patch)).read_bytes() patched_unity_bytes = xdelta3.decode(unity_path.read_bytes(), patch_bytes) unity_path.rename(self._gamedir.joinpath("UnityPlayer.dll.bak")) with Path(self._gamedir.joinpath("UnityPlayer.dll")).open("wb") as f: f.write(patched_unity_bytes) def _patch_xlua(self): patch = "xlua_patch.vcdiff" gamever = "".join(self._installer.get_game_version().split(".")) data_name = self._installer.get_game_data_name() xlua_path = self._gamedir.joinpath("{}/Plugins/xlua.dll".format(data_name)) patch_bytes = self._patch_path.joinpath("{}/patch_files/{}".format(gamever, patch)).read_bytes() patched_xlua_bytes = xdelta3.decode(xlua_path.read_bytes(), patch_bytes) xlua_path.rename(self._gamedir.joinpath("xlua.dll.bak")) with Path(self._gamedir.joinpath("{}/Plugins/xlua.dll".format(data_name)).open("wb") as f: f.write(patched_xlua_bytes) def apply_xlua_patch(self): self._patch_xlua() def apply_patch(self, crash_fix=False) -> None: """ Patch the game (and optionally patch xLua if specified) :param crash_fix: Whether to patch xLua or not :return: None """ self._patch_unityplayer() if crash_fix: self._patch_xlua() def _revert_file(self, original_file: str, base_file: Path, ignore_error=False): original_path = self._gamedir.joinpath(original_file + ".bak").resolve() target_file = self._gamedir.joinpath(original_file).resolve() if original_path.exists(): if abs(base_file.stat().st_mtime_ns - original_path.stat().st_mtime_ns) > 3600: if not ignore_error: raise RuntimeError("{} is not for this game version.".format(original_path.name)) original_path.unlink(missing_ok=True) else: target_file.unlink(missing_ok=True) original_path.rename(target_file) def revert_patch(self, ignore_errors=True) -> None: """ Revert the patch (and revert the login door crash fix if patched) :return: None """ game_exec = self._gamedir.joinpath(asyncio.run(self._launcher.get_resource_info()).game.latest.entry) revert_files = [ "UnityPlayer.dll", self._installer.get_game_data_name() + "upload_crash.exe", self._installer.get_game_data_name() + "Plugins/crashreport.exe", self._installer.get_game_data_name() + "Plugins/xlua.dll", ] for file in revert_files: self._revert_file(file, game_exec, ignore_errors) for file in ["launcher.bat", "mhyprot2_running.reg"]: self._gamedir.joinpath(file).unlink(missing_ok=True) def get_files(extensions): all_files = [] for ext in extensions: all_files.extend(self._gamedir.glob(ext)) return all_files files = get_files(('*.dxvk-cache', '*_d3d9.log', '*_d3d11.log', '*_dxgi.log')) for file in files: file.unlink(missing_ok=True)