Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 8A84A200C83 for ; Sun, 14 May 2017 07:52:47 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 88EB7160BBB; Sun, 14 May 2017 05:52:47 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 8102B160BC6 for ; Sun, 14 May 2017 07:52:46 +0200 (CEST) Received: (qmail 60040 invoked by uid 500); 14 May 2017 05:52:45 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 60020 invoked by uid 99); 14 May 2017 05:52:45 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 14 May 2017 05:52:45 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id F2071E01BC; Sun, 14 May 2017 05:52:44 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit From: jbonofre@apache.org To: commits@beam.apache.org Date: Sun, 14 May 2017 05:52:44 -0000 Message-Id: <0c940e0819b648eca272fe58a350d70c@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] beam git commit: [BEAM-2234] Change return type of buildBeamPipeline to `PCollection` archived-at: Sun, 14 May 2017 05:52:47 -0000 Repository: beam Updated Branches: refs/heads/DSL_SQL 8bb59840b -> 868bcbdad [BEAM-2234] Change return type of buildBeamPipeline to `PCollection` Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d74a5c34 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d74a5c34 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d74a5c34 Branch: refs/heads/DSL_SQL Commit: d74a5c34f0732825c052d4c32096e5b91096a72c Parents: 8bb5984 Author: mingmxu Authored: Fri May 12 21:14:29 2017 -0700 Committer: Jean-Baptiste Onofré Committed: Sun May 14 07:51:49 2017 +0200 ---------------------------------------------------------------------- .../beam/dsls/sql/planner/BeamPipelineCreator.java | 15 --------------- .../beam/dsls/sql/planner/BeamQueryPlanner.java | 4 +++- .../beam/dsls/sql/rel/BeamAggregationRel.java | 12 +++++------- .../apache/beam/dsls/sql/rel/BeamFilterRel.java | 14 ++++++-------- .../apache/beam/dsls/sql/rel/BeamIOSinkRel.java | 17 ++++++++--------- .../apache/beam/dsls/sql/rel/BeamIOSourceRel.java | 8 +++----- .../apache/beam/dsls/sql/rel/BeamProjectRel.java | 13 +++++-------- .../org/apache/beam/dsls/sql/rel/BeamRelNode.java | 11 +++++------ 8 files changed, 35 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/d74a5c34/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamPipelineCreator.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamPipelineCreator.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamPipelineCreator.java index 1d7cfd1..98ccb57 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamPipelineCreator.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamPipelineCreator.java @@ -19,9 +19,6 @@ package org.apache.beam.dsls.sql.planner; import java.util.Map; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; - import org.apache.beam.dsls.sql.rel.BeamRelNode; import org.apache.beam.dsls.sql.schema.BaseBeamTable; import org.apache.beam.dsls.sql.schema.BeamSQLRecordType; @@ -32,7 +29,6 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.values.PCollection; /** * {@link BeamPipelineCreator} converts a {@link BeamRelNode} tree, into a Beam @@ -41,7 +37,6 @@ import org.apache.beam.sdk.values.PCollection; */ public class BeamPipelineCreator { private Map sourceTables; - private Queue> upStreamQueue; private PipelineOptions options; @@ -56,22 +51,12 @@ public class BeamPipelineCreator { .as(PipelineOptions.class); // FlinkPipelineOptions.class options.setJobName("BeamPlanCreator"); - upStreamQueue = new ConcurrentLinkedQueue<>(); - pipeline = Pipeline.create(options); CoderRegistry cr = pipeline.getCoderRegistry(); cr.registerCoder(BeamSQLRow.class, BeamSqlRowCoder.of()); cr.registerCoder(BeamSQLRecordType.class, BeamSQLRecordTypeCoder.of()); } - public PCollection popUpstream() { - return upStreamQueue.poll(); - } - - public void pushUpstream(PCollection upstream) { - this.upStreamQueue.add(upstream); - } - public Map getSourceTables() { return sourceTables; } http://git-wip-us.apache.org/repos/asf/beam/blob/d74a5c34/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java index 935dae7..29b3f1d 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java @@ -112,7 +112,9 @@ public class BeamQueryPlanner { BeamRelNode relNode = convertToBeamRel(sqlStatement); BeamPipelineCreator planCreator = new BeamPipelineCreator(sourceTables); - return relNode.buildBeamPipeline(planCreator); + relNode.buildBeamPipeline(planCreator); + + return planCreator.getPipeline(); } /** http://git-wip-us.apache.org/repos/asf/beam/blob/d74a5c34/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java index ab98cc4..3e147aa 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java @@ -23,7 +23,6 @@ import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils; import org.apache.beam.dsls.sql.schema.BeamSQLRecordType; import org.apache.beam.dsls.sql.schema.BeamSQLRow; import org.apache.beam.dsls.sql.transform.BeamAggregationTransforms; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.ParDo; @@ -68,13 +67,13 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode { } @Override - public Pipeline buildBeamPipeline(BeamPipelineCreator planCreator) throws Exception { + public PCollection buildBeamPipeline(BeamPipelineCreator planCreator) + throws Exception { RelNode input = getInput(); - BeamSQLRelUtils.getBeamRelInput(input).buildBeamPipeline(planCreator); - String stageName = BeamSQLRelUtils.getStageName(this); - PCollection upstream = planCreator.popUpstream(); + PCollection upstream = + BeamSQLRelUtils.getBeamRelInput(input).buildBeamPipeline(planCreator); if (windowFieldIdx != -1) { upstream = upstream.apply("assignEventTimestamp", WithTimestamps .of(new BeamAggregationTransforms.WindowTimestampFn(windowFieldIdx))); @@ -105,9 +104,8 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode { PCollection mergedStream = aggregatedStream.apply("mergeRecord", ParDo.of(new BeamAggregationTransforms.MergeAggregationRecord( BeamSQLRecordType.from(getRowType()), getAggCallList()))); - planCreator.pushUpstream(mergedStream); - return planCreator.getPipeline(); + return mergedStream; } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/d74a5c34/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java index e1c5b3e..f2c1bba 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java @@ -23,7 +23,6 @@ import org.apache.beam.dsls.sql.planner.BeamPipelineCreator; import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils; import org.apache.beam.dsls.sql.schema.BeamSQLRow; import org.apache.beam.dsls.sql.transform.BeamSQLFilterFn; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; import org.apache.calcite.plan.RelOptCluster; @@ -49,23 +48,22 @@ public class BeamFilterRel extends Filter implements BeamRelNode { } @Override - public Pipeline buildBeamPipeline(BeamPipelineCreator planCreator) throws Exception { + public PCollection buildBeamPipeline(BeamPipelineCreator planCreator) + throws Exception { RelNode input = getInput(); - BeamSQLRelUtils.getBeamRelInput(input).buildBeamPipeline(planCreator); String stageName = BeamSQLRelUtils.getStageName(this); - PCollection upstream = planCreator.popUpstream(); + PCollection upstream = BeamSQLRelUtils.getBeamRelInput(input) + .buildBeamPipeline(planCreator); BeamSQLExpressionExecutor executor = new BeamSQLFnExecutor(this); - PCollection projectStream = upstream.apply(stageName, + PCollection filterStream = upstream.apply(stageName, ParDo.of(new BeamSQLFilterFn(getRelTypeName(), executor))); - planCreator.pushUpstream(projectStream); - - return planCreator.getPipeline(); + return filterStream; } } http://git-wip-us.apache.org/repos/asf/beam/blob/d74a5c34/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java index f38b9e1..bc94ab8 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java @@ -24,7 +24,6 @@ import org.apache.beam.dsls.sql.planner.BeamPipelineCreator; import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils; import org.apache.beam.dsls.sql.schema.BaseBeamTable; import org.apache.beam.dsls.sql.schema.BeamSQLRow; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.values.PCollection; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelOptTable; @@ -52,15 +51,17 @@ public class BeamIOSinkRel extends TableModify implements BeamRelNode { getOperation(), getUpdateColumnList(), getSourceExpressionList(), isFlattened()); } + /** + * Note that {@code BeamIOSinkRel} returns the input PCollection. + */ @Override - public Pipeline buildBeamPipeline(BeamPipelineCreator planCreator) throws Exception { - + public PCollection buildBeamPipeline(BeamPipelineCreator planCreator) + throws Exception { RelNode input = getInput(); - BeamSQLRelUtils.getBeamRelInput(input).buildBeamPipeline(planCreator); - String stageName = BeamSQLRelUtils.getStageName(this); - PCollection upstream = planCreator.popUpstream(); + PCollection upstream = + BeamSQLRelUtils.getBeamRelInput(input).buildBeamPipeline(planCreator); String sourceName = Joiner.on('.').join(getTable().getQualifiedName()); @@ -68,9 +69,7 @@ public class BeamIOSinkRel extends TableModify implements BeamRelNode { upstream.apply(stageName, targetTable.buildIOWriter()); - planCreator.setHasPersistent(true); - - return planCreator.getPipeline(); + return upstream; } } http://git-wip-us.apache.org/repos/asf/beam/blob/d74a5c34/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java index 3538273..61f53eb 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java @@ -23,7 +23,6 @@ import org.apache.beam.dsls.sql.planner.BeamPipelineCreator; import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils; import org.apache.beam.dsls.sql.schema.BaseBeamTable; import org.apache.beam.dsls.sql.schema.BeamSQLRow; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.values.PCollection; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelOptTable; @@ -41,7 +40,8 @@ public class BeamIOSourceRel extends TableScan implements BeamRelNode { } @Override - public Pipeline buildBeamPipeline(BeamPipelineCreator planCreator) throws Exception { + public PCollection buildBeamPipeline(BeamPipelineCreator planCreator) + throws Exception { String sourceName = Joiner.on('.').join(getTable().getQualifiedName()).replace(".(STREAM)", ""); @@ -52,9 +52,7 @@ public class BeamIOSourceRel extends TableScan implements BeamRelNode { PCollection sourceStream = planCreator.getPipeline().apply(stageName, sourceTable.buildIOReader()); - planCreator.pushUpstream(sourceStream); - - return planCreator.getPipeline(); + return sourceStream; } } http://git-wip-us.apache.org/repos/asf/beam/blob/d74a5c34/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java index 65f5b20..954868d 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java @@ -26,7 +26,6 @@ import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils; import org.apache.beam.dsls.sql.schema.BeamSQLRecordType; import org.apache.beam.dsls.sql.schema.BeamSQLRow; import org.apache.beam.dsls.sql.transform.BeamSQLProjectFn; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; import org.apache.calcite.plan.RelOptCluster; @@ -61,22 +60,20 @@ public class BeamProjectRel extends Project implements BeamRelNode { } @Override - public Pipeline buildBeamPipeline(BeamPipelineCreator planCreator) throws Exception { + public PCollection buildBeamPipeline(BeamPipelineCreator planCreator) + throws Exception { RelNode input = getInput(); - BeamSQLRelUtils.getBeamRelInput(input).buildBeamPipeline(planCreator); - String stageName = BeamSQLRelUtils.getStageName(this); - PCollection upstream = planCreator.popUpstream(); + PCollection upstream = BeamSQLRelUtils.getBeamRelInput(input) + .buildBeamPipeline(planCreator); BeamSQLExpressionExecutor executor = new BeamSQLFnExecutor(this); PCollection projectStream = upstream.apply(stageName, ParDo .of(new BeamSQLProjectFn(getRelTypeName(), executor, BeamSQLRecordType.from(rowType)))); - planCreator.pushUpstream(projectStream); - - return planCreator.getPipeline(); + return projectStream; } http://git-wip-us.apache.org/repos/asf/beam/blob/d74a5c34/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java index e50d71a..ff2b5b6 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java @@ -18,7 +18,8 @@ package org.apache.beam.dsls.sql.rel; import org.apache.beam.dsls.sql.planner.BeamPipelineCreator; -import org.apache.beam.sdk.Pipeline; +import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.beam.sdk.values.PCollection; import org.apache.calcite.rel.RelNode; /** @@ -29,10 +30,8 @@ import org.apache.calcite.rel.RelNode; public interface BeamRelNode extends RelNode { /** - * A {@link BeamRelNode} is a recursive structure, the - * {@link BeamPipelineCreator} visits it with a DFS(Depth-First-Search) - * algorithm. - * + * {@code #buildBeamPipeline(BeamPipelineCreator)} applies a transform to upstream, + * and generate an output {@code PCollection}. */ - Pipeline buildBeamPipeline(BeamPipelineCreator planCreator) throws Exception; + PCollection buildBeamPipeline(BeamPipelineCreator planCreator) throws Exception; }