diff options
author | Richard Purdie <richard.purdie@linuxfoundation.org> | 2019-07-03 16:22:15 +0100 |
---|---|---|
committer | Richard Purdie <richard.purdie@linuxfoundation.org> | 2019-07-15 09:31:50 +0100 |
commit | 7539fe22bc831bb835901e3aca77985ab4ebc4c7 (patch) | |
tree | 0241cd4e960be5e5e285d509be0bcacacde6944b | |
parent | 32f39bbd5d3b7394689da9ba05be2c15b4523b27 (diff) | |
download | bitbake-7539fe22bc831bb835901e3aca77985ab4ebc4c7.tar.gz |
runqueue: Merge scenequeue and real task queue code together
Merge the unique functions from the Tasks and Scenequeue Tasks classes
into the common base class.
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
-rw-r--r-- | lib/bb/runqueue.py | 516 |
1 files changed, 258 insertions, 258 deletions
diff --git a/lib/bb/runqueue.py b/lib/bb/runqueue.py index 3cc804de4..80cc60ed7 100644 --- a/lib/bb/runqueue.py +++ b/lib/bb/runqueue.py @@ -1839,66 +1839,6 @@ class RunQueueExecute: can_start = active < self.number_tasks return can_start -class RunQueueExecuteDummy(RunQueueExecute): - def __init__(self, rq): - self.rq = rq - self.stats = RunQueueStats(0) - - def finish(self): - self.rq.state = runQueueComplete - return - -class RunQueueExecuteTasks(RunQueueExecute): - def __init__(self, rq): - RunQueueExecute.__init__(self, rq) - - self.stampcache = {} - - # Mark initial buildable tasks - for tid in self.rqdata.runtaskentries: - if len(self.rqdata.runtaskentries[tid].depends) == 0: - self.runq_buildable.add(tid) - if len(self.rqdata.runtaskentries[tid].revdeps) > 0 and self.rqdata.runtaskentries[tid].revdeps.issubset(self.rq.scenequeue_covered): - self.rq.scenequeue_covered.add(tid) - - found = True - while found: - found = False - for tid in self.rqdata.runtaskentries: - if tid in self.rq.scenequeue_covered: - continue - logger.debug(1, 'Considering %s: %s' % (tid, str(self.rqdata.runtaskentries[tid].revdeps))) - - if len(self.rqdata.runtaskentries[tid].revdeps) > 0 and self.rqdata.runtaskentries[tid].revdeps.issubset(self.rq.scenequeue_covered): - if tid in self.rq.scenequeue_notcovered: - continue - found = True - self.rq.scenequeue_covered.add(tid) - - logger.debug(1, 'Skip list %s', sorted(self.rq.scenequeue_covered)) - - for task in self.rq.scenequeue_notcovered: - logger.debug(1, 'Not skipping task %s', task) - - for mc in self.rqdata.dataCaches: - target_pairs = [] - for tid in self.rqdata.target_tids: - (tidmc, fn, taskname, _) = split_tid_mcfn(tid) - if tidmc == mc: - target_pairs.append((fn, taskname)) - - event.fire(bb.event.StampUpdate(target_pairs, self.rqdata.dataCaches[mc].stamp), self.cfgData) - - schedulers = self.get_schedulers() - for scheduler in schedulers: - if self.scheduler == scheduler.name: - self.sched = scheduler(self, self.rqdata) - logger.debug(1, "Using runqueue scheduler '%s'", scheduler.name) - break - else: - bb.fatal("Invalid scheduler '%s'. Available schedulers: %s" % - (self.scheduler, ", ".join(obj.name for obj in schedulers))) - def get_schedulers(self): schedulers = set(obj for obj in globals().values() if type(obj) is type and @@ -2100,6 +2040,204 @@ class RunQueueExecuteTasks(RunQueueExecute): #bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n")) return taskdepdata + def scenequeue_updatecounters(self, task, fail = False): + for dep in self.sqdata.sq_deps[task]: + if fail and task in self.sqdata.sq_harddeps and dep in self.sqdata.sq_harddeps[task]: + logger.debug(2, "%s was unavailable and is a hard dependency of %s so skipping" % (task, dep)) + self.scenequeue_updatecounters(dep, fail) + continue + if task not in self.sqdata.sq_revdeps2[dep]: + # May already have been removed by the fail case above + continue + self.sqdata.sq_revdeps2[dep].remove(task) + if len(self.sqdata.sq_revdeps2[dep]) == 0: + self.sq_buildable.add(dep) + + def sq_task_completeoutright(self, task): + """ + Mark a task as completed + Look at the reverse dependencies and mark any task with + completed dependencies as buildable + """ + + logger.debug(1, 'Found task %s which could be accelerated', task) + self.scenequeue_covered.add(task) + self.scenequeue_updatecounters(task) + + def sq_check_taskfail(self, task): + if self.rqdata.setscenewhitelist is not None: + realtask = task.split('_setscene')[0] + (mc, fn, taskname, taskfn) = split_tid_mcfn(realtask) + pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn] + if not check_setscene_enforce_whitelist(pn, taskname, self.rqdata.setscenewhitelist): + logger.error('Task %s.%s failed' % (pn, taskname + "_setscene")) + self.rq.state = runQueueCleanUp + + def sq_task_complete(self, task): + self.sq_stats.taskCompleted() + bb.event.fire(sceneQueueTaskCompleted(task, self.sq_stats, self.rq), self.cfgData) + self.sq_task_completeoutright(task) + + def sq_task_fail(self, task, result): + self.sq_stats.taskFailed() + bb.event.fire(sceneQueueTaskFailed(task, self.sq_stats, result, self), self.cfgData) + self.scenequeue_notcovered.add(task) + self.scenequeue_updatecounters(task, True) + self.sq_check_taskfail(task) + + def sq_task_failoutright(self, task): + self.sq_running.add(task) + self.sq_buildable.add(task) + self.sq_stats.taskSkipped() + self.sq_stats.taskCompleted() + self.scenequeue_notcovered.add(task) + self.scenequeue_updatecounters(task, True) + + def sq_task_skip(self, task): + self.sq_running.add(task) + self.sq_buildable.add(task) + self.sq_task_completeoutright(task) + self.sq_stats.taskSkipped() + self.sq_stats.taskCompleted() + + def sq_execute(self): + """ + Run the tasks in a queue prepared by prepare_runqueue + """ + + self.rq.read_workers() + + task = None + if self.can_start_task(): + # Find the next setscene to run + for nexttask in self.rqdata.runq_setscene_tids: + if nexttask in self.sq_buildable and nexttask not in self.sq_running and self.sqdata.stamps[nexttask] not in self.build_stamps.values(): + if nexttask in self.sqdata.unskippable: + logger.debug(2, "Setscene task %s is unskippable" % nexttask) + if nexttask not in self.sqdata.unskippable and len(self.sqdata.sq_revdeps[nexttask]) > 0 and self.sqdata.sq_revdeps[nexttask].issubset(self.scenequeue_covered) and self.check_dependencies(nexttask, self.sqdata.sq_revdeps[nexttask]): + fn = fn_from_tid(nexttask) + foundtarget = False + + if nexttask in self.rqdata.target_tids: + foundtarget = True + if not foundtarget: + logger.debug(2, "Skipping setscene for task %s" % nexttask) + self.sq_task_skip(nexttask) + self.scenequeue_notneeded.add(nexttask) + return True + if nexttask in self.sqdata.outrightfail: + self.sq_task_failoutright(nexttask) + return True + task = nexttask + break + if task is not None: + (mc, fn, taskname, taskfn) = split_tid_mcfn(task) + taskname = taskname + "_setscene" + if self.rq.check_stamp_task(task, taskname_from_tid(task), recurse = True, cache=self.stampcache): + logger.debug(2, 'Stamp for underlying task %s is current, so skipping setscene variant', task) + self.sq_task_failoutright(task) + return True + + if self.cooker.configuration.force: + if task in self.rqdata.target_tids: + self.sq_task_failoutright(task) + return True + + if self.rq.check_stamp_task(task, taskname, cache=self.stampcache): + logger.debug(2, 'Setscene stamp current task %s, so skip it and its dependencies', task) + self.sq_task_skip(task) + return True + + if self.cooker.configuration.skipsetscene: + logger.debug(2, 'No setscene tasks should be executed. Skipping %s', task) + self.sq_task_failoutright(task) + return True + + startevent = sceneQueueTaskStarted(task, self.sq_stats, self.rq) + bb.event.fire(startevent, self.cfgData) + + taskdepdata = self.sq_build_taskdepdata(task) + + taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn] + taskhash = self.rqdata.get_task_hash(task) + unihash = self.rqdata.get_task_unihash(task) + if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not self.cooker.configuration.dry_run: + if not mc in self.rq.fakeworker: + self.rq.start_fakeworker(self, mc) + self.rq.fakeworker[mc].process.stdin.write(b"<runtask>" + pickle.dumps((taskfn, task, taskname, taskhash, unihash, True, self.cooker.collection.get_file_appends(taskfn), taskdepdata, False)) + b"</runtask>") + self.rq.fakeworker[mc].process.stdin.flush() + else: + self.rq.worker[mc].process.stdin.write(b"<runtask>" + pickle.dumps((taskfn, task, taskname, taskhash, unihash, True, self.cooker.collection.get_file_appends(taskfn), taskdepdata, False)) + b"</runtask>") + self.rq.worker[mc].process.stdin.flush() + + self.build_stamps[task] = bb.build.stampfile(taskname, self.rqdata.dataCaches[mc], taskfn, noextra=True) + self.build_stamps2.append(self.build_stamps[task]) + self.sq_running.add(task) + self.sq_live.add(task) + self.sq_stats.taskActive() + if self.can_start_task(): + return True + + if self.sq_stats.active > 0: + self.rq.read_workers() + return self.rq.active_fds() + + #for tid in self.sqdata.sq_revdeps: + # if tid not in self.sq_running: + # buildable = tid in self.sq_buildable + # revdeps = self.sqdata.sq_revdeps[tid] + # bb.warn("Found we didn't run %s %s %s" % (tid, buildable, str(revdeps))) + + self.rq.scenequeue_covered = self.scenequeue_covered + self.rq.scenequeue_notcovered = self.scenequeue_notcovered + + logger.debug(1, 'We can skip tasks %s', "\n".join(sorted(self.rq.scenequeue_covered))) + + self.rq.state = runQueueRunInit + + completeevent = sceneQueueComplete(self.sq_stats, self.rq) + bb.event.fire(completeevent, self.cfgData) + + return True + + def sq_build_taskdepdata(self, task): + def getsetscenedeps(tid): + deps = set() + (mc, fn, taskname, _) = split_tid_mcfn(tid) + realtid = tid + "_setscene" + idepends = self.rqdata.taskData[mc].taskentries[realtid].idepends + for (depname, idependtask) in idepends: + if depname not in self.rqdata.taskData[mc].build_targets: + continue + + depfn = self.rqdata.taskData[mc].build_targets[depname][0] + if depfn is None: + continue + deptid = depfn + ":" + idependtask.replace("_setscene", "") + deps.add(deptid) + return deps + + taskdepdata = {} + next = getsetscenedeps(task) + next.add(task) + while next: + additional = [] + for revdep in next: + (mc, fn, taskname, taskfn) = split_tid_mcfn(revdep) + pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn] + deps = getsetscenedeps(revdep) + provides = self.rqdata.dataCaches[mc].fn_provides[taskfn] + taskhash = self.rqdata.runtaskentries[revdep].hash + unihash = self.rqdata.runtaskentries[revdep].unihash + taskdepdata[revdep] = [pn, taskname, fn, deps, provides, taskhash, unihash] + for revdep2 in deps: + if revdep2 not in taskdepdata: + additional.append(revdep2) + next = additional + + #bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n")) + return taskdepdata + class SQData(object): def __init__(self): # SceneQueue dependencies @@ -2346,6 +2484,66 @@ def build_scenequeue_data(sqdata, rqdata, rq, cooker, stampcache, sqrq): sqdata.outrightfail.append(tid) +class RunQueueExecuteDummy(RunQueueExecute): + def __init__(self, rq): + self.rq = rq + self.stats = RunQueueStats(0) + + def finish(self): + self.rq.state = runQueueComplete + return + +class RunQueueExecuteTasks(RunQueueExecute): + def __init__(self, rq): + RunQueueExecute.__init__(self, rq) + + self.stampcache = {} + + # Mark initial buildable tasks + for tid in self.rqdata.runtaskentries: + if len(self.rqdata.runtaskentries[tid].depends) == 0: + self.runq_buildable.add(tid) + if len(self.rqdata.runtaskentries[tid].revdeps) > 0 and self.rqdata.runtaskentries[tid].revdeps.issubset(self.rq.scenequeue_covered): + self.rq.scenequeue_covered.add(tid) + + found = True + while found: + found = False + for tid in self.rqdata.runtaskentries: + if tid in self.rq.scenequeue_covered: + continue + logger.debug(1, 'Considering %s: %s' % (tid, str(self.rqdata.runtaskentries[tid].revdeps))) + + if len(self.rqdata.runtaskentries[tid].revdeps) > 0 and self.rqdata.runtaskentries[tid].revdeps.issubset(self.rq.scenequeue_covered): + if tid in self.rq.scenequeue_notcovered: + continue + found = True + self.rq.scenequeue_covered.add(tid) + + logger.debug(1, 'Skip list %s', sorted(self.rq.scenequeue_covered)) + + for task in self.rq.scenequeue_notcovered: + logger.debug(1, 'Not skipping task %s', task) + + for mc in self.rqdata.dataCaches: + target_pairs = [] + for tid in self.rqdata.target_tids: + (tidmc, fn, taskname, _) = split_tid_mcfn(tid) + if tidmc == mc: + target_pairs.append((fn, taskname)) + + event.fire(bb.event.StampUpdate(target_pairs, self.rqdata.dataCaches[mc].stamp), self.cfgData) + + schedulers = self.get_schedulers() + for scheduler in schedulers: + if self.scheduler == scheduler.name: + self.sched = scheduler(self, self.rqdata) + logger.debug(1, "Using runqueue scheduler '%s'", scheduler.name) + break + else: + bb.fatal("Invalid scheduler '%s'. Available schedulers: %s" % + (self.scheduler, ", ".join(obj.name for obj in schedulers))) + class RunQueueExecuteScenequeue(RunQueueExecute): def __init__(self, rq): RunQueueExecute.__init__(self, rq) @@ -2368,204 +2566,6 @@ class RunQueueExecuteScenequeue(RunQueueExecute): self.rq.state = runQueueSceneRun - def scenequeue_updatecounters(self, task, fail = False): - for dep in self.sqdata.sq_deps[task]: - if fail and task in self.sqdata.sq_harddeps and dep in self.sqdata.sq_harddeps[task]: - logger.debug(2, "%s was unavailable and is a hard dependency of %s so skipping" % (task, dep)) - self.scenequeue_updatecounters(dep, fail) - continue - if task not in self.sqdata.sq_revdeps2[dep]: - # May already have been removed by the fail case above - continue - self.sqdata.sq_revdeps2[dep].remove(task) - if len(self.sqdata.sq_revdeps2[dep]) == 0: - self.sq_buildable.add(dep) - - def sq_task_completeoutright(self, task): - """ - Mark a task as completed - Look at the reverse dependencies and mark any task with - completed dependencies as buildable - """ - - logger.debug(1, 'Found task %s which could be accelerated', task) - self.scenequeue_covered.add(task) - self.scenequeue_updatecounters(task) - - def sq_check_taskfail(self, task): - if self.rqdata.setscenewhitelist is not None: - realtask = task.split('_setscene')[0] - (mc, fn, taskname, taskfn) = split_tid_mcfn(realtask) - pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn] - if not check_setscene_enforce_whitelist(pn, taskname, self.rqdata.setscenewhitelist): - logger.error('Task %s.%s failed' % (pn, taskname + "_setscene")) - self.rq.state = runQueueCleanUp - - def sq_task_complete(self, task): - self.sq_stats.taskCompleted() - bb.event.fire(sceneQueueTaskCompleted(task, self.sq_stats, self.rq), self.cfgData) - self.sq_task_completeoutright(task) - - def sq_task_fail(self, task, result): - self.sq_stats.taskFailed() - bb.event.fire(sceneQueueTaskFailed(task, self.sq_stats, result, self), self.cfgData) - self.scenequeue_notcovered.add(task) - self.scenequeue_updatecounters(task, True) - self.sq_check_taskfail(task) - - def sq_task_failoutright(self, task): - self.sq_running.add(task) - self.sq_buildable.add(task) - self.sq_stats.taskSkipped() - self.sq_stats.taskCompleted() - self.scenequeue_notcovered.add(task) - self.scenequeue_updatecounters(task, True) - - def sq_task_skip(self, task): - self.sq_running.add(task) - self.sq_buildable.add(task) - self.sq_task_completeoutright(task) - self.sq_stats.taskSkipped() - self.sq_stats.taskCompleted() - - def sq_execute(self): - """ - Run the tasks in a queue prepared by prepare_runqueue - """ - - self.rq.read_workers() - - task = None - if self.can_start_task(): - # Find the next setscene to run - for nexttask in self.rqdata.runq_setscene_tids: - if nexttask in self.sq_buildable and nexttask not in self.sq_running and self.sqdata.stamps[nexttask] not in self.build_stamps.values(): - if nexttask in self.sqdata.unskippable: - logger.debug(2, "Setscene task %s is unskippable" % nexttask) - if nexttask not in self.sqdata.unskippable and len(self.sqdata.sq_revdeps[nexttask]) > 0 and self.sqdata.sq_revdeps[nexttask].issubset(self.scenequeue_covered) and self.check_dependencies(nexttask, self.sqdata.sq_revdeps[nexttask]): - fn = fn_from_tid(nexttask) - foundtarget = False - - if nexttask in self.rqdata.target_tids: - foundtarget = True - if not foundtarget: - logger.debug(2, "Skipping setscene for task %s" % nexttask) - self.sq_task_skip(nexttask) - self.scenequeue_notneeded.add(nexttask) - return True - if nexttask in self.sqdata.outrightfail: - self.sq_task_failoutright(nexttask) - return True - task = nexttask - break - if task is not None: - (mc, fn, taskname, taskfn) = split_tid_mcfn(task) - taskname = taskname + "_setscene" - if self.rq.check_stamp_task(task, taskname_from_tid(task), recurse = True, cache=self.stampcache): - logger.debug(2, 'Stamp for underlying task %s is current, so skipping setscene variant', task) - self.sq_task_failoutright(task) - return True - - if self.cooker.configuration.force: - if task in self.rqdata.target_tids: - self.sq_task_failoutright(task) - return True - - if self.rq.check_stamp_task(task, taskname, cache=self.stampcache): - logger.debug(2, 'Setscene stamp current task %s, so skip it and its dependencies', task) - self.sq_task_skip(task) - return True - - if self.cooker.configuration.skipsetscene: - logger.debug(2, 'No setscene tasks should be executed. Skipping %s', task) - self.sq_task_failoutright(task) - return True - - startevent = sceneQueueTaskStarted(task, self.sq_stats, self.rq) - bb.event.fire(startevent, self.cfgData) - - taskdepdata = self.sq_build_taskdepdata(task) - - taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn] - taskhash = self.rqdata.get_task_hash(task) - unihash = self.rqdata.get_task_unihash(task) - if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not self.cooker.configuration.dry_run: - if not mc in self.rq.fakeworker: - self.rq.start_fakeworker(self, mc) - self.rq.fakeworker[mc].process.stdin.write(b"<runtask>" + pickle.dumps((taskfn, task, taskname, taskhash, unihash, True, self.cooker.collection.get_file_appends(taskfn), taskdepdata, False)) + b"</runtask>") - self.rq.fakeworker[mc].process.stdin.flush() - else: - self.rq.worker[mc].process.stdin.write(b"<runtask>" + pickle.dumps((taskfn, task, taskname, taskhash, unihash, True, self.cooker.collection.get_file_appends(taskfn), taskdepdata, False)) + b"</runtask>") - self.rq.worker[mc].process.stdin.flush() - - self.build_stamps[task] = bb.build.stampfile(taskname, self.rqdata.dataCaches[mc], taskfn, noextra=True) - self.build_stamps2.append(self.build_stamps[task]) - self.sq_running.add(task) - self.sq_live.add(task) - self.sq_stats.taskActive() - if self.can_start_task(): - return True - - if self.sq_stats.active > 0: - self.rq.read_workers() - return self.rq.active_fds() - - #for tid in self.sqdata.sq_revdeps: - # if tid not in self.sq_running: - # buildable = tid in self.sq_buildable - # revdeps = self.sqdata.sq_revdeps[tid] - # bb.warn("Found we didn't run %s %s %s" % (tid, buildable, str(revdeps))) - - self.rq.scenequeue_covered = self.scenequeue_covered - self.rq.scenequeue_notcovered = self.scenequeue_notcovered - - logger.debug(1, 'We can skip tasks %s', "\n".join(sorted(self.rq.scenequeue_covered))) - - self.rq.state = runQueueRunInit - - completeevent = sceneQueueComplete(self.sq_stats, self.rq) - bb.event.fire(completeevent, self.cfgData) - - return True - - def sq_build_taskdepdata(self, task): - def getsetscenedeps(tid): - deps = set() - (mc, fn, taskname, _) = split_tid_mcfn(tid) - realtid = tid + "_setscene" - idepends = self.rqdata.taskData[mc].taskentries[realtid].idepends - for (depname, idependtask) in idepends: - if depname not in self.rqdata.taskData[mc].build_targets: - continue - - depfn = self.rqdata.taskData[mc].build_targets[depname][0] - if depfn is None: - continue - deptid = depfn + ":" + idependtask.replace("_setscene", "") - deps.add(deptid) - return deps - - taskdepdata = {} - next = getsetscenedeps(task) - next.add(task) - while next: - additional = [] - for revdep in next: - (mc, fn, taskname, taskfn) = split_tid_mcfn(revdep) - pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn] - deps = getsetscenedeps(revdep) - provides = self.rqdata.dataCaches[mc].fn_provides[taskfn] - taskhash = self.rqdata.runtaskentries[revdep].hash - unihash = self.rqdata.runtaskentries[revdep].unihash - taskdepdata[revdep] = [pn, taskname, fn, deps, provides, taskhash, unihash] - for revdep2 in deps: - if revdep2 not in taskdepdata: - additional.append(revdep2) - next = additional - - #bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n")) - return taskdepdata - class TaskFailure(Exception): """ Exception raised when a task in a runqueue fails |