beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From taki...@apache.org
Subject [1/2] beam git commit: Small fixes to make the example run in a runner agnostic way: - Add direct runner default profile - Add findbugs validation and fix existing findbugs issues - Validate division by zero on arithmetic expression + other minor fixes - Update Calcite version to 1.13
Date Wed, 28 Jun 2017 01:26:47 GMT
Repository: beam
Updated Branches:
  refs/heads/DSL_SQL bd99528af -> ab4b11886


Small fixes to make the example run in a runner agnostic way:
- Add direct runner default profile
- Add findbugs validation and fix existing findbugs issues
- Validate division by zero on arithmetic expression + other minor fixes
- Update Calcite version to 1.13


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/32fbc9ce
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/32fbc9ce
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/32fbc9ce

Branch: refs/heads/DSL_SQL
Commit: 32fbc9cec1d0d86e04e3f453b0d75f2ff0e61b56
Parents: bd99528
Author: Ismaël Mejía <iemejia@gmail.com>
Authored: Mon Jun 26 16:37:51 2017 +0200
Committer: Tyler Akidau <takidau@apache.org>
Committed: Tue Jun 27 18:24:41 2017 -0700

----------------------------------------------------------------------
 dsls/pom.xml                                    | 14 ++++++
 dsls/sql/pom.xml                                | 48 +++++++++++++-------
 .../java/org/apache/beam/dsls/sql/BeamSql.java  |  4 +-
 .../beam/dsls/sql/example/BeamSqlExample.java   |  2 +-
 .../interpreter/operator/BeamSqlExpression.java |  2 +-
 .../interpreter/operator/BeamSqlPrimitive.java  |  4 +-
 .../arithmetic/BeamSqlArithmeticExpression.java |  7 ++-
 .../arithmetic/BeamSqlDivideExpression.java     |  3 ++
 .../operator/logical/BeamSqlNotExpression.java  |  1 -
 .../operator/math/BeamSqlAbsExpression.java     |  2 +
 .../operator/math/BeamSqlRoundExpression.java   |  3 +-
 .../operator/math/BeamSqlSignExpression.java    |  2 +
 .../beam/dsls/sql/planner/BeamQueryPlanner.java |  2 +-
 .../apache/beam/dsls/sql/rel/BeamIOSinkRel.java |  3 +-
 .../beam/dsls/sql/rel/BeamIOSourceRel.java      |  4 +-
 .../dsls/sql/schema/BeamPCollectionTable.java   |  2 +-
 .../apache/beam/dsls/sql/schema/BeamSqlRow.java |  5 +-
 .../beam/dsls/sql/schema/BeamTableUtils.java    |  4 +-
 .../transform/BeamSetOperatorsTransforms.java   |  5 +-
 .../interpreter/BeamSqlFnExecutorTestBase.java  |  2 +-
 20 files changed, 76 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/32fbc9ce/dsls/pom.xml
----------------------------------------------------------------------
diff --git a/dsls/pom.xml b/dsls/pom.xml
index a741563..d932698 100644
--- a/dsls/pom.xml
+++ b/dsls/pom.xml
@@ -34,6 +34,20 @@
     <module>sql</module>
   </modules>
 
+  <profiles>
+    <profile>
+      <id>release</id>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>findbugs-maven-plugin</artifactId>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
+
   <build>
     <pluginManagement>
       <plugins>

http://git-wip-us.apache.org/repos/asf/beam/blob/32fbc9ce/dsls/sql/pom.xml
----------------------------------------------------------------------
diff --git a/dsls/sql/pom.xml b/dsls/sql/pom.xml
index d866313..a2279d5 100644
--- a/dsls/sql/pom.xml
+++ b/dsls/sql/pom.xml
@@ -18,11 +18,14 @@
 <project
   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
   xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+
   <modelVersion>4.0.0</modelVersion>
+
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-dsls-parent</artifactId>
     <version>2.1.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
   </parent>
 
   <artifactId>beam-dsls-sql</artifactId>
@@ -34,10 +37,30 @@
   <properties>
     <timestamp>${maven.build.timestamp}</timestamp>
     <maven.build.timestamp.format>yyyy-MM-dd HH:mm</maven.build.timestamp.format>
-    <calcite-version>1.12.0</calcite-version>
-    <avatica-version>1.9.0</avatica-version>
+    <calcite-version>1.13.0</calcite-version>
+    <avatica-version>1.10.0</avatica-version>
   </properties>
 
+  <profiles>
+    <!--
+      The direct runner is available by default.
+      You can also include it on the classpath explicitly with -P direct-runner
+    -->
+    <profile>
+      <id>direct-runner</id>
+      <activation>
+        <activeByDefault>true</activeByDefault>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.beam</groupId>
+          <artifactId>beam-runners-direct-java</artifactId>
+          <scope>runtime</scope>
+        </dependency>
+      </dependencies>
+    </profile>
+  </profiles>
+
   <build>
     <resources>
       <resource>
@@ -62,11 +85,6 @@
     <plugins>
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-compiler-plugin</artifactId>
-      </plugin>
-
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-surefire-plugin</artifactId>
         <configuration>
         <argLine>-da</argLine> <!-- disable assert in Calcite converter validation -->
@@ -75,11 +93,6 @@
 
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-jar-plugin</artifactId>
-      </plugin>
-
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-shade-plugin</artifactId>
         <executions>
           <execution>
@@ -140,11 +153,6 @@
     </dependency>
     <dependency>
       <groupId>org.apache.beam</groupId>
-      <artifactId>beam-runners-direct-java</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.beam</groupId>
       <artifactId>beam-sdks-java-io-kafka</artifactId>
       <scope>provided</scope>
     </dependency>
@@ -195,5 +203,11 @@
       <artifactId>auto-value</artifactId>
       <scope>provided</scope>
     </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-direct-java</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/beam/blob/32fbc9ce/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
index 5f90380..a0e7cbc 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
@@ -103,7 +103,7 @@ public class BeamSql {
    */
   private static class QueryTransform extends
       PTransform<PCollectionTuple, PCollection<BeamSqlRow>> {
-    private BeamSqlEnv sqlEnv;
+    private transient BeamSqlEnv sqlEnv;
     private String sqlQuery;
 
     public QueryTransform(String sqlQuery) {
@@ -153,7 +153,7 @@ public class BeamSql {
   private static class SimpleQueryTransform
       extends PTransform<PCollection<BeamSqlRow>, PCollection<BeamSqlRow>> {
     private static final String PCOLLECTION_TABLE_NAME = "PCOLLECTION";
-    BeamSqlEnv sqlEnv = new BeamSqlEnv();
+    private transient BeamSqlEnv sqlEnv = new BeamSqlEnv();
     private String sqlQuery;
 
     public SimpleQueryTransform(String sqlQuery) {

http://git-wip-us.apache.org/repos/asf/beam/blob/32fbc9ce/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
index 5f09fdd..04fe451 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
@@ -45,7 +45,7 @@ class BeamSqlExample {
   private static final Logger LOG = LoggerFactory.getLogger(BeamSqlExample.class);
 
   public static void main(String[] args) throws Exception {
-    PipelineOptions options = PipelineOptionsFactory.create();
+    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(PipelineOptions.class);
     Pipeline p = Pipeline.create(options);
 
     //define the input row format

http://git-wip-us.apache.org/repos/asf/beam/blob/32fbc9ce/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java
index 41dac76..33feb3e 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java
@@ -30,7 +30,7 @@ import org.apache.calcite.sql.type.SqlTypeName;
  * as its operands, and return a value with type {@link SqlTypeName}.
  *
  */
-public abstract class BeamSqlExpression implements Serializable{
+public abstract class BeamSqlExpression implements Serializable {
   protected List<BeamSqlExpression> operands;
   protected SqlTypeName outputType;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/32fbc9ce/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
index b9d1559..92d1263 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
@@ -30,7 +30,7 @@ import org.apache.calcite.util.NlsString;
  * It holds the value, and return it directly during {@link #evaluate(BeamSqlRow)}.
  *
  */
-public class BeamSqlPrimitive<T> extends BeamSqlExpression{
+public class BeamSqlPrimitive<T> extends BeamSqlExpression {
   private T value;
 
   private BeamSqlPrimitive() {
@@ -44,7 +44,7 @@ public class BeamSqlPrimitive<T> extends BeamSqlExpression{
    * A builder function to create from Type and value directly.
    */
   public static <T> BeamSqlPrimitive<T> of(SqlTypeName outputType, T value){
-    BeamSqlPrimitive<T> exp = new BeamSqlPrimitive<T>();
+    BeamSqlPrimitive<T> exp = new BeamSqlPrimitive<>();
     exp.outputType = outputType;
     exp.value = value;
     if (!exp.accept()) {

http://git-wip-us.apache.org/repos/asf/beam/blob/32fbc9ce/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
index 69f6f10..f3fd68f 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
@@ -80,12 +80,11 @@ public abstract class BeamSqlArithmeticExpression extends BeamSqlExpression {
 
   private double getDouble(BeamSqlRow inputRecord, BeamSqlExpression op) {
     Object raw = op.evaluate(inputRecord).getValue();
-    Double ret = null;
     if (SqlTypeName.NUMERIC_TYPES.contains(op.getOutputType())) {
-      ret = ((Number) raw).doubleValue();
+      return ((Number) raw).doubleValue();
     }
-
-    return ret;
+    throw new IllegalStateException(
+        String.format("Can't build a valid arithmetic expression with argument %s", raw));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/32fbc9ce/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlDivideExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlDivideExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlDivideExpression.java
index c23f54c..907b1fc 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlDivideExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlDivideExpression.java
@@ -35,6 +35,9 @@ public class BeamSqlDivideExpression extends BeamSqlArithmeticExpression {
   }
 
   @Override public Double calc(Number left, Number right) {
+    if (right.doubleValue() == 0) {
+      throw new IllegalArgumentException("divisor cannot be 0");
+    }
     return left.doubleValue() / right.doubleValue();
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/32fbc9ce/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlNotExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlNotExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlNotExpression.java
index 21b1111..ffa0184 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlNotExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlNotExpression.java
@@ -40,7 +40,6 @@ public class BeamSqlNotExpression extends BeamSqlLogicalExpression {
     if (numberOfOperands() != 1) {
       return false;
     }
-
     return super.accept();
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/32fbc9ce/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlAbsExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlAbsExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlAbsExpression.java
index 2c6e6b4..5677fc3 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlAbsExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlAbsExpression.java
@@ -66,6 +66,8 @@ public class BeamSqlAbsExpression extends BeamSqlMathUnaryExpression {
         result = BeamSqlPrimitive
             .of(SqlTypeName.DOUBLE, SqlFunctions.abs(op.getDouble()));
         break;
+      default:
+        break;
     }
     return result;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/32fbc9ce/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlRoundExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlRoundExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlRoundExpression.java
index e03b9cb..21dc09e 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlRoundExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlRoundExpression.java
@@ -46,7 +46,6 @@ public class BeamSqlRoundExpression extends BeamSqlMathBinaryExpression {
   @Override public BeamSqlPrimitive<? extends Number> calculate(BeamSqlPrimitive leftOp,
       BeamSqlPrimitive rightOp) {
     BeamSqlPrimitive result = null;
-
     switch (leftOp.getOutputType()) {
       case SMALLINT:
         result = BeamSqlPrimitive.of(SqlTypeName.SMALLINT,
@@ -72,6 +71,8 @@ public class BeamSqlRoundExpression extends BeamSqlMathBinaryExpression {
         result = BeamSqlPrimitive.of(SqlTypeName.DECIMAL,
             roundBigDecimal(toBigDecimal(leftOp.getValue()), toInt(rightOp.getValue())));
         break;
+      default:
+        break;
     }
     return result;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/32fbc9ce/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlSignExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlSignExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlSignExpression.java
index 3ca42e6..311c9a0 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlSignExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlSignExpression.java
@@ -55,6 +55,8 @@ public class BeamSqlSignExpression extends BeamSqlMathUnaryExpression {
         result = BeamSqlPrimitive
             .of(SqlTypeName.DECIMAL, SqlFunctions.sign(SqlFunctions.toBigDecimal(op.getValue())));
         break;
+      default:
+        break;
     }
     return result;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/32fbc9ce/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java
index 6ae8a1e..93f9a2f 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java
@@ -73,7 +73,7 @@ public class BeamQueryPlanner {
       RelDataTypeSystem.DEFAULT);
 
   public BeamQueryPlanner(SchemaPlus schema) {
-    final List<RelTraitDef> traitDefs = new ArrayList<RelTraitDef>();
+    final List<RelTraitDef> traitDefs = new ArrayList<>();
     traitDefs.add(ConventionTraitDef.INSTANCE);
     traitDefs.add(RelCollationTraitDef.INSTANCE);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/32fbc9ce/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java
index 58539f8..d70f94a 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java
@@ -24,7 +24,6 @@ import org.apache.beam.dsls.sql.schema.BaseBeamTable;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.PDone;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.plan.RelTraitSet;
@@ -68,7 +67,7 @@ public class BeamIOSinkRel extends TableModify implements BeamRelNode {
 
     BaseBeamTable targetTable = sqlEnv.findTable(sourceName);
 
-    PDone streamEnd = upstream.apply(stageName, targetTable.buildIOWriter());
+    upstream.apply(stageName, targetTable.buildIOWriter());
 
     return upstream;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/32fbc9ce/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
index a664ce1..d323d82 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
@@ -44,9 +44,7 @@ public class BeamIOSourceRel extends TableScan implements BeamRelNode {
       , BeamSqlEnv sqlEnv) throws Exception {
     String sourceName = Joiner.on('.').join(getTable().getQualifiedName());
 
-    String stageName = BeamSqlRelUtils.getStageName(this);
-
-    TupleTag<BeamSqlRow> sourceTupleTag = new TupleTag<BeamSqlRow>(sourceName);
+    TupleTag<BeamSqlRow> sourceTupleTag = new TupleTag<>(sourceName);
     if (inputPCollections.has(sourceTupleTag)) {
       //choose PCollection from input PCollectionTuple if exists there.
       PCollection<BeamSqlRow> sourceStream = inputPCollections

http://git-wip-us.apache.org/repos/asf/beam/blob/32fbc9ce/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java
index ecd0d67..8309097 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java
@@ -29,7 +29,7 @@ import org.apache.beam.sdk.values.PDone;
  */
 public class BeamPCollectionTable extends BaseBeamTable {
   private BeamIOType ioType;
-  private PCollection<BeamSqlRow> upstream;
+  private transient PCollection<BeamSqlRow> upstream;
 
   protected BeamPCollectionTable(BeamSqlRecordType beamSqlRecordType) {
     super(beamSqlRecordType);

http://git-wip-us.apache.org/repos/asf/beam/blob/32fbc9ce/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
index 3a67303..213dcd5 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
@@ -342,7 +342,7 @@ public class BeamSqlRow implements Serializable {
    * Return data fields as key=value.
    */
   public String valueInString() {
-    StringBuffer sb = new StringBuffer();
+    StringBuilder sb = new StringBuilder();
     for (int idx = 0; idx < size(); ++idx) {
       sb.append(String.format(",%s=%s", dataType.getFieldsName().get(idx), getFieldValue(idx)));
     }
@@ -364,4 +364,7 @@ public class BeamSqlRow implements Serializable {
     return toString().equals(other.toString());
   }
 
+  @Override public int hashCode() {
+    return toString().hashCode();
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/32fbc9ce/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java
index 79a9cb2..7157793 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java
@@ -45,7 +45,7 @@ public final class BeamTableUtils {
 
       if (rawRecord.size() != beamSqlRecordType.size()) {
         throw new IllegalArgumentException(String.format(
-            "Expect %d fields, but actually %d", line,
+            "Expect %d fields, but actually %d",
             beamSqlRecordType.size(), rawRecord.size()
         ));
       } else {
@@ -75,7 +75,7 @@ public final class BeamTableUtils {
 
   public static void addFieldWithAutoTypeCasting(BeamSqlRow row, int idx, Object rawObj) {
     if (rawObj == null) {
-      row.addField(idx, rawObj);
+      row.addField(idx, null);
       return;
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/32fbc9ce/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSetOperatorsTransforms.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSetOperatorsTransforms.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSetOperatorsTransforms.java
index 56b3e14..a983cf5 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSetOperatorsTransforms.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSetOperatorsTransforms.java
@@ -85,9 +85,8 @@ public abstract class BeamSetOperatorsTransforms {
         case INTERSECT:
           if (leftRows.iterator().hasNext() && rightRows.iterator().hasNext()) {
             if (all) {
-              Iterator<BeamSqlRow> iter = leftRows.iterator();
-              while (iter.hasNext()) {
-                ctx.output(iter.next());
+              for (BeamSqlRow leftRow : leftRows) {
+                ctx.output(leftRow);
               }
             } else {
               ctx.output(ctx.element().getKey());

http://git-wip-us.apache.org/repos/asf/beam/blob/32fbc9ce/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTestBase.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTestBase.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTestBase.java
index 739d548..5afd273 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTestBase.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTestBase.java
@@ -79,7 +79,7 @@ public class BeamSqlFnExecutorTestBase {
     record.addField(3, 1234567L);
 
     SchemaPlus schema = Frameworks.createRootSchema(true);
-    final List<RelTraitDef> traitDefs = new ArrayList<RelTraitDef>();
+    final List<RelTraitDef> traitDefs = new ArrayList<>();
     traitDefs.add(ConventionTraitDef.INSTANCE);
     traitDefs.add(RelCollationTraitDef.INSTANCE);
     FrameworkConfig config = Frameworks.newConfigBuilder()


Mime
View raw message