kafka-jira mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-5368) Kafka Streams skipped-records-rate sensor produces nonzero values when the timestamps are valid
Date Tue, 02 Jan 2018 17:35:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-5368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16308408#comment-16308408
] 

ASF GitHub Bot commented on KAFKA-5368:
---------------------------------------

guozhangwang closed pull request #4365: KAFKA-5368: Add test for skipped-records metric
URL: https://github.com/apache/kafka/pull/4365
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 8bcd6fb4ed4..42504655117 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -23,10 +23,12 @@
 import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
@@ -37,6 +39,7 @@
 import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
 import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
 import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilderTest;
+import org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TaskMetadata;
 import org.apache.kafka.streams.processor.ThreadMetadata;
@@ -825,6 +828,51 @@ public boolean conditionMet() {
         }
     }
 
+    @Test
+    public void shouldReportSkippedRecordsForInvalidTimestamps() throws Exception {
+        internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1);
+
+        final Properties config = configProps(false);
+        config.setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, LogAndSkipOnInvalidTimestamp.class.getName());
+        final StreamThread thread = createStreamThread(clientId, new StreamsConfig(config),
false);
+
+        thread.setState(StreamThread.State.RUNNING);
+        thread.setState(StreamThread.State.PARTITIONS_REVOKED);
+
+        final Set<TopicPartition> assignedPartitions = Collections.singleton(new TopicPartition(t1p1.topic(),
t1p1.partition()));
+        thread.taskManager().setAssignmentMetadata(
+            Collections.singletonMap(
+                new TaskId(0, t1p1.partition()),
+                assignedPartitions),
+            Collections.<TaskId, Set<TopicPartition>>emptyMap());
+        thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
+
+        final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[],
byte[]>) thread.consumer;
+        mockConsumer.assign(Collections.singleton(t1p1));
+        mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L));
+
+        final MetricName skippedTotalMetric = metrics.metricName("skipped-records-total",
"stream-metrics", Collections.singletonMap("client-id", thread.getName()));
+        assertEquals(0.0, metrics.metric(skippedTotalMetric).metricValue());
+
+        long offset = -1;
+        mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(),
++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0]));
+        mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(),
++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0]));
+        thread.runOnce(-1);
+        assertEquals(2.0, metrics.metric(skippedTotalMetric).metricValue());
+
+        mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(),
++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0]));
+        mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(),
++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0]));
+        mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(),
++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0]));
+        mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(),
++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0]));
+        thread.runOnce(-1);
+        assertEquals(6.0, metrics.metric(skippedTotalMetric).metricValue());
+
+        mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(),
++offset, 1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0]));
+        mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(),
++offset, 1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0]));
+        thread.runOnce(-1);
+        assertEquals(6.0, metrics.metric(skippedTotalMetric).metricValue());
+    }
+
     private void assertThreadMetadataHasEmptyTasksWithState(ThreadMetadata metadata, StreamThread.State
state) {
         assertEquals(state.name(), metadata.threadState());
         assertTrue(metadata.activeTasks().isEmpty());


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Kafka Streams skipped-records-rate sensor produces nonzero values when the timestamps
are valid
> -----------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-5368
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5368
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Hamidreza Afzali
>            Assignee: Hamidreza Afzali
>             Fix For: 0.11.0.0
>
>
> Kafka Streams skipped-records-rate sensor produces nonzero values even when the timestamps
are valid and records are processed. The values are equal to poll-rate.
> Related issue: KAFKA-5055 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Mime
View raw message