cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aweisb...@apache.org
Subject [21/36] cassandra-dtest git commit: Migrate dtests to use pytest and python3
Date Mon, 29 Jan 2018 21:10:24 GMT
http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/read_repair_test.py
----------------------------------------------------------------------
diff --git a/read_repair_test.py b/read_repair_test.py
index 57fbf40..7e8d405 100644
--- a/read_repair_test.py
+++ b/read_repair_test.py
@@ -1,23 +1,28 @@
 import time
+import pytest
+import logging
 
 from cassandra import ConsistencyLevel
 from cassandra.query import SimpleStatement
 
+from dtest import Tester, create_ks
 from tools.assertions import assert_one
-from dtest import PRINT_DEBUG, Tester, debug, create_ks
 from tools.data import rows_to_list
-from tools.decorators import since
+from tools.misc import retry_till_success
+
+since = pytest.mark.since
+logger = logging.getLogger(__name__)
 
 
 class TestReadRepair(Tester):
 
-    def setUp(self):
-        Tester.setUp(self)
-        self.cluster.set_configuration_options(values={'hinted_handoff_enabled': False})
-        self.cluster.populate(3).start(wait_for_binary_proto=True)
+    @pytest.fixture(scope='function', autouse=True)
+    def fixture_set_cluster_settings(self, fixture_dtest_setup):
+        fixture_dtest_setup.cluster.set_configuration_options(values={'hinted_handoff_enabled': False})
+        fixture_dtest_setup.cluster.populate(3).start(wait_for_binary_proto=True)
 
     @since('3.0')
-    def alter_rf_and_run_read_repair_test(self):
+    def test_alter_rf_and_run_read_repair(self):
         """
         @jira_ticket CASSANDRA-10655
         @jira_ticket CASSANDRA-10657
@@ -25,90 +30,127 @@ class TestReadRepair(Tester):
         Test that querying only a subset of all the columns in a row doesn't confuse read-repair to avoid
         the problem described in CASSANDRA-10655.
         """
-        self._test_read_repair()
+
+        # session is only used to setup & do schema modification. Actual data queries are done directly on
+        # each node, using an exclusive connection and CL.ONE
+        session = self.patient_cql_connection(self.cluster.nodelist()[0])
+        initial_replica, non_replicas = self.do_initial_setup(session)
+
+        # Execute a query at CL.ALL on one of the nodes which was *not* the initial replica. It should trigger a
+        # read repair and propagate the data to all 3 nodes.
+        # Note: result of the read repair contains only the selected column (a), not all columns
+        logger.debug("Executing 'SELECT a...' on non-initial replica to trigger read repair " + non_replicas[0].name)
+        read_repair_session = self.patient_exclusive_cql_connection(non_replicas[0])
+        assert_one(read_repair_session, "SELECT a FROM alter_rf_test.t1 WHERE k=1", [1], cl=ConsistencyLevel.ALL)
+
+        # The read repair should have repaired the replicas, at least partially (see CASSANDRA-10655)
+        # verify by querying each replica in turn.
+        self.check_data_on_each_replica(expect_fully_repaired=False, initial_replica=initial_replica)
+
+        # Now query again at CL.ALL but this time selecting all columns, which should ensure that 'b' also gets repaired
+        query = "SELECT * FROM alter_rf_test.t1 WHERE k=1"
+        logger.debug("Executing 'SELECT *...' on non-initial replica to trigger read repair " + non_replicas[0].name)
+        assert_one(read_repair_session, query, [1, 1, 1], cl=ConsistencyLevel.ALL)
+
+        # Check each replica individually again now that we expect the data to be fully repaired
+        self.check_data_on_each_replica(expect_fully_repaired=True, initial_replica=initial_replica)
 
     def test_read_repair_chance(self):
         """
         @jira_ticket CASSANDRA-12368
         """
-        self._test_read_repair(cl_all=False)
-
-    def _test_read_repair(self, cl_all=True):
+        # session is only used to setup & do schema modification. Actual data queries are done directly on
+        # each node, using an exclusive connection and CL.ONE
         session = self.patient_cql_connection(self.cluster.nodelist()[0])
+        initial_replica, non_replicas = self.do_initial_setup(session)
+
+        # To ensure read repairs are triggered, set the table property to 100%
+        logger.debug("Setting table read repair chance to 1")
+        session.execute("""ALTER TABLE alter_rf_test.t1 WITH read_repair_chance = 1;""")
+
+        # Execute a query at CL.ONE on one of the nodes which was *not* the initial replica. It should trigger a
+        # read repair because read_repair_chance == 1, and propagate the data to all 3 nodes.
+        # Note: result of the read repair contains only the selected column (a), not all columns, so we won't expect
+        # 'b' to have been fully repaired afterwards.
+        logger.debug("Executing 'SELECT a...' on non-initial replica to trigger read repair " + non_replicas[0].name)
+        read_repair_session = self.patient_exclusive_cql_connection(non_replicas[0])
+        read_repair_session.execute(SimpleStatement("SELECT a FROM alter_rf_test.t1 WHERE k=1",
+                                                    consistency_level=ConsistencyLevel.ONE))
+
+        # Query each replica individually to ensure that read repair was triggered. We should expect that only
+        # the initial replica has data for both the 'a' and 'b' columns. The read repair should only have affected
+        # the selected column, so the other two replicas should only have that data.
+        # Note: we need to temporarily set read_repair_chance to 0 while we perform this check.
+        logger.debug("Setting table read repair chance to 0 while we verify each replica's data")
+        session.execute("""ALTER TABLE alter_rf_test.t1 WITH read_repair_chance = 0;""")
+        # The read repair is run in the background, so we spin while checking that the repair has completed
+        retry_till_success(self.check_data_on_each_replica,
+                           expect_fully_repaired=False,
+                           initial_replica=initial_replica,
+                           timeout=30,
+                           bypassed_exception=NotRepairedException)
+
+        # Re-enable global read repair and perform another query on a non-replica. This time the query selects all
+        # columns so we also expect the value for 'b' to be repaired.
+        logger.debug("Setting table read repair chance to 1")
+        session.execute("""ALTER TABLE alter_rf_test.t1 WITH read_repair_chance = 1;""")
+        logger.debug("Executing 'SELECT *...' on non-initial replica to trigger read repair " + non_replicas[0].name)
+        read_repair_session = self.patient_exclusive_cql_connection(non_replicas[0])
+        read_repair_session.execute(SimpleStatement("SELECT * FROM alter_rf_test.t1 WHERE k=1",
+                                                    consistency_level=ConsistencyLevel.ONE))
+
+        # Query each replica again to ensure that second read repair was triggered. This time, we expect the
+        # data to be fully repaired (both 'a' and 'b' columns) by virtue of the query being 'SELECT *...'
+        # As before, we turn off read repair before doing this check.
+        logger.debug("Setting table read repair chance to 0 while we verify each replica's data")
+        session.execute("""ALTER TABLE alter_rf_test.t1 WITH read_repair_chance = 0;""")
+        retry_till_success(self.check_data_on_each_replica,
+                           expect_fully_repaired=True,
+                           initial_replica=initial_replica,
+                           timeout=30,
+                           bypassed_exception=NotRepairedException)
+
+    def do_initial_setup(self, session):
+        """
+        Create a keyspace with rf=1 and a table containing a single row with 2 non-primary key columns.
+        Insert 1 row, placing the data on a single initial replica. Then, alter the keyspace to rf=3, but don't
+        repair. Tests will execute various reads on the replicas and assert the effects of read repair.
+        :param session: Used to perform the schema setup & insert the data
+        :return: a tuple containing the node which initially acts as the replica, and a list of the other two nodes
+        """
+        # Disable speculative retry to make it clear that we only query additional nodes because of read_repair_chance
         session.execute("""CREATE KEYSPACE alter_rf_test
                            WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};""")
-        session.execute("CREATE TABLE alter_rf_test.t1 (k int PRIMARY KEY, a int, b int);")
+        session.execute("CREATE TABLE alter_rf_test.t1 (k int PRIMARY KEY, a int, b int) WITH speculative_retry='NONE';")
         session.execute("INSERT INTO alter_rf_test.t1 (k, a, b) VALUES (1, 1, 1);")
-        cl_one_stmt = SimpleStatement("SELECT * FROM alter_rf_test.t1 WHERE k=1",
-                                      consistency_level=ConsistencyLevel.ONE)
 
         # identify the initial replica and trigger a flush to ensure reads come from sstables
-        initial_replica, non_replicas = self.identify_initial_placement('alter_rf_test', 't1', 1)
-        debug("At RF=1 replica for data is " + initial_replica.name)
+        initial_replica, non_replicas = self.identify_initial_placement()
+        logger.debug("At RF=1 replica for data is " + initial_replica.name)
         initial_replica.flush()
 
+        # Just some basic validation.
         # At RF=1, it shouldn't matter which node we query, as the actual data should always come from the
         # initial replica when reading at CL ONE
         for n in self.cluster.nodelist():
-            debug("Checking " + n.name)
+            logger.debug("Checking " + n.name)
             session = self.patient_exclusive_cql_connection(n)
             assert_one(session, "SELECT * FROM alter_rf_test.t1 WHERE k=1", [1, 1, 1], cl=ConsistencyLevel.ONE)
 
-        # Alter so RF=n but don't repair, then execute a query which selects only a subset of the columns. Run this at
-        # CL ALL on one of the nodes which doesn't currently have the data, triggering a read repair.
-        # The expectation will be that every replicas will have been repaired for that column (but we make no assumptions
-        # on the other columns).
-        debug("Changing RF from 1 to 3")
+        # Alter so RF=n but don't repair, calling tests will execute queries to exercise read repair,
+        # either at CL.ALL or after setting read_repair_chance to 100%.
+        logger.debug("Changing RF from 1 to 3")
         session.execute("""ALTER KEYSPACE alter_rf_test
                            WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};""")
 
-        if not cl_all:
-            debug("Setting table read repair chance to 1")
-            session.execute("""ALTER TABLE alter_rf_test.t1 WITH read_repair_chance = 1;""")
-
-        cl = ConsistencyLevel.ALL if cl_all else ConsistencyLevel.ONE
-
-        debug("Executing SELECT on non-initial replica to trigger read repair " + non_replicas[0].name)
-        read_repair_session = self.patient_exclusive_cql_connection(non_replicas[0])
-
-        if cl_all:
-            # result of the read repair query at cl=ALL contains only the selected column
-            assert_one(read_repair_session, "SELECT a FROM alter_rf_test.t1 WHERE k=1", [1], cl=cl)
-        else:
-            # With background read repair at CL=ONE, result may or may not be correct
-            stmt = SimpleStatement("SELECT a FROM alter_rf_test.t1 WHERE k=1", consistency_level=cl)
-            session.execute(stmt)
-
-        # Check the results of the read repair by querying each replica again at CL ONE
-        debug("Re-running SELECTs at CL ONE to verify read repair")
-        for n in self.cluster.nodelist():
-            debug("Checking " + n.name)
-            session = self.patient_exclusive_cql_connection(n)
-            res = rows_to_list(session.execute(cl_one_stmt))
-            # Column a must be 1 everywhere, and column b must be either 1 or None everywhere
-            self.assertIn(res[0][:2], [[1, 1], [1, None]])
-
-        # Now query selecting all columns
-        query = "SELECT * FROM alter_rf_test.t1 WHERE k=1"
-        debug("Executing SELECT on non-initial replica to trigger read repair " + non_replicas[0].name)
-        read_repair_session = self.patient_exclusive_cql_connection(non_replicas[0])
-
-        if cl_all:
-            # result of the read repair query at cl=ALL should contain all columns
-            assert_one(session, query, [1, 1, 1], cl=cl)
-        else:
-            # With background read repair at CL=ONE, result may or may not be correct
-            stmt = SimpleStatement(query, consistency_level=cl)
-            session.execute(stmt)
-
-        # Check all replica is fully up to date
-        debug("Re-running SELECTs at CL ONE to verify read repair")
-        for n in self.cluster.nodelist():
-            debug("Checking " + n.name)
-            session = self.patient_exclusive_cql_connection(n)
-            assert_one(session, query, [1, 1, 1], cl=ConsistencyLevel.ONE)
+        return initial_replica, non_replicas
 
-    def identify_initial_placement(self, keyspace, table, key):
+    def identify_initial_placement(self):
+        """
+        Identify which node in the 3 node cluster contains the specific key at the point that the test keyspace has
+        rf=1.
+        :return: tuple containing the initial replica, plus a list of the other 2 replicas.
+        """
         nodes = self.cluster.nodelist()
         out, _, _ = nodes[0].nodetool("getendpoints alter_rf_test t1 1")
         address = out.split('\n')[-2]
@@ -120,12 +162,31 @@ class TestReadRepair(Tester):
             else:
                 non_replicas.append(node)
 
-        self.assertIsNotNone(initial_replica, "Couldn't identify initial replica")
+        assert initial_replica is not None, "Couldn't identify initial replica"
 
         return initial_replica, non_replicas
 
+    def check_data_on_each_replica(self, expect_fully_repaired, initial_replica):
+        """
+        Perform a SELECT * query at CL.ONE on each replica in turn. If expect_fully_repaired is True, we verify that
+        each replica returns the full row being queried. If not, then we only verify that the 'a' column has been
+        repaired.
+        """
+        stmt = SimpleStatement("SELECT * FROM alter_rf_test.t1 WHERE k=1", consistency_level=ConsistencyLevel.ONE)
+        logger.debug("Checking all if read repair has completed on all replicas")
+        for n in self.cluster.nodelist():
+            logger.debug("Checking {n}, {x}expecting all columns"
+                         .format(n=n.name, x="" if expect_fully_repaired or n == initial_replica else "not "))
+            session = self.patient_exclusive_cql_connection(n)
+            res = rows_to_list(session.execute(stmt))
+            logger.debug("Actual result: " + str(res))
+            expected = [[1, 1, 1]] if expect_fully_repaired or n == initial_replica else [[1, 1, None]]
+            if res != expected:
+                raise NotRepairedException()
+
+
     @since('2.0')
-    def range_slice_query_with_tombstones_test(self):
+    def test_range_slice_query_with_tombstones(self):
         """
         @jira_ticket CASSANDRA-8989
         @jira_ticket CASSANDRA-9502
@@ -174,9 +235,9 @@ class TestReadRepair(Tester):
         for trace_event in trace.events:
             # Step 1, find coordinator node:
             activity = trace_event.description
-            self.assertNotIn("Appending to commitlog", activity)
-            self.assertNotIn("Adding to cf memtable", activity)
-            self.assertNotIn("Acquiring switchLock read lock", activity)
+            assert "Appending to commitlog" not in activity
+            assert "Adding to cf memtable" not in activity
+            assert "Acquiring switchLock read lock" not in activity
 
     @since('3.0')
     def test_gcable_tombstone_resurrection_on_range_slice_query(self):
@@ -221,12 +282,21 @@ class TestReadRepair(Tester):
         self.pprint_trace(trace)
         for trace_event in trace.events:
             activity = trace_event.description
-            self.assertNotIn("Sending READ_REPAIR message", activity)
+            assert "Sending READ_REPAIR message" not in activity
 
     def pprint_trace(self, trace):
         """Pretty print a trace"""
-        if PRINT_DEBUG:
-            print("-" * 40)
+        if logging.root.level == logging.DEBUG:
+            print(("-" * 40))
             for t in trace.events:
-                print("%s\t%s\t%s\t%s" % (t.source, t.source_elapsed, t.description, t.thread_name))
-            print("-" * 40)
+                print(("%s\t%s\t%s\t%s" % (t.source, t.source_elapsed, t.description, t.thread_name)))
+            print(("-" * 40))
+
+
+class NotRepairedException(Exception):
+    """
+    Thrown to indicate that the data on a replica hasn't been doesn't match what we'd expect if a
+    specific read repair has run. See check_data_on_each_replica.
+    """
+    pass
+

http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/rebuild_test.py
----------------------------------------------------------------------
diff --git a/rebuild_test.py b/rebuild_test.py
index 795c945..919bd54 100644
--- a/rebuild_test.py
+++ b/rebuild_test.py
@@ -1,26 +1,36 @@
+import pytest
 import time
+import logging
+
+from flaky import flaky
+
 from threading import Thread
 
 from cassandra import ConsistencyLevel
 from ccmlib.node import ToolError
 
-from dtest import Tester, debug, create_ks, create_cf
+from dtest import Tester, create_ks, create_cf
 from tools.data import insert_c1c2, query_c1c2
-from tools.decorators import since, no_vnodes
+
+since = pytest.mark.since
+logger = logging.getLogger(__name__)
 
 
 class TestRebuild(Tester):
-    ignore_log_patterns = (
-        # This one occurs when trying to send the migration to a
-        # node that hasn't started yet, and when it does, it gets
-        # replayed and everything is fine.
-        r'Can\'t send migration request: node.*is down',
-        # ignore streaming error during bootstrap
-        r'Exception encountered during startup',
-        r'Streaming error occurred'
-    )
-
-    def simple_rebuild_test(self):
+
+    @pytest.fixture(autouse=True)
+    def fixture_add_additional_log_patterns(self, fixture_dtest_setup):
+        fixture_dtest_setup.ignore_log_patterns = (
+            # This one occurs when trying to send the migration to a
+            # node that hasn't started yet, and when it does, it gets
+            # replayed and everything is fine.
+            r'Can\'t send migration request: node.*is down',
+            # ignore streaming error during bootstrap
+            r'Exception encountered during startup',
+            r'Streaming error occurred'
+        )
+
+    def test_simple_rebuild(self):
         """
         @jira_ticket CASSANDRA-9119
 
@@ -48,7 +58,7 @@ class TestRebuild(Tester):
         insert_c1c2(session, n=keys, consistency=ConsistencyLevel.LOCAL_ONE)
 
         # check data
-        for i in xrange(0, keys):
+        for i in range(0, keys):
             query_c1c2(session, i, ConsistencyLevel.LOCAL_ONE)
         session.shutdown()
 
@@ -118,31 +128,35 @@ class TestRebuild(Tester):
         # manually raise exception from cmd1 thread
         # see http://stackoverflow.com/a/1854263
         if cmd1.thread_exc_info is not None:
-            raise cmd1.thread_exc_info[1], None, cmd1.thread_exc_info[2]
+            raise cmd1.thread_exc_info[1].with_traceback(cmd1.thread_exc_info[2])
 
         # exactly 1 of the two nodetool calls should fail
         # usually it will be the one in the main thread,
         # but occasionally it wins the race with the one in the secondary thread,
         # so we check that one succeeded and the other failed
-        self.assertEqual(self.rebuild_errors, 1,
-                         msg='rebuild errors should be 1, but found {}. Concurrent rebuild should not be allowed, but one rebuild command should have succeeded.'.format(self.rebuild_errors))
+        assert self.rebuild_errors == 1, \
+            'rebuild errors should be 1, but found {}. Concurrent rebuild should not be allowed, but one rebuild command should have succeeded.'.format(self.rebuild_errors)
 
         # check data
-        for i in xrange(0, keys):
+        for i in range(0, keys):
             query_c1c2(session, i, ConsistencyLevel.LOCAL_ONE)
 
+    @flaky
     @since('2.2')
-    def resumable_rebuild_test(self):
+    def test_resumable_rebuild(self):
         """
         @jira_ticket CASSANDRA-10810
 
         Test rebuild operation is resumable
         """
-        self.ignore_log_patterns = list(self.ignore_log_patterns) + [r'Error while rebuilding node',
-                                                                     r'Streaming error occurred on session with peer 127.0.0.3',
-                                                                     r'Remote peer 127.0.0.3 failed stream session',
-                                                                     r'Streaming error occurred on session with peer 127.0.0.3:7000',
-                                                                     r'Remote peer 127.0.0.3:7000 failed stream session']
+        self.fixture_dtest_setup.ignore_log_patterns = list(self.fixture_dtest_setup.ignore_log_patterns) + [
+            r'Error while rebuilding node',
+            r'Streaming error occurred on session with peer 127.0.0.3',
+            r'Remote peer 127.0.0.3 failed stream session',
+            r'Streaming error occurred on session with peer 127.0.0.3:7000',
+            r'Remote peer 127.0.0.3:7000 failed stream session'
+        ]
+
         cluster = self.cluster
         cluster.set_configuration_options(values={'endpoint_snitch': 'org.apache.cassandra.locator.PropertyFileSnitch'})
 
@@ -203,32 +217,32 @@ class TestRebuild(Tester):
         node3.byteman_submit(script)
 
         # First rebuild must fail and data must be incomplete
-        with self.assertRaises(ToolError, msg='Unexpected: SUCCEED'):
-            debug('Executing first rebuild -> '),
+        with pytest.raises(ToolError, msg='Unexpected: SUCCEED'):
+            logger.debug('Executing first rebuild -> '),
             node3.nodetool('rebuild dc1')
-        debug('Expected: FAILED')
+        logger.debug('Expected: FAILED')
 
         session.execute('USE ks')
-        with self.assertRaises(AssertionError, msg='Unexpected: COMPLETE'):
-            debug('Checking data is complete -> '),
-            for i in xrange(0, 20000):
+        with pytest.raises(AssertionError, msg='Unexpected: COMPLETE'):
+            logger.debug('Checking data is complete -> '),
+            for i in range(0, 20000):
                 query_c1c2(session, i, ConsistencyLevel.LOCAL_ONE)
-        debug('Expected: INCOMPLETE')
+        logger.debug('Expected: INCOMPLETE')
 
-        debug('Executing second rebuild -> '),
+        logger.debug('Executing second rebuild -> '),
         node3.nodetool('rebuild dc1')
-        debug('Expected: SUCCEED')
+        logger.debug('Expected: SUCCEED')
 
         # Check all streaming sessions completed, streamed ranges are skipped and verify streamed data
         node3.watch_log_for('All sessions completed')
         node3.watch_log_for('Skipping streaming those ranges.')
-        debug('Checking data is complete -> '),
-        for i in xrange(0, 20000):
+        logger.debug('Checking data is complete -> '),
+        for i in range(0, 20000):
             query_c1c2(session, i, ConsistencyLevel.LOCAL_ONE)
-        debug('Expected: COMPLETE')
+        logger.debug('Expected: COMPLETE')
 
     @since('3.6')
-    def rebuild_ranges_test(self):
+    def test_rebuild_ranges(self):
         """
         @jira_ticket CASSANDRA-10406
         """
@@ -285,16 +299,16 @@ class TestRebuild(Tester):
 
         # check data is sent by stopping node1
         node1.stop()
-        for i in xrange(0, keys):
+        for i in range(0, keys):
             query_c1c2(session, i, ConsistencyLevel.ONE)
         # ks2 should not be streamed
         session.execute('USE ks2')
-        for i in xrange(0, keys):
+        for i in range(0, keys):
             query_c1c2(session, i, ConsistencyLevel.ONE, tolerate_missing=True, must_be_missing=True)
 
     @since('3.10')
-    @no_vnodes()
-    def disallow_rebuild_nonlocal_range_test(self):
+    @pytest.mark.no_vnodes
+    def test_disallow_rebuild_nonlocal_range(self):
         """
         @jira_ticket CASSANDRA-9875
         Verifies that nodetool rebuild throws an error when an operator
@@ -322,12 +336,12 @@ class TestRebuild(Tester):
         session = self.patient_exclusive_cql_connection(node1)
         session.execute("CREATE KEYSPACE ks1 WITH replication = {'class':'SimpleStrategy', 'replication_factor':2};")
 
-        with self.assertRaisesRegexp(ToolError, 'is not a range that is owned by this node'):
+        with pytest.raises(ToolError, match='is not a range that is owned by this node'):
             node1.nodetool('rebuild -ks ks1 -ts (%s,%s]' % (node1_token, node2_token))
 
     @since('3.10')
-    @no_vnodes()
-    def disallow_rebuild_from_nonreplica_test(self):
+    @pytest.mark.no_vnodes
+    def test_disallow_rebuild_from_nonreplica(self):
         """
         @jira_ticket CASSANDRA-9875
         Verifies that nodetool rebuild throws an error when an operator
@@ -358,12 +372,12 @@ class TestRebuild(Tester):
         session = self.patient_exclusive_cql_connection(node1)
         session.execute("CREATE KEYSPACE ks1 WITH replication = {'class':'SimpleStrategy', 'replication_factor':2};")
 
-        with self.assertRaisesRegexp(ToolError, 'Unable to find sufficient sources for streaming range'):
+        with pytest.raises(ToolError, message='Unable to find sufficient sources for streaming range'):
             node1.nodetool('rebuild -ks ks1 -ts (%s,%s] -s %s' % (node3_token, node1_token, node3_address))
 
     @since('3.10')
-    @no_vnodes()
-    def rebuild_with_specific_sources_test(self):
+    @pytest.mark.no_vnodes
+    def test_rebuild_with_specific_sources(self):
         """
         @jira_ticket CASSANDRA-9875
         Verifies that an operator can specify specific sources to use
@@ -426,18 +440,18 @@ class TestRebuild(Tester):
 
         # verify that node2 streamed to node3
         log_matches = node2.grep_log('Session with %s is complete' % node3.address_for_current_version())
-        self.assertTrue(len(log_matches) > 0)
+        assert len(log_matches) > 0
 
         # verify that node1 did not participate
         log_matches = node1.grep_log('streaming plan for Rebuild')
-        self.assertEqual(len(log_matches), 0)
+        assert len(log_matches) == 0
 
         # check data is sent by stopping node1, node2
         node1.stop()
         node2.stop()
-        for i in xrange(0, keys):
+        for i in range(0, keys):
             query_c1c2(session, i, ConsistencyLevel.ONE)
         # ks2 should not be streamed
         session.execute('USE ks2')
-        for i in xrange(0, keys):
+        for i in range(0, keys):
             query_c1c2(session, i, ConsistencyLevel.ONE, tolerate_missing=True, must_be_missing=True)

http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/repair_tests/deprecated_repair_test.py
----------------------------------------------------------------------
diff --git a/repair_tests/deprecated_repair_test.py b/repair_tests/deprecated_repair_test.py
index e438f53..4c60664 100644
--- a/repair_tests/deprecated_repair_test.py
+++ b/repair_tests/deprecated_repair_test.py
@@ -1,15 +1,20 @@
+import pytest
+import logging
+
 from distutils.version import LooseVersion
 
 from cassandra import ConsistencyLevel
 from ccmlib.common import is_win
 
-from dtest import Tester, debug, create_ks, create_cf
+from dtest import Tester, create_ks, create_cf
 from tools.assertions import assert_length_equal
 from tools.data import insert_c1c2
-from tools.decorators import since
 from tools.jmxutils import (JolokiaAgent, make_mbean,
                             remove_perf_disable_shared_mem)
 
+since = pytest.mark.since
+logger = logging.getLogger(__name__)
+
 
 @since("2.2", max_version="4")
 class TestDeprecatedRepairAPI(Tester):
@@ -19,7 +24,7 @@ class TestDeprecatedRepairAPI(Tester):
     Test if deprecated repair JMX API runs with expected parameters
     """
 
-    def force_repair_async_1_test(self):
+    def test_force_repair_async_1(self):
         """
         test forceRepairAsync(String keyspace, boolean isSequential,
                               Collection<String> dataCenters,
@@ -28,15 +33,15 @@ class TestDeprecatedRepairAPI(Tester):
         """
         opt = self._deprecated_repair_jmx("forceRepairAsync(java.lang.String,boolean,java.util.Collection,java.util.Collection,boolean,boolean,[Ljava.lang.String;)",
                                           ['ks', True, [], [], False, False, ["cf"]])
-        self.assertEqual(opt["parallelism"], "parallel" if is_win() else "sequential", opt)
-        self.assertEqual(opt["primary_range"], "false", opt)
-        self.assertEqual(opt["incremental"], "true", opt)
-        self.assertEqual(opt["job_threads"], "1", opt)
-        self.assertEqual(opt["data_centers"], "[]", opt)
-        self.assertEqual(opt["hosts"], "[]", opt)
-        self.assertEqual(opt["column_families"], "[cf]", opt)
-
-    def force_repair_async_2_test(self):
+        assert opt["parallelism"], "parallel" if is_win() else "sequential" == opt
+        assert opt["primary_range"], "false" == opt
+        assert opt["incremental"], "true" == opt
+        assert opt["job_threads"], "1" == opt
+        assert opt["data_centers"], "[]" == opt
+        assert opt["hosts"], "[]" == opt
+        assert opt["column_families"], "[cf]" == opt
+
+    def test_force_repair_async_2(self):
         """
         test forceRepairAsync(String keyspace, int parallelismDegree,
                               Collection<String> dataCenters,
@@ -45,15 +50,15 @@ class TestDeprecatedRepairAPI(Tester):
         """
         opt = self._deprecated_repair_jmx("forceRepairAsync(java.lang.String,int,java.util.Collection,java.util.Collection,boolean,boolean,[Ljava.lang.String;)",
                                           ['ks', 1, [], [], True, True, []])
-        self.assertEqual(opt["parallelism"], "parallel", opt)
-        self.assertEqual(opt["primary_range"], "true", opt)
-        self.assertEqual(opt["incremental"], "false", opt)
-        self.assertEqual(opt["job_threads"], "1", opt)
-        self.assertEqual(opt["data_centers"], "[]", opt)
-        self.assertEqual(opt["hosts"], "[]", opt)
-        self.assertEqual(opt["column_families"], "[]", opt)
-
-    def force_repair_async_3_test(self):
+        assert opt["parallelism"], "parallel" == opt
+        assert opt["primary_range"], "true" == opt
+        assert opt["incremental"], "false" == opt
+        assert opt["job_threads"], "1" == opt
+        assert opt["data_centers"], "[]" == opt
+        assert opt["hosts"], "[]" == opt
+        assert opt["column_families"], "[]" == opt
+
+    def test_force_repair_async_3(self):
         """
         test forceRepairAsync(String keyspace, boolean isSequential,
                               boolean isLocal, boolean primaryRange,
@@ -61,15 +66,15 @@ class TestDeprecatedRepairAPI(Tester):
         """
         opt = self._deprecated_repair_jmx("forceRepairAsync(java.lang.String,boolean,boolean,boolean,boolean,[Ljava.lang.String;)",
                                           ['ks', False, False, False, False, ["cf"]])
-        self.assertEqual(opt["parallelism"], "parallel", opt)
-        self.assertEqual(opt["primary_range"], "false", opt)
-        self.assertEqual(opt["incremental"], "true", opt)
-        self.assertEqual(opt["job_threads"], "1", opt)
-        self.assertEqual(opt["data_centers"], "[]", opt)
-        self.assertEqual(opt["hosts"], "[]", opt)
-        self.assertEqual(opt["column_families"], "[cf]", opt)
-
-    def force_repair_range_async_1_test(self):
+        assert opt["parallelism"], "parallel" == opt
+        assert opt["primary_range"], "false" == opt
+        assert opt["incremental"], "true" == opt
+        assert opt["job_threads"], "1" == opt
+        assert opt["data_centers"], "[]" == opt
+        assert opt["hosts"], "[]" == opt
+        assert opt["column_families"], "[cf]" == opt
+
+    def test_force_repair_range_async_1(self):
         """
         test forceRepairRangeAsync(String beginToken, String endToken,
                                    String keyspaceName, boolean isSequential,
@@ -79,16 +84,16 @@ class TestDeprecatedRepairAPI(Tester):
         """
         opt = self._deprecated_repair_jmx("forceRepairRangeAsync(java.lang.String,java.lang.String,java.lang.String,boolean,java.util.Collection,java.util.Collection,boolean,[Ljava.lang.String;)",
                                           ["0", "1000", "ks", True, ["dc1"], [], False, ["cf"]])
-        self.assertEqual(opt["parallelism"], "parallel" if is_win() else "sequential", opt)
-        self.assertEqual(opt["primary_range"], "false", opt)
-        self.assertEqual(opt["incremental"], "true", opt)
-        self.assertEqual(opt["job_threads"], "1", opt)
-        self.assertEqual(opt["data_centers"], "[dc1]", opt)
-        self.assertEqual(opt["hosts"], "[]", opt)
-        self.assertEqual(opt["ranges"], "1", opt)
-        self.assertEqual(opt["column_families"], "[cf]", opt)
-
-    def force_repair_range_async_2_test(self):
+        assert opt["parallelism"], "parallel" if is_win() else "sequential" == opt
+        assert opt["primary_range"], "false" == opt
+        assert opt["incremental"], "true" == opt
+        assert opt["job_threads"], "1" == opt
+        assert opt["data_centers"], "[dc1]" == opt
+        assert opt["hosts"], "[]" == opt
+        assert opt["ranges"], "1" == opt
+        assert opt["column_families"], "[cf]" == opt
+
+    def test_force_repair_range_async_2(self):
         """
         test forceRepairRangeAsync(String beginToken, String endToken,
                                    String keyspaceName, int parallelismDegree,
@@ -98,16 +103,16 @@ class TestDeprecatedRepairAPI(Tester):
         """
         opt = self._deprecated_repair_jmx("forceRepairRangeAsync(java.lang.String,java.lang.String,java.lang.String,int,java.util.Collection,java.util.Collection,boolean,[Ljava.lang.String;)",
                                           ["0", "1000", "ks", 2, [], [], True, ["cf"]])
-        self.assertEqual(opt["parallelism"], "parallel" if is_win() else "dc_parallel", opt)
-        self.assertEqual(opt["primary_range"], "false", opt)
-        self.assertEqual(opt["incremental"], "false", opt)
-        self.assertEqual(opt["job_threads"], "1", opt)
-        self.assertEqual(opt["data_centers"], "[]", opt)
-        self.assertEqual(opt["hosts"], "[]", opt)
-        self.assertEqual(opt["ranges"], "1", opt)
-        self.assertEqual(opt["column_families"], "[cf]", opt)
-
-    def force_repair_range_async_3_test(self):
+        assert opt["parallelism"], "parallel" if is_win() else "dc_parallel" == opt
+        assert opt["primary_range"], "false" == opt
+        assert opt["incremental"], "false" == opt
+        assert opt["job_threads"], "1" == opt
+        assert opt["data_centers"], "[]" == opt
+        assert opt["hosts"], "[]" == opt
+        assert opt["ranges"], "1" == opt
+        assert opt["column_families"], "[cf]" == opt
+
+    def test_force_repair_range_async_3(self):
         """
         test forceRepairRangeAsync(String beginToken, String endToken,
                                    String keyspaceName, boolean isSequential,
@@ -116,14 +121,14 @@ class TestDeprecatedRepairAPI(Tester):
         """
         opt = self._deprecated_repair_jmx("forceRepairRangeAsync(java.lang.String,java.lang.String,java.lang.String,boolean,boolean,boolean,[Ljava.lang.String;)",
                                           ["0", "1000", "ks", True, True, True, ["cf"]])
-        self.assertEqual(opt["parallelism"], "parallel" if is_win() else "sequential", opt)
-        self.assertEqual(opt["primary_range"], "false", opt)
-        self.assertEqual(opt["incremental"], "false", opt)
-        self.assertEqual(opt["job_threads"], "1", opt)
-        self.assertEqual(opt["data_centers"], "[dc1]", opt)
-        self.assertEqual(opt["hosts"], "[]", opt)
-        self.assertEqual(opt["ranges"], "1", opt)
-        self.assertEqual(opt["column_families"], "[cf]", opt)
+        assert opt["parallelism"], "parallel" if is_win() else "sequential" == opt
+        assert opt["primary_range"], "false" == opt
+        assert opt["incremental"], "false" == opt
+        assert opt["job_threads"], "1" == opt
+        assert opt["data_centers"], "[dc1]" == opt
+        assert opt["hosts"], "[]" == opt
+        assert opt["ranges"], "1" == opt
+        assert opt["column_families"], "[cf]" == opt
 
     def _deprecated_repair_jmx(self, method, arguments):
         """
@@ -135,7 +140,7 @@ class TestDeprecatedRepairAPI(Tester):
         """
         cluster = self.cluster
 
-        debug("Starting cluster..")
+        logger.debug("Starting cluster..")
         cluster.populate([1, 1])
         node1, node2 = cluster.nodelist()
         remove_perf_disable_shared_mem(node1)
@@ -152,7 +157,7 @@ class TestDeprecatedRepairAPI(Tester):
         mbean = make_mbean('db', 'StorageService')
         with JolokiaAgent(node1) as jmx:
             # assert repair runs and returns valid cmd number
-            self.assertEqual(jmx.execute_method(mbean, method, arguments), 1)
+            assert jmx.execute_method(mbean, method, arguments) == 1
         # wait for log to start
         node1.watch_log_for("Starting repair command")
         # get repair parameters from the log
@@ -165,7 +170,7 @@ class TestDeprecatedRepairAPI(Tester):
         line, m = line[0]
 
         if supports_pull_repair:
-            self.assertEqual(m.group("pullrepair"), "false", "Pull repair cannot be enabled through the deprecated API so the pull repair option should always be false.")
+            assert m.group("pullrepair"), "false" == "Pull repair cannot be enabled through the deprecated API so the pull repair option should always be false."
 
         return {"parallelism": m.group("parallelism"),
                 "primary_range": m.group("pr"),

http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/repair_tests/incremental_repair_test.py
----------------------------------------------------------------------
diff --git a/repair_tests/incremental_repair_test.py b/repair_tests/incremental_repair_test.py
index e3017c2..4791e9a 100644
--- a/repair_tests/incremental_repair_test.py
+++ b/repair_tests/incremental_repair_test.py
@@ -1,8 +1,11 @@
 import time
+import pytest
+import re
+import logging
+
 from datetime import datetime
 from collections import Counter, namedtuple
 from re import findall, compile
-from unittest import skip
 from uuid import UUID, uuid1
 
 from cassandra import ConsistencyLevel
@@ -10,13 +13,14 @@ from cassandra.query import SimpleStatement
 from cassandra.metadata import Murmur3Token
 from ccmlib.common import is_win
 from ccmlib.node import Node, ToolError
-from nose.plugins.attrib import attr
 
-from dtest import Tester, debug, create_ks, create_cf
+from dtest import Tester, create_ks, create_cf
 from tools.assertions import assert_almost_equal, assert_one
 from tools.data import insert_c1c2
-from tools.decorators import since, no_vnodes
-from tools.misc import new_node
+from tools.misc import new_node, ImmutableMapping
+
+since = pytest.mark.since
+logger = logging.getLogger(__name__)
 
 
 class ConsistentState(object):
@@ -29,7 +33,12 @@ class ConsistentState(object):
 
 
 class TestIncRepair(Tester):
-    ignore_log_patterns = (r'Can\'t send migration request: node.*is down',)
+
+    @pytest.fixture(autouse=True)
+    def fixture_add_additional_log_patterns(self, fixture_dtest_setup):
+        fixture_dtest_setup.ignore_log_patterns = (
+            r'Can\'t send migration request: node.*is down'
+        )
 
     @classmethod
     def _get_repaired_data(cls, node, keyspace):
@@ -41,7 +50,7 @@ class TestIncRepair(Tester):
         out = node.run_sstablemetadata(keyspace=keyspace).stdout
 
         def matches(pattern):
-            return filter(None, [pattern.match(l) for l in out.split('\n')])
+            return filter(None, [pattern.match(l) for l in out.decode("utf-8").split('\n')])
         names = [m.group(1) for m in matches(_sstable_name)]
         repaired_times = [int(m.group(1)) for m in matches(_repaired_at)]
 
@@ -57,37 +66,38 @@ class TestIncRepair(Tester):
     def assertNoRepairedSSTables(self, node, keyspace):
         """ Checks that no sstables are marked repaired, and none are marked pending repair """
         data = self._get_repaired_data(node, keyspace)
-        self.assertTrue(all([t.repaired == 0 for t in data]), '{}'.format(data))
-        self.assertTrue(all([t.pending_id is None for t in data]))
+        assert all([t.repaired == 0 for t in data]), '{}'.format(data)
+        assert all([t.pending_id is None for t in data])
 
     def assertAllPendingRepairSSTables(self, node, keyspace, pending_id=None):
         """ Checks that no sstables are marked repaired, and all are marked pending repair """
         data = self._get_repaired_data(node, keyspace)
-        self.assertTrue(all([t.repaired == 0 for t in data]), '{}'.format(data))
+        assert all([t.repaired == 0 for t in data]), '{}'.format(data)
         if pending_id:
-            self.assertTrue(all([t.pending_id == pending_id for t in data]))
+            assert all([t.pending_id == pending_id for t in data])
         else:
-            self.assertTrue(all([t.pending_id is not None for t in data]))
+            assert all([t.pending_id is not None for t in data])
 
     def assertAllRepairedSSTables(self, node, keyspace):
         """ Checks that all sstables are marked repaired, and none are marked pending repair """
         data = self._get_repaired_data(node, keyspace)
-        self.assertTrue(all([t.repaired > 0 for t in data]), '{}'.format(data))
-        self.assertTrue(all([t.pending_id is None for t in data]), '{}'.format(data))
+        assert all([t.repaired > 0 for t in data]), '{}'.format(data)
+        assert all([t.pending_id is None for t in data]), '{}'.format(data)
 
     def assertRepairedAndUnrepaired(self, node, keyspace):
         """ Checks that a node has both repaired and unrepaired sstables for a given keyspace """
         data = self._get_repaired_data(node, keyspace)
-        self.assertTrue(any([t.repaired > 0 for t in data]), '{}'.format(data))
-        self.assertTrue(any([t.repaired == 0 for t in data]), '{}'.format(data))
-        self.assertTrue(all([t.pending_id is None for t in data]), '{}'.format(data))
+        assert any([t.repaired > 0 for t in data]), '{}'.format(data)
+        assert any([t.repaired == 0 for t in data]), '{}'.format(data)
+        assert all([t.pending_id is None for t in data]), '{}'.format(data)
 
     @since('4.0')
-    def consistent_repair_test(self):
-        cluster = self.cluster
-        cluster.set_configuration_options(values={'hinted_handoff_enabled': False, 'num_tokens': 1, 'commitlog_sync_period_in_ms': 500})
-        cluster.populate(3).start()
-        node1, node2, node3 = cluster.nodelist()
+    def test_consistent_repair(self):
+        self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false',
+                                                                                     'num_tokens': 1,
+                                                                                     'commitlog_sync_period_in_ms': 500})
+        self.cluster.populate(3).start()
+        node1, node2, node3 = self.cluster.nodelist()
 
         # make data inconsistent between nodes
         session = self.patient_exclusive_cql_connection(node3)
@@ -115,15 +125,15 @@ class TestIncRepair(Tester):
         node1.start(wait_other_notice=True, wait_for_binary_proto=True)
 
         # flush and check that no sstables are marked repaired
-        for node in cluster.nodelist():
+        for node in self.cluster.nodelist():
             node.flush()
             self.assertNoRepairedSSTables(node, 'ks')
             session = self.patient_exclusive_cql_connection(node)
             results = list(session.execute("SELECT * FROM system.repairs"))
-            self.assertEqual(len(results), 0, str(results))
+            assert len(results) == 0, str(results)
 
         # disable compaction so we can verify sstables are marked pending repair
-        for node in cluster.nodelist():
+        for node in self.cluster.nodelist():
             node.nodetool('disableautocompaction ks tbl')
 
         node1.repair(options=['ks'])
@@ -131,28 +141,28 @@ class TestIncRepair(Tester):
         # check that all participating nodes have the repair recorded in their system
         # table, that all nodes are listed as participants, and that all sstables are
         # (still) marked pending repair
-        expected_participants = {n.address() for n in cluster.nodelist()}
-        expected_participants_wp = {n.address_and_port() for n in cluster.nodelist()}
+        expected_participants = {n.address() for n in self.cluster.nodelist()}
+        expected_participants_wp = {n.address_and_port() for n in self.cluster.nodelist()}
         recorded_pending_ids = set()
-        for node in cluster.nodelist():
+        for node in self.cluster.nodelist():
             session = self.patient_exclusive_cql_connection(node)
             results = list(session.execute("SELECT * FROM system.repairs"))
-            self.assertEqual(len(results), 1)
+            assert len(results) == 1
             result = results[0]
-            self.assertEqual(set(result.participants), expected_participants)
+            assert set(result.participants) == expected_participants
             if hasattr(result, "participants_wp"):
-                self.assertEqual(set(result.participants_wp), expected_participants_wp)
-            self.assertEqual(result.state, ConsistentState.FINALIZED, "4=FINALIZED")
+                assert set(result.participants_wp) == expected_participants_wp
+            assert result.state, ConsistentState.FINALIZED == "4=FINALIZED"
             pending_id = result.parent_id
             self.assertAllPendingRepairSSTables(node, 'ks', pending_id)
             recorded_pending_ids.add(pending_id)
 
-        self.assertEqual(len(recorded_pending_ids), 1)
+        assert len(recorded_pending_ids) == 1
 
         # sstables are compacted out of pending repair by a compaction
         # task, we disabled compaction earlier in the test, so here we
         # force the compaction and check that all sstables are promoted
-        for node in cluster.nodelist():
+        for node in self.cluster.nodelist():
             node.nodetool('compact ks tbl')
             self.assertAllRepairedSSTables(node, 'ks')
 
@@ -166,7 +176,7 @@ class TestIncRepair(Tester):
         ranges = {'\x00\x00\x00\x08K\xc2\xed\\<\xd3{X\x00\x00\x00\x08r\x04\x89[j\x81\xc4\xe6',
                   '\x00\x00\x00\x08r\x04\x89[j\x81\xc4\xe6\x00\x00\x00\x08\xd8\xcdo\x9e\xcbl\x83\xd4',
                   '\x00\x00\x00\x08\xd8\xcdo\x9e\xcbl\x83\xd4\x00\x00\x00\x08K\xc2\xed\\<\xd3{X'}
-        ranges = {buffer(b) for b in ranges}
+        ranges = {bytes(b, "Latin-1") for b in ranges}
 
         for node in self.cluster.nodelist():
             session = self.patient_exclusive_cql_connection(node)
@@ -177,8 +187,13 @@ class TestIncRepair(Tester):
                              {str(n.address()) + ":7000" for n in self.cluster.nodelist()},
                              ranges, now, now, ConsistentState.REPAIRING])  # 2=REPAIRING
 
+        # as we faked repairs and inserted directly into system.repairs table, the current
+        # implementation in trunk (LocalSessions) only pulls the sessions via callbacks or
+        # from the system.repairs table once at startup. we need to stop and start the nodes
+        # as a way to force the repair sessions to get populated into the correct in-memory objects
         time.sleep(1)
         for node in self.cluster.nodelist():
+            node.flush()
             node.stop(gently=False)
 
         for node in self.cluster.nodelist():
@@ -187,12 +202,13 @@ class TestIncRepair(Tester):
         return session_id
 
     @since('4.0')
-    def manual_session_fail_test(self):
+    def test_manual_session_fail(self):
         """ check manual failing of repair sessions via nodetool works properly """
-        cluster = self.cluster
-        cluster.set_configuration_options(values={'hinted_handoff_enabled': False, 'num_tokens': 1, 'commitlog_sync_period_in_ms': 500})
-        cluster.populate(3).start()
-        node1, node2, node3 = cluster.nodelist()
+        self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false',
+                                                                                     'num_tokens': 1,
+                                                                                     'commitlog_sync_period_in_ms': 500})
+        self.cluster.populate(3).start()
+        node1, node2, node3 = self.cluster.nodelist()
 
         # make data inconsistent between nodes
         session = self.patient_exclusive_cql_connection(node3)
@@ -201,35 +217,37 @@ class TestIncRepair(Tester):
 
         for node in self.cluster.nodelist():
             out = node.nodetool('repair_admin')
-            self.assertIn("no sessions", out.stdout)
+            assert "no sessions" in out.stdout
 
         session_id = self._make_fake_session('ks', 'tbl')
 
         for node in self.cluster.nodelist():
             out = node.nodetool('repair_admin')
             lines = out.stdout.split('\n')
-            self.assertGreater(len(lines), 1)
+            assert len(lines) > 1
             line = lines[1]
-            self.assertIn(str(session_id), line)
-            self.assertIn("REPAIRING", line)
+            assert re.match(str(session_id), line)
+            assert "REPAIRING" in line
 
         node1.nodetool("repair_admin --cancel {}".format(session_id))
 
         for node in self.cluster.nodelist():
             out = node.nodetool('repair_admin --all')
             lines = out.stdout.split('\n')
-            self.assertGreater(len(lines), 1)
+            assert len(lines) > 1
             line = lines[1]
-            self.assertIn(str(session_id), line)
-            self.assertIn("FAILED", line)
+            assert re.match(str(session_id), line)
+            assert "FAILED" in line
 
     @since('4.0')
-    def manual_session_cancel_non_coordinator_failure_test(self):
+    def test_manual_session_cancel_non_coordinator_failure(self):
         """ check manual failing of repair sessions via a node other than the coordinator fails """
-        cluster = self.cluster
-        cluster.set_configuration_options(values={'hinted_handoff_enabled': False, 'num_tokens': 1, 'commitlog_sync_period_in_ms': 500})
-        cluster.populate(3).start()
-        node1, node2, node3 = cluster.nodelist()
+        self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false',
+                                                                                     'num_tokens': 1,
+                                                                                     'commitlog_sync_period_in_ms': 500})
+
+        self.cluster.populate(3).start()
+        node1, node2, node3 = self.cluster.nodelist()
 
         # make data inconsistent between nodes
         session = self.patient_exclusive_cql_connection(node3)
@@ -238,17 +256,17 @@ class TestIncRepair(Tester):
 
         for node in self.cluster.nodelist():
             out = node.nodetool('repair_admin')
-            self.assertIn("no sessions", out.stdout)
+            assert "no sessions" in out.stdout
 
         session_id = self._make_fake_session('ks', 'tbl')
 
         for node in self.cluster.nodelist():
             out = node.nodetool('repair_admin')
             lines = out.stdout.split('\n')
-            self.assertGreater(len(lines), 1)
+            assert len(lines) > 1
             line = lines[1]
-            self.assertIn(str(session_id), line)
-            self.assertIn("REPAIRING", line)
+            assert re.match(str(session_id), line)
+            assert "REPAIRING" in line
 
         try:
             node2.nodetool("repair_admin --cancel {}".format(session_id))
@@ -260,18 +278,19 @@ class TestIncRepair(Tester):
         for node in self.cluster.nodelist():
             out = node.nodetool('repair_admin')
             lines = out.stdout.split('\n')
-            self.assertGreater(len(lines), 1)
+            assert len(lines) > 1
             line = lines[1]
-            self.assertIn(str(session_id), line)
-            self.assertIn("REPAIRING", line)
+            assert re.match(str(session_id), line)
+            assert "REPAIRING" in line
 
     @since('4.0')
-    def manual_session_force_cancel_test(self):
+    def test_manual_session_force_cancel(self):
         """ check manual failing of repair sessions via a non-coordinator works if the --force flag is set """
-        cluster = self.cluster
-        cluster.set_configuration_options(values={'hinted_handoff_enabled': False, 'num_tokens': 1, 'commitlog_sync_period_in_ms': 500})
-        cluster.populate(3).start()
-        node1, node2, node3 = cluster.nodelist()
+        self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false',
+                                                                                     'num_tokens': 1,
+                                                                                     'commitlog_sync_period_in_ms': 500})
+        self.cluster.populate(3).start()
+        node1, node2, node3 = self.cluster.nodelist()
 
         # make data inconsistent between nodes
         session = self.patient_exclusive_cql_connection(node3)
@@ -280,29 +299,29 @@ class TestIncRepair(Tester):
 
         for node in self.cluster.nodelist():
             out = node.nodetool('repair_admin')
-            self.assertIn("no sessions", out.stdout)
+            assert "no sessions" in out.stdout
 
         session_id = self._make_fake_session('ks', 'tbl')
 
         for node in self.cluster.nodelist():
             out = node.nodetool('repair_admin')
             lines = out.stdout.split('\n')
-            self.assertGreater(len(lines), 1)
+            assert len(lines) > 1
             line = lines[1]
-            self.assertIn(str(session_id), line)
-            self.assertIn("REPAIRING", line)
+            assert re.match(str(session_id), line)
+            assert "REPAIRING" in line
 
         node2.nodetool("repair_admin --cancel {} --force".format(session_id))
 
         for node in self.cluster.nodelist():
             out = node.nodetool('repair_admin --all')
             lines = out.stdout.split('\n')
-            self.assertGreater(len(lines), 1)
+            assert len(lines) > 1
             line = lines[1]
-            self.assertIn(str(session_id), line)
-            self.assertIn("FAILED", line)
+            assert re.match(str(session_id), line)
+            assert "FAILED" in line
 
-    def sstable_marking_test(self):
+    def test_sstable_marking(self):
         """
         * Launch a three node cluster
         * Stop node3
@@ -311,11 +330,10 @@ class TestIncRepair(Tester):
         * Issue an incremental repair, and wait for it to finish
         * Run sstablemetadata on every node, assert that all sstables are marked as repaired
         """
-        cluster = self.cluster
         # hinted handoff can create SSTable that we don't need after node3 restarted
-        cluster.set_configuration_options(values={'hinted_handoff_enabled': False})
-        cluster.populate(3).start()
-        node1, node2, node3 = cluster.nodelist()
+        self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false'})
+        self.cluster.populate(3).start()
+        node1, node2, node3 = self.cluster.nodelist()
 
         node3.stop(gently=True)
 
@@ -331,20 +349,20 @@ class TestIncRepair(Tester):
         node3.watch_log_for("Initializing keyspace1.standard1", filename=log_file)
         # wait for things to settle before starting repair
         time.sleep(1)
-        if cluster.version() >= "2.2":
+        if self.cluster.version() >= "2.2":
             node3.repair()
         else:
             node3.nodetool("repair -par -inc")
 
-        if cluster.version() >= '4.0':
+        if self.cluster.version() >= '4.0':
             # sstables are compacted out of pending repair by a compaction
-            for node in cluster.nodelist():
+            for node in self.cluster.nodelist():
                 node.nodetool('compact keyspace1 standard1')
 
-        for out in (node.run_sstablemetadata(keyspace='keyspace1').stdout for node in cluster.nodelist()):
-            self.assertNotIn('Repaired at: 0', out)
+        for out in (node.run_sstablemetadata(keyspace='keyspace1').stdout for node in self.cluster.nodelist()):
+            assert 'Repaired at: 0' not in out.decode("utf-8")
 
-    def multiple_repair_test(self):
+    def test_multiple_repair(self):
         """
         * Launch a three node cluster
         * Create a keyspace with RF 3 and a table
@@ -370,21 +388,21 @@ class TestIncRepair(Tester):
         create_ks(session, 'ks', 3)
         create_cf(session, 'cf', read_repair=0.0, columns={'c1': 'text', 'c2': 'text'})
 
-        debug("insert data")
+        logger.debug("insert data")
 
-        insert_c1c2(session, keys=range(1, 50), consistency=ConsistencyLevel.ALL)
+        insert_c1c2(session, keys=list(range(1, 50)), consistency=ConsistencyLevel.ALL)
         node1.flush()
 
-        debug("bringing down node 3")
+        logger.debug("bringing down node 3")
         node3.flush()
         node3.stop(gently=False)
 
-        debug("inserting additional data into node 1 and 2")
-        insert_c1c2(session, keys=range(50, 100), consistency=ConsistencyLevel.TWO)
+        logger.debug("inserting additional data into node 1 and 2")
+        insert_c1c2(session, keys=list(range(50, 100)), consistency=ConsistencyLevel.TWO)
         node1.flush()
         node2.flush()
 
-        debug("restarting and repairing node 3")
+        logger.debug("restarting and repairing node 3")
         node3.start(wait_for_binary_proto=True)
 
         if cluster.version() >= "2.2":
@@ -397,15 +415,15 @@ class TestIncRepair(Tester):
         if is_win:
             time.sleep(2)
 
-        debug("stopping node 2")
+        logger.debug("stopping node 2")
         node2.stop(gently=False)
 
-        debug("inserting data in nodes 1 and 3")
-        insert_c1c2(session, keys=range(100, 150), consistency=ConsistencyLevel.TWO)
+        logger.debug("inserting data in nodes 1 and 3")
+        insert_c1c2(session, keys=list(range(100, 150)), consistency=ConsistencyLevel.TWO)
         node1.flush()
         node3.flush()
 
-        debug("start and repair node 2")
+        logger.debug("start and repair node 2")
         node2.start(wait_for_binary_proto=True)
 
         if cluster.version() >= "2.2":
@@ -413,7 +431,7 @@ class TestIncRepair(Tester):
         else:
             node2.nodetool("repair -par -inc")
 
-        debug("replace node and check data integrity")
+        logger.debug("replace node and check data integrity")
         node3.stop(gently=False)
         node5 = Node('node5', cluster, True, ('127.0.0.5', 9160), ('127.0.0.5', 7000), '7500', '0', None, ('127.0.0.5', 9042))
         cluster.add(node5, False)
@@ -421,7 +439,7 @@ class TestIncRepair(Tester):
 
         assert_one(session, "SELECT COUNT(*) FROM ks.cf LIMIT 200", [149])
 
-    def sstable_repairedset_test(self):
+    def test_sstable_repairedset(self):
         """
         * Launch a two node cluster
         * Insert data with stress
@@ -437,10 +455,9 @@ class TestIncRepair(Tester):
         * Run sstablemetadata on both nodes again, pipe to a new file
         * Verify repairs occurred and repairedAt was updated
         """
-        cluster = self.cluster
-        cluster.set_configuration_options(values={'hinted_handoff_enabled': False})
-        cluster.populate(2).start()
-        node1, node2 = cluster.nodelist()
+        self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false'})
+        self.cluster.populate(2).start()
+        node1, node2 = self.cluster.nodelist()
         node1.stress(['write', 'n=10K', 'no-warmup', '-schema', 'replication(factor=2)', 'compaction(strategy=SizeTieredCompactionStrategy,enabled=false)', '-rate', 'threads=50'])
 
         node1.flush()
@@ -451,53 +468,57 @@ class TestIncRepair(Tester):
         node2.run_sstablerepairedset(keyspace='keyspace1')
         node2.start(wait_for_binary_proto=True)
 
-        initialOut1 = node1.run_sstablemetadata(keyspace='keyspace1').stdout
-        initialOut2 = node2.run_sstablemetadata(keyspace='keyspace1').stdout
+        initialOut1 = node1.run_sstablemetadata(keyspace='keyspace1').stdout.decode("utf-8")
+        initialOut2 = node2.run_sstablemetadata(keyspace='keyspace1').stdout.decode("utf-8")
 
         matches = findall('(?<=Repaired at:).*', '\n'.join([initialOut1, initialOut2]))
-        debug("Repair timestamps are: {}".format(matches))
+        logger.debug("Repair timestamps are: {}".format(matches))
 
         uniquematches = set(matches)
         matchcount = Counter(matches)
 
-        self.assertGreaterEqual(len(uniquematches), 2, uniquematches)
+        assert len(uniquematches) >= 2, uniquematches
 
-        self.assertGreaterEqual(max(matchcount), 1, matchcount)
+        assert len(max(matchcount)) >= 1, matchcount
 
-        self.assertIn('Repaired at: 0', '\n'.join([initialOut1, initialOut2]))
+        assert re.search('Repaired at: 0', '\n'.join([initialOut1, initialOut2]))
 
         node1.stop()
         node2.stress(['write', 'n=15K', 'no-warmup', '-schema', 'replication(factor=2)'])
         node2.flush()
         node1.start(wait_for_binary_proto=True)
 
-        if cluster.version() >= "2.2":
+        if self.cluster.version() >= "2.2":
             node1.repair()
         else:
             node1.nodetool("repair -par -inc")
 
-        if cluster.version() >= '4.0':
+        if self.cluster.version() >= '4.0':
             # sstables are compacted out of pending repair by a compaction
-            for node in cluster.nodelist():
+            for node in self.cluster.nodelist():
                 node.nodetool('compact keyspace1 standard1')
 
         finalOut1 = node1.run_sstablemetadata(keyspace='keyspace1').stdout
+        if not isinstance(finalOut1, str):
+            finalOut1 = finalOut1.decode("utf-8")
         finalOut2 = node2.run_sstablemetadata(keyspace='keyspace1').stdout
+        if not isinstance(finalOut2, str):
+            finalOut2 = finalOut2.decode("utf-8")
 
         matches = findall('(?<=Repaired at:).*', '\n'.join([finalOut1, finalOut2]))
 
-        debug(matches)
+        logger.debug(matches)
 
         uniquematches = set(matches)
         matchcount = Counter(matches)
 
-        self.assertGreaterEqual(len(uniquematches), 2)
+        assert len(uniquematches) >= 2
 
-        self.assertGreaterEqual(max(matchcount), 2)
+        assert len(max(matchcount)) >= 2
 
-        self.assertNotIn('Repaired at: 0', '\n'.join([finalOut1, finalOut2]))
+        assert not re.search('Repaired at: 0', '\n'.join([finalOut1, finalOut2]))
 
-    def compaction_test(self):
+    def test_compaction(self):
         """
         Test we can major compact after an incremental repair
         * Launch a three node cluster
@@ -543,22 +564,22 @@ class TestIncRepair(Tester):
             assert_one(session, "select val from tab where key =" + str(x), [1])
 
     @since("2.2")
-    def multiple_full_repairs_lcs_test(self):
+    def test_multiple_full_repairs_lcs(self):
         """
         @jira_ticket CASSANDRA-11172 - repeated full repairs should not cause infinite loop in getNextBackgroundTask
         """
         cluster = self.cluster
         cluster.populate(2).start(wait_for_binary_proto=True)
         node1, node2 = cluster.nodelist()
-        for x in xrange(0, 10):
+        for x in range(0, 10):
             node1.stress(['write', 'n=100k', 'no-warmup', '-rate', 'threads=10', '-schema', 'compaction(strategy=LeveledCompactionStrategy,sstable_size_in_mb=10)', 'replication(factor=2)'])
             cluster.flush()
             cluster.wait_for_compactions()
             node1.nodetool("repair -full keyspace1 standard1")
 
-    @attr('long')
-    @skip('hangs CI')
-    def multiple_subsequent_repair_test(self):
+    @pytest.mark.env("long")
+    @pytest.mark.skip(reason='hangs CI')
+    def test_multiple_subsequent_repair(self):
         """
         @jira_ticket CASSANDRA-8366
 
@@ -576,55 +597,55 @@ class TestIncRepair(Tester):
         cluster.populate(3).start()
         node1, node2, node3 = cluster.nodelist()
 
-        debug("Inserting data with stress")
+        logger.debug("Inserting data with stress")
         node1.stress(['write', 'n=5M', 'no-warmup', '-rate', 'threads=10', '-schema', 'replication(factor=3)'])
 
-        debug("Flushing nodes")
+        logger.debug("Flushing nodes")
         cluster.flush()
 
-        debug("Waiting compactions to finish")
+        logger.debug("Waiting compactions to finish")
         cluster.wait_for_compactions()
 
         if self.cluster.version() >= '2.2':
-            debug("Repairing node1")
+            logger.debug("Repairing node1")
             node1.nodetool("repair")
-            debug("Repairing node2")
+            logger.debug("Repairing node2")
             node2.nodetool("repair")
-            debug("Repairing node3")
+            logger.debug("Repairing node3")
             node3.nodetool("repair")
         else:
-            debug("Repairing node1")
+            logger.debug("Repairing node1")
             node1.nodetool("repair -par -inc")
-            debug("Repairing node2")
+            logger.debug("Repairing node2")
             node2.nodetool("repair -par -inc")
-            debug("Repairing node3")
+            logger.debug("Repairing node3")
             node3.nodetool("repair -par -inc")
 
-        # Using "print" instead of debug() here is on purpose.  The compactions
+        # Using "print" instead of logger.debug() here is on purpose.  The compactions
         # take a long time and don't print anything by default, which can result
         # in the test being timed out after 20 minutes.  These print statements
         # prevent it from being timed out.
-        print "compacting node1"
+        print("compacting node1")
         node1.compact()
-        print "compacting node2"
+        print("compacting node2")
         node2.compact()
-        print "compacting node3"
+        print("compacting node3")
         node3.compact()
 
         # wait some time to be sure the load size is propagated between nodes
-        debug("Waiting for load size info to be propagated between nodes")
+        logger.debug("Waiting for load size info to be propagated between nodes")
         time.sleep(45)
 
-        load_size_in_kb = float(sum(map(lambda n: n.data_size(), [node1, node2, node3])))
+        load_size_in_kb = float(sum([n.data_size() for n in [node1, node2, node3]]))
         load_size = load_size_in_kb / 1024 / 1024
-        debug("Total Load size: {}GB".format(load_size))
+        logger.debug("Total Load size: {}GB".format(load_size))
 
         # There is still some overhead, but it's lot better. We tolerate 25%.
         expected_load_size = 4.5  # In GB
         assert_almost_equal(load_size, expected_load_size, error=0.25)
 
-    @attr('resource-intensive')
-    def sstable_marking_test_not_intersecting_all_ranges(self):
+    @pytest.mark.resource_intensive
+    def test_sstable_marking_not_intersecting_all_ranges(self):
         """
         @jira_ticket CASSANDRA-10299
         * Launch a four node cluster
@@ -636,21 +657,21 @@ class TestIncRepair(Tester):
         cluster.populate(4).start(wait_for_binary_proto=True)
         node1, node2, node3, node4 = cluster.nodelist()
 
-        debug("Inserting data with stress")
+        logger.debug("Inserting data with stress")
         node1.stress(['write', 'n=3', 'no-warmup', '-rate', 'threads=1', '-schema', 'replication(factor=3)'])
 
-        debug("Flushing nodes")
+        logger.debug("Flushing nodes")
         cluster.flush()
 
         repair_options = '' if self.cluster.version() >= '2.2' else '-inc -par'
 
-        debug("Repairing node 1")
+        logger.debug("Repairing node 1")
         node1.nodetool("repair {}".format(repair_options))
-        debug("Repairing node 2")
+        logger.debug("Repairing node 2")
         node2.nodetool("repair {}".format(repair_options))
-        debug("Repairing node 3")
+        logger.debug("Repairing node 3")
         node3.nodetool("repair {}".format(repair_options))
-        debug("Repairing node 4")
+        logger.debug("Repairing node 4")
         node4.nodetool("repair {}".format(repair_options))
 
         if cluster.version() >= '4.0':
@@ -659,16 +680,16 @@ class TestIncRepair(Tester):
                 node.nodetool('compact keyspace1 standard1')
 
         for out in (node.run_sstablemetadata(keyspace='keyspace1').stdout for node in cluster.nodelist() if len(node.get_sstables('keyspace1', 'standard1')) > 0):
-            self.assertNotIn('Repaired at: 0', out)
+            assert 'Repaired at: 0' not in out
 
-    @no_vnodes()
+    @pytest.mark.no_vnodes
     @since('4.0')
-    def move_test(self):
+    def test_move(self):
         """ Test repaired data remains in sync after a move """
-        cluster = self.cluster
-        cluster.set_configuration_options(values={'hinted_handoff_enabled': False, 'commitlog_sync_period_in_ms': 500})
-        cluster.populate(4, tokens=[0, 2**32, 2**48, -(2**32)]).start()
-        node1, node2, node3, node4 = cluster.nodelist()
+        self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false',
+                                                                                     'commitlog_sync_period_in_ms': 500})
+        self.cluster.populate(4, tokens=[0, 2**32, 2**48, -(2**32)]).start()
+        node1, node2, node3, node4 = self.cluster.nodelist()
 
         session = self.patient_exclusive_cql_connection(node3)
         session.execute("CREATE KEYSPACE ks WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 2}")
@@ -686,25 +707,25 @@ class TestIncRepair(Tester):
             session.execute(stmt, (v, v))
 
         # everything should be in sync
-        for node in cluster.nodelist():
+        for node in self.cluster.nodelist():
             result = node.repair(options=['ks', '--validate'])
-            self.assertIn("Repaired data is in sync", result.stdout)
+            assert "Repaired data is in sync" in result.stdout
 
         node2.nodetool('move {}'.format(2**16))
 
         # everything should still be in sync
-        for node in cluster.nodelist():
+        for node in self.cluster.nodelist():
             result = node.repair(options=['ks', '--validate'])
-            self.assertIn("Repaired data is in sync", result.stdout)
+            assert "Repaired data is in sync" in result.stdout
 
-    @no_vnodes()
+    @pytest.mark.no_vnodes
     @since('4.0')
-    def decommission_test(self):
+    def test_decommission(self):
         """ Test repaired data remains in sync after a decommission """
-        cluster = self.cluster
-        cluster.set_configuration_options(values={'hinted_handoff_enabled': False, 'commitlog_sync_period_in_ms': 500})
-        cluster.populate(4).start()
-        node1, node2, node3, node4 = cluster.nodelist()
+        self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false',
+                                                                                     'commitlog_sync_period_in_ms': 500})
+        self.cluster.populate(4).start()
+        node1, node2, node3, node4 = self.cluster.nodelist()
 
         session = self.patient_exclusive_cql_connection(node3)
         session.execute("CREATE KEYSPACE ks WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 2}")
@@ -722,25 +743,25 @@ class TestIncRepair(Tester):
             session.execute(stmt, (v, v))
 
         # everything should be in sync
-        for node in cluster.nodelist():
+        for node in self.cluster.nodelist():
             result = node.repair(options=['ks', '--validate'])
-            self.assertIn("Repaired data is in sync", result.stdout)
+            assert "Repaired data is in sync" in result.stdout
 
         node2.nodetool('decommission')
 
         # everything should still be in sync
         for node in [node1, node3, node4]:
             result = node.repair(options=['ks', '--validate'])
-            self.assertIn("Repaired data is in sync", result.stdout)
+            assert "Repaired data is in sync" in result.stdout
 
-    @no_vnodes()
+    @pytest.mark.no_vnodes
     @since('4.0')
-    def bootstrap_test(self):
+    def test_bootstrap(self):
         """ Test repaired data remains in sync after a bootstrap """
-        cluster = self.cluster
-        cluster.set_configuration_options(values={'hinted_handoff_enabled': False, 'commitlog_sync_period_in_ms': 500})
-        cluster.populate(3).start()
-        node1, node2, node3 = cluster.nodelist()
+        self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false',
+                                                                                     'commitlog_sync_period_in_ms': 500})
+        self.cluster.populate(3).start()
+        node1, node2, node3 = self.cluster.nodelist()
 
         session = self.patient_exclusive_cql_connection(node3)
         session.execute("CREATE KEYSPACE ks WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 2}")
@@ -760,27 +781,28 @@ class TestIncRepair(Tester):
         # everything should be in sync
         for node in [node1, node2, node3]:
             result = node.repair(options=['ks', '--validate'])
-            self.assertIn("Repaired data is in sync", result.stdout)
+            assert "Repaired data is in sync" in result.stdout
 
         node4 = new_node(self.cluster)
         node4.start(wait_for_binary_proto=True)
 
-        self.assertEqual(len(self.cluster.nodelist()), 4)
+        assert len(self.cluster.nodelist()) == 4
         # everything should still be in sync
         for node in self.cluster.nodelist():
             result = node.repair(options=['ks', '--validate'])
-            self.assertIn("Repaired data is in sync", result.stdout)
+            assert "Repaired data is in sync" in result.stdout
 
     @since('4.0')
-    def force_test(self):
+    def test_force(self):
         """
         forcing an incremental repair should incrementally repair any nodes
         that are up, but should not promote the sstables to repaired
         """
-        cluster = self.cluster
-        cluster.set_configuration_options(values={'hinted_handoff_enabled': False, 'num_tokens': 1, 'commitlog_sync_period_in_ms': 500})
-        cluster.populate(3).start()
-        node1, node2, node3 = cluster.nodelist()
+        self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false',
+                                                                                     'num_tokens': 1,
+                                                                                     'commitlog_sync_period_in_ms': 500})
+        self.cluster.populate(3).start()
+        node1, node2, node3 = self.cluster.nodelist()
 
         session = self.patient_exclusive_cql_connection(node3)
         session.execute("CREATE KEYSPACE ks WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 3}")
@@ -793,7 +815,7 @@ class TestIncRepair(Tester):
         node2.stop()
 
         # repair should fail because node2 is down
-        with self.assertRaises(ToolError):
+        with pytest.raises(ToolError):
             node1.repair(options=['ks'])
 
         # run with force flag
@@ -804,15 +826,16 @@ class TestIncRepair(Tester):
         self.assertNoRepairedSSTables(node2, 'ks')
 
     @since('4.0')
-    def hosts_test(self):
+    def test_hosts(self):
         """
         running an incremental repair with hosts specified should incrementally repair
         the given nodes, but should not promote the sstables to repaired
         """
-        cluster = self.cluster
-        cluster.set_configuration_options(values={'hinted_handoff_enabled': False, 'num_tokens': 1, 'commitlog_sync_period_in_ms': 500})
-        cluster.populate(3).start()
-        node1, node2, node3 = cluster.nodelist()
+        self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false',
+                                                                                     'num_tokens': 1,
+                                                                                     'commitlog_sync_period_in_ms': 500})
+        self.cluster.populate(3).start()
+        node1, node2, node3 = self.cluster.nodelist()
 
         session = self.patient_exclusive_cql_connection(node3)
         session.execute("CREATE KEYSPACE ks WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 3}")
@@ -830,18 +853,17 @@ class TestIncRepair(Tester):
         self.assertNoRepairedSSTables(node2, 'ks')
 
     @since('4.0')
-    def subrange_test(self):
+    def test_subrange(self):
         """
         running an incremental repair with hosts specified should incrementally repair
         the given nodes, but should not promote the sstables to repaired
         """
-        cluster = self.cluster
-        cluster.set_configuration_options(values={'hinted_handoff_enabled': False,
-                                                  'num_tokens': 1,
-                                                  'commitlog_sync_period_in_ms': 500,
-                                                  'partitioner': 'org.apache.cassandra.dht.Murmur3Partitioner'})
-        cluster.populate(3).start()
-        node1, node2, node3 = cluster.nodelist()
+        self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false',
+                                                                                     'num_tokens': 1,
+                                                                                     'commitlog_sync_period_in_ms': 500,
+                                                                                     'partitioner': 'org.apache.cassandra.dht.Murmur3Partitioner'})
+        self.cluster.populate(3).start()
+        node1, node2, node3 = self.cluster.nodelist()
 
         session = self.patient_exclusive_cql_connection(node3)
         session.execute("CREATE KEYSPACE ks WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 3}")
@@ -851,12 +873,12 @@ class TestIncRepair(Tester):
         for i in range(10):
             session.execute(stmt, (i, i))
 
-        for node in cluster.nodelist():
+        for node in self.cluster.nodelist():
             node.flush()
             self.assertNoRepairedSSTables(node, 'ks')
 
         # only repair the partition k=0
-        token = Murmur3Token.from_key(str(bytearray([0, 0, 0, 0])))
+        token = Murmur3Token.from_key(bytes([0, 0, 0, 0]))
         # import ipdb; ipdb.set_trace()
         # run with force flag
         node1.repair(options=['ks', '-st', str(token.value - 1), '-et', str(token.value)])

http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/repair_tests/preview_repair_test.py
----------------------------------------------------------------------
diff --git a/repair_tests/preview_repair_test.py b/repair_tests/preview_repair_test.py
index 86627ab..ee5a38d 100644
--- a/repair_tests/preview_repair_test.py
+++ b/repair_tests/preview_repair_test.py
@@ -1,23 +1,25 @@
+import pytest
 import time
 
 from cassandra import ConsistencyLevel
 from cassandra.query import SimpleStatement
 
 from dtest import Tester
-from tools.decorators import no_vnodes, since
+
+since = pytest.mark.since
 
 
 @since('4.0')
-class PreviewRepairTest(Tester):
+class TestPreviewRepair(Tester):
 
     def assert_no_repair_history(self, session):
         rows = session.execute("select * from system_distributed.repair_history")
-        self.assertEqual(rows.current_rows, [])
+        assert rows.current_rows == []
         rows = session.execute("select * from system_distributed.parent_repair_history")
-        self.assertEqual(rows.current_rows, [])
+        assert rows.current_rows == []
 
-    @no_vnodes()
-    def preview_test(self):
+    @pytest.mark.no_vnodes
+    def test_preview(self):
         """ Test that preview correctly detects out of sync data """
         cluster = self.cluster
         cluster.set_configuration_options(values={'hinted_handoff_enabled': False, 'commitlog_sync_period_in_ms': 500})
@@ -30,7 +32,7 @@ class PreviewRepairTest(Tester):
 
         # everything should be in sync
         result = node1.repair(options=['ks', '--preview'])
-        self.assertIn("Previewed data was in sync", result.stdout)
+        assert "Previewed data was in sync" in result.stdout
         self.assert_no_repair_history(session)
 
         # make data inconsistent between nodes
@@ -57,16 +59,16 @@ class PreviewRepairTest(Tester):
 
         # data should not be in sync for full and unrepaired previews
         result = node1.repair(options=['ks', '--preview'])
-        self.assertIn("Total estimated streaming", result.stdout)
-        self.assertNotIn("Previewed data was in sync", result.stdout)
+        assert "Total estimated streaming" in result.stdout
+        assert "Previewed data was in sync" not in result.stdout
 
         result = node1.repair(options=['ks', '--preview', '--full'])
-        self.assertIn("Total estimated streaming", result.stdout)
-        self.assertNotIn("Previewed data was in sync", result.stdout)
+        assert "Total estimated streaming" in result.stdout
+        assert "Previewed data was in sync" not in result.stdout
 
         # repaired data should be in sync anyway
         result = node1.repair(options=['ks', '--validate'])
-        self.assertIn("Repaired data is in sync", result.stdout)
+        assert "Repaired data is in sync" in result.stdout
 
         self.assert_no_repair_history(session)
 
@@ -77,10 +79,10 @@ class PreviewRepairTest(Tester):
 
         # ...and everything should be in sync
         result = node1.repair(options=['ks', '--preview'])
-        self.assertIn("Previewed data was in sync", result.stdout)
+        assert "Previewed data was in sync" in result.stdout
 
         result = node1.repair(options=['ks', '--preview', '--full'])
-        self.assertIn("Previewed data was in sync", result.stdout)
+        assert "Previewed data was in sync" in result.stdout
 
         result = node1.repair(options=['ks', '--validate'])
-        self.assertIn("Repaired data is in sync", result.stdout)
+        assert "Repaired data is in sync" in result.stdout


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


Mime
View raw message