apex-dev mailing list archives

From Siyuan Hua <siy...@datatorrent.com>
Subject Re: [DISCUSSION] Custom Control Tuples
Date Wed, 02 Nov 2016 21:36:37 GMT
I will vote for approach 1.

First of all, that one sounds easier to implement. And I think idempotency is
important. It may come at the cost of higher latency, but I think that is OK.

In addition, if users do need real-time control tuple processing in the
future, we can always add that option on top of it.

So I vote for 1
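
A minimal sketch of that layering, under assumptions: WindowAlignedControl
and all of its methods are hypothetical names for illustration, not part of
the Apex API. Control tuples are buffered and applied at endWindow() by
default (approach 1), and an opt-in flag applies them as they arrive (the
option that could be added on top later).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not an Apex class: applies control tuples at the
// window boundary by default, or immediately when the opt-in flag is set.
class WindowAlignedControl {
    private final boolean immediate;                        // opt-in low-latency mode
    private final List<String> pending = new ArrayList<>(); // buffered until endWindow()
    private final List<String> applied = new ArrayList<>(); // log: "windowId:position:tuple"
    private long currentWindow = -1;

    WindowAlignedControl(boolean immediate) {
        this.immediate = immediate;
    }

    void beginWindow(long windowId) {
        currentWindow = windowId;
    }

    // Called when a control tuple arrives somewhere inside the window.
    void onControlTuple(String tuple) {
        if (immediate) {
            apply(tuple, "mid");   // processed as it arrives
        } else {
            pending.add(tuple);    // deferred to the window boundary
        }
    }

    // Window boundary: apply everything that was deferred, in arrival order.
    void endWindow() {
        for (String tuple : pending) {
            apply(tuple, "end");
        }
        pending.clear();
    }

    private void apply(String tuple, String position) {
        applied.add(currentWindow + ":" + position + ":" + tuple);
    }

    List<String> applied() {
        return applied;
    }
}
```

In the default mode nothing is applied until endWindow() is called, so a
replay of the same window applies the same control tuples at the same point.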

Thanks,
Siyuan

On Wed, Nov 2, 2016 at 1:28 PM, Pradeep A. Dalvi <prad@apache.org> wrote:

> As a rule of thumb in any real time operating system, control tuples should
> always be handled using Priority Queues.
>
> We may try to control priorities by defining levels. And control tuples
> shall not be delivered at window boundaries.
>
> In short, control tuples shall never be treated as any other tuples in real
> time systems.
>
> On Thursday, November 3, 2016, David Yan <david@datatorrent.com> wrote:
>
> > Hi all,
> >
> > I would like to renew the discussion of control tuples.
> >
> > Last time, we were in a debate about whether:
> >
> > 1) the platform should enforce that control tuples are delivered at
> > window boundaries only
> >
> > or:
> >
> > 2) the platform should deliver control tuples just as other tuples and
> > it's the operator developers' choice whether to handle the control
> > tuples as they arrive or delay the processing till the next window
> > boundary.
> >
> > To summarize the pros and cons:
> >
> > Approach 1: If processing control tuples changes the behavior of the
> > operator and idempotency needs to be preserved, the processing must be
> > done at window boundaries. This approach saves operator developers the
> > headache of ensuring that. However, it takes the choice away from
> > operator developers who just need to process control tuples as soon as
> > possible.
> >
> > Approach 2: The operator has a chance to immediately process control
> > tuples. This would be useful if latency is valued more than correctness.
> > However, this would open the possibility for operator developers to
> > shoot themselves in the foot. This is especially true if there are
> > multiple input ports, as there is no easy way to guarantee processing
> > order across multiple input ports.
> >
> > We would like to arrive at a consensus and close this discussion soon
> > this time so we can start the work on this important feature.
> >
> > Thanks!
> >
> > David
> >
> > On Tue, Jun 28, 2016 at 10:04 AM, Vlad Rozov <v.rozov@datatorrent.com>
> > wrote:
> >
> > > It is not clear how an operator will emit a custom control tuple at
> > > window boundaries. One way is to cache/accumulate control tuples in the
> > > operator output port till the window closes (END_WINDOW is inserted
> > > into the output sink), or to only allow an operator to emit control
> > > tuples inside endWindow(). The latter is a slight variation of the
> > > operator output port caching behavior, with the only difference that
> > > now the operator itself is responsible for caching/accumulating control
> > > tuples. Note that in many cases it will be necessary to postpone
> > > emitting payload tuples that logically come after the custom control
> > > tuple till the next window begins.
> > >
> > > IMO, that is too restrictive. In a case where an input operator uses
> > > push instead of poll (for example, it provides an end point where remote
> > > agents may connect and publish/push data), control tuples may be used
> > > for connect/disconnect/watermark broadcast to (partitioned) downstream
> > > operators. In this case the platform just needs to guarantee an order
> > > barrier (any tuple emitted prior to a control tuple needs to be
> > > delivered prior to the control tuple).
> > >
> > > Thank you,
> > >
> > > Vlad
> > >
> > >
> > >
> > > On 6/27/16 19:36, Amol Kekre wrote:
> > >
> > >> I agree with David. Allowing control tuples within a window (along
> > >> with data tuples) creates a very dangerous situation where guarantees
> > >> are impacted. It is much safer to enable control tuples (send/receive)
> > >> at window boundaries (after END_WINDOW of window N, and before
> > >> BEGIN_WINDOW for window N+1). My take on David's list is
> > >>
> > >> 1. -> window boundaries -> Strong +1; there will be a big issue with
> > >> guarantees for operators with multiple ports. (see Thomas's response)
> > >> 2. -> All downstream windows -> +1, but there are situations; a caveat
> > >> could be "only to operators that implement control tuple
> > >> interface/listeners", which could effectively translate to "all
> > >> interested downstream operators"
> > >> 3. Only Input operator can create control tuples -> -1; is restrictive
> > >> even though most likely 95% of the time it will be input operators
> > >>
> > >> Thks,
> > >> Amol
> > >>
> > >>
> > >> On Mon, Jun 27, 2016 at 4:37 PM, Thomas Weise <thomas@datatorrent.com>
> > >> wrote:
> > >>
> > >>> The windowing we discuss here is in general event time based, arrival
> > >>> time is a special case of it.
> > >>>
> > >>> I don't think state changes can be made independent of the streaming
> > >>> window boundary as it would prevent idempotent processing and
> > >>> transitively exactly once. For that to work, tuples need to be
> > >>> presented to the operator in a guaranteed order *within* the streaming
> > >>> window, which is not possible with multiple ports (and partitions).
> > >>>
> > >>> Thomas
> > >>>
> > >>> On Mon, Jun 27, 2016 at 2:53 PM, David Yan <david@datatorrent.com>
> > >>> wrote:
> > >>>
> > >>>> I think for session tracking, if the session boundaries are allowed
> > >>>> to be not aligned with the streaming window boundaries, the user will
> > >>>> have a much bigger problem with idempotency. And in most cases,
> > >>>> session tracking is event time based, not ingression time or
> > >>>> processing time based, so this may never be a problem. But if that
> > >>>> ever happens, the user can always alter the default 500ms width.
> > >>>>
> > >>>> David
> > >>>>
> > >>>> On Mon, Jun 27, 2016 at 2:35 PM, Vlad Rozov <v.rozov@datatorrent.com>
> > >>>> wrote:
> > >>>>
> > >>>>> Ability to send custom control tuples within window may be useful,
> > >>>>> for example, for sessions tracking, where session boundaries are not
> > >>>>> aligned with window boundaries and 500 ms latency is not acceptable
> > >>>>> for an application.
> > >>>>>
> > >>>>> Thank you,
> > >>>>>
> > >>>>> Vlad
> > >>>>>
> > >>>>>
> > >>>>> On 6/25/16 10:52, Thomas Weise wrote:
> > >>>>>
> > >>>>>> It should not matter from where the control tuple is triggered. It
> > >>>>>> will be good to have a generic mechanism to propagate it and other
> > >>>>>> things can be accomplished outside the engine. For example, the new
> > >>>>>> comprehensive support for windowing will all be in Malhar, nothing
> > >>>>>> that the engine needs to know about it except that we need the
> > >>>>>> control tuple for watermark propagation and idempotent processing.
> > >>>>>>
> > >>>>>> I also think the main difference to other tuples is the need to send
> > >>>>>> it to all partitions. Which is similar to checkpoint window tuples,
> > >>>>>> but not the same. Here, we probably also need the ability for the
> > >>>>>> user to control whether such tuple should traverse the entire DAG or
> > >>>>>> not. For a batch use case, for example, we may want to send the end
> > >>>>>> of file to the next operator, but not beyond, if the operator has
> > >>>>>> asynchronous processing logic in it.
> > >>>>>>
> > >>>>>> For any logic to be idempotent, the control tuple needs to be
> > >>>>>> processed at a window boundary. Receiving the control tuple in the
> > >>>>>> window callback would avoid having to track extra state in the
> > >>>>>> operator. I don't think that's a major issue, but what is the use
> > >>>>>> case for processing a control tuple within the window?
> > >>>>>>
> > >>>>>> Thomas
> > >>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>> On Sat, Jun 25, 2016 at 6:19 AM, Pramod Immaneni
> > >>>>>> <pramod@datatorrent.com> wrote:
> > >>>>>>
> > >>>>>>> For the use cases you mentioned, I think 1) and 2) are more likely
> > >>>>>>> to be controlled directly by the application, 3) and 4) are more
> > >>>>>>> likely going to be triggered externally and directly handled by the
> > >>>>>>> engine and 3) is already being implemented that way (apexcore-163).
> > >>>>>>>
> > >>>>>>> The control tuples emitted by an operator would be sent to all
> > >>>>>>> downstream partitions isn't it and that would be the chief
> > >>>>>>> distinction compared to data (apart from the payload) which would
> > >>>>>>> get partitioned under normal circumstances. It would also be
> > >>>>>>> guaranteed that downstream partitions will receive control tuples
> > >>>>>>> only after the data that was sent before it so we could send it
> > >>>>>>> immediately when it is emitted as opposed to window boundaries.
> > >>>>>>>
> > >>>>>>> However during unification it is important to know if these control
> > >>>>>>> tuples have been received from all upstream partitions before
> > >>>>>>> proceeding with a control operation. One could wait till end of the
> > >>>>>>> window but that introduces a delay however small, I would like to
> > >>>>>>> add to the proposal that the platform only hand over the control
> > >>>>>>> tuple to the unifier when it has been received from all upstream
> > >>>>>>> partitions much like how end window is processed but not wait till
> > >>>>>>> the actual end of the window.
> > >>>>>>>
> > >>>>>>> Regd your concern about idempotency, we typically care about
> > >>>>>>> idempotency at a window level and doing the above will still allow
> > >>>>>>> the operators to preserve that easily.
> > >>>>>>>
> > >>>>>>> Thanks
> > >>>>>>>
> > >>>>>>> On Jun 24, 2016, at 11:22 AM, David Yan <david@datatorrent.com>
> > >>>>>>> wrote:
> > >>>>>>>
> > >>>>>>>> Hi all,
> > >>>>>>>>
> > >>>>>>>> I would like to propose a new feature to the Apex core engine --
> > >>>>>>>> the support of custom control tuples. Currently, we have control
> > >>>>>>>> tuples such as BEGIN_WINDOW, END_WINDOW, CHECKPOINT, and so on,
> > >>>>>>>> but we don't have the support for applications to insert their own
> > >>>>>>>> control tuples. The way currently to get around this is to use
> > >>>>>>>> data tuples and have a separate port for such tuples that sends
> > >>>>>>>> tuples to all partitions of the downstream operators, which is not
> > >>>>>>>> exactly developer friendly.
> > >>>>>>>>
> > >>>>>>>> We have already seen a number of use cases that can use this
> > >>>>>>>> feature:
> > >>>>>>>>
> > >>>>>>>> 1) Batch support: We need to tell all operators of the physical
> > >>>>>>>> DAG when a batch starts and ends, so the operators can do whatever
> > >>>>>>>> that is needed upon the start or the end of a batch.
> > >>>>>>>>
> > >>>>>>>> 2) Watermark: To support the concepts of event time windowing, the
> > >>>>>>>> watermark control tuple is needed to tell which windows should be
> > >>>>>>>> considered late.
> > >>>>>>>>
> > >>>>>>>> 3) Changing operator properties: We do have the support of
> > >>>>>>>> changing operator properties on the fly, but with a custom control
> > >>>>>>>> tuple, the command to change operator properties can be window
> > >>>>>>>> aligned for all partitions and also across the DAG.
> > >>>>>>>>
> > >>>>>>>> 4) Recording tuples: Like changing operator properties, we do have
> > >>>>>>>> this support now but only at the individual physical operator
> > >>>>>>>> level, and without control of which window to record tuples for.
> > >>>>>>>> With a custom control tuple, because a control tuple must belong
> > >>>>>>>> to a window, all operators in the DAG can start (and stop)
> > >>>>>>>> recording for the same windows.
> > >>>>>>>>
> > >>>>>>>> I can think of two options to achieve this:
> > >>>>>>>>
> > >>>>>>>> 1) new custom control tuple type that takes user's serializable
> > >>>>>>>> object.
> > >>>>>>>>
> > >>>>>>>> 2) piggy back the current BEGIN_WINDOW and END_WINDOW control
> > >>>>>>>> tuples.
> > >>>>>>>>
> > >>>>>>>> Please provide your feedback. Thank you.
> > >>>>>>>>
> > >>>>>>>> David
> > >>>>>>>>
> > >>>>>>>>
> > >
> >
>
