Subject: Re: Review Request 37506: WIP: SAMZA-552 Operator API change: New Builder API
From: "Yi Pan (Data Infrastructure)"
To: "Navina Ramesh", "Yi Pan (Data Infrastructure)"
Cc: "Milinda Pathirage", "samza"
Date: Mon, 24 Aug 2015 08:35:52 -0000
Message-ID: <20150824083552.13583.36181@reviews.apache.org>
In-Reply-To: <20150816155756.2928.77087@reviews.apache.org>
References: <20150816155756.2928.77087@reviews.apache.org>
Reply-To: dev@samza.apache.org
X-ReviewGroup: Samza
X-ReviewRequest-URL: https://reviews.apache.org/r/37506/
X-ReviewRequest-Repository: samza
MIME-Version: 1.0
Content-Type: multipart/alternative; boundary="===============1455380641292157465=="
--===============1455380641292157465==
MIME-Version: 1.0
Content-Type: text/plain; charset="utf-8"
Content-Transfer-Encoding: 7bit

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/37506/#review96136
-----------------------------------------------------------

Hi, Milinda, sorry for the late review. My comments are below. Overall, there are two things to discuss:

1) Adding an OperatorBuilder interface as well. It serves two purposes:
   a) I remember that we discussed the need for this: in the parsing/planning phase, there are cases where the required parameters for an operator have not been generated or finalized yet (hence the setter functions you added to OperatorSpec as a workaround). With OperatorBuilder, this is much easier: we just keep setting parameters without calling build().
   b) In user code that uses the operator-layer API directly, OperatorBuilder makes the TopologyBuilder code more intuitive and hides unnecessary specs such as intermediate stream/table names and/or operator names.

2) The implementation details of TopologyBuilder. I would still prefer to keep a graph-based implementation of TopologyBuilder internally, instead of a stack-based one, because of the more flexible representation a graph allows. At the API level, we should first focus on DAG-like operators. However, I would prefer to keep the implementation flexible so that we do not have to rewrite the TopologyBuilder class later, when we need to support non-DAG-like operators.

p.s. It would be good if you could modify the example tasks to use the fluent-style APIs, to illustrate what the user experience looks like.
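To make point 1a concrete, here is a minimal sketch of a builder that tolerates missing parameters until build() is called. All names in it (WindowBuilderSketch, WindowSpecSketch, isComplete()) are hypothetical and are not classes in this patch; it only illustrates the idea.

```java
// Hypothetical illustration only: none of these classes exist in the patch.
// The immutable spec is produced once every required parameter is known.
final class WindowSpecSketch {
    final String source;
    final int size;

    WindowSpecSketch(String source, int size) {
        this.source = source;
        this.size = size;
    }
}

// The builder can be handed around during parsing/planning while some
// parameters are still unknown; nothing is validated until build().
final class WindowBuilderSketch {
    private String source;  // may stay null until the planner resolves it
    private Integer size;   // boxed so that "not set yet" is representable

    WindowBuilderSketch source(String source) {
        this.source = source;
        return this;
    }

    WindowBuilderSketch size(int size) {
        this.size = size;
        return this;
    }

    boolean isComplete() {
        return source != null && size != null;
    }

    WindowSpecSketch build() {
        if (!isComplete()) {
            throw new IllegalStateException("window builder is missing parameters");
        }
        return new WindowSpecSketch(source, size);
    }
}
```

With something like this, the planner could call size(10) early, set source(...) once the input is resolved, and call build() only at the end, so the spec itself would need no setters.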
And with the help of OperatorBuilder, the TopologyBuilder implementation can achieve this: if the user does not specify the input/output streams/tables (as with DAG-like operators), TopologyBuilder should be able to generate the intermediate stream/table names and connect the operators via those intermediate streams/tables. This is a step we must do anyway for DAG-like operators. If the user specifies the input/output streams in the OperatorBuilder, the named streams/tables are created as vertices in the graph, and operators are connected to those vertices if they consume from those streams/tables. This is a simple extension of the DAG model that does not need any structural change in TopologyBuilder.

Just my two cents. Thanks!


samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java (line 161)

    My original intention in introducing the anonymous stream here was to represent the intermediate streams/tables. If we explicitly introduce the intermediate streams and tables in the following methods, I think we can drop the anonymous ones.


samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java (line 174)

    Could you elaborate on what needs to be fixed here?


samza-sql-core/src/main/java/org/apache/samza/sql/api/expressions/ScalarExpression.java (line 28)

    Is this going to be the interface exposed to users who are writing SQL tasks? It would be good to avoid the generic Object class in the interfaces between the Samza framework and user code, to follow the spirit of SAMZA-697.


samza-sql-core/src/main/java/org/apache/samza/sql/api/expressions/TupleExpression.java (line 28)

    Same here.


samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSink.java (line 19)

    I think that with the new TopologyBuilder + OperatorBuilder, there is a way to remove the OperatorSink and OperatorSource interfaces.
    The main reason those interfaces exist is the need to refer to a partial topology that a) has one output, or b) has one input that has not been bound to a system stream/table or an intermediate stream/table. My thinking is that if we follow an API similar to Trident's, immediately connected operators won't require the sink/source interfaces, and operators that are not immediately connected will need to connect via a named intermediate stream/table. That removes the need to create OperatorSink/OperatorSource classes.


samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java

    nit: I still think it is important to have a note here stressing the need to get the real "event time" instead of the message's receive time based on the local system.


samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorFactoryImpl.java (line 77)

    Why do we need this? I thought that we could produce directly to the system streams, using the isSystemStream flag in the EntityName class?


samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilderV2.java (line 100)

    nit: why don't we call it addOperator() directly?


samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilderV2.java (line 133)

    Me neither. I don't see the need to emit the table to a stream either.


samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilderV2.java (line 136)

    So, I assume that the stack is used as intermediate context for DAG computation? It works for computations like algebra. What worries me is that when non-algebra types of operators are needed (such as the split operator in my previous examples, or a case where one intermediate result is used as input by multiple downstream operators), this builder will need to be completely rewritten, because the strict stack implementation limits the types of computation it can support.
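    To illustrate the fan-out case, here is a rough sketch of the graph-based bookkeeping I have in mind. Everything in it is hypothetical (TopologyGraphSketch, the "intermediate-N" naming scheme, and the method names are made up for this example and are not in the patch): streams/tables are vertices keyed by name, operators attach to them, and an unnamed output gets a generated intermediate name.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not code from the patch: streams/tables are vertices
// keyed by name, and each operator records which vertices it reads and writes.
final class TopologyGraphSketch {
    // stream/table name -> names of the operators that consume it
    private final Map<String, List<String>> consumers = new LinkedHashMap<>();
    // operator name -> name of the stream/table it produces
    private final Map<String, String> outputs = new LinkedHashMap<>();
    private int nextId = 0;

    /**
     * Registers an operator. If output is null, an intermediate name is
     * generated (assumed scheme: "intermediate-N"). Returns the output name.
     */
    String addOperator(String opName, List<String> inputs, String output) {
        String out = (output != null) ? output : "intermediate-" + (nextId++);
        for (String in : inputs) {
            consumers.computeIfAbsent(in, k -> new ArrayList<>()).add(opName);
        }
        // Make sure the output exists as a vertex even before anyone consumes it.
        consumers.computeIfAbsent(out, k -> new ArrayList<>());
        outputs.put(opName, out);
        return out;
    }

    List<String> consumersOf(String stream) {
        return consumers.getOrDefault(stream, Collections.emptyList());
    }

    String outputOf(String opName) {
        return outputs.get(opName);
    }
}
```

    Because a vertex can have any number of consumers, two downstream operators can both read the same intermediate result, which a strict stack cannot express once the top element has been popped by the first consumer.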
    I would prefer a generic implementation that can support more than DAG-type computation, while keeping the API fluent-style for DAGs.


samza-sql-core/src/main/java/org/apache/samza/sql/operators/modify/InsertToStreamOp.java (line 16)

    Question: I am not quite sure why we need this. Is it simply a projection operator that sends its output directly to the system streams?


samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java (line 89)

    The goal here is to use the topology builder to generate the query. Can you update the code here to use the topology builder?


samza-sql-core/src/test/java/org/apache/samza/task/sql/UserCallbacksSqlTask.java (line 119)

    Our previous discussion led us to conclude that using OperatorBuilder is easier here:

        this.simpleRtr = TopologyBuilder.create()
            .join(
                OperatorBuilder.window()
                    .size(10).source("kafka:inputstream2")
                    .setCallback(this.wndCallback),
                OperatorBuilder.window()
                    .size(10).source("kafka:inputstream1")
                    .setCallback(this.wndCallback),
                OperatorBuilder.join()
                    .setJoinFields(new ArrayList<String>() {{ add("key1"); add("key2"); }}))
            .partition(
                OperatorBuilder.partition()
                    .setPartitionKey("joinKey")
                    .setPartitionNum(50)
                    .setOutput("kafka:parOutputStrm1"))
            .build();

    Here, the intermediate streams that are immediately consumed by downstream operators are not named. Only the actual input/output streams are named. The OperatorBuilders are passed as parameters to TopologyBuilder, so that intermediate stream/table names are generated and set on the OperatorBuilders within the topology, without involving users.

    Also, with the OperatorBuilder model, it would be easier to build a more flexible non-DAG topology later: users can name an operator's output so that it can be consumed by multiple downstream operators. I agree that implementing it should not be the first priority.
    But it would be nice to keep the door open, rather than having to re-implement the TopologyBuilder layer later.


- Yi Pan (Data Infrastructure)


On Aug. 16, 2015, 3:57 p.m., Milinda Pathirage wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/37506/
> -----------------------------------------------------------
>
> (Updated Aug. 16, 2015, 3:57 p.m.)
>
>
> Review request for samza, Yi Pan (Data Infrastructure) and Navina Ramesh.
>
>
> Bugs: SAMZA-552
>     https://issues.apache.org/jira/browse/SAMZA-552
>
>
> Repository: samza
>
>
> Description
> -------
>
> New proposal for the TopologyBuilder API proposed in rb34500 (https://reviews.apache.org/r/34500/).
>
> * Created a new class called TopologyBuilderV2 instead of changing the existing TopologyBuilder
> * org.apache.samza.sql.operators.factory.TestTopologyBuilderV2 contains two tests which demonstrate the basic usage of the new API
> * Window and aggregate related draft APIs are not done yet
> * This is a WIP, please feel free to comment on the APIs
> * This contains Yi's changes from RB 34500
>
>
> Diffs
> -----
>
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java 80ba455
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Schema.java 1e8f192
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java 7b4d984
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/expressions/ScalarExpression.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/expressions/TupleExpression.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java d6f6b57
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java fb2aa89
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java 0759638
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSink.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSource.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java c49a822
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java 72a59f2
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/NoopOperatorCallback.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/OperatorTopology.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOperatorImpl.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOperatorSpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleRouter.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java c3d2266
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorFactoryImpl.java cbc84d0
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java e66451f
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java 56753b6
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java e570897
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilder.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilderException.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilderV2.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/filter/FilterOp.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/filter/FilterSpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/JoinSpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/JoinType.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamRelationJoin.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java 2854aeb
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java cc0aca0
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/modify/InsertToStreamOp.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/modify/Operation.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/modify/StreamModifySpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/modify/TableModifySpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/FieldBasedPartitionKeyGenerator.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java b93d789
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java c47eed9
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/project/ProjectOp.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/project/ProjectSpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/scan/StreamScan.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/scan/StreamScanSpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/scan/TableScan.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/scan/TableScanSpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java d81cc93
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java eec32ea
>   samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java b29838a
>   samza-sql-core/src/test/java/org/apache/samza/sql/operators/factory/TestTopologyBuilderV2.java PRE-CREATION
>   samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java 20dc701
>   samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java 9124e3c
>   samza-sql-core/src/test/java/org/apache/samza/task/sql/UserCallbacksSqlTask.java 96e96c3
>
> Diff: https://reviews.apache.org/r/37506/diff/
>
>
> Testing
> -------
>
> ./gradlew :samza-sql-core:test passed
>
>
> Thanks,
>
> Milinda Pathirage
>
>

--===============1455380641292157465==--