summaryrefslogtreecommitdiffstats
path: root/meta/lib
diff options
context:
space:
mode:
Diffstat (limited to 'meta/lib')
-rw-r--r--meta/lib/oe/utils.py26
1 files changed, 20 insertions, 6 deletions
diff --git a/meta/lib/oe/utils.py b/meta/lib/oe/utils.py
index 0de880013a..f0d3c14137 100644
--- a/meta/lib/oe/utils.py
+++ b/meta/lib/oe/utils.py
@@ -222,11 +222,16 @@ class ThreadedWorker(Thread):
Thread.__init__(self)
self.tasks = tasks
self.daemon = True
- self.start()
def run(self):
+ from Queue import Empty
+
while True:
- func, args, kargs = self.tasks.get()
+ try:
+ func, args, kargs = self.tasks.get(block=False)
+ except Empty:
+ break
+
try:
func(*args, **kargs)
except Exception, e:
@@ -236,9 +241,17 @@ class ThreadedWorker(Thread):
class ThreadedPool:
"""Pool of threads consuming tasks from a queue"""
- def __init__(self, num_threads):
- self.tasks = Queue(num_threads)
- for _ in range(num_threads): ThreadedWorker(self.tasks)
+ def __init__(self, num_workers, num_tasks):
+ self.tasks = Queue(num_tasks)
+ self.workers = []
+
+ for _ in range(num_workers):
+ worker = ThreadedWorker(self.tasks)
+ self.workers.append(worker)
+
+ def start(self):
+ for worker in self.workers:
+ worker.start()
def add_task(self, func, *args, **kargs):
"""Add a task to the queue"""
@@ -247,4 +260,5 @@ class ThreadedPool:
def wait_completion(self):
"""Wait for completion of all the tasks in the queue"""
self.tasks.join()
-
+ for worker in self.workers:
+ worker.join()