kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rsiva...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7266: Fix MetricsTest.testMetrics flakiness using compression (#5485)
Date Mon, 13 Aug 2018 20:54:21 GMT
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 76a00d7  KAFKA-7266: Fix MetricsTest.testMetrics flakiness using compression (#5485)
76a00d7 is described below

commit 76a00d78be758153b6485818c0cb86423b69100b
Author: Stanislav Kozlovski <familyguyuser192@windowslive.com>
AuthorDate: Mon Aug 13 21:54:14 2018 +0100

    KAFKA-7266: Fix MetricsTest.testMetrics flakiness using compression (#5485)
    
    Increase record size and use compression for downconversion metrics test to ensure that conversion time is above 1ms to avoid transient test failures.
    
    Reviewers: Ismael Juma <ismael@juma.me.uk>, Rajini Sivaram <rajinisivaram@googlemail.com>
---
 .../test/scala/integration/kafka/api/MetricsTest.scala  | 17 ++++-------------
 1 file changed, 4 insertions(+), 13 deletions(-)

diff --git a/core/src/test/scala/integration/kafka/api/MetricsTest.scala b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
index fe75639..494bcce 100644
--- a/core/src/test/scala/integration/kafka/api/MetricsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
@@ -44,7 +44,8 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {
   this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "false")
   this.serverConfig.setProperty(KafkaConfig.AutoCreateTopicsEnableDoc, "false")
   this.producerConfig.setProperty(ProducerConfig.LINGER_MS_CONFIG, "10")
-  this.producerConfig.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "1000000")
+  // intentionally slow message down conversion via gzip compression to ensure we can measure the time it takes
+  this.producerConfig.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip")
   override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
   override protected val serverSaslProperties =
     Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism))
@@ -78,7 +79,7 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {
 
     // Produce and consume some records
     val numRecords = 10
-    val recordSize = 1000
+    val recordSize = 100000
     val producer = producers.head
     sendRecords(producer, numRecords, recordSize, tp)
 
@@ -198,17 +199,7 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {
 
     verifyYammerMetricRecorded(s"kafka.server:type=BrokerTopicMetrics,name=ProduceMessageConversionsPerSec")
 
-    // Conversion time less than 1 millisecond is reported as zero, so retry with larger batches until time > 0
-    var iteration = 0
-    TestUtils.retry(5000) {
-      val conversionTimeMs = yammerMetricValue(s"$requestMetricsPrefix,name=MessageConversionsTimeMs,request=Produce").asInstanceOf[Double]
-      if (conversionTimeMs <= 0.0) {
-        iteration += 1
-        sendRecords(producers.head, 1000 * iteration, 100, tp)
-      }
-      assertTrue(s"Message conversion time not recorded $conversionTimeMs", conversionTimeMs > 0.0)
-    }
-
+    verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=MessageConversionsTimeMs,request=Produce", value => value > 0.0)
     verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=RequestBytes,request=Fetch")
     verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=TemporaryMemoryBytes,request=Fetch", value => value == 0.0)
 


Mime
View raw message