apex-dev mailing list archives

From Vlad Rozov <v.ro...@datatorrent.com>
Subject Re: [DISCUSS] Custom Control Tuples Design
Date Thu, 29 Dec 2016 00:54:52 GMT
Custom control tuples are control tuples emitted by an operator itself,
not by the platform. Before custom control tuples were introduced, only
the Apex engine itself put control tuples into the various sinks, so the
engine created the necessary Tuple objects of the corresponding type
before calling Sink.put().

Not all sinks need to be changed. Only control tuple aware sinks should
provide such functionality. If this leads to a lot of code duplication,
please create an abstract class that the other control tuple aware sinks
can extend.
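A minimal sketch of this suggestion follows. It uses hypothetical types: `Sink` here is a simplified stand-in for the engine's sink interface, reduced to `put()`. `ControlTupleAwareSink`, `AbstractControlTupleAwareSink`, `CustomControlTuple` and `RecordingSink` are illustrative names only. The abstract class does the wrapping once, so concrete control-aware sinks only implement `put()`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for the engine's Sink interface.
interface Sink<T>
{
  void put(T tuple);
}

// Hypothetical wrapper the engine would recognize as a custom control tuple.
final class CustomControlTuple
{
  private final Object payload;

  CustomControlTuple(Object payload)
  {
    this.payload = payload;
  }

  Object getPayload()
  {
    return payload;
  }
}

// Only sinks that understand control tuples expose this extra method.
interface ControlTupleAwareSink<T> extends Sink<T>
{
  void putControlTuple(Object payload);
}

// Shared wrapping logic lives here, so concrete control-aware sinks do not duplicate it.
abstract class AbstractControlTupleAwareSink implements ControlTupleAwareSink<Object>
{
  @Override
  public void putControlTuple(Object payload)
  {
    // Wrap the user payload so downstream code can tell it apart from data tuples.
    put(new CustomControlTuple(payload));
  }
}

// Example concrete sink that just records what it receives.
class RecordingSink extends AbstractControlTupleAwareSink
{
  final List<Object> received = new ArrayList<>();

  @Override
  public void put(Object tuple)
  {
    received.add(tuple);
  }
}
```

Sinks that never carry control tuples keep implementing plain `Sink`, which matches the point that only control tuple aware sinks need to change.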

Thank you,

Vlad

On 12/23/16 06:24, Bhupesh Chawda wrote:
> Hi Vlad,
>
> Thanks for the pointer on delegating the wrapping of the user tuple to the
> control port. I was trying this out today.
> The problem I see is that if we introduce a putControlTuple() method in
> Sink, a lot of the existing sinks would change. The changes also seemed
> redundant, as the existing control tuples already use the put() method of
> sinks. So why do something special for custom control tuples?
>
> The only aspect in which custom control tuples differ is that they are
> generated by the user and are actually delivered to the ports in a
> different order. Perhaps we can use the existing flow. As outlined
> before, the only problem seems to be identifying the user tuple as a
> control tuple.
>
> ~ Bhupesh
>
>
> On Thu, Dec 22, 2016 at 10:44 PM, Vlad Rozov <v.rozov@datatorrent.com>
> wrote:
>
>> Why is it necessary to wrap in the OutputPort? Can't it be delegated to a
>> Sink by introducing a new putControlTuple method?
>>
>> Thank you,
>>
>> Vlad
>>
>>
>> On 12/21/16 22:10, Bhupesh Chawda wrote:
>>
>>> Hi Vlad,
>>>
>>> The problem with using the Tuple class as the wrapper is that the ports
>>> belong to the API, and we want to wrap the payload object of the control
>>> tuple into the Tuple class, which is not part of the API.
>>>
>>> The output port will just get the payload of the user control tuple. For
>>> example, if the user emits a Long as a control tuple, the payload object
>>> will just be a Long object.
>>>
>>> It is necessary to bundle this Long into some recognizable object so that
>>> the BufferServerPublisher knows that this is a control tuple and not a
>>> regular tuple, and serializes it accordingly. The tuple must therefore be
>>> part of some known hierarchy so that it can be distinguished from other
>>> payload tuples. Let us call this class ControlTupleInterface. Note that
>>> this needs to be done before the tuple is inserted into the sink, which
>>> happens in the port objects. Once the tuple is inserted into the sink, it
>>> looks just like any other payload tuple and cannot be distinguished.
>>>
>>> For this reason, I had something like the following in the API:
>>>
>>> package com.datatorrent.api;
>>>
>>> import java.util.UUID;
>>>
>>> public class ControlTupleInterface
>>> {
>>>     Object payload; // User control tuple payload, e.g. a Long
>>>     UUID id;        // Unique id used to de-duplicate in downstream operators
>>> }
>>>
>>> Regarding your suggestion of using the Tuple class as the wrapper for the
>>> control tuple payload, let me describe the current flow to make the
>>> discussion easier:
>>>
>>> We have a Tuple class in the buffer server which is responsible for
>>> serializing the user control tuple together with a message type,
>>> CUSTOM_CONTROL_TUPLE_VALUE:
>>>
>>> com.datatorrent.bufferserver.packet.Tuple
>>>   |-- com.datatorrent.bufferserver.packet.CustomControlTuple
>>>
>>> We have another Tuple class in stram which helps the
>>> BufferServerSubscriber de-serialize the serialized tuples. We should have
>>> a CustomControlTuple in stram as follows:
>>>
>>> com.datatorrent.stram.tuple.Tuple
>>>   |-- com.datatorrent.stram.tuple.CustomControlTuple
>>>
>>> This will have a field for the user control payload.
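The following sketch illustrates the publisher/subscriber split described above. It rests on several assumptions. The names `MessageType`, `TupleCodec` and `CustomControlTuple` are hypothetical, and the byte values are made up, because the real buffer server encoding and the value of CUSTOM_CONTROL_TUPLE_VALUE are not given in this thread. Java serialization stands in for whatever payload serializer is actually used. The publisher writes a type byte before the payload, and the subscriber uses that byte to decide whether to rebuild a control-tuple wrapper.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

// Hypothetical message-type codes; the real buffer server values are not shown here.
final class MessageType
{
  static final byte PAYLOAD = 1;
  static final byte CUSTOM_CONTROL_TUPLE = 2;

  private MessageType()
  {
  }
}

// Hypothetical wrapper for a user control payload.
final class CustomControlTuple
{
  final Object payload;

  CustomControlTuple(Object payload)
  {
    this.payload = payload;
  }
}

final class TupleCodec
{
  private TupleCodec()
  {
  }

  // Publisher side: the first byte tells the subscriber how to interpret the rest.
  static byte[] encode(Object tuple) throws IOException
  {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    boolean control = tuple instanceof CustomControlTuple;
    bytes.write(control ? MessageType.CUSTOM_CONTROL_TUPLE : MessageType.PAYLOAD);
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      // Only the user payload is serialized; the wrapper is implied by the type byte.
      out.writeObject(control ? ((CustomControlTuple) tuple).payload : tuple);
    }
    return bytes.toByteArray();
  }

  // Subscriber side: rebuild the wrapper for control tuples, return the plain object otherwise.
  static Object decode(byte[] message) throws IOException, ClassNotFoundException
  {
    ByteArrayInputStream in = new ByteArrayInputStream(message);
    int type = in.read();
    try (ObjectInputStream objects = new ObjectInputStream(in)) {
      Object payload = objects.readObject();
      return type == MessageType.CUSTOM_CONTROL_TUPLE ? new CustomControlTuple(payload) : payload;
    }
  }
}
```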
>>>
>>> I think we should not expose the Tuple class in stram to the API. That was
>>> the main reason I introduced another class/interface, ControlTupleInterface,
>>> as described above.
>>>
>>> Regarding adding methods to DefaultInputPort and DefaultOutputPort, I
>>> think error detection would not be early enough if the control tuple is
>>> sent very late in the processing :-)
>>> Extending the ports to ControlTupleAware* should help in this case.
>>> However, we still need to see whether there are any downsides to doing
>>> this.
>>>
>>> Thanks.
>>>
>>> ~ Bhupesh
>>>
>>>
>>>
>>>
>>> On Thu, Dec 22, 2016 at 7:26 AM, Vlad Rozov <v.rozov@datatorrent.com>
>>> wrote:
>>>
>>>> Hi Bhupesh,
>>>> It should not be a CustomWrapper. The wrapper object should be a
>>>> CustomControlTuple that extends Tuple; there is already code that checks
>>>> for Tuple instances. The "unWrap" name is misleading, IMO. It should be
>>>> something like customControlTuple.getPayload() or
>>>> customControlTuple.getAttachment(). In emitControl(), create a new
>>>> CustomControlTuple using the provided payload as one of the arguments.
>>>> It may still be good to use a common parent other than Object for the
>>>> control tuple payload class hierarchy.
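Vlad's suggestion can be sketched as follows. `Tuple` below is a simplified stand-in for the engine-side tuple class, and the real class is not reproduced. `SketchOutputPort` and `SketchInputPort` are hypothetical ports that use a plain list as their sink. The operator only ever hands over the payload. `emitControl()` creates the `CustomControlTuple`, and the receiving side branches on the wrapper type and exposes the payload through `getPayload()`.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical stand-in for the engine-side Tuple class.
class Tuple
{
  final long windowId;

  Tuple(long windowId)
  {
    this.windowId = windowId;
  }
}

// Wrapper suggested in the thread: a Tuple subclass carrying the user payload.
final class CustomControlTuple extends Tuple
{
  private final Object payload;

  CustomControlTuple(long windowId, Object payload)
  {
    super(windowId);
    this.payload = payload;
  }

  Object getPayload()
  {
    return payload;
  }
}

// Hypothetical output port; the list plays the role of the sink.
class SketchOutputPort
{
  final List<Object> sink = new ArrayList<>();
  long currentWindowId;

  void emit(Object tuple)
  {
    sink.add(tuple);
  }

  void emitControl(Object payload)
  {
    // The wrapper is created here, so it never appears in the operator-facing API.
    sink.add(new CustomControlTuple(currentWindowId, payload));
  }
}

// Hypothetical input side: control tuples are recognized by their wrapper type.
class SketchInputPort
{
  final List<Object> data = new ArrayList<>();
  final List<Object> control = new ArrayList<>();

  void put(Object tuple)
  {
    if (tuple instanceof CustomControlTuple) {
      control.add(((CustomControlTuple) tuple).getPayload());
    } else {
      data.add(tuple);
    }
  }
}
```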
>>>>
>>>> I don't understand how adding more methods to the Default implementation
>>>> will help with early error detection, unless an application or operator
>>>> that relies on the custom control tuple functionality explicitly checks
>>>> the platform version at run time or tries to emit a control tuple just to
>>>> check that the platform supports such functionality.
>>>>
>>>> Thank you,
>>>>
>>>> Vlad
>>>>
>>>> On 12/21/16 04:58, Bhupesh Chawda wrote:
>>>>
>>>>> Hi Vlad,
>>>>>
>>>>> Yes, the API should not change. We can take an Object instead and wrap it
>>>>> into the required class later.
>>>>>
>>>>> Our InputPort.put() and emitControl() methods would look something like
>>>>> the following, where we handle the wrapping and unwrapping internally.
>>>>>
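>>>>> public void put(T tuple)
>>>>> {
>>>>>     if (tuple instanceof CustomWrapper) {
>>>>>         // Control tuple: unwrap it and hand only the payload to the operator
>>>>>         processControl(((CustomWrapper) tuple).unWrap());
>>>>>     } else {
>>>>>         process(tuple);
>>>>>     }
>>>>> }
>>>>>
>>>>> public void emitControl(Object tuple)
>>>>> {
>>>>>     // Wrap the user payload so that downstream ports can recognize it
>>>>>     sink.put(CustomWrapper.wrap(tuple));
>>>>> }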
>>>>>
>>>>> Regarding the compatibility issue, I think we have two ways of doing it:
>>>>>
>>>>>       1. Extend DefaultInputPort and DefaultOutputPort to create
>>>>>       ControlAwareInput and ControlAwareOutput. This might require us to
>>>>>       additionally handle the cases where non-compatible ports
>>>>>       (ControlAwareOutput and DefaultInput, for example) are connected to
>>>>>       each other in user apps.
>>>>>       2. Add the additional methods to the existing Default
>>>>>       implementations.
>>>>>
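Option 1 implies a check at the point where two ports are connected. Below is a minimal sketch. It assumes simplified stand-in port classes (`PlainOutputPort` and `PlainInputPort` are hypothetical and are not the real Apex `DefaultOutputPort`/`DefaultInputPort`). It also assumes one possible policy: reject a control-aware output wired to an input that cannot process control tuples. Under that policy, the error surfaces when the DAG is assembled rather than when the first control tuple is emitted.

```java
// Simplified, hypothetical stand-ins for the port types named in option 1.
class PlainOutputPort
{
}

class PlainInputPort
{
}

class ControlAwareOutput extends PlainOutputPort
{
}

class ControlAwareInput extends PlainInputPort
{
}

final class ConnectionValidator
{
  private ConnectionValidator()
  {
  }

  // Hypothetical check run while the DAG is assembled, before any tuple flows.
  static void validate(PlainOutputPort source, PlainInputPort sink)
  {
    boolean sourceMayEmitControl = source instanceof ControlAwareOutput;
    boolean sinkHandlesControl = sink instanceof ControlAwareInput;
    if (sourceMayEmitControl && !sinkHandlesControl) {
      throw new IllegalArgumentException(
          "control tuple aware output connected to an input that cannot process control tuples");
    }
    // A plain output feeding a control-aware input is accepted: it never sends control tuples.
  }
}
```

Whether an incompatible pairing should be rejected outright or tolerated (for example, by dropping control tuples at the plain input) is a design decision the thread leaves open. The sketch only shows the rejecting variant.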
>>>>>
>>>>> IMO, both of these would help in early error detection.
>>>>>
>>>>> ~ Bhupesh
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Dec 21, 2016 at 1:36 AM, Vlad Rozov <v.rozov@datatorrent.com>
>>>>> wrote:
>>>>>
>>>>>> A wrapper class is required for control tuple delivery, but the
>>>>>> Port/Operator API should use only the control tuple payload object. The
>>>>>> implementation of the wrapper class may change from version to version,
>>>>>> but the API should not be affected by the change.
>>>>>>
>>>>>> I guess the assumption is that the default input and output ports will
>>>>>> be extended to provide support for control tuples. This may cause some
>>>>>> backward compatibility issues. Consider a scenario where a newer version
>>>>>> of Malhar that relies on the EOF control tuple is deployed into an older
>>>>>> version of core that does not support control tuples. In such a
>>>>>> scenario, an error will be raised only when an operator tries to emit
>>>>>> the EOF control tuple at the end of a job. Introducing control tuple
>>>>>> aware ports solves the early error detection problem. It will require
>>>>>> some operators to be modified to use control tuple aware ports, but such
>>>>>> a change may help to distinguish control tuple aware operators from
>>>>>> their old versions.
>>>>>>
>>>>>> Vlad
>>>>>>
>>>>>> On 12/20/16 04:09, Bhupesh Chawda wrote:
>>>>>>
>>>>>>> I investigated this, and it seems better to have a wrapper class for
>>>>>>> the user object. This would serve two purposes:
>>>>>>>
>>>>>>>        1. Allow us to distinguish a custom control tuple from other
>>>>>>>        payload tuples.
>>>>>>>        2. For the same control tuple received from different upstream
>>>>>>>        partitions, give us some mechanism to distinguish between the
>>>>>>>        two in order to identify duplicates.
>>>>>>>
>>>>>>> Additionally, the wrapper class needs to be part of the API, as
>>>>>>> DefaultOutputPort needs to know about it before putting it into the
>>>>>>> sink. We can make sure that the user is not able to extend or modify
>>>>>>> this class in any manner.
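Both purposes, and the "cannot extend or modify" requirement, can be sketched with an immutable final class that carries a UUID. This is a sketch under stated assumptions. `ControlTupleWrapper` and `ControlTupleDeduplicator` are hypothetical names. The id is assumed to be assigned once, where the control tuple originates, so that copies arriving through different upstream partitions share it.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;

// Hypothetical API wrapper: final class, private constructor and final fields,
// so users can neither subclass it nor modify a wrapped tuple.
final class ControlTupleWrapper
{
  private final Object payload;
  private final UUID id;

  private ControlTupleWrapper(Object payload, UUID id)
  {
    this.payload = Objects.requireNonNull(payload, "payload");
    this.id = Objects.requireNonNull(id, "id");
  }

  // Called once, where the control tuple originates; every copy shares this id.
  static ControlTupleWrapper create(Object payload)
  {
    return new ControlTupleWrapper(payload, UUID.randomUUID());
  }

  Object getPayload()
  {
    return payload;
  }

  UUID getId()
  {
    return id;
  }
}

// Hypothetical downstream helper that delivers each control tuple only once,
// even if copies of it arrive through several upstream partitions.
final class ControlTupleDeduplicator
{
  private final Set<UUID> seen = new HashSet<>();

  // Returns true for the first copy of a given control tuple, false for later copies.
  boolean firstTime(ControlTupleWrapper tuple)
  {
    return seen.add(tuple.getId());
  }
}
```

As written, the set of seen ids grows without bound. A real implementation would need a way to forget old ids, for example clearing them at a window boundary, which is only safe if all copies are guaranteed to arrive in the same window (an assumption not settled in this thread).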
>>>>>>>
>>>>>>> ~ Bhupesh
>>>>>>>
>>>>>>> On Mon, Dec 19, 2016 at 12:18 PM, David Yan <davidyan@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> This C type parameter would fix the control tuple type at compile
>>>>>>>> time, and this is actually not what we want. Note that an operator may
>>>>>>>> receive or emit multiple different control tuple types.
>>>>>>>>
>>>>>>>> David
>>>>>>>>
>>>>>>>> On Dec 17, 2016 3:33 AM, "Tushar Gosavi" <tushar@datatorrent.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> We do not need to create an interface for data emitted through
>>>>>>>>> emitControl or processed through processControl. Internally, we could
>>>>>>>>> wrap the user object in ControlTuple. You can add a type parameter for
>>>>>>>>> the control tuple object on ports:
>>>>>>>>>
>>>>>>>>> DefaultInputPort<D,C>
>>>>>>>>>
>>>>>>>>> D is the data type and C is the control tuple type, for better error
>>>>>>>>> catching at compile time.
>>>>>>>>>
>>>>>>>>> - Tushar.
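Tushar's two-parameter port can be sketched as shown below. The names `TypedInputPort`, `EofOnlyPort`, `MultiControlPort`, `EndOfFile` and `Watermark` are all hypothetical. The sketch also illustrates David's objection. With a single control type, the compiler checks `processControl()` arguments. Once an operator must handle several control types, C has to be widened to a common supertype such as Object, and the dispatch moves back to run time.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical port with separate data (D) and control (C) type parameters.
abstract class TypedInputPort<D, C>
{
  abstract void process(D tuple);

  // Default no-op so that ports which ignore control tuples need not override it.
  void processControl(C controlTuple)
  {
  }
}

// Hypothetical control payload types.
final class EndOfFile
{
}

final class Watermark
{
  final long timestamp;

  Watermark(long timestamp)
  {
    this.timestamp = timestamp;
  }
}

// With C = EndOfFile, the compiler rejects any other control type passed to processControl.
class EofOnlyPort extends TypedInputPort<String, EndOfFile>
{
  boolean sawEof;

  @Override
  void process(String tuple)
  {
  }

  @Override
  void processControl(EndOfFile controlTuple)
  {
    sawEof = true;
  }
}

// With C = Object, any control type is accepted, but the type checks happen at run time.
class MultiControlPort extends TypedInputPort<String, Object>
{
  final List<String> log = new ArrayList<>();

  @Override
  void process(String tuple)
  {
    log.add("data:" + tuple);
  }

  @Override
  void processControl(Object controlTuple)
  {
    if (controlTuple instanceof EndOfFile) {
      log.add("eof");
    } else if (controlTuple instanceof Watermark) {
      log.add("watermark:" + ((Watermark) controlTuple).timestamp);
    }
  }
}
```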
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Sat, Dec 17, 2016 at 8:35 AM, Bhupesh Chawda <bhupesh@datatorrent.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Agreed, Vlad and David.
>>>>>>>>>>
>>>>>>>>>> I am just suggesting there should be a wrapper for the user object. It
>>>>>>>>>> can be a marker interface, and we can call it something else, like
>>>>>>>>>> "CustomControl".
>>>>>>>>>>
>>>>>>>>>> The user object will be wrapped in another class, "ControlTuple", which
>>>>>>>>>> traverses the BufferServer and will perhaps be extended from the
>>>>>>>>>> packet/Tuple class. This class will not be exposed to the user.
>>>>>>>>>>
>>>>>>>>>> ~ Bhupesh
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Sat, Dec 17, 2016 at 4:11 AM, Vlad Rozov <v.rozov@datatorrent.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> I agree with David. The payload of the control tuple is in userObject,
>>>>>>>>>>> and operators/ports don't need to be exposed to the implementation of
>>>>>>>>>>> the ControlTuple class. With the proposed interface, operator
>>>>>>>>>>> developers are free to extend ControlTuple further, and I don't think
>>>>>>>>>>> that such a capability needs to be provided. The wrapping in the
>>>>>>>>>>> ControlTuple class is necessary, and most likely ControlTuple needs to
>>>>>>>>>>> be extended from the buffer server Tuple. It may be good to have a
>>>>>>>>>>> common parent other than Object for all user payloads, but it may be
>>>>>>>>>>> a marker interface as well.
>>>>>>>>>>>
>>>>>>>>>>> Thank you,
>>>>>>>>>>>
>>>>>>>>>>> Vlad
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On 12/16/16 09:59, Bhupesh Chawda wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi David,
>>>>>>>>>>>>
>>>>>>>>>>>> Actually, I was thinking of another API class called ControlTuple,
>>>>>>>>>>>> different from the actual tuple class in the buffer server or stram.
>>>>>>>>>>>> This could serve as a way for the buffer server publisher to
>>>>>>>>>>>> understand that it is a control tuple and needs to be wrapped
>>>>>>>>>>>> differently.
>>>>>>>>>>>>
>>>>>>>>>>>> ~ Bhupesh
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Dec 16, 2016 22:28, "David Yan" <davidyan@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> {code}
>>>>>>>>>>>>>>   // DefaultInputPort
>>>>>>>>>>>>>>   public void processControl(ControlTuple tuple)
>>>>>>>>>>>>>>   {
>>>>>>>>>>>>>>     // Default implementation, to avoid the need to implement it in
>>>>>>>>>>>>>>     // all implementations
>>>>>>>>>>>>>>   }
>>>>>>>>>>>>>> {code}
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> {code}
>>>>>>>>>>>>>>   // DefaultOutputPort
>>>>>>>>>>>>>>   public void emitControl(ControlTuple tuple)
>>>>>>>>>>>>>>   {
>>>>>>>>>>>>>>   }
>>>>>>>>>>>>>> {code}
>>>>>>>>>>>>>
>>>>>>>>>>>>> I think we don't need to expose the ControlTuple class to operator
>>>>>>>>>>>>> developers, because the window ID is just the current window ID when
>>>>>>>>>>>>> these methods are called. How about making them just Object? We also
>>>>>>>>>>>>> need to provide a way for the user to specify a custom serializer for
>>>>>>>>>>>>> the control tuple.
>>>>>>>>>>>>>
>>>>>>>>>>>>> David
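David's last point could take the form of a pluggable serializer for each control payload type. This is a sketch only. `ControlTupleSerializer` and `LongControlSerializer` are hypothetical and are not an existing Apex API. The example encodes Long payloads as 8 big-endian bytes using `java.nio.ByteBuffer`, whose default byte order is big-endian.

```java
import java.nio.ByteBuffer;

// Hypothetical hook through which a user could plug in a serializer for a control payload type.
interface ControlTupleSerializer<T>
{
  byte[] toBytes(T payload);

  T fromBytes(byte[] bytes);
}

// Example: a fixed-width, big-endian encoding for Long payloads.
final class LongControlSerializer implements ControlTupleSerializer<Long>
{
  @Override
  public byte[] toBytes(Long payload)
  {
    return ByteBuffer.allocate(Long.BYTES).putLong(payload).array();
  }

  @Override
  public Long fromBytes(byte[] bytes)
  {
    return ByteBuffer.wrap(bytes).getLong();
  }
}
```

How such a serializer would be registered (per port, per operator, or per payload class) is not decided in this thread.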
>>>>>>>>>>>>> On Thu, Dec 15, 2016 at 12:43 AM, Bhupesh Chawda <bhupesh@datatorrent.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi All,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Here are the initial interfaces:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> {code}
>>>>>>>>>>>>>>   // DefaultInputPort
>>>>>>>>>>>>>>   public void processControl(ControlTuple tuple)
>>>>>>>>>>>>>>   {
>>>>>>>>>>>>>>     // Default implementation, to avoid the need to implement it in
>>>>>>>>>>>>>>     // all implementations
>>>>>>>>>>>>>>   }
>>>>>>>>>>>>>> {code}
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> {code}
>>>>>>>>>>>>>>   // DefaultOutputPort
>>>>>>>>>>>>>>   public void emitControl(ControlTuple tuple)
>>>>>>>>>>>>>>   {
>>>>>>>>>>>>>>   }
>>>>>>>>>>>>>> {code}
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> We have the option of adding these methods to the interfaces InputPort
>>>>>>>>>>>>>> and OutputPort, but this would not be backward compatible and also not
>>>>>>>>>>>>>> consistent with the current implementation of the basic data tuple flow
>>>>>>>>>>>>>> (as with process() and emit()).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> We also need to expose an interface/class for users to wrap their object
>>>>>>>>>>>>>> and emit it downstream. This should be part of the API.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> {code}
>>>>>>>>>>>>>> public class ControlTuple extends Tuple
>>>>>>>>>>>>>> {
>>>>>>>>>>>>>>   Object userObject;
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>   public ControlTuple(long windowId, Object userObject)
>>>>>>>>>>>>>>   {
>>>>>>>>>>>>>>     // windowId is passed to the Tuple base class (its constructor is not shown)
>>>>>>>>>>>>>>     this.userObject = userObject;
>>>>>>>>>>>>>>   }
>>>>>>>>>>>>>> }
>>>>>>>>>>>>>> {code}
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The emitted tuples would traverse the same flow as other control tuples.
>>>>>>>>>>>>>> The plan is to intercept the control tuples in GenericNode and use the
>>>>>>>>>>>>>> Reservoir to emit the control tuples at the end of the window.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> GenericNode seems to be the best place to buffer incoming custom control
>>>>>>>>>>>>>> tuples without delivering them immediately to the operator port. Once the
>>>>>>>>>>>>>> end of the window is reached, we plan to use the reservoir sink to push
>>>>>>>>>>>>>> them to the port. This differs from every other control tuple, because
>>>>>>>>>>>>>> here we change the order of tuples in the stream: the custom control
>>>>>>>>>>>>>> tuples are buffered and not delivered to the ports until the end of the
>>>>>>>>>>>>>> window.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> To accomplish this, we need a public method in SweepableReservoir that
>>>>>>>>>>>>>> allows putting a tuple back into the sink of the reservoir.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> ~ Bhupesh
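The buffering described in this first proposal can be sketched independently of the engine. The types below (`StreamItem`, `Data`, `CustomControl`, `EndWindow`, `BufferingNode`) are hypothetical stand-ins for GenericNode and its reservoirs. Releasing the held control tuples just before the operator's end-of-window call is an assumption about where "at the end of the window" falls. Data tuples go straight to the port. Custom control tuples are held back and released, in arrival order, when the end-of-window marker arrives.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical stream items: data, custom control, and the end-of-window marker.
interface StreamItem
{
}

final class Data implements StreamItem
{
  final Object value;

  Data(Object value)
  {
    this.value = value;
  }
}

final class CustomControl implements StreamItem
{
  final Object payload;

  CustomControl(Object payload)
  {
    this.payload = payload;
  }
}

final class EndWindow implements StreamItem
{
}

// Hypothetical stand-in for the GenericNode loop; "delivered" records what the operator sees.
final class BufferingNode
{
  private final Deque<Object> heldControl = new ArrayDeque<>();
  final List<String> delivered = new ArrayList<>();

  void onItem(StreamItem item)
  {
    if (item instanceof Data) {
      // Data tuples are delivered immediately, as today.
      delivered.add("process:" + ((Data) item).value);
    } else if (item instanceof CustomControl) {
      // Custom control tuples are held back instead of being delivered in stream order.
      heldControl.addLast(((CustomControl) item).payload);
    } else if (item instanceof EndWindow) {
      // At the window boundary, release the held control tuples in arrival order.
      while (!heldControl.isEmpty()) {
        delivered.add("processControl:" + heldControl.pollFirst());
      }
      delivered.add("endWindow");
    }
  }
}
```

Feeding `Data(1)`, `CustomControl("eof")`, `Data(2)`, `EndWindow` delivers both data tuples first, then the control tuple, then the end-of-window call. This reordering is exactly what the proposal describes.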