apex-dev mailing list archives

From Bhupesh Chawda <bhup...@datatorrent.com>
Subject Re: Watermark tuples in Apex
Date Fri, 10 Mar 2017 16:11:25 GMT

On Fri, Mar 10, 2017 at 8:56 PM, Thomas Weise <thw@apache.org> wrote:

> On Tue, Mar 7, 2017 at 10:46 PM, Bhupesh Chawda <bhupesh@apache.org> wrote:
> > Hi All,
> >
> > Watermark tuples in Apex are very tightly coupled to event time
> > processing. For this reason, usually they are modeled as having a
> > timestamp.
> >
> > public interface WatermarkTuple
> > {
> >   long getTimestamp();
> > }
> >
> > Even though watermarks are meant for such time-related processing, I
> > think we should expand the concept of watermarks to the following types:
> >
> > 1. Labelled watermarks
> > This could be useful in scenarios where, instead of a timestamp (which is
> > an ordered field), we have categorical values. For example, consider
> > tuples which are labeled by city names. For each city, we want to have
> > separate windows and isolate the processing. If the watermark returns a
> > different city name, we end the previous window and start a new one.
> > Alternatively, we could use both high and low watermarks to indicate the
> > start and end of a city's data. This could mean having data for multiple
> > windows arriving at the same time.
> >
> >
> To me city looks like a key and you are trying to make the case that each
> key should have a separate watermark. That is the case discussed on the
> Flink/Beam list that David referred to. I think we should not mix the
> concepts of watermark and key.

Yes, city is a key here. This is similar to the discussion on per-key event
time progress on the Flink dev list. But the ask here is to have a non-time
watermark which indicates when a particular key is finalized. In other
words, if a source does not have timestamps, when should the finalization
be done for that data? What would a watermark tuple look like in that case?
To answer this, we may have to think about how the data is "windowed"
downstream. In this case, there could be a window per key, rather than a
time window.

If you look at the file batch changes that we discussed on the other
thread, they do the same thing: there we have key = window = filename. Why
can't we make it simpler by saying that the watermark is per key, instead
of modelling the windows as file ids and setting the option as a time
option on the windowed operator?
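For illustration only, a rough Java sketch of a per-key ("labelled") watermark. All names here (`LabelledWatermark`, `KeyMarker`, `PerKeyWindowSketch`) are hypothetical and not part of the Apex or Malhar API. Each key (a city, or a file name in the file batch case) gets its own window, opened by a start marker and finalized only when that key's end marker arrives, so several windows can be open at once.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical: a watermark carrying a categorical label (e.g. a city name
// or a file name) instead of a timestamp. Not part of the Apex API.
interface LabelledWatermark {
  String getLabel();

  // true marks the end of this label's data; false marks its start
  boolean isEnd();
}

// Hypothetical start/end marker for one key.
final class KeyMarker implements LabelledWatermark {
  private final String label;
  private final boolean end;

  KeyMarker(String label, boolean end) {
    this.label = label;
    this.end = end;
  }

  @Override
  public String getLabel() {
    return label;
  }

  @Override
  public boolean isEnd() {
    return end;
  }
}

// Sketch of a "window per key" operator: one open window per label, finalized
// only when that label's end marker arrives. Several windows can be open at once.
final class PerKeyWindowSketch {
  private final Map<String, List<String>> open = new LinkedHashMap<>();
  private final List<String> finalized = new ArrayList<>();

  void onWatermark(LabelledWatermark wm) {
    if (!wm.isEnd()) {
      // start marker: open an empty window for this key if none exists yet
      open.putIfAbsent(wm.getLabel(), new ArrayList<>());
      return;
    }
    // end marker: the key is finalized, so record its data and drop the window
    List<String> data = open.remove(wm.getLabel());
    if (data != null) {
      finalized.add(wm.getLabel() + "=" + data);
    }
  }

  void onData(String label, String value) {
    // assumption: data for a key without a start marker opens a window implicitly
    open.computeIfAbsent(label, k -> new ArrayList<>()).add(value);
  }

  List<String> finalizedWindows() {
    return finalized;
  }

  int openWindowCount() {
    return open.size();
  }
}
```

With start markers for two cities, interleaved data, and an end marker for only one of them, only that city's window is finalized while the other stays open. No time option on the windowed operator is involved.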

> > 2. Ordered watermarks
> > Instead of having the ordered field as time, why not consider something
> > like an Ordered Watermark? TimeBased Watermarks could extend from that.
> > An ordered watermark could be used in cases where we have a sequence of
> > data tuples and we need to demarcate every n tuples. Even though we can
> > say that the window is definitely closed every n tuples, the decision is
> > made only when the upstream sends the watermark tuple. The windowed
> > operator does not have any clue about it; it blindly opens and closes
> > windows based on watermarks received from upstream. This also means that
> > different windows could have different values of n.
> >
> > Please let me know your thoughts on this.
> >
> >
> Watermarks are already ordered and the state management is built based on
> that.
>
> Is your concern just the naming?

Yes. Actually, this is to avoid confusion for users, since we are using a
time window to create a window based on some other ordered field.
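To make the naming point concrete, a rough Java sketch with hypothetical types (`OrderedWatermark`, `TimeWatermark`, `SequenceWatermark`, `CountingEmitter`, `WatermarkDrivenWindows`) that are not existing Apex interfaces. The time-based watermark becomes one instance of an ordered watermark, an upstream emitter demarcates every n tuples with a sequence-number watermark, and the downstream operator does not know n. It only checks that watermarks keep advancing and closes a window on each one.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical: a watermark over any ordered field, not only time.
interface OrderedWatermark<T extends Comparable<T>> {
  T getValue();
}

// Hypothetical: the time-based watermark as one special case.
interface TimeWatermark extends OrderedWatermark<Long> {
  default long getTimestamp() {
    return getValue();
  }
}

// Hypothetical: an ordered watermark over a tuple sequence number.
final class SequenceWatermark implements OrderedWatermark<Long> {
  private final long seq;

  SequenceWatermark(long seq) {
    this.seq = seq;
  }

  @Override
  public Long getValue() {
    return seq;
  }
}

// Upstream sketch: emits a SequenceWatermark after every n data tuples.
final class CountingEmitter {
  private final int n;
  private long count = 0;
  private final List<Object> out = new ArrayList<>();

  CountingEmitter(int n) {
    this.n = n;
  }

  void emit(String tuple) {
    out.add(tuple);
    count++;
    if (count % n == 0) {
      out.add(new SequenceWatermark(count));
    }
  }

  List<Object> output() {
    return out;
  }
}

// Downstream sketch: knows nothing about n; it closes the current window on
// every watermark and only requires watermark values to keep increasing.
final class WatermarkDrivenWindows<T extends Comparable<T>> {
  private T last;
  private List<String> current = new ArrayList<>();
  private final List<List<String>> closed = new ArrayList<>();

  void onData(String tuple) {
    current.add(tuple);
  }

  void onWatermark(OrderedWatermark<T> wm) {
    T value = wm.getValue();
    if (last != null && value.compareTo(last) <= 0) {
      throw new IllegalStateException("watermark did not advance: " + value);
    }
    last = value;
    closed.add(current);
    current = new ArrayList<>();
  }

  List<List<String>> closedWindows() {
    return closed;
  }
}
```

Feeding the tuples a, b, c, d, e through a `CountingEmitter` with n = 2 yields two closed windows, [a, b] and [c, d], with e still in the open window. Only the interface name changes for time-based users; the ordering requirement that state management relies on stays the same.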

> Thanks,
> Thomas
