From commits-return-205767-archive-asf-public=cust-asf.ponee.io@cassandra.apache.org Mon Jan 29 22:10:17 2018 Return-Path: X-Original-To: archive-asf-public@eu.ponee.io Delivered-To: archive-asf-public@eu.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by mx-eu-01.ponee.io (Postfix) with ESMTP id 5BAB2180654 for ; Mon, 29 Jan 2018 22:10:17 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 4BA4C160C56; Mon, 29 Jan 2018 21:10:17 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 00FD2160C2F for ; Mon, 29 Jan 2018 22:10:14 +0100 (CET) Received: (qmail 32515 invoked by uid 500); 29 Jan 2018 21:10:08 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 32294 invoked by uid 99); 29 Jan 2018 21:10:08 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 29 Jan 2018 21:10:08 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 1DBCFE97E7; Mon, 29 Jan 2018 21:10:04 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: aweisberg@apache.org To: commits@cassandra.apache.org Date: Mon, 29 Jan 2018 21:10:25 -0000 Message-Id: <9c94146e3ad04909871d31ac8f5d341d@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [22/36] cassandra-dtest git commit: Migrate dtests to use pytest and python3 http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/paxos_test.py ---------------------------------------------------------------------- diff --git a/paxos_test.py b/paxos_test.py new file mode 100644 index 0000000..736ca46 --- /dev/null +++ b/paxos_test.py @@ -0,0 +1,195 @@ +import time +import pytest +import logging + +from threading import Thread + +from cassandra import ConsistencyLevel, WriteTimeout +from cassandra.query import SimpleStatement + +from tools.assertions import assert_unavailable +from dtest import Tester, create_ks + +since = pytest.mark.since +logger = logging.getLogger(__name__) + + +@since('2.0.6') +class TestPaxos(Tester): + + def prepare(self, ordered=False, create_keyspace=True, use_cache=False, nodes=1, rf=1): + cluster = self.cluster + + if (ordered): + cluster.set_partitioner("org.apache.cassandra.dht.ByteOrderedPartitioner") + + if (use_cache): + cluster.set_configuration_options(values={'row_cache_size_in_mb': 100}) + + cluster.populate(nodes).start() + node1 = cluster.nodelist()[0] + time.sleep(0.2) + + session = self.patient_cql_connection(node1) + if create_keyspace: + create_ks(session, 'ks', rf) + return session + + def test_replica_availability(self): + """ + @jira_ticket CASSANDRA-8640 + + Regression test for a bug (CASSANDRA-8640) that required all nodes to + be available in order to run LWT queries, even if the query could + complete correctly with quorum nodes available. 
+ """ + session = self.prepare(nodes=3, rf=3) + session.execute("CREATE TABLE test (k int PRIMARY KEY, v int)") + session.execute("INSERT INTO test (k, v) VALUES (0, 0) IF NOT EXISTS") + + self.cluster.nodelist()[2].stop() + session.execute("INSERT INTO test (k, v) VALUES (1, 1) IF NOT EXISTS") + + self.cluster.nodelist()[1].stop() + assert_unavailable(session.execute, "INSERT INTO test (k, v) VALUES (2, 2) IF NOT EXISTS") + + self.cluster.nodelist()[1].start(wait_for_binary_proto=True, wait_other_notice=True) + session.execute("INSERT INTO test (k, v) VALUES (3, 3) IF NOT EXISTS") + + self.cluster.nodelist()[2].start(wait_for_binary_proto=True) + session.execute("INSERT INTO test (k, v) VALUES (4, 4) IF NOT EXISTS") + + @pytest.mark.no_vnodes + def test_cluster_availability(self): + # Warning, a change in partitioner or a change in CCM token allocation + # may require the partition keys of these inserts to be changed. + # This must not use vnodes as it relies on assumed token values. + + session = self.prepare(nodes=3) + session.execute("CREATE TABLE test (k int PRIMARY KEY, v int)") + session.execute("INSERT INTO test (k, v) VALUES (0, 0) IF NOT EXISTS") + + self.cluster.nodelist()[2].stop() + session.execute("INSERT INTO test (k, v) VALUES (1, 1) IF NOT EXISTS") + + self.cluster.nodelist()[1].stop() + session.execute("INSERT INTO test (k, v) VALUES (3, 2) IF NOT EXISTS") + + self.cluster.nodelist()[1].start(wait_for_binary_proto=True) + session.execute("INSERT INTO test (k, v) VALUES (5, 5) IF NOT EXISTS") + + self.cluster.nodelist()[2].start(wait_for_binary_proto=True) + session.execute("INSERT INTO test (k, v) VALUES (6, 6) IF NOT EXISTS") + + def test_contention_multi_iterations(self): + pytest.skip("Hanging the build") + self._contention_test(8, 100) + + # Warning, this test will require you to raise the open + # file limit on OSX. Use 'ulimit -n 1000' + def test_contention_many_threads(self): + self._contention_test(300, 1) + + def _contention_test(self, threads, iterations): + """ + Test threads repeatedly contending on the same row. + """ + + verbose = False + + session = self.prepare(nodes=3) + session.execute("CREATE TABLE test (k int, v int static, id int, PRIMARY KEY (k, id))") + session.execute("INSERT INTO test(k, v) VALUES (0, 0)") + + class Worker(Thread): + + def __init__(self, wid, session, iterations, query): + Thread.__init__(self) + self.wid = wid + self.iterations = iterations + self.query = query + self.session = session + self.errors = 0 + self.retries = 0 + + def run(self): + global worker_done + i = 0 + prev = 0 + while i < self.iterations: + done = False + while not done: + try: + res = self.session.execute(self.query, (prev + 1, prev, self.wid)) + if verbose: + print("[%3d] CAS %3d -> %3d (res: %s)" % (self.wid, prev, prev + 1, str(res))) + if res[0][0] is True: + done = True + prev = prev + 1 + else: + self.retries = self.retries + 1 + # There is 2 conditions, so 2 reasons to fail: if we failed because the row with our + # worker ID already exists, it means we timeout earlier but our update did went in, + # so do consider this as a success + prev = res[0][3] + if res[0][2] is not None: + if verbose: + print("[%3d] Update was inserted on previous try (res = %s)" % (self.wid, str(res))) + done = True + except WriteTimeout as e: + if verbose: + print("[%3d] TIMEOUT (%s)" % (self.wid, str(e))) + # This means a timeout: just retry, if it happens that our update was indeed persisted, + # we'll figure it out on the next run. 
+ self.retries = self.retries + 1 + except Exception as e: + if verbose: + print("[%3d] ERROR: %s" % (self.wid, str(e))) + self.errors = self.errors + 1 + done = True + i = i + 1 + # Clean up for next iteration + while True: + try: + self.session.execute("DELETE FROM test WHERE k = 0 AND id = %d IF EXISTS" % self.wid) + break + except WriteTimeout as e: + pass + + nodes = self.cluster.nodelist() + workers = [] + + c = self.patient_cql_connection(nodes[0], keyspace='ks') + q = c.prepare(""" + BEGIN BATCH + UPDATE test SET v = ? WHERE k = 0 IF v = ?; + INSERT INTO test (k, id) VALUES (0, ?) IF NOT EXISTS; + APPLY BATCH + """) + + for n in range(0, threads): + workers.append(Worker(n, c, iterations, q)) + + start = time.time() + + for w in workers: + w.start() + + for w in workers: + w.join() + + if verbose: + runtime = time.time() - start + print("runtime:", runtime) + + query = SimpleStatement("SELECT v FROM test WHERE k = 0", consistency_level=ConsistencyLevel.ALL) + rows = session.execute(query) + value = rows[0][0] + + errors = 0 + retries = 0 + for w in workers: + errors = errors + w.errors + retries = retries + w.retries + + assert (value == threads * iterations) and (errors == 0), "value={}, errors={}, retries={}".format(value, errors, retries) http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/paxos_tests.py ---------------------------------------------------------------------- diff --git a/paxos_tests.py b/paxos_tests.py deleted file mode 100644 index 6c0bd28..0000000 --- a/paxos_tests.py +++ /dev/null @@ -1,192 +0,0 @@ -# coding: utf-8 - -import time -from threading import Thread - -from cassandra import ConsistencyLevel, WriteTimeout -from cassandra.query import SimpleStatement - -from tools.assertions import assert_unavailable -from dtest import Tester, create_ks -from tools.decorators import no_vnodes, since - - -@since('2.0.6') -class TestPaxos(Tester): - - def prepare(self, ordered=False, create_keyspace=True, use_cache=False, nodes=1, rf=1): - cluster = self.cluster - - if (ordered): - cluster.set_partitioner("org.apache.cassandra.dht.ByteOrderedPartitioner") - - if (use_cache): - cluster.set_configuration_options(values={'row_cache_size_in_mb': 100}) - - cluster.populate(nodes).start() - node1 = cluster.nodelist()[0] - time.sleep(0.2) - - session = self.patient_cql_connection(node1) - if create_keyspace: - create_ks(session, 'ks', rf) - return session - - def replica_availability_test(self): - """ - @jira_ticket CASSANDRA-8640 - - Regression test for a bug (CASSANDRA-8640) that required all nodes to - be available in order to run LWT queries, even if the query could - complete correctly with quorum nodes available. 
- """ - session = self.prepare(nodes=3, rf=3) - session.execute("CREATE TABLE test (k int PRIMARY KEY, v int)") - session.execute("INSERT INTO test (k, v) VALUES (0, 0) IF NOT EXISTS") - - self.cluster.nodelist()[2].stop() - session.execute("INSERT INTO test (k, v) VALUES (1, 1) IF NOT EXISTS") - - self.cluster.nodelist()[1].stop() - assert_unavailable(session.execute, "INSERT INTO test (k, v) VALUES (2, 2) IF NOT EXISTS") - - self.cluster.nodelist()[1].start(wait_for_binary_proto=True, wait_other_notice=True) - session.execute("INSERT INTO test (k, v) VALUES (3, 3) IF NOT EXISTS") - - self.cluster.nodelist()[2].start(wait_for_binary_proto=True) - session.execute("INSERT INTO test (k, v) VALUES (4, 4) IF NOT EXISTS") - - @no_vnodes() - def cluster_availability_test(self): - # Warning, a change in partitioner or a change in CCM token allocation - # may require the partition keys of these inserts to be changed. - # This must not use vnodes as it relies on assumed token values. - - session = self.prepare(nodes=3) - session.execute("CREATE TABLE test (k int PRIMARY KEY, v int)") - session.execute("INSERT INTO test (k, v) VALUES (0, 0) IF NOT EXISTS") - - self.cluster.nodelist()[2].stop() - session.execute("INSERT INTO test (k, v) VALUES (1, 1) IF NOT EXISTS") - - self.cluster.nodelist()[1].stop() - session.execute("INSERT INTO test (k, v) VALUES (3, 2) IF NOT EXISTS") - - self.cluster.nodelist()[1].start(wait_for_binary_proto=True) - session.execute("INSERT INTO test (k, v) VALUES (5, 5) IF NOT EXISTS") - - self.cluster.nodelist()[2].start(wait_for_binary_proto=True) - session.execute("INSERT INTO test (k, v) VALUES (6, 6) IF NOT EXISTS") - - def contention_test_multi_iterations(self): - self.skipTest("Hanging the build") - self._contention_test(8, 100) - - # Warning, this test will require you to raise the open - # file limit on OSX. Use 'ulimit -n 1000' - def contention_test_many_threads(self): - self._contention_test(300, 1) - - def _contention_test(self, threads, iterations): - """ - Test threads repeatedly contending on the same row. - """ - - verbose = False - - session = self.prepare(nodes=3) - session.execute("CREATE TABLE test (k int, v int static, id int, PRIMARY KEY (k, id))") - session.execute("INSERT INTO test(k, v) VALUES (0, 0)") - - class Worker(Thread): - - def __init__(self, wid, session, iterations, query): - Thread.__init__(self) - self.wid = wid - self.iterations = iterations - self.query = query - self.session = session - self.errors = 0 - self.retries = 0 - - def run(self): - global worker_done - i = 0 - prev = 0 - while i < self.iterations: - done = False - while not done: - try: - res = self.session.execute(self.query, (prev + 1, prev, self.wid)) - if verbose: - print "[%3d] CAS %3d -> %3d (res: %s)" % (self.wid, prev, prev + 1, str(res)) - if res[0][0] is True: - done = True - prev = prev + 1 - else: - self.retries = self.retries + 1 - # There is 2 conditions, so 2 reasons to fail: if we failed because the row with our - # worker ID already exists, it means we timeout earlier but our update did went in, - # so do consider this as a success - prev = res[0][3] - if res[0][2] is not None: - if verbose: - print "[%3d] Update was inserted on previous try (res = %s)" % (self.wid, str(res)) - done = True - except WriteTimeout as e: - if verbose: - print "[%3d] TIMEOUT (%s)" % (self.wid, str(e)) - # This means a timeout: just retry, if it happens that our update was indeed persisted, - # we'll figure it out on the next run. 
- self.retries = self.retries + 1 - except Exception as e: - if verbose: - print "[%3d] ERROR: %s" % (self.wid, str(e)) - self.errors = self.errors + 1 - done = True - i = i + 1 - # Clean up for next iteration - while True: - try: - self.session.execute("DELETE FROM test WHERE k = 0 AND id = %d IF EXISTS" % self.wid) - break - except WriteTimeout as e: - pass - - nodes = self.cluster.nodelist() - workers = [] - - c = self.patient_cql_connection(nodes[0], keyspace='ks') - q = c.prepare(""" - BEGIN BATCH - UPDATE test SET v = ? WHERE k = 0 IF v = ?; - INSERT INTO test (k, id) VALUES (0, ?) IF NOT EXISTS; - APPLY BATCH - """) - - for n in range(0, threads): - workers.append(Worker(n, c, iterations, q)) - - start = time.time() - - for w in workers: - w.start() - - for w in workers: - w.join() - - if verbose: - runtime = time.time() - start - print "runtime:", runtime - - query = SimpleStatement("SELECT v FROM test WHERE k = 0", consistency_level=ConsistencyLevel.ALL) - rows = session.execute(query) - value = rows[0][0] - - errors = 0 - retries = 0 - for w in workers: - errors = errors + w.errors - retries = retries + w.retries - - self.assertTrue((value == threads * iterations) and (errors == 0), "value={}, errors={}, retries={}".format(value, errors, retries)) http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/pending_range_test.py ---------------------------------------------------------------------- diff --git a/pending_range_test.py b/pending_range_test.py index efa56f4..55d810b 100644 --- a/pending_range_test.py +++ b/pending_range_test.py @@ -1,21 +1,25 @@ +import logging +import pytest + from cassandra.query import SimpleStatement -from nose.plugins.attrib import attr -from dtest import TRACE, Tester, debug, create_ks -from tools.decorators import no_vnodes +from dtest import Tester, create_ks +from plugins.assert_tools import assert_regexp_matches + +logger = logging.getLogger(__name__) -@no_vnodes() +@pytest.mark.no_vnodes class TestPendingRangeMovements(Tester): - @attr('resource-intensive') - def pending_range_test(self): + @pytest.mark.resource_intensive + def test_pending_range(self): """ @jira_ticket CASSANDRA-10887 """ cluster = self.cluster # If we are on 2.1, we need to set the log level to debug or higher, as debug.log does not exist. 
- if cluster.version() < '2.2' and not TRACE: + if cluster.version() < '2.2': cluster.set_log_level('DEBUG') # Create 5 node cluster @@ -35,7 +39,7 @@ class TestPendingRangeMovements(Tester): lwt_query = SimpleStatement("UPDATE users SET email = 'janedoe@abc.com' WHERE login = 'jdoe3' IF email = 'jdoe@abc.com'") # Show we can execute LWT no problem - for i in xrange(1000): + for i in range(1000): session.execute(lwt_query) token = '-634023222112864484' @@ -61,9 +65,9 @@ class TestPendingRangeMovements(Tester): # Verify other nodes believe this is Down/Moving out, _, _ = node2.nodetool('ring') - debug("Nodetool Ring output: {}".format(out)) - self.assertRegexpMatches(out, '127\.0\.0\.1.*?Down.*?Moving') + logger.debug("Nodetool Ring output: {}".format(out)) + assert_regexp_matches(out, '127\.0\.0\.1.*?Down.*?Moving') # Check we can still execute LWT - for i in xrange(1000): + for i in range(1000): session.execute(lwt_query) http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/plugins/assert_tools.py ---------------------------------------------------------------------- diff --git a/plugins/assert_tools.py b/plugins/assert_tools.py new file mode 100644 index 0000000..9f796ed --- /dev/null +++ b/plugins/assert_tools.py @@ -0,0 +1,138 @@ +""" +Copyright 2016 Oliver Schoenborn. BSD 3-Clause license (see __license__ at bottom of this file for details). + +This module is part of the nose2pytest distribution. + +This module's assert_ functions provide drop-in replacements for nose.tools.assert_ functions (many of which are +pep-8-ized extractions from Python's unittest.case.TestCase methods). As such, it can be imported in a test +suite run by py.test, to replace the nose imports with functions that rely on py.test's assertion +introspection for error reporting. When combined with running nose2pytest.py on your test suite, this +module may be sufficient to decrease your test suite's third-party dependencies by 1. +""" + +import unittest + + +__all__ = [ + 'assert_almost_equal', + 'assert_not_almost_equal', + 'assert_dict_contains_subset', + + 'assert_raises_regex', + 'assert_raises_regexp', + 'assert_regexp_matches', + 'assert_warns_regex', +] + + +def assert_almost_equal(a, b, places=7, msg=None): + """ + Fail if the two objects are unequal as determined by their + difference rounded to the given number of decimal places + and comparing to zero. + + Note that decimal places (from zero) are usually not the same + as significant digits (measured from the most signficant digit). + + See the builtin round() function for places parameter. + """ + if msg is None: + assert round(abs(b - a), places) == 0 + else: + assert round(abs(b - a), places) == 0, msg + + +def assert_not_almost_equal(a, b, places=7, msg=None): + """ + Fail if the two objects are equal as determined by their + difference rounded to the given number of decimal places + and comparing to zero. + + Note that decimal places (from zero) are usually not the same + as significant digits (measured from the most signficant digit). + + See the builtin round() function for places parameter. + """ + if msg is None: + assert round(abs(b - a), places) != 0 + else: + assert round(abs(b - a), places) != 0, msg + + +def assert_dict_contains_subset(subset, dictionary, msg=None): + """ + Checks whether dictionary is a superset of subset. If not, the assertion message will have useful details, + unless msg is given, then msg is output. 
+ """ + dictionary = dictionary + missing_keys = sorted(list(set(subset.keys()) - set(dictionary.keys()))) + mismatch_vals = {k: (subset[k], dictionary[k]) for k in subset if k in dictionary and subset[k] != dictionary[k]} + if msg is None: + assert missing_keys == [], 'Missing keys = {}'.format(missing_keys) + assert mismatch_vals == {}, 'Mismatched values (s, d) = {}'.format(mismatch_vals) + else: + assert missing_keys == [], msg + assert mismatch_vals == {}, msg + + +# make other unittest.TestCase methods available as-is as functions; trick taken from Nose + +class _Dummy(unittest.TestCase): + def do_nothing(self): + pass + +_t = _Dummy('do_nothing') + +assert_raises_regex=_t.assertRaisesRegex, +assert_raises_regexp=_t.assertRaisesRegexp, +assert_regexp_matches=_t.assertRegexpMatches, +assert_warns_regex=_t.assertWarnsRegex, + +del _Dummy +del _t + + +# py.test integration: add all assert_ function to the pytest package namespace + +# Use similar trick as Nose to bring in bound methods from unittest.TestCase as free functions: + +def pytest_namespace() -> {str: callable}: + namespace = {} + for name, obj in globals().items(): + if name.startswith('assert_'): + namespace[name] = obj + + return namespace + + +# licensing + +__license__ = """ + Copyright (c) 2016, Oliver Schoenborn + All rights reserved. + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions are met: + + * Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + + * Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + + * Neither the name of nose2pytest nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE + FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, + OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +""" http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/plugins/dtestcollect.py ---------------------------------------------------------------------- diff --git a/plugins/dtestcollect.py b/plugins/dtestcollect.py deleted file mode 100644 index 6b2dac4..0000000 --- a/plugins/dtestcollect.py +++ /dev/null @@ -1,92 +0,0 @@ -import os -from nose.plugins.base import Plugin -from nose.case import Test -import logging -import unittest - -log = logging.getLogger(__name__) - - -class DTestCollect(Plugin): - """ - Collect and output test names only, don't run any tests. - """ - name = 'dtest_collect' - enableOpt = 'dtest_collect_only' - - def options(self, parser, env): - """Register commandline options. 
- """ - parser.add_option('--dtest-collect-only', - action='store_true', - dest=self.enableOpt, - default=env.get('DTEST_NOSE_COLLECT_ONLY'), - help="Enable collect-only: %s [COLLECT_ONLY]" % - (self.help())) - - def prepareTestLoader(self, loader): - """Install collect-only suite class in TestLoader. - """ - # Disable context awareness - log.debug("Preparing test loader") - loader.suiteClass = TestSuiteFactory(self.conf) - - def prepareTestCase(self, test): - """Replace actual test with dummy that always passes. - """ - # Return something that always passes - log.debug("Preparing test case %s", test) - if not isinstance(test, Test): - return - - def run(result): - # We need to make these plugin calls because there won't be - # a result proxy, due to using a stripped-down test suite - self.conf.plugins.startTest(test) - result.startTest(test) - self.conf.plugins.addSuccess(test) - result.addSuccess(test) - self.conf.plugins.stopTest(test) - result.stopTest(test) - return run - - def describeTest(self, test): - tag = os.getenv('TEST_TAG', '') - if tag == '': - tag = test.test._testMethodName - else: - tag = test.test._testMethodName + "-" + tag - retval = "%s:%s.%s" % (test.test.__module__, test.test.__class__.__name__, tag) - return retval - - -class TestSuiteFactory: - """ - Factory for producing configured test suites. - """ - def __init__(self, conf): - self.conf = conf - - def __call__(self, tests=(), **kw): - return TestSuite(tests, conf=self.conf) - - -class TestSuite(unittest.TestSuite): - """ - Basic test suite that bypasses most proxy and plugin calls, but does - wrap tests in a nose.case.Test so prepareTestCase will be called. - """ - def __init__(self, tests=(), conf=None): - self.conf = conf - # Exec lazy suites: makes discovery depth-first - if callable(tests): - tests = tests() - log.debug("TestSuite(%r)", tests) - unittest.TestSuite.__init__(self, tests) - - def addTest(self, test): - log.debug("Add test %s", test) - if isinstance(test, unittest.TestSuite): - self._tests.append(test) - else: - self._tests.append(Test(test, config=self.conf)) http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/plugins/dtestconfig.py ---------------------------------------------------------------------- diff --git a/plugins/dtestconfig.py b/plugins/dtestconfig.py deleted file mode 100644 index 5363d9a..0000000 --- a/plugins/dtestconfig.py +++ /dev/null @@ -1,42 +0,0 @@ -from collections import namedtuple - -from nose import plugins - -# A class that defines the attributes that have to be defined for configuring -# a dtest run. namedtuple does what we want -- it's immutable and requires -# all the attributes to be passed in to be instantiated. -GlobalConfigObject = namedtuple('GlobalConfigObject', [ - 'vnodes', # disable or enable vnodes -]) - -_CONFIG = None - - -class DtestConfigPlugin(plugins.Plugin): - """ - Pass in configuration options for the dtests. - """ - enabled = True # if this plugin is loaded at all, we're using it - name = 'dtest_config' - - def __init__(self, config=None): - """ - Instantiate this plugin with a GlobalConfigObject or, by default, None. - Then, set the global _CONFIG constant with the value of the plugin. - - This is a little weird, yes, but nose seems to generally be built - around the idea that a given plugin will be instantiated only once, so - this provides a way for test framework code to grab the value off this - module. We want that, since the plugin itself isn't available to test - code. 
- - @param config an object meeting the GlobalConfigObject spec that will - be used as configuration for a dtest run. - """ - self.CONFIG = config - - global _CONFIG - _CONFIG = self.CONFIG - - def configure(self, options, conf): - pass http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/plugins/dtesttag.py ---------------------------------------------------------------------- diff --git a/plugins/dtesttag.py b/plugins/dtesttag.py deleted file mode 100644 index e1ebb74..0000000 --- a/plugins/dtesttag.py +++ /dev/null @@ -1,45 +0,0 @@ -from nose import plugins -import os -import inspect - - -class DTestTag(plugins.Plugin): - enabled = True # if this plugin is loaded at all, we're using it - name = 'dtest_tag' - - def __init__(self): - pass - - def configure(self, options, conf): - pass - - def nice_classname(self, obj): - """Returns a nice name for class object or class instance. - - >>> nice_classname(Exception()) # doctest: +ELLIPSIS - '...Exception' - >>> nice_classname(Exception) # doctest: +ELLIPSIS - '...Exception' - - """ - if inspect.isclass(obj): - cls_name = obj.__name__ - else: - cls_name = obj.__class__.__name__ - mod = inspect.getmodule(obj) - if mod: - name = mod.__name__ - # jython - if name.startswith('org.python.core.'): - name = name[len('org.python.core.'):] - return "%s.%s" % (name, cls_name) - else: - return cls_name - - def describeTest(self, test): - tag = os.getenv('TEST_TAG', '') - if tag == '': - tag = test.test._testMethodName - else: - tag = test.test._testMethodName + "-" + tag - return "%s (%s)" % (tag, self.nice_classname(test.test)) http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/plugins/dtestxunit.py ---------------------------------------------------------------------- diff --git a/plugins/dtestxunit.py b/plugins/dtestxunit.py deleted file mode 100644 index c996d53..0000000 --- a/plugins/dtestxunit.py +++ /dev/null @@ -1,348 +0,0 @@ -"""This plugin provides test results in the standard XUnit XML format. - -It's designed for the `Jenkins`_ (previously Hudson) continuous build -system, but will probably work for anything else that understands an -XUnit-formatted XML representation of test results. - -Add this shell command to your builder :: - - nosetests --with-dtestxunit - -And by default a file named nosetests.xml will be written to the -working directory. - -In a Jenkins builder, tick the box named "Publish JUnit test result report" -under the Post-build Actions and enter this value for Test report XMLs:: - - **/nosetests.xml - -If you need to change the name or location of the file, you can set the -``--dtestxunit-file`` option. - -If you need to change the name of the test suite, you can set the -``--dtestxunit-testsuite-name`` option. - -Here is an abbreviated version of what an XML test report might look like:: - - - - - - Traceback (most recent call last): - ... - TypeError: oops, wrong type - - - - -.. 
_Jenkins: http://jenkins-ci.org/ - -""" -import codecs -import os -import sys -import re -import inspect -from StringIO import StringIO -from time import time -from xml.sax import saxutils - -from nose.plugins.base import Plugin -from nose.exc import SkipTest -from nose.pyversion import force_unicode, format_exception - -# Invalid XML characters, control characters 0-31 sans \t, \n and \r -CONTROL_CHARACTERS = re.compile(r"[\000-\010\013\014\016-\037]") - -TEST_ID = re.compile(r'^(.*?)(\(.*\))$') - - -def xml_safe(value): - """Replaces invalid XML characters with '?'.""" - return CONTROL_CHARACTERS.sub('?', value) - - -def escape_cdata(cdata): - """Escape a string for an XML CDATA section.""" - return xml_safe(cdata).replace(']]>', ']]>]]>>> nice_classname(Exception()) # doctest: +ELLIPSIS - '...Exception' - >>> nice_classname(Exception) # doctest: +ELLIPSIS - '...Exception' - - """ - if inspect.isclass(obj): - cls_name = obj.__name__ - else: - cls_name = obj.__class__.__name__ - mod = inspect.getmodule(obj) - if mod: - name = mod.__name__ - # jython - if name.startswith('org.python.core.'): - name = name[len('org.python.core.'):] - return "%s.%s" % (name, cls_name) - else: - return cls_name - - -def exc_message(exc_info): - """Return the exception's message.""" - exc = exc_info[1] - if exc is None: - # str exception - result = exc_info[0] - else: - try: - result = str(exc) - except UnicodeEncodeError: - try: - result = unicode(exc) - except UnicodeError: - # Fallback to args as neither str nor - # unicode(Exception(u'\xe6')) work in Python < 2.6 - result = exc.args[0] - result = force_unicode(result, 'UTF-8') - return xml_safe(result) - - -class Tee(object): - def __init__(self, encoding, *args): - self._encoding = encoding - self._streams = args - - def write(self, data): - data = force_unicode(data, self._encoding) - for s in self._streams: - s.write(data) - - def writelines(self, lines): - for line in lines: - self.write(line) - - def flush(self): - for s in self._streams: - s.flush() - - def isatty(self): - return False - - -class DTestXunit(Plugin): - """This plugin provides test results in the standard XUnit XML format.""" - name = 'dtestxunit' - score = 1500 - encoding = 'UTF-8' - error_report_file = None - - def __init__(self): - super(DTestXunit, self).__init__() - self._capture_stack = [] - self._currentStdout = None - self._currentStderr = None - - def _timeTaken(self): - if hasattr(self, '_timer'): - taken = time() - self._timer - else: - # test died before it ran (probably error in setup()) - # or success/failure added before test started probably - # due to custom TestResult munging - taken = 0.0 - return taken - - def _quoteattr(self, attr): - """Escape an XML attribute. Value can be unicode.""" - attr = xml_safe(attr) - return saxutils.quoteattr(attr) - - def options(self, parser, env): - """Sets additional command line options.""" - Plugin.options(self, parser, env) - parser.add_option( - '--dtestxunit-file', action='store', - dest='dtestxunit_file', metavar="FILE", - default=env.get('NOSE_XUNIT_FILE', 'nosetests.xml'), - help=("Path to xml file to store the xunit report in. " - "Default is nosetests.xml in the working directory " - "[NOSE_XUNIT_FILE]")) - - parser.add_option( - '--dtestxunit-testsuite-name', action='store', - dest='dtestxunit_testsuite_name', metavar="PACKAGE", - default=env.get('NOSE_XUNIT_TESTSUITE_NAME', 'nosetests'), - help=("Name of the testsuite in the xunit xml, generated by plugin. 
" - "Default test suite name is nosetests.")) - - def configure(self, options, config): - """Configures the xunit plugin.""" - Plugin.configure(self, options, config) - self.config = config - if self.enabled: - self.stats = {'errors': 0, - 'failures': 0, - 'passes': 0, - 'skipped': 0 - } - self.errorlist = [] - self.error_report_file_name = os.path.realpath(options.dtestxunit_file) - self.xunit_testsuite_name = options.dtestxunit_testsuite_name - - def report(self, stream): - """Writes an Xunit-formatted XML file - - The file includes a report of test errors and failures. - - """ - self.error_report_file = codecs.open(self.error_report_file_name, 'w', - self.encoding, 'replace') - self.stats['encoding'] = self.encoding - self.stats['testsuite_name'] = self.xunit_testsuite_name - self.stats['total'] = (self.stats['errors'] + self.stats['failures'] + - self.stats['passes'] + self.stats['skipped']) - self.error_report_file.write( - u'' - u'' % self.stats) - self.error_report_file.write(u''.join([force_unicode(e, self.encoding) - for e in self.errorlist])) - self.error_report_file.write(u'') - self.error_report_file.close() - if self.config.verbosity > 1: - stream.writeln("-" * 70) - stream.writeln("XML: %s" % self.error_report_file.name) - - def _startCapture(self): - self._capture_stack.append((sys.stdout, sys.stderr)) - self._currentStdout = StringIO() - self._currentStderr = StringIO() - sys.stdout = Tee(self.encoding, self._currentStdout, sys.stdout) - sys.stderr = Tee(self.encoding, self._currentStderr, sys.stderr) - - def startContext(self, context): - self._startCapture() - - def stopContext(self, context): - self._endCapture() - - def beforeTest(self, test): - """Initializes a timer before starting a test.""" - self._timer = time() - self._startCapture() - - def _endCapture(self): - if self._capture_stack: - sys.stdout, sys.stderr = self._capture_stack.pop() - - def afterTest(self, test): - self._endCapture() - self._currentStdout = None - self._currentStderr = None - - def finalize(self, test): - while self._capture_stack: - self._endCapture() - - def _getCapturedStdout(self): - if self._currentStdout: - value = self._currentStdout.getvalue() - if value: - return '' % escape_cdata(value) - return '' - - def _getCapturedStderr(self): - if self._currentStderr: - value = self._currentStderr.getvalue() - if value: - return '' % escape_cdata(value) - return '' - - def addError(self, test, err, capt=None): - """Add error output to Xunit report. - """ - taken = self._timeTaken() - - if issubclass(err[0], SkipTest): - type = 'skipped' - self.stats['skipped'] += 1 - else: - type = 'error' - self.stats['errors'] += 1 - - tb = format_exception(err, self.encoding) - id = test.id() - - self.errorlist.append( - u'' - u'<%(type)s type=%(errtype)s message=%(message)s>' - u'%(systemout)s%(systemerr)s' % - {'cls': self._quoteattr(id_split(id)[0]), - 'name': self._quoteattr(id_split(id)[-1]), - 'taken': taken, - 'type': type, - 'errtype': self._quoteattr(nice_classname(err[0])), - 'message': self._quoteattr(exc_message(err)), - 'tb': escape_cdata(tb), - 'systemout': self._getCapturedStdout(), - 'systemerr': self._getCapturedStderr(), - }) - - def addFailure(self, test, err, capt=None, tb_info=None): - """Add failure output to Xunit report. 
- """ - taken = self._timeTaken() - tb = format_exception(err, self.encoding) - self.stats['failures'] += 1 - id = test.id() - - self.errorlist.append( - u'' - u'' - u'%(systemout)s%(systemerr)s' % - {'cls': self._quoteattr(id_split(id)[0]), - 'name': self._quoteattr(id_split(id)[-1]), - 'taken': taken, - 'errtype': self._quoteattr(nice_classname(err[0])), - 'message': self._quoteattr(exc_message(err)), - 'tb': escape_cdata(tb), - 'systemout': self._getCapturedStdout(), - 'systemerr': self._getCapturedStderr(), - }) - - def addSuccess(self, test, capt=None): - """Add success output to Xunit report. - """ - taken = self._timeTaken() - self.stats['passes'] += 1 - id = test.id() - self.errorlist.append( - '%(systemout)s%(systemerr)s' % - {'cls': self._quoteattr(id_split(id)[0]), - 'name': self._quoteattr(id_split(id)[-1]), - 'taken': taken, - 'systemout': self._getCapturedStdout(), - 'systemerr': self._getCapturedStderr(), - }) http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/prepared_statements_test.py ---------------------------------------------------------------------- diff --git a/prepared_statements_test.py b/prepared_statements_test.py index 35ca5aa..72ac9dd 100644 --- a/prepared_statements_test.py +++ b/prepared_statements_test.py @@ -1,7 +1,11 @@ +import logging + from cassandra import InvalidRequest from dtest import Tester +logger = logging.getLogger(__name__) + KEYSPACE = "foo" @@ -10,13 +14,12 @@ class TestPreparedStatements(Tester): Tests for pushed native protocol notification from Cassandra. """ - def dropped_index_test(self): + def test_dropped_index(self): """ Prepared statements using dropped indexes should be handled correctly """ - self.cluster.populate(1).start() - node = self.cluster.nodes.values()[0] + node = list(self.cluster.nodes.values())[0] session = self.patient_cql_connection(node) session.execute(""" @@ -33,14 +36,14 @@ class TestPreparedStatements(Tester): session.execute(insert_statement, (i, 0)) query_statement = session.prepare("SELECT * FROM mytable WHERE b=?") - print "Number of matching rows:", len(list(session.execute(query_statement, (0,)))) + print("Number of matching rows:", len(list(session.execute(query_statement, (0,))))) session.execute("DROP INDEX bindex") try: - print "Executing prepared statement with dropped index..." 
+ print("Executing prepared statement with dropped index...") session.execute(query_statement, (0,)) except InvalidRequest as ir: - print ir + print(ir) except Exception: raise http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/pushed_notifications_test.py ---------------------------------------------------------------------- diff --git a/pushed_notifications_test.py b/pushed_notifications_test.py index a8dcf81..9b888de 100644 --- a/pushed_notifications_test.py +++ b/pushed_notifications_test.py @@ -1,4 +1,7 @@ import time +import pytest +import logging + from datetime import datetime from distutils.version import LooseVersion from threading import Event @@ -7,10 +10,11 @@ from cassandra import ConsistencyLevel as CL from cassandra import ReadFailure from cassandra.query import SimpleStatement from ccmlib.node import Node, TimeoutError -from nose.tools import timed -from dtest import Tester, debug, get_ip_from_node, create_ks -from tools.decorators import no_vnodes, since +from dtest import Tester, get_ip_from_node, create_ks + +since = pytest.mark.since +logger = logging.getLogger(__name__) class NotificationWaiter(object): @@ -48,7 +52,7 @@ class NotificationWaiter(object): """ Called when a notification is pushed from Cassandra. """ - debug("Got {} from {} at {}".format(notification, self.address, datetime.now())) + logger.debug("Got {} from {} at {}".format(notification, self.address, datetime.now())) if self.keyspace and notification['keyspace'] and self.keyspace != notification['keyspace']: return # we are not interested in this schema change @@ -73,7 +77,7 @@ class NotificationWaiter(object): return self.notifications def clear_notifications(self): - debug("Clearing notifications...") + logger.debug("Clearing notifications...") self.notifications = [] self.event.clear() @@ -83,8 +87,8 @@ class TestPushedNotifications(Tester): Tests for pushed native protocol notification from Cassandra. """ - @no_vnodes() - def move_single_node_test(self): + @pytest.mark.no_vnodes + def test_move_single_node(self): """ @jira_ticket CASSANDRA-8516 Moving a token should result in MOVED_NODE notifications. 
@@ -92,30 +96,30 @@ class TestPushedNotifications(Tester): self.cluster.populate(3).start(wait_for_binary_proto=True, wait_other_notice=True) waiters = [NotificationWaiter(self, node, ["TOPOLOGY_CHANGE"]) - for node in self.cluster.nodes.values()] + for node in list(self.cluster.nodes.values())] # The first node sends NEW_NODE for the other 2 nodes during startup, in case they are # late due to network delays let's block a bit longer - debug("Waiting for unwanted notifications....") + logger.debug("Waiting for unwanted notifications....") waiters[0].wait_for_notifications(timeout=30, num_notifications=2) waiters[0].clear_notifications() - debug("Issuing move command....") - node1 = self.cluster.nodes.values()[0] + logger.debug("Issuing move command....") + node1 = list(self.cluster.nodes.values())[0] node1.move("123") for waiter in waiters: - debug("Waiting for notification from {}".format(waiter.address,)) + logger.debug("Waiting for notification from {}".format(waiter.address,)) notifications = waiter.wait_for_notifications(60.0) - self.assertEquals(1, len(notifications), notifications) + assert 1 == len(notifications), notifications notification = notifications[0] change_type = notification["change_type"] address, port = notification["address"] - self.assertEquals("MOVED_NODE", change_type) - self.assertEquals(get_ip_from_node(node1), address) + assert "MOVED_NODE" == change_type + assert get_ip_from_node(node1) == address - @no_vnodes() - def move_single_node_localhost_test(self): + @pytest.mark.no_vnodes + def test_move_single_node_localhost(self): """ @jira_ticket CASSANDRA-10052 Test that we don't get NODE_MOVED notifications from nodes other than the local one, @@ -132,29 +136,28 @@ class TestPushedNotifications(Tester): cluster.start(wait_for_binary_proto=True, wait_other_notice=True) waiters = [NotificationWaiter(self, node, ["TOPOLOGY_CHANGE"]) - for node in self.cluster.nodes.values()] + for node in list(self.cluster.nodes.values())] # The first node sends NEW_NODE for the other 2 nodes during startup, in case they are # late due to network delays let's block a bit longer - debug("Waiting for unwanted notifications...") + logger.debug("Waiting for unwanted notifications...") waiters[0].wait_for_notifications(timeout=30, num_notifications=2) waiters[0].clear_notifications() - debug("Issuing move command....") - node1 = self.cluster.nodes.values()[0] + logger.debug("Issuing move command....") + node1 = list(self.cluster.nodes.values())[0] node1.move("123") for waiter in waiters: - debug("Waiting for notification from {}".format(waiter.address,)) + logger.debug("Waiting for notification from {}".format(waiter.address,)) notifications = waiter.wait_for_notifications(30.0) - self.assertEquals(1 if waiter.node is node1 else 0, len(notifications), notifications) + assert 1 if waiter.node is node1 else 0 == len(notifications), notifications - def restart_node_test(self): + def test_restart_node(self): """ @jira_ticket CASSANDRA-7816 Restarting a node should generate exactly one DOWN and one UP notification """ - self.cluster.populate(2).start(wait_for_binary_proto=True, wait_other_notice=True) node1, node2 = self.cluster.nodelist() @@ -162,7 +165,7 @@ class TestPushedNotifications(Tester): # need to block for up to 2 notifications (NEW_NODE and UP) so that these notifications # don't confuse the state below. 
- debug("Waiting for unwanted notifications...") + logger.debug("Waiting for unwanted notifications...") waiter.wait_for_notifications(timeout=30, num_notifications=2) waiter.clear_notifications() @@ -171,25 +174,25 @@ class TestPushedNotifications(Tester): version = self.cluster.cassandra_version() expected_notifications = 2 if version >= '2.2' else 3 for i in range(5): - debug("Restarting second node...") + logger.debug("Restarting second node...") node2.stop(wait_other_notice=True) node2.start(wait_other_notice=True) - debug("Waiting for notifications from {}".format(waiter.address)) + logger.debug("Waiting for notifications from {}".format(waiter.address)) notifications = waiter.wait_for_notifications(timeout=60.0, num_notifications=expected_notifications) - self.assertEquals(expected_notifications, len(notifications), notifications) + assert expected_notifications, len(notifications) == notifications for notification in notifications: - self.assertEquals(get_ip_from_node(node2), notification["address"][0]) - self.assertEquals("DOWN", notifications[0]["change_type"]) + assert get_ip_from_node(node2) == notification["address"][0] + assert "DOWN" == notifications[0]["change_type"] if version >= '2.2': - self.assertEquals("UP", notifications[1]["change_type"]) + assert "UP" == notifications[1]["change_type"] else: # pre 2.2, we'll receive both a NEW_NODE and an UP notification, # but the order is not guaranteed - self.assertEquals({"NEW_NODE", "UP"}, set(map(lambda n: n["change_type"], notifications[1:]))) + assert {"NEW_NODE", "UP"} == set([n["change_type"] for n in notifications[1:]]) waiter.clear_notifications() - def restart_node_localhost_test(self): + def test_restart_node_localhost(self): """ Test that we don't get client notifications when rpc_address is set to localhost. @jira_ticket CASSANDRA-10052 @@ -209,17 +212,17 @@ class TestPushedNotifications(Tester): waiter = NotificationWaiter(self, node1, ["STATUS_CHANGE", "TOPOLOGY_CHANGE"]) # restart node 2 - debug("Restarting second node...") + logger.debug("Restarting second node...") node2.stop(wait_other_notice=True) node2.start(wait_other_notice=True) # check that node1 did not send UP or DOWN notification for node2 - debug("Waiting for notifications from {}".format(waiter.address,)) + logger.debug("Waiting for notifications from {}".format(waiter.address,)) notifications = waiter.wait_for_notifications(timeout=30.0, num_notifications=2) - self.assertEquals(0, len(notifications), notifications) + assert 0 == len(notifications), notifications @since("2.2") - def add_and_remove_node_test(self): + def test_add_and_remove_node(self): """ Test that NEW_NODE and REMOVED_NODE are sent correctly as nodes join and leave. 
@jira_ticket CASSANDRA-11038 @@ -231,7 +234,7 @@ class TestPushedNotifications(Tester): # need to block for up to 2 notifications (NEW_NODE and UP) so that these notifications # don't confuse the state below - debug("Waiting for unwanted notifications...") + logger.debug("Waiting for unwanted notifications...") waiter.wait_for_notifications(timeout=30, num_notifications=2) waiter.clear_notifications() @@ -240,29 +243,29 @@ class TestPushedNotifications(Tester): session.execute("ALTER KEYSPACE system_distributed WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':'1'};") session.execute("ALTER KEYSPACE system_traces WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':'1'};") - debug("Adding second node...") + logger.debug("Adding second node...") node2 = Node('node2', self.cluster, True, None, ('127.0.0.2', 7000), '7200', '0', None, binary_interface=('127.0.0.2', 9042)) self.cluster.add(node2, False) node2.start(wait_other_notice=True) - debug("Waiting for notifications from {}".format(waiter.address)) + logger.debug("Waiting for notifications from {}".format(waiter.address)) notifications = waiter.wait_for_notifications(timeout=60.0, num_notifications=2) - self.assertEquals(2, len(notifications), notifications) + assert 2 == len(notifications), notifications for notification in notifications: - self.assertEquals(get_ip_from_node(node2), notification["address"][0]) - self.assertEquals("NEW_NODE", notifications[0]["change_type"]) - self.assertEquals("UP", notifications[1]["change_type"]) + assert get_ip_from_node(node2) == notification["address"][0] + assert "NEW_NODE" == notifications[0]["change_type"] + assert "UP" == notifications[1]["change_type"] - debug("Removing second node...") + logger.debug("Removing second node...") waiter.clear_notifications() node2.decommission() node2.stop(gently=False) - debug("Waiting for notifications from {}".format(waiter.address)) + logger.debug("Waiting for notifications from {}".format(waiter.address)) notifications = waiter.wait_for_notifications(timeout=60.0, num_notifications=2) - self.assertEquals(2, len(notifications), notifications) + assert 2 == len(notifications), notifications for notification in notifications: - self.assertEquals(get_ip_from_node(node2), notification["address"][0]) - self.assertEquals("REMOVED_NODE", notifications[0]["change_type"]) - self.assertEquals("DOWN", notifications[1]["change_type"]) + assert get_ip_from_node(node2) == notification["address"][0] + assert "REMOVED_NODE" == notifications[0]["change_type"] + assert "DOWN" == notifications[1]["change_type"] def change_rpc_address_to_localhost(self): """ @@ -272,23 +275,22 @@ class TestPushedNotifications(Tester): i = 0 for node in cluster.nodelist(): - debug('Set 127.0.0.1 to prevent IPv6 java prefs, set rpc_address: localhost in cassandra.yaml') + logger.debug('Set 127.0.0.1 to prevent IPv6 java prefs, set rpc_address: localhost in cassandra.yaml') if cluster.version() < '4': node.network_interfaces['thrift'] = ('127.0.0.1', node.network_interfaces['thrift'][1] + i) node.network_interfaces['binary'] = ('127.0.0.1', node.network_interfaces['binary'][1] + i) node.import_config_files() # this regenerates the yaml file and sets 'rpc_address' to the 'thrift' address node.set_configuration_options(values={'rpc_address': 'localhost'}) - debug(node.show()) + logger.debug(node.show()) i += 2 @since("3.0") - def schema_changes_test(self): + def test_schema_changes(self): """ @jira_ticket CASSANDRA-10328 Creating, updating and dropping a keyspace, a 
table and a materialized view will generate the correct schema change notifications. """ - self.cluster.populate(2).start(wait_for_binary_proto=True) node1, node2 = self.cluster.nodelist() @@ -306,17 +308,34 @@ class TestPushedNotifications(Tester): session.execute("drop TABLE t") session.execute("drop KEYSPACE ks") - debug("Waiting for notifications from {}".format(waiter.address,)) + logger.debug("Waiting for notifications from {}".format(waiter.address,)) notifications = waiter.wait_for_notifications(timeout=60.0, num_notifications=8) - self.assertEquals(8, len(notifications), notifications) - self.assertDictContainsSubset({'change_type': u'CREATED', 'target_type': u'KEYSPACE'}, notifications[0]) - self.assertDictContainsSubset({'change_type': u'CREATED', 'target_type': u'TABLE', u'table': u't'}, notifications[1]) - self.assertDictContainsSubset({'change_type': u'UPDATED', 'target_type': u'TABLE', u'table': u't'}, notifications[2]) - self.assertDictContainsSubset({'change_type': u'CREATED', 'target_type': u'TABLE', u'table': u'mv'}, notifications[3]) - self.assertDictContainsSubset({'change_type': u'UPDATED', 'target_type': u'TABLE', u'table': u'mv'}, notifications[4]) - self.assertDictContainsSubset({'change_type': u'DROPPED', 'target_type': u'TABLE', u'table': u'mv'}, notifications[5]) - self.assertDictContainsSubset({'change_type': u'DROPPED', 'target_type': u'TABLE', u'table': u't'}, notifications[6]) - self.assertDictContainsSubset({'change_type': u'DROPPED', 'target_type': u'KEYSPACE'}, notifications[7]) + assert 8 == len(notifications), notifications + # assert dict contains subset + expected = {'change_type': 'CREATED', 'target_type': 'KEYSPACE'} + assert set(notifications[0].keys()) >= expected.keys() and {k: notifications[0][k] for k in expected if + k in notifications[0]} == expected + expected = {'change_type': 'CREATED', 'target_type': 'TABLE', 'table': 't'} + assert set(notifications[1].keys()) >= expected.keys() and {k: notifications[1][k] for k in expected if + k in notifications[1]} == expected + expected = {'change_type': 'UPDATED', 'target_type': 'TABLE', 'table': 't'} + assert set(notifications[2].keys()) >= expected.keys() and {k: notifications[2][k] for k in expected if + k in notifications[2]} == expected + expected = {'change_type': 'CREATED', 'target_type': 'TABLE', 'table': 'mv'} + assert set(notifications[3].keys()) >= expected.keys() and {k: notifications[3][k] for k in expected if + k in notifications[3]} == expected + expected = {'change_type': 'UPDATED', 'target_type': 'TABLE', 'table': 'mv'} + assert set(notifications[4].keys()) >= expected.keys() and {k: notifications[4][k] for k in expected if + k in notifications[4]} == expected + expected = {'change_type': 'DROPPED', 'target_type': 'TABLE', 'table': 'mv'} + assert set(notifications[5].keys()) >= expected.keys() and {k: notifications[5][k] for k in expected if + k in notifications[5]} == expected + expected = {'change_type': 'DROPPED', 'target_type': 'TABLE', 'table': 't'} + assert set(notifications[6].keys()) >= expected.keys() and {k: notifications[6][k] for k in expected if + k in notifications[6]} == expected + expected = {'change_type': 'DROPPED', 'target_type': 'KEYSPACE'} + assert set(notifications[7].keys()) >= expected.keys() and {k: notifications[7][k] for k in expected if + k in notifications[7]} == expected class TestVariousNotifications(Tester): @@ -325,17 +344,16 @@ class TestVariousNotifications(Tester): """ @since('2.2') - def tombstone_failure_threshold_message_test(self): + def 
test_tombstone_failure_threshold_message(self): """ Ensure nodes return an error message in case of TombstoneOverwhelmingExceptions rather than dropping the request. A drop makes the coordinator waits for the specified read_request_timeout_in_ms. @jira_ticket CASSANDRA-7886 """ - have_v5_protocol = self.cluster.version() >= LooseVersion('3.10') - self.allow_log_errors = True + self.fixture_dtest_setup.allow_log_errors = True self.cluster.set_configuration_options( values={ 'tombstone_failure_threshold': 500, @@ -356,7 +374,7 @@ class TestVariousNotifications(Tester): ) # Add data with tombstones - values = map(lambda i: str(i), range(1000)) + values = [str(i) for i in range(1000)] for value in values: session.execute(SimpleStatement( "insert into test (id, mytext, col1) values (1, '{}', null) ".format( @@ -367,15 +385,15 @@ class TestVariousNotifications(Tester): failure_msg = ("Scanned over.* tombstones.* query aborted") - @timed(25) + @pytest.mark.timeout(25) def read_failure_query(): try: session.execute(SimpleStatement("select * from test where id in (1,2,3,4,5)", consistency_level=CL.ALL)) except ReadFailure as exc: if have_v5_protocol: # at least one replica should have responded with a tombstone error - self.assertIsNotNone(exc.error_code_map) - self.assertEqual(0x0001, exc.error_code_map.values()[0]) + assert exc.error_code_map is not None + assert 0x0001 == list(exc.error_code_map.values())[0] except Exception: raise else: @@ -393,22 +411,21 @@ class TestVariousNotifications(Tester): node2.grep_log(failure_msg) or node3.grep_log(failure_msg)) - self.assertTrue(failure, ("Cannot find tombstone failure threshold error in log " - "after failed query")) + assert failure == "Cannot find tombstone failure threshold error in log after failed query" mark1 = node1.mark_log() mark2 = node2.mark_log() mark3 = node3.mark_log() - @timed(35) + @pytest.mark.timeout(35) def range_request_failure_query(): try: session.execute(SimpleStatement("select * from test", consistency_level=CL.ALL)) except ReadFailure as exc: if have_v5_protocol: # at least one replica should have responded with a tombstone error - self.assertIsNotNone(exc.error_code_map) - self.assertEqual(0x0001, exc.error_code_map.values()[0]) + assert exc.error_code_map is not None + assert 0x0001 == list(exc.error_code_map.values())[0] except Exception: raise else: @@ -426,5 +443,4 @@ class TestVariousNotifications(Tester): node2.grep_log(failure_msg, from_mark=mark2) or node3.grep_log(failure_msg, from_mark=mark3)) - self.assertTrue(failure, ("Cannot find tombstone failure threshold error in log " - "after range_request_timeout_query")) + assert failure == "Cannot find tombstone failure threshold error in log after range_request_timeout_query" http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/putget_test.py ---------------------------------------------------------------------- diff --git a/putget_test.py b/putget_test.py index f200864..1933f8e 100644 --- a/putget_test.py +++ b/putget_test.py @@ -1,28 +1,39 @@ +import pytest import time +import logging from cassandra import ConsistencyLevel from thrift.protocol import TBinaryProtocol from thrift.transport import TSocket, TTransport +from dtest_setup_overrides import DTestSetupOverrides + from dtest import Tester, create_ks, create_cf from tools.data import (create_c1c2_table, insert_c1c2, insert_columns, putget, query_c1c2, query_columns, range_putget) -from tools.decorators import no_vnodes, since from tools.misc import ImmutableMapping, retry_till_success +since = 
pytest.mark.since +logger = logging.getLogger(__name__) + class TestPutGet(Tester): - cluster_options = ImmutableMapping({'start_rpc': 'true'}) - def putget_test(self): + @pytest.fixture(scope='function', autouse=True) + def fixture_dtest_setup_overrides(self): + dtest_setup_overrides = DTestSetupOverrides() + dtest_setup_overrides.cluster_options = ImmutableMapping({'start_rpc': 'true'}) + return dtest_setup_overrides + + def test_putget(self): """ Simple put/get on a single row, hitting multiple sstables """ self._putget() - def putget_snappy_test(self): + def test_putget_snappy(self): """ Simple put/get on a single row, but hitting multiple sstables (with snappy compression) """ self._putget(compression="Snappy") - def putget_deflate_test(self): + def test_putget_deflate(self): """ Simple put/get on a single row, but hitting multiple sstables (with deflate compression) """ self._putget(compression="Deflate") @@ -40,7 +51,7 @@ class TestPutGet(Tester): putget(cluster, session) - def non_local_read_test(self): + def test_non_local_read(self): """ This test reads from a coordinator we know has no copy of the data """ cluster = self.cluster @@ -53,12 +64,11 @@ class TestPutGet(Tester): # insert and get at CL.QUORUM (since RF=2, node1 won't have all key locally) insert_c1c2(session, n=1000, consistency=ConsistencyLevel.QUORUM) - for n in xrange(0, 1000): + for n in range(0, 1000): query_c1c2(session, n, ConsistencyLevel.QUORUM) - def rangeputget_test(self): + def test_rangeputget(self): """ Simple put/get on ranges of rows, hitting multiple sstables """ - cluster = self.cluster cluster.populate(3).start() @@ -70,7 +80,7 @@ class TestPutGet(Tester): range_putget(cluster, session) - def wide_row_test(self): + def test_wide_row(self): """ Test wide row slices """ cluster = self.cluster @@ -83,16 +93,16 @@ class TestPutGet(Tester): key = 'wide' - for x in xrange(1, 5001): + for x in range(1, 5001): insert_columns(self, session, key, 100, offset=x - 1) for size in (10, 100, 1000): - for x in xrange(1, (50001 - size) / size): + for x in range(1, (50001 - size) // size): query_columns(self, session, key, size, offset=x * size - 1) - @no_vnodes() + @pytest.mark.no_vnodes @since('2.0', max_version='4') - def wide_slice_test(self): + def test_wide_slice(self): """ Check slicing a wide row. See https://issues.apache.org/jira/browse/CASSANDRA-4919 @@ -140,9 +150,9 @@ class TestPutGet(Tester): session.execute(query) time.sleep(.5) - for i in xrange(10): + for i in range(10): key_num = str(i).zfill(2) - for j in xrange(10): + for j in range(10): stmt = "INSERT INTO test (k, column1, value) VALUES ('a%s', 'col%s', '%s')" % (key_num, j, j) session.execute(stmt) stmt = "INSERT INTO test (k, column1, value) VALUES ('b%s', 'col%s', '%s')" % (key_num, j, j) @@ -172,7 +182,7 @@ class TestPutGet(Tester): # print row.key # print cols - self.assertEqual(len(columns), 95, "Regression in cassandra-4919. Expected 95 columns, got {}.".format(len(columns))) + assert len(columns) == 95, "Regression in cassandra-4919. 
@@ -248,7 +258,7 @@ class ThriftConnection(object): def wait_for_agreement(self): schemas = self.client.describe_schema_versions() - if len([ss for ss in schemas.keys() if ss != 'UNREACHABLE']) > 1: + if len([ss for ss in list(schemas.keys()) if ss != 'UNREACHABLE']) > 1: raise Exception("schema agreement not reached") def _translate_cl(self, cl): @@ -258,7 +268,7 @@ class ThriftConnection(object): """ Insert some basic values """ cf_parent = self.Cassandra.ColumnParent(column_family=self.cf_name) - for row_key in ('row_%d' % i for i in xrange(num_rows)): + for row_key in ('row_%d' % i for i in range(num_rows)): col = self.Cassandra.Column(name='col_0', value='val_0', timestamp=int(time.time() * 1000)) retry_till_success(self.client.insert, @@ -269,7 +279,7 @@ class ThriftConnection(object): def query_columns(self, num_rows=10, consistency_level='QUORUM'): """ Check that the values inserted in insert_columns() are present """ - for row_key in ('row_%d' % i for i in xrange(num_rows)): + for row_key in ('row_%d' % i for i in range(num_rows)): cpath = self.Cassandra.ColumnPath(column_family=self.cf_name, column='col_0') cosc = retry_till_success(self.client.get, key=row_key, column_path=cpath, @@ -277,5 +287,5 @@ class ThriftConnection(object): timeout=30) col = cosc.column value = col.value - self.assertEqual(value, 'val_0') + assert value == 'val_0' return self http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/pytest.ini ---------------------------------------------------------------------- diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 0000000..1ee5342 --- /dev/null +++ b/pytest.ini @@ -0,0 +1,5 @@ +[pytest] +junit_suite_name = Cassandra dtests +log_print = True +log_level = INFO +log_format = %(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/range_ghost_test.py ---------------------------------------------------------------------- diff --git a/range_ghost_test.py b/range_ghost_test.py index 34f599c..6d3d3ae 100644 --- a/range_ghost_test.py +++ b/range_ghost_test.py @@ -1,12 +1,15 @@ import time +import logging from tools.assertions import assert_length_equal from dtest import Tester, create_ks, create_cf +logger = logging.getLogger(__name__) + class TestRangeGhosts(Tester): - def ghosts_test(self): + def test_ghosts(self): """ Check range ghosts are correctly removed by the system """ cluster = self.cluster cluster.populate(1).start() @@ -19,7 +22,7 @@ class TestRangeGhosts(Tester): rows = 1000 - for i in xrange(0, rows): + for i in range(0, rows): session.execute("UPDATE cf SET c = 'value' WHERE key = 'k%i'" % i) res = list(session.execute("SELECT * FROM cf LIMIT 10000")) @@ -27,7 +30,7 @@ class TestRangeGhosts(Tester): node1.flush() - for i in xrange(0, rows / 2): + for i in range(0, rows // 2): session.execute("DELETE FROM cf WHERE key = 'k%i'" % i) res = list(session.execute("SELECT * FROM cf LIMIT 10000"))
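The pytest.ini above pairs with the module-level logger each migrated file now declares. A small sketch of that convention follows (the class and test names are illustrative only); with these settings the intent is that records at INFO and above are captured per test and printed with the configured log_format, and the suite name appears in the junit output.

    import logging

    logger = logging.getLogger(__name__)

    class TestExample:
        def test_logging_convention(self):
            logger.debug("suppressed at the configured INFO level")
            logger.info("captured at the configured INFO level")
            assert True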
http://git-wip-us.apache.org/repos/asf/cassandra-dtest/blob/49b2dda4/read_failures_test.py ---------------------------------------------------------------------- diff --git a/read_failures_test.py b/read_failures_test.py index ba43505..80ebe5d 100644 --- a/read_failures_test.py +++ b/read_failures_test.py @@ -1,9 +1,14 @@ +import logging +import pytest + from cassandra import ConsistencyLevel, ReadFailure, ReadTimeout from cassandra.policies import FallthroughRetryPolicy from cassandra.query import SimpleStatement from dtest import Tester -from tools.decorators import since + +since = pytest.mark.since +logger = logging.getLogger(__name__) KEYSPACE = "readfailures" @@ -13,19 +18,21 @@ class TestReadFailures(Tester): Tests for read failures in the replicas, introduced as a part of @jira_ticket CASSANDRA-12311. """ - ignore_log_patterns = ( - "Scanned over [1-9][0-9]* tombstones", # This is expected when testing read failures due to tombstones - ) + @pytest.fixture(autouse=True) + def fixture_add_additional_log_patterns(self, fixture_dtest_setup): + fixture_dtest_setup.ignore_log_patterns = ( + "Scanned over [1-9][0-9]* tombstones", # This is expected when testing read failures due to tombstones + ) + return fixture_dtest_setup - def setUp(self): - super(TestReadFailures, self).setUp() + @pytest.fixture(scope='function', autouse=True) + def parse_dtest_config(self, parse_dtest_config): self.tombstone_failure_threshold = 500 self.replication_factor = 3 self.consistency_level = ConsistencyLevel.ALL self.expected_expt = ReadFailure - def tearDown(self): - super(TestReadFailures, self).tearDown() + return parse_dtest_config def _prepare_cluster(self): self.cluster.set_configuration_options( @@ -33,7 +40,7 @@ class TestReadFailures(Tester): ) self.cluster.populate(3) self.cluster.start(wait_for_binary_proto=True) - self.nodes = self.cluster.nodes.values() + self.nodes = list(self.cluster.nodes.values()) session = self.patient_exclusive_cql_connection(self.nodes[0], protocol_version=self.protocol_version) @@ -59,11 +66,11 @@ class TestReadFailures(Tester): if self.expected_expt is None: session.execute(statement) else: - with self.assertRaises(self.expected_expt) as cm: + with pytest.raises(self.expected_expt) as cm: # On 2.1, we won't return the ReadTimeout from coordinator until actual timeout, # so we need to up the default timeout of the driver session session.execute(statement, timeout=15) - return cm.exception + return cm.value def _assert_error_code_map_exists_with_code(self, exception, expected_code): """ @@ -71,14 +78,14 @@ class TestReadFailures(Tester): where at least one node responded with some expected code. This is meant for testing failure exceptions on protocol v5. """ - self.assertIsNotNone(exception) - self.assertIsNotNone(exception.error_code_map) + assert exception is not None + assert exception.error_code_map is not None expected_code_found = False - for error_code in exception.error_code_map.values(): + for error_code in list(exception.error_code_map.values()): if error_code == expected_code: expected_code_found = True break - self.assertTrue(expected_code_found, "The error code map did not contain " + str(expected_code)) + assert expected_code_found, "The error code map did not contain " + str(expected_code) @since('2.1') def test_tombstone_failure_v3(self): --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org For additional commands, e-mail: commits-help@cassandra.apache.org