samza-dev mailing list archives

From "Yi Pan (Data Infrastructure)" <yi...@linkedin.com>
Subject Re: Review Request 37506: WIP: SAMZA-552 Operator API change: New Builder API
Date Mon, 24 Aug 2015 08:35:52 GMT

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/37506/#review96136
-----------------------------------------------------------


Hi Milinda, sorry for the late review. I have put my comments below. Overall, there are two
things to discuss:
1) Adding an OperatorBuilder interface as well. It serves two purposes:
   a) As we discussed before, in the parsing/planning phase there are cases where the required
parameters for an operator have not been generated/finalized yet (which is why you added some
setter functions to OperatorSpec as a workaround). With OperatorBuilder, it is much easier: we
just keep setting the parameters without calling build().
   b) In user code that uses the operator-layer API directly, OperatorBuilder makes the
TopologyBuilder code more intuitive and helps hide unnecessary specs, such as intermediate
stream/table names and/or operator names.
2) The implementation details of TopologyBuilder. I would still prefer to keep a graph-based
implementation of TopologyBuilder internally, instead of a stack-based one, because a graph can
represent more kinds of topologies. At the API level, we should focus on DAG-like operators first.
However, I would prefer to keep the implementation flexible, to avoid having to rewrite the
TopologyBuilder class later when we need to support non-DAG-like operators. p.s. It would be good
if you could modify the example tasks to use the fluent-style APIs, to illustrate what the user
experience looks like. With the help of OperatorBuilder, the TopologyBuilder implementation can
achieve the following: if the user does not specify the input/output streams/tables (as in
DAG-like operators), TopologyBuilder should generate the intermediate stream/table names and
connect the operators via those intermediate streams/tables. This is a step we must do anyway
for DAG-like operators. If the user does specify the input/output streams in the OperatorBuilder,
the named streams/tables are created as vertices in the graph, and operators are connected to
those vertices if they consume from those streams/tables. This is a simple extension of the DAG
model that does not require a structural change in TopologyBuilder.
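Here is a minimal sketch of the graph-based idea in point 2, under some assumptions. The names `OpSpec` and `GraphTopologyBuilder` are hypothetical and are not classes in this patch. The sketch treats streams and operators as a bipartite graph. Each stream vertex records the operators that consume it. An operator with no declared input is chained to the previous operator's output. An operator with no declared output gets a generated intermediate stream name (the `__intermediate_N` format is an assumption). An explicitly named stream is an ordinary vertex, so any number of operators can consume it.

```java
import java.util.*;

// Hypothetical operator description: a name plus optional input/output stream names.
class OpSpec {
    final String name;
    final List<String> inputs = new ArrayList<>();
    String output; // null means "not specified by the user"

    OpSpec(String name) { this.name = name; }

    OpSpec input(String stream) { inputs.add(stream); return this; }
    OpSpec output(String stream) { this.output = stream; return this; }
}

// Hypothetical graph-based builder. Stream names are vertices; each stream vertex
// keeps the list of operators that consume it.
class GraphTopologyBuilder {
    private final List<OpSpec> ops = new ArrayList<>();
    private final Map<String, List<OpSpec>> consumers = new LinkedHashMap<>();
    private int nextId = 0;
    private OpSpec last; // most recently added operator, used for fluent DAG-style chaining

    GraphTopologyBuilder add(OpSpec op) {
        // No input given: consume the previous operator's output.
        if (op.inputs.isEmpty() && last != null) {
            op.inputs.add(last.output);
        }
        // No output given: generate an intermediate stream name the user never sees.
        if (op.output == null) {
            op.output = "__intermediate_" + (nextId++);
        }
        // Register the operator as a consumer of each of its input stream vertices.
        for (String in : op.inputs) {
            consumers.computeIfAbsent(in, k -> new ArrayList<>()).add(op);
        }
        ops.add(op);
        last = op;
        return this;
    }

    List<OpSpec> consumersOf(String stream) {
        return consumers.getOrDefault(stream, Collections.emptyList());
    }

    List<OpSpec> operators() { return Collections.unmodifiableList(ops); }
}
```

For example, suppose a window on `kafka:inputstream1` is followed by a partition that writes to `kafka:parOutputStrm1`. The builder connects the two operators through a generated `__intermediate_0` stream. Two later operators that both declare `kafka:parOutputStrm1` as input then become two consumers of the same vertex, which is the non-DAG extension described above.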

Just my two cents. Thanks!


samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java (line 161)
<https://reviews.apache.org/r/37506/#comment151391>

    My original intention in introducing the anonymous stream here was to represent the intermediate
streams/tables. If we explicitly introduce the intermediate streams and tables in the following
methods, I think we can drop the anonymous ones.



samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java (line 174)
<https://reviews.apache.org/r/37506/#comment151392>

    Could you elaborate on what needs to be fixed here?



samza-sql-core/src/main/java/org/apache/samza/sql/api/expressions/ScalarExpression.java (line 28)
<https://reviews.apache.org/r/37506/#comment151393>

    Is this going to be the interface exposed to users who are writing SQL tasks? It would be
good to avoid the generic Object class in the interface classes between the Samza framework
and user code, following the spirit of SAMZA-697.
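To illustrate the direction suggested above, here is a sketch of an expression interface parameterized on its result type instead of returning `Object`. The names `Tuple`, `TypedScalarExpression`, and `Examples` are hypothetical. The actual signatures of `ScalarExpression` in this patch may differ. The tuple still stores values generically internally, but the interface seen by user code is typed, and a type mismatch fails at the accessor rather than deep inside the framework.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical minimal tuple standing in for the framework's tuple/message abstraction.
class Tuple {
    private final Map<String, Object> fields = new HashMap<>();

    Tuple set(String name, Object value) { fields.put(name, value); return this; }

    // Typed accessor: the caller states the expected type; a mismatch throws ClassCastException here.
    <T> T get(String name, Class<T> type) { return type.cast(fields.get(name)); }
}

// Hypothetical typed replacement for an Object-returning scalar expression.
@FunctionalInterface
interface TypedScalarExpression<R> {
    R execute(Tuple input);
}

class Examples {
    // price * quantity, with the result type checked at compile time.
    static final TypedScalarExpression<Double> TOTAL =
        t -> t.get("price", Double.class) * t.get("quantity", Integer.class);
}
```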



samza-sql-core/src/main/java/org/apache/samza/sql/api/expressions/TupleExpression.java (line 28)
<https://reviews.apache.org/r/37506/#comment151394>

    Same here.



samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSink.java (line 19)
<https://reviews.apache.org/r/37506/#comment151395>

    I think that with the new TopologyBuilder + OperatorBuilder, there is a way to remove the
OperatorSink and OperatorSource interfaces. The main reason for those interfaces to exist is the
need to refer to a partial topology that a) has one output, or b) has one input that has not
been bound to a system stream/table or an intermediate stream/table. I have thought about it:
if we follow an API similar to Trident's, any immediately connected operators won't require the
sink/source interfaces, and any operators that are not immediately connected will need to connect
via a named intermediate stream/table. That removes the need for the
OperatorSink/OperatorSource classes.



samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java 
<https://reviews.apache.org/r/37506/#comment151396>

    nit: I still think it is important to add a note here stressing the need to get the real
"event time", instead of the message's receive time based on the local system clock.
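As a sketch of the distinction such a note could draw, here is a hypothetical tuple class, `IncomingTuple` (not the patch's `IncomingMessageTuple`). It assumes the producer stamps each payload with an `eventTimeMs` field, and that field name is an assumption. The tuple reports the producer's timestamp as its event time. It falls back to the local receive time only when the payload has none, and it exposes `hasEventTime()` so callers can detect that case.

```java
import java.util.*;

// Hypothetical incoming tuple: the payload plus the local time at which it was received.
class IncomingTuple {
    private final Map<String, Object> payload;
    private final long receiveTimeMs; // consumer's local wall clock, NOT the event time

    IncomingTuple(Map<String, Object> payload, long receiveTimeMs) {
        this.payload = payload;
        this.receiveTimeMs = receiveTimeMs;
    }

    // True if the producer stamped the message with its own event time (assumed field name).
    boolean hasEventTime() {
        return payload.get("eventTimeMs") instanceof Long;
    }

    // Event time as assigned by the producer. The fallback to receive time makes window
    // results depend on consumer lag and replay speed, so callers should check hasEventTime().
    long getEventTimeMs() {
        return hasEventTime() ? (Long) payload.get("eventTimeMs") : receiveTimeMs;
    }

    long getReceiveTimeMs() { return receiveTimeMs; }
}
```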



samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorFactoryImpl.java
(line 77)
<https://reviews.apache.org/r/37506/#comment151397>

    Why do we need this? I thought that we could produce directly to the system streams, using
the isSystemStream flag in the EntityName class?



samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilderV2.java
(line 100)
<https://reviews.apache.org/r/37506/#comment151398>

    nit: why don't we call it addOperator() directly?



samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilderV2.java
(line 133)
<https://reviews.apache.org/r/37506/#comment151399>

    Me neither. I don't see the need to emit the table to a stream either.



samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilderV2.java
(line 136)
<https://reviews.apache.org/r/37506/#comment151400>

    So, I assume that the stack is used as the intermediate context for the DAG computation? That
works for algebra-like computations. What I am worried about is this: when non-algebraic types
of operators are needed (such as the split operator in my previous examples, or a case where one
intermediate result is used as input by multiple downstream operators), this builder will need
to be completely rewritten, because the strict stack-based implementation limits the types of
computation it can support. I would prefer a generic implementation that can support more than
DAG-type computations, while keeping the API fluent-style for DAGs.
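Here is a small sketch of the split case as a graph. `OperatorGraph` is a hypothetical name and not part of the patch. Operators are nodes, and an edge `u -> v` means "v consumes u's output". Kahn's topological sort derives an execution order from the graph. An intermediate result that feeds two downstream operators is simply a node with out-degree two. A builder that pushes and pops each intermediate result exactly once has no natural place to keep that result for a second consumer.

```java
import java.util.*;

// Hypothetical operator graph: node = operator name, edge u -> v = "v consumes u's output".
class OperatorGraph {
    private final Map<String, List<String>> downstream = new LinkedHashMap<>();
    private final Map<String, Integer> inDegree = new LinkedHashMap<>();

    OperatorGraph addOperator(String op) {
        downstream.putIfAbsent(op, new ArrayList<>());
        inDegree.putIfAbsent(op, 0);
        return this;
    }

    OperatorGraph connect(String from, String to) {
        addOperator(from);
        addOperator(to);
        downstream.get(from).add(to);
        inDegree.merge(to, 1, Integer::sum);
        return this;
    }

    // Kahn's algorithm: every operator appears after all operators it consumes from.
    // Throws IllegalStateException if the graph contains a cycle.
    List<String> executionOrder() {
        Map<String, Integer> remaining = new LinkedHashMap<>(inDegree);
        Deque<String> ready = new ArrayDeque<>();
        for (Map.Entry<String, Integer> e : remaining.entrySet()) {
            if (e.getValue() == 0) ready.add(e.getKey());
        }
        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String op = ready.poll();
            order.add(op);
            for (String next : downstream.get(op)) {
                int left = remaining.merge(next, -1, Integer::sum);
                if (left == 0) ready.add(next);
            }
        }
        if (order.size() != remaining.size()) {
            throw new IllegalStateException("cycle detected in operator graph");
        }
        return order;
    }
}
```

With this representation, a fan-out such as `window -> filterA` and `window -> filterB` needs no special handling. The fluent DAG API can still be a thin layer that adds one edge per chained call.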



samza-sql-core/src/main/java/org/apache/samza/sql/operators/modify/InsertToStreamOp.java (line 16)
<https://reviews.apache.org/r/37506/#comment151401>

    Question: I am not quite sure why we need this. Is it simply a projection operator that sends
its output directly to the system streams?



samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java (line 89)
<https://reviews.apache.org/r/37506/#comment151402>

    The goal here is to use the topology builder to generate the query. Can you update the
code here to use the topology builder?



samza-sql-core/src/test/java/org/apache/samza/task/sql/UserCallbacksSqlTask.java (line 119)
<https://reviews.apache.org/r/37506/#comment151405>

    The previous discussion led us to conclude that using OperatorBuilder seems easier here:
    // Proposed fluent API: two windowed inputs are joined, then the result is repartitioned.
    this.simpleRtr = TopologyBuilder.create()
        .join(OperatorBuilder.window()
            .size(10).source("kafka:inputstream2")
            .setCallback(this.wndCallback),
          OperatorBuilder.window()
            .size(10).source("kafka:inputstream1")
            .setCallback(this.wndCallback),
          OperatorBuilder.join().setJoinFields(Arrays.asList("key1", "key2")))
        .partition(OperatorBuilder.partition()
            .setPartitionKey("joinKey")
            .setPartitionNum(50)
            .setOutput("kafka:parOutputStrm1"))
        .build();
    
    Here, intermediate streams that are immediately consumed by downstream operators are not
named; only the actual input/output streams are. The OperatorBuilders are passed to
TopologyBuilder as parameters, so that intermediate stream/table names are generated and set on
the OperatorBuilders within the topology, without user involvement. Also, the OperatorBuilder
model makes it easier to build a more flexible non-DAG topology later: users can name an
operator's output so that it can be consumed by multiple downstream operators. I agree that this
should not be the first priority to implement, but it would be nice to keep the door open instead
of having to re-implement the TopologyBuilder layer later.
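Here is a sketch of the deferred-build idea behind OperatorBuilder (point 1a of the overview). The names `WindowOperatorBuilder` and `WindowOpSpec` are hypothetical and are not the patch's `WindowSpec`, and the generated output name below is an assumed format. Different parties can set parameters at different times. For example, the user sets the size and source, and the TopologyBuilder later assigns a generated output name. Validation happens only in `build()`, so the resulting spec needs no setters.

```java
import java.util.Objects;

// Hypothetical immutable spec produced by the builder; nothing can change after construction.
final class WindowOpSpec {
    final int size;
    final String input;
    final String output;

    WindowOpSpec(int size, String input, String output) {
        this.size = size;
        this.input = input;
        this.output = output;
    }
}

// Hypothetical builder whose parameters may be filled in incrementally.
class WindowOperatorBuilder {
    private Integer size;
    private String input;
    private String output;

    WindowOperatorBuilder size(int size) { this.size = size; return this; }
    WindowOperatorBuilder source(String input) { this.input = input; return this; }
    WindowOperatorBuilder output(String output) { this.output = output; return this; }

    // Lets the topology builder decide whether it must generate an intermediate name.
    boolean hasOutput() { return output != null; }

    // Validation is deferred until every party has had a chance to set its parameters.
    WindowOpSpec build() {
        Objects.requireNonNull(size, "window size not set");
        Objects.requireNonNull(input, "window input not set");
        Objects.requireNonNull(output, "window output not set");
        if (size <= 0) throw new IllegalArgumentException("window size must be positive");
        return new WindowOpSpec(size, input, output);
    }
}
```

In the `UserCallbacksSqlTask` example above, this is the point where TopologyBuilder would fill in the unnamed window outputs before calling `build()` on each OperatorBuilder.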


- Yi Pan (Data Infrastructure)


On Aug. 16, 2015, 3:57 p.m., Milinda Pathirage wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/37506/
> -----------------------------------------------------------
> 
> (Updated Aug. 16, 2015, 3:57 p.m.)
> 
> 
> Review request for samza, Yi Pan (Data Infrastructure) and Navina Ramesh.
> 
> 
> Bugs: SAMZA-552
>     https://issues.apache.org/jira/browse/SAMZA-552
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> New proposal for the TopologyBuilder API proposed in rb34500 (https://reviews.apache.org/r/34500/).
> 
> * Created a new class called TopologyBuilderV2 instead of changing existing TopologyBuilder
> * org.apache.samza.sql.operators.factory.TestTopologyBuilderV2 contains two tests which demonstrate the basic usage of the new API
> * Window and aggregate related draft APIs are not done yet
> * This is a WIP, please feel free to comment on the APIs
> * This contains Yi's changes from RB 34500
> 
> 
> Diffs
> -----
> 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java 80ba455
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Schema.java 1e8f192
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java 7b4d984
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/expressions/ScalarExpression.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/expressions/TupleExpression.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java d6f6b57
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java fb2aa89
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java 0759638
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSink.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSource.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java c49a822
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java 72a59f2
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/NoopOperatorCallback.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/OperatorTopology.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOperatorImpl.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOperatorSpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleRouter.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java c3d2266
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorFactoryImpl.java cbc84d0
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java e66451f
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java 56753b6
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java e570897
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilder.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilderException.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilderV2.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/filter/FilterOp.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/filter/FilterSpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/JoinSpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/JoinType.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamRelationJoin.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java 2854aeb
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java cc0aca0
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/modify/InsertToStreamOp.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/modify/Operation.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/modify/StreamModifySpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/modify/TableModifySpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/FieldBasedPartitionKeyGenerator.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java b93d789
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java c47eed9
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/project/ProjectOp.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/project/ProjectSpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/scan/StreamScan.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/scan/StreamScanSpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/scan/TableScan.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/scan/TableScanSpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java d81cc93
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java eec32ea
>   samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java b29838a
>   samza-sql-core/src/test/java/org/apache/samza/sql/operators/factory/TestTopologyBuilderV2.java PRE-CREATION
>   samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java 20dc701
>   samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java 9124e3c
>   samza-sql-core/src/test/java/org/apache/samza/task/sql/UserCallbacksSqlTask.java 96e96c3
> 
> Diff: https://reviews.apache.org/r/37506/diff/
> 
> 
> Testing
> -------
> 
> ./gradlew :samza-sql-core:test passed
> 
> 
> Thanks,
> 
> Milinda Pathirage
> 
>

