kafka-jira mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-6501) Add test to verify markPartitionsForTruncation after fetcher thread pool resize
Date Fri, 09 Feb 2018 00:23:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-6501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16357749#comment-16357749
] 

ASF GitHub Bot commented on KAFKA-6501:
---------------------------------------

hachikuji closed pull request #4539: KAFKA-6501: Dynamic broker config tests updates and metrics fix
URL: https://github.com/apache/kafka/pull/4539
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 144632c6c5d..8a17528bfb7 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -24,7 +24,6 @@ import java.util.concurrent._
 import com.typesafe.scalalogging.Logger
 import com.yammer.metrics.core.{Gauge, Meter}
 import kafka.metrics.KafkaMetricsGroup
-import kafka.network.RequestChannel.{BaseRequest, SendAction, ShutdownRequest, NoOpAction, CloseConnectionAction}
 import kafka.utils.{Logging, NotNothing}
 import org.apache.kafka.common.memory.MemoryPool
 import org.apache.kafka.common.network.Send
@@ -40,6 +39,10 @@ import scala.reflect.ClassTag
 object RequestChannel extends Logging {
   private val requestLogger = Logger("kafka.request.logger")
 
+  val RequestQueueSizeMetric = "RequestQueueSize"
+  val ResponseQueueSizeMetric = "ResponseQueueSize"
+  val ProcessorMetricTag = "processor"
+
   def isRequestLoggingEnabled: Boolean = requestLogger.underlying.isDebugEnabled
 
   sealed trait BaseRequest
@@ -241,15 +244,16 @@ object RequestChannel extends Logging {
 }
 
 class RequestChannel(val queueSize: Int) extends KafkaMetricsGroup {
+  import RequestChannel._
   val metrics = new RequestChannel.Metrics
   private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
   private val processors = new ConcurrentHashMap[Int, Processor]()
 
-  newGauge("RequestQueueSize", new Gauge[Int] {
+  newGauge(RequestQueueSizeMetric, new Gauge[Int] {
       def value = requestQueue.size
   })
 
-  newGauge("ResponseQueueSize", new Gauge[Int]{
+  newGauge(ResponseQueueSizeMetric, new Gauge[Int]{
     def value = processors.values.asScala.foldLeft(0) {(total, processor) =>
       total + processor.responseQueueSize
     }
@@ -258,10 +262,18 @@ class RequestChannel(val queueSize: Int) extends KafkaMetricsGroup {
   def addProcessor(processor: Processor): Unit = {
     if (processors.putIfAbsent(processor.id, processor) != null)
       warn(s"Unexpected processor with processorId ${processor.id}")
+
+    newGauge(ResponseQueueSizeMetric,
+      new Gauge[Int] {
+        def value = processor.responseQueueSize
+      },
+      Map(ProcessorMetricTag -> processor.id.toString)
+    )
   }
 
   def removeProcessor(processorId: Int): Unit = {
     processors.remove(processorId)
+    removeMetric(ResponseQueueSizeMetric, Map(ProcessorMetricTag -> processorId.toString))
   }
 
   /** Send a request to be handled, potentially blocking until there is room in the queue for the request */
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index fef412b7198..d37b5231594 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -433,6 +433,12 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
 
 }
 
+private[kafka] object Processor {
+  val IdlePercentMetricName = "IdlePercent"
+  val NetworkProcessorMetricTag = "networkProcessor"
+  val ListenerMetricTag = "listener"
+}
+
 /**
  * Thread that processes all requests from a single connection. There are N of these running in parallel
  * each of which has its own selector
@@ -451,6 +457,7 @@ private[kafka] class Processor(val id: Int,
                                memoryPool: MemoryPool,
                                logContext: LogContext) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
 
+  import Processor._
   private object ConnectionId {
     def fromString(s: String): Option[ConnectionId] = s.split("-") match {
       case Array(local, remote, index) => BrokerEndPoint.parseHostPort(local).flatMap { case (localHost, localPort) =>
@@ -471,18 +478,11 @@ private[kafka] class Processor(val id: Int,
   private val responseQueue = new LinkedBlockingDeque[RequestChannel.Response]()
 
   private[kafka] val metricTags = mutable.LinkedHashMap(
-    "listener" -> listenerName.value,
-    "networkProcessor" -> id.toString
+    ListenerMetricTag -> listenerName.value,
+    NetworkProcessorMetricTag -> id.toString
   ).asJava
 
-  newGauge("ResponseQueueSize",
-    new Gauge[Int] {
-      def value = responseQueue.size()
-    },
-    Map("processor" -> id.toString)
-  )
-
-  newGauge("IdlePercent",
+  newGauge(IdlePercentMetricName,
     new Gauge[Double] {
       def value = {
         Option(metrics.metric(metrics.metricName("io-wait-ratio", "socket-server-metrics", metricTags))).fold(0.0)(_.value)
@@ -490,7 +490,7 @@ private[kafka] class Processor(val id: Int,
     },
     // for compatibility, only add a networkProcessor tag to the Yammer Metrics alias (the equivalent Selector metric
     // also includes the listener name)
-    Map("networkProcessor" -> id.toString)
+    Map(NetworkProcessorMetricTag -> id.toString)
   )
 
   private val selector = createSelector(
@@ -742,6 +742,7 @@ private[kafka] class Processor(val id: Int,
       close(channel.id)
     }
     selector.close()
+    removeMetric(IdlePercentMetricName, Map(NetworkProcessorMetricTag -> id.toString))
   }
 
   // 'protected` to allow override for testing
@@ -792,7 +793,6 @@ private[kafka] class Processor(val id: Int,
 
   override def shutdown(): Unit = {
     super.shutdown()
-    removeMetric("ResponseQueueSize", Map("processor" -> id.toString))
     removeMetric("IdlePercent", Map("networkProcessor" -> id.toString))
   }
 
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index 312123c3ab6..aa085857560 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -91,7 +91,8 @@ abstract class AbstractFetcherManager(protected val name: String, clientId: Stri
     }
   }
 
-  private def getFetcherId(topic: String, partitionId: Int) : Int = {
+  // Visibility for testing
+  private[server] def getFetcherId(topic: String, partitionId: Int) : Int = {
     lock synchronized {
       Utils.abs(31 * topic.hashCode() + partitionId) % numFetchersPerBroker
     }
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index b4a015de576..eb2d835d876 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -103,12 +103,11 @@ class MetadataCache(brokerId: Int) extends Logging {
     }
   }
 
-  def getAliveEndpoint(brokerId: Int, listenerName: ListenerName): Option[Node] =
+  private def getAliveEndpoint(brokerId: Int, listenerName: ListenerName): Option[Node] =
     inReadLock(partitionMetadataLock) {
-      aliveNodes.get(brokerId).map { nodeMap =>
-        nodeMap.getOrElse(listenerName,
-          throw new BrokerEndPointNotAvailableException(s"Broker `$brokerId` does not have listener with name `$listenerName`"))
-      }
+      // Returns None if broker is not alive or if the broker does not have a listener named `listenerName`.
+      // Since listeners can be added dynamically, a broker with a missing listener could be a transient error.
+      aliveNodes.get(brokerId).flatMap(_.get(listenerName))
     }
 
   // errorUnavailableEndpoints exists to support v0 MetadataResponses
@@ -203,6 +202,11 @@ class MetadataCache(brokerId: Int) extends Logging {
         aliveBrokers(broker.id) = Broker(broker.id, endPoints, Option(broker.rack))
         aliveNodes(broker.id) = nodes.asScala
       }
+      aliveNodes.get(brokerId).foreach { listenerMap =>
+        val listeners = listenerMap.keySet
+        if (!aliveNodes.values.forall(_.keySet == listeners))
+          error(s"Listeners are not identical across brokers: $aliveNodes")
+      }
 
       val deletedPartitions = new mutable.ArrayBuffer[TopicPartition]
       updateMetadataRequest.partitionStates.asScala.foreach { case (tp, info) =>
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index cb2ac5244a0..4cdc9894327 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -26,11 +26,14 @@ import java.util.{Collections, Properties}
 import java.util.concurrent._
 import javax.management.ObjectName
 
+import com.yammer.metrics.Metrics
+import com.yammer.metrics.core.MetricName
 import kafka.admin.ConfigCommand
 import kafka.api.{KafkaSasl, SaslSetup}
 import kafka.coordinator.group.OffsetConfig
 import kafka.log.LogConfig
 import kafka.message.ProducerCompressionCodec
+import kafka.network.{Processor, RequestChannel}
 import kafka.utils._
 import kafka.utils.Implicits._
 import kafka.zk.{ConfigEntityChangeNotificationZNode, ZooKeeperTestHarness}
@@ -92,6 +95,8 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism)))
     super.setUp()
 
+    clearLeftOverProcessorMetrics() // clear metrics left over from other tests so that new ones can be tested
+
     (0 until numServers).foreach { brokerId =>
 
       val props = TestUtils.createBrokerConfig(brokerId, zkConnect, trustStoreFile = Some(trustStoreFile1))
@@ -102,10 +107,11 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
       props.put(KafkaConfig.SaslMechanismInterBrokerProtocolProp, "PLAIN")
       props.put(KafkaConfig.ZkEnableSecureAclsProp, "true")
       props.put(KafkaConfig.SaslEnabledMechanismsProp, kafkaServerSaslMechanisms.mkString(","))
-      props.put(KafkaConfig.LogSegmentBytesProp, "2000")
-      props.put(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp, "10000000")
+      props.put(KafkaConfig.LogSegmentBytesProp, "2000") // low value to test log rolling on config update
+      props.put(KafkaConfig.NumReplicaFetchersProp, "2") // greater than one to test reducing threads
+      props.put(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp, "10000000") // non-default value to trigger a new metric
       props.put(KafkaConfig.PasswordEncoderSecretProp, "dynamic-config-secret")
-      props.put(KafkaConfig.PasswordEncoderOldSecretProp, "old-dynamic-config-secret")
+      props.put(KafkaConfig.PasswordEncoderOldSecretProp, "old-dynamic-config-secret") // for testing secret rotation
 
       props ++= sslProperties1
       addKeystoreWithListenerPrefix(sslProperties1, props, SecureInternal)
@@ -261,9 +267,10 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     props.put(KafkaConfig.LogCleanerBackoffMsProp, "6000")
     reconfigureServers(props, perBrokerConfig = false, (KafkaConfig.LogCleanerThreadsProp, "2"))
 
-    // Verify cleaner config was updated
+    // Verify cleaner config was updated. Wait for one of the configs to be updated and verify
+    // that all the others were updated at the same time since they are reconfigured together
     val newCleanerConfig = servers.head.logManager.cleaner.currentConfig
-    assertEquals(2, newCleanerConfig.numThreads)
+    TestUtils.waitUntilTrue(() => newCleanerConfig.numThreads == 2, "Log cleaner not reconfigured")
     assertEquals(20000000, newCleanerConfig.dedupeBufferSize)
     assertEquals(0.8, newCleanerConfig.dedupeBufferLoadFactor, 0.001)
     assertEquals(300000, newCleanerConfig.ioBufferSize)
@@ -291,7 +298,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     val (producerThread, consumerThread) = startProduceConsume(retries = 0)
 
     val props = new Properties
-    props.put(KafkaConfig.LogSegmentBytesProp, "10000")
+    props.put(KafkaConfig.LogSegmentBytesProp, "4000")
     props.put(KafkaConfig.LogRollTimeMillisProp, TimeUnit.HOURS.toMillis(2).toString)
     props.put(KafkaConfig.LogRollTimeJitterMillisProp, TimeUnit.HOURS.toMillis(1).toString)
     props.put(KafkaConfig.LogIndexSizeMaxBytesProp, "100000")
@@ -312,7 +319,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     props.put(KafkaConfig.LogPreAllocateProp, true.toString)
     props.put(KafkaConfig.LogMessageTimestampTypeProp, TimestampType.LOG_APPEND_TIME.toString)
     props.put(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp, "1000")
-    reconfigureServers(props, perBrokerConfig = false, (KafkaConfig.LogSegmentBytesProp, "10000"))
+    reconfigureServers(props, perBrokerConfig = false, (KafkaConfig.LogSegmentBytesProp, "4000"))
 
     // Verify that all broker defaults have been updated
     servers.foreach { server =>
@@ -325,7 +332,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     val newLogConfig = LogConfig(KafkaServer.copyKafkaConfigToLog(servers.head.config))
     assertEquals(newLogConfig, servers.head.logManager.currentDefaultConfig)
     val log = servers.head.logManager.getLog(new TopicPartition(topic, 0)).getOrElse(throw new IllegalStateException("Log not found"))
-    TestUtils.waitUntilTrue(() => log.config.segmentSize == 10000, "Existing topic config using defaults not updated")
+    TestUtils.waitUntilTrue(() => log.config.segmentSize == 4000, "Existing topic config using defaults not updated")
     props.asScala.foreach { case (k, v) =>
       val logConfigName = DynamicLogConfig.KafkaConfigToLogConfigName(k)
       val expectedValue = if (k == KafkaConfig.LogCleanupPolicyProp) s"[$v]" else v
@@ -335,7 +342,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     consumerThread.waitForMatchingRecords(record => record.timestampType == TimestampType.LOG_APPEND_TIME)
 
     // Verify that the new config is actually used for new segments of existing logs
-    TestUtils.waitUntilTrue(() => log.logSegments.exists(_.size > 9000), "Log segment size increase not applied")
+    TestUtils.waitUntilTrue(() => log.logSegments.exists(_.size > 3000), "Log segment size increase not applied")
 
     // Verify that overridden topic configs are not updated when broker default is updated
     val log2 = servers.head.logManager.getLog(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0))
@@ -383,19 +390,20 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     // For others, thread count should be configuredCount * threadMultiplier * numBrokers
     val threadMultiplier = Map(
       requestHandlerPrefix -> 1,
-      networkThreadPrefix ->  2, // 2 endpoints
+      networkThreadPrefix -> 2, // 2 endpoints
       fetcherThreadPrefix -> (servers.size - 1)
     )
 
     // Tolerate threads left over from previous tests
-    def leftOverThreadCount(prefix: String, perBrokerCount: Int) : Int = {
+    def leftOverThreadCount(prefix: String, perBrokerCount: Int): Int = {
       val count = matchingThreads(prefix).size - perBrokerCount * servers.size * threadMultiplier(prefix)
       if (count > 0) count else 0
     }
+
     val leftOverThreads = Map(
       requestHandlerPrefix -> leftOverThreadCount(requestHandlerPrefix, servers.head.config.numIoThreads),
-      networkThreadPrefix ->  leftOverThreadCount(networkThreadPrefix, servers.head.config.numNetworkThreads),
-      fetcherThreadPrefix ->  leftOverThreadCount(fetcherThreadPrefix, servers.head.config.numReplicaFetchers)
+      networkThreadPrefix -> leftOverThreadCount(networkThreadPrefix, servers.head.config.numNetworkThreads),
+      fetcherThreadPrefix -> leftOverThreadCount(fetcherThreadPrefix, servers.head.config.numReplicaFetchers)
     )
 
     def maybeVerifyThreadPoolSize(propName: String, size: Int, threadPrefix: String): Unit = {
@@ -404,21 +412,26 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
       if (expectedCountPerBroker > 0)
         verifyThreads(threadPrefix, expectedCountPerBroker, ignoreCount)
     }
+
     def reducePoolSize(propName: String, currentSize: => Int, threadPrefix: String): Int = {
       val newSize = if (currentSize / 2 == 0) 1 else currentSize / 2
       resizeThreadPool(propName, newSize, threadPrefix)
       newSize
     }
+
     def increasePoolSize(propName: String, currentSize: => Int, threadPrefix: String): Int = {
-      resizeThreadPool(propName, currentSize * 2, threadPrefix)
-      currentSize * 2
+      val newSize = currentSize * 2 - 1
+      resizeThreadPool(propName, newSize, threadPrefix)
+      newSize
     }
+
     def resizeThreadPool(propName: String, newSize: Int, threadPrefix: String): Unit = {
       val props = new Properties
       props.put(propName, newSize.toString)
       reconfigureServers(props, perBrokerConfig = false, (propName, newSize.toString))
       maybeVerifyThreadPoolSize(propName, newSize, threadPrefix)
     }
+
     def verifyThreadPoolResize(propName: String, currentSize: => Int, threadPrefix: String, mayReceiveDuplicates: Boolean): Unit = {
       maybeVerifyThreadPoolSize(propName, currentSize, threadPrefix)
       val numRetries = if (mayReceiveDuplicates) 100 else 0
@@ -444,6 +457,57 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
       "", mayReceiveDuplicates = false)
     verifyThreadPoolResize(KafkaConfig.NumNetworkThreadsProp, config.numNetworkThreads,
       networkThreadPrefix, mayReceiveDuplicates = true)
+
+    verifyProcessorMetrics()
+    verifyMarkPartitionsForTruncation()
+  }
+
+  private def isProcessorMetric(metricName: MetricName): Boolean = {
+    val mbeanName = metricName.getMBeanName
+    mbeanName.contains(s"${Processor.NetworkProcessorMetricTag}=") || mbeanName.contains(s"${RequestChannel.ProcessorMetricTag}=")
+  }
+
+  private def clearLeftOverProcessorMetrics(): Unit = {
+    val metricsFromOldTests = Metrics.defaultRegistry.allMetrics.keySet.asScala.filter(isProcessorMetric)
+    metricsFromOldTests.foreach(Metrics.defaultRegistry.removeMetric)
+  }
+
+  // Verify that metrics from processors that were removed have been deleted.
+  // Since processor ids are not reused, it is sufficient to check metrics count
+  // based on the current number of processors
+  private def verifyProcessorMetrics(): Unit = {
+    val numProcessors = servers.head.config.numNetworkThreads * 2 // 2 listeners
+
+    val kafkaMetrics = servers.head.metrics.metrics().keySet.asScala
+      .filter(_.tags.containsKey(Processor.NetworkProcessorMetricTag))
+      .groupBy(_.tags.get(Processor.NetworkProcessorMetricTag))
+    assertEquals(numProcessors, kafkaMetrics.size)
+
+    Metrics.defaultRegistry.allMetrics.keySet.asScala
+      .filter(isProcessorMetric)
+      .groupBy(_.getName)
+      .foreach { case (name, set) => assertEquals(s"Metrics not deleted $name", numProcessors, set.size) }
+  }
+
+  // Verify that replicaFetcherManager.markPartitionsForTruncation uses the current fetcher thread size
+  // to obtain partition assignment
+  private def verifyMarkPartitionsForTruncation(): Unit = {
+    val leaderId = 0
+    val partitions = (0 until numPartitions).map(i => new TopicPartition(topic, i)).filter { tp =>
+      zkClient.getLeaderForPartition(tp) == Some(leaderId)
+    }
+    assertTrue(s"Partitions not found with leader $leaderId", partitions.nonEmpty)
+    partitions.foreach { tp =>
+      (1 to 2).foreach { i =>
+        val replicaFetcherManager = servers(i).replicaManager.replicaFetcherManager
+        val truncationOffset = tp.partition
+        replicaFetcherManager.markPartitionsForTruncation(leaderId, tp, truncationOffset)
+        val fetcherThreads = replicaFetcherManager.fetcherThreadMap.filter(_._2.partitionStates.contains(tp))
+        assertEquals(1, fetcherThreads.size)
+        assertEquals(replicaFetcherManager.getFetcherId(tp.topic, tp.partition), fetcherThreads.head._1.fetcherId)
+        assertEquals(truncationOffset, fetcherThreads.head._2.partitionStates.stateValue(tp).fetchOffset)
+      }
+    }
   }
 
   @Test
@@ -672,7 +736,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
   private def verifyRemoveListener(listenerName: String, securityProtocol: SecurityProtocol,
                                    saslMechanisms: Seq[String]): Unit = {
     val saslMechanism = if (saslMechanisms.isEmpty) "" else saslMechanisms.head
-    val producer1 = createProducer(listenerName, securityProtocol, saslMechanism)
+    val producer1 = createProducer(listenerName, securityProtocol, saslMechanism, retries = 1000)
     val consumer1 = createConsumer(listenerName, securityProtocol, saslMechanism,
       s"remove-listener-group-$securityProtocol")
     verifyProduceConsume(producer1, consumer1, numRecords = 10, topic)
@@ -716,7 +780,8 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
 
   private def verifyListener(securityProtocol: SecurityProtocol, saslMechanism: Option[String]): Unit = {
     val mechanism = saslMechanism.getOrElse("")
-    val producer = createProducer(securityProtocol.name, securityProtocol, mechanism)
+    val retries = 1000 // since it may take time for metadata to be updated on all brokers
+    val producer = createProducer(securityProtocol.name, securityProtocol, mechanism, retries)
     val consumer = createConsumer(securityProtocol.name, securityProtocol, mechanism,
       s"add-listener-group-$securityProtocol-$mechanism")
     verifyProduceConsume(producer, consumer, numRecords = 10, topic)
@@ -785,11 +850,13 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     props
   }
 
-  private def createProducer(listenerName: String, securityProtocol: SecurityProtocol,
-                             saslMechanism: String): KafkaProducer[String, String] = {
+  private def createProducer(listenerName: String,
+                             securityProtocol: SecurityProtocol,
+                             saslMechanism: String,
+                             retries: Int): KafkaProducer[String, String] = {
     val bootstrapServers =  TestUtils.bootstrapServers(servers, new ListenerName(listenerName))
     val producer = TestUtils.createNewProducer(bootstrapServers,
-      acks = -1, retries = 0,
+      acks = -1, retries = retries,
       securityProtocol = securityProtocol,
       keySerializer = new StringSerializer,
       valueSerializer = new StringSerializer,
@@ -834,12 +901,12 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
                                    topic: String): Unit = {
     val producerRecords = (1 to numRecords).map(i => new ProducerRecord(topic, s"key$i", s"value$i"))
     producerRecords.map(producer.send).map(_.get(10, TimeUnit.SECONDS))
-
-    val records = new ArrayBuffer[ConsumerRecord[String, String]]
+    var received = 0
     TestUtils.waitUntilTrue(() => {
-      records ++= consumer.poll(50).asScala
-      records.size == numRecords
-    }, s"Consumed ${records.size} records until timeout instead of the expected $numRecords records")
+      received += consumer.poll(50).count
+      received >= numRecords
+    }, s"Consumed $received records until timeout instead of the expected $numRecords records")
+    assertEquals(numRecords, received)
   }
 
   private def verifyAuthenticationFailure(producer: KafkaProducer[_, _]): Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
index 383c1e238c7..0ee73659780 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
@@ -19,7 +19,6 @@ package kafka.server
 import java.util
 import util.Arrays.asList
 
-import kafka.common.BrokerEndPointNotAvailableException
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@@ -289,14 +288,10 @@ class MetadataCacheTest {
       brokers.asJava).build()
     cache.updateCache(15, updateMetadataRequest)
 
-    try {
-      val result = cache.getTopicMetadata(Set(topic), ListenerName.forSecurityProtocol(SecurityProtocol.SSL))
-      fail(s"Exception should be thrown by `getTopicMetadata` with non-supported SecurityProtocol, $result was returned instead")
-    }
-    catch {
-      case _: BrokerEndPointNotAvailableException => //expected
-    }
-
+    val topicMetadata = cache.getTopicMetadata(Set(topic), ListenerName.forSecurityProtocol(SecurityProtocol.SSL))
+    assertEquals(1, topicMetadata.size)
+    assertEquals(1, topicMetadata.head.partitionMetadata.size)
+    assertEquals(-1, topicMetadata.head.partitionMetadata.get(0).leaderId)
   }
 
   @Test
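
For context on the metrics portion of the change above: the PR moves the per-processor "ResponseQueueSize" gauge out of Processor and into RequestChannel.addProcessor/removeProcessor, so gauges are created and deleted as network threads are added or removed at runtime. The sketch below is a minimal, illustrative stand-in against the same Yammer metrics library; ProcessorGaugeSketch and its queue-size map are hypothetical, not Kafka code, and only demonstrate the register-on-add / remove-on-delete pattern.

import com.yammer.metrics.Metrics
import com.yammer.metrics.core.{Gauge, MetricName}

import scala.collection.concurrent.TrieMap

object ProcessorGaugeSketch {
  // Pretend per-processor response queue sizes, keyed by processor id.
  private val responseQueueSizes = TrieMap.empty[Int, Int]

  // One metric name per processor; the scope field stands in for the "processor" tag.
  private def metricName(processorId: Int): MetricName =
    new MetricName("kafka.network", "RequestChannel", "ResponseQueueSize", s"processor.$processorId")

  // Analogous to RequestChannel.addProcessor: register a gauge bound to this processor.
  def addProcessor(processorId: Int): Unit = {
    responseQueueSizes.put(processorId, 0)
    Metrics.defaultRegistry.newGauge(metricName(processorId), new Gauge[Int] {
      def value: Int = responseQueueSizes.getOrElse(processorId, 0)
    })
  }

  // Analogous to RequestChannel.removeProcessor: drop the gauge so a thread-pool
  // resize does not leave stale per-processor metrics behind.
  def removeProcessor(processorId: Int): Unit = {
    responseQueueSizes.remove(processorId)
    Metrics.defaultRegistry.removeMetric(metricName(processorId))
  }
}

verifyProcessorMetrics() in the test above asserts the same invariant: after num.network.threads is resized, the number of processor-tagged metrics matches the current processor count.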


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Add test to verify markPartitionsForTruncation after fetcher thread pool resize 
> --------------------------------------------------------------------------------
>
>                 Key: KAFKA-6501
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6501
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: Rajini Sivaram
>            Assignee: Rajini Sivaram
>            Priority: Major
>             Fix For: 1.2.0
>
>
> Follow-on task from KAFKA-6242
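
As background for the quoted issue: a partition is assigned to a replica fetcher thread by hashing the topic and partition modulo the configured fetcher count, so markPartitionsForTruncation must re-derive that id with the current num.replica.fetchers after a pool resize. The snippet below is a small illustrative sketch; FetcherIdSketch and its abs helper are hypothetical stand-ins mirroring AbstractFetcherManager.getFetcherId and Utils.abs, not the actual Kafka classes.

object FetcherIdSketch {
  // Non-negative hash helper; stand-in for org.apache.kafka.common.utils.Utils.abs.
  private def abs(n: Int): Int = if (n == Int.MinValue) 0 else math.abs(n)

  // Mirrors the hash in AbstractFetcherManager.getFetcherId: partition -> fetcher thread index.
  def fetcherId(topic: String, partitionId: Int, numFetchersPerBroker: Int): Int =
    abs(31 * topic.hashCode + partitionId) % numFetchersPerBroker

  def main(args: Array[String]): Unit = {
    // The same partition can map to a different fetcher once the pool size changes,
    // which is why the new test calls getFetcherId after reconfiguration to locate
    // the thread whose partition state should carry the truncation offset.
    println(fetcherId("test-topic", 3, numFetchersPerBroker = 2))
    println(fetcherId("test-topic", 3, numFetchersPerBroker = 4))
  }
}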



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
