cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aweisb...@apache.org
Subject [22/36] cassandra-dtest git commit: Migrate dtests to use pytest and python3
Date Mon, 29 Jan 2018 21:10:25 GMT
http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/paxos_test.py
----------------------------------------------------------------------
diff --git a/paxos_test.py b/paxos_test.py
new file mode 100644
index 0000000..736ca46
--- /dev/null
+++ b/paxos_test.py
@@ -0,0 +1,195 @@
+import time
+import pytest
+import logging
+
+from threading import Thread
+
+from cassandra import ConsistencyLevel, WriteTimeout
+from cassandra.query import SimpleStatement
+
+from tools.assertions import assert_unavailable
+from dtest import Tester, create_ks
+
+since = pytest.mark.since
+logger = logging.getLogger(__name__)
+
+
+@since('2.0.6')
+class TestPaxos(Tester):
+
+    def prepare(self, ordered=False, create_keyspace=True, use_cache=False, nodes=1, rf=1):
+        cluster = self.cluster
+
+        if (ordered):
+            cluster.set_partitioner("org.apache.cassandra.dht.ByteOrderedPartitioner")
+
+        if (use_cache):
+            cluster.set_configuration_options(values={'row_cache_size_in_mb': 100})
+
+        cluster.populate(nodes).start()
+        node1 = cluster.nodelist()[0]
+        time.sleep(0.2)
+
+        session = self.patient_cql_connection(node1)
+        if create_keyspace:
+            create_ks(session, 'ks', rf)
+        return session
+
+    def test_replica_availability(self):
+        """
+        @jira_ticket CASSANDRA-8640
+
+        Regression test for a bug (CASSANDRA-8640) that required all nodes to
+        be available in order to run LWT queries, even if the query could
+        complete correctly with quorum nodes available.
+        """
+        session = self.prepare(nodes=3, rf=3)
+        session.execute("CREATE TABLE test (k int PRIMARY KEY, v int)")
+        session.execute("INSERT INTO test (k, v) VALUES (0, 0) IF NOT EXISTS")
+
+        self.cluster.nodelist()[2].stop()
+        session.execute("INSERT INTO test (k, v) VALUES (1, 1) IF NOT EXISTS")
+
+        self.cluster.nodelist()[1].stop()
+        assert_unavailable(session.execute, "INSERT INTO test (k, v) VALUES (2, 2) IF NOT EXISTS")
+
+        self.cluster.nodelist()[1].start(wait_for_binary_proto=True, wait_other_notice=True)
+        session.execute("INSERT INTO test (k, v) VALUES (3, 3) IF NOT EXISTS")
+
+        self.cluster.nodelist()[2].start(wait_for_binary_proto=True)
+        session.execute("INSERT INTO test (k, v) VALUES (4, 4) IF NOT EXISTS")
+
+    @pytest.mark.no_vnodes
+    def test_cluster_availability(self):
+        # Warning, a change in partitioner or a change in CCM token allocation
+        # may require the partition keys of these inserts to be changed.
+        # This must not use vnodes as it relies on assumed token values.
+
+        session = self.prepare(nodes=3)
+        session.execute("CREATE TABLE test (k int PRIMARY KEY, v int)")
+        session.execute("INSERT INTO test (k, v) VALUES (0, 0) IF NOT EXISTS")
+
+        self.cluster.nodelist()[2].stop()
+        session.execute("INSERT INTO test (k, v) VALUES (1, 1) IF NOT EXISTS")
+
+        self.cluster.nodelist()[1].stop()
+        session.execute("INSERT INTO test (k, v) VALUES (3, 2) IF NOT EXISTS")
+
+        self.cluster.nodelist()[1].start(wait_for_binary_proto=True)
+        session.execute("INSERT INTO test (k, v) VALUES (5, 5) IF NOT EXISTS")
+
+        self.cluster.nodelist()[2].start(wait_for_binary_proto=True)
+        session.execute("INSERT INTO test (k, v) VALUES (6, 6) IF NOT EXISTS")
+
+    def test_contention_multi_iterations(self):
+        pytest.skip("Hanging the build")
+        self._contention_test(8, 100)
+
+    # Warning, this test will require you to raise the open
+    # file limit on OSX. Use 'ulimit -n 1000'
+    def test_contention_many_threads(self):
+        self._contention_test(300, 1)
+
+    def _contention_test(self, threads, iterations):
+        """
+        Test threads repeatedly contending on the same row.
+        """
+
+        verbose = False
+
+        session = self.prepare(nodes=3)
+        session.execute("CREATE TABLE test (k int, v int static, id int, PRIMARY KEY (k, id))")
+        session.execute("INSERT INTO test(k, v) VALUES (0, 0)")
+
+        class Worker(Thread):
+
+            def __init__(self, wid, session, iterations, query):
+                Thread.__init__(self)
+                self.wid = wid
+                self.iterations = iterations
+                self.query = query
+                self.session = session
+                self.errors = 0
+                self.retries = 0
+
+            def run(self):
+                global worker_done
+                i = 0
+                prev = 0
+                while i < self.iterations:
+                    done = False
+                    while not done:
+                        try:
+                            res = self.session.execute(self.query, (prev + 1, prev, self.wid))
+                            if verbose:
+                                print("[%3d] CAS %3d -> %3d (res: %s)" % (self.wid, prev, prev + 1, str(res)))
+                            if res[0][0] is True:
+                                done = True
+                                prev = prev + 1
+                            else:
+                                self.retries = self.retries + 1
+                                # There are 2 conditions, so 2 reasons to fail: if we failed because the row with our
+                                # worker ID already exists, it means we timed out earlier but our update did go in,
+                                # so do consider this as a success
+                                prev = res[0][3]
+                                if res[0][2] is not None:
+                                    if verbose:
+                                        print("[%3d] Update was inserted on previous try (res = %s)" % (self.wid, str(res)))
+                                    done = True
+                        except WriteTimeout as e:
+                            if verbose:
+                                print("[%3d] TIMEOUT (%s)" % (self.wid, str(e)))
+                            # This means a timeout: just retry, if it happens that our update was indeed persisted,
+                            # we'll figure it out on the next run.
+                            self.retries = self.retries + 1
+                        except Exception as e:
+                            if verbose:
+                                print("[%3d] ERROR: %s" % (self.wid, str(e)))
+                            self.errors = self.errors + 1
+                            done = True
+                    i = i + 1
+                    # Clean up for next iteration
+                    while True:
+                        try:
+                            self.session.execute("DELETE FROM test WHERE k = 0 AND id = %d IF EXISTS" % self.wid)
+                            break
+                        except WriteTimeout as e:
+                            pass
+
+        nodes = self.cluster.nodelist()
+        workers = []
+
+        c = self.patient_cql_connection(nodes[0], keyspace='ks')
+        q = c.prepare("""
+                BEGIN BATCH
+                   UPDATE test SET v = ? WHERE k = 0 IF v = ?;
+                   INSERT INTO test (k, id) VALUES (0, ?) IF NOT EXISTS;
+                APPLY BATCH
+            """)
+
+        for n in range(0, threads):
+            workers.append(Worker(n, c, iterations, q))
+
+        start = time.time()
+
+        for w in workers:
+            w.start()
+
+        for w in workers:
+            w.join()
+
+        if verbose:
+            runtime = time.time() - start
+            print("runtime:", runtime)
+
+        query = SimpleStatement("SELECT v FROM test WHERE k = 0", consistency_level=ConsistencyLevel.ALL)
+        rows = session.execute(query)
+        value = rows[0][0]
+
+        errors = 0
+        retries = 0
+        for w in workers:
+            errors = errors + w.errors
+            retries = retries + w.retries
+
+        assert (value == threads * iterations) and (errors == 0), "value={}, errors={}, retries={}".format(value, errors, retries)

http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/paxos_tests.py
----------------------------------------------------------------------
diff --git a/paxos_tests.py b/paxos_tests.py
deleted file mode 100644
index 6c0bd28..0000000
--- a/paxos_tests.py
+++ /dev/null
@@ -1,192 +0,0 @@
-# coding: utf-8
-
-import time
-from threading import Thread
-
-from cassandra import ConsistencyLevel, WriteTimeout
-from cassandra.query import SimpleStatement
-
-from tools.assertions import assert_unavailable
-from dtest import Tester, create_ks
-from tools.decorators import no_vnodes, since
-
-
-@since('2.0.6')
-class TestPaxos(Tester):
-
-    def prepare(self, ordered=False, create_keyspace=True, use_cache=False, nodes=1, rf=1):
-        cluster = self.cluster
-
-        if (ordered):
-            cluster.set_partitioner("org.apache.cassandra.dht.ByteOrderedPartitioner")
-
-        if (use_cache):
-            cluster.set_configuration_options(values={'row_cache_size_in_mb': 100})
-
-        cluster.populate(nodes).start()
-        node1 = cluster.nodelist()[0]
-        time.sleep(0.2)
-
-        session = self.patient_cql_connection(node1)
-        if create_keyspace:
-            create_ks(session, 'ks', rf)
-        return session
-
-    def replica_availability_test(self):
-        """
-        @jira_ticket CASSANDRA-8640
-
-        Regression test for a bug (CASSANDRA-8640) that required all nodes to
-        be available in order to run LWT queries, even if the query could
-        complete correctly with quorum nodes available.
-        """
-        session = self.prepare(nodes=3, rf=3)
-        session.execute("CREATE TABLE test (k int PRIMARY KEY, v int)")
-        session.execute("INSERT INTO test (k, v) VALUES (0, 0) IF NOT EXISTS")
-
-        self.cluster.nodelist()[2].stop()
-        session.execute("INSERT INTO test (k, v) VALUES (1, 1) IF NOT EXISTS")
-
-        self.cluster.nodelist()[1].stop()
-        assert_unavailable(session.execute, "INSERT INTO test (k, v) VALUES (2, 2) IF NOT EXISTS")
-
-        self.cluster.nodelist()[1].start(wait_for_binary_proto=True, wait_other_notice=True)
-        session.execute("INSERT INTO test (k, v) VALUES (3, 3) IF NOT EXISTS")
-
-        self.cluster.nodelist()[2].start(wait_for_binary_proto=True)
-        session.execute("INSERT INTO test (k, v) VALUES (4, 4) IF NOT EXISTS")
-
-    @no_vnodes()
-    def cluster_availability_test(self):
-        # Warning, a change in partitioner or a change in CCM token allocation
-        # may require the partition keys of these inserts to be changed.
-        # This must not use vnodes as it relies on assumed token values.
-
-        session = self.prepare(nodes=3)
-        session.execute("CREATE TABLE test (k int PRIMARY KEY, v int)")
-        session.execute("INSERT INTO test (k, v) VALUES (0, 0) IF NOT EXISTS")
-
-        self.cluster.nodelist()[2].stop()
-        session.execute("INSERT INTO test (k, v) VALUES (1, 1) IF NOT EXISTS")
-
-        self.cluster.nodelist()[1].stop()
-        session.execute("INSERT INTO test (k, v) VALUES (3, 2) IF NOT EXISTS")
-
-        self.cluster.nodelist()[1].start(wait_for_binary_proto=True)
-        session.execute("INSERT INTO test (k, v) VALUES (5, 5) IF NOT EXISTS")
-
-        self.cluster.nodelist()[2].start(wait_for_binary_proto=True)
-        session.execute("INSERT INTO test (k, v) VALUES (6, 6) IF NOT EXISTS")
-
-    def contention_test_multi_iterations(self):
-        self.skipTest("Hanging the build")
-        self._contention_test(8, 100)
-
-    # Warning, this test will require you to raise the open
-    # file limit on OSX. Use 'ulimit -n 1000'
-    def contention_test_many_threads(self):
-        self._contention_test(300, 1)
-
-    def _contention_test(self, threads, iterations):
-        """
-        Test threads repeatedly contending on the same row.
-        """
-
-        verbose = False
-
-        session = self.prepare(nodes=3)
-        session.execute("CREATE TABLE test (k int, v int static, id int, PRIMARY KEY (k, id))")
-        session.execute("INSERT INTO test(k, v) VALUES (0, 0)")
-
-        class Worker(Thread):
-
-            def __init__(self, wid, session, iterations, query):
-                Thread.__init__(self)
-                self.wid = wid
-                self.iterations = iterations
-                self.query = query
-                self.session = session
-                self.errors = 0
-                self.retries = 0
-
-            def run(self):
-                global worker_done
-                i = 0
-                prev = 0
-                while i < self.iterations:
-                    done = False
-                    while not done:
-                        try:
-                            res = self.session.execute(self.query, (prev + 1, prev, self.wid))
-                            if verbose:
-                                print "[%3d] CAS %3d -> %3d (res: %s)" % (self.wid, prev, prev + 1, str(res))
-                            if res[0][0] is True:
-                                done = True
-                                prev = prev + 1
-                            else:
-                                self.retries = self.retries + 1
-                                # There is 2 conditions, so 2 reasons to fail: if we failed because the row with our
-                                # worker ID already exists, it means we timeout earlier but our update did went in,
-                                # so do consider this as a success
-                                prev = res[0][3]
-                                if res[0][2] is not None:
-                                    if verbose:
-                                        print "[%3d] Update was inserted on previous try (res = %s)" % (self.wid, str(res))
-                                    done = True
-                        except WriteTimeout as e:
-                            if verbose:
-                                print "[%3d] TIMEOUT (%s)" % (self.wid, str(e))
-                            # This means a timeout: just retry, if it happens that our update was indeed persisted,
-                            # we'll figure it out on the next run.
-                            self.retries = self.retries + 1
-                        except Exception as e:
-                            if verbose:
-                                print "[%3d] ERROR: %s" % (self.wid, str(e))
-                            self.errors = self.errors + 1
-                            done = True
-                    i = i + 1
-                    # Clean up for next iteration
-                    while True:
-                        try:
-                            self.session.execute("DELETE FROM test WHERE k = 0 AND id = %d IF EXISTS" % self.wid)
-                            break
-                        except WriteTimeout as e:
-                            pass
-
-        nodes = self.cluster.nodelist()
-        workers = []
-
-        c = self.patient_cql_connection(nodes[0], keyspace='ks')
-        q = c.prepare("""
-                BEGIN BATCH
-                   UPDATE test SET v = ? WHERE k = 0 IF v = ?;
-                   INSERT INTO test (k, id) VALUES (0, ?) IF NOT EXISTS;
-                APPLY BATCH
-            """)
-
-        for n in range(0, threads):
-            workers.append(Worker(n, c, iterations, q))
-
-        start = time.time()
-
-        for w in workers:
-            w.start()
-
-        for w in workers:
-            w.join()
-
-        if verbose:
-            runtime = time.time() - start
-            print "runtime:", runtime
-
-        query = SimpleStatement("SELECT v FROM test WHERE k = 0", consistency_level=ConsistencyLevel.ALL)
-        rows = session.execute(query)
-        value = rows[0][0]
-
-        errors = 0
-        retries = 0
-        for w in workers:
-            errors = errors + w.errors
-            retries = retries + w.retries
-
-        self.assertTrue((value == threads * iterations) and (errors == 0), "value={}, errors={}, retries={}".format(value, errors, retries))

http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/pending_range_test.py
----------------------------------------------------------------------
diff --git a/pending_range_test.py b/pending_range_test.py
index efa56f4..55d810b 100644
--- a/pending_range_test.py
+++ b/pending_range_test.py
@@ -1,21 +1,25 @@
+import logging
+import pytest
+
 from cassandra.query import SimpleStatement
-from nose.plugins.attrib import attr
 
-from dtest import TRACE, Tester, debug, create_ks
-from tools.decorators import no_vnodes
+from dtest import Tester, create_ks
+from plugins.assert_tools import assert_regexp_matches
+
+logger = logging.getLogger(__name__)
 
 
-@no_vnodes()
+@pytest.mark.no_vnodes
 class TestPendingRangeMovements(Tester):
 
-    @attr('resource-intensive')
-    def pending_range_test(self):
+    @pytest.mark.resource_intensive
+    def test_pending_range(self):
         """
         @jira_ticket CASSANDRA-10887
         """
         cluster = self.cluster
         # If we are on 2.1, we need to set the log level to debug or higher, as debug.log does not exist.
-        if cluster.version() < '2.2' and not TRACE:
+        if cluster.version() < '2.2':
             cluster.set_log_level('DEBUG')
 
         # Create 5 node cluster
@@ -35,7 +39,7 @@ class TestPendingRangeMovements(Tester):
         lwt_query = SimpleStatement("UPDATE users SET email = 'janedoe@abc.com' WHERE login = 'jdoe3' IF email = 'jdoe@abc.com'")
 
         # Show we can execute LWT no problem
-        for i in xrange(1000):
+        for i in range(1000):
             session.execute(lwt_query)
 
         token = '-634023222112864484'
@@ -61,9 +65,9 @@ class TestPendingRangeMovements(Tester):
 
         # Verify other nodes believe this is Down/Moving
         out, _, _ = node2.nodetool('ring')
-        debug("Nodetool Ring output: {}".format(out))
-        self.assertRegexpMatches(out, '127\.0\.0\.1.*?Down.*?Moving')
+        logger.debug("Nodetool Ring output: {}".format(out))
+        assert_regexp_matches(out, '127\.0\.0\.1.*?Down.*?Moving')
 
         # Check we can still execute LWT
-        for i in xrange(1000):
+        for i in range(1000):
             session.execute(lwt_query)

http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/plugins/assert_tools.py
----------------------------------------------------------------------
diff --git a/plugins/assert_tools.py b/plugins/assert_tools.py
new file mode 100644
index 0000000..9f796ed
--- /dev/null
+++ b/plugins/assert_tools.py
@@ -0,0 +1,138 @@
+"""
+Copyright 2016 Oliver Schoenborn. BSD 3-Clause license (see __license__ at bottom of this file for details).
+
+This module is part of the nose2pytest distribution.
+
+This module's assert_ functions provide drop-in replacements for nose.tools.assert_ functions (many of which are
+pep-8-ized extractions from Python's unittest.case.TestCase methods). As such, it can be imported in a test
+suite run by py.test, to replace the nose imports with functions that rely on py.test's assertion
+introspection for error reporting.  When combined with running nose2pytest.py on your test suite, this
+module may be sufficient to decrease your test suite's third-party dependencies by 1.
+"""
+
+import unittest
+
+
+__all__ = [
+    'assert_almost_equal',
+    'assert_not_almost_equal',
+    'assert_dict_contains_subset',
+
+    'assert_raises_regex',
+    'assert_raises_regexp',
+    'assert_regexp_matches',
+    'assert_warns_regex',
+]
+
+
+def assert_almost_equal(a, b, places=7, msg=None):
+    """
+    Fail if the two objects are unequal as determined by their
+    difference rounded to the given number of decimal places
+    and comparing to zero.
+
+    Note that decimal places (from zero) are usually not the same
+    as significant digits (measured from the most significant digit).
+
+    See the builtin round() function for places parameter.
+    """
+    if msg is None:
+        assert round(abs(b - a), places) == 0
+    else:
+        assert round(abs(b - a), places) == 0, msg
+
+
+def assert_not_almost_equal(a, b, places=7, msg=None):
+    """
+    Fail if the two objects are equal as determined by their
+    difference rounded to the given number of decimal places
+    and comparing to zero.
+
+    Note that decimal places (from zero) are usually not the same
+    as significant digits (measured from the most significant digit).
+
+    See the builtin round() function for places parameter.
+    """
+    if msg is None:
+        assert round(abs(b - a), places) != 0
+    else:
+        assert round(abs(b - a), places) != 0, msg
+
+
+def assert_dict_contains_subset(subset, dictionary, msg=None):
+    """
+    Checks whether dictionary is a superset of subset. If not, the assertion message will have useful details,
+    unless msg is given, then msg is output.
+    """
+    dictionary = dictionary
+    missing_keys = sorted(list(set(subset.keys()) - set(dictionary.keys())))
+    mismatch_vals = {k: (subset[k], dictionary[k]) for k in subset if k in dictionary and subset[k] != dictionary[k]}
+    if msg is None:
+        assert missing_keys == [], 'Missing keys = {}'.format(missing_keys)
+        assert mismatch_vals == {}, 'Mismatched values (s, d) = {}'.format(mismatch_vals)
+    else:
+        assert missing_keys == [], msg
+        assert mismatch_vals == {}, msg
+
+
+# make other unittest.TestCase methods available as-is as functions; trick taken from Nose
+
+class _Dummy(unittest.TestCase):
+    def do_nothing(self):
+        pass
+
+_t = _Dummy('do_nothing')
+
+assert_raises_regex=_t.assertRaisesRegex,
+assert_raises_regexp=_t.assertRaisesRegexp,
+assert_regexp_matches=_t.assertRegexpMatches,
+assert_warns_regex=_t.assertWarnsRegex,
+
+del _Dummy
+del _t
+
+
+# py.test integration: add all assert_ function to the pytest package namespace
+
+# Use similar trick as Nose to bring in bound methods from unittest.TestCase as free functions:
+
+def pytest_namespace() -> {str: callable}:
+    namespace = {}
+    for name, obj in globals().items():
+        if name.startswith('assert_'):
+            namespace[name] = obj
+
+    return namespace
+
+
+# licensing
+
+__license__ = """
+    Copyright (c) 2016, Oliver Schoenborn
+    All rights reserved.
+
+    Redistribution and use in source and binary forms, with or without
+    modification, are permitted provided that the following conditions are met:
+
+    * Redistributions of source code must retain the above copyright notice, this
+      list of conditions and the following disclaimer.
+
+    * Redistributions in binary form must reproduce the above copyright notice,
+      this list of conditions and the following disclaimer in the documentation
+      and/or other materials provided with the distribution.
+
+    * Neither the name of nose2pytest nor the names of its
+      contributors may be used to endorse or promote products derived from
+      this software without specific prior written permission.
+
+    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+    AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+    IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+    DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+    FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+    DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+    SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+    OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+    OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+"""

http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/plugins/dtestcollect.py
----------------------------------------------------------------------
diff --git a/plugins/dtestcollect.py b/plugins/dtestcollect.py
deleted file mode 100644
index 6b2dac4..0000000
--- a/plugins/dtestcollect.py
+++ /dev/null
@@ -1,92 +0,0 @@
-import os
-from nose.plugins.base import Plugin
-from nose.case import Test
-import logging
-import unittest
-
-log = logging.getLogger(__name__)
-
-
-class DTestCollect(Plugin):
-    """
-    Collect and output test names only, don't run any tests.
-    """
-    name = 'dtest_collect'
-    enableOpt = 'dtest_collect_only'
-
-    def options(self, parser, env):
-        """Register commandline options.
-        """
-        parser.add_option('--dtest-collect-only',
-                          action='store_true',
-                          dest=self.enableOpt,
-                          default=env.get('DTEST_NOSE_COLLECT_ONLY'),
-                          help="Enable collect-only: %s [COLLECT_ONLY]" %
-                          (self.help()))
-
-    def prepareTestLoader(self, loader):
-        """Install collect-only suite class in TestLoader.
-        """
-        # Disable context awareness
-        log.debug("Preparing test loader")
-        loader.suiteClass = TestSuiteFactory(self.conf)
-
-    def prepareTestCase(self, test):
-        """Replace actual test with dummy that always passes.
-        """
-        # Return something that always passes
-        log.debug("Preparing test case %s", test)
-        if not isinstance(test, Test):
-            return
-
-        def run(result):
-            # We need to make these plugin calls because there won't be
-            # a result proxy, due to using a stripped-down test suite
-            self.conf.plugins.startTest(test)
-            result.startTest(test)
-            self.conf.plugins.addSuccess(test)
-            result.addSuccess(test)
-            self.conf.plugins.stopTest(test)
-            result.stopTest(test)
-        return run
-
-    def describeTest(self, test):
-        tag = os.getenv('TEST_TAG', '')
-        if tag == '':
-            tag = test.test._testMethodName
-        else:
-            tag = test.test._testMethodName + "-" + tag
-        retval = "%s:%s.%s" % (test.test.__module__, test.test.__class__.__name__, tag)
-        return retval
-
-
-class TestSuiteFactory:
-    """
-    Factory for producing configured test suites.
-    """
-    def __init__(self, conf):
-        self.conf = conf
-
-    def __call__(self, tests=(), **kw):
-        return TestSuite(tests, conf=self.conf)
-
-
-class TestSuite(unittest.TestSuite):
-    """
-    Basic test suite that bypasses most proxy and plugin calls, but does
-    wrap tests in a nose.case.Test so prepareTestCase will be called.
-    """
-    def __init__(self, tests=(), conf=None):
-        self.conf = conf
-        # Exec lazy suites: makes discovery depth-first
-        if callable(tests):
-            tests = tests()
-        log.debug("TestSuite(%r)", tests)
-        unittest.TestSuite.__init__(self, tests)
-
-    def addTest(self, test):
-        log.debug("Add test %s", test)
-        if isinstance(test, unittest.TestSuite):
-            self._tests.append(test)
-        else:
-            self._tests.append(Test(test, config=self.conf))

http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/plugins/dtestconfig.py
----------------------------------------------------------------------
diff --git a/plugins/dtestconfig.py b/plugins/dtestconfig.py
deleted file mode 100644
index 5363d9a..0000000
--- a/plugins/dtestconfig.py
+++ /dev/null
@@ -1,42 +0,0 @@
-from collections import namedtuple
-
-from nose import plugins
-
-# A class that defines the attributes that have to be defined for configuring
-# a dtest run. namedtuple does what we want -- it's immutable and requires
-# all the attributes to be passed in to be instantiated.
-GlobalConfigObject = namedtuple('GlobalConfigObject', [
-    'vnodes',  # disable or enable vnodes
-])
-
-_CONFIG = None
-
-
-class DtestConfigPlugin(plugins.Plugin):
-    """
-    Pass in configuration options for the dtests.
-    """
-    enabled = True  # if this plugin is loaded at all, we're using it
-    name = 'dtest_config'
-
-    def __init__(self, config=None):
-        """
-        Instantiate this plugin with a GlobalConfigObject or, by default, None.
-        Then, set  the global _CONFIG constant with the value of the plugin.
-
-        This is a little weird, yes, but nose seems to generally be built
-        around the idea that a given plugin will be instantiated only once, so
-        this provides a way for test framework code to grab the value off this
-        module. We want that, since the plugin itself isn't available to test
-        code.
-
-        @param config an object meeting the GlobalConfigObject spec that will
-                      be used as configuration for a dtest run.
-        """
-        self.CONFIG = config
-
-        global _CONFIG
-        _CONFIG = self.CONFIG
-
-    def configure(self, options, conf):
-        pass

http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/plugins/dtesttag.py
----------------------------------------------------------------------
diff --git a/plugins/dtesttag.py b/plugins/dtesttag.py
deleted file mode 100644
index e1ebb74..0000000
--- a/plugins/dtesttag.py
+++ /dev/null
@@ -1,45 +0,0 @@
-from nose import plugins
-import os
-import inspect
-
-
-class DTestTag(plugins.Plugin):
-    enabled = True  # if this plugin is loaded at all, we're using it
-    name = 'dtest_tag'
-
-    def __init__(self):
-        pass
-
-    def configure(self, options, conf):
-        pass
-
-    def nice_classname(self, obj):
-        """Returns a nice name for class object or class instance.
-
-            >>> nice_classname(Exception()) # doctest: +ELLIPSIS
-            '...Exception'
-            >>> nice_classname(Exception) # doctest: +ELLIPSIS
-            '...Exception'
-
-        """
-        if inspect.isclass(obj):
-            cls_name = obj.__name__
-        else:
-            cls_name = obj.__class__.__name__
-        mod = inspect.getmodule(obj)
-        if mod:
-            name = mod.__name__
-            # jython
-            if name.startswith('org.python.core.'):
-                name = name[len('org.python.core.'):]
-            return "%s.%s" % (name, cls_name)
-        else:
-            return cls_name
-
-    def describeTest(self, test):
-        tag = os.getenv('TEST_TAG', '')
-        if tag == '':
-            tag = test.test._testMethodName
-        else:
-            tag = test.test._testMethodName + "-" + tag
-        return "%s (%s)" % (tag, self.nice_classname(test.test))

http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/plugins/dtestxunit.py
----------------------------------------------------------------------
diff --git a/plugins/dtestxunit.py b/plugins/dtestxunit.py
deleted file mode 100644
index c996d53..0000000
--- a/plugins/dtestxunit.py
+++ /dev/null
@@ -1,348 +0,0 @@
-"""This plugin provides test results in the standard XUnit XML format.
-
-It's designed for the `Jenkins`_ (previously Hudson) continuous build
-system, but will probably work for anything else that understands an
-XUnit-formatted XML representation of test results.
-
-Add this shell command to your builder ::
-
-    nosetests --with-dtestxunit
-
-And by default a file named nosetests.xml will be written to the
-working directory.
-
-In a Jenkins builder, tick the box named "Publish JUnit test result report"
-under the Post-build Actions and enter this value for Test report XMLs::
-
-    **/nosetests.xml
-
-If you need to change the name or location of the file, you can set the
-``--dtestxunit-file`` option.
-
-If you need to change the name of the test suite, you can set the
-``--dtestxunit-testsuite-name`` option.
-
-Here is an abbreviated version of what an XML test report might look like::
-
-    <?xml version="1.0" encoding="UTF-8"?>
-    <testsuite name="nosetests" tests="1" errors="1" failures="0" skip="0">
-        <testcase classname="path_to_test_suite.TestSomething"
-                  name="test_it" time="0">
-            <error type="exceptions.TypeError" message="oops, wrong type">
-            Traceback (most recent call last):
-            ...
-            TypeError: oops, wrong type
-            </error>
-        </testcase>
-    </testsuite>
-
-.. _Jenkins: http://jenkins-ci.org/
-
-"""
-import codecs
-import os
-import sys
-import re
-import inspect
-from StringIO import StringIO
-from time import time
-from xml.sax import saxutils
-
-from nose.plugins.base import Plugin
-from nose.exc import SkipTest
-from nose.pyversion import force_unicode, format_exception
-
-# Invalid XML characters, control characters 0-31 sans \t, \n and \r
-CONTROL_CHARACTERS = re.compile(r"[\000-\010\013\014\016-\037]")
-
-TEST_ID = re.compile(r'^(.*?)(\(.*\))$')
-
-
-def xml_safe(value):
-    """Replaces invalid XML characters with '?'."""
-    return CONTROL_CHARACTERS.sub('?', value)
-
-
-def escape_cdata(cdata):
-    """Escape a string for an XML CDATA section."""
-    return xml_safe(cdata).replace(']]>', ']]>]]&gt;<![CDATA[')
-
-
-def id_split(idval):
-    m = TEST_ID.match(idval)
-    retval = []
-    if m:
-        name, fargs = m.groups()
-        head, tail = name.rsplit(".", 1)
-        retval = [head, tail + fargs]
-    else:
-        retval = idval.rsplit(".", 1)
-    tag = os.getenv('TEST_TAG', '')
-    if tag != '':
-        retval[-1] = retval[-1] + "-" + tag
-    return retval
-
-
-def nice_classname(obj):
-    """Returns a nice name for class object or class instance.
-
-        >>> nice_classname(Exception()) # doctest: +ELLIPSIS
-        '...Exception'
-        >>> nice_classname(Exception) # doctest: +ELLIPSIS
-        '...Exception'
-
-    """
-    if inspect.isclass(obj):
-        cls_name = obj.__name__
-    else:
-        cls_name = obj.__class__.__name__
-    mod = inspect.getmodule(obj)
-    if mod:
-        name = mod.__name__
-        # jython
-        if name.startswith('org.python.core.'):
-            name = name[len('org.python.core.'):]
-        return "%s.%s" % (name, cls_name)
-    else:
-        return cls_name
-
-
-def exc_message(exc_info):
-    """Return the exception's message."""
-    exc = exc_info[1]
-    if exc is None:
-        # str exception
-        result = exc_info[0]
-    else:
-        try:
-            result = str(exc)
-        except UnicodeEncodeError:
-            try:
-                result = unicode(exc)
-            except UnicodeError:
-                # Fallback to args as neither str nor
-                # unicode(Exception(u'\xe6')) work in Python < 2.6
-                result = exc.args[0]
-    result = force_unicode(result, 'UTF-8')
-    return xml_safe(result)
-
-
-class Tee(object):
-    def __init__(self, encoding, *args):
-        self._encoding = encoding
-        self._streams = args
-
-    def write(self, data):
-        data = force_unicode(data, self._encoding)
-        for s in self._streams:
-            s.write(data)
-
-    def writelines(self, lines):
-        for line in lines:
-            self.write(line)
-
-    def flush(self):
-        for s in self._streams:
-            s.flush()
-
-    def isatty(self):
-        return False
-
-
-class DTestXunit(Plugin):
-    """This plugin provides test results in the standard XUnit XML format."""
-    name = 'dtestxunit'
-    score = 1500
-    encoding = 'UTF-8'
-    error_report_file = None
-
-    def __init__(self):
-        super(DTestXunit, self).__init__()
-        self._capture_stack = []
-        self._currentStdout = None
-        self._currentStderr = None
-
-    def _timeTaken(self):
-        if hasattr(self, '_timer'):
-            taken = time() - self._timer
-        else:
-            # test died before it ran (probably error in setup())
-            # or success/failure added before test started probably
-            # due to custom TestResult munging
-            taken = 0.0
-        return taken
-
-    def _quoteattr(self, attr):
-        """Escape an XML attribute. Value can be unicode."""
-        attr = xml_safe(attr)
-        return saxutils.quoteattr(attr)
-
-    def options(self, parser, env):
-        """Sets additional command line options."""
-        Plugin.options(self, parser, env)
-        parser.add_option(
-            '--dtestxunit-file', action='store',
-            dest='dtestxunit_file', metavar="FILE",
-            default=env.get('NOSE_XUNIT_FILE', 'nosetests.xml'),
-            help=("Path to xml file to store the xunit report in. "
-                  "Default is nosetests.xml in the working directory "
-                  "[NOSE_XUNIT_FILE]"))
-
-        parser.add_option(
-            '--dtestxunit-testsuite-name', action='store',
-            dest='dtestxunit_testsuite_name', metavar="PACKAGE",
-            default=env.get('NOSE_XUNIT_TESTSUITE_NAME', 'nosetests'),
-            help=("Name of the testsuite in the xunit xml, generated by plugin. "
-                  "Default test suite name is nosetests."))
-
-    def configure(self, options, config):
-        """Configures the xunit plugin."""
-        Plugin.configure(self, options, config)
-        self.config = config
-        if self.enabled:
-            self.stats = {'errors': 0,
-                          'failures': 0,
-                          'passes': 0,
-                          'skipped': 0
-                          }
-            self.errorlist = []
-            self.error_report_file_name = os.path.realpath(options.dtestxunit_file)
-            self.xunit_testsuite_name = options.dtestxunit_testsuite_name
-
-    def report(self, stream):
-        """Writes an Xunit-formatted XML file
-
-        The file includes a report of test errors and failures.
-
-        """
-        self.error_report_file = codecs.open(self.error_report_file_name, 'w',
-                                             self.encoding, 'replace')
-        self.stats['encoding'] = self.encoding
-        self.stats['testsuite_name'] = self.xunit_testsuite_name
-        self.stats['total'] = (self.stats['errors'] + self.stats['failures'] +
-                               self.stats['passes'] + self.stats['skipped'])
-        self.error_report_file.write(
-            u'<?xml version="1.0" encoding="%(encoding)s"?>'
-            u'<testsuite name="%(testsuite_name)s" tests="%(total)d" '
-            u'errors="%(errors)d" failures="%(failures)d" '
-            u'skip="%(skipped)d">' % self.stats)
-        self.error_report_file.write(u''.join([force_unicode(e, self.encoding)
-                                               for e in self.errorlist]))
-        self.error_report_file.write(u'</testsuite>')
-        self.error_report_file.close()
-        if self.config.verbosity > 1:
-            stream.writeln("-" * 70)
-            stream.writeln("XML: %s" % self.error_report_file.name)
-
-    def _startCapture(self):
-        self._capture_stack.append((sys.stdout, sys.stderr))
-        self._currentStdout = StringIO()
-        self._currentStderr = StringIO()
-        sys.stdout = Tee(self.encoding, self._currentStdout, sys.stdout)
-        sys.stderr = Tee(self.encoding, self._currentStderr, sys.stderr)
-
-    def startContext(self, context):
-        self._startCapture()
-
-    def stopContext(self, context):
-        self._endCapture()
-
-    def beforeTest(self, test):
-        """Initializes a timer before starting a test."""
-        self._timer = time()
-        self._startCapture()
-
-    def _endCapture(self):
-        if self._capture_stack:
-            sys.stdout, sys.stderr = self._capture_stack.pop()
-
-    def afterTest(self, test):
-        self._endCapture()
-        self._currentStdout = None
-        self._currentStderr = None
-
-    def finalize(self, test):
-        while self._capture_stack:
-            self._endCapture()
-
-    def _getCapturedStdout(self):
-        if self._currentStdout:
-            value = self._currentStdout.getvalue()
-            if value:
-                return '<system-out><![CDATA[%s]]></system-out>' % escape_cdata(value)
-        return ''
-
-    def _getCapturedStderr(self):
-        if self._currentStderr:
-            value = self._currentStderr.getvalue()
-            if value:
-                return '<system-err><![CDATA[%s]]></system-err>' % escape_cdata(value)
-        return ''
-
-    def addError(self, test, err, capt=None):
-        """Add error output to Xunit report.
-        """
-        taken = self._timeTaken()
-
-        if issubclass(err[0], SkipTest):
-            type = 'skipped'
-            self.stats['skipped'] += 1
-        else:
-            type = 'error'
-            self.stats['errors'] += 1
-
-        tb = format_exception(err, self.encoding)
-        id = test.id()
-
-        self.errorlist.append(
-            u'<testcase classname=%(cls)s name=%(name)s time="%(taken).3f">'
-            u'<%(type)s type=%(errtype)s message=%(message)s><![CDATA[%(tb)s]]>'
-            u'</%(type)s>%(systemout)s%(systemerr)s</testcase>' %
-            {'cls': self._quoteattr(id_split(id)[0]),
-             'name': self._quoteattr(id_split(id)[-1]),
-             'taken': taken,
-             'type': type,
-             'errtype': self._quoteattr(nice_classname(err[0])),
-             'message': self._quoteattr(exc_message(err)),
-             'tb': escape_cdata(tb),
-             'systemout': self._getCapturedStdout(),
-             'systemerr': self._getCapturedStderr(),
-             })
-
-    def addFailure(self, test, err, capt=None, tb_info=None):
-        """Add failure output to Xunit report.
-        """
-        taken = self._timeTaken()
-        tb = format_exception(err, self.encoding)
-        self.stats['failures'] += 1
-        id = test.id()
-
-        self.errorlist.append(
-            u'<testcase classname=%(cls)s name=%(name)s time="%(taken).3f">'
-            u'<failure type=%(errtype)s message=%(message)s><![CDATA[%(tb)s]]>'
-            u'</failure>%(systemout)s%(systemerr)s</testcase>' %
-            {'cls': self._quoteattr(id_split(id)[0]),
-             'name': self._quoteattr(id_split(id)[-1]),
-             'taken': taken,
-             'errtype': self._quoteattr(nice_classname(err[0])),
-             'message': self._quoteattr(exc_message(err)),
-             'tb': escape_cdata(tb),
-             'systemout': self._getCapturedStdout(),
-             'systemerr': self._getCapturedStderr(),
-             })
-
-    def addSuccess(self, test, capt=None):
-        """Add success output to Xunit report.
-        """
-        taken = self._timeTaken()
-        self.stats['passes'] += 1
-        id = test.id()
-        self.errorlist.append(
-            '<testcase classname=%(cls)s name=%(name)s '
-            'time="%(taken).3f">%(systemout)s%(systemerr)s</testcase>' %
-            {'cls': self._quoteattr(id_split(id)[0]),
-             'name': self._quoteattr(id_split(id)[-1]),
-             'taken': taken,
-             'systemout': self._getCapturedStdout(),
-             'systemerr': self._getCapturedStderr(),
-             })

http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/prepared_statements_test.py
----------------------------------------------------------------------
diff --git a/prepared_statements_test.py b/prepared_statements_test.py
index 35ca5aa..72ac9dd 100644
--- a/prepared_statements_test.py
+++ b/prepared_statements_test.py
@@ -1,7 +1,11 @@
+import logging
+
 from cassandra import InvalidRequest
 
 from dtest import Tester
 
+logger = logging.getLogger(__name__)
+
 KEYSPACE = "foo"
 
 
@@ -10,13 +14,12 @@ class TestPreparedStatements(Tester):
     Tests for pushed native protocol notification from Cassandra.
     """
 
-    def dropped_index_test(self):
+    def test_dropped_index(self):
         """
         Prepared statements using dropped indexes should be handled correctly
         """
-
         self.cluster.populate(1).start()
-        node = self.cluster.nodes.values()[0]
+        node = list(self.cluster.nodes.values())[0]
 
         session = self.patient_cql_connection(node)
         session.execute("""
@@ -33,14 +36,14 @@ class TestPreparedStatements(Tester):
             session.execute(insert_statement, (i, 0))
 
         query_statement = session.prepare("SELECT * FROM mytable WHERE b=?")
-        print "Number of matching rows:", len(list(session.execute(query_statement, (0,))))
+        print("Number of matching rows:", len(list(session.execute(query_statement, (0,)))))
 
         session.execute("DROP INDEX bindex")
 
         try:
-            print "Executing prepared statement with dropped index..."
+            print("Executing prepared statement with dropped index...")
             session.execute(query_statement, (0,))
         except InvalidRequest as ir:
-            print ir
+            print(ir)
         except Exception:
             raise

http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/pushed_notifications_test.py
----------------------------------------------------------------------
diff --git a/pushed_notifications_test.py b/pushed_notifications_test.py
index a8dcf81..9b888de 100644
--- a/pushed_notifications_test.py
+++ b/pushed_notifications_test.py
@@ -1,4 +1,7 @@
 import time
+import pytest
+import logging
+
 from datetime import datetime
 from distutils.version import LooseVersion
 from threading import Event
@@ -7,10 +10,11 @@ from cassandra import ConsistencyLevel as CL
 from cassandra import ReadFailure
 from cassandra.query import SimpleStatement
 from ccmlib.node import Node, TimeoutError
-from nose.tools import timed
 
-from dtest import Tester, debug, get_ip_from_node, create_ks
-from tools.decorators import no_vnodes, since
+from dtest import Tester, get_ip_from_node, create_ks
+
+since = pytest.mark.since
+logger = logging.getLogger(__name__)
 
 
 class NotificationWaiter(object):
@@ -48,7 +52,7 @@ class NotificationWaiter(object):
         """
         Called when a notification is pushed from Cassandra.
         """
-        debug("Got {} from {} at {}".format(notification, self.address, datetime.now()))
+        logger.debug("Got {} from {} at {}".format(notification, self.address, datetime.now()))
 
         if self.keyspace and notification['keyspace'] and self.keyspace != notification['keyspace']:
             return  # we are not interested in this schema change
@@ -73,7 +77,7 @@ class NotificationWaiter(object):
         return self.notifications
 
     def clear_notifications(self):
-        debug("Clearing notifications...")
+        logger.debug("Clearing notifications...")
         self.notifications = []
         self.event.clear()
 
@@ -83,8 +87,8 @@ class TestPushedNotifications(Tester):
     Tests for pushed native protocol notification from Cassandra.
     """
 
-    @no_vnodes()
-    def move_single_node_test(self):
+    @pytest.mark.no_vnodes
+    def test_move_single_node(self):
         """
         @jira_ticket CASSANDRA-8516
         Moving a token should result in MOVED_NODE notifications.
@@ -92,30 +96,30 @@ class TestPushedNotifications(Tester):
         self.cluster.populate(3).start(wait_for_binary_proto=True, wait_other_notice=True)
 
         waiters = [NotificationWaiter(self, node, ["TOPOLOGY_CHANGE"])
-                   for node in self.cluster.nodes.values()]
+                   for node in list(self.cluster.nodes.values())]
 
         # The first node sends NEW_NODE for the other 2 nodes during startup, in case they are
         # late due to network delays let's block a bit longer
-        debug("Waiting for unwanted notifications....")
+        logger.debug("Waiting for unwanted notifications....")
         waiters[0].wait_for_notifications(timeout=30, num_notifications=2)
         waiters[0].clear_notifications()
 
-        debug("Issuing move command....")
-        node1 = self.cluster.nodes.values()[0]
+        logger.debug("Issuing move command....")
+        node1 = list(self.cluster.nodes.values())[0]
         node1.move("123")
 
         for waiter in waiters:
-            debug("Waiting for notification from {}".format(waiter.address,))
+            logger.debug("Waiting for notification from {}".format(waiter.address,))
             notifications = waiter.wait_for_notifications(60.0)
-            self.assertEquals(1, len(notifications), notifications)
+            assert 1 == len(notifications), notifications
             notification = notifications[0]
             change_type = notification["change_type"]
             address, port = notification["address"]
-            self.assertEquals("MOVED_NODE", change_type)
-            self.assertEquals(get_ip_from_node(node1), address)
+            assert "MOVED_NODE" == change_type
+            assert get_ip_from_node(node1) == address
 
-    @no_vnodes()
-    def move_single_node_localhost_test(self):
+    @pytest.mark.no_vnodes
+    def test_move_single_node_localhost(self):
         """
         @jira_ticket  CASSANDRA-10052
         Test that we don't get NODE_MOVED notifications from nodes other than the local one,
@@ -132,29 +136,28 @@ class TestPushedNotifications(Tester):
         cluster.start(wait_for_binary_proto=True, wait_other_notice=True)
 
         waiters = [NotificationWaiter(self, node, ["TOPOLOGY_CHANGE"])
-                   for node in self.cluster.nodes.values()]
+                   for node in list(self.cluster.nodes.values())]
 
         # The first node sends NEW_NODE for the other 2 nodes during startup, in case they are
         # late due to network delays let's block a bit longer
-        debug("Waiting for unwanted notifications...")
+        logger.debug("Waiting for unwanted notifications...")
         waiters[0].wait_for_notifications(timeout=30, num_notifications=2)
         waiters[0].clear_notifications()
 
-        debug("Issuing move command....")
-        node1 = self.cluster.nodes.values()[0]
+        logger.debug("Issuing move command....")
+        node1 = list(self.cluster.nodes.values())[0]
         node1.move("123")
 
         for waiter in waiters:
-            debug("Waiting for notification from {}".format(waiter.address,))
+            logger.debug("Waiting for notification from {}".format(waiter.address,))
             notifications = waiter.wait_for_notifications(30.0)
-            self.assertEquals(1 if waiter.node is node1 else 0, len(notifications), notifications)
+            assert 1 if waiter.node is node1 else 0 == len(notifications), notifications
 
-    def restart_node_test(self):
+    def test_restart_node(self):
         """
         @jira_ticket CASSANDRA-7816
         Restarting a node should generate exactly one DOWN and one UP notification
         """
-
         self.cluster.populate(2).start(wait_for_binary_proto=True, wait_other_notice=True)
         node1, node2 = self.cluster.nodelist()
 
@@ -162,7 +165,7 @@ class TestPushedNotifications(Tester):
 
         # need to block for up to 2 notifications (NEW_NODE and UP) so that these notifications
         # don't confuse the state below.
-        debug("Waiting for unwanted notifications...")
+        logger.debug("Waiting for unwanted notifications...")
         waiter.wait_for_notifications(timeout=30, num_notifications=2)
         waiter.clear_notifications()
 
@@ -171,25 +174,25 @@ class TestPushedNotifications(Tester):
         version = self.cluster.cassandra_version()
         expected_notifications = 2 if version >= '2.2' else 3
         for i in range(5):
-            debug("Restarting second node...")
+            logger.debug("Restarting second node...")
             node2.stop(wait_other_notice=True)
             node2.start(wait_other_notice=True)
-            debug("Waiting for notifications from {}".format(waiter.address))
+            logger.debug("Waiting for notifications from {}".format(waiter.address))
             notifications = waiter.wait_for_notifications(timeout=60.0, num_notifications=expected_notifications)
-            self.assertEquals(expected_notifications, len(notifications), notifications)
+            assert expected_notifications, len(notifications) == notifications
             for notification in notifications:
-                self.assertEquals(get_ip_from_node(node2), notification["address"][0])
-            self.assertEquals("DOWN", notifications[0]["change_type"])
+                assert get_ip_from_node(node2) == notification["address"][0]
+            assert "DOWN" == notifications[0]["change_type"]
             if version >= '2.2':
-                self.assertEquals("UP", notifications[1]["change_type"])
+                assert "UP" == notifications[1]["change_type"]
             else:
                 # pre 2.2, we'll receive both a NEW_NODE and an UP notification,
                 # but the order is not guaranteed
-                self.assertEquals({"NEW_NODE", "UP"}, set(map(lambda n: n["change_type"], notifications[1:])))
+                assert {"NEW_NODE", "UP"} == set([n["change_type"] for n in notifications[1:]])
 
             waiter.clear_notifications()
 
-    def restart_node_localhost_test(self):
+    def test_restart_node_localhost(self):
         """
         Test that we don't get client notifications when rpc_address is set to localhost.
         @jira_ticket  CASSANDRA-10052
@@ -209,17 +212,17 @@ class TestPushedNotifications(Tester):
         waiter = NotificationWaiter(self, node1, ["STATUS_CHANGE", "TOPOLOGY_CHANGE"])
 
         # restart node 2
-        debug("Restarting second node...")
+        logger.debug("Restarting second node...")
         node2.stop(wait_other_notice=True)
         node2.start(wait_other_notice=True)
 
         # check that node1 did not send UP or DOWN notification for node2
-        debug("Waiting for notifications from {}".format(waiter.address,))
+        logger.debug("Waiting for notifications from {}".format(waiter.address,))
         notifications = waiter.wait_for_notifications(timeout=30.0, num_notifications=2)
-        self.assertEquals(0, len(notifications), notifications)
+        assert 0 == len(notifications), notifications
 
     @since("2.2")
-    def add_and_remove_node_test(self):
+    def test_add_and_remove_node(self):
         """
         Test that NEW_NODE and REMOVED_NODE are sent correctly as nodes join and leave.
         @jira_ticket CASSANDRA-11038
@@ -231,7 +234,7 @@ class TestPushedNotifications(Tester):
 
         # need to block for up to 2 notifications (NEW_NODE and UP) so that these notifications
         # don't confuse the state below
-        debug("Waiting for unwanted notifications...")
+        logger.debug("Waiting for unwanted notifications...")
         waiter.wait_for_notifications(timeout=30, num_notifications=2)
         waiter.clear_notifications()
 
@@ -240,29 +243,29 @@ class TestPushedNotifications(Tester):
         session.execute("ALTER KEYSPACE system_distributed WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':'1'};")
         session.execute("ALTER KEYSPACE system_traces WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':'1'};")
 
-        debug("Adding second node...")
+        logger.debug("Adding second node...")
         node2 = Node('node2', self.cluster, True, None, ('127.0.0.2', 7000), '7200', '0', None, binary_interface=('127.0.0.2', 9042))
         self.cluster.add(node2, False)
         node2.start(wait_other_notice=True)
-        debug("Waiting for notifications from {}".format(waiter.address))
+        logger.debug("Waiting for notifications from {}".format(waiter.address))
         notifications = waiter.wait_for_notifications(timeout=60.0, num_notifications=2)
-        self.assertEquals(2, len(notifications), notifications)
+        assert 2 == len(notifications), notifications
         for notification in notifications:
-            self.assertEquals(get_ip_from_node(node2), notification["address"][0])
-            self.assertEquals("NEW_NODE", notifications[0]["change_type"])
-            self.assertEquals("UP", notifications[1]["change_type"])
+            assert get_ip_from_node(node2) == notification["address"][0]
+            assert "NEW_NODE" == notifications[0]["change_type"]
+            assert "UP" == notifications[1]["change_type"]
 
-        debug("Removing second node...")
+        logger.debug("Removing second node...")
         waiter.clear_notifications()
         node2.decommission()
         node2.stop(gently=False)
-        debug("Waiting for notifications from {}".format(waiter.address))
+        logger.debug("Waiting for notifications from {}".format(waiter.address))
         notifications = waiter.wait_for_notifications(timeout=60.0, num_notifications=2)
-        self.assertEquals(2, len(notifications), notifications)
+        assert 2 == len(notifications), notifications
         for notification in notifications:
-            self.assertEquals(get_ip_from_node(node2), notification["address"][0])
-            self.assertEquals("REMOVED_NODE", notifications[0]["change_type"])
-            self.assertEquals("DOWN", notifications[1]["change_type"])
+            assert get_ip_from_node(node2) == notification["address"][0]
+            assert "REMOVED_NODE" == notifications[0]["change_type"]
+            assert "DOWN" == notifications[1]["change_type"]
 
     def change_rpc_address_to_localhost(self):
         """
@@ -272,23 +275,22 @@ class TestPushedNotifications(Tester):
 
         i = 0
         for node in cluster.nodelist():
-            debug('Set 127.0.0.1 to prevent IPv6 java prefs, set rpc_address: localhost in cassandra.yaml')
+            logger.debug('Set 127.0.0.1 to prevent IPv6 java prefs, set rpc_address: localhost in cassandra.yaml')
             if cluster.version() < '4':
                 node.network_interfaces['thrift'] = ('127.0.0.1', node.network_interfaces['thrift'][1] + i)
             node.network_interfaces['binary'] = ('127.0.0.1', node.network_interfaces['binary'][1] + i)
             node.import_config_files()  # this regenerates the yaml file and sets 'rpc_address' to the 'thrift' address
             node.set_configuration_options(values={'rpc_address': 'localhost'})
-            debug(node.show())
+            logger.debug(node.show())
             i += 2
 
     @since("3.0")
-    def schema_changes_test(self):
+    def test_schema_changes(self):
         """
         @jira_ticket CASSANDRA-10328
         Creating, updating and dropping a keyspace, a table and a materialized view
         will generate the correct schema change notifications.
         """
-
         self.cluster.populate(2).start(wait_for_binary_proto=True)
         node1, node2 = self.cluster.nodelist()
 
@@ -306,17 +308,34 @@ class TestPushedNotifications(Tester):
         session.execute("drop TABLE t")
         session.execute("drop KEYSPACE ks")
 
-        debug("Waiting for notifications from {}".format(waiter.address,))
+        logger.debug("Waiting for notifications from {}".format(waiter.address,))
         notifications = waiter.wait_for_notifications(timeout=60.0, num_notifications=8)
-        self.assertEquals(8, len(notifications), notifications)
-        self.assertDictContainsSubset({'change_type': u'CREATED', 'target_type': u'KEYSPACE'}, notifications[0])
-        self.assertDictContainsSubset({'change_type': u'CREATED', 'target_type': u'TABLE', u'table': u't'}, notifications[1])
-        self.assertDictContainsSubset({'change_type': u'UPDATED', 'target_type': u'TABLE', u'table': u't'}, notifications[2])
-        self.assertDictContainsSubset({'change_type': u'CREATED', 'target_type': u'TABLE', u'table': u'mv'}, notifications[3])
-        self.assertDictContainsSubset({'change_type': u'UPDATED', 'target_type': u'TABLE', u'table': u'mv'}, notifications[4])
-        self.assertDictContainsSubset({'change_type': u'DROPPED', 'target_type': u'TABLE', u'table': u'mv'}, notifications[5])
-        self.assertDictContainsSubset({'change_type': u'DROPPED', 'target_type': u'TABLE', u'table': u't'}, notifications[6])
-        self.assertDictContainsSubset({'change_type': u'DROPPED', 'target_type': u'KEYSPACE'}, notifications[7])
+        assert 8 == len(notifications), notifications
+        # assert dict contains subset
+        expected = {'change_type': 'CREATED', 'target_type': 'KEYSPACE'}
+        assert set(notifications[0].keys()) >= expected.keys() and {k: notifications[0][k] for k in expected if
+                                                                    k in notifications[0]} == expected
+        expected = {'change_type': 'CREATED', 'target_type': 'TABLE', 'table': 't'}
+        assert set(notifications[1].keys()) >= expected.keys() and {k: notifications[1][k] for k in expected if
+                                                                    k in notifications[1]} == expected
+        expected = {'change_type': 'UPDATED', 'target_type': 'TABLE', 'table': 't'}
+        assert set(notifications[2].keys()) >= expected.keys() and {k: notifications[2][k] for k in expected if
+                                                                    k in notifications[2]} == expected
+        expected = {'change_type': 'CREATED', 'target_type': 'TABLE', 'table': 'mv'}
+        assert set(notifications[3].keys()) >= expected.keys() and {k: notifications[3][k] for k in expected if
+                                                                    k in notifications[3]} == expected
+        expected = {'change_type': 'UPDATED', 'target_type': 'TABLE', 'table': 'mv'}
+        assert set(notifications[4].keys()) >= expected.keys() and {k: notifications[4][k] for k in expected if
+                                                                    k in notifications[4]} == expected
+        expected = {'change_type': 'DROPPED', 'target_type': 'TABLE', 'table': 'mv'}
+        assert set(notifications[5].keys()) >= expected.keys() and {k: notifications[5][k] for k in expected if
+                                                                    k in notifications[5]} == expected
+        expected = {'change_type': 'DROPPED', 'target_type': 'TABLE', 'table': 't'}
+        assert set(notifications[6].keys()) >= expected.keys() and {k: notifications[6][k] for k in expected if
+                                                                    k in notifications[6]} == expected
+        expected = {'change_type': 'DROPPED', 'target_type': 'KEYSPACE'}
+        assert set(notifications[7].keys()) >= expected.keys() and {k: notifications[7][k] for k in expected if
+                                                                    k in notifications[7]} == expected
 
 
 class TestVariousNotifications(Tester):
@@ -325,17 +344,16 @@ class TestVariousNotifications(Tester):
     """
 
     @since('2.2')
-    def tombstone_failure_threshold_message_test(self):
+    def test_tombstone_failure_threshold_message(self):
         """
         Ensure nodes return an error message in case of TombstoneOverwhelmingExceptions rather
         than dropping the request. A drop makes the coordinator waits for the specified
         read_request_timeout_in_ms.
         @jira_ticket CASSANDRA-7886
         """
-
         have_v5_protocol = self.cluster.version() >= LooseVersion('3.10')
 
-        self.allow_log_errors = True
+        self.fixture_dtest_setup.allow_log_errors = True
         self.cluster.set_configuration_options(
             values={
                 'tombstone_failure_threshold': 500,
@@ -356,7 +374,7 @@ class TestVariousNotifications(Tester):
         )
 
         # Add data with tombstones
-        values = map(lambda i: str(i), range(1000))
+        values = [str(i) for i in range(1000)]
         for value in values:
             session.execute(SimpleStatement(
                 "insert into test (id, mytext, col1) values (1, '{}', null) ".format(
@@ -367,15 +385,15 @@ class TestVariousNotifications(Tester):
 
         failure_msg = ("Scanned over.* tombstones.* query aborted")
 
-        @timed(25)
+        @pytest.mark.timeout(25)
         def read_failure_query():
             try:
                 session.execute(SimpleStatement("select * from test where id in (1,2,3,4,5)", consistency_level=CL.ALL))
             except ReadFailure as exc:
                 if have_v5_protocol:
                     # at least one replica should have responded with a tombstone error
-                    self.assertIsNotNone(exc.error_code_map)
-                    self.assertEqual(0x0001, exc.error_code_map.values()[0])
+                    assert exc.error_code_map is not None
+                    assert 0x0001 == list(exc.error_code_map.values())[0]
             except Exception:
                 raise
             else:
@@ -393,22 +411,21 @@ class TestVariousNotifications(Tester):
                        node2.grep_log(failure_msg) or
                        node3.grep_log(failure_msg))
 
-            self.assertTrue(failure, ("Cannot find tombstone failure threshold error in log "
-                                      "after failed query"))
+            assert failure, "Cannot find tombstone failure threshold error in log after failed query"
 
         mark1 = node1.mark_log()
         mark2 = node2.mark_log()
         mark3 = node3.mark_log()
 
-        @timed(35)
+        @pytest.mark.timeout(35)
         def range_request_failure_query():
             try:
                 session.execute(SimpleStatement("select * from test", consistency_level=CL.ALL))
             except ReadFailure as exc:
                 if have_v5_protocol:
                     # at least one replica should have responded with a tombstone error
-                    self.assertIsNotNone(exc.error_code_map)
-                    self.assertEqual(0x0001, exc.error_code_map.values()[0])
+                    assert exc.error_code_map is not None
+                    assert 0x0001 == list(exc.error_code_map.values())[0]
             except Exception:
                 raise
             else:
@@ -426,5 +443,4 @@ class TestVariousNotifications(Tester):
                        node2.grep_log(failure_msg, from_mark=mark2) or
                        node3.grep_log(failure_msg, from_mark=mark3))
 
-            self.assertTrue(failure, ("Cannot find tombstone failure threshold error in log "
-                                      "after range_request_timeout_query"))
+            assert failure, "Cannot find tombstone failure threshold error in log after range_request_timeout_query"

http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/putget_test.py
----------------------------------------------------------------------
diff --git a/putget_test.py b/putget_test.py
index f200864..1933f8e 100644
--- a/putget_test.py
+++ b/putget_test.py
@@ -1,28 +1,39 @@
+import pytest
 import time
+import logging
 
 from cassandra import ConsistencyLevel
 from thrift.protocol import TBinaryProtocol
 from thrift.transport import TSocket, TTransport
 
+from dtest_setup_overrides import DTestSetupOverrides
+
 from dtest import Tester, create_ks, create_cf
 from tools.data import (create_c1c2_table, insert_c1c2, insert_columns, putget,
                         query_c1c2, query_columns, range_putget)
-from tools.decorators import no_vnodes, since
 from tools.misc import ImmutableMapping, retry_till_success
 
+since = pytest.mark.since
+logger = logging.getLogger(__name__)
+
 
 class TestPutGet(Tester):
-    cluster_options = ImmutableMapping({'start_rpc': 'true'})
 
-    def putget_test(self):
+    @pytest.fixture(scope='function', autouse=True)
+    def fixture_dtest_setup_overrides(self):
+        dtest_setup_overrides = DTestSetupOverrides()
+        dtest_setup_overrides.cluster_options = ImmutableMapping({'start_rpc': 'true'})
+        return dtest_setup_overrides
+
+    def test_putget(self):
         """ Simple put/get on a single row, hitting multiple sstables """
         self._putget()
 
-    def putget_snappy_test(self):
+    def test_putget_snappy(self):
         """ Simple put/get on a single row, but hitting multiple sstables (with snappy compression) """
         self._putget(compression="Snappy")
 
-    def putget_deflate_test(self):
+    def test_putget_deflate(self):
         """ Simple put/get on a single row, but hitting multiple sstables (with deflate compression) """
         self._putget(compression="Deflate")
 
@@ -40,7 +51,7 @@ class TestPutGet(Tester):
 
         putget(cluster, session)
 
-    def non_local_read_test(self):
+    def test_non_local_read(self):
         """ This test reads from a coordinator we know has no copy of the data """
         cluster = self.cluster
 
@@ -53,12 +64,11 @@ class TestPutGet(Tester):
 
         # insert and get at CL.QUORUM (since RF=2, node1 won't have all key locally)
         insert_c1c2(session, n=1000, consistency=ConsistencyLevel.QUORUM)
-        for n in xrange(0, 1000):
+        for n in range(0, 1000):
             query_c1c2(session, n, ConsistencyLevel.QUORUM)
 
-    def rangeputget_test(self):
+    def test_rangeputget(self):
         """ Simple put/get on ranges of rows, hitting multiple sstables """
-
         cluster = self.cluster
 
         cluster.populate(3).start()
@@ -70,7 +80,7 @@ class TestPutGet(Tester):
 
         range_putget(cluster, session)
 
-    def wide_row_test(self):
+    def test_wide_row(self):
         """ Test wide row slices """
         cluster = self.cluster
 
@@ -83,16 +93,16 @@ class TestPutGet(Tester):
 
         key = 'wide'
 
-        for x in xrange(1, 5001):
+        for x in range(1, 5001):
             insert_columns(self, session, key, 100, offset=x - 1)
 
         for size in (10, 100, 1000):
-            for x in xrange(1, (50001 - size) / size):
+            for x in range(1, (50001 - size) // size):
                 query_columns(self, session, key, size, offset=x * size - 1)
 
-    @no_vnodes()
+    @pytest.mark.no_vnodes
     @since('2.0', max_version='4')
-    def wide_slice_test(self):
+    def test_wide_slice(self):
         """
         Check slicing a wide row.
         See https://issues.apache.org/jira/browse/CASSANDRA-4919
@@ -140,9 +150,9 @@ class TestPutGet(Tester):
         session.execute(query)
         time.sleep(.5)
 
-        for i in xrange(10):
+        for i in range(10):
             key_num = str(i).zfill(2)
-            for j in xrange(10):
+            for j in range(10):
                 stmt = "INSERT INTO test (k, column1, value) VALUES ('a%s', 'col%s', '%s')" % (key_num, j, j)
                 session.execute(stmt)
                 stmt = "INSERT INTO test (k, column1, value) VALUES ('b%s', 'col%s', '%s')" % (key_num, j, j)
@@ -172,7 +182,7 @@ class TestPutGet(Tester):
             # print row.key
             # print cols
 
-        self.assertEqual(len(columns), 95, "Regression in cassandra-4919. Expected 95 columns, got {}.".format(len(columns)))
+        assert len(columns) == 95, "Regression in cassandra-4919. Expected 95 columns, got {}.".format(len(columns))
 
 
 class ThriftConnection(object):
@@ -248,7 +258,7 @@ class ThriftConnection(object):
 
     def wait_for_agreement(self):
         schemas = self.client.describe_schema_versions()
-        if len([ss for ss in schemas.keys() if ss != 'UNREACHABLE']) > 1:
+        if len([ss for ss in list(schemas.keys()) if ss != 'UNREACHABLE']) > 1:
             raise Exception("schema agreement not reached")
 
     def _translate_cl(self, cl):
@@ -258,7 +268,7 @@ class ThriftConnection(object):
         """ Insert some basic values """
         cf_parent = self.Cassandra.ColumnParent(column_family=self.cf_name)
 
-        for row_key in ('row_%d' % i for i in xrange(num_rows)):
+        for row_key in ('row_%d' % i for i in range(num_rows)):
             col = self.Cassandra.Column(name='col_0', value='val_0',
                                         timestamp=int(time.time() * 1000))
             retry_till_success(self.client.insert,
@@ -269,7 +279,7 @@ class ThriftConnection(object):
 
     def query_columns(self, num_rows=10, consistency_level='QUORUM'):
         """ Check that the values inserted in insert_columns() are present """
-        for row_key in ('row_%d' % i for i in xrange(num_rows)):
+        for row_key in ('row_%d' % i for i in range(num_rows)):
             cpath = self.Cassandra.ColumnPath(column_family=self.cf_name,
                                               column='col_0')
             cosc = retry_till_success(self.client.get, key=row_key, column_path=cpath,
@@ -277,5 +287,5 @@ class ThriftConnection(object):
                                       timeout=30)
             col = cosc.column
             value = col.value
-            self.assertEqual(value, 'val_0')
+            assert value == 'val_0'
         return self

http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/pytest.ini
----------------------------------------------------------------------
diff --git a/pytest.ini b/pytest.ini
new file mode 100644
index 0000000..1ee5342
--- /dev/null
+++ b/pytest.ini
@@ -0,0 +1,5 @@
+[pytest]
+junit_suite_name = Cassandra dtests
+log_print = True
+log_level = INFO
+log_format = %(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s

http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/range_ghost_test.py
----------------------------------------------------------------------
diff --git a/range_ghost_test.py b/range_ghost_test.py
index 34f599c..6d3d3ae 100644
--- a/range_ghost_test.py
+++ b/range_ghost_test.py
@@ -1,12 +1,15 @@
 import time
+import logging
 
 from tools.assertions import assert_length_equal
 from dtest import Tester, create_ks, create_cf
 
+logger = logging.getLogger(__name__)
+
 
 class TestRangeGhosts(Tester):
 
-    def ghosts_test(self):
+    def test_ghosts(self):
         """ Check range ghost are correctly removed by the system """
         cluster = self.cluster
         cluster.populate(1).start()
@@ -19,7 +22,7 @@ class TestRangeGhosts(Tester):
 
         rows = 1000
 
-        for i in xrange(0, rows):
+        for i in range(0, rows):
             session.execute("UPDATE cf SET c = 'value' WHERE key = 'k%i'" % i)
 
         res = list(session.execute("SELECT * FROM cf LIMIT 10000"))
@@ -27,7 +30,7 @@ class TestRangeGhosts(Tester):
 
         node1.flush()
 
-        for i in xrange(0, rows / 2):
+        for i in range(0, rows // 2):
             session.execute("DELETE FROM cf WHERE key = 'k%i'" % i)
 
         res = list(session.execute("SELECT * FROM cf LIMIT 10000"))

http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/read_failures_test.py
----------------------------------------------------------------------
diff --git a/read_failures_test.py b/read_failures_test.py
index ba43505..80ebe5d 100644
--- a/read_failures_test.py
+++ b/read_failures_test.py
@@ -1,9 +1,14 @@
+import logging
+import pytest
+
 from cassandra import ConsistencyLevel, ReadFailure, ReadTimeout
 from cassandra.policies import FallthroughRetryPolicy
 from cassandra.query import SimpleStatement
 
 from dtest import Tester
-from tools.decorators import since
+
+since = pytest.mark.since
+logger = logging.getLogger(__name__)
 
 KEYSPACE = "readfailures"
 
@@ -13,19 +18,21 @@ class TestReadFailures(Tester):
     Tests for read failures in the replicas, introduced as a part of
     @jira_ticket CASSANDRA-12311.
     """
-    ignore_log_patterns = (
-        "Scanned over [1-9][0-9]* tombstones",  # This is expected when testing read failures due to tombstones
-    )
+    @pytest.fixture(autouse=True)
+    def fixture_add_additional_log_patterns(self, fixture_dtest_setup):
+        fixture_dtest_setup.ignore_log_patterns = (
+            "Scanned over [1-9][0-9]* tombstones",  # This is expected when testing read failures due to tombstones
+        )
+        return fixture_dtest_setup
 
-    def setUp(self):
-        super(TestReadFailures, self).setUp()
+    @pytest.fixture(scope='function', autouse=True)
+    def parse_dtest_config(self, parse_dtest_config):
         self.tombstone_failure_threshold = 500
         self.replication_factor = 3
         self.consistency_level = ConsistencyLevel.ALL
         self.expected_expt = ReadFailure
 
-    def tearDown(self):
-        super(TestReadFailures, self).tearDown()
+        return parse_dtest_config
 
     def _prepare_cluster(self):
         self.cluster.set_configuration_options(
@@ -33,7 +40,7 @@ class TestReadFailures(Tester):
         )
         self.cluster.populate(3)
         self.cluster.start(wait_for_binary_proto=True)
-        self.nodes = self.cluster.nodes.values()
+        self.nodes = list(self.cluster.nodes.values())
 
         session = self.patient_exclusive_cql_connection(self.nodes[0], protocol_version=self.protocol_version)
 
@@ -59,11 +66,11 @@ class TestReadFailures(Tester):
         if self.expected_expt is None:
             session.execute(statement)
         else:
-            with self.assertRaises(self.expected_expt) as cm:
+            with pytest.raises(self.expected_expt) as cm:
                 # On 2.1, we won't return the ReadTimeout from coordinator until actual timeout,
                 # so we need to up the default timeout of the driver session
                 session.execute(statement, timeout=15)
-            return cm.exception
+            return cm.value
 
     def _assert_error_code_map_exists_with_code(self, exception, expected_code):
         """
@@ -71,14 +78,14 @@ class TestReadFailures(Tester):
         where at least one node responded with some expected code.
         This is meant for testing failure exceptions on protocol v5.
         """
-        self.assertIsNotNone(exception)
-        self.assertIsNotNone(exception.error_code_map)
+        assert exception is not None
+        assert exception.error_code_map is not None
         expected_code_found = False
-        for error_code in exception.error_code_map.values():
+        for error_code in list(exception.error_code_map.values()):
             if error_code == expected_code:
                 expected_code_found = True
                 break
-        self.assertTrue(expected_code_found, "The error code map did not contain " + str(expected_code))
+        assert expected_code_found, "The error code map did not contain " + str(expected_code)
 
     @since('2.1')
     def test_tombstone_failure_v3(self):


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


Mime
View raw message