worthless-launcher/worthless/cli.py

593 lines
20 KiB
Python
Executable File

#!/usr/bin/python3
import argparse
import asyncio
import appdirs
from pathlib import Path
from worthless.launcher import Launcher
from worthless.game.game import GameManager as Installer
from worthless.patcher import Patcher
import worthless.constants as constants
class UI:
def __init__(
self,
gamedir: str,
noconfirm: bool,
tempdir: str | Path = None,
pre_download=False,
) -> None:
self._vo_version = None
self._noconfirm = noconfirm
self._gamedir = gamedir
self._launcher = Launcher(gamedir)
self._installer = Installer(gamedir, data_dir=tempdir)
self._patcher = Patcher(gamedir, data_dir=tempdir)
self._pre_download = pre_download
if self._pre_download:
print("Pre-download is enabled, use at your own risk!")
def _ask(self, question):
if self._noconfirm:
# Fake dialog
print(question + " [Y/n]:")
return True
# random text
answer = "kurumi3"
while answer.lower() not in ["y", "n", ""]:
answer = input(question + " [Y/n]: ")
return answer.lower() == "y" or answer == ""
def override_game_version(self, version: str):
self._installer._version = version
def override_voiceover_version(self, version: str):
self._vo_version = version
async def get_game_version(self):
print(await self._installer.get_game_version())
async def block_telemetry(self):
print("Checking for available telemetry to block...")
try:
await self._patcher.block_telemetry()
except ValueError:
print("No telemetry to block.")
else:
print("Telemetry blocked.")
async def check_telemetry(self):
block_status = await self._patcher.is_telemetry_blocked()
if not block_status:
print("Telemetry is blocked.")
else:
print("Telemetry is not blocked, you need to block these hosts below.")
for hosts in block_status:
print(hosts)
async def _update_from_archive(self, filepath):
print("Reverting patches if patched...")
await self._patcher.revert_patch(True)
print("Updating game from archive (This may takes some time)...")
await self._installer.update_game(filepath)
self._installer.set_version_config()
async def _install_from_archive(self, filepath, force_reinstall):
print("Installing game from archive (This may takes some time)...")
print(filepath)
await self._installer.install_game(filepath, force_reinstall)
self._installer.set_version_config()
async def _apply_voiceover_from_archive(self, filepath):
print("Applying voiceover from archive (This may takes some time)...")
print("Voiceover archive:", filepath)
await self._installer.apply_voiceover(filepath)
async def install_voiceover_from_file(self, filepath):
print(
"Archive voiceover language: {} ({})".format(
await self._installer.get_voiceover_archive_language(filepath),
"Full archive"
if await self._installer.get_voiceover_archive_type(filepath)
else "Update archive",
)
)
if not self._ask(
"Do you want to apply this voiceover pack? ({})".format(filepath)
):
print("Aborting apply process.")
return
await self._apply_voiceover_from_archive(filepath)
print("Voiceover applied successfully.")
async def revert_patch(self):
print("Reverting patches...")
await self._patcher.revert_patch(True)
print("Patches reverted.")
async def patch_game(self, login_fix: bool = False):
print("NOTE: Hereby you are violating the game's Terms of Service!")
print("Do not patch the game if you don't know what you are doing!")
if not self._ask(
"Do you want to patch the game? (This will overwrite your game files!)"
):
print("Aborting patch process.")
return
await self.block_telemetry()
print("Updating patches...")
await self._patcher.download_patch()
print("Patching game...")
await self._patcher.apply_patch(login_fix)
print("Game patched.")
print("Please refrain from sharing this project to public, thank you.")
async def install_from_file(self, filepath):
gamever = await self._installer.get_game_version()
print(
"Archive game version:",
await self._installer.get_game_archive_version(filepath),
)
if gamever:
print(
"Current game installation detected. ({})".format(
await self._installer.get_game_version()
)
)
if not self._ask("Do you want to update the game? ({})".format(filepath)):
print("Aborting update process.")
return
await self._update_from_archive(filepath)
print("Game updated successfully.")
else:
print("No game installation detected.")
if not self._ask("Do you want to install the game? ({})".format(filepath)):
print("Aborting installation process.")
return
await self._install_from_archive(filepath, False)
print("Game installed successfully.")
async def download_patch(self):
print("Downloading patches...")
await self._patcher.download_patch()
async def download_game(self):
print("Downloading full game (This will take a long time)...")
await self._installer.download_full_game(self._pre_download)
async def download_game_update(self):
print("Downloading game update (This will take a long time)...")
await self._installer.download_game_update(pre_download=self._pre_download)
async def download_voiceover(self, languages: str):
res_info = await self._launcher.get_resource_info()
for lng in languages.split(" "):
for vo in res_info.game.latest.voice_packs:
if not self._installer.voiceover_lang_translate(lng) == vo.language:
continue
print(
"Downloading voiceover pack for {} (This will take a long time)...".format(
lng
)
)
await self._installer.download_full_voiceover(
lng, pre_download=self._pre_download
)
async def download_voiceover_update(self, languages: str):
res_info = await self._launcher.get_resource_info()
for lng in languages.split(" "):
for vo in res_info.game.latest.voice_packs:
if not self._installer.voiceover_lang_translate(lng) == vo.language:
continue
print(
"Downloading voiceover update pack for {} (This will take a long time)...".format(
lng
)
)
await self._installer.download_voiceover_update(
lng, pre_download=self._pre_download
)
async def install_game(self, forced: bool = False):
res_info = await self._launcher.get_resource_info()
game = res_info.game
if self._pre_download:
game = res_info.pre_download_game
print("Latest game version: {}".format(game.latest.version))
if not self._ask("Do you want to install the game?"):
print("Aborting game installation process.")
return
await self.download_game()
print("Game archive:", game.latest.get_name())
await self._install_from_archive(
self._installer.temp_path.joinpath(game.latest.get_name()), forced
)
async def install_voiceover(self, languages: str):
res_info = await self._launcher.get_resource_info()
for lng in languages.split(" "):
for vo in res_info.game.latest.voice_packs:
if not self._installer.voiceover_lang_translate(lng) == vo.language:
continue
if not self._ask(
"Do you want to install this voiceover pack? ({})".format(lng)
):
print("Aborting voiceover installation process.")
break
print("Downloading voiceover pack (This will take a long time)...")
await self._installer.download_full_voiceover(
lng, pre_download=self._pre_download
)
await self._apply_voiceover_from_archive(
self._installer.temp_path.joinpath(vo.get_name())
)
async def update_game(self):
game_ver = await self._installer.get_game_version()
if not game_ver:
await self.install_game()
return
print("Current game installation detected: {}".format(game_ver))
diff_archive = await self._installer.get_game_diff_archive()
res_info = await self._launcher.get_resource_info()
game = res_info.game
if self._pre_download:
game = res_info.pre_download_game
if not diff_archive:
print("No game updates available.")
return
print("Latest game version: {}".format(game.latest.version))
if not self._ask("Do you want to update the game?"):
print("Aborting game update process.")
return
print("Downloading game update (This will take a long time)...")
await self._installer.download_game_update(pre_download=self._pre_download)
print("Installing game update...")
await self.install_from_file(
self._installer.temp_path.joinpath(diff_archive.get_name())
)
async def update_voiceover(self, languages: str | list):
if isinstance(languages, str):
languages = languages.split(" ")
game_ver = self._vo_version or await self._installer.get_game_version()
if not game_ver:
print("Couldn't detect current game installation, is game installed?")
return
installed_voiceovers = await self._installer.get_installed_voiceovers()
print(
f"Installed voiceovers: {None if installed_voiceovers == [] else ', '.join(installed_voiceovers)}"
)
for lng in languages:
if (
self._installer.voiceover_lang_translate(lng, "locale")
not in installed_voiceovers
):
await self.install_voiceover(lng)
continue
diff_archive = await self._installer.get_voiceover_diff_archive(
lng, pre_download=self._pre_download
)
if not diff_archive:
print("No voiceover updates available for {}.".format(lng))
continue
if not self._ask("Do you want to update this voiceover? ({})".format(lng)):
print("Aborting this voiceover language update process.")
continue
print("Downloading voiceover update (This may takes some time)...")
await self._installer.download_voiceover_update(
lng, pre_download=self._pre_download
)
print("Installing voiceover update for {}...".format(lng))
await self._apply_voiceover_from_archive(
self._installer.temp_path.joinpath(diff_archive.get_name())
)
async def update_game_voiceover(self, languages: str):
await self.update_game()
await self.update_voiceover(languages)
async def update_voiceover_all(self):
await self.update_voiceover(await self._installer.get_installed_voiceovers())
async def update_all(self):
await self.update_game()
await self.update_voiceover(await self._installer.get_installed_voiceovers())
async def verify_game(self):
game_ver = await self._installer.get_game_version()
if not game_ver:
print("Couldn't detect current game installation, is game installed?")
return
print("Verifying game contents... (This may takes a long time)")
failed_files = await self._installer.verify_game(ignore_mismatch=True)
if not failed_files:
print("All good.")
return
print("Some game files got corrupted (mismatch md5), uh oh.")
for file in failed_files:
print("{}: expected {}, actual {}".format(file[0], file[1], file[2]))
async def clear_cache(self):
if self._ask(
"Do you want to clear Installer cache (contains downloaded game files, etc)?"
):
await self._installer.clear_cache()
if self._ask(
"Do you want to clear Patcher cache (contains files used to patch)?"
):
await self._patcher.clear_cache()
async def main_async():
default_dirs = appdirs.AppDirs(constants.APP_NAME, constants.APP_AUTHOR)
parser = argparse.ArgumentParser(
prog="worthless", description="A worthless launcher written in Python."
)
parser.add_argument(
"-D",
"--dir",
action="store",
type=Path,
default=Path.cwd(),
help="Specify the game directory (default current working directory)",
)
parser.add_argument(
"-W",
"--temporary-dir",
action="store",
type=Path,
default=None,
help="Specify the temporary directory (default {} and {})".format(
default_dirs.user_data_dir, default_dirs.user_cache_dir
),
)
parser.add_argument(
"-S",
"--install",
action="store_true",
help="Install/update the game (if not already installed, else do nothing)",
)
parser.add_argument(
"-U",
"--install-from-file",
action="store",
type=Path,
default=None,
help="Install the game from an archive (if not already installed, \
else update from archive)",
)
parser.add_argument(
"-Uv",
"--install-voiceover-from-file",
action="store",
type=Path,
default=None,
help="Install the voiceover from an archive (if not already installed, \
else update from archive)",
)
parser.add_argument(
"-Sp",
"--patch",
action="store_true",
help="Patch the game (if not already patched, else do nothing)",
)
parser.add_argument(
"--login-fix",
action="store_true",
help="Patch the game to fix login issues (if not already patched, else do nothing)",
)
parser.add_argument(
"-Sy",
"--update",
action="store",
type=str,
default="",
help="Update the game and specified voiceover pack only (or install if not found)",
)
parser.add_argument(
"-Sw",
"--download-game",
action="store_true",
help="Download the full game to the temporary directory",
)
parser.add_argument(
"-Swp",
"--download-patch",
action="store_true",
help="Download/Update the game patch to the temporary directory",
)
parser.add_argument(
"-Swv",
"--download-voiceover",
action="store",
type=str,
help="Download the full voiceover to the temporary directory",
)
parser.add_argument(
"-Syw",
"--download-game-update",
action="store",
type=str,
default="",
help="Download the game and the voiceover update to the temporary directory",
)
parser.add_argument(
"-Sywv",
"--download-voiceover-update",
action="store",
type=str,
help="Download the voiceover update to the temporary directory",
)
parser.add_argument(
"-Sv",
"--update-voiceover",
action="store",
type=str,
help="Update the voiceover pack only (or install if not found)",
)
parser.add_argument(
"-Syu",
"--update-all",
action="store_true",
help="Update the game and all installed voiceover packs (or install if not found)",
)
parser.add_argument(
"-Scc",
"--clear-cache",
action="store_true",
help="Clear cache used by worthless",
)
parser.add_argument(
"-Rs", "--remove", action="store_true", help="Remove the game (if installed)"
)
parser.add_argument(
"-Rp",
"--remove-patch",
action="store_true",
help="Revert the game patch (if patched)",
)
parser.add_argument(
"-Rv",
"--remove-voiceover",
action="store_true",
help="Remove a Voiceover pack (if installed)",
)
parser.add_argument(
"-V", "--verify", action="store_true", help="Verify the game installation"
)
parser.add_argument(
"--predownload",
action="store_true",
help="Download the game for the next update",
default=False,
)
parser.add_argument(
"--get-game-version", action="store_true", help="Get the current game version"
)
parser.add_argument(
"--no-overseas", action="store_true", help="Don't use overseas server"
)
parser.add_argument(
"--check-telemetry",
action="store_true",
help="Check for the telemetry information",
)
parser.add_argument(
"--from-ver",
action="store",
help="Override the detected game version",
type=str,
default=None,
)
parser.add_argument(
"--from-vo-ver",
action="store",
help="Override the detected game version for voiceover " "detection",
type=str,
default=None,
)
parser.add_argument(
"--noconfirm",
action="store_true",
help="Do not ask any for confirmation. (Ignored in interactive mode)",
)
args = parser.parse_args()
if args.temporary_dir:
args.temporary_dir.mkdir(parents=True, exist_ok=True)
ui = UI(args.dir, args.noconfirm, args.temporary_dir, args.predownload)
if args.install and args.update:
raise ValueError("Cannot specify both --install and --update arguments.")
if args.install_from_file and args.update:
raise ValueError(
"Cannot specify both --install-from-file and --update arguments."
)
if args.install_voiceover_from_file and args.update:
raise ValueError(
"Cannot specify both --install-voiceover-from-file and --update arguments."
)
if args.install_from_file and args.install:
raise ValueError(
"Cannot specify both --install-from-file and --install arguments."
)
if args.from_ver:
ui.override_game_version(args.from_ver)
if args.get_game_version:
await ui.get_game_version()
if args.check_telemetry:
await ui.check_telemetry()
if args.clear_cache:
await ui.clear_cache()
# Download
if args.download_game:
await ui.download_game()
if args.download_patch:
await ui.download_patch()
if args.download_voiceover:
await ui.download_voiceover(args.download_voiceover)
if args.download_game_update:
await ui.download_game_update()
if args.download_voiceover_update:
await ui.download_voiceover_update(args.download_voiceover_update)
# Install
if args.install:
await ui.install_game()
if args.install_from_file:
await ui.install_from_file(args.install_from_file)
if args.install_voiceover_from_file:
await ui.install_voiceover_from_file(args.install_voiceover_from_file)
# Update
if args.update_all:
await ui.update_all()
if args.update:
await ui.update_game_voiceover(args.update)
if args.update_voiceover:
await ui.update_voiceover(args.update_voiceover)
# Patch
if args.patch:
await ui.patch_game(args.login_fix)
if args.remove_patch:
await ui.revert_patch()
# Verify
if args.verify:
await ui.verify_game()
def main():
asyncio.run(main_async())
if __name__ == "__main__":
main()