samza-dev mailing list archives

From "Milinda Pathirage" <mili...@apache.org>
Subject Re: Review Request 34500: SAMZA-552 Operator API change: builder and simplified operator classes
Date Wed, 05 Aug 2015 18:56:07 GMT


> On Aug. 5, 2015, 5:34 p.m., Milinda Pathirage wrote:
> > I went through old discussions and also went through Calcite's RelBuilder (https://github.com/milinda/incubator-calcite/blob/master/core/src/main/java/org/apache/calcite/tools/RelBuilder.java) to look at our TopologyBuilder from a SQL query plan perspective. Below are my thoughts.
> > 
> > * I agree with Guozhang that we should first focus on simple use cases, and I think we should not try to support building complex DAGs that contain multiple complex queries via this builder API.
> > * IMHO, TopologyBuilder is closer to query execution than to the query itself. If we need people to compose SQL queries through a Java API, it's better to have an API similar to jOOQ (http://www.jooq.org) for streaming SQL.
> > * AFAIK, the **split** mentioned in one of Yi's comments doesn't occur in SQL query plans, because SQL operators always have one output (@Yi please correct me if I am wrong).
> > * IMHO, supporting something similar to views through the builder API may be useful. To facilitate this, we could allow the result of the builder (perhaps not through the *build* method but via a method like *buildView*) to be referenced as an input to other queries.
> > 
> > So I'm proposing a builder similar to the following, based on Calcite's RelBuilder API:
> > 
> > ```java
> > TopologyBuilder builder = TopologyBuilder.create(..);
> > 
> > OperatorRouter router = builder.scan("stream1")
> >                           .window(10, 2)
> >                           .aggregate(builder.groupKey(...), builder.aggregateCall(...), ...)
> >                           .scan("stream2")
> >                           .window(10, 2)
> >                           .aggregate(builder.groupKey(...), builder.aggregateCall(...), ...)
> >                           .join(JoinType.INNER, builder.condition(...))
> >                           .scan("stream2")
> >                           .project(..)
> >                           .window(10, 2)
> >                           .join(joinType, condition)
> >                           .partition(partitionKey, number)
> >                           .modify(Operation.INSERT, ..)
> > ```
> > 
> > * In the API above, *beginStream* is renamed to *scan* to bring the API closer to the physical plan.
> > * *scan* in the middle of a chain marks the start of a new input or input sub-query.
> > * *join* takes the last two sub-trees (sub-queries) as inputs.
> > * *modify* is used to insert/update tuples to streams or tables
> > * The builder will provide utility methods to create conditions, function calls, aggregates, and `GROUP BY` clauses.
> > * The above assumes that there are no multi-output operators.
> > * Reusable sub-queries are not present in the above example. I'll think about it and introduce a mechanism to reuse sub-queries (possibly by introducing the view concept).
> > 
> > Please feel free to comment on this.
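The stack discipline in the proposal above can be sketched in Java. Under this discipline, *scan* opens a new sub-tree, *join* consumes the last two, and *build* expects a single output. This is a minimal, hypothetical illustration in the spirit of Calcite's RelBuilder, which also keeps a stack of expressions. `PlanNode`, `StackTopologyBuilder`, the string-based join condition, and the `buildView` registry are assumptions made for this sketch. They are not the TopologyBuilder under review or the RelBuilder API, and aggregate, project, partition, and modify are left out.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical plan node: an operator label plus its input sub-trees.
final class PlanNode {
    final String op;
    final PlanNode[] inputs;

    PlanNode(String op, PlanNode... inputs) {
        this.op = op;
        this.inputs = inputs;
    }

    @Override
    public String toString() {
        if (inputs.length == 0) {
            return op;
        }
        return op + Arrays.stream(inputs).map(PlanNode::toString)
                .collect(Collectors.joining(", ", "(", ")"));
    }
}

// Hypothetical stack-based builder (not Samza's TopologyBuilder or Calcite's RelBuilder).
final class StackTopologyBuilder {
    // Finished sub-trees (sub-queries) waiting to be consumed by a later operator.
    private final Deque<PlanNode> stack = new ArrayDeque<>();
    // Named results from buildView(); pass the same map to share views across queries.
    private final Map<String, PlanNode> views;

    StackTopologyBuilder(Map<String, PlanNode> views) {
        this.views = views;
    }

    // Opens a new sub-tree; a registered view name resolves to its stored plan.
    StackTopologyBuilder scan(String source) {
        PlanNode view = views.get(source);
        stack.push(view != null ? view : new PlanNode("scan:" + source));
        return this;
    }

    // Unary operator: wraps the sub-tree on top of the stack
    // (ArrayDeque.pop throws NoSuchElementException if there is none).
    StackTopologyBuilder window(int size, int step) {
        stack.push(new PlanNode("window[" + size + "," + step + "]", stack.pop()));
        return this;
    }

    // Consumes the last two sub-trees; the older one becomes the left input.
    StackTopologyBuilder join(String condition) {
        PlanNode right = stack.pop();
        PlanNode left = stack.pop();
        stack.push(new PlanNode("join[" + condition + "]", left, right));
        return this;
    }

    // Single-output assumption: exactly one sub-tree must remain.
    PlanNode build() {
        if (stack.size() != 1) {
            throw new IllegalStateException("expected 1 output, found " + stack.size());
        }
        return stack.pop();
    }

    // Same as build(), but also registers the result under a name for later scans.
    PlanNode buildView(String name) {
        PlanNode plan = build();
        views.put(name, plan);
        return plan;
    }
}

class StackBuilderDemo {
    public static void main(String[] args) {
        Map<String, PlanNode> views = new HashMap<>();
        new StackTopologyBuilder(views).scan("stream1").window(10, 2).buildView("w1");
        PlanNode plan = new StackTopologyBuilder(views)
                .scan("w1")
                .scan("stream2").window(10, 2)
                .join("w1.key = stream2.key")
                .build();
        // prints: join[w1.key = stream2.key](window[10,2](scan:stream1), window[10,2](scan:stream2))
        System.out.println(plan);
    }
}
```

Keeping pending sub-trees on a stack lets a mid-chain *scan* start a new input without explicit nesting. The size check in `build()` is one way to enforce the "no multi-output operators" assumption.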

Instead of group keys, aggregate calls, or conditions, we can directly take OperatorSpec instances, given that OperatorSpecs already encapsulate everything necessary for an operator.
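Here is a minimal sketch of that idea. It uses a hypothetical `OpSpec` interface in place of the OperatorSpec classes. Each spec bundles its own parameters (here a join type and condition) and reports how many inputs it consumes. The builder therefore needs one generic `apply` method rather than separate helpers for group keys, aggregate calls, and conditions. `OpSpec`, `ScanSpec`, `JoinSpec`, `OpNode`, and `SpecTopologyBuilder` are illustrative names, not classes from samza-sql-core.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for an operator spec (not samza-sql-core's OperatorSpec).
// A spec carries its own parameters and says how many inputs it consumes.
interface OpSpec {
    int inputCount();
    String describe();
}

final class ScanSpec implements OpSpec {
    private final String stream;

    ScanSpec(String stream) { this.stream = stream; }

    public int inputCount() { return 0; }
    public String describe() { return "scan:" + stream; }
}

final class JoinSpec implements OpSpec {
    private final String joinType;
    private final String condition;

    JoinSpec(String joinType, String condition) {
        this.joinType = joinType;
        this.condition = condition;
    }

    public int inputCount() { return 2; }
    public String describe() { return "join[" + joinType + ", " + condition + "]"; }
}

final class OpNode {
    final OpSpec spec;
    final List<OpNode> inputs;

    OpNode(OpSpec spec, List<OpNode> inputs) {
        this.spec = spec;
        this.inputs = inputs;
    }

    @Override
    public String toString() {
        if (inputs.isEmpty()) {
            return spec.describe();
        }
        return spec.describe()
                + inputs.stream().map(OpNode::toString).collect(Collectors.joining(", ", "(", ")"));
    }
}

final class SpecTopologyBuilder {
    private final Deque<OpNode> stack = new ArrayDeque<>();

    // The builder never inspects keys or conditions; it only pops as many
    // sub-trees as the spec asks for and pushes the combined node.
    SpecTopologyBuilder apply(OpSpec spec) {
        int n = spec.inputCount();
        if (stack.size() < n) {
            throw new IllegalStateException(spec.describe() + " needs " + n + " inputs, found " + stack.size());
        }
        List<OpNode> inputs = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            inputs.add(0, stack.pop()); // newest is popped first, so prepend to keep left-to-right order
        }
        stack.push(new OpNode(spec, inputs));
        return this;
    }

    // Single-output assumption: exactly one sub-tree must remain.
    OpNode build() {
        if (stack.size() != 1) {
            throw new IllegalStateException("expected 1 output, found " + stack.size());
        }
        return stack.pop();
    }
}

class SpecBuilderDemo {
    public static void main(String[] args) {
        OpNode plan = new SpecTopologyBuilder()
                .apply(new ScanSpec("stream1"))
                .apply(new ScanSpec("stream2"))
                .apply(new JoinSpec("INNER", "s1.id = s2.id"))
                .build();
        // prints: join[INNER, s1.id = s2.id](scan:stream1, scan:stream2)
        System.out.println(plan);
    }
}
```

In this design, adding a new operator means adding a new spec class. The builder itself does not change.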


- Milinda


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34500/#review94271
-----------------------------------------------------------


On May 20, 2015, 11:13 p.m., Yi Pan (Data Infrastructure) wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/34500/
> -----------------------------------------------------------
> 
> (Updated May 20, 2015, 11:13 p.m.)
> 
> 
> Review request for samza, Yan Fang, Chris Riccomini, Guozhang Wang, Milinda Pathirage, Navina Ramesh, and Naveen Somasundaram.
> 
> 
> Bugs: SAMZA-552
>     https://issues.apache.org/jira/browse/SAMZA-552
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> SAMZA-552: added operator builder API
> - The current operator builder only supports single-output DAG topologies so far
> 
> 
> Diffs
> -----
> 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSink.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSource.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/OperatorTopology.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilder.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java PRE-CREATION
>   samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java PRE-CREATION
>   samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java PRE-CREATION
>   samza-sql-core/src/test/java/org/apache/samza/task/sql/UserCallbacksSqlTask.java PRE-CREATION
> 
> Diff: https://reviews.apache.org/r/34500/diff/
> 
> 
> Testing
> -------
> 
> ./gradlew clean build passed
> 
> 
> Thanks,
> 
> Yi Pan (Data Infrastructure)
> 
>

