kafka-jira mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-6486) TimeWindows causes unordered calls to windowed aggregation functions
Date Mon, 19 Mar 2018 03:45:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-6486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16404345#comment-16404345 ]

ASF GitHub Bot commented on KAFKA-6486:
---------------------------------------

mjsax closed pull request #4628: KAFKA-6486: Implemented LinkedHashMap in TimeWindows
URL: https://github.com/apache/kafka/pull/4628
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
index f9090c594cb..c2b910df5b1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
@@ -19,7 +19,7 @@
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 
-import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.Map;
 
 /**
@@ -105,7 +105,7 @@ public TimeWindows advanceBy(final long advanceMs) {
     @Override
     public Map<Long, TimeWindow> windowsFor(final long timestamp) {
         long windowStart = (Math.max(0, timestamp - sizeMs + advanceMs) / advanceMs) * advanceMs;
-        final Map<Long, TimeWindow> windows = new HashMap<>();
+        final Map<Long, TimeWindow> windows = new LinkedHashMap<>();
         while (windowStart <= timestamp) {
             final TimeWindow window = new TimeWindow(windowStart, windowStart + sizeMs);
             windows.put(windowStart, window);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowTest.java
index 09ac173b737..f260bee62ef 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowTest.java
@@ -16,8 +16,12 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.streams.kstream.TimeWindows;
 import org.junit.Test;
 
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
@@ -117,4 +121,15 @@ public void shouldNotOverlapIsOtherWindowIsAfterThisWindow() {
     public void cannotCompareTimeWindowWithDifferentWindowType() {
         window.overlap(sessionWindow);
     }
+
+    @Test
+    public void shouldReturnMatchedWindowsOrderedByTimestamp() {
+        final TimeWindows windows = TimeWindows.of(12L).advanceBy(5L);
+        final Map<Long, TimeWindow> matched = windows.windowsFor(21L);
+
+        final Long[] expected = matched.keySet().toArray(new Long[matched.size()]);
+        assertEquals(expected[0].longValue(), 10L);
+        assertEquals(expected[1].longValue(), 15L);
+        assertEquals(expected[2].longValue(), 20L);
+    }
 }
\ No newline at end of file
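
The values asserted in the new test (10, 15, 20) follow from the windowStart arithmetic in windowsFor(). The sketch below reproduces only that arithmetic outside of Kafka. WindowStartSketch and windowStarts are hypothetical names, and the loop increment by advanceMs is an assumption, since the hunk above is cut off before the end of the loop body.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Kafka: mirrors only the start-time
// arithmetic of TimeWindows.windowsFor() shown in the diff.
class WindowStartSketch {

    static List<Long> windowStarts(final long timestamp, final long sizeMs, final long advanceMs) {
        // Earliest window start, aligned to a multiple of advanceMs,
        // whose window [start, start + sizeMs) can still contain the timestamp.
        long windowStart = (Math.max(0, timestamp - sizeMs + advanceMs) / advanceMs) * advanceMs;
        final List<Long> starts = new ArrayList<>();
        while (windowStart <= timestamp) {
            starts.add(windowStart);
            windowStart += advanceMs; // assumed step; not visible in the hunk
        }
        return starts;
    }

    public static void main(final String[] args) {
        // Same parameters as the test: size 12, advance 5, timestamp 21.
        // (21 - 12 + 5) / 5 = 2 in integer division, and 2 * 5 = 10.
        System.out.println(windowStarts(21L, 12L, 5L)); // prints [10, 15, 20]
    }
}
```

With size 12 and advance 5, the windows starting at 10, 15 and 20 all contain timestamp 21, and the one starting at 25 does not.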


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> TimeWindows causes unordered calls to windowed aggregation functions
> --------------------------------------------------------------------
>
>                 Key: KAFKA-6486
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6486
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 1.0.0
>            Reporter: Valentino Proietti
>            Priority: Minor
>             Fix For: 1.2.0
>
>         Attachments: KAFKA-6486.patch
>
>
> This is not a real bug, but it causes some weird behaviour, at least in my opinion.
> TimeWindows has a method called windowsFor() that builds and returns a HashMap:
>     @Override
>     public Map<Long, TimeWindow> windowsFor(final long timestamp) {
>         long windowStart = (Math.max(0, timestamp - sizeMs + advanceMs) / advanceMs) * advanceMs;
>         final Map<Long, TimeWindow> windows = new HashMap<>();
>         ....
> A HashMap does not preserve insertion order, so the resulting calls to any Streams
> windowed aggregation function are not ordered by window time, as I would expect them to be.
> A simple solution is to replace the HashMap with a LinkedHashMap, and that's what I did.
> Anyway, replacing it directly in your code can save hours of debugging to understand
> what's happening.
> Thank you 
>  
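
The ordering difference described in the issue can be seen with plain java.util maps, without any Kafka classes. This is a minimal sketch: MapOrderSketch, keysInIterationOrder and the String values are hypothetical stand-ins for the real Map<Long, TimeWindow>. Only the LinkedHashMap order is guaranteed by its contract; whatever order HashMap prints is an implementation detail and should not be relied upon.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical demo class, not part of Kafka.
class MapOrderSketch {

    // Inserts window start times 10, 15, 20 in ascending order and
    // returns the keys in the order the map iterates over them.
    static List<Long> keysInIterationOrder(final Map<Long, String> map) {
        for (long start = 10L; start <= 20L; start += 5L) {
            map.put(start, "window@" + start); // placeholder value instead of a TimeWindow
        }
        return new ArrayList<>(map.keySet());
    }

    public static void main(final String[] args) {
        // HashMap: iteration order is unspecified and may differ from insertion order.
        System.out.println("HashMap:       " + keysInIterationOrder(new HashMap<>()));
        // LinkedHashMap: iteration order is insertion order.
        System.out.println("LinkedHashMap: " + keysInIterationOrder(new LinkedHashMap<>()));
    }
}
```

Callers that need window-time order regardless of the map implementation could also sort the keys themselves, but returning an insertion-ordered map, as the patch does, keeps that burden out of downstream code.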



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
