apex-dev mailing list archives

From Vlad Rozov <v.ro...@datatorrent.com>
Subject Re: open/close ports and active/inactive streams
Date Mon, 10 Apr 2017 14:02:03 GMT
Right, I did not mean that APEXCORE-408 cannot support in-memory
streams. My point was that it is preferable to pass data between stages
without using disk/HDFS, and that the use cases for the proposal and
APEXCORE-408 are the same or very close to each other.

I filed https://issues.apache.org/jira/browse/APEXCORE-696.

Thank you,

Vlad

On 4/9/17 22:23, Tushar Gosavi wrote:
> Hi Vlad,
>
> With APEXCORE-408 you can avoid writing data files if the intermediate data
> fits in memory. The API allows extending the DAG by adding streams and
> operators to an existing DAG. In the example you provided, the user could
> extend the DAG after the aggregate operator by adding a new output operator
> and connecting it to the aggregate operator.
>
> - Tushar.
>
>
> On Fri, Apr 7, 2017 at 6:22 AM, Vlad Rozov <v.rozov@datatorrent.com> wrote:
>
>> It is exactly the same use case with the exception that it is not
>> necessary to write data to files. Consider 3 operators, an input operator,
>> an aggregate operator and an output operator. When the application starts,
>> the output port of the aggregate operator should be in the closed state,
>> the stream between the second and the third would be inactive and the
>> output operator does not need to be allocated. After the input operator
>> processes all data, it can close the output port and the input operator may
>> be de-allocated. Once the aggregator receives EOS on its input port, it
>> should open the output port and start writing to it. At this point, the
>> output operator needs to be deployed and the stream between the last two
>> operators (aggregator and output) becomes active.
>>
>> In a real batch use case, it is preferable to have the full application DAG
>> statically defined and to delegate activation/de-activation of stages to the
>> platform. It is also preferable not to write intermediate files to disk/HDFS,
>> but instead to pass data in memory.
>>
>> Thank you,
>>
>> Vlad
>>
>>
>> On 4/6/17 09:37, Thomas Weise wrote:
>>
>>> You would need to provide more specifics of the use case you are thinking
>>> of addressing to make this a meaningful discussion.
>>>
>>> An example for APEXCORE-408 (based on a real batch use case): I have two
>>> stages; the first stage produces a set of files that the second stage needs
>>> as input. Stage 1 operators are to be released and stage 2 operators
>>> deployed when stage 2 starts. These can be independent operators; they
>>> don't need to be connected through a stream.
>>>
>>> Thomas
>>>
>>>
>>> On Thu, Apr 6, 2017 at 9:21 AM, Vlad Rozov <v.rozov@datatorrent.com> wrote:
>>>
>>>> It is not about a use case difference. My proposal and APEXCORE-408
>>>> address the same use case: how to re-allocate resources for batch
>>>> applications or applications where processing happens in stages. The
>>>> difference between APEXCORE-408 and the proposal is a shift in complexity
>>>> from application logic to the platform. IMO, supporting batch applications
>>>> using APEXCORE-408 will require more coding on the application side.
>>>>
>>>> Thank you,
>>>>
>>>> Vlad
>>>>
>>>>
>>>> On 4/5/17 21:57, Thomas Weise wrote:
>>>>
>>>>> I think this needs more input on a use case level. The ability to
>>>>> dynamically alter the DAG internally will also address the resource
>>>>> allocation for operators:
>>>>>
>>>>> https://issues.apache.org/jira/browse/APEXCORE-408
>>>>>
>>>>> It can be used to implement stages of a batch pipeline and is very
>>>>> flexible in general. Considering the likely implementation complexity for
>>>>> the proposed feature I would like to understand what benefits it provides
>>>>> to the user (use cases that cannot be addressed otherwise)?
>>>>>
>>>>> Thanks,
>>>>> Thomas
>>>>>
>>>>>
>>>>>
>>>>> On Sat, Apr 1, 2017 at 12:23 PM, Vlad Rozov <v.rozov@datatorrent.com> wrote:
>>>>>
>>>>>> Correct, a stateful downstream operator can only be undeployed at a
>>>>>> checkpoint window after it consumes all data emitted by the upstream
>>>>>> operator on the closed port.
>>>>>>
>>>>>> It will be necessary to distinguish between a closed port and an inactive
>>>>>> stream. After a port is closed, the stream may still be active, and after
>>>>>> a port is opened, the stream may still be inactive (not yet ready).
>>>>>>
>>>>>> The more contributors participate in the discussion and implementation,
>>>>>> the more solid the feature will be.
>>>>>>
>>>>>> Thank you,
>>>>>> Vlad
>>>>>>
>>>>>> Sent from iPhone
>>>>>>
>>>>>> On Apr 1, 2017, at 11:03, Pramod Immaneni <pramod@datatorrent.com> wrote:
>>>>>>
>>>>>>> Generally a good idea. Care should be taken around fault tolerance and
>>>>>>> idempotency. Close stream would need to stop accepting new data but
>>>>>>> still can't actually close all the streams and un-deploy operators till
>>>>>>> committed. Idempotency might require the close stream to take effect at
>>>>>>> the end of the window. What would it then mean for re-opening streams
>>>>>>> within a window? Also, this looks like a larger undertaking; as Ram
>>>>>>> suggested, it would be good to understand the use cases, and I also
>>>>>>> suggest that multiple folks participate in the implementation effort to
>>>>>>> ensure that we are able to address all the scenarios and minimize
>>>>>>> chances of regression in existing behavior.
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>> On Sat, Apr 1, 2017 at 8:12 AM, Vlad Rozov <v.rozov@datatorrent.com> wrote:
>>>>>>>
>>>>>>>> All,
>>>>>>>>
>>>>>>>> Currently Apex assumes that an operator can emit on any defined output
>>>>>>>> port and all streams defined by a DAG are active. I'd like to propose
>>>>>>>> an ability for an operator to open and close output ports. By default
>>>>>>>> all ports defined by an operator will be open. In the case an operator
>>>>>>>> for any reason decides that it will not emit tuples on the output port,
>>>>>>>> it may close it. This will make the stream inactive and the application
>>>>>>>> master may undeploy the downstream (for that input stream) operators.
>>>>>>>> If this leads to containers that don't have any active operators, those
>>>>>>>> containers may be undeployed as well, leading to better cluster
>>>>>>>> resource utilization and better Apex elasticity. Later, the operator
>>>>>>>> may be in a state where it needs to emit tuples on the closed port. In
>>>>>>>> this case, it needs to re-open the port and wait till the stream
>>>>>>>> becomes active again before emitting tuples on that port. Making an
>>>>>>>> inactive stream active again requires the application master to
>>>>>>>> re-allocate containers and re-deploy the downstream operators.
>>>>>>>>
>>>>>>>> It should also be possible for an application designer to mark streams
>>>>>>>> as inactive when an application starts. This will allow the application
>>>>>>>> master to avoid reserving all containers when the application starts.
>>>>>>>> Later, the port can be opened and the inactive stream becomes active.
>>>>>>>
>>>>>>>> Thank you,
>>>>>>>>
>>>>>>>> Vlad
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
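
The distinction drawn in this thread between a port being open or closed and
a stream being active or inactive can be illustrated with a small state model.
The sketch below only illustrates the proposal as described in the messages
above. The class PortStreamSketch, its Stream type, and every method name are
hypothetical and are not part of the Apache Apex API; commit and deployment
are reduced to simple callbacks, which is an assumption made for brevity.

```java
// Hypothetical model of the open/close port proposal; NOT Apache Apex API.
class PortStreamSketch {
    enum PortState { OPEN, CLOSED }
    enum StreamState { ACTIVE, INACTIVE }

    /** One stream fed by a single output port of an upstream operator. */
    static class Stream {
        final String name;
        PortState port;
        StreamState stream;

        /** A stream marked inactive at launch starts with a closed port. */
        Stream(String name, boolean activeAtLaunch) {
            this.name = name;
            this.port = activeAtLaunch ? PortState.OPEN : PortState.CLOSED;
            this.stream = activeAtLaunch ? StreamState.ACTIVE : StreamState.INACTIVE;
        }

        // Operator side: only the port state changes immediately.
        void openPort() { port = PortState.OPEN; }
        void closePort() { port = PortState.CLOSED; }

        // Platform side (assumed callback): downstream operators are deployed.
        void onDownstreamDeployed() {
            if (port == PortState.OPEN) {
                stream = StreamState.ACTIVE;
            }
        }

        // Platform side (assumed callback): data emitted before the close has
        // been consumed downstream and the window has been committed.
        void onDrainedAndCommitted() {
            if (port == PortState.CLOSED) {
                stream = StreamState.INACTIVE;
            }
        }

        /** The operator may emit only on an open port with an active stream. */
        boolean canEmit() {
            return port == PortState.OPEN && stream == StreamState.ACTIVE;
        }

        @Override
        public String toString() {
            return name + ": port=" + port + ", stream=" + stream
                    + ", canEmit=" + canEmit();
        }
    }

    public static void main(String[] args) {
        // Aggregate -> output stream from the three-operator example:
        // inactive at launch, so the output operator is not yet needed.
        Stream s = new Stream("aggregate->output", false);
        System.out.println(s);

        s.openPort();               // aggregator received EOS on its input
        System.out.println(s);      // port open, stream not yet ready

        s.onDownstreamDeployed();   // output operator deployed
        System.out.println(s);

        s.closePort();              // aggregator has nothing more to emit
        System.out.println(s);      // port closed, stream still active

        s.onDrainedAndCommitted();  // downstream may now be undeployed
        System.out.println(s);
    }
}
```

The model passes through both intermediate combinations mentioned in the
thread: an open port with an inactive stream (downstream not yet deployed) and
a closed port with an active stream (data not yet consumed and committed).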

