kafka-commits mailing list archives

From ij...@apache.org
Subject kafka git commit: KAFKA-4901; Make ProduceRequest thread-safe
Date Thu, 06 Apr 2017 20:58:31 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.2 5edb09fcb -> 4def30c67


KAFKA-4901; Make ProduceRequest thread-safe

A more conservative version of the change for the 0.10.2
branch.

Trunk commit: 1659ca1773596b.

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Jun Rao <junrao@gmail.com>

Closes #2810 from ijuma/kafka-4901-produce-request-thread-safety-0-10-2
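
For readers skimming the diff: the thread-safety issue is that one thread can invoke clearPartitionRecords() once the data has been appended to the log, while another thread (e.g. one still holding the request in purgatory) may concurrently call toString() or getErrorResponse(). The fix makes the records map a volatile field that readers copy into a local variable, failing fast if it is already null, and keeps an immutable partitionSizes map around for the methods that only need per-partition sizes. Below is a minimal, self-contained sketch of that pattern only; the class is hypothetical and simplified, not the actual ProduceRequest.

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for the pattern adopted in this commit: a
// volatile payload field that one thread may clear while another still reads,
// plus an immutable sizes map that remains usable after clearing.
class ClearablePayload<K, V> {

    private final Map<K, Integer> sizes;   // never cleared, safe for toString()
    private volatile Map<K, V> payload;    // set to null by clear()

    ClearablePayload(Map<K, V> payload, Map<K, Integer> sizes) {
        this.payload = payload;
        this.sizes = Collections.unmodifiableMap(new HashMap<>(sizes));
    }

    Map<K, V> payloadOrFail() {
        // Copy the volatile field into a local so the null check and the return
        // observe the same value even if clear() runs concurrently.
        Map<K, V> local = payload;
        if (local == null)
            throw new IllegalStateException("payload has already been cleared");
        return local;
    }

    void clear() {
        payload = null; // let GC reclaim the large payload
    }

    @Override
    public String toString() {
        return "{sizes=" + sizes + "}"; // never touches the clearable field
    }
}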


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4def30c6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4def30c6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4def30c6

Branch: refs/heads/0.10.2
Commit: 4def30c6720b45f6a766a1fc394a75230b66ef26
Parents: 5edb09f
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Thu Apr 6 21:58:27 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Thu Apr 6 21:58:27 2017 +0100

----------------------------------------------------------------------
 .../kafka/common/requests/ProduceRequest.java   | 52 ++++++++++++++++++--
 .../common/requests/RequestResponseTest.java    | 43 ++++++++++++++++
 .../src/main/scala/kafka/server/KafkaApis.scala | 14 +++---
 3 files changed, 97 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4def30c6/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
index bd3ae8f..e0e0090 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.utils.Utils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -77,7 +78,13 @@ public class ProduceRequest extends AbstractRequest {
 
     private final short acks;
     private final int timeout;
-    private final Map<TopicPartition, MemoryRecords> partitionRecords;
+
+    private final Map<TopicPartition, Integer> partitionSizes;
+
+    // This is set to null by `clearPartitionRecords` to prevent unnecessary memory retention when a produce request is
+    // put in the purgatory (due to client throttling, it can take a while before the response is sent).
+    // Care should be taken in methods that use this field.
+    private volatile Map<TopicPartition, MemoryRecords> partitionRecords;
 
     private ProduceRequest(short version, short acks, int timeout, Map<TopicPartition, MemoryRecords> partitionRecords) {
         super(new Struct(ProtoUtils.requestSchema(ApiKeys.PRODUCE.id, version)), version);
@@ -103,6 +110,14 @@ public class ProduceRequest extends AbstractRequest {
         this.acks = acks;
         this.timeout = timeout;
         this.partitionRecords = partitionRecords;
+        this.partitionSizes = createPartitionSizes(partitionRecords);
+    }
+
+    private static Map<TopicPartition, Integer> createPartitionSizes(Map<TopicPartition, MemoryRecords> partitionRecords) {
+        Map<TopicPartition, Integer> result = new HashMap<>(partitionRecords.size());
+        for (Map.Entry<TopicPartition, MemoryRecords> entry : partitionRecords.entrySet())
+            result.put(entry.getKey(), entry.getValue().sizeInBytes());
+        return result;
     }
 
     public ProduceRequest(Struct struct, short version) {
@@ -118,11 +133,24 @@ public class ProduceRequest extends AbstractRequest {
                 partitionRecords.put(new TopicPartition(topic, partition), records);
             }
         }
+        partitionSizes = createPartitionSizes(partitionRecords);
         acks = struct.getShort(ACKS_KEY_NAME);
         timeout = struct.getInt(TIMEOUT_KEY_NAME);
     }
 
     @Override
+    public String toString() {
+        // Use the same format as `Struct.toString()`
+        StringBuilder bld = new StringBuilder();
+        bld.append("{acks=").append(acks)
+                .append(",timeout=").append(timeout)
+                .append(",partitionSizes=")
+                .append(Utils.mkString(partitionSizes, "[", "]", "=", ","))
+                .append("}");
+        return bld.toString();
+    }
+
+    @Override
     public AbstractResponse getErrorResponse(Throwable e) {
         /* In case the producer doesn't actually want any response */
         if (acks == 0)
@@ -131,8 +159,8 @@ public class ProduceRequest extends AbstractRequest {
         Map<TopicPartition, ProduceResponse.PartitionResponse> responseMap = new HashMap<>();
         ProduceResponse.PartitionResponse partitionResponse = new ProduceResponse.PartitionResponse(Errors.forException(e));
 
-        for (Map.Entry<TopicPartition, MemoryRecords> entry : partitionRecords.entrySet())
-            responseMap.put(entry.getKey(), partitionResponse);
+        for (TopicPartition tp : partitions())
+            responseMap.put(tp, partitionResponse);
 
         short versionId = version();
         switch (versionId) {
@@ -147,6 +175,10 @@ public class ProduceRequest extends AbstractRequest {
         }
     }
 
+    private Collection<TopicPartition> partitions() {
+        return partitionSizes.keySet();
+    }
+
     public short acks() {
         return acks;
     }
@@ -155,13 +187,23 @@ public class ProduceRequest extends AbstractRequest {
         return timeout;
     }
 
-    public Map<TopicPartition, MemoryRecords> partitionRecords() {
+    /**
+     * Returns the partition records or throws IllegalStateException if clearPartitionRecords() has been invoked.
+     */
+    public Map<TopicPartition, MemoryRecords> partitionRecordsOrFail() {
+        // Store it in a local variable to protect against concurrent updates
+        Map<TopicPartition, MemoryRecords> partitionRecords = this.partitionRecords;
+        if (partitionRecords == null)
+            throw new IllegalStateException("The partition records are no longer available because " +
+                    "clearPartitionRecords() has been invoked.");
         return partitionRecords;
     }
 
     public void clearPartitionRecords() {
+        partitionRecords = null;
+        // It would be better to make this null, but the change is too large for 0.10.2. In trunk, the struct field
+        // was removed
         struct.clear();
-        partitionRecords.clear();
     }
 
     public static ProduceRequest parse(ByteBuffer buffer, int versionId) {

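Taken together, the new ProduceRequest surface works like this: read the records via partitionRecordsOrFail() before they are cleared, call clearPartitionRecords() once the data has been appended to the log, and rely on toString() and getErrorResponse() remaining safe afterwards because they only consult the retained partitionSizes map. The sketch below illustrates that call order; the handler class and appendToLog() helper are hypothetical, only the ProduceRequest methods are the ones from the diff above.

import java.util.Map;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.NotEnoughReplicasException;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.ProduceRequest;

// Hypothetical handler sketching the intended call order around the new methods.
class ProduceRequestUsageSketch {

    void handle(ProduceRequest produceRequest) {
        // Read the records up front; this throws IllegalStateException if
        // clearPartitionRecords() has already been invoked.
        Map<TopicPartition, MemoryRecords> records = produceRequest.partitionRecordsOrFail();
        appendToLog(records);

        // After the append, drop the reference so a request parked in purgatory
        // no longer retains the record buffers.
        produceRequest.clearPartitionRecords();

        // Both of these stay safe after clearing: they only use partitionSizes.
        String summary = produceRequest.toString();
        AbstractResponse error = produceRequest.getErrorResponse(new NotEnoughReplicasException());
    }

    private void appendToLog(Map<TopicPartition, MemoryRecords> records) {
        // placeholder for the broker's append path
    }
}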
http://git-wip-us.apache.org/repos/asf/kafka/blob/4def30c6/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 69f6276..47313c4 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -16,6 +16,7 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.NotCoordinatorForGroupException;
+import org.apache.kafka.common.errors.NotEnoughReplicasException;
 import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.network.ListenerName;
@@ -46,6 +47,7 @@ import java.util.Set;
 import static java.util.Collections.singletonList;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class RequestResponseTest {
 
@@ -181,6 +183,47 @@ public class RequestResponseTest {
     }
 
     @Test
+    public void produceRequestToStringTest() {
+        ProduceRequest request = createProduceRequest();
+        assertEquals(1, request.partitionRecordsOrFail().size());
+        assertTrue(request.toString().contains("partitionSizes"));
+
+        request.clearPartitionRecords();
+        try {
+            request.partitionRecordsOrFail();
+            fail("partitionRecordsOrFail should fail after clearPartitionRecords()");
+        } catch (IllegalStateException e) {
+            // OK
+        }
+
+        // `toString` should behave the same after `clearPartitionRecords`
+        assertTrue(request.toString().contains("partitionSizes"));
+    }
+
+    @Test
+    public void produceRequestGetErrorResponseTest() {
+        ProduceRequest request = createProduceRequest();
+        Set<TopicPartition> partitions = new HashSet<>(request.partitionRecordsOrFail().keySet());
+
+        ProduceResponse errorResponse = (ProduceResponse) request.getErrorResponse(new NotEnoughReplicasException());
+        assertEquals(partitions, errorResponse.responses().keySet());
+        ProduceResponse.PartitionResponse partitionResponse = errorResponse.responses().values().iterator().next();
+        assertEquals(Errors.NOT_ENOUGH_REPLICAS, partitionResponse.error);
+        assertEquals(ProduceResponse.INVALID_OFFSET, partitionResponse.baseOffset);
+        assertEquals(Record.NO_TIMESTAMP, partitionResponse.logAppendTime);
+
+        request.clearPartitionRecords();
+
+        // `getErrorResponse` should behave the same after `clearPartitionRecords`
+        errorResponse = (ProduceResponse) request.getErrorResponse(new NotEnoughReplicasException());
+        assertEquals(partitions, errorResponse.responses().keySet());
+        partitionResponse = errorResponse.responses().values().iterator().next();
+        assertEquals(Errors.NOT_ENOUGH_REPLICAS, partitionResponse.error);
+        assertEquals(ProduceResponse.INVALID_OFFSET, partitionResponse.baseOffset);
+        assertEquals(Record.NO_TIMESTAMP, partitionResponse.logAppendTime);
+    }
+
+    @Test
     public void produceResponseVersionTest() {
         Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<>();
         responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse(Errors.NONE,

http://git-wip-us.apache.org/repos/asf/kafka/blob/4def30c6/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 134793e..68c3dce 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -349,12 +349,13 @@ class KafkaApis(val requestChannel: RequestChannel,
     val produceRequest = request.body.asInstanceOf[ProduceRequest]
     val numBytesAppended = request.header.sizeOf + produceRequest.sizeOf
 
-    val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = produceRequest.partitionRecords.asScala.partition {
-      case (topicPartition, _) => authorize(request.session, Describe, new Resource(auth.Topic, topicPartition.topic)) && metadataCache.contains(topicPartition.topic)
-    }
+    val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) =
+      produceRequest.partitionRecordsOrFail.asScala.partition { case (tp, _) =>
+        authorize(request.session, Describe, new Resource(auth.Topic, tp.topic)) && metadataCache.contains(tp.topic)
+      }
 
     val (authorizedRequestInfo, unauthorizedForWriteRequestInfo) = existingAndAuthorizedForDescribeTopics.partition {
-      case (topicPartition, _) => authorize(request.session, Write, new Resource(auth.Topic, topicPartition.topic))
+      case (tp, _) => authorize(request.session, Write, new Resource(auth.Topic, tp.topic))
     }
 
     // the callback for sending a produce response
@@ -431,9 +432,8 @@ class KafkaApis(val requestChannel: RequestChannel,
         authorizedRequestInfo,
         sendResponseCallback)
 
-      // if the request is put into the purgatory, it will have a held reference
-      // and hence cannot be garbage collected; hence we clear its data here in
-      // order to let GC re-claim its memory since it is already appended to log
+      // if the request is put into the purgatory, it will have a held reference and hence cannot be garbage collected;
+      // hence we clear its data here in order to let GC re-claim its memory since it is already appended to log
       produceRequest.clearPartitionRecords()
     }
   }

