apex-dev mailing list archives

From David Yan <da...@datatorrent.com>
Subject Re: Watermark generation in Windowed Operators
Date Mon, 26 Sep 2016 19:45:06 GMT
Actually, on second thought, the join accumulation interface for the
non-keyed join operator can support theta joins if you provide an
appropriate accumulation implementation.

David
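To illustrate why a suitable accumulation is enough for a theta join, here is a minimal sketch in plain Java of a two-input accumulation that keeps every tuple from both sides of a window and applies an arbitrary predicate to each left/right pair when the output is requested. `TwoInputAccumulation`, `JoinState` and `ThetaJoinAccumulation` are hypothetical names invented for this illustration; they are not the Apex Malhar join accumulation API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;

public class ThetaJoinSketch {

  // Hypothetical two-input accumulation interface (NOT the Apex Malhar API).
  interface TwoInputAccumulation<L, R, A, O> {
    A defaultAccumulatedValue();
    A accumulateLeft(A acc, L left);
    A accumulateRight(A acc, R right);
    O getOutput(A acc);
  }

  // Accumulated state for one window: every tuple seen so far on each side.
  static final class JoinState<L, R> {
    final List<L> lefts = new ArrayList<>();
    final List<R> rights = new ArrayList<>();
  }

  // Theta join: output every (left, right) pair for which the predicate holds.
  static final class ThetaJoinAccumulation<L, R>
      implements TwoInputAccumulation<L, R, JoinState<L, R>, List<String>> {
    private final BiPredicate<L, R> theta;

    ThetaJoinAccumulation(BiPredicate<L, R> theta) {
      this.theta = theta;
    }

    public JoinState<L, R> defaultAccumulatedValue() {
      return new JoinState<>();
    }

    public JoinState<L, R> accumulateLeft(JoinState<L, R> acc, L left) {
      acc.lefts.add(left);
      return acc;
    }

    public JoinState<L, R> accumulateRight(JoinState<L, R> acc, R right) {
      acc.rights.add(right);
      return acc;
    }

    // Nested-loop evaluation of the predicate over both buffered sides.
    public List<String> getOutput(JoinState<L, R> acc) {
      List<String> out = new ArrayList<>();
      for (L l : acc.lefts) {
        for (R r : acc.rights) {
          if (theta.test(l, r)) {
            out.add(l + "|" + r);
          }
        }
      }
      return out;
    }
  }

  public static void main(String[] args) {
    // A non-equi condition: left value strictly less than right value.
    ThetaJoinAccumulation<Integer, Integer> join = new ThetaJoinAccumulation<>((l, r) -> l < r);
    JoinState<Integer, Integer> acc = join.defaultAccumulatedValue();
    acc = join.accumulateLeft(acc, 1);
    acc = join.accumulateLeft(acc, 5);
    acc = join.accumulateRight(acc, 3);
    acc = join.accumulateRight(acc, 7);
    System.out.println(join.getOutput(acc)); // [1|3, 1|7, 5|7]
  }
}
```

Because every tuple of the window is buffered, the output cost grows with the product of the two sides' sizes, which is the usual price of a general theta join.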

On Mon, Sep 26, 2016 at 12:42 PM, David Yan <david@datatorrent.com> wrote:

> Chinmay,
>
> Just to clarify, the Join Operator does not support theta joins. It only
> supports equi-joins on either the Window, or both the Window and the Key.
>
> David
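For contrast, an equi-join on both the window and the key can be sketched as a hash join over a composite (window, key) value. This is a minimal, hypothetical illustration assuming tumbling windows of `widthMillis`; the `Tuple` record, `windowOf` and `join` are invented for the example and are not part of the Windowed Operator API. Dropping the key from the composite value gives an equi-join on the window alone.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WindowKeyEquiJoinSketch {

  // Hypothetical tuple: event time in millis, join key, payload.
  record Tuple(long time, String key, String value) {}

  // Hypothetical tumbling windows: returns the start of the window containing `time`.
  static long windowOf(long time, long widthMillis) {
    return Math.floorDiv(time, widthMillis) * widthMillis;
  }

  // Equi-join on (window, key): index the right side, then probe with the left side.
  static List<String> join(List<Tuple> left, List<Tuple> right, long widthMillis) {
    Map<String, List<Tuple>> index = new HashMap<>();
    for (Tuple r : right) {
      String k = windowOf(r.time(), widthMillis) + "/" + r.key();
      index.computeIfAbsent(k, unused -> new ArrayList<>()).add(r);
    }
    List<String> out = new ArrayList<>();
    for (Tuple l : left) {
      String k = windowOf(l.time(), widthMillis) + "/" + l.key();
      for (Tuple r : index.getOrDefault(k, List.of())) {
        out.add(l.value() + "+" + r.value());
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<Tuple> left = List.of(new Tuple(100, "a", "L1"), new Tuple(1_100, "a", "L2"));
    List<Tuple> right = List.of(new Tuple(900, "a", "R1"), new Tuple(950, "b", "R2"));
    // With 1000 ms windows, only L1 and R1 share both window [0, 1000) and key "a".
    System.out.println(join(left, right, 1_000)); // [L1+R1]
  }
}
```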
>
> On Sat, Sep 17, 2016 at 1:30 AM, Chinmay Kolhatkar <chinmay@datatorrent.com> wrote:
>
>> Thanks for the information, guys.
>>
>> David, I can take a look at heuristic watermark if I get any free cycles.
>>
>> Shunxin, does the Join operator that you're implementing support theta
>> joins, or only a subset of theta joins?
>>
>> Thanks,
>> Chinmay.
>>
>>
>>
>> On Sat, Sep 17, 2016 at 1:21 AM, David Yan <david@datatorrent.com> wrote:
>>
>> > Hi Shunxin,
>> >
>> > If the watermark code in your PR is not behaving the way it should,
>> > please do change it. Thanks!
>> >
>> > David
>> >
>> > On Fri, Sep 16, 2016 at 11:36 AM, Shunxin Lu <lushunxin@gmail.com> wrote:
>> >
>> > > Hi David,
>> > >
>> > > Thanks for the clarification. Should we update the watermark of the join
>> > > operator when a watermark arrives from one of the input streams, even
>> > > if the watermark from another input stream has not arrived yet?
>> > >
>> > > Shunxin
>> > >
>> > > On Fri, Sep 16, 2016 at 10:59 AM, David Yan <david@datatorrent.com> wrote:
>> > >
>> > > > Actually, that's not entirely true. Here are the points about the
>> > > > watermark tuple generation of the join operator:
>> > > >
>> > > > 1) We keep the timestamp of the latest watermark for each input port.
>> > > >
>> > > > 2) We keep another timestamp that is equal to the minimum of all the
>> > > > timestamps mentioned in (1).
>> > > >
>> > > > 3) Upon arrival of a watermark from an input port, we update the
>> > > > timestamp mentioned in (1) and re-evaluate (2). If the value of (2)
>> > > > changes, we generate a watermark tuple with a timestamp equal to the
>> > > > new value of (2).
>> > > >
>> > > > 4) That means that, initially, a watermark is only generated once we
>> > > > have seen a watermark on every input port. And because we take the
>> > > > smallest timestamp in (2), we consider a window late only if all input
>> > > > streams say that particular window is late.
>> > > >
>> > > > David
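The four points above can be sketched as a small, self-contained Java class. `JoinWatermarkTracker` and `onWatermark` are hypothetical names, not the actual code in `AbstractWindowedJoinOperator`; the sketch assumes input ports are numbered from 0 and that each port's own watermarks never move backwards.

```java
import java.util.Arrays;
import java.util.OptionalLong;

public class JoinWatermarkTracker {
  // Sentinel meaning "no watermark received on this port yet"
  // (assumes no real watermark uses Long.MIN_VALUE).
  private static final long NONE = Long.MIN_VALUE;

  // (1) Latest watermark timestamp seen on each input port.
  private final long[] latestPerPort;
  // (2) Minimum of (1); stays NONE until every port has reported once.
  private long combined = NONE;

  public JoinWatermarkTracker(int numPorts) {
    latestPerPort = new long[numPorts];
    Arrays.fill(latestPerPort, NONE);
  }

  /**
   * (3) Records a watermark arriving on a port. Returns the new combined
   * watermark if it changed (a watermark tuple should go downstream),
   * otherwise an empty value.
   */
  public OptionalLong onWatermark(int port, long timestamp) {
    latestPerPort[port] = timestamp;
    long min = Long.MAX_VALUE;
    for (long ts : latestPerPort) {
      if (ts == NONE) {
        // (4) Some port has not reported yet, so nothing is emitted.
        return OptionalLong.empty();
      }
      min = Math.min(min, ts);
    }
    if (min != combined) {
      combined = min;
      return OptionalLong.of(min);
    }
    return OptionalLong.empty();
  }

  public static void main(String[] args) {
    JoinWatermarkTracker t = new JoinWatermarkTracker(2);
    System.out.println(t.onWatermark(0, 100)); // OptionalLong.empty (port 1 silent)
    System.out.println(t.onWatermark(1, 80));  // OptionalLong[80]
    System.out.println(t.onWatermark(0, 120)); // OptionalLong.empty (min still 80)
    System.out.println(t.onWatermark(1, 110)); // OptionalLong[110]
  }
}
```

Taking the minimum keeps the combined watermark conservative: a window is treated as late only after the slowest input has moved past it.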
>> > > >
>> > > >
>> > > > On Fri, Sep 16, 2016 at 10:42 AM, Shunxin Lu <lushunxin@gmail.com> wrote:
>> > > >
>> > > > > Hi Chinmay,
>> > > > >
>> > > > > Based on the discussion I had with David (David, please correct me
>> > > > > if I am wrong), the watermark for the Windowed Join Operator should
>> > > > > indeed depend on all the input streams. If a tuple is considered late
>> > > > > for one input stream, it should also be considered late for the whole
>> > > > > join operator. That's why AbstractWindowedJoinOperator always selects
>> > > > > the watermark with the smallest timestamp from all the latest
>> > > > > watermarks coming from upstream as its current watermark, so that it
>> > > > > always keeps the strictest watermark to eliminate late tuples.
>> > > > >
>> > > > > Shunxin
>> > > > >
>> > > > > On Fri, Sep 16, 2016 at 10:02 AM, David Yan <david@datatorrent.com> wrote:
>> > > > >
>> > > > > > I think that, in theory, the watermark should be sent by the input
>> > > > > > operator, since the input should know the criteria for lateness.
>> > > > > > Those criteria can depend on many factors, like the time of day or
>> > > > > > the source of the data (e.g. mobile data), that the WindowedOperator
>> > > > > > should in general make no assumptions about.
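As a hedged illustration of an input operator emitting watermarks from knowledge only the source has, the sketch below assumes a hypothetical source that delivers records in batches together with a guarantee that no record with an earlier event time will follow (`lowWatermark`). The `Batch` and `Item` types and `emitBatch` are invented for this example; in a real DAG the watermark would travel downstream alongside the data tuples rather than in a returned list.

```java
import java.util.ArrayList;
import java.util.List;

public class SourceWatermarkSketch {

  // Hypothetical batch from an external source: record event times plus the
  // source's promise that nothing older than lowWatermark will arrive later.
  record Batch(List<Long> eventTimes, long lowWatermark) {}

  // Hypothetical downstream item: either a data tuple or a watermark tuple.
  record Item(boolean isWatermark, long timestamp) {
    @Override
    public String toString() {
      return (isWatermark ? "WM(" : "DATA(") + timestamp + ")";
    }
  }

  private long lastEmittedWatermark = Long.MIN_VALUE;

  // Emits the batch's data tuples first, then a watermark only if it advanced.
  public List<Item> emitBatch(Batch batch) {
    List<Item> out = new ArrayList<>();
    for (long t : batch.eventTimes()) {
      out.add(new Item(false, t));
    }
    if (batch.lowWatermark() > lastEmittedWatermark) {
      lastEmittedWatermark = batch.lowWatermark();
      out.add(new Item(true, lastEmittedWatermark));
    }
    return out;
  }

  public static void main(String[] args) {
    SourceWatermarkSketch input = new SourceWatermarkSketch();
    System.out.println(input.emitBatch(new Batch(List.of(5L, 3L, 9L), 3)));
    // [DATA(5), DATA(3), DATA(9), WM(3)]
    System.out.println(input.emitBatch(new Batch(List.of(4L), 3)));
    // [DATA(4)]  (the watermark did not advance)
  }
}
```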
>> > > > > >
>> > > > > > However, I think it's possible to implement some kind of watermark
>> > > > > > generation in the WindowedOperator itself if that knowledge is not
>> > > > > > available from the input. It's actually already doing that if you
>> > > > > > call the setFixedWatermark method, which sends a watermark tuple
>> > > > > > downstream for each streaming window, with a timestamp based on the
>> > > > > > time derived from the streaming window id. It's possible to add
>> > > > > > support for heuristic watermark generation as well, and you're
>> > > > > > welcome to take that up.
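The two strategies mentioned above, a fixed watermark tied to the streaming window and a heuristic one, can be sketched as follows. Everything here is an assumption for illustration: `derivedTimeMillis` simply assumes streaming windows of constant width starting at a known time (it does not decode a real Apex window id), the fixed lag is a hypothetical parameter rather than the actual `setFixedWatermark` semantics, and `BoundedDelayWatermark` implements one common heuristic (trail the largest event time seen so far by an allowed delay), not necessarily what a heuristic generator in Apex would use.

```java
public class WatermarkGenerators {

  // Hypothetical: time of a streaming window, assuming windows of constant
  // width windowWidthMillis starting at startMillis.
  static long derivedTimeMillis(long startMillis, long windowWidthMillis, long windowSeq) {
    return startMillis + windowSeq * windowWidthMillis;
  }

  // Fixed watermark: a constant lag behind the time derived for each streaming window.
  static final class FixedWatermark {
    private final long lagMillis;

    FixedWatermark(long lagMillis) {
      this.lagMillis = lagMillis;
    }

    // Called once per streaming window; returns the watermark timestamp to emit.
    long watermarkFor(long derivedTimeMillis) {
      return derivedTimeMillis - lagMillis;
    }
  }

  // Heuristic (bounded out-of-orderness): trail the max event time seen by a delay.
  static final class BoundedDelayWatermark {
    private final long maxDelayMillis;
    private long maxEventTime = Long.MIN_VALUE;

    BoundedDelayWatermark(long maxDelayMillis) {
      this.maxDelayMillis = maxDelayMillis;
    }

    void observe(long eventTimeMillis) {
      maxEventTime = Math.max(maxEventTime, eventTimeMillis);
    }

    // Returns Long.MIN_VALUE until at least one event has been observed.
    long currentWatermark() {
      return maxEventTime == Long.MIN_VALUE ? Long.MIN_VALUE : maxEventTime - maxDelayMillis;
    }
  }

  public static void main(String[] args) {
    FixedWatermark fixed = new FixedWatermark(1_000);
    long t = derivedTimeMillis(10_000, 500, 4);     // 12000
    System.out.println(fixed.watermarkFor(t));      // 11000

    BoundedDelayWatermark heuristic = new BoundedDelayWatermark(2_000);
    heuristic.observe(15_000);
    heuristic.observe(13_000); // out of order: does not move the watermark back
    System.out.println(heuristic.currentWatermark()); // 13000
  }
}
```

A fixed watermark advances with the streaming windows even when no data arrives, while the bounded-delay heuristic advances only as newer event times are observed.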
>> > > > > >
>> > > > > > For the Windowed Join operator, the watermark generated for
>> > > > > > downstream depends on the watermark arriving from each input stream,
>> > > > > > and it's not just a simple propagation. Shunxin can comment more on
>> > > > > > this.
>> > > > > >
>> > > > > > David
>> > > > > >
>> > > > > >
>> > > > > > On Thu, Sep 15, 2016 at 11:21 PM, Chinmay Kolhatkar <chinmay@apache.org> wrote:
>> > > > > >
>> > > > > > > Hi All,
>> > > > > > >
>> > > > > > > I was looking at the Windowed Operator APIs and have to say they're
>> > > > > > > pretty nicely done.
>> > > > > > >
>> > > > > > > I have a question related to watermark generation.
>> > > > > > >
>> > > > > > > My understanding is that, to complete the processing of an event
>> > > > > > > window, there is a provision for sending a watermark tuple from some
>> > > > > > > previous stage in the DAG. I want to know who should be doing that
>> > > > > > > and when it should be done.
>> > > > > > >
>> > > > > > > For example, I saw a PR for the Windowed Join Operator in apex-malhar
>> > > > > > > and I would like to use it in my application. Can someone give me an
>> > > > > > > example of what a DAG would look like with this operator and a stage
>> > > > > > > that generates the watermark? And how should that stage decide when
>> > > > > > > to generate a watermark tuple?
>> > > > > > >
>> > > > > > > -Chinmay.
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>
>
