kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch 2.1 updated: KAFKA-7223: Add late-record metrics (#5742)
Date Fri, 12 Oct 2018 16:14:09 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new d07fcd7  KAFKA-7223: Add late-record metrics (#5742)
d07fcd7 is described below

commit d07fcd782d258b005cc2dca636952507dacba282
Author: John Roesler <vvcephei@users.noreply.github.com>
AuthorDate: Fri Oct 12 11:12:51 2018 -0500

    KAFKA-7223: Add late-record metrics (#5742)
    
    Add late record metrics, as specified in KIP-328
    
    Reviewers: Bill Bejeck <bill@confluent.io>, Guozhang Wang <guozhang@confluent.io>
---
 build.gradle                                       |   1 +
 gradle/dependencies.gradle                         |   2 +
 .../streams/kstream/internals/metrics/Sensors.java |  36 +++++
 .../processor/internals/PartitionGroup.java        |  12 +-
 .../streams/processor/internals/StreamTask.java    |   3 +-
 ...KStreamSessionWindowAggregateProcessorTest.java |  29 +++-
 .../internals/KStreamWindowAggregateTest.java      | 170 +++++++++++++--------
 .../processor/internals/PartitionGroupTest.java    | 103 +++++++------
 .../org/apache/kafka/test/StreamsTestUtils.java    |   2 +
 .../apache/kafka/streams/TopologyTestDriver.java   |  10 +-
 10 files changed, 248 insertions(+), 120 deletions(-)

diff --git a/build.gradle b/build.gradle
index 95f3eb3..e78d2ce 100644
--- a/build.gradle
+++ b/build.gradle
@@ -974,6 +974,7 @@ project(':streams') {
     testCompile libs.junit
     testCompile libs.easymock
     testCompile libs.bcpkix
+    testCompile libs.hamcrest
 
     testRuntimeOnly project(':streams:test-utils')
     testRuntime libs.slf4jlog4j
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index e22885e..e11ded1 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -57,6 +57,7 @@ versions += [
   jetty: "9.4.12.v20180830",
   jersey: "2.27",
   jmh: "1.21",
+  hamcrest: "1.3",
   log4j: "1.2.17",
   scalaLogging: "3.9.0",
   jaxb: "2.3.0",
@@ -117,6 +118,7 @@ libs += [
   jmhGeneratorAnnProcess: "org.openjdk.jmh:jmh-generator-annprocess:$versions.jmh",
   joptSimple: "net.sf.jopt-simple:jopt-simple:$versions.jopt",
   junit: "junit:junit:$versions.junit",
+  hamcrest: "org.hamcrest:hamcrest-all:1.3",
   kafkaStreams_0100: "org.apache.kafka:kafka-streams:$versions.kafka_0100",
   kafkaStreams_0101: "org.apache.kafka:kafka-streams:$versions.kafka_0101",
   kafkaStreams_0102: "org.apache.kafka:kafka-streams:$versions.kafka_0102",
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java
index 04c7150..a85bbb8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java
@@ -16,10 +16,15 @@
  */
 package org.apache.kafka.streams.kstream.internals.metrics;
 
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 
+import java.util.Map;
+
 public class Sensors {
     private Sensors() {}
 
@@ -39,4 +44,35 @@ public class Sensors {
         );
         return sensor;
     }
+
+    public static Sensor recordLatenessSensor(final InternalProcessorContext context) {
+        final StreamsMetricsImpl metrics = context.metrics();
+
+        final Sensor sensor = metrics.taskLevelSensor(
+            context.taskId().toString(),
+            "record-lateness",
+            Sensor.RecordingLevel.DEBUG
+        );
+
+        final Map<String, String> tags = metrics.tagMap(
+            "task-id", context.taskId().toString()
+        );
+        sensor.add(
+            new MetricName(
+                "record-lateness-avg",
+                "stream-processor-node-metrics",
+                "The average observed lateness of records.",
+                tags),
+            new Avg()
+        );
+        sensor.add(
+            new MetricName(
+                "record-lateness-max",
+                "stream-processor-node-metrics",
+                "The max observed lateness of records.",
+                tags),
+            new Max()
+        );
+        return sensor;
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
index 7020253..1fdd454 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Sensor;
 
 import java.util.Collections;
 import java.util.Comparator;
@@ -38,6 +39,7 @@ import java.util.Set;
 public class PartitionGroup {
 
     private final Map<TopicPartition, RecordQueue> partitionQueues;
+    private final Sensor recordLatenessSensor;
     private final PriorityQueue<RecordQueue> nonEmptyQueuesByTime;
 
     private long streamTime;
@@ -61,9 +63,10 @@ public class PartitionGroup {
         }
     }
 
-    PartitionGroup(final Map<TopicPartition, RecordQueue> partitionQueues) {
+    PartitionGroup(final Map<TopicPartition, RecordQueue> partitionQueues, final Sensor recordLatenessSensor) {
         nonEmptyQueuesByTime = new PriorityQueue<>(partitionQueues.size(), Comparator.comparingLong(RecordQueue::timestamp));
         this.partitionQueues = partitionQueues;
+        this.recordLatenessSensor = recordLatenessSensor;
         totalBuffered = 0;
         allBuffered = false;
         streamTime = RecordQueue.UNKNOWN;
@@ -95,7 +98,12 @@ public class PartitionGroup {
                 }
 
                 // always update the stream time to the record's timestamp yet to be processed if it is larger
-                streamTime = Math.max(streamTime, record.timestamp);
+                if (record.timestamp > streamTime) {
+                    streamTime = record.timestamp;
+                    recordLatenessSensor.record(0);
+                } else {
+                    recordLatenessSensor.record(streamTime - record.timestamp);
+                }
             }
         }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 2ad0acc..247a156 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -52,6 +52,7 @@ import java.util.concurrent.TimeUnit;
 
 import static java.lang.String.format;
 import static java.util.Collections.singleton;
+import static org.apache.kafka.streams.kstream.internals.metrics.Sensors.recordLatenessSensor;
 
 /**
  * A StreamTask is associated with a {@link PartitionGroup}, and is assigned to a StreamThread for processing.
@@ -234,7 +235,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
         }
 
         recordInfo = new PartitionGroup.RecordInfo();
-        partitionGroup = new PartitionGroup(partitionQueues);
+        partitionGroup = new PartitionGroup(partitionQueues, recordLatenessSensor(processorContextImpl));
         processorContextImpl.setStreamTimeSupplier(partitionGroup::timestamp);
 
         stateMgr.registerGlobalStateStores(topology.globalStateStores());
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index 419c861..1074f02f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
@@ -55,7 +54,9 @@ import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
 import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -110,7 +111,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
         final StoreBuilder<SessionStore<String, Long>> storeBuilder = Stores.sessionStoreBuilder(Stores.persistentSessionStore(STORE_NAME,
ofMillis(GAP_MS * 3)),
                                                                                         
        Serdes.String(),
                                                                                         
        Serdes.Long())
-            .withLoggingDisabled();
+                                                                            .withLoggingDisabled();
 
         if (enableCaching) {
             storeBuilder.withCachingEnabled();
@@ -335,9 +336,11 @@ public class KStreamSessionWindowAggregateProcessorTest {
         context.setStreamTime(20);
         context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
         processor.process("A", "1");
+        context.setRecordContext(new ProcessorRecordContext(1, -2, -3, "topic", null));
+        processor.process("A", "1");
         LogCaptureAppender.unregister(appender);
 
-        final Metric dropMetric = metrics.metrics().get(new MetricName(
+        final MetricName dropMetric = new MetricName(
             "late-record-drop-total",
             "stream-processor-node-metrics",
             "The total number of occurrence of late-record-drop operations.",
@@ -346,8 +349,24 @@ public class KStreamSessionWindowAggregateProcessorTest {
                 mkEntry("task-id", "0_0"),
                 mkEntry("processor-node-id", "TESTING_NODE")
             )
-        ));
-        assertEquals(1.0, dropMetric.metricValue());
+        );
+
+        assertThat(metrics.metrics().get(dropMetric).metricValue(), is(2.0));
+
+        final MetricName dropRate = new MetricName(
+            "late-record-drop-rate",
+            "stream-processor-node-metrics",
+            "The average number of occurrence of late-record-drop operations.",
+            mkMap(
+                mkEntry("client-id", "test"),
+                mkEntry("task-id", "0_0"),
+                mkEntry("processor-node-id", "TESTING_NODE")
+            )
+        );
+
+        assertThat((Double) metrics.metrics().get(dropRate).metricValue(), greaterThan(0.0));
+
         assertThat(appender.getMessages(), hasItem("Skipping record for expired window. key=[A] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0) expiration=[10]"));
+        assertThat(appender.getMessages(), hasItem("Skipping record for expired window. key=[A] topic=[topic] partition=[-3] offset=[-2] timestamp=[1] window=[1,1) expiration=[10]"));
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index 8ae6284..236cd8c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
@@ -42,6 +43,7 @@ import org.apache.kafka.test.MockInitializer;
 import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.StreamsTestUtils;
+import org.hamcrest.Matcher;
 import org.junit.Test;
 
 import java.util.List;
@@ -51,9 +53,10 @@ import static java.time.Duration.ofMillis;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
-import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.hasItem;
 import static org.hamcrest.CoreMatchers.hasItems;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
@@ -70,7 +73,7 @@ public class KStreamWindowAggregateTest {
 
         final KTable<Windowed<String>, String> table2 = builder
             .stream(topic1, Consumed.with(Serdes.String(), Serdes.String()))
-            .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
+            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
             .windowedBy(TimeWindows.of(ofMillis(10)).advanceBy(ofMillis(5)))
             .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.<String,
String, WindowStore<Bytes, byte[]>>as("topic1-Canonized").withValueSerde(Serdes.String()));
 
@@ -128,7 +131,7 @@ public class KStreamWindowAggregateTest {
 
         final KTable<Windowed<String>, String> table1 = builder
             .stream(topic1, Consumed.with(Serdes.String(), Serdes.String()))
-            .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
+            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
             .windowedBy(TimeWindows.of(ofMillis(10)).advanceBy(ofMillis(5)))
             .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.<String,
String, WindowStore<Bytes, byte[]>>as("topic1-Canonized").withValueSerde(Serdes.String()));
 
@@ -137,7 +140,7 @@ public class KStreamWindowAggregateTest {
 
         final KTable<Windowed<String>, String> table2 = builder
             .stream(topic2, Consumed.with(Serdes.String(), Serdes.String()))
-            .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
+            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
             .windowedBy(TimeWindows.of(ofMillis(10)).advanceBy(ofMillis(5)))
             .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.<String,
String, WindowStore<Bytes, byte[]>>as("topic2-Canonized").withValueSerde(Serdes.String()));
 
@@ -231,8 +234,9 @@ public class KStreamWindowAggregateTest {
         final StreamsBuilder builder = new StreamsBuilder();
         final String topic = "topic";
 
-        final KStream<String, String> stream1 = builder.stream(topic, Consumed.with(Serdes.String(),
Serdes.String()));
-        stream1.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
+        builder
+            .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
             .windowedBy(TimeWindows.of(ofMillis(10)).advanceBy(ofMillis(5)))
             .aggregate(
                 MockInitializer.STRING_INIT,
@@ -258,15 +262,15 @@ public class KStreamWindowAggregateTest {
 
         final KStream<String, String> stream1 = builder.stream(topic, Consumed.with(Serdes.String(),
Serdes.String()));
         stream1.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
-            .windowedBy(TimeWindows.of(ofMillis(10)).advanceBy(ofMillis(5)).until(100))
-            .aggregate(
-                () -> "",
-                MockAggregator.toStringInstance("+"),
-                Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonicalized").withValueSerde(Serdes.String()).withCachingDisabled().withLoggingDisabled()
-            )
-            .toStream()
-            .map((key, value) -> new KeyValue<>(key.toString(), value))
-            .to("output");
+               .windowedBy(TimeWindows.of(ofMillis(10)).advanceBy(ofMillis(5)).until(100))
+               .aggregate(
+                   () -> "",
+                   MockAggregator.toStringInstance("+"),
+                   Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonicalized").withValueSerde(Serdes.String()).withCachingDisabled().withLoggingDisabled()
+               )
+               .toStream()
+               .map((key, value) -> new KeyValue<>(key.toString(), value))
+               .to("output");
 
         LogCaptureAppender.setClassLoggerToDebug(KStreamWindowAggregate.class);
         final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
@@ -281,17 +285,13 @@ public class KStreamWindowAggregateTest {
             driver.pipeInput(recordFactory.create(topic, "k", "6", 6L));
             LogCaptureAppender.unregister(appender);
 
-            final MetricName metricName = new MetricName(
-                "late-record-drop-total",
-                "stream-processor-node-metrics",
-                "The total number of occurrence of late-record-drop operations.",
-                mkMap(
-                    mkEntry("client-id", "topology-test-driver-virtual-thread"),
-                    mkEntry("task-id", "0_0"),
-                    mkEntry("processor-node-id", "KSTREAM-AGGREGATE-0000000001")
-                )
+            assertLatenessMetrics(
+                driver,
+                is(7.0), // how many events get dropped
+                is(100.0), // k:0 is 100ms late, since its time is 0, but it arrives at stream time 100.
+                is(84.875) // (0 + 100 + 99 + 98 + 97 + 96 + 95 + 94) / 8
             );
-            assertThat(driver.metrics().get(metricName).metricValue(), equalTo(7.0));
+
             assertThat(appender.getMessages(), hasItems(
                 "Skipping record for expired window. key=[k] topic=[topic] partition=[0]
offset=[1] timestamp=[0] window=[0,10) expiration=[10]",
                 "Skipping record for expired window. key=[k] topic=[topic] partition=[0]
offset=[2] timestamp=[1] window=[0,10) expiration=[10]",
@@ -316,59 +316,101 @@ public class KStreamWindowAggregateTest {
         final String topic = "topic";
 
         final KStream<String, String> stream1 = builder.stream(topic, Consumed.with(Serdes.String(),
Serdes.String()));
-        stream1.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
-            .windowedBy(TimeWindows.of(ofMillis(10)).advanceBy(ofMillis(5)).grace(ofMillis(90L)))
-            .aggregate(
-                () -> "",
-                MockAggregator.toStringInstance("+"),
-                Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonicalized").withValueSerde(Serdes.String()).withCachingDisabled().withLoggingDisabled()
-            )
-            .toStream()
-            .map((key, value) -> new KeyValue<>(key.toString(), value))
-            .to("output");
+        stream1.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+               .windowedBy(TimeWindows.of(ofMillis(10)).advanceBy(ofMillis(10)).grace(ofMillis(90L)))
+               .aggregate(
+                   () -> "",
+                   MockAggregator.toStringInstance("+"),
+                   Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonicalized").withValueSerde(Serdes.String()).withCachingDisabled().withLoggingDisabled()
+               )
+               .toStream()
+               .map((key, value) -> new KeyValue<>(key.toString(), value))
+               .to("output");
 
         LogCaptureAppender.setClassLoggerToDebug(KStreamWindowAggregate.class);
         final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
         try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props,
0L)) {
-            driver.pipeInput(recordFactory.create(topic, "k", "100", 100L));
-            driver.pipeInput(recordFactory.create(topic, "k", "0", 0L));
-            driver.pipeInput(recordFactory.create(topic, "k", "1", 1L));
-            driver.pipeInput(recordFactory.create(topic, "k", "2", 2L));
-            driver.pipeInput(recordFactory.create(topic, "k", "3", 3L));
-            driver.pipeInput(recordFactory.create(topic, "k", "4", 4L));
-            driver.pipeInput(recordFactory.create(topic, "k", "5", 5L));
+            driver.pipeInput(recordFactory.create(topic, "k", "100", 200L));
+            driver.pipeInput(recordFactory.create(topic, "k", "0", 100L));
+            driver.pipeInput(recordFactory.create(topic, "k", "1", 101L));
+            driver.pipeInput(recordFactory.create(topic, "k", "2", 102L));
+            driver.pipeInput(recordFactory.create(topic, "k", "3", 103L));
+            driver.pipeInput(recordFactory.create(topic, "k", "4", 104L));
+            driver.pipeInput(recordFactory.create(topic, "k", "5", 105L));
             driver.pipeInput(recordFactory.create(topic, "k", "6", 6L));
             LogCaptureAppender.unregister(appender);
 
-            final MetricName metricName = new MetricName(
-                "late-record-drop-total",
-                "stream-processor-node-metrics",
-                "The total number of occurrence of late-record-drop operations.",
-                mkMap(
-                    mkEntry("client-id", "topology-test-driver-virtual-thread"),
-                    mkEntry("task-id", "0_0"),
-                    mkEntry("processor-node-id", "KSTREAM-AGGREGATE-0000000001")
-                )
-            );
-            assertThat(driver.metrics().get(metricName).metricValue(), equalTo(7.0));
+            assertLatenessMetrics(driver, is(7.0), is(194.0), is(97.375));
+
             assertThat(appender.getMessages(), hasItems(
-                "Skipping record for expired window. key=[k] topic=[topic] partition=[0]
offset=[1] timestamp=[0] window=[0,10) expiration=[10]",
-                "Skipping record for expired window. key=[k] topic=[topic] partition=[0]
offset=[2] timestamp=[1] window=[0,10) expiration=[10]",
-                "Skipping record for expired window. key=[k] topic=[topic] partition=[0]
offset=[3] timestamp=[2] window=[0,10) expiration=[10]",
-                "Skipping record for expired window. key=[k] topic=[topic] partition=[0]
offset=[4] timestamp=[3] window=[0,10) expiration=[10]",
-                "Skipping record for expired window. key=[k] topic=[topic] partition=[0]
offset=[5] timestamp=[4] window=[0,10) expiration=[10]",
-                "Skipping record for expired window. key=[k] topic=[topic] partition=[0]
offset=[6] timestamp=[5] window=[0,10) expiration=[10]",
-                "Skipping record for expired window. key=[k] topic=[topic] partition=[0]
offset=[7] timestamp=[6] window=[0,10) expiration=[10]"
+                "Skipping record for expired window. key=[k] topic=[topic] partition=[0]
offset=[1] timestamp=[100] window=[100,110) expiration=[110]",
+                "Skipping record for expired window. key=[k] topic=[topic] partition=[0]
offset=[2] timestamp=[101] window=[100,110) expiration=[110]",
+                "Skipping record for expired window. key=[k] topic=[topic] partition=[0]
offset=[3] timestamp=[102] window=[100,110) expiration=[110]",
+                "Skipping record for expired window. key=[k] topic=[topic] partition=[0]
offset=[4] timestamp=[103] window=[100,110) expiration=[110]",
+                "Skipping record for expired window. key=[k] topic=[topic] partition=[0]
offset=[5] timestamp=[104] window=[100,110) expiration=[110]",
+                "Skipping record for expired window. key=[k] topic=[topic] partition=[0]
offset=[6] timestamp=[105] window=[100,110) expiration=[110]",
+                "Skipping record for expired window. key=[k] topic=[topic] partition=[0]
offset=[7] timestamp=[6] window=[0,10) expiration=[110]"
             ));
 
-            OutputVerifier.compareKeyValueTimestamp(getOutput(driver), "[k@95/105]", "+100",
100);
-            OutputVerifier.compareKeyValueTimestamp(getOutput(driver), "[k@100/110]", "+100",
100);
-            OutputVerifier.compareKeyValueTimestamp(getOutput(driver), "[k@5/15]", "+5",
5);
-            OutputVerifier.compareKeyValueTimestamp(getOutput(driver), "[k@5/15]", "+5+6",
6);
+            OutputVerifier.compareKeyValueTimestamp(getOutput(driver), "[k@200/210]", "+100",
200);
             assertThat(driver.readOutput("output"), nullValue());
         }
     }
 
+    private void assertLatenessMetrics(final TopologyTestDriver driver,
+                                       final Matcher<Object> dropTotal,
+                                       final Matcher<Object> maxLateness,
+                                       final Matcher<Object> avgLateness) {
+        final MetricName dropMetric = new MetricName(
+            "late-record-drop-total",
+            "stream-processor-node-metrics",
+            "The total number of occurrence of late-record-drop operations.",
+            mkMap(
+                mkEntry("client-id", "topology-test-driver-virtual-thread"),
+                mkEntry("task-id", "0_0"),
+                mkEntry("processor-node-id", "KSTREAM-AGGREGATE-0000000001")
+            )
+        );
+
+        assertThat(driver.metrics().get(dropMetric).metricValue(), dropTotal);
+
+
+        final MetricName dropRate = new MetricName(
+            "late-record-drop-rate",
+            "stream-processor-node-metrics",
+            "The average number of occurrence of late-record-drop operations.",
+            mkMap(
+                mkEntry("client-id", "topology-test-driver-virtual-thread"),
+                mkEntry("task-id", "0_0"),
+                mkEntry("processor-node-id", "KSTREAM-AGGREGATE-0000000001")
+            )
+        );
+
+        assertThat(driver.metrics().get(dropRate).metricValue(), not(0.0));
+
+        final MetricName latenessMaxMetric = new MetricName(
+            "record-lateness-max",
+            "stream-processor-node-metrics",
+            "The max observed lateness of records.",
+            mkMap(
+                mkEntry("client-id", "topology-test-driver-virtual-thread"),
+                mkEntry("task-id", "0_0")
+            )
+        );
+        assertThat(driver.metrics().get(latenessMaxMetric).metricValue(), maxLateness);
+
+        final MetricName latenessAvgMetric = new MetricName(
+            "record-lateness-avg",
+            "stream-processor-node-metrics",
+            "The average observed lateness of records.",
+            mkMap(
+                mkEntry("client-id", "topology-test-driver-virtual-thread"),
+                mkEntry("task-id", "0_0")
+            )
+        );
+        assertThat(driver.metrics().get(latenessAvgMetric).metricValue(), avgLateness);
+    }
+
     private ProducerRecord<String, String> getOutput(final TopologyTestDriver driver) {
         return driver.readOutput("output", new StringDeserializer(), new StringDeserializer());
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
index 2df4f66..c84bbc2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
@@ -17,7 +17,11 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Value;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
@@ -65,7 +69,19 @@ public class PartitionGroupTest {
     private final byte[] recordValue = intSerializer.serialize(null, 10);
     private final byte[] recordKey = intSerializer.serialize(null, 1);
 
-    private final PartitionGroup group = new PartitionGroup(mkMap(mkEntry(partition1, queue1),
mkEntry(partition2, queue2)));
+    private final Metrics metrics = new Metrics();
+    private final MetricName lastLatenessValue = new MetricName("record-lateness-last-value",
"", "", mkMap());
+
+    private final PartitionGroup group = new PartitionGroup(
+        mkMap(mkEntry(partition1, queue1), mkEntry(partition2, queue2)),
+        getValueSensor(metrics, lastLatenessValue)
+    );
+
+    private static Sensor getValueSensor(final Metrics metrics, final MetricName metricName) {
+        final Sensor lastRecordedValue = metrics.sensor(metricName.name());
+        lastRecordedValue.add(metricName, new Value());
+        return lastRecordedValue;
+    }
 
     @Test
     public void testTimeTracking() {
@@ -90,10 +106,9 @@ public class PartitionGroupTest {
         // 2:[2, 4, 6]
         // st: -1 since no records was being processed yet
 
-        assertEquals(6, group.numBuffered());
-        assertEquals(3, group.numBuffered(partition1));
-        assertEquals(3, group.numBuffered(partition2));
+        verifyBuffered(6, 3, 3);
         assertEquals(-1L, group.timestamp());
+        assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue());
 
         StampedRecord record;
         final PartitionGroup.RecordInfo info = new PartitionGroup.RecordInfo();
@@ -104,11 +119,9 @@ public class PartitionGroupTest {
         // 2:[2, 4, 6]
         // st: 2
         assertEquals(partition1, info.partition());
-        assertEquals(1L, record.timestamp);
-        assertEquals(5, group.numBuffered());
-        assertEquals(2, group.numBuffered(partition1));
-        assertEquals(3, group.numBuffered(partition2));
-        assertEquals(1L, group.timestamp());
+        verifyTimes(record, 1L, 1L);
+        verifyBuffered(5, 2, 3);
+        assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue());
 
         // get one record, now the time should be advanced
         record = group.nextRecord(info);
@@ -116,11 +129,9 @@ public class PartitionGroupTest {
         // 2:[4, 6]
         // st: 3
         assertEquals(partition2, info.partition());
-        assertEquals(2L, record.timestamp);
-        assertEquals(4, group.numBuffered());
-        assertEquals(2, group.numBuffered(partition1));
-        assertEquals(2, group.numBuffered(partition2));
-        assertEquals(2L, group.timestamp());
+        verifyTimes(record, 2L, 2L);
+        verifyBuffered(4, 2, 2);
+        assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue());
 
         // add 2 more records with timestamp 2, 4 to partition-1
         final List<ConsumerRecord<byte[], byte[]>> list3 = Arrays.asList(
@@ -131,10 +142,9 @@ public class PartitionGroupTest {
         // 1:[3, 5, 2, 4]
         // 2:[4, 6]
         // st: 3 (non-decreasing, so adding 2 doesn't change it)
-        assertEquals(6, group.numBuffered());
-        assertEquals(4, group.numBuffered(partition1));
-        assertEquals(2, group.numBuffered(partition2));
+        verifyBuffered(6, 4, 2);
         assertEquals(2L, group.timestamp());
+        assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue());
 
         // get one record, time should not be advanced
         record = group.nextRecord(info);
@@ -142,11 +152,9 @@ public class PartitionGroupTest {
         // 2:[4, 6]
         // st: 4 as partition st is now {5, 4}
         assertEquals(partition1, info.partition());
-        assertEquals(3L, record.timestamp);
-        assertEquals(5, group.numBuffered());
-        assertEquals(3, group.numBuffered(partition1));
-        assertEquals(2, group.numBuffered(partition2));
-        assertEquals(3L, group.timestamp());
+        verifyTimes(record, 3L, 3L);
+        verifyBuffered(5, 3, 2);
+        assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue());
 
         // get one record, time should not be advanced
         record = group.nextRecord(info);
@@ -154,11 +162,9 @@ public class PartitionGroupTest {
         // 2:[6]
         // st: 5 as partition st is now {5, 6}
         assertEquals(partition2, info.partition());
-        assertEquals(4L, record.timestamp);
-        assertEquals(4, group.numBuffered());
-        assertEquals(3, group.numBuffered(partition1));
-        assertEquals(1, group.numBuffered(partition2));
-        assertEquals(4L, group.timestamp());
+        verifyTimes(record, 4L, 4L);
+        verifyBuffered(4, 3, 1);
+        assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue());
 
         // get one more record, now time should be advanced
         record = group.nextRecord(info);
@@ -166,11 +172,9 @@ public class PartitionGroupTest {
         // 2:[6]
         // st: 5
         assertEquals(partition1, info.partition());
-        assertEquals(5L, record.timestamp);
-        assertEquals(3, group.numBuffered());
-        assertEquals(2, group.numBuffered(partition1));
-        assertEquals(1, group.numBuffered(partition2));
-        assertEquals(5L, group.timestamp());
+        verifyTimes(record, 5L, 5L);
+        verifyBuffered(3, 2, 1);
+        assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue());
 
         // get one more record, time should not be advanced
         record = group.nextRecord(info);
@@ -178,11 +182,9 @@ public class PartitionGroupTest {
         // 2:[6]
         // st: 5
         assertEquals(partition1, info.partition());
-        assertEquals(2L, record.timestamp);
-        assertEquals(2, group.numBuffered());
-        assertEquals(1, group.numBuffered(partition1));
-        assertEquals(1, group.numBuffered(partition2));
-        assertEquals(5L, group.timestamp());
+        verifyTimes(record, 2L, 5L);
+        verifyBuffered(2, 1, 1);
+        assertEquals(3.0, metrics.metric(lastLatenessValue).metricValue());
 
         // get one more record, time should not be advanced
         record = group.nextRecord(info);
@@ -190,11 +192,9 @@ public class PartitionGroupTest {
         // 2:[6]
         // st: 4 (doesn't advance because 1 is empty, so it's still reporting the last-known time of 4)
         assertEquals(partition1, info.partition());
-        assertEquals(4L, record.timestamp);
-        assertEquals(1, group.numBuffered());
-        assertEquals(0, group.numBuffered(partition1));
-        assertEquals(1, group.numBuffered(partition2));
-        assertEquals(5L, group.timestamp());
+        verifyTimes(record, 4L, 5L);
+        verifyBuffered(1, 0, 1);
+        assertEquals(1.0, metrics.metric(lastLatenessValue).metricValue());
 
         // get one more record, time should not be advanced
         record = group.nextRecord(info);
@@ -202,11 +202,20 @@ public class PartitionGroupTest {
         // 2:[]
         // st: 4 (1 and 2 are empty, so they are still reporting the last-known times of 4 and 6.)
         assertEquals(partition2, info.partition());
-        assertEquals(6L, record.timestamp);
-        assertEquals(0, group.numBuffered());
-        assertEquals(0, group.numBuffered(partition1));
-        assertEquals(0, group.numBuffered(partition2));
-        assertEquals(6L, group.timestamp());
+        verifyTimes(record, 6L, 6L);
+        verifyBuffered(0, 0, 0);
+        assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue());
+
+    }
+
+    private void verifyTimes(final StampedRecord record, final long recordTime, final long streamTime) {
+        assertEquals(recordTime, record.timestamp);
+        assertEquals(streamTime, group.timestamp());
+    }
 
+    private void verifyBuffered(final int totalBuffered, final int partitionOneBuffered, final int partitionTwoBuffered) {
+        assertEquals(totalBuffered, group.numBuffered());
+        assertEquals(partitionOneBuffered, group.numBuffered(partition1));
+        assertEquals(partitionTwoBuffered, group.numBuffered(partition2));
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
index 1d64316..1dcebb5 100644
--- a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
@@ -32,6 +32,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
 
+import static org.apache.kafka.common.metrics.Sensor.RecordingLevel.DEBUG;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 
@@ -50,6 +51,7 @@ public final class StreamsTestUtils {
         props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, keySerdeClassName);
         props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, valueSerdeClassName);
         props.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+        props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, DEBUG.name);
         props.putAll(additional);
         return props;
     }
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index d10a45c..2abfd63 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -29,7 +29,9 @@ import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.Deserializer;
@@ -255,7 +257,13 @@ public class TopologyTestDriver implements Closeable {
 
         final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
         stateDirectory = new StateDirectory(streamsConfig, mockWallClockTime);
-        metrics = new Metrics();
+
+        final MetricConfig metricConfig = new MetricConfig()
+            .samples(streamsConfig.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG))
+            .recordLevel(Sensor.RecordingLevel.forName(streamsConfig.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG)))
+            .timeWindow(streamsConfig.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS);
+
+        metrics = new Metrics(metricConfig, mockWallClockTime);
         final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
             metrics,
             "topology-test-driver-virtual-thread"


Mime
View raw message