apex-dev mailing list archives

From Bhupesh Chawda <bhup...@datatorrent.com>
Subject Re: [DISCUSS] Custom Control Tuples Design
Date Wed, 21 Dec 2016 12:58:35 GMT
Hi Vlad.

Yes, the API should not change. We can take an Object instead, and later
wrap it into the required class.

Our InputPort.put and emitControl methods would look something like the
following, where we handle the wrapping and unwrapping internally.

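public void put(T tuple)
{
  if (tuple instanceof CustomWrapper) {
    // Control tuple: unwrap the user payload and pass it to processControl
    processControl(((CustomWrapper)tuple).unWrap());
  } else {
    // Regular data tuple
    process(tuple);
  }
}

public void emitControl(Object tuple)
{
  // Wrap the user payload so downstream ports can tell it apart from data
  sink.put(CustomWrapper.wrap(tuple));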
}
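The dispatch above can be sketched as a self-contained Java example. `CustomWrapper` is the name used in the message; its fields and the simplified `ControlAwareInputPort` / `ControlAwareOutputPort` classes are hypothetical stand-ins, not the Apex `DefaultInputPort`/`DefaultOutputPort` API. The point it illustrates is that operator code only handles plain payload objects while the wrapper stays internal.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical internal wrapper; final so users cannot extend it.
final class CustomWrapper
{
  private final Object payload;

  private CustomWrapper(Object payload)
  {
    this.payload = payload;
  }

  static CustomWrapper wrap(Object payload)
  {
    return new CustomWrapper(payload);
  }

  Object unWrap()
  {
    return payload;
  }
}

// Simplified stand-in for an input port (not the real DefaultInputPort).
class ControlAwareInputPort
{
  final List<Object> data = new ArrayList<>();
  final List<Object> control = new ArrayList<>();

  // Everything arriving from upstream goes through put().
  public void put(Object tuple)
  {
    if (tuple instanceof CustomWrapper) {
      processControl(((CustomWrapper)tuple).unWrap());
    } else {
      process(tuple);
    }
  }

  public void process(Object tuple)
  {
    data.add(tuple);
  }

  public void processControl(Object payload)
  {
    control.add(payload);
  }
}

// Simplified stand-in for an output port wired directly to one input port.
class ControlAwareOutputPort
{
  private final ControlAwareInputPort sink;

  ControlAwareOutputPort(ControlAwareInputPort sink)
  {
    this.sink = sink;
  }

  public void emit(Object tuple)
  {
    sink.put(tuple);
  }

  // The operator passes a plain payload; wrapping happens here.
  public void emitControl(Object payload)
  {
    sink.put(CustomWrapper.wrap(payload));
  }
}

class WrapDemo
{
  public static void main(String[] args)
  {
    ControlAwareInputPort in = new ControlAwareInputPort();
    ControlAwareOutputPort out = new ControlAwareOutputPort(in);
    out.emit("record-1");
    out.emitControl("EOF");
    System.out.println(in.data + " " + in.control); // prints [record-1] [EOF]
  }
}
```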

Regarding the compatibility issue, I think we have two ways of doing it:

   1. Extend DefaultInputPort and DefaultOutputPort and create
   ControlAwareInput and ControlAwareOutput from them. This might require us
   to additionally handle specific cases when incompatible ports
   (ControlAwareOutput and DefaultInput, for example) are connected to each
   other in user apps.
   2. Add the additional methods to the existing Default implementations.

IMO, both of these would help in early error detection.
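A rough sketch of how option 1 could move the failure to DAG wiring time instead of the moment the first control tuple is emitted. The marker interfaces, the port classes and `StreamValidator.connect()` are all hypothetical; rejecting a control-aware output connected to a plain input is just one possible policy.

```java
// Hypothetical marker interfaces for ports that understand control tuples.
interface ControlAwareOutput {}
interface ControlAwareInput {}

class PlainInputPort {}
class PlainOutputPort {}
class ControlInputPort extends PlainInputPort implements ControlAwareInput {}
class ControlOutputPort extends PlainOutputPort implements ControlAwareOutput {}

final class StreamValidator
{
  // Reject the stream when it is wired if a control-aware output
  // feeds a sink that cannot handle control tuples.
  static void connect(Object source, Object... sinks)
  {
    if (!(source instanceof ControlAwareOutput)) {
      return; // a plain output never emits control tuples
    }
    for (Object sink : sinks) {
      if (!(sink instanceof ControlAwareInput)) {
        throw new IllegalArgumentException(
            "Control-aware output connected to non-control-aware input: "
            + sink.getClass().getSimpleName());
      }
    }
  }
}

class ValidateDemo
{
  public static void main(String[] args)
  {
    StreamValidator.connect(new ControlOutputPort(), new ControlInputPort()); // accepted
    try {
      StreamValidator.connect(new ControlOutputPort(), new PlainInputPort());
    } catch (IllegalArgumentException e) {
      // prints: Control-aware output connected to non-control-aware input: PlainInputPort
      System.out.println(e.getMessage());
    }
  }
}
```

The reverse combination (plain output into control-aware input) is accepted here, since such a stream simply never carries control tuples.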

~ Bhupesh




On Wed, Dec 21, 2016 at 1:36 AM, Vlad Rozov <v.rozov@datatorrent.com> wrote:

> A wrapper class is required for control tuple delivery, but the
> Port/Operator API should use only the control tuple payload object. The
> implementation of the wrapper class may change from version to version,
> but the API should not be affected by such changes.
>
> I guess the assumption is that the default input and output ports will be
> extended to provide support for control tuples. This may cause some
> backward compatibility issues. Consider a scenario where a newer version of
> Malhar that relies on the EOF control tuple is deployed into an older
> version of core that does not support control tuples. In such a scenario,
> an error will be raised only when an operator tries to emit the EOF control
> tuple at the end of a job. Introducing control tuple aware ports solves the
> early error detection problem. It will require some operators to be
> modified to use control tuple aware ports, but such a change may also help
> to distinguish control tuple aware operators from their old versions.
>
> Vlad
>
> On 12/20/16 04:09, Bhupesh Chawda wrote:
>
>> I investigated this, and it seems better to have a wrapper class for the
>> user object. This would serve two purposes:
>>
>>     1. Allow us to distinguish a custom control tuple from other payload
>>     tuples.
>>     2. For the same control tuple received from different upstream
>>     partitions, provide a mechanism to distinguish between the copies in
>>     order to identify duplicates.
>>
>> Additionally, the wrapper class needs to be part of the API, as
>> DefaultOutputPort needs to know about it before putting it into the sink.
>> We can make sure that the user is not able to extend or modify this class
>> in any manner.
>>
>> ~ Bhupesh
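The duplicate-detection purpose (item 2) could look roughly like this, assuming the wrapper carries an id assigned once where the control tuple originates and preserved when partitions forward it. `ControlWrapper`, `DuplicateFilter` and the use of a random UUID as the id are assumptions for illustration; clearing the filter at the end of each window assumes all copies arrive within the same window.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

// Hypothetical wrapper carrying an id that survives forwarding, so copies
// arriving through different upstream partitions can be recognized.
final class ControlWrapper
{
  final UUID id;
  final Object payload;

  private ControlWrapper(UUID id, Object payload)
  {
    this.id = id;
    this.payload = payload;
  }

  // Called once, where the control tuple originates.
  static ControlWrapper create(Object payload)
  {
    return new ControlWrapper(UUID.randomUUID(), payload);
  }

  // A partition forwarding the tuple keeps the original id.
  ControlWrapper forward()
  {
    return new ControlWrapper(id, payload);
  }
}

// Delivers each control tuple at most once per window, keyed by id.
class DuplicateFilter
{
  private final Set<UUID> seen = new HashSet<>();

  boolean accept(ControlWrapper w)
  {
    return seen.add(w.id); // false if this id was already delivered
  }

  void endWindow()
  {
    seen.clear();
  }
}

class DedupDemo
{
  public static void main(String[] args)
  {
    ControlWrapper original = ControlWrapper.create("EOF");
    DuplicateFilter filter = new DuplicateFilter();
    System.out.println(filter.accept(original.forward())); // true: first copy
    System.out.println(filter.accept(original.forward())); // false: same id via another partition
    filter.endWindow();
  }
}
```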
>>
>> On Mon, Dec 19, 2016 at 12:18 PM, David Yan <davidyan@gmail.com> wrote:
>>
>>> This C type parameter is going to fix the control tuple type at compile
>>> time, and this is actually not what we want. Note that an operator may
>>> receive or emit multiple different control tuple types.
>>>
>>> David
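To illustrate David's point, here is a hypothetical port whose `processControl` takes `Object`, so one port can handle several control payload types. `EndOfFile`, `Watermark` and `MultiControlPort` are made-up names; recording unknown types instead of forwarding them is only a simplification of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Two hypothetical control payload types carried on the same port.
final class EndOfFile
{
  final String file;

  EndOfFile(String file)
  {
    this.file = file;
  }
}

final class Watermark
{
  final long timestamp;

  Watermark(long timestamp)
  {
    this.timestamp = timestamp;
  }
}

class MultiControlPort
{
  final List<String> log = new ArrayList<>();

  // A single Object-typed entry point instead of a fixed type parameter C.
  public void processControl(Object payload)
  {
    if (payload instanceof EndOfFile) {
      log.add("eof:" + ((EndOfFile)payload).file);
    } else if (payload instanceof Watermark) {
      log.add("wm:" + ((Watermark)payload).timestamp);
    } else {
      // A real port would more likely forward unknown control tuples.
      log.add("ignored:" + payload.getClass().getSimpleName());
    }
  }
}

class MultiControlDemo
{
  public static void main(String[] args)
  {
    MultiControlPort port = new MultiControlPort();
    port.processControl(new Watermark(100L));
    port.processControl(new EndOfFile("a.txt"));
    System.out.println(port.log); // prints [wm:100, eof:a.txt]
  }
}
```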
>>>
>>> On Dec 17, 2016 3:33 AM, "Tushar Gosavi" <tushar@datatorrent.com> wrote:
>>>
>>> We do not need to create an interface for data emitted through
>>> emitControl or processed through processControl. Internally we could
>>> wrap the user object in ControlTuple. You can add a type parameter for
>>> the control tuple object on ports:
>>>
>>> DefaultInputPort<D,C>
>>>
>>> where D is the data type and C is the control tuple type, for better
>>> error catching at compile time.
>>>
>>>
>>> - Tushar.
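For comparison, a minimal sketch of the `DefaultInputPort<D,C>` idea using a hypothetical `TypedInputPort<D, C>` base class and a made-up `Eof` payload. Passing the wrong control type fails at compile time, which is the benefit Tushar describes; the catch, as David notes in his reply, is that C must be a single type (or a common supertype) per port.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical port with data type D and control type C fixed at compile time.
abstract class TypedInputPort<D, C>
{
  public abstract void process(D tuple);

  public abstract void processControl(C control);
}

// Control payload type chosen for this example.
final class Eof
{
  @Override
  public String toString()
  {
    return "EOF";
  }
}

class LineCounterPort extends TypedInputPort<String, Eof>
{
  int lines;
  final List<String> events = new ArrayList<>();

  @Override
  public void process(String line)
  {
    lines++;
  }

  @Override
  public void processControl(Eof eof)
  {
    events.add(eof + " after " + lines + " lines");
  }
}

class TypedDemo
{
  public static void main(String[] args)
  {
    LineCounterPort port = new LineCounterPort();
    port.process("a");
    port.process("b");
    port.processControl(new Eof());
    // port.processControl("not an Eof"); // would not compile: C is Eof
    System.out.println(port.events); // prints [EOF after 2 lines]
  }
}
```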
>>>
>>>
>>> On Sat, Dec 17, 2016 at 8:35 AM, Bhupesh Chawda <bhupesh@datatorrent.com> wrote:
>>>
>>>> Agreed, Vlad and David.
>>>> I am just suggesting there should be a wrapper for the user object. It
>>>> can be a marker interface, and we can call it something else, like
>>>> "CustomControl".
>>>>
>>>> The user object will be wrapped in another class, "ControlTuple", which
>>>> traverses the BufferServer and will perhaps be extended from the
>>>> packet/Tuple class. This class will not be exposed to the user.
>>>>
>>>> ~ Bhupesh
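A sketch of the split described here, assuming user payloads implement a marker interface (`CustomControl`, the name suggested in the message) and are wrapped in an internal `ControlTuple` that operator code cannot subclass. The port class and the `BatchComplete` payload are hypothetical, and the real wrapper would extend the buffer server Tuple, which is omitted here.

```java
// Hypothetical marker interface that user control payloads implement.
interface CustomControl {}

// Internal wrapper: final, and package-private so code outside this package
// cannot see it.
final class ControlTuple
{
  final CustomControl userObject;

  ControlTuple(CustomControl userObject)
  {
    this.userObject = userObject;
  }
}

// A user-defined control payload.
final class BatchComplete implements CustomControl
{
  final int batch;

  BatchComplete(int batch)
  {
    this.batch = batch;
  }
}

class MarkerOutputPort
{
  ControlTuple lastSent; // stands in for the sink in this sketch

  // Only CustomControl payloads are accepted, so ordinary data objects
  // cannot be emitted as control tuples by mistake.
  public void emitControl(CustomControl payload)
  {
    lastSent = new ControlTuple(payload);
  }
}

class MarkerDemo
{
  public static void main(String[] args)
  {
    MarkerOutputPort port = new MarkerOutputPort();
    port.emitControl(new BatchComplete(3));
    // port.emitControl("text"); // would not compile: String is not CustomControl
    System.out.println(((BatchComplete)port.lastSent.userObject).batch); // prints 3
  }
}
```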
>>>>
>>>>
>>>> On Sat, Dec 17, 2016 at 4:11 AM, Vlad Rozov <v.rozov@datatorrent.com> wrote:
>>>>
>>>>> I agree with David. The payload of the control tuple is in the userObject,
>>>>> and operators/ports don't need to be exposed to the implementation of the
>>>>> ControlTuple class. With the proposed interface, operator developers are
>>>>> free to extend ControlTuple further, and I don't think that such a
>>>>> capability needs to be provided. The wrapping in the ControlTuple class is
>>>>> necessary, and most likely ControlTuple needs to be extended from the
>>>>> buffer server Tuple. It may be good to have a common parent other than
>>>>> Object for all user payloads, but it may be a marker interface as well.
>>>>>
>>>>> Thank you,
>>>>>
>>>>> Vlad
>>>>>
>>>>>
>>>>> On 12/16/16 09:59, Bhupesh Chawda wrote:
>>>>>
>>>>>> Hi David,
>>>>>>
>>>>>> Actually, I was thinking of another API class called ControlTuple,
>>>>>> different from the actual tuple class in the buffer server or stram.
>>>>>> This could serve as a way for the buffer server publisher to understand
>>>>>> that it is a control tuple and needs to be wrapped differently.
>>>>>>
>>>>>> ~ Bhupesh
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Dec 16, 2016 22:28, "David Yan" <davidyan@gmail.com> wrote:
>>>>>>
>>>>>> {code}
>>>>>>    // DefaultInputPort
>>>>>>     public void processControl(ControlTuple tuple)
>>>>>>     {
>>>>>>       // Default implementation to avoid the need to implement it in all
>>>>>>       // implementations
>>>>>>     }
>>>>>> {code}
>>>>>>
>>>>>> {code}
>>>>>>    // DefaultOutputPort
>>>>>>     public void emitControl(ControlTuple tuple)
>>>>>>     {
>>>>>>     }
>>>>>> {code}
>>>>>>
>>>>>> I think we don't need to expose the ControlTuple class to operator
>>>>>> developers, because the window ID is just the current window ID when
>>>>>> these methods are called. How about making them just Object? We also
>>>>>> need to provide a way for the user to specify a custom serializer for
>>>>>> the control tuple.
>>>>>>
>>>>>> David
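David's suggestion can be sketched as follows: the operator passes a plain `Object`, the port stamps it with the window ID it already tracks, and a pluggable serializer turns it into bytes. `ControlSerializer`, `ToStringSerializer`, `ControlFrame` and `WindowAwareOutputPort` are hypothetical names, not Apex classes.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical pluggable serializer for control payloads.
interface ControlSerializer
{
  byte[] serialize(Object payload);
}

// Example serializer for this sketch only: UTF-8 bytes of toString().
class ToStringSerializer implements ControlSerializer
{
  @Override
  public byte[] serialize(Object payload)
  {
    return payload.toString().getBytes(StandardCharsets.UTF_8);
  }
}

// Internal frame as it might travel downstream; never seen by operators.
final class ControlFrame
{
  final long windowId;
  final byte[] body;

  ControlFrame(long windowId, byte[] body)
  {
    this.windowId = windowId;
    this.body = body;
  }
}

class WindowAwareOutputPort
{
  private final ControlSerializer serializer;
  private long currentWindowId;
  ControlFrame lastFrame; // stands in for the sink in this sketch

  WindowAwareOutputPort(ControlSerializer serializer)
  {
    this.serializer = serializer;
  }

  // Called by the engine at window start; operators never pass a window ID.
  void beginWindow(long windowId)
  {
    currentWindowId = windowId;
  }

  // Operator-facing API: just an Object payload.
  public void emitControl(Object payload)
  {
    lastFrame = new ControlFrame(currentWindowId, serializer.serialize(payload));
  }
}

class SerializerDemo
{
  public static void main(String[] args)
  {
    WindowAwareOutputPort port = new WindowAwareOutputPort(new ToStringSerializer());
    port.beginWindow(42L);
    port.emitControl("EOF");
    System.out.println(port.lastFrame.windowId + " "
        + new String(port.lastFrame.body, StandardCharsets.UTF_8)); // prints 42 EOF
  }
}
```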
>>>>>>
>>>>>>
>>>>>> On Thu, Dec 15, 2016 at 12:43 AM, Bhupesh Chawda <bhupesh@datatorrent.com> wrote:
>>>>>>
>>>>>>> Hi All,
>>>>>>>
>>>>>>> Here are the initial interfaces:
>>>>>>>
>>>>>>> {code}
>>>>>>>    // DefaultInputPort
>>>>>>>     public void processControl(ControlTuple tuple)
>>>>>>>     {
>>>>>>>       // Default implementation to avoid the need to implement it in
>>>>>>>       // all implementations
>>>>>>>     }
>>>>>>> {code}
>>>>>>>
>>>>>>> {code}
>>>>>>>    // DefaultOutputPort
>>>>>>>     public void emitControl(ControlTuple tuple)
>>>>>>>     {
>>>>>>>     }
>>>>>>> {code}
>>>>>>>
>>>>>>> We have the option to add these methods to the interfaces InputPort
>>>>>>> and OutputPort, but these would not be backward compatible and also
>>>>>>> not consistent with the current implementation of the basic data tuple
>>>>>>> flow (as with process() and emit()).
>>>>>>>
>>>>>>> We also need to expose an interface / class for users to wrap their
>>>>>>> object and emit it downstream. This should be part of the API.
>>>>>>>
>>>>>>> {code}
>>>>>>> public class ControlTuple extends Tuple
>>>>>>> {
>>>>>>>     Object userObject;
>>>>>>>
>>>>>>>     public ControlTuple(long windowId, Object userObject)
>>>>>>>     {
>>>>>>>       // windowId handling in the Tuple superclass is not shown
>>>>>>>       this.userObject = userObject;
>>>>>>>     }
>>>>>>> }
>>>>>>> {code}
>>>>>>>
>>>>>>> The emitted tuples would traverse the same flow as other control
>>>>>>> tuples. The plan is to intercept the control tuples in GenericNode and
>>>>>>> use the Reservoir to emit the control tuples at the end of the window.
>>>>>>>
>>>>>>> GenericNode seems to be the best place to buffer incoming custom
>>>>>>> control tuples without delivering them immediately to the operator
>>>>>>> port. Once the end of the window is reached, we plan to use the
>>>>>>> reservoir sink to push them to the port. This differs from the
>>>>>>> behavior of any other control tuple, because we are changing the order
>>>>>>> of tuples in the stream: the custom control tuples will be buffered and
>>>>>>> not delivered to the ports until the end of the window.
>>>>>>> To accomplish this, we need a public method in SweepableReservoir
>>>>>>> that allows putting a tuple back into the sink of the reservoir.
>>>>>>>
>>>>>>> ~ Bhupesh
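The proposed ordering change can be modelled in a few lines. `WindowBufferingNode` and `Control` are hypothetical simplifications: data tuples are delivered immediately, while custom control tuples are held in arrival order and flushed just before the window ends. The sketch does not use the real GenericNode or SweepableReservoir.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical marker distinguishing control tuples from data in this sketch.
final class Control
{
  final Object payload;

  Control(Object payload)
  {
    this.payload = payload;
  }
}

// Rough model of the proposed behaviour: data goes to the port immediately,
// custom control tuples are held until the end of the window.
class WindowBufferingNode
{
  private final Deque<Control> pending = new ArrayDeque<>();
  final List<String> delivered = new ArrayList<>();

  void onTuple(Object tuple)
  {
    if (tuple instanceof Control) {
      pending.addLast((Control)tuple); // hold back, keeping arrival order
    } else {
      delivered.add("data:" + tuple);
    }
  }

  void endWindow()
  {
    // Flush buffered control tuples before the window closes.
    while (!pending.isEmpty()) {
      delivered.add("control:" + pending.pollFirst().payload);
    }
    delivered.add("endWindow");
  }
}

class BufferingDemo
{
  public static void main(String[] args)
  {
    WindowBufferingNode node = new WindowBufferingNode();
    node.onTuple("a");
    node.onTuple(new Control("EOF"));
    node.onTuple("b");
    node.endWindow();
    System.out.println(node.delivered); // prints [data:a, data:b, control:EOF, endWindow]
  }
}
```

Running the demo shows the reordering the message describes: the EOF control tuple that arrived between `a` and `b` is delivered after both.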
>>>>>>>
>>>>>>>
>>>>>>>
>
