flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Fink: KafkaProducer Data Loss
Date Thu, 15 Jun 2017 09:35:01 GMT
Hi Ninad,

I discussed a bit with Gordon and we have some follow-up questions and some theories as to
what could be happening.

Regarding your first case (the one where data loss is observed): How do you ensure that you
only shut down the brokers once Flink has read all the data that you expect it to read? And,
how do you ensure that the offset that Flink checkpoints in step 3) is the offset that corresponds
to the end of your test data?

Regarding your second case (the one where data loss is not observed): What is the difference
between steps 3) and 5)? I’m asking because in Flink session windows are merged eagerly
when events arrived, not when the Trigger fires. Also, what does “firing” in step 3) mean
as opposed to “being evaluated” in step 5). If a Trigger fires this should mean that the
window is in fact being evaluated.

Best,
Aljoscha

> On 11. Jun 2017, at 16:14, ninad <nninad@gmail.com> wrote:
> 
> Thanks Gordon.
> 
> On Jun 11, 2017 9:11 AM, "Tzu-Li (Gordon) Tai [via Apache Flink User Mailing List archive.]"
<[hidden email]> wrote:
> Hi Ninad,
> 
> Thanks for the logs!
> Just to let you know, I’ll continue to investigate this early next week.
> 
> Cheers,
> Gordon
> 
> On 8 June 2017 at 7:08:23 PM, ninad ([hidden email]) wrote:
> 
>> I built Flink v1.3.0 with cloudera hadoop and am able to see the data loss.  
>> 
>> Here are the details: 
>> 
>> *tmOneCloudera583.log* 
>> 
>> Received session window task: 
>> *2017-06-08 15:10:46,131 INFO org.apache.flink.runtime.taskmanager.Task  
>> - TriggerWindow(ProcessingTimeSessionWindows(30000), 
>> ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@f4361ec},

>> ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:1100)) -> 
>> Sink: sink.http.sep (1/1) (3ef2f947096f3b0986b8ad334c7d5d3c) switched from 
>> CREATED to DEPLOYING. 
>> 
>> Finished checkpoint 2 (Synchronous part)  
>> 2017-06-08 15:15:51,982 DEBUG 
>> org.apache.flink.streaming.runtime.tasks.StreamTask - 
>> TriggerWindow(ProcessingTimeSessionWindows(30000), 
>> ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@f4361ec},

>> ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:1100)) -> 
>> Sink: sink.http.sep (1/1) - finished synchronous part of checkpoint 
>> 2.Alignment duration: 0 ms, snapshot duration 215 ms 
>> * 
>> 
>> The task failed before the verification of completed checkpoint could be 
>> verified. 
>> i.e, I don't see the log saying "Notification of complete checkpoint for 
>> task TriggerWindow" for checkpoint 2. 
>> 
>> *jmCloudera583.log* 
>> 
>> Job Manager received acks for checkpoint 2 
>> 
>> *2017-06-08 15:15:51,898 DEBUG 
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received 
>> acknowledge message for checkpoint 2 from task 
>> 3e980e4d3278ef0d0f5e0fa9591cc53d of job 3f5aef5e15a23bce627c05c94760fb16. 
>> 2017-06-08 15:15:51,982 DEBUG 
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received 
>> acknowledge message for checkpoint 2 from task 
>> 3ef2f947096f3b0986b8ad334c7d5d3c of job 3f5aef5e15a23bce627c05c94760fb16*. 
>> 
>> Job Manager tried to restore from checkpoint 2. 
>> 
>> *2017-06-08 15:16:02,111 INFO  
>> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - 
>> Found 1 checkpoints in ZooKeeper. 
>> 2017-06-08 15:16:02,111 INFO  
>> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - 
>> Trying to retrieve checkpoint 2. 
>> 2017-06-08 15:16:02,122 INFO  
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Restoring 
>> from latest valid checkpoint: Checkpoint 2 @ 149693476 
>> 6105 for 3f5aef5e15a23bce627c05c94760fb16.* 
>> 
>> *tmTwocloudera583.log* 
>> 
>> Task Manager tried to restore the data and was successful.  
>> 
>> *2017-06-08 15:16:02,258 DEBUG 
>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - Restoring 
>> snapshot from state handles: 
>> [KeyGroupsStateHandle{groupRangeOffsets=KeyGroupRangeOffsets{keyGroupRange=KeyGroupRange{startKeyGroup=0,

>> endKeyGroup=127}, offsets=[13460, 13476, 13492, 13508, 13524, 13540, 13556, 
>> 13572, 13588, 13604, 13620, 13636, 13652, 13668, 13684, 14566, 14582, 14598, 
>> 14614, 14630, 14646, 14662, 14678, 14694, 14710, 14726, 14742, 14758, 14774, 
>> 14790, 14806, 14822, 14838, 14854, 14870, 14886, 14902, 14918, 14934, 14950, 
>> 14966, 14982, 14998, 15014, 15030, 15046, 15062, 15078, 15094, 15110, 15126, 
>> 15142, 15158, 15174, 15190, 15206, 16088, 16104, 16120, 16136, 28818, 28834, 
>> 28850, 28866, 28882, 28898, 28914, 28930, 28946, 28962, 28978, 28994, 29010, 
>> 29026, 29042, 29058, 29074, 29090, 29106, 29122, 29138, 29154, 29170, 40346, 
>> 40362, 40378, 40394, 40410, 40426, 40442, 40458, 40474, 40490, 40506, 40522, 
>> 40538, 41420, 41436, 41452, 41468, 41484, 41500, 41516, 41532, 41548, 41564, 
>> 41580, 41596, 41612, 41628, 41644, 41660, 41676, 41692, 41708, 41724, 41740, 
>> 41756, 41772, 41788, 41804, 41820, 41836, 41852, 41868, 41884, 41900, 
>> 41916]}, data=File State: 
>> hdfs://dc1udtlhcld005.stack.qadev.corp:8022/user/harmony_user/flink-checkpoint/3f5aef5e15a23bce627c05c94760fb16/chk-2/aa076014-120b-4955-ab46-89094358ebeb

>> [41932 bytes]}].* 
>> 
>> But apparently, the retore state didn't have all the messages the window had 
>> received. Because 
>> a few messages were not replayed, and the kafka sink didn't receive all the 
>> messages. 
>> 
>> Attaching the files here. 
>> 
>> jmCloudera583.log 
>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n13597/jmCloudera583.log>
 
>> tmOneCloudera583.log 
>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n13597/tmOneCloudera583.log>
 
>> tmTwoCloudera583.log 
>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n13597/tmTwoCloudera583.log>
 
>> 
>> BTW, I posted my test results and logs for regular Flink v1.3.0 on Jun 6, 
>> but don't see that post here. I did receive an email thought. Hope you guys 
>> saw that.  
>> 
>> Thanks for your patience guys.  
>> 
>> 
>> 
>> -- 
>> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-tp11413p13597.html

>> Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

> 
> 
> If you reply to this email, your message will be added to the discussion below:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-tp11413p13620.html
> To unsubscribe from Fink: KafkaProducer Data Loss, click here.
> NAML
> 
> View this message in context: Re: Fink: KafkaProducer Data Loss
> Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.


Mime
View raw message