kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [1/2] kafka git commit: KAFKA-5136: move coordinatorEpoch from WriteTxnMarkerRequest to TxnMarkerEntry
Date Tue, 02 May 2017 00:16:48 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 67f1f4d27 -> 94a35fd93


KAFKA-5136: move coordinatorEpoch from WriteTxnMarkerRequest to TxnMarkerEntry

Moving the coordinatorEpoch from WriteTxnMarkerRequest to TxnMarkerEntry will generate fewer
broker send requests

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Ismael Juma, Guozhang Wang

Closes #2925 from dguy/tc-write-txn-request-follow-up


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/324b475e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/324b475e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/324b475e

Branch: refs/heads/trunk
Commit: 324b475eca48502fb16c8efd0de99756f68437bf
Parents: 67f1f4d
Author: Damian Guy <damian.guy@gmail.com>
Authored: Mon May 1 17:15:32 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Mon May 1 17:15:32 2017 -0700

----------------------------------------------------------------------
 .../apache/kafka/common/protocol/Protocol.java  |  8 +-
 .../common/requests/WriteTxnMarkersRequest.java | 46 +++++-----
 .../common/requests/RequestResponseTest.java    |  6 +-
 .../transaction/TransactionMarkerChannel.scala  | 97 ++++++++++++--------
 .../TransactionMarkerChannelManager.scala       | 19 ++--
 ...nsactionMarkerRequestCompletionHandler.scala | 19 ++--
 .../TransactionMarkerChannelManagerTest.scala   | 45 ++++-----
 .../TransactionMarkerChannelTest.scala          | 38 ++++----
 ...tionMarkerRequestCompletionHandlerTest.scala | 19 ++--
 9 files changed, 149 insertions(+), 148 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/324b475e/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 3da2b3f..14471da 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -1457,13 +1457,13 @@ public class Protocol {
                     new ArrayOf(new Schema(
                             new Field("topic", STRING),
                             new Field("partitions", new ArrayOf(INT32)))),
-                    "The partitions to write markers for.")
+                    "The partitions to write markers for."),
+            new Field("coordinator_epoch",
+                      INT32,
+                      "Epoch associated with the transaction state partition hosted by this
transaction coordinator")
     );
 
     public static final Schema WRITE_TXN_MARKERS_REQUEST_V0 = new Schema(
-            new Field("coordinator_epoch",
-                    INT32,
-                    "Epoch associated with the transaction state partition hosted by this
transaction coordinator."),
             new Field("transaction_markers",
                     new ArrayOf(WRITE_TXN_MARKERS_ENTRY_V0),
                     "The transaction markers to be written.")

http://git-wip-us.apache.org/repos/asf/kafka/blob/324b475e/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
index 7cded24..0c09880 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
@@ -43,12 +43,18 @@ public class WriteTxnMarkersRequest extends AbstractRequest {
     public static class TxnMarkerEntry {
         private final long producerId;
         private final short producerEpoch;
+        private final int coordinatorEpoch;
         private final TransactionResult result;
         private final List<TopicPartition> partitions;
 
-        public TxnMarkerEntry(long producerId, short producerEpoch, TransactionResult result,
List<TopicPartition> partitions) {
+        public TxnMarkerEntry(long producerId,
+                              short producerEpoch,
+                              int coordinatorEpoch,
+                              TransactionResult result,
+                              List<TopicPartition> partitions) {
             this.producerId = producerId;
             this.producerEpoch = producerEpoch;
+            this.coordinatorEpoch = coordinatorEpoch;
             this.result = result;
             this.partitions = partitions;
         }
@@ -61,6 +67,10 @@ public class WriteTxnMarkersRequest extends AbstractRequest {
             return producerEpoch;
         }
 
+        public int coordinatorEpoch() {
+            return coordinatorEpoch;
+        }
+
         public TransactionResult transactionResult() {
             return result;
         }
@@ -73,8 +83,9 @@ public class WriteTxnMarkersRequest extends AbstractRequest {
         @Override
         public String toString() {
             return "TxnMarkerEntry{" +
-                    "pid=" + producerId +
-                    ", epoch=" + producerEpoch +
+                    "producerId=" + producerId +
+                    ", producerEpoch=" + producerEpoch +
+                    ", coordinatorEpoch=" + coordinatorEpoch +
                     ", result=" + result +
                     ", partitions=" + partitions +
                     '}';
@@ -87,47 +98,41 @@ public class WriteTxnMarkersRequest extends AbstractRequest {
             final TxnMarkerEntry that = (TxnMarkerEntry) o;
             return producerId == that.producerId &&
                     producerEpoch == that.producerEpoch &&
+                    coordinatorEpoch == that.coordinatorEpoch &&
                     result == that.result &&
                     Objects.equals(partitions, that.partitions);
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(producerId, producerEpoch, result, partitions);
+            return Objects.hash(producerId, producerEpoch, coordinatorEpoch, result, partitions);
         }
     }
 
     public static class Builder extends AbstractRequest.Builder<WriteTxnMarkersRequest>
{
-        private final int coordinatorEpoch;
         private final List<TxnMarkerEntry> markers;
 
-        public Builder(int coordinatorEpoch, List<TxnMarkerEntry> markers) {
+        public Builder(List<TxnMarkerEntry> markers) {
             super(ApiKeys.WRITE_TXN_MARKERS);
-
             this.markers = markers;
-            this.coordinatorEpoch = coordinatorEpoch;
         }
 
         @Override
         public WriteTxnMarkersRequest build(short version) {
-            return new WriteTxnMarkersRequest(version, coordinatorEpoch, markers);
+            return new WriteTxnMarkersRequest(version, markers);
         }
     }
 
-    private final int coordinatorEpoch;
     private final List<TxnMarkerEntry> markers;
 
-    private WriteTxnMarkersRequest(short version, int coordinatorEpoch, List<TxnMarkerEntry>
markers) {
+    private WriteTxnMarkersRequest(short version, List<TxnMarkerEntry> markers) {
         super(version);
 
         this.markers = markers;
-        this.coordinatorEpoch = coordinatorEpoch;
     }
 
     public WriteTxnMarkersRequest(Struct struct, short version) {
         super(version);
-        this.coordinatorEpoch = struct.getInt(COORDINATOR_EPOCH_KEY_NAME);
-
         List<TxnMarkerEntry> markers = new ArrayList<>();
         Object[] markersArray = struct.getArray(TXN_MARKER_ENTRY_KEY_NAME);
         for (Object markerObj : markersArray) {
@@ -135,6 +140,7 @@ public class WriteTxnMarkersRequest extends AbstractRequest {
 
             long producerId = markerStruct.getLong(PID_KEY_NAME);
             short producerEpoch = markerStruct.getShort(EPOCH_KEY_NAME);
+            int coordinatorEpoch = markerStruct.getInt(COORDINATOR_EPOCH_KEY_NAME);
             TransactionResult result = TransactionResult.forId(markerStruct.getBoolean(TRANSACTION_RESULT_KEY_NAME));
 
             List<TopicPartition> partitions = new ArrayList<>();
@@ -147,15 +153,12 @@ public class WriteTxnMarkersRequest extends AbstractRequest {
                 }
             }
 
-            markers.add(new TxnMarkerEntry(producerId, producerEpoch, result, partitions));
+            markers.add(new TxnMarkerEntry(producerId, producerEpoch, coordinatorEpoch, result,
partitions));
         }
 
         this.markers = markers;
     }
 
-    public int coordinatorEpoch() {
-        return coordinatorEpoch;
-    }
 
     public List<TxnMarkerEntry> markers() {
         return markers;
@@ -164,7 +167,6 @@ public class WriteTxnMarkersRequest extends AbstractRequest {
     @Override
     protected Struct toStruct() {
         Struct struct = new Struct(ApiKeys.WRITE_TXN_MARKERS.requestSchema(version()));
-        struct.set(COORDINATOR_EPOCH_KEY_NAME, coordinatorEpoch);
 
         Object[] markersArray = new Object[markers.size()];
         int i = 0;
@@ -172,6 +174,7 @@ public class WriteTxnMarkersRequest extends AbstractRequest {
             Struct markerStruct = struct.instance(TXN_MARKER_ENTRY_KEY_NAME);
             markerStruct.set(PID_KEY_NAME, entry.producerId);
             markerStruct.set(EPOCH_KEY_NAME, entry.producerEpoch);
+            markerStruct.set(COORDINATOR_EPOCH_KEY_NAME, entry.coordinatorEpoch);
             markerStruct.set(TRANSACTION_RESULT_KEY_NAME, entry.result.id);
 
             Map<String, List<Integer>> mappedPartitions = CollectionUtils.groupDataByTopic(entry.partitions);
@@ -216,12 +219,11 @@ public class WriteTxnMarkersRequest extends AbstractRequest {
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
         final WriteTxnMarkersRequest that = (WriteTxnMarkersRequest) o;
-        return coordinatorEpoch == that.coordinatorEpoch &&
-                Objects.equals(markers, that.markers);
+        return Objects.equals(markers, that.markers);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(coordinatorEpoch, markers);
+        return Objects.hash(markers);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/324b475e/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 422f9e6..7c53b54 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -891,9 +891,9 @@ public class RequestResponseTest {
     }
 
     private WriteTxnMarkersRequest createWriteTxnMarkersRequest() {
-        return new WriteTxnMarkersRequest.Builder(73,
-            Collections.singletonList(new WriteTxnMarkersRequest.TxnMarkerEntry(21L, (short)
42, TransactionResult.ABORT,
-                Collections.singletonList(new TopicPartition("topic", 73))))).build();
+        return new WriteTxnMarkersRequest.Builder(
+            Collections.singletonList(new WriteTxnMarkersRequest.TxnMarkerEntry(21L, (short)
42, 73, TransactionResult.ABORT,
+                                                                                Collections.singletonList(new
TopicPartition("topic", 73))))).build();
     }
 
     private WriteTxnMarkersResponse createWriteTxnMarkersResponse() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/324b475e/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannel.scala
b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannel.scala
index 8eb5a8b..cad3ea5 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannel.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannel.scala
@@ -17,7 +17,7 @@
 package kafka.coordinator.transaction
 
 import java.util
-import java.util.concurrent.LinkedBlockingQueue
+import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}
 
 import kafka.common.{BrokerEndPointNotAvailableException, RequestAndCompletionHandler}
 import kafka.server.{DelayedOperationPurgatory, MetadataCache}
@@ -26,22 +26,54 @@ import org.apache.kafka.clients.NetworkClient
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.requests.{TransactionResult, WriteTxnMarkersRequest}
 import org.apache.kafka.common.requests.WriteTxnMarkersRequest.TxnMarkerEntry
-import org.apache.kafka.common.utils.{Time, Utils}
+import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.{Node, TopicPartition}
 
 import scala.collection.{concurrent, immutable, mutable}
 import collection.JavaConverters._
-import collection.JavaConversions._
 
 class TransactionMarkerChannel(interBrokerListenerName: ListenerName,
                                metadataCache: MetadataCache,
                                networkClient: NetworkClient,
                                time: Time) extends Logging {
 
-  // we need the metadataPartition so we can clean up when Transaction Log partitions emigrate
-  case class PendingTxnKey(metadataPartition: Int, producerId: Long)
+  // we need the txnTopicPartition so we can clean up when Transaction Log partitions emigrate
+  case class PendingTxnKey(txnTopicPartition: Int, producerId: Long)
 
-  private val brokerStateMap: concurrent.Map[Int, DestinationBrokerAndQueuedMarkers] = concurrent.TrieMap.empty[Int,
DestinationBrokerAndQueuedMarkers]
+  class BrokerRequestQueue(private var destination: Node) {
+
+    // keep track of the requests per txn topic partition so we can easily clear the queue
+    // during partition emigration
+    private val requestsPerTxnTopicPartition: concurrent.Map[Int, BlockingQueue[TxnMarkerEntry]]
+      = concurrent.TrieMap.empty[Int, BlockingQueue[TxnMarkerEntry]]
+
+    def removeRequestsForPartition(partition: Int): Unit = {
+      requestsPerTxnTopicPartition.remove(partition)
+    }
+
+    def maybeUpdateNode(node: Node): Unit = {
+      destination = node
+    }
+
+    def addRequests(txnTopicPartition: Int, txnMarkerEntry: TxnMarkerEntry): Unit = {
+      val queue = requestsPerTxnTopicPartition.getOrElseUpdate(txnTopicPartition, new LinkedBlockingQueue[TxnMarkerEntry]())
+      queue.add(txnMarkerEntry)
+    }
+
+    def eachMetadataPartition[B](f:(Int, BlockingQueue[TxnMarkerEntry]) => B): mutable.Iterable[B]
=
+      requestsPerTxnTopicPartition.filter{ case(_, queue) => !queue.isEmpty}
+        .map{case(partition:Int, queue:BlockingQueue[TxnMarkerEntry]) => f(partition,
queue)}
+
+
+    def node: Node = destination
+
+    def totalQueuedRequests(): Int =
+      requestsPerTxnTopicPartition.map { case(_, queue) => queue.size()}
+        .sum
+
+  }
+
+  private val brokerStateMap: concurrent.Map[Int, BrokerRequestQueue] = concurrent.TrieMap.empty[Int,
BrokerRequestQueue]
   private val pendingTxnMap: concurrent.Map[PendingTxnKey, TransactionMetadata] = concurrent.TrieMap.empty[PendingTxnKey,
TransactionMetadata]
 
   // TODO: What is reasonable for this
@@ -54,41 +86,28 @@ class TransactionMarkerChannel(interBrokerListenerName: ListenerName,
 
   private[transaction]
   def drainQueuedTransactionMarkers(txnMarkerPurgatory: DelayedOperationPurgatory[DelayedTxnMarker]):
Iterable[RequestAndCompletionHandler] = {
-    brokerStateMap.flatMap {case (brokerId: Int, destAndMarkerQueue: DestinationBrokerAndQueuedMarkers)
=>
-      val markersToSend: java.util.List[CoordinatorEpochAndMarkers] = new util.ArrayList[CoordinatorEpochAndMarkers]
()
-      destAndMarkerQueue.markersQueue.drainTo (markersToSend)
-      markersToSend.groupBy{ epochAndMarker => (epochAndMarker.metadataPartition, epochAndMarker.coordinatorEpoch)
}
-        .map { case((metadataPartition: Int, coordinatorEpoch:Int), buffer: mutable.Buffer[CoordinatorEpochAndMarkers])
=>
-          val txnMarkerEntries = buffer.flatMap{_.txnMarkerEntries }.asJava
-          val requestCompletionHandler = new TransactionMarkerRequestCompletionHandler(
-            this,
-            txnMarkerPurgatory,
-            CoordinatorEpochAndMarkers(metadataPartition, coordinatorEpoch, txnMarkerEntries),
-            brokerId)
-          RequestAndCompletionHandler(destAndMarkerQueue.destBrokerNode, new WriteTxnMarkersRequest.Builder(coordinatorEpoch,
txnMarkerEntries), requestCompletionHandler)
-        }
+    brokerStateMap.flatMap {case (brokerId: Int, brokerRequestQueue: BrokerRequestQueue)
=>
+      brokerRequestQueue.eachMetadataPartition{ case(partitionId, queue) =>
+        val markersToSend: java.util.List[TxnMarkerEntry] = new util.ArrayList[TxnMarkerEntry]()
+        queue.drainTo(markersToSend)
+        val requestCompletionHandler = new TransactionMarkerRequestCompletionHandler(this,
txnMarkerPurgatory, partitionId, markersToSend, brokerId)
+        RequestAndCompletionHandler(brokerRequestQueue.node, new WriteTxnMarkersRequest.Builder(markersToSend),
requestCompletionHandler)
+      }
     }
   }
 
 
   def addOrUpdateBroker(broker: Node) {
-    if (brokerStateMap.contains(broker.id())) {
-      val brokerQueue = brokerStateMap(broker.id())
-      if (!brokerQueue.destBrokerNode.equals(broker)) {
-        brokerStateMap.put(broker.id(), DestinationBrokerAndQueuedMarkers(broker, brokerQueue.markersQueue))
-        trace(s"Updated destination broker for ${broker.id} from: ${brokerQueue.destBrokerNode}
to: $broker")
-      }
-    } else {
-      val markersQueue = new LinkedBlockingQueue[CoordinatorEpochAndMarkers]()
-      brokerStateMap.put(broker.id, DestinationBrokerAndQueuedMarkers(broker, markersQueue))
-      trace(s"Added destination broker ${broker.id}: $broker")
+    brokerStateMap.putIfAbsent(broker.id(), new BrokerRequestQueue(broker)) match {
+      case Some(brokerQueue) => brokerQueue.maybeUpdateNode(broker)
+      case None => // nothing to do
     }
   }
 
-  private[transaction] def addRequestForBroker(brokerId: Int, txnMarkerRequest: CoordinatorEpochAndMarkers)
{
-    val markersQueue = brokerStateMap(brokerId).markersQueue
-    markersQueue.add(txnMarkerRequest)
-    trace(s"Added markers $txnMarkerRequest for broker $brokerId")
+  private[transaction] def addRequestForBroker(brokerId: Int, metadataPartition: Int, txnMarkerEntry:
TxnMarkerEntry) {
+    val brokerQueue = brokerStateMap(brokerId)
+    brokerQueue.addRequests(metadataPartition, txnMarkerEntry)
+    trace(s"Added marker $txnMarkerEntry for broker $brokerId")
   }
 
   def addRequestToSend(metadataPartition: Int, pid: Long, epoch: Short, result: TransactionResult,
coordinatorEpoch: Int, topicPartitions: immutable.Set[TopicPartition]): Unit = {
@@ -128,8 +147,8 @@ class TransactionMarkerChannel(interBrokerListenerName: ListenerName,
     }
 
     for ((brokerId: Int, topicPartitions: immutable.Set[TopicPartition]) <- partitionsByDestination)
{
-      val txnMarker = new TxnMarkerEntry(pid, epoch, result, topicPartitions.toList.asJava)
-      addRequestForBroker(brokerId, CoordinatorEpochAndMarkers(metadataPartition, coordinatorEpoch,
Utils.mkList(txnMarker)))
+      val txnMarker = new TxnMarkerEntry(pid, epoch, coordinatorEpoch, result, topicPartitions.toList.asJava)
+      addRequestForBroker(brokerId, metadataPartition, txnMarker)
     }
     networkClient.wakeup()
   }
@@ -153,12 +172,10 @@ class TransactionMarkerChannel(interBrokerListenerName: ListenerName,
   }
 
   def removeStateForPartition(partition: Int): mutable.Iterable[Long] = {
-    brokerStateMap.foreach {case(_, destinationAndQueue: DestinationBrokerAndQueuedMarkers)
=>
-      val allMarkers: java.util.List[CoordinatorEpochAndMarkers] = new util.ArrayList[CoordinatorEpochAndMarkers]
()
-      destinationAndQueue.markersQueue.drainTo(allMarkers)
-      destinationAndQueue.markersQueue.addAll(allMarkers.asScala.filter{ epochAndMarkers
=> epochAndMarkers.metadataPartition != partition}.asJava)
+    brokerStateMap.foreach { case(_, brokerQueue) =>
+      brokerQueue.removeRequestsForPartition(partition)
     }
-    pendingTxnMap.filter { case (key: PendingTxnKey, _) => key.metadataPartition == partition
}
+    pendingTxnMap.filter { case (key: PendingTxnKey, _) => key.txnTopicPartition == partition
}
       .map { case (key: PendingTxnKey, _) =>
         pendingTxnMap.remove(key)
         key.producerId

http://git-wip-us.apache.org/repos/asf/kafka/blob/324b475e/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
index 92c8c72..7121e31 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
@@ -16,7 +16,6 @@
  */
 package kafka.coordinator.transaction
 
-import java.util
 
 import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler}
 import kafka.server.{DelayedOperationPurgatory, KafkaConfig, MetadataCache}
@@ -24,20 +23,14 @@ import kafka.utils.Logging
 import org.apache.kafka.clients._
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network._
-import org.apache.kafka.common.requests.{TransactionResult, WriteTxnMarkersRequest}
+import org.apache.kafka.common.requests.TransactionResult
 import org.apache.kafka.common.security.JaasContext
 import org.apache.kafka.common.utils.Time
-import org.apache.kafka.common.Node
-
-import java.util.concurrent.BlockingQueue
 
 import org.apache.kafka.common.protocol.Errors
 
 import collection.JavaConverters._
 
-case class CoordinatorEpochAndMarkers(metadataPartition: Int, coordinatorEpoch: Int, txnMarkerEntries:
util.List[WriteTxnMarkersRequest.TxnMarkerEntry])
-case class DestinationBrokerAndQueuedMarkers(destBrokerNode: Node, markersQueue: BlockingQueue[CoordinatorEpochAndMarkers])
-
 object TransactionMarkerChannelManager {
   def apply(config: KafkaConfig,
             metrics: Metrics,
@@ -121,10 +114,10 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
   }
 
 
-  def addTxnMarkerRequest(coordinatorPartition: Int, metadata: TransactionMetadata, coordinatorEpoch:
Int, completionCallback: WriteTxnMarkerCallback): Unit = {
+  def addTxnMarkerRequest(txnTopicPartition: Int, metadata: TransactionMetadata, coordinatorEpoch:
Int, completionCallback: WriteTxnMarkerCallback): Unit = {
     val metadataToWrite = metadata synchronized metadata.copy()
 
-    if (!transactionMarkerChannel.maybeAddPendingRequest(coordinatorPartition, metadata))
+    if (!transactionMarkerChannel.maybeAddPendingRequest(txnTopicPartition, metadata))
       // TODO: Not sure this is the correct response here?
       completionCallback(Errors.INVALID_TXN_STATE)
     else {
@@ -136,7 +129,7 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
         case PrepareAbort => TransactionResult.ABORT
         case s => throw new IllegalStateException("Unexpected txn metadata state while
writing markers: " + s)
       }
-      transactionMarkerChannel.addRequestToSend(coordinatorPartition,
+      transactionMarkerChannel.addRequestToSend(txnTopicPartition,
         metadataToWrite.pid,
         metadataToWrite.producerEpoch,
         result,
@@ -145,8 +138,8 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
     }
   }
 
-  def removeCompleted(metadataPartition: Int, pid: Long): Unit = {
-    transactionMarkerChannel.removeCompletedTxn(metadataPartition, pid)
+  def removeCompleted(txnTopicPartition: Int, pid: Long): Unit = {
+    transactionMarkerChannel.removeCompletedTxn(txnTopicPartition, pid)
   }
 
   def removeStateForPartition(transactionStateTopicPartitionId: Int): Unit = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/324b475e/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
index cea9775..5d68325 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
@@ -30,20 +30,21 @@ import collection.JavaConversions._
 
 class TransactionMarkerRequestCompletionHandler(transactionMarkerChannel: TransactionMarkerChannel,
                                                 txnMarkerPurgatory: DelayedOperationPurgatory[DelayedTxnMarker],
-                                                epochAndMarkers: CoordinatorEpochAndMarkers,
+                                                txnTopicPartition: Int,
+                                                txnMarkerEntries: java.util.List[TxnMarkerEntry],
                                                 brokerId: Int) extends RequestCompletionHandler
with Logging {
   override def onComplete(response: ClientResponse): Unit = {
     val correlationId = response.requestHeader.correlationId
     if (response.wasDisconnected) {
       trace(s"Cancelled request $response due to node ${response.destination} being disconnected")
       // re-enqueue the markers
-      for (txnMarker: TxnMarkerEntry <- epochAndMarkers.txnMarkerEntries) {
+      for (txnMarker: TxnMarkerEntry <- txnMarkerEntries) {
         transactionMarkerChannel.addRequestToSend(
-          epochAndMarkers.metadataPartition,
+          txnTopicPartition,
           txnMarker.producerId(),
           txnMarker.producerEpoch(),
           txnMarker.transactionResult(),
-          epochAndMarkers.coordinatorEpoch,
+          txnMarker.coordinatorEpoch(),
           txnMarker.partitions().toSet)
       }
     } else {
@@ -51,7 +52,7 @@ class TransactionMarkerRequestCompletionHandler(transactionMarkerChannel:
Transa
 
       val writeTxnMarkerResponse = response.responseBody.asInstanceOf[WriteTxnMarkersResponse]
 
-      for (txnMarker: TxnMarkerEntry <- epochAndMarkers.txnMarkerEntries) {
+      for (txnMarker: TxnMarkerEntry <- txnMarkerEntries) {
         val errors = writeTxnMarkerResponse.errors(txnMarker.producerId())
 
         if (errors == null)
@@ -61,10 +62,10 @@ class TransactionMarkerRequestCompletionHandler(transactionMarkerChannel:
Transa
         for ((topicPartition: TopicPartition, error: Errors) <- errors) {
           error match {
             case Errors.NONE =>
-              transactionMarkerChannel.pendingTxnMetadata(epochAndMarkers.metadataPartition,
txnMarker.producerId()) match {
+              transactionMarkerChannel.pendingTxnMetadata(txnTopicPartition, txnMarker.producerId())
match {
                 case None =>
                   // TODO: probably need to respond with Errors.NOT_COORDINATOR
-                  throw new IllegalArgumentException(s"transaction metadata not found during
write txn marker request. partition ${epochAndMarkers.metadataPartition} has likely emigrated")
+                  throw new IllegalArgumentException(s"transaction metadata not found during
write txn marker request. partition ${txnTopicPartition} has likely emigrated")
                 case Some(metadata) =>
                   // do not synchronize on this metadata since it will only be accessed by
the sender thread
                   metadata.topicPartitions -= topicPartition
@@ -79,11 +80,11 @@ class TransactionMarkerRequestCompletionHandler(transactionMarkerChannel:
Transa
           if (retryPartitions.nonEmpty) {
             // re-enqueue with possible new leaders of the partitions
             transactionMarkerChannel.addRequestToSend(
-              epochAndMarkers.metadataPartition,
+              txnTopicPartition,
               txnMarker.producerId(),
               txnMarker.producerEpoch(),
               txnMarker.transactionResult,
-              epochAndMarkers.coordinatorEpoch,
+              txnMarker.coordinatorEpoch(),
               retryPartitions.toSet)
           }
           val completed = txnMarkerPurgatory.checkAndComplete(txnMarker.producerId())

http://git-wip-us.apache.org/repos/asf/kafka/blob/324b475e/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
index 352daa2..1c49151 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
@@ -79,10 +79,10 @@ class TransactionMarkerChannelManagerTest {
     channel.addRequestToSend(0, 0, 0, TransactionResult.COMMIT, 0, Set[TopicPartition](partition1,
partition2))
 
 
-    val expectedBroker1Request = new WriteTxnMarkersRequest.Builder(0,
-      Utils.mkList(new WriteTxnMarkersRequest.TxnMarkerEntry(0, 0, TransactionResult.COMMIT,
Utils.mkList(partition1)))).build()
-    val expectedBroker2Request = new WriteTxnMarkersRequest.Builder(0,
-      Utils.mkList(new WriteTxnMarkersRequest.TxnMarkerEntry(0, 0, TransactionResult.COMMIT,
Utils.mkList(partition2)))).build()
+    val expectedBroker1Request = new WriteTxnMarkersRequest.Builder(
+      Utils.mkList(new WriteTxnMarkersRequest.TxnMarkerEntry(0, 0, 0, TransactionResult.COMMIT,
Utils.mkList(partition1)))).build()
+    val expectedBroker2Request = new WriteTxnMarkersRequest.Builder(
+      Utils.mkList(new WriteTxnMarkersRequest.TxnMarkerEntry(0, 0, 0, TransactionResult.COMMIT,
Utils.mkList(partition2)))).build()
 
     val requests: Map[Node, WriteTxnMarkersRequest] = requestGenerator().map{ result =>
       (result.destination, result.request.asInstanceOf[WriteTxnMarkersRequest.Builder].build())
@@ -99,38 +99,29 @@ class TransactionMarkerChannelManagerTest {
 
   @Test
   def shouldGenerateRequestPerPartitionPerBroker(): Unit ={
-    val partitionOneEpoch = 0
-    val partitionTwoEpoch = 1
-
     EasyMock.expect(metadataCache.getPartitionInfo(partition1.topic(), partition1.partition()))
-      .andReturn(Some(PartitionStateInfo(LeaderIsrAndControllerEpoch(LeaderAndIsr(1, partitionOneEpoch,
List.empty, 0), 0), Set.empty)))
+      .andReturn(Some(PartitionStateInfo(LeaderIsrAndControllerEpoch(LeaderAndIsr(1, 0, List.empty,
0), 0), Set.empty)))
 
 
     EasyMock.expect(metadataCache.getPartitionInfo(partition2.topic(), partition2.partition()))
-      .andReturn(Some(PartitionStateInfo(LeaderIsrAndControllerEpoch(LeaderAndIsr(1, partitionTwoEpoch,
List.empty, 0), 0), Set.empty)))
+      .andReturn(Some(PartitionStateInfo(LeaderIsrAndControllerEpoch(LeaderAndIsr(1, 0, List.empty,
0), 0), Set.empty)))
     EasyMock.expect(metadataCache.getAliveEndpoint(EasyMock.eq(1), EasyMock.anyObject())).andReturn(Some(broker1)).anyTimes()
     EasyMock.replay(metadataCache)
 
-    channel.addRequestToSend(0, 0, 0, TransactionResult.COMMIT, partitionOneEpoch, Set[TopicPartition](partition1))
-    channel.addRequestToSend(0, 0, 0, TransactionResult.COMMIT, partitionTwoEpoch, Set[TopicPartition](partition2))
+    channel.addRequestToSend(0, 0, 0, TransactionResult.COMMIT, 0, Set[TopicPartition](partition1))
+    channel.addRequestToSend(1, 0, 0, TransactionResult.COMMIT, 0, Set[TopicPartition](partition2))
 
-    val expectedPartition1Request = new WriteTxnMarkersRequest.Builder(0,
-      Utils.mkList(new WriteTxnMarkersRequest.TxnMarkerEntry(0, 0, TransactionResult.COMMIT,
Utils.mkList(partition1)))).build()
-    val expectedPartition2Request = new WriteTxnMarkersRequest.Builder(1,
-      Utils.mkList(new WriteTxnMarkersRequest.TxnMarkerEntry(0, 0, TransactionResult.COMMIT,
Utils.mkList(partition2)))).build()
+    val expectedPartition1Request = new WriteTxnMarkersRequest.Builder(
+      Utils.mkList(new WriteTxnMarkersRequest.TxnMarkerEntry(0, 0, 0, TransactionResult.COMMIT,
Utils.mkList(partition1)))).build()
+    val expectedPartition2Request = new WriteTxnMarkersRequest.Builder(
+      Utils.mkList(new WriteTxnMarkersRequest.TxnMarkerEntry(0, 0, 0, TransactionResult.COMMIT,
Utils.mkList(partition2)))).build()
 
     val requests = requestGenerator().map { result =>
       val markersRequest = result.request.asInstanceOf[WriteTxnMarkersRequest.Builder].build()
-      (markersRequest.coordinatorEpoch(), (result.destination, markersRequest))
-    }.toMap
+      (result.destination, markersRequest)
+    }.toList
 
-    val request1 = requests(partitionOneEpoch)
-    val request2 = requests(partitionTwoEpoch)
-    assertEquals(broker1, request1._1)
-    assertEquals(broker1, request2._1)
-    assertEquals(2, requests.size)
-    assertEquals(expectedPartition1Request, request1._2)
-    assertEquals(expectedPartition2Request, request2._2)
+    assertEquals(List((broker1, expectedPartition1Request), (broker1, expectedPartition2Request)),
requests)
   }
 
   @Test
@@ -142,8 +133,10 @@ class TransactionMarkerChannelManagerTest {
 
     channel.addRequestToSend(0, 0, 0, TransactionResult.COMMIT, 0, Set[TopicPartition](partition1))
 
-    assertTrue(requestGenerator().nonEmpty)
-    assertTrue(requestGenerator().isEmpty)
+    val result = requestGenerator()
+    assertTrue(result.nonEmpty)
+    val result2 = requestGenerator()
+    assertTrue(result2.isEmpty)
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/324b475e/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelTest.scala
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelTest.scala
index 6bfeb9b..89a7606 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelTest.scala
@@ -48,8 +48,8 @@ class TransactionMarkerChannelTest {
   def shouldAddEmptyBrokerQueueWhenAddingNewBroker(): Unit = {
     channel.addOrUpdateBroker(new Node(1, "host", 10))
     channel.addOrUpdateBroker(new Node(2, "host", 10))
-    assertEquals(0, channel.queueForBroker(1).get.markersQueue.size())
-    assertEquals(0, channel.queueForBroker(2).get.markersQueue.size())
+    assertEquals(0, channel.queueForBroker(1).get.eachMetadataPartition{case(partition:Int,
_) => partition}.size)
+    assertEquals(0, channel.queueForBroker(2).get.eachMetadataPartition{case(partition:Int,
_) => partition}.size)
   }
 
   @Test
@@ -66,21 +66,22 @@ class TransactionMarkerChannelTest {
     channel.addOrUpdateBroker(new Node(1, "host", 10))
     channel.addRequestToSend(0, 0, 0, TransactionResult.COMMIT, 0, Set[TopicPartition](partition1))
 
-    val destinationAndQueue = channel.queueForBroker(1).get
-    assertEquals(newDestination, destinationAndQueue.destBrokerNode)
-    assertEquals(1, destinationAndQueue.markersQueue.size())
+    val brokerRequestQueue = channel.queueForBroker(1).get
+    assertEquals(newDestination, brokerRequestQueue.node)
+    assertEquals(1, brokerRequestQueue.totalQueuedRequests())
   }
 
+
   @Test
   def shouldQueueRequestsByBrokerId(): Unit = {
     channel.addOrUpdateBroker(new Node(1, "host", 10))
     channel.addOrUpdateBroker(new Node(2, "otherhost", 10))
-    channel.addRequestForBroker(1, CoordinatorEpochAndMarkers(0, 0, Utils.mkList(new WriteTxnMarkersRequest.TxnMarkerEntry(0,
0, TransactionResult.COMMIT, Utils.mkList()))))
-    channel.addRequestForBroker(1, CoordinatorEpochAndMarkers(0, 0, Utils.mkList(new WriteTxnMarkersRequest.TxnMarkerEntry(0,
0, TransactionResult.COMMIT, Utils.mkList()))))
-    channel.addRequestForBroker(2, CoordinatorEpochAndMarkers(0, 0, Utils.mkList(new WriteTxnMarkersRequest.TxnMarkerEntry(0,
0, TransactionResult.COMMIT, Utils.mkList()))))
+    channel.addRequestForBroker(1, 0, new WriteTxnMarkersRequest.TxnMarkerEntry(0, 0, 0,
TransactionResult.COMMIT, Utils.mkList()))
+    channel.addRequestForBroker(1, 0, new WriteTxnMarkersRequest.TxnMarkerEntry(0, 0, 0,
TransactionResult.COMMIT, Utils.mkList()))
+    channel.addRequestForBroker(2, 0, new WriteTxnMarkersRequest.TxnMarkerEntry(0, 0, 0,
TransactionResult.COMMIT, Utils.mkList()))
 
-    assertEquals(2, channel.queueForBroker(1).get.markersQueue.size())
-    assertEquals(1, channel.queueForBroker(2).get.markersQueue.size())
+    assertEquals(2, channel.queueForBroker(1).get.totalQueuedRequests())
+    assertEquals(1, channel.queueForBroker(2).get.totalQueuedRequests())
   }
 
   @Test
@@ -105,8 +106,8 @@ class TransactionMarkerChannelTest {
     EasyMock.replay(metadataCache)
     channel.addRequestToSend(0, 0, 0, TransactionResult.COMMIT, 0, Set[TopicPartition](partition1,
partition2))
 
-    assertEquals(1, channel.queueForBroker(1).get.markersQueue.size)
-    assertEquals(1, channel.queueForBroker(2).get.markersQueue.size)
+    assertEquals(1, channel.queueForBroker(1).get.totalQueuedRequests())
+    assertEquals(1, channel.queueForBroker(2).get.totalQueuedRequests())
   }
   @Test
   def shouldWakeupNetworkClientWhenRequestsQueued(): Unit = {
@@ -131,7 +132,7 @@ class TransactionMarkerChannelTest {
     EasyMock.replay(metadataCache)
     channel.addRequestToSend(0, 0, 0, TransactionResult.COMMIT, 0, Set[TopicPartition](partition1))
 
-    assertEquals(1, channel.queueForBroker(1).get.markersQueue.size)
+    assertEquals(1, channel.queueForBroker(1).get.totalQueuedRequests())
     EasyMock.verify(metadataCache)
   }
 
@@ -161,16 +162,15 @@ class TransactionMarkerChannelTest {
   @Test
   def shouldRemoveBrokerRequestsForPartitionWhenPartitionEmigrated(): Unit = {
     channel.addOrUpdateBroker(new Node(1, "host", 10))
-    channel.addRequestForBroker(1, CoordinatorEpochAndMarkers(0, 0, Utils.mkList(new WriteTxnMarkersRequest.TxnMarkerEntry(0,
0, TransactionResult.COMMIT, Utils.mkList()))))
-    channel.addRequestForBroker(1, CoordinatorEpochAndMarkers(1, 0, Utils.mkList(new WriteTxnMarkersRequest.TxnMarkerEntry(0,
0, TransactionResult.COMMIT, Utils.mkList()))))
-    channel.addRequestForBroker(1, CoordinatorEpochAndMarkers(1, 0, Utils.mkList(new WriteTxnMarkersRequest.TxnMarkerEntry(0,
0, TransactionResult.COMMIT, Utils.mkList()))))
+    channel.addRequestForBroker(1, 0, new WriteTxnMarkersRequest.TxnMarkerEntry(0, 0, 0,
TransactionResult.COMMIT, Utils.mkList()))
+    channel.addRequestForBroker(1, 1, new WriteTxnMarkersRequest.TxnMarkerEntry(0, 0, 0,
TransactionResult.COMMIT, Utils.mkList()))
+    channel.addRequestForBroker(1, 1, new WriteTxnMarkersRequest.TxnMarkerEntry(0, 0, 0,
TransactionResult.COMMIT, Utils.mkList()))
 
     channel.removeStateForPartition(1)
 
 
-    val markersQueue = channel.queueForBroker(1).get.markersQueue
-    assertEquals(1, markersQueue.size())
-    assertEquals(0, markersQueue.peek().metadataPartition)
+    val result = channel.queueForBroker(1).get.eachMetadataPartition{case (partition:Int,
_) => partition}.toList
+    assertEquals(List(0), result)
   }
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/324b475e/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
index 45ea2da..096b826 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
@@ -18,7 +18,6 @@ package kafka.coordinator.transaction
 
 import java.{lang, util}
 
-import kafka.coordinator.transaction._
 import kafka.server.DelayedOperationPurgatory
 import kafka.utils.timer.MockTimer
 import org.apache.kafka.clients.ClientResponse
@@ -37,19 +36,15 @@ class TransactionMarkerRequestCompletionHandlerTest {
   private val markerChannel = EasyMock.createNiceMock(classOf[TransactionMarkerChannel])
   private val purgatory = new DelayedOperationPurgatory[DelayedTxnMarker]("txn-purgatory-name",
new MockTimer, reaperEnabled = false)
   private val topic1 = new TopicPartition("topic1", 0)
-  private val epochAndMarkers = CoordinatorEpochAndMarkers(0,
-    0,
+  private val txnMarkers =
     Utils.mkList(
-      new WriteTxnMarkersRequest.TxnMarkerEntry(0,
-        0,
-        TransactionResult.COMMIT,
-        Utils.mkList(topic1))))
+      new WriteTxnMarkersRequest.TxnMarkerEntry(0, 0, 0, TransactionResult.COMMIT, Utils.mkList(topic1)))
 
-  private val handler = new TransactionMarkerRequestCompletionHandler(markerChannel, purgatory,
epochAndMarkers,  0)
+  private val handler = new TransactionMarkerRequestCompletionHandler(markerChannel, purgatory,
0, txnMarkers, 0)
 
   @Test
   def shouldReEnqueuePartitionsWhenBrokerDisconnected(): Unit = {
-    EasyMock.expect(markerChannel.addRequestToSend(epochAndMarkers.metadataPartition, 0,
0, TransactionResult.COMMIT, 0, Set[TopicPartition](topic1)))
+    EasyMock.expect(markerChannel.addRequestToSend(0, 0, 0, TransactionResult.COMMIT, 0,
Set[TopicPartition](topic1)))
     EasyMock.replay(markerChannel)
 
     handler.onComplete(new ClientResponse(new RequestHeader(0, 0, "client", 1), null, null,
0, 0, true, null, null))
@@ -76,7 +71,7 @@ class TransactionMarkerRequestCompletionHandlerTest {
     val response = new WriteTxnMarkersResponse(createPidErrorMap(Errors.NONE))
 
     val metadata = new TransactionMetadata(0, 0, 0, PrepareCommit, mutable.Set[TopicPartition](topic1),
0, 0)
-    EasyMock.expect(markerChannel.pendingTxnMetadata(epochAndMarkers.metadataPartition, 0))
+    EasyMock.expect(markerChannel.pendingTxnMetadata(0, 0))
       .andReturn(Some(metadata))
     EasyMock.replay(markerChannel)
 
@@ -96,7 +91,7 @@ class TransactionMarkerRequestCompletionHandlerTest {
       completed = true
     }), Seq(0L))
 
-    EasyMock.expect(markerChannel.pendingTxnMetadata(epochAndMarkers.metadataPartition, 0))
+    EasyMock.expect(markerChannel.pendingTxnMetadata(0, 0))
       .andReturn(Some(metadata))
 
     EasyMock.replay(markerChannel)
@@ -144,7 +139,7 @@ class TransactionMarkerRequestCompletionHandlerTest {
     val response = new WriteTxnMarkersResponse(createPidErrorMap(Errors.UNKNOWN_TOPIC_OR_PARTITION))
     val metadata = new TransactionMetadata(0, 0, 0, PrepareCommit, mutable.Set[TopicPartition](topic1),
0, 0)
 
-    EasyMock.expect(markerChannel.addRequestToSend(epochAndMarkers.metadataPartition, 0,
0, TransactionResult.COMMIT, 0, Set[TopicPartition](topic1)))
+    EasyMock.expect(markerChannel.addRequestToSend(0, 0, 0, TransactionResult.COMMIT, 0,
Set[TopicPartition](topic1)))
     EasyMock.replay(markerChannel)
 
     handler.onComplete(new ClientResponse(new RequestHeader(0, 0, "client", 1), null, null,
0, 0, false, null, response))


Mime
View raw message