kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-5282; Use a factory method to create producers/consumers and close them in tearDown
Date Fri, 02 Jun 2017 09:27:16 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.11.0 ca54eaef4 -> 99a3f8165


KAFKA-5282; Use a factory method to create producers/consumers and close them in tearDown

Author: Vahid Hashemian <vahidhashemian@us.ibm.com>

Reviewers: Apurva Mehta <apurva@confluent.io>, Ismael Juma <ismael@juma.me.uk>

This patch had conflicts when merged, resolved by
Committer: Ismael Juma <ismael@juma.me.uk>

Closes #3129 from vahidhashemian/KAFKA-5282


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/99a3f816
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/99a3f816
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/99a3f816

Branch: refs/heads/0.11.0
Commit: 99a3f816579a5c8d92d8624ccbbdbdb63f519c7b
Parents: ca54eae
Author: Vahid Hashemian <vahidhashemian@us.ibm.com>
Authored: Fri Jun 2 10:24:42 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Fri Jun 2 10:27:25 2017 +0100

----------------------------------------------------------------------
 .../kafka/api/TransactionsTest.scala            | 374 +++++++++----------
 1 file changed, 180 insertions(+), 194 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/99a3f816/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index 0a082ed..9aceec8 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -24,23 +24,31 @@ import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer, OffsetAndMetadata}
+import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.ProducerFencedException
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.junit.{After, Before, Test}
 import org.junit.Assert._
 
-import scala.collection.JavaConversions._
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.JavaConverters._
+import scala.collection.mutable.{ArrayBuffer, Buffer}
 import scala.concurrent.ExecutionException
 
 class TransactionsTest extends KafkaServerTestHarness {
   val numServers = 3
+  val transactionalProducerCount = 2
+  val transactionalConsumerCount = 1
+  val nonTransactionalConsumerCount = 1
+
   val topic1 = "topic1"
   val topic2 = "topic2"
 
+  val transactionalProducers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]()
+  val transactionalConsumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
+  val nonTransactionalConsumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
 
-  override def generateConfigs : Seq[KafkaConfig] = {
+  override def generateConfigs: Seq[KafkaConfig] = {
     TestUtils.createBrokerConfigs(numServers, zkConnect, true).map(KafkaConfig.fromProps(_, serverProps()))
   }
 
@@ -52,49 +60,54 @@ class TransactionsTest extends KafkaServerTestHarness {
     topicConfig.put(KafkaConfig.MinInSyncReplicasProp, 2.toString)
     TestUtils.createTopic(zkUtils, topic1, numPartitions, numServers, servers, topicConfig)
     TestUtils.createTopic(zkUtils, topic2, numPartitions, numServers, servers, topicConfig)
+
+    for (_ <- 0 until transactionalProducerCount)
+      transactionalProducers += TestUtils.createTransactionalProducer("transactional-producer", servers)
+    for (_ <- 0 until transactionalConsumerCount)
+      transactionalConsumers += transactionalConsumer("transactional-group")
+    for (_ <- 0 until nonTransactionalConsumerCount)
+      nonTransactionalConsumers += nonTransactionalConsumer("non-transactional-group")
   }
 
   @After
   override def tearDown(): Unit = {
+    transactionalProducers.foreach(_.close())
+    transactionalConsumers.foreach(_.close())
+    nonTransactionalConsumers.foreach(_.close())
     super.tearDown()
   }
 
   @Test
   def testBasicTransactions() = {
-    val producer = TestUtils.createTransactionalProducer("my-hello-world-transactional-id",
servers)
-    val consumer = transactionalConsumer("transactional-group")
-    val unCommittedConsumer = nonTransactionalConsumer("non-transactional-group")
-    try {
-      producer.initTransactions()
+    val producer = transactionalProducers(0)
+    val consumer = transactionalConsumers(0)
+    val unCommittedConsumer = nonTransactionalConsumers(0)
 
-      producer.beginTransaction()
-      producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "2", "2",
willBeCommitted = false))
-      producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "4", "4",
willBeCommitted = false))
-      producer.flush()
-      producer.abortTransaction()
+    producer.initTransactions()
 
-      producer.beginTransaction()
-      producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "1", "1",
willBeCommitted = true))
-      producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "3", "3",
willBeCommitted = true))
-      producer.commitTransaction()
+    producer.beginTransaction()
+    producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "2", "2",
willBeCommitted = false))
+    producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "4", "4",
willBeCommitted = false))
+    producer.flush()
+    producer.abortTransaction()
 
-      consumer.subscribe(List(topic1, topic2))
-      unCommittedConsumer.subscribe(List(topic1, topic2))
+    producer.beginTransaction()
+    producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "1", "1",
willBeCommitted = true))
+    producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "3", "3",
willBeCommitted = true))
+    producer.commitTransaction()
 
-      val records = pollUntilExactlyNumRecords(consumer, 2)
-      records.zipWithIndex.foreach { case (record, i) =>
-        TestUtils.assertCommittedAndGetValue(record)
-      }
+    consumer.subscribe(List(topic1, topic2).asJava)
+    unCommittedConsumer.subscribe(List(topic1, topic2).asJava)
 
-      val allRecords = pollUntilExactlyNumRecords(unCommittedConsumer, 4)
-      val expectedValues = List("1", "2", "3", "4").toSet
-      allRecords.zipWithIndex.foreach { case (record, i) =>
-        assertTrue(expectedValues.contains(TestUtils.recordValueAsString(record)))
-      }
-    } finally {
-      consumer.close()
-      producer.close()
-      unCommittedConsumer.close()
+    val records = pollUntilExactlyNumRecords(consumer, 2)
+    records.foreach { record =>
+      TestUtils.assertCommittedAndGetValue(record)
+    }
+
+    val allRecords = pollUntilExactlyNumRecords(unCommittedConsumer, 4)
+    val expectedValues = List("1", "2", "3", "4").toSet
+    allRecords.foreach { record =>
+      assertTrue(expectedValues.contains(TestUtils.recordValueAsString(record)))
     }
   }
 
@@ -109,16 +122,15 @@ class TransactionsTest extends KafkaServerTestHarness {
     //     transactions, we should not have any duplicates or missing messages since we should process in the input
     //     messages exactly once.
 
-    val transactionalId = "foobar-id"
     val consumerGroupId = "foobar-consumer-group"
     val numSeedMessages = 500
 
     TestUtils.seedTopicWithNumberedRecords(topic1, numSeedMessages, servers)
 
-    val producer = TestUtils.createTransactionalProducer(transactionalId, servers)
+    val producer = transactionalProducers(0)
 
     val consumer = transactionalConsumer(consumerGroupId, maxPollRecords = numSeedMessages / 4)
-    consumer.subscribe(List(topic1))
+    consumer.subscribe(List(topic1).asJava)
     producer.initTransactions()
 
     var shouldCommit = false
@@ -130,13 +142,13 @@ class TransactionsTest extends KafkaServerTestHarness {
         producer.beginTransaction()
         shouldCommit = !shouldCommit
 
-        records.zipWithIndex.foreach { case (record, i) =>
+        records.foreach { record =>
           val key = new String(record.key(), "UTF-8")
           val value = new String(record.value(), "UTF-8")
           producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, key,
value, willBeCommitted = shouldCommit))
         }
 
-        producer.sendOffsetsToTransaction(TestUtils.consumerPositions(consumer), consumerGroupId)
+        producer.sendOffsetsToTransaction(TestUtils.consumerPositions(consumer).asJava, consumerGroupId)
         if (shouldCommit) {
           producer.commitTransaction()
           recordsProcessed += records.size
@@ -150,18 +162,16 @@ class TransactionsTest extends KafkaServerTestHarness {
        }
       }
     } finally {
-      producer.close()
       consumer.close()
     }
 
-    // In spite of random aborts, we should still have exactly 1000 messages in topic2. Ie.
we should not
+    // In spite of random aborts, we should still have exactly 1000 messages in topic2. I.e. we should not
     // re-copy or miss any messages from topic1, since the consumed offsets were committed transactionally.
-    val verifyingConsumer = transactionalConsumer("foobargroup")
-    verifyingConsumer.subscribe(List(topic2))
+    val verifyingConsumer = transactionalConsumers(0)
+    verifyingConsumer.subscribe(List(topic2).asJava)
     val valueSeq = TestUtils.pollUntilAtLeastNumRecords(verifyingConsumer, numSeedMessages).map
{ record =>
       TestUtils.assertCommittedAndGetValue(record).toInt
     }
-    verifyingConsumer.close()
     val valueSet = valueSeq.toSet
     assertEquals(s"Expected $numSeedMessages values in $topic2.", numSeedMessages, valueSeq.size)
     assertEquals(s"Expected ${valueSeq.size} unique messages in $topic2.", valueSeq.size,
valueSet.size)
@@ -169,190 +179,166 @@ class TransactionsTest extends KafkaServerTestHarness {
 
   @Test
   def testFencingOnCommit() = {
-    val transactionalId = "my-t.id"
-    val producer1 = TestUtils.createTransactionalProducer(transactionalId, servers)
-    val producer2 = TestUtils.createTransactionalProducer(transactionalId, servers)
-    val consumer = transactionalConsumer()
-    consumer.subscribe(List(topic1, topic2))
+    val producer1 = transactionalProducers(0)
+    val producer2 = transactionalProducers(1)
+    val consumer = transactionalConsumers(0)
 
-    try {
-      producer1.initTransactions()
+    consumer.subscribe(List(topic1, topic2).asJava)
 
-      producer1.beginTransaction()
-      producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "1", "1",
willBeCommitted = false))
-      producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "3", "3",
willBeCommitted = false))
-
-      producer2.initTransactions()  // ok, will abort the open transaction.
-      producer2.beginTransaction()
-      producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "2", "4",
willBeCommitted = true))
-      producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "2", "4",
willBeCommitted = true))
-
-      try {
-        producer1.commitTransaction()
-        fail("Should not be able to commit transactions from a fenced producer.")
-      } catch {
-        case e : ProducerFencedException =>
-          // good!
-        case e : Exception =>
-          fail("Got an unexpected exception from a fenced producer.", e)
-      }
+    producer1.initTransactions()
 
-      producer2.commitTransaction()  // ok
+    producer1.beginTransaction()
+    producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "1", "1",
willBeCommitted = false))
+    producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "3", "3",
willBeCommitted = false))
 
-      val records = pollUntilExactlyNumRecords(consumer, 2)
-      records.zipWithIndex.foreach { case (record, i) =>
-        TestUtils.assertCommittedAndGetValue(record)
-      }
-    } finally {
-      consumer.close()
-      producer1.close()
-      producer2.close()
+    producer2.initTransactions()  // ok, will abort the open transaction.
+    producer2.beginTransaction()
+    producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "2", "4",
willBeCommitted = true))
+    producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "2", "4",
willBeCommitted = true))
+
+    try {
+      producer1.commitTransaction()
+      fail("Should not be able to commit transactions from a fenced producer.")
+    } catch {
+      case _: ProducerFencedException =>
+        // good!
+      case e: Exception =>
+        fail("Got an unexpected exception from a fenced producer.", e)
+    }
+
+    producer2.commitTransaction()  // ok
+
+    val records = pollUntilExactlyNumRecords(consumer, 2)
+    records.foreach { record =>
+      TestUtils.assertCommittedAndGetValue(record)
     }
   }
 
   @Test
   def testFencingOnSendOffsets() = {
-    val transactionalId = "my-t.id"
-    val producer1 = TestUtils.createTransactionalProducer(transactionalId, servers)
-    val producer2 = TestUtils.createTransactionalProducer(transactionalId, servers)
-    val consumer = transactionalConsumer()
-    consumer.subscribe(List(topic1, topic2))
+    val producer1 = transactionalProducers(0)
+    val producer2 = transactionalProducers(1)
+    val consumer = transactionalConsumers(0)
 
-    try {
-      producer1.initTransactions()
+    consumer.subscribe(List(topic1, topic2).asJava)
 
-      producer1.beginTransaction()
-      producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "1", "1",
willBeCommitted = false))
-      producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "3", "3",
willBeCommitted = false))
-
-      producer2.initTransactions()  // ok, will abort the open transaction.
-      producer2.beginTransaction()
-      producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "2", "4",
willBeCommitted = true))
-      producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "2", "4",
willBeCommitted = true))
-
-      try {
-        producer1.sendOffsetsToTransaction(Map(new TopicPartition("foobartopic", 0) ->
new OffsetAndMetadata(110L)),  "foobarGroup")
-        fail("Should not be able to send offsets from a fenced producer.")
-      } catch {
-        case e : ProducerFencedException =>
-          // good!
-        case e : Exception =>
-          fail("Got an unexpected exception from a fenced producer.", e)
-      }
+    producer1.initTransactions()
 
-      producer2.commitTransaction()  // ok
+    producer1.beginTransaction()
+    producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "1", "1",
willBeCommitted = false))
+    producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "3", "3",
willBeCommitted = false))
 
-      val records = pollUntilExactlyNumRecords(consumer, 2)
-      records.zipWithIndex.foreach { case (record, i) =>
-        TestUtils.assertCommittedAndGetValue(record)
-      }
-    } finally {
-      consumer.close()
-      producer1.close()
-      producer2.close()
+    producer2.initTransactions()  // ok, will abort the open transaction.
+    producer2.beginTransaction()
+    producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "2", "4",
willBeCommitted = true))
+    producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "2", "4",
willBeCommitted = true))
+
+    try {
+      producer1.sendOffsetsToTransaction(Map(new TopicPartition("foobartopic", 0) -> new OffsetAndMetadata(110L)).asJava,
+        "foobarGroup")
+      fail("Should not be able to send offsets from a fenced producer.")
+    } catch {
+      case _: ProducerFencedException =>
+        // good!
+      case e: Exception =>
+        fail("Got an unexpected exception from a fenced producer.", e)
+    }
+
+    producer2.commitTransaction()  // ok
+
+    val records = pollUntilExactlyNumRecords(consumer, 2)
+    records.foreach { record =>
+      TestUtils.assertCommittedAndGetValue(record)
     }
   }
 
   @Test
   def testFencingOnSend() {
-    val transactionalId = "my-t.id"
-    val producer1 = TestUtils.createTransactionalProducer(transactionalId, servers)
-    val producer2 = TestUtils.createTransactionalProducer(transactionalId, servers)
-    val consumer = transactionalConsumer()
-    consumer.subscribe(List(topic1, topic2))
+    val producer1 = transactionalProducers(0)
+    val producer2 = transactionalProducers(1)
+    val consumer = transactionalConsumers(0)
 
-    try {
-      producer1.initTransactions()
+    consumer.subscribe(List(topic1, topic2).asJava)
 
-      producer1.beginTransaction()
-      producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "1", "1",
willBeCommitted = false))
-      producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "3", "3",
willBeCommitted = false))
-
-      producer2.initTransactions()  // ok, will abort the open transaction.
-      producer2.beginTransaction()
-      producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "2", "4",
willBeCommitted = true)).get()
-      producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "2", "4",
willBeCommitted = true)).get()
-
-      try {
-        val result =  producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1,
"1", "5", willBeCommitted = false))
-        val recordMetadata = result.get()
-        error(s"Missed a producer fenced exception when writing to ${recordMetadata.topic()}-${recordMetadata.partition()}.
Grab the logs!!")
-        servers.foreach { case (server) =>
-          error(s"log dirs: ${server.logManager.logDirs.map(_.getAbsolutePath).head}")
-        }
-        fail("Should not be able to send messages from a fenced producer.")
-      } catch {
-        case e : ProducerFencedException =>
-          producer1.close()
-        case e : ExecutionException =>
-          assertTrue(e.getCause.isInstanceOf[ProducerFencedException])
-        case e : Exception =>
-          fail("Got an unexpected exception from a fenced producer.", e)
-      }
+    producer1.initTransactions()
 
-      producer2.commitTransaction()  // ok
+    producer1.beginTransaction()
+    producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "1", "1",
willBeCommitted = false))
+    producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "3", "3",
willBeCommitted = false))
 
-      val records = pollUntilExactlyNumRecords(consumer, 2)
-      records.zipWithIndex.foreach { case (record, i) =>
-        TestUtils.assertCommittedAndGetValue(record)
+    producer2.initTransactions()  // ok, will abort the open transaction.
+    producer2.beginTransaction()
+    producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "2", "4",
willBeCommitted = true)).get()
+    producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "2", "4",
willBeCommitted = true)).get()
+
+    try {
+      val result =  producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1,
"1", "5", willBeCommitted = false))
+      val recordMetadata = result.get()
+      error(s"Missed a producer fenced exception when writing to ${recordMetadata.topic}-${recordMetadata.partition}.
Grab the logs!!")
+      servers.foreach { server =>
+        error(s"log dirs: ${server.logManager.logDirs.map(_.getAbsolutePath).head}")
       }
-    } finally {
-      consumer.close()
-      producer1.close()
-      producer2.close()
+      fail("Should not be able to send messages from a fenced producer.")
+    } catch {
+      case _: ProducerFencedException =>
+        producer1.close()
+      case e: ExecutionException =>
+        assertTrue(e.getCause.isInstanceOf[ProducerFencedException])
+      case e: Exception =>
+        fail("Got an unexpected exception from a fenced producer.", e)
+    }
+
+    producer2.commitTransaction() // ok
+
+    val records = pollUntilExactlyNumRecords(consumer, 2)
+    records.foreach { record =>
+      TestUtils.assertCommittedAndGetValue(record)
     }
   }
 
   @Test
   def testFencingOnAddPartitions(): Unit = {
-    val transactionalId = "my-t.id"
-    val producer1 = TestUtils.createTransactionalProducer(transactionalId, servers)
-    val producer2 = TestUtils.createTransactionalProducer(transactionalId, servers)
-    val consumer = transactionalConsumer()
-    consumer.subscribe(List(topic1, topic2))
+    val producer1 = transactionalProducers(0)
+    val producer2 = transactionalProducers(1)
+    val consumer = transactionalConsumers(0)
 
-    try {
-      producer1.initTransactions()
+    consumer.subscribe(List(topic1, topic2).asJava)
+
+    producer1.initTransactions()
+    producer1.beginTransaction()
+    producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "1", "1",
willBeCommitted = false))
+    producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "3", "3",
willBeCommitted = false))
+    producer1.abortTransaction()
+
+    producer2.initTransactions()  // ok, will abort the open transaction.
+    producer2.beginTransaction()
+    producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "2", "4",
willBeCommitted = true))
+      .get(20, TimeUnit.SECONDS)
+    producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "2", "4",
willBeCommitted = true))
+      .get(20, TimeUnit.SECONDS)
 
+    try {
       producer1.beginTransaction()
-      producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "1", "1",
willBeCommitted = false))
-      producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "3", "3",
willBeCommitted = false))
-      producer1.abortTransaction()
-
-      producer2.initTransactions()  // ok, will abort the open transaction.
-      producer2.beginTransaction()
-      producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "2", "4",
willBeCommitted = true))
-        .get(20, TimeUnit.SECONDS)
-      producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "2", "4",
willBeCommitted = true))
-        .get(20, TimeUnit.SECONDS)
-
-      try {
-        producer1.beginTransaction()
-        val result =  producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1,
"1", "5", willBeCommitted = false))
-        val recordMetadata = result.get()
-        error(s"Missed a producer fenced exception when writing to ${recordMetadata.topic()}-${recordMetadata.partition()}.
Grab the logs!!")
-        servers.foreach { case (server) =>
-          error(s"log dirs: ${server.logManager.logDirs.map(_.getAbsolutePath).head}")
-        }
-        fail("Should not be able to send messages from a fenced producer.")
-      } catch {
-        case e : ProducerFencedException =>
-        case e : ExecutionException =>
-          assertTrue(e.getCause.isInstanceOf[ProducerFencedException])
-        case e : Exception =>
-          fail("Got an unexpected exception from a fenced producer.", e)
+      val result =  producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1,
"1", "5", willBeCommitted = false))
+      val recordMetadata = result.get()
+      error(s"Missed a producer fenced exception when writing to ${recordMetadata.topic}-${recordMetadata.partition}.
Grab the logs!!")
+      servers.foreach { case (server) =>
+        error(s"log dirs: ${server.logManager.logDirs.map(_.getAbsolutePath).head}")
       }
+      fail("Should not be able to send messages from a fenced producer.")
+    } catch {
+      case _: ProducerFencedException =>
+      case e: ExecutionException =>
+        assertTrue(e.getCause.isInstanceOf[ProducerFencedException])
+      case e: Exception =>
+        fail("Got an unexpected exception from a fenced producer.", e)
+    }
 
-      producer2.commitTransaction()  // ok
+    producer2.commitTransaction()  // ok
 
-      val records = pollUntilExactlyNumRecords(consumer, 2)
-      records.zipWithIndex.foreach { case (record, i) =>
-        TestUtils.assertCommittedAndGetValue(record)
-      }
-    } finally {
-      consumer.close()
-      producer1.close()
-      producer2.close()
+    val records = pollUntilExactlyNumRecords(consumer, 2)
+    records.foreach { record =>
+      TestUtils.assertCommittedAndGetValue(record)
     }
   }
 
@@ -388,10 +374,10 @@ class TransactionsTest extends KafkaServerTestHarness {
       groupId = group, securityProtocol = SecurityProtocol.PLAINTEXT, props = Some(props))
   }
 
-  private def pollUntilExactlyNumRecords(consumer: KafkaConsumer[Array[Byte], Array[Byte]],
numRecords: Int) : Seq[ConsumerRecord[Array[Byte], Array[Byte]]] = {
+  private def pollUntilExactlyNumRecords(consumer: KafkaConsumer[Array[Byte], Array[Byte]], numRecords: Int): Seq[ConsumerRecord[Array[Byte], Array[Byte]]] = {
     val records = new ArrayBuffer[ConsumerRecord[Array[Byte], Array[Byte]]]()
     TestUtils.waitUntilTrue(() => {
-      records ++= consumer.poll(50)
+      records ++= consumer.poll(50).asScala
       records.size == numRecords
     }, s"Consumed ${records.size} records until timeout, but expected $numRecords records.")
     records


Mime
View raw message