apex-dev mailing list archives

From Vlad Rozov <v.ro...@datatorrent.com>
Subject Re: open/close ports and active/inactive streams
Date Mon, 10 Apr 2017 15:05:53 GMT
With additional file readers/writers, the pipeline of a single stage 
becomes the 3-operator use case I described. With the ability to open/close 
ports, the platform can optimize it by re-allocating resources from readers 
to writers.

Thank you,


On 4/10/17 07:44, Thomas Weise wrote:
> In streaming there is a stream (surprise); in a space-constrained batch
> case, we can have additional file writers/readers between the operators.
> Modules can in fact be used to support pipeline reuse, but they must be
> added/removed dynamically to support stages with on-demand resource
> allocation.
> Thomas
> On Mon, Apr 10, 2017 at 7:37 AM, Vlad Rozov <v.rozov@datatorrent.com> wrote:
>> Do you suggest that in a streaming use case the join operator also passes
>> data to downstream using files, or that there are two different join
>> operators, one for streaming and one for batch? If not, it means that the
>> join operator needs to emit data to a separate file output operator, so it
>> still needs to read data from a temporary space before emitting; why not
>> emit directly to topN in this case?
>> Is not pipeline reuse already supported by Apex modules?
>> Thank you,
>> Vlad
>> On 4/10/17 06:59, Thomas Weise wrote:
>>> I don't think this fully covers the scenario of limited resources. You
>>> describe a case of 3 operators, but when you consider just 2 operators
>>> that both have to hold a large data set in memory, then the suggested
>>> approach won't work. Let's say the first operator is an outer join and
>>> the second operator topN. Both are blocking and cannot emit before all
>>> input is seen. To deallocate the outer join, all results need to be
>>> drained. It's a resource swap and you need a temporary space to hold the
>>> data. Also, if the requirement is to be able to recover and retry from
>>> results of stage one, then you need a fault tolerant swap space. If the
>>> cluster does not have enough memory, then disk is a good option (SLA vs.
>>> memory tradeoff).
>>> I would also suggest to think beyond the single DAG scenario. Often
>>> users need to define pipelines that are composed of multiple smaller
>>> flows (which they may also want to reuse in multiple pipelines).
>>> APEXCORE-408 gives you an option to compose such flows within a single
>>> Apex application, in addition to covering the simplified use case that
>>> we discuss there.
>>> Thomas
>>> On Thu, Apr 6, 2017 at 5:52 PM, Vlad Rozov <v.rozov@datatorrent.com>
>>> wrote:
>>>> It is exactly the same use case, with the exception that it is not
>>>> necessary to write data to files. Consider 3 operators: an input
>>>> operator, an aggregate operator and an output operator. When the
>>>> application starts, the output port of the aggregate operator should be
>>>> in the closed state, the stream between the second and the third would
>>>> be inactive, and the output operator does not need to be allocated.
>>>> After the input operator processes all data, it can close the output
>>>> port and the input operator may be de-allocated. Once the aggregator
>>>> receives EOS on its input port, it should open the output port and
>>>> start writing to it. At this point, the output operator needs to be
>>>> deployed and the stream between the last two operators (aggregator and
>>>> output) becomes active.
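To make the lifecycle above concrete, here is a minimal self-contained sketch in plain Java. It is not the Apex API; the `PortState` enum and all class/method names are invented for illustration. It models an aggregate operator that keeps its output port closed until it sees end-of-stream:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the proposed port lifecycle; not an existing Apex API.
enum PortState { OPEN, CLOSED }

class AggregateSketch {
    // Closed at startup, so the downstream operator need not be deployed yet.
    PortState outputPort = PortState.CLOSED;
    private long sum = 0;
    private final List<Long> emitted = new ArrayList<>();

    // Called for each tuple while the upstream stream is active.
    void process(long tuple) {
        sum += tuple;
    }

    // On EOS from the input port: open the output port and flush the result.
    // In the proposal, this is the point where the platform would deploy the
    // downstream operator and activate the stream.
    void endOfStream() {
        outputPort = PortState.OPEN;
        emitted.add(sum);
    }

    List<Long> getEmitted() {
        return emitted;
    }
}
```

The key property is that the port state is an explicit signal the master could observe, rather than the master having to guess whether an operator will ever emit.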
>>>> In a real batch use case, it is preferable to have the full application
>>>> DAG statically defined and to delegate activation/de-activation of
>>>> stages to the platform. It is also preferable not to write intermediate
>>>> files to disk/HDFS, but instead pass data in-memory.
>>>> Thank you,
>>>> Vlad
>>>> On 4/6/17 09:37, Thomas Weise wrote:
>>>>> You would need to provide more specifics of the use case you are
>>>>> thinking to address to make this a meaningful discussion.
>>>>> An example for APEXCORE-408 (based on a real batch use case): I have
>>>>> two stages; the first stage produces a set of files that the second
>>>>> stage needs as input. Stage 1 operators are to be released and stage 2
>>>>> operators deployed when stage 2 starts. These can be independent
>>>>> operators; they don't need to be connected through a stream.
>>>>> Thomas
>>>>> On Thu, Apr 6, 2017 at 9:21 AM, Vlad Rozov <v.rozov@datatorrent.com>
>>>>> wrote:
>>>>>> It is not about a use case difference. My proposal and APEXCORE-408
>>>>>> address the same use case: how to re-allocate resources for batch
>>>>>> applications or applications where processing happens in stages. The
>>>>>> difference between APEXCORE-408 and the proposal is a shift in
>>>>>> complexity from application logic to the platform. IMO, supporting
>>>>>> batch applications using APEXCORE-408 will require more coding on the
>>>>>> application side.
>>>>>> Thank you,
>>>>>> Vlad
>>>>>> On 4/5/17 21:57, Thomas Weise wrote:
>>>>>>> I think this needs more input on a use case level. The ability to
>>>>>>> dynamically alter the DAG internally will also address the resource
>>>>>>> allocation for operators:
>>>>>>> https://issues.apache.org/jira/browse/APEXCORE-408
>>>>>>> It can be used to implement stages of a batch pipeline and is
>>>>>>> flexible in general. Considering the likely implementation
>>>>>>> complexity of the proposed feature, I would like to understand what
>>>>>>> benefits it brings to the user (use cases that cannot be addressed
>>>>>>> otherwise)?
>>>>>>> Thanks,
>>>>>>> Thomas
>>>>>>> On Sat, Apr 1, 2017 at 12:23 PM, Vlad Rozov <v.rozov@datatorrent.com>
>>>>>>> wrote:
>>>>>>>> Correct, a stateful downstream operator can only be undeployed at a
>>>>>>>> checkpoint window after it consumes all data emitted by the
>>>>>>>> upstream operator on the closed port.
>>>>>>>> It will be necessary to distinguish between a closed port and an
>>>>>>>> inactive stream. After a port is closed, the stream may still be
>>>>>>>> active, and after a port is open, the stream may still be inactive
>>>>>>>> (not yet ready).
>>>>>>>> The more contributors participate in the discussion and
>>>>>>>> implementation, the more solid the feature will be.
>>>>>>>> Thank you,
>>>>>>>> Vlad
>>>>>>>> Sent from iPhone
>>>>>>>> On Apr 1, 2017, at 11:03, Pramod Immaneni <pramod@datatorrent.com>
>>>>>>>> wrote:
>>>>>>>>> Generally a good idea. Care should be taken around fault tolerance
>>>>>>>>> and idempotency. A closed stream would need to stop accepting new
>>>>>>>>> data, but the platform still can't actually close all the streams
>>>>>>>>> and un-deploy operators until committed. Idempotency might require
>>>>>>>>> the close of the stream to take effect at the end of the window.
>>>>>>>>> What would it then mean for re-opening within a window? Also, this
>>>>>>>>> looks like a larger undertaking; as Ram suggested, it would be
>>>>>>>>> good to understand the use cases, and I also suggest that folks
>>>>>>>>> participate in the implementation effort to ensure that we are
>>>>>>>>> able to address all the scenarios and minimize chances of
>>>>>>>>> regression in existing behavior.
>>>>>>>>> Thanks
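The idempotency concern above amounts to deferring a close request to the window boundary, so that replaying a window yields the same port transitions. A minimal sketch in plain Java (all names invented, not Apex APIs):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: a close request takes effect only at endWindow(), so
// replaying the same window yields the same port state (idempotency).
class DeferredClosePort {
    private boolean open = true;
    private boolean closeRequested = false;
    private final List<String> log = new ArrayList<>();

    // Operator code may request a close at any point within a window...
    void requestClose() {
        closeRequested = true;
    }

    // ...but the port only transitions at the window boundary, and the
    // transition is recorded against the window id for replay.
    void endWindow(long windowId) {
        if (closeRequested && open) {
            open = false;
            log.add("closed@" + windowId);
        }
    }

    boolean isOpen() {
        return open;
    }

    List<String> getLog() {
        return log;
    }
}
```

Under this scheme, "re-opening within a window" would collapse with the pending close at the same boundary, which is the ambiguity Pramod is pointing at.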
>>>>>>>>> On Sat, Apr 1, 2017 at 8:12 AM, Vlad Rozov <v.rozov@datatorrent.com>
>>>>>>>>> wrote:
>>>>>>>>>> All,
>>>>>>>>>> Currently Apex assumes that an operator can emit on any output
>>>>>>>>>> port and that all streams defined by a DAG are active. I'd like
>>>>>>>>>> to propose an ability for an operator to open and close output
>>>>>>>>>> ports. By default all ports defined by an operator will be open.
>>>>>>>>>> In case an operator for any reason decides that it will not emit
>>>>>>>>>> tuples on the output port, it may close it. This will make the
>>>>>>>>>> stream inactive and the master may undeploy the downstream (for
>>>>>>>>>> that input stream) operators. If this leads to containers that
>>>>>>>>>> don't have any active operators, those may be undeployed as
>>>>>>>>>> well, leading to better cluster resource utilization and better
>>>>>>>>>> Apex elasticity. Later, the operator may be in a state where it
>>>>>>>>>> needs to emit tuples on the closed port. In this case, it needs
>>>>>>>>>> to re-open the port and wait till the stream becomes active
>>>>>>>>>> again before emitting tuples on that port. Making an inactive
>>>>>>>>>> stream active again requires the application master to
>>>>>>>>>> re-allocate containers and re-deploy the downstream operators.
>>>>>>>>>> It should also be possible for an application designer to mark
>>>>>>>>>> streams as inactive when an application starts. This will allow
>>>>>>>>>> the application master to avoid reserving all containers when
>>>>>>>>>> the application starts. Later, the port can be opened and the
>>>>>>>>>> inactive stream becomes active.
>>>>>>>>>> Thank you,
>>>>>>>>>> Vlad
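As a rough illustration of the master-side bookkeeping the proposal implies, undeploying a downstream operator once all of its input streams have gone inactive might look like the following plain-Java sketch. All names are invented; this is not how the Apex application master is implemented:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the proposed master behavior: when every input
// stream of a downstream operator has gone inactive, release the operator.
class MasterSketch {
    // Downstream operator name -> names of its still-active input streams.
    private final Map<String, Set<String>> activeInputs = new HashMap<>();
    private final List<String> undeployed = new ArrayList<>();

    void addStream(String stream, String downstreamOperator) {
        activeInputs.computeIfAbsent(downstreamOperator, k -> new HashSet<>())
                    .add(stream);
    }

    // The upstream operator closed the port feeding this stream.
    void streamClosed(String stream, String downstreamOperator) {
        Set<String> inputs = activeInputs.get(downstreamOperator);
        if (inputs != null && inputs.remove(stream) && inputs.isEmpty()) {
            // No active inputs left: the operator (and eventually its
            // container) can be released to free cluster resources.
            undeployed.add(downstreamOperator);
        }
    }

    List<String> getUndeployed() {
        return undeployed;
    }
}
```

The same bookkeeping, run in reverse when a port re-opens, would drive the container re-allocation and re-deployment step described above.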
