Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 4B258200C15 for ; Wed, 25 Jan 2017 04:36:21 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 49AB0160B4B; Wed, 25 Jan 2017 03:36:21 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 44451160B3E for ; Wed, 25 Jan 2017 04:36:20 +0100 (CET) Received: (qmail 87402 invoked by uid 500); 25 Jan 2017 03:36:14 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 87391 invoked by uid 99); 25 Jan 2017 03:36:14 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 25 Jan 2017 03:36:14 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 3A19EDFBAD; Wed, 25 Jan 2017 03:36:14 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: guozhang@apache.org To: commits@kafka.apache.org Date: Wed, 25 Jan 2017 03:36:14 -0000 Message-Id: <0b15e9ccb2624848afb586dbf16d9137@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] kafka git commit: HOTIFX: streams system test do not start up correctly archived-at: Wed, 25 Jan 2017 03:36:21 -0000 Repository: kafka Updated Branches: refs/heads/trunk abe19fe69 -> 448c1a411 http://git-wip-us.apache.org/repos/asf/kafka/blob/448c1a41/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java new file mode 100644 index 0000000..73fe27c --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java @@ -0,0 +1,157 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.tests; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.KeyValueMapper; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.ProcessorSupplier; + +import java.io.File; + +public class SmokeTestUtil { + + public final static int WINDOW_SIZE = 100; + public final static long START_TIME = 60000L * 60 * 24 * 365 * 30; + public final static int END = Integer.MAX_VALUE; + + public static ProcessorSupplier printProcessorSupplier(final String topic) { + return printProcessorSupplier(topic, false); + } + + public static ProcessorSupplier printProcessorSupplier(final String topic, final boolean printOffset) { + return new ProcessorSupplier() { + public Processor get() { + return new AbstractProcessor() { + private int numRecordsProcessed = 0; + private ProcessorContext context; + + @Override + public void init(ProcessorContext context) { + System.out.println("initializing processor: topic=" + topic + " taskId=" + context.taskId()); + numRecordsProcessed = 0; + this.context = context; + } + + @Override + public void process(Object key, Object value) { + if (printOffset) System.out.println(">>> " + context.offset()); + numRecordsProcessed++; + if (numRecordsProcessed % 100 == 0) { + System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic); + } + } + + @Override + public void punctuate(long timestamp) { + } + + @Override + public void close() { + } + }; + } + }; + } + + public static final class Unwindow implements KeyValueMapper, V, KeyValue> { + public KeyValue apply(Windowed winKey, V value) { + return new KeyValue(winKey.key(), value); + } + } + + public static class Agg { + + public KeyValueMapper> selector() { + return new KeyValueMapper>() { + @Override + public KeyValue apply(String key, Long value) { + return new KeyValue<>(value == null ? null : Long.toString(value), 1L); + } + }; + } + + public Initializer init() { + return new Initializer() { + @Override + public Long apply() { + return 0L; + } + }; + } + + public Aggregator adder() { + return new Aggregator() { + @Override + public Long apply(String aggKey, Long value, Long aggregate) { + return aggregate + value; + } + }; + } + + public Aggregator remover() { + return new Aggregator() { + @Override + public Long apply(String aggKey, Long value, Long aggregate) { + return aggregate - value; + } + }; + } + } + + public static Serde stringSerde = Serdes.String(); + + public static Serde intSerde = Serdes.Integer(); + + public static Serde longSerde = Serdes.Long(); + + public static Serde doubleSerde = Serdes.Double(); + + public static File createDir(String path) throws Exception { + File dir = new File(path); + + dir.mkdir(); + + return dir; + } + + public static File createDir(File parent, String child) throws Exception { + File dir = new File(parent, child); + + dir.mkdir(); + + return dir; + } + + public static void sleep(long duration) { + try { + Thread.sleep(duration); + } catch (Exception ex) { + // + } + } + + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/448c1a41/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java new file mode 100644 index 0000000..304cae7 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.tests; + +import java.io.File; +import java.util.Map; +import java.util.Set; + +public class StreamsSmokeTest { + + /** + * args ::= command kafka zookeeper stateDir + * command := "run" | "process" + * + * @param args + */ + public static void main(String[] args) throws Exception { + String kafka = args[0]; + String stateDir = args.length > 1 ? args[1] : null; + String command = args.length > 2 ? args[2] : null; + + System.out.println("StreamsTest instance started"); + System.out.println("command=" + command); + System.out.println("kafka=" + kafka); + System.out.println("stateDir=" + stateDir); + + switch (command) { + case "standalone": + SmokeTestDriver.main(args); + break; + case "run": + // this starts the driver (data generation and result verification) + final int numKeys = 10; + final int maxRecordsPerKey = 500; + Map> allData = SmokeTestDriver.generate(kafka, numKeys, maxRecordsPerKey); + SmokeTestDriver.verify(kafka, allData, maxRecordsPerKey); + break; + case "process": + // this starts a KafkaStreams client + final SmokeTestClient client = new SmokeTestClient(new File(stateDir), kafka); + client.start(); + + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + client.close(); + } + }); + break; + case "close-deadlock-test": + final ShutdownDeadlockTest test = new ShutdownDeadlockTest(kafka); + test.start(); + break; + default: + System.out.println("unknown command: " + command); + } + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/448c1a41/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py index ab9b112..ebd69a6 100644 --- a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py +++ b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py @@ -25,7 +25,16 @@ class StreamsSimpleBenchmarkTest(KafkaTest): """ def __init__(self, test_context): - super(StreamsSimpleBenchmarkTest, self).__init__(test_context, num_zk=1, num_brokers=1) + super(StreamsSimpleBenchmarkTest, self).__init__(test_context, num_zk=1, num_brokers=1,topics={ + 'simpleBenchmarkSourceTopic' : { 'partitions': 1, 'replication-factor': 1 }, + 'simpleBenchmarkSinkTopic' : { 'partitions': 1, 'replication-factor': 1 }, + 'joinSourceTopic1KStreamKStream' : { 'partitions': 1, 'replication-factor': 1 }, + 'joinSourceTopic2KStreamKStream' : { 'partitions': 1, 'replication-factor': 1 }, + 'joinSourceTopic1KStreamKTable' : { 'partitions': 1, 'replication-factor': 1 }, + 'joinSourceTopic2KStreamKTable' : { 'partitions': 1, 'replication-factor': 1 }, + 'joinSourceTopic1KTableKTable' : { 'partitions': 1, 'replication-factor': 1 }, + 'joinSourceTopic2KTableKTable' : { 'partitions': 1, 'replication-factor': 1 } + }) self.driver = StreamsSimpleBenchmarkService(test_context, self.kafka, 1000000L) http://git-wip-us.apache.org/repos/asf/kafka/blob/448c1a41/tests/kafkatest/services/performance/streams_performance.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/performance/streams_performance.py b/tests/kafkatest/services/performance/streams_performance.py index e9fa2a7..4ccc3b2 100644 --- a/tests/kafkatest/services/performance/streams_performance.py +++ b/tests/kafkatest/services/performance/streams_performance.py @@ -27,3 +27,12 @@ class StreamsSimpleBenchmarkService(StreamsTestBaseService): kafka, "org.apache.kafka.streams.perf.SimpleBenchmark", numrecs) + + def collect_data(self, node): + # Collect the data and return it to the framework + output = node.account.ssh_capture("grep Performance %s" % self.STDOUT_FILE) + data = {} + for line in output: + parts = line.split(':') + data[parts[0]] = float(parts[1]) + return data http://git-wip-us.apache.org/repos/asf/kafka/blob/448c1a41/tests/kafkatest/services/streams.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py index 9250cd7..e1f292c 100644 --- a/tests/kafkatest/services/streams.py +++ b/tests/kafkatest/services/streams.py @@ -131,7 +131,7 @@ class StreamsTestBaseService(KafkaPathResolverMixin, Service): self.logger.info("Starting StreamsTest process on " + str(node.account)) with node.account.monitor_log(self.STDOUT_FILE) as monitor: node.account.ssh(self.start_cmd(node)) - monitor.wait_until('StreamsTest instance started', timeout_sec=60, err_msg="Never saw message indicating StreamsTest finished startup on " + str(node.account)) + monitor.wait_until('StreamsTest instance started', timeout_sec=15, err_msg="Never saw message indicating StreamsTest finished startup on " + str(node.account)) if len(self.pids(node)) == 0: raise RuntimeError("No process ids recorded") @@ -143,7 +143,7 @@ class StreamsSmokeTestBaseService(StreamsTestBaseService): def __init__(self, test_context, kafka, command): super(StreamsSmokeTestBaseService, self).__init__(test_context, kafka, - "org.apache.kafka.streams.smoketest.StreamsSmokeTest", + "org.apache.kafka.streams.tests.StreamsSmokeTest", command)