apex-dev mailing list archives

From Pramod Immaneni <pra...@datatorrent.com>
Subject Re: [DISCUSSION] Custom Control Tuples
Date Wed, 02 Nov 2016 21:46:17 GMT
With option 2, users can still do idempotent processing by delaying their
processing of the control tuples until end window, so they keep that
flexibility. In the usual scenario there is one input port, and since
control tuples are sent to all partitions, every downstream partition
receives all the data sent before a control tuple ahead of that control
tuple, so users can still do idempotent processing. For the case with
multiple ports, you can delay processing until end window.
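
A minimal sketch of the deferral described above, assuming a hypothetical
operator with beginWindow/endWindow callbacks and separate data and control
handlers. None of the class or method names below are the Apex API; they are
made up for illustration. The operator buffers control tuples as they arrive
and applies them only in endWindow, so the output for a window does not
depend on where in the window the control tuple showed up.

```java
import java.util.ArrayList;
import java.util.List;

public class DeferredControlSketch {

    // Hypothetical operator, NOT the Apex API: all names are illustrative.
    static class ThresholdFilter {
        private final List<Integer> pendingControl = new ArrayList<>();
        private final List<String> emitted = new ArrayList<>();
        private int threshold = 0; // state that control tuples change

        void beginWindow(long windowId) {
            emitted.add("begin:" + windowId);
        }

        // Data tuples are filtered with the threshold fixed for this window.
        void onData(int value) {
            if (value >= threshold) {
                emitted.add("data:" + value);
            }
        }

        // A control tuple arriving mid-window is only buffered.
        void onControl(int newThreshold) {
            pendingControl.add(newThreshold);
        }

        // Buffered control tuples take effect at the window boundary.
        void endWindow() {
            for (int t : pendingControl) {
                threshold = t;
            }
            pendingControl.clear();
            emitted.add("end");
        }

        List<String> emitted() {
            return emitted;
        }
    }

    // Replays two windows; the control tuple lands either before or after
    // the data tuples of window 1.
    static List<String> replay(boolean controlArrivesEarly) {
        ThresholdFilter op = new ThresholdFilter();
        op.beginWindow(1);
        if (controlArrivesEarly) {
            op.onControl(5);
        }
        op.onData(3);
        op.onData(7);
        if (!controlArrivesEarly) {
            op.onControl(5);
        }
        op.endWindow();
        op.beginWindow(2);
        op.onData(3);
        op.onData(7);
        op.endWindow();
        return op.emitted();
    }

    public static void main(String[] args) {
        System.out.println(replay(true));
        System.out.println(replay(false));
    }
}
```

Both replays produce the same sequence, which is the property a window-level
replay relies on. With several control tuples coming from different ports,
the arrival order inside the buffer could still differ between runs, so the
operator would additionally need an order-independent rule for combining
them.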

On Wed, Nov 2, 2016 at 2:33 PM, Amol Kekre <amol@datatorrent.com> wrote:

> A feature that incurs risk with processing order, and more so with
> idempotency, is a big enough reason to worry about option 2. Is there
> a critical use case that needs this feature?
>
> Thks
> Amol
>
>
> On Wed, Nov 2, 2016 at 1:25 PM, Pramod Immaneni <pramod@datatorrent.com>
> wrote:
>
> > I like approach 2 as it gives more flexibility and also allows for
> > low-latency options. I think the following are important as well.
> >
> > 1. Delivering control tuples to all downstream partitions.
> > 2. What mechanism will the operator developer use to send the control
> > tuple? Will it be an additional method on the output port?
> >
> > Thanks
> >
> > On Wed, Nov 2, 2016 at 1:16 PM, David Yan <david@datatorrent.com> wrote:
> >
> > > Hi all,
> > >
> > > I would like to renew the discussion of control tuples.
> > >
> > > Last time, we were in a debate about whether:
> > >
> > > 1) the platform should enforce that control tuples are delivered at
> > > window boundaries only
> > >
> > > or:
> > >
> > > 2) the platform should deliver control tuples just as other tuples, and
> > > it's the operator developers' choice whether to handle the control
> > > tuples as they arrive or delay the processing till the next window
> > > boundary.
> > >
> > > To summarize the pros and cons:
> > >
> > > Approach 1: If processing control tuples changes the behavior of the
> > > operator and idempotency needs to be preserved, the processing must
> > > be done at window boundaries. This approach saves operator developers
> > > the headache of ensuring that. However, it takes away the choice
> > > from operator developers who just need to process the control tuples
> > > as soon as possible.
> > >
> > > Approach 2: The operator has a chance to process control tuples
> > > immediately. This would be useful if latency is valued more than
> > > correctness. However, this would open the possibility for operator
> > > developers to shoot themselves in the foot. This is especially true
> > > if there are multiple input ports, as there is no easy way to
> > > guarantee processing order across multiple input ports.
> > >
> > > We would like to arrive at a consensus and close this discussion soon
> > > this time so we can start the work on this important feature.
> > >
> > > Thanks!
> > >
> > > David
> > >
> > > On Tue, Jun 28, 2016 at 10:04 AM, Vlad Rozov <v.rozov@datatorrent.com>
> > > wrote:
> > >
> > > > It is not clear how an operator will emit a custom control tuple at
> > > > window boundaries. One way is to cache/accumulate control tuples in
> > > > the operator output port till the window closes (END_WINDOW is
> > > > inserted into the output sink), or to only allow an operator to emit
> > > > control tuples inside endWindow(). The latter is a slight variation
> > > > of the output port caching behavior, with the only difference that
> > > > the operator itself is now responsible for caching/accumulating
> > > > control tuples. Note that in many cases it will be necessary to
> > > > postpone emitting payload tuples that logically come after the
> > > > custom control tuple till the next window begins.
> > > >
> > > > IMO, that is too restrictive. In a case where an input operator uses
> > > > push instead of poll (for example, it provides an endpoint where
> > > > remote agents may connect and publish/push data), control tuples may
> > > > be used for connect/disconnect/watermark broadcast to (partitioned)
> > > > downstream operators. In this case the platform just needs to
> > > > guarantee an order barrier (any tuple emitted prior to a control
> > > > tuple needs to be delivered prior to the control tuple).
> > > >
> > > > Thank you,
> > > >
> > > > Vlad
> > > >
> > > >
> > > >
> > > > On 6/27/16 19:36, Amol Kekre wrote:
> > > >
> > > > >> I agree with David. Allowing control tuples within a window (along
> > > > >> with data tuples) creates a very dangerous situation where
> > > > >> guarantees are impacted. It is much safer to enable control tuples
> > > > >> (send/receive) at window boundaries (after END_WINDOW of window N,
> > > > >> and before BEGIN_WINDOW for window N+1). My take on David's list is
> > > > >>
> > > > >> 1. -> window boundaries -> Strong +1; there will be a big issue
> > > > >> with guarantees for operators with multiple ports. (see Thomas's
> > > > >> response)
> > > > >> 2. -> All downstream windows -> +1, but there are situations; a
> > > > >> caveat could be "only to operators that implement control tuple
> > > > >> interface/listeners", which could effectively translate to "all
> > > > >> interested downstream operators"
> > > > >> 3. Only Input operator can create control tuples -> -1; is
> > > > >> restrictive even though most likely 95% of the time it will be
> > > > >> input operators
> > > >>
> > > >> Thks,
> > > >> Amol
> > > >>
> > > >>
> > > > >> On Mon, Jun 27, 2016 at 4:37 PM, Thomas Weise <thomas@datatorrent.com>
> > > > >> wrote:
> > > > >>
> > > > >>> The windowing we discuss here is in general event time based;
> > > > >>> arrival time is a special case of it.
> > > > >>>
> > > > >>> I don't think state changes can be made independent of the
> > > > >>> streaming window boundary, as it would prevent idempotent
> > > > >>> processing and, transitively, exactly-once. For that to work,
> > > > >>> tuples need to be presented to the operator in a guaranteed order
> > > > >>> *within* the streaming window, which is not possible with multiple
> > > > >>> ports (and partitions).
> > > >>>
> > > >>> Thomas
> > > >>>
> > > >>> On Mon, Jun 27, 2016 at 2:53 PM, David Yan <david@datatorrent.com>
> > > >>> wrote:
> > > >>>
> > > > >>>> I think for session tracking, if the session boundaries are
> > > > >>>> allowed to be not aligned with the streaming window boundaries,
> > > > >>>> the user will have a much bigger problem with idempotency. And in
> > > > >>>> most cases, session tracking is event time based, not ingression
> > > > >>>> time or processing time based, so this may never be a problem.
> > > > >>>> But if that ever happens, the user can always alter the default
> > > > >>>> 500ms width.
> > > >>>>
> > > >>>> David
> > > >>>>
> > > > >>>> On Mon, Jun 27, 2016 at 2:35 PM, Vlad Rozov <v.rozov@datatorrent.com>
> > > > >>>> wrote:
> > > > >>>>
> > > > >>>>> Ability to send custom control tuples within a window may be
> > > > >>>>> useful, for example, for session tracking, where session
> > > > >>>>> boundaries are not aligned with window boundaries and 500 ms
> > > > >>>>> latency is not acceptable for an application.
> > > >>>>>
> > > >>>>> Thank you,
> > > >>>>>
> > > >>>>> Vlad
> > > >>>>>
> > > >>>>>
> > > >>>>> On 6/25/16 10:52, Thomas Weise wrote:
> > > >>>>>
> > > > >>>>>> It should not matter from where the control tuple is triggered.
> > > > >>>>>> It will be good to have a generic mechanism to propagate it, and
> > > > >>>>>> other things can be accomplished outside the engine. For
> > > > >>>>>> example, the new comprehensive support for windowing will all
> > > > >>>>>> be in Malhar, nothing that the engine needs to know about,
> > > > >>>>>> except that we need the control tuple for watermark propagation
> > > > >>>>>> and idempotent processing.
> > > > >>>>>>
> > > > >>>>>> I also think the main difference to other tuples is the need to
> > > > >>>>>> send it to all partitions, which is similar to checkpoint
> > > > >>>>>> window tuples, but not the same. Here, we probably also need
> > > > >>>>>> the ability for the user to control whether such a tuple should
> > > > >>>>>> traverse the entire DAG or not. For a batch use case, for
> > > > >>>>>> example, we may want to send the end of file to the next
> > > > >>>>>> operator, but not beyond, if the operator has asynchronous
> > > > >>>>>> processing logic in it.
> > > > >>>>>>
> > > > >>>>>> For any logic to be idempotent, the control tuple needs to be
> > > > >>>>>> processed at a window boundary. Receiving the control tuple in
> > > > >>>>>> the window callback would avoid having to track extra state in
> > > > >>>>>> the operator. I don't think that's a major issue, but what is
> > > > >>>>>> the use case for processing a control tuple within the window?
> > > >>>>>>
> > > >>>>>> Thomas
> > > >>>>>>
> > > >>>>>>
> > > >>>>>>
> > > > >>>>>> On Sat, Jun 25, 2016 at 6:19 AM, Pramod Immaneni
> > > > >>>>>> <pramod@datatorrent.com> wrote:
> > > > >>>>>>
> > > > >>>>>>> For the use cases you mentioned, I think 1) and 2) are more
> > > > >>>>>>> likely to be controlled directly by the application, 3) and 4)
> > > > >>>>>>> are more likely going to be triggered externally and directly
> > > > >>>>>>> handled by the engine, and 3) is already being implemented
> > > > >>>>>>> that way (apexcore-163).
> > > > >>>>>>>
> > > > >>>>>>> The control tuples emitted by an operator would be sent to all
> > > > >>>>>>> downstream partitions, wouldn't they? That would be the chief
> > > > >>>>>>> distinction compared to data (apart from the payload), which
> > > > >>>>>>> would get partitioned under normal circumstances. It would
> > > > >>>>>>> also be guaranteed that downstream partitions will receive
> > > > >>>>>>> control tuples only after the data that was sent before them,
> > > > >>>>>>> so we could send a control tuple immediately when it is
> > > > >>>>>>> emitted, as opposed to at window boundaries.
> > > > >>>>>>>
> > > > >>>>>>> However, during unification it is important to know if these
> > > > >>>>>>> control tuples have been received from all upstream partitions
> > > > >>>>>>> before proceeding with a control operation. One could wait
> > > > >>>>>>> till the end of the window, but that introduces a delay,
> > > > >>>>>>> however small. I would like to add to the proposal that the
> > > > >>>>>>> platform only hand over the control tuple to the unifier when
> > > > >>>>>>> it has been received from all upstream partitions, much like
> > > > >>>>>>> how end window is processed, but not wait till the actual end
> > > > >>>>>>> of the window.
> > > > >>>>>>>
> > > > >>>>>>> Regarding your concern about idempotency, we typically care
> > > > >>>>>>> about idempotency at a window level, and doing the above will
> > > > >>>>>>> still allow the operators to preserve that easily.
> > > > >>>>>>>
> > > > >>>>>>> Thanks
> > > >>>>>>>
> > > >>>>>>> Thanks
> > > >>>>>>>
> > > > >>>>>>> On Jun 24, 2016, at 11:22 AM, David Yan <david@datatorrent.com>
> > > > >>>>>>> wrote:
> > > > >>>>>>>
> > > > >>>>>>>> Hi all,
> > > > >>>>>>>>
> > > > >>>>>>>> I would like to propose a new feature to the Apex core engine
> > > > >>>>>>>> -- the support of custom control tuples. Currently, we have
> > > > >>>>>>>> control tuples such as BEGIN_WINDOW, END_WINDOW, CHECKPOINT,
> > > > >>>>>>>> and so on, but we don't have the support for applications to
> > > > >>>>>>>> insert their own control tuples. The current way to get
> > > > >>>>>>>> around this is to use data tuples and have a separate port
> > > > >>>>>>>> for such tuples that sends tuples to all partitions of the
> > > > >>>>>>>> downstream operators, which is not exactly developer
> > > > >>>>>>>> friendly.
> > > > >>>>>>>>
> > > > >>>>>>>> We have already seen a number of use cases that can use this
> > > > >>>>>>>> feature:
> > > > >>>>>>>>
> > > > >>>>>>>> 1) Batch support: We need to tell all operators of the
> > > > >>>>>>>> physical DAG when a batch starts and ends, so the operators
> > > > >>>>>>>> can do whatever is needed upon the start or the end of a
> > > > >>>>>>>> batch.
> > > > >>>>>>>>
> > > > >>>>>>>> 2) Watermark: To support the concepts of event time
> > > > >>>>>>>> windowing, the watermark control tuple is needed to tell
> > > > >>>>>>>> which windows should be considered late.
> > > > >>>>>>>>
> > > > >>>>>>>> 3) Changing operator properties: We do have the support of
> > > > >>>>>>>> changing operator properties on the fly, but with a custom
> > > > >>>>>>>> control tuple, the command to change operator properties can
> > > > >>>>>>>> be window aligned for all partitions and also across the DAG.
> > > > >>>>>>>>
> > > > >>>>>>>> 4) Recording tuples: Like changing operator properties, we do
> > > > >>>>>>>> have this support now but only at the individual physical
> > > > >>>>>>>> operator level, and without control of which window to record
> > > > >>>>>>>> tuples for. With a custom control tuple, because a control
> > > > >>>>>>>> tuple must belong to a window, all operators in the DAG can
> > > > >>>>>>>> start (and stop) recording for the same windows.
> > > > >>>>>>>>
> > > > >>>>>>>> I can think of two options to achieve this:
> > > > >>>>>>>>
> > > > >>>>>>>> 1) new custom control tuple type that takes user's
> > > > >>>>>>>> serializable object.
> > > > >>>>>>>>
> > > > >>>>>>>> 2) piggy back the current BEGIN_WINDOW and END_WINDOW control
> > > > >>>>>>>> tuples.
> > > > >>>>>>>>
> > > > >>>>>>>> Please provide your feedback. Thank you.
> > > > >>>>>>>>
> > > > >>>>>>>> David
> > > >>>>>>>>
> > > >>>>>>>>
> > > >
> > >
> >
>
