flink-user mailing list archives

From ninad <nni...@gmail.com>
Subject Re: Fink: KafkaProducer Data Loss
Date Wed, 31 May 2017 22:15:44 GMT
Thanks for the fix, guys. I am testing this with 1.1.5 but am still seeing
data loss. The only useful thing I can find in the logs is the stack trace
further down.

Here's our use case:

1) Consume from Kafka
2) Apply session window
3) Send messages of window to Kafka

If step 3 fails because all Kafka brokers are down, we see data loss.

Here are the relevant logs:

java.lang.Exception: Could not perform checkpoint 2 for operator TriggerWindow(ProcessingTimeSessionWindows(30000), ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@61ef6d67}, ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:257)) -> Sink: sink.http.sep (2/4).
	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:611)
	at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:360)
	at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:272)
	at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:174)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:195)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:67)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:642)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Could not perform the checkpoint 2 for 0th operator in chain.
	at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:666)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:603)
	... 8 more
Caused by: java.lang.Exception: Failed to snapshot function state of TriggerWindow(ProcessingTimeSessionWindows(30000), ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@61ef6d67}, ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:257)) -> Sink: sink.http.sep (2/4).
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotOperatorState(AbstractUdfStreamOperator.java:139)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:652)
	... 9 more
Caused by: java.lang.Exception: Failed to send data to Kafka: Batch Expired
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:366)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.snapshotState(FlinkKafkaProducerBase.java:335)
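
For anyone reading the trace: the innermost frames suggest that the sink surfaces an asynchronous send failure when its state is snapshotted, which is why the checkpoint itself fails. Below is a minimal, stdlib-only Java sketch of that general pattern. It is not Flink's implementation. Only the method names snapshotState and checkErroneous come from the trace; every class name, the brokerUp switch, and the runScenario helper are hypothetical and exist purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of a sink that sends asynchronously and reports
 * send failures at checkpoint time. This is a sketch, not Flink code.
 */
public class CheckpointFlushSketch {

    /** Stand-in for an asynchronous producer; brokerUp is a made-up switch. */
    public static class FakeProducer {
        public boolean brokerUp = true;
        public final List<String> delivered = new ArrayList<>();
    }

    public static class BufferingSink {
        private final FakeProducer producer;
        private final List<String> pending = new ArrayList<>();
        private Exception asyncError; // recorded by the simulated send callback

        public BufferingSink(FakeProducer producer) {
            this.producer = producer;
        }

        /** Per-record call: only buffers, so a send failure is not visible here. */
        public void invoke(String record) {
            pending.add(record);
        }

        /** Simulates draining the buffer and running the send callbacks. */
        private void flush() {
            for (String record : pending) {
                if (producer.brokerUp) {
                    producer.delivered.add(record);
                } else {
                    asyncError = new Exception("Batch Expired");
                }
            }
            pending.clear();
        }

        /** Checkpoint hook: flush first, then fail if any send went wrong. */
        public void snapshotState(long checkpointId) throws Exception {
            flush();
            checkErroneous();
        }

        private void checkErroneous() throws Exception {
            Exception e = asyncError;
            if (e != null) {
                asyncError = null;
                throw new Exception("Failed to send data to Kafka: " + e.getMessage(), e);
            }
        }
    }

    /** Runs a checkpoint and reports the outcome as text. */
    static String tryCheckpoint(BufferingSink sink, long id) {
        try {
            sink.snapshotState(id);
            return "checkpoint " + id + " ok";
        } catch (Exception e) {
            return "checkpoint " + id + " failed: " + e.getMessage();
        }
    }

    /** One healthy checkpoint, then one taken while the broker is down. */
    public static List<String> runScenario() {
        FakeProducer producer = new FakeProducer();
        BufferingSink sink = new BufferingSink(producer);
        List<String> outcomes = new ArrayList<>();

        sink.invoke("window-1");
        outcomes.add(tryCheckpoint(sink, 1));

        producer.brokerUp = false; // simulate all brokers going down
        sink.invoke("window-2");
        outcomes.add(tryCheckpoint(sink, 2));
        return outcomes;
    }

    public static void main(String[] args) {
        for (String outcome : runScenario()) {
            System.out.println(outcome);
        }
    }
}
```

In this model the second checkpoint fails instead of silently completing, so the undelivered record is never covered by a successful checkpoint. Whether a restart then replays that record depends on the source and restart configuration, which the sketch does not model.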