apex-dev mailing list archives

From Bhupesh Chawda <bhup...@datatorrent.com>
Subject Re: [DISCUSS] Custom Control Tuples Design
Date Tue, 03 Jan 2017 18:23:43 GMT
Yes David, that is correct.
The annotation is true by default.

However, in the course of processing we may want to stop control tuples
from propagating to the downstream operators. For that case, we should have
an option to block control tuple propagation based on a runtime condition.
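
To make the two mechanisms concrete, here is a minimal sketch of how the static
default and a runtime block could combine. This is not Apex API:
SketchOutputPort, blockControlTuplesWhen() and forwardControl() are hypothetical
names made up for illustration, and the boolean constructor argument only stands
in for the annotation value discussed in this thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Illustrative only: a stand-in for an output port, not the Apex DefaultOutputPort.
public class ControlPropagationSketch {

    static class SketchOutputPort {
        // Stands in for the static annotation value (assumed true by default).
        private final boolean propagateByDefault;
        // Hypothetical runtime condition; by default nothing is blocked.
        private Predicate<Object> blockCondition = payload -> false;
        // Records what would have been sent downstream.
        final List<Object> forwarded = new ArrayList<>();

        SketchOutputPort(boolean propagateByDefault) {
            this.propagateByDefault = propagateByDefault;
        }

        // Hypothetical method call on the port for programmatic blocking.
        void blockControlTuplesWhen(Predicate<Object> condition) {
            this.blockCondition = condition;
        }

        // Returns true if the control tuple payload was forwarded downstream.
        boolean forwardControl(Object payload) {
            if (!propagateByDefault || blockCondition.test(payload)) {
                return false; // consumed here, not propagated
            }
            forwarded.add(payload);
            return true;
        }
    }

    public static void main(String[] args) {
        SketchOutputPort port = new SketchOutputPort(true);
        port.forwardControl("BEGIN_FILE");                       // propagated by default
        port.blockControlTuplesWhen(p -> "END_FILE".equals(p));  // runtime condition
        port.forwardControl("END_FILE");                         // blocked at runtime
        System.out.println(port.forwarded);                      // prints [BEGIN_FILE]
    }
}
```

With this shape, an operator such as the filter F in David's example needs no
changes to keep propagating, and only operators that want to consume a control
tuple have to install a condition.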

~ Bhupesh

On Jan 3, 2017 23:04, "David Yan" <davidyan@gmail.com> wrote:

> The annotation should be true by default. If an operator does not care
> about the control tuples, it should propagate them because the downstream
> might care about it. For example, let's say the original DAG looks like:
>
> A->B
>
> And A emits control tuples that B cares about, and of course along with
> other data tuples. Now I want to change the DAG and insert a filter
> operator F in between A and B for filtering data tuples.
>
> A->F->B
>
> F is not aware of any control tuples because it's the business between A
> and B, and F only filters on data tuples. Should the application developer
> who inserts F have to remember that we need to annotate F so that F will
> propagate the control tuples? The answer should be no.
>
> David
>
> On Sat, Dec 31, 2016 at 8:29 PM, Bhupesh Chawda <bhupesh@datatorrent.com>
> wrote:
>
> > Yes, that makes sense.
> > We have the following options:
> > 1. Make the annotation false by default and force the user to forward the
> > control tuples explicitly.
> > 2. Annotation is true by default and the static way of blocking stays as it is.
> > We provide another way for blocking programmatically, perhaps by means of
> > another method call on the port.
> >
> > ~ Bhupesh
> >
> > On Dec 30, 2016 00:09, "Pramod Immaneni" <pramod@datatorrent.com> wrote:
> >
> > > Bhupesh,
> > >
> > > Annotation seems like a static way to stop propagation. Given these are
> > > programmatically generated, I would think the operators should be able to
> > > stop (consume without propagating) programmatically as well.
> > >
> > > Thanks
> > >
> > > On Thu, Dec 29, 2016 at 8:48 AM, Bhupesh Chawda <
> bhupesh@datatorrent.com
> > >
> > > wrote:
> > >
> > > > Thanks Vlad, I am trying out the approach you mentioned regarding
> > having
> > > > another interface which allows sinks to put a control tuple.
> > > >
> > > > > Regarding the delivery of control tuples, here is what I am planning to do:
> > > > > All the control tuples which are emitted in a particular window are
> > > > > delivered after all the data tuples have been delivered to the respective
> > > > > ports, but before the endWindow() call. The operator can then process the
> > > > > control tuples in that window and can do any finalization in the endWindow()
> > > > > call. There will be no delivery of control tuples after endWindow() and
> > > > > before the next beginWindow() call.
> > > >
> > > > For handling the propagation of control tuples further in the dag, we
> > are
> > > > planning to have an annotation on the Output Port of the operator
> which
> > > > would be true by default.
> > > > @OutputPortFieldAnnotation(propogateControlTuples = false).
> > > >
> > > > ~ Bhupesh
> > > >
> > > >
> > > > On Thu, Dec 29, 2016 at 6:24 AM, Vlad Rozov <v.rozov@datatorrent.com
> >
> > > > wrote:
> > > >
> > > > > Custom control tuples are control tuples emitted by an operator
> > itself
> > > > and
> > > > > not by the platform. Prior to the introduction of the custom
> control
> > > > > tuples, only Apex engine itself puts control tuples into various
> > sinks,
> > > > so
> > > > > the engine created necessary Tuple objects with the corresponding
> > type
> > > > > prior to calling Sink.put().
> > > > >
> > > > > Not all sinks need to be changed. Only control tuple aware sinks
> > should
> > > > > provide such functionality. In the case there is a lot of code
> > > > duplication,
> > > > > please create an abstract class, that other control aware sinks
> will
> > > > extend
> > > > > from.
> > > > >
> > > > > Thank you,
> > > > >
> > > > > Vlad
> > > > >
> > > > >
> > > > > On 12/23/16 06:24, Bhupesh Chawda wrote:
> > > > >
> > > > >> Hi Vlad,
> > > > >>
> > > > >> Thanks for the pointer on delegating the wrapping of the user
> tuple
> > to
> > > > the
> > > > >> control port. I was trying this out today.
> > > > > >> The problem I see is that if we introduce a putControlTuple() method in
> > > > > >> Sink, then a lot of the existing sinks would change. Also, the changes
> > > > > >> seemed redundant, as the existing control tuples already use the put()
> > > > > >> method of sinks. So why do something special for custom control tuples?
> > > > >>
> > > > > >> The only aspect in which the custom control tuples are different is that
> > > > > >> these will be generated by the user and will actually be delivered to the
> > > > > >> ports in a different order. Perhaps we should be able to use the existing
> > > > > >> flow. The only problem, as outlined before, seems to be the identification
> > > > > >> of the user tuple as a control tuple.
> > > > >>
> > > > >> ~ Bhupesh
> > > > >>
> > > > >>
> > > > >> On Thu, Dec 22, 2016 at 10:44 PM, Vlad Rozov <
> > v.rozov@datatorrent.com
> > > >
> > > > >> wrote:
> > > > >>
> > > > >> Why is it necessary to wrap in the OutputPort? Can't it be
> delegated
> > > to
> > > > a
> > > > >>> Sink by introducing new putControlTuple method?
> > > > >>>
> > > > >>> Thank you,
> > > > >>>
> > > > >>> Vlad
> > > > >>>
> > > > >>>
> > > > >>> On 12/21/16 22:10, Bhupesh Chawda wrote:
> > > > >>>
> > > > >>> Hi Vlad,
> > > > >>>>
> > > > >>>> The problem in using the Tuple class as the wrapper is
that the
> > > Ports
> > > > >>>> belong to the API and we want to wrap the payload object
of the
> > > > control
> > > > >>>> tuple into the Tuple class which is not part of the API.
> > > > >>>>
> > > > >>>> The output port will just get the payload of the user
control
> > tuple.
> > > > For
> > > > >>>> example, if the user emits a Long, as a control tuple,
the
> payload
> > > > >>>> object
> > > > >>>> will just be a Long object.
> > > > >>>>
> > > > >>>> It is necessary to bundle this Long into some recognizable
> object
> > so
> > > > >>>> that
> > > > >>>> the BufferServerPublisher knows that this is a Control
tuple and
> > > not a
> > > > >>>> regular tuple and serialize it accordingly. It is therefore
> > > necessary
> > > > >>>> that
> > > > >>>> the tuple be part of some known hierarchy so that can
be
> > > distinguished
> > > > >>>> from
> > > > >>>> other payload tuples. Let us call this class
> > ControlTupleInterface.
> > > > Note
> > > > >>>> that this needs to be done before the tuple is inserted
into the
> > > sink
> > > > >>>> which
> > > > >>>> is done in the port objects. Once the tuple is inserted
into the
> > > sink,
> > > > >>>> it
> > > > >>>> would seem just like any other payload tuple and cannot
be
> > > > >>>> distinguished.
> > > > >>>>
> > > > >>>> For this reason, I had something like the following in
API:
> > > > >>>>
> > > > > >>>> package com.datatorrent.api;
> > > > > >>>>
> > > > > >>>> import java.util.UUID;
> > > > > >>>>
> > > > > >>>> public class ControlTupleInterface
> > > > > >>>> {
> > > > > >>>>     Object payload; // User control tuple payload, e.g. a Long
> > > > > >>>>     UUID id;        // Unique id to de-duplicate in downstream operators
> > > > > >>>> }
> > > > >>>>
> > > > >>>> Regarding your suggestion on using the Tuple class as
the
> wrapper
> > > for
> > > > >>>> the
> > > > >>>> control tuple payload, let me mention the current scenario
flow
> to
> > > > make
> > > > >>>> the
> > > > >>>> discussion easier:
> > > > >>>>
> > > > > >>>> We have a Tuple class in buffer server which is responsible for
> > > > > >>>> serializing the user control tuple bundling together a message type:
> > > > > >>>> CUSTOM_CONTROL_TUPLE_VALUE.
> > > > > >>>>
> > > > > >>>> com.datatorrent.bufferserver.packet.Tuple
> > > > > >>>> |-- com.datatorrent.bufferserver.packet.CustomControlTuple
> > > > > >>>>
> > > > > >>>> We have another Tuple class in Stram which helps the
> > > > > >>>> BufferServerSubscriber to de-serialize the serialized tuples. We should
> > > > > >>>> have CustomControlTuple in stram as follows:
> > > > > >>>>
> > > > > >>>> com.datatorrent.stram.tuple.Tuple
> > > > > >>>> |-- com.datatorrent.stram.tuple.CustomControlTuple
> > > > > >>>>
> > > > > >>>> This will have a field for user control payload.
> > > > >>>>
> > > > >>>> I think we should not expose the Tuple class in stram
to the
> API.
> > > That
> > > > >>>> was
> > > > >>>> the main reason I introduced another class/interface
> > > > >>>> ControlTupleInterface
> > > > >>>> as described above.
> > > > >>>>
> > > > >>>> Regarding, adding methods to DefaultInputPort and
> > > DefaultOutputPort, I
> > > > >>>> think error detection would not be early enough if the
control
> > tuple
> > > > is
> > > > >>>> sent very late in the processing :-)
> > > > >>>> Extending the ports to ControlTupleAware* should help
in this
> > case.
> > > > >>>> However, we still need to see if there are any downsides
on
> doing
> > > > this.
> > > > >>>>
> > > > >>>> Thanks.
> > > > >>>>
> > > > >>>> ~ Bhupesh
> > > > >>>>
> > > > >>>>
> > > > >>>>
> > > > >>>>
> > > > >>>> On Thu, Dec 22, 2016 at 7:26 AM, Vlad Rozov <
> > > v.rozov@datatorrent.com>
> > > > >>>> wrote:
> > > > >>>>
> > > > >>>> Hi Bhupesh,
> > > > >>>>
> > > > >>>>> it should not be a CustomWrapper.  The wrapper object
should be
> > > > >>>>> CustomControlTuple that extends Tuple. There is already
code
> that
> > > > >>>>> checks
> > > > >>>>> for Tuple instance. The "unWrap" name is misleading,
IMO. It
> > should
> > > > be
> > > > >>>>> something like customControlTuple.getPayload() or
> > > > >>>>> customControlTuple.getAttachment(). In the emitControl(),
> create
> > > new
> > > > >>>>> CustomControlTuple using provided payload as one
of arguments.
> It
> > > may
> > > > >>>>> still
> > > > >>>>> be good to use common parent other than Object for
control
> tuple
> > > > >>>>> payload
> > > > >>>>> class hierarchy.
> > > > >>>>>
> > > > >>>>> I don't understand how adding more methods to the
Default
> > > > >>>>> implementation
> > > > >>>>> will help with early error detection unless application
or
> > operator
> > > > >>>>> that
> > > > >>>>> relies on the custom control tuple functionality
explicitly
> > checks
> > > > for
> > > > >>>>> the
> > > > >>>>> platform version at run-time or tries to emit a control
tuple
> > just
> > > to
> > > > >>>>> check
> > > > >>>>> that such functionality is supported by the platform.
> > > > >>>>>
> > > > >>>>> Thank you,
> > > > >>>>>
> > > > >>>>> Vlad
> > > > >>>>>
> > > > >>>>> On 12/21/16 04:58, Bhupesh Chawda wrote:
> > > > >>>>>
> > > > >>>>> Hi Vlad.
> > > > >>>>>
> > > > >>>>>> Yes, the API should not change. We can take an
Object instead,
> > and
> > > > >>>>>> later
> > > > >>>>>> wrap it into the required class.
> > > > >>>>>>
> > > > >>>>>> Our InputPort.put and emitControl method would
look something
> > like
> > > > the
> > > > >>>>>> following where we handle the wrapping and unwrapping
> > internally.
> > > > >>>>>>
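> > > > > >>>>>> public void put(T tuple)
> > > > > >>>>>> {
> > > > > >>>>>>      if (tuple instanceof CustomWrapper) {
> > > > > >>>>>>        // Custom control tuple: unwrap and pass the payload to the control callback
> > > > > >>>>>>        processControl(((CustomWrapper)tuple).unWrap());
> > > > > >>>>>>      } else {
> > > > > >>>>>>        process(tuple);
> > > > > >>>>>>      }
> > > > > >>>>>> }
> > > > > >>>>>>
> > > > > >>>>>> public void emitControl(Object tuple)
> > > > > >>>>>> {
> > > > > >>>>>>      // Wrap the user payload so that it can be told apart from data tuples
> > > > > >>>>>>      sink.put(CustomWrapper.wrap(tuple));
> > > > > >>>>>> }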
> > > > >>>>>>
> > > > >>>>>> Regarding the compatibility issue, I think we
have two ways of
> > > doing
> > > > >>>>>> it:
> > > > >>>>>>
> > > > >>>>>>       1. Extend DefaultInputPort and DefaultOutputPort
and
> > create
> > > > >>>>>>       ControlAwareInput and ControlAwareOutput
out of it. This
> > > might
> > > > >>>>>> require us
> > > > >>>>>>       to additionally handle specific cases when
> non-compatible
> > > > ports
> > > > >>>>>>       (ControlAwareOutput and DefaultInput, for
example) are
> > > > >>>>>> connected to
> > > > >>>>>> each
> > > > >>>>>>       other in user apps.
> > > > >>>>>>       2. Add the additional methods in the existing
Default
> > > > >>>>>> implementations.
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>> IMO, both of these would help in early error
detection.
> > > > >>>>>>
> > > > >>>>>> ~ Bhupesh
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>> On Wed, Dec 21, 2016 at 1:36 AM, Vlad Rozov <
> > > > v.rozov@datatorrent.com>
> > > > >>>>>> wrote:
> > > > >>>>>>
> > > > >>>>>> A wrapper class is required for the control tuples
delivery,
> but
> > > > >>>>>>
> > > > >>>>>> Port/Operator API should use Control Tuple payload
object
> only.
> > > > >>>>>>> Implementation of the wrapper class may change
from version
> to
> > > > >>>>>>> version,
> > > > >>>>>>> but
> > > > >>>>>>> API should not be affected by the change.
> > > > >>>>>>>
> > > > >>>>>>> I guess, assumption is that default input
and output port
> will
> > be
> > > > >>>>>>> extended
> > > > >>>>>>> to provide support for the control tuples.
This may cause
> some
> > > > >>>>>>> backward
> > > > >>>>>>> compatibility issues. Consider scenario when
a newer version
> of
> > > > >>>>>>> Malhar
> > > > >>>>>>> that
> > > > >>>>>>> relies on EOF control tuple is deployed into
older version of
> > > core
> > > > >>>>>>> that
> > > > >>>>>>> does not support control tuples. In such
scenario, error will
> > be
> > > > >>>>>>> raised
> > > > >>>>>>> only when an operator tries to emit EOF control
tuple at the
> > end
> > > > of a
> > > > >>>>>>> job.
> > > > >>>>>>> Introducing control tuple aware ports solve
the early error
> > > > >>>>>>> detection.
> > > > >>>>>>> It
> > > > >>>>>>> will require some operators to be modified
to use control
> tuple
> > > > aware
> > > > >>>>>>> ports, but such change may help to distinguish
control tuple
> > > aware
> > > > >>>>>>> operators from their old versions.
> > > > >>>>>>>
> > > > >>>>>>> Vlad
> > > > >>>>>>>
> > > > >>>>>>> On 12/20/16 04:09, Bhupesh Chawda wrote:
> > > > >>>>>>>
> > > > >>>>>>> I investigated this and seems like it is
better to have a
> > wrapper
> > > > >>>>>>> class
> > > > >>>>>>>
> > > > >>>>>>> for
> > > > >>>>>>>> the user object.
> > > > >>>>>>>> This would serve 2 purposes:
> > > > >>>>>>>>
> > > > >>>>>>>>        1. Allow us to distinguish a custom
control tuple
> from
> > > > other
> > > > >>>>>>>> payload
> > > > >>>>>>>>        tuples.
> > > > >>>>>>>>        2. For the same control tuple
received from different
> > > > >>>>>>>> upstream
> > > > >>>>>>>>
> > > > >>>>>>>>        partitions, we would have some
mechanism to
> distinguish
> > > > >>>>>>>> between
> > > > >>>>>>>> the
> > > > >>>>>>>> two in
> > > > >>>>>>>>        order to identify duplicates.
> > > > >>>>>>>>
> > > > >>>>>>>> Additionally, the wrapper class needs
to be part of the API
> as
> > > > >>>>>>>> DefaultOutputPort needs to know about
it, before putting it
> > into
> > > > the
> > > > >>>>>>>> sink.
> > > > >>>>>>>> We can make sure that the user is not
able to extend or
> modify
> > > > this
> > > > >>>>>>>> class
> > > > >>>>>>>> in any manner.
> > > > >>>>>>>>
> > > > >>>>>>>> ~ Bhupesh
> > > > >>>>>>>>
> > > > >>>>>>>> On Mon, Dec 19, 2016 at 12:18 PM, David
Yan <
> > davidyan@gmail.com
> > > >
> > > > >>>>>>>> wrote:
> > > > >>>>>>>>
> > > > >>>>>>>> This C type parameter is going to fix
the control tuple type
> > at
> > > > >>>>>>>> compile
> > > > >>>>>>>>
> > > > >>>>>>>> time and this is actually not what we
want. Note that the
> > > operator
> > > > >>>>>>>> may
> > > > >>>>>>>>
> > > > >>>>>>>>> receive or emit multiple different
control tuple types.
> > > > >>>>>>>>>
> > > > >>>>>>>>> David
> > > > >>>>>>>>>
> > > > >>>>>>>>> On Dec 17, 2016 3:33 AM, "Tushar
Gosavi" <
> > > tushar@datatorrent.com
> > > > >
> > > > >>>>>>>>> wrote:
> > > > >>>>>>>>>
> > > > >>>>>>>>> We do not need to create an interface
for data emitted
> > through
> > > > >>>>>>>>> emitControl or processed through
processControl. Internally
> > we
> > > > >>>>>>>>> could
> > > > >>>>>>>>> wrap the user object in ControlTuple.
you can add type
> > > parameter
> > > > >>>>>>>>> for
> > > > >>>>>>>>> control tuple object on ports.
> > > > >>>>>>>>>
> > > > >>>>>>>>> DefaultInputPort<D,C>
> > > > >>>>>>>>> D is the data type and C is the control
tuple type for
> better
> > > > error
> > > > >>>>>>>>> catching at compile phase.
> > > > >>>>>>>>>
> > > > >>>>>>>>>
> > > > >>>>>>>>> - Tushar.
> > > > >>>>>>>>>
> > > > >>>>>>>>>
> > > > >>>>>>>>> On Sat, Dec 17, 2016 at 8:35 AM,
Bhupesh Chawda <
> > > > >>>>>>>>> bhupesh@datatorrent.com
> > > > >>>>>>>>> wrote:
> > > > >>>>>>>>>
> > > > >>>>>>>>> Agreed Vlad and David.
> > > > >>>>>>>>>
> > > > >>>>>>>>> I am just suggesting there should
be a wrapper for the user
> > > > object.
> > > > >>>>>>>>>> It
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> can
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> be a marker interface and we
can call it something else
> like
> > > > >>>>>>>>>
> > > > >>>>>>>>> "CustomControl".
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> The user object will be wrapped
in another class
> > > "ControlTuple"
> > > > >>>>>>>>>> which
> > > > >>>>>>>>>> traverses the BufferServer and
will perhaps be extended
> from
> > > the
> > > > >>>>>>>>>> packet/Tuple class. This class
will not be exposed to the
> > > user.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> ~ Bhupesh
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> On Sat, Dec 17, 2016 at 4:11
AM, Vlad Rozov <
> > > > >>>>>>>>>> v.rozov@datatorrent.com>
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> wrote:
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> I agree with David. Payload of
the control tuple is in the
> > > > >>>>>>>>> userObject
> > > > >>>>>>>>>
> > > > >>>>>>>>> and
> > > > >>>>>>>>>> operators/ports don't need to
be exposed to the
> > implementation
> > > > of
> > > > >>>>>>>>>> the
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> ControlTuple class. With the
proposed interface operators
> > > > >>>>>>>>>> developers
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>> are
> > > > >>>>>>>>>>> free to extend ControlTuple
further and I don't think
> that
> > > such
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> capability
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> needs to be provided. The
wrapping in the ControlTuple
> > class
> > > is
> > > > >>>>>>>>>> necessary
> > > > >>>>>>>>>> and most likely ControlTuple
needs to be extended from the
> > > > buffer
> > > > >>>>>>>>>> server
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> Tuple. It may be good to have
a common parent other than
> > > Object
> > > > >>>>>>>>>> for
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>> all
> > > > >>>>>>>>>>> user payloads, but it may
be a marker interface as well.
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> Thank you,
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> Vlad
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> On 12/16/16 09:59, Bhupesh
Chawda wrote:
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> Hi David,
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> Actually, I was thinking
of another API class called
> > > > >>>>>>>>>>> ControlTuple,
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>>> different from the actual
tuple class in buffer server
> or
> > > > stram.
> > > > >>>>>>>>>>>> This could serve as a
way for the Buffer server
> publisher
> > to
> > > > >>>>>>>>>>>> understand
> > > > >>>>>>>>>>>> that it is a control
tuple and needs to be wrapped
> > > > differently.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> ~ Bhupesh
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> On Dec 16, 2016 22:28,
"David Yan" <davidyan@gmail.com>
> > > > wrote:
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>       // DefaultInputPort
> > > > >>>>>>>>>>>>        public void processControl(ControlTuple
tuple)
> > > > >>>>>>>>>>>>        {
> > > > >>>>>>>>>>>>          // Default Implementation
to avoid need to
> > > implement
> > > > >>>>>>>>>>>> it in
> > > > >>>>>>>>>>>> all
> > > > >>>>>>>>>>>> implementations
> > > > >>>>>>>>>>>>        }
> > > > >>>>>>>>>>>> {code}
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> {code}
> > > > >>>>>>>>>>>>       // DefaultOutputPort
> > > > >>>>>>>>>>>>        public void emitControl(ControlTuple
tuple)
> > > > >>>>>>>>>>>>        {
> > > > >>>>>>>>>>>>        }
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> I think we don't need
to expose the ControlTuple class
> to
> > > the
> > > > >>>>>>>>>>>> operator
> > > > >>>>>>>>>>>> developers because the
window ID is just the current
> > window
> > > ID
> > > > >>>>>>>>>>>> when
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> these
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> methods are called. How
about making them just Object?
> We
> > > also
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>> need to
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> provide the way for the user
to specify custom serializer
> > for
> > > > the
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>> control
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> tuple.
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>> David
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>> On Thu, Dec 15, 2016 at 12:43
AM, Bhupesh Chawda <
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> bhupesh@datatorrent.com
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> wrote:
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>> Hi All,
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>> Here are the initial interfaces:
> > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>>> {code}
> > > > > >>>>>>>>>>>>> // DefaultInputPort
> > > > > >>>>>>>>>>>>> public void processControl(ControlTuple tuple)
> > > > > >>>>>>>>>>>>> {
> > > > > >>>>>>>>>>>>>   // Default implementation to avoid the need to implement it in all implementations
> > > > > >>>>>>>>>>>>> }
> > > > > >>>>>>>>>>>>> {code}
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>> {code}
> > > > > >>>>>>>>>>>>> // DefaultOutputPort
> > > > > >>>>>>>>>>>>> public void emitControl(ControlTuple tuple)
> > > > > >>>>>>>>>>>>> {
> > > > > >>>>>>>>>>>>> }
> > > > > >>>>>>>>>>>>> {code}
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> We have an option
to add these methods to the
> interfaces
> > -
> > > > >>>>>>>>>>>>> InputPort
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> and
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> OutputPort; But these
would not be backward compatible
> > and
> > > > also
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>> not
> > > > >>>>>>>>>>> consistent with the current
implementation of basic data
> > > tuple
> > > > >>>>>>>>>>> flow
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> (as
> > > > >>>>>>>>>>>> with process() and emit()).
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>> We also need to expose an
interface / class for users to
> > wrap
> > > > >>>>>>>>>>> their
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> object
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>> and emit downstream.
This should be part of API.
> > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>> {code}
> > > > > >>>>>>>>>>>>> public class ControlTuple extends Tuple
> > > > > >>>>>>>>>>>>> {
> > > > > >>>>>>>>>>>>>   Object userObject;
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>   public ControlTuple(long windowId, Object userObject)
> > > > > >>>>>>>>>>>>>   {
> > > > > >>>>>>>>>>>>>     //
> > > > > >>>>>>>>>>>>>   }
> > > > > >>>>>>>>>>>>> }
> > > > > >>>>>>>>>>>>> {code}
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> The emitted tuples
would traverse the same flow as with
> > > other
> > > > >>>>>>>>>>>>> control
> > > > >>>>>>>>>>>>> tuples. The plan
is to intercept the control tuples in
> > > > >>>>>>>>>>>>> GenericNode
> > > > >>>>>>>>>>>>> and
> > > > >>>>>>>>>>>>> use
> > > > > >>>>>>>>>>>>> the Reservoir to
emit the control tuples at the end of
> > the
> > > > >>>>>>>>>>>>> window.
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> GenericNode seems
to be the best place to buffer
> incoming
> > > > >>>>>>>>>>>>> custom
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> control
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> tuples without delivering
them immediately to the
> > operator
> > > > >>>>>>>>>>>> port.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>> Once
> > > > >>>>>>>>>>> the
> > > > >>>>>>>>>>> end of the window is reached,
we plan to use the
> reservoir
> > > sink
> > > > >>>>>>>>>>> to
> > > > >>>>>>>>>>> push
> > > > >>>>>>>>>>> them to the port. This is
different behavior than any
> other
> > > > >>>>>>>>>>> control
> > > > >>>>>>>>>>> tuple
> > > > >>>>>>>>>>> where we are changing the
order of tuples in the stream.
> > The
> > > > >>>>>>>>>>> custom
> > > > >>>>>>>>>>> control
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> tuples will be buffered and
not delivered to the ports
> > until
> > > > the
> > > > >>>>>>>>>>>> end
> > > > >>>>>>>>>>>> of
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> the
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>> window.
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> To accomplish this, we need
to have a public method in
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>> SweepableReservoir
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> which allows to put
a tuple back in the sink of the
> > > > reservoir.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>> ~ Bhupesh
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>
> > > > >
> > > >
> > >
> >
>
