beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From taki...@apache.org
Subject [1/2] beam git commit: restrict the scope of BeamSqlEnv
Date Thu, 22 Jun 2017 20:20:21 GMT
Repository: beam
Updated Branches:
  refs/heads/DSL_SQL 887cf3a1a -> a680904a4


restrict the scope of BeamSqlEnv


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c5db2777
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c5db2777
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c5db2777

Branch: refs/heads/DSL_SQL
Commit: c5db2777a4cc7d6d230fba74e7c12fd18fcc07c3
Parents: 887cf3a
Author: mingmxu <mingmxu@ebay.com>
Authored: Fri Jun 16 18:49:18 2017 -0700
Committer: Tyler Akidau <takidau@apache.org>
Committed: Thu Jun 22 13:18:12 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/dsls/sql/BeamSql.java  | 57 +++++++++++---------
 .../org/apache/beam/dsls/sql/BeamSqlCli.java    | 18 +++----
 .../org/apache/beam/dsls/sql/BeamSqlEnv.java    | 12 ++---
 .../beam/dsls/sql/example/BeamSqlExample.java   |  4 +-
 .../math/BeamSqlMathBinaryExpression.java       |  1 -
 .../beam/dsls/sql/planner/BeamQueryPlanner.java |  7 +--
 .../beam/dsls/sql/rel/BeamAggregationRel.java   |  7 +--
 .../apache/beam/dsls/sql/rel/BeamFilterRel.java |  8 +--
 .../apache/beam/dsls/sql/rel/BeamIOSinkRel.java |  9 ++--
 .../beam/dsls/sql/rel/BeamIOSourceRel.java      |  7 ++-
 .../beam/dsls/sql/rel/BeamIntersectRel.java     |  8 +--
 .../apache/beam/dsls/sql/rel/BeamMinusRel.java  |  8 +--
 .../beam/dsls/sql/rel/BeamProjectRel.java       |  7 +--
 .../apache/beam/dsls/sql/rel/BeamRelNode.java   |  7 +--
 .../dsls/sql/rel/BeamSetOperatorRelBase.java    | 10 ++--
 .../apache/beam/dsls/sql/rel/BeamSortRel.java   |  7 +--
 .../apache/beam/dsls/sql/rel/BeamUnionRel.java  |  8 +--
 .../apache/beam/dsls/sql/rel/BeamValuesRel.java |  6 +--
 .../beam/dsls/sql/utils/CalciteUtils.java       |  1 -
 .../beam/dsls/sql/rel/BeamIntersectRelTest.java | 10 ++--
 .../beam/dsls/sql/rel/BeamMinusRelTest.java     | 10 ++--
 .../sql/rel/BeamSetOperatorRelBaseTest.java     |  8 +--
 .../beam/dsls/sql/rel/BeamSortRelTest.java      | 26 ++++-----
 .../beam/dsls/sql/rel/BeamUnionRelTest.java     |  8 +--
 .../beam/dsls/sql/rel/BeamValuesRelTest.java    | 12 +++--
 25 files changed, 141 insertions(+), 125 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
index 04fe055..e68188b 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
@@ -17,9 +17,6 @@
  */
 package org.apache.beam.dsls.sql;
 
-import static org.apache.beam.dsls.sql.BeamSqlEnv.planner;
-import static org.apache.beam.dsls.sql.BeamSqlEnv.registerTable;
-
 import org.apache.beam.dsls.sql.rel.BeamRelNode;
 import org.apache.beam.dsls.sql.schema.BeamPCollectionTable;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
@@ -47,17 +44,15 @@ PipelineOptions options = PipelineOptionsFactory.create();
 Pipeline p = Pipeline.create(options);
 
 //create table from TextIO;
-TableSchema tableASchema = ...;
 PCollection<BeamSqlRow> inputTableA = p.apply(TextIO.read().from("/my/input/patha"))
-    .apply(BeamSql.fromTextRow(tableASchema));
-TableSchema tableBSchema = ...;
+    .apply(...);
 PCollection<BeamSqlRow> inputTableB = p.apply(TextIO.read().from("/my/input/pathb"))
-    .apply(BeamSql.fromTextRow(tableBSchema));
+    .apply(...);
 
 //run a simple query, and register the output as a table in BeamSql;
 String sql1 = "select MY_FUNC(c1), c2 from TABLE_A";
-PCollection<BeamSqlRow> outputTableA = inputTableA.apply(BeamSql.simpleQuery(sql1))
-        .withUdf("MY_FUNC", myFunc);
+PCollection<BeamSqlRow> outputTableA = inputTableA.apply(BeamSql.simpleQuery(sql1)
+        .withUdf("MY_FUNC", myFunc));
 
 //run a JOIN with one table from TextIO, and one table from another query
 PCollection<BeamSqlRow> outputTableB = PCollectionTuple.of(
@@ -107,35 +102,47 @@ public class BeamSql {
    */
   private static class QueryTransform extends
       PTransform<PCollectionTuple, PCollection<BeamSqlRow>> {
+    private BeamSqlEnv sqlEnv;
     private String sqlQuery;
+
     public QueryTransform(String sqlQuery) {
       this.sqlQuery = sqlQuery;
+      sqlEnv = new BeamSqlEnv();
+    }
+
+    public QueryTransform(String sqlQuery, BeamSqlEnv sqlEnv) {
+      this.sqlQuery = sqlQuery;
+      this.sqlEnv = sqlEnv;
     }
 
     @Override
     public PCollection<BeamSqlRow> expand(PCollectionTuple input) {
-      //register tables
-      for (TupleTag<?> sourceTag : input.getAll().keySet()) {
-        PCollection<BeamSqlRow> sourceStream = (PCollection<BeamSqlRow>) input.get(sourceTag);
-        BeamSqlRowCoder sourceCoder = (BeamSqlRowCoder) sourceStream.getCoder();
-
-        registerTable(sourceTag.getId(),
-            new BeamPCollectionTable(sourceStream, sourceCoder.getTableSchema()));
-      }
+      registerTables(input);
 
       BeamRelNode beamRelNode = null;
       try {
-        beamRelNode = planner.convertToBeamRel(sqlQuery);
+        beamRelNode = sqlEnv.planner.convertToBeamRel(sqlQuery);
       } catch (ValidationException | RelConversionException | SqlParseException e) {
         throw new IllegalStateException(e);
       }
 
       try {
-        return beamRelNode.buildBeamPipeline(input);
+        return beamRelNode.buildBeamPipeline(input, sqlEnv);
       } catch (Exception e) {
         throw new IllegalStateException(e);
       }
     }
+
+    //register the tables related to the input PCollections.
+    private void registerTables(PCollectionTuple input){
+      for (TupleTag<?> sourceTag : input.getAll().keySet()) {
+        PCollection<BeamSqlRow> sourceStream = (PCollection<BeamSqlRow>) input.get(sourceTag);
+        BeamSqlRowCoder sourceCoder = (BeamSqlRowCoder) sourceStream.getCoder();
+
+        sqlEnv.registerTable(sourceTag.getId(),
+            new BeamPCollectionTable(sourceStream, sourceCoder.getTableSchema()));
+      }
+    }
   }
 
   /**
@@ -144,21 +151,19 @@ public class BeamSql {
    */
   private static class SimpleQueryTransform
       extends PTransform<PCollection<BeamSqlRow>, PCollection<BeamSqlRow>> {
+    BeamSqlEnv sqlEnv = new BeamSqlEnv();
     private String sqlQuery;
+
     public SimpleQueryTransform(String sqlQuery) {
       this.sqlQuery = sqlQuery;
     }
 
-    public SimpleQueryTransform withUdf(String udfName){
-      throw new UnsupportedOperationException("Pending for UDF support");
-    }
-
     @Override
     public PCollection<BeamSqlRow> expand(PCollection<BeamSqlRow> input) {
       SqlNode sqlNode;
       try {
-        sqlNode = planner.parseQuery(sqlQuery);
-        planner.getPlanner().close();
+        sqlNode = sqlEnv.planner.parseQuery(sqlQuery);
+        sqlEnv.planner.getPlanner().close();
       } catch (SqlParseException e) {
         throw new IllegalStateException(e);
       }
@@ -167,7 +172,7 @@ public class BeamSql {
         SqlSelect select = (SqlSelect) sqlNode;
         String tableName = select.getFrom().toString();
         return PCollectionTuple.of(new TupleTag<BeamSqlRow>(tableName), input)
-            .apply(BeamSql.query(sqlQuery));
+            .apply(new QueryTransform(sqlQuery, sqlEnv));
       } else {
         throw new UnsupportedOperationException(
             "Sql operation: " + sqlNode.toString() + " is not supported!");

http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java
index dbf9a59..50da244 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java
@@ -17,8 +17,6 @@
  */
 package org.apache.beam.dsls.sql;
 
-import static org.apache.beam.dsls.sql.BeamSqlEnv.planner;
-
 import org.apache.beam.dsls.sql.rel.BeamRelNode;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.sdk.Pipeline;
@@ -33,12 +31,11 @@ import org.apache.calcite.plan.RelOptUtil;
  */
 @Experimental
 public class BeamSqlCli {
-
   /**
    * Returns a human readable representation of the query execution plan.
    */
-  public static String explainQuery(String sqlString) throws Exception {
-    BeamRelNode exeTree = planner.convertToBeamRel(sqlString);
+  public static String explainQuery(String sqlString, BeamSqlEnv sqlEnv) throws Exception {
+    BeamRelNode exeTree = sqlEnv.planner.convertToBeamRel(sqlString);
     String beamPlan = RelOptUtil.toString(exeTree);
     return beamPlan;
   }
@@ -46,22 +43,23 @@ public class BeamSqlCli {
   /**
    * compile SQL, and return a {@link Pipeline}.
    */
-  public static PCollection<BeamSqlRow> compilePipeline(String sqlStatement) throws Exception{
+  public static PCollection<BeamSqlRow> compilePipeline(String sqlStatement, BeamSqlEnv sqlEnv)
+      throws Exception{
     PipelineOptions options = PipelineOptionsFactory.fromArgs(new String[] {}).withValidation()
         .as(PipelineOptions.class); // FlinkPipelineOptions.class
     options.setJobName("BeamPlanCreator");
     Pipeline pipeline = Pipeline.create(options);
 
-    return compilePipeline(sqlStatement, pipeline);
+    return compilePipeline(sqlStatement, pipeline, sqlEnv);
   }
 
   /**
    * compile SQL, and return a {@link Pipeline}.
    */
-  public static PCollection<BeamSqlRow> compilePipeline(String sqlStatement, Pipeline basePipeline)
-      throws Exception{
+  public static PCollection<BeamSqlRow> compilePipeline(String sqlStatement, Pipeline basePipeline
+      , BeamSqlEnv sqlEnv) throws Exception{
     PCollection<BeamSqlRow> resultStream =
-        planner.compileBeamPipeline(sqlStatement, basePipeline);
+        sqlEnv.planner.compileBeamPipeline(sqlStatement, basePipeline, sqlEnv);
     return resultStream;
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java
index d7715c7..baa2617 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java
@@ -42,10 +42,10 @@ import org.apache.calcite.tools.Frameworks;
  * a {@link BeamQueryPlanner} which parse/validate/optimize/translate input SQL queries.
  */
 public class BeamSqlEnv {
-  static SchemaPlus schema;
-  static BeamQueryPlanner planner;
+  SchemaPlus schema;
+  BeamQueryPlanner planner;
 
-  static {
+  public BeamSqlEnv() {
     schema = Frameworks.createRootSchema(true);
     planner = new BeamQueryPlanner(schema);
   }
@@ -53,7 +53,7 @@ public class BeamSqlEnv {
   /**
    * Register a UDF function which can be used in SQL expression.
    */
-  public static void registerUdf(String functionName, Class<?> clazz, String methodName) {
+  public void registerUdf(String functionName, Class<?> clazz, String methodName) {
     schema.add(functionName, ScalarFunctionImpl.create(clazz, methodName));
   }
 
@@ -61,7 +61,7 @@ public class BeamSqlEnv {
    * Registers a {@link BaseBeamTable} which can be used for all subsequent queries.
    *
    */
-  public static void registerTable(String tableName, BaseBeamTable table) {
+  public void registerTable(String tableName, BaseBeamTable table) {
     schema.add(tableName, new BeamCalciteTable(table.getRecordType()));
     planner.getSourceTables().put(tableName, table);
   }
@@ -69,7 +69,7 @@ public class BeamSqlEnv {
   /**
    * Find {@link BaseBeamTable} by table name.
    */
-  public static BaseBeamTable findTable(String tableName){
+  public BaseBeamTable findTable(String tableName){
     return planner.getSourceTables().get(tableName);
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
index 31f8302..5f09fdd 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
@@ -63,13 +63,13 @@ class BeamSqlExample {
 
     //Case 1. run a simple SQL query over input PCollection with BeamSql.simpleQuery;
     PCollection<BeamSqlRow> outputStream = inputTable.apply(
-        BeamSql.simpleQuery("select c2, c3 from TABLE_A where c1=1"));
+        BeamSql.simpleQuery("select c2, c3 from PCOLLECTION where c1=1"));
 
     //log out the output record;
     outputStream.apply("log_result",
         MapElements.<BeamSqlRow, Void>via(new SimpleFunction<BeamSqlRow, Void>() {
       public Void apply(BeamSqlRow input) {
-        System.out.println("TABLE_A: " + input);
+        System.out.println("PCOLLECTION: " + input);
         return null;
       }
     }));

http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathBinaryExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathBinaryExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathBinaryExpression.java
index 11b867a..f79bcf6 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathBinaryExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathBinaryExpression.java
@@ -57,7 +57,6 @@ public abstract class BeamSqlMathBinaryExpression extends BeamSqlExpression {
 
   /**
    * The method to check whether operands are numeric or not.
-   * @param opType
    */
   public boolean isOperandNumeric(SqlTypeName opType) {
     return SqlTypeName.NUMERIC_TYPES.contains(opType);

http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java
index 2eaf9e7..6ae8a1e 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java
@@ -22,6 +22,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.beam.dsls.sql.BeamSqlEnv;
 import org.apache.beam.dsls.sql.rel.BeamLogicalConvention;
 import org.apache.beam.dsls.sql.rel.BeamRelNode;
 import org.apache.beam.dsls.sql.schema.BaseBeamTable;
@@ -106,12 +107,12 @@ public class BeamQueryPlanner {
    * which is linked with the given {@code pipeline}. The final output stream is returned as
    * {@code PCollection} so more operations can be applied.
    */
-  public PCollection<BeamSqlRow> compileBeamPipeline(String sqlStatement, Pipeline basePipeline)
-      throws Exception {
+  public PCollection<BeamSqlRow> compileBeamPipeline(String sqlStatement, Pipeline basePipeline
+      , BeamSqlEnv sqlEnv) throws Exception {
     BeamRelNode relNode = convertToBeamRel(sqlStatement);
 
     // the input PCollectionTuple is empty, and will be rebuilt in BeamIOSourceRel.
-    return relNode.buildBeamPipeline(PCollectionTuple.empty(basePipeline));
+    return relNode.buildBeamPipeline(PCollectionTuple.empty(basePipeline), sqlEnv);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
index bcdc44f..7a1d003a 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
@@ -19,6 +19,7 @@ package org.apache.beam.dsls.sql.rel;
 
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.beam.dsls.sql.BeamSqlEnv;
 import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
@@ -72,13 +73,13 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode {
   }
 
   @Override
-  public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections)
-      throws Exception {
+  public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
+      , BeamSqlEnv sqlEnv) throws Exception {
     RelNode input = getInput();
     String stageName = BeamSqlRelUtils.getStageName(this);
 
     PCollection<BeamSqlRow> upstream =
-        BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections);
+        BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, sqlEnv);
     if (windowFieldIdx != -1) {
       upstream = upstream.apply(stageName + "_assignEventTimestamp", WithTimestamps
           .<BeamSqlRow>of(new BeamAggregationTransforms.WindowTimestampFn(windowFieldIdx)))

http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java
index 40fe05c..07b5c7c 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.dsls.sql.rel;
 
+import org.apache.beam.dsls.sql.BeamSqlEnv;
 import org.apache.beam.dsls.sql.interpreter.BeamSqlExpressionExecutor;
 import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutor;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
@@ -49,14 +50,13 @@ public class BeamFilterRel extends Filter implements BeamRelNode {
   }
 
   @Override
-  public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections)
-      throws Exception {
-
+  public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
+      , BeamSqlEnv sqlEnv) throws Exception {
     RelNode input = getInput();
     String stageName = BeamSqlRelUtils.getStageName(this);
 
     PCollection<BeamSqlRow> upstream =
-        BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections);
+        BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, sqlEnv);
 
     BeamSqlExpressionExecutor executor = new BeamSqlFnExecutor(this);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java
index 88fff63..58539f8 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java
@@ -56,18 +56,17 @@ public class BeamIOSinkRel extends TableModify implements BeamRelNode {
    * which is the persisted PCollection.
    */
   @Override
-  public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections)
-      throws Exception {
-
+  public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
+      , BeamSqlEnv sqlEnv) throws Exception {
     RelNode input = getInput();
     String stageName = BeamSqlRelUtils.getStageName(this);
 
     PCollection<BeamSqlRow> upstream =
-        BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections);
+        BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, sqlEnv);
 
     String sourceName = Joiner.on('.').join(getTable().getQualifiedName());
 
-    BaseBeamTable targetTable = BeamSqlEnv.findTable(sourceName);
+    BaseBeamTable targetTable = sqlEnv.findTable(sourceName);
 
     PDone streamEnd = upstream.apply(stageName, targetTable.buildIOWriter());
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
index ed2bf12..a664ce1 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
@@ -40,9 +40,8 @@ public class BeamIOSourceRel extends TableScan implements BeamRelNode {
   }
 
   @Override
-  public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections)
-      throws Exception {
-
+  public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
+      , BeamSqlEnv sqlEnv) throws Exception {
     String sourceName = Joiner.on('.').join(getTable().getQualifiedName());
 
     String stageName = BeamSqlRelUtils.getStageName(this);
@@ -55,7 +54,7 @@ public class BeamIOSourceRel extends TableScan implements BeamRelNode {
       return sourceStream;
     } else {
       //If not, the source PCollection is provided with BaseBeamTable.buildIOReader().
-      BaseBeamTable sourceTable = BeamSqlEnv.findTable(sourceName);
+      BaseBeamTable sourceTable = sqlEnv.findTable(sourceName);
       return sourceTable.buildIOReader(inputPCollections.getPipeline());
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIntersectRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIntersectRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIntersectRel.java
index 01e1c33..7cab171 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIntersectRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIntersectRel.java
@@ -19,7 +19,7 @@
 package org.apache.beam.dsls.sql.rel;
 
 import java.util.List;
-
+import org.apache.beam.dsls.sql.BeamSqlEnv;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
@@ -51,8 +51,8 @@ public class BeamIntersectRel extends Intersect implements BeamRelNode {
     return new BeamIntersectRel(getCluster(), traitSet, inputs, all);
   }
 
-  @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections)
-      throws Exception {
-    return delegate.buildBeamPipeline(inputPCollections);
+  @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
+      , BeamSqlEnv sqlEnv) throws Exception {
+    return delegate.buildBeamPipeline(inputPCollections, sqlEnv);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamMinusRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamMinusRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamMinusRel.java
index bee6c11..b558f4b 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamMinusRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamMinusRel.java
@@ -19,7 +19,7 @@
 package org.apache.beam.dsls.sql.rel;
 
 import java.util.List;
-
+import org.apache.beam.dsls.sql.BeamSqlEnv;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
@@ -49,8 +49,8 @@ public class BeamMinusRel extends Minus implements BeamRelNode {
     return new BeamMinusRel(getCluster(), traitSet, inputs, all);
   }
 
-  @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections)
-      throws Exception {
-    return delegate.buildBeamPipeline(inputPCollections);
+  @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
+      , BeamSqlEnv sqlEnv) throws Exception {
+    return delegate.buildBeamPipeline(inputPCollections, sqlEnv);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
index e6331c6..2cdfc72 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
@@ -18,6 +18,7 @@
 package org.apache.beam.dsls.sql.rel;
 
 import java.util.List;
+import org.apache.beam.dsls.sql.BeamSqlEnv;
 import org.apache.beam.dsls.sql.interpreter.BeamSqlExpressionExecutor;
 import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutor;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
@@ -59,13 +60,13 @@ public class BeamProjectRel extends Project implements BeamRelNode {
   }
 
   @Override
-  public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections)
-      throws Exception {
+  public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
+      , BeamSqlEnv sqlEnv) throws Exception {
     RelNode input = getInput();
     String stageName = BeamSqlRelUtils.getStageName(this);
 
     PCollection<BeamSqlRow> upstream =
-        BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections);
+        BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, sqlEnv);
 
     BeamSqlExpressionExecutor executor = new BeamSqlFnExecutor(this);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java
index aed4b06..d4c98a3 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java
@@ -17,14 +17,14 @@
  */
 package org.apache.beam.dsls.sql.rel;
 
+import org.apache.beam.dsls.sql.BeamSqlEnv;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.calcite.rel.RelNode;
 
 /**
- * A new method {@link #buildBeamPipeline(PCollectionTuple)} is added, it's
- * called by {@code BeamQueryPlanner}.
+ * A new method {@link #buildBeamPipeline(PCollectionTuple, BeamSqlEnv)} is added.
  */
 public interface BeamRelNode extends RelNode {
 
@@ -33,5 +33,6 @@ public interface BeamRelNode extends RelNode {
    * {@code BeamQueryPlanner} visits it with a DFS(Depth-First-Search)
    * algorithm.
    */
-  PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections) throws Exception;
+  PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections, BeamSqlEnv sqlEnv)
+      throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java
index 3d41e3a..939c9c8 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java
@@ -20,7 +20,7 @@ package org.apache.beam.dsls.sql.rel;
 
 import java.io.Serializable;
 import java.util.List;
-
+import org.apache.beam.dsls.sql.BeamSqlEnv;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.dsls.sql.transform.BeamSetOperatorsTransforms;
 import org.apache.beam.sdk.transforms.MapElements;
@@ -62,12 +62,12 @@ public class BeamSetOperatorRelBase {
     this.all = all;
   }
 
-  public PCollection<BeamSqlRow> buildBeamPipeline(
-      PCollectionTuple inputPCollections) throws Exception {
+  public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
+      , BeamSqlEnv sqlEnv) throws Exception {
     PCollection<BeamSqlRow> leftRows = BeamSqlRelUtils.getBeamRelInput(inputs.get(0))
-        .buildBeamPipeline(inputPCollections);
+        .buildBeamPipeline(inputPCollections, sqlEnv);
     PCollection<BeamSqlRow> rightRows = BeamSqlRelUtils.getBeamRelInput(inputs.get(1))
-        .buildBeamPipeline(inputPCollections);
+        .buildBeamPipeline(inputPCollections, sqlEnv);
 
     WindowFn leftWindow = leftRows.getWindowingStrategy().getWindowFn();
     WindowFn rightWindow = rightRows.getWindowingStrategy().getWindowFn();

http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java
index 6c7be0b..75f9717 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java
@@ -24,6 +24,7 @@ import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
+import org.apache.beam.dsls.sql.BeamSqlEnv;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
 import org.apache.beam.dsls.sql.utils.CalciteUtils;
@@ -119,11 +120,11 @@ public class BeamSortRel extends Sort implements BeamRelNode {
     }
   }
 
-  @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections)
-      throws Exception {
+  @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
+      , BeamSqlEnv sqlEnv) throws Exception {
     RelNode input = getInput();
     PCollection<BeamSqlRow> upstream = BeamSqlRelUtils.getBeamRelInput(input)
-        .buildBeamPipeline(inputPCollections);
+        .buildBeamPipeline(inputPCollections, sqlEnv);
     Type windowType = upstream.getWindowingStrategy().getWindowFn()
         .getWindowTypeDescriptor().getType();
     if (!windowType.equals(GlobalWindow.class)) {

http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamUnionRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamUnionRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamUnionRel.java
index 63cf11a..c661585 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamUnionRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamUnionRel.java
@@ -19,7 +19,7 @@
 package org.apache.beam.dsls.sql.rel;
 
 import java.util.List;
-
+import org.apache.beam.dsls.sql.BeamSqlEnv;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.values.PCollection;
@@ -81,8 +81,8 @@ public class BeamUnionRel extends Union implements BeamRelNode {
     return new BeamUnionRel(getCluster(), traitSet, inputs, all);
   }
 
-  @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections)
-      throws Exception {
-    return delegate.buildBeamPipeline(inputPCollections);
+  @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
+      , BeamSqlEnv sqlEnv) throws Exception {
+    return delegate.buildBeamPipeline(inputPCollections, sqlEnv);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java
index ce75768..030d2c8 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java
@@ -22,7 +22,7 @@ import com.google.common.collect.ImmutableList;
 
 import java.util.ArrayList;
 import java.util.List;
-
+import org.apache.beam.dsls.sql.BeamSqlEnv;
 import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
@@ -57,8 +57,8 @@ public class BeamValuesRel extends Values implements BeamRelNode {
 
   }
 
-  @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections)
-      throws Exception {
+  @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
+      , BeamSqlEnv sqlEnv) throws Exception {
     List<BeamSqlRow> rows = new ArrayList<>(tuples.size());
     String stageName = BeamSqlRelUtils.getStageName(this);
     if (tuples.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java
index 69ca44b..ac395d3 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java
@@ -74,7 +74,6 @@ public class CalciteUtils {
 
   /**
    * Get the {@code SqlTypeName} for the specified column of a table.
-   * @return
    */
   public static SqlTypeName getFieldType(BeamSqlRecordType schema, int index) {
     return toCalciteType(schema.getFieldsType().get(index));

http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java
index 02223c2..47fdc16 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java
@@ -34,6 +34,8 @@ import org.junit.Test;
  * Test for {@code BeamIntersectRel}.
  */
 public class BeamIntersectRelTest {
+  static BeamSqlEnv sqlEnv = new BeamSqlEnv();
+
   @Rule
   public final TestPipeline pipeline = TestPipeline.create();
   private static MockedBeamSqlTable orderDetailsTable1 = MockedBeamSqlTable
@@ -57,8 +59,8 @@ public class BeamIntersectRelTest {
 
   @BeforeClass
   public static void setUp() {
-    BeamSqlEnv.registerTable("ORDER_DETAILS1", orderDetailsTable1);
-    BeamSqlEnv.registerTable("ORDER_DETAILS2", orderDetailsTable2);
+    sqlEnv.registerTable("ORDER_DETAILS1", orderDetailsTable1);
+    sqlEnv.registerTable("ORDER_DETAILS2", orderDetailsTable2);
   }
 
   @Test
@@ -70,7 +72,7 @@ public class BeamIntersectRelTest {
         + "SELECT order_id, site_id, price "
         + "FROM ORDER_DETAILS2 ";
 
-    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline);
+    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
     PAssert.that(rows).containsInAnyOrder(
         MockedBeamSqlTable.of(
         SqlTypeName.BIGINT, "order_id",
@@ -93,7 +95,7 @@ public class BeamIntersectRelTest {
         + "SELECT order_id, site_id, price "
         + "FROM ORDER_DETAILS2 ";
 
-    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline);
+    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
     PAssert.that(rows).satisfies(new CheckSize(3));
 
     PAssert.that(rows).containsInAnyOrder(

http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java
index cd6ba16..688ff8e 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java
@@ -34,6 +34,8 @@ import org.junit.Test;
  * Test for {@code BeamMinusRel}.
  */
 public class BeamMinusRelTest {
+  static BeamSqlEnv sqlEnv = new BeamSqlEnv();
+
   @Rule
   public final TestPipeline pipeline = TestPipeline.create();
   private MockedBeamSqlTable orderDetailsTable1 = MockedBeamSqlTable
@@ -58,8 +60,8 @@ public class BeamMinusRelTest {
 
   @Before
   public void setUp() {
-    BeamSqlEnv.registerTable("ORDER_DETAILS1", orderDetailsTable1);
-    BeamSqlEnv.registerTable("ORDER_DETAILS2", orderDetailsTable2);
+    sqlEnv.registerTable("ORDER_DETAILS1", orderDetailsTable1);
+    sqlEnv.registerTable("ORDER_DETAILS2", orderDetailsTable2);
     MockedBeamSqlTable.CONTENT.clear();
   }
 
@@ -72,7 +74,7 @@ public class BeamMinusRelTest {
         + "SELECT order_id, site_id, price "
         + "FROM ORDER_DETAILS2 ";
 
-    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline);
+    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
     PAssert.that(rows).containsInAnyOrder(
         MockedBeamSqlTable.of(
         SqlTypeName.BIGINT, "order_id",
@@ -93,7 +95,7 @@ public class BeamMinusRelTest {
         + "SELECT order_id, site_id, price "
         + "FROM ORDER_DETAILS2 ";
 
-    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline);
+    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
     PAssert.that(rows).satisfies(new CheckSize(2));
 
     PAssert.that(rows).containsInAnyOrder(

http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBaseTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBaseTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBaseTest.java
index 4936062..f10a767 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBaseTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBaseTest.java
@@ -42,6 +42,8 @@ import org.junit.Test;
  * Test for {@code BeamSetOperatorRelBase}.
  */
 public class BeamSetOperatorRelBaseTest {
+  static BeamSqlEnv sqlEnv = new BeamSqlEnv();
+
   @Rule
   public final TestPipeline pipeline = TestPipeline.create();
   public static final Date THE_DATE = new Date();
@@ -57,7 +59,7 @@ public class BeamSetOperatorRelBaseTest {
   @BeforeClass
   public static void prepare() {
     THE_DATE.setTime(100000);
-    BeamSqlEnv.registerTable("ORDER_DETAILS", orderDetailsTable);
+    sqlEnv.registerTable("ORDER_DETAILS", orderDetailsTable);
   }
 
   @Test
@@ -71,7 +73,7 @@ public class BeamSetOperatorRelBaseTest {
         + "FROM ORDER_DETAILS GROUP BY order_id, site_id"
         + ", TUMBLE(order_time, INTERVAL '1' HOUR) ";
 
-    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline);
+    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
     List<BeamSqlRow> expRows =
         MockedBeamSqlTable.of(
         SqlTypeName.BIGINT, "order_id",
@@ -100,7 +102,7 @@ public class BeamSetOperatorRelBaseTest {
     // use a real pipeline rather than the TestPipeline because we are
     // testing exceptions, the pipeline will not actually run.
     Pipeline pipeline1 = Pipeline.create(PipelineOptionsFactory.create());
-    BeamSqlCli.compilePipeline(sql, pipeline1);
+    BeamSqlCli.compilePipeline(sql, pipeline1, sqlEnv);
     pipeline.run();
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java
index cfdbd53..2519984 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java
@@ -36,6 +36,8 @@ import org.junit.Test;
  * Test for {@code BeamSortRel}.
  */
 public class BeamSortRelTest {
+  static BeamSqlEnv sqlEnv = new BeamSqlEnv();
+
   @Rule
   public final TestPipeline pipeline = TestPipeline.create();
 
@@ -69,7 +71,7 @@ public class BeamSortRelTest {
         + "ORDER BY order_id asc, site_id desc limit 4";
 
     System.out.println(sql);
-    BeamSqlCli.compilePipeline(sql, pipeline);
+    BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
     pipeline.run().waitUntilFinish();
 
     assertEquals(
@@ -86,7 +88,7 @@ public class BeamSortRelTest {
 
   @Test
   public void testOrderBy_nullsFirst() throws Exception {
-    BeamSqlEnv.registerTable("ORDER_DETAILS", MockedBeamSqlTable
+    sqlEnv.registerTable("ORDER_DETAILS", MockedBeamSqlTable
         .of(SqlTypeName.BIGINT, "order_id",
             SqlTypeName.INTEGER, "site_id",
             SqlTypeName.DOUBLE, "price",
@@ -96,7 +98,7 @@ public class BeamSortRelTest {
             2L, 1, 3.0,
             2L, null, 4.0,
             5L, 5, 5.0));
-    BeamSqlEnv.registerTable("SUB_ORDER_RAM", MockedBeamSqlTable
+    sqlEnv.registerTable("SUB_ORDER_RAM", MockedBeamSqlTable
         .of(SqlTypeName.BIGINT, "order_id",
             SqlTypeName.INTEGER, "site_id",
             SqlTypeName.DOUBLE, "price"));
@@ -106,7 +108,7 @@ public class BeamSortRelTest {
         + "FROM ORDER_DETAILS "
         + "ORDER BY order_id asc, site_id desc NULLS FIRST limit 4";
 
-    BeamSqlCli.compilePipeline(sql, pipeline);
+    BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
     pipeline.run().waitUntilFinish();
 
     assertEquals(
@@ -124,7 +126,7 @@ public class BeamSortRelTest {
 
   @Test
   public void testOrderBy_nullsLast() throws Exception {
-    BeamSqlEnv.registerTable("ORDER_DETAILS", MockedBeamSqlTable
+    sqlEnv.registerTable("ORDER_DETAILS", MockedBeamSqlTable
         .of(SqlTypeName.BIGINT, "order_id",
             SqlTypeName.INTEGER, "site_id",
             SqlTypeName.DOUBLE, "price",
@@ -134,7 +136,7 @@ public class BeamSortRelTest {
             2L, 1, 3.0,
             2L, null, 4.0,
             5L, 5, 5.0));
-    BeamSqlEnv.registerTable("SUB_ORDER_RAM", MockedBeamSqlTable
+    sqlEnv.registerTable("SUB_ORDER_RAM", MockedBeamSqlTable
         .of(SqlTypeName.BIGINT, "order_id",
             SqlTypeName.INTEGER, "site_id",
             SqlTypeName.DOUBLE, "price"));
@@ -144,7 +146,7 @@ public class BeamSortRelTest {
         + "FROM ORDER_DETAILS "
         + "ORDER BY order_id asc, site_id desc NULLS LAST limit 4";
 
-    BeamSqlCli.compilePipeline(sql, pipeline);
+    BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
     pipeline.run().waitUntilFinish();
 
     assertEquals(
@@ -167,7 +169,7 @@ public class BeamSortRelTest {
         + "FROM ORDER_DETAILS "
         + "ORDER BY order_id asc, site_id desc limit 4 offset 4";
 
-    BeamSqlCli.compilePipeline(sql, pipeline);
+    BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
     pipeline.run().waitUntilFinish();
 
     assertEquals(
@@ -190,7 +192,7 @@ public class BeamSortRelTest {
         + "FROM ORDER_DETAILS "
         + "ORDER BY order_id asc, site_id desc limit 11";
 
-    BeamSqlCli.compilePipeline(sql, pipeline);
+    BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
     pipeline.run().waitUntilFinish();
 
     assertEquals(
@@ -221,13 +223,13 @@ public class BeamSortRelTest {
         + "ORDER BY order_id asc limit 11";
 
     TestPipeline pipeline = TestPipeline.create();
-    BeamSqlCli.compilePipeline(sql, pipeline);
+    BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
   }
 
   @Before
   public void prepare() {
-    BeamSqlEnv.registerTable("ORDER_DETAILS", orderDetailTable);
-    BeamSqlEnv.registerTable("SUB_ORDER_RAM", subOrderRamTable);
+    sqlEnv.registerTable("ORDER_DETAILS", orderDetailTable);
+    sqlEnv.registerTable("SUB_ORDER_RAM", subOrderRamTable);
     MockedBeamSqlTable.CONTENT.clear();
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamUnionRelTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamUnionRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamUnionRelTest.java
index c2a0597..c5aa132 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamUnionRelTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamUnionRelTest.java
@@ -34,6 +34,8 @@ import org.junit.Test;
  * Test for {@code BeamUnionRel}.
  */
 public class BeamUnionRelTest {
+  static BeamSqlEnv sqlEnv = new BeamSqlEnv();
+
   @Rule
   public final TestPipeline pipeline = TestPipeline.create();
   private static MockedBeamSqlTable orderDetailsTable = MockedBeamSqlTable
@@ -46,7 +48,7 @@ public class BeamUnionRelTest {
 
   @BeforeClass
   public static void prepare() {
-    BeamSqlEnv.registerTable("ORDER_DETAILS", orderDetailsTable);
+    sqlEnv.registerTable("ORDER_DETAILS", orderDetailsTable);
   }
 
   @Test
@@ -58,7 +60,7 @@ public class BeamUnionRelTest {
         + " order_id, site_id, price "
         + "FROM ORDER_DETAILS ";
 
-    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline);
+    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
     PAssert.that(rows).containsInAnyOrder(
         MockedBeamSqlTable.of(
             SqlTypeName.BIGINT, "order_id",
@@ -81,7 +83,7 @@ public class BeamUnionRelTest {
         + " SELECT order_id, site_id, price "
         + "FROM ORDER_DETAILS";
 
-    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline);
+    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
     PAssert.that(rows).containsInAnyOrder(
         MockedBeamSqlTable.of(
             SqlTypeName.BIGINT, "order_id",

http://git-wip-us.apache.org/repos/asf/beam/blob/c5db2777/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java
index 4557e8e..9a5070a 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java
@@ -35,6 +35,8 @@ import org.junit.Test;
  * Test for {@code BeamValuesRel}.
  */
 public class BeamValuesRelTest {
+  static BeamSqlEnv sqlEnv = new BeamSqlEnv();
+
   @Rule
   public final TestPipeline pipeline = TestPipeline.create();
   private static MockedBeamSqlTable stringTable = MockedBeamSqlTable
@@ -49,7 +51,7 @@ public class BeamValuesRelTest {
   public void testValues() throws Exception {
     String sql = "insert into string_table(name, description) values "
         + "('hello', 'world'), ('james', 'bond')";
-    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline);
+    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
     PAssert.that(rows).containsInAnyOrder(MockedBeamSqlTable.of(
         SqlTypeName.VARCHAR, "name",
         SqlTypeName.VARCHAR, "description",
@@ -61,7 +63,7 @@ public class BeamValuesRelTest {
   @Test
   public void testValues_castInt() throws Exception {
     String sql = "insert into int_table (c0, c1) values(cast(1 as int), cast(2 as int))";
-    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline);
+    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
     PAssert.that(rows).containsInAnyOrder(MockedBeamSqlTable.of(
         SqlTypeName.INTEGER, "c0",
         SqlTypeName.INTEGER, "c1",
@@ -73,7 +75,7 @@ public class BeamValuesRelTest {
   @Test
   public void testValues_onlySelect() throws Exception {
     String sql = "select 1, '1'";
-    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline);
+    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv);
     PAssert.that(rows).containsInAnyOrder(MockedBeamSqlTable.of(
         SqlTypeName.INTEGER, "EXPR$0",
         SqlTypeName.CHAR, "EXPR$1",
@@ -84,8 +86,8 @@ public class BeamValuesRelTest {
 
   @BeforeClass
   public static void prepareClass() {
-    BeamSqlEnv.registerTable("string_table", stringTable);
-    BeamSqlEnv.registerTable("int_table", intTable);
+    sqlEnv.registerTable("string_table", stringTable);
+    sqlEnv.registerTable("int_table", intTable);
   }
 
   @Before


Mime
View raw message