From 787c1cf81195e97dda5fdab3a0a5d8bda97f6770 Mon Sep 17 00:00:00 2001 From: Richard Purdie Date: Thu, 19 Aug 2010 11:36:29 +0100 Subject: bitbake: Initial scenequeue implementation (needs major fixes) bitbake: scenequeue: Skip setscene if the underlying task already ran bitbake/setscene: Make sure uneeded dependencies are removed recursively Signed-off-by: Richard Purdie --- bitbake/lib/bb/runqueue.py | 286 +++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 262 insertions(+), 24 deletions(-) (limited to 'bitbake') diff --git a/bitbake/lib/bb/runqueue.py b/bitbake/lib/bb/runqueue.py index 488aa04d06..9127f248d5 100644 --- a/bitbake/lib/bb/runqueue.py +++ b/bitbake/lib/bb/runqueue.py @@ -27,6 +27,7 @@ from bb import msg, data, event import signal import stat import fcntl +import copy class RunQueueStats: """ @@ -57,12 +58,14 @@ class RunQueueStats: # These values indicate the next step due to be run in the # runQueue state machine runQueuePrepare = 2 -runQueueRunInit = 3 -runQueueRunning = 4 -runQueueFailed = 6 -runQueueCleanUp = 7 -runQueueComplete = 8 -runQueueChildProcess = 9 +runQueueSceneInit = 3 +runQueueSceneRun = 4 +runQueueRunInit = 5 +runQueueRunning = 6 +runQueueFailed = 7 +runQueueCleanUp = 8 +runQueueComplete = 9 +runQueueChildProcess = 10 class RunQueueScheduler(object): """ @@ -672,6 +675,16 @@ class RunQueueData: #self.dump_data(taskData) + # Interate over the task list looking for tasks with a 'setscene' function + + self.runq_setscene = [] + for task in range(len(self.runq_fnid)): + setscene = taskData.gettask_id(self.taskData.fn_index[self.runq_fnid[task]], self.runq_task[task] + "_setscene", False) + if not setscene: + continue + bb.note("Found setscene for %s %s" % (self.taskData.fn_index[self.runq_fnid[task]], self.runq_task[task])) + self.runq_setscene.append(task) + def dump_data(self, taskQueue): """ Dump some debug information on the internal data structures @@ -802,6 +815,13 @@ class RunQueue: return current def check_stamp_task(self, task, taskname = None): + def get_timestamp(f): + try: + if not os.access(f, os.F_OK): + return None + return os.stat(f)[stat.ST_MTIME] + except: + return None if self.stamppolicy == "perfile": fulldeptree = False @@ -825,23 +845,24 @@ class RunQueue: bb.msg.debug(2, bb.msg.domain.RunQueue, "%s.%s is nostamp\n" % (fn, taskname)) return False + if taskname.endswith("_setscene"): + return True + iscurrent = True - t1 = os.stat(stampfile)[stat.ST_MTIME] + t1 = get_timestamp(stampfile) for dep in self.rqdata.runq_depends[task]: if iscurrent: fn2 = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[dep]] taskname2 = self.rqdata.runq_task[dep] stampfile2 = "%s.%s" % (self.rqdata.dataCache.stamp[fn2], taskname2) + t2 = get_timestamp(stampfile2) + t3 = get_timestamp(stampfile2 + "_setscene") + if t3 and t3 > t2: + continue if fn == fn2 or (fulldeptree and fn2 not in stampwhitelist): - try: - t2 = os.stat(stampfile2)[stat.ST_MTIME] - if t1 < t2: - bb.msg.debug(2, bb.msg.domain.RunQueue, "Stampfile %s < %s" % (stampfile, stampfile2)) - iscurrent = False - except: - bb.msg.debug(2, bb.msg.domain.RunQueue, "Exception reading %s for %s" % (stampfile2, stampfile)) + if not t2 or t1 < t2: + bb.msg.debug(2, bb.msg.domain.RunQueue, "Stampfile %s < %s (or does not exist)" % (stampfile, stampfile2)) iscurrent = False - return iscurrent def execute_runqueue(self): @@ -855,7 +876,13 @@ class RunQueue: if self.state is runQueuePrepare: self.rqdata.prepare() - self.state = runQueueRunInit + self.state = runQueueSceneInit + + if self.state is runQueueSceneInit: + self.rqexe = RunQueueExecuteScenequeue(self) + + if self.state is runQueueSceneRun: + self.rqexe.execute() if self.state is runQueueRunInit: bb.msg.note(1, bb.msg.domain.RunQueue, "Executing runqueue") @@ -948,14 +975,10 @@ class RunQueueExecute: for pipe in self.build_pipes: self.build_pipes[pipe].read() - try: - while self.stats.active > 0: - bb.event.fire(runQueueExitWait(self.stats.active), self.cfgData) - if self.runqueue_process_waitpid() is None: - return - except: - self.finish_now() - raise + if self.stats.active > 0: + bb.event.fire(runQueueExitWait(self.stats.active), self.cfgData) + self.runqueue_process_waitpid() + return if len(self.failed_fnids) != 0: self.rq.state = runQueueFailed @@ -1034,6 +1057,23 @@ class RunQueueExecuteTasks(RunQueueExecute): self.runq_buildable.append(1) else: self.runq_buildable.append(0) + if len(self.rqdata.runq_revdeps[task]) > 0 and self.rqdata.runq_revdeps[task].issubset(self.rq.scenequeue_covered): + self.rq.scenequeue_covered.add(task) + + found = True + while found: + found = False + for task in range(self.stats.total): + if task in self.rq.scenequeue_covered: + continue + if len(self.rqdata.runq_revdeps[task]) > 0 and self.rqdata.runq_revdeps[task].issubset(self.rq.scenequeue_covered): + self.rq.scenequeue_covered.add(task) + found = True + + bb.note("Full skip list %s" % self.rq.scenequeue_covered) + + for task in self.rq.scenequeue_covered: + self.task_skip(task) event.fire(bb.event.StampUpdate(self.rqdata.target_pairs, self.rqdata.dataCache.stamp), self.cfgData) @@ -1145,6 +1185,204 @@ class RunQueueExecuteTasks(RunQueueExecute): self.rq.state = runQueueComplete return +class RunQueueExecuteScenequeue(RunQueueExecute): + def __init__(self, rq): + RunQueueExecute.__init__(self, rq) + + self.scenequeue_covered = set() + self.scenequeue_notcovered = set() + + # If we don't have any setscene functions, skip this step + if len(self.rqdata.runq_setscene) == 0: + rq.scenequeue_covered = set() + rq.state = runQueueRunInit + return + + self.stats = RunQueueStats(len(self.rqdata.runq_setscene)) + + endpoints = {} + sq_revdeps = [] + sq_revdeps_new = [] + sq_revdeps_squash = [] + + # We need to construct a dependency graph for the setscene functions. Intermediate + # dependencies between the setscene tasks only complicate the code. This code + # therefore aims to collapse the huge runqueue dependency tree into a smaller one + # only containing the setscene functions. + + for task in range(self.stats.total): + self.runq_running.append(0) + self.runq_complete.append(0) + self.runq_buildable.append(0) + + for task in range(len(self.rqdata.runq_fnid)): + sq_revdeps.append(copy.copy(self.rqdata.runq_revdeps[task])) + sq_revdeps_new.append(set()) + if (len(self.rqdata.runq_revdeps[task]) == 0) and task not in self.rqdata.runq_setscene: + endpoints[task] = None + + for task in self.rqdata.runq_setscene: + for dep in self.rqdata.runq_depends[task]: + endpoints[dep] = task + + def process_endpoints(endpoints): + newendpoints = {} + for point, task in endpoints.items(): + tasks = set() + if task: + tasks.add(task) + if sq_revdeps_new[point]: + tasks |= sq_revdeps_new[point] + sq_revdeps_new[point] = set() + for dep in self.rqdata.runq_depends[point]: + if point in sq_revdeps[dep]: + sq_revdeps[dep].remove(point) + if tasks: + sq_revdeps_new[dep] |= tasks + if (len(sq_revdeps[dep]) == 0 or len(sq_revdeps_new[dep]) != 0) and dep not in self.rqdata.runq_setscene: + newendpoints[dep] = task + if len(newendpoints) != 0: + process_endpoints(newendpoints) + + process_endpoints(endpoints) + + for task in range(len(self.rqdata.runq_fnid)): + if task in self.rqdata.runq_setscene: + deps = set() + for dep in sq_revdeps_new[task]: + deps.add(self.rqdata.runq_setscene.index(dep)) + sq_revdeps_squash.append(deps) + elif len(sq_revdeps_new[task]) != 0: + bb.msg.fatal(bb.msg.domain.RunQueue, "Something went badly wrong during scenequeue generation, aborting. Please report this problem.") + + #for task in range(len(sq_revdeps_squash)): + # print "Task %s: %s.%s is %s " % (task, self.taskData.fn_index[self.runq_fnid[self.runq_setscene[task]]], self.runq_task[self.runq_setscene[task]] + "_setscene", sq_revdeps_squash[task]) + + self.sq_deps = [] + self.sq_revdeps = sq_revdeps_squash + self.sq_revdeps2 = copy.deepcopy(self.sq_revdeps) + + for task in range(len(self.sq_revdeps)): + self.sq_deps.append(set()) + for task in range(len(self.sq_revdeps)): + for dep in self.sq_revdeps[task]: + self.sq_deps[dep].add(task) + + for task in range(len(self.sq_revdeps)): + if len(self.sq_revdeps[task]) == 0: + self.runq_buildable[task] = 1 + + bb.msg.note(1, bb.msg.domain.RunQueue, "Executing setscene Tasks") + + self.rq.state = runQueueSceneRun + + def scenequeue_updatecounters(self, task): + for dep in self.sq_deps[task]: + self.sq_revdeps2[dep].remove(task) + if len(self.sq_revdeps2[dep]) == 0: + self.runq_buildable[dep] = 1 + + def task_complete(self, task): + """ + Mark a task as completed + Look at the reverse dependencies and mark any task with + completed dependencies as buildable + """ + + index = self.rqdata.runq_setscene[task] + bb.msg.note(1, bb.msg.domain.RunQueue, "Found task %s could be accelerated" % self.rqdata.get_user_idstring(index)) + + self.scenequeue_covered.add(task) + self.scenequeue_updatecounters(task) + + def task_fail(self, task, result): + self.stats.taskFailed() + index = self.rqdata.runq_setscene[task] + bb.event.fire(runQueueTaskFailed(task, self.stats, self), self.cfgData) + self.scenequeue_notcovered.add(task) + self.scenequeue_updatecounters(task) + + def task_failoutright(self, task): + self.runq_running[task] = 1 + self.runq_buildable[task] = 1 + self.stats.taskCompleted() + self.stats.taskSkipped() + index = self.rqdata.runq_setscene[task] + self.scenequeue_notcovered.add(task) + self.scenequeue_updatecounters(task) + + def task_skip(self, task): + self.runq_running[task] = 1 + self.runq_buildable[task] = 1 + self.task_complete(task) + self.stats.taskCompleted() + self.stats.taskSkipped() + + def execute(self): + """ + Run the tasks in a queue prepared by prepare_runqueue + """ + + task = None + if self.stats.active < self.number_tasks: + # Find the next setscene to run + for nexttask in range(self.stats.total): + if self.runq_buildable[nexttask] == 1 and self.runq_running[nexttask] != 1: + #bb.note("Comparing %s to %s" % (self.sq_revdeps[nexttask], self.scenequeue_covered)) + #if len(self.sq_revdeps[nexttask]) > 0 and self.sq_revdeps[nexttask].issubset(self.scenequeue_covered): + # bb.note("Skipping task %s" % nexttask) + # self.scenequeue_skip(nexttask) + # return True + task = nexttask + break + if task is not None: + realtask = self.rqdata.runq_setscene[task] + fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[realtask]] + + taskname = self.rqdata.runq_task[realtask] + "_setscene" + if self.rq.check_stamp_task(realtask, self.rqdata.runq_task[realtask]): + bb.msg.debug(2, bb.msg.domain.RunQueue, "Stamp for underlying task %s (%s) is current so skipping setscene varient" % (task, self.rqdata.get_user_idstring(task))) + self.task_failoutright(task) + return True + + if self.cooker.configuration.force: + for target in self.target_pairs: + if target[0] == fn and target[1] == self.rqdata.runq_task[realtask]: + self.task_failoutright(task) + return True + + if self.rq.check_stamp_task(realtask, taskname): + bb.msg.debug(2, bb.msg.domain.RunQueue, "Setscene stamp current task %s (%s) so skip it and its dependencies" % (task, self.rqdata.get_user_idstring(realtask))) + self.task_skip(task) + return True + + pid, pipein, pipeout = self.fork_off_task(fn, realtask, taskname) + + self.build_pids[pid] = task + self.build_pipes[pid] = runQueuePipe(pipein, pipeout, self.cfgData) + self.runq_running[task] = 1 + self.stats.taskActive() + if self.stats.active < self.number_tasks: + return True + + for pipe in self.build_pipes: + self.build_pipes[pipe].read() + + if self.stats.active > 0: + if self.runqueue_process_waitpid() is None: + return True + return True + + # Convert scenequeue_covered task numbers into full taskgraph ids + oldcovered = self.scenequeue_covered + self.rq.scenequeue_covered = set() + for task in oldcovered: + self.rq.scenequeue_covered.add(self.rqdata.runq_setscene[task]) + + bb.note("We can skip tasks %s" % self.rq.scenequeue_covered) + + self.rq.state = runQueueRunInit + return True class TaskFailure(Exception): -- cgit 1.2.3-korg