kafka-commits mailing list archives

From gwens...@apache.org
Subject [23/50] [abbrv] kafka git commit: KAFKA-3483: Restructure ducktape tests to simplify running subsets of tests
Date Mon, 11 Apr 2016 23:09:39 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/tests/quota_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/quota_test.py b/tests/kafkatest/tests/quota_test.py
deleted file mode 100644
index 7c2ec59..0000000
--- a/tests/kafkatest/tests/quota_test.py
+++ /dev/null
@@ -1,170 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.tests.test import Test
-from ducktape.mark import parametrize
-
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
-from kafkatest.services.performance import ProducerPerformanceService
-from kafkatest.services.console_consumer import ConsoleConsumer
-
-
-class QuotaTest(Test):
-    """
-    These tests verify that quotas provide the expected functionality: they run a
-    producer, broker, and consumer with different clientId and quota configurations and
-    check that the observed throughput is close to the expected value.
-    """
-
-    def __init__(self, test_context):
-        """:type test_context: ducktape.tests.test.TestContext"""
-        super(QuotaTest, self).__init__(test_context=test_context)
-
-        self.topic = 'test_topic'
-        self.logger.info('use topic ' + self.topic)
-
-        # quota related parameters
-        self.quota_config = {'quota_producer_default': 2500000,
-                             'quota_consumer_default': 2000000,
-                             'quota_producer_bytes_per_second_overrides': 'overridden_id=3750000',
-                             'quota_consumer_bytes_per_second_overrides': 'overridden_id=3000000'}
-        self.maximum_client_deviation_percentage = 100.0
-        self.maximum_broker_deviation_percentage = 5.0
-        self.num_records = 100000
-        self.record_size = 3000
-
-        self.zk = ZookeeperService(test_context, num_nodes=1)
-        self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk,
-                                  security_protocol='PLAINTEXT',
-                                  interbroker_security_protocol='PLAINTEXT',
-                                  topics={self.topic: {'partitions': 6, 'replication-factor': 1, 'configs': {'min.insync.replicas': 1}}},
-                                  quota_config=self.quota_config,
-                                  jmx_object_names=['kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec',
-                                                    'kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec'],
-                                  jmx_attributes=['OneMinuteRate'])
-        self.num_producers = 1
-        self.num_consumers = 2
-
-    def setUp(self):
-        self.zk.start()
-        self.kafka.start()
-
-    def min_cluster_size(self):
-        """Override this since we're adding services outside of the constructor"""
-        return super(QuotaTest, self).min_cluster_size() + self.num_producers + self.num_consumers
-
-    @parametrize(producer_id='default_id', producer_num=1, consumer_id='default_id', consumer_num=1)
-    @parametrize(producer_id='overridden_id', producer_num=1, consumer_id='overridden_id', consumer_num=1)
-    @parametrize(producer_id='overridden_id', producer_num=1, consumer_id='overridden_id', consumer_num=2)
-    def test_quota(self, producer_id='default_id', producer_num=1, consumer_id='default_id', consumer_num=1):
-        # Produce all messages
-        producer = ProducerPerformanceService(
-            self.test_context, producer_num, self.kafka,
-            topic=self.topic, num_records=self.num_records, record_size=self.record_size, throughput=-1, client_id=producer_id,
-            jmx_object_names=['kafka.producer:type=producer-metrics,client-id=%s' % producer_id], jmx_attributes=['outgoing-byte-rate'])
-
-        producer.run()
-
-        # Consume all messages
-        consumer = ConsoleConsumer(self.test_context, consumer_num, self.kafka, self.topic,
-            new_consumer=False,
-            consumer_timeout_ms=60000, client_id=consumer_id,
-            jmx_object_names=['kafka.consumer:type=ConsumerTopicMetrics,name=BytesPerSec,clientId=%s' % consumer_id],
-            jmx_attributes=['OneMinuteRate'])
-        consumer.run()
-
-        for idx, messages in consumer.messages_consumed.iteritems():
-            assert len(messages) > 0, "consumer %d didn't consume any message before timeout" % idx
-
-        success, msg = self.validate(self.kafka, producer, consumer)
-        assert success, msg
-
-    def validate(self, broker, producer, consumer):
-        """
-        For each client_id we validate that:
-        1) number of consumed messages equals number of produced messages
-        2) maximum_producer_throughput <= producer_quota * (1 + maximum_client_deviation_percentage/100)
-        3) maximum_broker_byte_in_rate <= producer_quota * (1 + maximum_broker_deviation_percentage/100)
-        4) maximum_consumer_throughput <= consumer_quota * (1 + maximum_client_deviation_percentage/100)
-        5) maximum_broker_byte_out_rate <= consumer_quota * (1 + maximum_broker_deviation_percentage/100)
-        """
-        success = True
-        msg = ''
-
-        self.kafka.read_jmx_output_all_nodes()
-
-        # validate that number of consumed messages equals number of produced messages
-        produced_num = sum([value['records'] for value in producer.results])
-        consumed_num = sum([len(value) for value in consumer.messages_consumed.values()])
-        self.logger.info('producer produced %d messages' % produced_num)
-        self.logger.info('consumer consumed %d messages' % consumed_num)
-        if produced_num != consumed_num:
-            success = False
-            msg += "number of produced messages %d doesn't equal number of consumed messages %d" % (produced_num, consumed_num)
-
-        # validate that maximum_producer_throughput <= producer_quota * (1 + maximum_client_deviation_percentage/100)
-        producer_attribute_name = 'kafka.producer:type=producer-metrics,client-id=%s:outgoing-byte-rate' % producer.client_id
-        producer_maximum_bps = producer.maximum_jmx_value[producer_attribute_name]
-        producer_quota_bps = self.get_producer_quota(producer.client_id)
-        self.logger.info('producer has maximum throughput %.2f bps with producer quota %.2f bps' % (producer_maximum_bps, producer_quota_bps))
-        if producer_maximum_bps > producer_quota_bps*(self.maximum_client_deviation_percentage/100+1):
-            success = False
-            msg += 'maximum producer throughput %.2f bps exceeded producer quota %.2f bps by more than %.1f%%' % \
-                   (producer_maximum_bps, producer_quota_bps, self.maximum_client_deviation_percentage)
-
-        # validate that maximum_broker_byte_in_rate <= producer_quota * (1 + maximum_broker_deviation_percentage/100)
-        broker_byte_in_attribute_name = 'kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec:OneMinuteRate'
-        broker_maximum_byte_in_bps = broker.maximum_jmx_value[broker_byte_in_attribute_name]
-        self.logger.info('broker has maximum byte-in rate %.2f bps with producer quota %.2f bps' %
-                         (broker_maximum_byte_in_bps, producer_quota_bps))
-        if broker_maximum_byte_in_bps > producer_quota_bps*(self.maximum_broker_deviation_percentage/100+1):
-            success = False
-            msg += 'maximum broker byte-in rate %.2f bps exceeded producer quota %.2f bps by more than %.1f%%' % \
-                   (broker_maximum_byte_in_bps, producer_quota_bps, self.maximum_broker_deviation_percentage)
-
-        # validate that maximum_consumer_throughput <= consumer_quota * (1 + maximum_client_deviation_percentage/100)
-        consumer_attribute_name = 'kafka.consumer:type=ConsumerTopicMetrics,name=BytesPerSec,clientId=%s:OneMinuteRate' % consumer.client_id
-        consumer_maximum_bps = consumer.maximum_jmx_value[consumer_attribute_name]
-        consumer_quota_bps = self.get_consumer_quota(consumer.client_id)
-        self.logger.info('consumer has maximum throughput %.2f bps with consumer quota %.2f bps' % (consumer_maximum_bps, consumer_quota_bps))
-        if consumer_maximum_bps > consumer_quota_bps*(self.maximum_client_deviation_percentage/100+1):
-            success = False
-            msg += 'maximum consumer throughput %.2f bps exceeded consumer quota %.2f bps by more than %.1f%%' % \
-                   (consumer_maximum_bps, consumer_quota_bps, self.maximum_client_deviation_percentage)
-
-        # validate that maximum_broker_byte_out_rate <= consumer_quota * (1 + maximum_broker_deviation_percentage/100)
-        broker_byte_out_attribute_name = 'kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec:OneMinuteRate'
-        broker_maximum_byte_out_bps = broker.maximum_jmx_value[broker_byte_out_attribute_name]
-        self.logger.info('broker has maximum byte-out rate %.2f bps with consumer quota %.2f bps' %
-                         (broker_maximum_byte_out_bps, consumer_quota_bps))
-        if broker_maximum_byte_out_bps > consumer_quota_bps*(self.maximum_broker_deviation_percentage/100+1):
-            success = False
-            msg += 'maximum broker byte-out rate %.2f bps exceeded consumer quota %.2f bps by more than %.1f%%' % \
-                   (broker_maximum_byte_out_bps, consumer_quota_bps, self.maximum_broker_deviation_percentage)
-
-        return success, msg
-
-    def get_producer_quota(self, client_id):
-        overridden_quotas = {value.split('=')[0]:value.split('=')[1] for value in self.quota_config['quota_producer_bytes_per_second_overrides'].split(',')}
-        if client_id in overridden_quotas:
-            return float(overridden_quotas[client_id])
-        return self.quota_config['quota_producer_default']
-
-    def get_consumer_quota(self, client_id):
-        overridden_quotas = {value.split('=')[0]:value.split('=')[1] for value in self.quota_config['quota_consumer_bytes_per_second_overrides'].split(',')}
-        if client_id in overridden_quotas:
-            return float(overridden_quotas[client_id])
-        return self.quota_config['quota_consumer_default']
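
The validation above reduces to one inequality per metric: the observed maximum rate must stay within the configured quota plus an allowed deviation. A minimal standalone sketch of that bound check and of the override-string parsing, assuming the same "client_id=bytes_per_sec" comma-separated format used in quota_config above:

# Sketch of the quota checks performed in validate(); the numbers mirror
# the defaults and overrides configured in QuotaTest above.
def parse_overrides(overrides):
    """Parse 'id1=100,id2=200' into {'id1': 100.0, 'id2': 200.0}."""
    return dict((k, float(v)) for k, v in
                (pair.split('=') for pair in overrides.split(',')))

def within_quota(observed_bps, quota_bps, max_deviation_pct):
    """True iff observed <= quota * (1 + max_deviation_pct / 100)."""
    return observed_bps <= quota_bps * (1 + max_deviation_pct / 100.0)

overrides = parse_overrides('overridden_id=3750000')
quota = overrides.get('overridden_id', 2500000.0)  # fall back to the default
assert within_quota(3800000, quota, max_deviation_pct=5.0)      # within bound
assert not within_quota(4000000, quota, max_deviation_pct=5.0)  # exceeds bound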

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/tests/reassign_partitions_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/reassign_partitions_test.py b/tests/kafkatest/tests/reassign_partitions_test.py
deleted file mode 100644
index 24ce097..0000000
--- a/tests/kafkatest/tests/reassign_partitions_test.py
+++ /dev/null
@@ -1,110 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.mark import parametrize
-from ducktape.utils.util import wait_until
-
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
-from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
-from kafkatest.utils import is_int
-import random
-
-class ReassignPartitionsTest(ProduceConsumeValidateTest):
-    """
-    These tests validate partition reassignment.
-    Create a topic with several partitions, load some data, trigger partition reassignment with and without
-    broker failures, and check that the reassignment completes without data loss.
-    """
-
-    def __init__(self, test_context):
-        """:type test_context: ducktape.tests.test.TestContext"""
-        super(ReassignPartitionsTest, self).__init__(test_context=test_context)
-
-        self.topic = "test_topic"
-        self.zk = ZookeeperService(test_context, num_nodes=1)
-        self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk, topics={self.topic: {
-                                                                    "partitions": 20,
-                                                                    "replication-factor": 3,
-                                                                    'configs': {"min.insync.replicas": 2}}
-                                                                })
-        self.num_partitions = 20
-        self.timeout_sec = 60
-        self.producer_throughput = 1000
-        self.num_producers = 1
-        self.num_consumers = 1
-
-    def setUp(self):
-        self.zk.start()
-
-    def min_cluster_size(self):
-        # Override this since we're adding services outside of the constructor
-        return super(ReassignPartitionsTest, self).min_cluster_size() + self.num_producers + self.num_consumers
-
-    def clean_bounce_some_brokers(self):
-        """Bounce every other broker"""
-        for node in self.kafka.nodes[::2]:
-            self.kafka.restart_node(node, clean_shutdown=True)
-
-    def reassign_partitions(self, bounce_brokers):
-        partition_info = self.kafka.parse_describe_topic(self.kafka.describe_topic(self.topic))
-        self.logger.debug("Partitions before reassignment:" + str(partition_info))
-
-        # jumble partition assignment in dictionary
-        seed = random.randint(0, 2 ** 31 - 1)
-        self.logger.debug("Jumble partition assignment with seed " + str(seed))
-        random.seed(seed)
-        # The list may still be in order, but that's ok
-        shuffled_list = range(0, self.num_partitions)
-        random.shuffle(shuffled_list)
-
-        for i in range(0, self.num_partitions):
-            partition_info["partitions"][i]["partition"] = shuffled_list[i]
-        self.logger.debug("Jumbled partitions: " + str(partition_info))
-
-        # send reassign partitions command
-        self.kafka.execute_reassign_partitions(partition_info)
-
-        if bounce_brokers:
-            # bounce a few brokers at the same time
-            self.clean_bounce_some_brokers()
-
-        # Wait until finished or timeout
-        wait_until(lambda: self.kafka.verify_reassign_partitions(partition_info), timeout_sec=self.timeout_sec, backoff_sec=.5)
-
-    @parametrize(security_protocol="PLAINTEXT", bounce_brokers=False)
-    @parametrize(security_protocol="PLAINTEXT", bounce_brokers=True)
-    def test_reassign_partitions(self, bounce_brokers, security_protocol):
-        """Reassign partitions tests.
-        Setup: 1 zk, 3 kafka nodes, 1 topic with partitions=20, replication-factor=3, and min.insync.replicas=2
-
-            - Produce messages in the background
-            - Consume messages in the background
-            - Reassign partitions
-            - If bounce_brokers is True, also bounce a few brokers while partition re-assignment is in progress
-            - When done reassigning partitions and bouncing brokers, stop producing, and finish consuming
-            - Validate that every acked message was consumed
-        """
-
-        self.kafka.security_protocol = security_protocol
-        self.kafka.interbroker_security_protocol = security_protocol
-        new_consumer = self.kafka.security_protocol != "PLAINTEXT"
-        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput)
-        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, new_consumer=new_consumer, consumer_timeout_ms=60000, message_validator=is_int)
-        self.kafka.start()
-        
-        self.run_produce_consume_validate(core_test_action=lambda: self.reassign_partitions(bounce_brokers))
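
For context, the plan that execute_reassign_partitions() hands to the reassignment tool is a JSON document derived from the describe-topic output. A hypothetical standalone sketch of the jumbling step above; the exact shape of partition_info is an assumption, since parse_describe_topic() is not shown here:

import json
import random

# Hypothetical shape of parse_describe_topic() output (an assumption):
partition_info = {
    "partitions": [
        {"topic": "test_topic", "partition": 0, "replicas": [1, 2, 3]},
        {"topic": "test_topic", "partition": 1, "replicas": [2, 3, 1]},
        {"topic": "test_topic", "partition": 2, "replicas": [3, 1, 2]},
    ]
}

# Log (or print) the seed so a failing shuffle can be reproduced exactly.
seed = random.randint(0, 2 ** 31 - 1)
random.seed(seed)

ids = [p["partition"] for p in partition_info["partitions"]]
random.shuffle(ids)  # may coincide with the original order; that's ok
for entry, new_id in zip(partition_info["partitions"], ids):
    entry["partition"] = new_id

print(json.dumps(partition_info))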

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/tests/replication_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/replication_test.py b/tests/kafkatest/tests/replication_test.py
deleted file mode 100644
index 7b360ab..0000000
--- a/tests/kafkatest/tests/replication_test.py
+++ /dev/null
@@ -1,152 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.utils.util import wait_until
-
-from ducktape.mark import matrix
-
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
-from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
-from kafkatest.utils import is_int
-
-import signal
-
-def broker_node(test, broker_type):
-    """ Discover node of requested type. For leader type, discovers leader for our topic and partition 0
-    """
-    if broker_type == "leader":
-        node = test.kafka.leader(test.topic, partition=0)
-    elif broker_type == "controller":
-        node = test.kafka.controller()
-    else:
-        raise Exception("Unexpected broker type %s." % (broker_type))
-
-    return node
-
-def clean_shutdown(test, broker_type):
-    """Discover broker node of requested type and shut it down cleanly.
-    """
-    node = broker_node(test, broker_type)
-    test.kafka.signal_node(node, sig=signal.SIGTERM)
-
-
-def hard_shutdown(test, broker_type):
-    """Discover broker node of requested type and shut it down with a hard kill."""
-    node = broker_node(test, broker_type)
-    test.kafka.signal_node(node, sig=signal.SIGKILL)
-
-
-def clean_bounce(test, broker_type):
-    """Chase the leader of one partition and restart it cleanly."""
-    for i in range(5):
-        prev_broker_node = broker_node(test, broker_type)
-        test.kafka.restart_node(prev_broker_node, clean_shutdown=True)
-
-
-def hard_bounce(test, broker_type):
-    """Chase the leader and restart it with a hard kill."""
-    for i in range(5):
-        prev_broker_node = broker_node(test, broker_type)
-        test.kafka.signal_node(prev_broker_node, sig=signal.SIGKILL)
-
-        # Since this is a hard kill, we need to make sure the process is down and that
-        # zookeeper and the broker cluster have registered the loss of the leader/controller.
-        # Waiting for a new leader for the topic-partition/controller to be elected is a reasonable heuristic for this.
-
-        def role_reassigned():
-            current_elected_broker = broker_node(test, broker_type)
-            return current_elected_broker is not None and current_elected_broker != prev_broker_node
-
-        wait_until(lambda: len(test.kafka.pids(prev_broker_node)) == 0, timeout_sec=5)
-        wait_until(role_reassigned, timeout_sec=10, backoff_sec=.5)
-        test.kafka.start_node(prev_broker_node)
-
-failures = {
-    "clean_shutdown": clean_shutdown,
-    "hard_shutdown": hard_shutdown,
-    "clean_bounce": clean_bounce,
-    "hard_bounce": hard_bounce
-}
-
-
-class ReplicationTest(ProduceConsumeValidateTest):
-    """
-    Note that consuming is a bit tricky, at least with console consumer. The goal is to consume all messages
-    (foreach partition) in the topic. In this case, waiting for the last message may cause the consumer to stop
-    too soon since console consumer is consuming multiple partitions from a single thread and therefore we lose
-    ordering guarantees.
-
-    Waiting on a count of consumed messages can be unreliable: if we stop consuming when num_consumed == num_acked,
-    we might exit early if some messages are duplicated (though not an issue here since producer retries==0)
-
-    Therefore rely here on the consumer.timeout.ms setting which times out on the interval between successively
-    consumed messages. Since we run the producer to completion before running the consumer, this is a reliable
-    indicator that nothing is left to consume.
-    """
-
-    def __init__(self, test_context):
-        """:type test_context: ducktape.tests.test.TestContext"""
-        super(ReplicationTest, self).__init__(test_context=test_context)
-
-        self.topic = "test_topic"
-        self.zk = ZookeeperService(test_context, num_nodes=1)
-        self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk, topics={self.topic: {
-                                                                    "partitions": 3,
-                                                                    "replication-factor": 3,
-                                                                    'configs': {"min.insync.replicas": 2}}
-                                                                })
-        self.producer_throughput = 1000
-        self.num_producers = 1
-        self.num_consumers = 1
-
-    def setUp(self):
-        self.zk.start()
-
-    def min_cluster_size(self):
-        """Override this since we're adding services outside of the constructor"""
-        return super(ReplicationTest, self).min_cluster_size() + self.num_producers + self.num_consumers
-
-
-    @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
-            broker_type=["leader"],
-            security_protocol=["PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"])
-    @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
-            broker_type=["controller"],
-            security_protocol=["PLAINTEXT", "SASL_SSL"])
-    def test_replication_with_broker_failure(self, failure_mode, security_protocol, broker_type):
-        """Replication tests.
-        These tests verify that replication provides simple durability guarantees by checking that data acked by
-        brokers is still available for consumption in the face of various failure scenarios.
-
-        Setup: 1 zk, 3 kafka nodes, 1 topic with partitions=3, replication-factor=3, and min.insync.replicas=2
-
-            - Produce messages in the background
-            - Consume messages in the background
-            - Drive broker failures (shutdown, or bounce repeatedly with kill -15 or kill -9)
-            - When done driving failures, stop producing, and finish consuming
-            - Validate that every acked message was consumed
-        """
-
-        self.kafka.security_protocol = security_protocol
-        self.kafka.interbroker_security_protocol = security_protocol
-        new_consumer = self.kafka.security_protocol != "PLAINTEXT"
-        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput)
-        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, new_consumer=new_consumer, consumer_timeout_ms=60000, message_validator=is_int)
-        self.kafka.start()
-        
-        self.run_produce_consume_validate(core_test_action=lambda: failures[failure_mode](self, broker_type))
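
hard_bounce above leans on ducktape's wait_until helper, which polls a zero-argument condition until it returns truthy or a timeout expires, raising on failure. A simplified re-implementation, shown only to illustrate the polling semantics the tests depend on (the real helper lives in ducktape.utils.util):

import time

def wait_until(condition, timeout_sec, backoff_sec=0.1, err_msg=""):
    """Poll condition() every backoff_sec until it is truthy or timeout_sec
    elapses; raise if the deadline passes first. A simplified sketch of
    ducktape.utils.util.wait_until, not the real implementation."""
    deadline = time.time() + timeout_sec
    while time.time() < deadline:
        if condition():
            return
        time.sleep(backoff_sec)
    raise RuntimeError(err_msg or "Timed out waiting for condition")

# hard_bounce uses this twice: first to check the killed broker's process is
# gone, then to check leadership/controllership moved to a different node.
start = time.time()
wait_until(lambda: time.time() - start > 0.3, timeout_sec=5, backoff_sec=0.05)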

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/tests/security_rolling_upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/security_rolling_upgrade_test.py b/tests/kafkatest/tests/security_rolling_upgrade_test.py
deleted file mode 100644
index fdbedca..0000000
--- a/tests/kafkatest/tests/security_rolling_upgrade_test.py
+++ /dev/null
@@ -1,128 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
-from kafkatest.services.security.security_config import SecurityConfig
-from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.utils import is_int
-from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
-from ducktape.mark import matrix
-from kafkatest.services.security.kafka_acls import ACLs
-import time
-
-
-class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
-    """Tests a rolling upgrade from PLAINTEXT to a secured cluster
-    """
-
-    def __init__(self, test_context):
-        super(TestSecurityRollingUpgrade, self).__init__(test_context=test_context)
-
-    def setUp(self):
-        self.acls = ACLs()
-        self.topic = "test_topic"
-        self.group = "group"
-        self.producer_throughput = 100
-        self.num_producers = 1
-        self.num_consumers = 1
-        self.zk = ZookeeperService(self.test_context, num_nodes=1)
-        self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, topics={self.topic: {
-            "partitions": 3,
-            "replication-factor": 3,
-            'configs': {"min.insync.replicas": 2}}})
-        self.zk.start()
-
-    def create_producer_and_consumer(self):
-        self.producer = VerifiableProducer(
-            self.test_context, self.num_producers, self.kafka, self.topic,
-            throughput=self.producer_throughput)
-
-        self.consumer = ConsoleConsumer(
-            self.test_context, self.num_consumers, self.kafka, self.topic,
-            consumer_timeout_ms=60000, message_validator=is_int, new_consumer=True)
-
-        self.consumer.group_id = "group"
-
-    def bounce(self):
-        self.kafka.start_minikdc()
-        for node in self.kafka.nodes:
-            self.kafka.stop_node(node)
-            self.kafka.start_node(node)
-            time.sleep(10)
-
-    def roll_in_secured_settings(self, client_protocol, broker_protocol):
-
-        # Roll cluster to include inter broker security protocol.
-        self.kafka.interbroker_security_protocol = broker_protocol
-        self.kafka.open_port(client_protocol)
-        self.kafka.open_port(broker_protocol)
-        self.bounce()
-
-        # Roll cluster to disable PLAINTEXT port
-        self.kafka.close_port('PLAINTEXT')
-        self.kafka.authorizer_class_name = KafkaService.SIMPLE_AUTHORIZER
-        self.acls.set_acls(client_protocol, self.kafka, self.zk, self.topic, self.group)
-        self.acls.set_acls(broker_protocol, self.kafka, self.zk, self.topic, self.group)
-        self.bounce()
-
-    def open_secured_port(self, client_protocol):
-        self.kafka.security_protocol = client_protocol
-        self.kafka.open_port(client_protocol)
-        self.kafka.start_minikdc()
-        self.bounce()
-
-    @matrix(client_protocol=["SSL", "SASL_PLAINTEXT", "SASL_SSL"])
-    def test_rolling_upgrade_phase_one(self, client_protocol):
-        """
-        Start with a PLAINTEXT cluster, then open a secured port via a rolling upgrade, ensuring we can produce
-        and consume over PLAINTEXT throughout. Finally, check that we can produce and consume via the new secured port.
-        """
-        self.kafka.interbroker_security_protocol = "PLAINTEXT"
-        self.kafka.security_protocol = "PLAINTEXT"
-        self.kafka.start()
-
-        # Create PLAINTEXT producer and consumer
-        self.create_producer_and_consumer()
-
-        # Rolling upgrade, opening a secure protocol, ensuring the Plaintext producer/consumer continues to run
-        self.run_produce_consume_validate(self.open_secured_port, client_protocol)
-
-        # Now we can produce and consume via the secured port
-        self.kafka.security_protocol = client_protocol
-        self.create_producer_and_consumer()
-        self.run_produce_consume_validate(lambda: time.sleep(1))
-
-    @matrix(client_protocol=["SASL_SSL", "SSL", "SASL_PLAINTEXT"], broker_protocol=["SASL_SSL", "SSL", "SASL_PLAINTEXT"])
-    def test_rolling_upgrade_phase_two(self, client_protocol, broker_protocol):
-        """
-        Start with a PLAINTEXT cluster that has a second, secured port open (i.e. the result of phase one).
-        Start a producer and consumer via the secured port.
-        Incrementally upgrade the brokers so that the inter-broker protocol is the secure protocol.
-        Incrementally upgrade again to add ACLs and disable the PLAINTEXT port.
-        Ensure the producer and consumer run throughout.
-        """
-        # Given we have a broker that has both secure and PLAINTEXT ports open
-        self.kafka.security_protocol = client_protocol
-        self.kafka.interbroker_security_protocol = "PLAINTEXT"
-        self.kafka.start()
-
-        # Create a secured producer and consumer
-        self.create_producer_and_consumer()
-
-        # Roll in the security protocol, disable PLAINTEXT, and ensure we can produce and consume throughout
-        self.run_produce_consume_validate(self.roll_in_secured_settings, client_protocol, broker_protocol)
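
The upgrade itself is the classic rolling-restart pattern: change the broker config, then bounce brokers one at a time so clients never lose the whole cluster at once. A minimal sketch generalizing bounce() above; the ten-second settle is the same crude heuristic the test uses, standing in for a real check that the broker has rejoined:

import time

def rolling_bounce(kafka, settle_sec=10):
    """Restart brokers one at a time so in-flight producers and consumers
    keep working throughout. A sketch of the pattern in bounce() above;
    settle_sec approximates waiting for the node to rejoin the cluster."""
    for node in kafka.nodes:
        kafka.stop_node(node)
        kafka.start_node(node)
        time.sleep(settle_sec)

# Phase one: open the secured port, then roll:
#   kafka.open_port("SSL"); rolling_bounce(kafka)
# Phase two: move inter-broker traffic to the secure protocol and roll,
# then enable ACLs, close the PLAINTEXT port, and roll once more.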

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/tests/simple_consumer_shell_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/simple_consumer_shell_test.py b/tests/kafkatest/tests/simple_consumer_shell_test.py
deleted file mode 100644
index 74a7eeb..0000000
--- a/tests/kafkatest/tests/simple_consumer_shell_test.py
+++ /dev/null
@@ -1,75 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-from ducktape.utils.util import wait_until
-from ducktape.tests.test import Test
-from kafkatest.services.simple_consumer_shell import SimpleConsumerShell
-from kafkatest.services.verifiable_producer import VerifiableProducer
-
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
-TOPIC = "topic-simple-consumer-shell"
-MAX_MESSAGES = 100
-NUM_PARTITIONS = 1
-REPLICATION_FACTOR = 1
-
-class SimpleConsumerShellTest(Test):
-    """
-    Tests the SimpleConsumerShell tool
-    """
-    def __init__(self, test_context):
-        super(SimpleConsumerShellTest, self).__init__(test_context)
-        self.num_zk = 1
-        self.num_brokers = 1
-        self.messages_received_count = 0
-        self.topics = {
-            TOPIC: {'partitions': NUM_PARTITIONS, 'replication-factor': REPLICATION_FACTOR}
-        }
-
-        self.zk = ZookeeperService(test_context, self.num_zk)
-
-    def setUp(self):
-        self.zk.start()
-
-    def start_kafka(self):
-        self.kafka = KafkaService(
-            self.test_context, self.num_brokers,
-            self.zk, topics=self.topics)
-        self.kafka.start()
-
-    def run_producer(self):
-        # Produce messages to the Kafka cluster
-        self.producer = VerifiableProducer(self.test_context, num_nodes=1, kafka=self.kafka, topic=TOPIC, throughput=1000, max_messages=MAX_MESSAGES)
-        self.producer.start()
-        wait_until(lambda: self.producer.num_acked == MAX_MESSAGES, timeout_sec=10,
-                   err_msg="Timeout awaiting messages to be produced and acked")
-
-    def start_simple_consumer_shell(self):
-        self.simple_consumer_shell = SimpleConsumerShell(self.test_context, 1, self.kafka, TOPIC)
-        self.simple_consumer_shell.start()
-
-    def test_simple_consumer_shell(self):
-        """
-        Tests if SimpleConsumerShell is fetching expected records
-        :return: None
-        """
-        self.start_kafka()
-        self.run_producer()
-        self.start_simple_consumer_shell()
-
-        # Assert that SimpleConsumerShell is fetching expected number of messages
-        wait_until(lambda: self.simple_consumer_shell.get_output().count("\n") == (MAX_MESSAGES + 1), timeout_sec=10,
-                   err_msg="Timed out waiting to receive expected number of messages.")
\ No newline at end of file
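
The final assertion counts newlines in the tool's output and expects MAX_MESSAGES + 1; the extra line presumably comes from one trailing non-record line printed by the tool (an assumption, since the tool's exact output format is not shown here). A tiny illustration of the counting logic:

# The "+ 1" assumes exactly one extra non-record line in the output;
# this is an assumption about SimpleConsumerShell's output format.
output = "record-0\nrecord-1\nrecord-2\nterminating...\n"
max_messages = 3
assert output.count("\n") == max_messages + 1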

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/tests/streams/__init__.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/streams/__init__.py b/tests/kafkatest/tests/streams/__init__.py
new file mode 100644
index 0000000..ebc9bb3
--- /dev/null
+++ b/tests/kafkatest/tests/streams/__init__.py
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/tests/streams/streams_bounce_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/streams/streams_bounce_test.py b/tests/kafkatest/tests/streams/streams_bounce_test.py
new file mode 100644
index 0000000..d674641
--- /dev/null
+++ b/tests/kafkatest/tests/streams/streams_bounce_test.py
@@ -0,0 +1,73 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import ignore
+
+from kafkatest.tests.kafka_test import KafkaTest
+from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService
+import time
+
+class StreamsBounceTest(KafkaTest):
+    """
+    Simple test of Kafka Streams.
+    """
+
+    def __init__(self, test_context):
+        super(StreamsBounceTest, self).__init__(test_context, num_zk=1, num_brokers=2, topics={
+            'echo' : { 'partitions': 5, 'replication-factor': 2 },
+            'data' : { 'partitions': 5, 'replication-factor': 2 },
+            'min' : { 'partitions': 5, 'replication-factor': 2 },
+            'max' : { 'partitions': 5, 'replication-factor': 2 },
+            'sum' : { 'partitions': 5, 'replication-factor': 2 },
+            'dif' : { 'partitions': 5, 'replication-factor': 2 },
+            'cnt' : { 'partitions': 5, 'replication-factor': 2 },
+            'avg' : { 'partitions': 5, 'replication-factor': 2 },
+            'wcnt' : { 'partitions': 5, 'replication-factor': 2 },
+            'tagg' : { 'partitions': 5, 'replication-factor': 2 }
+        })
+
+        self.driver = StreamsSmokeTestDriverService(test_context, self.kafka)
+        self.processor1 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
+
+    def test_bounce(self):
+        """
+        Start a smoke test client, then abort (kill -9) and restart it a few times.
+        Ensure that all records are delivered.
+        """
+
+        self.driver.start()
+
+        self.processor1.start()
+
+        time.sleep(15)
+
+        self.processor1.abortThenRestart()
+
+        time.sleep(15)
+
+        # enable this after we add change log partition replicas
+        #self.kafka.signal_leader("data")
+
+        #time.sleep(15);
+
+        self.processor1.abortThenRestart()
+
+        self.driver.wait()
+        self.driver.stop()
+
+        self.processor1.stop()
+
+        node = self.driver.node
+        node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % self.driver.STDOUT_FILE, allow_fail=False)

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/tests/streams/streams_smoke_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/streams/streams_smoke_test.py b/tests/kafkatest/tests/streams/streams_smoke_test.py
new file mode 100644
index 0000000..e3c465a
--- /dev/null
+++ b/tests/kafkatest/tests/streams/streams_smoke_test.py
@@ -0,0 +1,75 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import ignore
+
+from kafkatest.tests.kafka_test import KafkaTest
+from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService
+import time
+
+class StreamsSmokeTest(KafkaTest):
+    """
+    Simple test of Kafka Streams.
+    """
+
+    def __init__(self, test_context):
+        super(StreamsSmokeTest, self).__init__(test_context, num_zk=1, num_brokers=1, topics={
+            'echo' : { 'partitions': 5, 'replication-factor': 1 },
+            'data' : { 'partitions': 5, 'replication-factor': 1 },
+            'min' : { 'partitions': 5, 'replication-factor': 1 },
+            'max' : { 'partitions': 5, 'replication-factor': 1 },
+            'sum' : { 'partitions': 5, 'replication-factor': 1 },
+            'dif' : { 'partitions': 5, 'replication-factor': 1 },
+            'cnt' : { 'partitions': 5, 'replication-factor': 1 },
+            'avg' : { 'partitions': 5, 'replication-factor': 1 },
+            'wcnt' : { 'partitions': 5, 'replication-factor': 1 },
+            'tagg' : { 'partitions': 5, 'replication-factor': 1 }
+        })
+
+        self.driver = StreamsSmokeTestDriverService(test_context, self.kafka)
+        self.processor1 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
+        self.processor2 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
+        self.processor3 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
+        self.processor4 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
+
+    def test_streams(self):
+        """
+        Start a few smoke test clients, then repeatedly start a new one and cleanly stop a running one a few times.
+        Ensure that all results (stats on values computed by Kafka Streams) are correct.
+        """
+
+        self.driver.start()
+
+        self.processor1.start()
+        self.processor2.start()
+
+        time.sleep(15)
+
+        self.processor3.start()
+        self.processor1.stop()
+
+        time.sleep(15)
+
+        self.processor4.start()
+
+        self.driver.wait()
+        self.driver.stop()
+
+        self.processor2.stop()
+        self.processor3.stop()
+        self.processor4.stop()
+
+        node = self.driver.node
+        node.account.ssh("grep SUCCESS %s" % self.driver.STDOUT_FILE, allow_fail=False)

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/tests/streams_bounce_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/streams_bounce_test.py b/tests/kafkatest/tests/streams_bounce_test.py
deleted file mode 100644
index d674641..0000000
--- a/tests/kafkatest/tests/streams_bounce_test.py
+++ /dev/null
@@ -1,73 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.mark import ignore
-
-from kafkatest.tests.kafka_test import KafkaTest
-from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService
-import time
-
-class StreamsBounceTest(KafkaTest):
-    """
-    Simple test of Kafka Streams.
-    """
-
-    def __init__(self, test_context):
-        super(StreamsBounceTest, self).__init__(test_context, num_zk=1, num_brokers=2, topics={
-            'echo' : { 'partitions': 5, 'replication-factor': 2 },
-            'data' : { 'partitions': 5, 'replication-factor': 2 },
-            'min' : { 'partitions': 5, 'replication-factor': 2 },
-            'max' : { 'partitions': 5, 'replication-factor': 2 },
-            'sum' : { 'partitions': 5, 'replication-factor': 2 },
-            'dif' : { 'partitions': 5, 'replication-factor': 2 },
-            'cnt' : { 'partitions': 5, 'replication-factor': 2 },
-            'avg' : { 'partitions': 5, 'replication-factor': 2 },
-            'wcnt' : { 'partitions': 5, 'replication-factor': 2 },
-            'tagg' : { 'partitions': 5, 'replication-factor': 2 }
-        })
-
-        self.driver = StreamsSmokeTestDriverService(test_context, self.kafka)
-        self.processor1 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
-
-    def test_bounce(self):
-        """
-        Start a smoke test client, then abort (kill -9) and restart it a few times.
-        Ensure that all records are delivered.
-        """
-
-        self.driver.start()
-
-        self.processor1.start()
-
-        time.sleep(15)
-
-        self.processor1.abortThenRestart()
-
-        time.sleep(15)
-
-        # enable this after we add change log partition replicas
-        #self.kafka.signal_leader("data")
-
-        #time.sleep(15);
-
-        self.processor1.abortThenRestart()
-
-        self.driver.wait()
-        self.driver.stop()
-
-        self.processor1.stop()
-
-        node = self.driver.node
-        node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % self.driver.STDOUT_FILE, allow_fail=False)

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/tests/streams_smoke_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/streams_smoke_test.py b/tests/kafkatest/tests/streams_smoke_test.py
deleted file mode 100644
index e3c465a..0000000
--- a/tests/kafkatest/tests/streams_smoke_test.py
+++ /dev/null
@@ -1,75 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.mark import ignore
-
-from kafkatest.tests.kafka_test import KafkaTest
-from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService
-import time
-
-class StreamsSmokeTest(KafkaTest):
-    """
-    Simple test of Kafka Streams.
-    """
-
-    def __init__(self, test_context):
-        super(StreamsSmokeTest, self).__init__(test_context, num_zk=1, num_brokers=1, topics={
-            'echo' : { 'partitions': 5, 'replication-factor': 1 },
-            'data' : { 'partitions': 5, 'replication-factor': 1 },
-            'min' : { 'partitions': 5, 'replication-factor': 1 },
-            'max' : { 'partitions': 5, 'replication-factor': 1 },
-            'sum' : { 'partitions': 5, 'replication-factor': 1 },
-            'dif' : { 'partitions': 5, 'replication-factor': 1 },
-            'cnt' : { 'partitions': 5, 'replication-factor': 1 },
-            'avg' : { 'partitions': 5, 'replication-factor': 1 },
-            'wcnt' : { 'partitions': 5, 'replication-factor': 1 },
-            'tagg' : { 'partitions': 5, 'replication-factor': 1 }
-        })
-
-        self.driver = StreamsSmokeTestDriverService(test_context, self.kafka)
-        self.processor1 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
-        self.processor2 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
-        self.processor3 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
-        self.processor4 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
-
-    def test_streams(self):
-        """
-        Start a few smoke test clients, then repeatedly start a new one and cleanly stop a running one a few times.
-        Ensure that all results (stats on values computed by Kafka Streams) are correct.
-        """
-
-        self.driver.start()
-
-        self.processor1.start()
-        self.processor2.start()
-
-        time.sleep(15)
-
-        self.processor3.start()
-        self.processor1.stop()
-
-        time.sleep(15)
-
-        self.processor4.start()
-
-        self.driver.wait()
-        self.driver.stop()
-
-        self.processor2.stop()
-        self.processor3.stop()
-        self.processor4.stop()
-
-        node = self.driver.node
-        node.account.ssh("grep SUCCESS %s" % self.driver.STDOUT_FILE, allow_fail=False)

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/tests/templates/connect-distributed.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/templates/connect-distributed.properties b/tests/kafkatest/tests/templates/connect-distributed.properties
deleted file mode 100644
index 7a7440a..0000000
--- a/tests/kafkatest/tests/templates/connect-distributed.properties
+++ /dev/null
@@ -1,46 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-bootstrap.servers={{ kafka.bootstrap_servers() }}
-
-group.id={{ group|default("connect-cluster") }}
-
-key.converter={{ key_converter|default("org.apache.kafka.connect.json.JsonConverter") }}
-value.converter={{ value_converter|default("org.apache.kafka.connect.json.JsonConverter") }}
-{% if key_converter is not defined or key_converter.endswith("JsonConverter") %}
-key.converter.schemas.enable={{ schemas|default(True)|string|lower }}
-{% endif %}
-{% if value_converter is not defined or value_converter.endswith("JsonConverter") %}
-value.converter.schemas.enable={{ schemas|default(True)|string|lower }}
-{% endif %}
-
-internal.key.converter=org.apache.kafka.connect.json.JsonConverter
-internal.value.converter=org.apache.kafka.connect.json.JsonConverter
-internal.key.converter.schemas.enable=false
-internal.value.converter.schemas.enable=false
-
-offset.storage.topic={{ OFFSETS_TOPIC }}
-config.storage.topic={{ CONFIG_TOPIC }}
-status.storage.topic={{ STATUS_TOPIC }}
-
-# Make sure data gets flushed frequently so tests don't have to wait to ensure they see data in output systems
-offset.flush.interval.ms=5000
-
-rest.advertised.host.name={{ node.account.hostname }}
-
-
-# Reduce session timeouts so tests that kill workers don't need to wait as long to recover
-session.timeout.ms=10000
-consumer.session.timeout.ms=10000
\ No newline at end of file
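
These .properties templates are Jinja2; the test services render them with per-test parameters before pushing the result to each node. A small sketch of how the conditional converter block above renders, using the jinja2 package directly (parameter names match the template's; the Avro converter class name below is hypothetical):

from jinja2 import Template

src = (
    'key.converter={{ key_converter|default('
    '"org.apache.kafka.connect.json.JsonConverter") }}\n'
    '{% if key_converter is not defined or key_converter.endswith("JsonConverter") %}'
    'key.converter.schemas.enable={{ schemas|default(True)|string|lower }}\n'
    '{% endif %}'
)

# No parameters: the JsonConverter default kicks in and schemas.enable is emitted.
print(Template(src).render())
# key.converter=org.apache.kafka.connect.json.JsonConverter
# key.converter.schemas.enable=true

# A non-JSON converter (hypothetical class name) suppresses the schemas.enable line.
print(Template(src).render(key_converter="org.example.AvroConverter"))
# key.converter=org.example.AvroConverter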

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/tests/templates/connect-file-sink.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/templates/connect-file-sink.properties b/tests/kafkatest/tests/templates/connect-file-sink.properties
deleted file mode 100644
index ad78bb3..0000000
--- a/tests/kafkatest/tests/templates/connect-file-sink.properties
+++ /dev/null
@@ -1,20 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-name=local-file-sink
-connector.class=FileStreamSink
-tasks.max=1
-file={{ OUTPUT_FILE }}
-topics={{ TOPIC }}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/tests/templates/connect-file-source.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/templates/connect-file-source.properties b/tests/kafkatest/tests/templates/connect-file-source.properties
deleted file mode 100644
index d2d5e97..0000000
--- a/tests/kafkatest/tests/templates/connect-file-source.properties
+++ /dev/null
@@ -1,20 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-name=local-file-source
-connector.class=FileStreamSource
-tasks.max=1
-file={{ INPUT_FILE }}
-topic={{ TOPIC }}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/tests/templates/connect-standalone.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/templates/connect-standalone.properties b/tests/kafkatest/tests/templates/connect-standalone.properties
deleted file mode 100644
index bf1daf7..0000000
--- a/tests/kafkatest/tests/templates/connect-standalone.properties
+++ /dev/null
@@ -1,32 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-bootstrap.servers={{ kafka.bootstrap_servers() }}
-
-key.converter={{ key_converter|default("org.apache.kafka.connect.json.JsonConverter") }}
-value.converter={{ value_converter|default("org.apache.kafka.connect.json.JsonConverter") }}
-{% if key_converter is not defined or key_converter.endswith("JsonConverter") %}
-key.converter.schemas.enable={{ schemas|default(True)|string|lower }}
-{% endif %}
-{% if value_converter is not defined or value_converter.endswith("JsonConverter") %}
-value.converter.schemas.enable={{ schemas|default(True)|string|lower }}
-{% endif %}
-
-internal.key.converter=org.apache.kafka.connect.json.JsonConverter
-internal.value.converter=org.apache.kafka.connect.json.JsonConverter
-internal.key.converter.schemas.enable=false
-internal.value.converter.schemas.enable=false
-
-offset.storage.file.filename={{ OFFSETS_FILE }}
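
The converter conditionals above mean the schemas.enable settings are emitted only when the default JSON converter is in play: overriding key_converter or value_converter with a non-JSON class suppresses the corresponding line. A small sketch of both renderings, assuming jinja2; StringConverter below is just an example of a non-JSON converter:

    from jinja2 import Template

    snippet = Template(
        'key.converter={{ key_converter|default("org.apache.kafka.connect.json.JsonConverter") }}\n'
        '{% if key_converter is not defined or key_converter.endswith("JsonConverter") %}'
        'key.converter.schemas.enable={{ schemas|default(True)|string|lower }}\n'
        '{% endif %}')

    # Default: JSON converter, with schemas.enable emitted as "true".
    print(snippet.render())
    # Non-JSON converter: the schemas.enable line is suppressed entirely.
    print(snippet.render(key_converter="org.apache.kafka.connect.storage.StringConverter"))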

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/tests/tools/__init__.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/tools/__init__.py b/tests/kafkatest/tests/tools/__init__.py
new file mode 100644
index 0000000..ebc9bb3
--- /dev/null
+++ b/tests/kafkatest/tests/tools/__init__.py
@@ -0,0 +1,14 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/tests/tools/log4j_appender_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/tools/log4j_appender_test.py b/tests/kafkatest/tests/tools/log4j_appender_test.py
new file mode 100644
index 0000000..42cfeea
--- /dev/null
+++ b/tests/kafkatest/tests/tools/log4j_appender_test.py
@@ -0,0 +1,93 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from ducktape.utils.util import wait_until
+from ducktape.tests.test import Test
+from ducktape.mark import matrix
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.kafka_log4j_appender import KafkaLog4jAppender
+from kafkatest.services.security.security_config import SecurityConfig
+
+TOPIC = "topic-log4j-appender"
+MAX_MESSAGES = 100
+
+class Log4jAppenderTest(Test):
+    """
+    Tests KafkaLog4jAppender using the VerifiableLog4jAppender tool, which appends increasing ints to a Kafka topic.
+    """
+    def __init__(self, test_context):
+        super(Log4jAppenderTest, self).__init__(test_context)
+        self.num_zk = 1
+        self.num_brokers = 1
+        self.messages_received_count = 0
+        self.topics = {
+            TOPIC: {'partitions': 1, 'replication-factor': 1}
+        }
+
+        self.zk = ZookeeperService(test_context, self.num_zk)
+
+    def setUp(self):
+        self.zk.start()
+
+    def start_kafka(self, security_protocol, interbroker_security_protocol):
+        self.kafka = KafkaService(
+            self.test_context, self.num_brokers,
+            self.zk, security_protocol=security_protocol,
+            interbroker_security_protocol=interbroker_security_protocol, topics=self.topics)
+        self.kafka.start()
+
+    def start_appender(self, security_protocol):
+        self.appender = KafkaLog4jAppender(self.test_context, self.num_brokers, self.kafka, TOPIC, MAX_MESSAGES,
+                                           security_protocol=security_protocol)
+        self.appender.start()
+
+    def custom_message_validator(self, msg):
+        if msg and "INFO : org.apache.kafka.tools.VerifiableLog4jAppender" in msg:
+            self.logger.debug("Received message: %s" % msg)
+            self.messages_received_count += 1
+
+
+    def start_consumer(self, security_protocol):
+        enable_new_consumer = security_protocol != SecurityConfig.PLAINTEXT
+        self.consumer = ConsoleConsumer(self.test_context, num_nodes=self.num_brokers, kafka=self.kafka, topic=TOPIC,
+                                        consumer_timeout_ms=1000, new_consumer=enable_new_consumer,
+                                        message_validator=self.custom_message_validator)
+        self.consumer.start()
+
+    @matrix(security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'])
+    def test_log4j_appender(self, security_protocol='PLAINTEXT'):
+        """
+        Tests that KafkaLog4jAppender produces to the Kafka topic.
+        :return: None
+        """
+        self.start_kafka(security_protocol, security_protocol)
+        self.start_appender(security_protocol)
+        self.appender.wait()
+
+        self.start_consumer(security_protocol)
+        node = self.consumer.nodes[0]
+
+        wait_until(lambda: self.consumer.alive(node),
+                   timeout_sec=10, backoff_sec=0.2, err_msg="Consumer was too slow to start")
+
+        # Verify consumed messages count
+        wait_until(lambda: self.messages_received_count == MAX_MESSAGES, timeout_sec=10,
+                   err_msg="Timed out waiting to consume expected number of messages.")
+
+        self.consumer.stop()
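
The test's consumption check hinges on two pieces: a message_validator callback that counts only the appender's output lines, and wait_until polling until the count reaches MAX_MESSAGES. A standalone sketch of that pattern, assuming only that ducktape is installed; the consumer is simulated with a plain loop so the snippet runs without a cluster:

    from ducktape.utils.util import wait_until

    received = []

    def message_validator(msg):
        # Mirrors custom_message_validator above: count only the expected lines.
        if msg and "VerifiableLog4jAppender" in msg:
            received.append(msg)

    # Stand-in for the console consumer delivering MAX_MESSAGES matching lines.
    for i in range(100):
        message_validator("INFO : org.apache.kafka.tools.VerifiableLog4jAppender %d" % i)

    wait_until(lambda: len(received) == 100, timeout_sec=10,
               err_msg="Timed out waiting to consume expected number of messages.")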

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/tests/upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/upgrade_test.py b/tests/kafkatest/tests/upgrade_test.py
deleted file mode 100644
index 9926f11..0000000
--- a/tests/kafkatest/tests/upgrade_test.py
+++ /dev/null
@@ -1,105 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.mark import parametrize
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
-from kafkatest.services.kafka.version import LATEST_0_8_2, LATEST_0_9, TRUNK, KafkaVersion
-from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.services.kafka import config_property
-from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
-from kafkatest.utils import is_int
-
-
-class TestUpgrade(ProduceConsumeValidateTest):
-
-    def __init__(self, test_context):
-        super(TestUpgrade, self).__init__(test_context=test_context)
-
-    def setUp(self):
-        self.topic = "test_topic"
-        self.zk = ZookeeperService(self.test_context, num_nodes=1)
-        self.zk.start()
-
-        # Producer and consumer
-        self.producer_throughput = 10000
-        self.num_producers = 1
-        self.num_consumers = 1
-
-    def perform_upgrade(self, from_kafka_version, to_message_format_version=None):
-        self.logger.info("First pass bounce - rolling upgrade")
-        for node in self.kafka.nodes:
-            self.kafka.stop_node(node)
-            node.version = TRUNK
-            node.config[config_property.INTER_BROKER_PROTOCOL_VERSION] = from_kafka_version
-            node.config[config_property.MESSAGE_FORMAT_VERSION] = from_kafka_version
-            self.kafka.start_node(node)
-
-        self.logger.info("Second pass bounce - remove inter.broker.protocol.version config")
-        for node in self.kafka.nodes:
-            self.kafka.stop_node(node)
-            del node.config[config_property.INTER_BROKER_PROTOCOL_VERSION]
-            if to_message_format_version is None:
-                del node.config[config_property.MESSAGE_FORMAT_VERSION]
-            else:
-                node.config[config_property.MESSAGE_FORMAT_VERSION] = to_message_format_version
-            self.kafka.start_node(node)
-
-
-    @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["none"])
-    @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["snappy"], new_consumer=True)
-    @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=str(LATEST_0_9), compression_types=["none"])
-    @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=str(LATEST_0_9), compression_types=["snappy"], new_consumer=True)
-    @parametrize(from_kafka_version=str(LATEST_0_8_2), to_message_format_version=None, compression_types=["none"])
-    @parametrize(from_kafka_version=str(LATEST_0_8_2), to_message_format_version=None, compression_types=["snappy"])
-    def test_upgrade(self, from_kafka_version, to_message_format_version, compression_types, new_consumer=False):
-        """Test upgrade of Kafka broker cluster from 0.8.2 or 0.9.0 to 0.10
-
-        from_kafka_version is the Kafka version to upgrade from: either 0.8.2.X or 0.9.
-
-        If to_message_format_version is None, we upgrade to the default (latest) message
-        format version. It is possible to upgrade to 0.10 brokers but still use message
-        format version 0.9.
-
-        - Start 3 node broker cluster on version 'from_kafka_version'
-        - Start producer and consumer in the background
-        - Perform two-phase rolling upgrade
-            - First phase: upgrade brokers to 0.10 with inter.broker.protocol.version set to
-            from_kafka_version and log.message.format.version set to from_kafka_version
-            - Second phase: remove inter.broker.protocol.version config with rolling bounce; if
-            to_message_format_version is set to 0.9, set log.message.format.version to
-            to_message_format_version, otherwise remove log.message.format.version config
-        - Finally, validate that every message acked by the producer was consumed by the consumer
-        """
-        self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk,
-                                  version=KafkaVersion(from_kafka_version),
-                                  topics={self.topic: {"partitions": 3, "replication-factor": 3,
-                                                       'configs': {"min.insync.replicas": 2}}})
-        self.kafka.start()
-
-        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,
-                                           self.topic, throughput=self.producer_throughput,
-                                           message_validator=is_int,
-                                           compression_types=compression_types,
-                                           version=KafkaVersion(from_kafka_version))
-
-        # TODO - reduce the timeout
-        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka,
-                                        self.topic, consumer_timeout_ms=30000, new_consumer=new_consumer,
-                                        message_validator=is_int, version=KafkaVersion(from_kafka_version))
-
-        self.run_produce_consume_validate(core_test_action=lambda: self.perform_upgrade(from_kafka_version,
-                                                                                        to_message_format_version))
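
To make the two-phase bounce concrete, here is the per-node config progression as a plain-Python sketch, with a dict standing in for a broker node's config. The literal key names are spelled out for readability; the test itself references them through constants in kafkatest.services.kafka.config_property:

    from_version = "0.9.0.1"          # illustrative from_kafka_version
    to_message_format_version = None  # i.e. move fully to the new format

    node_config = {}

    # First pass: new binaries, but pinned to the old protocol and format.
    node_config["inter.broker.protocol.version"] = from_version
    node_config["log.message.format.version"] = from_version

    # Second pass: drop the protocol pin; keep or drop the format pin.
    del node_config["inter.broker.protocol.version"]
    if to_message_format_version is None:
        del node_config["log.message.format.version"]
    else:
        node_config["log.message.format.version"] = to_message_format_version

    assert node_config == {}  # fully on the new protocol and message format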

http://git-wip-us.apache.org/repos/asf/kafka/blob/45c585b4/tests/kafkatest/tests/zookeeper_security_upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/zookeeper_security_upgrade_test.py b/tests/kafkatest/tests/zookeeper_security_upgrade_test.py
deleted file mode 100644
index 7f80deb..0000000
--- a/tests/kafkatest/tests/zookeeper_security_upgrade_test.py
+++ /dev/null
@@ -1,117 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.mark import matrix
-
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
-from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.services.security.security_config import SecurityConfig
-from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
-from kafkatest.services.security.kafka_acls import ACLs
-from kafkatest.utils import is_int
-import time
-
-class ZooKeeperSecurityUpgradeTest(ProduceConsumeValidateTest):
-    """Tests a rolling upgrade for zookeeper.
-    """
-
-    def __init__(self, test_context):
-        super(ZooKeeperSecurityUpgradeTest, self).__init__(test_context=test_context)
-
-    def setUp(self):
-        self.topic = "test_topic"
-        self.group = "group"
-        self.producer_throughput = 100
-        self.num_producers = 1
-        self.num_consumers = 1
-        self.acls = ACLs()
-
-        self.zk = ZookeeperService(self.test_context, num_nodes=3)
-
-        self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, topics={self.topic: {
-            "partitions": 3,
-            "replication-factor": 3,
-            'configs': {"min.insync.replicas": 2}}})
-
-    def create_producer_and_consumer(self):
-        self.producer = VerifiableProducer(
-            self.test_context, self.num_producers, self.kafka, self.topic,
-            throughput=self.producer_throughput)
-
-        self.consumer = ConsoleConsumer(
-            self.test_context, self.num_consumers, self.kafka, self.topic,
-            consumer_timeout_ms=60000, message_validator=is_int, new_consumer=True)
-
-        self.consumer.group_id = self.group
-
-    @property
-    def no_sasl(self):
-        return self.kafka.security_protocol == "PLAINTEXT" or self.kafka.security_protocol == "SSL"
-
-    @property
-    def is_secure(self):
-        return self.kafka.security_protocol == "SASL_PLAINTEXT" \
-               or self.kafka.security_protocol == "SSL" \
-               or self.kafka.security_protocol == "SASL_SSL"
-
-    def run_zk_migration(self):
-        # change zk config (auth provider + jaas login)
-        self.zk.kafka_opts = self.zk.security_system_properties
-        self.zk.zk_sasl = True
-        if self.no_sasl:
-            self.kafka.start_minikdc(self.zk.zk_principals)
-        # restart zk
-        for node in self.zk.nodes:
-            self.zk.stop_node(node)
-            self.zk.start_node(node)
-
-        # restart broker with jaas login
-        for node in self.kafka.nodes:
-            self.kafka.stop_node(node)
-            self.kafka.start_node(node)
-
-        # run migration tool
-        for node in self.zk.nodes:
-            self.zk.zookeeper_migration(node, "secure")
-
-        # restart broker with zookeeper.set.acl=true and acls
-        self.kafka.zk_set_acl = "true"
-        for node in self.kafka.nodes:
-            self.kafka.stop_node(node)
-            self.kafka.start_node(node)
-
-    @matrix(security_protocol=["PLAINTEXT", "SSL", "SASL_SSL", "SASL_PLAINTEXT"])
-    def test_zk_security_upgrade(self, security_protocol):
-        self.zk.start()
-        self.kafka.security_protocol = security_protocol
-        self.kafka.interbroker_security_protocol = security_protocol
-
-        # set acls
-        if self.is_secure:
-            self.kafka.authorizer_class_name = KafkaService.SIMPLE_AUTHORIZER
-            self.acls.set_acls(security_protocol, self.kafka, self.zk, self.topic, self.group)
-
-        if self.no_sasl:
-            self.kafka.start()
-        else:
-            self.kafka.start(self.zk.zk_principals)
-
-        # Create producer and consumer
-        self.create_producer_and_consumer()
-
-        # Run upgrade
-        self.run_produce_consume_validate(self.run_zk_migration)
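
The stop-then-start loop in run_zk_migration appears three times (ZooKeeper, then the brokers twice). A hypothetical helper, not part of kafkatest, that captures the rolling-bounce pattern for any service exposing stop_node/start_node:

    def rolling_bounce(service):
        """Restart each node of a ducktape-style service one at a time."""
        for node in service.nodes:
            service.stop_node(node)
            service.start_node(node)

    # run_zk_migration could then read, in outline:
    #   rolling_bounce(self.zk)
    #   rolling_bounce(self.kafka)
    #   for node in self.zk.nodes:
    #       self.zk.zookeeper_migration(node, "secure")
    #   self.kafka.zk_set_acl = "true"
    #   rolling_bounce(self.kafka)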

