flink-user mailing list archives

From Erdem Agaoglu <erdem.agao...@gmail.com>
Subject Re: Kafka partition alignment for event time
Date Thu, 25 Feb 2016 10:36:32 GMT
Hi Robert,

I switched to SNAPSHOT and can confirm that it works. Thanks!

On Thu, Feb 25, 2016 at 10:50 AM, Robert Metzger <rmetzger@apache.org>
wrote:

> Hi Erdem,
>
> FLINK-3440 has been resolved. The fix is merged to master. 1.0-SNAPSHOT
> should already contain the fix and it'll be in 1.0.0 (for which I'll post a
> release candidate today) as well.
>
> On Thu, Feb 18, 2016 at 3:24 PM, Erdem Agaoglu <erdem.agaoglu@gmail.com>
> wrote:
>
>> Thanks Stephan
>>
>> On Thu, Feb 18, 2016 at 3:00 PM, Stephan Ewen <sewen@apache.org> wrote:
>>
>>> You are right, the checkpoints should contain all offsets.
>>>
>>> I created a Ticket for this:
>>> https://issues.apache.org/jira/browse/FLINK-3440
>>>
>>>
>>>
>>>
>>> On Thu, Feb 18, 2016 at 10:15 AM, agaoglu <erdem.agaoglu@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> In a related but more extreme setup, our Kafka producer (Flume) seems
>>>> to send data to a single partition at a time and switches partitions
>>>> every few minutes. So when I run my Flink DataStream program for the
>>>> first time, it starts at the *largest* offsets and logs something like
>>>> this:
>>>>
>>>> . Fetched the following start offsets [FetchPartition {partition=7, offset=15118832832}]
>>>> . Fetched the following start offsets [FetchPartition {partition=1, offset=15203613236}]
>>>> . Fetched the following start offsets [FetchPartition {partition=2, offset=15366811664}]
>>>> . Fetched the following start offsets [FetchPartition {partition=0, offset=15393999709}]
>>>> . Fetched the following start offsets [FetchPartition {partition=8, offset=15319475583}]
>>>> . Fetched the following start offsets [FetchPartition {partition=5, offset=15482889767}]
>>>> . Fetched the following start offsets [FetchPartition {partition=6, offset=15113885928}]
>>>> . Fetched the following start offsets [FetchPartition {partition=3, offset=15182701991}]
>>>> . Fetched the following start offsets [FetchPartition {partition=4, offset=15186569356}]
>>>>
>>>> At that moment Flume happens to be sending data to partition-6 only, so
>>>> the other consumers sit idle. With the default parallelism of 4, only
>>>> one of the 4 threads is able to source data, and the checkpointing logs
>>>> reflect that:
>>>>
>>>> Committing offsets [-915623761776, -915623761776, -915623761776,
>>>> -915623761776, -915623761776, -915623761776, -915623761776,
>>>> -915623761776,
>>>> -915623761776] to offset store: FLINK_ZOOKEEPER
>>>> Committing offsets [-915623761776, -915623761776, -915623761776,
>>>> -915623761776, -915623761776, -915623761776, 15114275927, -915623761776,
>>>> -915623761776] to offset store: FLINK_ZOOKEEPER
>>>> Committing offsets [-915623761776, -915623761776, -915623761776,
>>>> -915623761776, -915623761776, -915623761776, -915623761776,
>>>> -915623761776,
>>>> -915623761776] to offset store: FLINK_ZOOKEEPER
>>>> Committing offsets [-915623761776, -915623761776, -915623761776,
>>>> -915623761776, -915623761776, -915623761776, -915623761776,
>>>> -915623761776,
>>>> -915623761776] to offset store: FLINK_ZOOKEEPER
>>>>
>>>> This also means the checkpoint will contain the offset for partition-6
>>>> only. So if the program is stopped and restarted later, it restores the
>>>> offset for partition-6 only, and the other partitions start at the
>>>> largest offset. It is therefore able to process unseen data in
>>>> partition-6 but not in the others. Say Flume produces data to
>>>> partition-3 while the Flink program is stopped: that data is lost, while
>>>> the data in partition-6 is not. This generally causes multiple (late)
>>>> windows to fire after restart, because we now generate watermarks from
>>>> partition-3, which say that the windows of the unseen data in
>>>> partition-6 are already complete.
>>>>
>>>> This also has the side effect that windows do not trigger unless some
>>>> rebalancing is done beforehand. Since only 1 of the 4 threads sources
>>>> data and generates watermarks, the window triggers get no watermarks
>>>> from the other 3 sources and keep waiting long past the watermarks
>>>> generated by the single source.
>>>>
>>>> I know producers shouldn't work like that, but consumers shouldn't
>>>> care. I think it may also create some edge cases even in setups less
>>>> extreme than ours. If checkpoints contained the offsets of all
>>>> partitions regardless of their contents, probably by storing the start
>>>> offsets on the first run, I guess that would solve the problems around
>>>> restarting.
>>>>
>>>>
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-partition-alignment-for-event-time-tp4782p4998.html
>>>> Sent from the Apache Flink User Mailing List archive at Nabble.com.
>>>>
>>>
>>>
>>
>>
>> --
>> erdem agaoglu
>>
>
>


-- 
erdem agaoglu
