worthless-launcher/worthless/patcher.py

290 lines
12 KiB
Python

import asyncio
import io
import os
import platform
import shutil
import tarfile
from pathlib import Path
from urllib.parse import urlsplit

import aiohttp
import appdirs

from worthless import linux
from worthless import constants
from worthless.launcher import Launcher
from worthless.installer import Installer

NO_XDELTA3_MODULE = False
try:
    import xdelta3
except ImportError:
    NO_XDELTA3_MODULE = True
class Patcher:
    """
    Download, apply and revert the game patch for a game installation.

    The patch repository is fetched with `git` (or as an archive when `git` is missing), and the
    vcdiff patches it contains are applied to `UnityPlayer.dll` and, optionally, `xlua.dll`.
    Original files are kept next to the patched ones with a `.bak` suffix so they can be restored.
    """

    def __init__(self, gamedir=Path.cwd(), data_dir: str | Path = None, patch_url: str = None, overseas=True):
        """
        :param gamedir: Game installation directory (str or Path).
        :param data_dir: Directory used to store the patch repository and temporary files.
            When omitted, the per-user data/cache directories reported by `appdirs` are used.
        :param patch_url: Patch git repository url, defaults to `constants.PATCH_GIT_URL`.
        :param overseas: Whether the game is the overseas build (True) or the CN build (False).
        """
        # Every method calls .joinpath() on the game directory, so accept plain strings too.
        self._gamedir = Path(gamedir)
        self._patch_url = (patch_url if patch_url else constants.PATCH_GIT_URL).replace('http://', 'https://')
        if not data_dir:
            self._appdirs = appdirs.AppDirs(constants.APP_NAME, constants.APP_AUTHOR)
            self._patch_path = Path(self._appdirs.user_data_dir).joinpath("Patch")
            self._temp_path = Path(self._appdirs.user_cache_dir).joinpath("Patcher")
        else:
            if not isinstance(data_dir, Path):
                data_dir = Path(data_dir)
            self._patch_path = data_dir.joinpath("Patch")
            self._temp_path = data_dir.joinpath("Temp/Patcher")
        self._overseas = overseas
        self._installer = Installer(self._gamedir, overseas=overseas, data_dir=data_dir)
        self._launcher = Launcher(self._gamedir, overseas=overseas)
        # The Linux helpers are only created on Linux; block_telemetry only uses them there.
        if platform.system() == "Linux":
            self._linuxutils = linux.LinuxUtils()

    @staticmethod
    async def _get(url, **kwargs) -> aiohttp.ClientResponse:
        """
        Perform a GET request and return the response with its body already buffered.

        :param url: Url to request.
        :param kwargs: Extra keyword arguments forwarded to `session.get`.
        :return: The response; `await rsp.read()` returns the buffered body.
        :raises aiohttp.ClientResponseError: When the server answers with an error status.
        """
        async with aiohttp.ClientSession() as session:
            rsp = await session.get(url, **kwargs)
            rsp.raise_for_status()
            # The session (and its connection) is closed when this block exits, so the body
            # has to be read now; later read() calls are served from the buffered copy.
            await rsp.read()
            return rsp

    async def _get_git_archive(self, archive_format="tar.gz", branch="master"):
        """
        Get the git archive of the patch repository.
        This supports Gitea API and also introduce workaround for https://notabug.org

        :param archive_format: Archive extension to request.
        :param branch: Branch to download.
        :return: Archive file in bytes, or None when the url is not in `<server>/<owner>/<repo>`
            form or the Gitea API request fails with an error status.
        """
        if self._patch_url.startswith('https://notabug.org'):
            archive_url = self._patch_url + '/archive/{}.{}'.format(branch, archive_format)
            return await (await self._get(archive_url)).read()
        url_parts = urlsplit(self._patch_url)
        path_parts = [part for part in url_parts.path.split('/') if part]
        if len(path_parts) != 2:
            return None
        git_owner, git_repo = path_parts
        # Clone urls commonly end in ".git", which is not part of the repository name.
        git_repo = git_repo.removesuffix('.git')
        archive_url = '{}://{}/api/v1/repos/{}/{}/archive/{}.{}'.format(
            url_parts.scheme, url_parts.netloc, git_owner, git_repo, branch, archive_format
        )
        try:
            archive = await self._get(archive_url)
        except aiohttp.ClientResponseError:
            return None
        return await archive.read()

    async def _download_repo(self):
        """
        Fetch the patch repository into the patch directory.

        With `git` available the repository is cloned, or pulled when a checkout already exists.
        Otherwise the repository archive is downloaded and extracted.

        :raises RuntimeError: When the clone fails or the archive cannot be downloaded.
        """
        if shutil.which("git"):
            # A ".git" entry can only exist when the patch path itself is an existing directory.
            if not self._patch_path.joinpath(".git").exists():
                proc = await asyncio.create_subprocess_exec("git", "clone", self._patch_url, str(self._patch_path))
                if await proc.wait() != 0:
                    raise RuntimeError("Cannot download patch repository")
            else:
                # A failed pull leaves the existing checkout in place, so it is not treated as fatal.
                proc = await asyncio.create_subprocess_exec("git", "pull", cwd=str(self._patch_path))
                await proc.wait()
            return
        archive = await self._get_git_archive()
        if not archive:
            raise RuntimeError("Cannot download patch repository")
        # The archive is held in memory as bytes, so it must be opened as a file object.
        with tarfile.open(fileobj=io.BytesIO(archive)) as tar:
            # NOTE(review): server-generated archives may wrap everything in a top-level
            # directory, while the patch lookups expect <patch path>/<version>/patch_files;
            # confirm the layout against the server in use.
            if hasattr(tarfile, "data_filter"):
                # Reject members that would escape the destination directory.
                tar.extractall(self._patch_path, filter="data")
            else:
                tar.extractall(self._patch_path)

    def override_patch_url(self, url) -> None:
        """
        Override the patch url.

        :param url: Patch repository url, the url must be a valid git repository.
        :return: None
        """
        # Apply the same http -> https upgrade the constructor applies.
        self._patch_url = url.replace('http://', 'https://')

    async def download_patch(self) -> None:
        """
        If `git` exists, this will clone (or pull) the patch git url into the patch directory.
        Else, this will download the patch archive from the patch url and extract it there.
        (Not reliable)

        :return: None
        """
        await self._download_repo()

    async def is_telemetry_blocked(self):
        """
        Check if the telemetry is blocked.

        :return: None when no telemetry host could be reached, otherwise the list of hosts
            that are still reachable.
        """
        if self._overseas:
            telemetry_url = constants.TELEMETRY_URL_LIST
        else:
            telemetry_url = constants.TELEMETRY_URL_CN_LIST
        unblocked_list = []
        async with aiohttp.ClientSession() as session:
            for url in telemetry_url:
                try:
                    # Only reachability matters; release the response as soon as it arrives.
                    async with session.get("https://" + url):
                        pass
                except (aiohttp.ClientResponseError, aiohttp.ClientConnectorError):
                    continue
                unblocked_list.append(url)
        return unblocked_list or None

    async def block_telemetry(self):
        """
        Append `0.0.0.0 <host>` entries for every reachable telemetry host to /etc/hosts.

        :raises ValueError: When every telemetry host is already unreachable.
        :raises NotImplementedError: On platforms other than Linux.
        """
        telemetry = await self.is_telemetry_blocked()
        if not telemetry:
            raise ValueError("All telemetry are blocked")
        telemetry_hosts = "\n" + "".join("0.0.0.0 " + url + "\n" for url in telemetry)
        if platform.system() == "Linux":
            await self._linuxutils.append_text_to_file(telemetry_hosts, "/etc/hosts")
            return
        # TODO: Windows and macOS
        raise NotImplementedError("Platform not implemented.")

    def _unityplayer_patch_name(self) -> str:
        """Return the UnityPlayer vcdiff file name for the configured game region."""
        return "unityplayer_patch_os.vcdiff" if self._overseas else "unityplayer_patch_cn.vcdiff"

    def _patch_file_path(self, patch: str) -> Path:
        """Return the path of a patch file for the installed game version inside the patch repository."""
        gamever = "".join(self._installer.get_game_version().split("."))
        return self._patch_path.joinpath("{}/patch_files/{}".format(gamever, patch))

    async def _xdelta3_cli_patch(self, target: Path, patch_file: Path) -> None:
        """
        Patch `target` in place with the `xdelta3` executable, keeping the original as `<name>.bak`.

        :raises RuntimeError: When xdelta3 exits with a non-zero status.
        """
        backup = target.with_name(target.name + ".bak")
        target.rename(backup)
        try:
            proc = await asyncio.create_subprocess_exec("xdelta3", "-d", "-s", str(backup), str(patch_file),
                                                        str(target), cwd=self._gamedir)
            if await proc.wait() != 0:
                raise RuntimeError("xdelta3 failed to patch {}".format(target.name))
        except Exception:
            # The original was renamed away above; put it back so the game is not left
            # without the file when xdelta3 is missing or fails.
            target.unlink(missing_ok=True)
            backup.rename(target)
            raise

    async def _patch_unityplayer_fallback(self):
        """Patch UnityPlayer.dll using the xdelta3 executable."""
        # xdelta3-python doesn't work because it's outdated.
        await self._xdelta3_cli_patch(self._gamedir.joinpath("UnityPlayer.dll"),
                                      self._patch_file_path(self._unityplayer_patch_name()))

    async def _patch_xlua_fallback(self):
        """Patch xlua.dll using the xdelta3 executable."""
        # xdelta3-python doesn't work because it's outdated.
        data_name = self._installer.get_game_data_name()
        # xlua.dll has its own patch file; it must match the one _patch_xlua applies.
        await self._xdelta3_cli_patch(self._gamedir.joinpath("{}/Plugins/xlua.dll".format(data_name)),
                                      self._patch_file_path("xlua_patch.vcdiff"))

    def _patch_unityplayer(self):
        """Patch UnityPlayer.dll using the xdelta3 Python module, keeping the original as `.bak`."""
        unity_path = self._gamedir.joinpath("UnityPlayer.dll")
        patch_bytes = self._patch_file_path(self._unityplayer_patch_name()).read_bytes()
        # Decode before renaming so a decoding failure leaves the original file untouched.
        patched_unity_bytes = xdelta3.decode(unity_path.read_bytes(), patch_bytes)
        unity_path.rename(self._gamedir.joinpath("UnityPlayer.dll.bak"))
        unity_path.write_bytes(patched_unity_bytes)

    def _patch_xlua(self):
        """Patch xlua.dll using the xdelta3 Python module, keeping the original as `.bak`."""
        data_name = self._installer.get_game_data_name()
        xlua_path = self._gamedir.joinpath("{}/Plugins/xlua.dll".format(data_name))
        patch_bytes = self._patch_file_path("xlua_patch.vcdiff").read_bytes()
        # Decode before renaming so a decoding failure leaves the original file untouched.
        patched_xlua_bytes = xdelta3.decode(xlua_path.read_bytes(), patch_bytes)
        xlua_path.rename(self._gamedir.joinpath("{}/Plugins/xlua.dll.bak".format(data_name)))
        xlua_path.write_bytes(patched_xlua_bytes)

    def apply_xlua_patch(self, fallback=True):
        """
        Patch xlua.dll.

        :param fallback: Use the xdelta3 executable instead of the xdelta3 Python module.
            The executable is always used when the module could not be imported.
        :return: None
        """
        if NO_XDELTA3_MODULE or fallback:
            asyncio.run(self._patch_xlua_fallback())
            return
        self._patch_xlua()

    def _game_data_file(self, relative: str) -> str:
        """Return the path of a file inside the game data directory, relative to the game directory."""
        # Joining as paths works whether or not the data directory name ends with a separator.
        return str(Path(self._installer.get_game_data_name()).joinpath(relative))

    def apply_patch(self, crash_fix=False, fallback=True) -> None:
        """
        Patch the game (and optionally patch xLua if specified)

        :param fallback: Use the xdelta3 executable instead of the xdelta3 Python module.
        :param crash_fix: Whether to patch xLua or not
        :return: None
        """
        # Patch UnityPlayer.dll
        if NO_XDELTA3_MODULE or fallback:
            asyncio.run(self._patch_unityplayer_fallback())
        else:
            self._patch_unityplayer()
        # Patch xLua.dll
        if crash_fix:
            self.apply_xlua_patch(fallback=fallback)
        # Disable crash reporters
        disable_files = [
            self._game_data_file("upload_crash.exe"),
            self._game_data_file("Plugins/crashreport.exe"),
        ]
        for file in disable_files:
            # Anchor to the game directory; a bare relative path would resolve against
            # the process working directory instead.
            file_path = self._gamedir.joinpath(file).resolve()
            if file_path.exists():
                file_path.rename(str(file_path) + ".bak")

    @staticmethod
    def _creation_date(file_path: Path):
        """
        Try to get the date that a file was created, falling back to when it was
        last modified if that isn't possible.
        See http://stackoverflow.com/a/39501288/1709587 for explanation.

        :param file_path: File to inspect.
        :return: Timestamp in seconds.
        """
        if platform.system() == 'Windows':
            return os.path.getctime(file_path)
        stat = file_path.stat()
        try:
            return stat.st_birthtime
        except AttributeError:
            # We're probably on Linux. No easy way to get creation dates here,
            # so we'll settle for when its content was last modified.
            return stat.st_mtime

    def _revert_file(self, original_file: str, base_file: Path, ignore_error=False):
        """
        Restore `<original_file>.bak` over `original_file` inside the game directory.

        A backup whose timestamp differs from `base_file`'s by more than 24 hours is
        considered to belong to another game version and is not restored.

        :param original_file: File path relative to the game directory.
        :param base_file: Reference file used to judge whether the backup is current.
        :param ignore_error: Delete an outdated backup instead of raising.
        :raises RuntimeError: When the backup is outdated and `ignore_error` is False.
        """
        original_path = self._gamedir.joinpath(original_file + ".bak").resolve()
        target_file = self._gamedir.joinpath(original_file).resolve()
        if not original_path.exists():
            return
        if abs(self._creation_date(base_file) - self._creation_date(original_path)) > 86400:  # 24 hours
            if not ignore_error:
                raise RuntimeError("{} is not for this game version.".format(original_path.name))
            original_path.unlink(missing_ok=True)
            return
        target_file.unlink(missing_ok=True)
        original_path.rename(target_file)

    def revert_patch(self, ignore_errors=True) -> None:
        """
        Revert the patch (and revert the login door crash fix if patched)

        :param ignore_errors: Delete outdated backups instead of raising.
        :return: None
        """
        game_exec = self._gamedir.joinpath(asyncio.run(self._launcher.get_resource_info()).game.latest.entry)
        revert_files = [
            "UnityPlayer.dll",
            self._game_data_file("upload_crash.exe"),
            self._game_data_file("Plugins/crashreport.exe"),
            self._game_data_file("Plugins/xlua.dll"),
        ]
        for file in revert_files:
            self._revert_file(file, game_exec, ignore_errors)
        for file in ["launcher.bat", "mhyprot2_running.reg"]:
            self._gamedir.joinpath(file).unlink(missing_ok=True)
        # Remove DXVK caches and logs left in the game directory.
        for pattern in ('*.dxvk-cache', '*_d3d9.log', '*_d3d11.log', '*_dxgi.log'):
            # Collect the matches first so files are not deleted while the directory is being scanned.
            for file in list(self._gamedir.glob(pattern)):
                file.unlink(missing_ok=True)