From: ijuma@apache.org
To: commits@kafka.apache.org
Reply-To: dev@kafka.apache.org
Subject: kafka git commit: MINOR: Ensure that the producer in testAlterReplicaLogDirs is always closed
Date: Fri, 27 Oct 2017 14:49:55 +0000 (UTC)

Repository: kafka
Updated Branches:
  refs/heads/trunk 603d4e5d9 -> 4e8ad90b9


MINOR: Ensure that the producer in testAlterReplicaLogDirs is always closed

Failure to close the producer could cause a transient test failure; more details below. The request timeout was only 2 seconds, exceptions thrown during produce were not propagated, and the producer was never closed. If the exception was thrown during `send`, we did not increment `numMessages`, which allowed the test to pass despite the failure. I have increased the timeout to 10 seconds and made sure that exceptions are propagated.

Example of the error:

```text
kafka.api.SaslSslAdminClientIntegrationTest > classMethod STARTED

kafka.api.SaslSslAdminClientIntegrationTest > classMethod FAILED
    java.lang.AssertionError: Found unexpected threads, allThreads=Set(metrics-meter-tick-thread-2, Signal Dispatcher, main, Reference Handler, scala-execution-context-global-164, kafka-producer-network-thread | producer-1, scala-execution-context-global-166, Test worker, scala-execution-context-global-1249, /0:0:0:0:0:0:0:1:58910 to /0:0:0:0:0:0:0:1:43025 workers Thread 2, Finalizer, /0:0:0:0:0:0:0:1:58910 to /0:0:0:0:0:0:0:1:43025 workers Thread 3, scala-execution-context-global-163, metrics-meter-tick-thread-1)
```
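In essence, the patch replaces the detached producer `Thread` with a `Future` whose result is awaited, and closes the producer in a `finally` block. A minimal standalone sketch of that pattern follows; the `FakeProducer` here is a hypothetical stand-in for the real producer the test builds via `TestUtils.createNewProducer`:

```scala
import java.util.concurrent.TimeUnit
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

object ProducerCloseSketch {
  // Hypothetical stand-in for the Kafka producer used by the test; only the
  // send/close lifecycle matters for this sketch.
  final class FakeProducer {
    def send(record: String): Unit = ()  // pretend to send a record
    def close(): Unit = ()               // must run on every code path
  }

  def main(args: Array[String]): Unit = {
    val producerFuture: Future[Int] = Future {
      val producer = new FakeProducer
      try {
        (1 to 100).foreach(i => producer.send(s"record-$i"))
        100 // the number of records sent becomes the Future's result
      } finally producer.close() // runs even if send throws
    }
    // Await.result rethrows any exception raised inside the Future, so a
    // produce failure fails the test instead of being silently swallowed
    // inside a background thread.
    val numSent = Await.result(producerFuture, Duration(20, TimeUnit.SECONDS))
    assert(numSent == 100)
  }
}
```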
Author: Ismael Juma

Reviewers: Rajini Sivaram

Closes #4144 from ijuma/ensure-producer-is-closed-test-alter-replica-log-dirs

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4e8ad90b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4e8ad90b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4e8ad90b

Branch: refs/heads/trunk
Commit: 4e8ad90b94f28affd8e29db136df5e6e7c6c6c15
Parents: 603d4e5
Author: Ismael Juma
Authored: Fri Oct 27 15:49:42 2017 +0100
Committer: Ismael Juma
Committed: Fri Oct 27 15:49:42 2017 +0100

----------------------------------------------------------------------
 .../kafka/api/AdminClientIntegrationTest.scala | 75 +++++++++-----------
 .../unit/kafka/zk/ZooKeeperTestHarness.scala   |  8 +--
 2 files changed, 39 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4e8ad90b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index 583ba7a..d92ca30 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -21,7 +21,7 @@ import java.util.{Collections, Properties}
 import java.util.Arrays.asList
 import java.util.concurrent.{ExecutionException, TimeUnit}
 import java.io.File
-import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
 
 import org.apache.kafka.clients.admin.KafkaAdminClientTest
 import org.apache.kafka.common.utils.{Time, Utils}
@@ -45,6 +45,8 @@ import org.junit.Assert._
 
 import scala.util.Random
 import scala.collection.JavaConverters._
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, Future}
 
 /**
  * An integration test of the KafkaAdminClient.
@@ -278,13 +280,11 @@ class AdminClientIntegrationTest extends KafkaServerTestHarness with Logging {
       val tp = new TopicPartition(topicPartitionReplica.topic(), topicPartitionReplica.partition())
       assertEquals(server.logManager.getLog(tp).get.dir.getParent, replicaDirInfo.getCurrentReplicaLogDir)
     }
-
-    client.close()
   }
 
   @Test
   def testAlterReplicaLogDirs(): Unit = {
-    val adminClient = AdminClient.create(createConfig())
+    client = AdminClient.create(createConfig())
     val topic = "topic"
     val tp = new TopicPartition(topic, 0)
     val randomNums = servers.map(server => server -> Random.nextInt(2)).toMap
@@ -300,23 +300,21 @@ class AdminClientIntegrationTest extends KafkaServerTestHarness with Logging {
     }.toMap
 
     // Verify that replica can be created in the specified log directory
-    adminClient.alterReplicaLogDirs(firstReplicaAssignment.asJava, new AlterReplicaLogDirsOptions()).values().asScala.values.foreach { future =>
-      try {
-        future.get()
-        fail("Future should fail with ReplicaNotAvailableException")
-      } catch {
-        case e: ExecutionException => assertTrue(e.getCause.isInstanceOf[ReplicaNotAvailableException])
-      }
+    val futures = client.alterReplicaLogDirs(firstReplicaAssignment.asJava,
+      new AlterReplicaLogDirsOptions).values.asScala.values
+    futures.foreach { future =>
+      val exception = intercept[ExecutionException](future.get)
+      assertTrue(exception.getCause.isInstanceOf[ReplicaNotAvailableException])
     }
 
-    TestUtils.createTopic(zkUtils, topic, 1, brokerCount, servers, new Properties())
+    TestUtils.createTopic(zkUtils, topic, 1, brokerCount, servers, new Properties)
     servers.foreach { server =>
       val logDir = server.logManager.getLog(tp).get.dir.getParent
       assertEquals(firstReplicaAssignment(new TopicPartitionReplica(topic, 0, server.config.brokerId)), logDir)
     }
 
     // Verify that replica can be moved to the specified log directory after the topic has been created
-    adminClient.alterReplicaLogDirs(secondReplicaAssignment.asJava, new AlterReplicaLogDirsOptions()).all().get()
+    client.alterReplicaLogDirs(secondReplicaAssignment.asJava, new AlterReplicaLogDirsOptions).all.get
     servers.foreach { server =>
       TestUtils.waitUntilTrue(() => {
         val logDir = server.logManager.getLog(tp).get.dir.getParent
@@ -326,48 +324,45 @@ class AdminClientIntegrationTest extends KafkaServerTestHarness with Logging {
 
     // Verify that replica can be moved to the specified log directory while the producer is sending messages
     val running = new AtomicBoolean(true)
-    @volatile var numMessages = 0
-    val thread = new Thread() {
-      override def run(): Unit = {
-        val producer = TestUtils.createNewProducer(
-          TestUtils.getBrokerListStrFromServers(servers, protocol = securityProtocol),
-          securityProtocol = securityProtocol,
-          trustStoreFile = trustStoreFile,
-          retries = 0, // Producer should not have to retry when broker is moving replica between log directories.
-          requestTimeoutMs = 2000,
-          acks = -1
-        )
-
-        while (running.get()) {
+    val numMessages = new AtomicInteger
+    import scala.concurrent.ExecutionContext.Implicits._
+    val producerFuture = Future {
+      val producer = TestUtils.createNewProducer(
+        TestUtils.getBrokerListStrFromServers(servers, protocol = securityProtocol),
+        securityProtocol = securityProtocol,
+        trustStoreFile = trustStoreFile,
+        retries = 0, // Producer should not have to retry when broker is moving replica between log directories.
+        requestTimeoutMs = 10000,
+        acks = -1
+      )
+      try {
+        while (running.get) {
           val future = producer.send(new ProducerRecord(topic, s"xxxxxxxxxxxxxxxxxxxx-$numMessages".getBytes))
-          numMessages += 1
-          future.get()
+          numMessages.incrementAndGet()
+          future.get(10, TimeUnit.SECONDS)
         }
-        producer.close()
-      }
+        numMessages.get
+      } finally producer.close()
     }
 
     try {
-      thread.start()
-      TestUtils.waitUntilTrue(() => numMessages > 100, "timed out waiting for message produce", 6000L)
-      adminClient.alterReplicaLogDirs(firstReplicaAssignment.asJava, new AlterReplicaLogDirsOptions()).all().get()
+      TestUtils.waitUntilTrue(() => numMessages.get > 100, "timed out waiting for message produce", 6000L)
+      client.alterReplicaLogDirs(firstReplicaAssignment.asJava, new AlterReplicaLogDirsOptions).all.get
       servers.foreach { server =>
         TestUtils.waitUntilTrue(() => {
          val logDir = server.logManager.getLog(tp).get.dir.getParent
          firstReplicaAssignment(new TopicPartitionReplica(topic, 0, server.config.brokerId)) == logDir
        }, "timed out waiting for replica movement", 6000L)
      }
-    } finally {
-      running.set(false)
-      thread.join()
-    }
+    } finally running.set(false)
+
+    val finalNumMessages = Await.result(producerFuture, Duration(20, TimeUnit.SECONDS))
 
     // Verify that all messages that are produced can be consumed
-    val consumerRecords = TestUtils.consumeTopicRecords(servers, topic, numMessages, securityProtocol, trustStoreFile)
+    val consumerRecords = TestUtils.consumeTopicRecords(servers, topic, finalNumMessages, securityProtocol, trustStoreFile)
     consumerRecords.zipWithIndex.foreach { case (consumerRecord, index) =>
-      assertEquals(s"xxxxxxxxxxxxxxxxxxxx-$index", new String(consumerRecord.value()))
+      assertEquals(s"xxxxxxxxxxxxxxxxxxxx-$index", new String(consumerRecord.value))
     }
-
-    adminClient.close()
   }
 
   @Test


http://git-wip-us.apache.org/repos/asf/kafka/blob/4e8ad90b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index 0a7e631..85a5596 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -81,7 +81,7 @@ object ZooKeeperTestHarness {
    */
   @BeforeClass
   def setUpClass() {
-    verifyNoUnexpectedThreads()
+    verifyNoUnexpectedThreads("@BeforeClass")
   }
 
   /**
@@ -89,18 +89,18 @@ object ZooKeeperTestHarness {
    */
   @AfterClass
   def tearDownClass() {
-    verifyNoUnexpectedThreads()
+    verifyNoUnexpectedThreads("@AfterClass")
   }
 
   /**
    * Verifies that threads which are known to cause transient failures in subsequent tests
    * have been shutdown.
    */
-  def verifyNoUnexpectedThreads() {
+  def verifyNoUnexpectedThreads(context: String) {
     def allThreads = Thread.getAllStackTraces.keySet.asScala.map(thread => thread.getName)
     val (threads, noUnexpected) = TestUtils.computeUntilTrue(allThreads) { threads =>
       threads.forall(t => unexpectedThreadNames.forall(s => !t.contains(s)))
     }
-    assertTrue(s"Found unexpected threads, allThreads=$threads", noUnexpected)
+    assertTrue(s"Found unexpected threads during $context, allThreads=$threads", noUnexpected)
   }
 }
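As an aside, the `verifyNoUnexpectedThreads` check that produced the assertion error above amounts to scanning live thread names against a deny-list. Below is a minimal sketch of that idea, assuming an illustrative deny-list entry and omitting the harness's `TestUtils.computeUntilTrue` retry loop:

```scala
import scala.collection.JavaConverters._

object ThreadLeakSketch {
  // Illustrative deny-list; the real harness maintains its own set of
  // known-problematic thread name fragments.
  val unexpectedThreadNames = Seq("kafka-producer-network-thread")

  // Snapshot the names of all live threads and keep any that match the deny-list.
  def findUnexpectedThreads(): Set[String] = {
    val allThreads = Thread.getAllStackTraces.keySet.asScala.map(_.getName)
    allThreads.filter(name => unexpectedThreadNames.exists(s => name.contains(s))).toSet
  }

  def main(args: Array[String]): Unit = {
    val leaked = findUnexpectedThreads()
    // A leaked producer network thread, as in the failure above, would be caught here.
    assert(leaked.isEmpty, s"Found unexpected threads: $leaked")
  }
}
```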