kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject kafka git commit: KAFKA-5260; Producer should not send AbortTxn unless transaction has actually begun
Date Tue, 30 May 2017 05:57:03 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.11.0 a1da4c534 -> 9dd7db2f5


KAFKA-5260; Producer should not send AbortTxn unless transaction has actually begun

Keep track of when a transaction has begun by setting a flag, `transactionStarted`, when a
successful `AddPartitionsToTxnResponse` or `AddOffsetsToTxnResponse` has been received. If
an `AbortTxnRequest` is about to be sent and `transactionStarted` is false, don't send the request
and transition the state to `READY`.

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Apurva Mehta <apurva@confluent.io>, Guozhang Wang <wangguoz@gmail.com>,
Jason Gustafson <jason@confluent.io>

Closes #3126 from dguy/kafka-5260

(cherry picked from commit a8794d8a5d18bb4eaafceac1ef675243af945862)
Signed-off-by: Jason Gustafson <jason@confluent.io>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9dd7db2f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9dd7db2f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9dd7db2f

Branch: refs/heads/0.11.0
Commit: 9dd7db2f5fb4b6c14ce72d2e87eea917016bf02b
Parents: a1da4c5
Author: Damian Guy <damian.guy@gmail.com>
Authored: Mon May 29 22:52:59 2017 -0700
Committer: Jason Gustafson <jason@confluent.io>
Committed: Mon May 29 22:55:50 2017 -0700

----------------------------------------------------------------------
 .../producer/internals/TransactionManager.java  | 13 ++++
 .../internals/TransactionManagerTest.java       | 64 ++++++++++++++++++++
 .../src/main/scala/kafka/server/KafkaApis.scala |  1 -
 .../kafka/api/AuthorizerIntegrationTest.scala   | 25 +++++++-
 4 files changed, 99 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9dd7db2f/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index e5c6ec2..ec7ced2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -79,6 +79,7 @@ public class TransactionManager {
     private volatile State currentState = State.UNINITIALIZED;
     private volatile RuntimeException lastError = null;
     private volatile ProducerIdAndEpoch producerIdAndEpoch;
+    private volatile boolean transactionStarted = false;
 
     private enum State {
         UNINITIALIZED,
@@ -343,7 +344,16 @@ public class TransactionManager {
             return null;
         }
 
+        if (nextRequestHandler != null && nextRequestHandler.isEndTxn() &&
!transactionStarted) {
+            ((EndTxnHandler) nextRequestHandler).result.done();
+            if (currentState != State.FATAL_ERROR) {
+                completeTransaction();
+            }
+            return pendingRequests.poll();
+        }
+
         return nextRequestHandler;
+
     }
 
     synchronized void retry(TxnRequestHandler request) {
@@ -462,6 +472,7 @@ public class TransactionManager {
     private synchronized void completeTransaction() {
         transitionTo(State.READY);
         lastError = null;
+        transactionStarted = false;
         partitionsInTransaction.clear();
     }
 
@@ -686,6 +697,7 @@ public class TransactionManager {
             } else {
                 partitionsInTransaction.addAll(pendingPartitionsToBeAddedToTransaction);
                 pendingPartitionsToBeAddedToTransaction.clear();
+                transactionStarted = true;
                 result.done();
             }
         }
@@ -831,6 +843,7 @@ public class TransactionManager {
             if (error == Errors.NONE) {
                 // note the result is not completed until the TxnOffsetCommit returns
                 pendingRequests.add(txnOffsetCommitHandler(result, offsets, builder.consumerGroupId()));
+                transactionStarted = true;
             } else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR)
{
                 lookupCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
                 reenqueue();

http://git-wip-us.apache.org/repos/asf/kafka/blob/9dd7db2f/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index e9363d0..a1bd970 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -737,6 +737,70 @@ public class TransactionManagerTest {
         verifyAddPartitionsFailsWithPartitionLevelError(Errors.TOPIC_AUTHORIZATION_FAILED);
     }
 
+    @Test
+    public void shouldNotSendAbortTxnRequestWhenOnlyAddPartitionsRequestFailed() throws Exception
{
+        client.setNode(brokerNode);
+        // This is called from the initTransactions method in the producer as the first order
of business.
+        // It finds the coordinator and then gets a PID.
+        final long pid = 13131L;
+        final short epoch = 1;
+        transactionManager.initializeTransactions();
+        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION,
transactionalId);
+
+        sender.run(time.milliseconds());  // find coordinator
+        sender.run(time.milliseconds());
+
+        prepareInitPidResponse(Errors.NONE, false, pid, epoch);
+        sender.run(time.milliseconds());  // get pid.
+
+        transactionManager.beginTransaction();
+        transactionManager.maybeAddPartitionToTransaction(tp0);
+
+        TransactionalRequestResult abortResult = transactionManager.beginAbortingTransaction();
+
+        prepareAddPartitionsToTxnResponse(Errors.TOPIC_AUTHORIZATION_FAILED, tp0, epoch,
pid);
+        sender.run(time.milliseconds());  // Send AddPartitionsRequest
+        assertFalse(abortResult.isCompleted());
+
+        sender.run(time.milliseconds());
+        assertTrue(abortResult.isCompleted());
+        assertTrue(abortResult.isSuccessful());
+    }
+
+    @Test
+    public void shouldNotSendAbortTxnRequestWhenOnlyAddOffsetsRequestFailed() throws Exception
{
+        client.setNode(brokerNode);
+        // This is called from the initTransactions method in the producer as the first order
of business.
+        // It finds the coordinator and then gets a PID.
+        final long pid = 13131L;
+        final short epoch = 1;
+        transactionManager.initializeTransactions();
+        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION,
transactionalId);
+
+        sender.run(time.milliseconds());  // find coordinator
+        sender.run(time.milliseconds());
+
+        prepareInitPidResponse(Errors.NONE, false, pid, epoch);
+        sender.run(time.milliseconds());  // get pid.
+
+        transactionManager.beginTransaction();
+        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+        offsets.put(tp1, new OffsetAndMetadata(1));
+        final String consumerGroupId = "myconsumergroup";
+
+        transactionManager.sendOffsetsToTransaction(offsets, consumerGroupId);
+
+        TransactionalRequestResult abortResult = transactionManager.beginAbortingTransaction();
+
+        prepareAddOffsetsToTxnResponse(Errors.TOPIC_AUTHORIZATION_FAILED, consumerGroupId,
pid, epoch);
+        sender.run(time.milliseconds());  // Send AddOffsetsToTxnRequest
+        assertFalse(abortResult.isCompleted());
+
+        sender.run(time.milliseconds());
+        assertTrue(abortResult.isCompleted());
+        assertTrue(abortResult.isSuccessful());
+    }
+
     private void verifyAddPartitionsFailsWithPartitionLevelError(final Errors error) throws
InterruptedException {
         final long pid = 1L;
         final short epoch = 1;

http://git-wip-us.apache.org/repos/asf/kafka/blob/9dd7db2f/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index fb69c50..dd6f18d 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1550,7 +1550,6 @@ class KafkaApis(val requestChannel: RequestChannel,
     val addPartitionsToTxnRequest = request.body[AddPartitionsToTxnRequest]
     val transactionalId = addPartitionsToTxnRequest.transactionalId
     val partitionsToAdd = addPartitionsToTxnRequest.partitions
-
     if (!authorize(request.session, Write, new Resource(TransactionalId, transactionalId)))
       sendResponseMaybeThrottle(request, requestThrottleMs =>
         addPartitionsToTxnRequest.getErrorResponse(requestThrottleMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception))

http://git-wip-us.apache.org/repos/asf/kafka/blob/9dd7db2f/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index c464834..017c57f 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -14,7 +14,7 @@ package kafka.api
 
 import java.nio.ByteBuffer
 import java.util
-import java.util.concurrent.ExecutionException
+import java.util.concurrent.{CountDownLatch, ExecutionException, TimeUnit}
 import java.util.regex.Pattern
 import java.util.{ArrayList, Collections, Properties}
 
@@ -24,7 +24,7 @@ import kafka.server.{BaseRequestTest, KafkaConfig}
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
 import org.apache.kafka.clients.consumer._
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
+import org.apache.kafka.clients.producer._
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.Topic.GROUP_METADATA_TOPIC_NAME
 import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
@@ -1037,7 +1037,6 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     producer.initTransactions()
     producer.beginTransaction()
     producer.send(new ProducerRecord(tp.topic, tp.partition, "1".getBytes, "1".getBytes)).get
-    producer.flush()
     removeAllAcls()
     try {
       producer.commitTransaction()
@@ -1048,6 +1047,26 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   }
 
   @Test
+  def shouldSuccessfullyAbortTransactionAfterTopicAuthorizationException(): Unit = {
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)),
transactionalIdResource)
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)),
topicResource)
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)),
new Resource(Topic, deleteTopic))
+    val producer = buildTransactionalProducer()
+    producer.initTransactions()
+    producer.beginTransaction()
+    producer.send(new ProducerRecord(tp.topic, tp.partition, "1".getBytes, "1".getBytes)).get
+    // try and add a partition resulting in TopicAuthorizationException
+    try {
+      producer.send(new ProducerRecord(deleteTopic, 0, "1".getBytes, "1".getBytes)).get
+    } catch {
+      case e : ExecutionException =>
+        assertTrue(e.getCause.isInstanceOf[TopicAuthorizationException])
+    }
+    // now rollback
+    producer.abortTransaction()
+  }
+
+  @Test
   def shouldThrowTransactionalIdAuthorizationExceptionWhenNoTransactionAccessOnSendOffsetsToTxn():
Unit = {
     addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)),
transactionalIdResource)
     addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)),
groupResource)


Mime
View raw message