flink-user mailing list archives

From Robert Metzger <metrob...@gmail.com>
Subject Re: DeserializationSchema isEndOfStream usage?
Date Mon, 11 Jan 2016 22:04:33 GMT
Hi David,

In theory isEndOfStream() is absolutely the right way to go for stopping data sources in Flink.
That it's not working as expected is a bug. I have a pending pull request for adding a Kafka
0.9 connector, which fixes this issue as well (for all supported Kafka versions).

Sorry for the inconvenience. If you want, you can check out the branch of the PR and build
Flink yourself to get the fix.
I hope to merge the connector into master this week; the fix will then be available
in 1.0-SNAPSHOT as well.
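
For the count-based test use case, here is a minimal sketch of how such a schema could look once the fix is in. To keep it runnable without Flink on the classpath, `Schema` is a hypothetical stand-in for the two relevant methods of DeserializationSchema, and `runSource` is a hypothetical loop that only imitates what a source is expected to do; neither is Flink code. It assumes the interface contract that an element for which isEndOfStream() returns true is not emitted, so the schema signals the end on the first record after the expected count.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class EndOfStreamSketch {

    /** Hypothetical stand-in for the two relevant methods of DeserializationSchema. */
    interface Schema<T> {
        T deserialize(byte[] message);
        boolean isEndOfStream(T nextElement);
    }

    /** Signals end of stream on the first record after the expected count. */
    static final class CountingStringSchema implements Schema<String> {
        private final int expected;
        private int seen = 0;

        CountingStringSchema(int expected) {
            this.expected = expected;
        }

        @Override
        public String deserialize(byte[] message) {
            seen++;
            return new String(message, StandardCharsets.UTF_8);
        }

        @Override
        public boolean isEndOfStream(String nextElement) {
            // Assumption: the element that signals the end is dropped, not emitted.
            return seen > expected;
        }
    }

    /** Hypothetical source loop: stop at the first end-of-stream element. */
    static <T> List<T> runSource(List<byte[]> messages, Schema<T> schema) {
        List<T> emitted = new ArrayList<>();
        for (byte[] raw : messages) {
            T element = schema.deserialize(raw);
            if (schema.isEndOfStream(element)) {
                break;
            }
            emitted.add(element);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<byte[]> input = new ArrayList<>();
        for (String s : Arrays.asList("a", "b", "c", "d")) {
            input.add(s.getBytes(StandardCharsets.UTF_8));
        }
        // Expect two records, so the third one ends the stream.
        System.out.println(runSource(input, new CountingStringSchema(2))); // prints [a, b]
    }
}
```

Note that with this approach the source only stops once one more record arrives after the expected ones, and with a parallel source each instance would keep its own counter. A sentinel record that marks the end may be simpler for tests.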



> On 11.01.2016, at 21:39, David Kim <david.kim@braintreepayments.com> wrote:
> Hello all,
> I saw that DeserializationSchema has an API "isEndOfStream()". 
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
> Can isEndOfStream be utilized to somehow terminate a streaming flink job?
> I was under the impression that if we return "true" we can control when a stream can
> close. The use case I had in mind was controlling when unit/integration tests would terminate
> a flink job. We can rely on the fact that a test/spec would know how many items it expects
> to consume and then switch isEndOfStream to return true.
> Am I misunderstanding the intention for isEndOfStream? 
> I also set a breakpoint on isEndOfStream and saw that it never was hit when using "FlinkKafkaConsumer082"
> to pass in a DeserializationSchema implementation.
> Currently testing on 1.0-SNAPSHOT.
> Cheers!
> David
