apex-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Thomas Weise <tho...@datatorrent.com>
Subject Re: [DISCUSSION] Custom Control Tuples
Date Mon, 27 Jun 2016 23:37:20 GMT
The windowing we discuss here is in general event time based, arrival time
is a special case of it.

I don't think state changes can be made independent of the streaming window
boundary as it would prevent idempotent processing and transitively exactly
once. For that to work, tuples need to be presented to the operator in a
guaranteed order *within* the streaming window, which is not possible with
multiple ports (and partitions).

Thomas

On Mon, Jun 27, 2016 at 2:53 PM, David Yan <david@datatorrent.com> wrote:

> I think for session tracking, if the session boundaries are allowed to be
> not aligned with the streaming window boundaries, the user will have a much
> bigger problem with idempotency. And in most cases, session tracking is
> event time based, not ingression time or processing time based, so this may
> never be a problem. But if that ever happens, the user can always alter the
> default 500ms width.
>
> David
>
> On Mon, Jun 27, 2016 at 2:35 PM, Vlad Rozov <v.rozov@datatorrent.com>
> wrote:
>
> > Ability to send custom control tuples within window may be useful, for
> > example, for sessions tracking, where session boundaries are not aligned
> > with window boundaries and 500 ms latency is not acceptable for an
> > application.
> >
> > Thank you,
> >
> > Vlad
> >
> >
> > On 6/25/16 10:52, Thomas Weise wrote:
> >
> >> It should not matter from where the control tuple is triggered. It will
> be
> >> good to have a generic mechanism to propagate it and other things can be
> >> accomplished outside the engine. For example, the new comprehensive
> >> support
> >> for windowing will all be in Malhar, nothing that the engine needs to
> know
> >> about it except that we need the control tuple for watermark propagation
> >> and idempotent processing.
> >>
> >> I also think the main difference to other tuples is the need to send it
> to
> >> all partitions. Which is similar to checkpoint window tuples, but not
> the
> >> same. Here, we probably also need the ability for the user to control
> >> whether such tuple should traverse the entire DAG or not. For a batch
> use
> >> case, for example, we may want to send the end of file to the next
> >> operator, but not beyond, if the operator has asynchronous processing
> >> logic
> >> in it.
> >>
> >> For any logic to be idempotent, the control tuple needs to be processed
> at
> >> a window boundary. Receiving the control tuple in the window callback
> >> would
> >> avoid having to track extra state in the operator. I don't think that's
> a
> >> major issue, but what is the use case for processing a control tuple
> >> within
> >> the window?
> >>
> >> Thomas
> >>
> >>
> >>
> >> On Sat, Jun 25, 2016 at 6:19 AM, Pramod Immaneni <
> pramod@datatorrent.com>
> >> wrote:
> >>
> >> For the use cases you mentioned, I think 1) and 2) are more likely to
> >>> be controlled directly by the application, 3) and 4) are more likely
> >>> going to be triggered externally and directly handled by the engine
> >>> and 3) is already being implemented that way (apexcore-163).
> >>>
> >>> The control tuples emitted by an operator would be sent to all
> >>> downstream partitions isn't it and that would be the chief distinction
> >>> compared to data (apart from the payload) which would get partitioned
> >>> under normal circumstances. It would also be guaranteed that
> >>> downstream partitions will receive control tuples only after the data
> >>> that was sent before it so we could send it immediately when it is
> >>> emitted as opposed to window boundaries.
> >>>
> >>> However during unification it is important to know if these control
> >>> tuples have been received from all upstream partitions before
> >>> proceeding with a control operation. One could wait till end of the
> >>> window but that introduces a delay however small, I would like to add
> >>> to the proposal that the platform only hand over the control tuple to
> >>> the unifier when it has been received from all upstream partitions
> >>> much like how end window is processed but not wait till the actual end
> >>> of the window.
> >>>
> >>> Regd your concern about idempotency, we typically care about
> >>> idempotency at a window level and doing the above will still allow the
> >>> operators to preserve that easily.
> >>>
> >>> Thanks
> >>>
> >>> On Jun 24, 2016, at 11:22 AM, David Yan <david@datatorrent.com> wrote:
> >>>>
> >>>> Hi all,
> >>>>
> >>>> I would like to propose a new feature to the Apex core engine -- the
> >>>> support of custom control tuples. Currently, we have control tuples
> such
> >>>>
> >>> as
> >>>
> >>>> BEGIN_WINDOW, END_WINDOW, CHECKPOINT, and so on, but we don't have the
> >>>> support for applications to insert their own control tuples. The way
> >>>> currently to get around this is to use data tuples and have a separate
> >>>>
> >>> port
> >>>
> >>>> for such tuples that sends tuples to all partitions of the downstream
> >>>> operators, which is not exactly developer friendly.
> >>>>
> >>>> We have already seen a number of use cases that can use this feature:
> >>>>
> >>>> 1) Batch support: We need to tell all operators of the physical DAG
> when
> >>>>
> >>> a
> >>>
> >>>> batch starts and ends, so the operators can do whatever that is needed
> >>>>
> >>> upon
> >>>
> >>>> the start or the end of a batch.
> >>>>
> >>>> 2) Watermark: To support the concepts of event time windowing, the
> >>>> watermark control tuple is needed to tell which windows should be
> >>>> considered late.
> >>>>
> >>>> 3) Changing operator properties: We do have the support of changing
> >>>> operator properties on the fly, but with a custom control tuple, the
> >>>> command to change operator properties can be window aligned for all
> >>>> partitions and also across the DAG.
> >>>>
> >>>> 4) Recording tuples: Like changing operator properties, we do have
> this
> >>>> support now but only at the individual physical operator level, and
> >>>>
> >>> without
> >>>
> >>>> control of which window to record tuples for. With a custom control
> >>>>
> >>> tuple,
> >>>
> >>>> because a control tuple must belong to a window, all operators in the
> >>>> DAG
> >>>> can start (and stop) recording for the same windows.
> >>>>
> >>>> I can think of two options to achieve this:
> >>>>
> >>>> 1) new custom control tuple type that takes user's serializable
> object.
> >>>>
> >>>> 2) piggy back the current BEGIN_WINDOW and END_WINDOW control tuples.
> >>>>
> >>>> Please provide your feedback. Thank you.
> >>>>
> >>>> David
> >>>>
> >>>
> >
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message