author     Richard Purdie <richard.purdie@linuxfoundation.org>  2013-03-06 15:32:35 +0000
committer  Richard Purdie <richard.purdie@linuxfoundation.org>  2013-03-06 15:40:56 +0000
commit     164a4cb2fc64e76836182ad2d412343a7b26b964 (patch)
tree       c4246277b4148eec0b77ba8b89ac234b114cad6b /bitbake
parent     ed76a48e6832c4a2015b9675e758f8c31229938e (diff)
download   openembedded-core-contrib-164a4cb2fc64e76836182ad2d412343a7b26b964.tar.gz
bitbake: Revert "cooker: parse using bb.compat.Pool"
Reverting the pool changes, as terminate does not work reliably on bb.compat.Pool. :(

[YOCTO #3978]

This reverts commit 8af519a49a3374bd9004864ef31ca8aa328e9f34.

Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
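Editor's note: the revert drops bb.compat.Pool in favour of the earlier hand-rolled scheme restored in the diff below: a Feeder process pushes jobs onto a multiprocessing.Queue, several Parser processes pull from it and post results to a result queue, and dedicated quit queues plus explicit terminate()/join() calls give the cooker direct control over cancellation (the property Pool.terminate() did not deliver reliably). The following is a minimal, standalone sketch of that pattern for orientation only; it uses Python 3 spellings, a toy parse() stand-in, and invented names (Worker, jobs, results, quit) rather than bitbake's actual code.

    # Standalone sketch (not bitbake): explicit worker processes fed through
    # multiprocessing queues, with a separate "quit" queue so the parent can
    # cancel work itself instead of relying on Pool.terminate().
    import multiprocessing
    import queue   # only needed for the queue.Empty exception

    def parse(job):
        # Stand-in for the real per-recipe parse; here we just square a number.
        return job, job * job

    class Worker(multiprocessing.Process):
        def __init__(self, jobs, results, quit):
            multiprocessing.Process.__init__(self)
            self.jobs = jobs
            self.results = results
            self.quit = quit

        def run(self):
            while True:
                # Any message on the quit queue means "stop now, drop pending work".
                try:
                    self.quit.get_nowait()
                except queue.Empty:
                    pass
                else:
                    self.results.cancel_join_thread()
                    break
                # Otherwise take the next job; None is the "no more work" sentinel.
                try:
                    job = self.jobs.get(timeout=0.25)
                except queue.Empty:
                    continue
                if job is None:
                    break
                self.results.put(parse(job))

    if __name__ == "__main__":
        num_workers = 2
        jobs = multiprocessing.Queue()
        results = multiprocessing.Queue()
        quit = multiprocessing.Queue(maxsize=num_workers)

        workers = [Worker(jobs, results, quit) for _ in range(num_workers)]
        for w in workers:
            w.start()

        for n in range(10):
            jobs.put(n)
        for _ in workers:
            jobs.put(None)          # one sentinel per worker for a clean shutdown

        for _ in range(10):
            print(results.get())

        for w in workers:
            w.join()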
Diffstat (limited to 'bitbake')
-rw-r--r--  bitbake/lib/bb/cooker.py  161
1 file changed, 134 insertions(+), 27 deletions(-)
diff --git a/bitbake/lib/bb/cooker.py b/bitbake/lib/bb/cooker.py
index 9f7121fefc..9d051fa30f 100644
--- a/bitbake/lib/bb/cooker.py
+++ b/bitbake/lib/bb/cooker.py
@@ -34,7 +34,7 @@ from cStringIO import StringIO
from contextlib import closing
from functools import wraps
from collections import defaultdict
-import bb, bb.exceptions, bb.command, bb.compat
+import bb, bb.exceptions, bb.command
from bb import utils, data, parse, event, cache, providers, taskdata, runqueue
import Queue
import prserv.serv
@@ -1556,19 +1556,87 @@ class ParsingFailure(Exception):
self.recipe = recipe
Exception.__init__(self, realexception, recipe)
-def parse_file((filename, appends, caches_array)):
- try:
- return True, bb.cache.Cache.parse(filename, appends, parse_file.cfg, caches_array)
- except Exception as exc:
- tb = sys.exc_info()[2]
- exc.recipe = filename
- exc.traceback = list(bb.exceptions.extract_traceback(tb, context=3))
- return True, exc
- # Need to turn BaseExceptions into Exceptions here so we gracefully shutdown
- # and for example a worker thread doesn't just exit on its own in response to
- # a SystemExit event for example.
- except BaseException as exc:
- return True, ParsingFailure(exc, filename)
+class Feeder(multiprocessing.Process):
+ def __init__(self, jobs, to_parsers, quit):
+ self.quit = quit
+ self.jobs = jobs
+ self.to_parsers = to_parsers
+ multiprocessing.Process.__init__(self)
+
+ def run(self):
+ while True:
+ try:
+ quit = self.quit.get_nowait()
+ except Queue.Empty:
+ pass
+ else:
+ if quit == 'cancel':
+ self.to_parsers.cancel_join_thread()
+ break
+
+ try:
+ job = self.jobs.pop()
+ except IndexError:
+ break
+
+ try:
+ self.to_parsers.put(job, timeout=0.5)
+ except Queue.Full:
+ self.jobs.insert(0, job)
+ continue
+
+class Parser(multiprocessing.Process):
+ def __init__(self, jobs, results, quit, init):
+ self.jobs = jobs
+ self.results = results
+ self.quit = quit
+ self.init = init
+ multiprocessing.Process.__init__(self)
+
+ def run(self):
+ if self.init:
+ self.init()
+
+ pending = []
+ while True:
+ try:
+ self.quit.get_nowait()
+ except Queue.Empty:
+ pass
+ else:
+ self.results.cancel_join_thread()
+ break
+
+ if pending:
+ result = pending.pop()
+ else:
+ try:
+ job = self.jobs.get(timeout=0.25)
+ except Queue.Empty:
+ continue
+
+ if job is None:
+ break
+ result = self.parse(*job)
+
+ try:
+ self.results.put(result, timeout=0.25)
+ except Queue.Full:
+ pending.append(result)
+
+ def parse(self, filename, appends, caches_array):
+ try:
+ return True, bb.cache.Cache.parse(filename, appends, self.cfg, caches_array)
+ except Exception as exc:
+ tb = sys.exc_info()[2]
+ exc.recipe = filename
+ exc.traceback = list(bb.exceptions.extract_traceback(tb, context=3))
+ return True, exc
+ # Need to turn BaseExceptions into Exceptions here so we gracefully shutdown
+ # and for example a worker thread doesn't just exit on its own in response to
+ # a SystemExit event for example.
+ except BaseException as exc:
+ return True, ParsingFailure(exc, filename)
class CookerParser(object):
def __init__(self, cooker, filelist, masked):
@@ -1602,25 +1670,32 @@ class CookerParser(object):
self.fromcache.append((filename, appends))
self.toparse = self.total - len(self.fromcache)
self.progress_chunk = max(self.toparse / 100, 1)
- self.chunk = int(self.cfgdata.getVar("BB_PARSE_CHUNK", True) or 1)
self.start()
self.haveshutdown = False
def start(self):
self.results = self.load_cached()
+ self.processes = []
if self.toparse:
- def process_init():
- parse_file.cfg = self.cfgdata
- multiprocessing.util.Finalize(None, bb.codeparser.parser_cache_save, args=(parse_file.cfg,), exitpriority=1)
- multiprocessing.util.Finalize(None, bb.fetch.fetcher_parse_save, args=(parse_file.cfg,), exitpriority=1)
-
bb.event.fire(bb.event.ParseStarted(self.toparse), self.cfgdata)
-
- self.pool = bb.compat.Pool(self.num_processes, process_init)
- parsed = self.pool.imap_unordered(parse_file, self.willparse, self.chunk)
- self.pool.close()
- self.results = itertools.chain(self.results, parsed)
+ def init():
+ Parser.cfg = self.cfgdata
+ multiprocessing.util.Finalize(None, bb.codeparser.parser_cache_save, args=(self.cfgdata,), exitpriority=1)
+ multiprocessing.util.Finalize(None, bb.fetch.fetcher_parse_save, args=(self.cfgdata,), exitpriority=1)
+
+ self.feeder_quit = multiprocessing.Queue(maxsize=1)
+ self.parser_quit = multiprocessing.Queue(maxsize=self.num_processes)
+ self.jobs = multiprocessing.Queue(maxsize=self.num_processes)
+ self.result_queue = multiprocessing.Queue()
+ self.feeder = Feeder(self.willparse, self.jobs, self.feeder_quit)
+ self.feeder.start()
+ for i in range(0, self.num_processes):
+ parser = Parser(self.jobs, self.result_queue, self.parser_quit, init)
+ parser.start()
+ self.processes.append(parser)
+
+ self.results = itertools.chain(self.results, self.parse_generator())
def shutdown(self, clean=True, force=False):
if not self.toparse:
@@ -1636,9 +1711,25 @@ class CookerParser(object):
self.total)
bb.event.fire(event, self.cfgdata)
+ self.feeder_quit.put(None)
+ for process in self.processes:
+ self.jobs.put(None)
else:
- self.pool.terminate()
- self.pool.join()
+ self.feeder_quit.put('cancel')
+
+ self.parser_quit.cancel_join_thread()
+ for process in self.processes:
+ self.parser_quit.put(None)
+
+ self.jobs.cancel_join_thread()
+
+ for process in self.processes:
+ if force:
+ process.join(.1)
+ process.terminate()
+ else:
+ process.join()
+ self.feeder.join()
sync = threading.Thread(target=self.bb_cache.sync)
sync.start()
@@ -1651,6 +1742,22 @@ class CookerParser(object):
cached, infos = self.bb_cache.load(filename, appends, self.cfgdata)
yield not cached, infos
+ def parse_generator(self):
+ while True:
+ if self.parsed >= self.toparse:
+ break
+
+ try:
+ result = self.result_queue.get(timeout=0.25)
+ except Queue.Empty:
+ pass
+ else:
+ value = result[1]
+ if isinstance(value, BaseException):
+ raise value
+ else:
+ yield result
+
def parse_next(self):
result = []
parsed = None
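Editor's note: on the consumer side, the restored parse_generator() polls the result queue with a short timeout and re-raises any exception object a worker shipped back in place of a normal result, so a recipe failure surfaces in the parent rather than disappearing inside a child process. A hedged, standalone sketch of that idea, again with invented names (result_generator, expected) and Python 3 spellings, not bitbake's actual code:

    # Standalone sketch (not bitbake): drain a result queue, re-raising any
    # exception object that a worker sent back instead of a result.
    import multiprocessing
    import queue

    def result_generator(result_queue, expected):
        """Yield `expected` items from `result_queue`."""
        received = 0
        while received < expected:
            try:
                result = result_queue.get(timeout=0.25)
            except queue.Empty:
                continue                  # nothing ready yet; poll again
            received += 1
            if isinstance(result, BaseException):
                raise result              # surface the worker's failure here
            yield result

    if __name__ == "__main__":
        results = multiprocessing.Queue()
        for item in ("recipe-a parsed", "recipe-b parsed"):
            results.put(item)
        for parsed in result_generator(results, expected=2):
            print(parsed)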