From 84a5ea422826c678fb36eaa294b172f6387f01f1 Mon Sep 17 00:00:00 2001 From: Guilherme Gallo Date: Wed, 16 Feb 2022 18:06:20 +0000 Subject: [PATCH] ci/lava: Encapsulate job data in a class Less free-form passing stuff around, and also makes it easier to implement log-based following in future. The new class has: - job log polling: This allows us to get rid of some more function-local state; the job now contains where we are, and the timeout etc is localised within the thing polling it. - has-started detection into job class - heartbeat logic to update the job instance state with the start time when the submitter begins to track the logs from the LAVA device Besides: - Split LAVA jobs and Mesa CI policy - Update unit tests with LAVAJob class Signed-off-by: Guilherme Gallo Part-of: --- .gitlab-ci/lava/lava_job_submitter.py | 199 ++++++++++++-------- .gitlab-ci/tests/test_lava_job_submitter.py | 51 +++-- 2 files changed, 153 insertions(+), 97 deletions(-) diff --git a/.gitlab-ci/lava/lava_job_submitter.py b/.gitlab-ci/lava/lava_job_submitter.py index 1f9eb8bffcf..406a0dd3095 100755 --- a/.gitlab-ci/lava/lava_job_submitter.py +++ b/.gitlab-ci/lava/lava_job_submitter.py @@ -31,7 +31,6 @@ import time import traceback import urllib.parse import xmlrpc - from datetime import datetime, timedelta from os import getenv @@ -211,6 +210,62 @@ def _call_proxy(fn, *args): fatal_err("FATAL: Fault: {} (code: {})".format(err.faultString, err.faultCode)) +class MesaCIException(Exception): + pass + + +class LAVAJob(): + def __init__(self, proxy, definition): + self.job_id = None + self.proxy = proxy + self.definition = definition + self.last_log_line = 0 + self.last_log_time = None + self.is_finished = False + + def heartbeat(self): + self.last_log_time = datetime.now() + + def validate(self): + try: + return _call_proxy( + self.proxy.scheduler.jobs.validate, self.definition, True + ) + except MesaCIException: + return False + + def submit(self): + try: + self.job_id = _call_proxy(self.proxy.scheduler.jobs.submit, self.definition) + except MesaCIException: + return False + return True + + def cancel(self): + if self.job_id: + self.proxy.scheduler.jobs.cancel(self.job_id) + + def is_started(self): + waiting_states = ["Submitted", "Scheduling", "Scheduled"] + job_state = _call_proxy(self.proxy.scheduler.job_state, self.job_id) + return job_state["job_state"] not in waiting_states + + def get_logs(self): + try: + (finished, data) = _call_proxy( + self.proxy.scheduler.jobs.logs, self.job_id, self.last_log_line + ) + lines = yaml.load(str(data), Loader=loader(False)) + self.is_finished = finished + if not lines: + return [] + self.heartbeat() + self.last_log_line += len(lines) + return lines + except MesaCIException as mesa_exception: + fatal_err(f"Could not get LAVA job logs. Reason: {mesa_exception}") + + def get_job_results(proxy, job_id, test_suite, test_case): # Look for infrastructure errors and retry if we see them. results_yaml = _call_proxy(proxy.results.get_testjob_results_yaml, job_id) @@ -220,125 +275,103 @@ def get_job_results(proxy, job_id, test_suite, test_case): if "result" not in metadata or metadata["result"] != "fail": continue if 'error_type' in metadata and metadata['error_type'] == "Infrastructure": - print_log("LAVA job {} failed with Infrastructure Error. Retry.".format(job_id)) - return False + raise MesaCIException("LAVA job {} failed with Infrastructure Error. Retry.".format(job_id)) if 'case' in metadata and metadata['case'] == "validate": - print_log("LAVA job {} failed validation (possible download error). Retry.".format(job_id)) - return False + raise MesaCIException("LAVA job {} failed validation (possible download error). Retry.".format(job_id)) results_yaml = _call_proxy(proxy.results.get_testcase_results_yaml, job_id, test_suite, test_case) results = yaml.load(results_yaml, Loader=loader(False)) if not results: - fatal_err("LAVA: no result for test_suite '{}', test_case '{}'".format(test_suite, test_case)) + raise MesaCIException("LAVA: no result for test_suite '{}', test_case '{}'".format(test_suite, test_case)) print_log("LAVA: result for test_suite '{}', test_case '{}': {}".format(test_suite, test_case, results[0]['result'])) if results[0]['result'] != 'pass': - fatal_err("FAIL") + return False return True -def wait_until_job_is_started(proxy, job_id): - print_log(f"Waiting for job {job_id} to start.") - current_state = "Submitted" - waiting_states = ["Submitted", "Scheduling", "Scheduled"] - while current_state in waiting_states: - time.sleep(WAIT_FOR_DEVICE_POLLING_TIME_SEC) - job_state = _call_proxy(proxy.scheduler.job_state, job_id) - current_state = job_state["job_state"] - print_log(f"Job {job_id} started.") - -def follow_job_execution(proxy, job_id): - line_count = 0 - finished = False - last_time_logs = datetime.now() - while not finished: - # `proxy.scheduler.jobs.logs` does not block, even when there is no - # new log to be fetched. To avoid dosing the LAVA dispatcher - # machine, let's add a sleep to save them some stamina. - time.sleep(LOG_POLLING_TIME_SEC) - - (finished, data) = _call_proxy(proxy.scheduler.jobs.logs, job_id, line_count) - if logs := yaml.load(str(data), Loader=loader(False)): - # Reset the timeout - last_time_logs = datetime.now() - for line in logs: - print("{} {}".format(line["dt"], line["msg"])) - - line_count += len(logs) - else: - time_limit = timedelta(seconds=DEVICE_HANGING_TIMEOUT_SEC) - if datetime.now() - last_time_logs > time_limit: - print_log("LAVA job {} doesn't advance (machine got hung?). Retry.".format(job_id)) - return False - - return True - -def show_job_data(proxy, job_id): - show = _call_proxy(proxy.scheduler.jobs.show, job_id) +def show_job_data(job): + show = _call_proxy(job.proxy.scheduler.jobs.show, job.job_id) for field, value in show.items(): print("{}\t: {}".format(field, value)) -def validate_job(proxy, job_file): +def follow_job_execution(job): try: - return _call_proxy(proxy.scheduler.jobs.validate, job_file, True) - except: - return False + job.submit() + except MesaCIException as mesa_exception: + fatal_err(f"Could not submit LAVA job. Reason: {mesa_exception}") -def submit_job(proxy, job_file): - return _call_proxy(proxy.scheduler.jobs.submit, job_file) + print_log(f"Waiting for job {job.job_id} to start.") + while not job.is_started(): + time.sleep(WAIT_FOR_DEVICE_POLLING_TIME_SEC) + print_log(f"Job {job.job_id} started.") + + max_idle_time = timedelta(seconds=DEVICE_HANGING_TIMEOUT_SEC) + # Start to check job's health + job.heartbeat() + while not job.is_finished: + # Poll to check for new logs, assuming that a prolonged period of + # silence means that the device has died and we should try it again + if datetime.now() - job.last_log_time > max_idle_time: + print_log( + f"No log output for {max_idle_time} seconds; assuming device has died, retrying" + ) + + raise MesaCIException( + f"LAVA job {job.job_id} does not respond for {max_idle_time}. Retry." + ) + + time.sleep(LOG_POLLING_TIME_SEC) + + new_lines = job.get_logs() + + for line in new_lines: + print(line) + + show_job_data(job) + return get_job_results(job.proxy, job.job_id, "0_mesa", "mesa") -def retriable_follow_job(proxy, yaml_file): +def retriable_follow_job(proxy, job_definition): retry_count = NUMBER_OF_RETRIES_TIMEOUT_DETECTION - while retry_count >= 0: - job_id = submit_job(proxy, yaml_file) + for attempt_no in range(1, retry_count + 2): + job = LAVAJob(proxy, job_definition) + try: + return follow_job_execution(job) + except MesaCIException as mesa_exception: + print_log(mesa_exception) + job.cancel() + finally: + print_log(f"Finished executing LAVA job in the attempt #{attempt_no}") - print_log("LAVA job id: {}".format(job_id)) - - wait_until_job_is_started(proxy, job_id) - - if not follow_job_execution(proxy, job_id): - print_log(f"Job {job_id} has timed out. Cancelling it.") - # Cancel the job as it is considered unreachable by Mesa CI. - proxy.scheduler.jobs.cancel(job_id) - - retry_count -= 1 - continue - - show_job_data(proxy, job_id) - - if get_job_results(proxy, job_id, "0_mesa", "mesa") == True: - break - else: - # The script attempted all the retries. The job seemed to fail. - return False - - return True + fatal_err( + "Job failed after it exceeded the number of " + f"{NUMBER_OF_RETRIES_TIMEOUT_DETECTION} retries." + ) def main(args): proxy = setup_lava_proxy() - yaml_file = generate_lava_yaml(args) + job_definition = generate_lava_yaml(args) if args.dump_yaml: - print(hide_sensitive_data(generate_lava_yaml(args))) + print("LAVA job definition (YAML):") + print(hide_sensitive_data(job_definition)) if args.validate_only: - ret = validate_job(proxy, yaml_file) + job = LAVAJob(proxy, job_definition) + ret = job.validate() if not ret: fatal_err("Error in LAVA job definition") print("LAVA job definition validated successfully") return - if not retriable_follow_job(proxy, yaml_file): - fatal_err( - "Job failed after it exceeded the number of" - f"{NUMBER_OF_RETRIES_TIMEOUT_DETECTION} retries." - ) + ret = retriable_follow_job(proxy, job_definition) + sys.exit(ret) def create_parser(): diff --git a/.gitlab-ci/tests/test_lava_job_submitter.py b/.gitlab-ci/tests/test_lava_job_submitter.py index 0ed19efeea4..43896db20fe 100644 --- a/.gitlab-ci/tests/test_lava_job_submitter.py +++ b/.gitlab-ci/tests/test_lava_job_submitter.py @@ -25,23 +25,27 @@ import xmlrpc.client from contextlib import nullcontext as does_not_raise from datetime import datetime -from itertools import repeat -from typing import Tuple +from itertools import cycle, repeat +from typing import Iterable, Union, Generator, Tuple from unittest.mock import MagicMock, patch import pytest import yaml from freezegun import freeze_time from lava.lava_job_submitter import ( + NUMBER_OF_RETRIES_TIMEOUT_DETECTION, DEVICE_HANGING_TIMEOUT_SEC, follow_job_execution, hide_sensitive_data, retriable_follow_job, + LAVAJob ) +NUMBER_OF_MAX_ATTEMPTS = NUMBER_OF_RETRIES_TIMEOUT_DETECTION + 1 -def jobs_logs_response(finished=False, msg=None) -> Tuple[bool, str]: - timed_msg = {"dt": str(datetime.now()), "msg": "New message"} + +def jobs_logs_response(finished=False, msg=None, lvl="target") -> Tuple[bool, str]: + timed_msg = {"dt": str(datetime.now()), "msg": "New message", "lvl": lvl} logs = [timed_msg] if msg is None else msg return finished, yaml.safe_dump(logs) @@ -114,17 +118,35 @@ def frozen_time(mock_sleep): @pytest.mark.parametrize("exception", [RuntimeError, SystemError, KeyError]) def test_submit_and_follow_respects_exceptions(mock_sleep, mock_proxy, exception): with pytest.raises(exception): - follow_job_execution(mock_proxy(side_effect=exception), "") + proxy = mock_proxy(side_effect=exception) + job = LAVAJob(proxy, '') + follow_job_execution(job) -def generate_n_logs(n=1, tick_sec=1): +def level_generator(): + # Tests all known levels by default + yield from cycle(( "results", "feedback", "warning", "error", "debug", "target" )) + +def generate_n_logs(n=1, tick_fn: Union[Generator, Iterable[int], int]=1, level_fn=level_generator): """Simulate a log partitionated in n components""" + level_gen = level_fn() + + if isinstance(tick_fn, Generator): + tick_gen = tick_fn + elif isinstance(tick_fn, Iterable): + tick_gen = cycle(tick_fn) + else: + tick_gen = cycle((tick_fn,)) + with freeze_time(datetime.now()) as time_travel: + tick_sec: int = next(tick_gen) while True: # Simulate a scenario where the target job is waiting for being started for _ in range(n - 1): + level: str = next(level_gen) + time_travel.tick(tick_sec) - yield jobs_logs_response(finished=False, msg=[]) + yield jobs_logs_response(finished=False, msg=[], lvl=level) time_travel.tick(tick_sec) yield jobs_logs_response(finished=True) @@ -136,23 +158,23 @@ XMLRPC_FAULT = xmlrpc.client.Fault(0, "test") PROXY_SCENARIOS = { "finish case": (generate_n_logs(1), does_not_raise(), True), "works at last retry": ( - generate_n_logs(n=3, tick_sec=DEVICE_HANGING_TIMEOUT_SEC + 1), + generate_n_logs(n=NUMBER_OF_MAX_ATTEMPTS, tick_fn=[ DEVICE_HANGING_TIMEOUT_SEC + 1 ] * NUMBER_OF_RETRIES_TIMEOUT_DETECTION + [1]), does_not_raise(), True, ), "timed out more times than retry attempts": ( - generate_n_logs(n=4, tick_sec=DEVICE_HANGING_TIMEOUT_SEC + 1), - does_not_raise(), + generate_n_logs(n=4, tick_fn=DEVICE_HANGING_TIMEOUT_SEC + 1), + pytest.raises(SystemExit), False, ), "long log case, no silence": ( - generate_n_logs(n=1000, tick_sec=0), + generate_n_logs(n=1000, tick_fn=0), does_not_raise(), True, ), "very long silence": ( - generate_n_logs(n=4, tick_sec=100000), - does_not_raise(), + generate_n_logs(n=NUMBER_OF_MAX_ATTEMPTS + 1, tick_fn=100000), + pytest.raises(SystemExit), False, ), # If a protocol error happens, _call_proxy will retry without affecting timeouts @@ -181,7 +203,8 @@ def test_retriable_follow_job( mock_sleep, side_effect, expectation, has_finished, mock_proxy ): with expectation: - result = retriable_follow_job(mock_proxy(side_effect=side_effect), "") + proxy = mock_proxy(side_effect=side_effect) + result = retriable_follow_job(proxy, "") assert has_finished == result