apex-dev mailing list archives

From Thomas Weise <...@apache.org>
Subject Re: open/close ports and active/inactive streams
Date Mon, 10 Apr 2017 15:13:31 GMT
In this example, join/writer produces the data and reader/topN consumes it. You
cannot deallocate the producer before all data has been drained. When using
files, join/writer can be deallocated once all data has been flushed to the
files, and allocation of the consumer can wait until then if there isn't
enough space to have both of them active at the same time.

Overall, it seems this is a matter of activating/deactivating operators
rather than streams.

Thomas
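
The file-based resource swap described above can be sketched in plain Java. This is only an illustration under stated assumptions, not Apex code: the class `StagedSwapSketch`, the one-permit `SLOTS` semaphore standing in for cluster capacity, and the methods `runWriter`, `runTopN` and `runPipeline` are all hypothetical names. A local temporary file plays the role of the swap space, so unlike HDFS it is not fault tolerant.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.Semaphore;

public class StagedSwapSketch {
    // Hypothetical stand-in for cluster capacity: room for one operator at a time.
    static final Semaphore SLOTS = new Semaphore(1);

    // Stage 1 (join/writer): holds its slot until every result is written to the file.
    static void runWriter(List<Integer> joined, Path swap) throws IOException {
        if (!SLOTS.tryAcquire()) {
            throw new IllegalStateException("no capacity for the writer");
        }
        try {
            List<String> lines = new ArrayList<>();
            for (int value : joined) {
                lines.add(Integer.toString(value));
            }
            // Returns once all lines are written and the file is closed.
            Files.write(swap, lines);
        } finally {
            // The producer is released only after its data has been drained.
            SLOTS.release();
        }
    }

    // Stage 2 (reader/topN): can only be allocated after the writer released its slot.
    static List<Integer> runTopN(Path swap, int n) throws IOException {
        if (!SLOTS.tryAcquire()) {
            throw new IllegalStateException("no capacity for the reader");
        }
        try {
            // Min-heap that keeps the n largest values seen so far.
            PriorityQueue<Integer> heap = new PriorityQueue<>();
            for (String line : Files.readAllLines(swap)) {
                heap.add(Integer.parseInt(line));
                if (heap.size() > n) {
                    heap.poll();
                }
            }
            List<Integer> top = new ArrayList<>(heap);
            top.sort(Comparator.reverseOrder());
            return top;
        } finally {
            SLOTS.release();
        }
    }

    // Runs the two stages back to back; they never hold a slot at the same time.
    static List<Integer> runPipeline(List<Integer> joined, int n) throws IOException {
        Path swap = Files.createTempFile("stage1-", ".txt");
        try {
            runWriter(joined, swap);
            return runTopN(swap, n);
        } finally {
            Files.deleteIfExists(swap);
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(runPipeline(Arrays.asList(5, 1, 9, 3, 7), 3)); // prints [9, 7, 5]
    }
}
```

The point of the sketch is the ordering: the writer's slot is released in the `finally` block only after the write completes, and the reader acquires the same slot afterwards, so the swap happens at the operator level and no stream connects the two stages.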



On Mon, Apr 10, 2017 at 8:05 AM, Vlad Rozov <v.rozov@datatorrent.com> wrote:

> With additional file readers/writers, the pipeline of a single stage
> becomes the 3-operator use case I described. With the ability to open/close
> ports, the platform can optimize it by re-allocating resources from readers to
> writers.
>
> Thank you,
>
> Vlad
>
>
> On 4/10/17 07:44, Thomas Weise wrote:
>
>> In streaming there is a stream (surprise); in a space-constrained batch
>> case, we can have additional file writers/readers between the operators.
>>
>> Modules can in fact be used to support pipeline reuse, but they must be
>> added/removed dynamically to support stages with on-demand resource
>> allocation.
>>
>> Thomas
>>
>>
>> On Mon, Apr 10, 2017 at 7:37 AM, Vlad Rozov <v.rozov@datatorrent.com>
>> wrote:
>>
>>> Do you suggest that in a streaming use case the join operator also passes
>>> data downstream using files, or that there are two different join operators,
>>> one for streaming and one for batch? If not, it means that the join
>>> operator needs to emit data to a separate file output operator, so it still
>>> needs to read data from a temporary space before emitting. Why not emit
>>> directly to topN in this case?
>>>
>>> Is not pipeline reuse already supported by Apex modules?
>>>
>>> Thank you,
>>>
>>> Vlad
>>>
>>>
>>> On 4/10/17 06:59, Thomas Weise wrote:
>>>
>>>> I don't think this fully covers the scenario of limited resources. You
>>>> describe a case of 3 operators, but when you consider just 2 operators that
>>>> both have to hold a large data set in memory, the suggested approach
>>>> won't work. Let's say the first operator is an outer join and the second
>>>> operator is topN. Both are blocking and cannot emit before all input is seen.
>>>>
>>>> To deallocate the outer join, all results need to be drained. It's a
>>>> resource swap and you need a temporary space to hold the data. Also, if the
>>>> requirement is to be able to recover and retry from the results of stage one,
>>>> then you need a fault-tolerant swap space. If the cluster does not have
>>>> enough memory, then disk is a good option (SLA vs. memory tradeoff).
>>>>
>>>> I would also suggest thinking beyond the single DAG scenario. Often users
>>>> need to define pipelines that are composed of multiple smaller flows (which
>>>> they may also want to reuse in multiple pipelines). APEXCORE-408 gives you
>>>> an option to compose such flows within a single Apex application, in
>>>> addition to covering the simplified use case that we discuss there.
>>>>
>>>> Thomas
>>>>
>>>>
>>>> On Thu, Apr 6, 2017 at 5:52 PM, Vlad Rozov <v.rozov@datatorrent.com>
>>>> wrote:
>>>>
>>>>> It is exactly the same use case, with the exception that it is not
>>>>> necessary to write data to files. Consider 3 operators: an input operator,
>>>>> an aggregate operator and an output operator. When the application starts,
>>>>> the output port of the aggregate operator should be in the closed state,
>>>>> the stream between the second and the third would be inactive and the
>>>>> output operator does not need to be allocated. After the input operator
>>>>> processes all data, it can close the output port and the input operator may
>>>>> be de-allocated. Once the aggregator receives EOS on its input port, it
>>>>> should open the output port and start writing to it. At this point, the
>>>>> output operator needs to be deployed and the stream between the last two
>>>>> operators (aggregator and output) becomes active.
>>>>>
>>>>> In a real batch use case, it is preferable to have the full application DAG
>>>>> statically defined and to delegate activation/de-activation of stages to
>>>>> the platform. It is also preferable not to write intermediate files to
>>>>> disk/HDFS, but instead to pass data in memory.
>>>>>
>>>>> Thank you,
>>>>>
>>>>> Vlad
>>>>>
>>>>>
>>>>> On 4/6/17 09:37, Thomas Weise wrote:
>>>>>
>>>>>> You would need to provide more specifics of the use case you are thinking
>>>>>> of addressing to make this a meaningful discussion.
>>>>>>
>>>>>> An example for APEXCORE-408 (based on a real batch use case): I have two
>>>>>> stages; the first stage produces a set of files that the second stage
>>>>>> needs as input. Stage 1 operators are to be released and stage 2 operators
>>>>>> deployed when stage 2 starts. These can be independent operators; they
>>>>>> don't need to be connected through a stream.
>>>>>>
>>>>>> Thomas
>>>>>>
>>>>>>
>>>>>> On Thu, Apr 6, 2017 at 9:21 AM, Vlad Rozov <v.rozov@datatorrent.com>
>>>>>> wrote:
>>>>>>
>>>>>>> It is not about a use case difference. My proposal and APEXCORE-408
>>>>>>> address the same use case: how to re-allocate resources for batch
>>>>>>> applications or applications where processing happens in stages. The
>>>>>>> difference between APEXCORE-408 and the proposal is a shift in complexity
>>>>>>> from application logic to the platform. IMO, supporting batch applications
>>>>>>> using APEXCORE-408 will require more coding on the application side.
>>>>>>>
>>>>>>> Thank you,
>>>>>>>
>>>>>>> Vlad
>>>>>>>
>>>>>>>
>>>>>>> On 4/5/17 21:57, Thomas Weise wrote:
>>>>>>>
>>>>>>>> I think this needs more input on a use case level. The ability to
>>>>>>>> dynamically alter the DAG internally will also address the resource
>>>>>>>> allocation for operators:
>>>>>>>>
>>>>>>>> https://issues.apache.org/jira/browse/APEXCORE-408
>>>>>>>>
>>>>>>>> It can be used to implement stages of a batch pipeline and is very
>>>>>>>> flexible in general. Considering the likely implementation complexity for
>>>>>>>> the proposed feature, I would like to understand what benefits it provides
>>>>>>>> to the user (use cases that cannot be addressed otherwise)?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Thomas
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Sat, Apr 1, 2017 at 12:23 PM, Vlad Rozov <
>>>>>>>> v.rozov@datatorrent.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Correct, a stateful downstream operator can only be undeployed at a
>>>>>>>>> checkpoint window after it consumes all data emitted by the upstream
>>>>>>>>> operator on the closed port.
>>>>>>>>>
>>>>>>>>> It will be necessary to distinguish between a closed port and an
>>>>>>>>> inactive stream. After a port is closed, the stream may still be active,
>>>>>>>>> and after a port is opened, the stream may still be inactive (not yet
>>>>>>>>> ready).
>>>>>>>>>
>>>>>>>>> The more contributors participate in the discussion and implementation,
>>>>>>>>> the more solid the feature will be.
>>>>>>>>>
>>>>>>>>> Thank you,
>>>>>>>>> Vlad
>>>>>>>>>
>>>>>>>>> Sent from iPhone
>>>>>>>>>
>>>>>>>>> On Apr 1, 2017, at 11:03, Pramod Immaneni <pramod@datatorrent.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Generally a good idea. Care should be taken around fault tolerance and
>>>>>>>>>> idempotency. Closing a stream would need to stop accepting new data, but
>>>>>>>>>> we still can't actually close all the streams and un-deploy operators
>>>>>>>>>> till committed. Idempotency might require the close to take effect at
>>>>>>>>>> the end of the window. What would it then mean to re-open streams
>>>>>>>>>> within a window? Also, this looks like a larger undertaking. As Ram
>>>>>>>>>> suggested, it would be good to understand the use cases, and I also
>>>>>>>>>> suggest that multiple folks participate in the implementation effort to
>>>>>>>>>> ensure that we are able to address all the scenarios and minimize
>>>>>>>>>> chances of regression in existing behavior.
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>>
>>>>>>>>>> On Sat, Apr 1, 2017 at 8:12 AM, Vlad Rozov <v.rozov@datatorrent.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> All,
>>>>>>>>>>>
>>>>>>>>>>> Currently Apex assumes that an operator can emit on any defined output
>>>>>>>>>>> port and that all streams defined by a DAG are active. I'd like to
>>>>>>>>>>> propose an ability for an operator to open and close output ports. By
>>>>>>>>>>> default, all ports defined by an operator will be open. If an operator
>>>>>>>>>>> for any reason decides that it will not emit tuples on an output port,
>>>>>>>>>>> it may close it. This will make the stream inactive, and the
>>>>>>>>>>> application master may undeploy the downstream (for that input stream)
>>>>>>>>>>> operators. If this leads to containers that don't have any active
>>>>>>>>>>> operators, those containers may be undeployed as well, leading to
>>>>>>>>>>> better cluster resource utilization and better Apex elasticity. Later,
>>>>>>>>>>> the operator may be in a state where it needs to emit tuples on the
>>>>>>>>>>> closed port. In this case, it needs to re-open the port and wait till
>>>>>>>>>>> the stream becomes active again before emitting tuples on that port.
>>>>>>>>>>> Making an inactive stream active again requires the application master
>>>>>>>>>>> to re-allocate containers and re-deploy the downstream operators.
>>>>>>>>>>>
>>>>>>>>>>> It should also be possible for an application designer to mark streams
>>>>>>>>>>> as inactive when an application starts. This will allow the
>>>>>>>>>>> application master to avoid reserving all containers when the
>>>>>>>>>>> application starts. Later, the port can be opened and the inactive
>>>>>>>>>>> stream becomes active.
>>>>>>>>>>>
>>>>>>>>>>> Thank you,
>>>>>>>>>>>
>>>>>>>>>>> Vlad
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>
