apex-dev mailing list archives

From Siyuan Hua <siy...@datatorrent.com>
Subject Re: Java Stream API Pull Request
Date Sat, 30 Apr 2016 00:44:53 GMT
Sorry, here is the link for the pull request

On Fri, Apr 29, 2016 at 5:43 PM, Siyuan Hua <siyuan@datatorrent.com> wrote:

> Hi Community,
> Happy Friday!
> I just sent out an initial pull request for the Java high-level Stream API.
> This is our very first attempt to bring the functional paradigm into the
> Apex programming model, and we will keep working on it over the coming
> months. If you are interested, please take a look and comment. Any
> suggestions are welcome. Thanks!
> For those who are not familiar with the idea, here is a short write-up.
> First iteration of the Java Stream API
> The Java Stream API follows the popular functional programming paradigm
> for constructing an Apex application.
> The goals of this API are:
>    - Make it easy to construct a DAG
>    - Make it easy to migrate applications from other streaming platforms to
>    Apex
>    - Stay fully compatible with the existing DAG API
>    - Provide useful built-in transformations, with abstracted, pluggable
>    components, in one place
> To achieve these goals and split up the work, we group all transformations
> into two categories:
>    - 1 input, 1+ outputs (map, filter, flatMap)
>    - Multiple inputs, 1 output (aggregations, joins, unions)
> This first iteration covers only the first category (1 in, 1+ out). A
> transformation of this kind is essentially a distributed function call, so
> we abstract it as a set of function types rather than operators.
> Internally, pre-built function operators wrap these functions and are
> connected together.
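A minimal sketch of the idea above, assuming a toy model rather than the code in the pull request: map, filter, and flatMap are all expressed as a function from one input tuple to a list of output tuples, and a hypothetical FunctionOperator class (a stand-in for the pre-built function operators) wraps that function. FunctionOperatorSketch, FunctionOperator, and process() are invented for illustration and are not Apex APIs.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical sketch, not Apex code: map, filter, and flatMap all reduce to
// "one input tuple in, a list of output tuples out".
class FunctionOperatorSketch {

  // Stand-in for a pre-built function operator: it wraps a user function and
  // collects whatever the function emits for each incoming tuple.
  static final class FunctionOperator<I, O> {
    private final Function<I, List<O>> fn;
    private final List<O> emitted = new ArrayList<>();

    FunctionOperator(Function<I, List<O>> fn) {
      this.fn = fn;
    }

    // Called once per input tuple, similar in spirit to an input port's process().
    void process(I tuple) {
      emitted.addAll(fn.apply(tuple));
    }

    List<O> emitted() {
      return emitted;
    }
  }

  // map: exactly one output per input.
  static <I, O> FunctionOperator<I, O> map(Function<I, O> f) {
    return new FunctionOperator<I, O>(t -> Collections.singletonList(f.apply(t)));
  }

  // filter: zero or one output per input.
  static <T> FunctionOperator<T, T> filter(Predicate<T> p) {
    return new FunctionOperator<T, T>(
        t -> p.test(t) ? Collections.singletonList(t) : Collections.<T>emptyList());
  }

  // flatMap: any number of outputs per input.
  static <I, O> FunctionOperator<I, O> flatMap(Function<I, List<O>> f) {
    return new FunctionOperator<I, O>(f);
  }
}
```

Because every transformation in this category has the same shape, a single wrapper type is enough to host all three; only the function passed in differs.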
> The core interface is ApexStream. ApexStream is designed for method
> chaining: every transformation method returns a new ApexStream object with
> the new output type.
> For example, to apply a filter followed by a map, you can write
>    stream.filter(new FilterFunction())
>        .map(new MapFunction());
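To illustrate the method-chaining pattern, here is a hypothetical ToyStream class (not ApexStream): each transformation returns a new stream object, and map can change the element type. For brevity it evaluates eagerly over an in-memory list, whereas the write-up describes the real stream as lazily built.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical toy, not the real ApexStream interface. It evaluates eagerly over an
// in-memory list to keep the example short.
final class ToyStream<T> {
  private final List<T> data;

  private ToyStream(List<T> data) {
    this.data = data;
  }

  static <T> ToyStream<T> of(List<T> data) {
    return new ToyStream<T>(new ArrayList<T>(data));
  }

  // Returns a new stream with the same element type; the receiver is not modified.
  ToyStream<T> filter(Predicate<? super T> p) {
    List<T> out = new ArrayList<>();
    for (T t : data) {
      if (p.test(t)) {
        out.add(t);
      }
    }
    return new ToyStream<T>(out);
  }

  // Returns a new stream whose element type may change from T to R.
  <R> ToyStream<R> map(Function<? super T, ? extends R> f) {
    List<R> out = new ArrayList<>();
    for (T t : data) {
      out.add(f.apply(t));
    }
    return new ToyStream<R>(out);
  }

  List<T> toList() {
    return new ArrayList<>(data);
  }
}
```

For example, ToyStream.of(Arrays.asList("1", "22", "333")).filter(s -> s.length() > 1).map(String::length) yields a ToyStream<Integer>, while the original ToyStream<String> is left untouched.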
> You can also mix this with the existing operator API. For example, to add
> an operator after the map, you can write
>    stream.filter(..)
>        .map(..)
>        .addOperator(opt, opt.input, opt.output)
> (PS: opt.input is connected to the output of the previous stream, and
> opt.output will be connected to the next one.)
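The port wiring behind addOperator(opt, opt.input, opt.output) can be pictured with a small toy model. OutputPort, InputPort, UpperCaseOperator, and Tail are hypothetical classes invented for this sketch (they are not the Apex port classes): the current tail's output port is connected to the operator's input port, and the operator's output port becomes the new tail that the next step attaches to.

```java
import java.util.function.Consumer;

// Hypothetical toy model of port wiring; these are not the Apex port classes.
class PortWiringSketch {

  // An output port forwards each emitted tuple to whatever is connected to it.
  static final class OutputPort<T> {
    private Consumer<T> sink = t -> { }; // drops tuples until something is connected

    void connect(Consumer<T> downstream) {
      this.sink = downstream;
    }

    void emit(T tuple) {
      sink.accept(tuple);
    }
  }

  // An input port hands each arriving tuple to the operator's processing logic.
  static final class InputPort<T> implements Consumer<T> {
    private final Consumer<T> handler;

    InputPort(Consumer<T> handler) {
      this.handler = handler;
    }

    @Override
    public void accept(T tuple) {
      handler.accept(tuple);
    }
  }

  // A user-defined operator with one input and one output port; it upper-cases strings.
  static final class UpperCaseOperator {
    final OutputPort<String> output = new OutputPort<>();
    final InputPort<String> input = new InputPort<>(s -> output.emit(s.toUpperCase()));
  }

  // The tail of a stream: the output port that the next step will attach to.
  static final class Tail<T> {
    final OutputPort<T> port;

    Tail(OutputPort<T> port) {
      this.port = port;
    }

    // Mirrors addOperator(opt, opt.input, opt.output): connect the current tail to 'in',
    // then return 'out' as the new tail. A real DAG would also register 'operator';
    // this toy only wires the ports.
    <R> Tail<R> addOperator(Object operator, InputPort<T> in, OutputPort<R> out) {
      port.connect(in);
      return new Tail<>(out);
    }
  }
}
```

Passing both ports explicitly, as the email's signature does, lets an operator with several ports be spliced in without the stream having to guess which port to use.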
> To set the locality or attributes for operators, ports, or the DAG, use
> the *with* clause. For example, to make filter and map container local and
> to set the checkpoint window count for the operator you just added, you
> can write something like
>    stream.filter(..)
>        .map(..).with(Locality.CONTAINER_LOCAL)
>        .addOperator(..).with(OperatorContext.CHECKPOINT_WINDOW_COUNT, 5)
>        .with(someProp, someVal)
> (PS: the engine figures out which operator, port, or DAG each attribute
> applies to.)
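One plausible way for an engine to decide where a with(...) setting belongs is to let each attribute key carry its scope. The sketch below assumes exactly that; Scope, Attribute, PORT_QUEUE_SIZE, and APP_NAME are hypothetical, and CHECKPOINT_WINDOW_COUNT here is only a stand-in echoing the example above, not OperatorContext.CHECKPOINT_WINDOW_COUNT. It also simplifies by tracking a single current step.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of scope-based attribute routing; none of these types are Apex classes.
class WithClauseSketch {
  enum Scope { OPERATOR, PORT, DAG }

  enum Locality { CONTAINER_LOCAL, THREAD_LOCAL }

  // An attribute key that knows which scope it belongs to; V is the value type.
  static final class Attribute<V> {
    final String name;
    final Scope scope;

    Attribute(String name, Scope scope) {
      this.name = name;
      this.scope = scope;
    }
  }

  // Illustrative keys. CHECKPOINT_WINDOW_COUNT echoes the email's example;
  // PORT_QUEUE_SIZE and APP_NAME are made up for this sketch.
  static final Attribute<Integer> CHECKPOINT_WINDOW_COUNT =
      new Attribute<>("CHECKPOINT_WINDOW_COUNT", Scope.OPERATOR);
  static final Attribute<Integer> PORT_QUEUE_SIZE = new Attribute<>("PORT_QUEUE_SIZE", Scope.PORT);
  static final Attribute<String> APP_NAME = new Attribute<>("APP_NAME", Scope.DAG);

  // Simplification: one map per scope for the most recently added step, plus the DAG.
  final Map<String, Object> operatorAttrs = new LinkedHashMap<>();
  final Map<String, Object> portAttrs = new LinkedHashMap<>();
  final Map<String, Object> dagAttrs = new LinkedHashMap<>();
  Locality streamLocality;

  // A locality always describes the stream (edge) feeding the current step.
  WithClauseSketch with(Locality locality) {
    this.streamLocality = locality;
    return this;
  }

  // For attributes, the key's declared scope decides where the value is stored.
  <V> WithClauseSketch with(Attribute<V> attr, V value) {
    switch (attr.scope) {
      case OPERATOR:
        operatorAttrs.put(attr.name, value);
        break;
      case PORT:
        portAttrs.put(attr.name, value);
        break;
      case DAG:
        dagAttrs.put(attr.name, value);
        break;
      default:
        throw new IllegalArgumentException("unknown scope " + attr.scope);
    }
    return this;
  }
}
```

Typing each key as Attribute<V> also lets the compiler reject a value of the wrong type, which is one reason to route on the key rather than on the value.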
> As with the DAG API, you can run the stream in distributed mode or in local
> mode. For example:
>    stream...populateDag(dag)   // distributed mode
>    stream...runLocally()       // local mode
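A toy illustration of one pipeline description with two execution paths, under the assumption that populateDag only describes the steps while a local run executes them in-process. RunModeSketch and ToyDag are hypothetical, and unlike the runLocally() in the email, this runLocally takes its input as an argument.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntUnaryOperator;

// Hypothetical sketch: one pipeline description, two ways to execute it.
class RunModeSketch {

  // Stand-in for a DAG: operator names plus "from->to" stream descriptions.
  static final class ToyDag {
    final List<String> operators = new ArrayList<>();
    final List<String> streams = new ArrayList<>();
  }

  private final List<String> names = new ArrayList<>();
  private final List<IntUnaryOperator> fns = new ArrayList<>();

  RunModeSketch map(String name, IntUnaryOperator fn) {
    names.add(name);
    fns.add(fn);
    return this;
  }

  // Distributed-style path: only describe the pipeline; nothing is executed here.
  void populateDag(ToyDag dag) {
    for (int i = 0; i < names.size(); i++) {
      dag.operators.add(names.get(i));
      if (i > 0) {
        dag.streams.add(names.get(i - 1) + "->" + names.get(i));
      }
    }
  }

  // Local-style path: run the same steps in this JVM over sample input.
  // (Unlike the email's runLocally(), this toy takes its input as an argument.)
  List<Integer> runLocally(List<Integer> input) {
    List<Integer> out = new ArrayList<>();
    for (int value : input) {
      for (IntUnaryOperator fn : fns) {
        value = fn.applyAsInt(value);
      }
      out.add(value);
    }
    return out;
  }
}
```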
> The stream is built lazily: until you call populateDag or one of the run
> methods, all transformations and their order are kept in memory in a graph
> data structure (*DagMeta*). This lets us tackle technical problems such as
> logical plan optimization.
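A minimal sketch of why lazy building helps, assuming a linear plan of integer functions: steps are only recorded, and before execution the recorded plan is rewritten by fusing adjacent map steps. Operator fusion is chosen here purely as an example of a logical plan optimization; the email does not say which optimizations DagMeta will enable. LazyPlanSketch, Node, and opaque(...) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntUnaryOperator;

// Hypothetical sketch of lazy building: steps are only recorded, and the recorded
// plan can be rewritten (here, by fusing adjacent map steps) before anything runs.
class LazyPlanSketch {

  static final class Node {
    final String name;
    final boolean fusible; // true for simple map steps, false for opaque operators
    final IntUnaryOperator fn;

    Node(String name, boolean fusible, IntUnaryOperator fn) {
      this.name = name;
      this.fusible = fusible;
      this.fn = fn;
    }
  }

  private final List<Node> plan = new ArrayList<>();

  // Recording a map only appends to the plan; no operator is created yet.
  LazyPlanSketch map(String name, IntUnaryOperator fn) {
    plan.add(new Node(name, true, fn));
    return this;
  }

  // Stand-in for an operator added via the operator API; treated as non-fusible.
  LazyPlanSketch opaque(String name, IntUnaryOperator fn) {
    plan.add(new Node(name, false, fn));
    return this;
  }

  int recordedSteps() {
    return plan.size();
  }

  // A simple logical-plan optimization: merge runs of consecutive map steps into one
  // node, so the resulting physical plan needs fewer operators.
  List<Node> optimize() {
    List<Node> out = new ArrayList<>();
    for (Node n : plan) {
      Node last = out.isEmpty() ? null : out.get(out.size() - 1);
      if (last != null && last.fusible && n.fusible) {
        out.set(out.size() - 1, new Node(last.name + "+" + n.name, true, last.fn.andThen(n.fn)));
      } else {
        out.add(n);
      }
    }
    return out;
  }

  // Terminal call: optimize first, then apply the resulting nodes to one input value.
  int run(int input) {
    int value = input;
    for (Node n : optimize()) {
      value = n.fn.applyAsInt(value);
    }
    return value;
  }
}
```

For instance, recording map a, map b, an opaque step c, and map d yields the optimized plan a+b, c, d, so three operators would be deployed instead of four. Such a rewrite is only possible because nothing was materialized when the steps were declared.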
> Roadmap for the next phase
> The next phase mainly targets the second category mentioned above. To get
> there, we will support the following features in the next iteration:
>    - Watermark - ingestion-time watermarks / watermarks from tuples
>    - Early triggers - how frequently to emit real-time partial results
>    - Late triggers - when to emit an updated result for tuples that arrive
>    after the watermark
>    - Spool state - what to do when in-memory aggregation runs out of
>    memory, and how
>    - Aggregation recovery - three modes: ignore, accumulation, and
>    accumulation + delta
>    - Window support - sliding, moving, and session windows based on the
>    three kinds of tuple time
>    - Tuple time - three kinds: event time, system time, and ingestion time
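For readers new to the windowing terms in the roadmap, here is a hedged sketch of two standard window-assignment rules over event time, plus the usual watermark condition for firing a window. It is a generic illustration, not the planned Apex design; WindowSketch and its methods are hypothetical, timestamps are assumed to be non-negative milliseconds, and the email's "moving window" is not modeled.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helpers, not the planned Apex implementation: assigning event-time
// timestamps (in ms, assumed non-negative) to sliding and session windows.
class WindowSketch {

  // Half-open interval [start, end).
  static final class Window {
    final long start;
    final long end;

    Window(long start, long end) {
      this.start = start;
      this.end = end;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof Window)) {
        return false;
      }
      Window w = (Window) o;
      return w.start == start && w.end == end;
    }

    @Override
    public int hashCode() {
      return 31 * Long.hashCode(start) + Long.hashCode(end);
    }

    @Override
    public String toString() {
      return "[" + start + "," + end + ")";
    }
  }

  // Sliding windows of length 'size' start every 'slide' ms; a tuple belongs to
  // every such window that contains its timestamp.
  static List<Window> slidingWindows(long ts, long size, long slide) {
    List<Window> out = new ArrayList<>();
    long lastStart = ts - (ts % slide);
    for (long start = lastStart; start > ts - size; start -= slide) {
      out.add(0, new Window(start, start + size)); // prepend to keep ascending order
    }
    return out;
  }

  // Session windows: sort timestamps and start a new session whenever the next
  // timestamp falls at or beyond the current session's end (last tuple + gap).
  static List<Window> sessionWindows(List<Long> timestamps, long gap) {
    List<Long> sorted = new ArrayList<>(timestamps);
    Collections.sort(sorted);
    List<Window> out = new ArrayList<>();
    long start = 0;
    long end = 0;
    boolean open = false;
    for (long ts : sorted) {
      if (open && ts < end) {
        end = ts + gap; // timestamps are sorted, so this only extends the session
      } else {
        if (open) {
          out.add(new Window(start, end));
        }
        start = ts;
        end = ts + gap;
        open = true;
      }
    }
    if (open) {
      out.add(new Window(start, end));
    }
    return out;
  }

  // With an event-time watermark W, a window's on-time result can be emitted once
  // end <= W; tuples for it that arrive afterwards are the "late" case.
  static boolean canFire(Window w, long watermark) {
    return w.end <= watermark;
  }
}
```

With size 10 and slide 5, a tuple at time 7 falls into [0,10) and [5,15); with a gap of 5, timestamps 1, 3, 4, 20, 30 form the sessions [1,9), [20,25), and [30,35).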
