kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [1/2] kafka git commit: HOTIFX: streams system test do not start up correctly
Date Wed, 25 Jan 2017 03:36:14 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk abe19fe69 -> 448c1a411


http://git-wip-us.apache.org/repos/asf/kafka/blob/448c1a41/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
new file mode 100644
index 0000000..73fe27c
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+import java.io.File;
+
+/**
+ * Static helpers shared by the streams tests: a record-counting print processor,
+ * small mapper/aggregator factories, serde instances and file/thread utilities.
+ */
+public class SmokeTestUtil {
+
+    public final static int WINDOW_SIZE = 100;
+    // 60000 * 60 * 24 * 365 * 30: thirty 365-day years, assuming the unit is milliseconds.
+    public final static long START_TIME = 60000L * 60 * 24 * 365 * 30;
+    public final static int END = Integer.MAX_VALUE;
+
+    /**
+     * Same as {@link #printProcessorSupplier(String, boolean)} with offset printing disabled.
+     */
+    public static ProcessorSupplier<Object, Object> printProcessorSupplier(final String topic) {
+        return printProcessorSupplier(topic, false);
+    }
+
+    /**
+     * Creates a supplier of processors that count the records they see and print a
+     * progress line to stdout after every 100 records.
+     *
+     * @param topic       label included in the printed messages
+     * @param printOffset when true, also prints {@code context.offset()} for every record
+     * @return a supplier that builds a new counting processor on each {@code get()}
+     */
+    public static ProcessorSupplier<Object, Object> printProcessorSupplier(final String topic, final boolean printOffset) {
+        return new ProcessorSupplier<Object, Object>() {
+            @Override
+            public Processor<Object, Object> get() {
+                return new AbstractProcessor<Object, Object>() {
+                    private int numRecordsProcessed = 0;
+                    private ProcessorContext context;
+
+                    @Override
+                    public void init(ProcessorContext context) {
+                        System.out.println("initializing processor: topic=" + topic + " taskId=" + context.taskId());
+                        numRecordsProcessed = 0;
+                        this.context = context;
+                    }
+
+                    @Override
+                    public void process(Object key, Object value) {
+                        if (printOffset) {
+                            System.out.println(">>> " + context.offset());
+                        }
+                        numRecordsProcessed++;
+                        if (numRecordsProcessed % 100 == 0) {
+                            System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic);
+                        }
+                    }
+
+                    @Override
+                    public void punctuate(long timestamp) {
+                    }
+
+                    @Override
+                    public void close() {
+                    }
+                };
+            }
+        };
+    }
+
+    /**
+     * Mapper that replaces a windowed key by its plain key ({@code winKey.key()}) and keeps the value.
+     */
+    public static final class Unwindow<K, V> implements KeyValueMapper<Windowed<K>, V, KeyValue<K, V>> {
+        @Override
+        public KeyValue<K, V> apply(Windowed<K> winKey, V value) {
+            return new KeyValue<K, V>(winKey.key(), value);
+        }
+    }
+
+    /**
+     * Factory for the mapper, initializer and add/subtract aggregators defined below.
+     */
+    public static class Agg {
+
+        /**
+         * Re-keys a record by the decimal string of its value (a null value gives a null key); the new value is always 1.
+         */
+        public KeyValueMapper<String, Long, KeyValue<String, Long>> selector() {
+            return new KeyValueMapper<String, Long, KeyValue<String, Long>>() {
+                @Override
+                public KeyValue<String, Long> apply(String key, Long value) {
+                    return new KeyValue<>(value == null ? null : Long.toString(value), 1L);
+                }
+            };
+        }
+
+        /** Initial aggregate: zero. */
+        public Initializer<Long> init() {
+            return new Initializer<Long>() {
+                @Override
+                public Long apply() {
+                    return 0L;
+                }
+            };
+        }
+
+        /** Returns {@code aggregate + value}. */
+        public Aggregator<String, Long, Long> adder() {
+            return new Aggregator<String, Long, Long>() {
+                @Override
+                public Long apply(String aggKey, Long value, Long aggregate) {
+                    return aggregate + value;
+                }
+            };
+        }
+
+        /** Returns {@code aggregate - value}. */
+        public Aggregator<String, Long, Long> remover() {
+            return new Aggregator<String, Long, Long>() {
+                @Override
+                public Long apply(String aggKey, Long value, Long aggregate) {
+                    return aggregate - value;
+                }
+            };
+        }
+    }
+
+    public static Serde<String> stringSerde = Serdes.String();
+
+    public static Serde<Integer> intSerde = Serdes.Integer();
+
+    public static Serde<Long> longSerde = Serdes.Long();
+
+    public static Serde<Double> doubleSerde = Serdes.Double();
+
+    /**
+     * Creates the directory {@code path} if it does not exist yet.
+     *
+     * @return the directory
+     * @throws java.io.IOException if the directory is missing and could not be created
+     */
+    public static File createDir(String path) throws Exception {
+        return ensureDir(new File(path));
+    }
+
+    /**
+     * Creates the directory {@code child} under {@code parent} if it does not exist yet.
+     *
+     * @return the directory
+     * @throws java.io.IOException if the directory is missing and could not be created
+     */
+    public static File createDir(File parent, String child) throws Exception {
+        return ensureDir(new File(parent, child));
+    }
+
+    // File.mkdir() reports failure only through its return value, so check it here;
+    // an already existing directory is not treated as an error.
+    private static File ensureDir(File dir) throws java.io.IOException {
+        if (!dir.mkdir() && !dir.isDirectory()) {
+            throw new java.io.IOException("could not create directory " + dir);
+        }
+        return dir;
+    }
+
+    /**
+     * Sleeps for {@code duration} milliseconds, returning early if the thread is interrupted.
+     * The interrupt flag is restored so callers can still observe the interruption.
+     */
+    public static void sleep(long duration) {
+        try {
+            Thread.sleep(duration);
+        } catch (InterruptedException ex) {
+            Thread.currentThread().interrupt();
+        } catch (IllegalArgumentException ignored) {
+            // negative duration: nothing to wait for; keep the previous lenient behavior
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/448c1a41/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
new file mode 100644
index 0000000..304cae7
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.tests;
+
+import java.io.File;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Command-line entry point that dispatches to SmokeTestDriver, SmokeTestClient or ShutdownDeadlockTest.
+ */
+public class StreamsSmokeTest {
+
+    /**
+     *  args ::= kafka stateDir command
+     *  command := "standalone" | "run" | "process" | "close-deadlock-test"
+     *
+     * @param args positional arguments as described above; {@code kafka} is required
+     * @throws IllegalArgumentException if no arguments are given
+     */
+    public static void main(String[] args) throws Exception {
+        if (args.length == 0) {
+            throw new IllegalArgumentException("usage: StreamsSmokeTest <kafka> <stateDir> <command>");
+        }
+        String kafka = args[0];
+        String stateDir = args.length > 1 ? args[1] : null;
+        String command = args.length > 2 ? args[2] : null;
+
+        System.out.println("StreamsTest instance started");
+        System.out.println("command=" + command);
+        System.out.println("kafka=" + kafka);
+        System.out.println("stateDir=" + stateDir);
+
+        // switch on a null String throws NullPointerException, so a missing command
+        // is mapped to "" and reported by the default branch like any unknown command.
+        switch (command == null ? "" : command) {
+            case "standalone":
+                SmokeTestDriver.main(args);
+                break;
+            case "run":
+                // this starts the driver (data generation and result verification)
+                final int numKeys = 10;
+                final int maxRecordsPerKey = 500;
+                Map<String, Set<Integer>> allData = SmokeTestDriver.generate(kafka, numKeys, maxRecordsPerKey);
+                SmokeTestDriver.verify(kafka, allData, maxRecordsPerKey);
+                break;
+            case "process":
+                // this starts a KafkaStreams client
+                final SmokeTestClient client = new SmokeTestClient(new File(stateDir), kafka);
+                client.start();
+
+                Runtime.getRuntime().addShutdownHook(new Thread() {
+                    @Override
+                    public void run() {
+                        client.close();
+                    }
+                });
+                break;
+            case "close-deadlock-test":
+                final ShutdownDeadlockTest test = new ShutdownDeadlockTest(kafka);
+                test.start();
+                break;
+            default:
+                System.out.println("unknown command: " + command);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/448c1a41/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
index ab9b112..ebd69a6 100644
--- a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
+++ b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
@@ -25,7 +25,16 @@ class StreamsSimpleBenchmarkTest(KafkaTest):
     """
 
     def __init__(self, test_context):
-        super(StreamsSimpleBenchmarkTest, self).__init__(test_context, num_zk=1, num_brokers=1)
+        super(StreamsSimpleBenchmarkTest, self).__init__(test_context, num_zk=1, num_brokers=1,topics={
+            'simpleBenchmarkSourceTopic' : { 'partitions': 1, 'replication-factor': 1 },
+            'simpleBenchmarkSinkTopic' : { 'partitions': 1, 'replication-factor': 1 },
+            'joinSourceTopic1KStreamKStream' : { 'partitions': 1, 'replication-factor': 1 },
+            'joinSourceTopic2KStreamKStream' : { 'partitions': 1, 'replication-factor': 1 },
+            'joinSourceTopic1KStreamKTable' : { 'partitions': 1, 'replication-factor': 1 },
+            'joinSourceTopic2KStreamKTable' : { 'partitions': 1, 'replication-factor': 1 },
+            'joinSourceTopic1KTableKTable' : { 'partitions': 1, 'replication-factor': 1 },
+            'joinSourceTopic2KTableKTable' : { 'partitions': 1, 'replication-factor': 1 }
+        })
 
         self.driver = StreamsSimpleBenchmarkService(test_context, self.kafka, 1000000L)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/448c1a41/tests/kafkatest/services/performance/streams_performance.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/streams_performance.py b/tests/kafkatest/services/performance/streams_performance.py
index e9fa2a7..4ccc3b2 100644
--- a/tests/kafkatest/services/performance/streams_performance.py
+++ b/tests/kafkatest/services/performance/streams_performance.py
@@ -27,3 +27,12 @@ class StreamsSimpleBenchmarkService(StreamsTestBaseService):
                                                             kafka,
                                                             "org.apache.kafka.streams.perf.SimpleBenchmark",
                                                             numrecs)
+
+    def collect_data(self, node):
+        """Return {label: float} built from the 'Performance' lines grepped out of self.STDOUT_FILE on the node."""
+        grep_cmd = "grep Performance %s" % self.STDOUT_FILE
+        results = {}
+        for entry in node.account.ssh_capture(grep_cmd):
+            fields = entry.split(':')  # fields[0] is the key, fields[1] the numeric text
+            results[fields[0]] = float(fields[1])
+        return results

http://git-wip-us.apache.org/repos/asf/kafka/blob/448c1a41/tests/kafkatest/services/streams.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index 9250cd7..e1f292c 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -131,7 +131,7 @@ class StreamsTestBaseService(KafkaPathResolverMixin, Service):
         self.logger.info("Starting StreamsTest process on " + str(node.account))
         with node.account.monitor_log(self.STDOUT_FILE) as monitor:
             node.account.ssh(self.start_cmd(node))
-            monitor.wait_until('StreamsTest instance started', timeout_sec=60, err_msg="Never saw message indicating StreamsTest finished startup on " + str(node.account))
+            monitor.wait_until('StreamsTest instance started', timeout_sec=15, err_msg="Never saw message indicating StreamsTest finished startup on " + str(node.account))
 
         if len(self.pids(node)) == 0:
             raise RuntimeError("No process ids recorded")
@@ -143,7 +143,7 @@ class StreamsSmokeTestBaseService(StreamsTestBaseService):
     def __init__(self, test_context, kafka, command):
         super(StreamsSmokeTestBaseService, self).__init__(test_context,
                                                           kafka,
-                                                          "org.apache.kafka.streams.smoketest.StreamsSmokeTest",
+                                                          "org.apache.kafka.streams.tests.StreamsSmokeTest",
                                                           command)
 
 


Mime
View raw message