Add support for downloading HDiffPatch

This commit is contained in:
tretrauit 2022-02-27 18:22:21 +07:00
parent 992048ede3
commit 7c45133508
Signed by: tretrauit
GPG Key ID: 862760FF1903319E
2 changed files with 59 additions and 8 deletions

View File

@ -49,11 +49,13 @@ class HDiffPatch:
self._git_url = git_url
if not data_dir:
self._appdirs = appdirs.AppDirs(constants.APP_NAME, constants.APP_AUTHOR)
self.temp_path = Path(self._appdirs.user_cache_dir).joinpath("Tools/HDiffPatch")
self.temp_path = Path(self._appdirs.user_cache_dir).joinpath("HDiffPatch")
self.data_path = Path(self._appdirs.user_data_dir).joinpath("Tools/HDiffPatch")
else:
if not isinstance(data_dir, Path):
data_dir = Path(data_dir)
self.temp_path = data_dir.joinpath("Temp/Tools/HDiffPatch")
self.data_path = Path(data_dir).joinpath("Tools/HDiffPatch")
self.temp_path = data_dir.joinpath("Temp/HDiffPatch")
self.temp_path.mkdir(parents=True, exist_ok=True)
@staticmethod
@ -74,14 +76,15 @@ class HDiffPatch:
case "Darwin":
return "macos"
# Rip BSD they need to use Linux compatibility layer to run this (or use Wine if they prefer that)
raise RuntimeError("Unsupported platform")
def _get_hdiffpatch_exec(self, exec_name):
if shutil.which(exec_name):
return exec_name
if not any(self.temp_path.iterdir()):
if not any(self.data_path.iterdir()):
return None
platform_arch_path = self.temp_path.joinpath(self._get_platform_arch())
platform_arch_path = self.data_path.joinpath(self._get_platform_arch())
if platform_arch_path.joinpath(exec_name).exists():
return str(platform_arch_path.joinpath(exec_name))
return None
@ -92,6 +95,39 @@ class HDiffPatch:
def get_hdiffz_executable(self):
return self._get_hdiffpatch_exec("hdiffz")
async def _get_latest_release_info(self):
async with aiohttp.ClientSession() as session:
split = self._git_url.split("/")
repo = split[-1]
owner = split[-2]
rsp = await session.get("https://api.github.com/repos/{}/{}/releases/latest".format(owner, repo),
params={"Headers": "Accept: application/vnd.github.v3+json"})
rsp.raise_for_status()
for asset in await rsp.json()["assets"]:
if asset["name"].endswith(".zip") and not "linux" in asset["name"] and not "windows" in asset["name"] \
and not "macos" in asset["name"] and not "android" in asset["name"]:
return asset
async def get_latest_release_url(self):
asset = await self._get_latest_release_info()
return asset["browser_download_url"]
async def get_latest_release_name(self):
asset = await self._get_latest_release_info()
return asset["name"]
async def download_latest_release(self, extract=True):
url = await self.get_latest_release_url()
name = await self.get_latest_release_name()
if not url:
raise RuntimeError("Unable to find latest release")
await _download_file(url, name, self.temp_path, overwrite=True)
if not extract:
return
archive = zipfile.ZipFile(self.temp_path.joinpath(name))
archive.extractall(self.data_path)
archive.close()
class Installer:
def _read_version_from_config(self):

View File

@ -6,11 +6,18 @@ from pathlib import Path
import shutil
import aiohttp
import asyncio
from worthless import linux
from worthless import constants
from worthless.launcher import Launcher
from worthless.installer import Installer
match platform.system():
case "Linux":
from worthless import linux
case "Windows":
pass # TODO
case "Darwin":
pass # TODO
NO_XDELTA3_MODULE = False
try:
import xdelta3
@ -104,7 +111,7 @@ class Patcher:
"""
await self._download_repo()
async def is_telemetry_blocked(self):
async def is_telemetry_blocked(self, optional=False):
"""
Check if the telemetry is blocked.
@ -122,10 +129,18 @@ class Patcher:
continue
else:
unblocked_list.append(url)
if optional:
for url in constants.TELEMETRY_OPTIONAL_URL_LIST:
try:
await session.get("https://" + url)
except (aiohttp.ClientResponseError, aiohttp.ClientConnectorError):
continue
else:
unblocked_list.append(url)
return None if unblocked_list == [] else unblocked_list
async def block_telemetry(self):
telemetry = await self.is_telemetry_blocked()
async def block_telemetry(self, optional=False):
telemetry = await self.is_telemetry_blocked(optional)
if not telemetry:
raise ValueError("All telemetry are blocked")
telemetry_hosts = "\n"