flink-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Martin Neumann <mneum...@sics.se>
Subject EventTime in streaming
Date Thu, 17 Sep 2015 10:20:39 GMT
After some work experience with the current solution I want to give some
feedback and maybe start a discussion about event time in streaming. This
is not about watermarks or any of the incoming improvements just some
observations from the current code.


*Starttime for EventTime:*

In the current implementation you can specify a start time if you don't it
defaults to 0.
The default is not feasible when using the typical milliseconds since 1970.
The *TimeTriggerPolicy* has the following implementation of
*preNotifyTrigger*:

@Override
> public synchronized Object[] preNotifyTrigger(DATA datapoint) {
> LinkedList<Object> fakeElements = new LinkedList<Object>();
> // check if there is more then one window border missed
> // use > here. In case >= would fit, the regular call will do the job.
> while (timestampWrapper.getTimestamp(datapoint) >= startTime + granularity)
> {
> startTime += granularity;
> fakeElements.add(startTime - 1);
> }
> return (Object[]) fakeElements.toArray();
> }


In practice this means using the default starttime will crash the program
(running our of memory) since it will create fake elements to close every
possible window since 1970.
So you need to set a starttime to make it run which is not that simple. In
production you could use the systemtime to initialize, but this might lead
to some problems when consuming events from e.g. Kafka with an older
timestamp. When debugging using old streams you need to know the lowest
timestamp of the stream to initialize.

What is the purpose of the fake elements? Is there a way to avoid the
memory problem of creating enormous amounts of empty windows?
Could we just use the timestamp of the first event processed as starttime
instead of having it as a parameter? I testing the following modification
of the above code at the moment, do you see any problem with that?

@Override
> public synchronized Object[] preNotifyTrigger(DATA datapoint) {
> LinkedList<Object> fakeElements = new LinkedList<Object>();
> // check if there is more then one window border missed
> // use > here. In case >= would fit, the regular call will do the job.
> // TODO modified here
> if(startTime == 0) startTime = timestampWrapper.getTimestamp(datapoint);
> while (timestampWrapper.getTimestamp(datapoint) >= startTime + granularity)
> {
> startTime += granularity;
> fakeElements.add(startTime - 1);
> }
> return (Object[]) fakeElements.toArray();
> }



*EventTime api confusion:*

I found several ways to use EventTime in my program but I find them not
very intuitive. Compare the two following lines of code both using the
Time.of helper one with event time and one with system time:

ds.window(Time.of(long windowSize, TimeUnit))
ds.window(Time.of(long windowSize, Timestamp yourTimeStampExtractor, long
startTime))

Its weird that you cannot specify the TimeUnit when using the EventTimes
stamp. It would feel more natural if it would look like this (also without
the starttime):

ds.window(Time.of(long windowSize, TimeUnit, Timestamp
yourTimeStampExtractor))


At the moment I'm using the modified TimeTriggerPolicy direct leading to
this ugly piece of code:

.window(new TimeTriggerPolicyHack<DataPojo>(100000l, new
TimestampWrapper<DataPojo>(new EventTimeStampExtractor(), 0l)), new
TimeEvictionPolicy<DataPojo>(20000, new TimestampWrapper<DataPojo>(new
EventTimeStampExtractor(), 0l)))



cheers Martin

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message