apex-dev mailing list archives

From Vlad Rozov <v.ro...@datatorrent.com>
Subject Re: open/close ports and active/inactive streams
Date Mon, 10 Apr 2017 15:05:53 GMT
With additional file readers/writers, the pipeline of a single stage
becomes the 3-operator use case I described. With the ability to
open/close ports, the platform can optimize it by re-allocating resources
from readers to writers.

Thank you,

Vlad

On 4/10/17 07:44, Thomas Weise wrote:
> In streaming there is a stream (surprise); in a space-constrained batch
> case, we can have additional file writers/readers between the operators.
>
> Modules can in fact be used to support pipeline reuse, but they must be
> added/removed dynamically to support stages with on-demand resource
> allocation.
>
> Thomas
>
>
> On Mon, Apr 10, 2017 at 7:37 AM, Vlad Rozov <v.rozov@datatorrent.com> wrote:
>
>> Do you suggest that in a streaming use case the join operator also passes
>> data downstream using files, or that there are two different join
>> operators, one for streaming and one for batch? If not, the join operator
>> needs to emit data to a separate file output operator, so it still needs
>> to read data from a temporary space before emitting. Why not emit directly
>> to topN in this case?
>>
>> Isn't pipeline reuse already supported by Apex modules?
>>
>> Thank you,
>>
>> Vlad
>>
>>
>> On 4/10/17 06:59, Thomas Weise wrote:
>>
>>> I don't think this fully covers the scenario of limited resources. You
>>> describe a case of 3 operators, but when you consider just 2 operators
>>> that both have to hold a large data set in memory, the suggested
>>> approach won't work. Let's say the first operator is an outer join and
>>> the second operator is topN. Both are blocking and cannot emit before all
>>> input is seen.
>>>
>>> To deallocate the outer join, all of its results need to be drained. It's
>>> a resource swap, and you need a temporary space to hold the data. Also,
>>> if the requirement is to be able to recover and retry from the results of
>>> stage one, then you need a fault-tolerant swap space. If the cluster does
>>> not have enough memory, then disk is a good option (SLA vs. memory
>>> tradeoff).
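A minimal Java sketch of the resource swap described above, assuming a local temporary file as a stand-in for fault-tolerant swap space (HDFS would be the realistic choice). `SwapSketch`, `drainToSwap` and `topN` are hypothetical names, not Apex APIs, and the outer join itself is not modeled, only its drained output.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch of a resource swap between two blocking stages.
// A local temp file stands in for fault-tolerant swap space (e.g. HDFS).
final class SwapSketch {

  /** Stage 1 drains its full result to swap space so it can be deallocated. */
  static Path drainToSwap(List<Long> joinResults) throws IOException {
    Path swap = Files.createTempFile("stage1-", ".swap");
    List<String> lines = new ArrayList<>();
    for (long v : joinResults) {
      lines.add(Long.toString(v));
    }
    Files.write(swap, lines);
    return swap;
  }

  /** Stage 2 (top N) is blocking: it reads all input before emitting anything. */
  static List<Long> topN(Path swap, int n) throws IOException {
    PriorityQueue<Long> heap = new PriorityQueue<>(); // min-heap of the n largest values seen
    for (String line : Files.readAllLines(swap)) {
      heap.add(Long.parseLong(line));
      if (heap.size() > n) {
        heap.poll(); // drop the smallest value
      }
    }
    List<Long> result = new ArrayList<>(heap);
    result.sort(Comparator.reverseOrder());
    return result;
  }
}
```

Only one stage holds memory at a time here; the cost is the extra write and read of the swap file, which is the SLA vs. memory tradeoff mentioned above.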
>>>
>>> I would also suggest thinking beyond the single DAG scenario. Often users
>>> need to define pipelines that are composed of multiple smaller flows
>>> (which they may also want to reuse in multiple pipelines). APEXCORE-408
>>> gives you an option to compose such flows within a single Apex
>>> application, in addition to covering the simplified use case that we
>>> discuss there.
>>>
>>> Thomas
>>>
>>>
>>> On Thu, Apr 6, 2017 at 5:52 PM, Vlad Rozov <v.rozov@datatorrent.com>
>>> wrote:
>>>
>>>> It is exactly the same use case, with the exception that it is not
>>>> necessary to write data to files. Consider 3 operators: an input
>>>> operator, an aggregate operator and an output operator. When the
>>>> application starts, the output port of the aggregate operator should be
>>>> in the closed state, the stream between the second and the third would
>>>> be inactive, and the output operator does not need to be allocated.
>>>> After the input operator processes all data, it can close its output
>>>> port and may then be de-allocated. Once the aggregator receives EOS on
>>>> its input port, it should open its output port and start writing to it.
>>>> At this point, the output operator needs to be deployed and the stream
>>>> between the last two operators (aggregator and output) becomes active.
>>>>
>>>> In a real batch use case, it is preferable to have the full application
>>>> DAG statically defined and to delegate activation/de-activation of
>>>> stages to the platform. It is also preferable not to write intermediate
>>>> files to disk/HDFS, but instead to pass data in memory.
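To make the 3-operator lifecycle concrete, here is a hypothetical Java walk-through that only tracks which operators are deployed. `ThreeStageSketch` and its methods are illustrative names, not part of Apex; the point is that the three operators are never allocated at the same time.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical walk-through of the input -> aggregate -> output example;
// it only tracks which operators are deployed at each step.
final class ThreeStageSketch {
  final Set<String> deployed = new LinkedHashSet<>();
  int peak = 0; // largest number of operators deployed at the same time

  private void record() {
    peak = Math.max(peak, deployed.size());
  }

  /** Aggregate's output port starts closed, so "output" is not allocated. */
  void start() {
    deployed.add("input");
    deployed.add("aggregate");
    record();
  }

  /** Input has processed all data and closed its port; it can be de-allocated. */
  void inputFinished() {
    deployed.remove("input");
    record();
  }

  /** Aggregate saw EOS and opened its port; the platform deploys "output". */
  void aggregateReceivedEos() {
    deployed.add("output");
    record();
  }
}
```

Calling `start()`, `inputFinished()` and `aggregateReceivedEos()` in order leaves only `aggregate` and `output` deployed, with at most two operators allocated at any point.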
>>>>
>>>> Thank you,
>>>>
>>>> Vlad
>>>>
>>>>
>>>> On 4/6/17 09:37, Thomas Weise wrote:
>>>>
>>>>> You would need to provide more specifics of the use case you are
>>>>> thinking of addressing to make this a meaningful discussion.
>>>>>
>>>>> An example for APEXCORE-408 (based on a real batch use case): I have two
>>>>> stages; the first stage produces a set of files that the second stage
>>>>> needs as input. Stage 1 operators are to be released and stage 2
>>>>> operators deployed when stage 2 starts. These can be independent
>>>>> operators; they don't need to be connected through a stream.
>>>>>
>>>>> Thomas
>>>>>
>>>>>
>>>>> On Thu, Apr 6, 2017 at 9:21 AM, Vlad Rozov <v.rozov@datatorrent.com>
>>>>> wrote:
>>>>>
>>>>>> It is not about a use case difference. My proposal and APEXCORE-408
>>>>>> address the same use case: how to re-allocate resources for batch
>>>>>> applications or applications where processing happens in stages. The
>>>>>> difference between APEXCORE-408 and the proposal is a shift in
>>>>>> complexity from the application logic to the platform. IMO, supporting
>>>>>> batch applications using APEXCORE-408 will require more coding on the
>>>>>> application side.
>>>>>>
>>>>>> Thank you,
>>>>>>
>>>>>> Vlad
>>>>>>
>>>>>>
>>>>>> On 4/5/17 21:57, Thomas Weise wrote:
>>>>>>
>>>>>>> I think this needs more input on a use case level. The ability to
>>>>>>> dynamically alter the DAG internally will also address the resource
>>>>>>> allocation for operators:
>>>>>>>
>>>>>>> https://issues.apache.org/jira/browse/APEXCORE-408
>>>>>>>
>>>>>>> It can be used to implement stages of a batch pipeline and is very
>>>>>>> flexible in general. Considering the likely implementation complexity
>>>>>>> of the proposed feature, I would like to understand what benefits it
>>>>>>> provides to the user (use cases that cannot be addressed otherwise).
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Thomas
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Sat, Apr 1, 2017 at 12:23 PM, Vlad Rozov <v.rozov@datatorrent.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Correct, a stateful downstream operator can only be undeployed at a
>>>>>>>> checkpoint window after it consumes all data emitted by the upstream
>>>>>>>> operator on the closed port.
>>>>>>>>
>>>>>>>> It will be necessary to distinguish between a closed port and an
>>>>>>>> inactive stream. After a port is closed, the stream may still be
>>>>>>>> active, and after a port is opened, the stream may still be inactive
>>>>>>>> (not yet ready).
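One way to express the undeploy condition above, as a hedged sketch: it assumes monotonically increasing window ids and that the downstream operator reports its last processed and last checkpointed window. `UndeployCheckSketch.canUndeploy` is a hypothetical helper, not an Apex API.

```java
// Hypothetical helper; window ids are assumed to increase monotonically.
final class UndeployCheckSketch {

  /**
   * @param portClosedWindow     last window in which upstream could emit on the port
   * @param lastProcessedWindow  last window fully processed by the downstream operator
   * @param lastCheckpointWindow last window at which the downstream state was checkpointed
   */
  static boolean canUndeploy(long portClosedWindow, long lastProcessedWindow,
                             long lastCheckpointWindow) {
    boolean drained = lastProcessedWindow >= portClosedWindow;       // all tuples from the closed port consumed
    boolean checkpointed = lastCheckpointWindow >= portClosedWindow; // state saved after consuming them
    return drained && checkpointed;
  }
}
```

For example, with the port closed in window 100, a downstream operator that has processed window 105 but last checkpointed at window 95 has to stay deployed until its next checkpoint at or after window 100.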
>>>>>>>>
>>>>>>>> The more contributors participate in the discussion and
>>>>>>>> implementation,
>>>>>>>> the more solid the feature will be.
>>>>>>>>
>>>>>>>> Thank you,
>>>>>>>> Vlad
>>>>>>>>
>>>>>>>> Sent from iPhone
>>>>>>>>
>>>>>>>> On Apr 1, 2017, at 11:03, Pramod Immaneni <pramod@datatorrent.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Generally a good idea. Care should be taken around fault tolerance
>>>>>>>>> and idempotency. Close stream would need to stop accepting new data,
>>>>>>>>> but still can't actually close all the streams and un-deploy
>>>>>>>>> operators till committed. Idempotency might require the close stream
>>>>>>>>> to take effect at the end of the window. What would it then mean for
>>>>>>>>> re-opening streams within a window? Also, this looks like a larger
>>>>>>>>> undertaking. As Ram suggested, it would be good to understand the
>>>>>>>>> use cases, and I also suggest that multiple folks participate in the
>>>>>>>>> implementation effort to ensure that we are able to address all the
>>>>>>>>> scenarios and minimize chances of regression in existing behavior.
>>>>>>>>>
>>>>>>>>> Thanks
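A hedged sketch of the window-boundary semantics raised above, assuming window ids increase monotonically and that a close requested mid-window is only recorded until `endWindow`. Cancelling a pending close on re-open is one possible answer to the question about re-opening within a window, not a decided behavior; `WindowedCloseSketch` and all its methods are hypothetical names.

```java
// Hypothetical sketch: a close requested mid-window takes effect at endWindow,
// so replaying the same window after a failure yields the same decision.
final class WindowedCloseSketch {
  private boolean open = true;
  private boolean closeRequested = false;
  private long closedAtWindow = -1; // window in which the close took effect
  private long currentWindow = -1;

  void beginWindow(long windowId) {
    currentWindow = windowId;
  }

  /** Only records the intent; the port stays open for the rest of the window. */
  void requestClose() {
    closeRequested = true;
  }

  /** Assumed semantics: cancels a pending close, or re-opens an already closed port. */
  void requestReopen() {
    closeRequested = false;
    open = true;
    closedAtWindow = -1;
  }

  void endWindow() {
    if (closeRequested) {
      open = false;
      closedAtWindow = currentWindow;
      closeRequested = false;
    }
  }

  boolean isOpen() {
    return open;
  }

  /** Downstream may be undeployed only after the closing window is committed. */
  boolean canUndeployDownstream(long committedWindowId) {
    return !open && closedAtWindow >= 0 && committedWindowId >= closedAtWindow;
  }
}
```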
>>>>>>>>>
>>>>>>>>> On Sat, Apr 1, 2017 at 8:12 AM, Vlad Rozov <v.rozov@datatorrent.com>
>>>>>>>>> wrote:
>>>>>>>>>> All,
>>>>>>>>>>
>>>>>>>>>> Currently Apex assumes that an operator can emit on any defined
>>>>>>>>>> output port and that all streams defined by a DAG are active. I'd
>>>>>>>>>> like to propose the ability for an operator to open and close output
>>>>>>>>>> ports. By default, all ports defined by an operator will be open. If
>>>>>>>>>> an operator for any reason decides that it will not emit tuples on
>>>>>>>>>> an output port, it may close it. This will make the stream inactive,
>>>>>>>>>> and the application master may undeploy the downstream (for that
>>>>>>>>>> input stream) operators. If this leaves containers without any
>>>>>>>>>> active operators, those containers may be undeployed as well,
>>>>>>>>>> leading to better cluster resource utilization and better Apex
>>>>>>>>>> elasticity. Later, the operator may be in a state where it needs to
>>>>>>>>>> emit tuples on the closed port. In this case, it needs to re-open
>>>>>>>>>> the port and wait till the stream becomes active again before
>>>>>>>>>> emitting tuples on that port. Making an inactive stream active
>>>>>>>>>> again requires the application master to re-allocate containers
>>>>>>>>>> and re-deploy the downstream operators.
>>>>>>>>>>
>>>>>>>>>> It should also be possible for an application designer to mark
>>>>>>>>>> streams as inactive when an application starts. This will allow the
>>>>>>>>>> application master to avoid reserving all containers when the
>>>>>>>>>> application starts. Later, the port can be opened and the inactive
>>>>>>>>>> stream can become active.
>>>>>>>>>>
>>>>>>>>>> Thank you,
>>>>>>>>>> Vlad
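The proposal can be modeled as a small state machine. This is a hypothetical Java sketch, not Apex code: `PortState`, `StreamState`, `Stream` and `reconcile` are illustrative names, and `reconcile` stands in for whatever the application master would do. It keeps the port state (owned by the operator) separate from the stream state (owned by the platform), so a closed port can briefly coexist with an active stream, and a re-opened port with an inactive one.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the proposed port/stream lifecycle; none of these
// types exist in the Apex API.
final class PortLifecycleSketch {

  enum PortState { OPEN, CLOSED }       // controlled by the operator

  enum StreamState { ACTIVE, INACTIVE } // controlled by the platform

  /** One output port feeding one downstream operator. */
  static final class Stream {
    PortState port;
    StreamState stream;
    boolean downstreamDeployed;
    final List<String> delivered = new ArrayList<>();

    /** activeAtStart == false models a stream the designer marks inactive at launch. */
    Stream(boolean activeAtStart) {
      port = activeAtStart ? PortState.OPEN : PortState.CLOSED;
      stream = activeAtStart ? StreamState.ACTIVE : StreamState.INACTIVE;
      downstreamDeployed = activeAtStart;
    }

    void closePort() {
      port = PortState.CLOSED;
    }

    void openPort() {
      port = PortState.OPEN;
    }

    /** Tuples may flow only on an open port whose stream is active. */
    boolean canEmit() {
      return port == PortState.OPEN && stream == StreamState.ACTIVE;
    }

    void emit(String tuple) {
      if (!canEmit()) {
        throw new IllegalStateException("port closed or stream not active yet");
      }
      delivered.add(tuple);
    }
  }

  /** Stand-in for the application master reacting to port state changes. */
  static void reconcile(Stream s) {
    if (s.port == PortState.CLOSED && s.stream == StreamState.ACTIVE) {
      s.stream = StreamState.INACTIVE;
      s.downstreamDeployed = false;   // downstream container can be released
    } else if (s.port == PortState.OPEN && s.stream == StreamState.INACTIVE) {
      s.downstreamDeployed = true;    // re-allocate and re-deploy downstream
      s.stream = StreamState.ACTIVE;  // only now may the operator emit again
    }
  }
}
```

An operator that re-opens its port keeps getting `false` from `canEmit()` until `reconcile` has re-deployed the downstream operator, which matches the "wait till the stream becomes active again" rule in the proposal.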

