apex-dev mailing list archives

From David Yan <da...@datatorrent.com>
Subject Re: [DISCUSSION] Custom Control Tuples
Date Wed, 30 Nov 2016 22:06:45 GMT
I have created an umbrella ticket for control tuple support:

https://issues.apache.org/jira/browse/APEXCORE-579

Currently it has two subtasks. Please have a look at them and see whether
I'm missing anything or if you have anything to add. You are welcome to add
more subtasks or comment on the existing subtasks.

We would like to kick start the implementation soon.

Thanks!

David

On Mon, Nov 28, 2016 at 5:22 PM, Bhupesh Chawda <bhupesh@datatorrent.com>
wrote:

> +1 for the plan.
>
> I would be interested in contributing to this feature.
>
> ~ Bhupesh
>
> On Nov 29, 2016 03:26, "Sandesh Hegde" <sandesh@datatorrent.com> wrote:
>
> > I am interested in contributing to this feature.
> >
> > On Mon, Nov 28, 2016 at 1:54 PM David Yan <david@datatorrent.com> wrote:
> >
> > > > I think we should probably go ahead with option 1 since this works with
> > > > most use cases and prevents developers from shooting themselves in the
> > > > foot in terms of idempotency.
> > > >
> > > > We can have a configuration property that enables option 2 later if we
> > > > have concrete use cases that call for it.
> > > >
> > > > Please share your thoughts if you think you don't agree with this plan.
> > > > Also, please indicate if you're interested in contributing to this
> > > > feature.
> > >
> > > David
> > >
> > > > On Sun, Nov 27, 2016 at 9:02 PM, Bhupesh Chawda <bhupesh@datatorrent.com>
> > > > wrote:
> > > >
> > > > > It appears that option 1 is more favored due to unavailability of a
> > > > > use case which could use option 2.
> > > > >
> > > > > However, option 2 is problematic in specific cases, like presence of
> > > > > multiple input ports for example. In case of a linear DAG where
> > > > > control tuples are flowing in order with the data tuples, it should
> > > > > not be difficult to guarantee idempotency. For example, in cases
> > > > > where there could be multiple changes in behavior of an operator
> > > > > during a single window, it should not wait until end window for
> > > > > these changes to take effect. Since we don't have a concrete use
> > > > > case right now, perhaps we do not want to go down that road. This
> > > > > feature should be available through a platform attribute (maybe at
> > > > > a later point in time) where the default is option 1.
> > > > >
> > > > > I think option 1 is suitable for a starting point in the
> > > > > implementation of this feature and we should proceed with it.
> > > >
> > > > ~ Bhupesh
> > > >
> > > >
> > > >
> > > > > On Fri, Nov 11, 2016 at 12:59 AM, David Yan <david@datatorrent.com>
> > > > > wrote:
> > > > >
> > > > > > Good question Tushar. The callback should be called only once.
> > > > > > The way to implement this is to keep a list of control tuple hashes
> > > > > > for the given streaming window and only do the callback when the
> > > > > > operator has not seen it before.
> > > > >
> > > > > Other thoughts?
> > > > >
> > > > > David
> > > > >
> > > > > > On Thu, Nov 10, 2016 at 9:32 AM, Tushar Gosavi <tushar@datatorrent.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi David,
> > > > > > >
> > > > > > > What would be the behaviour in case where we have a DAG with the
> > > > > > > following operators, the number in bracket is number of
> > > > > > > partitions, X is NxM partitioning.
> > > > > > > A(1) X B(4) X C(2)
> > > > > > >
> > > > > > > If A sends a control tuple, it will be sent to all 4 partitions
> > > > > > > of B, and from each partition of B it goes to C, i.e. each
> > > > > > > partition of C will receive the same control tuple originated
> > > > > > > from A multiple times (number of upstream partitions of C). In
> > > > > > > this case will the callback function get called multiple times
> > > > > > > or just once?
> > > > > >
> > > > > > -Tushar.
> > > > > >
> > > > > >
> > > > > > > On Fri, Nov 4, 2016 at 12:14 AM, David Yan <david@datatorrent.com>
> > > > > > > wrote:
> > > > > > > > Hi Bhupesh,
> > > > > > > >
> > > > > > > > Since each input port has its own incoming control tuple, I
> > > > > > > > would imagine there would be an additional
> > > > > > > > DefaultInputPort.processControl method that operator
> > > > > > > > developers can override.
> > > > > > > > If we go for option 1, my thinking is that the control tuples
> > > > > > > > would always be delivered at the next window boundary, even if
> > > > > > > > the emit method is called within a window.
> > > > > > >
> > > > > > > David
> > > > > > >
> > > > > > > > On Thu, Nov 3, 2016 at 1:46 AM, Bhupesh Chawda <bhupesh@datatorrent.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > >> I have a question regarding the callback for a control tuple.
> > > > > > > >> Will it be similar to InputPort::process() method? Something
> > > > > > > >> like InputPort::processControlTuple(t)? Or will it be a method
> > > > > > > >> of the operator similar to beginWindow()?
> > > > > > > >>
> > > > > > > >> When we say that the control tuple will be delivered at window
> > > > > > > >> boundary, does that mean all control tuples emitted in that
> > > > > > > >> window will be processed together at the end of the window?
> > > > > > > >> This would imply that there is no ordering among regular
> > > > > > > >> tuples and control tuples.
> > > > > > > >>
> > > > > > > >> I think we should get started with the option 1 - control
> > > > > > > >> tuples at window boundary, which seems to handle most of the
> > > > > > > >> use cases. For some cases which require option 2, we can
> > > > > > > >> always build on this.
> > > > > > >>
> > > > > > >> ~ Bhupesh
> > > > > > >>
> > > > > > > >> On Thu, Nov 3, 2016 at 1:35 PM, Thomas Weise <thw@apache.org>
> > > > > > > >> wrote:
> > > > > > > >>
> > > > > > > >> > I don't see how that would work. Suppose you have a file
> > > > > > > >> > splitter and multiple partitions of block readers. The "end
> > > > > > > >> > of file" event cannot be processed downstream until all
> > > > > > > >> > block readers are done. I also think that this is related
> > > > > > > >> > to the batch demarcation discussion and there should be a
> > > > > > > >> > single generalized mechanism to support this.
> > > > > > >> >
> > > > > > >> >
> > > > > > > >> > On Wed, Nov 2, 2016 at 10:51 PM, Pramod Immaneni <pramod@datatorrent.com>
> > > > > > > >> > wrote:
> > > > > > > >> >
> > > > > > > >> > > Suppose I am processing data in a file and I want to do
> > > > > > > >> > > something at the end of a file at the output operator, I
> > > > > > > >> > > would send an end file control tuple and act on it when I
> > > > > > > >> > > receive it at the output. In a single window I may end up
> > > > > > > >> > > processing multiple files and if I don't have multiple
> > > > > > > >> > > ports and logical paths through the DAG (multiple
> > > > > > > >> > > partitions are ok), I can process end of each file
> > > > > > > >> > > immediately and also know what file was closed without
> > > > > > > >> > > sending extra identification information in the end file
> > > > > > > >> > > which I would need if I am collecting all of them and
> > > > > > > >> > > processing at the end of the window.
> > > > > > >> > >
> > > > > > > >> > > On Wed, Nov 2, 2016 at 2:45 PM, Thomas Weise <thw@apache.org>
> > > > > > > >> > > wrote:
> > > > > > > >> > >
> > > > > > > >> > > > The use cases listed in the original discussion don't
> > > > > > > >> > > > call for option 2. It seems to come with additional
> > > > > > > >> > > > complexity and implementation cost.
> > > > > > > >> > > >
> > > > > > > >> > > > Can those in favor of option 2 please also provide the
> > > > > > > >> > > > use case for it.
> > > > > > >> > > >
> > > > > > >> > > > Thanks,
> > > > > > >> > > > Thomas
> > > > > > >> > > >
> > > > > > >> > > >
> > > > > > > >> > > > On Wed, Nov 2, 2016 at 10:36 PM, Siyuan Hua <siyuan@datatorrent.com>
> > > > > > > >> > > > wrote:
> > > > > > > >> > > >
> > > > > > > >> > > > > I will vote for approach 1.
> > > > > > > >> > > > >
> > > > > > > >> > > > > First of all that one sounds easier to do to me. And I
> > > > > > > >> > > > > think idempotency is important. It may run at the cost
> > > > > > > >> > > > > of higher latency but I think it is ok.
> > > > > > > >> > > > >
> > > > > > > >> > > > > And in addition, when in the future if users do need
> > > > > > > >> > > > > realtime control tuple processing, we can always add
> > > > > > > >> > > > > the option on top of it.
> > > > > > >> > > > >
> > > > > > >> > > > > So I vote for 1
> > > > > > >> > > > >
> > > > > > >> > > > > Thanks,
> > > > > > >> > > > > Siyuan
> > > > > > >> > > > >
> > > > > > > >> > > > > On Wed, Nov 2, 2016 at 1:28 PM, Pradeep A. Dalvi <prad@apache.org>
> > > > > > > >> > > > > wrote:
> > > > > > > >> > > > >
> > > > > > > >> > > > > > As a rule of thumb in any real time operating
> > > > > > > >> > > > > > system, control tuples should always be handled
> > > > > > > >> > > > > > using Priority Queues.
> > > > > > > >> > > > > >
> > > > > > > >> > > > > > We may try to control priorities by defining levels.
> > > > > > > >> > > > > > And shall not be delivered at window boundaries.
> > > > > > > >> > > > > >
> > > > > > > >> > > > > > In short, control tuples shall never be treated as
> > > > > > > >> > > > > > any other tuples in real time systems.
> > > > > > >> > > > > >
> > > > > > > >> > > > > > On Thursday, November 3, 2016, David Yan <david@datatorrent.com>
> > > > > > > >> > > > > > wrote:
> > > > > > > >> > > > > >
> > > > > > > >> > > > > > > Hi all,
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > > I would like to renew the discussion of control
> > > > > > > >> > > > > > > tuples.
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > > Last time, we were in a debate about whether:
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > > 1) the platform should enforce that control tuples
> > > > > > > >> > > > > > > are delivered at window boundaries only
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > > or:
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > > 2) the platform should deliver control tuples just
> > > > > > > >> > > > > > > as other tuples and it's the operator developers'
> > > > > > > >> > > > > > > choice whether to handle the control tuples as
> > > > > > > >> > > > > > > they arrive or delay the processing till the next
> > > > > > > >> > > > > > > window boundary.
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > > To summarize the pros and cons:
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > > Approach 1: If processing control tuples results
> > > > > > > >> > > > > > > in changes of the behavior of the operator, and
> > > > > > > >> > > > > > > idempotency needs to be preserved, the processing
> > > > > > > >> > > > > > > must be done at window boundaries. This approach
> > > > > > > >> > > > > > > will save the operator developers the headache of
> > > > > > > >> > > > > > > ensuring that. However, this will take away the
> > > > > > > >> > > > > > > choice from the operator developers if they just
> > > > > > > >> > > > > > > need to process the control tuples as soon as
> > > > > > > >> > > > > > > possible.
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > > Approach 2: The operator has a chance to
> > > > > > > >> > > > > > > immediately process control tuples. This would be
> > > > > > > >> > > > > > > useful if latency is more valued than correctness.
> > > > > > > >> > > > > > > However, this would open the possibility for
> > > > > > > >> > > > > > > operator developers to shoot themselves in the
> > > > > > > >> > > > > > > foot. This is especially true if there are
> > > > > > > >> > > > > > > multiple input ports, as there is no easy way to
> > > > > > > >> > > > > > > guarantee processing order for multiple input
> > > > > > > >> > > > > > > ports.
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > > We would like to arrive at a consensus and close
> > > > > > > >> > > > > > > this discussion soon this time so we can start the
> > > > > > > >> > > > > > > work on this important feature.
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > Thanks!
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > David
> > > > > > >> > > > > > >
> > > > > > > >> > > > > > > On Tue, Jun 28, 2016 at 10:04 AM, Vlad Rozov <v.rozov@datatorrent.com>
> > > > > > > >> > > > > > > wrote:
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > > > It is not clear how an operator will emit custom
> > > > > > > >> > > > > > > > control tuples at window boundaries. One way is
> > > > > > > >> > > > > > > > to cache/accumulate control tuples in the
> > > > > > > >> > > > > > > > operator output port till the window closes
> > > > > > > >> > > > > > > > (END_WINDOW is inserted into the output sink) or
> > > > > > > >> > > > > > > > only allow an operator to emit control tuples
> > > > > > > >> > > > > > > > inside the endWindow(). The latter is a slight
> > > > > > > >> > > > > > > > variation of the operator output port caching
> > > > > > > >> > > > > > > > behavior with the only difference that now the
> > > > > > > >> > > > > > > > operator itself is responsible for
> > > > > > > >> > > > > > > > caching/accumulating control tuples. Note that
> > > > > > > >> > > > > > > > in many cases it will be necessary to postpone
> > > > > > > >> > > > > > > > emitting payload tuples that logically come
> > > > > > > >> > > > > > > > after the custom control tuple till the next
> > > > > > > >> > > > > > > > window begins.
> > > > > > > >> > > > > > > >
> > > > > > > >> > > > > > > > IMO, that is too restrictive and in a case where
> > > > > > > >> > > > > > > > the input operator uses a push instead of a poll
> > > > > > > >> > > > > > > > (for example, it provides an end point where
> > > > > > > >> > > > > > > > remote agents may connect and publish/push
> > > > > > > >> > > > > > > > data), control tuples may be used for
> > > > > > > >> > > > > > > > connect/disconnect/watermark broadcast to
> > > > > > > >> > > > > > > > (partitioned) downstream operators. In this case
> > > > > > > >> > > > > > > > the platform just needs to guarantee an order
> > > > > > > >> > > > > > > > barrier (any tuple emitted prior to a control
> > > > > > > >> > > > > > > > tuple needs to be delivered prior to the control
> > > > > > > >> > > > > > > > tuple).
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > > Thank you,
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > > Vlad
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > > On 6/27/16 19:36, Amol Kekre wrote:
> > > > > > >> > > > > > > >
> > > > > > > >> > > > > > > >> I agree with David. Allowing control tuples
> > > > > > > >> > > > > > > >> within a window (along with data tuples) creates
> > > > > > > >> > > > > > > >> a very dangerous situation where guarantees are
> > > > > > > >> > > > > > > >> impacted. It is much safer to enable control
> > > > > > > >> > > > > > > >> tuples (send/receive) at window boundaries
> > > > > > > >> > > > > > > >> (after END_WINDOW of window N, and before
> > > > > > > >> > > > > > > >> BEGIN_WINDOW for window N+1). My take on David's
> > > > > > > >> > > > > > > >> list is
> > > > > > > >> > > > > > > >>
> > > > > > > >> > > > > > > >> 1. -> window boundaries -> Strong +1; there will
> > > > > > > >> > > > > > > >> be a big issue with guarantees for operators
> > > > > > > >> > > > > > > >> with multiple ports. (see Thomas's response)
> > > > > > > >> > > > > > > >> 2. -> All downstream windows -> +1, but there
> > > > > > > >> > > > > > > >> are situations; a caveat could be "only to
> > > > > > > >> > > > > > > >> operators that implement control tuple
> > > > > > > >> > > > > > > >> interface/listeners", which could effectively
> > > > > > > >> > > > > > > >> translate to "all interested downstream
> > > > > > > >> > > > > > > >> operators"
> > > > > > > >> > > > > > > >> 3. Only Input operator can create control tuples
> > > > > > > >> > > > > > > >> -> -1; is restrictive even though most likely
> > > > > > > >> > > > > > > >> 95% of the time it will be input operators
> > > > > > >> operators
> > > > > > >> > > > > > > >>
> > > > > > >> > > > > > > >> Thks,
> > > > > > >> > > > > > > >> Amol
> > > > > > >> > > > > > > >>
> > > > > > >> > > > > > > >>
> > > > > > > >> > > > > > > >> On Mon, Jun 27, 2016 at 4:37 PM, Thomas Weise <thomas@datatorrent.com>
> > > > > > > >> > > > > > > >> wrote:
> > > > > > > >> > > > > > > >>
> > > > > > > >> > > > > > > >>> The windowing we discuss here is in general
> > > > > > > >> > > > > > > >>> event time based, arrival time is a special
> > > > > > > >> > > > > > > >>> case of it.
> > > > > > > >> > > > > > > >>>
> > > > > > > >> > > > > > > >>> I don't think state changes can be made
> > > > > > > >> > > > > > > >>> independent of the streaming window boundary as
> > > > > > > >> > > > > > > >>> it would prevent idempotent processing and
> > > > > > > >> > > > > > > >>> transitively exactly once. For that to work,
> > > > > > > >> > > > > > > >>> tuples need to be presented to the operator in
> > > > > > > >> > > > > > > >>> a guaranteed order *within* the streaming
> > > > > > > >> > > > > > > >>> window, which is not possible with multiple
> > > > > > > >> > > > > > > >>> ports (and partitions).
> > > > > > >> > > > > > > >>>
> > > > > > >> > > > > > > >>> Thomas
> > > > > > >> > > > > > > >>>
> > > > > > > >> > > > > > > >>> On Mon, Jun 27, 2016 at 2:53 PM, David Yan <david@datatorrent.com>
> > > > > > > >> > > > > > > >>> wrote:
> > > > > > > >> > > > > > > >>>
> > > > > > > >> > > > > > > >>>> I think for session tracking, if the session
> > > > > > > >> > > > > > > >>>> boundaries are allowed to be not aligned with
> > > > > > > >> > > > > > > >>>> the streaming window boundaries, the user will
> > > > > > > >> > > > > > > >>>> have a much bigger problem with idempotency.
> > > > > > > >> > > > > > > >>>> And in most cases, session tracking is event
> > > > > > > >> > > > > > > >>>> time based, not ingression time or processing
> > > > > > > >> > > > > > > >>>> time based, so this may never be a problem.
> > > > > > > >> > > > > > > >>>> But if that ever happens, the user can always
> > > > > > > >> > > > > > > >>>> alter the default 500ms width.
> > > > > > >> > > > > > > >>>>
> > > > > > >> > > > > > > >>>> David
> > > > > > >> > > > > > > >>>>
> > > > > > > >> > > > > > > >>>> On Mon, Jun 27, 2016 at 2:35 PM, Vlad Rozov <v.rozov@datatorrent.com>
> > > > > > > >> > > > > > > >>>> wrote:
> > > > > > > >> > > > > > > >>>>
> > > > > > > >> > > > > > > >>>>> Ability to send custom control tuples within
> > > > > > > >> > > > > > > >>>>> window may be useful, for example, for
> > > > > > > >> > > > > > > >>>>> sessions tracking, where session boundaries
> > > > > > > >> > > > > > > >>>>> are not aligned with window boundaries and
> > > > > > > >> > > > > > > >>>>> 500 ms latency is not acceptable for an
> > > > > > > >> > > > > > > >>>>> application.
> > > > > > >> > > > > > > >>>>>
> > > > > > >> > > > > > > >>>>> Thank you,
> > > > > > >> > > > > > > >>>>>
> > > > > > >> > > > > > > >>>>> Vlad
> > > > > > >> > > > > > > >>>>>
> > > > > > >> > > > > > > >>>>>
> > > > > > >> > > > > > > >>>>> On 6/25/16 10:52, Thomas Weise wrote:
> > > > > > >> > > > > > > >>>>>
> > > > > > > >> > > > > > > >>>>>> It should not matter from where the control
> > > > > > > >> > > > > > > >>>>>> tuple is triggered. It will be good to have
> > > > > > > >> > > > > > > >>>>>> a generic mechanism to propagate it and
> > > > > > > >> > > > > > > >>>>>> other things can be accomplished outside the
> > > > > > > >> > > > > > > >>>>>> engine. For example, the new comprehensive
> > > > > > > >> > > > > > > >>>>>> support for windowing will all be in Malhar,
> > > > > > > >> > > > > > > >>>>>> nothing that the engine needs to know about
> > > > > > > >> > > > > > > >>>>>> it except that we need the control tuple for
> > > > > > > >> > > > > > > >>>>>> watermark propagation and idempotent
> > > > > > > >> > > > > > > >>>>>> processing.
> > > > > > > >> > > > > > > >>>>>>
> > > > > > > >> > > > > > > >>>>>> I also think the main difference to other
> > > > > > > >> > > > > > > >>>>>> tuples is the need to send it to all
> > > > > > > >> > > > > > > >>>>>> partitions. Which is similar to checkpoint
> > > > > > > >> > > > > > > >>>>>> window tuples, but not the same. Here, we
> > > > > > > >> > > > > > > >>>>>> probably also need the ability for the user
> > > > > > > >> > > > > > > >>>>>> to control whether such tuple should
> > > > > > > >> > > > > > > >>>>>> traverse the entire DAG or not. For a batch
> > > > > > > >> > > > > > > >>>>>> use case, for example, we may want to send
> > > > > > > >> > > > > > > >>>>>> the end of file to the next operator, but
> > > > > > > >> > > > > > > >>>>>> not beyond, if the operator has asynchronous
> > > > > > > >> > > > > > > >>>>>> processing logic in it.
> > > > > > > >> > > > > > > >>>>>>
> > > > > > > >> > > > > > > >>>>>> For any logic to be idempotent, the control
> > > > > > > >> > > > > > > >>>>>> tuple needs to be processed at a window
> > > > > > > >> > > > > > > >>>>>> boundary. Receiving the control tuple in the
> > > > > > > >> > > > > > > >>>>>> window callback would avoid having to track
> > > > > > > >> > > > > > > >>>>>> extra state in the operator. I don't think
> > > > > > > >> > > > > > > >>>>>> that's a major issue, but what is the use
> > > > > > > >> > > > > > > >>>>>> case for processing a control tuple within
> > > > > > > >> > > > > > > >>>>>> the window?
> > > > > > >> > > > > > > >>>>>>
> > > > > > >> > > > > > > >>>>>> Thomas
> > > > > > >> > > > > > > >>>>>>
> > > > > > >> > > > > > > >>>>>>
> > > > > > >> > > > > > > >>>>>>
> > > > > > > >> > > > > > > >>>>>> On Sat, Jun 25, 2016 at 6:19 AM, Pramod Immaneni <pramod@datatorrent.com>
> > > > > > > >> > > > > > > >>>>>> wrote:
> > > > > > > >> > > > > > > >>>>>>
> > > > > > > >> > > > > > > >>>>>>> For the use cases you mentioned, I think 1)
> > > > > > > >> > > > > > > >>>>>>> and 2) are more likely to be controlled
> > > > > > > >> > > > > > > >>>>>>> directly by the application, 3) and 4) are
> > > > > > > >> > > > > > > >>>>>>> more likely going to be triggered
> > > > > > > >> > > > > > > >>>>>>> externally and directly handled by the
> > > > > > > >> > > > > > > >>>>>>> engine and 3) is already being implemented
> > > > > > > >> > > > > > > >>>>>>> that way (apexcore-163).
> > > > > > > >> > > > > > > >>>>>>>
> > > > > > > >> > > > > > > >>>>>>> The control tuples emitted by an operator
> > > > > > > >> > > > > > > >>>>>>> would be sent to all downstream partitions
> > > > > > > >> > > > > > > >>>>>>> isn't it and that would be the chief
> > > > > > > >> > > > > > > >>>>>>> distinction compared to data (apart from
> > > > > > > >> > > > > > > >>>>>>> the payload) which would get partitioned
> > > > > > > >> > > > > > > >>>>>>> under normal circumstances. It would also
> > > > > > > >> > > > > > > >>>>>>> be guaranteed that downstream partitions
> > > > > > > >> > > > > > > >>>>>>> will receive control tuples only after the
> > > > > > > >> > > > > > > >>>>>>> data that was sent before it so we could
> > > > > > > >> > > > > > > >>>>>>> send it immediately when it is emitted as
> > > > > > > >> > > > > > > >>>>>>> opposed to window boundaries.
> > > > > > > >> > > > > > > >>>>>>>
> > > > > > > >> > > > > > > >>>>>>> However during unification it is important
> > > > > > > >> > > > > > > >>>>>>> to know if these control tuples have been
> > > > > > > >> > > > > > > >>>>>>> received from all upstream partitions
> > > > > > > >> > > > > > > >>>>>>> before proceeding with a control operation.
> > > > > > > >> > > > > > > >>>>>>> One could wait till end of the window but
> > > > > > > >> > > > > > > >>>>>>> that introduces a delay however small, I
> > > > > > > >> > > > > > > >>>>>>> would like to add to the proposal that the
> > > > > > > >> > > > > > > >>>>>>> platform only hand over the control tuple
> > > > > > > >> > > > > > > >>>>>>> to the unifier when it has been received
> > > > > > > >> > > > > > > >>>>>>> from all upstream partitions much like how
> > > > > > > >> > > > > > > >>>>>>> end window is processed but not wait till
> > > > > > > >> > > > > > > >>>>>>> the actual end of the window.
> > > > > > > >> > > > > > > >>>>>>>
> > > > > > > >> > > > > > > >>>>>>> Regd your concern about idempotency, we
> > > > > > > >> > > > > > > >>>>>>> typically care about idempotency at a
> > > > > > > >> > > > > > > >>>>>>> window level and doing the above will still
> > > > > > > >> > > > > > > >>>>>>> allow the operators to preserve that
> > > > > > > >> > > > > > > >>>>>>> easily.
> > > > > > >> > > > > > > >>>>>>>
> > > > > > >> > > > > > > >>>>>>> Thanks
> > > > > > >> > > > > > > >>>>>>>
> > > > > > >> > > > > > > >>>>>>> On Jun 24, 2016, at 11:22 AM, David Yan <
> > > > > > >> > > > david@datatorrent.com
> > > > > > >> > > > > > > <javascript:;>>
> > > > > > >> > > > > > > >>>>>>>
> > > > > > >> > > > > > > >>>>>> wrote:
> > > > > > >> > > > > > > >>>
> > > > > > >> > > > > > > >>>> Hi all,
> > > > > > >> > > > > > > >>>>>>>>
> > > > > > > >> > > > > > > > >>>>>>>> I would like to propose a new feature to the
> > > > > > > >> > > > > > > > >>>>>>>> Apex core engine -- the support of custom
> > > > > > > >> > > > > > > > >>>>>>>> control tuples. Currently, we have control
> > > > > > > >> > > > > > > > >>>>>>>> tuples such as BEGIN_WINDOW, END_WINDOW,
> > > > > > > >> > > > > > > > >>>>>>>> CHECKPOINT, and so on, but we don't have the
> > > > > > > >> > > > > > > > >>>>>>>> support for applications to insert their own
> > > > > > > >> > > > > > > > >>>>>>>> control tuples. The current way to get around
> > > > > > > >> > > > > > > > >>>>>>>> this is to use data tuples and have a separate
> > > > > > > >> > > > > > > > >>>>>>>> port for such tuples that sends tuples to all
> > > > > > > >> > > > > > > > >>>>>>>> partitions of the downstream operators, which
> > > > > > > >> > > > > > > > >>>>>>>> is not exactly developer friendly.
> > > > > > >> > > > > > > >>>>>>>>
> > > > > > > >> > > > > > > > >>>>>>>> We have already seen a number of use cases
> > > > > > > >> > > > > > > > >>>>>>>> that can use this feature:
> > > > > > > >> > > > > > > > >>>>>>>>
> > > > > > > >> > > > > > > > >>>>>>>> 1) Batch support: We need to tell all
> > > > > > > >> > > > > > > > >>>>>>>> operators of the physical DAG when a batch
> > > > > > > >> > > > > > > > >>>>>>>> starts and ends, so the operators can do
> > > > > > > >> > > > > > > > >>>>>>>> whatever is needed upon the start or the end
> > > > > > > >> > > > > > > > >>>>>>>> of a batch.
> > > > > > >> > > > > > > >>>>>>>>
> > > > > > > >> > > > > > > > >>>>>>>> 2) Watermark: To support the concepts of event
> > > > > > > >> > > > > > > > >>>>>>>> time windowing, the watermark control tuple is
> > > > > > > >> > > > > > > > >>>>>>>> needed to tell which windows should be
> > > > > > > >> > > > > > > > >>>>>>>> considered late.
> > > > > > >> > > > > > > >>>>>>>>
> > > > > > > >> > > > > > > > >>>>>>>> 3) Changing operator properties: We do have
> > > > > > > >> > > > > > > > >>>>>>>> the support of changing operator properties on
> > > > > > > >> > > > > > > > >>>>>>>> the fly, but with a custom control tuple, the
> > > > > > > >> > > > > > > > >>>>>>>> command to change operator properties can be
> > > > > > > >> > > > > > > > >>>>>>>> window aligned for all partitions and also
> > > > > > > >> > > > > > > > >>>>>>>> across the DAG.
> > > > > > >> > > > > > > >>>>>>>>
> > > > > > > >> > > > > > > > >>>>>>>> 4) Recording tuples: Like changing operator
> > > > > > > >> > > > > > > > >>>>>>>> properties, we do have this support now but
> > > > > > > >> > > > > > > > >>>>>>>> only at the individual physical operator
> > > > > > > >> > > > > > > > >>>>>>>> level, and without control of which window to
> > > > > > > >> > > > > > > > >>>>>>>> record tuples for. With a custom control
> > > > > > > >> > > > > > > > >>>>>>>> tuple, because a control tuple must belong to
> > > > > > > >> > > > > > > > >>>>>>>> a window, all operators in the DAG can start
> > > > > > > >> > > > > > > > >>>>>>>> (and stop) recording for the same windows.
> > > > > > >> > > > > > > >>>>>>>>
> > > > > > > >> > > > > > > > >>>>>>>> I can think of two options to achieve this:
> > > > > > >> > > > > > > >>>>>>>>
> > > > > > > >> > > > > > > > >>>>>>>> 1) A new custom control tuple type that takes
> > > > > > > >> > > > > > > > >>>>>>>> the user's serializable object.
> > > > > > > >> > > > > > > > >>>>>>>> 2) Piggyback on the current BEGIN_WINDOW and
> > > > > > > >> > > > > > > > >>>>>>>> END_WINDOW control tuples.
> > > > > > > >> > > > > > > > >>>>>>>>
> > > > > > > >> > > > > > > > >>>>>>>> Please provide your feedback. Thank you.
> > > > > > >> > > > > > > >>>>>>>>
> > > > > > >> > > > > > > >>>>>>>> David
> > > > > > >> > > > > > > >>>>>>>>
> > > > > > >> > > > > > > >>>>>>>>
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > >
> > > > > > >> > > > > >
> > > > > > >> > > > >
> > > > > > >> > > >
> > > > > > >> > >
> > > > > > >> >
> > > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
>
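
Illustrative note on the quoted proposal: the sketch below shows option 1 (a
control tuple type that carries the user's serializable object) in its simplest
form. It is a toy model, not Apex code. The names ControlTupleSketch,
CustomControlTuple, PartitionedStream, emitData and emitControl are all
hypothetical and do not exist in the Apex API. It assumes that data tuples are
routed to one partition by hash code and that control tuples are broadcast to
every partition, which is the behavior the proposal asks for.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; none of these types are part of the Apex API.
public class ControlTupleSketch {

    /** A custom control tuple: a user payload tagged with the window it belongs to. */
    static final class CustomControlTuple {
        final long windowId;
        final Serializable payload;

        CustomControlTuple(long windowId, Serializable payload) {
            this.windowId = windowId;
            this.payload = payload;
        }

        @Override
        public String toString() {
            return "CONTROL[window=" + windowId + ", payload=" + payload + "]";
        }
    }

    /** Stand-in for an upstream output port feeding N downstream partitions. */
    static final class PartitionedStream {
        final List<List<Object>> partitions = new ArrayList<>();

        PartitionedStream(int partitionCount) {
            for (int i = 0; i < partitionCount; i++) {
                partitions.add(new ArrayList<>());
            }
        }

        /** Data tuples go to exactly one partition, chosen by hash code. */
        void emitData(Object tuple) {
            int index = Math.floorMod(tuple.hashCode(), partitions.size());
            partitions.get(index).add(tuple);
        }

        /** Control tuples are broadcast, so every partition sees them. */
        void emitControl(CustomControlTuple tuple) {
            for (List<Object> partition : partitions) {
                partition.add(tuple);
            }
        }
    }

    /** Emits two data tuples and one control tuple into three partitions. */
    static PartitionedStream demo() {
        PartitionedStream stream = new PartitionedStream(3);
        stream.emitData("a");
        stream.emitData("b");
        stream.emitControl(new CustomControlTuple(42L, "BATCH_END"));
        return stream;
    }

    public static void main(String[] args) {
        PartitionedStream stream = demo();
        for (int i = 0; i < stream.partitions.size(); i++) {
            System.out.println("partition " + i + ": " + stream.partitions.get(i));
        }
    }
}
```

In this model every partition ends with the same control tuple regardless of
which data tuples it received, so no separate port is needed to reach all
partitions. How a real engine would order the control tuple relative to data
tuples within a window is exactly what the thread goes on to discuss, and the
sketch takes no position on it.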
