apex-dev mailing list archives

From Amol Kekre <a...@datatorrent.com>
Subject Re: [DISCUSS] Custom Control Tuples Design
Date Wed, 04 Jan 2017 19:55:46 GMT
Yes, there is a chance that two output ports will have different send
requirements.
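
To make that case concrete, here is a minimal Java sketch of an operator whose two output ports carry different propagation settings. SketchOutputPort, TwoPortOperatorSketch and forwardUpstreamControl are hypothetical names used only for illustration; they are not apex-core classes. Only the setPropagateControlTuples(boolean) idea is taken from the proposal quoted below, and the "propagate by default" behaviour is an assumption based on option 2 in that proposal.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an output port; NOT the apex-core DefaultOutputPort.
class SketchOutputPort {
    private final List<Object> sent = new ArrayList<>();
    // Assumed default: control tuples arriving from upstream are propagated.
    private boolean propagateControlTuples = true;

    // Run-time switch, modelled on the setPropagateControlTuples(boolean) proposal.
    void setPropagateControlTuples(boolean propagate) {
        this.propagateControlTuples = propagate;
    }

    // Called for a control tuple that arrived from upstream.
    void forwardUpstreamControl(Object payload) {
        if (propagateControlTuples) {
            sent.add(payload);
        }
    }

    // Control tuples emitted explicitly by the operator are always sent.
    void emitControl(Object payload) {
        sent.add(payload);
    }

    List<Object> sent() {
        return sent;
    }
}

// Hypothetical operator with two output ports feeding two downstream pipelines.
class TwoPortOperatorSketch {
    final SketchOutputPort pipelineA = new SketchOutputPort();
    final SketchOutputPort pipelineB = new SketchOutputPort();

    // Offer an upstream control tuple to both ports; each port decides for itself.
    void onUpstreamControl(Object payload) {
        pipelineA.forwardUpstreamControl(payload);
        pipelineB.forwardUpstreamControl(payload);
    }

    public static void main(String[] args) {
        TwoPortOperatorSketch op = new TwoPortOperatorSketch();
        op.pipelineB.setPropagateControlTuples(false); // block upstream control tuples on B only
        op.onUpstreamControl("EOF");
        op.pipelineB.emitControl("FLUSH");             // explicit emits still go out
        System.out.println("A: " + op.pipelineA.sent());
        System.out.println("B: " + op.pipelineB.sent());
    }
}
```

With the flag kept on the output port, the same upstream control tuple reaches pipeline A but not pipeline B, which is the per-port difference in send requirements.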

Thanks
Amol


On Wed, Jan 4, 2017 at 10:59 AM, Bhupesh Chawda <bhupesh@datatorrent.com>
wrote:

> Wouldn't having this on the output ports give finer control over the
> propagation of control tuples?
> We might have an operator with two output ports, each of which feeds a
> different downstream pipeline. We would be able to say that one pipeline
> gets the control tuples and the other doesn't.
>
> ~ Bhupesh
>
>
> On Jan 4, 2017 11:55 PM, "Thomas Weise" <thw@apache.org> wrote:
>
> I'm referring to the operator that needs to make the decision to propagate
> or not. The tuples come from an input port, so it seems appropriate to say
> "don't propagate control tuples from this port". No matter how many output
> ports there are.
>
> Output ports are there for an operator to emit new tuples. In the case you
> are discussing, you don't emit new control tuples.
>
> Thomas
>
>
> On Wed, Jan 4, 2017 at 9:39 AM, Bhupesh Chawda <bhupesh@datatorrent.com>
> wrote:
>
> > Hi Thomas,
> >
> > Are you suggesting an attribute on the input port for controlling the
> > propagation of control tuples to downstream operators?
> > I think it would be better to do it on the output port, since the
> > decision to block the propagation will be made at the upstream operator
> > rather than at the downstream one.
> > Also, we need another way of controlling the propagation at run time,
> > and hence I was thinking about the method call on the output port, in
> > addition to the annotation on the output port (which is the static way).
> >
> > Please correct me if I have misunderstood your question.
> >
> > ~ Bhupesh
> >
> > On Wed, Jan 4, 2017 at 7:26 PM, Thomas Weise <thw@apache.org> wrote:
> >
> > > Wouldn't it be more intuitive to control this with an attribute on the
> > > input port?
> > >
> > >
> > > > On Tue, Jan 3, 2017 at 11:06 PM, Bhupesh Chawda <bhupesh@datatorrent.com>
> > > > wrote:
> > > >
> > > > > Hi Pramod,
> > > > >
> > > > > I was thinking of a method setPropagateControlTuples(boolean propagate)
> > > > > on the output port of the operator.
> > > > > The operator could disable this in the code at any point of time.
> > > > > Note however that this is to block the propagation of control tuples
> > > > > from upstream. Any control tuples emitted explicitly by the operator
> > > > > would still be emitted and sent to the downstream operators.
> > > > >
> > > > > Please see
> > > > > https://github.com/apache/apex-core/pull/440/files#diff-8aa0ca1a3e645fa60e9b376c118c00a3R68
> > > > > in the PR.
> > > >
> > > > ~ Bhupesh
> > > >
> > > > > On Wed, Jan 4, 2017 at 6:53 AM, Pramod Immaneni <pramod@datatorrent.com>
> > > > > wrote:
> > > > >
> > > > > > 2 sounds good. Have you thought about what the method would look like?
> > > > > >
> > > > > > On Sat, Dec 31, 2016 at 8:29 PM, Bhupesh Chawda <bhupesh@datatorrent.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Yes, that makes sense.
> > > > > > > We have the following options:
> > > > > > > 1. Make the annotation false by default and force the user to
> > > > > > > forward the control tuples explicitly.
> > > > > > > 2. Annotation is true by default and the static way of blocking
> > > > > > > stays as it is. We provide another way of blocking
> > > > > > > programmatically, perhaps by means of another method call on the
> > > > > > > port.
> > > > > >
> > > > > > ~ Bhupesh
> > > > > >
> > > > > > > On Dec 30, 2016 00:09, "Pramod Immaneni" <pramod@datatorrent.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Bhupesh,
> > > > > > > >
> > > > > > > > Annotation seems like a static way to stop propagation. Given
> > > > > > > > these are programmatically generated, I would think the
> > > > > > > > operators should be able to stop (consume without propagating)
> > > > > > > > programmatically as well.
> > > > > > >
> > > > > > > Thanks
> > > > > > >
> > > > > > > > On Thu, Dec 29, 2016 at 8:48 AM, Bhupesh Chawda <bhupesh@datatorrent.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Thanks Vlad, I am trying out the approach you mentioned
> > > > > > > > > regarding having another interface which allows sinks to put a
> > > > > > > > > control tuple.
> > > > > > > > >
> > > > > > > > > Regarding the delivery of control tuples, here is what I am
> > > > > > > > > planning to do:
> > > > > > > > > All the control tuples which are emitted in a particular window
> > > > > > > > > are delivered after all the data tuples have been delivered to
> > > > > > > > > the respective ports, but before the endWindow() call. The
> > > > > > > > > operator can then process the control tuples in that window and
> > > > > > > > > can do any finalization in the endWindow() call. There will be
> > > > > > > > > no delivery of control tuples after endWindow() and before the
> > > > > > > > > next beginWindow() call.
> > > > > > > > >
> > > > > > > > > For handling the propagation of control tuples further in the
> > > > > > > > > DAG, we are planning to have an annotation on the output port
> > > > > > > > > of the operator which would be true by default.
> > > > > > > > > @OutputPortFieldAnnotation(propogateControlTuples = false).
> > > > > > > >
> > > > > > > > ~ Bhupesh
> > > > > > > >
> > > > > > > >
> > > > > > > > > On Thu, Dec 29, 2016 at 6:24 AM, Vlad Rozov <v.rozov@datatorrent.com>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Custom control tuples are control tuples emitted by an
> > > > > > > > > > operator itself and not by the platform. Prior to the
> > > > > > > > > > introduction of custom control tuples, only the Apex engine
> > > > > > > > > > itself put control tuples into various sinks, so the engine
> > > > > > > > > > created the necessary Tuple objects with the corresponding
> > > > > > > > > > type prior to calling Sink.put().
> > > > > > > > > >
> > > > > > > > > > Not all sinks need to be changed. Only control tuple aware
> > > > > > > > > > sinks should provide such functionality. In case there is a
> > > > > > > > > > lot of code duplication, please create an abstract class that
> > > > > > > > > > other control aware sinks will extend from.
> > > > > > > > >
> > > > > > > > > Thank you,
> > > > > > > > >
> > > > > > > > > Vlad
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On 12/23/16 06:24, Bhupesh Chawda wrote:
> > > > > > > > >
> > > > > > > > > >> Hi Vlad,
> > > > > > > > > >>
> > > > > > > > > >> Thanks for the pointer on delegating the wrapping of the user
> > > > > > > > > >> tuple to the control port. I was trying this out today.
> > > > > > > > > >> The problem I see is that if we introduce a putControlTuple()
> > > > > > > > > >> method in Sink, then a lot of the existing sinks would change.
> > > > > > > > > >> Also, the changes seemed redundant, as the existing control
> > > > > > > > > >> tuples already use the put() method of sinks. So why do
> > > > > > > > > >> something special for custom control tuples?
> > > > > > > > > >>
> > > > > > > > > >> The only aspect in which the custom control tuples are
> > > > > > > > > >> different is that these will be generated by the user and will
> > > > > > > > > >> actually be delivered to the ports in a different order.
> > > > > > > > > >> Perhaps we should be able to use the existing flow. The only
> > > > > > > > > >> problem, as outlined before, seems to be identification of the
> > > > > > > > > >> user tuple as a control tuple.
> > > > > > > > >>
> > > > > > > > >> ~ Bhupesh
> > > > > > > > >>
> > > > > > > > >>
> > > > > > > > > >> On Thu, Dec 22, 2016 at 10:44 PM, Vlad Rozov <v.rozov@datatorrent.com>
> > > > > > > > > >> wrote:
> > > > > > > > > >>
> > > > > > > > > >>> Why is it necessary to wrap in the OutputPort? Can't it be
> > > > > > > > > >>> delegated to a Sink by introducing new putControlTuple method?
> > > > > > > > >>>
> > > > > > > > >>> Thank you,
> > > > > > > > >>>
> > > > > > > > >>> Vlad
> > > > > > > > >>>
> > > > > > > > >>>
> > > > > > > > >>> On 12/21/16 22:10, Bhupesh Chawda wrote:
> > > > > > > > >>>
> > > > > > > > >>> Hi Vlad,
> > > > > > > > >>>>
> > > > > > > > >>>> The problem in using the Tuple class as the wrapper is
> > that
> > > > the
> > > > > > > Ports
> > > > > > > > >>>> belong to the API and we want to wrap the payload object
> > of
> > > > the
> > > > > > > > control
> > > > > > > > >>>> tuple into the Tuple class which is not part of the API.
> > > > > > > > >>>>
> > > > > > > > >>>> The output port will just get the payload of the user
> > > control
> > > > > > tuple.
> > > > > > > > For
> > > > > > > > >>>> example, if the user emits a Long, as a control tuple,
> the
> > > > > payload
> > > > > > > > >>>> object
> > > > > > > > >>>> will just be a Long object.
> > > > > > > > >>>>
> > > > > > > > >>>> It is necessary to bundle this Long into some
> recognizable
> > > > > object
> > > > > > so
> > > > > > > > >>>> that
> > > > > > > > >>>> the BufferServerPublisher knows that this is a Control
> > tuple
> > > > and
> > > > > > > not a
> > > > > > > > >>>> regular tuple and serialize it accordingly. It is
> > therefore
> > > > > > > necessary
> > > > > > > > >>>> that
> > > > > > > > >>>> the tuple be part of some known hierarchy so that can be
> > > > > > > distinguished
> > > > > > > > >>>> from
> > > > > > > > >>>> other payload tuples. Let us call this class
> > > > > > ControlTupleInterface.
> > > > > > > > Note
> > > > > > > > >>>> that this needs to be done before the tuple is inserted
> > into
> > > > the
> > > > > > > sink
> > > > > > > > >>>> which
> > > > > > > > >>>> is done in the port objects. Once the tuple is inserted
> > into
> > > > the
> > > > > > > sink,
> > > > > > > > >>>> it
> > > > > > > > >>>> would seem just like any other payload tuple and cannot
> be
> > > > > > > > >>>> distinguished.
> > > > > > > > >>>>
> > > > > > > > >>>> For this reason, I had something like the following in
> > API:
> > > > > > > > >>>>
> > > > > > > > >>>> package com.datatorrent.api;
> > > > > > > > >>>> public class ControlTupleInterface
> > > > > > > > >>>> {
> > > > > > > > >>>>     Object payload; // User control tuple payload. A
> > Long()
> > > > for
> > > > > > > > example.
> > > > > > > > >>>>     UUID id;  // Unique Id to de-duplicate in downstream
> > > > > operators
> > > > > > > > >>>> }
> > > > > > > > >>>>
> > > > > > > > >>>> Regarding your suggestion on using the Tuple class as
> the
> > > > > wrapper
> > > > > > > for
> > > > > > > > >>>> the
> > > > > > > > >>>> control tuple payload, let me mention the current
> scenario
> > > > flow
> > > > > to
> > > > > > > > make
> > > > > > > > >>>> the
> > > > > > > > >>>> discussion easier:
> > > > > > > > >>>>
> > > > > > > > >>>> We have a Tuple class in buffer server which is
> > responsible
> > > > for
> > > > > > > > >>>> serializing
> > > > > > > > >>>> the user control tuple bundling together a message type:
> > > > > > > > >>>> CUSTOM_CONTROL_TUPLE_VALUE.
> > > > > > > > >>>>
> > > > > > > > >>>>
> > > > > > > > >>>> *com.datatorrent.bufferserver.packet.Tuple|--
> > > > > > > > >>>> com.datatorrent.bufferserver.packet.CustomControlTuple*
> > > > > > > > >>>> We have another Tuple class in Stram which helps the
> > > > > > > > >>>> BufferServerSubscriber
> > > > > > > > >>>> to de-serialize the serialized tuples. We should have
> > > > > > > > CustomControlTuple
> > > > > > > > >>>> in
> > > > > > > > >>>> stram as follows:
> > > > > > > > >>>>
> > > > > > > > >>>>
> > > > > > > > >>>> *com.datatorrent.stram.tuple.Tuple|--
> > > > > > > > >>>> com.datatorrent.stram.tuple.CustomControlTuple*This
> will
> > > > have a
> > > > > > > field
> > > > > > > > >>>> for
> > > > > > > > >>>>
> > > > > > > > >>>> user control payload.
> > > > > > > > >>>>
> > > > > > > > >>>> I think we should not expose the Tuple class in stram to
> > the
> > > > > API.
> > > > > > > That
> > > > > > > > >>>> was
> > > > > > > > >>>> the main reason I introduced another class/interface
> > > > > > > > >>>> ControlTupleInterface
> > > > > > > > >>>> as described above.
> > > > > > > > >>>>
> > > > > > > > >>>> Regarding, adding methods to DefaultInputPort and
> > > > > > > DefaultOutputPort, I
> > > > > > > > >>>> think error detection would not be early enough if the
> > > control
> > > > > > tuple
> > > > > > > > is
> > > > > > > > >>>> sent very late in the processing :-)
> > > > > > > > >>>> Extending the ports to ControlTupleAware* should help in
> > > this
> > > > > > case.
> > > > > > > > >>>> However, we still need to see if there are any downsides
> > on
> > > > > doing
> > > > > > > > this.
> > > > > > > > >>>>
> > > > > > > > >>>> Thanks.
> > > > > > > > >>>>
> > > > > > > > >>>> ~ Bhupesh
> > > > > > > > >>>>
> > > > > > > > >>>>
> > > > > > > > >>>>
> > > > > > > > >>>>
> > > > > > > > >>>> On Thu, Dec 22, 2016 at 7:26 AM, Vlad Rozov <
> > > > > > > v.rozov@datatorrent.com>
> > > > > > > > >>>> wrote:
> > > > > > > > >>>>
> > > > > > > > >>>> Hi Bhupesh,
> > > > > > > > >>>>
> > > > > > > > >>>>> it should not be a CustomWrapper.  The wrapper object
> > > should
> > > > be
> > > > > > > > >>>>> CustomControlTuple that extends Tuple. There is already
> > > code
> > > > > that
> > > > > > > > >>>>> checks
> > > > > > > > >>>>> for Tuple instance. The "unWrap" name is misleading,
> IMO.
> > > It
> > > > > > should
> > > > > > > > be
> > > > > > > > >>>>> something like customControlTuple.getPayload() or
> > > > > > > > >>>>> customControlTuple.getAttachment(). In the
> > emitControl(),
> > > > > create
> > > > > > > new
> > > > > > > > >>>>> CustomControlTuple using provided payload as one of
> > > > arguments.
> > > > > It
> > > > > > > may
> > > > > > > > >>>>> still
> > > > > > > > >>>>> be good to use common parent other than Object for
> > control
> > > > > tuple
> > > > > > > > >>>>> payload
> > > > > > > > >>>>> class hierarchy.
> > > > > > > > >>>>>
> > > > > > > > >>>>> I don't understand how adding more methods to the
> Default
> > > > > > > > >>>>> implementation
> > > > > > > > >>>>> will help with early error detection unless application
> > or
> > > > > > operator
> > > > > > > > >>>>> that
> > > > > > > > >>>>> relies on the custom control tuple functionality
> > explicitly
> > > > > > checks
> > > > > > > > for
> > > > > > > > >>>>> the
> > > > > > > > >>>>> platform version at run-time or tries to emit a control
> > > tuple
> > > > > > just
> > > > > > > to
> > > > > > > > >>>>> check
> > > > > > > > >>>>> that such functionality is supported by the platform.
> > > > > > > > >>>>>
> > > > > > > > >>>>> Thank you,
> > > > > > > > >>>>>
> > > > > > > > >>>>> Vlad
> > > > > > > > >>>>>
> > > > > > > > >>>>> On 12/21/16 04:58, Bhupesh Chawda wrote:
> > > > > > > > >>>>>
> > > > > > > > >>>>> Hi Vlad.
> > > > > > > > >>>>>
> > > > > > > > >>>>>> Yes, the API should not change. We can take an Object
> > > > instead,
> > > > > > and
> > > > > > > > >>>>>> later
> > > > > > > > >>>>>> wrap it into the required class.
> > > > > > > > >>>>>>
> > > > > > > > >>>>>> Our InputPort.put and emitControl method would look
> > > > something
> > > > > > like
> > > > > > > > the
> > > > > > > > >>>>>> following where we handle the wrapping and unwrapping
> > > > > > internally.
> > > > > > > > >>>>>>
> > > > > > > > >>>>>> public void put(T tuple)
> > > > > > > > >>>>>> {
> > > > > > > > >>>>>>      if (tuple instanceof CustomWrapper) {
> > > > > > > > >>>>>>        processControl(tuple.unWrap());
> > > > > > > > >>>>>>      }  else {
> > > > > > > > >>>>>>        process(tuple)
> > > > > > > > >>>>>>      }
> > > > > > > > >>>>>> }
> > > > > > > > >>>>>>
> > > > > > > > >>>>>> emitControl(Object tuple)
> > > > > > > > >>>>>> {
> > > > > > > > >>>>>>      sink.put(CustomWrapper.wrap(tuple));
> > > > > > > > >>>>>> }
> > > > > > > > >>>>>>
> > > > > > > > >>>>>> Regarding the compatibility issue, I think we have two
> > > ways
> > > > of
> > > > > > > doing
> > > > > > > > >>>>>> it:
> > > > > > > > >>>>>>
> > > > > > > > >>>>>>       1. Extend DefaultInputPort and DefaultOutputPort
> > and
> > > > > > create
> > > > > > > > >>>>>>       ControlAwareInput and ControlAwareOutput out of
> > it.
> > > > This
> > > > > > > might
> > > > > > > > >>>>>> require us
> > > > > > > > >>>>>>       to additionally handle specific cases when
> > > > > non-compatible
> > > > > > > > ports
> > > > > > > > >>>>>>       (ControlAwareOutput and DefaultInput, for
> example)
> > > are
> > > > > > > > >>>>>> connected to
> > > > > > > > >>>>>> each
> > > > > > > > >>>>>>       other in user apps.
> > > > > > > > >>>>>>       2. Add the additional methods in the existing
> > > Default
> > > > > > > > >>>>>> implementations.
> > > > > > > > >>>>>>
> > > > > > > > >>>>>>
> > > > > > > > >>>>>> IMO, both of these would help in early error
> detection.
> > > > > > > > >>>>>>
> > > > > > > > >>>>>> ~ Bhupesh
> > > > > > > > >>>>>>
> > > > > > > > >>>>>>
> > > > > > > > >>>>>>
> > > > > > > > >>>>>>
> > > > > > > > >>>>>> On Wed, Dec 21, 2016 at 1:36 AM, Vlad Rozov <
> > > > > > > > v.rozov@datatorrent.com>
> > > > > > > > >>>>>> wrote:
> > > > > > > > >>>>>>
> > > > > > > > >>>>>> A wrapper class is required for the control tuples
> > > delivery,
> > > > > but
> > > > > > > > >>>>>>
> > > > > > > > >>>>>> Port/Operator API should use Control Tuple payload
> > object
> > > > > only.
> > > > > > > > >>>>>>> Implementation of the wrapper class may change from
> > > version
> > > > > to
> > > > > > > > >>>>>>> version,
> > > > > > > > >>>>>>> but
> > > > > > > > >>>>>>> API should not be affected by the change.
> > > > > > > > >>>>>>>
> > > > > > > > >>>>>>> I guess, assumption is that default input and output
> > port
> > > > > will
> > > > > > be
> > > > > > > > >>>>>>> extended
> > > > > > > > >>>>>>> to provide support for the control tuples. This may
> > cause
> > > > > some
> > > > > > > > >>>>>>> backward
> > > > > > > > >>>>>>> compatibility issues. Consider scenario when a newer
> > > > version
> > > > > of
> > > > > > > > >>>>>>> Malhar
> > > > > > > > >>>>>>> that
> > > > > > > > >>>>>>> relies on EOF control tuple is deployed into older
> > > version
> > > > of
> > > > > > > core
> > > > > > > > >>>>>>> that
> > > > > > > > >>>>>>> does not support control tuples. In such scenario,
> > error
> > > > will
> > > > > > be
> > > > > > > > >>>>>>> raised
> > > > > > > > >>>>>>> only when an operator tries to emit EOF control tuple
> > at
> > > > the
> > > > > > end
> > > > > > > > of a
> > > > > > > > >>>>>>> job.
> > > > > > > > >>>>>>> Introducing control tuple aware ports solve the early
> > > error
> > > > > > > > >>>>>>> detection.
> > > > > > > > >>>>>>> It
> > > > > > > > >>>>>>> will require some operators to be modified to use
> > control
> > > > > tuple
> > > > > > > > aware
> > > > > > > > >>>>>>> ports, but such change may help to distinguish
> control
> > > > tuple
> > > > > > > aware
> > > > > > > > >>>>>>> operators from their old versions.
> > > > > > > > >>>>>>>
> > > > > > > > >>>>>>> Vlad
> > > > > > > > >>>>>>>
> > > > > > > > >>>>>>> On 12/20/16 04:09, Bhupesh Chawda wrote:
> > > > > > > > >>>>>>>
> > > > > > > > >>>>>>> I investigated this and seems like it is better to
> > have a
> > > > > > wrapper
> > > > > > > > >>>>>>> class
> > > > > > > > >>>>>>>
> > > > > > > > >>>>>>> for
> > > > > > > > >>>>>>>> the user object.
> > > > > > > > >>>>>>>> This would serve 2 purposes:
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>>        1. Allow us to distinguish a custom control
> > tuple
> > > > > from
> > > > > > > > other
> > > > > > > > >>>>>>>> payload
> > > > > > > > >>>>>>>>        tuples.
> > > > > > > > >>>>>>>>        2. For the same control tuple received from
> > > > different
> > > > > > > > >>>>>>>> upstream
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>>        partitions, we would have some mechanism to
> > > > > distinguish
> > > > > > > > >>>>>>>> between
> > > > > > > > >>>>>>>> the
> > > > > > > > >>>>>>>> two in
> > > > > > > > >>>>>>>>        order to identify duplicates.
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>> Additionally, the wrapper class needs to be part of
> > the
> > > > API
> > > > > as
> > > > > > > > >>>>>>>> DefaultOutputPort needs to know about it, before
> > putting
> > > > it
> > > > > > into
> > > > > > > > the
> > > > > > > > >>>>>>>> sink.
> > > > > > > > >>>>>>>> We can make sure that the user is not able to extend
> > or
> > > > > modify
> > > > > > > > this
> > > > > > > > >>>>>>>> class
> > > > > > > > >>>>>>>> in any manner.
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>> ~ Bhupesh
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>> On Mon, Dec 19, 2016 at 12:18 PM, David Yan <
> > > > > > davidyan@gmail.com
> > > > > > > >
> > > > > > > > >>>>>>>> wrote:
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>> This C type parameter is going to fix the control
> > tuple
> > > > type
> > > > > > at
> > > > > > > > >>>>>>>> compile
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>> time and this is actually not what we want. Note
> that
> > > the
> > > > > > > operator
> > > > > > > > >>>>>>>> may
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>>> receive or emit multiple different control tuple
> > types.
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>> David
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>> On Dec 17, 2016 3:33 AM, "Tushar Gosavi" <
> > > > > > > tushar@datatorrent.com
> > > > > > > > >
> > > > > > > > >>>>>>>>> wrote:
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>> We do not need to create an interface for data
> > emitted
> > > > > > through
> > > > > > > > >>>>>>>>> emitControl or processed through processControl.
> > > > Internally
> > > > > > we
> > > > > > > > >>>>>>>>> could
> > > > > > > > >>>>>>>>> wrap the user object in ControlTuple. you can add
> > type
> > > > > > > parameter
> > > > > > > > >>>>>>>>> for
> > > > > > > > >>>>>>>>> control tuple object on ports.
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>> DefaultInputPort<D,C>
> > > > > > > > >>>>>>>>> D is the data type and C is the control tuple type
> > for
> > > > > better
> > > > > > > > error
> > > > > > > > >>>>>>>>> catching at compile phase.
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>> - Tushar.
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>> On Sat, Dec 17, 2016 at 8:35 AM, Bhupesh Chawda <
> > > > > > > > >>>>>>>>> bhupesh@datatorrent.com
> > > > > > > > >>>>>>>>> wrote:
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>> Agreed Vlad and David.
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>> I am just suggesting there should be a wrapper for
> > the
> > > > user
> > > > > > > > object.
> > > > > > > > >>>>>>>>>> It
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>> can
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>> be a marker interface and we can call it something
> > > else
> > > > > like
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>> "CustomControl".
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>> The user object will be wrapped in another class
> > > > > > > "ControlTuple"
> > > > > > > > >>>>>>>>>> which
> > > > > > > > >>>>>>>>>> traverses the BufferServer and will perhaps be
> > > extended
> > > > > from
> > > > > > > the
> > > > > > > > >>>>>>>>>> packet/Tuple class. This class will not be exposed
> > to
> > > > the
> > > > > > > user.
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>> ~ Bhupesh
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>> On Sat, Dec 17, 2016 at 4:11 AM, Vlad Rozov <
> > > > > > > > >>>>>>>>>> v.rozov@datatorrent.com>
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>> wrote:
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>> I agree with David. Payload of the control tuple
> is
> > in
> > > > the
> > > > > > > > >>>>>>>>> userObject
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>> and
> > > > > > > > >>>>>>>>>> operators/ports don't need to be exposed to the
> > > > > > implementation
> > > > > > > > of
> > > > > > > > >>>>>>>>>> the
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>> ControlTuple class. With the proposed interface
> > > > operators
> > > > > > > > >>>>>>>>>> developers
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>>> are
> > > > > > > > >>>>>>>>>>> free to extend ControlTuple further and I don't
> > think
> > > > > that
> > > > > > > such
> > > > > > > > >>>>>>>>>>>
> > > > > > > > >>>>>>>>>>> capability
> > > > > > > > >>>>>>>>>>>
> > > > > > > > >>>>>>>>>>> needs to be provided. The wrapping in the
> > > ControlTuple
> > > > > > class
> > > > > > > is
> > > > > > > > >>>>>>>>>> necessary
> > > > > > > > >>>>>>>>>> and most likely ControlTuple needs to be extended
> > from
> > > > the
> > > > > > > > buffer
> > > > > > > > >>>>>>>>>> server
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>> Tuple. It may be good to have a common parent
> other
> > > than
> > > > > > > Object
> > > > > > > > >>>>>>>>>> for
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>>> all
> > > > > > > > >>>>>>>>>>> user payloads, but it may be a marker interface
> as
> > > > well.
> > > > > > > > >>>>>>>>>>>
> > > > > > > > >>>>>>>>>>> Thank you,
> > > > > > > > >>>>>>>>>>>
> > > > > > > > >>>>>>>>>>> Vlad
> > > > > > > > >>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>
> > > > > > > > >>>>>>>>>>> On 12/16/16 09:59, Bhupesh Chawda wrote:
> > > > > > > > >>>>>>>>>>>
> > > > > > > > >>>>>>>>>>> Hi David,
> > > > > > > > >>>>>>>>>>>
> > > > > > > > >>>>>>>>>>> Actually, I was thinking of another API class
> > called
> > > > > > > > >>>>>>>>>>> ControlTuple,
> > > > > > > > >>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>> different from the actual tuple class in buffer
> > > server
> > > > > or
> > > > > > > > stram.
> > > > > > > > >>>>>>>>>>>> This could serve as a way for the Buffer server
> > > > > publisher
> > > > > > to
> > > > > > > > >>>>>>>>>>>> understand
> > > > > > > > >>>>>>>>>>>> that it is a control tuple and needs to be
> wrapped
> > > > > > > > differently.
> > > > > > > > >>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>> ~ Bhupesh
> > > > > > > > >>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>> On Dec 16, 2016 22:28, "David Yan" <
> > > > davidyan@gmail.com>
> > > > > > > > wrote:
> > > > > > > > >>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>       // DefaultInputPort
> > > > > > > > >>>>>>>>>>>>        public void processControl(ControlTuple
> > > tuple)
> > > > > > > > >>>>>>>>>>>>        {
> > > > > > > > >>>>>>>>>>>>          // Default Implementation to avoid need
> > to
> > > > > > > implement
> > > > > > > > >>>>>>>>>>>> it in
> > > > > > > > >>>>>>>>>>>> all
> > > > > > > > >>>>>>>>>>>> implementations
> > > > > > > > >>>>>>>>>>>>        }
> > > > > > > > >>>>>>>>>>>> {code}
> > > > > > > > >>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>> {code}
> > > > > > > > >>>>>>>>>>>>       // DefaultOutputPort
> > > > > > > > >>>>>>>>>>>>        public void emitControl(ControlTuple
> tuple)
> > > > > > > > >>>>>>>>>>>>        {
> > > > > > > > >>>>>>>>>>>>        }
> > > > > > > > >>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>> I think we don't need to expose the ControlTuple
> > > class
> > > > > to
> > > > > > > the
> > > > > > > > >>>>>>>>>>>> operator
> > > > > > > > >>>>>>>>>>>> developers because the window ID is just the
> > current
> > > > > > window
> > > > > > > ID
> > > > > > > > >>>>>>>>>>>> when
> > > > > > > > >>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>> these
> > > > > > > > >>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>> methods are called. How about making them just
> > > Object?
> > > > > We
> > > > > > > also
> > > > > > > > >>>>>>>>>>>
> > > > > > > > >>>>>>>>>> need to
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>> provide the way for the user to specify custom
> > > > serializer
> > > > > > for
> > > > > > > > the
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>>> control
> > > > > > > > >>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>> tuple.
> > > > > > > > >>>>>>>>>>>
> > > > > > > > >>>>>>>>>> David
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>>> On Thu, Dec 15, 2016 at 12:43 AM, Bhupesh Chawda
> <
> > > > > > > > >>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>> bhupesh@datatorrent.com
> > > > > > > > >>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>> wrote:
> > > > > > > > >>>>>>>>>>>
> > > > > > > > >>>>>>>>>> Hi All,
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>>> Here are the initial interfaces:
> > > > > > > > >>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>> {code}
> > > > > > > > >>>>>>>>>>>>>       // DefaultInputPort
> > > > > > > > >>>>>>>>>>>>>        public void processControl(ControlTuple
> > > tuple)
> > > > > > > > >>>>>>>>>>>>>        {
> > > > > > > > >>>>>>>>>>>>>          // Default Implementation to avoid
> need
> > to
> > > > > > > implement
> > > > > > > > >>>>>>>>>>>>> it
> > > > > > > > >>>>>>>>>>>>> in
> > > > > > > > >>>>>>>>>>>>> all
> > > > > > > > >>>>>>>>>>>>> implementations
> > > > > > > > >>>>>>>>>>>>>        }
> > > > > > > > >>>>>>>>>>>>> {code}
> > > > > > > > >>>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>> {code}
> > > > > > > > >>>>>>>>>>>>>       // DefaultOutputPort
> > > > > > > > >>>>>>>>>>>>>        public void emitControl(ControlTuple
> > tuple)
> > > > > > > > >>>>>>>>>>>>>        {
> > > > > > > > >>>>>>>>>>>>>        }
> > > > > > > > >>>>>>>>>>>>> {code}
> > > > > > > > >>>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>> We have an option to add these methods to the
> > > > > interfaces
> > > > > > -
> > > > > > > > >>>>>>>>>>>>> InputPort
> > > > > > > > >>>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>> and
> > > > > > > > >>>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>> OutputPort; But these would not be backward
> > > > compatible
> > > > > > and
> > > > > > > > also
> > > > > > > > >>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>> not
> > > > > > > > >>>>>>>>>>> consistent with the current implementation of
> basic
> > > > data
> > > > > > > tuple
> > > > > > > > >>>>>>>>>>> flow
> > > > > > > > >>>>>>>>>>>
> > > > > > > > >>>>>>>>>>> (as
> > > > > > > > >>>>>>>>>>>> with process() and emit()).
> > > > > > > > >>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>> We also need to expose an interface / class for
> > users
> > > > to
> > > > > > wrap
> > > > > > > > >>>>>>>>>>> their
> > > > > > > > >>>>>>>>>>>
> > > > > > > > >>>>>>>>>>> object
> > > > > > > > >>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>> and emit downstream. This should be part of
> API.
> > > > > > > > >>>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>> {code}
> > > > > > > > >>>>>>>>>>>>> public class ControlTuple extends Tuple
> > > > > > > > >>>>>>>>>>>>> {
> > > > > > > > >>>>>>>>>>>>>        Object userObject;
> > > > > > > > >>>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>>        public ControlTuple(long windowId,
> Object
> > > > > > > userObject)
> > > > > > > > >>>>>>>>>>>>>        {
> > > > > > > > >>>>>>>>>>>>>          //
> > > > > > > > >>>>>>>>>>>>>        }
> > > > > > > > >>>>>>>>>>>>> }
> > > > > > > > >>>>>>>>>>>>> {code}
> > > > > > > > >>>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>> The emitted tuples would traverse the same flow
> > as
> > > > with
> > > > > > > other
> > > > > > > > >>>>>>>>>>>>> control
> > > > > > > > >>>>>>>>>>>>> tuples. The plan is to intercept the control
> > tuples
> > > > in
> > > > > > > > >>>>>>>>>>>>> GenericNode
> > > > > > > > >>>>>>>>>>>>> and
> > > > > > > > >>>>>>>>>>>>> use
> > > > > > > > >>>>>>>>>>>>> the Reservior to emit the control tuples at the
> > end
> > > > of
> > > > > > the
> > > > > > > > >>>>>>>>>>>>> window.
> > > > > > > > >>>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>> GenericNode seems to be the best place to
> buffer
> > > > > incoming
> > > > > > > > >>>>>>>>>>>>> custom
> > > > > > > > >>>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>> control
> > > > > > > > >>>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>> tuples without delivering them immediately to
> the
> > > > > > operator
> > > > > > > > >>>>>>>>>>>> port.
> > > > > > > > >>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>> Once
> > > > > > > > >>>>>>>>>>> the
> > > > > > > > >>>>>>>>>>> end of the window is reached, we plan to use the
> > > > > reservoir
> > > > > > > sink
> > > > > > > > >>>>>>>>>>> to
> > > > > > > > >>>>>>>>>>> push
> > > > > > > > >>>>>>>>>>> them to the port. This is different behavior than
> > any
> > > > > other
> > > > > > > > >>>>>>>>>>> control
> > > > > > > > >>>>>>>>>>> tuple
> > > > > > > > >>>>>>>>>>> where we are changing the order of tuples in the
> > > > stream.
> > > > > > The
> > > > > > > > >>>>>>>>>>> custom
> > > > > > > > >>>>>>>>>>> control
> > > > > > > > >>>>>>>>>>>
> > > > > > > > >>>>>>>>>>> tuples will be buffered and not delivered to the
> > > ports
> > > > > > until
> > > > > > > > the
> > > > > > > > >>>>>>>>>>>> end
> > > > > > > > >>>>>>>>>>>> of
> > > > > > > > >>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>> the
> > > > > > > > >>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>> window.
> > > > > > > > >>>>>>>>>>>
> > > > > > > > >>>>>>>>>>> To accomplish this, we need to have a public
> method
> > > in
> > > > > > > > >>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>> SweepableReservoir
> > > > > > > > >>>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>> which allows to put a tuple back in the sink of
> > the
> > > > > > > > reservoir.
> > > > > > > > >>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>> ~ Bhupesh
> > > > > > > > >>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>>
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
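
The delivery order proposed in the thread above (data tuples delivered as they arrive, custom control tuples held back and delivered after them but before endWindow()) can be sketched roughly as follows. WindowDeliverySketch and its method names are hypothetical and only illustrate the ordering; this is not the apex-core GenericNode or reservoir code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the proposed delivery order; NOT the apex-core GenericNode.
class WindowDeliverySketch {
    private final List<Object> bufferedControl = new ArrayList<>();
    private final List<String> delivered = new ArrayList<>();

    void beginWindow(long windowId) {
        delivered.add("beginWindow(" + windowId + ")");
    }

    // Data tuples are delivered to the operator as soon as they arrive.
    void onDataTuple(Object tuple) {
        delivered.add("process(" + tuple + ")");
    }

    // Custom control tuples are held back instead of being delivered immediately.
    void onControlTuple(Object payload) {
        bufferedControl.add(payload);
    }

    // At the end of the window, flush buffered control tuples, then call endWindow().
    void endWindow() {
        for (Object payload : bufferedControl) {
            delivered.add("processControl(" + payload + ")");
        }
        bufferedControl.clear();
        delivered.add("endWindow()");
    }

    List<String> delivered() {
        return delivered;
    }

    public static void main(String[] args) {
        WindowDeliverySketch node = new WindowDeliverySketch();
        node.beginWindow(1);
        node.onDataTuple("a");
        node.onControlTuple("EOF"); // arrives between two data tuples
        node.onDataTuple("b");
        node.endWindow();
        node.delivered().forEach(System.out::println);
    }
}
```

Because the buffer is cleared in endWindow(), no control tuple is delivered between endWindow() and the next beginWindow(), which matches the constraint stated in the proposal.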
