From commits-return-9277-archive-asf-public=cust-asf.ponee.io@kafka.apache.org Tue Mar 27 18:21:25 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id BA71A18064E for ; Tue, 27 Mar 2018 18:21:24 +0200 (CEST) Received: (qmail 50650 invoked by uid 500); 27 Mar 2018 16:21:23 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 50641 invoked by uid 99); 27 Mar 2018 16:21:23 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 27 Mar 2018 16:21:23 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 22443820AC; Tue, 27 Mar 2018 16:21:23 +0000 (UTC) Date: Tue, 27 Mar 2018 16:21:22 +0000 To: "commits@kafka.apache.org" Subject: [kafka] branch trunk updated: KAFKA-6446; KafkaProducer initTransactions() should timeout after max.block.ms (#4563) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <152216768263.24799.13143348782407314224@gitbox.apache.org> From: jgus@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: kafka X-Git-Refname: refs/heads/trunk X-Git-Reftype: branch X-Git-Oldrev: f4a39f9b8314847ffc7250f8cee3e87795b0aaeb X-Git-Newrev: 9eb32eaad5fa3863d60b84ef095fa041e83b7b47 X-Git-Rev: 9eb32eaad5fa3863d60b84ef095fa041e83b7b47 X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. 
jgus pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 9eb32ea KAFKA-6446; KafkaProducer initTransactions() should timeout after max.block.ms (#4563) 9eb32ea is described below commit 9eb32eaad5fa3863d60b84ef095fa041e83b7b47 Author: huxi AuthorDate: Wed Mar 28 00:21:18 2018 +0800 KAFKA-6446; KafkaProducer initTransactions() should timeout after max.block.ms (#4563) Currently the `initTransactions()` API blocks indefinitely if the broker cannot be reached. This patch changes the behavior to raise a `TimeoutException` after waiting for `max.block.ms`. Reviewers: Apurva Mehta , Jason Gustafson --- .../kafka/clients/producer/KafkaProducer.java | 25 +++++++-- .../kafka/clients/producer/internals/Sender.java | 2 +- .../internals/TransactionalRequestResult.java | 5 +- .../kafka/clients/producer/KafkaProducerTest.java | 61 ++++++++++++++++++++++ .../integration/kafka/api/TransactionsTest.scala | 17 +++++- 5 files changed, 103 insertions(+), 7 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 5fc9a1b..a5af5b6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -256,6 +256,7 @@ public class KafkaProducer implements Producer { private final ProducerInterceptors interceptors; private final ApiVersions apiVersions; private final TransactionManager transactionManager; + private TransactionalRequestResult initTransactionsResult; /** * A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings @@ -555,18 +556,36 @@ public class KafkaProducer implements Producer { * 2. 
Gets the internal producer id and epoch, used in all future transactional * messages issued by the producer. * + * Note that this method will raise {@link TimeoutException} if the transactional state cannot + * be initialized before expiration of {@code max.block.ms}. Additionally, it will raise {@link InterruptException} + * if interrupted. It is safe to retry in either case, but once the transactional state has been successfully + * initialized, this method should no longer be used. + * * @throws IllegalStateException if no transactional.id has been configured * @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker * does not support transactions (i.e. if its version is lower than 0.11.0.0) * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured * transactional.id is not authorized. See the exception for more details * @throws KafkaException if the producer has encountered a previous fatal error or for any other unexpected error + * @throws TimeoutException if the time taken to initialize the transaction has surpassed max.block.ms. 
+ * @throws InterruptException if the thread is interrupted while blocked */ public void initTransactions() { throwIfNoTransactionManager(); - TransactionalRequestResult result = transactionManager.initializeTransactions(); - sender.wakeup(); - result.await(); + if (initTransactionsResult == null) { + initTransactionsResult = transactionManager.initializeTransactions(); + sender.wakeup(); + } + + try { + if (initTransactionsResult.await(maxBlockTimeMs, TimeUnit.MILLISECONDS)) { + initTransactionsResult = null; + } else { + throw new TimeoutException("Timeout expired while initializing transactional state in " + maxBlockTimeMs + "ms."); + } + } catch (InterruptedException e) { + throw new InterruptException("Initialize transactions interrupted.", e); + } } /** diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 7eea499..426b273 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -329,7 +329,7 @@ public class Sender implements Runnable { return false; AbstractRequest.Builder requestBuilder = nextRequestHandler.requestBuilder(); - while (true) { + while (running) { Node targetNode = null; try { if (nextRequestHandler.needsCoordinator()) { diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java index ff93da8..9c02e94 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java @@ -59,7 +59,10 @@ public final class TransactionalRequestResult { } public boolean await(long timeout, TimeUnit unit) throws 
InterruptedException { - return latch.await(timeout, unit); + boolean success = latch.await(timeout, unit); + if (!isSuccessful()) + throw error(); + return success; } public RuntimeException error() { diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java index 9f70fd7..8bfc5e7 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java @@ -548,4 +548,65 @@ public class KafkaProducerTest { // expected } } + + @Test(expected = TimeoutException.class) + public void testInitTransactionTimeout() { + Properties props = new Properties(); + props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "bad-transaction"); + props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000"); + + Time time = new MockTime(); + Cluster cluster = TestUtils.singletonCluster("topic", 1); + Node node = cluster.nodes().get(0); + + Metadata metadata = new Metadata(0, Long.MAX_VALUE, true); + metadata.update(cluster, Collections.emptySet(), time.milliseconds()); + + MockClient client = new MockClient(time, metadata); + client.setNode(node); + + Producer producer = new KafkaProducer<>( + new ProducerConfig(ProducerConfig.addSerializerToConfig(props, new StringSerializer(), new StringSerializer())), + new StringSerializer(), new StringSerializer(), metadata, client); + try { + producer.initTransactions(); + fail("initTransactions() should have raised TimeoutException"); + } finally { + producer.close(0, TimeUnit.MILLISECONDS); + } + } + + @Test(expected = KafkaException.class) + public void testOnlyCanExecuteCloseAfterInitTransactionsTimeout() { + Properties props = new Properties(); + props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "bad-transaction"); + props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5); + 
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000"); + + Time time = new MockTime(); + Cluster cluster = TestUtils.singletonCluster("topic", 1); + Node node = cluster.nodes().get(0); + + Metadata metadata = new Metadata(0, Long.MAX_VALUE, true); + metadata.update(cluster, Collections.emptySet(), time.milliseconds()); + + MockClient client = new MockClient(time, metadata); + client.setNode(node); + + Producer producer = new KafkaProducer<>( + new ProducerConfig(ProducerConfig.addSerializerToConfig(props, new StringSerializer(), new StringSerializer())), + new StringSerializer(), new StringSerializer(), metadata, client); + try { + producer.initTransactions(); + } catch (TimeoutException e) { + // expected + } + // other transactional operations should not be allowed if we catch the error after initTransactions failed + try { + producer.beginTransaction(); + } finally { + producer.close(0, TimeUnit.MILLISECONDS); + } + } } diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala index 911808a..8435e5a 100644 --- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala +++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala @@ -19,7 +19,7 @@ package kafka.api import java.lang.{Long => JLong} import java.util.Properties -import java.util.concurrent.{ExecutionException, TimeUnit} +import java.util.concurrent.TimeUnit import kafka.integration.KafkaServerTestHarness import kafka.server.KafkaConfig @@ -27,7 +27,7 @@ import kafka.utils.TestUtils import kafka.utils.TestUtils.consumeRecords import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata} import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} -import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.{KafkaException, TopicPartition} import org.apache.kafka.common.errors.ProducerFencedException import 
org.apache.kafka.common.security.auth.SecurityProtocol import org.junit.{After, Before, Test} @@ -532,6 +532,19 @@ class TransactionsTest extends KafkaServerTestHarness { } } + @Test(expected = classOf[KafkaException]) + def testConsecutivelyRunInitTransactions(): Unit = { + val producer = createTransactionalProducer(transactionalId = "normalProducer") + + try { + producer.initTransactions() + producer.initTransactions() + fail("Should have raised a KafkaException") + } finally { + producer.close() + } + } + private def sendTransactionalMessagesWithValueRange(producer: KafkaProducer[Array[Byte], Array[Byte]], topic: String, start: Int, end: Int, willBeCommitted: Boolean): Unit = { for (i <- start until end) { -- To stop receiving notification emails like this one, please contact jgus@apache.org.