mesa/bin/ci/ci_run_n_monitor.py

311 lines
8.8 KiB
Python
Executable File

#!/usr/bin/env python3
# Copyright © 2020 - 2022 Collabora Ltd.
# Authors:
# Tomeu Vizoso <tomeu.vizoso@collabora.com>
# David Heidelberg <david.heidelberg@collabora.com>
#
# SPDX-License-Identifier: MIT
"""
Helper script to restrict running only required CI jobs
and show the job(s) logs.
"""
import argparse
import re
from subprocess import check_output
import sys
import time
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from itertools import chain
from typing import Optional
import gitlab
from colorama import Fore, Style
from gitlab_common import get_gitlab_project, read_token, wait_for_pipeline
from gitlab_gql import GitlabGQL, create_job_needs_dag, filter_dag, print_dag
REFRESH_WAIT_LOG = 10
REFRESH_WAIT_JOBS = 6
URL_START = "\033]8;;"
URL_END = "\033]8;;\a"
STATUS_COLORS = {
"created": "",
"running": Fore.BLUE,
"success": Fore.GREEN,
"failed": Fore.RED,
"canceled": Fore.MAGENTA,
"manual": "",
"pending": "",
"skipped": "",
}
COMPLETED_STATUSES = ["success", "failed"]
def print_job_status(job) -> None:
"""It prints a nice, colored job status with a link to the job."""
if job.status == "canceled":
return
print(
STATUS_COLORS[job.status]
+ "🞋 job "
+ URL_START
+ f"{job.web_url}\a{job.name}"
+ URL_END
+ f" :: {job.status}"
+ Style.RESET_ALL
)
def print_job_status_change(job) -> None:
"""It reports job status changes."""
if job.status == "canceled":
return
print(
STATUS_COLORS[job.status]
+ "🗘 job "
+ URL_START
+ f"{job.web_url}\a{job.name}"
+ URL_END
+ f" has new status: {job.status}"
+ Style.RESET_ALL
)
def pretty_wait(sec: int) -> None:
"""shows progressbar in dots"""
for val in range(sec, 0, -1):
print(f"{val} seconds", end="\r")
time.sleep(1)
def monitor_pipeline(
project,
pipeline,
target_job: Optional[str],
dependencies,
force_manual: bool,
stress: bool,
) -> tuple[Optional[int], Optional[int]]:
"""Monitors pipeline and delegate canceling jobs"""
statuses = {}
target_statuses = {}
stress_succ = 0
stress_fail = 0
if target_job:
target_jobs_regex = re.compile(target_job.strip())
while True:
to_cancel = []
for job in pipeline.jobs.list(all=True, sort="desc"):
# target jobs
if target_job and target_jobs_regex.match(job.name):
if force_manual and job.status == "manual":
enable_job(project, job, True)
if stress and job.status in ["success", "failed"]:
if job.status == "success":
stress_succ += 1
if job.status == "failed":
stress_fail += 1
retry_job(project, job)
if (job.id not in target_statuses) or (
job.status not in target_statuses[job.id]
):
print_job_status_change(job)
target_statuses[job.id] = job.status
else:
print_job_status(job)
continue
# all jobs
if (job.id not in statuses) or (job.status not in statuses[job.id]):
print_job_status_change(job)
statuses[job.id] = job.status
# dependencies and cancelling the rest
if job.name in dependencies:
if job.status == "manual":
enable_job(project, job, False)
elif target_job and job.status not in [
"canceled",
"success",
"failed",
"skipped",
]:
to_cancel.append(job)
if target_job:
cancel_jobs(project, to_cancel)
if stress:
print(
"∑ succ: " + str(stress_succ) + "; fail: " + str(stress_fail),
flush=False,
)
pretty_wait(REFRESH_WAIT_JOBS)
continue
print("---------------------------------", flush=False)
if len(target_statuses) == 1 and {"running"}.intersection(
target_statuses.values()
):
return next(iter(target_statuses)), None
if {"failed", "canceled"}.intersection(target_statuses.values()):
return None, 1
if {"success", "manual"}.issuperset(target_statuses.values()):
return None, 0
pretty_wait(REFRESH_WAIT_JOBS)
def enable_job(project, job, target: bool) -> None:
"""enable manual job"""
pjob = project.jobs.get(job.id, lazy=True)
pjob.play()
if target:
jtype = "🞋 "
else:
jtype = "(dependency)"
print(Fore.MAGENTA + f"{jtype} job {job.name} manually enabled" + Style.RESET_ALL)
def retry_job(project, job) -> None:
"""retry job"""
pjob = project.jobs.get(job.id, lazy=True)
pjob.retry()
jtype = ""
print(Fore.MAGENTA + f"{jtype} job {job.name} manually enabled" + Style.RESET_ALL)
def cancel_job(project, job) -> None:
"""Cancel GitLab job"""
pjob = project.jobs.get(job.id, lazy=True)
pjob.cancel()
print(f"{job.name}")
def cancel_jobs(project, to_cancel) -> None:
"""Cancel unwanted GitLab jobs"""
if not to_cancel:
return
with ThreadPoolExecutor(max_workers=6) as exe:
part = partial(cancel_job, project)
exe.map(part, to_cancel)
def print_log(project, job_id) -> None:
"""Print job log into output"""
printed_lines = 0
while True:
job = project.jobs.get(job_id)
# GitLab's REST API doesn't offer pagination for logs, so we have to refetch it all
lines = job.trace().decode("raw_unicode_escape").splitlines()
for line in lines[printed_lines:]:
print(line)
printed_lines = len(lines)
if job.status in COMPLETED_STATUSES:
print(Fore.GREEN + f"Job finished: {job.web_url}" + Style.RESET_ALL)
return
pretty_wait(REFRESH_WAIT_LOG)
def parse_args() -> None:
"""Parse args"""
parser = argparse.ArgumentParser(
description="Tool to trigger a subset of container jobs "
+ "and monitor the progress of a test job",
epilog="Example: mesa-monitor.py --rev $(git rev-parse HEAD) "
+ '--target ".*traces" ',
)
parser.add_argument("--target", metavar="target-job", help="Target job")
parser.add_argument(
"--rev", metavar="revision", help="repository git revision (default: HEAD)"
)
parser.add_argument(
"--token",
metavar="token",
help="force GitLab token, otherwise it's read from ~/.config/gitlab-token",
)
parser.add_argument(
"--force-manual", action="store_true", help="Force jobs marked as manual"
)
parser.add_argument("--stress", action="store_true", help="Stresstest job(s)")
return parser.parse_args()
def find_dependencies(target_job: str, project_path: str, sha: str) -> set[str]:
gql_instance = GitlabGQL()
dag, _ = create_job_needs_dag(
gql_instance, {"projectPath": project_path.path_with_namespace, "sha": sha}
)
target_dep_dag = filter_dag(dag, target_job)
if not target_dep_dag:
print(Fore.RED + "The job(s) were not found in the pipeline." + Fore.RESET)
sys.exit(1)
print(Fore.YELLOW)
print("Detected job dependencies:")
print()
print_dag(target_dep_dag)
print(Fore.RESET)
return set(chain.from_iterable(target_dep_dag.values()))
if __name__ == "__main__":
try:
t_start = time.perf_counter()
args = parse_args()
token = read_token(args.token)
gl = gitlab.Gitlab(url="https://gitlab.freedesktop.org",
private_token=token,
retry_transient_errors=True)
cur_project = get_gitlab_project(gl, "mesa")
REV: str = args.rev
if not REV:
REV = check_output(['git', 'rev-parse', 'HEAD']).decode('ascii').strip()
print(f"Revision: {REV}")
pipe = wait_for_pipeline(cur_project, REV)
print(f"Pipeline: {pipe.web_url}")
deps = set()
if args.target:
print("🞋 job: " + Fore.BLUE + args.target + Style.RESET_ALL)
deps = find_dependencies(
target_job=args.target, sha=REV, project_path=cur_project
)
target_job_id, ret = monitor_pipeline(
cur_project, pipe, args.target, deps, args.force_manual, args.stress
)
if target_job_id:
print_log(cur_project, target_job_id)
t_end = time.perf_counter()
spend_minutes = (t_end - t_start) / 60
print(f"⏲ Duration of script execution: {spend_minutes:0.1f} minutes")
sys.exit(ret)
except KeyboardInterrupt:
sys.exit(1)