kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject kafka git commit: KAFKA-5147; Add missing synchronization to TransactionManager
Date Thu, 25 May 2017 23:25:50 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.11.0 7dde914aa -> c62793eab


KAFKA-5147; Add missing synchronization to TransactionManager

The basic idea is that exactly three collections, ie. `pendingRequests`, `newPartitionsToBeAddedToTransaction`,
and `partitionsInTransaction` are accessed from the context of application threads. The first
two are modified from the application threads, and the last is read from those threads.

So to make the `TransactionManager` truly thread safe, we have to ensure that all accesses
to these three members are done in a synchronized block. I inspected the code, and I believe
this patch puts the synchronization in all the correct places.

Author: Apurva Mehta <apurva@confluent.io>

Reviewers: Jason Gustafson <jason@confluent.io>

Closes #3132 from apurvam/KAFKA-5147-transaction-manager-synchronization-fixes

(cherry picked from commit 02c0c3b01730bdbff8a09d355c1b017715c7ce10)
Signed-off-by: Jason Gustafson <jason@confluent.io>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c62793ea
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c62793ea
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c62793ea

Branch: refs/heads/0.11.0
Commit: c62793eab35d166c33c018d931fa4a40024a2524
Parents: 7dde914
Author: Apurva Mehta <apurva@confluent.io>
Authored: Thu May 25 16:17:18 2017 -0700
Committer: Jason Gustafson <jason@confluent.io>
Committed: Thu May 25 16:25:46 2017 -0700

----------------------------------------------------------------------
 .../producer/internals/TransactionManager.java  | 34 +++++++++++---------
 .../internals/TransactionalRequestResult.java   |  2 +-
 2 files changed, 20 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c62793ea/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index d674697..e5c6ec2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -240,23 +240,23 @@ public class TransactionManager {
         return transactionalId != null;
     }
 
-    public boolean isCompletingTransaction() {
+    public synchronized boolean isCompletingTransaction() {
         return currentState == State.COMMITTING_TRANSACTION || currentState == State.ABORTING_TRANSACTION;
     }
 
-    public boolean isInTransaction() {
-        return currentState == State.IN_TRANSACTION || isCompletingTransaction();
+    public synchronized boolean isInErrorState() {
+        return currentState == State.ABORTABLE_ERROR || currentState == State.FATAL_ERROR;
     }
 
-    public boolean isInErrorState() {
-        return currentState == State.ABORTABLE_ERROR || currentState == State.FATAL_ERROR;
+    synchronized boolean isInTransaction() {
+        return currentState == State.IN_TRANSACTION || isCompletingTransaction();
     }
 
-    public synchronized void transitionToAbortableError(RuntimeException exception) {
+    synchronized void transitionToAbortableError(RuntimeException exception) {
         transitionTo(State.ABORTABLE_ERROR, exception);
     }
 
-    public synchronized void transitionToFatalError(RuntimeException exception) {
+    synchronized void transitionToFatalError(RuntimeException exception) {
         transitionTo(State.FATAL_ERROR, exception);
     }
 
@@ -383,17 +383,17 @@ public class TransactionManager {
     }
 
     // visible for testing
-    boolean transactionContainsPartition(TopicPartition topicPartition) {
+    synchronized boolean transactionContainsPartition(TopicPartition topicPartition) {
         return isInTransaction() && partitionsInTransaction.contains(topicPartition);
     }
 
     // visible for testing
-    boolean hasPendingOffsetCommits() {
+    synchronized boolean hasPendingOffsetCommits() {
         return isInTransaction() && !pendingTxnOffsetCommits.isEmpty();
     }
 
     // visible for testing
-    boolean isReadyForTransaction() {
+    synchronized boolean isReadyForTransaction() {
         return isTransactional() && currentState == State.READY;
     }
 
@@ -443,7 +443,7 @@ public class TransactionManager {
         return false;
     }
 
-    private void lookupCoordinator(FindCoordinatorRequest.CoordinatorType type, String coordinatorKey)
{
+    private synchronized void lookupCoordinator(FindCoordinatorRequest.CoordinatorType type,
String coordinatorKey) {
         switch (type) {
             case GROUP:
                 consumerGroupCoordinator = null;
@@ -459,7 +459,7 @@ public class TransactionManager {
         pendingRequests.add(new FindCoordinatorHandler(builder));
     }
 
-    private void completeTransaction() {
+    private synchronized void completeTransaction() {
         transitionTo(State.READY);
         lastError = null;
         partitionsInTransaction.clear();
@@ -516,8 +516,10 @@ public class TransactionManager {
         }
 
         void reenqueue() {
-            this.isRetry = true;
-            pendingRequests.add(this);
+            synchronized (TransactionManager.this) {
+                this.isRetry = true;
+                pendingRequests.add(this);
+            }
         }
 
         @Override
@@ -534,7 +536,9 @@ public class TransactionManager {
                     fatalError(response.versionMismatch());
                 } else if (response.hasResponse()) {
                     log.trace("Got transactional response for request:" + requestBuilder());
-                    handleResponse(response.responseBody());
+                    synchronized (TransactionManager.this) {
+                        handleResponse(response.responseBody());
+                    }
                 } else {
                     fatalError(new KafkaException("Could not execute transactional request
for unknown reasons"));
                 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c62793ea/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java
index 840cb1e..ff93da8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java
@@ -24,7 +24,7 @@ public final class TransactionalRequestResult {
     static final TransactionalRequestResult COMPLETE = new TransactionalRequestResult(new
CountDownLatch(0));
 
     private final CountDownLatch latch;
-    private RuntimeException error = null;
+    private volatile RuntimeException error = null;
 
     public TransactionalRequestResult() {
         this(new CountDownLatch(1));


Mime
View raw message