kafka-dev mailing list archives

From "Matthias J. Sax (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-5144) MinTimestampTracker does not correctly add timestamps lower than the current max
Date Mon, 01 May 2017 06:26:04 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-5144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15990619#comment-15990619 ]

Matthias J. Sax commented on KAFKA-5144:
----------------------------------------

It is intended behavior. {{MinTimestampTracker}} tracks the minimum timestamp of the non-processed
records in the buffer. The undocumented usage pattern is that {{addElement()}} and {{removeElement()}}
are (must be) called with the elements in the same order. Your tests don't follow this pattern. The logic is
about tracking the current partition time as the minimum of whatever is in the buffer (also considering
out-of-order records) (cf. KAFKA-3514).

Example: We get a batch of records with ts {{5, 10, 15, 12, 20}} and add them consecutively
to the timestamp tracker. Thus, when we process those records one by one, we get the following
steps (note, I am not sure if "partition time" in this example is correct; cf. below):

 - process {{5}}, queue {{10, 15, 12, 20}}, partition time {{5}}
 - process {{10}}, queue {{15, 12, 20}}, partition time {{10}}
 - process {{15}}, queue {{12, 20}}, partition time {{12}} (!! current minimum is {{12}},
not {{15}} !!)
 - process {{12}}, queue {{20}}, partition time {{12}}
 - process {{20}}, queue empty, partition time {{20}}
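
As a cross-check of the steps above, here is a minimal brute-force sketch that computes the intended "partition time" as the minimum over the record being processed and everything still queued behind it. The class and method names are hypothetical and this is not how Streams computes it; it is only a reference to compare the tracker against.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical reference implementation, for illustration only.
public class NaivePartitionTimeSketch {

    // For each position i, take the minimum timestamp of record i and all later records.
    public static List<Long> partitionTimes(long[] buffered) {
        List<Long> times = new ArrayList<>();
        for (int i = 0; i < buffered.length; i++) {
            long min = buffered[i];
            for (int j = i + 1; j < buffered.length; j++) {
                min = Math.min(min, buffered[j]);
            }
            times.add(min);
        }
        return times;
    }

    public static void main(String[] args) {
        // Prints [5, 10, 12, 12, 20], matching the partition times listed above.
        System.out.println(partitionTimes(new long[] {5, 10, 15, 12, 20}));
    }
}
```

This is quadratic in the buffer size, which is presumably why a dedicated tracker is used instead.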

The tracker will have the following states when adding the records one by one (this happens
before processing begins):
 - add {{5}}: {{5}}
 - add {{10}}: {{5, 10}}
 - add {{15}}: {{5, 10, 15}}
 - add {{12}}: {{5, 10, 12}} (!! this is the behavior that is not a bug !!)
 - add {{20}}: {{5, 10, 12, 20}}
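
The states above can be reproduced with a small ascending-deque sketch. The class name {{MinTrackerAddSketch}} and its methods are made up for this example, and the {{>=}} comparison is an assumption; it only illustrates the idea of dropping entries that can no longer be the minimum, not the actual Streams code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch; not the actual Kafka Streams class.
public class MinTrackerAddSketch {
    // Timestamps kept in ascending order; the head is the current minimum.
    private final Deque<Long> ascending = new ArrayDeque<>();

    public void add(long ts) {
        // Tail entries >= ts can never be the minimum while ts is still buffered,
        // because ts arrived later and will therefore be removed later.
        while (!ascending.isEmpty() && ascending.peekLast() >= ts) {
            ascending.removeLast();
        }
        ascending.addLast(ts);
    }

    public List<Long> state() {
        return new ArrayList<>(ascending);
    }

    public static void main(String[] args) {
        MinTrackerAddSketch tracker = new MinTrackerAddSketch();
        for (long ts : new long[] {5, 10, 15, 12, 20}) {
            tracker.add(ts);
            System.out.println("add " + ts + ": " + tracker.state());
        }
    }
}
```

Adding {{12}} evicts {{15}} here, which is exactly the step the bug report considers suspicious.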

During processing, we {{poll}} the head record from the queue and call {{removeElement}} on the
tracker afterwards. Thus, we get (note, we start with tracker state {{5, 10, 12, 20}}):
 - poll from queue {{5}}, tracker after remove {{10, 12, 20}}
 - poll from queue {{10}}, tracker after remove {{12, 20}}
 - poll from queue {{15}}, tracker after remove {{12, 20}} (!! {{15}} is no longer in the tracker, so
{{removeElement(15)}} changes nothing and we keep {{12}}; this allows us to use {{12}} for two records !!)
 - poll from queue {{12}}, tracker after remove {{20}}
 - poll from queue {{20}}, tracker after remove empty
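
A sketch of the poll-then-remove sequence above, assuming that removal only pops the head of the tracker when it is the very same object that was polled. The names ({{MinTrackerRemoveSketch}}, {{Stamped}}, {{add}}, {{remove}}) are hypothetical and chosen for this example only.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch; not the actual Kafka Streams class.
public class MinTrackerRemoveSketch {
    /** Hypothetical record wrapper; only the timestamp matters here. */
    public static final class Stamped {
        public final long timestamp;
        public Stamped(long timestamp) { this.timestamp = timestamp; }
    }

    private final Deque<Stamped> ascending = new ArrayDeque<>();

    public void add(Stamped elem) {
        while (!ascending.isEmpty() && ascending.peekLast().timestamp >= elem.timestamp) {
            ascending.removeLast();
        }
        ascending.addLast(elem);
    }

    // Pops the head only if it is the same object; elements evicted during add are ignored.
    public void remove(Stamped elem) {
        if (ascending.peekFirst() == elem) {
            ascending.removeFirst();
        }
    }

    public List<Long> state() {
        List<Long> out = new ArrayList<>();
        for (Stamped s : ascending) {
            out.add(s.timestamp);
        }
        return out;
    }

    public static void main(String[] args) {
        MinTrackerRemoveSketch tracker = new MinTrackerRemoveSketch();
        Deque<Stamped> queue = new ArrayDeque<>();
        for (long ts : new long[] {5, 10, 15, 12, 20}) {
            Stamped s = new Stamped(ts);
            queue.addLast(s);
            tracker.add(s);
        }
        // Remove in the same order the elements were added.
        while (!queue.isEmpty()) {
            Stamped head = queue.pollFirst();
            tracker.remove(head);
            System.out.println("poll " + head.timestamp + ", tracker after remove " + tracker.state());
        }
    }
}
```

The head-only check is what makes the "same order" usage pattern a requirement: removing elements in any other order would leave stale entries at the head.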

However, I am not sure if we actually advance "partition time" as intended -- we might want
to call {{timeTracker.get()}} ({{RecordQueue}} L124) before {{timeTracker.removeElement(elem)}}
({{RecordQueue}} L120) -- it seems the "partition time" in my example above is not what Streams
computes at the moment (even if the example seems to follow the intended "partition
time"). If I am not wrong, Streams would give:

 - process {{5}}, queue {{10, 15, 12, 20}}, tracker after remove {{10, 12, 20}}, partition
time {{10}}
 - process {{10}}, queue {{15, 12, 20}}, tracker after remove {{12, 20}}, partition time {{12}}
 - process {{15}}, queue {{12, 20}}, tracker after remove {{12, 20}}, partition time {{12}}
 - process {{12}}, queue {{20}}, tracker after remove {{20}}, partition time {{20}}
 - process {{20}}, queue empty, tracker after remove empty, partition time {{20}} (from {{lastKnownTime}})
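
To make the difference between the two call orders concrete, the following sketch runs the same batch twice: once reading the tracker before removing the polled record, once after. Only {{lastKnownTime}} is taken from the discussion above; the class names, the {{run}} helper, the {{-1}} initial value, and the rule that {{lastKnownTime}} is updated when the tracker becomes empty are assumptions made for this illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch; not the actual Kafka Streams code.
public class PartitionTimeOrderSketch {
    /** Hypothetical record wrapper; only the timestamp matters here. */
    public static final class Stamped {
        final long timestamp;
        Stamped(long timestamp) { this.timestamp = timestamp; }
    }

    /** Illustrative tracker with a fallback value for the empty case. */
    public static final class Tracker {
        private final Deque<Stamped> ascending = new ArrayDeque<>();
        private long lastKnownTime = -1L; // assumption: -1 means "not known"

        void add(Stamped elem) {
            while (!ascending.isEmpty() && ascending.peekLast().timestamp >= elem.timestamp) {
                ascending.removeLast();
            }
            ascending.addLast(elem);
        }

        void remove(Stamped elem) {
            if (ascending.peekFirst() == elem) {
                ascending.removeFirst();
            }
            if (ascending.isEmpty()) {
                lastKnownTime = elem.timestamp; // remember the last timestamp seen
            }
        }

        long get() {
            Stamped head = ascending.peekFirst();
            return head == null ? lastKnownTime : head.timestamp;
        }
    }

    // Returns the partition time observed for each processed record.
    public static List<Long> run(long[] timestamps, boolean getBeforeRemove) {
        Tracker tracker = new Tracker();
        Deque<Stamped> queue = new ArrayDeque<>();
        for (long ts : timestamps) {
            Stamped s = new Stamped(ts);
            queue.addLast(s);
            tracker.add(s);
        }
        List<Long> partitionTimes = new ArrayList<>();
        while (!queue.isEmpty()) {
            Stamped head = queue.pollFirst();
            if (getBeforeRemove) {
                partitionTimes.add(tracker.get());
                tracker.remove(head);
            } else {
                tracker.remove(head);
                partitionTimes.add(tracker.get());
            }
        }
        return partitionTimes;
    }

    public static void main(String[] args) {
        long[] batch = {5, 10, 15, 12, 20};
        System.out.println("get before remove: " + run(batch, true));  // [5, 10, 12, 12, 20]
        System.out.println("get after remove:  " + run(batch, false)); // [10, 12, 12, 20, 20]
    }
}
```

Under these assumptions, reading before removing yields the intended sequence from the first example, while reading after removing yields the sequence listed directly above.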


> MinTimestampTracker does not correctly add timestamps lower than the current max
> --------------------------------------------------------------------------------
>
>                 Key: KAFKA-5144
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5144
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.1
>            Reporter: Michal Borowiecki
>            Assignee: Michal Borowiecki
>
> When adding elements MinTimestampTracker removes all existing elements greater than the added element.
> Perhaps I've missed something and this is intended behaviour but I can't find any evidence for that in comments or tests.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
