kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-2950: Fix performance regression in the producer
Date Sat, 05 Dec 2015 23:57:19 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 39c3512ec -> 23f36c590


KAFKA-2950: Fix performance regression in the producer

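Removes the System.currentTimeMillis calls (via time.milliseconds()) from the send() path when topic metadata is already available, to help with performance on small messages. As a consequence, time spent in user-supplied serializers and the partitioner is no longer counted against max.block.ms.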

Author: Jay Kreps <jay.kreps@gmail.com>

Reviewers: Guozhang Wang

Closes #632 from jkreps/producer-perf-regression


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/23f36c59
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/23f36c59
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/23f36c59

Branch: refs/heads/trunk
Commit: 23f36c5903e9b65c2b322a9405149de671de7a23
Parents: 39c3512
Author: Jay Kreps <jay.kreps@gmail.com>
Authored: Sat Dec 5 15:57:15 2015 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Sat Dec 5 15:57:15 2015 -0800

----------------------------------------------------------------------
 .../kafka/clients/producer/KafkaProducer.java   | 34 ++++----------------
 .../kafka/clients/producer/ProducerConfig.java  |  6 ++--
 2 files changed, 9 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/23f36c59/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 8586439..38fb6a6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -409,8 +409,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
         try {
             // first make sure the metadata for the topic is available
-            long startTime = time.milliseconds();
-            waitOnMetadata(record.topic(), this.maxBlockTimeMs);
+            long waitedOnMetadataMs = waitOnMetadata(record.topic(), this.maxBlockTimeMs);
+            long remainingWaitMs = Math.max(0, this.maxBlockTimeMs - waitedOnMetadataMs);
             byte[] serializedKey;
             try {
                 serializedKey = keySerializer.serialize(record.topic(), record.key());
@@ -419,7 +419,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                         " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                         " specified in key.serializer");
             }
-            checkMaybeGetRemainingTime(startTime);
             byte[] serializedValue;
             try {
                 serializedValue = valueSerializer.serialize(record.topic(), record.value());
@@ -428,15 +427,12 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                         " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                         " specified in value.serializer");
             }
-            checkMaybeGetRemainingTime(startTime);
             int partition = partition(record, serializedKey, serializedValue, metadata.fetch());
-            checkMaybeGetRemainingTime(startTime);
             int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
             ensureValidRecordSize(serializedSize);
             TopicPartition tp = new TopicPartition(record.topic(), partition);
             log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
-            long remainingTime = checkMaybeGetRemainingTime(startTime);
-            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, callback, remainingTime);
+            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, callback, remainingWaitMs);
             if (result.batchIsFull || result.newBatchCreated) {
                 log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                 this.sender.wakeup();
@@ -468,14 +464,15 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      * Wait for cluster metadata including partitions for the given topic to be available.
      * @param topic The topic we want metadata for
      * @param maxWaitMs The maximum time in ms for waiting on the metadata
+     * @return The amount of time we waited in ms
      */
-    private void waitOnMetadata(String topic, long maxWaitMs) throws InterruptedException {
+    private long waitOnMetadata(String topic, long maxWaitMs) throws InterruptedException {
         // add topic to metadata topic list if it is not there already.
         if (!this.metadata.containsTopic(topic))
             this.metadata.add(topic);
 
         if (metadata.fetch().partitionsForTopic(topic) != null)
-            return;
+            return 0;
 
         long begin = time.milliseconds();
         long remainingWaitMs = maxWaitMs;
@@ -491,6 +488,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                 throw new TopicAuthorizationException(topic);
             remainingWaitMs = maxWaitMs - elapsed;
         }
+        return time.milliseconds() - begin;
     }
 
     /**
@@ -680,24 +678,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             cluster);
     }
 
-    /**
-     * Check and maybe get the time elapsed since startTime.
-     * Throws a {@link org.apache.kafka.common.errors.TimeoutException} if the  elapsed time
-     * is more than the max time to block (max.block.ms)
-     *
-     * @param startTime timestamp used to check the elapsed time
-     * @return remainingTime
-     */
-    private long checkMaybeGetRemainingTime(long startTime) {
-        long elapsedTime = time.milliseconds() - startTime;
-        if (elapsedTime > maxBlockTimeMs) {
-            throw new TimeoutException("Request timed out due to exceeding the maximum threshold of " + maxBlockTimeMs + " ms");
-        }
-        long remainingTime = maxBlockTimeMs - elapsedTime;
-
-        return remainingTime;
-    }
-
     private static class FutureFailure implements Future<RecordMetadata> {
 
         private final ExecutionException exception;

http://git-wip-us.apache.org/repos/asf/kafka/blob/23f36c59/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 3eaea09..126d2a4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -193,10 +193,8 @@ public class ProducerConfig extends AbstractConfig {
     /** <code>max.block.ms</code> */
     public static final String MAX_BLOCK_MS_CONFIG = "max.block.ms";
     private static final String MAX_BLOCK_MS_DOC = "The configuration controls how long {@link KafkaProducer#send()} and {@link KafkaProducer#partitionsFor} will block."
-                                                    + "These methods can be blocked for multiple reasons. For e.g: buffer full, metadata unavailable."
-                                                    + "This configuration imposes maximum limit on the total time spent in fetching metadata, serialization of key and value, partitioning and "
-                                                    + "allocation of buffer memory when doing a send(). In case of partitionsFor(), this configuration imposes a maximum time threshold on waiting "
-                                                    + "for metadata";
+                                                    + "These methods can be blocked either because the buffer is full or metadata unavailable."
+                                                    + "Blocking in the user-supplied serializers or partitioner will not be counted against this timeout.";
 
     /** <code>request.timeout.ms</code> */
     public static final String REQUEST_TIMEOUT_MS_CONFIG = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG;


Mime
View raw message