worthless-launcher/worthless/game/_game.py

695 lines
24 KiB
Python

import asyncio
import hashlib
import json
import logging
import re
import shutil
import zipfile
from os import PathLike
from pathlib import Path
from worthless import constants, launcher
from worthless.classes import installer
from worthless.game.gameenums import (
Variant,
VoicepackArchiveType,
UpdateStatus,
InstallStatus,
VoicepackArchiveLanguage,
)
from worthless.game.hdiffpatch import HDiffPatch
from worthless.game.launcherconfig import LauncherConfig
_logger = logging.getLogger("worthless.Game")
_game_version_regex = re.compile(r"([1-9]+\.[0-9]+\.[0-9]+)_\d+_\d+")
def _calculate_md5(file: str | bytes | PathLike):
file = Path(file)
if not file.exists():
return ""
with file.open("rb") as f:
file_hash = hashlib.md5()
while chunk := f.read(8196):
file_hash.update(chunk)
return file_hash.hexdigest()
def _read_version_from_game_file(globalgamemanagers: Path | bytes) -> str | None:
"""
Reads the version from the globalgamemanagers file. (Data/globalgamemanagers)
This function uses `An Anime Game Launcher` method to read the version:
https://github.com/an-anime-team/an-anime-game-launcher/blob/main/src/ts/Game.ts#L26
Returns:
Version as string "x.x.x" or None if not found.
"""
if isinstance(globalgamemanagers, Path):
data = globalgamemanagers.read_text(encoding="ascii", errors="ignore")
else:
data = globalgamemanagers.decode(encoding="ascii", errors="ignore")
result = _game_version_regex.search(data)
if not result:
return
return result.group(1)
def parse_pkg_version(pkg_version: str | bytes | PathLike) -> dict:
"""
Parse a pkg_version file
Args:
pkg_version: The pkg_version file to be parsed
Returns:
A parsed pkg_version as a dict
{"<file 1>": "<md5">, "<file 2>": "<md5>", ...}
"""
contents = Path(pkg_version).read_text()
pkg_version = {}
for content in contents.split("\r\n"):
if not content.strip():
continue
info = json.loads(content)
pkg_version.update({info["remoteName"]: info["md5"]})
return pkg_version
def voicepack_lang_translate(lang: str, base_language="game") -> str | None:
"""
Translates the voicepack language to the language code used by the game (and reverse)
Args:
lang: Language to translate
base_language: Base language type (game/locale/both)
Returns:
Translated language code such as "en-us" or "English(US)"
"""
if base_language == "game" or base_language == "both":
match lang.lower():
case "english(us)":
return "en-us"
case "japanese":
return "ja-jp"
case "chinese":
return "zh-cn"
case "korean":
return "ko-kr"
if base_language == "locale" or base_language == "both":
match lang.lower().replace("_", "-"):
case "en-us":
return "English(US)"
case "ja-jp":
return "Japanese"
case "zh-cn":
return "Chinese"
case "ko-kr":
return "Korean"
def get_voicepack_archive_language(
archive: str | bytes | PathLike,
) -> VoicepackArchiveLanguage:
"""
Gets voicepack archive language.
Args:
archive: The voicepack archive to get type
Returns:
VoicepackArchiveLanguage
"""
if not isinstance(archive, Path):
archive = Path(archive).resolve()
if not archive.exists():
raise FileNotFoundError(f"Voiceover archive {archive} not found")
with zipfile.ZipFile(archive, "r") as f:
for file in zipfile.Path(f).iterdir():
if file.name.startswith("Audio_") and file.name.endswith("_pkg_version"):
# Audio_<language>_pkg_version
return VoicepackArchiveLanguage(file.name.split("_")[1])
def get_voicepack_archive_type(archive: str | bytes | PathLike) -> VoicepackArchiveType:
"""
Gets voicepack archive type.
Args:
archive: The voicepack archive to get type
Returns:
VoicepackArchiveType.UPDATE for update archive, and VoicepackArchiveType.FULL for a full voicepack archive.
"""
vo_lang = get_voicepack_archive_language(archive)
with zipfile.ZipFile(archive, "r") as f:
archive_path = zipfile.Path(f)
files = f.read("Audio_{}_pkg_version".format(vo_lang)).decode().split("\n")
for file in files:
if (
file.strip()
and not archive_path.joinpath(json.loads(file)["remoteName"]).exists()
):
return VoicepackArchiveType.UPDATE
return VoicepackArchiveType.FULL
def verify_game_file(
file: str | bytes | PathLike,
game_dir: str | bytes | PathLike,
pkg_version: dict[str, str] | str | bytes | PathLike,
ignore_mismatch=True,
) -> (str, str, str) | None:
"""
Verifies a game file
It is strongly recommended to parse pkg_version with parse_pkg_version and use
that as pkg_version argument if you're planning to verify multiple game files
to save resource and verify faster.
Args:
file: File to verify (e.g. GenshinImpact_Data/sharedassets0.assets.resS)
game_dir: Game directory
pkg_version: pkg_version file to read MD5 from, if not specified it'll use
the file from the current game installation
ignore_mismatch: Do not raise exception if a file has mismatch MD5, instead
return as a (file, expected md5, actual md5) tuple
Returns:
None if the file is good, (file, expected md5, actual md5) if the file has mismatch md5
Raises:
FileNotFoundError if the file to verify doesn't exist
"""
game_dir = Path(game_dir)
file_full = game_dir.joinpath(file)
if not isinstance(pkg_version, dict):
pkg_version = parse_pkg_version(pkg_version=pkg_version)
if not pkg_version.get(file):
raise FileNotFoundError(file)
file_md5 = _calculate_md5(file_full)
if file_md5 == pkg_version[file]:
return None
if ignore_mismatch:
return file, pkg_version[file], file_md5
raise ValueError(
f"MD5 does not match for {file}, expected md5: {pkg_version[file]}, actual md5: {file_md5}"
)
async def verify_game_files(
game_dir: str | bytes | PathLike,
pkg_version: dict[str, str] | str | bytes | PathLike,
ignore_mismatch=True,
):
if not isinstance(game_dir, Path):
game_dir = Path(game_dir)
if not isinstance(pkg_version, dict):
pkg_version = parse_pkg_version(pkg_version=pkg_version)
# Wrapper for catching FileNotFoundError
def verify(f):
try:
result = verify_game_file(
file=f,
game_dir=game_dir,
pkg_version=pkg_version,
ignore_mismatch=ignore_mismatch,
)
except FileNotFoundError:
return
return result
verify_jobs = []
for file in pkg_version:
verify_jobs.append(asyncio.to_thread(verify, file))
verify_result = await asyncio.gather(*verify_jobs)
failed_files = []
for file in verify_result:
if file is not None:
failed_files.append(file)
return failed_files
class Game:
"""
Manages game & voicepacks installation.
This class handles installing and updating game & voicepacks installation.
Args:
game_dir: Game directory
variant: Game variant
cache_dir: Cache directory, if not specified it'll automatically detect your
system cache directory and use that.
"""
def __init__(
self,
game_dir: str | bytes | PathLike = None,
variant: Variant = None,
cache_dir: str | bytes | PathLike = None,
):
if not game_dir:
game_dir = Path.cwd()
if not isinstance(game_dir, Path):
game_dir = Path(game_dir)
self._game_dir = game_dir
if cache_dir:
if not isinstance(cache_dir, Path):
cache_dir = Path(cache_dir)
self._cache_path = cache_dir.joinpath("Installer/")
else:
self._cache_path = Path(constants.APPDIRS.user_cache_dir).joinpath(
"Installer"
)
Path(self._cache_path).mkdir(parents=True, exist_ok=True)
self._variant = variant
self._version = None
self._launcher = launcher.Launcher(variant=variant)
self._hdiffpatch = HDiffPatch(data_dir=cache_dir)
self._game_config_file = self._game_dir.joinpath("config.ini")
self._game_config = LauncherConfig(
self._game_config_file, game_version=self.version
)
def _read_version_from_config(self) -> str | None:
"""
Reads the version from config.ini
Returns:
Version as string "x.x.x" or None if not found.
"""
if not self._game_config_file.exists():
raise FileNotFoundError(f"Config file {self._game_config_file} not found")
return self._game_config.get("General", "game_version", fallback=None)
async def get_resource_info(self) -> installer.Resource:
"""
Alias for `launcher.get_resource_info()`
"""
return await self._launcher.get_resource_info()
async def get_server_game_info(self, pre_download: bool = False) -> installer.Game:
game_resource = await self.get_resource_info()
game = game_resource.game
if not pre_download:
return game
game = game_resource.pre_download_game
if game is None:
raise RuntimeError("Pre-download version is not available.")
@property
def version(self) -> str | None:
"""
Gets the current game version
Returns:
Version as string "x.x.x" or None if not found.
"""
if not self._version:
self._version = self.get_current_game_version()
return self._version
@property
def variant(self) -> Variant:
"""
Gets the current game variant
This property is a shorthand for get_current_game_variant()
Returns:
GameVariant
"""
if not self._variant:
self._version = self.get_current_game_variant()
return self._variant
@property
def cache_path(self) -> Path:
return self._cache_path
@staticmethod
def voicepack_lang_translate(lang: str, base_language="game") -> str | None:
"""
This function is an alias to worthless.game.voicepack_lang_translate
"""
return voicepack_lang_translate(lang=lang, base_language=base_language)
@staticmethod
def get_voicepack_archive_language(
archive: str | bytes | PathLike,
) -> VoicepackArchiveLanguage:
"""
This function is an alias to worthless.game.get_voicepack_archive_language
"""
return get_voicepack_archive_language(archive=archive)
@staticmethod
def get_voicepack_archive_type(
archive: str | bytes | PathLike,
) -> VoicepackArchiveType:
"""
This function is an alias to worthless.game.get_voicepack_archive_type
"""
return get_voicepack_archive_type(archive=archive)
def get_game_data_name(self) -> str:
"""
Gets the game data path name
Returns:
A string containing game data path (e.g. GenshinImpact_Data)
"""
match self.variant:
case Variant.INTERNATIONAL:
return "GenshinImpact_Data"
case Variant.CHINESE:
return "YuanShen_Data"
case Variant.BILIBILI:
return "YuanShen_Data"
def get_game_data_path(self) -> Path:
"""
Gets the game data path
Returns:
A Path object with the game data path.
"""
return self._game_dir.joinpath(self.get_game_data_name())
def get_archive_game_version(
self, game_archive: str | bytes | PathLike
) -> str | None:
"""
Gets the game version in the archive
Returns:
Version as string "x.x.x" or None if not found.
"""
game_archive = Path(game_archive)
if not game_archive.is_file():
raise FileNotFoundError(f"Game archive {game_archive} not found")
with zipfile.ZipFile(game_archive, "r") as f:
return _read_version_from_game_file(
f.read(self.get_game_data_name() + "globalgamemanagers")
)
def get_current_game_variant(self) -> Variant:
"""
Gets the current game variant
Returns:
GameVariant
"""
if self._game_dir.joinpath("GenshinImpact.exe").is_file():
return Variant.INTERNATIONAL
if not self._game_dir.joinpath("YuanShen.exe").is_file():
raise FileNotFoundError("Game installation not found.")
# We can't depend on get_game_data_name() because it depends on self._variant
# which depends on this function.
if self._game_dir.joinpath("YuanShen_Data/Plugins/PCGameSDK.dll").is_file():
return Variant.BILIBILI
return Variant.CHINESE
def get_current_game_version(self) -> str | None:
"""
Gets the current game version
Returns:
Version as string "x.x.x" or None if not found.
"""
globalgamemanagers = self.get_game_data_path().joinpath("./globalgamemanagers")
if not globalgamemanagers.exists():
try:
return self._read_version_from_config()
except FileNotFoundError:
return
return _read_version_from_game_file(globalgamemanagers)
def get_installed_voicepacks(self) -> list[str]:
"""
Returns a list of installed voicepacks.
:return: List of installed voicepacks
"""
voicepacks = []
for file in (
self.get_game_data_path()
.joinpath("StreamingAssets/Audio/GeneratedSoundBanks/Windows")
.iterdir()
):
if file.is_dir():
voicepacks.append(file.name)
return voicepacks
async def update_game(self, game_archive: str | bytes | PathLike, callback=None):
if not self.get_game_data_path().exists():
raise FileNotFoundError(f"Game not found in {self._game_dir}")
if not isinstance(game_archive, Path):
game_archive = Path(game_archive).resolve()
if not game_archive.exists():
raise FileNotFoundError(f"Update archive {game_archive} not found")
def call(status, f=None):
if not callback:
return
callback(status=status, file=f)
if not self._hdiffpatch.get_hpatchz_executable():
call(UpdateStatus.DOWNLOADING_PATCHER)
await self._hdiffpatch.download_latest_release()
archive = zipfile.ZipFile(game_archive, "r")
files = archive.namelist()
# Don't extract these files (they're useless and if the game isn't patched then it'll
# raise 31-4xxx error in-game)
call(UpdateStatus.PREPARING_PATCH)
for file in ["deletefiles.txt", "hdifffiles.txt"]:
try:
files.remove(file)
except ValueError:
pass
# hdiffpatch implementation
hdifffiles = []
for x in (
(await asyncio.to_thread(archive.read, "hdifffiles.txt"))
.decode()
.split("\n")
):
if x:
hdifffiles.append(json.loads(x)["remoteName"])
patch_jobs = []
for file in hdifffiles:
call(UpdateStatus.PREPARING_PATCH, f=file)
current_game_file = self._game_dir.joinpath(file)
if not current_game_file.exists():
# Not patching since we don't have the file
continue
patch_file = str(file) + ".hdiff"
async def extract_and_patch(old_file, diff_file):
call(UpdateStatus.PATCHING, f=old_file)
diff_path = self._cache_path.joinpath(diff_file)
if diff_path.is_file():
diff_path.unlink(missing_ok=True)
await asyncio.to_thread(archive.extract, diff_file, self._cache_path)
patch_path = self._cache_path.joinpath(diff_file)
old_suffix = old_file.suffix
old_file = await old_file.rename(old_file.with_suffix(".bak"))
proc = await self._hdiffpatch.patch_file(
old_file, old_file.with_suffix(old_suffix), patch_path, wait=True
)
patch_path.unlink()
if proc.returncode != 0:
# Let the game download the file.
await old_file.rename(old_file.with_suffix(old_suffix))
return
await old_file.unlink()
files.remove(patch_file)
patch_jobs.append(extract_and_patch(current_game_file, patch_file))
await asyncio.gather(*patch_jobs)
call(UpdateStatus.PREPARING_REMOVE_UNUSED)
deletefiles = archive.read("deletefiles.txt").decode().split("\n")
for file in deletefiles:
current_game_file = Path(self._game_dir.joinpath(file))
if not current_game_file.exists():
continue
if current_game_file.is_file():
call(UpdateStatus.REMOVING_UNUSED, f=file)
current_game_file.unlink(missing_ok=True)
call(UpdateStatus.EXTRACTING)
await asyncio.to_thread(archive.extractall, self._game_dir, members=files)
archive.close()
# Update game version on local variable.
self._version = self.get_current_game_version()
self.set_version_config()
call(UpdateStatus.COMPLETED)
def set_version_config(self, version: str = None):
"""
Sets the version in config.ini
Args:
version: Version to be set, if not specified then it'll use the current game version
"""
if not version:
version = self.version
self._game_config.set_game_version(version)
self._game_config.save()
def _extract_game_file(self, archive: str | bytes | PathLike):
if not isinstance(archive, Path):
archive = Path(archive).resolve()
if not archive.exists():
raise FileNotFoundError(f"'{archive}' not found")
with zipfile.ZipFile(archive, "r") as f:
f.extractall(path=self._game_dir)
def apply_voicepack(self, archive: str | bytes | PathLike):
"""
Install or update a voicepack to the game
Args:
archive: The voicepack to install or update
"""
# Since voicepack packages are unclear about diff package or full package
# we will try to extract the voicepack package and apply it to the game
# making this function universal for both cases
if not self.get_game_data_path().exists():
raise FileNotFoundError(f"Game not found in {self._game_dir}")
self._extract_game_file(archive)
async def install_game(
self,
game_archive: str | bytes | PathLike,
force_reinstall: bool = False,
callback=None,
):
"""Installs the game to the current directory
If `force_reinstall` is True, the game will be uninstalled then reinstalled.
"""
def call(status):
if not callback:
return
callback(status=status)
if self.get_game_data_path().exists():
if not force_reinstall:
raise ValueError(f"Game is already installed in {self._game_dir}")
call(InstallStatus.REMOVING_OLD_GAME)
self.delete_game()
self._game_dir.mkdir(parents=True, exist_ok=True)
call(InstallStatus.EXTRACTING)
self._extract_game_file(game_archive)
self._version = self.get_current_game_version()
self.set_version_config()
call(InstallStatus.COMPLETED)
async def get_voicepack_diff_archive(
self, lang: str, from_version: str = None, pre_download=False
) -> installer.Diff:
"""Gets a diff archive from `from_version` to the latest one
If from_version is not specified, it will be taken from the current game version.
"""
from_version = from_version if from_version else self.version
game = await self.get_server_game_info(pre_download=pre_download)
translated_lang = voicepack_lang_translate(lang)
for v in game.diffs:
if v.version != from_version:
continue
for vo in v.voice_packs:
if vo.language == translated_lang:
return vo
async def get_game_diff_archive(
self, from_version: str = None, pre_download=False
) -> installer.Diff:
"""Gets a diff archive from `from_version` to the latest one
If from_version is not specified, it will be taken from the current game version.
"""
from_version = from_version if from_version else self.version
game = await self.get_server_game_info(pre_download=pre_download)
for v in game.diffs:
if v.version == from_version:
return v
def verify_game_file(
self,
file: str | bytes | PathLike,
pkg_version: dict[str, str] | str | bytes | PathLike = None,
ignore_mismatch=True,
):
"""
Verifies a game file
It is strongly recommended to parse pkg_version with parse_pkg_version and use
that as pkg_version argument if you're planning to verify multiple game files
to save resource and verify faster.
Args:
file: The file to verify (e.g. GenshinImpact_Data/sharedassets0.assets.resS)
pkg_version: The pkg_version file to read MD5 from, if not specified it'll use
the file from the current game installation
ignore_mismatch: Do not raise exception if a file has mismatch MD5, instead
return as a (file, expected md5, actual md5) tuple
Returns:
None if the file is good, (file, expected md5, actual md5) if the file has mismatch md5
Raises:
FileNotFoundError if the file to verify doesn't exist
"""
if pkg_version is None:
pkg_version = self._game_dir.joinpath("pkg_version")
return verify_game_file(
file=file,
game_dir=self._game_dir,
pkg_version=pkg_version,
ignore_mismatch=ignore_mismatch,
)
async def verify_game_files(
self,
pkg_version: dict[str, str] | str | bytes | PathLike = None,
ignore_mismatch=True,
):
"""
Verifies the current game installation
Args:
pkg_version: The pkg_version file to read MD5 from, if not specified it'll use
the file from the current game installation
ignore_mismatch: Do not raise exception if a file has mismatch MD5, instead
return all of them as a list with (file, expected md5, actual md5)
Returns:
None if all files are good, a list with (file, expected md5, actual md5) for mismatch files
"""
if pkg_version is None:
pkg_version = self._game_dir.joinpath("pkg_version")
return await verify_game_files(
game_dir=self._game_dir,
pkg_version=pkg_version,
ignore_mismatch=ignore_mismatch,
)
def delete_game(self):
"""
Deletes the current game installation
"""
shutil.rmtree(self._game_dir, ignore_errors=True)
self._version = None
self._variant = None
def clear_cache(self):
"""
Clears the cache (e.g. downloaded game files)
"""
shutil.rmtree(self._cache_path, ignore_errors=True)