beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbono...@apache.org
Subject [1/2] beam git commit: [BEAM-2309] Implement VALUES and add support for data type CHAR (to be able to test VALUES)
Date Wed, 24 May 2017 08:21:19 GMT
Repository: beam
Updated Branches:
  refs/heads/DSL_SQL ed9764124 -> db982cfe1


[BEAM-2309] Implement VALUES and add support for data type CHAR (to be able to test VALUES)


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/433282f5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/433282f5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/433282f5

Branch: refs/heads/DSL_SQL
Commit: 433282f5eec56802d50f1e05d834f380034f0940
Parents: ed97641
Author: James Xu <xumingmingv@gmail.com>
Authored: Wed May 17 14:41:18 2017 +0800
Committer: Jean-Baptiste Onofré <jbonofre@apache.org>
Committed: Tue May 23 08:46:24 2017 +0200

----------------------------------------------------------------------
 .../dsls/sql/interpreter/BeamSQLFnExecutor.java | 12 ++-
 .../interpreter/operator/BeamSqlPrimitive.java  |  1 +
 .../beam/dsls/sql/planner/BeamRuleSets.java     |  3 +-
 .../apache/beam/dsls/sql/rel/BeamValuesRel.java | 78 ++++++++++++++++
 .../beam/dsls/sql/rule/BeamValuesRule.java      | 48 ++++++++++
 .../apache/beam/dsls/sql/schema/BeamSQLRow.java |  2 +
 .../beam/dsls/sql/schema/BeamSqlRowCoder.java   |  2 +
 .../beam/dsls/sql/schema/BeamTableUtils.java    | 71 +++++++++------
 .../dsls/sql/transform/BeamSQLProjectFn.java    |  3 +-
 .../beam/dsls/sql/planner/BasePlanner.java      |  2 +-
 .../BeamPlannerAggregationSubmitTest.java       | 13 ++-
 .../dsls/sql/planner/BeamPlannerSubmitTest.java |  8 +-
 .../dsls/sql/planner/MockedBeamSQLTable.java    |  9 +-
 .../beam/dsls/sql/rel/BeamSortRelTest.java      | 22 +++--
 .../beam/dsls/sql/rel/BeamValuesRelTest.java    | 95 ++++++++++++++++++++
 15 files changed, 318 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/433282f5/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java
index be388aa..9dcf003 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java
@@ -63,7 +63,9 @@ import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.schema.impl.ScalarFunctionImpl;
 import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.sql.validate.SqlUserDefinedFunction;
+import org.apache.calcite.util.NlsString;
 
 /**
  * Executor based on {@link BeamSqlExpression} and {@link BeamSqlPrimitive}.
@@ -99,8 +101,14 @@ public class BeamSQLFnExecutor implements BeamSQLExpressionExecutor {
   static BeamSqlExpression buildExpression(RexNode rexNode) {
     if (rexNode instanceof RexLiteral) {
       RexLiteral node = (RexLiteral) rexNode;
-
-      return BeamSqlPrimitive.of(node.getTypeName(), node.getValue());
+      // NlsString is not serializable, we need to convert
+      // it to string explicitly.
+      if (SqlTypeName.CHAR_TYPES.contains(node.getTypeName())
+          && node.getValue() instanceof NlsString) {
+        return BeamSqlPrimitive.of(node.getTypeName(), ((NlsString) node.getValue()).getValue());
+      } else {
+        return BeamSqlPrimitive.of(node.getTypeName(), node.getValue());
+      }
     } else if (rexNode instanceof RexInputRef) {
       RexInputRef node = (RexInputRef) rexNode;
       return new BeamSqlInputRefExpression(node.getType().getSqlTypeName(), node.getIndex());

http://git-wip-us.apache.org/repos/asf/beam/blob/433282f5/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
index a5938f3..bc18c5e 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
@@ -21,6 +21,7 @@ import java.math.BigDecimal;
 import java.util.Date;
 import java.util.GregorianCalendar;
 import java.util.List;
+
 import org.apache.beam.dsls.sql.exception.BeamInvalidOperatorException;
 import org.apache.beam.dsls.sql.exception.BeamSqlUnsupportedException;
 import org.apache.beam.dsls.sql.schema.BeamSQLRow;

http://git-wip-us.apache.org/repos/asf/beam/blob/433282f5/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java
index 2cac5ae..1ad62bc 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java
@@ -28,6 +28,7 @@ import org.apache.beam.dsls.sql.rule.BeamIOSinkRule;
 import org.apache.beam.dsls.sql.rule.BeamIOSourceRule;
 import org.apache.beam.dsls.sql.rule.BeamProjectRule;
 import org.apache.beam.dsls.sql.rule.BeamSortRule;
+import org.apache.beam.dsls.sql.rule.BeamValuesRule;
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.tools.RuleSet;
@@ -41,7 +42,7 @@ public class BeamRuleSets {
   private static final ImmutableSet<RelOptRule> calciteToBeamConversionRules = ImmutableSet
       .<RelOptRule>builder().add(BeamIOSourceRule.INSTANCE, BeamProjectRule.INSTANCE,
           BeamFilterRule.INSTANCE, BeamIOSinkRule.INSTANCE,
-          BeamAggregationRule.INSTANCE, BeamSortRule.INSTANCE)
+          BeamAggregationRule.INSTANCE, BeamSortRule.INSTANCE, BeamValuesRule.INSTANCE)
       .build();
 
   public static RuleSet[] getRuleSets() {

http://git-wip-us.apache.org/repos/asf/beam/blob/433282f5/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java
new file mode 100644
index 0000000..4fbe7ec
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.rel;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.beam.dsls.sql.planner.BeamPipelineCreator;
+import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils;
+import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamTableUtils;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.core.Values;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexLiteral;
+
+/**
+ * {@code BeamRelNode} to replace a {@code Values} node.
+ *
+ * <p>{@code BeamValuesRel} will be used in the following SQLs:
+ * <ul>
+ *   <li>{@code insert into t (name, desc) values ('hello', 'world')}</li>
+ *   <li>{@code select 1, '1', LOCALTIME}</li>
+ * </ul>
+ */
+public class BeamValuesRel extends Values implements BeamRelNode {
+
+  public BeamValuesRel(
+      RelOptCluster cluster,
+      RelDataType rowType,
+      ImmutableList<ImmutableList<RexLiteral>> tuples,
+      RelTraitSet traits) {
+    super(cluster, rowType, tuples, traits);
+
+  }
+
+  @Override public PCollection<BeamSQLRow> buildBeamPipeline(
+      BeamPipelineCreator planCreator) throws Exception {
+    List<BeamSQLRow> rows = new ArrayList<>(tuples.size());
+    String stageName = BeamSQLRelUtils.getStageName(this);
+    if (tuples.isEmpty()) {
+      throw new IllegalStateException("Values with empty tuples!");
+    }
+
+    BeamSQLRecordType beamSQLRecordType = BeamSQLRecordType.from(this.getRowType());
+    for (ImmutableList<RexLiteral> tuple : tuples) {
+      BeamSQLRow row = new BeamSQLRow(beamSQLRecordType);
+      for (int i = 0; i < tuple.size(); i++) {
+        BeamTableUtils.addFieldWithAutoTypeCasting(row, i, tuple.get(i).getValue());
+      }
+      rows.add(row);
+    }
+
+    return planCreator.getPipeline().apply(stageName, Create.of(rows));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/433282f5/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamValuesRule.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamValuesRule.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamValuesRule.java
new file mode 100644
index 0000000..4ea9e60
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamValuesRule.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.rule;
+
+import org.apache.beam.dsls.sql.rel.BeamLogicalConvention;
+import org.apache.beam.dsls.sql.rel.BeamValuesRel;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Values;
+import org.apache.calcite.rel.logical.LogicalValues;
+
+/**
+ * {@code ConverterRule} to replace {@code Values} with {@code BeamValuesRel}.
+ */
+public class BeamValuesRule extends ConverterRule {
+  public static final BeamValuesRule INSTANCE = new BeamValuesRule();
+  private BeamValuesRule() {
+    super(LogicalValues.class, Convention.NONE,
+        BeamLogicalConvention.INSTANCE, "BeamValuesRule");
+  }
+
+  @Override public RelNode convert(RelNode rel) {
+    Values values = (Values) rel;
+    return new BeamValuesRel(
+        values.getCluster(),
+        values.getRowType(),
+        values.getTuples(),
+        values.getTraitSet().replace(BeamLogicalConvention.INSTANCE)
+    );
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/433282f5/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java
index 7b6428e..bc75eb1 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java
@@ -121,6 +121,7 @@ public class BeamSQLRow implements Serializable {
       }
       break;
     case VARCHAR:
+    case CHAR:
       if (!(fieldValue instanceof String)) {
         throw new InvalidFieldException(
             String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
@@ -232,6 +233,7 @@ public class BeamSQLRow implements Serializable {
         return fieldValue;
       }
     case VARCHAR:
+    case CHAR:
       if (!(fieldValue instanceof String)) {
         throw new InvalidFieldException(
             String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));

http://git-wip-us.apache.org/repos/asf/beam/blob/433282f5/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
index 0accb9a..bfcb487 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
@@ -85,6 +85,7 @@ public class BeamSqlRowCoder extends StandardCoder<BeamSQLRow>{
           longCoder.encode(value.getLong(idx), outStream, context.nested());
           break;
         case VARCHAR:
+        case CHAR:
           stringCoder.encode(value.getString(idx), outStream, context.nested());
           break;
         case TIMESTAMP:
@@ -134,6 +135,7 @@ public class BeamSqlRowCoder extends StandardCoder<BeamSQLRow>{
           record.addField(idx, longCoder.decode(inStream, context.nested()));
           break;
         case VARCHAR:
+        case CHAR:
           record.addField(idx, stringCoder.decode(inStream, context.nested()));
           break;
         case TIMESTAMP:

http://git-wip-us.apache.org/repos/asf/beam/blob/433282f5/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java
index bc622c2..c7397e1 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java
@@ -21,9 +21,11 @@ package org.apache.beam.dsls.sql.schema;
 import java.io.IOException;
 import java.io.StringReader;
 import java.io.StringWriter;
+import java.math.BigDecimal;
 
 import org.apache.beam.dsls.sql.exception.BeamSqlUnsupportedException;
 import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.NlsString;
 import org.apache.commons.csv.CSVFormat;
 import org.apache.commons.csv.CSVParser;
 import org.apache.commons.csv.CSVPrinter;
@@ -72,33 +74,50 @@ public final class BeamTableUtils {
     return writer.toString();
   }
 
-  public static void addFieldWithAutoTypeCasting(BeamSQLRow row, int idx, String raw) {
+  public static void addFieldWithAutoTypeCasting(BeamSQLRow row, int idx, Object rawObj) {
+    if (rawObj == null) {
+      row.addField(idx, rawObj);
+      return;
+    }
+
     SqlTypeName columnType = row.getDataType().getFieldsType().get(idx);
-    switch (columnType) {
-      case TINYINT:
-        row.addField(idx, Byte.valueOf(raw));
-        break;
-      case SMALLINT:
-        row.addField(idx, Short.valueOf(raw));
-        break;
-      case INTEGER:
-        row.addField(idx, Integer.valueOf(raw));
-        break;
-      case BIGINT:
-        row.addField(idx, Long.valueOf(raw));
-        break;
-      case FLOAT:
-        row.addField(idx, Float.valueOf(raw));
-        break;
-      case DOUBLE:
-        row.addField(idx, Double.valueOf(raw));
-        break;
-      case VARCHAR:
-        row.addField(idx, raw);
-        break;
-      default:
-        throw new BeamSqlUnsupportedException(
-            String.format("Column type %s is not supported yet!", columnType));
+    // auto-casting for numberics
+    if ((rawObj instanceof String && SqlTypeName.NUMERIC_TYPES.contains(columnType))
+        || (rawObj instanceof BigDecimal && columnType != SqlTypeName.DECIMAL)) {
+      String raw = rawObj.toString();
+      switch (columnType) {
+        case TINYINT:
+          row.addField(idx, Byte.valueOf(raw));
+          break;
+        case SMALLINT:
+          row.addField(idx, Short.valueOf(raw));
+          break;
+        case INTEGER:
+          row.addField(idx, Integer.valueOf(raw));
+          break;
+        case BIGINT:
+          row.addField(idx, Long.valueOf(raw));
+          break;
+        case FLOAT:
+          row.addField(idx, Float.valueOf(raw));
+          break;
+        case DOUBLE:
+          row.addField(idx, Double.valueOf(raw));
+          break;
+        default:
+          throw new BeamSqlUnsupportedException(
+              String.format("Column type %s is not supported yet!", columnType));
+      }
+    } else if (SqlTypeName.CHAR_TYPES.contains(columnType)) {
+      // convert NlsString to String
+      if (rawObj instanceof NlsString) {
+        row.addField(idx, ((NlsString) rawObj).getValue());
+      } else {
+        row.addField(idx, rawObj);
+      }
+    } else {
+      // keep the origin
+      row.addField(idx, rawObj);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/433282f5/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLProjectFn.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLProjectFn.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLProjectFn.java
index 79dd67f..ef4dc0f 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLProjectFn.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLProjectFn.java
@@ -22,6 +22,7 @@ import org.apache.beam.dsls.sql.interpreter.BeamSQLExpressionExecutor;
 import org.apache.beam.dsls.sql.rel.BeamProjectRel;
 import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamTableUtils;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 
@@ -57,7 +58,7 @@ public class BeamSQLProjectFn extends DoFn<BeamSQLRow, BeamSQLRow> {
     outRow.updateWindowRange(inputRecord, window);
 
     for (int idx = 0; idx < results.size(); ++idx) {
-      outRow.addField(idx, results.get(idx));
+      BeamTableUtils.addFieldWithAutoTypeCasting(outRow, idx, results.get(idx));
     }
 
     c.output(outRow);

http://git-wip-us.apache.org/repos/asf/beam/blob/433282f5/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java
index fe8a236..0d9d147 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java
@@ -41,7 +41,7 @@ public class BasePlanner {
   public static BeamSQLEnvironment runner = BeamSQLEnvironment.create();
 
   @BeforeClass
-  public static void prepare() {
+  public static void prepareClass() {
     runner.addTableMetadata("ORDER_DETAILS", getTable());
     runner.addTableMetadata("SUB_ORDER", getTable("127.0.0.1:9092", "sub_orders"));
     runner.addTableMetadata("SUB_ORDER_RAM", getTable());

http://git-wip-us.apache.org/repos/asf/beam/blob/433282f5/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerAggregationSubmitTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerAggregationSubmitTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerAggregationSubmitTest.java
index 22f1848..ffc3e01 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerAggregationSubmitTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerAggregationSubmitTest.java
@@ -21,6 +21,7 @@ import java.text.DateFormat;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.Arrays;
+
 import org.apache.beam.dsls.sql.BeamSQLEnvironment;
 import org.apache.beam.dsls.sql.schema.BaseBeamTable;
 import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
@@ -31,6 +32,7 @@ import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rel.type.RelProtoDataType;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
@@ -47,11 +49,16 @@ public class BeamPlannerAggregationSubmitTest {
   public final TestPipeline pipeline = TestPipeline.create();
 
   @BeforeClass
-  public static void prepare() throws ParseException {
+  public static void prepareClass() throws ParseException {
     runner.addTableMetadata("ORDER_DETAILS", getOrderTable());
     runner.addTableMetadata("ORDER_SUMMARY", getSummaryTable());
   }
 
+  @Before
+  public void prepare() throws ParseException {
+    MockedBeamSQLTable.CONTENT.clear();
+  }
+
   private static BaseBeamTable getOrderTable() throws ParseException {
     final RelProtoDataType protoRowType = new RelProtoDataType() {
       @Override
@@ -118,7 +125,7 @@ public class BeamPlannerAggregationSubmitTest {
     pipeline.run().waitUntilFinish();
 
     Assert.assertTrue(MockedBeamSQLTable.CONTENT.size() == 1);
-    BeamSQLRow result = MockedBeamSQLTable.CONTENT.get(0);
+    BeamSQLRow result = MockedBeamSQLTable.CONTENT.peek();
     Assert.assertEquals(1, result.getInteger(0));
     Assert.assertEquals(format.parse("2017-01-01 01:00:00"), result.getDate(1));
     Assert.assertEquals(1L, result.getLong(2));
@@ -136,6 +143,6 @@ public class BeamPlannerAggregationSubmitTest {
 
     Assert.assertTrue(MockedBeamSQLTable.CONTENT.size() == 1);
     Assert.assertEquals("site_id=0,agg_hour=null,size=3",
-        MockedBeamSQLTable.CONTENT.get(0).valueInString());
+        MockedBeamSQLTable.CONTENT.peek().valueInString());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/433282f5/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java
index 17cea27..7219d11 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java
@@ -21,6 +21,7 @@ import org.apache.beam.dsls.sql.schema.BeamSQLRow;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.values.PCollection;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 
@@ -32,6 +33,11 @@ public class BeamPlannerSubmitTest extends BasePlanner {
   @Rule
   public final TestPipeline pipeline = TestPipeline.create();
 
+  @Before
+  public void prepare() {
+    MockedBeamSQLTable.CONTENT.clear();
+  }
+
   @Test
   public void insertSelectFilter() throws Exception {
     String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price) SELECT "
@@ -43,7 +49,7 @@ public class BeamPlannerSubmitTest extends BasePlanner {
     pipeline.run().waitUntilFinish();
 
     Assert.assertTrue(MockedBeamSQLTable.CONTENT.size() == 1);
-    Assert.assertTrue(MockedBeamSQLTable.CONTENT.get(0).valueInString()
+    Assert.assertTrue(MockedBeamSQLTable.CONTENT.peek().valueInString()
         .contains("order_id=12345,site_id=0,price=20.5,order_time="));
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/433282f5/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSQLTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSQLTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSQLTable.java
index 78fd055..561f4be 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSQLTable.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSQLTable.java
@@ -19,6 +19,7 @@ package org.apache.beam.dsls.sql.planner;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.apache.beam.dsls.sql.schema.BaseBeamTable;
 import org.apache.beam.dsls.sql.schema.BeamIOType;
@@ -43,7 +44,7 @@ import org.apache.calcite.sql.type.SqlTypeName;
  */
 public class MockedBeamSQLTable extends BaseBeamTable {
 
-  public static final List<BeamSQLRow> CONTENT = new ArrayList<>();
+  public static final ConcurrentLinkedQueue<BeamSQLRow> CONTENT = new ConcurrentLinkedQueue<>();
 
   private List<BeamSQLRow> inputRecords;
 
@@ -142,12 +143,6 @@ public class MockedBeamSQLTable extends BaseBeamTable {
     @Override
     public PDone expand(PCollection<BeamSQLRow> input) {
       input.apply(ParDo.of(new DoFn<BeamSQLRow, Void>() {
-
-        @Setup
-        public void setup() {
-          CONTENT.clear();
-        }
-
         @ProcessElement
         public void processElement(ProcessContext c) {
           CONTENT.add(c.element());

http://git-wip-us.apache.org/repos/asf/beam/blob/433282f5/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java
index 864d4b7..4935c3b 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java
@@ -18,8 +18,10 @@
 
 package org.apache.beam.dsls.sql.rel;
 
+import java.util.Collection;
 import java.util.Date;
-import java.util.List;
+import java.util.Iterator;
+
 import org.apache.beam.dsls.sql.BeamSQLEnvironment;
 import org.apache.beam.dsls.sql.exception.BeamSqlUnsupportedException;
 import org.apache.beam.dsls.sql.planner.MockedBeamSQLTable;
@@ -27,6 +29,7 @@ import org.apache.beam.dsls.sql.schema.BeamSQLRow;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 
@@ -62,7 +65,6 @@ public class BeamSortRelTest {
 
   @Test
   public void testOrderBy_basic() throws Exception {
-    prepare();
     String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price)  SELECT "
         + " order_id, site_id, price "
         + "FROM ORDER_DETAILS "
@@ -162,7 +164,6 @@ public class BeamSortRelTest {
 
   @Test
   public void testOrderBy_with_offset() throws Exception {
-    prepare();
     String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price)  SELECT "
         + " order_id, site_id, price "
         + "FROM ORDER_DETAILS "
@@ -186,7 +187,6 @@ public class BeamSortRelTest {
 
   @Test
   public void testOrderBy_bigFetch() throws Exception {
-    prepare();
     String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price)  SELECT "
         + " order_id, site_id, price "
         + "FROM ORDER_DETAILS "
@@ -216,7 +216,6 @@ public class BeamSortRelTest {
 
   @Test(expected = BeamSqlUnsupportedException.class)
   public void testOrderBy_exception() throws Exception {
-    prepare();
     String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id)  SELECT "
         + " order_id, COUNT(*) "
         + "FROM ORDER_DETAILS "
@@ -227,15 +226,20 @@ public class BeamSortRelTest {
     runner.compileBeamPipeline(sql, pipeline);
   }
 
-  public static void prepare() {
+  @Before
+  public void prepare() {
     runner.addTableMetadata("ORDER_DETAILS", orderDetailTable);
     runner.addTableMetadata("SUB_ORDER_RAM", subOrderRamTable);
+    MockedBeamSQLTable.CONTENT.clear();
   }
 
-  private void assertEquals(List<BeamSQLRow> rows1, List<BeamSQLRow> rows2) {
+  private void assertEquals(Collection<BeamSQLRow> rows1, Collection<BeamSQLRow> rows2) {
     Assert.assertEquals(rows1.size(), rows2.size());
-    for (int i = 0; i < rows1.size(); i++) {
-      Assert.assertEquals(rows1.get(i), rows2.get(i));
+
+    Iterator<BeamSQLRow> it1 = rows1.iterator();
+    Iterator<BeamSQLRow> it2 = rows2.iterator();
+    while (it1.hasNext()) {
+      Assert.assertEquals(it1.next(), it2.next());
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/433282f5/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java
new file mode 100644
index 0000000..d4e1db2
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.rel;
+
+import org.apache.beam.dsls.sql.BeamSQLEnvironment;
+import org.apache.beam.dsls.sql.planner.MockedBeamSQLTable;
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Test for {@code BeamValuesRel}.
+ */
+public class BeamValuesRelTest {
+  @Rule
+  public final TestPipeline pipeline = TestPipeline.create();
+  public static BeamSQLEnvironment runner = BeamSQLEnvironment.create();
+  private static MockedBeamSQLTable stringTable = MockedBeamSQLTable
+      .of(SqlTypeName.VARCHAR, "name",
+          SqlTypeName.VARCHAR, "description");
+
+  private static MockedBeamSQLTable intTable = MockedBeamSQLTable
+      .of(SqlTypeName.INTEGER, "c0",
+          SqlTypeName.INTEGER, "c1");
+
+  @Test
+  public void testValues() throws Exception {
+    String sql = "insert into string_table(name, description) values "
+        + "('hello', 'world'), ('james', 'bond')";
+    PCollection<BeamSQLRow> rows = runner.compileBeamPipeline(sql, pipeline);
+    PAssert.that(rows).containsInAnyOrder(MockedBeamSQLTable.of(
+        SqlTypeName.VARCHAR, "name",
+        SqlTypeName.VARCHAR, "description",
+        "hello", "world",
+        "james", "bond").getInputRecords());
+    pipeline.run();
+  }
+
+  @Test
+  public void testValues_castInt() throws Exception {
+    String sql = "insert into int_table (c0, c1) values(cast(1 as int), cast(2 as int))";
+    PCollection<BeamSQLRow> rows = runner.compileBeamPipeline(sql, pipeline);
+    PAssert.that(rows).containsInAnyOrder(MockedBeamSQLTable.of(
+        SqlTypeName.INTEGER, "c0",
+        SqlTypeName.INTEGER, "c1",
+        1, 2
+    ).getInputRecords());
+    pipeline.run();
+  }
+
+  @Test
+  public void testValues_onlySelect() throws Exception {
+    String sql = "select 1, '1'";
+    PCollection<BeamSQLRow> rows = runner.compileBeamPipeline(sql, pipeline);
+    PAssert.that(rows).containsInAnyOrder(MockedBeamSQLTable.of(
+        SqlTypeName.INTEGER, "EXPR$0",
+        SqlTypeName.CHAR, "EXPR$1",
+        1, "1"
+    ).getInputRecords());
+    pipeline.run();
+  }
+
+  @BeforeClass
+  public static void prepareClass() {
+    runner.addTableMetadata("string_table", stringTable);
+    runner.addTableMetadata("int_table", intTable);
+  }
+
+  @Before
+  public void prepare() {
+    MockedBeamSQLTable.CONTENT.clear();
+  }
+}


Mime
View raw message