kafka-commits mailing list archives

From: j...@apache.org
Subject: kafka git commit: KAFKA-5438; Fix UnsupportedOperationException in WriteTxnMarkersRequest
Date: Tue, 13 Jun 2017 23:15:03 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 4b4102884 -> c2daf4c62


KAFKA-5438; Fix UnsupportedOperationException in WriteTxnMarkersRequest

Before this patch, `partitionErrors` was an immutable map. As a result, if a single producer
had a marker for multiple partitions, and a single WriteTxnMarkersRequest therefore triggered
multiple append response callbacks, we would get an `UnsupportedOperationException` in the
`writeTxnMarker` handler.
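
For illustration, a minimal standalone sketch of the failure mode (not Kafka code; the
object name is invented and String stands in for TopicPartition and Errors). `.asJava`
wraps an immutable Scala map in a read-only java.util.Map view, so merging a second
callback's errors into the stored map throws:

    import java.util.concurrent.ConcurrentHashMap
    import scala.collection.JavaConverters._

    object ImmutableMapRepro extends App {
      // Per-partition errors keyed by producer id, mirroring the shape of
      // `errors` in KafkaApis.handleWriteTxnMarkersRequest.
      val errors = new ConcurrentHashMap[java.lang.Long, java.util.Map[String, String]]()

      // First callback: .asJava returns an unmodifiable view of the Scala map.
      val firstBatch = Map("topic-0" -> "NONE").asJava
      errors.putIfAbsent(1L, firstBatch)

      // Second callback for the same producer id tries to merge into the
      // stored map, which cannot be mutated.
      val secondBatch = Map("topic-1" -> "NONE").asJava
      val previous = errors.putIfAbsent(1L, secondBatch)
      if (previous != null)
        previous.putAll(secondBatch) // throws java.lang.UnsupportedOperationException
    }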

Author: Apurva Mehta <apurva@confluent.io>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>

Closes #3313 from apurvam/KAFKA-5438-fix-unsupportedoperationexception-in-writetxnmarker
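
The fix copies each callback's results into a fresh mutable ConcurrentHashMap before
storing it, and routes all merges through one helper. A standalone sketch of that idiom
(object name invented; simplified String types, as above):

    import java.util.concurrent.ConcurrentHashMap

    object MergeIdiomSketch {
      val errors = new ConcurrentHashMap[java.lang.Long, ConcurrentHashMap[String, String]]()

      def updateErrors(producerId: Long, currentErrors: ConcurrentHashMap[String, String]): Unit = {
        // putIfAbsent returns null if currentErrors was installed, otherwise the
        // previously stored map; merging into it is now safe because every stored
        // map is a mutable ConcurrentHashMap.
        val previousErrors = errors.putIfAbsent(producerId, currentErrors)
        if (previousErrors != null)
          previousErrors.putAll(currentErrors)
      }
    }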


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c2daf4c6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c2daf4c6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c2daf4c6

Branch: refs/heads/trunk
Commit: c2daf4c623e5919872012aff57ffa25af46a97f8
Parents: 4b41028
Author: Apurva Mehta <apurva@confluent.io>
Authored: Tue Jun 13 16:14:36 2017 -0700
Committer: Jason Gustafson <jason@confluent.io>
Committed: Tue Jun 13 16:14:36 2017 -0700

----------------------------------------------------------------------
 .../src/main/scala/kafka/server/KafkaApis.scala | 36 ++++++++------
 .../kafka/api/TransactionsTest.scala            | 49 +++++++++++++++++++-
 2 files changed, 71 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c2daf4c6/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 5fb13a6..27eb816 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1492,7 +1492,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     ensureInterBrokerVersion(KAFKA_0_11_0_IV0)
     authorizeClusterAction(request)
     val writeTxnMarkersRequest = request.body[WriteTxnMarkersRequest]
-    val errors = new ConcurrentHashMap[java.lang.Long, java.util.Map[TopicPartition, Errors]]()
+    val errors = new ConcurrentHashMap[java.lang.Long, util.Map[TopicPartition, Errors]]()
     val markers = writeTxnMarkersRequest.markers
     val numAppends = new AtomicInteger(markers.size)
 
@@ -1501,13 +1501,22 @@ class KafkaApis(val requestChannel: RequestChannel,
       return
     }
 
-    def sendResponseCallback(producerId: Long, result: TransactionResult)(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = {
+    def updateErrors(producerId: Long, currentErrors: ConcurrentHashMap[TopicPartition, Errors]): Unit = {
+      val previousErrors = errors.putIfAbsent(producerId, currentErrors)
+      if (previousErrors != null)
+        previousErrors.putAll(currentErrors)
+    }
+
+    /**
+      * This is the call back invoked when a log append of transaction markers succeeds. This can be called multiple
+      * times when handling a single WriteTxnMarkersRequest because there is one append per TransactionMarker in the
+      * request, so there could be multiple appends of markers to the log. The final response will be sent only
+      * after all appends have returned.
+      */
+    def maybeSendResponseCallback(producerId: Long, result: TransactionResult)(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = {
       trace(s"End transaction marker append for producer id $producerId completed with status: $responseStatus")
-      val partitionErrors = responseStatus.mapValues(_.error).asJava
-      val previous = errors.putIfAbsent(producerId, partitionErrors)
-      if (previous != null)
-        previous.putAll(partitionErrors)
-      
+      val currentErrors = new ConcurrentHashMap[TopicPartition, Errors](responseStatus.mapValues(_.error).asJava)
+      updateErrors(producerId, currentErrors)
       val successfulOffsetsPartitions = responseStatus.filter { case (topicPartition, partitionResponse) =>
         topicPartition.topic == GROUP_METADATA_TOPIC_NAME && partitionResponse.error == Errors.NONE
       }.keys
@@ -1520,8 +1529,9 @@ class KafkaApis(val requestChannel: RequestChannel,
         } catch {
           case e: Exception =>
            error(s"Received an exception while trying to update the offsets cache on transaction marker append", e)
-            val partitionErrors = errors.get(producerId)
-            successfulOffsetsPartitions.foreach(partitionErrors.put(_, Errors.UNKNOWN))
+            val updatedErrors = new ConcurrentHashMap[TopicPartition, Errors]()
+            successfulOffsetsPartitions.foreach(updatedErrors.put(_, Errors.UNKNOWN))
+            updateErrors(producerId, updatedErrors)
         }
       }
 
@@ -1544,9 +1554,9 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
 
       if (partitionsWithIncorrectMessageFormat.nonEmpty) {
-        val partitionErrors = new util.HashMap[TopicPartition, Errors]()
-        partitionsWithIncorrectMessageFormat.foreach { partition => partitionErrors.put(partition, Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT) }
-        errors.put(producerId, partitionErrors)
+        val currentErrors = new ConcurrentHashMap[TopicPartition, Errors]()
+        partitionsWithIncorrectMessageFormat.foreach { partition => currentErrors.put(partition, Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT) }
+        updateErrors(producerId, currentErrors)
       }
 
       if (goodPartitions.isEmpty) {
@@ -1568,7 +1578,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           internalTopicsAllowed = true,
           isFromClient = false,
           entriesPerPartition = controlRecords,
-          responseCallback = sendResponseCallback(producerId, marker.transactionResult))
+          responseCallback = maybeSendResponseCallback(producerId, marker.transactionResult))
       }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c2daf4c6/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index 8762ea8..af241b7 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -56,7 +56,7 @@ class TransactionsTest extends KafkaServerTestHarness {
   @Before
   override def setUp(): Unit = {
     super.setUp()
-    val numPartitions = 3
+    val numPartitions = 4
     val topicConfig = new Properties()
     topicConfig.put(KafkaConfig.MinInSyncReplicasProp, 2.toString)
     TestUtils.createTopic(zkUtils, topic1, numPartitions, numServers, servers, topicConfig)
@@ -434,6 +434,53 @@ class TransactionsTest extends KafkaServerTestHarness {
     }
   }
 
+  @Test
+  def testMultipleMarkersOneLeader(): Unit = {
+    val firstProducer = transactionalProducers.head
+    val consumer = transactionalConsumers.head
+    val unCommittedConsumer = nonTransactionalConsumers.head
+    val topicWith10Partitions = "largeTopic"
+    val topicWith10PartitionsAndOneReplica = "largeTopicOneReplica"
+    val topicConfig = new Properties()
+    topicConfig.put(KafkaConfig.MinInSyncReplicasProp, 2.toString)
+
+    TestUtils.createTopic(zkUtils, topicWith10Partitions, 10, numServers, servers, topicConfig)
+    TestUtils.createTopic(zkUtils, topicWith10PartitionsAndOneReplica, 10, 1, servers, new Properties())
+
+    firstProducer.initTransactions()
+
+    firstProducer.beginTransaction()
+    sendTransactionalMessagesWithValueRange(firstProducer, topicWith10Partitions, 0, 5000, willBeCommitted = false)
+    sendTransactionalMessagesWithValueRange(firstProducer, topicWith10PartitionsAndOneReplica, 5000, 10000, willBeCommitted = false)
+    firstProducer.abortTransaction()
+
+    firstProducer.beginTransaction()
+    sendTransactionalMessagesWithValueRange(firstProducer, topicWith10Partitions, 10000, 11000, willBeCommitted = true)
+    firstProducer.commitTransaction()
+
+    consumer.subscribe(List(topicWith10PartitionsAndOneReplica, topicWith10Partitions).asJava)
+    unCommittedConsumer.subscribe(List(topicWith10PartitionsAndOneReplica, topicWith10Partitions).asJava)
+
+    val records = consumeRecords(consumer, 1000)
+    records.foreach { record =>
+      TestUtils.assertCommittedAndGetValue(record)
+    }
+
+    val allRecords = consumeRecords(unCommittedConsumer, 11000)
+    val expectedValues = Range(0, 11000).map(_.toString).toSet
+    allRecords.foreach { record =>
+      assertTrue(expectedValues.contains(TestUtils.recordValueAsString(record)))
+    }
+  }
+
+  private def sendTransactionalMessagesWithValueRange(producer: KafkaProducer[Array[Byte], Array[Byte]], topic: String,
+                                                      start: Int, end: Int, willBeCommitted: Boolean): Unit = {
+    for (i <- start until end) {
+      producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic, i.toString, i.toString, willBeCommitted))
+    }
+    producer.flush()
+  }
+
   private def serverProps() = {
     val serverProps = new Properties()
     serverProps.put(KafkaConfig.AutoCreateTopicsEnableProp, false.toString)

