cloudstack-commits mailing list archives

From: edi...@apache.org
Subject: [1/3] add xdist
Date: Wed, 17 Sep 2014 01:06:07 GMT
Repository: cloudstack
Updated Branches:
  refs/heads/pytest 9610685f4 -> 797fff165


http://git-wip-us.apache.org/repos/asf/cloudstack/blob/145542d6/tools/pytest-xdist/xdist/dsession.py
----------------------------------------------------------------------
diff --git a/tools/pytest-xdist/xdist/dsession.py b/tools/pytest-xdist/xdist/dsession.py
new file mode 100644
index 0000000..8385e76
--- /dev/null
+++ b/tools/pytest-xdist/xdist/dsession.py
@@ -0,0 +1,460 @@
+import difflib
+
+import pytest
+import py
+from xdist.slavemanage import NodeManager
+
+
+queue = py.builtin._tryimport('queue', 'Queue')
+
+
+class EachScheduling:
+
+    def __init__(self, numnodes, log=None):
+        self.numnodes = numnodes
+        self.node2collection = {}
+        self.node2pending = {}
+        if log is None:
+            self.log = py.log.Producer("eachsched")
+        else:
+            self.log = log.eachsched
+        self.collection_is_completed = False
+
+    def hasnodes(self):
+        return bool(self.node2pending)
+
+    def addnode(self, node):
+        self.node2collection[node] = None
+
+    def tests_finished(self):
+        if not self.collection_is_completed:
+            return False
+        return True
+
+    def addnode_collection(self, node, collection):
+        assert not self.collection_is_completed
+        assert self.node2collection[node] is None
+        self.node2collection[node] = list(collection)
+        self.node2pending[node] = []
+        if len(self.node2pending) >= self.numnodes:
+            self.collection_is_completed = True
+
+    def remove_item(self, node, item_index, duration=0):
+        self.node2pending[node].remove(item_index)
+
+    def remove_node(self, node):
+        # KeyError if we didn't get an addnode() yet
+        pending = self.node2pending.pop(node)
+        if not pending:
+            return
+        crashitem = self.node2collection[node][pending.pop(0)]
+        # XXX do or report something wrt the remaining per-node pending items?
+        return crashitem
+
+    def init_distribute(self):
+        assert self.collection_is_completed
+        for node, pending in self.node2pending.items():
+            node.send_runtest_all()
+            pending[:] = range(len(self.node2collection[node]))
+
+
+class LoadScheduling:
+    def __init__(self, numnodes, log=None):
+        self.numnodes = numnodes
+        self.node2pending = {}
+        self.node2collection = {}
+        self.nodes = []
+        self.pending = []
+        if log is None:
+            self.log = py.log.Producer("loadsched")
+        else:
+            self.log = log.loadsched
+        self.collection_is_completed = False
+
+    def hasnodes(self):
+        return bool(self.node2pending)
+
+    def addnode(self, node):
+        self.node2pending[node] = []
+        self.nodes.append(node)
+
+    def tests_finished(self):
+        if not self.collection_is_completed:
+            return False
+        for pending in self.node2pending.values():
+            if len(pending) >= 2:
+                return False
+        return True
+
+    def addnode_collection(self, node, collection):
+        assert not self.collection_is_completed
+        assert node in self.node2pending
+        self.node2collection[node] = list(collection)
+        if len(self.node2collection) >= self.numnodes:
+            self.collection_is_completed = True
+
+    def remove_item(self, node, item_index, duration=0):
+        self.node2pending[node].remove(item_index)
+        self.check_schedule(node, duration=duration)
+
+    def check_schedule(self, node, duration=0):
+        if self.pending:
+            # how many nodes do we have?
+            num_nodes = len(self.node2pending)
+            # if our node goes below a heuristic minimum, fill it up to
+            # the heuristic maximum
+            items_per_node_min = max(
+                    2, len(self.pending) // num_nodes // 4)
+            items_per_node_max = max(
+                    2, len(self.pending) // num_nodes // 2)
+            node_pending = self.node2pending[node]
+            if len(node_pending) < items_per_node_min:
+                if duration >= 0.1 and len(node_pending) >= 2:
+                    # seems the node is doing long-running tests
+                    # and has enough items to continue
+                    # so let's rather wait with sending new items
+                    return
+                num_send = items_per_node_max - len(node_pending)
+                self._send_tests(node, num_send)
+
+        self.log("num items waiting for node:", len(self.pending))
+        #self.log("node2pending:", self.node2pending)
+
+    def remove_node(self, node):
+        self.nodes.remove(node)
+        pending = self.node2pending.pop(node)
+        if not pending:
+            return
+        # the node has crashed on the item if there are pending ones
+        # and we are told to remove the node
+        crashitem = self.collection[pending.pop(0)]
+
+        # put the remaining items back to the general pending list
+        self.pending.extend(pending)
+        # see if some nodes can pick the remaining tests up already
+        for node in self.node2pending:
+            self.check_schedule(node)
+        return crashitem
+
+    def init_distribute(self):
+        assert self.collection_is_completed
+        # XXX allow nodes to have different collections
+        if not self._check_nodes_have_same_collection():
+            self.log('**Different tests collected, aborting run**')
+            return
+
+        # all collections are the same, good.
+        # we now create an index
+        self.collection = list(self.node2collection.values())[0]
+        self.pending[:] = range(len(self.collection))
+        if not self.collection:
+            return
+
+        # roughly how many items per node do we have?
+        items_per_node = len(self.collection) // len(self.node2pending)
+        # take a fraction of tests for initial distribution
+        node_chunksize = max(items_per_node // 4, 2)
+        # and initialize each node with a chunk of tests
+        for node in self.nodes:
+            self._send_tests(node, node_chunksize)
+
+    #f = open("/tmp/sent", "w")
+    def _send_tests(self, node, num):
+        tests_per_node = self.pending[:num]
+        #print >>self.f, "sent", node, tests_per_node
+        if tests_per_node:
+            del self.pending[:num]
+            self.node2pending[node].extend(tests_per_node)
+            node.send_runtest_some(tests_per_node)
+
+    def _check_nodes_have_same_collection(self):
+        """
+        Return True if all nodes have collected the same items, False otherwise.
+        This method also logs the collection differences as they are found.
+        """
+        node_collection_items = list(self.node2collection.items())
+        first_node, col = node_collection_items[0]
+        same_collection = True
+        for node, collection in node_collection_items[1:]:
+            msg = report_collection_diff(
+                col,
+                collection,
+                first_node.gateway.id,
+                node.gateway.id,
+            )
+            if msg:
+                self.log(msg)
+                same_collection = False
+
+        return same_collection
+
+
+def report_collection_diff(from_collection, to_collection, from_id, to_id):
+    """Report the collected test difference between two nodes.
+
+    :returns: detailed message describing the difference between the given
+    collections, or None if they are equal.
+    """
+    if from_collection == to_collection:
+        return None
+
+    diff = difflib.unified_diff(
+        from_collection,
+        to_collection,
+        fromfile=from_id,
+        tofile=to_id,
+    )
+    error_message = py.builtin._totext(
+        'Different tests were collected between {from_id} and {to_id}. '
+        'The difference is:\n'
+        '{diff}'
+    ).format(from_id=from_id, to_id=to_id, diff='\n'.join(diff))
+    msg = "\n".join([x.rstrip() for x in error_message.split("\n")])
+    return msg
+
+
+class Interrupted(KeyboardInterrupt):
+    """ signals an immediate interruption. """
+
+class DSession:
+    def __init__(self, config):
+        self.config = config
+        self.log = py.log.Producer("dsession")
+        if not config.option.debug:
+            py.log.setconsumer(self.log._keywords, None)
+        self.shuttingdown = False
+        self.countfailures = 0
+        self.maxfail = config.getvalue("maxfail")
+        self.queue = queue.Queue()
+        self._failed_collection_errors = {}
+        try:
+            self.terminal = config.pluginmanager.getplugin("terminalreporter")
+        except KeyError:
+            self.terminal = None
+        else:
+            self.trdist = TerminalDistReporter(config)
+            config.pluginmanager.register(self.trdist, "terminaldistreporter")
+
+    def report_line(self, line):
+        if self.terminal and self.config.option.verbose >= 0:
+            self.terminal.write_line(line)
+
+    @pytest.mark.trylast
+    def pytest_sessionstart(self, session):
+        self.nodemanager = NodeManager(self.config)
+        self.nodemanager.setup_nodes(putevent=self.queue.put)
+
+    def pytest_sessionfinish(self, session):
+        """ teardown any resources after a test run. """
+        nm = getattr(self, 'nodemanager', None) # if not fully initialized
+        if nm is not None:
+            nm.teardown_nodes()
+
+    def pytest_collection(self):
+        # prohibit collection of test items in master process
+        return True
+
+    def pytest_runtestloop(self):
+        numnodes = len(self.nodemanager.specs)
+        dist = self.config.getvalue("dist")
+        if dist == "load":
+            self.sched = LoadScheduling(numnodes, log=self.log)
+        elif dist == "each":
+            self.sched = EachScheduling(numnodes, log=self.log)
+        else:
+            assert 0, dist
+        self.shouldstop = False
+        self.session_finished = False
+        while not self.session_finished:
+            self.loop_once()
+            if self.shouldstop:
+                raise Interrupted(str(self.shouldstop))
+        return True
+
+    def loop_once(self):
+        """ process one callback from one of the slaves. """
+        while 1:
+            try:
+                eventcall = self.queue.get(timeout=2.0)
+                break
+            except queue.Empty:
+                continue
+        callname, kwargs = eventcall
+        assert callname, kwargs
+        method = "slave_" + callname
+        call = getattr(self, method)
+        self.log("calling method", method, kwargs)
+        call(**kwargs)
+        if self.sched.tests_finished():
+            self.triggershutdown()
+
+    #
+    # callbacks for processing events from slaves
+    #
+
+    def slave_slaveready(self, node, slaveinfo):
+        node.slaveinfo = slaveinfo
+        node.slaveinfo['id'] = node.gateway.id
+        node.slaveinfo['spec'] = node.gateway.spec
+        self.config.hook.pytest_testnodeready(node=node)
+        self.sched.addnode(node)
+        if self.shuttingdown:
+            node.shutdown()
+
+    def slave_slavefinished(self, node):
+        self.config.hook.pytest_testnodedown(node=node, error=None)
+        if node.slaveoutput['exitstatus'] == 2: # keyboard-interrupt
+            self.shouldstop = "%s received keyboard-interrupt" % (node,)
+            self.slave_errordown(node, "keyboard-interrupt")
+            return
+        crashitem = self.sched.remove_node(node)
+        assert not crashitem, (crashitem, node)
+        if self.shuttingdown and not self.sched.hasnodes():
+            self.session_finished = True
+
+    def slave_errordown(self, node, error):
+        self.config.hook.pytest_testnodedown(node=node, error=error)
+        try:
+            crashitem = self.sched.remove_node(node)
+        except KeyError:
+            pass
+        else:
+            if crashitem:
+                self.handle_crashitem(crashitem, node)
+                #self.report_line("item crashed on node: %s" % crashitem)
+        if not self.sched.hasnodes():
+            self.session_finished = True
+
+    def slave_collectionfinish(self, node, ids):
+        self.sched.addnode_collection(node, ids)
+        if self.terminal:
+            self.trdist.setstatus(node.gateway.spec, "[%d]" %(len(ids)))
+
+        if self.sched.collection_is_completed:
+            if self.terminal:
+                self.trdist.ensure_show_status()
+                self.terminal.write_line("")
+                self.terminal.write_line("scheduling tests via %s" %(
+                    self.sched.__class__.__name__))
+
+            self.sched.init_distribute()
+
+    def slave_logstart(self, node, nodeid, location):
+        self.config.hook.pytest_runtest_logstart(
+            nodeid=nodeid, location=location)
+
+    def slave_testreport(self, node, rep):
+        if not (rep.passed and rep.when != "call"):
+            if rep.when in ("setup", "call"):
+                self.sched.remove_item(node, rep.item_index, rep.duration)
+        #self.report_line("testreport %s: %s" %(rep.id, rep.status))
+        rep.node = node
+        self.config.hook.pytest_runtest_logreport(report=rep)
+        self._handlefailures(rep)
+
+    def slave_collectreport(self, node, rep):
+        if rep.failed:
+            self._failed_slave_collectreport(node, rep)
+
+    def _failed_slave_collectreport(self, node, rep):
+        # Check we haven't already seen this report (from
+        # another slave).
+        if rep.longrepr not in self._failed_collection_errors:
+            self._failed_collection_errors[rep.longrepr] = True
+            self.config.hook.pytest_collectreport(report=rep)
+            self._handlefailures(rep)
+
+    def _handlefailures(self, rep):
+        if rep.failed:
+            self.countfailures += 1
+            if self.maxfail and self.countfailures >= self.maxfail:
+                self.shouldstop = "stopping after %d failures" % (
+                    self.countfailures)
+
+    def triggershutdown(self):
+        self.log("triggering shutdown")
+        self.shuttingdown = True
+        for node in self.sched.node2pending:
+            node.shutdown()
+
+    def handle_crashitem(self, nodeid, slave):
+        # XXX get more reporting info by recording pytest_runtest_logstart?
+        runner = self.config.pluginmanager.getplugin("runner")
+        fspath = nodeid.split("::")[0]
+        msg = "Slave %r crashed while running %r" %(slave.gateway.id, nodeid)
+        rep = runner.TestReport(nodeid, (fspath, None, fspath), (),
+            "failed", msg, "???")
+        rep.node = slave
+        self.config.hook.pytest_runtest_logreport(report=rep)
+
+class TerminalDistReporter:
+    def __init__(self, config):
+        self.config = config
+        self.tr = config.pluginmanager.getplugin("terminalreporter")
+        self._status = {}
+        self._lastlen = 0
+
+    def write_line(self, msg):
+        self.tr.write_line(msg)
+
+    def ensure_show_status(self):
+        if not self.tr.hasmarkup:
+            self.write_line(self.getstatus())
+
+    def setstatus(self, spec, status, show=True):
+        self._status[spec.id] = status
+        if show and self.tr.hasmarkup:
+            self.rewrite(self.getstatus())
+
+    def getstatus(self):
+        parts = ["%s %s" %(spec.id, self._status[spec.id])
+                   for spec in self._specs]
+        return " / ".join(parts)
+
+    def rewrite(self, line, newline=False):
+        pline = line + " " * max(self._lastlen-len(line), 0)
+        if newline:
+            self._lastlen = 0
+            pline += "\n"
+        else:
+            self._lastlen = len(line)
+        self.tr.rewrite(pline, bold=True)
+
+    def pytest_xdist_setupnodes(self, specs):
+        self._specs = specs
+        for spec in specs:
+            self.setstatus(spec, "I", show=False)
+        self.setstatus(spec, "I", show=True)
+        self.ensure_show_status()
+
+    def pytest_xdist_newgateway(self, gateway):
+        if self.config.option.verbose > 0:
+            rinfo = gateway._rinfo()
+            version = "%s.%s.%s" % rinfo.version_info[:3]
+            self.rewrite("[%s] %s Python %s cwd: %s" % (
+                gateway.id, rinfo.platform, version, rinfo.cwd),
+                newline=True)
+        self.setstatus(gateway.spec, "C")
+
+    def pytest_testnodeready(self, node):
+        if self.config.option.verbose > 0:
+            d = node.slaveinfo
+            infoline = "[%s] Python %s" %(
+                d['id'],
+                d['version'].replace('\n', ' -- '),)
+            self.rewrite(infoline, newline=True)
+        self.setstatus(node.gateway.spec, "ok")
+
+    def pytest_testnodedown(self, node, error):
+        if not error:
+            return
+        self.write_line("[%s] node down: %s" %(node.gateway.id, error))
+
+    #def pytest_xdist_rsyncstart(self, source, gateways):
+    #    targets = ",".join([gw.id for gw in gateways])
+    #    msg = "[%s] rsyncing: %s" %(targets, source)
+    #    self.write_line(msg)
+    #def pytest_xdist_rsyncfinish(self, source, gateways):
+    #    targets = ", ".join(["[%s]" % gw.id for gw in gateways])
+    #    self.write_line("rsyncfinish: %s -> %s" %(source, targets))
+
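
For reference, a minimal sketch of the LoadScheduling chunk-size heuristic used above (init_distribute and check_schedule). The numbers are illustrative only; the real scheduler tracks per-node item indices and sends them to remote slaves over execnet channels.

    # Standalone illustration of the chunking heuristic in dsession.py.
    def initial_chunksize(num_items, num_nodes):
        # init_distribute(): each node starts with roughly a quarter of
        # its fair share of the collection, but never fewer than 2 items.
        items_per_node = num_items // num_nodes
        return max(items_per_node // 4, 2)

    def refill_amount(num_pending, num_nodes, node_pending_len):
        # check_schedule(): once a node drops below the heuristic minimum,
        # top it back up to the heuristic maximum.
        per_node_min = max(2, num_pending // num_nodes // 4)
        per_node_max = max(2, num_pending // num_nodes // 2)
        if node_pending_len < per_node_min:
            return per_node_max - node_pending_len
        return 0

    if __name__ == "__main__":
        # 400 collected tests across 4 nodes: initial chunks of 25 items,
        # later refills that top a nearly idle node back up towards 50.
        print(initial_chunksize(400, 4))   # -> 25
        print(refill_amount(400, 4, 10))   # -> 40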

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/145542d6/tools/pytest-xdist/xdist/looponfail.py
----------------------------------------------------------------------
diff --git a/tools/pytest-xdist/xdist/looponfail.py b/tools/pytest-xdist/xdist/looponfail.py
new file mode 100644
index 0000000..e5675a2
--- /dev/null
+++ b/tools/pytest-xdist/xdist/looponfail.py
@@ -0,0 +1,230 @@
+"""
+    Implement -f aka looponfailing for py.test.
+
+    NOTE that we try to avoid loading and depending on application modules
+    within the controlling process (the one that repeatedly starts test
+    processes); otherwise changes to source code could crash
+    the controlling process, which should ideally never happen.
+"""
+
+import py, pytest
+import sys
+import execnet
+
+def looponfail_main(config):
+    remotecontrol = RemoteControl(config)
+    rootdirs = config.getini("looponfailroots")
+    statrecorder = StatRecorder(rootdirs)
+    try:
+        while 1:
+            remotecontrol.loop_once()
+            if not remotecontrol.failures and remotecontrol.wasfailing:
+                continue # the last failures passed, let's immediately rerun all
+            repr_pytest_looponfailinfo(
+                failreports=remotecontrol.failures,
+                rootdirs=rootdirs)
+            statrecorder.waitonchange(checkinterval=2.0)
+    except KeyboardInterrupt:
+        print()
+
+class RemoteControl(object):
+    def __init__(self, config):
+        self.config = config
+        self.failures = []
+
+    def trace(self, *args):
+        if self.config.option.debug:
+            msg = " ".join([str(x) for x in args])
+            py.builtin.print_("RemoteControl:", msg)
+
+    def initgateway(self):
+        return execnet.makegateway("popen")
+
+    def setup(self, out=None):
+        if out is None:
+            out = py.io.TerminalWriter()
+        if hasattr(self, 'gateway'):
+            raise ValueError("already have gateway %r" % self.gateway)
+        self.trace("setting up slave session")
+        self.gateway = self.initgateway()
+        self.channel = channel = self.gateway.remote_exec(init_slave_session,
+            args=self.config.args,
+            option_dict=vars(self.config.option),
+        )
+        remote_outchannel = channel.receive()
+        def write(s):
+            out._file.write(s)
+            out._file.flush()
+        remote_outchannel.setcallback(write)
+
+    def ensure_teardown(self):
+        if hasattr(self, 'channel'):
+            if not self.channel.isclosed():
+                self.trace("closing", self.channel)
+                self.channel.close()
+            del self.channel
+        if hasattr(self, 'gateway'):
+            self.trace("exiting", self.gateway)
+            self.gateway.exit()
+            del self.gateway
+
+    def runsession(self):
+        try:
+            self.trace("sending", self.failures)
+            self.channel.send(self.failures)
+            try:
+                return self.channel.receive()
+            except self.channel.RemoteError:
+                e = sys.exc_info()[1]
+                self.trace("ERROR", e)
+                raise
+        finally:
+            self.ensure_teardown()
+
+    def loop_once(self):
+        self.setup()
+        self.wasfailing = self.failures and len(self.failures)
+        result = self.runsession()
+        failures, reports, collection_failed = result
+        if collection_failed:
+            pass # "Collection failed, keeping previous failure set"
+        else:
+            uniq_failures = []
+            for failure in failures:
+                if failure not in uniq_failures:
+                    uniq_failures.append(failure)
+            self.failures = uniq_failures
+
+def repr_pytest_looponfailinfo(failreports, rootdirs):
+    tr = py.io.TerminalWriter()
+    if failreports:
+        tr.sep("#", "LOOPONFAILING", bold=True)
+        for report in failreports:
+            if report:
+                tr.line(report, red=True)
+    tr.sep("#", "waiting for changes", bold=True)
+    for rootdir in rootdirs:
+        tr.line("### Watching:   %s" %(rootdir,), bold=True)
+
+
+def init_slave_session(channel, args, option_dict):
+    import os, sys
+    outchannel = channel.gateway.newchannel()
+    sys.stdout = sys.stderr = outchannel.makefile('w')
+    channel.send(outchannel)
+    # prune sys.path to not contain relative paths
+    newpaths = []
+    for p in sys.path:
+        if p:
+            if not os.path.isabs(p):
+                p = os.path.abspath(p)
+            newpaths.append(p)
+    sys.path[:] = newpaths
+
+    #fullwidth, hasmarkup = channel.receive()
+    from _pytest.config import Config
+    config = Config.fromdictargs(option_dict, list(args))
+    config.args = args
+    from xdist.looponfail import SlaveFailSession
+    SlaveFailSession(config, channel).main()
+
+class SlaveFailSession:
+    def __init__(self, config, channel):
+        self.config = config
+        self.channel = channel
+        self.recorded_failures = []
+        self.collection_failed = False
+        config.pluginmanager.register(self)
+        config.option.looponfail = False
+        config.option.usepdb = False
+
+    def DEBUG(self, *args):
+        if self.config.option.debug:
+            print(" ".join(map(str, args)))
+
+    def pytest_collection(self, session):
+        self.session = session
+        self.trails = self.current_command
+        hook = self.session.ihook
+        try:
+            items = session.perform_collect(self.trails or None)
+        except pytest.UsageError:
+            items = session.perform_collect(None)
+        hook.pytest_collection_modifyitems(session=session, config=session.config, items=items)
+        hook.pytest_collection_finish(session=session)
+        return True
+
+    def pytest_runtest_logreport(self, report):
+        if report.failed:
+            self.recorded_failures.append(report)
+
+    def pytest_collectreport(self, report):
+        if report.failed:
+            self.recorded_failures.append(report)
+            self.collection_failed = True
+
+    def main(self):
+        self.DEBUG("SLAVE: received configuration, waiting for command trails")
+        try:
+            command = self.channel.receive()
+        except KeyboardInterrupt:
+            return # in the slave we can't do much about this
+        self.DEBUG("received", command)
+        self.current_command = command
+        self.config.hook.pytest_cmdline_main(config=self.config)
+        trails, failreports = [], []
+        for rep in self.recorded_failures:
+            trails.append(rep.nodeid)
+            loc = rep.longrepr
+            loc = str(getattr(loc, 'reprcrash', loc))
+            failreports.append(loc)
+        self.channel.send((trails, failreports, self.collection_failed))
+
+class StatRecorder:
+    def __init__(self, rootdirlist):
+        self.rootdirlist = rootdirlist
+        self.statcache = {}
+        self.check() # snapshot state
+
+    def fil(self, p):
+        return p.check(file=1, dotfile=0) and p.ext != ".pyc"
+    def rec(self, p):
+        return p.check(dotfile=0)
+
+    def waitonchange(self, checkinterval=1.0):
+        while 1:
+            changed = self.check()
+            if changed:
+                return
+            py.std.time.sleep(checkinterval)
+
+    def check(self, removepycfiles=True):
+        changed = False
+        statcache = self.statcache
+        newstat = {}
+        for rootdir in self.rootdirlist:
+            for path in rootdir.visit(self.fil, self.rec):
+                oldstat = statcache.pop(path, None)
+                try:
+                    newstat[path] = curstat = path.stat()
+                except py.error.ENOENT:
+                    if oldstat:
+                        changed = True
+                else:
+                    if oldstat:
+                       if oldstat.mtime != curstat.mtime or \
+                          oldstat.size != curstat.size:
+                            changed = True
+                            py.builtin.print_("# MODIFIED", path)
+                            if removepycfiles and path.ext == ".py":
+                                pycfile = path + "c"
+                                if pycfile.check():
+                                    pycfile.remove()
+
+                    else:
+                        changed = True
+        if statcache:
+            changed = True
+        self.statcache = newstat
+        return changed
+
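
The StatRecorder above drives the looponfail cycle by polling for file changes. A stripped-down, standard-library-only sketch of the same idea (the real class uses py.path, skips dotfiles, and also removes stale .pyc files):

    import os

    def snapshot(rootdirs):
        """Map each file under rootdirs to its (mtime, size)."""
        stats = {}
        for rootdir in rootdirs:
            for dirpath, _dirnames, filenames in os.walk(rootdir):
                for name in filenames:
                    if name.endswith(".pyc"):
                        continue
                    path = os.path.join(dirpath, name)
                    try:
                        st = os.stat(path)
                    except OSError:
                        continue
                    stats[path] = (st.st_mtime, st.st_size)
        return stats

    def changed(old, new):
        # a file counts as changed if it appeared, disappeared, or its
        # mtime/size differ between the two snapshots
        return old != new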

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/145542d6/tools/pytest-xdist/xdist/newhooks.py
----------------------------------------------------------------------
diff --git a/tools/pytest-xdist/xdist/newhooks.py b/tools/pytest-xdist/xdist/newhooks.py
new file mode 100644
index 0000000..2034617
--- /dev/null
+++ b/tools/pytest-xdist/xdist/newhooks.py
@@ -0,0 +1,21 @@
+
+def pytest_xdist_setupnodes(config, specs):
+    """ called before any remote node is set up. """
+
+def pytest_xdist_newgateway(gateway):
+    """ called on new raw gateway creation. """
+
+def pytest_xdist_rsyncstart(source, gateways):
+    """ called before rsyncing a directory to remote gateways takes place. """
+
+def pytest_xdist_rsyncfinish(source, gateways):
+    """ called after rsyncing a directory to remote gateways takes place. """
+
+def pytest_configure_node(node):
+    """ configure node information before it gets instantiated. """
+
+def pytest_testnodeready(node):
+    """ Test Node is ready to operate. """
+
+def pytest_testnodedown(node, error):
+    """ Test Node is down. """

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/145542d6/tools/pytest-xdist/xdist/plugin.py
----------------------------------------------------------------------
diff --git a/tools/pytest-xdist/xdist/plugin.py b/tools/pytest-xdist/xdist/plugin.py
new file mode 100644
index 0000000..bc32104
--- /dev/null
+++ b/tools/pytest-xdist/xdist/plugin.py
@@ -0,0 +1,131 @@
+import py
+import pytest
+
+def pytest_addoption(parser):
+    group = parser.getgroup("xdist", "distributed and subprocess testing")
+    group._addoption('-f', '--looponfail',
+           action="store_true", dest="looponfail", default=False,
+           help="run tests in subprocess, wait for modified files "
+                "and re-run failing test set until all pass.")
+    group._addoption('-n', dest="numprocesses", metavar="numprocesses",
+           action="store", type="int",
+           help="shortcut for '--dist=load --tx=NUM*popen'")
+    group.addoption('--boxed',
+           action="store_true", dest="boxed", default=False,
+           help="box each test run in a separate process (unix)")
+    group._addoption('--dist', metavar="distmode",
+           action="store", choices=['load', 'each', 'no'],
+           type="choice", dest="dist", default="no",
+           help=("set mode for distributing tests to exec environments.\n\n"
+                 "each: send each test to each available environment.\n\n"
+                 "load: send each test to available environment.\n\n"
+                 "(default) no: run tests inprocess, don't distribute."))
+    group._addoption('--tx', dest="tx", action="append", default=[],
+           metavar="xspec",
+           help=("add a test execution environment. some examples: "
+                 "--tx popen//python=python2.5 --tx socket=192.168.1.102:8888 "
+                 "--tx ssh=user@codespeak.net//chdir=testcache"))
+    group._addoption('-d',
+           action="store_true", dest="distload", default=False,
+           help="load-balance tests.  shortcut for '--dist=load'")
+    group.addoption('--rsyncdir', action="append", default=[], metavar="DIR",
+           help="add directory for rsyncing to remote tx nodes.")
+    group.addoption('--rsyncignore', action="append", default=[], metavar="GLOB",
+           help="add expression for ignores when rsyncing to remote tx nodes.")
+
+    parser.addini('rsyncdirs', 'list of (relative) paths to be rsynced for'
+         ' remote distributed testing.', type="pathlist")
+    parser.addini('rsyncignore', 'list of (relative) glob-style paths to be ignored '
+         'for rsyncing.', type="pathlist")
+    parser.addini("looponfailroots", type="pathlist",
+        help="directories to check for changes", default=[py.path.local()])
+
+# -------------------------------------------------------------------------
+# distributed testing hooks
+# -------------------------------------------------------------------------
+def pytest_addhooks(pluginmanager):
+    from xdist import newhooks
+    pluginmanager.addhooks(newhooks)
+
+# -------------------------------------------------------------------------
+# distributed testing initialization
+# -------------------------------------------------------------------------
+
+def pytest_cmdline_main(config):
+    check_options(config)
+    if config.getoption("looponfail"):
+        from xdist.looponfail import looponfail_main
+        looponfail_main(config)
+        return 2 # looponfail can only be stopped with ctrl-C anyway
+
+def pytest_configure(config, __multicall__):
+    __multicall__.execute()
+    if config.getoption("dist") != "no":
+        from xdist.dsession import DSession
+        session = DSession(config)
+        config.pluginmanager.register(session, "dsession")
+        tr = config.pluginmanager.getplugin("terminalreporter")
+        tr.showfspath = False
+
+def check_options(config):
+    if config.option.numprocesses:
+        config.option.dist = "load"
+        config.option.tx = ['popen'] * int(config.option.numprocesses)
+    if config.option.distload:
+        config.option.dist = "load"
+    val = config.getvalue
+    if not val("collectonly"):
+        usepdb = config.option.usepdb  # a core option
+        if val("looponfail"):
+            if usepdb:
+                raise pytest.UsageError("--pdb incompatible with --looponfail.")
+        elif val("dist") != "no":
+            if usepdb:
+                raise pytest.UsageError("--pdb incompatible with distributing tests.")
+
+
+def pytest_runtest_protocol(item):
+    if item.config.getvalue("boxed"):
+        reports = forked_run_report(item)
+        for rep in reports:
+            item.ihook.pytest_runtest_logreport(report=rep)
+        return True
+
+def forked_run_report(item):
+    # for now, we run setup/teardown in the subprocess
+    # XXX optionally allow sharing of setup/teardown
+    from _pytest.runner import runtestprotocol
+    EXITSTATUS_TESTEXIT = 4
+    import marshal
+    from xdist.remote import serialize_report
+    from xdist.slavemanage import unserialize_report
+    def runforked():
+        try:
+            reports = runtestprotocol(item, log=False)
+        except KeyboardInterrupt:
+            py.std.os._exit(EXITSTATUS_TESTEXIT)
+        return marshal.dumps([serialize_report(x) for x in reports])
+
+    ff = py.process.ForkedFunc(runforked)
+    result = ff.waitfinish()
+    if result.retval is not None:
+        report_dumps = marshal.loads(result.retval)
+        return [unserialize_report("testreport", x) for x in report_dumps]
+    else:
+        if result.exitstatus == EXITSTATUS_TESTEXIT:
+            py.test.exit("forked test item %s raised Exit" %(item,))
+        return [report_process_crash(item, result)]
+
+def report_process_crash(item, result):
+    path, lineno = item._getfslineno()
+    info = ("%s:%s: running the test CRASHED with signal %d" %
+            (path, lineno, result.signal))
+    from _pytest import runner
+    call = runner.CallInfo(lambda: 0/0, "???")
+    call.excinfo = info
+    rep = runner.pytest_runtest_makereport(item, call)
+    if result.out:
+        rep.sections.append(("captured stdout", result.out))
+    if result.err:
+        rep.sections.append(("captured stderr", result.err))
+    return rep
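
As check_options above shows, -n NUM is shorthand for --dist=load --tx=NUM*popen, and -d simply forces --dist=load. A sketch of driving the same options from Python (the test paths and interpreter names are examples, not part of this commit):

    import pytest

    if __name__ == "__main__":
        # load-balance the suite across 4 local subprocesses
        pytest.main(["-n", "4", "tests/"])

        # run every test on each of two interpreters
        pytest.main(["--dist=each",
                     "--tx", "popen//python=python2.6",
                     "--tx", "popen//python=python2.7",
                     "tests/"])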

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/145542d6/tools/pytest-xdist/xdist/remote.py
----------------------------------------------------------------------
diff --git a/tools/pytest-xdist/xdist/remote.py b/tools/pytest-xdist/xdist/remote.py
new file mode 100644
index 0000000..a0b2cad
--- /dev/null
+++ b/tools/pytest-xdist/xdist/remote.py
@@ -0,0 +1,147 @@
+"""
+    This module is executed in remote subprocesses and helps to
+    control a remote testing session and relay back information.
+    It assumes that 'py' is importable and does not have dependencies
+    on the rest of the xdist code.  This means that the xdist plugin
+    does not need to be installed in remote environments.
+"""
+
+import sys, os
+
+class SlaveInteractor:
+    def __init__(self, config, channel):
+        self.config = config
+        self.slaveid = config.slaveinput.get('slaveid', "?")
+        self.log = py.log.Producer("slave-%s" % self.slaveid)
+        if not config.option.debug:
+            py.log.setconsumer(self.log._keywords, None)
+        self.channel = channel
+        config.pluginmanager.register(self)
+
+    def sendevent(self, name, **kwargs):
+        self.log("sending", name, kwargs)
+        self.channel.send((name, kwargs))
+
+    def pytest_internalerror(self, excrepr):
+        for line in str(excrepr).split("\n"):
+            self.log("IERROR>", line)
+
+    def pytest_sessionstart(self, session):
+        self.session = session
+        slaveinfo = getinfodict()
+        self.sendevent("slaveready", slaveinfo=slaveinfo)
+
+    def pytest_sessionfinish(self, __multicall__, exitstatus):
+        self.config.slaveoutput['exitstatus'] = exitstatus
+        res = __multicall__.execute()
+        self.sendevent("slavefinished", slaveoutput=self.config.slaveoutput)
+        return res
+
+    def pytest_collection(self, session):
+        self.sendevent("collectionstart")
+
+    def pytest_runtestloop(self, session):
+        self.log("entering main loop")
+        torun = []
+        while 1:
+            name, kwargs = self.channel.receive()
+            self.log("received command", name, kwargs)
+            if name == "runtests":
+                torun.extend(kwargs['indices'])
+            elif name == "runtests_all":
+                torun.extend(range(len(session.items)))
+            self.log("items to run:", torun)
+            # only run if we have an item and a next item
+            while len(torun) >= 2:
+                self.run_tests(torun)
+            if name == "shutdown":
+                if torun:
+                    self.run_tests(torun)
+                break
+        return True
+
+    def run_tests(self, torun):
+        items = self.session.items
+        self.item_index = torun.pop(0)
+        if torun:
+            nextitem = items[torun[0]]
+        else:
+            nextitem = None
+        self.config.hook.pytest_runtest_protocol(
+            item=items[self.item_index],
+            nextitem=nextitem)
+
+    def pytest_collection_finish(self, session):
+        self.sendevent("collectionfinish",
+            topdir=str(session.fspath),
+            ids=[item.nodeid for item in session.items])
+
+    def pytest_runtest_logstart(self, nodeid, location):
+        self.sendevent("logstart", nodeid=nodeid, location=location)
+
+    def pytest_runtest_logreport(self, report):
+        data = serialize_report(report)
+        data["item_index"] = self.item_index
+        assert self.session.items[self.item_index].nodeid == report.nodeid
+        self.sendevent("testreport", data=data)
+
+    def pytest_collectreport(self, report):
+        data = serialize_report(report)
+        self.sendevent("collectreport", data=data)
+
+def serialize_report(rep):
+    import py
+    d = rep.__dict__.copy()
+    if hasattr(rep.longrepr, 'toterminal'):
+        d['longrepr'] = str(rep.longrepr)
+    else:
+        d['longrepr'] = rep.longrepr
+    for name in d:
+        if isinstance(d[name], py.path.local):
+            d[name] = str(d[name])
+        elif name == "result":
+            d[name] = None # for now
+    return d
+
+def getinfodict():
+    import platform
+    return dict(
+        version = sys.version,
+        version_info = tuple(sys.version_info),
+        sysplatform = sys.platform,
+        platform = platform.platform(),
+        executable = sys.executable,
+        cwd = os.getcwd(),
+    )
+
+def remote_initconfig(option_dict, args):
+    from _pytest.config import Config
+    option_dict['plugins'].append("no:terminal")
+    config = Config.fromdictargs(option_dict, args)
+    config.option.looponfail = False
+    config.option.usepdb = False
+    config.option.dist = "no"
+    config.option.distload = False
+    config.option.numprocesses = None
+    config.args = args
+    return config
+
+
+if __name__ == '__channelexec__':
+    channel = channel  # noqa
+    # python3.2 is not concurrent import safe, so let's play it safe
+    # https://bitbucket.org/hpk42/pytest/issue/347/pytest-xdist-and-python-32
+    if sys.version_info[:2] == (3,2):
+        os.environ["PYTHONDONTWRITEBYTECODE"] = "1"
+    slaveinput,args,option_dict = channel.receive()
+    importpath = os.getcwd()
+    sys.path.insert(0, importpath) # XXX only for remote situations
+    os.environ['PYTHONPATH'] = (importpath + os.pathsep +
+        os.environ.get('PYTHONPATH', ''))
+    #os.environ['PYTHONPATH'] = importpath
+    import py
+    config = remote_initconfig(option_dict, args)
+    config.slaveinput = slaveinput
+    config.slaveoutput = {}
+    interactor = SlaveInteractor(config, channel)
+    config.hook.pytest_cmdline_main(config=config)
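
Every message the slave sends is a (name, kwargs) tuple, and test reports are flattened into plain dicts by serialize_report so they can cross the execnet channel; the master rebuilds them with unserialize_report in slavemanage.py. A made-up example of what one event looks like on the wire:

    event = ("testreport", {
        "data": {
            "nodeid": "test_example.py::test_ok",
            "when": "call",
            "outcome": "passed",
            "longrepr": None,      # str(longrepr) when a test fails
            "duration": 0.01,
            "item_index": 0,       # added in pytest_runtest_logreport
        },
    })

    name, kwargs = event
    assert name == "testreport"
    assert kwargs["data"]["item_index"] == 0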

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/145542d6/tools/pytest-xdist/xdist/slavemanage.py
----------------------------------------------------------------------
diff --git a/tools/pytest-xdist/xdist/slavemanage.py b/tools/pytest-xdist/xdist/slavemanage.py
new file mode 100644
index 0000000..4f3b8c0
--- /dev/null
+++ b/tools/pytest-xdist/xdist/slavemanage.py
@@ -0,0 +1,316 @@
+import fnmatch
+import os
+
+import py
+import pytest
+import execnet
+import xdist.remote
+
+from _pytest import runner # XXX load dynamically
+
+class NodeManager(object):
+    EXIT_TIMEOUT = 10
+    DEFAULT_IGNORES = ['.*', '*.pyc', '*.pyo', '*~']
+    def __init__(self, config, specs=None, defaultchdir="pyexecnetcache"):
+        self.config = config
+        self._nodesready = py.std.threading.Event()
+        self.trace = self.config.trace.get("nodemanager")
+        self.group = execnet.Group()
+        if specs is None:
+            specs = self._getxspecs()
+        self.specs = []
+        for spec in specs:
+            if not isinstance(spec, execnet.XSpec):
+                spec = execnet.XSpec(spec)
+            if not spec.chdir and not spec.popen:
+                spec.chdir = defaultchdir
+            self.group.allocate_id(spec)
+            self.specs.append(spec)
+        self.roots = self._getrsyncdirs()
+        self.rsyncoptions = self._getrsyncoptions()
+
+    def rsync_roots(self):
+        """ make sure that all remote gateways
+            have the same set of roots in their
+            current directory.
+        """
+        if self.roots:
+            # send each rsync root
+            for root in self.roots:
+                self.rsync(root, **self.rsyncoptions)
+
+    def makegateways(self):
+        assert not list(self.group)
+        self.config.hook.pytest_xdist_setupnodes(config=self.config,
+            specs=self.specs)
+        for spec in self.specs:
+            gw = self.group.makegateway(spec)
+            self.config.hook.pytest_xdist_newgateway(gateway=gw)
+
+    def setup_nodes(self, putevent):
+        self.makegateways()
+        self.rsync_roots()
+        self.trace("setting up nodes")
+        for gateway in self.group:
+            node = SlaveController(self, gateway, self.config, putevent)
+            gateway.node = node  # to keep node alive
+            node.setup()
+            self.trace("started node %r" % node)
+
+    def teardown_nodes(self):
+        self.group.terminate(self.EXIT_TIMEOUT)
+
+    def _getxspecs(self):
+        xspeclist = []
+        for xspec in self.config.getvalue("tx"):
+            i = xspec.find("*")
+            try:
+                num = int(xspec[:i])
+            except ValueError:
+                xspeclist.append(xspec)
+            else:
+                xspeclist.extend([xspec[i+1:]] * num)
+        if not xspeclist:
+            raise pytest.UsageError(
+                "MISSING test execution (tx) nodes: please specify --tx")
+        return [execnet.XSpec(x) for x in xspeclist]
+
+    def _getrsyncdirs(self):
+        for spec in self.specs:
+            if not spec.popen or spec.chdir:
+                break
+        else:
+            return []
+        import pytest, _pytest
+        pytestpath = pytest.__file__.rstrip("co")
+        pytestdir = py.path.local(_pytest.__file__).dirpath()
+        config = self.config
+        candidates = [py._pydir,pytestpath,pytestdir]
+        candidates += config.option.rsyncdir
+        rsyncroots = config.getini("rsyncdirs")
+        if rsyncroots:
+            candidates.extend(rsyncroots)
+        roots = []
+        for root in candidates:
+            root = py.path.local(root).realpath()
+            if not root.check():
+                raise pytest.UsageError("rsyncdir doesn't exist: %r" %(root,))
+            if root not in roots:
+                roots.append(root)
+        return roots
+
+    def _getrsyncoptions(self):
+        """Get options to be passed for rsync."""
+        ignores = list(self.DEFAULT_IGNORES)
+        ignores += self.config.option.rsyncignore
+        ignores += self.config.getini("rsyncignore")
+
+        return {
+            'ignores': ignores,
+            'verbose': self.config.option.verbose,
+        }
+
+
+    def rsync(self, source, notify=None, verbose=False, ignores=None):
+        """ perform rsync to all remote hosts.
+        """
+        rsync = HostRSync(source, verbose=verbose, ignores=ignores)
+        seen = py.builtin.set()
+        gateways = []
+        for gateway in self.group:
+            spec = gateway.spec
+            if spec.popen and not spec.chdir:
+                # XXX this assumes that sources are python-packages
+                # and that adding the basedir does not hurt
+                gateway.remote_exec("""
+                    import sys ; sys.path.insert(0, %r)
+                """ % os.path.dirname(str(source))).waitclose()
+                continue
+            if spec not in seen:
+                def finished():
+                    if notify:
+                        notify("rsyncrootready", spec, source)
+                rsync.add_target_host(gateway, finished=finished)
+                seen.add(spec)
+                gateways.append(gateway)
+        if seen:
+            self.config.hook.pytest_xdist_rsyncstart(
+                source=source,
+                gateways=gateways,
+            )
+            rsync.send()
+            self.config.hook.pytest_xdist_rsyncfinish(
+                source=source,
+                gateways=gateways,
+            )
+
+class HostRSync(execnet.RSync):
+    """ RSyncer that filters out common files
+    """
+    def __init__(self, sourcedir, *args, **kwargs):
+        self._synced = {}
+        ignores= None
+        if 'ignores' in kwargs:
+            ignores = kwargs.pop('ignores')
+        self._ignores = ignores or []
+        super(HostRSync, self).__init__(sourcedir=sourcedir, **kwargs)
+
+    def filter(self, path):
+        path = py.path.local(path)
+        for x in self._ignores:
+            x = getattr(x, 'strpath', x)
+            if fnmatch.fnmatch(path.basename, x) or fnmatch.fnmatch(path.strpath, x):
+                return False
+        else:
+            return True
+
+    def add_target_host(self, gateway, finished=None):
+        remotepath = os.path.basename(self._sourcedir)
+        super(HostRSync, self).add_target(gateway, remotepath,
+                                          finishedcallback=finished,
+                                          delete=True,)
+
+    def _report_send_file(self, gateway, modified_rel_path):
+        if self._verbose:
+            path = os.path.basename(self._sourcedir) + "/" + modified_rel_path
+            remotepath = gateway.spec.chdir
+            py.builtin.print_('%s:%s <= %s' %
+                              (gateway.spec, remotepath, path))
+
+
+def make_reltoroot(roots, args):
+    # XXX introduce/use public API for splitting py.test args
+    splitcode = "::"
+    l = []
+    for arg in args:
+        parts = arg.split(splitcode)
+        fspath = py.path.local(parts[0])
+        for root in roots:
+            x = fspath.relto(root)
+            if x or fspath == root:
+                parts[0] = root.basename + "/" + x
+                break
+        else:
+            raise ValueError("arg %s not relative to an rsync root" % (arg,))
+        l.append(splitcode.join(parts))
+    return l
+
+class SlaveController(object):
+    ENDMARK = -1
+
+    def __init__(self, nodemanager, gateway, config, putevent):
+        self.nodemanager = nodemanager
+        self.putevent = putevent
+        self.gateway = gateway
+        self.config = config
+        self.slaveinput = {'slaveid': gateway.id}
+        self._down = False
+        self.log = py.log.Producer("slavectl-%s" % gateway.id)
+        if not self.config.option.debug:
+            py.log.setconsumer(self.log._keywords, None)
+
+    def __repr__(self):
+        return "<%s %s>" %(self.__class__.__name__, self.gateway.id,)
+
+    def setup(self):
+        self.log("setting up slave session")
+        spec = self.gateway.spec
+        args = self.config.args
+        if not spec.popen or spec.chdir:
+            args = make_reltoroot(self.nodemanager.roots, args)
+        option_dict = vars(self.config.option)
+        if spec.popen:
+            name = "popen-%s" % self.gateway.id
+            basetemp = self.config._tmpdirhandler.getbasetemp()
+            option_dict['basetemp'] = str(basetemp.join(name))
+        self.config.hook.pytest_configure_node(node=self)
+        self.channel = self.gateway.remote_exec(xdist.remote)
+        self.channel.send((self.slaveinput, args, option_dict))
+        if self.putevent:
+            self.channel.setcallback(self.process_from_remote,
+                endmarker=self.ENDMARK)
+
+    def ensure_teardown(self):
+        if hasattr(self, 'channel'):
+            if not self.channel.isclosed():
+                self.log("closing", self.channel)
+                self.channel.close()
+            #del self.channel
+        if hasattr(self, 'gateway'):
+            self.log("exiting", self.gateway)
+            self.gateway.exit()
+            #del self.gateway
+
+    def send_runtest_some(self, indices):
+        self.sendcommand("runtests", indices=indices)
+
+    def send_runtest_all(self):
+        self.sendcommand("runtests_all",)
+
+    def shutdown(self):
+        if not self._down:
+            try:
+                self.sendcommand("shutdown")
+            except IOError:
+                pass
+
+    def sendcommand(self, name, **kwargs):
+        """ send a named parametrized command to the other side. """
+        self.log("sending command %s(**%s)" % (name, kwargs))
+        self.channel.send((name, kwargs))
+
+    def notify_inproc(self, eventname, **kwargs):
+        self.log("queuing %s(**%s)" % (eventname, kwargs))
+        self.putevent((eventname, kwargs))
+
+    def process_from_remote(self, eventcall):
+        """ this gets called for each object we receive from
+            the other side and if the channel closes.
+
+            Note that channel callbacks run in the receiver
+            thread of execnet gateways - we need to
+            avoid raising exceptions or doing heavy work.
+        """
+        try:
+            if eventcall == self.ENDMARK:
+                err = self.channel._getremoteerror()
+                if not self._down:
+                    if not err or isinstance(err, EOFError):
+                        err = "Not properly terminated" # lost connection?
+                    self.notify_inproc("errordown", node=self, error=err)
+                    self._down = True
+                return
+            eventname, kwargs = eventcall
+            if eventname in ("collectionstart"):
+                self.log("ignoring %s(%s)" %(eventname, kwargs))
+            elif eventname == "slaveready":
+                self.notify_inproc(eventname, node=self, **kwargs)
+            elif eventname == "slavefinished":
+                self._down = True
+                self.slaveoutput = kwargs['slaveoutput']
+                self.notify_inproc("slavefinished", node=self)
+            elif eventname == "logstart":
+                self.notify_inproc(eventname, node=self, **kwargs)
+            elif eventname in ("testreport", "collectreport", "teardownreport"):
+                item_index = kwargs.pop("item_index", None)
+                rep = unserialize_report(eventname, kwargs['data'])
+                if item_index is not None:
+                    rep.item_index = item_index
+                self.notify_inproc(eventname, node=self, rep=rep)
+            elif eventname == "collectionfinish":
+                self.notify_inproc(eventname, node=self, ids=kwargs['ids'])
+            else:
+                raise ValueError("unknown event: %s" %(eventname,))
+        except KeyboardInterrupt:
+            # should not land in receiver-thread
+            raise
+        except:
+            excinfo = py.code.ExceptionInfo()
+            py.builtin.print_("!" * 20, excinfo)
+            self.config.pluginmanager.notify_exception(excinfo)
+
+def unserialize_report(name, reportdict):
+    if name == "testreport":
+        return runner.TestReport(**reportdict)
+    elif name == "collectreport":
+        return runner.CollectReport(**reportdict)
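
For reference, the "--tx NUM*spec" shorthand handled by NodeManager._getxspecs above expands a leading integer into that many copies of the xspec; a standalone sketch of the same expansion:

    def expand_tx(tx_options):
        xspeclist = []
        for xspec in tx_options:
            i = xspec.find("*")
            try:
                num = int(xspec[:i])
            except ValueError:
                xspeclist.append(xspec)
            else:
                xspeclist.extend([xspec[i + 1:]] * num)
        return xspeclist

    print(expand_tx(["3*popen", "ssh=user@host//chdir=testcache"]))
    # -> ['popen', 'popen', 'popen', 'ssh=user@host//chdir=testcache']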

