apex-dev mailing list archives

From Vlad Rozov <v.ro...@datatorrent.com>
Subject Re: open/close ports and active/inactive streams
Date Mon, 10 Apr 2017 14:02:03 GMT
Right, I did not mean that APEXCORE-408 cannot support in-memory 
streams; my point was that it is preferable to pass data between stages 
without using disk/HDFS, and that the use cases for the proposal and 
APEXCORE-408 are the same or very close to each other.

I filed https://issues.apache.org/jira/browse/APEXCORE-696.

Thank you,


On 4/9/17 22:23, Tushar Gosavi wrote:
> Hi Vlad,
> With APEXCORE-408 you can avoid writing data files if the intermediate
> data fits in memory. The API allows extending the DAG by adding streams
> and operators to an existing DAG. In the example you provided, the user
> could extend the DAG after the aggregate operator by adding a new output
> operator and connecting it to the aggregate.
> - Tushar.
> On Fri, Apr 7, 2017 at 6:22 AM, Vlad Rozov <v.rozov@datatorrent.com> wrote:
>> It is exactly the same use case, with the exception that it is not
>> necessary to write data to files. Consider 3 operators: an input operator,
>> an aggregate operator and an output operator. When the application starts,
>> the output port of the aggregate operator should be in the closed state,
>> the stream between the second and the third operator would be inactive,
>> and the output operator does not need to be allocated. After the input
>> operator processes all data, it can close its output port and the input
>> operator may be de-allocated. Once the aggregator receives EOS on its
>> input port, it should open the output port and start writing to it. At
>> this point, the output operator needs to be deployed and the stream
>> between the last two operators (aggregator and output) becomes active.
>> In a real batch use case, it is preferable to have the full application
>> DAG statically defined and to delegate activation/de-activation of stages
>> to the platform. It is also preferable not to write intermediate files to
>> disk/HDFS, but instead to pass data in-memory.
>> Thank you,
>> Vlad
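The lifecycle in Vlad's three-operator example can be sketched as a small, self-contained simulation. Everything here (the PortState/StreamState enums, the event log, the onPortChange hook) is invented for illustration; this is not Apex API, only the order of state changes the message walks through:

```java
// Hedged sketch: a toy simulation of the batch lifecycle described above.
// None of these classes exist in Apex; the names are assumptions.
import java.util.ArrayList;
import java.util.List;

public class PortLifecycleSketch {

    enum PortState { OPEN, CLOSED }
    enum StreamState { ACTIVE, INACTIVE }

    static List<String> log = new ArrayList<>();

    // The app master would react to port-state changes: a closed upstream
    // port makes the stream inactive (downstream operator may be undeployed);
    // re-opening the port makes it active again (downstream redeployed first).
    static StreamState onPortChange(String stream, PortState p) {
        StreamState s = (p == PortState.OPEN) ? StreamState.ACTIVE
                                              : StreamState.INACTIVE;
        log.add(stream + "=" + s);
        return s;
    }

    public static void main(String[] args) {
        // At start: input -> aggregate is open; aggregate -> output is closed,
        // so the output operator need not be allocated yet.
        onPortChange("input->aggregate", PortState.OPEN);
        onPortChange("aggregate->output", PortState.CLOSED);

        // Input finishes all data and closes its port; it can be de-allocated.
        onPortChange("input->aggregate", PortState.CLOSED);

        // Aggregator sees EOS, opens its output port; output operator deployed.
        onPortChange("aggregate->output", PortState.OPEN);

        System.out.println(String.join(", ", log));
    }
}
```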
>> On 4/6/17 09:37, Thomas Weise wrote:
>>> You would need to provide more specifics of the use case you are trying
>>> to address to make this a meaningful discussion.
>>> An example for APEXCORE-408 (based on a real batch use case): I have two
>>> stages; the first stage produces a set of files that the second stage
>>> needs as input. Stage 1 operators are to be released and stage 2
>>> operators deployed when stage 2 starts. These can be independent
>>> operators; they don't need to be connected through a stream.
>>> Thomas
>>> On Thu, Apr 6, 2017 at 9:21 AM, Vlad Rozov <v.rozov@datatorrent.com>
>>> wrote:
>>>> It is not about a use case difference. My proposal and APEXCORE-408
>>>> address the same use case: how to re-allocate resources for batch
>>>> applications, or applications where processing happens in stages. The
>>>> difference between APEXCORE-408 and the proposal is a shift in
>>>> complexity from application logic to the platform. IMO, supporting
>>>> batch applications using APEXCORE-408 will require more coding on the
>>>> application side.
>>>> Thank you,
>>>> Vlad
>>>> On 4/5/17 21:57, Thomas Weise wrote:
>>>>> I think this needs more input on a use case level. The ability to
>>>>> dynamically alter the DAG internally will also address the resource
>>>>> allocation for operators:
>>>>> https://issues.apache.org/jira/browse/APEXCORE-408
>>>>> It can be used to implement the stages of a batch pipeline and is very
>>>>> flexible in general. Considering the likely implementation complexity
>>>>> of the proposed feature, I would like to understand what benefits it
>>>>> provides the user (use cases that cannot be addressed otherwise)?
>>>>> Thanks,
>>>>> Thomas
>>>>> On Sat, Apr 1, 2017 at 12:23 PM, Vlad Rozov <v.rozov@datatorrent.com>
>>>>> wrote:
>>>>>> Correct, a stateful downstream operator can only be undeployed at a
>>>>>> checkpoint window after it consumes all data emitted by the upstream
>>>>>> operator on the closed port.
>>>>>> It will be necessary to distinguish between a closed port and an
>>>>>> inactive stream. After a port is closed, the stream may still be
>>>>>> active, and after it is open, the stream may still be inactive (not
>>>>>> yet ready).
>>>>>> The more contributors participate in the discussion and
>>>>>> implementation, the more solid the feature will be.
>>>>>> Thank you,
>>>>>> Vlad
>>>>>> Sent from my iPhone
>>>>>> On Apr 1, 2017, at 11:03, Pramod Immaneni <pramod@datatorrent.com>
>>>>>> wrote:
>>>>>>> Generally a good idea. Care should be taken around fault tolerance
>>>>>>> and idempotency. Close stream would need to stop accepting new data,
>>>>>>> but you still can't actually close all the streams and un-deploy
>>>>>>> operators until the data is committed. Idempotency might require the
>>>>>>> close stream to take effect at the end of the window. What would it
>>>>>>> then mean for re-opening streams within a window? Also, this looks
>>>>>>> like a larger undertaking; as Ram suggested, it would be good to
>>>>>>> understand the use cases. I also suggest that multiple folks
>>>>>>> participate in the implementation effort, to ensure that we are able
>>>>>>> to address all the scenarios and minimize chances of regression in
>>>>>>> existing behavior.
>>>>>>> Thanks
>>>>>>> On Sat, Apr 1, 2017 at 8:12 AM, Vlad Rozov <v.rozov@datatorrent.com>
>>>>>>> wrote:
>>>>>>>> All,
>>>>>>>> Currently Apex assumes that an operator can emit on any defined
>>>>>>>> output port and that all streams defined by a DAG are active. I'd
>>>>>>>> like to propose an ability for an operator to open and close output
>>>>>>>> ports. By default, all ports defined by an operator will be open.
>>>>>>>> In case an operator for any reason decides that it will not emit
>>>>>>>> tuples on an output port, it may close it. This will make the
>>>>>>>> stream inactive, and the application master may undeploy the
>>>>>>>> downstream (for that input stream) operators. If this leads to
>>>>>>>> containers that don't have any active operators, those containers
>>>>>>>> may be undeployed as well, leading to better cluster resource
>>>>>>>> utilization and better Apex elasticity. Later, the operator may be
>>>>>>>> in a state where it needs to emit tuples on the closed port. In
>>>>>>>> this case, it needs to re-open the port and wait till the stream
>>>>>>>> becomes active again before emitting tuples on that port. Making an
>>>>>>>> inactive stream active again requires the application master to
>>>>>>>> re-allocate containers and re-deploy the downstream operators.
>>>>>>>> It should also be possible for an application designer to mark
>>>>>>>> streams as inactive when an application starts. This will allow the
>>>>>>>> application master to avoid reserving all containers when the
>>>>>>>> application starts. Later, the port can be opened and the inactive
>>>>>>>> stream becomes active.
>>>>>>>> Thank you,
>>>>>>>> Vlad
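The closed-port vs. inactive-stream distinction raised later in the thread can be summarized in a hedged sketch. The OutputPort class below and its methods (close, reopen, masterActivated, canEmit) are hypothetical names invented for this sketch only; Apex defines no such API today:

```java
// Hedged sketch of the proposed port states. An open port does not imply an
// active stream: after re-opening, the operator must wait for the app master
// to redeploy downstream operators before it can emit. All names are
// assumptions for illustration.
public class ClosablePortSketch {

    static class OutputPort {
        boolean open = true;          // proposal: all ports open by default
        boolean streamActive = true;  // managed by the app master

        void close() {                // operator will emit no more tuples
            open = false;
            streamActive = false;     // downstream may now be undeployed
        }

        void reopen() {               // operator wants to emit again
            open = true;
            streamActive = false;     // stream not yet ready: downstream
        }                             // operators must be redeployed first

        void masterActivated() {      // master redeployed downstream operators
            streamActive = true;
        }

        boolean canEmit() {           // emit only on an open port whose
            return open && streamActive; // stream is active
        }
    }

    public static void main(String[] args) {
        OutputPort out = new OutputPort();
        System.out.println("start: canEmit=" + out.canEmit());    // true
        out.close();
        System.out.println("closed: canEmit=" + out.canEmit());   // false
        out.reopen();
        System.out.println("reopened: canEmit=" + out.canEmit()); // false
        out.masterActivated();
        System.out.println("active: canEmit=" + out.canEmit());   // true
    }
}
```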
