apex-dev mailing list archives

From Vlad Rozov <v.ro...@datatorrent.com>
Subject Re: open/close ports and active/inactive streams
Date Mon, 10 Apr 2017 15:56:22 GMT
For the second pipeline, source can be de-allocated as soon as join gets 
all data and join can be de-allocated as soon as topN gets all data. 
Note that topN (and sink) does not need to be allocated before join 
starts emitting data.
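
As a rough illustration of that ordering (a sketch only: the function names
below are hypothetical and are not Apex APIs, and the policy is the assumption
that a downstream operator is allocated when its upstream starts emitting and
the upstream is released once the downstream has all of its data):

```python
# Illustrative model only: these names are hypothetical and are not Apex APIs.

def staged_schedule(chain):
    """Return (action, operator) events for a linear chain of operators.

    Assumed policy: a downstream operator is allocated only when its upstream
    starts emitting, and the upstream is de-allocated once the downstream has
    received all of its data.
    """
    if not chain:
        return []
    events = [("allocate", chain[0])]
    for upstream, downstream in zip(chain, chain[1:]):
        events.append(("allocate", downstream))
        events.append(("deallocate", upstream))
    events.append(("deallocate", chain[-1]))
    return events


def peak_allocated(events):
    """Return the largest number of operators allocated at the same time."""
    active, peak = set(), 0
    for action, op in events:
        if action == "allocate":
            active.add(op)
        else:
            active.discard(op)
        peak = max(peak, len(active))
    return peak


events = staged_schedule(["source", "join", "topN", "sink"])
for action, op in events:
    print(action, op)
print("peak operators allocated:", peak_allocated(events))
```

Under these assumptions no more than two adjacent operators are held at once,
although join and topN do overlap while data moves between them. The model
ignores memory sizes, checkpointing and recovery.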

Thank you,

Vlad

On 4/10/17 08:48, Thomas Weise wrote:
> The pipeline depends on the resource availability. It could be:
>
> ( source -> join -> writer ) - - -> ( reader -> topN -> sink)
>
> or
>
> (source -> join -> topN -> sink)
>
> The second case does not allow you to deallocate join (join and topN are
> active at the same time).
>
>
> On Mon, Apr 10, 2017 at 8:37 AM, Vlad Rozov <v.rozov@datatorrent.com> wrote:
>
>> It is important. The generic pipeline proposed is (... -> writer) --->
>> (reader -> join -> writer) ---> (reader -> ...), where reader -> aggregator
>> -> writer becomes a common pattern for single-stage processing.
>>
>> Thank you,
>>
>> Vlad
>>
>>
>> On 4/10/17 08:31, Thomas Weise wrote:
>>
>>> Where the data comes from isn't important for this discussion. The
>>> scenario is join -> topN.
>>>
>>> With intermediate files it is: ( join -> writer ) - - -> ( reader -> topN )
>>>
>>>
>>> On Mon, Apr 10, 2017 at 8:26 AM, Vlad Rozov <v.rozov@datatorrent.com>
>>> wrote:
>>>
>>>> In your example join is both consumer and producer, isn't it? Where does
>>>> it get data from? Join is not an input operator.
>>>>
>>>> Thank you,
>>>>
>>>> Vlad
>>>>
>>>>
>>>> On 4/10/17 08:13, Thomas Weise wrote:
>>>>
>>>>> In this example join/writer produces the data, reader/topN consumes. You
>>>>> cannot deallocate the producer before all data has been drained. When using
>>>>> files, join/writer can be deallocated once all data has been flushed to the
>>>>> files, and allocation of the consumer can wait until that has occurred if
>>>>> space isn't available to have both of them active at the same time.
>>>>>
>>>>> Overall it seems this is not a matter of activating/deactivating streams
>>>>> but operators.
>>>>>
>>>>> Thomas
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Apr 10, 2017 at 8:05 AM, Vlad Rozov <v.rozov@datatorrent.com>
>>>>> wrote:
>>>>>
>>>>>> With additional file readers/writers the pipeline of a single stage
>>>>>> becomes the 3 operator use case I described. With the ability to open/close
>>>>>> ports, the platform can optimize it by re-allocating resources from readers
>>>>>> to writers.
>>>>>>
>>>>>> Thank you,
>>>>>>
>>>>>> Vlad
>>>>>>
>>>>>>
>>>>>> On 4/10/17 07:44, Thomas Weise wrote:
>>>>>>
>>>>>>> In streaming there is a stream (surprise); in a space-constrained batch
>>>>>>> case, we can have additional file writers/readers between the
>>>>>>> operators.
>>>>>>>
>>>>>>> Modules can in fact be used to support pipeline reuse, but they must
>>>>>>> be added/removed dynamically to support stages with on-demand resource
>>>>>>> allocation.
>>>>>>>
>>>>>>> Thomas
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Apr 10, 2017 at 7:37 AM, Vlad Rozov <v.rozov@datatorrent.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Do you suggest that in a streaming use case the join operator also
>>>>>>>> passes data downstream using files, or that there are two different
>>>>>>>> join operators, one for streaming and one for batch? If not, it means
>>>>>>>> that the join operator needs to emit data to a separate file output
>>>>>>>> operator, so it still needs to read data from a temporary space before
>>>>>>>> emitting. Why not emit directly to topN in this case?
>>>>>>>>
>>>>>>>> Isn't pipeline reuse already supported by Apex modules?
>>>>>>>>
>>>>>>>> Thank you,
>>>>>>>>
>>>>>>>> Vlad
>>>>>>>>
>>>>>>>>
>>>>>>>> On 4/10/17 06:59, Thomas Weise wrote:
>>>>>>>>
>>>>>>>>> I don't think this fully covers the scenario of limited resources.
>>>>>>>>> You describe a case of 3 operators, but when you consider just 2
>>>>>>>>> operators that both have to hold a large data set in memory, then the
>>>>>>>>> suggested approach won't work. Let's say the first operator is outer
>>>>>>>>> join and the second operator topN. Both are blocking and cannot emit
>>>>>>>>> before all input is seen.
>>>>>>>>>
>>>>>>>>> To deallocate the outer join, all results need to be drained. It's a
>>>>>>>>> resource swap and you need a temporary space to hold the data. Also,
>>>>>>>>> if the requirement is to be able to recover and retry from results of
>>>>>>>>> stage one, then you need a fault tolerant swap space. If the cluster
>>>>>>>>> does not have enough memory, then disk is a good option (SLA vs.
>>>>>>>>> memory tradeoff).
>>>>>>>>>
>>>>>>>>> I would also suggest thinking beyond the single DAG scenario. Often
>>>>>>>>> users need to define pipelines that are composed of multiple smaller
>>>>>>>>> flows (which they may also want to reuse in multiple pipelines).
>>>>>>>>> APEXCORE-408 gives you an option to compose such flows within a single
>>>>>>>>> Apex application, in addition to covering the simplified use case
>>>>>>>>> that we discuss there.
>>>>>>>>>
>>>>>>>>> Thomas
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Apr 6, 2017 at 5:52 PM, Vlad Rozov <v.rozov@datatorrent.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> It is exactly the same use case with the exception that it is not
>>>>>>>>>> necessary to write data to files. Consider 3 operators: an input
>>>>>>>>>> operator, an aggregate operator and an output operator. When the
>>>>>>>>>> application starts, the output port of the aggregate operator should
>>>>>>>>>> be in the closed state, the stream between the second and the third
>>>>>>>>>> would be inactive and the output operator does not need to be
>>>>>>>>>> allocated. After the input operator processes all data, it can close
>>>>>>>>>> the output port and the input operator may be de-allocated. Once the
>>>>>>>>>> aggregator receives EOS on its input port, it should open the output
>>>>>>>>>> port and start writing to it. At this point, the output operator
>>>>>>>>>> needs to be deployed and the stream between the last two operators
>>>>>>>>>> (aggregator and output) becomes active.
>>>>>>>>>>
>>>>>>>>>> In a real batch use case, it is preferable to have the full
>>>>>>>>>> application DAG statically defined and to delegate
>>>>>>>>>> activation/de-activation of stages to the platform. It is also
>>>>>>>>>> preferable not to write intermediate files to disk/HDFS, but instead
>>>>>>>>>> pass data in-memory.
>>>>>>>>>>
>>>>>>>>>> Thank you,
>>>>>>>>>>
>>>>>>>>>> Vlad
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On 4/6/17 09:37, Thomas Weise wrote:
>>>>>>>>>>
>>>>>>>>>>> You would need to provide more specifics of the use case you are
>>>>>>>>>>> thinking of addressing to make this a meaningful discussion.
>>>>>>>>>>>
>>>>>>>>>>> An example for APEXCORE-408 (based on a real batch use case): I have
>>>>>>>>>>> two stages; the first stage produces a set of files that the second
>>>>>>>>>>> stage needs as input. Stage 1 operators are to be released and stage 2
>>>>>>>>>>> operators deployed when stage 2 starts. These can be independent
>>>>>>>>>>> operators; they don't need to be connected through a stream.
>>>>>>>>>>>
>>>>>>>>>>> Thomas
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Apr 6, 2017 at 9:21 AM, Vlad Rozov <v.rozov@datatorrent.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> It is not about a use case difference. My proposal and APEXCORE-408
>>>>>>>>>>>> address the same use case: how to re-allocate resources for batch
>>>>>>>>>>>> applications or applications where processing happens in stages.
>>>>>>>>>>>> The difference between APEXCORE-408 and the proposal is a shift in
>>>>>>>>>>>> complexity from application logic to the platform. IMO, supporting
>>>>>>>>>>>> batch applications using APEXCORE-408 will require more coding on
>>>>>>>>>>>> the application side.
>>>>>>>>>>>>
>>>>>>>>>>>> Thank you,
>>>>>>>>>>>>
>>>>>>>>>>>> Vlad
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On 4/5/17 21:57, Thomas Weise wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> I think this needs more input on a use case level. The ability to
>>>>>>>>>>>>> dynamically alter the DAG internally will also address the
>>>>>>>>>>>>> resource allocation for operators:
>>>>>>>>>>>>> https://issues.apache.org/jira/browse/APEXCORE-408
>>>>>>>>>>>>>
>>>>>>>>>>>>> It can be used to implement stages of a batch pipeline and is very
>>>>>>>>>>>>> flexible in general. Considering the likely implementation
>>>>>>>>>>>>> complexity for the proposed feature I would like to understand
>>>>>>>>>>>>> what benefits it provides to the user (use cases that cannot be
>>>>>>>>>>>>> addressed otherwise)?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sat, Apr 1, 2017 at 12:23 PM, Vlad Rozov <v.rozov@datatorrent.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Correct, a stateful downstream operator can only be undeployed at
>>>>>>>>>>>>>> a checkpoint window after it consumes all data emitted by the
>>>>>>>>>>>>>> upstream operator on the closed port.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> It will be necessary to distinguish between a closed port and an
>>>>>>>>>>>>>> inactive stream. After a port is closed, the stream may still be
>>>>>>>>>>>>>> active, and after a port is opened, the stream may still be
>>>>>>>>>>>>>> inactive (not yet ready).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The more contributors participate in the discussion and
>>>>>>>>>>>>>> implementation, the more solid the feature will be.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thank you,
>>>>>>>>>>>>>> Vlad
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Sent from iPhone
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Apr 1, 2017, at 11:03, Pramod Immaneni <pramod@datatorrent.com>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Generally a good idea. Care should be taken around fault
>>>>>>>>>>>>>>> tolerance and idempotency. Close stream would need to stop
>>>>>>>>>>>>>>> accepting new data but still can't actually close all the
>>>>>>>>>>>>>>> streams and un-deploy operators till committed. Idempotency
>>>>>>>>>>>>>>> might require the close stream to take effect at the end of the
>>>>>>>>>>>>>>> window. What would it then mean for re-opening streams within a
>>>>>>>>>>>>>>> window? Also, this looks like a larger undertaking. As Ram
>>>>>>>>>>>>>>> suggested, it would be good to understand the use cases, and I
>>>>>>>>>>>>>>> also suggest that multiple folks participate in the
>>>>>>>>>>>>>>> implementation effort to ensure that we are able to address all
>>>>>>>>>>>>>>> the scenarios and minimize chances of regression in existing
>>>>>>>>>>>>>>> behavior.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Sat, Apr 1, 2017 at 8:12 AM, Vlad Rozov <v.rozov@datatorrent.com>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> All,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Currently Apex assumes that an operator can emit on any defined
>>>>>>>>>>>>>>>> output port and that all streams defined by a DAG are active.
>>>>>>>>>>>>>>>> I'd like to propose the ability for an operator to open and
>>>>>>>>>>>>>>>> close output ports. By default all ports defined by an operator
>>>>>>>>>>>>>>>> will be open. If an operator for any reason decides that it
>>>>>>>>>>>>>>>> will not emit tuples on an output port, it may close it. This
>>>>>>>>>>>>>>>> will make the stream inactive, and the application master may
>>>>>>>>>>>>>>>> undeploy the downstream (for that input stream) operators. If
>>>>>>>>>>>>>>>> this leads to containers that don't have any active operators,
>>>>>>>>>>>>>>>> those containers may be undeployed as well, leading to better
>>>>>>>>>>>>>>>> cluster resource utilization and better Apex elasticity. Later,
>>>>>>>>>>>>>>>> the operator may be in a state where it needs to emit tuples on
>>>>>>>>>>>>>>>> the closed port. In this case, it needs to re-open the port and
>>>>>>>>>>>>>>>> wait until the stream becomes active again before emitting
>>>>>>>>>>>>>>>> tuples on that port. Making an inactive stream active again
>>>>>>>>>>>>>>>> requires the application master to re-allocate containers and
>>>>>>>>>>>>>>>> re-deploy the downstream operators.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> It should also be possible for an application designer to mark
>>>>>>>>>>>>>>>> streams as inactive when an application starts. This will allow
>>>>>>>>>>>>>>>> the application master to avoid reserving all containers when
>>>>>>>>>>>>>>>> the application starts. Later, the port can be opened and the
>>>>>>>>>>>>>>>> inactive stream becomes active.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thank you,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Vlad