ci/lava: Encapsulate job data in a class

This reduces free-form passing of job state between functions, and also
makes it easier to implement log-based following in the future.

The new class has:
- job log polling: this removes more function-local state; the job
  instance now tracks the current position in the log, and the polling
  timeout is localised within the code that polls it (see the lifecycle
  sketch below)
- has-started detection, moved into the job class
- heartbeat logic that records on the job instance the time of the last
  log activity, starting when the submitter begins to track the logs
  from the LAVA device
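
A minimal sketch of the resulting lifecycle, mirroring the new
follow_job_execution(); `proxy` and `definition` here stand in for a
configured XML-RPC proxy and a generated job definition:

    job = LAVAJob(proxy, definition)           # proxy/definition are placeholders
    if not job.submit():
        fatal_err("LAVA job submission failed")
    while not job.is_started():                # Submitted/Scheduling/Scheduled
        time.sleep(WAIT_FOR_DEVICE_POLLING_TIME_SEC)
    job.heartbeat()                            # arm the idle-timeout clock
    while not job.is_finished:
        time.sleep(LOG_POLLING_TIME_SEC)
        for line in job.get_logs():            # advances last_log_line and
            print(line)                        # refreshes the heartbeat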

Besides:

- Split LAVA job handling from Mesa CI policy
- Update the unit tests to use the LAVAJob class (see the mock-driven
  sketch below)
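
On the test side, a rough sketch of driving a LAVAJob from a mocked
proxy; this uses a bare MagicMock instead of the richer mock_proxy
fixture from the test file:

    from unittest.mock import MagicMock

    proxy = MagicMock()
    proxy.scheduler.job_state.return_value = {"job_state": "Running"}
    job = LAVAJob(proxy, "")
    job.submit()             # job_id is whatever the mocked submit returns
    assert job.is_started()  # "Running" is not one of the waiting states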

Signed-off-by: Guilherme Gallo <guilherme.gallo@collabora.com>
Part-of: <https://gitlab.freedesktop.org/mesa/mesa/-/merge_requests/15938>
Author: Guilherme Gallo, 2022-02-16 18:06:20 +00:00 (committed by Marge Bot)
parent b3ba448ba5
commit 84a5ea4228
2 changed files with 153 additions and 97 deletions
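
The polling in LAVAJob.get_logs() builds on LAVA's incremental log API:
scheduler.jobs.logs(job_id, start_line) returns a finished flag plus a
YAML document containing only the lines logged since the given offset.
A sketch of one round-trip (the offset 42 is illustrative; `loader` is
the submitter's YAML loader helper):

    # Fetch only the lines the job object has not seen yet.
    finished, data = proxy.scheduler.jobs.logs(job_id, 42)
    lines = yaml.load(str(data), Loader=loader(False))
    # Each entry looks like {"dt": ..., "lvl": "target", "msg": ...}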


@@ -31,7 +31,6 @@ import time
 import traceback
 import urllib.parse
 import xmlrpc
 from datetime import datetime, timedelta
 from os import getenv
@@ -211,6 +210,62 @@ def _call_proxy(fn, *args):
             fatal_err("FATAL: Fault: {} (code: {})".format(err.faultString, err.faultCode))


 class MesaCIException(Exception):
     pass


+class LAVAJob():
+    def __init__(self, proxy, definition):
+        self.job_id = None
+        self.proxy = proxy
+        self.definition = definition
+        self.last_log_line = 0
+        self.last_log_time = None
+        self.is_finished = False
+
+    def heartbeat(self):
+        self.last_log_time = datetime.now()
+
+    def validate(self):
+        try:
+            return _call_proxy(
+                self.proxy.scheduler.jobs.validate, self.definition, True
+            )
+        except MesaCIException:
+            return False
+
+    def submit(self):
+        try:
+            self.job_id = _call_proxy(self.proxy.scheduler.jobs.submit, self.definition)
+        except MesaCIException:
+            return False
+        return True
+
+    def cancel(self):
+        if self.job_id:
+            self.proxy.scheduler.jobs.cancel(self.job_id)
+
+    def is_started(self):
+        waiting_states = ["Submitted", "Scheduling", "Scheduled"]
+        job_state = _call_proxy(self.proxy.scheduler.job_state, self.job_id)
+        return job_state["job_state"] not in waiting_states
+
+    def get_logs(self):
+        try:
+            (finished, data) = _call_proxy(
+                self.proxy.scheduler.jobs.logs, self.job_id, self.last_log_line
+            )
+            lines = yaml.load(str(data), Loader=loader(False))
+            self.is_finished = finished
+            if not lines:
+                return []
+
+            self.heartbeat()
+            self.last_log_line += len(lines)
+            return lines
+
+        except MesaCIException as mesa_exception:
+            fatal_err(f"Could not get LAVA job logs. Reason: {mesa_exception}")
+
+
 def get_job_results(proxy, job_id, test_suite, test_case):
     # Look for infrastructure errors and retry if we see them.
     results_yaml = _call_proxy(proxy.results.get_testjob_results_yaml, job_id)
@@ -220,125 +275,103 @@ def get_job_results(proxy, job_id, test_suite, test_case):
         if "result" not in metadata or metadata["result"] != "fail":
             continue
         if 'error_type' in metadata and metadata['error_type'] == "Infrastructure":
-            print_log("LAVA job {} failed with Infrastructure Error. Retry.".format(job_id))
-            return False
+            raise MesaCIException("LAVA job {} failed with Infrastructure Error. Retry.".format(job_id))
         if 'case' in metadata and metadata['case'] == "validate":
-            print_log("LAVA job {} failed validation (possible download error). Retry.".format(job_id))
-            return False
+            raise MesaCIException("LAVA job {} failed validation (possible download error). Retry.".format(job_id))

     results_yaml = _call_proxy(proxy.results.get_testcase_results_yaml, job_id, test_suite, test_case)
     results = yaml.load(results_yaml, Loader=loader(False))
     if not results:
-        fatal_err("LAVA: no result for test_suite '{}', test_case '{}'".format(test_suite, test_case))
+        raise MesaCIException("LAVA: no result for test_suite '{}', test_case '{}'".format(test_suite, test_case))

     print_log("LAVA: result for test_suite '{}', test_case '{}': {}".format(test_suite, test_case, results[0]['result']))
     if results[0]['result'] != 'pass':
         fatal_err("FAIL")
-        return False

     return True
-def wait_until_job_is_started(proxy, job_id):
-    print_log(f"Waiting for job {job_id} to start.")
-    current_state = "Submitted"
-    waiting_states = ["Submitted", "Scheduling", "Scheduled"]
-    while current_state in waiting_states:
-        time.sleep(WAIT_FOR_DEVICE_POLLING_TIME_SEC)
-        job_state = _call_proxy(proxy.scheduler.job_state, job_id)
-        current_state = job_state["job_state"]
-    print_log(f"Job {job_id} started.")
-
-
-def follow_job_execution(proxy, job_id):
-    line_count = 0
-    finished = False
-    last_time_logs = datetime.now()
-    while not finished:
-        # `proxy.scheduler.jobs.logs` does not block, even when there is no
-        # new log to be fetched. To avoid dosing the LAVA dispatcher
-        # machine, let's add a sleep to save them some stamina.
-        time.sleep(LOG_POLLING_TIME_SEC)
-
-        (finished, data) = _call_proxy(proxy.scheduler.jobs.logs, job_id, line_count)
-        if logs := yaml.load(str(data), Loader=loader(False)):
-            # Reset the timeout
-            last_time_logs = datetime.now()
-            for line in logs:
-                print("{} {}".format(line["dt"], line["msg"]))
-
-            line_count += len(logs)
-
-        else:
-            time_limit = timedelta(seconds=DEVICE_HANGING_TIMEOUT_SEC)
-            if datetime.now() - last_time_logs > time_limit:
-                print_log("LAVA job {} doesn't advance (machine got hung?). Retry.".format(job_id))
-                return False
-
-    return True

-def show_job_data(proxy, job_id):
-    show = _call_proxy(proxy.scheduler.jobs.show, job_id)
+def show_job_data(job):
+    show = _call_proxy(job.proxy.scheduler.jobs.show, job.job_id)
     for field, value in show.items():
         print("{}\t: {}".format(field, value))


-def validate_job(proxy, job_file):
+def follow_job_execution(job):
     try:
-        return _call_proxy(proxy.scheduler.jobs.validate, job_file, True)
-    except:
-        return False
+        job.submit()
+    except MesaCIException as mesa_exception:
+        fatal_err(f"Could not submit LAVA job. Reason: {mesa_exception}")

-
-def submit_job(proxy, job_file):
-    return _call_proxy(proxy.scheduler.jobs.submit, job_file)
+    print_log(f"Waiting for job {job.job_id} to start.")
+    while not job.is_started():
+        time.sleep(WAIT_FOR_DEVICE_POLLING_TIME_SEC)
+    print_log(f"Job {job.job_id} started.")
+
+    max_idle_time = timedelta(seconds=DEVICE_HANGING_TIMEOUT_SEC)
+    # Start to check job's health
+    job.heartbeat()
+    while not job.is_finished:
+        # Poll to check for new logs, assuming that a prolonged period of
+        # silence means that the device has died and we should try it again
+        if datetime.now() - job.last_log_time > max_idle_time:
+            print_log(
+                f"No log output for {max_idle_time} seconds; assuming device has died, retrying"
+            )
+            raise MesaCIException(
+                f"LAVA job {job.job_id} does not respond for {max_idle_time}. Retry."
+            )
+
+        time.sleep(LOG_POLLING_TIME_SEC)
+
+        new_lines = job.get_logs()
+        for line in new_lines:
+            print(line)
+
+    show_job_data(job)
+    return get_job_results(job.proxy, job.job_id, "0_mesa", "mesa")


-def retriable_follow_job(proxy, yaml_file):
+def retriable_follow_job(proxy, job_definition):
     retry_count = NUMBER_OF_RETRIES_TIMEOUT_DETECTION

-    while retry_count >= 0:
-        job_id = submit_job(proxy, yaml_file)
+    for attempt_no in range(1, retry_count + 2):
+        job = LAVAJob(proxy, job_definition)
+        try:
+            return follow_job_execution(job)
+        except MesaCIException as mesa_exception:
+            print_log(mesa_exception)
+            job.cancel()
+        finally:
+            print_log(f"Finished executing LAVA job in the attempt #{attempt_no}")

-        print_log("LAVA job id: {}".format(job_id))
-        wait_until_job_is_started(proxy, job_id)
-
-        if not follow_job_execution(proxy, job_id):
-            print_log(f"Job {job_id} has timed out. Cancelling it.")
-            # Cancel the job as it is considered unreachable by Mesa CI.
-            proxy.scheduler.jobs.cancel(job_id)
-            retry_count -= 1
-            continue
-
-        show_job_data(proxy, job_id)
-
-        if get_job_results(proxy, job_id, "0_mesa", "mesa") == True:
-            break
-    else:
-        # The script attempted all the retries. The job seemed to fail.
-        return False
-
-    return True
+    fatal_err(
+        "Job failed after it exceeded the number of "
+        f"{NUMBER_OF_RETRIES_TIMEOUT_DETECTION} retries."
+    )


 def main(args):
     proxy = setup_lava_proxy()

-    yaml_file = generate_lava_yaml(args)
+    job_definition = generate_lava_yaml(args)

     if args.dump_yaml:
-        print(hide_sensitive_data(generate_lava_yaml(args)))
+        print("LAVA job definition (YAML):")
+        print(hide_sensitive_data(job_definition))

     if args.validate_only:
-        ret = validate_job(proxy, yaml_file)
+        job = LAVAJob(proxy, job_definition)
+        ret = job.validate()
         if not ret:
             fatal_err("Error in LAVA job definition")
         print("LAVA job definition validated successfully")
         return

-    if not retriable_follow_job(proxy, yaml_file):
-        fatal_err(
-            "Job failed after it exceeded the number of"
-            f"{NUMBER_OF_RETRIES_TIMEOUT_DETECTION} retries."
-        )
+    ret = retriable_follow_job(proxy, job_definition)
+    sys.exit(ret)


 def create_parser():


@@ -25,23 +25,27 @@
 import xmlrpc.client
 from contextlib import nullcontext as does_not_raise
 from datetime import datetime
-from itertools import repeat
-from typing import Tuple
+from itertools import cycle, repeat
+from typing import Iterable, Union, Generator, Tuple
 from unittest.mock import MagicMock, patch

 import pytest
 import yaml
 from freezegun import freeze_time
 from lava.lava_job_submitter import (
     NUMBER_OF_RETRIES_TIMEOUT_DETECTION,
     DEVICE_HANGING_TIMEOUT_SEC,
     follow_job_execution,
     hide_sensitive_data,
     retriable_follow_job,
+    LAVAJob
 )

+NUMBER_OF_MAX_ATTEMPTS = NUMBER_OF_RETRIES_TIMEOUT_DETECTION + 1
+

-def jobs_logs_response(finished=False, msg=None) -> Tuple[bool, str]:
-    timed_msg = {"dt": str(datetime.now()), "msg": "New message"}
+def jobs_logs_response(finished=False, msg=None, lvl="target") -> Tuple[bool, str]:
+    timed_msg = {"dt": str(datetime.now()), "msg": "New message", "lvl": lvl}
     logs = [timed_msg] if msg is None else msg
     return finished, yaml.safe_dump(logs)
@@ -114,17 +118,35 @@ def frozen_time(mock_sleep):
 @pytest.mark.parametrize("exception", [RuntimeError, SystemError, KeyError])
 def test_submit_and_follow_respects_exceptions(mock_sleep, mock_proxy, exception):
     with pytest.raises(exception):
-        follow_job_execution(mock_proxy(side_effect=exception), "")
+        proxy = mock_proxy(side_effect=exception)
+        job = LAVAJob(proxy, '')
+        follow_job_execution(job)


-def generate_n_logs(n=1, tick_sec=1):
+def level_generator():
+    # Tests all known levels by default
+    yield from cycle(( "results", "feedback", "warning", "error", "debug", "target" ))
+
+
+def generate_n_logs(n=1, tick_fn: Union[Generator, Iterable[int], int]=1, level_fn=level_generator):
     """Simulate a log partitionated in n components"""
+    level_gen = level_fn()
+
+    if isinstance(tick_fn, Generator):
+        tick_gen = tick_fn
+    elif isinstance(tick_fn, Iterable):
+        tick_gen = cycle(tick_fn)
+    else:
+        tick_gen = cycle((tick_fn,))
+
     with freeze_time(datetime.now()) as time_travel:
+        tick_sec: int = next(tick_gen)
         while True:
             # Simulate a scenario where the target job is waiting for being started
             for _ in range(n - 1):
+                level: str = next(level_gen)
                 time_travel.tick(tick_sec)
-                yield jobs_logs_response(finished=False, msg=[])
+                yield jobs_logs_response(finished=False, msg=[], lvl=level)
             time_travel.tick(tick_sec)
             yield jobs_logs_response(finished=True)
@@ -136,23 +158,23 @@ XMLRPC_FAULT = xmlrpc.client.Fault(0, "test")

 PROXY_SCENARIOS = {
     "finish case": (generate_n_logs(1), does_not_raise(), True),
     "works at last retry": (
-        generate_n_logs(n=3, tick_sec=DEVICE_HANGING_TIMEOUT_SEC + 1),
+        generate_n_logs(n=NUMBER_OF_MAX_ATTEMPTS, tick_fn=[ DEVICE_HANGING_TIMEOUT_SEC + 1 ] * NUMBER_OF_RETRIES_TIMEOUT_DETECTION + [1]),
         does_not_raise(),
         True,
     ),
     "timed out more times than retry attempts": (
-        generate_n_logs(n=4, tick_sec=DEVICE_HANGING_TIMEOUT_SEC + 1),
-        does_not_raise(),
+        generate_n_logs(n=4, tick_fn=DEVICE_HANGING_TIMEOUT_SEC + 1),
+        pytest.raises(SystemExit),
         False,
     ),
     "long log case, no silence": (
-        generate_n_logs(n=1000, tick_sec=0),
+        generate_n_logs(n=1000, tick_fn=0),
         does_not_raise(),
         True,
     ),
     "very long silence": (
-        generate_n_logs(n=4, tick_sec=100000),
-        does_not_raise(),
+        generate_n_logs(n=NUMBER_OF_MAX_ATTEMPTS + 1, tick_fn=100000),
+        pytest.raises(SystemExit),
         False,
     ),
     # If a protocol error happens, _call_proxy will retry without affecting timeouts
@@ -181,7 +203,8 @@ def test_retriable_follow_job(
     mock_sleep, side_effect, expectation, has_finished, mock_proxy
 ):
     with expectation:
-        result = retriable_follow_job(mock_proxy(side_effect=side_effect), "")
+        proxy = mock_proxy(side_effect=side_effect)
+        result = retriable_follow_job(proxy, "")

         assert has_finished == result