#
# Copyright OpenEmbedded Contributors
#
# SPDX-License-Identifier: MIT
#

import enum
import logging
import os
import re


def _warn(msg):
    """Report a parser warning.

    Uses bb.warn() when a name ``bb`` is resolvable at call time (this module
    never imports it), and falls back to the standard logging module
    otherwise so that parsing does not die with a NameError.
    """
    try:
        warn = bb.warn  # noqa: F821 - not imported here; resolved at call time if available
    except NameError:
        logging.getLogger(__name__).warning(msg)
    else:
        warn(msg)


# A parser that can be used to identify whether a line is a test result or a section statement.
class PtestParser(object):
    """Parse a ptest log into per-section test results and section metadata.

    After parse():
      self.results  maps section name -> {test name: 'PASSED'|'FAILED'|'SKIPPED'}
      self.sections maps section name -> dict with a 'log' string and, when the
                    corresponding lines were seen, 'duration', 'exitcode' and
                    'timeout' entries.
    """

    _TEST_REGEX = {
        'PASSED': re.compile(r"^PASS:(.+)"),
        'FAILED': re.compile(r"^FAIL:([^(]+)"),
        'SKIPPED': re.compile(r"^SKIP:(.+)"),
    }

    _SECTION_REGEX = {
        'begin': re.compile(r"^BEGIN: .*/(.+)/ptest"),
        'end': re.compile(r"^END: .*/(.+)/ptest"),
        'duration': re.compile(r"^DURATION: (.+)"),
        'exitcode': re.compile(r"^ERROR: Exit status is (.+)"),
        'timeout': re.compile(r"^TIMEOUT: .*/(.+)/ptest"),
    }

    # Cache markers so we don't take the re.search() hit all the time.
    _MARKERS = ("PASS:", "FAIL:", "SKIP:", "BEGIN:", "END:", "DURATION:", "ERROR: Exit", "TIMEOUT:")

    def __init__(self):
        self.results = {}
        self.sections = {}

    def parse(self, logfile):
        """Parse the ptest log at path `logfile`.

        The file is opened with errors='replace' so undecodable bytes do not
        abort parsing. Results accumulate into self.results / self.sections,
        which are also returned as a (results, sections) tuple.

        A section is only stored in self.sections once its END line is seen.
        """
        test_regex = self._TEST_REGEX
        section_regex = self._SECTION_REGEX

        def newsection():
            return {'name': "No-section", 'log': []}

        current_section = newsection()

        with open(logfile, errors='replace') as f:
            for line in f:
                # Fast path: anything not starting with a known marker is plain output.
                if not line.startswith(self._MARKERS):
                    current_section['log'].append(line)
                    continue

                match = section_regex['begin'].search(line)
                if match:
                    current_section['name'] = match.group(1)
                    self.results.setdefault(current_section['name'], {})
                    continue

                match = section_regex['end'].search(line)
                if match:
                    name = current_section['name']
                    if name != match.group(1):
                        _warn("Ptest END log section mismatch %s vs. %s" % (name, match.group(1)))
                    if name in self.sections:
                        _warn("Ptest duplicate section for %s" % (name))
                    # The stored section is keyed by name and does not carry it.
                    del current_section['name']
                    self.sections[name] = current_section
                    current_section = newsection()
                    continue

                match = section_regex['timeout'].search(line)
                if match:
                    if current_section['name'] != match.group(1):
                        _warn("Ptest TIMEOUT log section mismatch %s vs. %s" % (current_section['name'], match.group(1)))
                    current_section['timeout'] = True
                    continue

                for key in ('duration', 'exitcode'):
                    match = section_regex[key].search(line)
                    if match:
                        current_section[key] = match.group(1)
                        # Only leaves this inner loop: the line is still
                        # recorded in the section log below, as it always was.
                        break

                current_section['log'].append(line)

                for status, regex in test_regex.items():
                    match = regex.search(line)
                    if not match:
                        continue
                    test_name = match.group(1).strip()
                    try:
                        self.results[current_section['name']][test_name] = status
                    except KeyError:
                        # Result line seen before any BEGIN line.
                        _warn("Result with no section: %s - %s" % (status, test_name))

        # Python performance for repeatedly joining long strings is poor, do it all at once at the end.
        # For 2.1 million lines in a log this reduces 18 hours to 12s.
        for section in self.sections.values():
            section['log'] = "".join(section['log'])

        return self.results, self.sections

    # Log the results as files. The file name is the section name and the contents are the tests in that section.
    def results_as_files(self, target_dir):
        """Write one file per section in self.results into `target_dir`.

        Each file is named after the section ('No-section' for an empty name)
        and holds one "<STATUS>: <test name>" line per test, sorted by test
        name. Existing files are overwritten.

        Raises Exception if `target_dir` does not exist.
        """
        if not os.path.exists(target_dir):
            raise Exception("Target directory does not exist: %s" % target_dir)

        for section, tests in self.results.items():
            section_file = os.path.join(target_dir, section or 'No-section')
            # purge the file contents if it exists
            with open(section_file, 'w') as f:
                f.writelines("%s: %s\n" % (tests[test_name], test_name)
                             for test_name in sorted(tests))


class LtpParser:
    """
    Parse the machine-readable LTP log output into a ptest-friendly data structure.
    """

    class LtpExitCode(enum.IntEnum):
        # Exit codes as defined in ltp/include/tst_res_flags.h
        TPASS = 0  # Test passed flag
        TFAIL = 1  # Test failed flag
        TBROK = 2  # Test broken flag
        TWARN = 4  # Test warning flag
        TINFO = 16  # Test information flag
        TCONF = 32  # Test not appropriate for configuration flag

    def parse(self, logfile):
        """Parse the LTP log at path `logfile`.

        Returns (results, section) where results maps each test tag to
        'PASSED', 'FAILED' or 'SKIPPED' and section is a dict holding the
        summed 'duration' (int) and an empty 'log' string.

        Lines not starting with "tag=" are ignored. A "tag=" line lacking the
        tag/dur/stat/exit fields raises KeyError.
        """
        results = {}
        # Accumulate the duration here but as the log rounds quick tests down
        # to 0 seconds this is very much a lower bound. The caller can replace
        # the value.
        section = {"duration": 0, "log": ""}
        exit_codes = LtpParser.LtpExitCode

        with open(logfile, errors="replace") as f:
            # Lines look like this:
            # tag=cfs_bandwidth01 stime=1689762564 dur=0 exit=exited stat=32 core=no cu=0 cs=0
            for line in f:
                if not line.startswith("tag="):
                    continue

                # Split on the first "=" only and skip tokens without one, so
                # an odd field cannot abort the whole parse with ValueError.
                values = dict(field.split("=", 1) for field in line.split() if "=" in field)

                section["duration"] += int(values["dur"])
                exitcode = int(values["stat"])
                if values["exit"] == "exited" and exitcode == exit_codes.TCONF:
                    # Exited normally with the "invalid configuration" code
                    status = "SKIPPED"
                elif exitcode == exit_codes.TPASS:
                    # Successful exit
                    status = "PASSED"
                else:
                    # Other exit
                    status = "FAILED"
                results[values["tag"]] = status

        return results, section


# ltp Compliance log parsing

class LtpComplianceParser(object):
    """Parse an LTP compliance log into a single pass/fail result."""

    def __init__(self):
        self.results = {}
        self.section = {'duration': "", 'log': ""}

    def parse(self, logfile):
        """Parse the compliance log at path `logfile`.

        The test name is the second whitespace-separated token of the last
        line starting with "Executing" (or `logfile` itself when no such line
        names a test). The result is 'FAILED' if any line contains "FAIL",
        otherwise 'PASSED'.

        Returns (self.results, self.section); a "PASS: name" / "FAIL: name"
        line is appended to self.section['log'] for every entry currently in
        self.results.
        """
        with open(logfile, errors='replace') as f:
            name = logfile
            result = "PASSED"
            for line in f:
                if line.startswith("Executing"):
                    tokens = line.split()
                    # A bare "Executing" line names no test; keep the previous name.
                    if len(tokens) > 1:
                        name = tokens[1]

                if "FAIL" in line:
                    result = "FAILED"
            self.results[name] = result

        # [:-2] turns "PASSED"/"FAILED" into "PASS"/"FAIL".
        self.section['log'] += "".join(
            "%s: %s\n" % (status.strip()[:-2], test.strip())
            for test, status in self.results.items())

        return self.results, self.section