apex-dev mailing list archives

From Bhupesh Chawda <bhup...@datatorrent.com>
Subject Re: [DISCUSS] Custom Control Tuples Design
Date Sun, 01 Jan 2017 04:29:52 GMT
Yes, that makes sense.
We have the following options:
1. Make the annotation false by default and force the user to forward the
control tuples explicitly.
2. Make the annotation true by default and keep the static way of blocking
as it is. We provide another way of blocking programmatically, perhaps by
means of another method call on the port.

~ Bhupesh
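
The second option above could look roughly like the following. This is only a minimal sketch and not Apex code: the annotation PropagateControlTuples, the class SketchOutputPort and the method setPropagateControlTuples() are hypothetical stand-ins for the proposed @OutputPortFieldAnnotation attribute and for the extra method call on the port. It assumes that a programmatic setting, when present, overrides the static annotation.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;

// Hypothetical stand-in for the proposed annotation attribute (not the Apex annotation).
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface PropagateControlTuples
{
  boolean value() default true;
}

// Hypothetical output port that also supports blocking at run time.
class SketchOutputPort
{
  private Boolean override; // null means "use the static setting"

  void setPropagateControlTuples(boolean propagate)
  {
    override = propagate;
  }

  Boolean getOverride()
  {
    return override;
  }
}

class SketchOperator
{
  @PropagateControlTuples(false)
  final SketchOutputPort blocked = new SketchOutputPort();

  // No annotation: propagation defaults to true.
  final SketchOutputPort open = new SketchOutputPort();
}

class PropagationSketch
{
  // The programmatic setting wins; otherwise fall back to the annotation, then to true.
  static boolean shouldPropagate(Object operator, String portField)
  {
    try {
      Field field = operator.getClass().getDeclaredField(portField);
      field.setAccessible(true);
      SketchOutputPort port = (SketchOutputPort)field.get(operator);
      if (port.getOverride() != null) {
        return port.getOverride();
      }
      PropagateControlTuples annotation = field.getAnnotation(PropagateControlTuples.class);
      return annotation == null || annotation.value();
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args)
  {
    SketchOperator op = new SketchOperator();
    System.out.println(shouldPropagate(op, "blocked"));
    System.out.println(shouldPropagate(op, "open"));
    op.open.setPropagateControlTuples(false); // block programmatically
    System.out.println(shouldPropagate(op, "open"));
  }
}
```

With the first option the default in the annotation would simply be false, and an operator would have to re-emit every control tuple it wants to forward.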

On Dec 30, 2016 00:09, "Pramod Immaneni" <pramod@datatorrent.com> wrote:

> Bhupesh,
>
> Annotation seems like a static way to stop propagation. Given that these
> are programmatically generated, I would think the operators should be able
> to stop (consume without propagating) programmatically as well.
>
> Thanks
>
> On Thu, Dec 29, 2016 at 8:48 AM, Bhupesh Chawda <bhupesh@datatorrent.com>
> wrote:
>
> > Thanks Vlad, I am trying out the approach you mentioned regarding having
> > another interface which allows sinks to put a control tuple.
> >
> > Regarding the delivery of control tuples, here is what I am planning to do:
> > All the control tuples which are emitted in a particular window are
> > delivered after all the data tuples have been delivered to the respective
> > ports, but before the endWindow() call. The operator can then process the
> > control tuples in that window and can do any finalization in the endWindow()
> > call. There will be no delivery of control tuples after endWindow() and
> > before the next beginWindow() call.
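
The delivery order described above can be modelled in a few lines. This is a minimal sketch under the assumption that control tuples are simply held back in a list until the end of the window; WindowDeliverySketch and its methods are hypothetical names and not part of the Apex engine.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of one operator's window; none of these names are Apex APIs.
class WindowDeliverySketch
{
  // Control tuples received in the current window, held back until endWindow().
  private final List<Object> pendingControl = new ArrayList<>();
  // Record of what the operator sees, in order.
  final List<String> delivered = new ArrayList<>();

  void beginWindow()
  {
    delivered.add("beginWindow");
  }

  void onDataTuple(Object tuple)
  {
    delivered.add("data:" + tuple); // data tuples are delivered immediately
  }

  void onControlTuple(Object payload)
  {
    pendingControl.add(payload); // buffered, not delivered yet
  }

  void endWindow()
  {
    // Flush the buffered control tuples after all data, but before endWindow.
    for (Object payload : pendingControl) {
      delivered.add("control:" + payload);
    }
    pendingControl.clear();
    delivered.add("endWindow");
  }

  public static void main(String[] args)
  {
    WindowDeliverySketch window = new WindowDeliverySketch();
    window.beginWindow();
    window.onDataTuple("a");
    window.onControlTuple("EOF");
    window.onDataTuple("b");
    window.endWindow();
    System.out.println(window.delivered);
  }
}
```

Even though the control tuple arrives between the two data tuples, the recorded order is beginWindow, data:a, data:b, control:EOF, endWindow.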
> >
> > For handling the propagation of control tuples further in the dag, we are
> > planning to have an annotation on the Output Port of the operator which
> > would be true by default.
> > @OutputPortFieldAnnotation(propogateControlTuples = false).
> >
> > ~ Bhupesh
> >
> >
> > On Thu, Dec 29, 2016 at 6:24 AM, Vlad Rozov <v.rozov@datatorrent.com>
> > wrote:
> >
> > > Custom control tuples are control tuples emitted by an operator itself and
> > > not by the platform. Prior to the introduction of custom control tuples,
> > > only the Apex engine itself put control tuples into the various sinks, so
> > > the engine created the necessary Tuple objects with the corresponding type
> > > prior to calling Sink.put().
> > >
> > > Not all sinks need to be changed. Only control tuple aware sinks should
> > > provide such functionality. If there is a lot of code duplication, please
> > > create an abstract class that other control aware sinks will extend.
> > >
> > > Thank you,
> > >
> > > Vlad
> > >
> > >
> > > On 12/23/16 06:24, Bhupesh Chawda wrote:
> > >
> > >> Hi Vlad,
> > >>
> > >> Thanks for the pointer on delegating the wrapping of the user tuple to the
> > >> control port. I was trying this out today.
> > >> The problem I see is that if we introduce a putControlTuple() method in
> > >> Sink, then a lot of the existing sinks would change. Also, the changes
> > >> seemed redundant, as the existing control tuples already use the put()
> > >> method of sinks. So why do something special for custom control tuples?
> > >>
> > >> The only aspect in which the custom control tuples are different is that
> > >> these will be generated by the user and will actually be delivered to the
> > >> ports in a different order. Perhaps we should be able to use the existing
> > >> flow. The only problem, as outlined before, seems to be the identification
> > >> of the user tuple as a control tuple.
> > >>
> > >> ~ Bhupesh
> > >>
> > >>
> > >> On Thu, Dec 22, 2016 at 10:44 PM, Vlad Rozov <v.rozov@datatorrent.com>
> > >> wrote:
> > >>
> > >>> Why is it necessary to wrap in the OutputPort? Can't it be delegated to a
> > >>> Sink by introducing a new putControlTuple method?
> > >>>
> > >>> Thank you,
> > >>>
> > >>> Vlad
> > >>>
> > >>>
> > >>> On 12/21/16 22:10, Bhupesh Chawda wrote:
> > >>>
> > >>>> Hi Vlad,
> > >>>>
> > >>>> The problem in using the Tuple class as the wrapper is that the Ports
> > >>>> belong to the API and we want to wrap the payload object of the control
> > >>>> tuple into the Tuple class which is not part of the API.
> > >>>>
> > >>>> The output port will just get the payload of the user control tuple. For
> > >>>> example, if the user emits a Long as a control tuple, the payload object
> > >>>> will just be a Long object.
> > >>>>
> > >>>> It is necessary to bundle this Long into some recognizable object so that
> > >>>> the BufferServerPublisher knows that this is a control tuple and not a
> > >>>> regular tuple, and serializes it accordingly. It is therefore necessary
> > >>>> that the tuple be part of some known hierarchy so that it can be
> > >>>> distinguished from other payload tuples. Let us call this class
> > >>>> ControlTupleInterface. Note that this needs to be done before the tuple is
> > >>>> inserted into the sink, which is done in the port objects. Once the tuple
> > >>>> is inserted into the sink, it would seem just like any other payload tuple
> > >>>> and cannot be distinguished.
> > >>>>
> > >>>> For this reason, I had something like the following in API:
> > >>>>
> > >>>> package com.datatorrent.api;
> > >>>> public class ControlTupleInterface
> > >>>> {
> > >>>>     Object payload; // User control tuple payload, for example a Long
> > >>>>     UUID id;  // Unique Id to de-duplicate in downstream operators
> > >>>> }
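
The role of the id field can be illustrated with a small runnable variant of the class above. This is a sketch only: ControlTupleSketch mirrors the two fields from the message, while DedupSketch and its accept() method are hypothetical and assume that a downstream operator simply remembers the ids it has already seen. A real implementation would presumably also need to discard remembered ids at some point, for example per window.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

// Runnable variant of the wrapper from the message; the class name is hypothetical.
final class ControlTupleSketch
{
  final Object payload; // user control tuple payload, for example a Long
  final UUID id;        // unique id used to de-duplicate in downstream operators

  ControlTupleSketch(Object payload, UUID id)
  {
    this.payload = payload;
    this.id = id;
  }
}

// Hypothetical downstream helper that drops control tuples it has already seen.
class DedupSketch
{
  private final Set<UUID> seen = new HashSet<>();

  // Returns true the first time an id is seen and false for every duplicate.
  boolean accept(ControlTupleSketch tuple)
  {
    return seen.add(tuple.id);
  }

  public static void main(String[] args)
  {
    UUID id = UUID.randomUUID();
    // The same control tuple arriving from two upstream partitions.
    ControlTupleSketch fromPartition1 = new ControlTupleSketch(42L, id);
    ControlTupleSketch fromPartition2 = new ControlTupleSketch(42L, id);

    DedupSketch dedup = new DedupSketch();
    System.out.println(dedup.accept(fromPartition1));
    System.out.println(dedup.accept(fromPartition2));
  }
}
```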
> > >>>>
> > >>>> Regarding your suggestion on using the Tuple class as the wrapper for the
> > >>>> control tuple payload, let me mention the current flow to make the
> > >>>> discussion easier:
> > >>>>
> > >>>> We have a Tuple class in the buffer server which is responsible for
> > >>>> serializing the user control tuple, bundling it together with a message
> > >>>> type: CUSTOM_CONTROL_TUPLE_VALUE.
> > >>>>
> > >>>> com.datatorrent.bufferserver.packet.Tuple
> > >>>> |-- com.datatorrent.bufferserver.packet.CustomControlTuple
> > >>>>
> > >>>> We have another Tuple class in Stram which helps the
> > >>>> BufferServerSubscriber to de-serialize the serialized tuples. We should
> > >>>> have CustomControlTuple in stram as follows:
> > >>>>
> > >>>> com.datatorrent.stram.tuple.Tuple
> > >>>> |-- com.datatorrent.stram.tuple.CustomControlTuple
> > >>>>
> > >>>> This will have a field for the user control payload.
> > >>>>
> > >>>> I think we should not expose the Tuple class in stram to the API. That was
> > >>>> the main reason I introduced another class/interface,
> > >>>> ControlTupleInterface, as described above.
> > >>>>
> > >>>> Regarding adding methods to DefaultInputPort and DefaultOutputPort, I
> > >>>> think error detection would not be early enough if the control tuple is
> > >>>> sent very late in the processing :-)
> > >>>> Extending the ports to ControlTupleAware* should help in this case.
> > >>>> However, we still need to see if there are any downsides to doing this.
> > >>>>
> > >>>> Thanks.
> > >>>>
> > >>>> ~ Bhupesh
> > >>>>
> > >>>>
> > >>>>
> > >>>>
> > >>>> On Thu, Dec 22, 2016 at 7:26 AM, Vlad Rozov <v.rozov@datatorrent.com>
> > >>>> wrote:
> > >>>>
> > >>>>> Hi Bhupesh,
> > >>>>>
> > >>>>> It should not be a CustomWrapper. The wrapper object should be
> > >>>>> CustomControlTuple that extends Tuple. There is already code that checks
> > >>>>> for a Tuple instance. The "unWrap" name is misleading, IMO. It should be
> > >>>>> something like customControlTuple.getPayload() or
> > >>>>> customControlTuple.getAttachment(). In emitControl(), create a new
> > >>>>> CustomControlTuple using the provided payload as one of the arguments. It
> > >>>>> may still be good to use a common parent other than Object for the
> > >>>>> control tuple payload class hierarchy.
> > >>>>>
> > >>>>> I don't understand how adding more methods to the Default implementation
> > >>>>> will help with early error detection unless the application or operator
> > >>>>> that relies on the custom control tuple functionality explicitly checks
> > >>>>> for the platform version at run-time or tries to emit a control tuple
> > >>>>> just to check that such functionality is supported by the platform.
> > >>>>>
> > >>>>> Thank you,
> > >>>>>
> > >>>>> Vlad
> > >>>>>
> > >>>>> On 12/21/16 04:58, Bhupesh Chawda wrote:
> > >>>>>
> > >>>>>> Hi Vlad.
> > >>>>>>
> > >>>>>> Yes, the API should not change. We can take an Object instead, and later
> > >>>>>> wrap it into the required class.
> > >>>>>>
> > >>>>>> Our InputPort.put and emitControl methods would look something like the
> > >>>>>> following, where we handle the wrapping and unwrapping internally.
> > >>>>>>
> > >>>>>> public void put(T tuple)
> > >>>>>> {
> > >>>>>>      if (tuple instanceof CustomWrapper) {
> > >>>>>>        processControl(tuple.unWrap());
> > >>>>>>      }  else {
> > >>>>>>        process(tuple);
> > >>>>>>      }
> > >>>>>> }
> > >>>>>>
> > >>>>>> emitControl(Object tuple)
> > >>>>>> {
> > >>>>>>      sink.put(CustomWrapper.wrap(tuple));
> > >>>>>> }
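
The wrapping and unwrapping idea in the snippet above can be made runnable as follows. This is a minimal sketch under stated assumptions: CustomWrapperSketch, SinkSketch, InputPortSketch and OutputPortSketch are hypothetical classes written for illustration, not the Apex Sink, DefaultInputPort or DefaultOutputPort. The sketch keeps the wrap/unWrap names from the snippet it follows.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical wrapper that marks a payload as a custom control tuple.
final class CustomWrapperSketch
{
  private final Object payload;

  private CustomWrapperSketch(Object payload)
  {
    this.payload = payload;
  }

  static CustomWrapperSketch wrap(Object payload)
  {
    return new CustomWrapperSketch(payload);
  }

  Object unWrap()
  {
    return payload;
  }
}

// Hypothetical single-method sink, standing in for the engine's sink abstraction.
interface SinkSketch
{
  void put(Object tuple);
}

// Input side: routes wrapped tuples to processControl() and the rest to process().
class InputPortSketch implements SinkSketch
{
  final List<Object> data = new ArrayList<>();
  final List<Object> control = new ArrayList<>();

  @Override
  public void put(Object tuple)
  {
    if (tuple instanceof CustomWrapperSketch) {
      processControl(((CustomWrapperSketch)tuple).unWrap());
    } else {
      process(tuple);
    }
  }

  void process(Object tuple)
  {
    data.add(tuple);
  }

  void processControl(Object payload)
  {
    control.add(payload);
  }
}

// Output side: the user passes a plain Object; wrapping happens internally.
class OutputPortSketch
{
  private final SinkSketch sink;

  OutputPortSketch(SinkSketch sink)
  {
    this.sink = sink;
  }

  void emit(Object tuple)
  {
    sink.put(tuple);
  }

  void emitControl(Object payload)
  {
    sink.put(CustomWrapperSketch.wrap(payload));
  }
}

class WrapDemo
{
  public static void main(String[] args)
  {
    InputPortSketch in = new InputPortSketch();
    OutputPortSketch out = new OutputPortSketch(in);
    out.emit("row");
    out.emitControl(42L);
    System.out.println(in.data + " " + in.control);
  }
}
```

The user-facing methods only ever see the payload, so the wrapper can change between versions without affecting operator code.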
> > >>>>>>
> > >>>>>> Regarding the compatibility issue, I think we have two ways of doing it:
> > >>>>>>
> > >>>>>>       1. Extend DefaultInputPort and DefaultOutputPort and create
> > >>>>>>       ControlAwareInput and ControlAwareOutput out of them. This might
> > >>>>>>       require us to additionally handle specific cases when
> > >>>>>>       non-compatible ports (ControlAwareOutput and DefaultInput, for
> > >>>>>>       example) are connected to each other in user apps.
> > >>>>>>       2. Add the additional methods in the existing Default
> > >>>>>>       implementations.
> > >>>>>>
> > >>>>>> IMO, both of these would help in early error detection.
> > >>>>>>
> > >>>>>> ~ Bhupesh
> > >>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>> On Wed, Dec 21, 2016 at 1:36 AM, Vlad Rozov <v.rozov@datatorrent.com>
> > >>>>>> wrote:
> > >>>>>>
> > >>>>>>> A wrapper class is required for the control tuples delivery, but the
> > >>>>>>> Port/Operator API should use the control tuple payload object only.
> > >>>>>>> Implementation of the wrapper class may change from version to version,
> > >>>>>>> but the API should not be affected by the change.
> > >>>>>>>
> > >>>>>>> I guess the assumption is that the default input and output ports will
> > >>>>>>> be extended to provide support for the control tuples. This may cause
> > >>>>>>> some backward compatibility issues. Consider a scenario when a newer
> > >>>>>>> version of Malhar that relies on the EOF control tuple is deployed into
> > >>>>>>> an older version of core that does not support control tuples. In such
> > >>>>>>> a scenario, an error will be raised only when an operator tries to emit
> > >>>>>>> the EOF control tuple at the end of a job. Introducing control tuple
> > >>>>>>> aware ports solves the early error detection. It will require some
> > >>>>>>> operators to be modified to use control tuple aware ports, but such a
> > >>>>>>> change may help to distinguish control tuple aware operators from their
> > >>>>>>> old versions.
> > >>>>>>>
> > >>>>>>> Vlad
> > >>>>>>>
> > >>>>>>> On 12/20/16 04:09, Bhupesh Chawda wrote:
> > >>>>>>>
> > >>>>>>>> I investigated this and it seems like it is better to have a wrapper
> > >>>>>>>> class for the user object.
> > >>>>>>>> This would serve 2 purposes:
> > >>>>>>>>
> > >>>>>>>>        1. Allow us to distinguish a custom control tuple from other
> > >>>>>>>>        payload tuples.
> > >>>>>>>>        2. For the same control tuple received from different upstream
> > >>>>>>>>        partitions, we would have some mechanism to distinguish between
> > >>>>>>>>        the two in order to identify duplicates.
> > >>>>>>>>
> > >>>>>>>> Additionally, the wrapper class needs to be part of the API as
> > >>>>>>>> DefaultOutputPort needs to know about it before putting it into the
> > >>>>>>>> sink. We can make sure that the user is not able to extend or modify
> > >>>>>>>> this class in any manner.
> > >>>>>>>>
> > >>>>>>>> ~ Bhupesh
> > >>>>>>>>
> > >>>>>>>> On Mon, Dec 19, 2016 at 12:18 PM, David Yan <davidyan@gmail.com>
> > >>>>>>>> wrote:
> > >>>>>>>>
> > >>>>>>>>> This C type parameter is going to fix the control tuple type at compile
> > >>>>>>>>> time and this is actually not what we want. Note that the operator may
> > >>>>>>>>> receive or emit multiple different control tuple types.
> > >>>>>>>>>
> > >>>>>>>>> David
> > >>>>>>>>>
> > >>>>>>>>> On Dec 17, 2016 3:33 AM, "Tushar Gosavi" <tushar@datatorrent.com>
> > >>>>>>>>> wrote:
> > >>>>>>>>>
> > >>>>>>>>> We do not need to create an interface for data emitted through
> > >>>>>>>>> emitControl or processed through processControl. Internally we could
> > >>>>>>>>> wrap the user object in ControlTuple. You can add a type parameter for
> > >>>>>>>>> the control tuple object on ports.
> > >>>>>>>>>
> > >>>>>>>>> DefaultInputPort<D,C>
> > >>>>>>>>> D is the data type and C is the control tuple type, for better error
> > >>>>>>>>> catching at the compile phase.
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> - Tushar.
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> On Sat, Dec 17, 2016 at 8:35 AM, Bhupesh Chawda <bhupesh@datatorrent.com>
> > >>>>>>>>> wrote:
> > >>>>>>>>>
> > >>>>>>>>>> Agreed Vlad and David.
> > >>>>>>>>>>
> > >>>>>>>>>> I am just suggesting there should be a wrapper for the user object. It
> > >>>>>>>>>> can be a marker interface and we can call it something else like
> > >>>>>>>>>> "CustomControl".
> > >>>>>>>>>>
> > >>>>>>>>>> The user object will be wrapped in another class "ControlTuple" which
> > >>>>>>>>>> traverses the BufferServer and will perhaps be extended from the
> > >>>>>>>>>> packet/Tuple class. This class will not be exposed to the user.
> > >>>>>>>>>>
> > >>>>>>>>>> ~ Bhupesh
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> On Sat, Dec 17, 2016 at 4:11 AM, Vlad Rozov <v.rozov@datatorrent.com>
> > >>>>>>>>>> wrote:
> > >>>>>>>>>>
> > >>>>>>>>>>> I agree with David. Payload of the control tuple is in the userObject
> > >>>>>>>>>>> and operators/ports don't need to be exposed to the implementation of
> > >>>>>>>>>>> the ControlTuple class. With the proposed interface, operator
> > >>>>>>>>>>> developers are free to extend ControlTuple further and I don't think
> > >>>>>>>>>>> that such capability needs to be provided. The wrapping in the
> > >>>>>>>>>>> ControlTuple class is necessary and most likely ControlTuple needs to
> > >>>>>>>>>>> be extended from the buffer server Tuple. It may be good to have a
> > >>>>>>>>>>> common parent other than Object for all user payloads, but it may be a
> > >>>>>>>>>>> marker interface as well.
> > >>>>>>>>>>>
> > >>>>>>>>>>> Thank you,
> > >>>>>>>>>>>
> > >>>>>>>>>>> Vlad
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> On 12/16/16 09:59, Bhupesh Chawda wrote:
> > >>>>>>>>>>>
> > >>>>>>>>>>>> Hi David,
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Actually, I was thinking of another API class called ControlTuple,
> > >>>>>>>>>>>> different from the actual tuple class in buffer server or stram.
> > >>>>>>>>>>>> This could serve as a way for the Buffer server publisher to understand
> > >>>>>>>>>>>> that it is a control tuple and needs to be wrapped differently.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> ~ Bhupesh
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> On Dec 16, 2016 22:28, "David Yan" <davidyan@gmail.com> wrote:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> {code}
> > >>>>>>>>>>>>       // DefaultInputPort
> > >>>>>>>>>>>>        public void processControl(ControlTuple tuple)
> > >>>>>>>>>>>>        {
> > >>>>>>>>>>>>          // Default Implementation to avoid need to implement it in all
> > >>>>>>>>>>>>          // implementations
> > >>>>>>>>>>>>        }
> > >>>>>>>>>>>> {code}
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> {code}
> > >>>>>>>>>>>>       // DefaultOutputPort
> > >>>>>>>>>>>>        public void emitControl(ControlTuple tuple)
> > >>>>>>>>>>>>        {
> > >>>>>>>>>>>>        }
> > >>>>>>>>>>>> {code}
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> I think we don't need to expose the ControlTuple class to the operator
> > >>>>>>>>>>>> developers because the window ID is just the current window ID when
> > >>>>>>>>>>>> these methods are called. How about making them just Object? We also
> > >>>>>>>>>>>> need to provide a way for the user to specify a custom serializer for
> > >>>>>>>>>>>> the control tuple.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> David
> > >>>>>>>>>>
> > >>>>>>>>>>>> On Thu, Dec 15, 2016 at 12:43 AM, Bhupesh Chawda <bhupesh@datatorrent.com>
> > >>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>> Hi All,
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Here are the initial interfaces:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> {code}
> > >>>>>>>>>>>>>       // DefaultInputPort
> > >>>>>>>>>>>>>        public void processControl(ControlTuple tuple)
> > >>>>>>>>>>>>>        {
> > >>>>>>>>>>>>>          // Default Implementation to avoid need to implement it in all
> > >>>>>>>>>>>>>          // implementations
> > >>>>>>>>>>>>>        }
> > >>>>>>>>>>>>> {code}
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> {code}
> > >>>>>>>>>>>>>       // DefaultOutputPort
> > >>>>>>>>>>>>>        public void emitControl(ControlTuple tuple)
> > >>>>>>>>>>>>>        {
> > >>>>>>>>>>>>>        }
> > >>>>>>>>>>>>> {code}
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> We have an option to add these methods to the interfaces - InputPort
> > >>>>>>>>>>>>> and OutputPort; but these would not be backward compatible and also
> > >>>>>>>>>>>>> not consistent with the current implementation of basic data tuple
> > >>>>>>>>>>>>> flow (as with process() and emit()).
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> We also need to expose an interface / class for users to wrap their
> > >>>>>>>>>>>>> object and emit downstream. This should be part of API.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> {code}
> > >>>>>>>>>>>>> public class ControlTuple extends Tuple
> > >>>>>>>>>>>>> {
> > >>>>>>>>>>>>>        Object userObject;
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>        public ControlTuple(long windowId, Object userObject)
> > >>>>>>>>>>>>>        {
> > >>>>>>>>>>>>>          //
> > >>>>>>>>>>>>>        }
> > >>>>>>>>>>>>> }
> > >>>>>>>>>>>>> {code}
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> The emitted tuples would traverse the same flow as with other control
> > >>>>>>>>>>>>> tuples. The plan is to intercept the control tuples in GenericNode and
> > >>>>>>>>>>>>> use the Reservoir to emit the control tuples at the end of the window.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> GenericNode seems to be the best place to buffer incoming custom
> > >>>>>>>>>>>>> control tuples without delivering them immediately to the operator
> > >>>>>>>>>>>>> port. Once the end of the window is reached, we plan to use the
> > >>>>>>>>>>>>> reservoir sink to push them to the port. This differs from the
> > >>>>>>>>>>>>> behavior of any other control tuple, in that we are changing the order
> > >>>>>>>>>>>>> of tuples in the stream. The custom control tuples will be buffered
> > >>>>>>>>>>>>> and not delivered to the ports until the end of the window.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> To accomplish this, we need to have a public method in
> > >>>>>>>>>>>>> SweepableReservoir which allows putting a tuple back into the sink of
> > >>>>>>>>>>>>> the reservoir.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> ~ Bhupesh
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >
> >
>
