beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbono...@apache.org
Subject [1/2] beam git commit: [BEAM-2234] Change return type of buildBeamPipeline to `PCollection<BeamSQLRow>`
Date Sun, 14 May 2017 05:52:44 GMT
Repository: beam
Updated Branches:
  refs/heads/DSL_SQL 8bb59840b -> 868bcbdad


[BEAM-2234] Change return type of buildBeamPipeline to `PCollection<BeamSQLRow>`


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d74a5c34
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d74a5c34
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d74a5c34

Branch: refs/heads/DSL_SQL
Commit: d74a5c34f0732825c052d4c32096e5b91096a72c
Parents: 8bb5984
Author: mingmxu <mingmxu@ebay.com>
Authored: Fri May 12 21:14:29 2017 -0700
Committer: Jean-Baptiste Onofré <jbonofre@apache.org>
Committed: Sun May 14 07:51:49 2017 +0200

----------------------------------------------------------------------
 .../beam/dsls/sql/planner/BeamPipelineCreator.java | 15 ---------------
 .../beam/dsls/sql/planner/BeamQueryPlanner.java    |  4 +++-
 .../beam/dsls/sql/rel/BeamAggregationRel.java      | 12 +++++-------
 .../apache/beam/dsls/sql/rel/BeamFilterRel.java    | 14 ++++++--------
 .../apache/beam/dsls/sql/rel/BeamIOSinkRel.java    | 17 ++++++++---------
 .../apache/beam/dsls/sql/rel/BeamIOSourceRel.java  |  8 +++-----
 .../apache/beam/dsls/sql/rel/BeamProjectRel.java   | 13 +++++--------
 .../org/apache/beam/dsls/sql/rel/BeamRelNode.java  | 11 +++++------
 8 files changed, 35 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d74a5c34/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamPipelineCreator.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamPipelineCreator.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamPipelineCreator.java
index 1d7cfd1..98ccb57 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamPipelineCreator.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamPipelineCreator.java
@@ -19,9 +19,6 @@ package org.apache.beam.dsls.sql.planner;
 
 import java.util.Map;
 
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
 import org.apache.beam.dsls.sql.rel.BeamRelNode;
 import org.apache.beam.dsls.sql.schema.BaseBeamTable;
 import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
@@ -32,7 +29,6 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.values.PCollection;
 
 /**
  * {@link BeamPipelineCreator} converts a {@link BeamRelNode} tree, into a Beam
@@ -41,7 +37,6 @@ import org.apache.beam.sdk.values.PCollection;
  */
 public class BeamPipelineCreator {
   private Map<String, BaseBeamTable> sourceTables;
-  private Queue<PCollection<BeamSQLRow>> upStreamQueue;
 
   private PipelineOptions options;
 
@@ -56,22 +51,12 @@ public class BeamPipelineCreator {
         .as(PipelineOptions.class); // FlinkPipelineOptions.class
     options.setJobName("BeamPlanCreator");
 
-    upStreamQueue = new ConcurrentLinkedQueue<>();
-
     pipeline = Pipeline.create(options);
     CoderRegistry cr = pipeline.getCoderRegistry();
     cr.registerCoder(BeamSQLRow.class, BeamSqlRowCoder.of());
     cr.registerCoder(BeamSQLRecordType.class, BeamSQLRecordTypeCoder.of());
   }
 
-  public PCollection<BeamSQLRow> popUpstream() {
-    return upStreamQueue.poll();
-  }
-
-  public void pushUpstream(PCollection<BeamSQLRow> upstream) {
-    this.upStreamQueue.add(upstream);
-  }
-
   public Map<String, BaseBeamTable> getSourceTables() {
     return sourceTables;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/d74a5c34/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java
index 935dae7..29b3f1d 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java
@@ -112,7 +112,9 @@ public class BeamQueryPlanner {
     BeamRelNode relNode = convertToBeamRel(sqlStatement);
 
     BeamPipelineCreator planCreator = new BeamPipelineCreator(sourceTables);
-    return relNode.buildBeamPipeline(planCreator);
+    relNode.buildBeamPipeline(planCreator);
+
+    return planCreator.getPipeline();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/d74a5c34/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
index ab98cc4..3e147aa 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
@@ -23,7 +23,6 @@ import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils;
 import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSQLRow;
 import org.apache.beam.dsls.sql.transform.BeamAggregationTransforms;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -68,13 +67,13 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode {
   }
 
   @Override
-  public Pipeline buildBeamPipeline(BeamPipelineCreator planCreator) throws Exception {
+  public PCollection<BeamSQLRow> buildBeamPipeline(BeamPipelineCreator planCreator)
+      throws Exception {
     RelNode input = getInput();
-    BeamSQLRelUtils.getBeamRelInput(input).buildBeamPipeline(planCreator);
-
     String stageName = BeamSQLRelUtils.getStageName(this);
 
-    PCollection<BeamSQLRow> upstream = planCreator.popUpstream();
+    PCollection<BeamSQLRow> upstream =
+        BeamSQLRelUtils.getBeamRelInput(input).buildBeamPipeline(planCreator);
     if (windowFieldIdx != -1) {
       upstream = upstream.apply("assignEventTimestamp", WithTimestamps
           .<BeamSQLRow>of(new BeamAggregationTransforms.WindowTimestampFn(windowFieldIdx)));
@@ -105,9 +104,8 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode {
     PCollection<BeamSQLRow> mergedStream = aggregatedStream.apply("mergeRecord",
         ParDo.of(new BeamAggregationTransforms.MergeAggregationRecord(
             BeamSQLRecordType.from(getRowType()), getAggCallList())));
-    planCreator.pushUpstream(mergedStream);
 
-    return planCreator.getPipeline();
+    return mergedStream;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/d74a5c34/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java
index e1c5b3e..f2c1bba 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java
@@ -23,7 +23,6 @@ import org.apache.beam.dsls.sql.planner.BeamPipelineCreator;
 import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils;
 import org.apache.beam.dsls.sql.schema.BeamSQLRow;
 import org.apache.beam.dsls.sql.transform.BeamSQLFilterFn;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.calcite.plan.RelOptCluster;
@@ -49,23 +48,22 @@ public class BeamFilterRel extends Filter implements BeamRelNode {
   }
 
   @Override
-  public Pipeline buildBeamPipeline(BeamPipelineCreator planCreator) throws Exception {
+  public PCollection<BeamSQLRow> buildBeamPipeline(BeamPipelineCreator planCreator)
+      throws Exception {
 
     RelNode input = getInput();
-    BeamSQLRelUtils.getBeamRelInput(input).buildBeamPipeline(planCreator);
 
     String stageName = BeamSQLRelUtils.getStageName(this);
 
-    PCollection<BeamSQLRow> upstream = planCreator.popUpstream();
+    PCollection<BeamSQLRow> upstream = BeamSQLRelUtils.getBeamRelInput(input)
+        .buildBeamPipeline(planCreator);
 
     BeamSQLExpressionExecutor executor = new BeamSQLFnExecutor(this);
 
-    PCollection<BeamSQLRow> projectStream = upstream.apply(stageName,
+    PCollection<BeamSQLRow> filterStream = upstream.apply(stageName,
         ParDo.of(new BeamSQLFilterFn(getRelTypeName(), executor)));
 
-    planCreator.pushUpstream(projectStream);
-
-    return planCreator.getPipeline();
+    return filterStream;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/d74a5c34/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java
index f38b9e1..bc94ab8 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java
@@ -24,7 +24,6 @@ import org.apache.beam.dsls.sql.planner.BeamPipelineCreator;
 import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils;
 import org.apache.beam.dsls.sql.schema.BaseBeamTable;
 import org.apache.beam.dsls.sql.schema.BeamSQLRow;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptTable;
@@ -52,15 +51,17 @@ public class BeamIOSinkRel extends TableModify implements BeamRelNode {
         getOperation(), getUpdateColumnList(), getSourceExpressionList(), isFlattened());
   }
 
+  /**
+   * Note that {@code BeamIOSinkRel} returns the input PCollection.
+   */
   @Override
-  public Pipeline buildBeamPipeline(BeamPipelineCreator planCreator) throws Exception {
-
+  public PCollection<BeamSQLRow> buildBeamPipeline(BeamPipelineCreator planCreator)
+      throws Exception {
     RelNode input = getInput();
-    BeamSQLRelUtils.getBeamRelInput(input).buildBeamPipeline(planCreator);
-
     String stageName = BeamSQLRelUtils.getStageName(this);
 
-    PCollection<BeamSQLRow> upstream = planCreator.popUpstream();
+    PCollection<BeamSQLRow> upstream =
+        BeamSQLRelUtils.getBeamRelInput(input).buildBeamPipeline(planCreator);
 
     String sourceName = Joiner.on('.').join(getTable().getQualifiedName());
 
@@ -68,9 +69,7 @@ public class BeamIOSinkRel extends TableModify implements BeamRelNode {
 
     upstream.apply(stageName, targetTable.buildIOWriter());
 
-    planCreator.setHasPersistent(true);
-
-    return planCreator.getPipeline();
+    return upstream;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/d74a5c34/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
index 3538273..61f53eb 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
@@ -23,7 +23,6 @@ import org.apache.beam.dsls.sql.planner.BeamPipelineCreator;
 import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils;
 import org.apache.beam.dsls.sql.schema.BaseBeamTable;
 import org.apache.beam.dsls.sql.schema.BeamSQLRow;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptTable;
@@ -41,7 +40,8 @@ public class BeamIOSourceRel extends TableScan implements BeamRelNode {
   }
 
   @Override
-  public Pipeline buildBeamPipeline(BeamPipelineCreator planCreator) throws Exception {
+  public PCollection<BeamSQLRow> buildBeamPipeline(BeamPipelineCreator planCreator)
+      throws Exception {
 
    String sourceName = Joiner.on('.').join(getTable().getQualifiedName()).replace(".(STREAM)", "");
 
@@ -52,9 +52,7 @@ public class BeamIOSourceRel extends TableScan implements BeamRelNode {
     PCollection<BeamSQLRow> sourceStream = planCreator.getPipeline().apply(stageName,
         sourceTable.buildIOReader());
 
-    planCreator.pushUpstream(sourceStream);
-
-    return planCreator.getPipeline();
+    return sourceStream;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/d74a5c34/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
index 65f5b20..954868d 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
@@ -26,7 +26,6 @@ import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils;
 import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSQLRow;
 import org.apache.beam.dsls.sql.transform.BeamSQLProjectFn;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.calcite.plan.RelOptCluster;
@@ -61,22 +60,20 @@ public class BeamProjectRel extends Project implements BeamRelNode {
   }
 
   @Override
-  public Pipeline buildBeamPipeline(BeamPipelineCreator planCreator) throws Exception {
+  public PCollection<BeamSQLRow> buildBeamPipeline(BeamPipelineCreator planCreator)
+      throws Exception {
     RelNode input = getInput();
-    BeamSQLRelUtils.getBeamRelInput(input).buildBeamPipeline(planCreator);
-
     String stageName = BeamSQLRelUtils.getStageName(this);
 
-    PCollection<BeamSQLRow> upstream = planCreator.popUpstream();
+    PCollection<BeamSQLRow> upstream = BeamSQLRelUtils.getBeamRelInput(input)
+        .buildBeamPipeline(planCreator);
 
     BeamSQLExpressionExecutor executor = new BeamSQLFnExecutor(this);
 
     PCollection<BeamSQLRow> projectStream = upstream.apply(stageName, ParDo
         .of(new BeamSQLProjectFn(getRelTypeName(), executor, BeamSQLRecordType.from(rowType))));
 
-    planCreator.pushUpstream(projectStream);
-
-    return planCreator.getPipeline();
+    return projectStream;
 
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/d74a5c34/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java
index e50d71a..ff2b5b6 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java
@@ -18,7 +18,8 @@
 package org.apache.beam.dsls.sql.rel;
 
 import org.apache.beam.dsls.sql.planner.BeamPipelineCreator;
-import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.sdk.values.PCollection;
 import org.apache.calcite.rel.RelNode;
 
 /**
@@ -29,10 +30,8 @@ import org.apache.calcite.rel.RelNode;
 public interface BeamRelNode extends RelNode {
 
   /**
-   * A {@link BeamRelNode} is a recursive structure, the
-   * {@link BeamPipelineCreator} visits it with a DFS(Depth-First-Search)
-   * algorithm.
-   *
+   * {@code #buildBeamPipeline(BeamPipelineCreator)} applies a transform to upstream,
+   * and generate an output {@code PCollection}.
    */
-  Pipeline buildBeamPipeline(BeamPipelineCreator planCreator) throws Exception;
+  PCollection<BeamSQLRow> buildBeamPipeline(BeamPipelineCreator planCreator) throws Exception;
 }


Mime
View raw message