diff --git a/.gitlab-ci/lava/exceptions.py b/.gitlab-ci/lava/exceptions.py
new file mode 100644
index 00000000000..576924f2b7d
--- /dev/null
+++ b/.gitlab-ci/lava/exceptions.py
@@ -0,0 +1,17 @@
+from datetime import timedelta
+
+
+class MesaCIException(Exception):
+    pass
+
+
+class MesaCITimeoutError(MesaCIException):
+    def __init__(self, *args, timeout_duration: timedelta) -> None:
+        super().__init__(*args)
+        self.timeout_duration = timeout_duration
+
+
+class MesaCIRetryError(MesaCIException):
+    def __init__(self, *args, retry_count: int) -> None:
+        super().__init__(*args)
+        self.retry_count = retry_count
\ No newline at end of file
diff --git a/.gitlab-ci/lava/lava-submit.sh b/.gitlab-ci/lava/lava-submit.sh
index ce919ac64fa..a3affe1be66 100755
--- a/.gitlab-ci/lava/lava-submit.sh
+++ b/.gitlab-ci/lava/lava-submit.sh
@@ -28,7 +28,7 @@ ci-fairy minio cp job-rootfs-overlay.tar.gz "minio://${JOB_ROOTFS_OVERLAY_PATH}"
 
 touch results/lava.log
 tail -f results/lava.log &
-artifacts/lava/lava_job_submitter.py \
+PYTHONPATH=artifacts/ artifacts/lava/lava_job_submitter.py \
     --dump-yaml \
     --pipeline-info "$CI_JOB_NAME: $CI_PIPELINE_URL on $CI_COMMIT_REF_NAME ${CI_NODE_INDEX}/${CI_NODE_TOTAL}" \
     --rootfs-url-prefix "https://${BASE_SYSTEM_HOST_PATH}" \
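Note on the two hunks above: `PYTHONPATH=artifacts/` is what lets the submitter script resolve the new package-style import (`from lava.exceptions import ...`), since `exceptions.py` ships under `artifacts/lava/`. The new exception types carry structured payloads instead of bare strings; a minimal, illustrative sketch of how a caller can consume such a payload (the handler is hypothetical, only the `lava.exceptions` names come from this patch):

```python
from datetime import timedelta

from lava.exceptions import MesaCITimeoutError

try:
    # Hypothetical failure site: a watchdog that saw ten silent minutes.
    raise MesaCITimeoutError(
        "device went silent", timeout_duration=timedelta(minutes=10)
    )
except MesaCITimeoutError as err:
    # The timedelta travels with the exception, so reporting code does not
    # have to parse the duration back out of the message string.
    print(f"retrying after {err.timeout_duration.total_seconds():.0f}s of silence")
```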
diff --git a/.gitlab-ci/lava/lava_job_submitter.py b/.gitlab-ci/lava/lava_job_submitter.py
index 486fd1608da..cc5063b2727 100755
--- a/.gitlab-ci/lava/lava_job_submitter.py
+++ b/.gitlab-ci/lava/lava_job_submitter.py
@@ -31,13 +31,14 @@ import sys
 import time
 import traceback
 import urllib.parse
-import xmlrpc
+import xmlrpc.client
 from datetime import datetime, timedelta
 from os import getenv
-from typing import Optional
+from typing import Any, Optional
 
 import lavacli
 import yaml
+from lava.exceptions import MesaCIException, MesaCIRetryError, MesaCITimeoutError
 from lavacli.utils import loader
 
 # Timeout in seconds to decide if the device from the dispatched LAVA job has
@@ -176,7 +177,6 @@ def generate_lava_yaml(args):
         'mkdir -p {}'.format(args.ci_project_dir),
         'wget -S --progress=dot:giga -O- {} | tar -xz -C {}'.format(args.build_url, args.ci_project_dir),
         'wget -S --progress=dot:giga -O- {} | tar -xz -C /'.format(args.job_rootfs_overlay_url),
-        f'echo "export CI_JOB_JWT_FILE={args.jwt_file}" >> /set-job-env-vars.sh',
         # Putting CI_JOB name as the testcase name, it may help LAVA farm
         # maintainers with monitoring
         f"lava-test-case 'mesa-ci_{args.mesa_job_name}' --shell /init-stage2.sh",
@@ -226,11 +226,7 @@ def _call_proxy(fn, *args):
             fatal_err("FATAL: Fault: {} (code: {})".format(err.faultString, err.faultCode))
 
 
-class MesaCIException(Exception):
-    pass
-
-
-class LAVAJob():
+class LAVAJob:
     def __init__(self, proxy, definition):
         self.job_id = None
         self.proxy = proxy
@@ -243,12 +239,12 @@ class LAVAJob():
         self.last_log_time = datetime.now()
 
     def validate(self) -> Optional[dict]:
-        try:
-            return _call_proxy(
-                self.proxy.scheduler.jobs.validate, self.definition, True
-            )
-        except MesaCIException:
-            return False
+        """Returns a dict with errors, if the validation fails.
+
+        Returns:
+            Optional[dict]: a dict with the validation errors, if any
+        """
+        return _call_proxy(self.proxy.scheduler.jobs.validate, self.definition, True)
 
     def submit(self):
         try:
@@ -261,12 +257,14 @@ class LAVAJob():
         if self.job_id:
             self.proxy.scheduler.jobs.cancel(self.job_id)
 
-    def is_started(self):
+    def is_started(self) -> bool:
         waiting_states = ["Submitted", "Scheduling", "Scheduled"]
-        job_state = _call_proxy(self.proxy.scheduler.job_state, self.job_id)
+        job_state: dict[str, str] = _call_proxy(
+            self.proxy.scheduler.job_state, self.job_id
+        )
         return job_state["job_state"] not in waiting_states
 
-    def get_logs(self):
+    def get_logs(self) -> list[str]:
         try:
             (finished, data) = _call_proxy(
                 self.proxy.scheduler.jobs.logs, self.job_id, self.last_log_line
@@ -278,8 +276,35 @@
             self.heartbeat()
             self.last_log_line += len(lines)
             return lines
-        except MesaCIException as mesa_exception:
-            fatal_err(f"Could not get LAVA job logs. Reason: {mesa_exception}")
+        except Exception as mesa_ci_err:
+            raise MesaCIException(
+                f"Could not get LAVA job logs. Reason: {mesa_ci_err}"
+            ) from mesa_ci_err
+
+
+def find_exception_from_metadata(metadata, job_id):
+    if "result" not in metadata or metadata["result"] != "fail":
+        return
+    if "error_type" in metadata:
+        error_type = metadata["error_type"]
+        if error_type == "Infrastructure":
+            raise MesaCIException(
+                f"LAVA job {job_id} failed with Infrastructure Error. Retry."
+            )
+        if error_type == "Job":
+            # This happens when LAVA assumes that the job cannot terminate or
+            # with mal-formed job definitions. As we are always validating the
+            # jobs, only the former is probable to happen. E.g.: When some LAVA
+            # action timed out more times than expected in job definition.
+            raise MesaCIException(
+                f"LAVA job {job_id} failed with JobError "
+                "(possible LAVA timeout misconfiguration/bug). Retry."
+            )
+    if "case" in metadata and metadata["case"] == "validate":
+        raise MesaCIException(
+            f"LAVA job {job_id} failed validation (possible download error). Retry."
+        )
+    return metadata
 
 
 def get_job_results(proxy, job_id, test_suite):
@@ -288,16 +313,7 @@
     results = yaml.load(results_yaml, Loader=loader(False))
     for res in results:
         metadata = res["metadata"]
-        if "result" not in metadata or metadata["result"] != "fail":
-            continue
-        if "error_type" in metadata and metadata["error_type"] == "Infrastructure":
-            raise MesaCIException(
-                f"LAVA job {job_id} failed with Infrastructure Error. Retry."
-            )
-        if "case" in metadata and metadata["case"] == "validate":
-            raise MesaCIException(
-                f"LAVA job {job_id} failed validation (possible download error). Retry."
-            )
+        find_exception_from_metadata(metadata, job_id)
 
     results_yaml = _call_proxy(
         proxy.results.get_testsuite_results_yaml, job_id, test_suite
@@ -347,11 +363,38 @@ def parse_lava_lines(new_lines) -> list[str]:
     return parsed_lines
 
 
+def fetch_logs(job, max_idle_time):
+    # Poll to check for new logs, assuming that a prolonged period of
+    # silence means that the device has died and we should try it again
+    if datetime.now() - job.last_log_time > max_idle_time:
+        max_idle_time_min = max_idle_time.total_seconds() / 60
+        print_log(
+            f"No log output for {max_idle_time_min} minutes; "
+            "assuming device has died, retrying"
+        )
+
+        raise MesaCITimeoutError(
+            f"LAVA job {job.job_id} does not respond for {max_idle_time_min} "
+            "minutes. Retry.",
+            timeout_duration=max_idle_time,
+        )
+
+    time.sleep(LOG_POLLING_TIME_SEC)
+
+    new_lines = job.get_logs()
+    parsed_lines = parse_lava_lines(new_lines)
+
+    for line in parsed_lines:
+        print_log(line)
+
+
 def follow_job_execution(job):
     try:
         job.submit()
-    except MesaCIException as mesa_exception:
-        fatal_err(f"Could not submit LAVA job. Reason: {mesa_exception}")
+    except Exception as mesa_ci_err:
+        raise MesaCIException(
+            f"Could not submit LAVA job. Reason: {mesa_ci_err}"
+        ) from mesa_ci_err
 
     print_log(f"Waiting for job {job.job_id} to start.")
     while not job.is_started():
@@ -362,24 +405,7 @@
     # Start to check job's health
     job.heartbeat()
     while not job.is_finished:
-        # Poll to check for new logs, assuming that a prolonged period of
-        # silence means that the device has died and we should try it again
-        if datetime.now() - job.last_log_time > max_idle_time:
-            print_log(
-                f"No log output for {max_idle_time} seconds; assuming device has died, retrying"
-            )
-
-            raise MesaCIException(
-                f"LAVA job {job.job_id} does not respond for {max_idle_time}. Retry."
-            )
-
-        time.sleep(LOG_POLLING_TIME_SEC)
-
-        new_lines = job.get_logs()
-        parsed_lines = parse_lava_lines(new_lines)
-
-        for line in parsed_lines:
-            print(line)
+        fetch_logs(job, max_idle_time)
 
     show_job_data(job)
     return get_job_results(job.proxy, job.job_id, "0_mesa")
@@ -402,9 +428,9 @@
     finally:
         print_log(f"Finished executing LAVA job in the attempt #{attempt_no}")
 
-    fatal_err(
-        "Job failed after it exceeded the number of "
-        f"{NUMBER_OF_RETRIES_TIMEOUT_DETECTION} retries."
+    raise MesaCIRetryError(
+        "Job failed after it exceeded the number of " f"{retry_count} retries.",
+        retry_count=retry_count,
     )
 
 
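With `fatal_err()` replaced by raised exceptions, error handling composes instead of exiting mid-stack: `submit()` and `get_logs()` wrap transport failures in `MesaCIException`, `find_exception_from_metadata()` escalates LAVA infrastructure and job errors so the retry loop can catch them, and `MesaCIRetryError` reports an exhausted retry budget. A hypothetical top-level wrapper built on that contract (assuming the retry loop shown in the last hunk lives in `retriable_follow_job`; the wrapper itself is not part of the patch):

```python
import sys

from lava.exceptions import MesaCIRetryError
from lava.lava_job_submitter import retriable_follow_job


def main(proxy, job_definition) -> None:
    try:
        retriable_follow_job(proxy, job_definition)
    except MesaCIRetryError as err:
        # The exhausted retry budget travels with the exception, so the exit
        # path can report it without reaching into module constants.
        print(f"Giving up after {err.retry_count} retries", file=sys.stderr)
        sys.exit(1)
```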
Retry.", + timeout_duration=max_idle_time, + ) + + time.sleep(LOG_POLLING_TIME_SEC) + + new_lines = job.get_logs() + parsed_lines = parse_lava_lines(new_lines) + + for line in parsed_lines: + print_log(line) + + def follow_job_execution(job): try: job.submit() - except MesaCIException as mesa_exception: - fatal_err(f"Could not submit LAVA job. Reason: {mesa_exception}") + except Exception as mesa_ci_err: + raise MesaCIException( + f"Could not submit LAVA job. Reason: {mesa_ci_err}" + ) from mesa_ci_err print_log(f"Waiting for job {job.job_id} to start.") while not job.is_started(): @@ -362,24 +405,7 @@ def follow_job_execution(job): # Start to check job's health job.heartbeat() while not job.is_finished: - # Poll to check for new logs, assuming that a prolonged period of - # silence means that the device has died and we should try it again - if datetime.now() - job.last_log_time > max_idle_time: - print_log( - f"No log output for {max_idle_time} seconds; assuming device has died, retrying" - ) - - raise MesaCIException( - f"LAVA job {job.job_id} does not respond for {max_idle_time}. Retry." - ) - - time.sleep(LOG_POLLING_TIME_SEC) - - new_lines = job.get_logs() - parsed_lines = parse_lava_lines(new_lines) - - for line in parsed_lines: - print(line) + fetch_logs(job, max_idle_time) show_job_data(job) return get_job_results(job.proxy, job.job_id, "0_mesa") @@ -402,9 +428,9 @@ def retriable_follow_job(proxy, job_definition): finally: print_log(f"Finished executing LAVA job in the attempt #{attempt_no}") - fatal_err( - "Job failed after it exceeded the number of " - f"{NUMBER_OF_RETRIES_TIMEOUT_DETECTION} retries." + raise MesaCIRetryError( + "Job failed after it exceeded the number of " f"{retry_count} retries.", + retry_count=retry_count, ) diff --git a/.gitlab-ci/tests/lava/helpers.py b/.gitlab-ci/tests/lava/helpers.py new file mode 100644 index 00000000000..dd6170b00dd --- /dev/null +++ b/.gitlab-ci/tests/lava/helpers.py @@ -0,0 +1,107 @@ +from contextlib import nullcontext as does_not_raise +from datetime import datetime, timedelta +from itertools import cycle +from typing import Callable, Generator, Iterable, Tuple, Union + +import yaml +from freezegun import freeze_time +from lava.utils.lava_log import ( + DEFAULT_GITLAB_SECTION_TIMEOUTS, + FALLBACK_GITLAB_SECTION_TIMEOUT, + LogSectionType, +) + + +def section_timeout(section_type: LogSectionType) -> int: + return int( + DEFAULT_GITLAB_SECTION_TIMEOUTS.get( + section_type, FALLBACK_GITLAB_SECTION_TIMEOUT + ).total_seconds() + ) + + +def create_lava_yaml_msg( + dt: Callable = datetime.now, msg="test", lvl="target" +) -> dict[str, str]: + return {"dt": str(dt()), "msg": msg, "lvl": lvl} + + +def jobs_logs_response(finished=False, msg=None, **kwargs) -> Tuple[bool, str]: + timed_msg = create_lava_yaml_msg(**kwargs) + logs = [timed_msg] if msg is None else msg + + return finished, yaml.safe_dump(logs) + + +def message_generator_new( + messages: dict[LogSectionType, Iterable[int]] +) -> Iterable[tuple[dict, Iterable[int]]]: + default = [1] + for section_type in LogSectionType: + delay = messages.get(section_type, default) + yield mock_lava_signal(section_type), delay + + +def message_generator(): + for section_type in LogSectionType: + yield mock_lava_signal(section_type) + + +def generate_n_logs( + n=0, + tick_fn: Union[Generator, Iterable[int], int] = 1, + message_fn=message_generator, +): + if isinstance(tick_fn, Generator): + tick_gen = tick_fn + elif isinstance(tick_fn, Iterable): + tick_gen = cycle(tick_fn) + else: + tick_gen = 
diff --git a/.gitlab-ci/tests/test_lava_job_submitter.py b/.gitlab-ci/tests/test_lava_job_submitter.py
index bd21958c075..d2f900a2794 100644
--- a/.gitlab-ci/tests/test_lava_job_submitter.py
+++ b/.gitlab-ci/tests/test_lava_job_submitter.py
@@ -32,6 +32,7 @@ from unittest.mock import MagicMock, patch
 import pytest
 import yaml
 from freezegun import freeze_time
+from lava.exceptions import MesaCIException, MesaCIRetryError, MesaCITimeoutError
 from lava.lava_job_submitter import (
     DEVICE_HANGING_TIMEOUT_SEC,
     NUMBER_OF_RETRIES_TIMEOUT_DETECTION,
@@ -120,7 +121,7 @@ def frozen_time(mock_sleep):
 
 @pytest.mark.parametrize("exception", [RuntimeError, SystemError, KeyError])
 def test_submit_and_follow_respects_exceptions(mock_sleep, mock_proxy, exception):
-    with pytest.raises(exception):
+    with pytest.raises(MesaCIException):
         proxy = mock_proxy(side_effect=exception)
         job = LAVAJob(proxy, '')
         follow_job_execution(job)
@@ -168,7 +169,7 @@ PROXY_SCENARIOS = {
     ),
     "timed out more times than retry attempts": (
         generate_n_logs(n=4, tick_fn=DEVICE_HANGING_TIMEOUT_SEC + 1),
-        pytest.raises(SystemExit),
+        pytest.raises(MesaCIRetryError),
         False,
         {},
     ),
@@ -211,7 +212,7 @@ PROXY_SCENARIOS = {
     ),
     "very long silence": (
         generate_n_logs(n=NUMBER_OF_MAX_ATTEMPTS + 1, tick_fn=100000),
-        pytest.raises(SystemExit),
+        pytest.raises(MesaCIRetryError),
         False,
         {},
     ),
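Since the suite now expects typed exceptions instead of `SystemExit`, follow-up tests can also assert on the payloads the exceptions carry. A hypothetical test in the same style (not part of this patch):

```python
from datetime import timedelta

import pytest
from lava.exceptions import MesaCITimeoutError


def test_timeout_error_carries_duration():
    timeout = timedelta(minutes=10)
    with pytest.raises(MesaCITimeoutError) as exc_info:
        # Simulate the watchdog in fetch_logs() giving up.
        raise MesaCITimeoutError("no log output", timeout_duration=timeout)
    assert exc_info.value.timeout_duration == timeout
```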