Return-Path: X-Original-To: apmail-kafka-commits-archive@www.apache.org Delivered-To: apmail-kafka-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 0F04419DA9 for ; Mon, 11 Apr 2016 23:09:19 +0000 (UTC) Received: (qmail 27219 invoked by uid 500); 11 Apr 2016 23:09:18 -0000 Delivered-To: apmail-kafka-commits-archive@kafka.apache.org Received: (qmail 27054 invoked by uid 500); 11 Apr 2016 23:09:17 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 26687 invoked by uid 99); 11 Apr 2016 23:09:17 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 11 Apr 2016 23:09:17 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8FC9EE09C7; Mon, 11 Apr 2016 23:09:17 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: gwenshap@apache.org To: commits@kafka.apache.org Date: Mon, 11 Apr 2016 23:09:25 -0000 Message-Id: <00a058a8f76e4c05a5b50e0887b15eb8@git.apache.org> In-Reply-To: <757126e890b149f09c8bbd827e426930@git.apache.org> References: <757126e890b149f09c8bbd827e426930@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [09/50] [abbrv] kafka git commit: KAFKA-3381: Add system test for SimpleConsumerShell KAFKA-3381: Add system test for SimpleConsumerShell Author: Ashish Singh Reviewers: Geoff Anderson Closes #1053 from SinghAsDev/KAFKA-3381 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9f6a6f97 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9f6a6f97 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9f6a6f97 Branch: refs/heads/0.10.0 Commit: 9f6a6f97134a1d4969c91c4b4e9037b376e03440 Parents: 78d91dc Author: Ashish Singh Authored: Wed Mar 30 19:33:37 2016 -0700 Committer: Gwen Shapira Committed: Wed Mar 30 19:33:37 2016 -0700 ---------------------------------------------------------------------- .../kafkatest/services/simple_consumer_shell.py | 69 ++++++++++++++++++ .../tests/simple_consumer_shell_test.py | 75 ++++++++++++++++++++ 2 files changed, 144 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/9f6a6f97/tests/kafkatest/services/simple_consumer_shell.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/simple_consumer_shell.py b/tests/kafkatest/services/simple_consumer_shell.py new file mode 100644 index 0000000..8deee85 --- /dev/null +++ b/tests/kafkatest/services/simple_consumer_shell.py @@ -0,0 +1,69 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ducktape.services.background_thread import BackgroundThreadService + +from kafkatest.services.kafka.directory import kafka_dir + + +class SimpleConsumerShell(BackgroundThreadService): + + logs = { + "simple_consumer_shell_log": { + "path": "/mnt/simple_consumer_shell.log", + "collect_default": False} + } + + def __init__(self, context, num_nodes, kafka, topic, partition=0): + super(SimpleConsumerShell, self).__init__(context, num_nodes) + + self.kafka = kafka + self.topic = topic + self.partition = partition + self.output = "" + + def _worker(self, idx, node): + cmd = self.start_cmd(node) + self.logger.debug("SimpleConsumerShell %d command: %s" % (idx, cmd)) + self.output = "" + self.logger.debug(cmd) + for line in node.account.ssh_capture(cmd): + self.output += line + self.logger.debug(self.output) + + def start_cmd(self, node): + cmd = "/opt/%s/bin/" % kafka_dir(node) + cmd += "kafka-run-class.sh kafka.tools.SimpleConsumerShell" + cmd += " --topic %s --broker-list %s --partition %s --no-wait-at-logend" % (self.topic, self.kafka.bootstrap_servers(), self.partition) + + cmd += " 2>> /mnt/get_simple_consumer_shell.log | tee -a /mnt/get_simple_consumer_shell.log &" + return cmd + + def get_output(self): + return self.output + + def stop_node(self, node): + node.account.kill_process("SimpleConsumerShell", allow_fail=False) + if self.worker_threads is None: + return + + # block until the corresponding thread exits + if len(self.worker_threads) >= self.idx(node): + # Need to guard this because stop is preemptively called before the worker threads are added and started + self.worker_threads[self.idx(node) - 1].join() + + def clean_node(self, node): + node.account.kill_process("SimpleConsumerShell", clean_shutdown=False, allow_fail=False) + node.account.ssh("rm -rf /mnt/simple_consumer_shell.log", allow_fail=False) http://git-wip-us.apache.org/repos/asf/kafka/blob/9f6a6f97/tests/kafkatest/tests/simple_consumer_shell_test.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/simple_consumer_shell_test.py b/tests/kafkatest/tests/simple_consumer_shell_test.py new file mode 100644 index 0000000..74a7eeb --- /dev/null +++ b/tests/kafkatest/tests/simple_consumer_shell_test.py @@ -0,0 +1,75 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from ducktape.utils.util import wait_until +from ducktape.tests.test import Test +from kafkatest.services.simple_consumer_shell import SimpleConsumerShell +from kafkatest.services.verifiable_producer import VerifiableProducer + +from kafkatest.services.zookeeper import ZookeeperService +from kafkatest.services.kafka import KafkaService +TOPIC = "topic-simple-consumer-shell" +MAX_MESSAGES = 100 +NUM_PARTITIONS = 1 +REPLICATION_FACTOR = 1 + +class SimpleConsumerShellTest(Test): + """ + Tests SimpleConsumerShell tool + """ + def __init__(self, test_context): + super(SimpleConsumerShellTest, self).__init__(test_context) + self.num_zk = 1 + self.num_brokers = 1 + self.messages_received_count = 0 + self.topics = { + TOPIC: {'partitions': NUM_PARTITIONS, 'replication-factor': REPLICATION_FACTOR} + } + + self.zk = ZookeeperService(test_context, self.num_zk) + + def setUp(self): + self.zk.start() + + def start_kafka(self): + self.kafka = KafkaService( + self.test_context, self.num_brokers, + self.zk, topics=self.topics) + self.kafka.start() + + def run_producer(self): + # This will produce to kafka cluster + self.producer = VerifiableProducer(self.test_context, num_nodes=1, kafka=self.kafka, topic=TOPIC, throughput=1000, max_messages=MAX_MESSAGES) + self.producer.start() + wait_until(lambda: self.producer.num_acked == MAX_MESSAGES, timeout_sec=10, + err_msg="Timeout awaiting messages to be produced and acked") + + def start_simple_consumer_shell(self): + self.simple_consumer_shell = SimpleConsumerShell(self.test_context, 1, self.kafka, TOPIC) + self.simple_consumer_shell.start() + + def test_simple_consumer_shell(self): + """ + Tests if SimpleConsumerShell is fetching expected records + :return: None + """ + self.start_kafka() + self.run_producer() + self.start_simple_consumer_shell() + + # Assert that SimpleConsumerShell is fetching expected number of messages + wait_until(lambda: self.simple_consumer_shell.get_output().count("\n") == (MAX_MESSAGES + 1), timeout_sec=10, + err_msg="Timed out waiting to receive expected number of messages.") \ No newline at end of file