kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: MINOR: Standardised benchmark params for consumer and streams
Date Sun, 05 Mar 2017 04:55:19 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk adb70da13 -> b7378d567


MINOR: Standardised benchmark params for consumer and streams

There were some minor differences between the basic consumer config and the streams config;
these are now rectified. In addition, in AWS environments the socket buffer size makes a big
difference to performance, so I've tuned it up accordingly. I've also increased the number of
records now that performance is higher.

Author: Eno Thereska <eno@confluent.io>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #2634 from enothereska/minor-standardize-params


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b7378d56
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b7378d56
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b7378d56

Branch: refs/heads/trunk
Commit: b7378d567fffd06395f5babc36cebd64bdf539d1
Parents: adb70da
Author: Eno Thereska <eno@confluent.io>
Authored: Sat Mar 4 20:55:16 2017 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Sat Mar 4 20:55:16 2017 -0800

----------------------------------------------------------------------
 .../apache/kafka/streams/perf/SimpleBenchmark.java | 17 ++++++++++++++++-
 .../streams/streams_simple_benchmark_test.py       |  2 +-
 tests/kafkatest/services/streams.py                |  2 +-
 3 files changed, 18 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b7378d56/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
index cf593e2..7a36d70 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
@@ -104,6 +104,9 @@ public class SimpleBenchmark {
     private static int processedRecords = 0;
     private static long processedBytes = 0;
     private static final int VALUE_SIZE = 100;
+    private static final long POLL_MS = 500L;
+    private static final int MAX_POLL_RECORDS = 1000;
+    private static final int SOCKET_SIZE_BYTES = 1 * 1024 * 1024;
 
     private static final Serde<byte[]> BYTE_SERDE = Serdes.ByteArray();
     private static final Serde<Integer> INTEGER_SERDE = Serdes.Integer();
@@ -207,8 +210,13 @@ public class SimpleBenchmark {
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        // the socket buffer needs to be large, especially when running in AWS with
+        // high latency. if running locally the default is fine.
+        props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, SOCKET_SIZE_BYTES);
         props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
         props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass());
+        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS);
+        props.put(StreamsConfig.POLL_MS_CONFIG, POLL_MS);
         return props;
     }
 
@@ -218,9 +226,16 @@ public class SimpleBenchmark {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        // the socket buffer needs to be large, especially when running in AWS with
+        // high latency. if running locally the default is fine.
+        props.put(ProducerConfig.SEND_BUFFER_CONFIG, SOCKET_SIZE_BYTES);
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+        // the socket buffer needs to be large, especially when running in AWS with
+        // high latency. if running locally the default is fine.
+        props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, SOCKET_SIZE_BYTES);
+        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS);
         return props;
     }
 
@@ -516,7 +531,7 @@ public class SimpleBenchmark {
         long startTime = System.currentTimeMillis();
 
         while (true) {
-            ConsumerRecords<Integer, byte[]> records = consumer.poll(500);
+            ConsumerRecords<Integer, byte[]> records = consumer.poll(POLL_MS);
             if (records.isEmpty()) {
                 if (processedRecords == numRecords)
                     break;

http://git-wip-us.apache.org/repos/asf/kafka/blob/b7378d56/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
index c1db8c8..c9f970e 100644
--- a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
+++ b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
@@ -30,7 +30,7 @@ class StreamsSimpleBenchmarkTest(Test):
 
     def __init__(self, test_context):
         super(StreamsSimpleBenchmarkTest, self).__init__(test_context)
-        self.num_records = 2000000L
+        self.num_records = 10000000L
         self.replication = 1
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/b7378d56/tests/kafkatest/services/streams.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index 1e1c676..4f8f1a3 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -97,7 +97,7 @@ class StreamsTestBaseService(KafkaPathResolverMixin, Service):
             self.logger.info("Restarting Kafka Streams on " + str(node.account))
             self.start_node(node)
 
-    def wait(self, timeout_sec=360):
+    def wait(self, timeout_sec=720):
         for node in self.nodes:
             self.wait_node(node, timeout_sec)
 


Mime
View raw message