From 326ababfd620ae5ea29bf486b9d68ba3d60cad30 Mon Sep 17 00:00:00 2001 From: Richard Purdie Date: Mon, 9 Jul 2018 15:20:34 +0000 Subject: oeqa: Add selftest parallelisation support This allows oe-selftest to take a -j option which specifies how much test parallelisation to use. Currently this is "module" based with each module being split and run in a separate build directory. Further splitting could be done but this seems a good compromise between test setup and parallelism. You need python-testtools and python-subunit installed to use this but only when the -j option is specified. See notes posted to the openedmbedded-architecture list for more details about the design choices here. Some of this functionality may make more sense in the oeqa core ultimately. Signed-off-by: Richard Purdie --- meta/lib/oeqa/core/context.py | 10 +- meta/lib/oeqa/core/runner.py | 24 ++- meta/lib/oeqa/core/utils/concurrencytest.py | 254 ++++++++++++++++++++++++++++ meta/lib/oeqa/selftest/context.py | 8 +- 4 files changed, 288 insertions(+), 8 deletions(-) create mode 100644 meta/lib/oeqa/core/utils/concurrencytest.py (limited to 'meta/lib/oeqa') diff --git a/meta/lib/oeqa/core/context.py b/meta/lib/oeqa/core/context.py index 10481b44b6..8cdfbf834f 100644 --- a/meta/lib/oeqa/core/context.py +++ b/meta/lib/oeqa/core/context.py @@ -58,14 +58,20 @@ class OETestContext(object): modules_required, filters) self.suites = self.loader.discover() - def runTests(self, skips=[]): + def runTests(self, processes=None, skips=[]): self.runner = self.runnerClass(self, descriptions=False, verbosity=2, buffer=True) # Dinamically skip those tests specified though arguments self.skipTests(skips) self._run_start_time = time.time() - result = self.runner.run(self.suites) + if processes: + from oeqa.core.utils.concurrencytest import ConcurrentTestSuite + + concurrent_suite = ConcurrentTestSuite(self.suites, processes) + result = self.runner.run(concurrent_suite) + else: + result = self.runner.run(self.suites) self._run_end_time = time.time() return result diff --git a/meta/lib/oeqa/core/runner.py b/meta/lib/oeqa/core/runner.py index 219102c6b0..6adbe3827b 100644 --- a/meta/lib/oeqa/core/runner.py +++ b/meta/lib/oeqa/core/runner.py @@ -43,11 +43,17 @@ class OETestResult(_TestResult): super(OETestResult, self).__init__(*args, **kwargs) self.successes = [] + self.starttime = {} + self.endtime = {} + self.progressinfo = {} self.tc = tc self._tc_map_results() def startTest(self, test): + # May have been set by concurrencytest + if test.id() not in self.starttime: + self.starttime[test.id()] = time.time() super(OETestResult, self).startTest(test) def _tc_map_results(self): @@ -57,6 +63,12 @@ class OETestResult(_TestResult): self.tc._results['expectedFailures'] = self.expectedFailures self.tc._results['successes'] = self.successes + def stopTest(self, test): + self.endtime[test.id()] = time.time() + super(OETestResult, self).stopTest(test) + if test.id() in self.progressinfo: + print(self.progressinfo[test.id()]) + def logSummary(self, component, context_msg=''): elapsed_time = self.tc._run_end_time - self.tc._run_start_time self.tc.logger.info("SUMMARY:") @@ -141,12 +153,16 @@ class OETestResult(_TestResult): if hasattr(d, 'oeid'): oeid = d.oeid + t = "" + if case.id() in self.starttime and case.id() in self.endtime: + t = " (" + "{0:.2f}".format(self.endtime[case.id()] - self.starttime[case.id()]) + "s)" + if fail: - self.tc.logger.info("RESULTS - %s - Testcase %s: %s" % (case.id(), - oeid, desc)) + self.tc.logger.info("RESULTS - %s - Testcase %s: %s%s" % (case.id(), + oeid, desc, t)) else: - self.tc.logger.info("RESULTS - %s - Testcase %s: %s" % (case.id(), - oeid, 'UNKNOWN')) + self.tc.logger.info("RESULTS - %s - Testcase %s: %s%s" % (case.id(), + oeid, 'UNKNOWN', t)) class OEListTestsResult(object): def wasSuccessful(self): diff --git a/meta/lib/oeqa/core/utils/concurrencytest.py b/meta/lib/oeqa/core/utils/concurrencytest.py new file mode 100644 index 0000000000..850586516a --- /dev/null +++ b/meta/lib/oeqa/core/utils/concurrencytest.py @@ -0,0 +1,254 @@ +#!/usr/bin/env python3 +# +# Modified for use in OE by Richard Purdie, 2018 +# +# Modified by: Corey Goldberg, 2013 +# License: GPLv2+ +# +# Original code from: +# Bazaar (bzrlib.tests.__init__.py, v2.6, copied Jun 01 2013) +# Copyright (C) 2005-2011 Canonical Ltd +# License: GPLv2+ + +import os +import sys +import traceback +import unittest +import subprocess +import testtools +import threading +import time +import io + +from queue import Queue +from itertools import cycle +from subunit import ProtocolTestCase, TestProtocolClient +from subunit.test_results import AutoTimingTestResultDecorator +from testtools import ThreadsafeForwardingResult, iterate_tests + +import bb.utils +import oe.path + +_all__ = [ + 'ConcurrentTestSuite', + 'fork_for_tests', + 'partition_tests', +] + +# +# Patch the version from testtools to allow access to _test_start and allow +# computation of timing information and threading progress +# +class BBThreadsafeForwardingResult(ThreadsafeForwardingResult): + + def __init__(self, target, semaphore, threadnum, totalinprocess, totaltests): + super(BBThreadsafeForwardingResult, self).__init__(target, semaphore) + self.threadnum = threadnum + self.totalinprocess = totalinprocess + self.totaltests = totaltests + + def _add_result_with_semaphore(self, method, test, *args, **kwargs): + self.semaphore.acquire() + try: + self.result.starttime[test.id()] = self._test_start.timestamp() + self.result.threadprogress[self.threadnum].append(test.id()) + totalprogress = sum(len(x) for x in self.result.threadprogress.values()) + self.result.progressinfo[test.id()] = "%s: %s/%s %s/%s (%ss) (%s)" % ( + self.threadnum, + len(self.result.threadprogress[self.threadnum]), + self.totalinprocess, + totalprogress, + self.totaltests, + "{0:.2f}".format(time.time()-self._test_start.timestamp()), + test.id()) + finally: + self.semaphore.release() + super(BBThreadsafeForwardingResult, self)._add_result_with_semaphore(method, test, *args, **kwargs) + +# +# A dummy structure to add to io.StringIO so that the .buffer object +# is available and accepts writes. This allows unittest with buffer=True +# to interact ok with subunit which wants to access sys.stdout.buffer. +# +class dummybuf(object): + def __init__(self, parent): + self.p = parent + def write(self, data): + self.p.write(data.decode("utf-8")) + +# +# Taken from testtools.ConncurrencyTestSuite but modified for OE use +# +class ConcurrentTestSuite(unittest.TestSuite): + + def __init__(self, suite, processes): + super(ConcurrentTestSuite, self).__init__([suite]) + self.processes = processes + + def run(self, result): + tests, totaltests = fork_for_tests(self.processes, self) + try: + threads = {} + queue = Queue() + semaphore = threading.Semaphore(1) + result.threadprogress = {} + for i, (test, testnum) in enumerate(tests): + result.threadprogress[i] = [] + process_result = BBThreadsafeForwardingResult(result, semaphore, i, testnum, totaltests) + # Force buffering of stdout/stderr so the console doesn't get corrupted by test output + # as per default in parent code + process_result.buffer = True + # We have to add a buffer object to stdout to keep subunit happy + process_result._stderr_buffer = io.StringIO() + process_result._stderr_buffer.buffer = dummybuf(process_result._stderr_buffer) + process_result._stdout_buffer = io.StringIO() + process_result._stdout_buffer.buffer = dummybuf(process_result._stdout_buffer) + reader_thread = threading.Thread( + target=self._run_test, args=(test, process_result, queue)) + threads[test] = reader_thread, process_result + reader_thread.start() + while threads: + finished_test = queue.get() + threads[finished_test][0].join() + del threads[finished_test] + except: + for thread, process_result in threads.values(): + process_result.stop() + raise + + def _run_test(self, test, process_result, queue): + try: + try: + test.run(process_result) + except Exception: + # The run logic itself failed + case = testtools.ErrorHolder( + "broken-runner", + error=sys.exc_info()) + case.run(process_result) + finally: + queue.put(test) + +def removebuilddir(d): + delay = 5 + while delay and os.path.exists(d + "/bitbake.lock"): + time.sleep(1) + delay = delay - 1 + bb.utils.prunedir(d) + +def fork_for_tests(concurrency_num, suite): + result = [] + test_blocks = partition_tests(suite, concurrency_num) + # Clear the tests from the original suite so it doesn't keep them alive + suite._tests[:] = [] + totaltests = sum(len(x) for x in test_blocks) + for process_tests in test_blocks: + numtests = len(process_tests) + process_suite = unittest.TestSuite(process_tests) + # Also clear each split list so new suite has only reference + process_tests[:] = [] + c2pread, c2pwrite = os.pipe() + # Clear buffers before fork to avoid duplicate output + sys.stdout.flush() + sys.stderr.flush() + pid = os.fork() + if pid == 0: + ourpid = os.getpid() + try: + newbuilddir = None + stream = os.fdopen(c2pwrite, 'wb', 1) + os.close(c2pread) + + # Create a new separate BUILDDIR for each group of tests + if 'BUILDDIR' in os.environ: + builddir = os.environ['BUILDDIR'] + newbuilddir = builddir + "-st-" + str(ourpid) + selftestdir = os.path.abspath(builddir + "/../meta-selftest") + newselftestdir = newbuilddir + "/meta-selftest" + + bb.utils.mkdirhier(newbuilddir) + oe.path.copytree(builddir + "/conf", newbuilddir + "/conf") + oe.path.copytree(builddir + "/cache", newbuilddir + "/cache") + oe.path.copytree(selftestdir, newselftestdir) + + for e in os.environ: + if builddir in os.environ[e]: + os.environ[e] = os.environ[e].replace(builddir, newbuilddir) + + subprocess.check_output("git init; git add *; git commit -a -m 'initial'", cwd=newselftestdir, shell=True) + + # Tried to used bitbake-layers add/remove but it requires recipe parsing and hence is too slow + subprocess.check_output("sed %s/conf/bblayers.conf -i -e 's#%s#%s#g'" % (newbuilddir, selftestdir, newselftestdir), cwd=newbuilddir, shell=True) + + os.chdir(newbuilddir) + + for t in process_suite: + if not hasattr(t, "tc"): + continue + cp = t.tc.config_paths + for p in cp: + if selftestdir in cp[p] and newselftestdir not in cp[p]: + cp[p] = cp[p].replace(selftestdir, newselftestdir) + if builddir in cp[p] and newbuilddir not in cp[p]: + cp[p] = cp[p].replace(builddir, newbuilddir) + + # Leave stderr and stdout open so we can see test noise + # Close stdin so that the child goes away if it decides to + # read from stdin (otherwise its a roulette to see what + # child actually gets keystrokes for pdb etc). + newsi = os.open(os.devnull, os.O_RDWR) + os.dup2(newsi, sys.stdin.fileno()) + + subunit_client = TestProtocolClient(stream) + # Force buffering of stdout/stderr so the console doesn't get corrupted by test output + # as per default in parent code + subunit_client.buffer = True + subunit_result = AutoTimingTestResultDecorator(subunit_client) + process_suite.run(subunit_result) + if ourpid != os.getpid(): + os._exit(0) + if newbuilddir: + removebuilddir(newbuilddir) + except: + # Don't do anything with process children + if ourpid != os.getpid(): + os._exit(1) + # Try and report traceback on stream, but exit with error + # even if stream couldn't be created or something else + # goes wrong. The traceback is formatted to a string and + # written in one go to avoid interleaving lines from + # multiple failing children. + try: + stream.write(traceback.format_exc().encode('utf-8')) + except: + sys.stderr.write(traceback.format_exc()) + finally: + if newbuilddir: + removebuilddir(newbuilddir) + os._exit(1) + os._exit(0) + else: + os.close(c2pwrite) + stream = os.fdopen(c2pread, 'rb', 1) + test = ProtocolTestCase(stream) + result.append((test, numtests)) + return result, totaltests + +def partition_tests(suite, count): + # Keep tests from the same class together but allow tests from modules + # to go to different processes to aid parallelisation. + modules = {} + for test in iterate_tests(suite): + m = test.__module__ + "." + test.__class__.__name__ + if m not in modules: + modules[m] = [] + modules[m].append(test) + + # Simply divide the test blocks between the available processes + partitions = [list() for _ in range(count)] + for partition, m in zip(cycle(partitions), modules): + partition.extend(modules[m]) + + # No point in empty threads so drop them + return [p for p in partitions if p] + diff --git a/meta/lib/oeqa/selftest/context.py b/meta/lib/oeqa/selftest/context.py index 9e90d3c256..c937b8171c 100644 --- a/meta/lib/oeqa/selftest/context.py +++ b/meta/lib/oeqa/selftest/context.py @@ -25,14 +25,14 @@ class OESelftestTestContext(OETestContext): self.custommachine = None self.config_paths = config_paths - def runTests(self, machine=None, skips=[]): + def runTests(self, processes=None, machine=None, skips=[]): if machine: self.custommachine = machine if machine == 'random': self.custommachine = choice(self.machines) self.logger.info('Run tests with custom MACHINE set to: %s' % \ self.custommachine) - return super(OESelftestTestContext, self).runTests(skips) + return super(OESelftestTestContext, self).runTests(processes, skips) def listTests(self, display_type, machine=None): return super(OESelftestTestContext, self).listTests(display_type) @@ -68,6 +68,9 @@ class OESelftestTestContextExecutor(OETestContextExecutor): action="store_true", default=False, help='List all available tests.') + parser.add_argument('-j', '--num-processes', dest='processes', action='store', + type=int, help="number of processes to execute in parallel with") + parser.add_argument('--machine', required=False, choices=['random', 'all'], help='Run tests on different machines (random/all).') @@ -137,6 +140,7 @@ class OESelftestTestContextExecutor(OETestContextExecutor): self.tc_kwargs['init']['config_paths']['bblayers_backup']) self.tc_kwargs['run']['skips'] = args.skips + self.tc_kwargs['run']['processes'] = args.processes def _pre_run(self): def _check_required_env_variables(vars): -- cgit 1.2.3-korg