kafka-commits mailing list archives

From: guozh...@apache.org
Subject: [3/3] kafka git commit: KAFKA-5130: Refactor transaction coordinator's in-memory cache; plus fixes on transaction metadata synchronization
Date: Fri, 12 May 2017 22:01:07 GMT
KAFKA-5130: Refactor transaction coordinator's in-memory cache; plus fixes on transaction metadata synchronization

1. Collapsed the `ownedPartitions`, `pendingTxnMap` and the `transactionMetadataCache` into a single in-memory structure, which is a two-layered map: keyed first by the transaction log partition id, with each value holding the current coordinatorEpoch of that partition plus another map keyed by the transactional id.
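
A rough sketch of this structure in Scala (the names `TxnMetadataCacheEntry` and `transactionMetadataCache` below are illustrative, not necessarily those used in the patch; `TransactionMetadata` is the class touched by this patch):

    import scala.collection.{concurrent, mutable}

    // outer key: transaction log partition id; value: that partition's current
    // coordinator epoch plus an inner map from transactional id to its metadata
    case class TxnMetadataCacheEntry(coordinatorEpoch: Int,
                                     metadataPerTransactionalId: mutable.Map[String, TransactionMetadata])

    val transactionMetadataCache: concurrent.Map[Int, TxnMetadataCacheEntry] =
      concurrent.TrieMap.empty[Int, TxnMetadataCacheEntry]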

2. Use `transactionalId` consistently across the transaction coordinator modules, and attach this id to the transaction marker entries.

3. Use two keys, `transactionalId` and `txnLogPartitionId`, in the writeMarkerPurgatory, and pass the `transactionalId` along with the TxnMarkerEntry so that `TransactionMarkerRequestCompletionHandler` can use it to access the two-layered map when handling responses.
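
The pairing passed through the queues and the purgatory presumably has a shape like the following (`TxnIdAndMarkerEntry` appears in the diff below; this definition is inferred from its usage there):

    import org.apache.kafka.common.requests.WriteTxnMarkersRequest.TxnMarkerEntry

    // carries the transactional id alongside the marker entry so that the response
    // handler can locate the matching entry in the two-layered metadata map
    case class TxnIdAndMarkerEntry(txnId: String, txnMarkerEntry: TxnMarkerEntry)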

4. Use one queue per `broker-id` and `txnLogPartitionId`. When the end point associated with a `broker-id` may have changed, update the Node without clearing the queue, relying on the queued requests to be retried in the next round (see the sketch below).
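
A minimal sketch of that enqueue path, mirroring `addMarkersForBroker` in the diff below (the `enqueue` wrapper name is made up for illustration; `TxnMarkerQueue` and `TxnIdAndMarkerEntry` come from this patch):

    import org.apache.kafka.common.Node
    import scala.collection.concurrent

    val markersQueuePerBroker = concurrent.TrieMap.empty[Int, TxnMarkerQueue]

    def enqueue(broker: Node, txnTopicPartition: Int, txnIdAndMarker: TxnIdAndMarkerEntry): Unit = {
      // create the per-broker queue on first use, then refresh the destination Node
      // in place; already-queued markers are kept and retried on the next drain
      val queue = markersQueuePerBroker.getOrElseUpdate(broker.id, new TxnMarkerQueue(broker))
      queue.maybeUpdateNode(broker)
      queue.addMarkers(txnTopicPartition, txnIdAndMarker)
    }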

5. Centralize the error-handling callbacks for appending metadata to the log and for sending markers to brokers in `TransactionStateManager#appendTransactionToLog` and `TransactionMarkerChannelManager#addTxnMarkersToSend`, respectively.

6. Always update the in-memory transaction metadata AFTER the txn log has been appended and replicated, and then double check the cache to make sure nothing has changed since the log append (a condensed sketch of this pattern follows). The only exception is initializing the pid for the first time, in which case we put a dummy entry into the cache with its pendingState set to `Empty` (so a transition from `Empty` to `Empty` is valid), allowing it to be updated after the log append has completed.
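
A condensed sketch of that pattern, simplified from the `sendTxnMarkersCallback` logic in the `TransactionCoordinator` changes below (`completeTransitionAfterAppend` is an illustrative helper name, not part of the patch):

    import org.apache.kafka.common.protocol.Errors

    def completeTransitionAfterAppend(txnManager: TransactionStateManager,
                                      transactionalId: String,
                                      coordinatorEpoch: Int,
                                      appendError: Errors,
                                      responseCallback: Errors => Unit): Unit = {
      if (appendError != Errors.NONE) {
        // the log append failed: report the error and leave the cached metadata untouched
        responseCallback(appendError)
      } else {
        txnManager.getTransactionState(transactionalId) match {
          case Some(epochAndMetadata) if epochAndMetadata.coordinatorEpoch == coordinatorEpoch =>
            epochAndMetadata.transactionMetadata synchronized {
              // double check producer id, epoch and state here; only if nothing has
              // changed since the append is the prepared transition applied to the cache
              responseCallback(Errors.NONE)
            }
          case _ =>
            // coordinator epoch changed or the txn log partition emigrated during the append
            responseCallback(Errors.NOT_COORDINATOR)
        }
      }
    }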

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Ismael Juma, Damian Guy, Jason Gustafson, Jun Rao

Closes #2964 from guozhangwang/K5130-refactor-tc-inmemory-cache


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/794e6dbd
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/794e6dbd
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/794e6dbd

Branch: refs/heads/trunk
Commit: 794e6dbd14f040d21d3402c5eda22cfa8f5c4b3d
Parents: 7baa58d
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Fri May 12 15:01:01 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Fri May 12 15:01:01 2017 -0700

----------------------------------------------------------------------
 .../requests/WriteTxnMarkersResponse.java       |   2 +
 .../kafka/common/InterBrokerSendThread.scala    |   3 +
 .../transaction/TransactionCoordinator.scala    | 525 ++++++++++---------
 .../transaction/TransactionLog.scala            |  14 +-
 .../transaction/TransactionMarkerChannel.scala  | 186 -------
 .../TransactionMarkerChannelManager.scala       | 243 +++++++--
 ...nsactionMarkerRequestCompletionHandler.scala | 130 +++--
 .../transaction/TransactionMetadata.scala       | 200 +++++--
 .../transaction/TransactionStateManager.scala   | 385 +++++++-------
 .../scala/kafka/server/DelayedOperation.scala   |   3 +-
 .../main/scala/kafka/server/KafkaConfig.scala   |   8 +-
 .../main/scala/kafka/server/MetadataCache.scala |  18 +
 .../scala/kafka/server/ReplicaManager.scala     |   6 +-
 .../TransactionCoordinatorIntegrationTest.scala |   6 +
 .../TransactionCoordinatorTest.scala            | 380 +++++---------
 .../transaction/TransactionLogTest.scala        |   8 +-
 .../TransactionMarkerChannelManagerTest.scala   | 304 +++++------
 .../TransactionMarkerChannelTest.scala          | 179 -------
 ...tionMarkerRequestCompletionHandlerTest.scala | 195 +++++--
 .../TransactionStateManagerTest.scala           | 139 +++--
 20 files changed, 1430 insertions(+), 1504 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/794e6dbd/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
index 916dbab..00133a6 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
@@ -46,6 +46,8 @@ public class WriteTxnMarkersResponse extends AbstractResponse {
     //   NotEnoughReplicas
     //   NotEnoughReplicasAfterAppend
     //   InvalidRequiredAcks
+    //   TransactionCoordinatorFenced
+    //   RequestTimeout
 
     private final Map<Long, Map<TopicPartition, Errors>> errors;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/794e6dbd/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/InterBrokerSendThread.scala b/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
index 217aa80..ac14243 100644
--- a/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
+++ b/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
@@ -33,6 +33,9 @@ class InterBrokerSendThread(name: String,
                             isInterruptible: Boolean = true)
   extends ShutdownableThread(name, isInterruptible) {
 
+  // visible for testing
+  def generateRequests(): Iterable[RequestAndCompletionHandler] = requestGenerator()
+
   override def doWork() {
     val now = time.milliseconds()
     var pollTimeout = Long.MaxValue

http://git-wip-us.apache.org/repos/asf/kafka/blob/794e6dbd/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index 982e009..7632f3f 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -48,19 +48,19 @@ object TransactionCoordinator {
       config.transactionTransactionsExpiredTransactionCleanupIntervalMs)
 
     val pidManager = new ProducerIdManager(config.brokerId, zkUtils)
-    val logManager = new TransactionStateManager(config.brokerId, zkUtils, scheduler, replicaManager, txnConfig, time)
-    val txnMarkerPurgatory = DelayedOperationPurgatory[DelayedTxnMarker]("txn-marker-purgatory", config.brokerId)
-    val transactionMarkerChannelManager = TransactionMarkerChannelManager(config, metrics, metadataCache, txnMarkerPurgatory, time)
+    val txnStateManager = new TransactionStateManager(config.brokerId, zkUtils, scheduler, replicaManager, txnConfig, time)
+    val txnMarkerPurgatory = DelayedOperationPurgatory[DelayedTxnMarker]("txn-marker-purgatory", config.brokerId, reaperEnabled = false)
+    val txnMarkerChannelManager = TransactionMarkerChannelManager(config, metrics, metadataCache, txnStateManager, txnMarkerPurgatory, time)
 
-    new TransactionCoordinator(config.brokerId, pidManager, logManager, transactionMarkerChannelManager, txnMarkerPurgatory, scheduler, time)
+    new TransactionCoordinator(config.brokerId, scheduler, pidManager, txnStateManager, txnMarkerChannelManager, txnMarkerPurgatory, time)
   }
 
   private def initTransactionError(error: Errors): InitPidResult = {
     InitPidResult(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, error)
   }
 
-  private def initTransactionMetadata(txnMetadata: TransactionMetadata): InitPidResult = {
-    InitPidResult(txnMetadata.pid, txnMetadata.producerEpoch, Errors.NONE)
+  private def initTransactionMetadata(txnMetadata: TransactionMetadataTransition): InitPidResult = {
+    InitPidResult(txnMetadata.producerId, txnMetadata.producerEpoch, Errors.NONE)
   }
 }
 
@@ -73,18 +73,18 @@ object TransactionCoordinator {
  * Producers with no specific transactional id may talk to a random broker as their coordinators.
  */
 class TransactionCoordinator(brokerId: Int,
+                             scheduler: Scheduler,
                              pidManager: ProducerIdManager,
                              txnManager: TransactionStateManager,
                              txnMarkerChannelManager: TransactionMarkerChannelManager,
                              txnMarkerPurgatory: DelayedOperationPurgatory[DelayedTxnMarker],
-                             scheduler: Scheduler,
                              time: Time) extends Logging {
   this.logIdent = "[Transaction Coordinator " + brokerId + "]: "
 
   import TransactionCoordinator._
 
   type InitPidCallback = InitPidResult => Unit
-  type TxnMetadataUpdateCallback = Errors => Unit
+  type AddPartitionsCallback = Errors => Unit
   type EndTxnCallback = Errors => Unit
 
   /* Active flag of the coordinator */
@@ -93,90 +93,113 @@ class TransactionCoordinator(brokerId: Int,
   def handleInitPid(transactionalId: String,
                     transactionTimeoutMs: Int,
                     responseCallback: InitPidCallback): Unit = {
-      if (transactionalId == null || transactionalId.isEmpty) {
-        // if the transactional id is not specified, then always blindly accept the request
-        // and return a new pid from the pid manager
-        val pid = pidManager.nextPid()
-        responseCallback(InitPidResult(pid, epoch = 0, Errors.NONE))
-      } else if (!txnManager.isCoordinatorFor(transactionalId)) {
-        // check if it is the assigned coordinator for the transactional id
-        responseCallback(initTransactionError(Errors.NOT_COORDINATOR))
-      } else if (txnManager.isCoordinatorLoadingInProgress(transactionalId)) {
-        responseCallback(initTransactionError(Errors.COORDINATOR_LOAD_IN_PROGRESS))
-      } else if (!txnManager.validateTransactionTimeoutMs(transactionTimeoutMs)) {
-        // check transactionTimeoutMs is not larger than the broker configured maximum allowed value
-        responseCallback(initTransactionError(Errors.INVALID_TRANSACTION_TIMEOUT))
-      } else {
-        // only try to get a new pid and update the cache if the transactional id is unknown
-        txnManager.getTransactionState(transactionalId) match {
-          case None =>
-            val pid = pidManager.nextPid()
-            val newMetadata: TransactionMetadata = new TransactionMetadata(pid = pid,
-              producerEpoch = 0,
-              txnTimeoutMs = transactionTimeoutMs,
-              state = Empty,
-              topicPartitions = collection.mutable.Set.empty[TopicPartition],
-              lastUpdateTimestamp = time.milliseconds())
-
-            val metadata = txnManager.addTransaction(transactionalId, newMetadata)
-
-            // there might be a concurrent thread that has just updated the mapping
-            // with the transactional id at the same time; in this case we will
-            // treat it as the metadata has existed and update it accordingly
-            metadata synchronized {
-              if (!metadata.eq(newMetadata))
-                initPidWithExistingMetadata(transactionalId, transactionTimeoutMs, responseCallback, metadata)
-              else
-                appendMetadataToLog(transactionalId, metadata, responseCallback)
 
+    if (transactionalId == null || transactionalId.isEmpty) {
+      // if the transactional id is not specified, then always blindly accept the request
+      // and return a new pid from the pid manager
+      val pid = pidManager.nextPid()
+      responseCallback(InitPidResult(pid, epoch = 0, Errors.NONE))
+    } else if (!txnManager.isCoordinatorFor(transactionalId)) {
+      // check if it is the assigned coordinator for the transactional id
+      responseCallback(initTransactionError(Errors.NOT_COORDINATOR))
+    } else if (txnManager.isCoordinatorLoadingInProgress(transactionalId)) {
+      responseCallback(initTransactionError(Errors.COORDINATOR_LOAD_IN_PROGRESS))
+    } else if (!txnManager.validateTransactionTimeoutMs(transactionTimeoutMs)) {
+      // check transactionTimeoutMs is not larger than the broker configured maximum allowed value
+      responseCallback(initTransactionError(Errors.INVALID_TRANSACTION_TIMEOUT))
+    } else {
+      // only try to get a new pid and update the cache if the transactional id is unknown
+      val result: Either[InitPidResult, (Int, TransactionMetadataTransition)] = txnManager.getTransactionState(transactionalId) match {
+        case None =>
+          val pid = pidManager.nextPid()
+          val now = time.milliseconds()
+          val createdMetadata = new TransactionMetadata(producerId = pid,
+            producerEpoch = 0,
+            txnTimeoutMs = transactionTimeoutMs,
+            state = Empty,
+            topicPartitions = collection.mutable.Set.empty[TopicPartition],
+            txnLastUpdateTimestamp = now)
+
+          val epochAndMetadata = txnManager.addTransaction(transactionalId, createdMetadata)
+          val coordinatorEpoch = epochAndMetadata.coordinatorEpoch
+          val txnMetadata = epochAndMetadata.transactionMetadata
+
+          // there might be a concurrent thread that has just updated the mapping
+          // with the transactional id at the same time (hence reference equality will fail);
+          // in this case we will treat it as the metadata has existed already
+          txnMetadata synchronized {
+            if (!txnMetadata.eq(createdMetadata)) {
+              initPidWithExistingMetadata(transactionalId, transactionTimeoutMs, coordinatorEpoch, txnMetadata)
+            } else {
+              Right(coordinatorEpoch, txnMetadata.prepareNewPid(time.milliseconds()))
             }
-          case Some(metadata) =>
-            initPidWithExistingMetadata(transactionalId, transactionTimeoutMs, responseCallback, metadata)
-        }
+          }
+
+        case Some(existingEpochAndMetadata) =>
+          val coordinatorEpoch = existingEpochAndMetadata.coordinatorEpoch
+          val txnMetadata = existingEpochAndMetadata.transactionMetadata
+
+          txnMetadata synchronized {
+            initPidWithExistingMetadata(transactionalId, transactionTimeoutMs, coordinatorEpoch, txnMetadata)
+          }
       }
-  }
 
-  private def appendMetadataToLog(transactionalId: String,
-                             metadata: TransactionMetadata,
-                             initPidCallback: InitPidCallback): Unit ={
-    def callback(errors: Errors): Unit = {
-      if (errors == Errors.NONE)
-        initPidCallback(initTransactionMetadata(metadata))
-      else
-        initPidCallback(initTransactionError(errors))
+      result match {
+        case Left(pidResult) =>
+          responseCallback(pidResult)
+
+        case Right((coordinatorEpoch, newMetadata)) =>
+          if (newMetadata.txnState == Ongoing) {
+            // abort the ongoing transaction and then return CONCURRENT_TRANSACTIONS to let client wait and retry
+            def sendRetriableErrorCallback(error: Errors): Unit = {
+              if (error != Errors.NONE) {
+                responseCallback(initTransactionError(error))
+              } else {
+                responseCallback(initTransactionError(Errors.CONCURRENT_TRANSACTIONS))
+              }
+            }
+
+            handleEndTransaction(transactionalId,
+              newMetadata.producerId,
+              newMetadata.producerEpoch,
+              TransactionResult.ABORT,
+              sendRetriableErrorCallback)
+          } else {
+            def sendPidResponseCallback(error: Errors): Unit = {
+              if (error == Errors.NONE)
+                responseCallback(initTransactionMetadata(newMetadata))
+              else
+                responseCallback(initTransactionError(error))
+            }
+
+            txnManager.appendTransactionToLog(transactionalId, coordinatorEpoch, newMetadata, sendPidResponseCallback)
+          }
+      }
     }
-    txnManager.appendTransactionToLog(transactionalId, metadata, callback)
   }
 
-
   private def initPidWithExistingMetadata(transactionalId: String,
                                           transactionTimeoutMs: Int,
-                                          responseCallback: InitPidCallback,
-                                          metadata: TransactionMetadata) = {
-
-    metadata synchronized {
-      if (metadata.state == Ongoing) {
-        // abort the ongoing transaction
-        handleEndTransaction(transactionalId,
-          metadata.pid,
-          metadata.producerEpoch,
-          TransactionResult.ABORT,
-          (errors: Errors) => {
-            if (errors != Errors.NONE) {
-              responseCallback(initTransactionError(errors))
-            } else {
-              responseCallback(initTransactionError(Errors.CONCURRENT_TRANSACTIONS))
-            }
-          })
-      } else if (metadata.state == PrepareAbort || metadata.state == PrepareCommit) {
-        responseCallback(initTransactionError(Errors.CONCURRENT_TRANSACTIONS))
-      } else {
-        metadata.producerEpoch = (metadata.producerEpoch + 1).toShort
-        metadata.txnTimeoutMs = transactionTimeoutMs
-        metadata.topicPartitions.clear()
-        metadata.lastUpdateTimestamp = time.milliseconds()
-        metadata.state = Empty
-        appendMetadataToLog(transactionalId, metadata, responseCallback)
+                                          coordinatorEpoch: Int,
+                                          txnMetadata: TransactionMetadata): Either[InitPidResult, (Int, TransactionMetadataTransition)] = {
+
+    if (txnMetadata.pendingTransitionInProgress) {
+      // return a retriable exception to let the client backoff and retry
+      Left(initTransactionError(Errors.CONCURRENT_TRANSACTIONS))
+    } else {
+      // caller should have synchronized on txnMetadata already
+      txnMetadata.state match {
+        case PrepareAbort | PrepareCommit =>
+          // reply to client and let client backoff and retry
+          Left(initTransactionError(Errors.CONCURRENT_TRANSACTIONS))
+
+        case CompleteAbort | CompleteCommit | Empty =>
+          // try to append and then update
+          Right(coordinatorEpoch, txnMetadata.prepareIncrementProducerEpoch(transactionTimeoutMs, time.milliseconds()))
+
+        case Ongoing =>
+          // indicate to abort the current ongoing txn first
+          Right(coordinatorEpoch, txnMetadata.prepareNoTransit())
       }
     }
   }
@@ -196,189 +219,192 @@ class TransactionCoordinator(brokerId: Int,
                                        pid: Long,
                                        epoch: Short,
                                        partitions: collection.Set[TopicPartition],
-                                       responseCallback: TxnMetadataUpdateCallback): Unit = {
-    val errors = validateTransactionalId(transactionalId)
-    if (errors != Errors.NONE)
-      responseCallback(errors)
-    else {
+                                       responseCallback: AddPartitionsCallback): Unit = {
+    val error = validateTransactionalId(transactionalId)
+    if (error != Errors.NONE) {
+      responseCallback(error)
+    } else {
       // try to update the transaction metadata and append the updated metadata to txn log;
       // if there is no such metadata treat it as invalid pid mapping error.
-      val (error, newMetadata) = txnManager.getTransactionState(transactionalId) match {
+      val result: Either[Errors, (Int, TransactionMetadataTransition)] = txnManager.getTransactionState(transactionalId) match {
         case None =>
-          (Errors.INVALID_PID_MAPPING, null)
+          Left(Errors.INVALID_PID_MAPPING)
+
+        case Some(epochAndMetadata) =>
+          val coordinatorEpoch = epochAndMetadata.coordinatorEpoch
+          val txnMetadata = epochAndMetadata.transactionMetadata
 
-        case Some(metadata) =>
           // generate the new transaction metadata with added partitions
-          metadata synchronized {
-            if (metadata.pid != pid) {
-              (Errors.INVALID_PID_MAPPING, null)
-            } else if (metadata.producerEpoch != epoch) {
-              (Errors.INVALID_PRODUCER_EPOCH, null)
-            } else if (metadata.pendingState.isDefined) {
+          txnMetadata synchronized {
+            if (txnMetadata.producerId != pid) {
+              Left(Errors.INVALID_PID_MAPPING)
+            } else if (txnMetadata.producerEpoch != epoch) {
+              Left(Errors.INVALID_PRODUCER_EPOCH)
+            } else if (txnMetadata.pendingTransitionInProgress) {
               // return a retriable exception to let the client backoff and retry
-              (Errors.CONCURRENT_TRANSACTIONS, null)
-            } else if (metadata.state == PrepareCommit || metadata.state == PrepareAbort) {
-              (Errors.CONCURRENT_TRANSACTIONS, null)
+              Left(Errors.CONCURRENT_TRANSACTIONS)
+            } else if (txnMetadata.state == PrepareCommit || txnMetadata.state == PrepareAbort) {
+              Left(Errors.CONCURRENT_TRANSACTIONS)
+            } else if (partitions.subsetOf(txnMetadata.topicPartitions)) {
+              // this is an optimization: if the partitions are already in the metadata reply OK immediately
+              Left(Errors.NONE)
             } else {
-              if (metadata.state == CompleteAbort || metadata.state == CompleteCommit)
-                metadata.topicPartitions.clear()
-              if (partitions.subsetOf(metadata.topicPartitions)) {
-                // this is an optimization: if the partitions are already in the metadata reply OK immediately
-                (Errors.NONE, null)
-              } else {
-                val now = time.milliseconds()
-                val newMetadata = new TransactionMetadata(pid,
-                  epoch,
-                  metadata.txnTimeoutMs,
-                  Ongoing,
-                  metadata.topicPartitions ++ partitions,
-                  if (metadata.state == Empty || metadata.state == CompleteCommit || metadata.state == CompleteAbort)
-                    now
-                  else metadata.transactionStartTime,
-                  now)
-                metadata.prepareTransitionTo(Ongoing)
-                (Errors.NONE, newMetadata)
-              }
+              Right(coordinatorEpoch, txnMetadata.prepareAddPartitions(partitions.toSet, time.milliseconds()))
             }
           }
       }
 
-      if (newMetadata != null) {
-        txnManager.appendTransactionToLog(transactionalId, newMetadata, responseCallback)
-      } else {
-        responseCallback(error)
+      result match {
+        case Left(err) =>
+          responseCallback(err)
+
+        case Right((coordinatorEpoch, newMetadata)) =>
+          txnManager.appendTransactionToLog(transactionalId, coordinatorEpoch, newMetadata, responseCallback)
       }
     }
   }
 
-  def handleTxnImmigration(transactionStateTopicPartitionId: Int, coordinatorEpoch: Int) {
-      txnManager.loadTransactionsForPartition(transactionStateTopicPartitionId, coordinatorEpoch, writeTxnMarkers)
+  def handleTxnImmigration(txnTopicPartitionId: Int, coordinatorEpoch: Int) {
+      txnManager.loadTransactionsForTxnTopicPartition(txnTopicPartitionId, coordinatorEpoch, txnMarkerChannelManager.addTxnMarkersToSend)
   }
 
-  def handleTxnEmigration(transactionStateTopicPartitionId: Int) {
-      txnManager.removeTransactionsForPartition(transactionStateTopicPartitionId)
-      txnMarkerChannelManager.removeStateForPartition(transactionStateTopicPartitionId)
+  def handleTxnEmigration(txnTopicPartitionId: Int) {
+      txnManager.removeTransactionsForTxnTopicPartition(txnTopicPartitionId)
+      txnMarkerChannelManager.removeMarkersForTxnTopicPartition(txnTopicPartitionId)
   }
 
   def handleEndTransaction(transactionalId: String,
                            pid: Long,
                            epoch: Short,
-                           command: TransactionResult,
+                           txnMarkerResult: TransactionResult,
                            responseCallback: EndTxnCallback): Unit = {
-    val errors = validateTransactionalId(transactionalId)
-    if (errors != Errors.NONE)
-      responseCallback(errors)
-    else
-      txnManager.getTransactionState(transactionalId) match {
+    val error = validateTransactionalId(transactionalId)
+    if (error != Errors.NONE)
+      responseCallback(error)
+    else {
+      val preAppendResult: Either[Errors, (Int, TransactionMetadataTransition)] = txnManager.getTransactionState(transactionalId) match {
         case None =>
-          responseCallback(Errors.INVALID_PID_MAPPING)
-        case Some(metadata) =>
-          metadata synchronized {
-            if (metadata.pid != pid)
-              responseCallback(Errors.INVALID_PID_MAPPING)
-            else if (metadata.producerEpoch != epoch)
-              responseCallback(Errors.INVALID_PRODUCER_EPOCH)
-            else metadata.state match {
+          Left(Errors.INVALID_PID_MAPPING)
+
+        case Some(epochAndTxnMetadata) =>
+          val txnMetadata = epochAndTxnMetadata.transactionMetadata
+          val coordinatorEpoch = epochAndTxnMetadata.coordinatorEpoch
+
+          txnMetadata synchronized {
+            if (txnMetadata.producerId != pid)
+              Left(Errors.INVALID_PID_MAPPING)
+            else if (txnMetadata.producerEpoch != epoch)
+              Left(Errors.INVALID_PRODUCER_EPOCH)
+            else if (txnMetadata.pendingTransitionInProgress)
+              Left(Errors.CONCURRENT_TRANSACTIONS)
+            else txnMetadata.state match {
               case Ongoing =>
-                commitOrAbort(transactionalId, pid, epoch, command, responseCallback, metadata)
+                val nextState = if (txnMarkerResult == TransactionResult.COMMIT)
+                  PrepareCommit
+                else
+                  PrepareAbort
+                Right(coordinatorEpoch, txnMetadata.prepareAbortOrCommit(nextState, time.milliseconds()))
               case CompleteCommit =>
-                if (command == TransactionResult.COMMIT)
-                  responseCallback(Errors.NONE)
+                if (txnMarkerResult == TransactionResult.COMMIT)
+                  Left(Errors.NONE)
                 else
-                  responseCallback(Errors.INVALID_TXN_STATE)
+                  Left(Errors.INVALID_TXN_STATE)
               case CompleteAbort =>
-                if (command == TransactionResult.ABORT)
-                  responseCallback(Errors.NONE)
+                if (txnMarkerResult == TransactionResult.ABORT)
+                  Left(Errors.NONE)
                 else
-                  responseCallback(Errors.INVALID_TXN_STATE)
-              case _ =>
-                responseCallback(Errors.INVALID_TXN_STATE)
+                  Left(Errors.INVALID_TXN_STATE)
+              case PrepareCommit =>
+                if (txnMarkerResult == TransactionResult.COMMIT)
+                  Left(Errors.CONCURRENT_TRANSACTIONS)
+                else
+                  Left(Errors.INVALID_TXN_STATE)
+              case PrepareAbort =>
+                if (txnMarkerResult == TransactionResult.ABORT)
+                  Left(Errors.CONCURRENT_TRANSACTIONS)
+                else
+                  Left(Errors.INVALID_TXN_STATE)
+              case Empty =>
+                Left(Errors.INVALID_TXN_STATE)
             }
           }
       }
-  }
 
-  private def commitOrAbort(transactionalId: String,
-                            pid: Long,
-                            epoch: Short,
-                            command: TransactionResult,
-                            responseCallback: EndTxnCallback,
-                            metadata: TransactionMetadata) = {
-    val nextState = if (command == TransactionResult.COMMIT) PrepareCommit else PrepareAbort
-    val newMetadata = new TransactionMetadata(pid,
-      epoch,
-      metadata.txnTimeoutMs,
-      nextState,
-      metadata.topicPartitions,
-      metadata.transactionStartTime,
-      time.milliseconds())
-    metadata.prepareTransitionTo(nextState)
-
-    def logAppendCallback(errors: Errors): Unit = {
-      // we can respond to the client immediately and continue to write the txn markers if
-      // the log append was successful
-      responseCallback(errors)
-      if (errors == Errors.NONE)
-        txnManager.coordinatorEpochFor(transactionalId) match {
-          case Some(coordinatorEpoch) =>
-            writeTxnMarkers(WriteTxnMarkerArgs(transactionalId, pid, epoch, nextState, newMetadata, coordinatorEpoch))
-          case None =>
-            // this one should be completed by the new coordinator
-            warn(s"no longer the coordinator for transactionalId: $transactionalId")
-        }
-    }
-    txnManager.appendTransactionToLog(transactionalId, newMetadata, logAppendCallback)
-  }
+      preAppendResult match {
+        case Left(err) =>
+          responseCallback(err)
+
+        case Right((coordinatorEpoch, newMetadata)) =>
+          def sendTxnMarkersCallback(error: Errors): Unit = {
+            if (error == Errors.NONE) {
+              val preSendResult: Either[Errors, (TransactionMetadata, TransactionMetadataTransition)] = txnManager.getTransactionState(transactionalId) match {
+                case Some(epochAndMetadata) =>
+                  if (epochAndMetadata.coordinatorEpoch == coordinatorEpoch) {
+
+                    val txnMetadata = epochAndMetadata.transactionMetadata
+                    txnMetadata synchronized {
+                      if (txnMetadata.producerId != pid)
+                        Left(Errors.INVALID_PID_MAPPING)
+                      else if (txnMetadata.producerEpoch != epoch)
+                        Left(Errors.INVALID_PRODUCER_EPOCH)
+                      else if (txnMetadata.pendingTransitionInProgress)
+                        Left(Errors.CONCURRENT_TRANSACTIONS)
+                      else txnMetadata.state match {
+                        case Empty| Ongoing | CompleteCommit | CompleteAbort =>
+                          Left(Errors.INVALID_TXN_STATE)
+                        case PrepareCommit =>
+                          if (txnMarkerResult != TransactionResult.COMMIT)
+                            Left(Errors.INVALID_TXN_STATE)
+                          else
+                            Right(txnMetadata, txnMetadata.prepareComplete(time.milliseconds()))
+                        case PrepareAbort =>
+                          if (txnMarkerResult != TransactionResult.ABORT)
+                            Left(Errors.INVALID_TXN_STATE)
+                          else
+                            Right(txnMetadata, txnMetadata.prepareComplete(time.milliseconds()))
+                      }
+                    }
+                  } else {
+                    info(s"Updating $transactionalId's transaction state to $newMetadata with coordinator epoch $coordinatorEpoch for $transactionalId failed since the transaction coordinator epoch " +
+                      s"has been changed to ${epochAndMetadata.coordinatorEpoch} after the transaction metadata has been successfully appended to the log")
+
+                    Left(Errors.NOT_COORDINATOR)
+                  }
+
+                case None =>
+                  if (txnManager.isCoordinatorFor(transactionalId)) {
+                    throw new IllegalStateException("Cannot find the metadata in coordinator's cache while it is still the leader of the txn topic partition")
+                  } else {
+                    // this transactional id no longer exists, maybe the corresponding partition has already been migrated out.
+                    info(s"Updating $transactionalId's transaction state to $newMetadata with coordinator epoch $coordinatorEpoch for $transactionalId failed after the transaction message " +
+                      s"has been appended to the log. The partition ${partitionFor(transactionalId)} may have migrated as the metadata is no longer in the cache")
+
+                    Left(Errors.NOT_COORDINATOR)
+                  }
+              }
+
+              preSendResult match {
+                case Left(err) =>
+                  responseCallback(err)
+
+                case Right((txnMetadata, newPreSendMetadata)) =>
+                  // we can respond to the client immediately and continue to write the txn markers if
+                  // the log append was successful
+                  responseCallback(Errors.NONE)
 
-  private def writeTxnMarkers(markerArgs: WriteTxnMarkerArgs): Unit = {
-    def completionCallback(error: Errors): Unit = {
-      error match {
-        case Errors.NONE =>
-          txnManager.getTransactionState(markerArgs.transactionalId) match {
-            case Some(preparedCommitMetadata) =>
-              val completedState = if (markerArgs.nextState == PrepareCommit) CompleteCommit else CompleteAbort
-              val committedMetadata = new TransactionMetadata(markerArgs.pid,
-                markerArgs.epoch,
-                preparedCommitMetadata.txnTimeoutMs,
-                completedState,
-                preparedCommitMetadata.topicPartitions,
-                preparedCommitMetadata.transactionStartTime,
-                time.milliseconds())
-              preparedCommitMetadata.prepareTransitionTo(completedState)
-
-              def writeCommittedTransactionCallback(error: Errors): Unit = {
-                error match {
-                  case Errors.NONE =>
-                    txnMarkerChannelManager.removeCompleted(txnManager.partitionFor(markerArgs.transactionalId),
-                      markerArgs.pid)
-                  case Errors.NOT_COORDINATOR =>
-                    // this one should be completed by the new coordinator
-                    warn(s"no longer the coordinator for transactionalId: ${markerArgs.transactionalId}")
-                  case _ =>
-                    warn(s"error: $error caught for transactionalId: ${markerArgs.transactionalId} when appending state: $completedState. Retrying.")
-                    // retry until success
-                    txnManager.appendTransactionToLog(markerArgs.transactionalId, committedMetadata, writeCommittedTransactionCallback)
-                }
+                  txnMarkerChannelManager.addTxnMarkersToSend(transactionalId, coordinatorEpoch, txnMarkerResult, txnMetadata, newPreSendMetadata)
               }
-              txnManager.appendTransactionToLog(markerArgs.transactionalId, committedMetadata, writeCommittedTransactionCallback)
-            case None =>
-              // this one should be completed by the new coordinator
-              warn(s"no longer the coordinator for transactionalId: ${markerArgs.transactionalId}")
+            } else {
+              info(s"Updating $transactionalId's transaction state to $newMetadata with coordinator epoch $coordinatorEpoch for $transactionalId failed since the transaction message " +
+                s"cannot be appended to the log. Returning error code $error to the client")
+
+              responseCallback(error)
+            }
           }
-        case Errors.NOT_COORDINATOR =>
-          warn(s"no longer the coordinator for transactionalId: ${markerArgs.transactionalId}")
-        case _ =>
-          warn(s"error: $error caught when writing transaction markers for transactionalId: ${markerArgs.transactionalId}. retrying")
-          txnMarkerChannelManager.addTxnMarkerRequest(txnManager.partitionFor(markerArgs.transactionalId),
-            markerArgs.newMetadata,
-            markerArgs.coordinatorEpoch,
-            completionCallback)
+
+          txnManager.appendTransactionToLog(transactionalId, coordinatorEpoch, newMetadata, sendTxnMarkersCallback)
       }
     }
-    txnMarkerChannelManager.addTxnMarkerRequest(txnManager.partitionFor(markerArgs.transactionalId),
-      markerArgs.newMetadata,
-      markerArgs.coordinatorEpoch,
-      completionCallback)
   }
 
   def transactionTopicConfigs: Properties = txnManager.transactionTopicConfigs
@@ -386,30 +412,15 @@ class TransactionCoordinator(brokerId: Int,
   def partitionFor(transactionalId: String): Int = txnManager.partitionFor(transactionalId)
 
   private def expireTransactions(): Unit = {
-
-    txnManager.transactionsToExpire().foreach{ idAndMetadata =>
-      idAndMetadata.metadata synchronized {
-        if (!txnManager.isCoordinatorLoadingInProgress(idAndMetadata.transactionalId)
-          && idAndMetadata.metadata.pendingState.isEmpty) {
-          // bump the producerEpoch so that any further requests for this transactionalId will be fenced
-          idAndMetadata.metadata.producerEpoch = (idAndMetadata.metadata.producerEpoch + 1).toShort
-          idAndMetadata.metadata.prepareTransitionTo(Ongoing)
-          txnManager.appendTransactionToLog(idAndMetadata.transactionalId, idAndMetadata.metadata, (errors: Errors) => {
-            if (errors != Errors.NONE)
-              warn(s"failed to append transactionalId ${idAndMetadata.transactionalId} to log during transaction expiry. errors:$errors")
-            else
-              handleEndTransaction(idAndMetadata.transactionalId,
-                idAndMetadata.metadata.pid,
-                idAndMetadata.metadata.producerEpoch,
-                TransactionResult.ABORT,
-                (errors: Errors) => {
-                  if (errors != Errors.NONE)
-                    warn(s"rollback of transactionalId: ${idAndMetadata.transactionalId} failed during transaction expiry. errors: $errors")
-                }
-              )
-          })
-        }
-      }
+    txnManager.transactionsToExpire().foreach { txnIdAndPidEpoch =>
+      handleEndTransaction(txnIdAndPidEpoch.transactionalId,
+        txnIdAndPidEpoch.producerId,
+        txnIdAndPidEpoch.producerEpoch,
+        TransactionResult.ABORT,
+        (error: Errors) => {
+          if (error != Errors.NONE)
+            warn(s"Rollback ongoing transaction of transactionalId: ${txnIdAndPidEpoch.transactionalId} aborted due to ${error.exceptionName()}")
+        })
     }
   }
 
@@ -421,8 +432,8 @@ class TransactionCoordinator(brokerId: Int,
     scheduler.startup()
     scheduler.schedule("transaction-expiration",
       expireTransactions,
-      TransactionManager.DefaultRemoveExpiredTransactionsIntervalMs,
-      TransactionManager.DefaultRemoveExpiredTransactionsIntervalMs
+      TransactionStateManager.DefaultRemoveExpiredTransactionsIntervalMs,
+      TransactionStateManager.DefaultRemoveExpiredTransactionsIntervalMs
     )
     if (enablePidExpiration)
       txnManager.enablePidExpiration()
@@ -449,9 +460,3 @@ class TransactionCoordinator(brokerId: Int,
 }
 
 case class InitPidResult(pid: Long, epoch: Short, error: Errors)
-case class WriteTxnMarkerArgs(transactionalId: String,
-                              pid: Long,
-                              epoch: Short,
-                              nextState: TransactionState,
-                              newMetadata: TransactionMetadata,
-                              coordinatorEpoch: Int)

http://git-wip-us.apache.org/repos/asf/kafka/blob/794e6dbd/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala
index 4a0dc71..a180502 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala
@@ -141,18 +141,18 @@ object TransactionLog {
     *
     * @return value payload bytes
     */
-  private[coordinator] def valueToBytes(txnMetadata: TransactionMetadata): Array[Byte] = {
+  private[coordinator] def valueToBytes(txnMetadata: TransactionMetadataTransition): Array[Byte] = {
     val value = new Struct(CURRENT_VALUE_SCHEMA)
-    value.set(VALUE_SCHEMA_PID_FIELD, txnMetadata.pid)
+    value.set(VALUE_SCHEMA_PID_FIELD, txnMetadata.producerId)
     value.set(VALUE_SCHEMA_EPOCH_FIELD, txnMetadata.producerEpoch)
     value.set(VALUE_SCHEMA_TXN_TIMEOUT_FIELD, txnMetadata.txnTimeoutMs)
-    value.set(VALUE_SCHEMA_TXN_STATUS_FIELD, txnMetadata.state.byte)
-    value.set(VALUE_SCHEMA_TXN_ENTRY_TIMESTAMP_FIELD, txnMetadata.lastUpdateTimestamp)
-    value.set(VALUE_SCHEMA_TXN_START_TIMESTAMP_FIELD, txnMetadata.transactionStartTime)
+    value.set(VALUE_SCHEMA_TXN_STATUS_FIELD, txnMetadata.txnState.byte)
+    value.set(VALUE_SCHEMA_TXN_ENTRY_TIMESTAMP_FIELD, txnMetadata.txnLastUpdateTimestamp)
+    value.set(VALUE_SCHEMA_TXN_START_TIMESTAMP_FIELD, txnMetadata.txnStartTimestamp)
 
-    if (txnMetadata.state == Empty) {
+    if (txnMetadata.txnState == Empty) {
       if (txnMetadata.topicPartitions.nonEmpty)
-        throw new IllegalStateException(s"Transaction is not expected to have any partitions since its state is ${txnMetadata.state}: $txnMetadata")
+        throw new IllegalStateException(s"Transaction is not expected to have any partitions since its state is ${txnMetadata.txnState}: $txnMetadata")
 
       value.set(VALUE_SCHEMA_TXN_PARTITIONS_FIELD, null)
     } else {

http://git-wip-us.apache.org/repos/asf/kafka/blob/794e6dbd/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannel.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannel.scala
deleted file mode 100644
index e60bd40..0000000
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannel.scala
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.coordinator.transaction
-
-import java.util
-import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}
-
-import kafka.common.{BrokerEndPointNotAvailableException, RequestAndCompletionHandler}
-import kafka.server.{DelayedOperationPurgatory, MetadataCache}
-import kafka.utils.Logging
-import org.apache.kafka.clients.NetworkClient
-import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.requests.{TransactionResult, WriteTxnMarkersRequest}
-import org.apache.kafka.common.requests.WriteTxnMarkersRequest.TxnMarkerEntry
-import org.apache.kafka.common.utils.Time
-import org.apache.kafka.common.{Node, TopicPartition}
-
-import scala.collection.{concurrent, immutable, mutable}
-import collection.JavaConverters._
-
-class TransactionMarkerChannel(interBrokerListenerName: ListenerName,
-                               metadataCache: MetadataCache,
-                               networkClient: NetworkClient,
-                               time: Time) extends Logging {
-
-  // we need the txnTopicPartition so we can clean up when Transaction Log partitions emigrate
-  case class PendingTxnKey(txnTopicPartition: Int, producerId: Long)
-
-  class BrokerRequestQueue(private var destination: Node) {
-
-    // keep track of the requests per txn topic partition so we can easily clear the queue
-    // during partition emigration
-    private val requestsPerTxnTopicPartition: concurrent.Map[Int, BlockingQueue[TxnMarkerEntry]]
-      = concurrent.TrieMap.empty[Int, BlockingQueue[TxnMarkerEntry]]
-
-    def removeRequestsForPartition(partition: Int): Unit = {
-      requestsPerTxnTopicPartition.remove(partition)
-    }
-
-    def maybeUpdateNode(node: Node): Unit = {
-      destination = node
-    }
-
-    def addRequests(txnTopicPartition: Int, txnMarkerEntry: TxnMarkerEntry): Unit = {
-      val queue = requestsPerTxnTopicPartition.getOrElseUpdate(txnTopicPartition, new LinkedBlockingQueue[TxnMarkerEntry]())
-      queue.add(txnMarkerEntry)
-    }
-
-    def eachMetadataPartition[B](f:(Int, BlockingQueue[TxnMarkerEntry]) => B): mutable.Iterable[B] =
-      requestsPerTxnTopicPartition.filter{ case(_, queue) => !queue.isEmpty}
-        .map{case(partition:Int, queue:BlockingQueue[TxnMarkerEntry]) => f(partition, queue)}
-
-
-    def node: Node = destination
-
-    def totalQueuedRequests(): Int =
-      requestsPerTxnTopicPartition.map { case(_, queue) => queue.size()}
-        .sum
-
-  }
-
-  private val brokerStateMap: concurrent.Map[Int, BrokerRequestQueue] = concurrent.TrieMap.empty[Int, BrokerRequestQueue]
-  private val pendingTxnMap: concurrent.Map[PendingTxnKey, TransactionMetadata] = concurrent.TrieMap.empty[PendingTxnKey, TransactionMetadata]
-
-  // TODO: What is reasonable for this
-  private val brokerNotAliveBackoffMs = 10
-
-  // visible for testing
-  private[transaction] def queueForBroker(brokerId: Int) = {
-    brokerStateMap.get(brokerId)
-  }
-
-  private[transaction]
-  def drainQueuedTransactionMarkers(txnMarkerPurgatory: DelayedOperationPurgatory[DelayedTxnMarker]): Iterable[RequestAndCompletionHandler] = {
-    brokerStateMap.flatMap {case (brokerId: Int, brokerRequestQueue: BrokerRequestQueue) =>
-      brokerRequestQueue.eachMetadataPartition{ case(partitionId, queue) =>
-        val markersToSend: java.util.List[TxnMarkerEntry] = new util.ArrayList[TxnMarkerEntry]()
-        queue.drainTo(markersToSend)
-        val requestCompletionHandler = new TransactionMarkerRequestCompletionHandler(this, txnMarkerPurgatory, partitionId, markersToSend, brokerId)
-        RequestAndCompletionHandler(brokerRequestQueue.node, new WriteTxnMarkersRequest.Builder(markersToSend), requestCompletionHandler)
-      }
-    }
-  }
-
-
-  def addOrUpdateBroker(broker: Node) {
-    brokerStateMap.putIfAbsent(broker.id(), new BrokerRequestQueue(broker)) match {
-      case Some(brokerQueue) => brokerQueue.maybeUpdateNode(broker)
-      case None => // nothing to do
-    }
-  }
-
-  private[transaction] def addRequestForBroker(brokerId: Int, metadataPartition: Int, txnMarkerEntry: TxnMarkerEntry) {
-    val brokerQueue = brokerStateMap(brokerId)
-    brokerQueue.addRequests(metadataPartition, txnMarkerEntry)
-    trace(s"Added marker $txnMarkerEntry for broker $brokerId")
-  }
-
-  def addRequestToSend(metadataPartition: Int, pid: Long, epoch: Short, result: TransactionResult, coordinatorEpoch: Int, topicPartitions: immutable.Set[TopicPartition]): Unit = {
-    val partitionsByDestination: immutable.Map[Int, immutable.Set[TopicPartition]] = topicPartitions.groupBy { topicPartition: TopicPartition =>
-      val currentBrokers = mutable.Set.empty[Int]
-      var brokerId:Option[Int] = None
-
-      while(brokerId.isEmpty) {
-        val leaderForPartition = metadataCache.getPartitionInfo(topicPartition.topic, topicPartition.partition)
-        leaderForPartition match {
-          case Some(partitionInfo) =>
-            val leaderId = partitionInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
-            if (currentBrokers.add(leaderId)) {
-              try {
-                metadataCache.getAliveEndpoint(leaderId, interBrokerListenerName) match {
-                  case Some(broker) =>
-                    addOrUpdateBroker(broker)
-                    brokerId = Some(leaderId)
-                  case None =>
-                    currentBrokers.remove(leaderId)
-                    trace(s"alive endpoint for broker with id: $leaderId not available. retrying")
-
-                }
-              } catch {
-                case _:BrokerEndPointNotAvailableException =>
-                  currentBrokers.remove(leaderId)
-                  trace(s"alive endpoint for broker with id: $leaderId not available. retrying")
-              }
-            }
-          case None =>
-            trace(s"couldn't find leader for partition: $topicPartition")
-        }
-        if (brokerId.isEmpty)
-          time.sleep(brokerNotAliveBackoffMs)
-      }
-      brokerId.get
-    }
-
-    for ((brokerId: Int, topicPartitions: immutable.Set[TopicPartition]) <- partitionsByDestination) {
-      val txnMarker = new TxnMarkerEntry(pid, epoch, coordinatorEpoch, result, topicPartitions.toList.asJava)
-      addRequestForBroker(brokerId, metadataPartition, txnMarker)
-    }
-    networkClient.wakeup()
-  }
-
-  def maybeAddPendingRequest(metadataPartition: Int, metadata: TransactionMetadata): Boolean = {
-    val existingMetadataToWrite = pendingTxnMap.putIfAbsent(PendingTxnKey(metadataPartition, metadata.pid), metadata)
-    existingMetadataToWrite.isEmpty
-  }
-
-  def removeCompletedTxn(metadataPartition: Int, pid: Long): Unit = {
-    pendingTxnMap.remove(PendingTxnKey(metadataPartition, pid))
-  }
-
-  def pendingTxnMetadata(metadataPartition: Int, pid: Long): Option[TransactionMetadata] = {
-    pendingTxnMap.get(PendingTxnKey(metadataPartition, pid))
-  }
-
-  def close(): Unit = {
-    brokerStateMap.clear()
-    pendingTxnMap.clear()
-    networkClient.close()
-  }
-
-  def removeStateForPartition(partition: Int): mutable.Iterable[Long] = {
-    brokerStateMap.foreach { case(_, brokerQueue) =>
-      brokerQueue.removeRequestsForPartition(partition)
-    }
-    pendingTxnMap.filter { case (key: PendingTxnKey, _) => key.txnTopicPartition == partition }
-      .map { case (key: PendingTxnKey, _) =>
-        pendingTxnMap.remove(key)
-        key.producerId
-      }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/794e6dbd/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
index 1b7ea56..b7a2e80 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
@@ -17,28 +17,33 @@
 package kafka.coordinator.transaction
 
 
+import java.util
+import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}
+
 import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler}
 import kafka.server.{DelayedOperationPurgatory, KafkaConfig, MetadataCache}
 import kafka.utils.Logging
 import org.apache.kafka.clients._
+import org.apache.kafka.common.{Node, TopicPartition}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network._
-import org.apache.kafka.common.requests.TransactionResult
+import org.apache.kafka.common.requests.{TransactionResult, WriteTxnMarkersRequest}
 import org.apache.kafka.common.security.JaasContext
 import org.apache.kafka.common.utils.Time
-
 import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.WriteTxnMarkersRequest.TxnMarkerEntry
 
 import collection.JavaConverters._
+import scala.collection.{concurrent, immutable, mutable}
 
 object TransactionMarkerChannelManager {
   def apply(config: KafkaConfig,
             metrics: Metrics,
             metadataCache: MetadataCache,
+            txnStateManager: TransactionStateManager,
             txnMarkerPurgatory: DelayedOperationPurgatory[DelayedTxnMarker],
             time: Time): TransactionMarkerChannelManager = {
 
-
     val channelBuilder = ChannelBuilders.clientChannelBuilder(
       config.interBrokerSecurityProtocol,
       JaasContext.Type.SERVER,
@@ -47,7 +52,6 @@ object TransactionMarkerChannelManager {
       config.saslMechanismInterBrokerProtocol,
       config.saslInterBrokerHandshakeRequestEnable
     )
-    val threadName = "TxnMarkerSenderThread-" + config.brokerId
     val selector = new Selector(
       NetworkReceive.UNLIMITED,
       config.connectionsMaxIdleMs,
@@ -61,7 +65,7 @@ object TransactionMarkerChannelManager {
     val networkClient = new NetworkClient(
       selector,
       new ManualMetadataUpdater(),
-      threadName,
+      s"broker-${config.brokerId}-txn-marker-sender",
       1,
       50,
       Selectable.USE_DEFAULT_BUFFER_SIZE,
@@ -71,79 +75,226 @@ object TransactionMarkerChannelManager {
       false,
       new ApiVersions
     )
-    val channel = new TransactionMarkerChannel(config.interBrokerListenerName, metadataCache, networkClient, time)
-
-    val sendThread: InterBrokerSendThread = {
-      networkClient.wakeup()
-      new InterBrokerSendThread(threadName, networkClient, requestGenerator(channel, txnMarkerPurgatory), time)
-    }
 
     new TransactionMarkerChannelManager(config,
       metadataCache,
+      networkClient,
+      txnStateManager,
       txnMarkerPurgatory,
-      sendThread,
-      channel)
+      time)
   }
 
-  private[transaction] def requestGenerator(transactionMarkerChannel: TransactionMarkerChannel,
-                                            txnMarkerPurgatory: DelayedOperationPurgatory[DelayedTxnMarker]): () => Iterable[RequestAndCompletionHandler] = {
-    () => transactionMarkerChannel.drainQueuedTransactionMarkers(txnMarkerPurgatory)
+  private[transaction] def requestGenerator(transactionMarkerChannelManager: TransactionMarkerChannelManager): () => Iterable[RequestAndCompletionHandler] = {
+    () => transactionMarkerChannelManager.drainQueuedTransactionMarkers()
   }
 }
 
+class TxnMarkerQueue(@volatile private var destination: Node) {
+
+  // keep track of the requests per txn topic partition so we can easily clear the queue
+  // during partition emigration
+  private val markersPerTxnTopicPartition: concurrent.Map[Int, BlockingQueue[TxnIdAndMarkerEntry]]
+  = concurrent.TrieMap.empty[Int, BlockingQueue[TxnIdAndMarkerEntry]]
+
+  def removeMarkersForTxnTopicPartition(partition: Int): Option[BlockingQueue[TxnIdAndMarkerEntry]] = {
+    markersPerTxnTopicPartition.remove(partition)
+  }
+
+  def maybeUpdateNode(node: Node): Unit = {
+    destination = node
+  }
+
+  def addMarkers(txnTopicPartition: Int, txnIdAndMarker: TxnIdAndMarkerEntry): Unit = {
+    val queue = markersPerTxnTopicPartition.getOrElseUpdate(txnTopicPartition, new LinkedBlockingQueue[TxnIdAndMarkerEntry]())
+    queue.add(txnIdAndMarker)
+  }
+
+  def forEachTxnTopicPartition[B](f:(Int, BlockingQueue[TxnIdAndMarkerEntry]) => B): mutable.Iterable[B] =
+    markersPerTxnTopicPartition.filter { case(_, queue) => !queue.isEmpty }
+      .map { case(partition:Int, queue:BlockingQueue[TxnIdAndMarkerEntry]) => f(partition, queue) }
+
+  def node: Node = destination
 
+  // TODO: this function is only for metrics recording, not yet added
+  def totalNumMarkers(): Int = markersPerTxnTopicPartition.map { case(_, queue) => queue.size()}.sum
+
+  // visible for testing
+  def totalNumMarkers(txnTopicPartition: Int): Int = markersPerTxnTopicPartition.get(txnTopicPartition).fold(0)(_.size())
+}
 
 class TransactionMarkerChannelManager(config: KafkaConfig,
                                       metadataCache: MetadataCache,
+                                      networkClient: NetworkClient,
+                                      txnStateManager: TransactionStateManager,
                                       txnMarkerPurgatory: DelayedOperationPurgatory[DelayedTxnMarker],
-                                      interBrokerSendThread: InterBrokerSendThread,
-                                      transactionMarkerChannel: TransactionMarkerChannel) extends Logging {
+                                      time: Time) extends Logging {
+
+  private val markersQueuePerBroker: concurrent.Map[Int, TxnMarkerQueue] = concurrent.TrieMap.empty[Int, TxnMarkerQueue]
+
+  private val interBrokerListenerName: ListenerName = config.interBrokerListenerName
 
-  type WriteTxnMarkerCallback = Errors => Unit
+  // TODO: what is a reasonable value for this backoff?
+  private val brokerNotAliveBackoffMs = 10
+
+  private val txnMarkerSendThread: InterBrokerSendThread = {
+    new InterBrokerSendThread("TxnMarkerSenderThread-" + config.brokerId, networkClient, drainQueuedTransactionMarkers, time)
+  }
 
   def start(): Unit = {
-    interBrokerSendThread.start()
+    txnMarkerSendThread.start()
+    networkClient.wakeup()    // FIXME: is this really required?
   }
 
   def shutdown(): Unit = {
-    interBrokerSendThread.shutdown()
-    transactionMarkerChannel.close()
+    txnMarkerSendThread.shutdown()
+    markersQueuePerBroker.clear()
+  }
+
+  // visible for testing
+  private[transaction] def queueForBroker(brokerId: Int) = {
+    markersQueuePerBroker.get(brokerId)
+  }
+
+  // visible for testing
+  private[transaction] def senderThread = txnMarkerSendThread
+
+  private[transaction] def addMarkersForBroker(broker: Node, txnTopicPartition: Int, txnIdAndMarker: TxnIdAndMarkerEntry) {
+    val brokerId = broker.id
+
+    // we do not synchronize on the update of the broker node with the enqueuing,
+    // since even if there is a race condition we will just retry
+    val brokerRequestQueue = markersQueuePerBroker.getOrElseUpdate(brokerId, new TxnMarkerQueue(broker))
+    brokerRequestQueue.maybeUpdateNode(broker)
+    brokerRequestQueue.addMarkers(txnTopicPartition, txnIdAndMarker)
+
+    trace(s"Added marker ${txnIdAndMarker.txnMarkerEntry} for transactional id ${txnIdAndMarker.txnId} to destination broker $brokerId")
   }
 
+  private[transaction] def drainQueuedTransactionMarkers(): Iterable[RequestAndCompletionHandler] = {
+    markersQueuePerBroker.map { case (brokerId: Int, brokerRequestQueue: TxnMarkerQueue) =>
+      val txnIdAndMarkerEntries: java.util.List[TxnIdAndMarkerEntry] = new util.ArrayList[TxnIdAndMarkerEntry]()
+      brokerRequestQueue.forEachTxnTopicPartition { case (_, queue) =>
+        queue.drainTo(txnIdAndMarkerEntries)
+      }
+      (brokerRequestQueue.node, txnIdAndMarkerEntries)
+    }
+      .filter { case (_, entries) => !entries.isEmpty}
+      .map { case (node, entries) =>
+        val markersToSend: java.util.List[TxnMarkerEntry] = entries.asScala.map(_.txnMarkerEntry).asJava
+        val requestCompletionHandler = new TransactionMarkerRequestCompletionHandler(node.id, txnStateManager, this, entries)
+        RequestAndCompletionHandler(node, new WriteTxnMarkersRequest.Builder(markersToSend), requestCompletionHandler)
+      }
+  }
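The drain step above relies on BlockingQueue#drainTo to move everything currently queued into a single batch without extra locking, leaving the queue empty for the next round. A tiny sketch with hypothetical string payloads:

    import java.util
    import java.util.concurrent.LinkedBlockingQueue
    import scala.collection.JavaConverters._

    object DrainToSketch extends App {
      val queue = new LinkedBlockingQueue[String]()
      queue.add("marker-1")
      queue.add("marker-2")

      val batch = new util.ArrayList[String]()
      queue.drainTo(batch)                      // moves both entries into the batch

      println(batch.asScala.mkString(", "))     // marker-1, marker-2
      println(queue.size())                     // 0
    }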
+
+  def addTxnMarkersToSend(transactionalId: String,
+                          coordinatorEpoch: Int,
+                          txnResult: TransactionResult,
+                          txnMetadata: TransactionMetadata,
+                          newMetadata: TransactionMetadataTransition): Unit = {
+
+    def appendToLogCallback(error: Errors): Unit = {
+      error match {
+        case Errors.NONE =>
+          trace(s"Completed sending transaction markers for $transactionalId as $txnResult")
+
+          txnStateManager.getTransactionState(transactionalId) match {
+            case Some(epochAndMetadata) =>
+              if (epochAndMetadata.coordinatorEpoch == coordinatorEpoch) {
+                debug(s"Updating $transactionalId's transaction state to $txnMetadata with coordinator epoch $coordinatorEpoch succeeded")
 
-  def addTxnMarkerRequest(txnTopicPartition: Int, metadata: TransactionMetadata, coordinatorEpoch: Int, completionCallback: WriteTxnMarkerCallback): Unit = {
-    val metadataToWrite = metadata synchronized metadata.copy()
+                // try to append to the transaction log
+                def retryAppendCallback(error: Errors): Unit =
+                  error match {
+                    case Errors.NONE =>
+                      trace(s"Completed transaction for $transactionalId with coordinator epoch $coordinatorEpoch, final state after commit: ${txnMetadata.state}")
 
-    if (!transactionMarkerChannel.maybeAddPendingRequest(txnTopicPartition, metadata))
-      // TODO: Not sure this is the correct response here?
-      completionCallback(Errors.INVALID_TXN_STATE)
-    else {
-      val delayedTxnMarker = new DelayedTxnMarker(metadataToWrite, completionCallback)
-      txnMarkerPurgatory.tryCompleteElseWatch(delayedTxnMarker, Seq(metadata.pid))
+                    case Errors.NOT_COORDINATOR =>
+                      info(s"No longer the coordinator for transactionalId: $transactionalId while trying to append to the transaction log; skipping the write")
 
-      val result = metadataToWrite.state match {
-        case PrepareCommit => TransactionResult.COMMIT
-        case PrepareAbort => TransactionResult.ABORT
-        case s => throw new IllegalStateException("Unexpected txn metadata state while writing markers: " + s)
+                    case Errors.COORDINATOR_NOT_AVAILABLE =>
+                      warn(s"Failed to update transaction state for $transactionalId when appending to the transaction log due to ${error.exceptionName}; retrying")
+
+                      // retry appending
+                      txnStateManager.appendTransactionToLog(transactionalId, coordinatorEpoch, newMetadata, retryAppendCallback)
+
+                    case errors: Errors =>
+                      throw new IllegalStateException(s"Unexpected error ${errors.exceptionName} while appending to transaction log for $transactionalId")
+                  }
+
+                txnStateManager.appendTransactionToLog(transactionalId, coordinatorEpoch, newMetadata, retryAppendCallback)
+              } else {
+                info(s"Updating $transactionalId's transaction state to $txnMetadata with coordinator epoch $coordinatorEpoch failed after the transaction markers " +
+                  s"have been sent to brokers. The cached metadata has been changed to $epochAndMetadata since preparing to send markers")
+              }
+
+            case None =>
+              // this transactional id no longer exists, maybe the corresponding partition has already been migrated out.
+              // we will stop appending the completed log entry to transaction topic as the new leader should be doing it.
+              info(s"Updating $transactionalId's transaction state to $txnMetadata with coordinator epoch $coordinatorEpoch failed after the transaction markers " +
+                s"have been sent to the brokers. The partition ${txnStateManager.partitionFor(transactionalId)} may have emigrated as the metadata is no longer in the cache")
+          }
+
+        case other =>
+          throw new IllegalStateException(s"Unexpected error ${other.exceptionName} before appending to txn log for $transactionalId")
       }
-      transactionMarkerChannel.addRequestToSend(txnTopicPartition,
-        metadataToWrite.pid,
-        metadataToWrite.producerEpoch,
-        result,
-        coordinatorEpoch,
-        metadataToWrite.topicPartitions.toSet)
     }
+
+    // watch for both the transactional id and the transaction topic partition id,
+    // so we can cancel all the delayed operations for the same partition id;
+    // NOTE: this is only possible because the hashcodes of an Int and a String never overlap
+
+    // TODO: if the delayed txn marker will always have infinite timeout, we can replace it with a map
+    val delayedTxnMarker = new DelayedTxnMarker(txnMetadata, appendToLogCallback)
+    val txnTopicPartition = txnStateManager.partitionFor(transactionalId)
+    txnMarkerPurgatory.tryCompleteElseWatch(delayedTxnMarker, Seq(transactionalId, txnTopicPartition))
+
+    addTxnMarkersToBrokerQueue(transactionalId, txnMetadata.producerId, txnMetadata.producerEpoch, txnResult, coordinatorEpoch, txnMetadata.topicPartitions.toSet)
   }
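The appendToLogCallback above retries the transaction-log append on COORDINATOR_NOT_AVAILABLE by re-submitting itself as the completion callback until the append either succeeds or fails with a non-retriable error. A stripped-down sketch of that self-referential retry pattern (appendToLog is a hypothetical stand-in for TransactionStateManager#appendTransactionToLog; only the Errors constants are Kafka's):

    import org.apache.kafka.common.protocol.Errors

    object RetryAppendSketch extends App {
      // hypothetical append that fails twice before succeeding
      var attempts = 0
      def appendToLog(callback: Errors => Unit): Unit = {
        attempts += 1
        callback(if (attempts < 3) Errors.COORDINATOR_NOT_AVAILABLE else Errors.NONE)
      }

      def retryAppendCallback(error: Errors): Unit = error match {
        case Errors.NONE =>
          println(s"append completed after $attempts attempts")
        case Errors.NOT_COORDINATOR =>
          println("no longer the coordinator; give up without retrying")
        case Errors.COORDINATOR_NOT_AVAILABLE =>
          appendToLog(retryAppendCallback)      // retriable: resubmit with the same callback
        case other =>
          throw new IllegalStateException(s"unexpected error ${other.exceptionName}")
      }

      appendToLog(retryAppendCallback)          // prints: append completed after 3 attempts
    }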
 
-  def removeCompleted(txnTopicPartition: Int, pid: Long): Unit = {
-    transactionMarkerChannel.removeCompletedTxn(txnTopicPartition, pid)
+  def addTxnMarkersToBrokerQueue(transactionalId: String, pid: Long, epoch: Short, result: TransactionResult, coordinatorEpoch: Int, topicPartitions: immutable.Set[TopicPartition]): Unit = {
+    val txnTopicPartition = txnStateManager.partitionFor(transactionalId)
+    val partitionsByDestination: immutable.Map[Node, immutable.Set[TopicPartition]] = topicPartitions.groupBy { topicPartition: TopicPartition =>
+      var brokerNode: Option[Node] = None
+
+      // TODO: instead of retry until succeed, we can first put it into an unknown broker queue and let the sender thread to look for its broker and migrate them
+      while (brokerNode.isEmpty) {
+        brokerNode = metadataCache.getPartitionLeaderEndpoint(topicPartition.topic, topicPartition.partition, interBrokerListenerName)
+
+        if (brokerNode.isEmpty) {
+          trace(s"Couldn't find leader endpoint for partition: $topicPartition, retrying.")
+          time.sleep(brokerNotAliveBackoffMs)
+        }
+      }
+      brokerNode.get
+    }
+
+    for ((broker: Node, topicPartitions: immutable.Set[TopicPartition]) <- partitionsByDestination) {
+      val txnIdAndMarker = TxnIdAndMarkerEntry(transactionalId, new TxnMarkerEntry(pid, epoch, coordinatorEpoch, result, topicPartitions.toList.asJava))
+      addMarkersForBroker(broker, txnTopicPartition, txnIdAndMarker)
+    }
+
+    networkClient.wakeup()
   }
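addTxnMarkersToBrokerQueue groups the transaction's partitions by their current leader so that a single WriteTxnMarkers batch is built per destination broker. A minimal sketch of that groupBy step (leader assignment hard-coded for illustration):

    object GroupByLeaderSketch extends App {
      // hypothetical leader lookup: partition name -> broker id
      val leaderFor = Map("topic-0" -> 1, "topic-1" -> 2, "topic-2" -> 1)

      val partitions = Set("topic-0", "topic-1", "topic-2")
      val byDestination: Map[Int, Set[String]] = partitions.groupBy(leaderFor)

      // one marker batch per destination broker
      byDestination.foreach { case (brokerId, parts) =>
        println(s"broker $brokerId <- markers for ${parts.mkString(", ")}")
      }
    }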
 
-  def removeStateForPartition(transactionStateTopicPartitionId: Int): Unit = {
-    transactionMarkerChannel.removeStateForPartition(transactionStateTopicPartitionId)
-      .foreach{pid =>
-        txnMarkerPurgatory.cancelForKey(pid)
+  def removeMarkersForTxnTopicPartition(txnTopicPartitionId: Int): Unit = {
+    txnMarkerPurgatory.cancelForKey(txnTopicPartitionId)
+    markersQueuePerBroker.foreach { case(_, brokerQueue) =>
+      brokerQueue.removeMarkersForTxnTopicPartition(txnTopicPartitionId).foreach { queue =>
+        for (entry: TxnIdAndMarkerEntry <- queue.asScala)
+          removeMarkersForTxnId(entry.txnId)
       }
+    }
+  }
+
+  def removeMarkersForTxnId(transactionalId: String): Unit = {
+    // we do not need to clear the queue since it should have
+    // already been drained by the sender thread
+    txnMarkerPurgatory.cancelForKey(transactionalId)
   }
 
+  def completeSendMarkersForTxnId(transactionalId: String): Unit = {
+    txnMarkerPurgatory.checkAndComplete(transactionalId)
+  }
 }
+
+case class TxnIdAndMarkerEntry(txnId: String, txnMarkerEntry: TxnMarkerEntry)

http://git-wip-us.apache.org/repos/asf/kafka/blob/794e6dbd/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
index 5d68325..5978a97 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
@@ -22,25 +22,23 @@ import kafka.utils.Logging
 import org.apache.kafka.clients.{ClientResponse, RequestCompletionHandler}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.WriteTxnMarkersRequest.TxnMarkerEntry
 import org.apache.kafka.common.requests.WriteTxnMarkersResponse
 
 import scala.collection.mutable
 import collection.JavaConversions._
 
-class TransactionMarkerRequestCompletionHandler(transactionMarkerChannel: TransactionMarkerChannel,
-                                                txnMarkerPurgatory: DelayedOperationPurgatory[DelayedTxnMarker],
-                                                txnTopicPartition: Int,
-                                                txnMarkerEntries: java.util.List[TxnMarkerEntry],
-                                                brokerId: Int) extends RequestCompletionHandler with Logging {
+class TransactionMarkerRequestCompletionHandler(brokerId: Int,
+                                                txnStateManager: TransactionStateManager,
+                                                txnMarkerChannelManager: TransactionMarkerChannelManager,
+                                                txnIdAndMarkerEntries: java.util.List[TxnIdAndMarkerEntry]) extends RequestCompletionHandler with Logging {
   override def onComplete(response: ClientResponse): Unit = {
     val correlationId = response.requestHeader.correlationId
     if (response.wasDisconnected) {
       trace(s"Cancelled request $response due to node ${response.destination} being disconnected")
       // re-enqueue the markers
-      for (txnMarker: TxnMarkerEntry <- txnMarkerEntries) {
-        transactionMarkerChannel.addRequestToSend(
-          txnTopicPartition,
+      for (txnIdAndMarker: TxnIdAndMarkerEntry <- txnIdAndMarkerEntries) {
+        val txnMarker = txnIdAndMarker.txnMarkerEntry
+        txnMarkerChannelManager.addTxnMarkersToBrokerQueue(txnIdAndMarker.txnId,
           txnMarker.producerId(),
           txnMarker.producerEpoch(),
           txnMarker.transactionResult(),
@@ -52,43 +50,89 @@ class TransactionMarkerRequestCompletionHandler(transactionMarkerChannel: Transa
 
       val writeTxnMarkerResponse = response.responseBody.asInstanceOf[WriteTxnMarkersResponse]
 
-      for (txnMarker: TxnMarkerEntry <- txnMarkerEntries) {
-        val errors = writeTxnMarkerResponse.errors(txnMarker.producerId())
+      for (txnIdAndMarker: TxnIdAndMarkerEntry <- txnIdAndMarkerEntries) {
+        val transactionalId = txnIdAndMarker.txnId
+        val txnMarker = txnIdAndMarker.txnMarkerEntry
+        val errors = writeTxnMarkerResponse.errors(txnMarker.producerId)
 
         if (errors == null)
-          throw new IllegalStateException("WriteTxnMarkerResponse does not contain expected error map for pid " + txnMarker.producerId())
-
-        val retryPartitions: mutable.Set[TopicPartition] = mutable.Set.empty[TopicPartition]
-        for ((topicPartition: TopicPartition, error: Errors) <- errors) {
-          error match {
-            case Errors.NONE =>
-              transactionMarkerChannel.pendingTxnMetadata(txnTopicPartition, txnMarker.producerId()) match {
-                case None =>
-                  // TODO: probably need to respond with Errors.NOT_COORDINATOR
-                  throw new IllegalArgumentException(s"transaction metadata not found during write txn marker request. partition ${txnTopicPartition} has likely emigrated")
-                case Some(metadata) =>
-                  // do not synchronize on this metadata since it will only be accessed by the sender thread
-                  metadata.topicPartitions -= topicPartition
+          throw new IllegalStateException(s"WriteTxnMarkerResponse does not contain expected error map for pid ${txnMarker.producerId}")
+
+        txnStateManager.getTransactionState(transactionalId) match {
+          case None =>
+            info(s"Transaction topic partition for $transactionalId has likely emigrated, as the corresponding metadata does not exist in the cache " +
+              s"any more; cancel sending transaction markers $txnMarker to the brokers")
+
+            // txn topic partition has likely emigrated, just cancel it from the purgatory
+            txnMarkerChannelManager.removeMarkersForTxnId(transactionalId)
+
+          case Some(epochAndMetadata) =>
+            val txnMetadata = epochAndMetadata.transactionMetadata
+            val retryPartitions: mutable.Set[TopicPartition] = mutable.Set.empty[TopicPartition]
+            var abortSending: Boolean = false
+
+            if (epochAndMetadata.coordinatorEpoch != txnMarker.coordinatorEpoch) {
+              // coordinator epoch has changed, just cancel it from the purgatory
+              info(s"Transaction coordinator epoch for $transactionalId has changed from ${txnMarker.coordinatorEpoch} to " +
+                s"${epochAndMetadata.coordinatorEpoch}; cancel sending transaction markers $txnMarker to the brokers")
+
+              txnMarkerChannelManager.removeMarkersForTxnId(transactionalId)
+              abortSending = true
+            } else {
+              txnMetadata synchronized {
+                for ((topicPartition: TopicPartition, error: Errors) <- errors) {
+                  error match {
+                    case Errors.NONE =>
+
+                      txnMetadata.removePartition(topicPartition)
+
+                    case Errors.CORRUPT_MESSAGE |
+                         Errors.MESSAGE_TOO_LARGE |
+                         Errors.RECORD_LIST_TOO_LARGE |
+                         Errors.INVALID_REQUIRED_ACKS => // these are all unexpected and fatal errors
+
+                      throw new IllegalStateException(s"Received fatal error ${error.exceptionName} while sending txn marker for $transactionalId")
+
+                    case Errors.UNKNOWN_TOPIC_OR_PARTITION |
+                         Errors.NOT_LEADER_FOR_PARTITION |
+                         Errors.NOT_ENOUGH_REPLICAS |
+                         Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND => // these are retriable errors
+
+                      info(s"Sending $transactionalId's transaction marker for partition $topicPartition has failed with error ${error.exceptionName}, retrying " +
+                        s"with current coordinator epoch ${epochAndMetadata.coordinatorEpoch}")
+
+                      retryPartitions += topicPartition
+
+                    case Errors.INVALID_PRODUCER_EPOCH |
+                         Errors.TRANSACTION_COORDINATOR_FENCED => // producer or coordinator epoch has changed, this txn can now be ignored
+
+                      info(s"Sending $transactionalId's transaction marker for partition $topicPartition has permanently failed with error ${error.exceptionName} " +
+                        s"with the current coordinator epoch ${epochAndMetadata.coordinatorEpoch}; cancel sending any more transaction markers $txnMarker to the brokers")
+
+                      txnMarkerChannelManager.removeMarkersForTxnId(transactionalId)
+                      abortSending = true
+
+                    case other =>
+                      throw new IllegalStateException(s"Unexpected error ${other.exceptionName} while sending txn marker for $transactionalId")
+                  }
+                }
+              }
+            }
+
+            if (!abortSending) {
+              if (retryPartitions.nonEmpty) {
+                // re-enqueue with possible new leaders of the partitions
+                txnMarkerChannelManager.addTxnMarkersToBrokerQueue(
+                  transactionalId,
+                  txnMarker.producerId(),
+                  txnMarker.producerEpoch(),
+                  txnMarker.transactionResult,
+                  txnMarker.coordinatorEpoch(),
+                  retryPartitions.toSet)
+              } else {
+                txnMarkerChannelManager.completeSendMarkersForTxnId(transactionalId)
               }
-            case Errors.UNKNOWN_TOPIC_OR_PARTITION | Errors.NOT_LEADER_FOR_PARTITION |
-                 Errors.NOT_ENOUGH_REPLICAS | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND =>
-              retryPartitions += topicPartition
-            case _ =>
-              throw new IllegalStateException("Writing txn marker request failed permanently for pid " + txnMarker.producerId())
-          }
-
-          if (retryPartitions.nonEmpty) {
-            // re-enqueue with possible new leaders of the partitions
-            transactionMarkerChannel.addRequestToSend(
-              txnTopicPartition,
-              txnMarker.producerId(),
-              txnMarker.producerEpoch(),
-              txnMarker.transactionResult,
-              txnMarker.coordinatorEpoch(),
-              retryPartitions.toSet)
-          }
-          val completed = txnMarkerPurgatory.checkAndComplete(txnMarker.producerId())
-          trace(s"Competed $completed transactions for producerId ${txnMarker.producerId()}")
+            }
         }
       }
     }
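The per-partition error handling in the new handler boils down to a four-way classification: complete the partition, retry it, abort sending for the transaction, or fail fast. A condensed sketch of that classification (the Errors constants are the ones handled above; the Outcome type is illustrative only):

    import org.apache.kafka.common.protocol.Errors

    object MarkerErrorClassification {
      sealed trait Outcome
      case object Completed    extends Outcome  // partition acked: remove it from the pending set
      case object Retry        extends Outcome  // transient: re-enqueue, possibly to a new leader
      case object AbortSending extends Outcome  // fenced: cancel the remaining markers for this txn
      case object Fatal        extends Outcome  // unexpected: surfaced as IllegalStateException

      def classify(error: Errors): Outcome = error match {
        case Errors.NONE =>
          Completed
        case Errors.UNKNOWN_TOPIC_OR_PARTITION | Errors.NOT_LEADER_FOR_PARTITION |
             Errors.NOT_ENOUGH_REPLICAS | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND =>
          Retry
        case Errors.INVALID_PRODUCER_EPOCH | Errors.TRANSACTION_COORDINATOR_FENCED =>
          AbortSending
        case _ =>
          Fatal
      }
    }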

