apex-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Thomas Weise <...@apache.org>
Subject Re: [DISCUSS] Custom Control Tuples Design
Date Wed, 04 Jan 2017 20:25:35 GMT
I think there is (1) implicit propagation just like other control tuples
where the operator code isn't involved and (2) where the operator developer
wants to decide how control tuples are created or routed and will receive
and emit them on the output ports as desired.

I don't see a use case for hybrid approaches? Maybe propagation does not
need to be tied to ports at all, maybe just by annotation at the operator
level?

Thomas


On Wed, Jan 4, 2017 at 10:59 AM, Bhupesh Chawda <bhupesh@datatorrent.com>
wrote:

> Wouldn't having this with output ports give a finer control on the
> propagation of control tuples?
> We might have an operator with two output ports each of which creates two
> different pipelines downstream. We would be able to say that one pipeline
> gets the control tuples and the other doesn't.
>
> ~ Bhupesh
>
>
> On Jan 4, 2017 11:55 PM, "Thomas Weise" <thw@apache.org> wrote:
>
> I'm referring to the operator that needs to make the decision to propagate
> or not. The tuples come from an input port, so it seems appropriate to say
> "don't propagate control tuples from this port". No matter how many output
> ports there are.
>
> Output ports are there for an operator to emit new tuples, in the case you
> are discussing you don't emit new control tuples.
>
> Thomas
>
>
> On Wed, Jan 4, 2017 at 9:39 AM, Bhupesh Chawda <bhupesh@datatorrent.com>
> wrote:
>
> > Hi Thomas,
> >
> > Are you suggesting an attribute on the input port for controlling the
> > propagation of control tuples to downstream operators?
> > I think it should be better to do it on the output port since the
> decision
> > to block the propagation will be made at the upstream operator rather
> than
> > at the downstream.
> > Also, we need another way of controlling the propagation at run time and
> > hence I was thinking about the method call on the output port, in
> addition
> > to the annotation on the output port (which is the static way).
> >
> > Please correct me if I have misunderstood your question.
> >
> > ~ Bhupesh
> >
> > On Wed, Jan 4, 2017 at 7:26 PM, Thomas Weise <thw@apache.org> wrote:
> >
> > > Wouldn't it be more intuitive to control this with an attribute on the
> > > input port?
> > >
> > >
> > > On Tue, Jan 3, 2017 at 11:06 PM, Bhupesh Chawda <
> bhupesh@datatorrent.com
> > >
> > > wrote:
> > >
> > > > Hi Pramod,
> > > >
> > > > I was thinking of a method setPropagateControlTuples(boolean
> > propagate)
> > > on
> > > > the output port of the operator.
> > > > The operator could disable this in the code at any point of time.
> > > > Note however that this is to block the propagation of control tuples
> > from
> > > > upstream. Any control tuples emitted explicitly by the operator would
> > > still
> > > > be emitted and sent to the downstream operators.
> > > >
> > > > Please see
> > > > https://github.com/apache/apex-core/pull/440/files#diff-
> > > > 8aa0ca1a3e645fa60e9b376c118c00a3R68
> > > > in the PR.
> > > >
> > > > ~ Bhupesh
> > > >
> > > > On Wed, Jan 4, 2017 at 6:53 AM, Pramod Immaneni <
> > pramod@datatorrent.com>
> > > > wrote:
> > > >
> > > > > 2 sounds good. Have you thought about what the method would look
> > like.
> > > > >
> > > > > On Sat, Dec 31, 2016 at 8:29 PM, Bhupesh Chawda <
> > > bhupesh@datatorrent.com
> > > > >
> > > > > wrote:
> > > > >
> > > > > > Yes, that makes sense.
> > > > > > We have following options:
> > > > > > 1. Make the annotation false by default and force the user to
> > forward
> > > > the
> > > > > > control tuples explicitly.
> > > > > > 2. Annotation is true by default and static way of blocking stays
> > as
> > > it
> > > > > is.
> > > > > > We provide another way for blocking programmatically, perhaps by
> > > means
> > > > of
> > > > > > another method call on the port.
> > > > > >
> > > > > > ~ Bhupesh
> > > > > >
> > > > > > On Dec 30, 2016 00:09, "Pramod Immaneni" <pramod@datatorrent.com
> >
> > > > wrote:
> > > > > >
> > > > > > > Bhupesh,
> > > > > > >
> > > > > > > Annotation seems like a static way to stop propagation. Give
> > these
> > > > are
> > > > > > > programmatically generated I would think the operators should
> be
> > > able
> > > > > to
> > > > > > > stop (consume without propagating) programmatically as well.
> > > > > > >
> > > > > > > Thanks
> > > > > > >
> > > > > > > On Thu, Dec 29, 2016 at 8:48 AM, Bhupesh Chawda <
> > > > > bhupesh@datatorrent.com
> > > > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Thanks Vlad, I am trying out the approach you mentioned
> > regarding
> > > > > > having
> > > > > > > > another interface which allows sinks to put a control tuple.
> > > > > > > >
> > > > > > > > Regarding the delivery of control tuples, here is what I am
> > > > planning
> > > > > to
> > > > > > > do:
> > > > > > > > All the control tuples which are emitted in a particular
> window
> > > are
> > > > > > > > delivered after all the data tuples have been delivered to
> the
> > > > > > respective
> > > > > > > > ports, but before the endWindow() call. The operator can then
> > > > process
> > > > > > the
> > > > > > > > control tuples in that window and can do any finalization in
> > the
> > > > end
> > > > > > > window
> > > > > > > > call. There will be no delivery of control tuples after
> > > endWindow()
> > > > > and
> > > > > > > > before the next beginWindow() call.
> > > > > > > >
> > > > > > > > For handling the propagation of control tuples further in the
> > > dag,
> > > > we
> > > > > > are
> > > > > > > > planning to have an annotation on the Output Port of the
> > operator
> > > > > which
> > > > > > > > would be true by default.
> > > > > > > > @OutputPortFieldAnnotation(propogateControlTuples = false).
> > > > > > > >
> > > > > > > > ~ Bhupesh
> > > > > > > >
> > > > > > > >
> > > > > > > > On Thu, Dec 29, 2016 at 6:24 AM, Vlad Rozov <
> > > > v.rozov@datatorrent.com
> > > > > >
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Custom control tuples are control tuples emitted by an
> > operator
> > > > > > itself
> > > > > > > > and
> > > > > > > > > not by the platform. Prior to the introduction of the
> custom
> > > > > control
> > > > > > > > > tuples, only Apex engine itself puts control tuples into
> > > various
> > > > > > sinks,
> > > > > > > > so
> > > > > > > > > the engine created necessary Tuple objects with the
> > > corresponding
> > > > > > type
> > > > > > > > > prior to calling Sink.put().
> > > > > > > > >
> > > > > > > > > Not all sinks need to be changed. Only control tuple aware
> > > sinks
> > > > > > should
> > > > > > > > > provide such functionality. In the case there is a lot of
> > code
> > > > > > > > duplication,
> > > > > > > > > please create an abstract class, that other control aware
> > sinks
> > > > > will
> > > > > > > > extend
> > > > > > > > > from.
> > > > > > > > >
> > > > > > > > > Thank you,
> > > > > > > > >
> > > > > > > > > Vlad
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On 12/23/16 06:24, Bhupesh Chawda wrote:
> > > > > > > > >
> > > > > > > > >> Hi Vlad,
> > > > > > > > >>
> > > > > > > > >> Thanks for the pointer on delegating the wrapping of the
> > user
> > > > > tuple
> > > > > > to
> > > > > > > > the
> > > > > > > > >> control port. I was trying this out today.
> > > > > > > > >> The problem I see us if we introduce a putControlTuple()
> > > method
> > > > in
> > > > > > > Sink,
> > > > > > > > >> then a lot of the existing sinks would change. Also the
> > > changes
> > > > > > seemed
> > > > > > > > >> redundant as, the existing control tuples already use the
> > > put()
> > > > > > method
> > > > > > > > of
> > > > > > > > >> sinks. So why do something special for custom control
> > tuples?
> > > > > > > > >>
> > > > > > > > >> The only aspect in which the custom control tuples are
> > > different
> > > > > is
> > > > > > > that
> > > > > > > > >> these will be generated by the user and will actually be
> > > > delivered
> > > > > > to
> > > > > > > > the
> > > > > > > > >> ports in a different order. Perhaps we should be able to
> use
> > > the
> > > > > > > > existing
> > > > > > > > >> flow. The only problems as outlined before seem to be
> > > > > identification
> > > > > > > of
> > > > > > > > >> the
> > > > > > > > >> user tuple as a control tuple.
> > > > > > > > >>
> > > > > > > > >> ~ Bhupesh
> > > > > > > > >>
> > > > > > > > >>
> > > > > > > > >> On Thu, Dec 22, 2016 at 10:44 PM, Vlad Rozov <
> > > > > > v.rozov@datatorrent.com
> > > > > > > >
> > > > > > > > >> wrote:
> > > > > > > > >>
> > > > > > > > >> Why is it necessary to wrap in the OutputPort? Can't it be
> > > > > delegated
> > > > > > > to
> > > > > > > > a
> > > > > > > > >>> Sink by introducing new putControlTuple method?
> > > > > > > > >>>
> > > > > > > > >>> Thank you,
> > > > > > > > >>>
> > > > > > > > >>> Vlad
> > > > > > > > >>>
> > > > > > > > >>>
> > > > > > > > >>> On 12/21/16 22:10, Bhupesh Chawda wrote:
> > > > > > > > >>>
> > > > > > > > >>> Hi Vlad,
> > > > > > > > >>>>
> > > > > > > > >>>> The problem in using the Tuple class as the wrapper is
> > that
> > > > the
> > > > > > > Ports
> > > > > > > > >>>> belong to the API and we want to wrap the payload object
> > of
> > > > the
> > > > > > > > control
> > > > > > > > >>>> tuple into the Tuple class which is not part of the API.
> > > > > > > > >>>>
> > > > > > > > >>>> The output port will just get the payload of the user
> > > control
> > > > > > tuple.
> > > > > > > > For
> > > > > > > > >>>> example, if the user emits a Long, as a control tuple,
> the
> > > > > payload
> > > > > > > > >>>> object
> > > > > > > > >>>> will just be a Long object.
> > > > > > > > >>>>
> > > > > > > > >>>> It is necessary to bundle this Long into some
> recognizable
> > > > > object
> > > > > > so
> > > > > > > > >>>> that
> > > > > > > > >>>> the BufferServerPublisher knows that this is a Control
> > tuple
> > > > and
> > > > > > > not a
> > > > > > > > >>>> regular tuple and serialize it accordingly. It is
> > therefore
> > > > > > > necessary
> > > > > > > > >>>> that
> > > > > > > > >>>> the tuple be part of some known hierarchy so that can be
> > > > > > > distinguished
> > > > > > > > >>>> from
> > > > > > > > >>>> other payload tuples. Let us call this class
> > > > > > ControlTupleInterface.
> > > > > > > > Note
> > > > > > > > >>>> that this needs to be done before the tuple is inserted
> > into
> > > > the
> > > > > > > sink
> > > > > > > > >>>> which
> > > > > > > > >>>> is done in the port objects. Once the tuple is inserted
> > into
> > > > the
> > > > > > > sink,
> > > > > > > > >>>> it
> > > > > > > > >>>> would seem just like any other payload tuple and cannot
> be
> > > > > > > > >>>> distinguished.
> > > > > > > > >>>>
> > > > > > > > >>>> For this reason, I had something like the following in
> > API:
> > > > > > > > >>>>
> > > > > > > > >>>> package com.datatorrent.api;
> > > > > > > > >>>> public class ControlTupleInterface
> > > > > > > > >>>> {
> > > > > > > > >>>>     Object payload; // User control tuple payload. A
> > Long()
> > > > for
> > > > > > > > example.
> > > > > > > > >>>>     UUID id;  // Unique Id to de-duplicate in downstream
> > > > > operators
> > > > > > > > >>>> }
> > > > > > > > >>>>
> > > > > > > > >>>> Regarding your suggestion on using the Tuple class as
> the
> > > > > wrapper
> > > > > > > for
> > > > > > > > >>>> the
> > > > > > > > >>>> control tuple payload, let me mention the current
> scenario
> > > > flow
> > > > > to
> > > > > > > > make
> > > > > > > > >>>> the
> > > > > > > > >>>> discussion easier:
> > > > > > > > >>>>
> > > > > > > > >>>> We have a Tuple class in buffer server which is
> > responsible
> > > > for
> > > > > > > > >>>> serializing
> > > > > > > > >>>> the user control tuple bundling together a message type:
> > > > > > > > >>>> CUSTOM_CONTROL_TUPLE_VALUE.
> > > > > > > > >>>>
> > > > > > > > >>>>
> > > > > > > > >>>> *com.datatorrent.bufferserver.packet.Tuple|--
> > > > > > > > >>>> com.datatorrent.bufferserver.packet.CustomControlTuple*
> > > > > > > > >>>> We have another Tuple class in Stram which helps the
> > > > > > > > >>>> BufferServerSubscriber
> > > > > > > > >>>> to de-serialize the serialized tuples. We should have
> > > > > > > > CustomControlTuple
> > > > > > > > >>>> in
> > > > > > > > >>>> stram as follows:
> > > > > > > > >>>>
> > > > > > > > >>>>
> > > > > > > > >>>> *com.datatorrent.stram.tuple.Tuple|--
> > > > > > > > >>>> com.datatorrent.stram.tuple.CustomControlTuple*This
> will
> > > > have a
> > > > > > > field
> > > > > > > > >>>> for
> > > > > > > > >>>>
> > > > > > > > >>>> user control payload.
> > > > > > > > >>>>
> > > > > > > > >>>> I think we should not expose the Tuple class in stram to
> > the
> > > > > API.
> > > > > > > That
> > > > > > > > >>>> was
> > > > > > > > >>>> the main reason I introduced another class/interface
> > > > > > > > >>>> ControlTupleInterface
> > > > > > > > >>>> as described above.
> > > > > > > > >>>>
> > > > > > > > >>>> Regarding, adding methods to DefaultInputPort and
> > > > > > > DefaultOutputPort, I
> > > > > > > > >>>> think error detection would not be early enough if the
> > > control
> > > > > > tuple
> > > > > > > > is
> > > > > > > > >>>> sent very late in the processing :-)
> > > > > > > > >>>> Extending the ports to ControlTupleAware* should help in
> > > this
> > > > > > case.
> > > > > > > > >>>> However, we still need to see if there are any downsides
> > on
> > > > > doing
> > > > > > > > this.
> > > > > > > > >>>>
> > > > > > > > >>>> Thanks.
> > > > > > > > >>>>
> > > > > > > > >>>> ~ Bhupesh
> > > > > > > > >>>>
> > > > > > > > >>>>
> > > > > > > > >>>>
> > > > > > > > >>>>
> > > > > > > > >>>> On Thu, Dec 22, 2016 at 7:26 AM, Vlad Rozov <
> > > > > > > v.rozov@datatorrent.com>
> > > > > > > > >>>> wrote:
> > > > > > > > >>>>
> > > > > > > > >>>> Hi Bhupesh,
> > > > > > > > >>>>
> > > > > > > > >>>>> it should not be a CustomWrapper.  The wrapper object
> > > should
> > > > be
> > > > > > > > >>>>> CustomControlTuple that extends Tuple. There is already
> > > code
> > > > > that
> > > > > > > > >>>>> checks
> > > > > > > > >>>>> for Tuple instance. The "unWrap" name is misleading,
> IMO.
> > > It
> > > > > > should
> > > > > > > > be
> > > > > > > > >>>>> something like customControlTuple.getPayload() or
> > > > > > > > >>>>> customControlTuple.getAttachment(). In the
> > emitControl(),
> > > > > create
> > > > > > > new
> > > > > > > > >>>>> CustomControlTuple using provided payload as one of
> > > > arguments.
> > > > > It
> > > > > > > may
> > > > > > > > >>>>> still
> > > > > > > > >>>>> be good to use common parent other than Object for
> > control
> > > > > tuple
> > > > > > > > >>>>> payload
> > > > > > > > >>>>> class hierarchy.
> > > > > > > > >>>>>
> > > > > > > > >>>>> I don't understand how adding more methods to the
> Default
> > > > > > > > >>>>> implementation
> > > > > > > > >>>>> will help with early error detection unless application
> > or
> > > > > > operator
> > > > > > > > >>>>> that
> > > > > > > > >>>>> relies on the custom control tuple functionality
> > explicitly
> > > > > > checks
> > > > > > > > for
> > > > > > > > >>>>> the
> > > > > > > > >>>>> platform version at run-time or tries to emit a control
> > > tuple
> > > > > > just
> > > > > > > to
> > > > > > > > >>>>> check
> > > > > > > > >>>>> that such functionality is supported by the platform.
> > > > > > > > >>>>>
> > > > > > > > >>>>> Thank you,
> > > > > > > > >>>>>
> > > > > > > > >>>>> Vlad
> > > > > > > > >>>>>
> > > > > > > > >>>>> On 12/21/16 04:58, Bhupesh Chawda wrote:
> > > > > > > > >>>>>
> > > > > > > > >>>>> Hi Vlad.
> > > > > > > > >>>>>
> > > > > > > > >>>>>> Yes, the API should not change. We can take an Object
> > > > instead,
> > > > > > and
> > > > > > > > >>>>>> later
> > > > > > > > >>>>>> wrap it into the required class.
> > > > > > > > >>>>>>
> > > > > > > > >>>>>> Our InputPort.put and emitControl method would look
> > > > something
> > > > > > like
> > > > > > > > the
> > > > > > > > >>>>>> following where we handle the wrapping and unwrapping
> > > > > > internally.
> > > > > > > > >>>>>>
> > > > > > > > >>>>>> public void put(T tuple)
> > > > > > > > >>>>>> {
> > > > > > > > >>>>>>      if (tuple instanceof CustomWrapper) {
> > > > > > > > >>>>>>        processControl(tuple.unWrap());
> > > > > > > > >>>>>>      }  else {
> > > > > > > > >>>>>>        process(tuple)
> > > > > > > > >>>>>>      }
> > > > > > > > >>>>>> }
> > > > > > > > >>>>>>
> > > > > > > > >>>>>> emitControl(Object tuple)
> > > > > > > > >>>>>> {
> > > > > > > > >>>>>>      sink.put(CustomWrapper.wrap(tuple));
> > > > > > > > >>>>>> }
> > > > > > > > >>>>>>
> > > > > > > > >>>>>> Regarding the compatibility issue, I think we have two
> > > ways
> > > > of
> > > > > > > doing
> > > > > > > > >>>>>> it:
> > > > > > > > >>>>>>
> > > > > > > > >>>>>>       1. Extend DefaultInputPort and DefaultOutputPort
> > and
> > > > > > create
> > > > > > > > >>>>>>       ControlAwareInput and ControlAwareOutput out of
> > it.
> > > > This
> > > > > > > might
> > > > > > > > >>>>>> require us
> > > > > > > > >>>>>>       to additionally handle specific cases when
> > > > > non-compatible
> > > > > > > > ports
> > > > > > > > >>>>>>       (ControlAwareOutput and DefaultInput, for
> example)
> > > are
> > > > > > > > >>>>>> connected to
> > > > > > > > >>>>>> each
> > > > > > > > >>>>>>       other in user apps.
> > > > > > > > >>>>>>       2. Add the additional methods in the existing
> > > Default
> > > > > > > > >>>>>> implementations.
> > > > > > > > >>>>>>
> > > > > > > > >>>>>>
> > > > > > > > >>>>>> IMO, both of these would help in early error
> detection.
> > > > > > > > >>>>>>
> > > > > > > > >>>>>> ~ Bhupesh
> > > > > > > > >>>>>>
> > > > > > > > >>>>>>
> > > > > > > > >>>>>>
> > > > > > > > >>>>>>
> > > > > > > > >>>>>> On Wed, Dec 21, 2016 at 1:36 AM, Vlad Rozov <
> > > > > > > > v.rozov@datatorrent.com>
> > > > > > > > >>>>>> wrote:
> > > > > > > > >>>>>>
> > > > > > > > >>>>>> A wrapper class is required for the control tuples
> > > delivery,
> > > > > but
> > > > > > > > >>>>>>
> > > > > > > > >>>>>> Port/Operator API should use Control Tuple payload
> > object
> > > > > only.
> > > > > > > > >>>>>>> Implementation of the wrapper class may change from
> > > version
> > > > > to
> > > > > > > > >>>>>>> version,
> > > > > > > > >>>>>>> but
> > > > > > > > >>>>>>> API should not be affected by the change.
> > > > > > > > >>>>>>>
> > > > > > > > >>>>>>> I guess, assumption is that default input and output
> > port
> > > > > will
> > > > > > be
> > > > > > > > >>>>>>> extended
> > > > > > > > >>>>>>> to provide support for the control tuples. This may
> > cause
> > > > > some
> > > > > > > > >>>>>>> backward
> > > > > > > > >>>>>>> compatibility issues. Consider scenario when a newer
> > > > version
> > > > > of
> > > > > > > > >>>>>>> Malhar
> > > > > > > > >>>>>>> that
> > > > > > > > >>>>>>> relies on EOF control tuple is deployed into older
> > > version
> > > > of
> > > > > > > core
> > > > > > > > >>>>>>> that
> > > > > > > > >>>>>>> does not support control tuples. In such scenario,
> > error
> > > > will
> > > > > > be
> > > > > > > > >>>>>>> raised
> > > > > > > > >>>>>>> only when an operator tries to emit EOF control tuple
> > at
> > > > the
> > > > > > end
> > > > > > > > of a
> > > > > > > > >>>>>>> job.
> > > > > > > > >>>>>>> Introducing control tuple aware ports solve the early
> > > error
> > > > > > > > >>>>>>> detection.
> > > > > > > > >>>>>>> It
> > > > > > > > >>>>>>> will require some operators to be modified to use
> > control
> > > > > tuple
> > > > > > > > aware
> > > > > > > > >>>>>>> ports, but such change may help to distinguish
> control
> > > > tuple
> > > > > > > aware
> > > > > > > > >>>>>>> operators from their old versions.
> > > > > > > > >>>>>>>
> > > > > > > > >>>>>>> Vlad
> > > > > > > > >>>>>>>
> > > > > > > > >>>>>>> On 12/20/16 04:09, Bhupesh Chawda wrote:
> > > > > > > > >>>>>>>
> > > > > > > > >>>>>>> I investigated this and seems like it is better to
> > have a
> > > > > > wrapper
> > > > > > > > >>>>>>> class
> > > > > > > > >>>>>>>
> > > > > > > > >>>>>>> for
> > > > > > > > >>>>>>>> the user object.
> > > > > > > > >>>>>>>> This would serve 2 purposes:
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>>        1. Allow us to distinguish a custom control
> > tuple
> > > > > from
> > > > > > > > other
> > > > > > > > >>>>>>>> payload
> > > > > > > > >>>>>>>>        tuples.
> > > > > > > > >>>>>>>>        2. For the same control tuple received from
> > > > different
> > > > > > > > >>>>>>>> upstream
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>>        partitions, we would have some mechanism to
> > > > > distinguish
> > > > > > > > >>>>>>>> between
> > > > > > > > >>>>>>>> the
> > > > > > > > >>>>>>>> two in
> > > > > > > > >>>>>>>>        order to identify duplicates.
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>> Additionally, the wrapper class needs to be part of
> > the
> > > > API
> > > > > as
> > > > > > > > >>>>>>>> DefaultOutputPort needs to know about it, before
> > putting
> > > > it
> > > > > > into
> > > > > > > > the
> > > > > > > > >>>>>>>> sink.
> > > > > > > > >>>>>>>> We can make sure that the user is not able to extend
> > or
> > > > > modify
> > > > > > > > this
> > > > > > > > >>>>>>>> class
> > > > > > > > >>>>>>>> in any manner.
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>> ~ Bhupesh
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>> On Mon, Dec 19, 2016 at 12:18 PM, David Yan <
> > > > > > davidyan@gmail.com
> > > > > > > >
> > > > > > > > >>>>>>>> wrote:
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>> This C type parameter is going to fix the control
> > tuple
> > > > type
> > > > > > at
> > > > > > > > >>>>>>>> compile
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>> time and this is actually not what we want. Note
> that
> > > the
> > > > > > > operator
> > > > > > > > >>>>>>>> may
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>>> receive or emit multiple different control tuple
> > types.
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>> David
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>> On Dec 17, 2016 3:33 AM, "Tushar Gosavi" <
> > > > > > > tushar@datatorrent.com
> > > > > > > > >
> > > > > > > > >>>>>>>>> wrote:
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>> We do not need to create an interface for data
> > emitted
> > > > > > through
> > > > > > > > >>>>>>>>> emitControl or processed through processControl.
> > > > Internally
> > > > > > we
> > > > > > > > >>>>>>>>> could
> > > > > > > > >>>>>>>>> wrap the user object in ControlTuple. you can add
> > type
> > > > > > > parameter
> > > > > > > > >>>>>>>>> for
> > > > > > > > >>>>>>>>> control tuple object on ports.
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>> DefaultInputPort<D,C>
> > > > > > > > >>>>>>>>> D is the data type and C is the control tuple type
> > for
> > > > > better
> > > > > > > > error
> > > > > > > > >>>>>>>>> catching at compile phase.
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>> - Tushar.
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>> On Sat, Dec 17, 2016 at 8:35 AM, Bhupesh Chawda <
> > > > > > > > >>>>>>>>> bhupesh@datatorrent.com
> > > > > > > > >>>>>>>>> wrote:
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>> Agreed Vlad and David.
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>> I am just suggesting there should be a wrapper for
> > the
> > > > user
> > > > > > > > object.
> > > > > > > > >>>>>>>>>> It
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>> can
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>> be a marker interface and we can call it something
> > > else
> > > > > like
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>> "CustomControl".
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>> The user object will be wrapped in another class
> > > > > > > "ControlTuple"
> > > > > > > > >>>>>>>>>> which
> > > > > > > > >>>>>>>>>> traverses the BufferServer and will perhaps be
> > > extended
> > > > > from
> > > > > > > the
> > > > > > > > >>>>>>>>>> packet/Tuple class. This class will not be exposed
> > to
> > > > the
> > > > > > > user.
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>> ~ Bhupesh
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>> On Sat, Dec 17, 2016 at 4:11 AM, Vlad Rozov <
> > > > > > > > >>>>>>>>>> v.rozov@datatorrent.com>
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>> wrote:
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>> I agree with David. Payload of the control tuple
> is
> > in
> > > > the
> > > > > > > > >>>>>>>>> userObject
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>> and
> > > > > > > > >>>>>>>>>> operators/ports don't need to be exposed to the
> > > > > > implementation
> > > > > > > > of
> > > > > > > > >>>>>>>>>> the
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>> ControlTuple class. With the proposed interface
> > > > operators
> > > > > > > > >>>>>>>>>> developers
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>>> are
> > > > > > > > >>>>>>>>>>> free to extend ControlTuple further and I don't
> > think
> > > > > that
> > > > > > > such
> > > > > > > > >>>>>>>>>>>
> > > > > > > > >>>>>>>>>>> capability
> > > > > > > > >>>>>>>>>>>
> > > > > > > > >>>>>>>>>>> needs to be provided. The wrapping in the
> > > ControlTuple
> > > > > > class
> > > > > > > is
> > > > > > > > >>>>>>>>>> necessary
> > > > > > > > >>>>>>>>>> and most likely ControlTuple needs to be extended
> > from
> > > > the
> > > > > > > > buffer
> > > > > > > > >>>>>>>>>> server
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>> Tuple. It may be good to have a common parent
> other
> > > than
> > > > > > > Object
> > > > > > > > >>>>>>>>>> for
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>>> all
> > > > > > > > >>>>>>>>>>> user payloads, but it may be a marker interface
> as
> > > > well.
> > > > > > > > >>>>>>>>>>>
> > > > > > > > >>>>>>>>>>> Thank you,
> > > > > > > > >>>>>>>>>>>
> > > > > > > > >>>>>>>>>>> Vlad
> > > > > > > > >>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>
> > > > > > > > >>>>>>>>>>> On 12/16/16 09:59, Bhupesh Chawda wrote:
> > > > > > > > >>>>>>>>>>>
> > > > > > > > >>>>>>>>>>> Hi David,
> > > > > > > > >>>>>>>>>>>
> > > > > > > > >>>>>>>>>>> Actually, I was thinking of another API class
> > called
> > > > > > > > >>>>>>>>>>> ControlTuple,
> > > > > > > > >>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>> different from the actual tuple class in buffer
> > > server
> > > > > or
> > > > > > > > stram.
> > > > > > > > >>>>>>>>>>>> This could serve as a way for the Buffer server
> > > > > publisher
> > > > > > to
> > > > > > > > >>>>>>>>>>>> understand
> > > > > > > > >>>>>>>>>>>> that it is a control tuple and needs to be
> wrapped
> > > > > > > > differently.
> > > > > > > > >>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>> ~ Bhupesh
> > > > > > > > >>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>> On Dec 16, 2016 22:28, "David Yan" <
> > > > davidyan@gmail.com>
> > > > > > > > wrote:
> > > > > > > > >>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>       // DefaultInputPort
> > > > > > > > >>>>>>>>>>>>        public void processControl(ControlTuple
> > > tuple)
> > > > > > > > >>>>>>>>>>>>        {
> > > > > > > > >>>>>>>>>>>>          // Default Implementation to avoid need
> > to
> > > > > > > implement
> > > > > > > > >>>>>>>>>>>> it in
> > > > > > > > >>>>>>>>>>>> all
> > > > > > > > >>>>>>>>>>>> implementations
> > > > > > > > >>>>>>>>>>>>        }
> > > > > > > > >>>>>>>>>>>> {code}
> > > > > > > > >>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>> {code}
> > > > > > > > >>>>>>>>>>>>       // DefaultOutputPort
> > > > > > > > >>>>>>>>>>>>        public void emitControl(ControlTuple
> tuple)
> > > > > > > > >>>>>>>>>>>>        {
> > > > > > > > >>>>>>>>>>>>        }
> > > > > > > > >>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>> I think we don't need to expose the ControlTuple
> > > class
> > > > > to
> > > > > > > the
> > > > > > > > >>>>>>>>>>>> operator
> > > > > > > > >>>>>>>>>>>> developers because the window ID is just the
> > current
> > > > > > window
> > > > > > > ID
> > > > > > > > >>>>>>>>>>>> when
> > > > > > > > >>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>> these
> > > > > > > > >>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>> methods are called. How about making them just
> > > Object?
> > > > > We
> > > > > > > also
> > > > > > > > >>>>>>>>>>>
> > > > > > > > >>>>>>>>>> need to
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>> provide the way for the user to specify custom
> > > > serializer
> > > > > > for
> > > > > > > > the
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>>> control
> > > > > > > > >>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>> tuple.
> > > > > > > > >>>>>>>>>>>
> > > > > > > > >>>>>>>>>> David
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>>> On Thu, Dec 15, 2016 at 12:43 AM, Bhupesh Chawda
> <
> > > > > > > > >>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>> bhupesh@datatorrent.com
> > > > > > > > >>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>> wrote:
> > > > > > > > >>>>>>>>>>>
> > > > > > > > >>>>>>>>>> Hi All,
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>>> Here are the initial interfaces:
> > > > > > > > >>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>> {code}
> > > > > > > > >>>>>>>>>>>>>       // DefaultInputPort
> > > > > > > > >>>>>>>>>>>>>        public void processControl(ControlTuple
> > > tuple)
> > > > > > > > >>>>>>>>>>>>>        {
> > > > > > > > >>>>>>>>>>>>>          // Default Implementation to avoid
> need
> > to
> > > > > > > implement
> > > > > > > > >>>>>>>>>>>>> it
> > > > > > > > >>>>>>>>>>>>> in
> > > > > > > > >>>>>>>>>>>>> all
> > > > > > > > >>>>>>>>>>>>> implementations
> > > > > > > > >>>>>>>>>>>>>        }
> > > > > > > > >>>>>>>>>>>>> {code}
> > > > > > > > >>>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>> {code}
> > > > > > > > >>>>>>>>>>>>>       // DefaultOutputPort
> > > > > > > > >>>>>>>>>>>>>        public void emitControl(ControlTuple
> > tuple)
> > > > > > > > >>>>>>>>>>>>>        {
> > > > > > > > >>>>>>>>>>>>>        }
> > > > > > > > >>>>>>>>>>>>> {code}
> > > > > > > > >>>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>> We have an option to add these methods to the
> > > > > interfaces
> > > > > > -
> > > > > > > > >>>>>>>>>>>>> InputPort
> > > > > > > > >>>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>> and
> > > > > > > > >>>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>> OutputPort; But these would not be backward
> > > > compatible
> > > > > > and
> > > > > > > > also
> > > > > > > > >>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>> not
> > > > > > > > >>>>>>>>>>> consistent with the current implementation of
> basic
> > > > data
> > > > > > > tuple
> > > > > > > > >>>>>>>>>>> flow
> > > > > > > > >>>>>>>>>>>
> > > > > > > > >>>>>>>>>>> (as
> > > > > > > > >>>>>>>>>>>> with process() and emit()).
> > > > > > > > >>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>> We also need to expose an interface / class for
> > users
> > > > to
> > > > > > wrap
> > > > > > > > >>>>>>>>>>> their
> > > > > > > > >>>>>>>>>>>
> > > > > > > > >>>>>>>>>>> object
> > > > > > > > >>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>> and emit downstream. This should be part of
> API.
> > > > > > > > >>>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>> {code}
> > > > > > > > >>>>>>>>>>>>> public class ControlTuple extends Tuple
> > > > > > > > >>>>>>>>>>>>> {
> > > > > > > > >>>>>>>>>>>>>        Object userObject;
> > > > > > > > >>>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>>        public ControlTuple(long windowId,
> Object
> > > > > > > userObject)
> > > > > > > > >>>>>>>>>>>>>        {
> > > > > > > > >>>>>>>>>>>>>          //
> > > > > > > > >>>>>>>>>>>>>        }
> > > > > > > > >>>>>>>>>>>>> }
> > > > > > > > >>>>>>>>>>>>> {code}
> > > > > > > > >>>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>> The emitted tuples would traverse the same flow
> > as
> > > > with
> > > > > > > other
> > > > > > > > >>>>>>>>>>>>> control
> > > > > > > > >>>>>>>>>>>>> tuples. The plan is to intercept the control
> > tuples
> > > > in
> > > > > > > > >>>>>>>>>>>>> GenericNode
> > > > > > > > >>>>>>>>>>>>> and
> > > > > > > > >>>>>>>>>>>>> use
> > > > > > > > >>>>>>>>>>>>> the Reservior to emit the control tuples at the
> > end
> > > > of
> > > > > > the
> > > > > > > > >>>>>>>>>>>>> window.
> > > > > > > > >>>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>> GenericNode seems to be the best place to
> buffer
> > > > > incoming
> > > > > > > > >>>>>>>>>>>>> custom
> > > > > > > > >>>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>> control
> > > > > > > > >>>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>> tuples without delivering them immediately to
> the
> > > > > > operator
> > > > > > > > >>>>>>>>>>>> port.
> > > > > > > > >>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>> Once
> > > > > > > > >>>>>>>>>>> the
> > > > > > > > >>>>>>>>>>> end of the window is reached, we plan to use the
> > > > > reservoir
> > > > > > > sink
> > > > > > > > >>>>>>>>>>> to
> > > > > > > > >>>>>>>>>>> push
> > > > > > > > >>>>>>>>>>> them to the port. This is different behavior than
> > any
> > > > > other
> > > > > > > > >>>>>>>>>>> control
> > > > > > > > >>>>>>>>>>> tuple
> > > > > > > > >>>>>>>>>>> where we are changing the order of tuples in the
> > > > stream.
> > > > > > The
> > > > > > > > >>>>>>>>>>> custom
> > > > > > > > >>>>>>>>>>> control
> > > > > > > > >>>>>>>>>>>
> > > > > > > > >>>>>>>>>>> tuples will be buffered and not delivered to the
> > > ports
> > > > > > until
> > > > > > > > the
> > > > > > > > >>>>>>>>>>>> end
> > > > > > > > >>>>>>>>>>>> of
> > > > > > > > >>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>> the
> > > > > > > > >>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>> window.
> > > > > > > > >>>>>>>>>>>
> > > > > > > > >>>>>>>>>>> To accomplish this, we need to have a public
> method
> > > in
> > > > > > > > >>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>> SweepableReservoir
> > > > > > > > >>>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>> which allows to put a tuple back in the sink of
> > the
> > > > > > > > reservoir.
> > > > > > > > >>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>> ~ Bhupesh
> > > > > > > > >>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>>
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message