apex-dev mailing list archives

From Bhupesh Chawda <bhup...@datatorrent.com>
Subject Re: [DISCUSS] Custom Control Tuples Design
Date Thu, 05 Jan 2017 21:04:56 GMT
Agreed Thomas.
I was referring to the persona of the operator developer. The user of the
operator would not be doing anything related to the propagation of control
tuples. Rather, the behavior of the operator with respect to propagation of
control tuples would be part of the operator documentation.

Also, we are providing options for the developer to route the flow of
control tuples in code during the development of the operator. The
annotations would actually help achieve it in an easier way.
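
As a rough illustration of the annotation route, a minimal sketch follows. The annotation name ControlTuplePropagation, its propagate element, and the port fields are hypothetical stand-ins chosen for this example; they are not the Apex API and not what is in the PR.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

// Hypothetical annotation (an assumption for this sketch, not the Apex API).
// Propagation is on by default, so an unconfigured port keeps forwarding.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface ControlTuplePropagation {
  boolean propagate() default true;
}

// Hypothetical operator with two output ports feeding two pipelines.
// Plain Objects stand in for real port instances.
class SplitterSketch {
  @ControlTuplePropagation // default: control tuples are forwarded
  final Object dataOut = new Object();

  @ControlTuplePropagation(propagate = false) // statically blocked
  final Object metadataOut = new Object();
}

class PropagationSketch {
  // Returns the names of the annotated fields that still propagate
  // control tuples, as an engine might compute at deployment time.
  static List<String> propagatingPorts(Class<?> operatorClass) {
    List<String> names = new ArrayList<>();
    for (Field field : operatorClass.getDeclaredFields()) {
      ControlTuplePropagation setting =
          field.getAnnotation(ControlTuplePropagation.class);
      if (setting != null && setting.propagate()) {
        names.add(field.getName());
      }
    }
    return names;
  }
}
```

With this shape the operator developer only marks the ports that should not forward; everything else keeps the default, and the user of the operator sets nothing.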

~ Bhupesh

On Jan 5, 2017 21:40, "Thomas Weise" <thw@apache.org> wrote:

I think it is important to be clear on the roles with regard to this
functionality. The user of the operator should not have to do anything to
get it to work. So while I suggested to consider attributes earlier, there
should not be any need for the user to set those. The operator needs to
work as is.

The persona concerned with propagation of control tuples is the operator
developer. I think the clear way for the operator developer to override the
propagation behavior is in code and if that is possible there is no need
for other things such as attributes or other port level settings.
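
A minimal sketch of what overriding in code could look like, assuming implicit propagation is turned off for the operator. The class names and the port type are hypothetical stand-ins; only the method names processControl and emitControl are borrowed from earlier in this thread, and their signatures here are assumptions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an output port (not the Apex DefaultOutputPort).
// It records emitted control payloads so the routing can be inspected.
class SketchOutputPort {
  final List<Object> emittedControl = new ArrayList<>();

  void emitControl(Object payload) {
    emittedControl.add(payload);
  }
}

// Hypothetical operator that routes control tuples itself.
class RoutingOperatorSketch {
  final SketchOutputPort dataOut = new SketchOutputPort();
  final SketchOutputPort metadataOut = new SketchOutputPort();

  // Called for each control tuple received on the input port.
  // The developer decides in code where the tuple goes: here it is
  // forwarded to the data pipeline only, never to the metadata pipeline.
  void processControl(Object payload) {
    dataOut.emitControl(payload);
  }
}
```

The routing decision lives entirely in processControl, so no attribute or port-level setting is needed from the user of the operator.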

Thomas


On Wed, Jan 4, 2017 at 10:20 PM, Bhupesh Chawda <bhupesh@datatorrent.com>
wrote:

> I think we all agree on the use case for selective propagation. The
> question is about where to have the control - at the operator level or at
> the port level.
>
> For this ability, we have the following options:
>
>    1. Operator disables the propagation on selected output ports. Other
>    output ports propagate by default.
>    2. Operator disables propagation for the entire operator (by means of an
>    attribute). Operator developer explicitly emits the received control
>    tuples on selected output ports.
>
> If the decision is to completely block the propagation, then Option 2 is
> easier to use as just an attribute needs to be set, as opposed to Option 1
> where user needs to set the annotation on each output port.
>
> However, if selective propagation is needed, Option 1 would just need the
> user to disable propagation on certain ports; rest are propagated by
> default, while Option 2 requires the user to explicitly emit the control
> tuples.
> ~ Bhupesh
>
>
> On Thu, Jan 5, 2017 at 3:46 AM, Thomas Weise <thw@apache.org> wrote:
>
> > Yes, I think that for any of these cases the operator developer will turn
> > off implicit propagation for the operator and then write the code to route
> > or create control tuples as needed.
> >
> > Thomas
> >
> > On Wed, Jan 4, 2017 at 12:59 PM, Amol Kekre <amol@datatorrent.com>
> wrote:
> >
> > > I agree that by default the propagation must be implicit, i.e. if the
> > > operator does nothing, the control tuple propagates. I do think users
> > > should have control on deciding to "not propagate" or "create new" and
> in
> > > these cases they would need to do something explicit (override)?
> > >
> > > The following cases come to mind
> > > 1. Sole consumer of a particular control signal (for example end of
> file)
> > > 2. Creator of a particular control signal (start of file, or a signal
> to
> > > pause on something etc.)
> > > 3. One port on a data pipeline and other port for meta-data pipeline
> > >
> > > In the above cases emit will be decided on an output port. #1 is only
> > place
> > > where all output ports will disable the tuple, #2 and #3 most likely
> will
> > > be selective.
> > >
> > > Thks
> > > Amol
> > >
> > >
> > > On Wed, Jan 4, 2017 at 12:25 PM, Thomas Weise <thw@apache.org> wrote:
> > >
> > > > I think there is (1) implicit propagation just like other control
> > tuples
> > > > where the operator code isn't involved and (2) where the operator
> > > developer
> > > > wants to decide how control tuples are created or routed and will
> > receive
> > > > and emit them on the output ports as desired.
> > > >
> > > > I don't see a use case for hybrid approaches? Maybe propagation does
> > not
> > > > need to be tied to ports at all, maybe just by annotation at the
> > operator
> > > > level?
> > > >
> > > > Thomas
> > > >
> > > >
> > > > On Wed, Jan 4, 2017 at 10:59 AM, Bhupesh Chawda <
> > bhupesh@datatorrent.com
> > > >
> > > > wrote:
> > > >
> > > > > Wouldn't having this with output ports give a finer control on the
> > > > > propagation of control tuples?
> > > > > We might have an operator with two output ports each of which
> creates
> > > two
> > > > > different pipelines downstream. We would be able to say that one
> > > pipeline
> > > > > gets the control tuples and the other doesn't.
> > > > >
> > > > > ~ Bhupesh
> > > > >
> > > > >
> > > > > On Jan 4, 2017 11:55 PM, "Thomas Weise" <thw@apache.org> wrote:
> > > > >
> > > > > I'm referring to the operator that needs to make the decision to
> > > > propagate
> > > > > or not. The tuples come from an input port, so it seems
appropriate
> > to
> > > > say
> > > > > "don't propagate control tuples from this port". No matter how
many
> > > > output
> > > > > ports there are.
> > > > >
> > > > > Output ports are there for an operator to emit new tuples, in the
> > case
> > > > you
> > > > > are discussing you don't emit new control tuples.
> > > > >
> > > > > Thomas
> > > > >
> > > > >
> > > > > On Wed, Jan 4, 2017 at 9:39 AM, Bhupesh Chawda <
> > > bhupesh@datatorrent.com>
> > > > > wrote:
> > > > >
> > > > > > Hi Thomas,
> > > > > >
> > > > > > Are you suggesting an attribute on the input port for
controlling
> > the
> > > > > > propagation of control tuples to downstream operators?
> > > > > > I think it should be better to do it on the output port since
the
> > > > > decision
> > > > > > to block the propagation will be made at the upstream operator
> > rather
> > > > > than
> > > > > > at the downstream.
> > > > > > Also, we need another way of controlling the propagation at run
> > time
> > > > and
> > > > > > hence I was thinking about the method call on the output port,
in
> > > > > addition
> > > > > > to the annotation on the output port (which is the static way).
> > > > > >
> > > > > > Please correct me if I have misunderstood your question.
> > > > > >
> > > > > > ~ Bhupesh
> > > > > >
> > > > > > On Wed, Jan 4, 2017 at 7:26 PM, Thomas Weise <thw@apache.org>
> > wrote:
> > > > > >
> > > > > > > Wouldn't it be more intuitive to control this with an
attribute
> > on
> > > > the
> > > > > > > input port?
> > > > > > >
> > > > > > >
> > > > > > > On Tue, Jan 3, 2017 at 11:06 PM, Bhupesh Chawda <
> > > > > bhupesh@datatorrent.com
> > > > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi Pramod,
> > > > > > > >
> > > > > > > > I was thinking of a method setPropagateControlTuples(boolean
> > > > > > propagate)
> > > > > > > on
> > > > > > > > the output port of the operator.
> > > > > > > > The operator could disable this in the code at any point of
> > time.
> > > > > > > > Note however that this is to block the propagation of
control
> > > > tuples
> > > > > > from
> > > > > > > > upstream. Any control tuples emitted explicitly by the
> operator
> > > > would
> > > > > > > still
> > > > > > > > be emitted and sent to the downstream operators.
> > > > > > > >
> > > > > > > > Please see
> > > > > > > > https://github.com/apache/apex-core/pull/440/files#diff-8aa0ca1a3e645fa60e9b376c118c00a3R68
> > > > > > > > in the PR.
> > > > > > > >
> > > > > > > > ~ Bhupesh
> > > > > > > >
> > > > > > > > On Wed, Jan 4, 2017 at 6:53 AM, Pramod Immaneni <
> > > > > > pramod@datatorrent.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > 2 sounds good. Have you thought about what the method would look like?
> > > > > > > > >
> > > > > > > > > On Sat, Dec 31, 2016 at 8:29 PM, Bhupesh Chawda <
> > > > > > > bhupesh@datatorrent.com
> > > > > > > > >
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Yes, that makes sense.
> > > > > > > > > > We have following options:
> > > > > > > > > > 1. Make the annotation false by default and force the
> user
> > to
> > > > > > forward
> > > > > > > > the
> > > > > > > > > > control tuples explicitly.
> > > > > > > > > > 2. Annotation is true by default and static way of
> blocking
> > > > stays
> > > > > > as
> > > > > > > it
> > > > > > > > > is.
> > > > > > > > > > We provide another way for blocking programmatically,
> > perhaps
> > > > by
> > > > > > > means
> > > > > > > > of
> > > > > > > > > > another method call on the port.
> > > > > > > > > >
> > > > > > > > > > ~ Bhupesh
> > > > > > > > > >
> > > > > > > > > > On Dec 30, 2016 00:09, "Pramod Immaneni" <
> > > > pramod@datatorrent.com
> > > > > >
> > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Bhupesh,
> > > > > > > > > > >
> > > > > > > > > > > Annotation seems like a static way to stop propagation. Given these are
> > > > > > > > > > > programmatically generated, I would think the operators should be able to
> > > > > > > > > > > stop (consume without propagating) programmatically as well.
> > > > > > > > > > > Thanks
> > > > > > > > > > >
> > > > > > > > > > > On Thu, Dec 29, 2016 at 8:48 AM, Bhupesh Chawda <
> > > > > > > > > bhupesh@datatorrent.com
> > > > > > > > > > >
> > > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Thanks Vlad, I am trying out the approach you
> mentioned
> > > > > > regarding
> > > > > > > > > > having
> > > > > > > > > > > > another interface which allows sinks to put a
control
> > > > tuple.
> > > > > > > > > > > >
> > > > > > > > > > > > Regarding the delivery of control tuples, here is
> what
> > I
> > > am
> > > > > > > > planning
> > > > > > > > > to
> > > > > > > > > > > do:
> > > > > > > > > > > > All the control tuples which are emitted in a
> > particular
> > > > > window
> > > > > > > are
> > > > > > > > > > > > delivered after all the data tuples have been
> delivered
> > > to
> > > > > the
> > > > > > > > > > respective
> > > > > > > > > > > > ports, but before the endWindow() call. The operator
> > can
> > > > then
> > > > > > > > process
> > > > > > > > > > the
> > > > > > > > > > > > control tuples in that window and can do any
> > finalization
> > > > in
> > > > > > the
> > > > > > > > end
> > > > > > > > > > > window
> > > > > > > > > > > > call. There will be no delivery of control tuples
> after
> > > > > > > endWindow()
> > > > > > > > > and
> > > > > > > > > > > > before the next beginWindow() call.
> > > > > > > > > > > >
> > > > > > > > > > > > For handling the propagation of control tuples
> further
> > in
> > > > the
> > > > > > > dag,
> > > > > > > > we
> > > > > > > > > > are
> > > > > > > > > > > > planning to have an annotation on the Output Port of
> > the
> > > > > > operator
> > > > > > > > > which
> > > > > > > > > > > > would be true by default.
> > > > > > > > > > > > @OutputPortFieldAnnotation(propogateControlTuples =
> > > > false).
> > > > > > > > > > > >
> > > > > > > > > > > > ~ Bhupesh
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > On Thu, Dec 29, 2016 at 6:24 AM, Vlad Rozov <
> > > > > > > > v.rozov@datatorrent.com
> > > > > > > > > >
> > > > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Custom control tuples are control tuples emitted
by
> > an
> > > > > > operator
> > > > > > > > > > itself
> > > > > > > > > > > > and
> > > > > > > > > > > > > not by the platform. Prior to the introduction of
> the
> > > > > custom
> > > > > > > > > control
> > > > > > > > > > > > > tuples, only Apex engine itself puts control
tuples
> > > into
> > > > > > > various
> > > > > > > > > > sinks,
> > > > > > > > > > > > so
> > > > > > > > > > > > > the engine created necessary Tuple objects with
the
> > > > > > > corresponding
> > > > > > > > > > type
> > > > > > > > > > > > > prior to calling Sink.put().
> > > > > > > > > > > > >
> > > > > > > > > > > > > Not all sinks need to be changed. Only control
> tuple
> > > > aware
> > > > > > > sinks
> > > > > > > > > > should
> > > > > > > > > > > > > provide such functionality. In the case there is a
> > lot
> > > of
> > > > > > code
> > > > > > > > > > > > duplication,
> > > > > > > > > > > > > please create an abstract class, that other
control
> > > aware
> > > > > > sinks
> > > > > > > > > will
> > > > > > > > > > > > extend
> > > > > > > > > > > > > from.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thank you,
> > > > > > > > > > > > >
> > > > > > > > > > > > > Vlad
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > On 12/23/16 06:24, Bhupesh Chawda wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > >> Hi Vlad,
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> Thanks for the pointer on delegating the wrapping
> of
> > > the
> > > > > > user
> > > > > > > > > tuple
> > > > > > > > > > to
> > > > > > > > > > > > the
> > > > > > > > > > > > >> control port. I was trying this out today.
> > > > > > > > > > > > > >> The problem I see is if we introduce a putControlTuple() method in Sink,
> > > > > > > > > > > > >> then a lot of the existing sinks would change.
> Also
> > > the
> > > > > > > changes
> > > > > > > > > > seemed
> > > > > > > > > > > > >> redundant as, the existing control tuples already
> > use
> > > > the
> > > > > > > put()
> > > > > > > > > > method
> > > > > > > > > > > > of
> > > > > > > > > > > > >> sinks. So why do something special for custom
> > control
> > > > > > tuples?
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> The only aspect in which the custom control
tuples
> > are
> > > > > > > different
> > > > > > > > > is
> > > > > > > > > > > that
> > > > > > > > > > > > >> these will be generated by the user and will
> > actually
> > > be
> > > > > > > > delivered
> > > > > > > > > > to
> > > > > > > > > > > > the
> > > > > > > > > > > > >> ports in a different order. Perhaps we should be
> > able
> > > to
> > > > > use
> > > > > > > the
> > > > > > > > > > > > existing
> > > > > > > > > > > > >> flow. The only problems as outlined before seem
to
> > be
> > > > > > > > > identification
> > > > > > > > > > > of
> > > > > > > > > > > > >> the
> > > > > > > > > > > > >> user tuple as a control tuple.
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> ~ Bhupesh
> > > > > > > > > > > > >>
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> On Thu, Dec 22, 2016 at 10:44 PM, Vlad Rozov <
> > > > > > > > > > v.rozov@datatorrent.com
> > > > > > > > > > > >
> > > > > > > > > > > > >> wrote:
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> Why is it necessary to wrap in the OutputPort?
> Can't
> > > it
> > > > be
> > > > > > > > > delegated
> > > > > > > > > > > to
> > > > > > > > > > > > a
> > > > > > > > > > > > >>> Sink by introducing new putControlTuple method?
> > > > > > > > > > > > >>>
> > > > > > > > > > > > >>> Thank you,
> > > > > > > > > > > > >>>
> > > > > > > > > > > > >>> Vlad
> > > > > > > > > > > > >>>
> > > > > > > > > > > > >>>
> > > > > > > > > > > > >>> On 12/21/16 22:10, Bhupesh Chawda wrote:
> > > > > > > > > > > > >>>
> > > > > > > > > > > > >>> Hi Vlad,
> > > > > > > > > > > > >>>>
> > > > > > > > > > > > >>>> The problem in using the Tuple class as the
> > wrapper
> > > is
> > > > > > that
> > > > > > > > the
> > > > > > > > > > > Ports
> > > > > > > > > > > > >>>> belong to the API and we want to wrap the
> payload
> > > > object
> > > > > > of
> > > > > > > > the
> > > > > > > > > > > > control
> > > > > > > > > > > > >>>> tuple into the Tuple class which is not part of
> > the
> > > > API.
> > > > > > > > > > > > >>>>
> > > > > > > > > > > > >>>> The output port will just get the payload of
the
> > > user
> > > > > > > control
> > > > > > > > > > tuple.
> > > > > > > > > > > > For
> > > > > > > > > > > > >>>> example, if the user emits a Long, as a control
> > > tuple,
> > > > > the
> > > > > > > > > payload
> > > > > > > > > > > > >>>> object
> > > > > > > > > > > > >>>> will just be a Long object.
> > > > > > > > > > > > >>>>
> > > > > > > > > > > > >>>> It is necessary to bundle this Long into some
> > > > > recognizable
> > > > > > > > > object
> > > > > > > > > > so
> > > > > > > > > > > > >>>> that
> > > > > > > > > > > > >>>> the BufferServerPublisher knows that this is a
> > > Control
> > > > > > tuple
> > > > > > > > and
> > > > > > > > > > > not a
> > > > > > > > > > > > >>>> regular tuple and serialize it accordingly. It
> is
> > > > > > therefore
> > > > > > > > > > > necessary
> > > > > > > > > > > > >>>> that
> > > > > > > > > > > > >>>> the tuple be part of some known hierarchy so
> that
> > > can
> > > > be
> > > > > > > > > > > distinguished
> > > > > > > > > > > > >>>> from
> > > > > > > > > > > > >>>> other payload tuples. Let us call this class
> > > > > > > > > > ControlTupleInterface.
> > > > > > > > > > > > Note
> > > > > > > > > > > > >>>> that this needs to be done before the tuple is
> > > > inserted
> > > > > > into
> > > > > > > > the
> > > > > > > > > > > sink
> > > > > > > > > > > > >>>> which
> > > > > > > > > > > > >>>> is done in the port objects. Once the tuple is
> > > > inserted
> > > > > > into
> > > > > > > > the
> > > > > > > > > > > sink,
> > > > > > > > > > > > >>>> it
> > > > > > > > > > > > >>>> would seem just like any other payload tuple
and
> > > > cannot
> > > > > be
> > > > > > > > > > > > >>>> distinguished.
> > > > > > > > > > > > >>>>
> > > > > > > > > > > > >>>> For this reason, I had something like the
> > following
> > > in
> > > > > > API:
> > > > > > > > > > > > >>>>
> > > > > > > > > > > > > >>>> package com.datatorrent.api;
> > > > > > > > > > > > > >>>>
> > > > > > > > > > > > > >>>> import java.util.UUID;
> > > > > > > > > > > > > >>>>
> > > > > > > > > > > > > >>>> public class ControlTupleInterface
> > > > > > > > > > > > > >>>> {
> > > > > > > > > > > > > >>>>     Object payload; // User control tuple payload, for example a Long.
> > > > > > > > > > > > > >>>>     UUID id;        // Unique id to de-duplicate in downstream operators.
> > > > > > > > > > > > > >>>> }
> > > > > > > > > > > > >>>>
> > > > > > > > > > > > >>>> Regarding your suggestion on using the Tuple
> class
> > > as
> > > > > the
> > > > > > > > > wrapper
> > > > > > > > > > > for
> > > > > > > > > > > > >>>> the
> > > > > > > > > > > > >>>> control tuple payload, let me mention the
> current
> > > > > scenario
> > > > > > > > flow
> > > > > > > > > to
> > > > > > > > > > > > make
> > > > > > > > > > > > >>>> the
> > > > > > > > > > > > >>>> discussion easier:
> > > > > > > > > > > > >>>>
> > > > > > > > > > > > >>>> We have a Tuple class in buffer server which is
> > > > > > responsible
> > > > > > > > for
> > > > > > > > > > > > >>>> serializing
> > > > > > > > > > > > >>>> the user control tuple bundling together a
> message
> > > > type:
> > > > > > > > > > > > >>>> CUSTOM_CONTROL_TUPLE_VALUE.
> > > > > > > > > > > > >>>>
> > > > > > > > > > > > >>>>
> > > > > > > > > > > > > >>>> com.datatorrent.bufferserver.packet.Tuple
> > > > > > > > > > > > > >>>> |-- com.datatorrent.bufferserver.packet.CustomControlTuple
> > > > > > > > > > > > >>>> We have another Tuple class in Stram which
helps
> > the
> > > > > > > > > > > > >>>> BufferServerSubscriber
> > > > > > > > > > > > >>>> to de-serialize the serialized tuples. We
should
> > > have
> > > > > > > > > > > > CustomControlTuple
> > > > > > > > > > > > >>>> in
> > > > > > > > > > > > >>>> stram as follows:
> > > > > > > > > > > > >>>>
> > > > > > > > > > > > >>>>
> > > > > > > > > > > > > >>>> com.datatorrent.stram.tuple.Tuple
> > > > > > > > > > > > > >>>> |-- com.datatorrent.stram.tuple.CustomControlTuple
> > > > > > > > > > > > > >>>>
> > > > > > > > > > > > > >>>> This will have a field for user control payload.
> > > > > > > > > > > > >>>>
> > > > > > > > > > > > >>>> I think we should not expose the Tuple class in
> > > stram
> > > > to
> > > > > > the
> > > > > > > > > API.
> > > > > > > > > > > That
> > > > > > > > > > > > >>>> was
> > > > > > > > > > > > >>>> the main reason I introduced another
> > class/interface
> > > > > > > > > > > > >>>> ControlTupleInterface
> > > > > > > > > > > > >>>> as described above.
> > > > > > > > > > > > >>>>
> > > > > > > > > > > > >>>> Regarding, adding methods to DefaultInputPort
> and
> > > > > > > > > > > DefaultOutputPort, I
> > > > > > > > > > > > >>>> think error detection would not be early enough
> if
> > > the
> > > > > > > control
> > > > > > > > > > tuple
> > > > > > > > > > > > is
> > > > > > > > > > > > >>>> sent very late in the processing :-)
> > > > > > > > > > > > >>>> Extending the ports to ControlTupleAware*
should
> > > help
> > > > in
> > > > > > > this
> > > > > > > > > > case.
> > > > > > > > > > > > >>>> However, we still need to see if there are any
> > > > downsides
> > > > > > on
> > > > > > > > > doing
> > > > > > > > > > > > this.
> > > > > > > > > > > > >>>>
> > > > > > > > > > > > >>>> Thanks.
> > > > > > > > > > > > >>>>
> > > > > > > > > > > > >>>> ~ Bhupesh
> > > > > > > > > > > > >>>>
> > > > > > > > > > > > >>>>
> > > > > > > > > > > > >>>>
> > > > > > > > > > > > >>>>
> > > > > > > > > > > > >>>> On Thu, Dec 22, 2016 at 7:26 AM, Vlad Rozov <
> > > > > > > > > > > v.rozov@datatorrent.com>
> > > > > > > > > > > > >>>> wrote:
> > > > > > > > > > > > >>>>
> > > > > > > > > > > > >>>> Hi Bhupesh,
> > > > > > > > > > > > >>>>
> > > > > > > > > > > > >>>>> it should not be a CustomWrapper.  The wrapper
> > > object
> > > > > > > should
> > > > > > > > be
> > > > > > > > > > > > >>>>> CustomControlTuple that extends Tuple. There
is
> > > > already
> > > > > > > code
> > > > > > > > > that
> > > > > > > > > > > > >>>>> checks
> > > > > > > > > > > > >>>>> for Tuple instance. The "unWrap" name is
> > > misleading,
> > > > > IMO.
> > > > > > > It
> > > > > > > > > > should
> > > > > > > > > > > > be
> > > > > > > > > > > > >>>>> something like customControlTuple.getPayload()
> > or
> > > > > > > > > > > > >>>>> customControlTuple.getAttachment(). In the
> > > > > > emitControl(),
> > > > > > > > > create
> > > > > > > > > > > new
> > > > > > > > > > > > >>>>> CustomControlTuple using provided payload as
> one
> > of
> > > > > > > > arguments.
> > > > > > > > > It
> > > > > > > > > > > may
> > > > > > > > > > > > >>>>> still
> > > > > > > > > > > > >>>>> be good to use common parent other than Object
> > for
> > > > > > control
> > > > > > > > > tuple
> > > > > > > > > > > > >>>>> payload
> > > > > > > > > > > > >>>>> class hierarchy.
> > > > > > > > > > > > >>>>>
> > > > > > > > > > > > >>>>> I don't understand how adding more methods to
> the
> > > > > Default
> > > > > > > > > > > > >>>>> implementation
> > > > > > > > > > > > >>>>> will help with early error detection unless
> > > > application
> > > > > > or
> > > > > > > > > > operator
> > > > > > > > > > > > >>>>> that
> > > > > > > > > > > > >>>>> relies on the custom control tuple
> functionality
> > > > > > explicitly
> > > > > > > > > > checks
> > > > > > > > > > > > for
> > > > > > > > > > > > >>>>> the
> > > > > > > > > > > > >>>>> platform version at run-time or tries to emit
a
> > > > control
> > > > > > > tuple
> > > > > > > > > > just
> > > > > > > > > > > to
> > > > > > > > > > > > >>>>> check
> > > > > > > > > > > > >>>>> that such functionality is supported by the
> > > platform.
> > > > > > > > > > > > >>>>>
> > > > > > > > > > > > >>>>> Thank you,
> > > > > > > > > > > > >>>>>
> > > > > > > > > > > > >>>>> Vlad
> > > > > > > > > > > > >>>>>
> > > > > > > > > > > > >>>>> On 12/21/16 04:58, Bhupesh Chawda wrote:
> > > > > > > > > > > > >>>>>
> > > > > > > > > > > > >>>>> Hi Vlad.
> > > > > > > > > > > > >>>>>
> > > > > > > > > > > > >>>>>> Yes, the API should not change. We can take
an
> > > > Object
> > > > > > > > instead,
> > > > > > > > > > and
> > > > > > > > > > > > >>>>>> later
> > > > > > > > > > > > >>>>>> wrap it into the required class.
> > > > > > > > > > > > >>>>>>
> > > > > > > > > > > > >>>>>> Our InputPort.put and emitControl method
would
> > > look
> > > > > > > > something
> > > > > > > > > > like
> > > > > > > > > > > > the
> > > > > > > > > > > > >>>>>> following where we handle the wrapping and
> > > > unwrapping
> > > > > > > > > > internally.
> > > > > > > > > > > > >>>>>>
> > > > > > > > > > > > > >>>>>> public void put(T tuple)
> > > > > > > > > > > > > >>>>>> {
> > > > > > > > > > > > > >>>>>>      if (tuple instanceof CustomWrapper) {
> > > > > > > > > > > > > >>>>>>        // Control tuple: unwrap and hand the payload to the control path.
> > > > > > > > > > > > > >>>>>>        processControl(((CustomWrapper)tuple).unWrap());
> > > > > > > > > > > > > >>>>>>      } else {
> > > > > > > > > > > > > >>>>>>        process(tuple);
> > > > > > > > > > > > > >>>>>>      }
> > > > > > > > > > > > > >>>>>> }
> > > > > > > > > > > > > >>>>>>
> > > > > > > > > > > > > >>>>>> public void emitControl(Object tuple)
> > > > > > > > > > > > > >>>>>> {
> > > > > > > > > > > > > >>>>>>      sink.put(CustomWrapper.wrap(tuple));
> > > > > > > > > > > > > >>>>>> }
> > > > > > > > > > > > >>>>>>
> > > > > > > > > > > > >>>>>> Regarding the compatibility issue, I think we
> > have
> > > > two
> > > > > > > ways
> > > > > > > > of
> > > > > > > > > > > doing
> > > > > > > > > > > > >>>>>> it:
> > > > > > > > > > > > >>>>>>
> > > > > > > > > > > > >>>>>>       1. Extend DefaultInputPort and
> > > > DefaultOutputPort
> > > > > > and
> > > > > > > > > > create
> > > > > > > > > > > > >>>>>>       ControlAwareInput and
ControlAwareOutput
> > out
> > > > of
> > > > > > it.
> > > > > > > > This
> > > > > > > > > > > might
> > > > > > > > > > > > >>>>>> require us
> > > > > > > > > > > > >>>>>>       to additionally handle specific cases
> when
> > > > > > > > > non-compatible
> > > > > > > > > > > > ports
> > > > > > > > > > > > >>>>>>       (ControlAwareOutput and DefaultInput,
> for
> > > > > example)
> > > > > > > are
> > > > > > > > > > > > >>>>>> connected to
> > > > > > > > > > > > >>>>>> each
> > > > > > > > > > > > >>>>>>       other in user apps.
> > > > > > > > > > > > >>>>>>       2. Add the additional methods in the
> > > existing
> > > > > > > Default
> > > > > > > > > > > > >>>>>> implementations.
> > > > > > > > > > > > >>>>>>
> > > > > > > > > > > > >>>>>>
> > > > > > > > > > > > >>>>>> IMO, both of these would help in early error
> > > > > detection.
> > > > > > > > > > > > >>>>>>
> > > > > > > > > > > > >>>>>> ~ Bhupesh
> > > > > > > > > > > > >>>>>>
> > > > > > > > > > > > >>>>>>
> > > > > > > > > > > > >>>>>>
> > > > > > > > > > > > >>>>>>
> > > > > > > > > > > > >>>>>> On Wed, Dec 21, 2016 at 1:36 AM, Vlad Rozov <
> > > > > > > > > > > > v.rozov@datatorrent.com>
> > > > > > > > > > > > >>>>>> wrote:
> > > > > > > > > > > > >>>>>>
> > > > > > > > > > > > >>>>>> A wrapper class is required for the control
> > tuples
> > > > > > > delivery,
> > > > > > > > > but
> > > > > > > > > > > > >>>>>>
> > > > > > > > > > > > >>>>>> Port/Operator API should use Control Tuple
> > payload
> > > > > > object
> > > > > > > > > only.
> > > > > > > > > > > > >>>>>>> Implementation of the wrapper class may
> change
> > > from
> > > > > > > version
> > > > > > > > > to
> > > > > > > > > > > > >>>>>>> version,
> > > > > > > > > > > > >>>>>>> but
> > > > > > > > > > > > >>>>>>> API should not be affected by the change.
> > > > > > > > > > > > >>>>>>>
> > > > > > > > > > > > >>>>>>> I guess, assumption is that default input
and
> > > > output
> > > > > > port
> > > > > > > > > will
> > > > > > > > > > be
> > > > > > > > > > > > >>>>>>> extended
> > > > > > > > > > > > >>>>>>> to provide support for the control tuples.
> This
> > > may
> > > > > > cause
> > > > > > > > > some
> > > > > > > > > > > > >>>>>>> backward
> > > > > > > > > > > > >>>>>>> compatibility issues. Consider scenario when
> a
> > > > newer
> > > > > > > > version
> > > > > > > > > of
> > > > > > > > > > > > >>>>>>> Malhar
> > > > > > > > > > > > >>>>>>> that
> > > > > > > > > > > > >>>>>>> relies on EOF control tuple is deployed into
> > > older
> > > > > > > version
> > > > > > > > of
> > > > > > > > > > > core
> > > > > > > > > > > > >>>>>>> that
> > > > > > > > > > > > >>>>>>> does not support control tuples. In such
> > > scenario,
> > > > > > error
> > > > > > > > will
> > > > > > > > > > be
> > > > > > > > > > > > >>>>>>> raised
> > > > > > > > > > > > >>>>>>> only when an operator tries to emit EOF
> control
> > > > tuple
> > > > > > at
> > > > > > > > the
> > > > > > > > > > end
> > > > > > > > > > > > of a
> > > > > > > > > > > > >>>>>>> job.
> > > > > > > > > > > > > >>>>>>> Introducing control tuple aware ports solves the early error detection problem. It
> > > > > > > > > > > > >>>>>>> will require some operators to be modified
to
> > use
> > > > > > control
> > > > > > > > > tuple
> > > > > > > > > > > > aware
> > > > > > > > > > > > >>>>>>> ports, but such change may help to
> distinguish
> > > > > control
> > > > > > > > tuple
> > > > > > > > > > > aware
> > > > > > > > > > > > >>>>>>> operators from their old versions.
> > > > > > > > > > > > >>>>>>>
> > > > > > > > > > > > >>>>>>> Vlad
> > > > > > > > > > > > >>>>>>>
> > > > > > > > > > > > >>>>>>> On 12/20/16 04:09, Bhupesh Chawda wrote:
> > > > > > > > > > > > >>>>>>>
> > > > > > > > > > > > >>>>>>> I investigated this and seems like it is
> better
> > > to
> > > > > > have a
> > > > > > > > > > wrapper
> > > > > > > > > > > > >>>>>>> class
> > > > > > > > > > > > >>>>>>>
> > > > > > > > > > > > >>>>>>> for
> > > > > > > > > > > > >>>>>>>> the user object.
> > > > > > > > > > > > >>>>>>>> This would serve 2 purposes:
> > > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > > >>>>>>>>        1. Allow us to distinguish a custom
> > > control
> > > > > > tuple
> > > > > > > > > from
> > > > > > > > > > > > other
> > > > > > > > > > > > >>>>>>>> payload
> > > > > > > > > > > > >>>>>>>>        tuples.
> > > > > > > > > > > > >>>>>>>>        2. For the same control tuple
> received
> > > from
> > > > > > > > different
> > > > > > > > > > > > >>>>>>>> upstream
> > > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > > >>>>>>>>        partitions, we would have some
> > mechanism
> > > to
> > > > > > > > > distinguish
> > > > > > > > > > > > >>>>>>>> between
> > > > > > > > > > > > >>>>>>>> the
> > > > > > > > > > > > >>>>>>>> two in
> > > > > > > > > > > > >>>>>>>>        order to identify duplicates.
> > > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > > >>>>>>>> Additionally, the wrapper class needs to be
> > part
> > > > of
> > > > > > the
> > > > > > > > API
> > > > > > > > > as
> > > > > > > > > > > > >>>>>>>> DefaultOutputPort needs to know about it,
> > before
> > > > > > putting
> > > > > > > > it
> > > > > > > > > > into
> > > > > > > > > > > > the
> > > > > > > > > > > > >>>>>>>> sink.
> > > > > > > > > > > > >>>>>>>> We can make sure that the user is not able
> to
> > > > extend
> > > > > > or
> > > > > > > > > modify
> > > > > > > > > > > > this
> > > > > > > > > > > > >>>>>>>> class
> > > > > > > > > > > > >>>>>>>> in any manner.
> > > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > > >>>>>>>> ~ Bhupesh
> > > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > > >>>>>>>> On Mon, Dec 19, 2016 at 12:18 PM, David Yan
> <
> > > > > > > > > > davidyan@gmail.com
> > > > > > > > > > > >
> > > > > > > > > > > > >>>>>>>> wrote:
> > > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > > > >>>>>>>>> This C type parameter is going to fix the control tuple type at compile
> > > > > > > > > > > > > >>>>>>>>> time and this is actually not what we want. Note that the operator may
> > > > > > > > > > > > > >>>>>>>>> receive or emit multiple different control tuple types.
> > > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > > >>>>>>>>> David
> > > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > > > >>>>>>>>> On Dec 17, 2016 3:33 AM, "Tushar Gosavi" <tushar@datatorrent.com> wrote:
> > > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > > > >>>>>>>>> We do not need to create an interface for data emitted through
> > > > > > > > > > > > > >>>>>>>>> emitControl or processed through processControl. Internally we could
> > > > > > > > > > > > > >>>>>>>>> wrap the user object in ControlTuple. You can add a type parameter for
> > > > > > > > > > > > > >>>>>>>>> the control tuple object on ports.
> > > > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > > > >>>>>>>>> DefaultInputPort<D,C>
> > > > > > > > > > > > > >>>>>>>>> D is the data type and C is the control tuple type, for better error
> > > > > > > > > > > > > >>>>>>>>> catching at compile time.
> > > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > > >>>>>>>>> - Tushar.
> > > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > > > >>>>>>>>> On Sat, Dec 17, 2016 at 8:35 AM, Bhupesh Chawda <bhupesh@datatorrent.com> wrote:
> > > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > > > >>>>>>>>>> Agreed Vlad and David.
> > > > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > > > >>>>>>>>>> I am just suggesting there should be a wrapper for the user object. It
> > > > > > > > > > > > > >>>>>>>>>> can be a marker interface and we can call it something else like
> > > > > > > > > > > > > >>>>>>>>>> "CustomControl".
> > > > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > > > >>>>>>>>>> The user object will be wrapped in another class "ControlTuple" which
> > > > > > > > > > > > > >>>>>>>>>> traverses the BufferServer and will perhaps be extended from the
> > > > > > > > > > > > > >>>>>>>>>> packet/Tuple class. This class will not be exposed to the user.
> > > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > > >>>>>>>>>> ~ Bhupesh
> > > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > > > >>>>>>>>>> On Sat, Dec 17, 2016 at 4:11 AM, Vlad Rozov <v.rozov@datatorrent.com> wrote:
> > > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > > > >>>>>>>>>>> I agree with David. Payload of the control tuple is in the userObject
> > > > > > > > > > > > > >>>>>>>>>>> and operators/ports don't need to be exposed to the implementation of
> > > > > > > > > > > > > >>>>>>>>>>> the ControlTuple class. With the proposed interface operator developers
> > > > > > > > > > > > > >>>>>>>>>>> are free to extend ControlTuple further and I don't think that such
> > > > > > > > > > > > > >>>>>>>>>>> capability needs to be provided. The wrapping in the ControlTuple class
> > > > > > > > > > > > > >>>>>>>>>>> is necessary and most likely ControlTuple needs to be extended from the
> > > > > > > > > > > > > >>>>>>>>>>> buffer server Tuple. It may be good to have a common parent other than
> > > > > > > > > > > > > >>>>>>>>>>> Object for all user payloads, but it may be a marker interface as well.
> > > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > > >>>>>>>>>>> Thank you,
> > > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > > >>>>>>>>>>> Vlad
> > > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > > >>>>>>>>>>> On 12/16/16 09:59, Bhupesh Chawda wrote:
> > > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > > > >>>>>>>>>>>> Hi David,
> > > > > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > > > > >>>>>>>>>>>> Actually, I was thinking of another API class called ControlTuple,
> > > > > > > > > > > > > >>>>>>>>>>>> different from the actual tuple class in buffer server or stram. This
> > > > > > > > > > > > > >>>>>>>>>>>> could serve as a way for the Buffer server publisher to understand that
> > > > > > > > > > > > > >>>>>>>>>>>> it is a control tuple and needs to be wrapped differently.
> > > > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > > > >>>>>>>>>>>> ~ Bhupesh
> > > > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > > > > >>>>>>>>>>>> On Dec 16, 2016 22:28, "David Yan" <davidyan@gmail.com> wrote:
> > > > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > > > > >>>>>>>>>>>>       // DefaultInputPort
> > > > > > > > > > > > > >>>>>>>>>>>>        public void processControl(ControlTuple tuple)
> > > > > > > > > > > > > >>>>>>>>>>>>        {
> > > > > > > > > > > > > >>>>>>>>>>>>          // Default Implementation to avoid need to implement it in all
> > > > > > > > > > > > > >>>>>>>>>>>>          // implementations
> > > > > > > > > > > > > >>>>>>>>>>>>        }
> > > > > > > > > > > > > >>>>>>>>>>>> {code}
> > > > > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > > > > >>>>>>>>>>>> {code}
> > > > > > > > > > > > > >>>>>>>>>>>>       // DefaultOutputPort
> > > > > > > > > > > > > >>>>>>>>>>>>        public void emitControl(ControlTuple tuple)
> > > > > > > > > > > > > >>>>>>>>>>>>        {
> > > > > > > > > > > > > >>>>>>>>>>>>        }
> > > > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > > > > >>>>>>>>>>>> I think we don't need to expose the ControlTuple class to the operator
> > > > > > > > > > > > > >>>>>>>>>>>> developers because the window ID is just the current window ID when
> > > > > > > > > > > > > >>>>>>>>>>>> these methods are called. How about making them just Object? We also
> > > > > > > > > > > > > >>>>>>>>>>>> need to provide a way for the user to specify a custom serializer for
> > > > > > > > > > > > > >>>>>>>>>>>> the control tuple.
> > > > > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > > > > >>>>>>>>>>>> David
> > > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > > > >>>>>>>>>>>> On Thu, Dec 15, 2016 at 12:43 AM, Bhupesh Chawda <bhupesh@datatorrent.com> wrote:
> > > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > > > >>>>>>>>>>>>> Hi All,
> > > > > > > > > > > > > >>>>>>>>>>>>>
> > > > > > > > > > > > > >>>>>>>>>>>>> Here are the initial interfaces:
> > > > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > > > >>>>>>>>>>>> {code}
> > > > > > > > > > > > > >>>>>>>>>>>>>       // DefaultInputPort
> > > > > > > > > > > > > >>>>>>>>>>>>>        public void processControl(ControlTuple tuple)
> > > > > > > > > > > > > >>>>>>>>>>>>>        {
> > > > > > > > > > > > > >>>>>>>>>>>>>          // Default Implementation to avoid need to implement it in all
> > > > > > > > > > > > > >>>>>>>>>>>>>          // implementations
> > > > > > > > > > > > > >>>>>>>>>>>>>        }
> > > > > > > > > > > > >>>>>>>>>>>>> {code}
> > > > > > > > > > > > >>>>>>>>>>>>>
> > > > > > > > > > > > >>>>>>>>>>>>> {code}
> > > > > > > > > > > > > >>>>>>>>>>>>>       // DefaultOutputPort
> > > > > > > > > > > > > >>>>>>>>>>>>>        public void emitControl(ControlTuple tuple)
> > > > > > > > > > > > > >>>>>>>>>>>>>        {
> > > > > > > > > > > > > >>>>>>>>>>>>>        }
> > > > > > > > > > > > >>>>>>>>>>>>> {code}
> > > > > > > > > > > > >>>>>>>>>>>>>
> > > > > > > > > > > > > >>>>>>>>>>>>> We have an option to add these methods to the interfaces - InputPort
> > > > > > > > > > > > > >>>>>>>>>>>>> and OutputPort; but these would not be backward compatible and also not
> > > > > > > > > > > > > >>>>>>>>>>>>> consistent with the current implementation of basic data tuple flow (as
> > > > > > > > > > > > > >>>>>>>>>>>>> with process() and emit()).
> > > > > > > > > > > > > >>>>>>>>>>>>>
> > > > > > > > > > > > > >>>>>>>>>>>>> We also need to expose an interface / class for users to wrap their
> > > > > > > > > > > > > >>>>>>>>>>>>> object and emit downstream. This should be part of the API.
> > > > > > > > > > > > >>>>>>>>>>>>>
> > > > > > > > > > > > >>>>>>>>>>>>> {code}
> > > > > > > > > > > > > >>>>>>>>>>>>> public class ControlTuple extends Tuple
> > > > > > > > > > > > > >>>>>>>>>>>>> {
> > > > > > > > > > > > > >>>>>>>>>>>>>        Object userObject;
> > > > > > > > > > > > > >>>>>>>>>>>>>
> > > > > > > > > > > > > >>>>>>>>>>>>>        public ControlTuple(long windowId, Object userObject)
> > > > > > > > > > > > > >>>>>>>>>>>>>        {
> > > > > > > > > > > > > >>>>>>>>>>>>>          //
> > > > > > > > > > > > > >>>>>>>>>>>>>        }
> > > > > > > > > > > > > >>>>>>>>>>>>> }
> > > > > > > > > > > > >>>>>>>>>>>>> {code}
> > > > > > > > > > > > >>>>>>>>>>>>>
> > > > > > > > > > > > > >>>>>>>>>>>>> The emitted tuples would traverse the same flow as with other control
> > > > > > > > > > > > > >>>>>>>>>>>>> tuples. The plan is to intercept the control tuples in GenericNode and
> > > > > > > > > > > > > >>>>>>>>>>>>> use the Reservoir to emit the control tuples at the end of the window.
> > > > > > > > > > > > >>>>>>>>>>>>>
> > > > > > > > > > > > > >>>>>>>>>>>>> GenericNode seems to be the best place to buffer incoming custom
> > > > > > > > > > > > > >>>>>>>>>>>>> control tuples without delivering them immediately to the operator
> > > > > > > > > > > > > >>>>>>>>>>>>> port. Once the end of the window is reached, we plan to use the
> > > > > > > > > > > > > >>>>>>>>>>>>> reservoir sink to push them to the port. This is different behavior
> > > > > > > > > > > > > >>>>>>>>>>>>> than any other control tuple where we are changing the order of tuples
> > > > > > > > > > > > > >>>>>>>>>>>>> in the stream. The custom control tuples will be buffered and not
> > > > > > > > > > > > > >>>>>>>>>>>>> delivered to the ports until the end of the window.
> > > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > > > >>>>>>>>>>>>> To accomplish this, we need to have a public method in
> > > > > > > > > > > > > >>>>>>>>>>>>> SweepableReservoir which allows putting a tuple back in the sink of the
> > > > > > > > > > > > > >>>>>>>>>>>>> reservoir.
> > > > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > > > >>>>>>>>>>> ~ Bhupesh
> > > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > > >>>>>>>>>>>>>
> > > > > > > > > > > > >>>>>>>>>>>>>
>
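
For readers following the quoted thread, the end-of-window buffering proposed in the oldest message can be sketched roughly as below. This is only an illustration under stated assumptions: the classes ControlTupleSketch, InputPort and Node are hypothetical stand-ins and are not the Apex DefaultInputPort, GenericNode or SweepableReservoir implementations. The payload is typed as Object, following the suggestion in the thread, and the internal ControlTuple wrapper is not exposed to the operator code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch only; none of these types are the real Apex classes.
final class ControlTupleSketch {

  /** Internal wrapper (assumed): pairs the current window id with the user payload. */
  static final class ControlTuple {
    final long windowId;
    final Object userObject;

    ControlTuple(long windowId, Object userObject) {
      this.windowId = windowId;
      this.userObject = userObject;
    }
  }

  /** Stand-in for an input port; it records what it receives so the order is visible. */
  static class InputPort {
    final List<String> log = new ArrayList<>();

    void process(Object tuple) {
      log.add("data:" + tuple);
    }

    // The port only ever sees the user payload, never the wrapper.
    void processControl(Object payload) {
      log.add("control:" + payload);
    }
  }

  /** Stand-in for the node: holds custom control tuples back until the window ends. */
  static final class Node {
    private final InputPort port;
    private final List<ControlTuple> pending = new ArrayList<>();
    private long currentWindowId;

    Node(InputPort port) {
      this.port = port;
    }

    void beginWindow(long windowId) {
      currentWindowId = windowId;
    }

    void onData(Object tuple) {
      port.process(tuple); // data tuples are delivered immediately
    }

    void onControl(Object payload) {
      // Wrap and buffer instead of delivering right away.
      pending.add(new ControlTuple(currentWindowId, payload));
    }

    void endWindow() {
      // Deliver buffered control payloads in arrival order, then reset the buffer.
      for (ControlTuple t : pending) {
        port.processControl(t.userObject);
      }
      pending.clear();
    }
  }

  /** Runs one window with a control tuple arriving between two data tuples. */
  static List<String> demo() {
    InputPort port = new InputPort();
    Node node = new Node(port);
    node.beginWindow(1L);
    node.onData("a");
    node.onControl("flush");
    node.onData("b");
    node.endWindow();
    return port.log;
  }

  public static void main(String[] args) {
    System.out.println(demo()); // prints [data:a, data:b, control:flush]
  }
}
```

The control payload arrives between the two data tuples but is delivered after both, which is the reordering within a window that the thread points out as different from other control tuples.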
