aboutsummaryrefslogtreecommitdiffstats
path: root/scripts/runqemu-export-rootfs
AgeCommit message (Expand)Author
2012-10-04runqemu: allow multiple unfs instances to run simultaneouslyScott Garman
2012-10-04runqemu-export-rootfs: improve rpcbind error detectionScott Garman
2012-10-04runqemu-export-rootfs: use consistent whitespaceScott Garman
2012-08-28runqemu-export-rootfs and friends: don't put pseudo db in target fsPeter Seebach
2011-10-10Allow user mode NFS server to run without rpcbind / portmapJason Wessel
2011-10-04runqemu-export-rootfs: Add HOW-TO for ubuntu 11.10 for rpcbind problemKhem Raj
2011-09-02scripts: Show sensible warning messages if expected binaries don't existRichard Purdie
2011-08-23Use OECORE_DISTRO_VERSION instead of POKY_DISTRO_VERSIONOtavio Salvador
2011-04-21Further cleanup of various poky referencesRichard Purdie
2011-04-21Rename ~/.poky-sdk/ temp workdir to ~/.runqemu-sdk/Richard Purdie
2011-04-20Rename the remaining poky-* scripts to oe-* or runqemu-*Richard Purdie
ef='#n214'>214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376
#!/usr/bin/env python

import os
import sys
import warnings
sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib'))
from bb import fetch2
import logging
import bb
import select
import errno
import signal

# Users shouldn't be running this code directly
if len(sys.argv) != 2 or sys.argv[1] != "decafbad":
    print("bitbake-worker is meant for internal execution by bitbake itself, please don't use it standalone.")
    sys.exit(1)

logger = logging.getLogger("BitBake")

try:
    import cPickle as pickle
except ImportError:
    import pickle
    bb.msg.note(1, bb.msg.domain.Cache, "Importing cPickle failed. Falling back to a very slow implementation.")


worker_pipe = sys.stdout.fileno()
bb.utils.nonblockingfd(worker_pipe)

handler = bb.event.LogHandler()
logger.addHandler(handler)

if 0:
    # Code to write out a log file of all events passing through the worker
    logfilename = "/tmp/workerlogfile"
    format_str = "%(levelname)s: %(message)s"
    conlogformat = bb.msg.BBLogFormatter(format_str)
    consolelog = logging.FileHandler(logfilename)
    bb.msg.addDefaultlogFilter(consolelog)
    consolelog.setFormatter(conlogformat)
    logger.addHandler(consolelog)

worker_queue = ""

def worker_fire(event, d):
    data = "<event>" + pickle.dumps(event) + "</event>"
    worker_fire_prepickled(data)

def worker_fire_prepickled(event):
    global worker_queue

    worker_queue = worker_queue + event
    worker_flush()

def worker_flush():
    global worker_queue, worker_pipe

    if not worker_queue:
        return

    try:
        written = os.write(worker_pipe, worker_queue)
        worker_queue = worker_queue[written:]
    except (IOError, OSError) as e:
        if e.errno != errno.EAGAIN:
            raise

def worker_child_fire(event, d):
    global worker_pipe

    data = "<event>" + pickle.dumps(event) + "</event>"
    worker_pipe.write(data)

bb.event.worker_fire = worker_fire

lf = None
#lf = open("/tmp/workercommandlog", "w+")
def workerlog_write(msg):
    if lf:
        lf.write(msg)
        lf.flush()

def sigterm_handler(signum, frame):
    signal.signal(signal.SIGTERM, signal.SIG_DFL)
    os.killpg(0, signal.SIGTERM)
    sys.exit()

def fork_off_task(cfg, data, workerdata, fn, task, taskname, appends, taskdepdata, quieterrors=False):
    # We need to setup the environment BEFORE the fork, since
    # a fork() or exec*() activates PSEUDO...

    envbackup = {}
    fakeenv = {}
    umask = None

    taskdep = workerdata["taskdeps"][fn]
    if 'umask' in taskdep and taskname in taskdep['umask']:
        # umask might come in as a number or text string..
        try:
             umask = int(taskdep['umask'][taskname],8)
        except TypeError:
             umask = taskdep['umask'][taskname]

    # We can't use the fakeroot environment in a dry run as it possibly hasn't been built
    if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not cfg.dry_run:
        envvars = (workerdata["fakerootenv"][fn] or "").split()
        for key, value in (var.split('=') for var in envvars):
            envbackup[key] = os.environ.get(key)
            os.environ[key] = value
            fakeenv[key] = value

        fakedirs = (workerdata["fakerootdirs"][fn] or "").split()
        for p in fakedirs:
            bb.utils.mkdirhier(p)
        logger.debug(2, 'Running %s:%s under fakeroot, fakedirs: %s' %
                        (fn, taskname, ', '.join(fakedirs)))
    else:
        envvars = (workerdata["fakerootnoenv"][fn] or "").split()
        for key, value in (var.split('=') for var in envvars):
            envbackup[key] = os.environ.get(key)
            os.environ[key] = value
            fakeenv[key] = value

    sys.stdout.flush()
    sys.stderr.flush()

    try:
        pipein, pipeout = os.pipe()
        pipein = os.fdopen(pipein, 'rb', 4096)
        pipeout = os.fdopen(pipeout, 'wb', 0)
        pid = os.fork()
    except OSError as e:
        bb.msg.fatal("RunQueue", "fork failed: %d (%s)" % (e.errno, e.strerror))

    if pid == 0:
            global worker_pipe
            pipein.close()

            signal.signal(signal.SIGTERM, sigterm_handler)

            # Save out the PID so that the event can include it the
            # events
            bb.event.worker_pid = os.getpid()
            bb.event.worker_fire = worker_child_fire
            worker_pipe = pipeout

            # Make the child the process group leader
            os.setpgid(0, 0)
            # No stdin
            newsi = os.open(os.devnull, os.O_RDWR)
            os.dup2(newsi, sys.stdin.fileno())

            if umask:
                os.umask(umask)

            data.setVar("BB_WORKERCONTEXT", "1")
            data.setVar("BB_TASKDEPDATA", taskdepdata)
            data.setVar("BUILDNAME", workerdata["buildname"])
            data.setVar("DATE", workerdata["date"])
            data.setVar("TIME", workerdata["time"])
            bb.parse.siggen.set_taskdata(workerdata["hashes"], workerdata["hash_deps"], workerdata["sigchecksums"])
            ret = 0
            try:
                the_data = bb.cache.Cache.loadDataFull(fn, appends, data)
                the_data.setVar('BB_TASKHASH', workerdata["runq_hash"][task])

                # exported_vars() returns a generator which *cannot* be passed to os.environ.update() 
                # successfully. We also need to unset anything from the environment which shouldn't be there 
                exports = bb.data.exported_vars(the_data)
                bb.utils.empty_environment()
                for e, v in exports:
                    os.environ[e] = v
                for e in fakeenv:
                    os.environ[e] = fakeenv[e]
                    the_data.setVar(e, fakeenv[e])
                    the_data.setVarFlag(e, 'export', "1")

                if quieterrors:
                    the_data.setVarFlag(taskname, "quieterrors", "1")

            except Exception as exc:
                if not quieterrors:
                    logger.critical(str(exc))
                os._exit(1)
            try:
                if not cfg.dry_run:
                    ret = bb.build.exec_task(fn, taskname, the_data, cfg.profile)
                os._exit(ret)
            except:
                os._exit(1)
    else:
        for key, value in envbackup.iteritems():
            if value is None:
                del os.environ[key]
            else:
                os.environ[key] = value

    return pid, pipein, pipeout

class runQueueWorkerPipe():
    """
    Abstraction for a pipe between a worker thread and the worker server
    """
    def __init__(self, pipein, pipeout):
        self.input = pipein
        if pipeout:
            pipeout.close()
        bb.utils.nonblockingfd(self.input)
        self.queue = ""

    def read(self):
        start = len(self.queue)
        try:
            self.queue = self.queue + self.input.read(102400)
        except (OSError, IOError) as e:
            if e.errno != errno.EAGAIN:
                raise

        end = len(self.queue)
        index = self.queue.find("</event>")
        while index != -1:
            worker_fire_prepickled(self.queue[:index+8])
            self.queue = self.queue[index+8:]
            index = self.queue.find("</event>")
        return (end > start)

    def close(self):
        while self.read():
            continue
        if len(self.queue) > 0:
            print("Warning, worker child left partial message: %s" % self.queue)
        self.input.close()

normalexit = False

class BitbakeWorker(object):
    def __init__(self, din):
        self.input = din
        bb.utils.nonblockingfd(self.input)
        self.queue = ""
        self.cookercfg = None
        self.databuilder = None
        self.data = None
        self.build_pids = {}
        self.build_pipes = {}
    
        signal.signal(signal.SIGTERM, self.sigterm_exception)

    def sigterm_exception(self, signum, stackframe):
        bb.warn("Worker recieved SIGTERM, shutting down...")
        self.handle_finishnow(None)
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
        os.kill(os.getpid(), signal.SIGTERM)

    def serve(self):        
        while True:
            (ready, _, _) = select.select([self.input] + [i.input for i in self.build_pipes.values()], [] , [], 1)
            if self.input in ready or len(self.queue):
                start = len(self.queue)
                try:
                    self.queue = self.queue + self.input.read()
                except (OSError, IOError):
                    pass
                end = len(self.queue)
                self.handle_item("cookerconfig", self.handle_cookercfg)
                self.handle_item("workerdata", self.handle_workerdata)
                self.handle_item("runtask", self.handle_runtask)
                self.handle_item("finishnow", self.handle_finishnow)
                self.handle_item("ping", self.handle_ping)
                self.handle_item("quit", self.handle_quit)

            for pipe in self.build_pipes:
                self.build_pipes[pipe].read()
            if len(self.build_pids):
                self.process_waitpid()
            worker_flush()


    def handle_item(self, item, func):
        if self.queue.startswith("<" + item + ">"):
            index = self.queue.find("</" + item + ">")
            while index != -1:
                func(self.queue[(len(item) + 2):index])
                self.queue = self.queue[(index + len(item) + 3):]
                index = self.queue.find("</" + item + ">")

    def handle_cookercfg(self, data):
        self.cookercfg = pickle.loads(data)
        self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True)
        self.databuilder.parseBaseConfiguration()
        self.data = self.databuilder.data

    def handle_workerdata(self, data):
        self.workerdata = pickle.loads(data)
        bb.msg.loggerDefaultDebugLevel = self.workerdata["logdefaultdebug"]
        bb.msg.loggerDefaultVerbose = self.workerdata["logdefaultverbose"]
        bb.msg.loggerVerboseLogs = self.workerdata["logdefaultverboselogs"]
        bb.msg.loggerDefaultDomains = self.workerdata["logdefaultdomain"]
        self.data.setVar("PRSERV_HOST", self.workerdata["prhost"])

    def handle_ping(self, _):
        workerlog_write("Handling ping\n")

        logger.warn("Pong from bitbake-worker!")

    def handle_quit(self, data):
        workerlog_write("Handling quit\n")

        global normalexit
        normalexit = True
        sys.exit(0)

    def handle_runtask(self, data):
        fn, task, taskname, quieterrors, appends, taskdepdata = pickle.loads(data)
        workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname))

        pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.workerdata, fn, task, taskname, appends, taskdepdata, quieterrors)

        self.build_pids[pid] = task
        self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout)

    def process_waitpid(self):
        """
        Return none is there are no processes awaiting result collection, otherwise
        collect the process exit codes and close the information pipe.
        """
        try:
            pid, status = os.waitpid(-1, os.WNOHANG)
            if pid == 0 or os.WIFSTOPPED(status):
                return None
        except OSError:
            return None

        workerlog_write("Exit code of %s for pid %s\n" % (status, pid))

        if os.WIFEXITED(status):
            status = os.WEXITSTATUS(status)
        elif os.WIFSIGNALED(status):
            # Per shell conventions for $?, when a process exits due to
            # a signal, we return an exit code of 128 + SIGNUM
            status = 128 + os.WTERMSIG(status)

        task = self.build_pids[pid]
        del self.build_pids[pid]

        self.build_pipes[pid].close()
        del self.build_pipes[pid]

        worker_fire_prepickled("<exitcode>" + pickle.dumps((task, status)) + "</exitcode>")

    def handle_finishnow(self, _):
        if self.build_pids:
            logger.info("Sending SIGTERM to remaining %s tasks", len(self.build_pids))
            for k, v in self.build_pids.iteritems():
                try:
                    os.kill(-k, signal.SIGTERM)
                    os.waitpid(-1, 0)
                except:
                    pass
        for pipe in self.build_pipes:
            self.build_pipes[pipe].read()

try:
    worker = BitbakeWorker(sys.stdin)
    worker.serve()
except BaseException as e:
    if not normalexit:
        import traceback
        sys.stderr.write(traceback.format_exc())
        sys.stderr.write(str(e))
while len(worker_queue):
    worker_flush()
workerlog_write("exitting")
sys.exit(0)