ci/lava: Follow job execution via LogFollower

LogFollower is now used to handle the LAVA logs.

Moreover, this commit adds per-GitLab-section timeouts: if a section
takes longer than expected, the job is canceled and retried.

Signed-off-by: Guilherme Gallo <guilherme.gallo@collabora.com>
Part-of: <https://gitlab.freedesktop.org/mesa/mesa/-/merge_requests/16323>
Guilherme Gallo authored on 2022-04-04 11:26:17 -03:00, committed by Marge Bot
parent 2569d7d7df
commit aa26a6ab72
5 changed files with 300 additions and 62 deletions
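
The mechanism introduced here, in short: every GitLab log section type gets a
timeout budget, a watchdog compares the current section's elapsed time against
that budget on every log poll, and exceeding the budget cancels the attempt so
the job can be retried. A minimal sketch of that idea using only the standard
library; the names here are illustrative, not the module's real API (the real
budgets live in DEFAULT_GITLAB_SECTION_TIMEOUTS below):

from datetime import datetime, timedelta
from enum import Enum, auto


class SectionType(Enum):
    BOOT = auto()
    TEST_CASE = auto()


# Per-section budgets; illustrative values mirroring the defaults below.
TIMEOUTS = {
    SectionType.BOOT: timedelta(minutes=5),
    SectionType.TEST_CASE: timedelta(minutes=60),
}


class SectionWatchdog:
    def __init__(self, section_type: SectionType) -> None:
        self.section_type = section_type
        self.start_time = datetime.now()

    def check(self) -> None:
        # Called on every log poll: bail out once the section exceeds its budget.
        if datetime.now() - self.start_time > TIMEOUTS[self.section_type]:
            raise TimeoutError(f"{self.section_type} took too long")


def follow_with_retries(run_once, max_attempts: int = 3):
    # A section timeout aborts the current attempt; the job is then retried
    # until max_attempts is exhausted.
    for _ in range(max_attempts):
        try:
            return run_once()
        except TimeoutError:
            continue
    raise RuntimeError("job failed after all retries")

In the actual changes, GitlabSection tracks the section's start time and
LogFollower.watchdog() raises MesaCITimeoutError when the budget is exceeded.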


@@ -49,9 +49,10 @@ from lava.exceptions import (
from lava.utils.lava_log import (
CONSOLE_LOG,
GitlabSection,
LogFollower,
LogSectionType,
fatal_err,
hide_sensitive_data,
parse_lava_lines,
print_log,
)
from lavacli.utils import loader
@@ -73,6 +74,7 @@ NUMBER_OF_RETRIES_TIMEOUT_DETECTION = int(getenv("LAVA_NUMBER_OF_RETRIES_TIMEOUT
# How many attempts should be made when a timeout happens during LAVA device boot.
NUMBER_OF_ATTEMPTS_LAVA_BOOT = int(getenv("LAVA_NUMBER_OF_ATTEMPTS_LAVA_BOOT", 3))
def generate_lava_yaml(args):
# General metadata and permissions, plus also inexplicably kernel arguments
values = {
@@ -128,10 +130,7 @@ def generate_lava_yaml(args):
# skeleton test definition: only declaring each job as a single 'test'
# since LAVA's test parsing is not useful to us
setup_section = GitlabSection(
id="lava_setup", header="LAVA setup log", start_collapsed=True
)
run_steps = [f"printf '{setup_section.start()}'"]
run_steps = []
test = {
'timeout': { 'minutes': args.job_timeout },
'failure_retry': 1,
@@ -182,7 +181,6 @@ def generate_lava_yaml(args):
'mkdir -p {}'.format(args.ci_project_dir),
'wget -S --progress=dot:giga -O- {} | tar -xz -C {}'.format(args.build_url, args.ci_project_dir),
'wget -S --progress=dot:giga -O- {} | tar -xz -C /'.format(args.job_rootfs_overlay_url),
f"printf '{setup_section.end()}'",
# Put the CI_JOB name as the testcase name; it may help LAVA farm
# maintainers with monitoring
@@ -366,7 +364,7 @@ def show_job_data(job):
print("{}\t: {}".format(field, value))
def fetch_logs(job, max_idle_time) -> None:
def fetch_logs(job, max_idle_time, log_follower) -> None:
# Poll to check for new logs, assuming that a prolonged period of
# silence means that the device has died and we should try it again
if datetime.now() - job.last_log_time > max_idle_time:
@@ -393,7 +391,8 @@ def fetch_logs(job, max_idle_time) -> None:
else:
raise MesaCIParseException
parsed_lines = parse_lava_lines(new_log_lines)
log_follower.feed(new_log_lines)
parsed_lines = log_follower.flush()
for line in parsed_lines:
print_log(line)
@@ -414,11 +413,21 @@ def follow_job_execution(job):
time.sleep(WAIT_FOR_DEVICE_POLLING_TIME_SEC)
print_log(f"Job {job.job_id} started.")
gl = GitlabSection(
id="lava_boot",
header="LAVA boot",
type=LogSectionType.LAVA_BOOT,
start_collapsed=True,
)
print(gl.start())
max_idle_time = timedelta(seconds=DEVICE_HANGING_TIMEOUT_SEC)
# Start to check job's health
job.heartbeat()
while not job.is_finished:
fetch_logs(job, max_idle_time)
with LogFollower(current_section=gl) as lf:
max_idle_time = timedelta(seconds=DEVICE_HANGING_TIMEOUT_SEC)
# Start to check job's health
job.heartbeat()
while not job.is_finished:
fetch_logs(job, max_idle_time, lf)
show_job_data(job)
@@ -505,6 +514,7 @@ def create_parser():
return parser
if __name__ == "__main__":
# given that we proxy from DUT -> LAVA dispatcher -> LAVA primary -> us ->
# GitLab runner -> GitLab primary -> user, safe to say we don't need any


@@ -31,9 +31,12 @@ import logging
import re
import sys
from dataclasses import dataclass, field
from datetime import datetime
from datetime import datetime, timedelta
from enum import Enum, auto
from typing import Optional, Pattern, Union
from lava.exceptions import MesaCITimeoutError
# Helper constants to colorize the job output
CONSOLE_LOG = {
"COLOR_GREEN": "\x1b[1;32;5;197m",
@@ -45,38 +48,98 @@ CONSOLE_LOG = {
}
class LogSectionType(Enum):
LAVA_BOOT = auto()
TEST_SUITE = auto()
TEST_CASE = auto()
LAVA_POST_PROCESSING = auto()
FALLBACK_GITLAB_SECTION_TIMEOUT = timedelta(minutes=10)
DEFAULT_GITLAB_SECTION_TIMEOUTS = {
# Empirically, device boot takes 3 minutes on average.
LogSectionType.LAVA_BOOT: timedelta(minutes=5),
# The test suite phase is where the initialization happens.
LogSectionType.TEST_SUITE: timedelta(minutes=5),
# Test cases may take a long time; this script has no right to interrupt
# them. But if a test case takes almost 1h, it will never succeed due to
# the GitLab job timeout.
LogSectionType.TEST_CASE: timedelta(minutes=60),
# LAVA post-processing may refer to a test suite teardown or to the
# adjustments needed to start the next test_case
LogSectionType.LAVA_POST_PROCESSING: timedelta(minutes=5),
}
@dataclass
class GitlabSection:
id: str
header: str
type: LogSectionType
start_collapsed: bool = False
escape: str = "\x1b[0K"
colour: str = f"{CONSOLE_LOG['BOLD']}{CONSOLE_LOG['COLOR_GREEN']}"
__start_time: Optional[datetime] = field(default=None, init=False)
__end_time: Optional[datetime] = field(default=None, init=False)
def get_timestamp(self) -> str:
unix_ts = datetime.timestamp(datetime.now())
@classmethod
def section_id_filter(cls, value) -> str:
return str(re.sub(r"[^\w_-]+", "-", value))
def __post_init__(self):
self.id = self.section_id_filter(self.id)
@property
def has_started(self) -> bool:
return self.__start_time is not None
@property
def has_finished(self) -> bool:
return self.__end_time is not None
def get_timestamp(self, time: datetime) -> str:
unix_ts = datetime.timestamp(time)
return str(int(unix_ts))
def section(self, marker: str, header: str) -> str:
def section(self, marker: str, header: str, time: datetime) -> str:
preamble = f"{self.escape}section_{marker}"
collapse = marker == "start" and self.start_collapsed
collapsed = "[collapsed=true]" if collapse else ""
section_id = f"{self.id}{collapsed}"
timestamp = self.get_timestamp()
timestamp = self.get_timestamp(time)
before_header = ":".join([preamble, timestamp, section_id])
colored_header = (
f"{self.colour}{header}{CONSOLE_LOG['RESET']}" if header else ""
)
colored_header = f"{self.colour}{header}\x1b[0m" if header else ""
header_wrapper = "\r" + f"{self.escape}{colored_header}"
return f"{before_header}{header_wrapper}"
def __enter__(self):
print(self.start())
return self
def __exit__(self, exc_type, exc_val, exc_tb):
print(self.end())
def start(self) -> str:
return self.section(marker="start", header=self.header)
assert not self.has_finished, "Starting an already finished section"
self.__start_time = datetime.now()
return self.section(marker="start", header=self.header, time=self.__start_time)
def end(self) -> str:
return self.section(marker="end", header="")
assert self.has_started, "Ending an uninitialized section"
self.__end_time = datetime.now()
assert (
self.__end_time >= self.__start_time
), "Section execution time will be negative"
return self.section(marker="end", header="", time=self.__end_time)
def delta_time(self) -> Optional[timedelta]:
if self.__start_time and self.__end_time:
return self.__end_time - self.__start_time
if self.has_started:
return datetime.now() - self.__start_time
return None
@dataclass(frozen=True)
@@ -85,6 +148,7 @@ class LogSection:
level: str
section_id: str
section_header: str
section_type: LogSectionType
collapsed: bool = False
def from_log_line_to_section(
@@ -97,6 +161,7 @@ class LogSection:
return GitlabSection(
id=section_id,
header=section_header,
type=self.section_type,
start_collapsed=self.collapsed,
)
@@ -107,12 +172,14 @@ LOG_SECTIONS = (
level="debug",
section_id="{}",
section_header="test_case {}",
section_type=LogSectionType.TEST_CASE,
),
LogSection(
regex=re.compile(r".*<STARTRUN> (\S*)"),
level="debug",
section_id="{}",
section_header="test_suite {}",
section_type=LogSectionType.TEST_SUITE,
),
LogSection(
regex=re.compile(r"^<LAVA_SIGNAL_ENDTC ([^>]+)"),
@@ -120,6 +187,7 @@ LOG_SECTIONS = (
section_id="post-{}",
section_header="Post test_case {}",
collapsed=True,
section_type=LogSectionType.LAVA_POST_PROCESSING,
),
)
@@ -127,10 +195,21 @@ LOG_SECTIONS = (
@dataclass
class LogFollower:
current_section: Optional[GitlabSection] = None
sections: list[str] = field(default_factory=list)
collapsed_sections: tuple[str] = ("setup",)
timeout_durations: dict[LogSectionType, timedelta] = field(
default_factory=lambda: DEFAULT_GITLAB_SECTION_TIMEOUTS,
)
fallback_timeout: timedelta = FALLBACK_GITLAB_SECTION_TIMEOUT
_buffer: list[str] = field(default_factory=list, init=False)
def __post_init__(self):
section_is_created = bool(self.current_section)
section_has_started = bool(
self.current_section and self.current_section.has_started
)
assert (
section_is_created == section_has_started
), "Can't follow logs beginning from uninitalized GitLab sections."
def __enter__(self):
return self
@@ -141,8 +220,22 @@ class LogFollower:
for line in last_lines:
print(line)
def watchdog(self):
if not self.current_section:
return
timeout_duration = self.timeout_durations.get(
self.current_section.type, self.fallback_timeout
)
if self.current_section.delta_time() > timeout_duration:
raise MesaCITimeoutError(
f"Gitlab Section {self.current_section} has timed out",
timeout_duration=timeout_duration,
)
def clear_current_section(self):
if self.current_section:
if self.current_section and not self.current_section.has_finished:
self._buffer.append(self.current_section.end())
self.current_section = None
@@ -161,9 +254,11 @@ class LogFollower:
self.update_section(new_section)
def feed(self, new_lines: list[dict[str, str]]) -> None:
self.watchdog()
for line in new_lines:
self.manage_gl_sections(line)
self._buffer.append(line)
if parsed_line := parse_lava_line(line):
self._buffer.append(parsed_line)
def flush(self) -> list[str]:
buffer = self._buffer
@@ -221,30 +316,25 @@ def filter_debug_messages(line: dict[str, str]) -> bool:
return False
def parse_lava_lines(new_lines) -> list[str]:
parsed_lines: list[str] = []
for line in new_lines:
prefix = ""
def parse_lava_line(line) -> Optional[str]:
prefix = ""
suffix = ""
if line["lvl"] in ["results", "feedback"]:
return
elif line["lvl"] in ["warning", "error"]:
prefix = CONSOLE_LOG["COLOR_RED"]
suffix = CONSOLE_LOG["RESET"]
elif filter_debug_messages(line):
return
elif line["lvl"] == "input":
prefix = "$ "
suffix = ""
elif line["lvl"] == "target":
fix_lava_color_log(line)
fix_lava_gitlab_section_log(line)
if line["lvl"] in ["results", "feedback"]:
continue
elif line["lvl"] in ["warning", "error"]:
prefix = CONSOLE_LOG["COLOR_RED"]
suffix = CONSOLE_LOG["RESET"]
elif filter_debug_messages(line):
continue
elif line["lvl"] == "input":
prefix = "$ "
suffix = ""
elif line["lvl"] == "target":
fix_lava_color_log(line)
fix_lava_gitlab_section_log(line)
line = f'{prefix}{line["msg"]}{suffix}'
parsed_lines.append(line)
return parsed_lines
return f'{prefix}{line["msg"]}{suffix}'
def print_log(msg):
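
The hunks above cover the whole surface that the job submitter consumes. A
short usage sketch of that flow, mirroring the lava_job_submitter.py changes
earlier; it assumes the Mesa .gitlab-ci tree is importable as `lava`, and
poll_new_lines() is a made-up stand-in for the XML-RPC log fetch:

from lava.utils.lava_log import (
    GitlabSection,
    LogFollower,
    LogSectionType,
    print_log,
)


def poll_new_lines():
    # Made-up stand-in for the logs() fetch done by lava_job_submitter.py.
    return [{"dt": "", "lvl": "target", "msg": "hello from the DUT"}]


boot_section = GitlabSection(
    id="lava_boot",
    header="LAVA boot",
    type=LogSectionType.LAVA_BOOT,
    start_collapsed=True,
)
print(boot_section.start())

with LogFollower(current_section=boot_section) as lf:
    for _ in range(3):              # the submitter loops until job.is_finished
        lf.feed(poll_new_lines())   # watchdog() may raise MesaCITimeoutError here
        for line in lf.flush():
            print_log(line)

A section that outlives its budget makes feed() raise MesaCITimeoutError via
watchdog(), which the submitter's retry logic turns into a new attempt.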


@@ -1,10 +1,23 @@
from contextlib import nullcontext as does_not_raise
from datetime import datetime, timedelta
from datetime import datetime
from itertools import cycle
from typing import Callable, Generator, Iterable, Tuple, Union
import yaml
from freezegun import freeze_time
from lava.utils.lava_log import (
DEFAULT_GITLAB_SECTION_TIMEOUTS,
FALLBACK_GITLAB_SECTION_TIMEOUT,
LogSectionType,
)
def section_timeout(section_type: LogSectionType) -> int:
return int(
DEFAULT_GITLAB_SECTION_TIMEOUTS.get(
section_type, FALLBACK_GITLAB_SECTION_TIMEOUT
).total_seconds()
)
def create_lava_yaml_msg(
@@ -21,8 +34,6 @@ def generate_testsuite_result(
if extra is None:
extra = {}
return {"metadata": {"result": result, **metadata_extra}, "name": name}
def jobs_logs_response(
finished=False, msg=None, lvl="target", result=None
) -> Tuple[bool, str]:
@@ -36,6 +47,19 @@ def jobs_logs_response(
return finished, yaml.safe_dump(logs)
def section_aware_message_generator(
messages: dict[LogSectionType, Iterable[int]]
) -> Iterable[tuple[dict, Iterable[int]]]:
default = [1]
for section_type in LogSectionType:
delay = messages.get(section_type, default)
yield mock_lava_signal(section_type), delay
def message_generator():
for section_type in LogSectionType:
yield mock_lava_signal(section_type)
def level_generator():
# Tests all known levels by default
@@ -80,3 +104,28 @@ def to_iterable(tick_fn):
else:
tick_gen = cycle((tick_fn,))
return tick_gen
def mock_logs(messages={}, result="pass"):
with freeze_time(datetime.now()) as time_travel:
# Simulate a complete run given by message_fn
for msg, tick_list in section_aware_message_generator(messages):
for tick_sec in tick_list:
yield jobs_logs_response(finished=False, msg=[msg])
time_travel.tick(tick_sec)
yield jobs_logs_response(finished=True, result="pass")
def mock_lava_signal(type: LogSectionType) -> dict[str, str]:
return {
LogSectionType.TEST_CASE: create_lava_yaml_msg(
msg="<STARTTC> case", lvl="debug"
),
LogSectionType.TEST_SUITE: create_lava_yaml_msg(
msg="<STARTRUN> suite", lvl="debug"
),
LogSectionType.LAVA_POST_PROCESSING: create_lava_yaml_msg(
msg="<LAVA_SIGNAL_ENDTC case>", lvl="target"
),
}.get(type, create_lava_yaml_msg())


@@ -36,12 +36,15 @@ from lava.lava_job_submitter import (
follow_job_execution,
retriable_follow_job,
)
from lava.utils.lava_log import LogSectionType
from .lava.helpers import (
create_lava_yaml_msg,
generate_n_logs,
generate_testsuite_result,
jobs_logs_response,
mock_logs,
section_timeout,
)
NUMBER_OF_MAX_ATTEMPTS = NUMBER_OF_RETRIES_TIMEOUT_DETECTION + 1
@@ -74,17 +77,43 @@ XMLRPC_FAULT = xmlrpc.client.Fault(0, "test")
PROXY_SCENARIOS = {
"finish case": (generate_n_logs(1), does_not_raise(), True, {}),
"works at last retry": (
generate_n_logs(n=NUMBER_OF_MAX_ATTEMPTS, tick_fn=[ DEVICE_HANGING_TIMEOUT_SEC + 1 ] * NUMBER_OF_RETRIES_TIMEOUT_DETECTION + [1]),
"boot works at last retry": (
mock_logs(
{
LogSectionType.LAVA_BOOT: [
section_timeout(LogSectionType.LAVA_BOOT) + 1
]
* NUMBER_OF_RETRIES_TIMEOUT_DETECTION
+ [1]
},
),
does_not_raise(),
True,
{},
),
"timed out more times than retry attempts": (
generate_n_logs(
n=NUMBER_OF_MAX_ATTEMPTS + 1, tick_fn=DEVICE_HANGING_TIMEOUT_SEC + 1
"post process test case took too long": pytest.param(
mock_logs(
{
LogSectionType.LAVA_POST_PROCESSING: [
section_timeout(LogSectionType.LAVA_POST_PROCESSING) + 1
]
* (NUMBER_OF_MAX_ATTEMPTS + 1)
},
),
pytest.raises(MesaCIRetryError),
True,
{},
marks=pytest.mark.xfail(
reason=(
"The time travel mock is not behaving as expected. "
"It makes a gitlab section end in the past when an "
"exception happens."
)
),
),
"timed out more times than retry attempts": (
generate_n_logs(n=4, tick_fn=9999999),
pytest.raises(MesaCIRetryError),
False,
{},
),
@@ -150,15 +179,20 @@ PROXY_SCENARIOS = {
@pytest.mark.parametrize(
"side_effect, expectation, job_result, proxy_args",
"test_log, expectation, job_result, proxy_args",
PROXY_SCENARIOS.values(),
ids=PROXY_SCENARIOS.keys(),
)
def test_retriable_follow_job(
mock_sleep, side_effect, expectation, job_result, proxy_args, mock_proxy
mock_sleep,
test_log,
expectation,
job_result,
proxy_args,
mock_proxy,
):
with expectation:
proxy = mock_proxy(side_effect=side_effect, **proxy_args)
proxy = mock_proxy(side_effect=test_log, **proxy_args)
job: LAVAJob = retriable_follow_job(proxy, "")
assert job_result == (job.status == "pass")
@@ -196,6 +230,7 @@ def test_simulate_a_long_wait_to_start_a_job(
assert delta_time.total_seconds() >= wait_time
CORRUPTED_LOG_SCENARIOS = {
"too much subsequent corrupted data": (
[(False, "{'msg': 'Incomplete}")] * 100 + [jobs_logs_response(True)],


@@ -22,13 +22,15 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
from datetime import datetime
from datetime import datetime, timedelta
import pytest
import yaml
from lava.exceptions import MesaCITimeoutError
from lava.utils.lava_log import (
GitlabSection,
LogFollower,
LogSectionType,
filter_debug_messages,
fix_lava_color_log,
fix_lava_gitlab_section_log,
@@ -66,8 +68,14 @@ GITLAB_SECTION_SCENARIOS = {
ids=GITLAB_SECTION_SCENARIOS.keys(),
)
def test_gitlab_section(method, collapsed, expectation):
gs = GitlabSection(id="my_first_section", header="my_header", start_collapsed=collapsed)
gs.get_timestamp = lambda: "mock_date"
gs = GitlabSection(
id="my_first_section",
header="my_header",
type=LogSectionType.TEST_CASE,
start_collapsed=collapsed,
)
gs.get_timestamp = lambda x: "mock_date"
gs.start()
result = getattr(gs, method)()
assert result == expectation
@@ -274,3 +282,49 @@ LAVA_DEBUG_SPAM_MESSAGES = {
)
def test_filter_debug_messages(message, expectation):
assert filter_debug_messages(message) == expectation
WATCHDOG_SCENARIOS = {
"1 second before timeout": ({"seconds": -1}, does_not_raise()),
"1 second after timeout": ({"seconds": 1}, pytest.raises(MesaCITimeoutError)),
}
@pytest.mark.parametrize(
"timedelta_kwargs, exception",
WATCHDOG_SCENARIOS.values(),
ids=WATCHDOG_SCENARIOS.keys(),
)
def test_log_follower_watchdog(frozen_time, timedelta_kwargs, exception):
lines = [
{
"dt": datetime.now(),
"lvl": "debug",
"msg": "Received signal: <STARTTC> mesa-ci_iris-kbl-traces",
},
]
td = {LogSectionType.TEST_CASE: timedelta(minutes=1)}
lf = LogFollower(timeout_durations=td)
lf.feed(lines)
frozen_time.tick(
lf.timeout_durations[LogSectionType.TEST_CASE] + timedelta(**timedelta_kwargs)
)
lines = [create_lava_yaml_msg()]
with exception:
lf.feed(lines)
GITLAB_SECTION_ID_SCENARIOS = [
("a-good_name", "a-good_name"),
("spaces are not welcome", "spaces-are-not-welcome"),
("abc:amd64 1/3", "abc-amd64-1-3"),
]
@pytest.mark.parametrize("case_name, expected_id", GITLAB_SECTION_ID_SCENARIOS)
def test_gitlab_section_id(case_name, expected_id):
gl = GitlabSection(
id=case_name, header=case_name, type=LogSectionType.LAVA_POST_PROCESSING
)
assert gl.id == expected_id