flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Triggering events
Date Mon, 30 Nov 2015 14:32:40 GMT
Hi,
The problem here is that the system needs to be told that watermarks will be flowing
through it. You can do this either via:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

or:

env.getConfig().enableTimestamps();

I know, not very intuitive.
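
To illustrate why the triggers stay silent until watermarks are flowing, here is a toy model in plain Java. It is not Flink code: the class EventTimeWindowSketch and its fields are hypothetical, the method names only mirror the ctx.collectWithTimestamp and ctx.emitWatermark calls quoted below, and the firing rule (a window fires once the watermark reaches its last timestamp) is an assumption made for this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of event-time tumbling windows. Hypothetical; not Flink's implementation. */
final class EventTimeWindowSketch {
    private final long windowSize;
    // Window start -> values buffered for that window, ordered by start time.
    private final TreeMap<Long, List<Long>> windows = new TreeMap<>();
    // Contents of every window that has fired so far, in firing order.
    private final List<List<Long>> fired = new ArrayList<>();

    EventTimeWindowSketch(long windowSize) {
        this.windowSize = windowSize;
    }

    /** Buffers a value in the window covering its (non-negative) timestamp. Never fires. */
    void collectWithTimestamp(long value, long timestamp) {
        long start = timestamp - (timestamp % windowSize);
        windows.computeIfAbsent(start, k -> new ArrayList<>()).add(value);
    }

    /** Fires every window whose last timestamp (start + size - 1) is <= the watermark. */
    void emitWatermark(long watermark) {
        while (!windows.isEmpty() && windows.firstKey() + windowSize - 1 <= watermark) {
            Map.Entry<Long, List<Long>> entry = windows.pollFirstEntry();
            fired.add(entry.getValue());
        }
    }

    List<List<Long>> fired() {
        return fired;
    }

    public static void main(String[] args) {
        EventTimeWindowSketch w = new EventTimeWindowSketch(10);
        w.collectWithTimestamp(1L, 3);
        w.collectWithTimestamp(2L, 7);
        w.collectWithTimestamp(3L, 12);
        System.out.println(w.fired()); // timestamps alone fire nothing
        w.emitWatermark(12);
        System.out.println(w.fired()); // only the window covering 0..9 has fired
    }
}
```

In this model, elements carrying timestamps are only buffered; nothing fires until a watermark says that event time has advanced past a window. The element with timestamp 12 stays buffered until a later watermark passes the end of its window.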

Cheers,
Aljoscha

> On 30 Nov 2015, at 14:47, Niels Basjes <Niels@basjes.nl> wrote:
> 
> Hi,
> 
> I'm experimenting with a custom Windowing setup over clickstream data.
> I want the timestamps of this clickstream data to be the timestamps 'when the event occurred'
> and in the Windows I need to trigger on these times.
> 
> For testing I created a source roughly like this:
>     public class ManualTimeEventSource extends RichEventTimeSourceFunction<Long> {
>                     ctx.collectWithTimestamp(event, event.timestamp);
> 
> But none of the triggers were called so I started digging through the code.
> Then I figured I apparently needed to add the watermarks myself, so I added a line:
>                     ctx.emitWatermark(new Watermark(event.timestamp));
> 
> But now I get:
> 
> Caused by: java.lang.ClassCastException: org.apache.flink.streaming.api.watermark.Watermark cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
> 	at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:41)
> 	at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
> 	at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:79)
> 	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.broadcastEmit(RecordWriter.java:109)
> 	at org.apache.flink.streaming.runtime.io.StreamRecordWriter.broadcastEmit(StreamRecordWriter.java:93)
> 	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:88)
> 	... 9 more
> 
> This seems like a bug to me (StreamElement vs StreamRecord). Is it a bug in Flink or
> in my code?
> 
> What is the right way to trigger the events in my Windowing setup?
> 
> 
> 
> P.S. I'm binding my Java application against Flink version 0.10.1
> 
> -- 
> Best regards / Met vriendelijke groeten,
> 
> Niels Basjes

