kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gwens...@apache.org
Subject kafka git commit: KAFKA-3381: Add system test for SimpleConsumerShell
Date Thu, 31 Mar 2016 02:33:56 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.0 ef676c15c -> be822510c


KAFKA-3381: Add system test for SimpleConsumerShell

Author: Ashish Singh <asingh@cloudera.com>

Reviewers: Geoff Anderson

Closes #1053 from SinghAsDev/KAFKA-3381

(cherry picked from commit 9f6a6f97134a1d4969c91c4b4e9037b376e03440)
Signed-off-by: Gwen Shapira <cshapi@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/be822510
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/be822510
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/be822510

Branch: refs/heads/0.10.0
Commit: be822510c63d720dedf286c95588fc0c25a6a49c
Parents: ef676c1
Author: Ashish Singh <asingh@cloudera.com>
Authored: Wed Mar 30 19:33:37 2016 -0700
Committer: Gwen Shapira <cshapi@gmail.com>
Committed: Wed Mar 30 19:33:48 2016 -0700

----------------------------------------------------------------------
 .../kafkatest/services/simple_consumer_shell.py | 69 ++++++++++++++++++
 .../tests/simple_consumer_shell_test.py         | 75 ++++++++++++++++++++
 2 files changed, 144 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/be822510/tests/kafkatest/services/simple_consumer_shell.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/simple_consumer_shell.py b/tests/kafkatest/services/simple_consumer_shell.py
new file mode 100644
index 0000000..8deee85
--- /dev/null
+++ b/tests/kafkatest/services/simple_consumer_shell.py
@@ -0,0 +1,69 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.services.background_thread import BackgroundThreadService
+
+from kafkatest.services.kafka.directory import kafka_dir
+
+
+class SimpleConsumerShell(BackgroundThreadService):
+
+    logs = {
+        "simple_consumer_shell_log": {
+            "path": "/mnt/simple_consumer_shell.log",
+            "collect_default": False}
+    }
+
+    def __init__(self, context, num_nodes, kafka, topic, partition=0):
+        super(SimpleConsumerShell, self).__init__(context, num_nodes)
+
+        self.kafka = kafka
+        self.topic = topic
+        self.partition = partition
+        self.output = ""
+
+    def _worker(self, idx, node):
+        cmd = self.start_cmd(node)
+        self.logger.debug("SimpleConsumerShell %d command: %s" % (idx, cmd))
+        self.output = ""
+        self.logger.debug(cmd)
+        for line in node.account.ssh_capture(cmd):
+            self.output += line
+        self.logger.debug(self.output)
+
+    def start_cmd(self, node):
+        cmd = "/opt/%s/bin/" % kafka_dir(node)
+        cmd += "kafka-run-class.sh kafka.tools.SimpleConsumerShell"
+        cmd += " --topic %s --broker-list %s --partition %s --no-wait-at-logend" % (self.topic,
self.kafka.bootstrap_servers(), self.partition)
+
+        cmd += " 2>> /mnt/get_simple_consumer_shell.log | tee -a /mnt/get_simple_consumer_shell.log
&"
+        return cmd
+
+    def get_output(self):
+        return self.output
+
+    def stop_node(self, node):
+        node.account.kill_process("SimpleConsumerShell", allow_fail=False)
+        if self.worker_threads is None:
+            return
+
+        # block until the corresponding thread exits
+        if len(self.worker_threads) >= self.idx(node):
+            # Need to guard this because stop is preemptively called before the worker threads
are added and started
+            self.worker_threads[self.idx(node) - 1].join()
+
+    def clean_node(self, node):
+        node.account.kill_process("SimpleConsumerShell", clean_shutdown=False, allow_fail=False)
+        node.account.ssh("rm -rf /mnt/simple_consumer_shell.log", allow_fail=False)

http://git-wip-us.apache.org/repos/asf/kafka/blob/be822510/tests/kafkatest/tests/simple_consumer_shell_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/simple_consumer_shell_test.py b/tests/kafkatest/tests/simple_consumer_shell_test.py
new file mode 100644
index 0000000..74a7eeb
--- /dev/null
+++ b/tests/kafkatest/tests/simple_consumer_shell_test.py
@@ -0,0 +1,75 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from ducktape.utils.util import wait_until
+from ducktape.tests.test import Test
+from kafkatest.services.simple_consumer_shell import SimpleConsumerShell
+from kafkatest.services.verifiable_producer import VerifiableProducer
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+TOPIC = "topic-simple-consumer-shell"
+MAX_MESSAGES = 100
+NUM_PARTITIONS = 1
+REPLICATION_FACTOR = 1
+
+class SimpleConsumerShellTest(Test):
+    """
+    Tests SimpleConsumerShell tool
+    """
+    def __init__(self, test_context):
+        super(SimpleConsumerShellTest, self).__init__(test_context)
+        self.num_zk = 1
+        self.num_brokers = 1
+        self.messages_received_count = 0
+        self.topics = {
+            TOPIC: {'partitions': NUM_PARTITIONS, 'replication-factor': REPLICATION_FACTOR}
+        }
+
+        self.zk = ZookeeperService(test_context, self.num_zk)
+
+    def setUp(self):
+        self.zk.start()
+
+    def start_kafka(self):
+        self.kafka = KafkaService(
+            self.test_context, self.num_brokers,
+            self.zk, topics=self.topics)
+        self.kafka.start()
+
+    def run_producer(self):
+        # This will produce to kafka cluster
+        self.producer = VerifiableProducer(self.test_context, num_nodes=1, kafka=self.kafka,
topic=TOPIC, throughput=1000, max_messages=MAX_MESSAGES)
+        self.producer.start()
+        wait_until(lambda: self.producer.num_acked == MAX_MESSAGES, timeout_sec=10,
+                   err_msg="Timeout awaiting messages to be produced and acked")
+
+    def start_simple_consumer_shell(self):
+        self.simple_consumer_shell = SimpleConsumerShell(self.test_context, 1, self.kafka,
TOPIC)
+        self.simple_consumer_shell.start()
+
+    def test_simple_consumer_shell(self):
+        """
+        Tests if SimpleConsumerShell is fetching expected records
+        :return: None
+        """
+        self.start_kafka()
+        self.run_producer()
+        self.start_simple_consumer_shell()
+
+        # Assert that SimpleConsumerShell is fetching expected number of messages
+        wait_until(lambda: self.simple_consumer_shell.get_output().count("\n") == (MAX_MESSAGES
+ 1), timeout_sec=10,
+                   err_msg="Timed out waiting to receive expected number of messages.")
\ No newline at end of file


Mime
View raw message