Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 96083200C3C for ; Sun, 5 Mar 2017 05:55:21 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 94876160B7F; Sun, 5 Mar 2017 04:55:21 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id B54A9160B71 for ; Sun, 5 Mar 2017 05:55:20 +0100 (CET) Received: (qmail 22606 invoked by uid 500); 5 Mar 2017 04:55:19 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 22596 invoked by uid 99); 5 Mar 2017 04:55:19 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 05 Mar 2017 04:55:19 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B7394DFC15; Sun, 5 Mar 2017 04:55:19 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: guozhang@apache.org To: commits@kafka.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: kafka git commit: MINOR: Standardised benchmark params for consumer and streams Date: Sun, 5 Mar 2017 04:55:19 +0000 (UTC) archived-at: Sun, 05 Mar 2017 04:55:21 -0000 Repository: kafka Updated Branches: refs/heads/trunk adb70da13 -> b7378d567 MINOR: Standardised benchmark params for consumer and streams There were some minor differences in the basic consumer config and streams config that are now rectified. In addition, in AWS environments the socket size makes a big difference to performance and I've tuned it up accordingly. I've also increased the number of records now that perf is higher. Author: Eno Thereska Reviewers: Guozhang Wang Closes #2634 from enothereska/minor-standardize-params Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b7378d56 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b7378d56 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b7378d56 Branch: refs/heads/trunk Commit: b7378d567fffd06395f5babc36cebd64bdf539d1 Parents: adb70da Author: Eno Thereska Authored: Sat Mar 4 20:55:16 2017 -0800 Committer: Guozhang Wang Committed: Sat Mar 4 20:55:16 2017 -0800 ---------------------------------------------------------------------- .../apache/kafka/streams/perf/SimpleBenchmark.java | 17 ++++++++++++++++- .../streams/streams_simple_benchmark_test.py | 2 +- tests/kafkatest/services/streams.py | 2 +- 3 files changed, 18 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/b7378d56/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java index cf593e2..7a36d70 100644 --- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java +++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java @@ -104,6 +104,9 @@ public class SimpleBenchmark { private static int processedRecords = 0; private static long processedBytes = 0; private static final int VALUE_SIZE = 100; + private static final long POLL_MS = 500L; + private static final int MAX_POLL_RECORDS = 1000; + private static final int SOCKET_SIZE_BYTES = 1 * 1024 * 1024; private static final Serde BYTE_SERDE = Serdes.ByteArray(); private static final Serde INTEGER_SERDE = Serdes.Integer(); @@ -207,8 +210,13 @@ public class SimpleBenchmark { props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + // the socket buffer needs to be large, especially when running in AWS with + // high latency. if running locally the default is fine. + props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, SOCKET_SIZE_BYTES); props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass()); + props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS); + props.put(StreamsConfig.POLL_MS_CONFIG, POLL_MS); return props; } @@ -218,9 +226,16 @@ public class SimpleBenchmark { props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); + // the socket buffer needs to be large, especially when running in AWS with + // high latency. if running locally the default is fine. + props.put(ProducerConfig.SEND_BUFFER_CONFIG, SOCKET_SIZE_BYTES); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + // the socket buffer needs to be large, especially when running in AWS with + // high latency. if running locally the default is fine. + props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, SOCKET_SIZE_BYTES); + props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS); return props; } @@ -516,7 +531,7 @@ public class SimpleBenchmark { long startTime = System.currentTimeMillis(); while (true) { - ConsumerRecords records = consumer.poll(500); + ConsumerRecords records = consumer.poll(POLL_MS); if (records.isEmpty()) { if (processedRecords == numRecords) break; http://git-wip-us.apache.org/repos/asf/kafka/blob/b7378d56/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py index c1db8c8..c9f970e 100644 --- a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py +++ b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py @@ -30,7 +30,7 @@ class StreamsSimpleBenchmarkTest(Test): def __init__(self, test_context): super(StreamsSimpleBenchmarkTest, self).__init__(test_context) - self.num_records = 2000000L + self.num_records = 10000000L self.replication = 1 http://git-wip-us.apache.org/repos/asf/kafka/blob/b7378d56/tests/kafkatest/services/streams.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py index 1e1c676..4f8f1a3 100644 --- a/tests/kafkatest/services/streams.py +++ b/tests/kafkatest/services/streams.py @@ -97,7 +97,7 @@ class StreamsTestBaseService(KafkaPathResolverMixin, Service): self.logger.info("Restarting Kafka Streams on " + str(node.account)) self.start_node(node) - def wait(self, timeout_sec=360): + def wait(self, timeout_sec=720): for node in self.nodes: self.wait_node(node, timeout_sec)