kafka-commits mailing list archives

From: gwens...@apache.org
Subject: [25/50] [abbrv] kafka git commit: KAFKA-3483: Restructure ducktape tests to simplify running subsets of tests
Date: Mon, 11 Apr 2016 23:09:41 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/tests/connect/connect_distributed_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect/connect_distributed_test.py b/tests/kafkatest/tests/connect/connect_distributed_test.py
new file mode 100644
index 0000000..9aa16ab
--- /dev/null
+++ b/tests/kafkatest/tests/connect/connect_distributed_test.py
@@ -0,0 +1,210 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from kafkatest.tests.kafka_test import KafkaTest
+from kafkatest.services.connect import ConnectDistributedService, VerifiableSource, VerifiableSink
+from kafkatest.services.console_consumer import ConsoleConsumer
+from ducktape.utils.util import wait_until
+from ducktape.mark import matrix
+import subprocess, itertools, time
+from collections import Counter
+
+class ConnectDistributedTest(KafkaTest):
+    """
+    Simple test of Kafka Connect in distributed mode, producing data from files with a source connector and consuming
+    it with a sink connector, validating that the total output is identical to the input.
+    """
+
+    INPUT_FILE = "/mnt/connect.input"
+    OUTPUT_FILE = "/mnt/connect.output"
+
+    TOPIC = "test"
+    OFFSETS_TOPIC = "connect-offsets"
+    CONFIG_TOPIC = "connect-configs"
+    STATUS_TOPIC = "connect-status"
+
+    # Since tasks can be assigned to any node and we're testing with files, we need to make sure the content is the same
+    # across all nodes.
+    FIRST_INPUT_LIST = ["foo", "bar", "baz"]
+    FIRST_INPUTS = "\n".join(FIRST_INPUT_LIST) + "\n"
+    SECOND_INPUT_LIST = ["razz", "ma", "tazz"]
+    SECOND_INPUTS = "\n".join(SECOND_INPUT_LIST) + "\n"
+
+    SCHEMA = { "type": "string", "optional": False }
+
+    def __init__(self, test_context):
+        super(ConnectDistributedTest, self).__init__(test_context, num_zk=1, num_brokers=1, topics={
+            'test' : { 'partitions': 1, 'replication-factor': 1 }
+        })
+
+        self.cc = ConnectDistributedService(test_context, 3, self.kafka, [self.INPUT_FILE, self.OUTPUT_FILE])
+        self.cc.log_level = "DEBUG"
+        self.key_converter = "org.apache.kafka.connect.json.JsonConverter"
+        self.value_converter = "org.apache.kafka.connect.json.JsonConverter"
+        self.schemas = True
+
+    def test_file_source_and_sink(self):
+        """
+        Tests that a basic file connector works across clean rolling bounces. This validates that the connector is
+        correctly created, its tasks are instantiated, and the work is rebalanced across nodes as they restart.
+        """
+
+        self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
+
+        self.cc.start()
+
+        self.logger.info("Creating connectors")
+        for connector_props in [self.render("connect-file-source.properties"), self.render("connect-file-sink.properties")]:
+            connector_config = dict([line.strip().split('=', 1) for line in connector_props.split('\n') if line.strip() and not line.strip().startswith('#')])
+            self.cc.create_connector(connector_config)
+
+        # Generating data on the source node should generate new records and create new output on the sink node. Timeouts
+        # here need to be more generous than they are for standalone mode because a) it takes longer to write configs,
+        # do rebalancing of the group, etc., and b) without explicit leave-group support, rebalancing takes a while.
+        for node in self.cc.nodes:
+            node.account.ssh("echo -e -n " + repr(self.FIRST_INPUTS) + " >> " + self.INPUT_FILE)
+        wait_until(lambda: self._validate_file_output(self.FIRST_INPUT_LIST), timeout_sec=70, err_msg="Data added to input file was not seen in the output file in a reasonable amount of time.")
+
+        # Restarting both should result in them picking up where they left off,
+        # only processing new data.
+        self.cc.restart()
+
+        for node in self.cc.nodes:
+            node.account.ssh("echo -e -n " + repr(self.SECOND_INPUTS) + " >> " + self.INPUT_FILE)
+        wait_until(lambda: self._validate_file_output(self.FIRST_INPUT_LIST + self.SECOND_INPUT_LIST), timeout_sec=70, err_msg="Sink output file never converged to the same state as the input file")
+
+
+    @matrix(clean=[True, False])
+    def test_bounce(self, clean):
+        """
+        Validates that source and sink tasks that run continuously and produce a predictable sequence of messages
+        run correctly and deliver messages exactly once across clean rolling bounces of the Connect workers, and at least once across hard bounces.
+        """
+        num_tasks = 3
+
+        self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
+        self.cc.start()
+
+        self.source = VerifiableSource(self.cc, tasks=num_tasks)
+        self.source.start()
+        self.sink = VerifiableSink(self.cc, tasks=num_tasks)
+        self.sink.start()
+
+        for _ in range(3):
+            for node in self.cc.nodes:
+                started = time.time()
+                self.logger.info("%s bouncing Kafka Connect on %s", clean and "Clean" or "Hard", str(node.account))
+                self.cc.stop_node(node, clean_shutdown=clean)
+                with node.account.monitor_log(self.cc.LOG_FILE) as monitor:
+                    self.cc.start_node(node)
+                    monitor.wait_until("Starting connectors and tasks using config offset", timeout_sec=90,
+                                       err_msg="Kafka Connect worker didn't successfully join group and start work")
+                self.logger.info("Bounced Kafka Connect on %s and rejoined in %f seconds", node.account, time.time() - started)
+                # If this is a hard bounce, give additional time for the consumer groups to recover. If we don't give
+                # some time here, the next bounce may cause consumers to be shut down before they have any time to process
+                # data and we can end up with zero data making it through the test.
+                if not clean:
+                    time.sleep(15)
+
+
+        self.source.stop()
+        self.sink.stop()
+        self.cc.stop()
+
+        # Validate at-least-once delivery of everything that was reported as written, since we should have flushed and
+        # cleanly exited. Currently this only verifies at-least-once delivery because the sink task may not have consumed
+        # all the messages generated by the source task. This needs to be done per task since seqnos are not unique across
+        # tasks.
+        success = True
+        errors = []
+        allow_dups = not clean
+        src_messages = self.source.messages()
+        sink_messages = self.sink.messages()
+        for task in range(num_tasks):
+            # Validate source messages
+            src_seqnos = [msg['seqno'] for msg in src_messages if msg['task'] == task]
+            # Every seqno up to the largest one we ever saw should appear. Each seqno should only appear once because clean
+            # bouncing should commit on rebalance.
+            src_seqno_max = max(src_seqnos)
+            self.logger.debug("Max source seqno: %d", src_seqno_max)
+            src_seqno_counts = Counter(src_seqnos)
+            missing_src_seqnos = sorted(set(range(src_seqno_max)).difference(set(src_seqnos)))
+            duplicate_src_seqnos = sorted([seqno for seqno,count in src_seqno_counts.iteritems() if count > 1])
+
+            if missing_src_seqnos:
+                self.logger.error("Missing source sequence numbers for task " + str(task))
+                errors.append("Found missing source sequence numbers for task %d: %s" % (task, missing_src_seqnos))
+                success = False
+            if not allow_dups and duplicate_src_seqnos:
+                self.logger.error("Duplicate source sequence numbers for task " + str(task))
+                errors.append("Found duplicate source sequence numbers for task %d: %s" % (task, duplicate_src_seqnos))
+                success = False
+
+
+            # Validate sink messages
+            sink_seqnos = [msg['seqno'] for msg in sink_messages if msg['task'] == task and 'flushed' in msg]
+            # Every seqno up to the largest one we ever saw should appear. Each seqno should only appear once because
+            # clean bouncing should commit on rebalance.
+            sink_seqno_max = max(sink_seqnos)
+            self.logger.debug("Max sink seqno: %d", sink_seqno_max)
+            sink_seqno_counts = Counter(sink_seqnos)
+            missing_sink_seqnos = sorted(set(range(sink_seqno_max)).difference(set(sink_seqnos)))
+            duplicate_sink_seqnos = sorted([seqno for seqno,count in sink_seqno_counts.iteritems() if count > 1])
+
+            if missing_sink_seqnos:
+                self.logger.error("Missing sink sequence numbers for task " + str(task))
+                errors.append("Found missing sink sequence numbers for task %d: %s" % (task, missing_sink_seqnos))
+                success = False
+            if not allow_dups and duplicate_sink_seqnos:
+                self.logger.error("Duplicate sink sequence numbers for task " + str(task))
+                errors.append("Found duplicate sink sequence numbers for task %d: %s" % (task, duplicate_sink_seqnos))
+                success = False
+
+            # Validate source and sink match
+            if sink_seqno_max > src_seqno_max:
+                self.logger.error("Found sink sequence number greater than any generated sink sequence number for task %d: %d > %d", task, sink_seqno_max, src_seqno_max)
+                errors.append("Found sink sequence number greater than any generated sink sequence number for task %d: %d > %d" % (task, sink_seqno_max, src_seqno_max))
+                success = False
+            if src_seqno_max < 1000 or sink_seqno_max < 1000:
+                errors.append("Not enough messages were processed: source:%d sink:%d" % (src_seqno_max, sink_seqno_max))
+                success = False
+
+        if not success:
+            self.mark_for_collect(self.cc)
+            # Also collect the data in the topic to aid in debugging
+            consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, self.source.topic, consumer_timeout_ms=1000, print_key=True)
+            consumer_validator.run()
+            self.mark_for_collect(consumer_validator, "consumer_stdout")
+
+        assert success, "Found validation errors:\n" + "\n  ".join(errors)
+
+
+
+    def _validate_file_output(self, input):
+        input_set = set(input)
+        # Output needs to be collected from all nodes because we can't be sure where the tasks will be scheduled.
+        # Between the first and second rounds, we might even end up with half the data on each node.
+        output_set = set(itertools.chain(*[
+            [line.strip() for line in self._file_contents(node, self.OUTPUT_FILE)] for node in self.cc.nodes
+        ]))
+        return input_set == output_set
+
+    def _file_contents(self, node, file):
+        try:
+            # Convert to a list here or the CalledProcessError may be raised during a call to the generator instead of
+            # immediately
+            return list(node.account.ssh_capture("cat " + file))
+        except subprocess.CalledProcessError:
+            return []
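
The per-task validation in test_bounce above reduces to a gap-and-duplicate scan over sequence numbers. A minimal standalone sketch of that scan, with made-up sample inputs rather than real VerifiableSource/VerifiableSink output:

# Minimal sketch of the seqno gap/duplicate check performed per task in test_bounce.
from collections import Counter

def check_seqnos(seqnos, allow_dups=False):
    seqno_max = max(seqnos)
    # Every seqno below the largest observed one should appear at least once...
    missing = sorted(set(range(seqno_max)).difference(seqnos))
    # ...and, unless duplicates are tolerated (hard bounces), at most once.
    duplicates = sorted(s for s, c in Counter(seqnos).items() if c > 1)
    ok = not missing and (allow_dups or not duplicates)
    return ok, missing, duplicates

print(check_seqnos([0, 1, 3, 4, 4, 5]))                # (False, [2], [4])
print(check_seqnos([0, 1, 2, 2, 3], allow_dups=True))  # (True, [], [2])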

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/tests/connect/connect_rest_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect/connect_rest_test.py b/tests/kafkatest/tests/connect/connect_rest_test.py
new file mode 100644
index 0000000..69a8cb7
--- /dev/null
+++ b/tests/kafkatest/tests/connect/connect_rest_test.py
@@ -0,0 +1,165 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from kafkatest.tests.kafka_test import KafkaTest
+from kafkatest.services.connect import ConnectDistributedService, ConnectRestError
+from ducktape.utils.util import wait_until
+import hashlib, subprocess, json, itertools
+
+class ConnectRestApiTest(KafkaTest):
+    """
+    Test of Kafka Connect's REST API endpoints.
+    """
+
+    INPUT_FILE = "/mnt/connect.input"
+    INPUT_FILE2 = "/mnt/connect.input2"
+    OUTPUT_FILE = "/mnt/connect.output"
+
+    TOPIC = "test"
+    OFFSETS_TOPIC = "connect-offsets"
+    CONFIG_TOPIC = "connect-configs"
+    STATUS_TOPIC = "connect-status"
+
+    # Since tasks can be assigned to any node and we're testing with files, we need to make sure the content is the same
+    # across all nodes.
+    INPUT_LIST = ["foo", "bar", "baz"]
+    INPUTS = "\n".join(INPUT_LIST) + "\n"
+    LONGER_INPUT_LIST = ["foo", "bar", "baz", "razz", "ma", "tazz"]
+    LONGER_INPUTS = "\n".join(LONGER_INPUT_LIST) + "\n"
+
+    SCHEMA = { "type": "string", "optional": False }
+
+    def __init__(self, test_context):
+        super(ConnectRestApiTest, self).__init__(test_context, num_zk=1, num_brokers=1, topics={
+            'test' : { 'partitions': 1, 'replication-factor': 1 }
+        })
+
+        self.cc = ConnectDistributedService(test_context, 2, self.kafka, [self.INPUT_FILE, self.INPUT_FILE2, self.OUTPUT_FILE])
+
+    def test_rest_api(self):
+        # Template parameters
+        self.key_converter = "org.apache.kafka.connect.json.JsonConverter"
+        self.value_converter = "org.apache.kafka.connect.json.JsonConverter"
+        self.schemas = True
+
+        self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
+
+        self.cc.start()
+
+        assert self.cc.list_connectors() == []
+
+        self.logger.info("Creating connectors")
+        source_connector_props = self.render("connect-file-source.properties")
+        sink_connector_props = self.render("connect-file-sink.properties")
+        for connector_props in [source_connector_props, sink_connector_props]:
+            connector_config = self._config_dict_from_props(connector_props)
+            self.cc.create_connector(connector_config)
+
+        # We should see the connectors appear
+        wait_until(lambda: set(self.cc.list_connectors()) == set(["local-file-source", "local-file-sink"]),
+                   timeout_sec=10, err_msg="Connectors that were just created did not appear in connector listing")
+
+        # We'll only do very simple validation that the connectors and tasks really ran.
+        for node in self.cc.nodes:
+            node.account.ssh("echo -e -n " + repr(self.INPUTS) + " >> " + self.INPUT_FILE)
+        wait_until(lambda: self.validate_output(self.INPUT_LIST), timeout_sec=120, err_msg="Data added to input file was not seen in the output file in a reasonable amount of time.")
+
+
+        # Trying to create the same connector again should cause an error
+        try:
+            self.cc.create_connector(self._config_dict_from_props(source_connector_props))
+            assert False, "creating the same connector should have caused a conflict"
+        except ConnectRestError:
+            pass # expected
+
+        # Validate that we can get info about connectors
+        expected_source_info = {
+            'name': 'local-file-source',
+            'config': self._config_dict_from_props(source_connector_props),
+            'tasks': [{ 'connector': 'local-file-source', 'task': 0 }]
+        }
+        source_info = self.cc.get_connector("local-file-source")
+        assert expected_source_info == source_info, "Incorrect info:" + json.dumps(source_info)
+        source_config = self.cc.get_connector_config("local-file-source")
+        assert expected_source_info['config'] == source_config, "Incorrect config: " + json.dumps(source_config)
+        expected_sink_info = {
+            'name': 'local-file-sink',
+            'config': self._config_dict_from_props(sink_connector_props),
+            'tasks': [{ 'connector': 'local-file-sink', 'task': 0 }]
+        }
+        sink_info = self.cc.get_connector("local-file-sink")
+        assert expected_sink_info == sink_info, "Incorrect info:" + json.dumps(sink_info)
+        sink_config = self.cc.get_connector_config("local-file-sink")
+        assert expected_sink_info['config'] == sink_config, "Incorrect config: " + json.dumps(sink_config)
+
+
+        # Validate that we can get info about tasks. This info should definitely be available now without waiting since
+        # we've already seen data appear in files.
+        # TODO: It would be nice to validate a complete listing, but that doesn't make sense for the file connectors
+        expected_source_task_info = [{
+            'id': { 'connector': 'local-file-source', 'task': 0 },
+            'config': {
+                'task.class': 'org.apache.kafka.connect.file.FileStreamSourceTask',
+                'file': self.INPUT_FILE,
+                'topic': self.TOPIC
+            }
+        }]
+        source_task_info = self.cc.get_connector_tasks("local-file-source")
+        assert expected_source_task_info == source_task_info, "Incorrect info:" + json.dumps(source_task_info)
+        expected_sink_task_info = [{
+            'id': { 'connector': 'local-file-sink', 'task': 0 },
+            'config': {
+                'task.class': 'org.apache.kafka.connect.file.FileStreamSinkTask',
+                'file': self.OUTPUT_FILE,
+                'topics': self.TOPIC
+            }
+        }]
+        sink_task_info = self.cc.get_connector_tasks("local-file-sink")
+        assert expected_sink_task_info == sink_task_info, "Incorrect info:" + json.dumps(sink_task_info)
+
+        file_source_config = self._config_dict_from_props(source_connector_props)
+        file_source_config['file'] = self.INPUT_FILE2
+        self.cc.set_connector_config("local-file-source", file_source_config)
+
+        # We should also be able to verify that the modified configs caused the tasks to move to the new file and pick up
+        # more data.
+        for node in self.cc.nodes:
+            node.account.ssh("echo -e -n " + repr(self.LONGER_INPUTS) + " >> " + self.INPUT_FILE2)
+        wait_until(lambda: self.validate_output(self.LONGER_INPUT_LIST), timeout_sec=120, err_msg="Data added to input file was not seen in the output file in a reasonable amount of time.")
+
+        self.cc.delete_connector("local-file-source")
+        self.cc.delete_connector("local-file-sink")
+        wait_until(lambda: len(self.cc.list_connectors()) == 0, timeout_sec=10, err_msg="Deleted connectors did not disappear from REST listing")
+
+    def validate_output(self, input):
+        input_set = set(input)
+        # Output needs to be collected from all nodes because we can't be sure where the tasks will be scheduled.
+        output_set = set(itertools.chain(*[
+            [line.strip() for line in self.file_contents(node, self.OUTPUT_FILE)] for node in self.cc.nodes
+            ]))
+        return input_set == output_set
+
+
+    def file_contents(self, node, file):
+        try:
+            # Convert to a list here or the CalledProcessError may be raised during a call to the generator instead of
+            # immediately
+            return list(node.account.ssh_capture("cat " + file))
+        except subprocess.CalledProcessError:
+            return []
+
+    def _config_dict_from_props(self, connector_props):
+        return dict([line.strip().split('=', 1) for line in connector_props.split('\n') if line.strip() and not line.strip().startswith('#')])
+
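
The ConnectDistributedService helpers exercised above (list_connectors, create_connector, get_connector, get_connector_config, set_connector_config, get_connector_tasks, delete_connector) wrap Kafka Connect's REST API. A rough sketch of the equivalent raw HTTP calls, assuming the requests library and a worker on the default REST port 8083; both the base URL and the use of requests are illustrative assumptions, and the test itself only goes through the service helpers:

# Approximate raw-HTTP equivalents of the REST helpers used in the test above.
# The base URL is an assumption for illustration; the ducktape service computes it per node.
import requests

BASE = "http://localhost:8083"

def list_connectors():
    return requests.get(BASE + "/connectors").json()

def create_connector(config):
    # config is a flat dict such as the one built by _config_dict_from_props()
    return requests.post(BASE + "/connectors",
                         json={"name": config["name"], "config": config}).json()

def get_connector(name):
    return requests.get(BASE + "/connectors/" + name).json()

def get_connector_config(name):
    return requests.get(BASE + "/connectors/" + name + "/config").json()

def set_connector_config(name, config):
    return requests.put(BASE + "/connectors/" + name + "/config", json=config).json()

def get_connector_tasks(name):
    return requests.get(BASE + "/connectors/" + name + "/tasks").json()

def delete_connector(name):
    requests.delete(BASE + "/connectors/" + name)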

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/tests/connect/connect_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect/connect_test.py b/tests/kafkatest/tests/connect/connect_test.py
new file mode 100644
index 0000000..90f219a
--- /dev/null
+++ b/tests/kafkatest/tests/connect/connect_test.py
@@ -0,0 +1,93 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from kafkatest.tests.kafka_test import KafkaTest
+from kafkatest.services.connect import ConnectStandaloneService
+from kafkatest.services.console_consumer import ConsoleConsumer
+from ducktape.utils.util import wait_until
+from ducktape.mark import parametrize
+import hashlib, subprocess, json
+
+class ConnectStandaloneFileTest(KafkaTest):
+    """
+    Simple test of Kafka Connect that produces data from a file in one
+    standalone process and consumes it on another, validating the output is
+    identical to the input.
+    """
+
+    INPUT_FILE = "/mnt/connect.input"
+    OUTPUT_FILE = "/mnt/connect.output"
+
+    OFFSETS_FILE = "/mnt/connect.offsets"
+
+    TOPIC = "test"
+
+    FIRST_INPUT_LIST = ["foo", "bar", "baz"]
+    FIRST_INPUT = "\n".join(FIRST_INPUT_LIST) + "\n"
+    SECOND_INPUT_LIST = ["razz", "ma", "tazz"]
+    SECOND_INPUT = "\n".join(SECOND_INPUT_LIST) + "\n"
+
+    SCHEMA = { "type": "string", "optional": False }
+
+    def __init__(self, test_context):
+        super(ConnectStandaloneFileTest, self).__init__(test_context, num_zk=1, num_brokers=1, topics={
+            'test' : { 'partitions': 1, 'replication-factor': 1 }
+        })
+
+        self.source = ConnectStandaloneService(test_context, self.kafka, [self.INPUT_FILE, self.OFFSETS_FILE])
+        self.sink = ConnectStandaloneService(test_context, self.kafka, [self.OUTPUT_FILE, self.OFFSETS_FILE])
+        self.consumer_validator = ConsoleConsumer(test_context, 1, self.kafka, self.TOPIC, consumer_timeout_ms=1000)
+
+    @parametrize(converter="org.apache.kafka.connect.json.JsonConverter", schemas=True)
+    @parametrize(converter="org.apache.kafka.connect.json.JsonConverter", schemas=False)
+    @parametrize(converter="org.apache.kafka.connect.storage.StringConverter", schemas=None)
+    def test_file_source_and_sink(self, converter="org.apache.kafka.connect.json.JsonConverter", schemas=True):
+        assert converter != None, "converter type must be set"
+        # Template parameters
+        self.key_converter = converter
+        self.value_converter = converter
+        self.schemas = schemas
+
+        self.source.set_configs(lambda node: self.render("connect-standalone.properties", node=node), [self.render("connect-file-source.properties")])
+        self.sink.set_configs(lambda node: self.render("connect-standalone.properties", node=node), [self.render("connect-file-sink.properties")])
+
+        self.source.start()
+        self.sink.start()
+
+        # Generating data on the source node should generate new records and create new output on the sink node
+        self.source.node.account.ssh("echo -e -n " + repr(self.FIRST_INPUT) + " >> " + self.INPUT_FILE)
+        wait_until(lambda: self.validate_output(self.FIRST_INPUT), timeout_sec=60, err_msg="Data added to input file was not seen in the output file in a reasonable amount of time.")
+
+        # Restarting both should result in them picking up where they left off,
+        # only processing new data.
+        self.source.restart()
+        self.sink.restart()
+
+        self.source.node.account.ssh("echo -e -n " + repr(self.SECOND_INPUT) + " >> " + self.INPUT_FILE)
+        wait_until(lambda: self.validate_output(self.FIRST_INPUT + self.SECOND_INPUT), timeout_sec=60, err_msg="Sink output file never converged to the same state as the input file")
+
+        # Validate the format of the data in the Kafka topic
+        self.consumer_validator.run()
+        expected = json.dumps([line if not self.schemas else { "schema": self.SCHEMA, "payload": line } for line in self.FIRST_INPUT_LIST + self.SECOND_INPUT_LIST])
+        decoder = (json.loads if converter.endswith("JsonConverter") else str)
+        actual = json.dumps([decoder(x) for x in self.consumer_validator.messages_consumed[1]])
+        assert expected == actual, "Expected %s but saw %s in Kafka" % (expected, actual)
+
+    def validate_output(self, value):
+        try:
+            output_hash = list(self.sink.node.account.ssh_capture("md5sum " + self.OUTPUT_FILE))[0].strip().split()[0]
+            return output_hash == hashlib.md5(value).hexdigest()
+        except subprocess.CalledProcessError:
+            return False
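
The expected/actual comparison at the end of test_file_source_and_sink depends on what each converter writes to the topic: with schemas enabled, JsonConverter wraps each value in a schema/payload envelope; with schemas disabled, it writes the bare JSON value; StringConverter writes the raw string. A small sketch of the record for the input line "foo" under the three parametrized cases (the printed output in the comments is illustrative):

import json

SCHEMA = {"type": "string", "optional": False}
line = "foo"

# JsonConverter, schemas=True: schema/payload envelope
print(json.dumps({"schema": SCHEMA, "payload": line}))
# {"schema": {"type": "string", "optional": false}, "payload": "foo"}

# JsonConverter, schemas=False: bare JSON value
print(json.dumps(line))   # "foo"

# StringConverter: the raw string, no JSON quoting
print(line)               # foo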

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/tests/connect/templates/connect-distributed.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect/templates/connect-distributed.properties b/tests/kafkatest/tests/connect/templates/connect-distributed.properties
new file mode 100644
index 0000000..7a7440a
--- /dev/null
+++ b/tests/kafkatest/tests/connect/templates/connect-distributed.properties
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+bootstrap.servers={{ kafka.bootstrap_servers() }}
+
+group.id={{ group|default("connect-cluster") }}
+
+key.converter={{ key_converter|default("org.apache.kafka.connect.json.JsonConverter") }}
+value.converter={{ value_converter|default("org.apache.kafka.connect.json.JsonConverter") }}
+{% if key_converter is not defined or key_converter.endswith("JsonConverter") %}
+key.converter.schemas.enable={{ schemas|default(True)|string|lower }}
+{% endif %}
+{% if value_converter is not defined or value_converter.endswith("JsonConverter") %}
+value.converter.schemas.enable={{ schemas|default(True)|string|lower }}
+{% endif %}
+
+internal.key.converter=org.apache.kafka.connect.json.JsonConverter
+internal.value.converter=org.apache.kafka.connect.json.JsonConverter
+internal.key.converter.schemas.enable=false
+internal.value.converter.schemas.enable=false
+
+offset.storage.topic={{ OFFSETS_TOPIC }}
+config.storage.topic={{ CONFIG_TOPIC }}
+status.storage.topic={{ STATUS_TOPIC }}
+
+# Make sure data gets flushed frequently so tests don't have to wait to ensure they see data in output systems
+offset.flush.interval.ms=5000
+
+rest.advertised.host.name = {{ node.account.hostname }}
+
+
+# Reduce session timeouts so tests that kill workers don't need to wait as long to recover
+session.timeout.ms=10000
+consumer.session.timeout.ms=10000
\ No newline at end of file
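
The {{ ... }} and {% ... %} markup in the template above is Jinja-style; the render() calls in the tests appear to substitute test attributes such as OFFSETS_TOPIC, key_converter and schemas into it. A minimal sketch of how the converter and schemas conditionals collapse, assuming the jinja2 package (an assumption about the harness, not something this diff states):

# Minimal sketch of the Jinja-style rendering used for the .properties templates above.
# Assumes the jinja2 package; the snippet mirrors only the key.converter lines.
from jinja2 import Template

snippet = (
    "key.converter={{ key_converter|default('org.apache.kafka.connect.json.JsonConverter') }}\n"
    "{% if key_converter is not defined or key_converter.endswith('JsonConverter') %}"
    "key.converter.schemas.enable={{ schemas|default(True)|string|lower }}\n"
    "{% endif %}"
)

# Defaults: JsonConverter with schemas enabled ("true")
print(Template(snippet).render())
# Explicit StringConverter: the schemas.enable line is omitted entirely
print(Template(snippet).render(key_converter="org.apache.kafka.connect.storage.StringConverter"))
# JsonConverter with schemas=False renders "false", lower-cased for the Java properties format
print(Template(snippet).render(schemas=False))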

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/tests/connect/templates/connect-file-sink.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect/templates/connect-file-sink.properties b/tests/kafkatest/tests/connect/templates/connect-file-sink.properties
new file mode 100644
index 0000000..ad78bb3
--- /dev/null
+++ b/tests/kafkatest/tests/connect/templates/connect-file-sink.properties
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name=local-file-sink
+connector.class=FileStreamSink
+tasks.max=1
+file={{ OUTPUT_FILE }}
+topics={{ TOPIC }}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/tests/connect/templates/connect-file-source.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect/templates/connect-file-source.properties b/tests/kafkatest/tests/connect/templates/connect-file-source.properties
new file mode 100644
index 0000000..d2d5e97
--- /dev/null
+++ b/tests/kafkatest/tests/connect/templates/connect-file-source.properties
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name=local-file-source
+connector.class=FileStreamSource
+tasks.max=1
+file={{ INPUT_FILE }}
+topic={{ TOPIC }}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/tests/connect/templates/connect-standalone.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect/templates/connect-standalone.properties b/tests/kafkatest/tests/connect/templates/connect-standalone.properties
new file mode 100644
index 0000000..bf1daf7
--- /dev/null
+++ b/tests/kafkatest/tests/connect/templates/connect-standalone.properties
@@ -0,0 +1,32 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+bootstrap.servers={{ kafka.bootstrap_servers() }}
+
+key.converter={{ key_converter|default("org.apache.kafka.connect.json.JsonConverter") }}
+value.converter={{ value_converter|default("org.apache.kafka.connect.json.JsonConverter") }}
+{% if key_converter is not defined or key_converter.endswith("JsonConverter") %}
+key.converter.schemas.enable={{ schemas|default(True)|string|lower }}
+{% endif %}
+{% if value_converter is not defined or value_converter.endswith("JsonConverter") %}
+value.converter.schemas.enable={{ schemas|default(True)|string|lower }}
+{% endif %}
+
+internal.key.converter=org.apache.kafka.connect.json.JsonConverter
+internal.value.converter=org.apache.kafka.connect.json.JsonConverter
+internal.key.converter.schemas.enable=false
+internal.value.converter.schemas.enable=false
+
+offset.storage.file.filename={{ OFFSETS_FILE }}

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/tests/connect_distributed_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect_distributed_test.py b/tests/kafkatest/tests/connect_distributed_test.py
deleted file mode 100644
index 9aa16ab..0000000
--- a/tests/kafkatest/tests/connect_distributed_test.py
+++ /dev/null
@@ -1,210 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from kafkatest.tests.kafka_test import KafkaTest
-from kafkatest.services.connect import ConnectDistributedService, VerifiableSource, VerifiableSink
-from kafkatest.services.console_consumer import ConsoleConsumer
-from ducktape.utils.util import wait_until
-from ducktape.mark import matrix
-import subprocess, itertools, time
-from collections import Counter
-
-class ConnectDistributedTest(KafkaTest):
-    """
-    Simple test of Kafka Connect in distributed mode, producing data from files on one cluster and consuming it on
-    another, validating the total output is identical to the input.
-    """
-
-    INPUT_FILE = "/mnt/connect.input"
-    OUTPUT_FILE = "/mnt/connect.output"
-
-    TOPIC = "test"
-    OFFSETS_TOPIC = "connect-offsets"
-    CONFIG_TOPIC = "connect-configs"
-    STATUS_TOPIC = "connect-status"
-
-    # Since tasks can be assigned to any node and we're testing with files, we need to make sure the content is the same
-    # across all nodes.
-    FIRST_INPUT_LIST = ["foo", "bar", "baz"]
-    FIRST_INPUTS = "\n".join(FIRST_INPUT_LIST) + "\n"
-    SECOND_INPUT_LIST = ["razz", "ma", "tazz"]
-    SECOND_INPUTS = "\n".join(SECOND_INPUT_LIST) + "\n"
-
-    SCHEMA = { "type": "string", "optional": False }
-
-    def __init__(self, test_context):
-        super(ConnectDistributedTest, self).__init__(test_context, num_zk=1, num_brokers=1, topics={
-            'test' : { 'partitions': 1, 'replication-factor': 1 }
-        })
-
-        self.cc = ConnectDistributedService(test_context, 3, self.kafka, [self.INPUT_FILE, self.OUTPUT_FILE])
-        self.cc.log_level = "DEBUG"
-        self.key_converter = "org.apache.kafka.connect.json.JsonConverter"
-        self.value_converter = "org.apache.kafka.connect.json.JsonConverter"
-        self.schemas = True
-
-    def test_file_source_and_sink(self):
-        """
-        Tests that a basic file connector works across clean rolling bounces. This validates that the connector is
-        correctly created, tasks instantiated, and as nodes restart the work is rebalanced across nodes.
-        """
-
-        self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
-
-        self.cc.start()
-
-        self.logger.info("Creating connectors")
-        for connector_props in [self.render("connect-file-source.properties"), self.render("connect-file-sink.properties")]:
-            connector_config = dict([line.strip().split('=', 1) for line in connector_props.split('\n') if line.strip() and not line.strip().startswith('#')])
-            self.cc.create_connector(connector_config)
-
-        # Generating data on the source node should generate new records and create new output on the sink node. Timeouts
-        # here need to be more generous than they are for standalone mode because a) it takes longer to write configs,
-        # do rebalancing of the group, etc, and b) without explicit leave group support, rebalancing takes awhile
-        for node in self.cc.nodes:
-            node.account.ssh("echo -e -n " + repr(self.FIRST_INPUTS) + " >> " + self.INPUT_FILE)
-        wait_until(lambda: self._validate_file_output(self.FIRST_INPUT_LIST), timeout_sec=70, err_msg="Data added to input file was not seen in the output file in a reasonable amount of time.")
-
-        # Restarting both should result in them picking up where they left off,
-        # only processing new data.
-        self.cc.restart()
-
-        for node in self.cc.nodes:
-            node.account.ssh("echo -e -n " + repr(self.SECOND_INPUTS) + " >> " + self.INPUT_FILE)
-        wait_until(lambda: self._validate_file_output(self.FIRST_INPUT_LIST + self.SECOND_INPUT_LIST), timeout_sec=70, err_msg="Sink output file never converged to the same state as the input file")
-
-
-    @matrix(clean=[True, False])
-    def test_bounce(self, clean):
-        """
-        Validates that source and sink tasks that run continuously and produce a predictable sequence of messages
-        run correctly and deliver messages exactly once when Kafka Connect workers undergo clean rolling bounces.
-        """
-        num_tasks = 3
-
-        self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
-        self.cc.start()
-
-        self.source = VerifiableSource(self.cc, tasks=num_tasks)
-        self.source.start()
-        self.sink = VerifiableSink(self.cc, tasks=num_tasks)
-        self.sink.start()
-
-        for _ in range(3):
-            for node in self.cc.nodes:
-                started = time.time()
-                self.logger.info("%s bouncing Kafka Connect on %s", clean and "Clean" or "Hard", str(node.account))
-                self.cc.stop_node(node, clean_shutdown=clean)
-                with node.account.monitor_log(self.cc.LOG_FILE) as monitor:
-                    self.cc.start_node(node)
-                    monitor.wait_until("Starting connectors and tasks using config offset", timeout_sec=90,
-                                       err_msg="Kafka Connect worker didn't successfully join group and start work")
-                self.logger.info("Bounced Kafka Connect on %s and rejoined in %f seconds", node.account, time.time() - started)
-                # If this is a hard bounce, give additional time for the consumer groups to recover. If we don't give
-                # some time here, the next bounce may cause consumers to be shut down before they have any time to process
-                # data and we can end up with zero data making it through the test.
-                if not clean:
-                    time.sleep(15)
-
-
-        self.source.stop()
-        self.sink.stop()
-        self.cc.stop()
-
-        # Validate at least once delivery of everything that was reported as written since we should have flushed and
-        # cleanly exited. Currently this only tests at least once delivery because the sink task may not have consumed
-        # all the messages generated by the source task. This needs to be done per-task since seqnos are not unique across
-        # tasks.
-        success = True
-        errors = []
-        allow_dups = not clean
-        src_messages = self.source.messages()
-        sink_messages = self.sink.messages()
-        for task in range(num_tasks):
-            # Validate source messages
-            src_seqnos = [msg['seqno'] for msg in src_messages if msg['task'] == task]
-            # Every seqno up to the largest one we ever saw should appear. Each seqno should only appear once because clean
-            # bouncing should commit on rebalance.
-            src_seqno_max = max(src_seqnos)
-            self.logger.debug("Max source seqno: %d", src_seqno_max)
-            src_seqno_counts = Counter(src_seqnos)
-            missing_src_seqnos = sorted(set(range(src_seqno_max)).difference(set(src_seqnos)))
-            duplicate_src_seqnos = sorted([seqno for seqno,count in src_seqno_counts.iteritems() if count > 1])
-
-            if missing_src_seqnos:
-                self.logger.error("Missing source sequence numbers for task " + str(task))
-                errors.append("Found missing source sequence numbers for task %d: %s" % (task, missing_src_seqnos))
-                success = False
-            if not allow_dups and duplicate_src_seqnos:
-                self.logger.error("Duplicate source sequence numbers for task " + str(task))
-                errors.append("Found duplicate source sequence numbers for task %d: %s" % (task, duplicate_src_seqnos))
-                success = False
-
-
-            # Validate sink messages
-            sink_seqnos = [msg['seqno'] for msg in sink_messages if msg['task'] == task and 'flushed' in msg]
-            # Every seqno up to the largest one we ever saw should appear. Each seqno should only appear once because
-            # clean bouncing should commit on rebalance.
-            sink_seqno_max = max(sink_seqnos)
-            self.logger.debug("Max sink seqno: %d", sink_seqno_max)
-            sink_seqno_counts = Counter(sink_seqnos)
-            missing_sink_seqnos = sorted(set(range(sink_seqno_max)).difference(set(sink_seqnos)))
-            duplicate_sink_seqnos = sorted([seqno for seqno,count in sink_seqno_counts.iteritems() if count > 1])
-
-            if missing_sink_seqnos:
-                self.logger.error("Missing sink sequence numbers for task " + str(task))
-                errors.append("Found missing sink sequence numbers for task %d: %s" % (task, missing_sink_seqnos))
-                success = False
-            if not allow_dups and duplicate_sink_seqnos:
-                self.logger.error("Duplicate sink sequence numbers for task " + str(task))
-                errors.append("Found duplicate sink sequence numbers for task %d: %s" % (task, duplicate_sink_seqnos))
-                success = False
-
-            # Validate source and sink match
-            if sink_seqno_max > src_seqno_max:
-                self.logger.error("Found sink sequence number greater than any generated sink sequence number for task %d: %d > %d", task, sink_seqno_max, src_seqno_max)
-                errors.append("Found sink sequence number greater than any generated sink sequence number for task %d: %d > %d" % (task, sink_seqno_max, src_seqno_max))
-                success = False
-            if src_seqno_max < 1000 or sink_seqno_max < 1000:
-                errors.append("Not enough messages were processed: source:%d sink:%d" % (src_seqno_max, sink_seqno_max))
-                success = False
-
-        if not success:
-            self.mark_for_collect(self.cc)
-            # Also collect the data in the topic to aid in debugging
-            consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, self.source.topic, consumer_timeout_ms=1000, print_key=True)
-            consumer_validator.run()
-            self.mark_for_collect(consumer_validator, "consumer_stdout")
-
-        assert success, "Found validation errors:\n" + "\n  ".join(errors)
-
-
-
-    def _validate_file_output(self, input):
-        input_set = set(input)
-        # Output needs to be collected from all nodes because we can't be sure where the tasks will be scheduled.
-        # Between the first and second rounds, we might even end up with half the data on each node.
-        output_set = set(itertools.chain(*[
-            [line.strip() for line in self._file_contents(node, self.OUTPUT_FILE)] for node in self.cc.nodes
-        ]))
-        return input_set == output_set
-
-    def _file_contents(self, node, file):
-        try:
-            # Convert to a list here or the CalledProcessError may be returned during a call to the generator instead of
-            # immediately
-            return list(node.account.ssh_capture("cat " + file))
-        except subprocess.CalledProcessError:
-            return []

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/tests/connect_rest_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect_rest_test.py b/tests/kafkatest/tests/connect_rest_test.py
deleted file mode 100644
index 69a8cb7..0000000
--- a/tests/kafkatest/tests/connect_rest_test.py
+++ /dev/null
@@ -1,165 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from kafkatest.tests.kafka_test import KafkaTest
-from kafkatest.services.connect import ConnectDistributedService, ConnectRestError
-from ducktape.utils.util import wait_until
-import hashlib, subprocess, json, itertools
-
-class ConnectRestApiTest(KafkaTest):
-    """
-    Test of Kafka Connect's REST API endpoints.
-    """
-
-    INPUT_FILE = "/mnt/connect.input"
-    INPUT_FILE2 = "/mnt/connect.input2"
-    OUTPUT_FILE = "/mnt/connect.output"
-
-    TOPIC = "test"
-    OFFSETS_TOPIC = "connect-offsets"
-    CONFIG_TOPIC = "connect-configs"
-    STATUS_TOPIC = "connect-status"
-
-    # Since tasks can be assigned to any node and we're testing with files, we need to make sure the content is the same
-    # across all nodes.
-    INPUT_LIST = ["foo", "bar", "baz"]
-    INPUTS = "\n".join(INPUT_LIST) + "\n"
-    LONGER_INPUT_LIST = ["foo", "bar", "baz", "razz", "ma", "tazz"]
-    LONER_INPUTS = "\n".join(LONGER_INPUT_LIST) + "\n"
-
-    SCHEMA = { "type": "string", "optional": False }
-
-    def __init__(self, test_context):
-        super(ConnectRestApiTest, self).__init__(test_context, num_zk=1, num_brokers=1, topics={
-            'test' : { 'partitions': 1, 'replication-factor': 1 }
-        })
-
-        self.cc = ConnectDistributedService(test_context, 2, self.kafka, [self.INPUT_FILE, self.INPUT_FILE2, self.OUTPUT_FILE])
-
-    def test_rest_api(self):
-        # Template parameters
-        self.key_converter = "org.apache.kafka.connect.json.JsonConverter"
-        self.value_converter = "org.apache.kafka.connect.json.JsonConverter"
-        self.schemas = True
-
-        self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
-
-        self.cc.start()
-
-        assert self.cc.list_connectors() == []
-
-        self.logger.info("Creating connectors")
-        source_connector_props = self.render("connect-file-source.properties")
-        sink_connector_props = self.render("connect-file-sink.properties")
-        for connector_props in [source_connector_props, sink_connector_props]:
-            connector_config = self._config_dict_from_props(connector_props)
-            self.cc.create_connector(connector_config)
-
-        # We should see the connectors appear
-        wait_until(lambda: set(self.cc.list_connectors()) == set(["local-file-source", "local-file-sink"]),
-                   timeout_sec=10, err_msg="Connectors that were just created did not appear in connector listing")
-
-        # We'll only do very simple validation that the connectors and tasks really ran.
-        for node in self.cc.nodes:
-            node.account.ssh("echo -e -n " + repr(self.INPUTS) + " >> " + self.INPUT_FILE)
-        wait_until(lambda: self.validate_output(self.INPUT_LIST), timeout_sec=120, err_msg="Data added to input file was not seen in the output file in a reasonable amount of time.")
-
-
-        # Trying to create the same connector again should cause an error
-        try:
-            self.cc.create_connector(self._config_dict_from_props(source_connector_props))
-            assert False, "creating the same connector should have caused a conflict"
-        except ConnectRestError:
-            pass # expected
-
-        # Validate that we can get info about connectors
-        expected_source_info = {
-            'name': 'local-file-source',
-            'config': self._config_dict_from_props(source_connector_props),
-            'tasks': [{ 'connector': 'local-file-source', 'task': 0 }]
-        }
-        source_info = self.cc.get_connector("local-file-source")
-        assert expected_source_info == source_info, "Incorrect info:" + json.dumps(source_info)
-        source_config = self.cc.get_connector_config("local-file-source")
-        assert expected_source_info['config'] == source_config, "Incorrect config: " + json.dumps(source_config)
-        expected_sink_info = {
-            'name': 'local-file-sink',
-            'config': self._config_dict_from_props(sink_connector_props),
-            'tasks': [{ 'connector': 'local-file-sink', 'task': 0 }]
-        }
-        sink_info = self.cc.get_connector("local-file-sink")
-        assert expected_sink_info == sink_info, "Incorrect info:" + json.dumps(sink_info)
-        sink_config = self.cc.get_connector_config("local-file-sink")
-        assert expected_sink_info['config'] == sink_config, "Incorrect config: " + json.dumps(sink_config)
-
-
-        # Validate that we can get info about tasks. This info should definitely be available now without waiting since
-        # we've already seen data appear in files.
-        # TODO: It would be nice to validate a complete listing, but that doesn't make sense for the file connectors
-        expected_source_task_info = [{
-            'id': { 'connector': 'local-file-source', 'task': 0 },
-            'config': {
-                'task.class': 'org.apache.kafka.connect.file.FileStreamSourceTask',
-                'file': self.INPUT_FILE,
-                'topic': self.TOPIC
-            }
-        }]
-        source_task_info = self.cc.get_connector_tasks("local-file-source")
-        assert expected_source_task_info == source_task_info, "Incorrect info:" + json.dumps(source_task_info)
-        expected_sink_task_info = [{
-            'id': { 'connector': 'local-file-sink', 'task': 0 },
-            'config': {
-                'task.class': 'org.apache.kafka.connect.file.FileStreamSinkTask',
-                'file': self.OUTPUT_FILE,
-                'topics': self.TOPIC
-            }
-        }]
-        sink_task_info = self.cc.get_connector_tasks("local-file-sink")
-        assert expected_sink_task_info == sink_task_info, "Incorrect info:" + json.dumps(sink_task_info)
-
-        file_source_config = self._config_dict_from_props(source_connector_props)
-        file_source_config['file'] = self.INPUT_FILE2
-        self.cc.set_connector_config("local-file-source", file_source_config)
-
-        # We should also be able to verify that the modified configs caused the tasks to move to the new file and pick up
-        # more data.
-        for node in self.cc.nodes:
-            node.account.ssh("echo -e -n " + repr(self.LONER_INPUTS) + " >> " + self.INPUT_FILE2)
-        wait_until(lambda: self.validate_output(self.LONGER_INPUT_LIST), timeout_sec=120, err_msg="Data added to input file was not seen in the output file in a reasonable amount of time.")
-
-        self.cc.delete_connector("local-file-source")
-        self.cc.delete_connector("local-file-sink")
-        wait_until(lambda: len(self.cc.list_connectors()) == 0, timeout_sec=10, err_msg="Deleted connectors did not disappear from REST listing")
-
-    def validate_output(self, input):
-        input_set = set(input)
-        # Output needs to be collected from all nodes because we can't be sure where the tasks will be scheduled.
-        output_set = set(itertools.chain(*[
-            [line.strip() for line in self.file_contents(node, self.OUTPUT_FILE)] for node in self.cc.nodes
-            ]))
-        return input_set == output_set
-
-
-    def file_contents(self, node, file):
-        try:
-            # Convert to a list here or the CalledProcessError may be returned during a call to the generator instead of
-            # immediately
-            return list(node.account.ssh_capture("cat " + file))
-        except subprocess.CalledProcessError:
-            return []
-
-    def _config_dict_from_props(self, connector_props):
-        return dict([line.strip().split('=', 1) for line in connector_props.split('\n') if line.strip() and not line.strip().startswith('#')])
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/tests/connect_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect_test.py b/tests/kafkatest/tests/connect_test.py
deleted file mode 100644
index 90f219a..0000000
--- a/tests/kafkatest/tests/connect_test.py
+++ /dev/null
@@ -1,93 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from kafkatest.tests.kafka_test import KafkaTest
-from kafkatest.services.connect import ConnectStandaloneService
-from kafkatest.services.console_consumer import ConsoleConsumer
-from ducktape.utils.util import wait_until
-from ducktape.mark import parametrize
-import hashlib, subprocess, json
-
-class ConnectStandaloneFileTest(KafkaTest):
-    """
-    Simple test of Kafka Connect that produces data from a file in one
-    standalone process and consumes it on another, validating the output is
-    identical to the input.
-    """
-
-    INPUT_FILE = "/mnt/connect.input"
-    OUTPUT_FILE = "/mnt/connect.output"
-
-    OFFSETS_FILE = "/mnt/connect.offsets"
-
-    TOPIC = "test"
-
-    FIRST_INPUT_LIST = ["foo", "bar", "baz"]
-    FIRST_INPUT = "\n".join(FIRST_INPUT_LIST) + "\n"
-    SECOND_INPUT_LIST = ["razz", "ma", "tazz"]
-    SECOND_INPUT = "\n".join(SECOND_INPUT_LIST) + "\n"
-
-    SCHEMA = { "type": "string", "optional": False }
-
-    def __init__(self, test_context):
-        super(ConnectStandaloneFileTest, self).__init__(test_context, num_zk=1, num_brokers=1, topics={
-            'test' : { 'partitions': 1, 'replication-factor': 1 }
-        })
-
-        self.source = ConnectStandaloneService(test_context, self.kafka, [self.INPUT_FILE, self.OFFSETS_FILE])
-        self.sink = ConnectStandaloneService(test_context, self.kafka, [self.OUTPUT_FILE, self.OFFSETS_FILE])
-        self.consumer_validator = ConsoleConsumer(test_context, 1, self.kafka, self.TOPIC, consumer_timeout_ms=1000)
-
-    @parametrize(converter="org.apache.kafka.connect.json.JsonConverter", schemas=True)
-    @parametrize(converter="org.apache.kafka.connect.json.JsonConverter", schemas=False)
-    @parametrize(converter="org.apache.kafka.connect.storage.StringConverter", schemas=None)
-    def test_file_source_and_sink(self, converter="org.apache.kafka.connect.json.JsonConverter", schemas=True):
-        assert converter != None, "converter type must be set"
-        # Template parameters
-        self.key_converter = converter
-        self.value_converter = converter
-        self.schemas = schemas
-
-        self.source.set_configs(lambda node: self.render("connect-standalone.properties", node=node), [self.render("connect-file-source.properties")])
-        self.sink.set_configs(lambda node: self.render("connect-standalone.properties", node=node), [self.render("connect-file-sink.properties")])
-
-        self.source.start()
-        self.sink.start()
-
-        # Generating data on the source node should generate new records and create new output on the sink node
-        self.source.node.account.ssh("echo -e -n " + repr(self.FIRST_INPUT) + " >> " + self.INPUT_FILE)
-        wait_until(lambda: self.validate_output(self.FIRST_INPUT), timeout_sec=60, err_msg="Data added to input file was not seen in the output file in a reasonable amount of time.")
-
-        # Restarting both should result in them picking up where they left off,
-        # only processing new data.
-        self.source.restart()
-        self.sink.restart()
-
-        self.source.node.account.ssh("echo -e -n " + repr(self.SECOND_INPUT) + " >> " + self.INPUT_FILE)
-        wait_until(lambda: self.validate_output(self.FIRST_INPUT + self.SECOND_INPUT), timeout_sec=60, err_msg="Sink output file never converged to the same state as the input file")
-
-        # Validate the format of the data in the Kafka topic
-        self.consumer_validator.run()
-        expected = json.dumps([line if not self.schemas else { "schema": self.SCHEMA, "payload": line } for line in self.FIRST_INPUT_LIST + self.SECOND_INPUT_LIST])
-        decoder = (json.loads if converter.endswith("JsonConverter") else str)
-        actual = json.dumps([decoder(x) for x in self.consumer_validator.messages_consumed[1]])
-        assert expected == actual, "Expected %s but saw %s in Kafka" % (expected, actual)
-
-    def validate_output(self, value):
-        try:
-            output_hash = list(self.sink.node.account.ssh_capture("md5sum " + self.OUTPUT_FILE))[0].strip().split()[0]
-            return output_hash == hashlib.md5(value).hexdigest()
-        except subprocess.CalledProcessError:
-            return False
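
The format assertion in the deleted standalone test hinges on how the JSON converter frames each
record: with schemas enabled every value is wrapped in a schema/payload envelope, without schemas it
is the bare JSON value. A rough restatement of that expectation outside the test (the input list is
invented for illustration):

import json

SCHEMA = {"type": "string", "optional": False}
inputs = ["foo", "bar", "baz"]

# With schemas enabled, each Kafka message carries the schema alongside the payload.
with_schemas = [json.dumps({"schema": SCHEMA, "payload": line}) for line in inputs]
# With schemas disabled, the message is just the JSON-encoded string itself.
without_schemas = [json.dumps(line) for line in inputs]

assert [json.loads(m)["payload"] for m in with_schemas] == inputs
assert [json.loads(m) for m in without_schemas] == inputs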

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/tests/consumer_group_command_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/consumer_group_command_test.py b/tests/kafkatest/tests/consumer_group_command_test.py
deleted file mode 100644
index 1424d96..0000000
--- a/tests/kafkatest/tests/consumer_group_command_test.py
+++ /dev/null
@@ -1,106 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-from ducktape.utils.util import wait_until
-from ducktape.tests.test import Test
-from ducktape.mark import matrix
-
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
-from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.services.security.security_config import SecurityConfig
-
-import os
-import re
-
-TOPIC = "topic-consumer-group-command"
-
-class ConsumerGroupCommandTest(Test):
-    """
-    Tests ConsumerGroupCommand
-    """
-    # Root directory for persistent output
-    PERSISTENT_ROOT = "/mnt/consumer_group_command"
-    COMMAND_CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "command.properties")
-
-    def __init__(self, test_context):
-        super(ConsumerGroupCommandTest, self).__init__(test_context)
-        self.num_zk = 1
-        self.num_brokers = 1
-        self.topics = {
-            TOPIC: {'partitions': 1, 'replication-factor': 1}
-        }
-        self.zk = ZookeeperService(test_context, self.num_zk)
-
-    def setUp(self):
-        self.zk.start()
-
-    def start_kafka(self, security_protocol, interbroker_security_protocol):
-        self.kafka = KafkaService(
-            self.test_context, self.num_brokers,
-            self.zk, security_protocol=security_protocol,
-            interbroker_security_protocol=interbroker_security_protocol, topics=self.topics)
-        self.kafka.start()
-
-    def start_consumer(self, security_protocol):
-        enable_new_consumer = security_protocol == SecurityConfig.SSL
-        self.consumer = ConsoleConsumer(self.test_context, num_nodes=self.num_brokers, kafka=self.kafka, topic=TOPIC,
-                                        consumer_timeout_ms=None, new_consumer=enable_new_consumer)
-        self.consumer.start()
-
-    def setup_and_verify(self, security_protocol, group=None):
-        self.start_kafka(security_protocol, security_protocol)
-        self.start_consumer(security_protocol)
-        consumer_node = self.consumer.nodes[0]
-        wait_until(lambda: self.consumer.alive(consumer_node),
-                   timeout_sec=10, backoff_sec=.2, err_msg="Consumer was too slow to start")
-        kafka_node = self.kafka.nodes[0]
-        if security_protocol is not SecurityConfig.PLAINTEXT:
-            prop_file = str(self.kafka.security_config.client_config())
-            self.logger.debug(prop_file)
-            kafka_node.account.ssh("mkdir -p %s" % self.PERSISTENT_ROOT, allow_fail=False)
-            kafka_node.account.create_file(self.COMMAND_CONFIG_FILE, prop_file)
-
-        # Verify ConsumerGroupCommand lists expected consumer groups
-        enable_new_consumer = security_protocol != SecurityConfig.PLAINTEXT
-        command_config_file = None
-        if enable_new_consumer:
-            command_config_file = self.COMMAND_CONFIG_FILE
-
-        if group:
-            wait_until(lambda: re.search("%s\s+topic-consumer-group-command\s+0"%group,self.kafka.describe_consumer_group(group=group, node=kafka_node, new_consumer=enable_new_consumer, command_config=command_config_file)), timeout_sec=10,
-                       err_msg="Timed out waiting to list expected consumer groups.")
-        else:
-            wait_until(lambda: "test-consumer-group" in self.kafka.list_consumer_groups(node=kafka_node, new_consumer=enable_new_consumer, command_config=command_config_file), timeout_sec=10,
-                       err_msg="Timed out waiting to list expected consumer groups.")
-
-        self.consumer.stop()
-
-    @matrix(security_protocol=['PLAINTEXT', 'SSL'])
-    def test_list_consumer_groups(self, security_protocol='PLAINTEXT'):
-        """
-        Tests if ConsumerGroupCommand is listing correct consumer groups
-        :return: None
-        """
-        self.setup_and_verify(security_protocol)
-
-    @matrix(security_protocol=['PLAINTEXT', 'SSL'])
-    def test_describe_consumer_group(self, security_protocol='PLAINTEXT'):
-        """
-        Tests if ConsumerGroupCommand is describing a consumer group correctly
-        :return: None
-        """
-        self.setup_and_verify(security_protocol, group="test-consumer-group")
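
The describe check above waits until the kafka-consumer-groups output matches a regex of the form
'<group>\s+<topic>\s+0'. A small illustration of that match against a made-up output line (the sample
line is not real tool output):

import re

group = "test-consumer-group"
topic = "topic-consumer-group-command"
# Hypothetical line in the style the test greps for: group, topic, partition, offsets, lag, owner.
sample_line = "test-consumer-group   topic-consumer-group-command   0   42   42   0"
pattern = r"%s\s+%s\s+0" % (group, topic)
assert re.search(pattern, sample_line) is not None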

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/tests/consumer_rolling_upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/consumer_rolling_upgrade_test.py b/tests/kafkatest/tests/consumer_rolling_upgrade_test.py
deleted file mode 100644
index 3cd3c7c..0000000
--- a/tests/kafkatest/tests/consumer_rolling_upgrade_test.py
+++ /dev/null
@@ -1,82 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.utils.util import wait_until
-
-from kafkatest.tests.verifiable_consumer_test import VerifiableConsumerTest
-from kafkatest.services.kafka import TopicPartition
-
-class ConsumerRollingUpgradeTest(VerifiableConsumerTest):
-    TOPIC = "test_topic"
-    NUM_PARTITIONS = 4
-    RANGE = "org.apache.kafka.clients.consumer.RangeAssignor"
-    ROUND_ROBIN = "org.apache.kafka.clients.consumer.RoundRobinAssignor"
-
-    def __init__(self, test_context):
-        super(ConsumerRollingUpgradeTest, self).__init__(test_context, num_consumers=2, num_producers=0,
-                                                         num_zk=1, num_brokers=1, topics={
-            self.TOPIC : { 'partitions': self.NUM_PARTITIONS, 'replication-factor': 1 }
-        })
-
-    def _verify_range_assignment(self, consumer):
-        # range assignment should give us two partition sets: (0, 1) and (2, 3)
-        assignment = set([frozenset(partitions) for partitions in consumer.current_assignment().values()])
-        assert assignment == set([
-            frozenset([TopicPartition(self.TOPIC, 0), TopicPartition(self.TOPIC, 1)]),
-            frozenset([TopicPartition(self.TOPIC, 2), TopicPartition(self.TOPIC, 3)])])
-
-    def _verify_roundrobin_assignment(self, consumer):
-        assignment = set([frozenset(x) for x in consumer.current_assignment().values()])
-        assert assignment == set([
-            frozenset([TopicPartition(self.TOPIC, 0), TopicPartition(self.TOPIC, 2)]),
-            frozenset([TopicPartition(self.TOPIC, 1), TopicPartition(self.TOPIC, 3)])])
-
-    def rolling_update_test(self):
-        """
-        Verify that rolling updates of partition assignment strategies work correctly. In this
-        test, we use a rolling restart to change the group's assignment strategy from "range" 
-        to "roundrobin." We verify after every restart that all members are still in the group
-        and that the correct assignment strategy was used.
-        """
-
-        # initialize the consumer using range assignment
-        consumer = self.setup_consumer(self.TOPIC, assignment_strategy=self.RANGE)
-
-        consumer.start()
-        self.await_all_members(consumer)
-        self._verify_range_assignment(consumer)
-
-        # change consumer configuration to prefer round-robin assignment, but still support range assignment
-        consumer.assignment_strategy = self.ROUND_ROBIN + "," + self.RANGE
-
-        # restart one of the nodes and verify that we are still using range assignment
-        consumer.stop_node(consumer.nodes[0])
-        consumer.start_node(consumer.nodes[0])
-        self.await_all_members(consumer)
-        self._verify_range_assignment(consumer)
-        
-        # now restart the other node and verify that we have switched to round-robin
-        consumer.stop_node(consumer.nodes[1])
-        consumer.start_node(consumer.nodes[1])
-        self.await_all_members(consumer)
-        self._verify_roundrobin_assignment(consumer)
-
-        # if we want, we can now drop support for range assignment
-        consumer.assignment_strategy = self.ROUND_ROBIN
-        for node in consumer.nodes:
-            consumer.stop_node(node)
-            consumer.start_node(node)
-            self.await_all_members(consumer)
-            self._verify_roundrobin_assignment(consumer)
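
The assignment checks above reduce the group's partition distribution to a set of frozensets, so the
comparison ignores which member owns which bucket. The same idea with plain tuples standing in for
TopicPartition objects (the data below is illustrative):

# current_assignment() maps consumer -> assigned partitions; only the
# groupings matter, not the owners, so both sides collapse to frozensets.
current_assignment = {
    "consumer-1": [("test_topic", 0), ("test_topic", 1)],
    "consumer-2": [("test_topic", 2), ("test_topic", 3)],
}
observed = set(frozenset(parts) for parts in current_assignment.values())
expected = set([
    frozenset([("test_topic", 0), ("test_topic", 1)]),
    frozenset([("test_topic", 2), ("test_topic", 3)]),
])
assert observed == expected  # the range-style split of four partitions over two members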

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/tests/consumer_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/consumer_test.py b/tests/kafkatest/tests/consumer_test.py
deleted file mode 100644
index 084b19d..0000000
--- a/tests/kafkatest/tests/consumer_test.py
+++ /dev/null
@@ -1,291 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.mark import matrix
-from ducktape.utils.util import wait_until
-
-from kafkatest.tests.verifiable_consumer_test import VerifiableConsumerTest
-from kafkatest.services.kafka import TopicPartition
-
-import signal
-
-class OffsetValidationTest(VerifiableConsumerTest):
-    TOPIC = "test_topic"
-    NUM_PARTITIONS = 1
-
-    def __init__(self, test_context):
-        super(OffsetValidationTest, self).__init__(test_context, num_consumers=3, num_producers=1,
-                                                     num_zk=1, num_brokers=2, topics={
-            self.TOPIC : { 'partitions': self.NUM_PARTITIONS, 'replication-factor': 2 }
-        })
-
-    def rolling_bounce_consumers(self, consumer, num_bounces=5, clean_shutdown=True):
-        for _ in range(num_bounces):
-            for node in consumer.nodes:
-                consumer.stop_node(node, clean_shutdown)
-
-                wait_until(lambda: len(consumer.dead_nodes()) == 1,
-                           timeout_sec=self.session_timeout_sec+5,
-                           err_msg="Timed out waiting for the consumer to shutdown")
-
-                consumer.start_node(node)
-
-                self.await_all_members(consumer)
-                self.await_consumed_messages(consumer)
-
-    def bounce_all_consumers(self, consumer, num_bounces=5, clean_shutdown=True):
-        for _ in range(num_bounces):
-            for node in consumer.nodes:
-                consumer.stop_node(node, clean_shutdown)
-
-            wait_until(lambda: len(consumer.dead_nodes()) == self.num_consumers, timeout_sec=10,
-                       err_msg="Timed out waiting for the consumers to shutdown")
-            
-            for node in consumer.nodes:
-                consumer.start_node(node)
-
-            self.await_all_members(consumer)
-            self.await_consumed_messages(consumer)
-
-    def rolling_bounce_brokers(self, consumer, num_bounces=5, clean_shutdown=True):
-        for _ in range(num_bounces):
-            for node in self.kafka.nodes:
-                self.kafka.restart_node(node, clean_shutdown=True)
-                self.await_all_members(consumer)
-                self.await_consumed_messages(consumer)
-
-    def test_broker_rolling_bounce(self):
-        """
-        Verify correct consumer behavior when the brokers are consecutively restarted.
-
-        Setup: single Kafka cluster with one producer writing messages to a single topic with one
-        partition, and a set of consumers in the same group reading from the same topic.
-
-        - Start a producer which continues producing new messages throughout the test.
-        - Start up the consumers and wait until they've joined the group.
-        - In a loop, restart each broker consecutively, waiting for the group to stabilize between
-          each broker restart.
-        - Verify delivery semantics according to the failure type and that the broker bounces
-          did not cause unexpected group rebalances.
-        """
-        partition = TopicPartition(self.TOPIC, 0)
-        
-        producer = self.setup_producer(self.TOPIC)
-        consumer = self.setup_consumer(self.TOPIC)
-
-        producer.start()
-        self.await_produced_messages(producer)
-
-        consumer.start()
-        self.await_all_members(consumer)
-
-        num_rebalances = consumer.num_rebalances()
-        # TODO: make this test work with hard shutdowns, which probably requires
-        #       pausing before the node is restarted to ensure that any ephemeral
-        #       nodes have time to expire
-        self.rolling_bounce_brokers(consumer, clean_shutdown=True)
-        
-        unexpected_rebalances = consumer.num_rebalances() - num_rebalances
-        assert unexpected_rebalances == 0, \
-            "Broker rolling bounce caused %d unexpected group rebalances" % unexpected_rebalances
-
-        consumer.stop_all()
-
-        assert consumer.current_position(partition) == consumer.total_consumed(), \
-            "Total consumed records did not match consumed position"
-
-    @matrix(clean_shutdown=[True, False], bounce_mode=["all", "rolling"])
-    def test_consumer_bounce(self, clean_shutdown, bounce_mode):
-        """
-        Verify correct consumer behavior when the consumers in the group are consecutively restarted.
-
-        Setup: single Kafka cluster with one producer and a set of consumers in one group.
-
-        - Start a producer which continues producing new messages throughout the test.
-        - Start up the consumers and wait until they've joined the group.
-        - In a loop, restart each consumer, waiting for each one to rejoin the group before
-          restarting the rest.
-        - Verify delivery semantics according to the failure type.
-        """
-        partition = TopicPartition(self.TOPIC, 0)
-        
-        producer = self.setup_producer(self.TOPIC)
-        consumer = self.setup_consumer(self.TOPIC)
-
-        producer.start()
-        self.await_produced_messages(producer)
-
-        consumer.start()
-        self.await_all_members(consumer)
-
-        if bounce_mode == "all":
-            self.bounce_all_consumers(consumer, clean_shutdown=clean_shutdown)
-        else:
-            self.rolling_bounce_consumers(consumer, clean_shutdown=clean_shutdown)
-                
-        consumer.stop_all()
-        if clean_shutdown:
-            # if the total records consumed matches the current position, we haven't seen any duplicates
-            # this can only be guaranteed with a clean shutdown
-            assert consumer.current_position(partition) == consumer.total_consumed(), \
-                "Total consumed records did not match consumed position"
-        else:
-            # we may have duplicates in a hard failure
-            assert consumer.current_position(partition) <= consumer.total_consumed(), \
-                "Current position greater than the total number of consumed records"
-
-    @matrix(clean_shutdown=[True, False], enable_autocommit=[True, False])
-    def test_consumer_failure(self, clean_shutdown, enable_autocommit):
-        partition = TopicPartition(self.TOPIC, 0)
-        
-        consumer = self.setup_consumer(self.TOPIC, enable_autocommit=enable_autocommit)
-        producer = self.setup_producer(self.TOPIC)
-
-        consumer.start()
-        self.await_all_members(consumer)
-
-        partition_owner = consumer.owner(partition)
-        assert partition_owner is not None
-
-        # startup the producer and ensure that some records have been written
-        producer.start()
-        self.await_produced_messages(producer)
-
-        # stop the partition owner and await its shutdown
-        consumer.kill_node(partition_owner, clean_shutdown=clean_shutdown)
-        wait_until(lambda: len(consumer.joined_nodes()) == (self.num_consumers - 1) and consumer.owner(partition) != None,
-                   timeout_sec=self.session_timeout_sec+5, err_msg="Timed out waiting for consumer to close")
-
-        # ensure that the remaining consumer does some work after rebalancing
-        self.await_consumed_messages(consumer, min_messages=1000)
-
-        consumer.stop_all()
-
-        if clean_shutdown:
-            # if the total records consumed matches the current position, we haven't seen any duplicates
-            # this can only be guaranteed with a clean shutdown
-            assert consumer.current_position(partition) == consumer.total_consumed(), \
-                "Total consumed records did not match consumed position"
-        else:
-            # we may have duplicates in a hard failure
-            assert consumer.current_position(partition) <= consumer.total_consumed(), \
-                "Current position greater than the total number of consumed records"
-
-        # if autocommit is not turned on, we can also verify the last committed offset
-        if not enable_autocommit:
-            assert consumer.last_commit(partition) == consumer.current_position(partition), \
-                "Last committed offset did not match last consumed position"
-
-
-    @matrix(clean_shutdown=[True, False], enable_autocommit=[True, False])
-    def test_broker_failure(self, clean_shutdown, enable_autocommit):
-        partition = TopicPartition(self.TOPIC, 0)
-        
-        consumer = self.setup_consumer(self.TOPIC, enable_autocommit=enable_autocommit)
-        producer = self.setup_producer(self.TOPIC)
-
-        producer.start()
-        consumer.start()
-        self.await_all_members(consumer)
-
-        num_rebalances = consumer.num_rebalances()
-
-        # shutdown one of the brokers
-        # TODO: we need a way to target the coordinator instead of picking arbitrarily
-        self.kafka.signal_node(self.kafka.nodes[0], signal.SIGTERM if clean_shutdown else signal.SIGKILL)
-
-        # ensure that the consumers do some work after the broker failure
-        self.await_consumed_messages(consumer, min_messages=1000)
-
-        # verify that there were no rebalances on failover
-        assert num_rebalances == consumer.num_rebalances(), "Broker failure should not cause a rebalance"
-
-        consumer.stop_all()
-
-        # if the total records consumed matches the current position, we haven't seen any duplicates
-        assert consumer.current_position(partition) == consumer.total_consumed(), \
-            "Total consumed records did not match consumed position"
-
-        # if autocommit is not turned on, we can also verify the last committed offset
-        if not enable_autocommit:
-            assert consumer.last_commit(partition) == consumer.current_position(partition), \
-                "Last committed offset did not match last consumed position"
-
-    def test_group_consumption(self):
-        """
-        Verifies correct group rebalance behavior as consumers are started and stopped. 
-        In particular, this test verifies that the partition is readable after every
-        expected rebalance.
-
-        Setup: single Kafka cluster with a group of consumers reading from one topic
-        with one partition while the verifiable producer writes to it.
-
-        - Start the consumers one by one, verifying consumption after each rebalance
-        - Shutdown the consumers one by one, verifying consumption after each rebalance
-        """
-        consumer = self.setup_consumer(self.TOPIC)
-        producer = self.setup_producer(self.TOPIC)
-
-        partition = TopicPartition(self.TOPIC, 0)
-
-        producer.start()
-
-        for num_started, node in enumerate(consumer.nodes, 1):
-            consumer.start_node(node)
-            self.await_members(consumer, num_started)
-            self.await_consumed_messages(consumer)
-
-        for num_stopped, node in enumerate(consumer.nodes, 1):
-            consumer.stop_node(node)
-
-            if num_stopped < self.num_consumers:
-                self.await_members(consumer, self.num_consumers - num_stopped)
-                self.await_consumed_messages(consumer)
-
-        assert consumer.current_position(partition) == consumer.total_consumed(), \
-            "Total consumed records did not match consumed position"
-
-        assert consumer.last_commit(partition) == consumer.current_position(partition), \
-            "Last committed offset did not match last consumed position"
-
-
-class AssignmentValidationTest(VerifiableConsumerTest):
-    TOPIC = "test_topic"
-    NUM_PARTITIONS = 6
-
-    def __init__(self, test_context):
-        super(AssignmentValidationTest, self).__init__(test_context, num_consumers=3, num_producers=0,
-                                                num_zk=1, num_brokers=2, topics={
-            self.TOPIC : { 'partitions': self.NUM_PARTITIONS, 'replication-factor': 1 },
-        })
-
-    @matrix(assignment_strategy=["org.apache.kafka.clients.consumer.RangeAssignor",
-                                 "org.apache.kafka.clients.consumer.RoundRobinAssignor"])
-    def test_valid_assignment(self, assignment_strategy):
-        """
-        Verify assignment strategy correctness: each partition is assigned to exactly
-        one consumer instance.
-
-        Setup: single Kafka cluster with a set of consumers in the same group.
-
-        - Start the consumers one by one
-        - Validate assignment after every expected rebalance
-        """
-        consumer = self.setup_consumer(self.TOPIC, assignment_strategy=assignment_strategy)
-        for num_started, node in enumerate(consumer.nodes, 1):
-            consumer.start_node(node)
-            self.await_members(consumer, num_started)
-            assert self.valid_assignment(self.TOPIC, self.NUM_PARTITIONS, consumer.current_assignment())
-            
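
Several of the deleted assertions in OffsetValidationTest share one invariant: after a clean shutdown
the final consumed position equals the total number of records consumed (no duplicates), while a hard
kill may replay records, so the total can exceed the position. A compact restatement of that check
(the counts below are made up):

def check_delivery(current_position, total_consumed, clean_shutdown):
    # Mirrors the duplicate-detection assertions used after stopping the group.
    if clean_shutdown:
        assert current_position == total_consumed, \
            "Total consumed records did not match consumed position"
    else:
        assert current_position <= total_consumed, \
            "Current position greater than the total number of consumed records"

check_delivery(current_position=1000, total_consumed=1000, clean_shutdown=True)
check_delivery(current_position=1000, total_consumed=1042, clean_shutdown=False)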

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/tests/core/__init__.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/__init__.py b/tests/kafkatest/tests/core/__init__.py
new file mode 100644
index 0000000..ebc9bb3
--- /dev/null
+++ b/tests/kafkatest/tests/core/__init__.py
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults

