kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gwens...@apache.org
Subject kafka git commit: KAFKA-2531: Add Ducktape based tests for KafkaLog4jAppender
Date Sun, 27 Sep 2015 01:32:58 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 263c10ab7 -> b62f8ea43


KAFKA-2531: Add Ducktape based tests for KafkaLog4jAppender

Author: Ashish Singh <asingh@cloudera.com>

Reviewers: Geoff Anderson, Edwerd Ribeiro, Ewen Cheslack-Postava, Gwen Shapira

Closes #235 from SinghAsDev/KAFKA-2531


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b62f8ea4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b62f8ea4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b62f8ea4

Branch: refs/heads/trunk
Commit: b62f8ea43b6d5307f7274fbe8b7984dd5ee22239
Parents: 263c10a
Author: Ashish Singh <asingh@cloudera.com>
Authored: Sat Sep 26 18:32:49 2015 -0700
Committer: Gwen Shapira <cshapi@gmail.com>
Committed: Sat Sep 26 18:32:49 2015 -0700

----------------------------------------------------------------------
 .../sanity_checks/test_console_consumer.py      |  20 +--
 .../kafkatest/services/kafka_log4j_appender.py  |  61 +++++++
 tests/kafkatest/tests/log4j_appender_test.py    |  62 +++++++
 tests/kafkatest/utils/__init__.py               |  15 ++
 tests/kafkatest/utils/remote_account.py         |  32 ++++
 .../clients/tools/VerifiableLog4jAppender.java  | 162 +++++++++++++++++++
 .../kafka/clients/tools/VerifiableProducer.java |   3 +-
 7 files changed, 335 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b62f8ea4/tests/kafkatest/sanity_checks/test_console_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/sanity_checks/test_console_consumer.py b/tests/kafkatest/sanity_checks/test_console_consumer.py
index 5b061aa..4544c00 100644
--- a/tests/kafkatest/sanity_checks/test_console_consumer.py
+++ b/tests/kafkatest/sanity_checks/test_console_consumer.py
@@ -19,28 +19,10 @@ from ducktape.utils.util import wait_until
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
 from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.utils.remote_account import line_count, file_exists
 
 import time
 
-
-def file_exists(node, file):
-    """Quick and dirty check for existence of remote file."""
-    try:
-        node.account.ssh("cat " + file, allow_fail=False)
-        return True
-    except:
-        return False
-
-
-def line_count(node, file):
-    """Return the line count of file on node"""
-    out = [line for line in node.account.ssh_capture("wc -l %s" % file)]
-    if len(out) != 1:
-        raise Exception("Expected single line of output from wc -l")
-
-    return int(out[0].strip().split(" ")[0])
-
-
 class ConsoleConsumerTest(Test):
     """Sanity checks on console consumer service class."""
     def __init__(self, test_context):

http://git-wip-us.apache.org/repos/asf/kafka/blob/b62f8ea4/tests/kafkatest/services/kafka_log4j_appender.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka_log4j_appender.py b/tests/kafkatest/services/kafka_log4j_appender.py
new file mode 100644
index 0000000..11369aa
--- /dev/null
+++ b/tests/kafkatest/services/kafka_log4j_appender.py
@@ -0,0 +1,61 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.services.background_thread import BackgroundThreadService
+
+
+class KafkaLog4jAppender(BackgroundThreadService):
+
+    logs = {
+        "producer_log": {
+            "path": "/mnt/kafka_log4j_appender.log",
+            "collect_default": False}
+    }
+
+    def __init__(self, context, num_nodes, kafka, topic, max_messages=-1):
+        super(KafkaLog4jAppender, self).__init__(context, num_nodes)
+
+        self.kafka = kafka
+        self.topic = topic
+        self.max_messages = max_messages
+
+    def _worker(self, idx, node):
+        cmd = self.start_cmd
+        self.logger.debug("VerifiableKafkaLog4jAppender %d command: %s" % (idx, cmd))
+        node.account.ssh(cmd)
+
+    @property
+    def start_cmd(self):
+        cmd = "/opt/kafka/bin/kafka-run-class.sh org.apache.kafka.clients.tools.VerifiableLog4jAppender"
\
+              " --topic %s --broker-list %s" % (self.topic, self.kafka.bootstrap_servers())
+        if self.max_messages > 0:
+            cmd += " --max-messages %s" % str(self.max_messages)
+
+        cmd += " 2>> /mnt/kafka_log4j_appender.log | tee -a /mnt/kafka_log4j_appender.log
&"
+        return cmd
+
+    def stop_node(self, node):
+        node.account.kill_process("VerifiableKafkaLog4jAppender", allow_fail=False)
+        if self.worker_threads is None:
+            return
+
+        # block until the corresponding thread exits
+        if len(self.worker_threads) >= self.idx(node):
+            # Need to guard this because stop is preemptively called before the worker threads
are added and started
+            self.worker_threads[self.idx(node) - 1].join()
+
+    def clean_node(self, node):
+        node.account.kill_process("VerifiableKafkaLog4jAppender", clean_shutdown=False, allow_fail=False)
+        node.account.ssh("rm -rf /mnt/kafka_log4j_appender.log", allow_fail=False)

http://git-wip-us.apache.org/repos/asf/kafka/blob/b62f8ea4/tests/kafkatest/tests/log4j_appender_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/log4j_appender_test.py b/tests/kafkatest/tests/log4j_appender_test.py
new file mode 100644
index 0000000..0875dbe
--- /dev/null
+++ b/tests/kafkatest/tests/log4j_appender_test.py
@@ -0,0 +1,62 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from ducktape.utils.util import wait_until
+
+from kafkatest.tests.kafka_test import KafkaTest
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.kafka_log4j_appender import KafkaLog4jAppender
+
+import time
+
+TOPIC = "topic-log4j-appender"
+MAX_MESSAGES = 100
+
+class Log4jAppenderTest(KafkaTest):
+    """
+    Tests KafkaLog4jAppender using VerifiableKafkaLog4jAppender that appends increasing ints
to a Kafka topic
+    """
+    def __init__(self, test_context):
+        super(Log4jAppenderTest, self).__init__(test_context, num_zk=1, num_brokers=1, topics={
+            TOPIC: {'partitions': 1, 'replication-factor': 1}
+        })
+        self.num_nodes = 1
+
+        self.appender = KafkaLog4jAppender(self.test_context, self.num_nodes, self.kafka,
TOPIC, MAX_MESSAGES)
+        self.consumer = ConsoleConsumer(self.test_context, num_nodes=self.num_nodes, kafka=self.kafka,
topic=TOPIC, consumer_timeout_ms=1000)
+
+    def test_log4j_appender(self):
+        """
+        Tests if KafkaLog4jAppender is producing to Kafka topic
+        :return: None
+        """
+        self.appender.start()
+        self.appender.wait()
+
+        t0 = time.time()
+        self.consumer.start()
+        node = self.consumer.nodes[0]
+
+        wait_until(lambda: self.consumer.alive(node),
+            timeout_sec=10, backoff_sec=.2, err_msg="Consumer was too slow to start")
+        self.logger.info("consumer started in %s seconds " % str(time.time() - t0))
+
+        # Verify consumed messages count
+        expected_lines_count = MAX_MESSAGES * 2  # two times to account for new lines introduced
by log4j
+        wait_until(lambda: len(self.consumer.messages_consumed[1]) == expected_lines_count,
timeout_sec=10,
+                   err_msg="Timed out waiting to consume expected number of messages.")
+
+        self.consumer.stop_node(node)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/b62f8ea4/tests/kafkatest/utils/__init__.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/utils/__init__.py b/tests/kafkatest/utils/__init__.py
new file mode 100644
index 0000000..cff6d2b
--- /dev/null
+++ b/tests/kafkatest/utils/__init__.py
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/b62f8ea4/tests/kafkatest/utils/remote_account.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/utils/remote_account.py b/tests/kafkatest/utils/remote_account.py
new file mode 100644
index 0000000..b69a591
--- /dev/null
+++ b/tests/kafkatest/utils/remote_account.py
@@ -0,0 +1,32 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+def file_exists(node, file):
+    """Quick and dirty check for existence of remote file."""
+    try:
+        node.account.ssh("cat " + file, allow_fail=False)
+        return True
+    except:
+        return False
+
+
+def line_count(node, file):
+    """Return the line count of file on node"""
+    out = [line for line in node.account.ssh_capture("wc -l %s" % file)]
+    if len(out) != 1:
+        raise Exception("Expected single line of output from wc -l")
+
+    return int(out[0].strip().split(" ")[0])
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/b62f8ea4/tools/src/main/java/org/apache/kafka/clients/tools/VerifiableLog4jAppender.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/clients/tools/VerifiableLog4jAppender.java
b/tools/src/main/java/org/apache/kafka/clients/tools/VerifiableLog4jAppender.java
new file mode 100644
index 0000000..e78f96a
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/clients/tools/VerifiableLog4jAppender.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.tools;
+
+
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.ArgumentParserException;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PropertyConfigurator;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+
+/**
+ * Primarily intended for use with system testing, this appender produces message
+ * to Kafka on each "append" request. For example, this helps with end-to-end tests
+ * of KafkaLog4jAppender.
+ *
+ * When used as a command-line tool, it appends increasing integers. It will produce a
+ * fixed number of messages unless the default max-messages -1 is used, in which case
+ * it appends indefinitely.
+ */
+
+public class VerifiableLog4jAppender {
+    Logger logger = Logger.getLogger(VerifiableLog4jAppender.class);
+
+    // If maxMessages < 0, log until the process is killed externally
+    private long maxMessages = -1;
+
+    // Hook to trigger logging thread to stop logging messages
+    private volatile boolean stopLogging = false;
+
+    /** Get the command-line argument parser. */
+    private static ArgumentParser argParser() {
+        ArgumentParser parser = ArgumentParsers
+            .newArgumentParser("verifiable-log4j-appender")
+            .defaultHelp(true)
+            .description("This tool produces increasing integers to the specified topic using
KafkaLog4jAppender.");
+
+        parser.addArgument("--topic")
+            .action(store())
+            .required(true)
+            .type(String.class)
+            .metavar("TOPIC")
+            .help("Produce messages to this topic.");
+
+        parser.addArgument("--broker-list")
+            .action(store())
+            .required(true)
+            .type(String.class)
+            .metavar("HOST1:PORT1[,HOST2:PORT2[...]]")
+            .dest("brokerList")
+            .help("Comma-separated list of Kafka brokers in the form HOST1:PORT1,HOST2:PORT2,...");
+
+        parser.addArgument("--max-messages")
+            .action(store())
+            .required(false)
+            .setDefault(-1)
+            .type(Integer.class)
+            .metavar("MAX-MESSAGES")
+            .dest("maxMessages")
+            .help("Produce this many messages. If -1, produce messages until the process
is killed externally.");
+
+        parser.addArgument("--acks")
+            .action(store())
+            .required(false)
+            .setDefault("-1")
+            .type(String.class)
+            .choices("0", "1", "-1")
+            .metavar("ACKS")
+            .help("Acks required on each produced message. See Kafka docs on request.required.acks
for details.");
+
+        return parser;
+    }
+
+    /** Construct a VerifiableLog4jAppender object from command-line arguments. */
+    public static VerifiableLog4jAppender createFromArgs(String[] args) {
+        ArgumentParser parser = argParser();
+        VerifiableLog4jAppender producer = null;
+
+        try {
+            Namespace res = parser.parseArgs(args);
+
+            int maxMessages = res.getInt("maxMessages");
+            String topic = res.getString("topic");
+
+
+            Properties props = new Properties();
+            props.setProperty("log4j.rootLogger", "INFO, KAFKA");
+            props.setProperty("log4j.appender.KAFKA", "org.apache.kafka.log4jappender.KafkaLog4jAppender");
+            props.setProperty("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout");
+            props.setProperty("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c
- %m%n");
+            props.setProperty("log4j.appender.KAFKA.BrokerList", res.getString("brokerList"));
+            props.setProperty("log4j.appender.KAFKA.Topic", topic);
+            props.setProperty("log4j.appender.KAFKA.RequiredNumAcks", res.getString("acks"));
+            props.setProperty("log4j.appender.KAFKA.SyncSend", "true");
+            props.setProperty("log4j.logger.kafka.log4j", "INFO, KAFKA");
+
+            producer = new VerifiableLog4jAppender(props, maxMessages);
+        } catch (ArgumentParserException e) {
+            if (args.length == 0) {
+                parser.printHelp();
+                System.exit(0);
+            } else {
+                parser.handleError(e);
+                System.exit(1);
+            }
+        }
+
+        return producer;
+    }
+
+
+    public VerifiableLog4jAppender(Properties props, int maxMessages) {
+        this.maxMessages = maxMessages;
+        PropertyConfigurator.configure(props);
+    }
+
+    public static void main(String[] args) throws IOException {
+
+        final VerifiableLog4jAppender appender = createFromArgs(args);
+        boolean infinite = appender.maxMessages < 0;
+
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                // Trigger main thread to stop producing messages
+                appender.stopLogging = true;
+            }
+        });
+
+        long maxMessages = infinite ? Long.MAX_VALUE: appender.maxMessages;
+        for (long i = 0; i < maxMessages; i++) {
+            if (appender.stopLogging) {
+                break;
+            }
+            appender.append(String.format("%d", i));
+        }
+    }
+
+    private void append(String msg) {
+        logger.info(msg);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/b62f8ea4/tools/src/main/java/org/apache/kafka/clients/tools/VerifiableProducer.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/clients/tools/VerifiableProducer.java b/tools/src/main/java/org/apache/kafka/clients/tools/VerifiableProducer.java
index b04876f..b195093 100644
--- a/tools/src/main/java/org/apache/kafka/clients/tools/VerifiableProducer.java
+++ b/tools/src/main/java/org/apache/kafka/clients/tools/VerifiableProducer.java
@@ -291,7 +291,8 @@ public class VerifiableProducer {
         });
 
         ThroughputThrottler throttler = new ThroughputThrottler(producer.throughput, startMs);
-        for (int i = 0; i < producer.maxMessages || infinite; i++) {
+        long maxMessages = infinite ? Long.MAX_VALUE: producer.maxMessages;
+        for (long i = 0; i < maxMessages; i++) {
             if (producer.stopProducing) {
                 break;
             }


Mime
View raw message