apex-dev mailing list archives

From Thomas Weise <...@apache.org>
Subject Re: open/close ports and active/inactive streams
Date Mon, 10 Apr 2017 16:07:39 GMT
Yes, but source is not important for the resource allocation aspect because
it is a reader that does not hold a lot of resources. The big ticket items
are join and topN, and they need to be allocated at the same time if you
don't have a swap space.
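
To make the allocation argument concrete, here is a minimal sketch with made-up memory figures. The thread gives no numbers; the footprints and the `peak_gb` helper are assumptions for illustration only. It compares the peak footprint of the single-stage pipeline with the two-stage pipeline that uses a swap space.

```python
# Hypothetical memory footprints in GB; not taken from the thread.
FOOTPRINT_GB = {"source": 1, "join": 40, "writer": 1,
                "reader": 1, "topN": 30, "sink": 1}


def peak_gb(stages):
    """Peak memory when stages run one after another and all operators
    within a stage are allocated at the same time."""
    return max(sum(FOOTPRINT_GB[op] for op in stage) for stage in stages)


# One stage: join and topN are active together.
single = [["source", "join", "topN", "sink"]]
# Two stages separated by a swap space (intermediate files).
staged = [["source", "join", "writer"], ["reader", "topN", "sink"]]

print(peak_gb(single))  # 72
print(peak_gb(staged))  # 42
```

With these assumed figures the source, writer, reader and sink barely move the result; the peak is driven by whether join and topN overlap.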


On Mon, Apr 10, 2017 at 8:56 AM, Vlad Rozov <v.rozov@datatorrent.com> wrote:

> For the second pipeline, source can be de-allocated as soon as join gets
> all data and join can be de-allocated as soon as topN gets all data. Note
> that topN (and sink) does not need to be allocated before join starts
> emitting data.
>
> Thank you,
>
> Vlad
>
>
> On 4/10/17 08:48, Thomas Weise wrote:
>
>> The pipeline depends on the resource availability. It could be:
>>
>> ( source -> join -> writer ) - - -> ( reader -> topN -> sink)
>>
>> or
>>
>> (source -> join -> topN -> sink)
>>
>> The second case does not allow you to deallocate join (join and topN are
>> active at the same time).
>>
>>
>> On Mon, Apr 10, 2017 at 8:37 AM, Vlad Rozov <v.rozov@datatorrent.com>
>> wrote:
>>
>>> It is important. The generic pipeline proposed is (... -> writer) --->
>>> (reader -> join -> writer) ---> (reader -> ...), where reader ->
>>> aggregator -> writer becomes a common pattern for a single stage processing.
>>>
>>> Thank you,
>>>
>>> Vlad
>>>
>>>
>>> On 4/10/17 08:31, Thomas Weise wrote:
>>>
>>>> Where the data comes from isn't important for this discussion. The scenario
>>>> is join -> topN
>>>>
>>>> With intermediate files it is: ( join -> writer ) - - -> ( reader -> topN )
>>>>
>>>>
>>>> On Mon, Apr 10, 2017 at 8:26 AM, Vlad Rozov <v.rozov@datatorrent.com>
>>>> wrote:
>>>>
>>>>> In your example join is both consumer and producer, is it not? Where does
>>>>> it get data from? Join is not an input operator.
>>>>>
>>>>> Thank you,
>>>>>
>>>>> Vlad
>>>>>
>>>>>
>>>>> On 4/10/17 08:13, Thomas Weise wrote:
>>>>>
>>>>>> In this example join/writer produces the data, reader/topN consumes. You
>>>>>> cannot deallocate producer before all data has been drained. When using
>>>>>> files, join/writer can be deallocated when all data was flushed to the
>>>>>> files and allocation of consumer can wait until that occurred, if the
>>>>>> space isn't available to have both of them active at same time.
>>>>>>
>>>>>> Overall it seems this is not a matter of activating/deactivating
>>>>>> streams
>>>>>> but operators.
>>>>>>
>>>>>> Thomas
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, Apr 10, 2017 at 8:05 AM, Vlad Rozov <v.rozov@datatorrent.com>
>>>>>> wrote:
>>>>>>
>>>>>>> With additional file readers/writers the pipeline of a single stage
>>>>>>> becomes the 3 operator use case I described. With ability to open/close
>>>>>>> ports, platform can optimize it by re-allocating resources from readers
>>>>>>> to writers.
>>>>>>>
>>>>>>> Thank you,
>>>>>>>
>>>>>>> Vlad
>>>>>>>
>>>>>>>
>>>>>>> On 4/10/17 07:44, Thomas Weise wrote:
>>>>>>>
>>>>>>>> In streaming there is a stream (surprise); in a space-constrained batch
>>>>>>>> case, we can have additional file writers/readers between the operators.
>>>>>>>>
>>>>>>>> Modules can in fact be used to support pipeline reuse, but they must be
>>>>>>>> added/removed dynamically to support stages with on-demand resource
>>>>>>>> allocation.
>>>>>>>>
>>>>>>>> Thomas
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Apr 10, 2017 at 7:37 AM, Vlad Rozov <v.rozov@datatorrent.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Do you suggest that in a streaming use case the join operator also passes
>>>>>>>>> data downstream using files, or that there are two different join
>>>>>>>>> operators, one for streaming and one for batch? If not, it means that the
>>>>>>>>> join operator needs to emit data to a separate file output operator, so
>>>>>>>>> it still needs to read data from a temporary space before emitting. Why
>>>>>>>>> not emit directly to topN in this case?
>>>>>>>>>
>>>>>>>>> Is not pipeline reuse already supported by Apex modules?
>>>>>>>>>
>>>>>>>>> Thank you,
>>>>>>>>>
>>>>>>>>> Vlad
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 4/10/17 06:59, Thomas Weise wrote:
>>>>>>>>>
>>>>>>>>>> I don't think this fully covers the scenario of limited resources. You
>>>>>>>>>> describe a case of 3 operators, but when you consider just 2 operators
>>>>>>>>>> that both have to hold a large data set in memory, then the suggested
>>>>>>>>>> approach won't work. Let's say the first operator is outer join and the
>>>>>>>>>> second operator topN. Both are blocking and cannot emit before all input
>>>>>>>>>> is seen.
>>>>>>>>>>
>>>>>>>>>> To deallocate the outer join, all results need to be drained. It's a
>>>>>>>>>> resource swap and you need a temporary space to hold the data. Also, if
>>>>>>>>>> the requirement is to be able to recover and retry from results of stage
>>>>>>>>>> one, then you need a fault tolerant swap space. If the cluster does not
>>>>>>>>>> have enough memory, then disk is a good option (SLA vs. memory tradeoff).
>>>>>>>>>>
>>>>>>>>>> I would also suggest to think beyond the single DAG scenario. Often users
>>>>>>>>>> need to define pipelines that are composed of multiple smaller flows
>>>>>>>>>> (which they may also want to reuse in multiple pipelines). APEXCORE-408
>>>>>>>>>> gives you an option to compose such flows within a single Apex
>>>>>>>>>> application, in addition to covering the simplified use case that we
>>>>>>>>>> discuss there.
>>>>>>>>>>
>>>>>>>>>> Thomas
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Apr 6, 2017 at 5:52 PM, Vlad Rozov <v.rozov@datatorrent.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> It is exactly the same use case with the exception that it is not
>>>>>>>>>>> necessary to write data to files. Consider 3 operators, an input
>>>>>>>>>>> operator, an aggregate operator and an output operator. When the
>>>>>>>>>>> application starts, the output port of the aggregate operator should be
>>>>>>>>>>> in the closed state, the stream between the second and the third would
>>>>>>>>>>> be inactive and the output operator does not need to be allocated. After
>>>>>>>>>>> the input operator processes all data, it can close the output port and
>>>>>>>>>>> the input operator may be de-allocated. Once the aggregator receives EOS
>>>>>>>>>>> on its input port, it should open the output port and start writing to
>>>>>>>>>>> it. At this point, the output operator needs to be deployed and the
>>>>>>>>>>> stream between the last two operators (aggregator and output) becomes
>>>>>>>>>>> active.
>>>>>>>>>>>
>>>>>>>>>>> In a real batch use case, it is preferable to have the full application
>>>>>>>>>>> DAG statically defined and to delegate activation/de-activation of
>>>>>>>>>>> stages to the platform. It is also preferable not to write intermediate
>>>>>>>>>>> files to disk/HDFS, but instead pass data in-memory.
>>>>>>>>>>>
>>>>>>>>>>> Thank you,
>>>>>>>>>>>
>>>>>>>>>>> Vlad
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On 4/6/17 09:37, Thomas Weise wrote:
>>>>>>>>>>>
>>>>>>>>>>>> You would need to provide more specifics of the use case you are thinking
>>>>>>>>>>>> to address to make this a meaningful discussion.
>>>>>>>>>>>>
>>>>>>>>>>>> An example for APEXCORE-408 (based on real batch use case): I have two
>>>>>>>>>>>> stages, first stage produces a set of files that second stage needs as
>>>>>>>>>>>> input. Stage 1 operators to be released and stage 2 operators deployed
>>>>>>>>>>>> when stage 2 starts. These can be independent operators, they don't need
>>>>>>>>>>>> to be connected through a stream.
>>>>>>>>>>>>
>>>>>>>>>>>> Thomas
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Apr 6, 2017 at 9:21 AM, Vlad Rozov <v.rozov@datatorrent.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> It is not about a use case difference. My proposal and APEXCORE-408
>>>>>>>>>>>>> address the same use case - how to re-allocate resources for batch
>>>>>>>>>>>>> applications or applications where processing happens in stages. The
>>>>>>>>>>>>> difference between APEXCORE-408 and the proposal is a shift in
>>>>>>>>>>>>> complexity from application logic to the platform. IMO, supporting batch
>>>>>>>>>>>>> applications using APEXCORE-408 will require more coding on the
>>>>>>>>>>>>> application side.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thank you,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Vlad
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 4/5/17 21:57, Thomas Weise wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> I think this needs more input on a use case level. The ability to
>>>>>>>>>>>>>> dynamically alter the DAG internally will also address the resource
>>>>>>>>>>>>>> allocation for operators:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/APEXCORE-408
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> It can be used to implement stages of a batch pipeline and is very
>>>>>>>>>>>>>> flexible in general. Considering the likely implementation complexity
>>>>>>>>>>>>>> for the proposed feature I would like to understand what benefits it
>>>>>>>>>>>>>> provides to the user (use cases that cannot be addressed otherwise)?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sat, Apr 1, 2017 at 12:23 PM, Vlad Rozov <v.rozov@datatorrent.com>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Correct, a stateful downstream operator can only be undeployed at a
>>>>>>>>>>>>>>> checkpoint window after it consumes all data emitted by upstream
>>>>>>>>>>>>>>> operator on the closed port.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> It will be necessary to distinguish between closed port and inactive
>>>>>>>>>>>>>>> stream. After port is closed, stream may still be active and after port
>>>>>>>>>>>>>>> is open, stream may still be inactive (not yet ready).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The more contributors participate in the discussion and implementation,
>>>>>>>>>>>>>>> the more solid the feature will be.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thank you,
>>>>>>>>>>>>>>> Vlad
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Sent from iPhone
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Apr 1, 2017, at 11:03, Pramod Immaneni <pramod@datatorrent.com>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Generally a good idea. Care should be taken around fault tolerance and
>>>>>>>>>>>>>>>> idempotency. Close stream would need to stop accepting new data but
>>>>>>>>>>>>>>>> still can't actually close all the streams and un-deploy operators till
>>>>>>>>>>>>>>>> committed. Idempotency might require the close stream to take effect at
>>>>>>>>>>>>>>>> the end of the window. What would it then mean for re-opening streams
>>>>>>>>>>>>>>>> within a window? Also, this looks like a larger undertaking; as Ram
>>>>>>>>>>>>>>>> suggested, it would be good to understand the use cases, and I also
>>>>>>>>>>>>>>>> suggest that multiple folks participate in the implementation effort to
>>>>>>>>>>>>>>>> ensure that we are able to address all the scenarios and minimize
>>>>>>>>>>>>>>>> chances of regression in existing behavior.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Sat, Apr 1, 2017 at 8:12 AM, Vlad Rozov <v.rozov@datatorrent.com>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> All,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Currently Apex assumes that an operator can emit on any defined output
>>>>>>>>>>>>>>>>> port and all streams defined by a DAG are active. I'd like to propose
>>>>>>>>>>>>>>>>> an ability for an operator to open and close output ports. By default
>>>>>>>>>>>>>>>>> all ports defined by an operator will be open. In the case an operator
>>>>>>>>>>>>>>>>> for any reason decides that it will not emit tuples on the output port,
>>>>>>>>>>>>>>>>> it may close it. This will make the stream inactive and the application
>>>>>>>>>>>>>>>>> master may undeploy the downstream (for that input stream) operators.
>>>>>>>>>>>>>>>>> If this leads to containers that don't have any active operators, those
>>>>>>>>>>>>>>>>> containers may be undeployed as well, leading to better cluster
>>>>>>>>>>>>>>>>> resource utilization and better Apex elasticity. Later, the operator
>>>>>>>>>>>>>>>>> may be in a state where it needs to emit tuples on the closed port. In
>>>>>>>>>>>>>>>>> this case, it needs to re-open the port and wait till the stream
>>>>>>>>>>>>>>>>> becomes active again before emitting tuples on that port. Making an
>>>>>>>>>>>>>>>>> inactive stream active again requires the application master to
>>>>>>>>>>>>>>>>> re-allocate containers and re-deploy the downstream operators.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> It should also be possible for an application designer to mark streams
>>>>>>>>>>>>>>>>> as inactive when an application starts. This will allow the application
>>>>>>>>>>>>>>>>> master to avoid reserving all containers when the application starts.
>>>>>>>>>>>>>>>>> Later, the port can be opened and the inactive stream become active.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thank you,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Vlad
>
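
The distinction drawn earlier in the thread between a closed port and an inactive stream can be sketched as a toy state model. This is only an illustration under stated assumptions: `OutputPort`, `platform_tick` and `downstream_consumed` are hypothetical names, not part of the Apex API, and checkpointing and windowing are ignored.

```python
from dataclasses import dataclass


@dataclass
class OutputPort:
    """Toy model: the operator controls port_open, the platform controls
    stream_active. Names are hypothetical, not Apex API."""
    port_open: bool = True      # ports are open by default in the proposal
    stream_active: bool = True
    pending: int = 0            # tuples emitted but not yet consumed downstream

    def emit(self, n=1):
        if not (self.port_open and self.stream_active):
            raise RuntimeError("cannot emit: port closed or stream inactive")
        self.pending += n

    def close(self):
        # Operator-side decision; the stream stays active until drained.
        self.port_open = False

    def open(self):
        # Operator-side request; the stream is not active until redeployment.
        self.port_open = True

    def downstream_consumed(self, n):
        self.pending = max(0, self.pending - n)

    def platform_tick(self):
        """Stand-in for the application master reacting to port state."""
        if not self.port_open and self.pending == 0:
            self.stream_active = False   # downstream may be undeployed
        elif self.port_open and not self.stream_active:
            self.stream_active = True    # downstream has been redeployed


port = OutputPort()
port.emit(3)
port.close()
port.platform_tick()
closed_but_active = port.stream_active            # still draining
port.downstream_consumed(3)
port.platform_tick()
inactive_after_drain = not port.stream_active
port.open()
open_but_inactive = port.port_open and not port.stream_active
port.platform_tick()
port.emit()                                        # allowed again
```

The two intermediate flags capture the states named in the thread: a closed port whose stream is still active while data drains, and an open port whose stream is not yet ready.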
