Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 2B9BD200CB5 for ; Wed, 28 Jun 2017 03:26:50 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 2A5E7160BDC; Wed, 28 Jun 2017 01:26:50 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id C85BB160BE9 for ; Wed, 28 Jun 2017 03:26:48 +0200 (CEST) Received: (qmail 87881 invoked by uid 500); 28 Jun 2017 01:26:47 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 87797 invoked by uid 99); 28 Jun 2017 01:26:47 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 28 Jun 2017 01:26:47 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C41ABDFD5A; Wed, 28 Jun 2017 01:26:47 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit From: takidau@apache.org To: commits@beam.apache.org Date: Wed, 28 Jun 2017 01:26:47 -0000 Message-Id: <83a5722c325c43c1a55e0265d42416d3@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] beam git commit: Small fixes to make the example run in a runner agnostic way: - Add direct runner default profile - Add findbugs validation and fix existing findbugs issues - Validate division by zero on arithmetic expression + other minor fixes - archived-at: Wed, 28 Jun 2017 01:26:50 -0000 Repository: beam Updated Branches: refs/heads/DSL_SQL bd99528af -> ab4b11886 Small fixes to make the example run in a runner agnostic way: - Add direct runner default profile - Add findbugs validation and fix existing findbugs issues - Validate division by zero on arithmetic expression + other minor fixes - Update Calcite version to 1.13 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/32fbc9ce Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/32fbc9ce Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/32fbc9ce Branch: refs/heads/DSL_SQL Commit: 32fbc9cec1d0d86e04e3f453b0d75f2ff0e61b56 Parents: bd99528 Author: Ismaël Mejía Authored: Mon Jun 26 16:37:51 2017 +0200 Committer: Tyler Akidau Committed: Tue Jun 27 18:24:41 2017 -0700 ---------------------------------------------------------------------- dsls/pom.xml | 14 ++++++ dsls/sql/pom.xml | 48 +++++++++++++------- .../java/org/apache/beam/dsls/sql/BeamSql.java | 4 +- .../beam/dsls/sql/example/BeamSqlExample.java | 2 +- .../interpreter/operator/BeamSqlExpression.java | 2 +- .../interpreter/operator/BeamSqlPrimitive.java | 4 +- .../arithmetic/BeamSqlArithmeticExpression.java | 7 ++- .../arithmetic/BeamSqlDivideExpression.java | 3 ++ .../operator/logical/BeamSqlNotExpression.java | 1 - .../operator/math/BeamSqlAbsExpression.java | 2 + .../operator/math/BeamSqlRoundExpression.java | 3 +- .../operator/math/BeamSqlSignExpression.java | 2 + .../beam/dsls/sql/planner/BeamQueryPlanner.java | 2 +- .../apache/beam/dsls/sql/rel/BeamIOSinkRel.java | 3 +- .../beam/dsls/sql/rel/BeamIOSourceRel.java | 4 +- .../dsls/sql/schema/BeamPCollectionTable.java | 2 +- .../apache/beam/dsls/sql/schema/BeamSqlRow.java | 5 +- .../beam/dsls/sql/schema/BeamTableUtils.java | 4 +- .../transform/BeamSetOperatorsTransforms.java | 5 +- .../interpreter/BeamSqlFnExecutorTestBase.java | 2 +- 20 files changed, 76 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/32fbc9ce/dsls/pom.xml ---------------------------------------------------------------------- diff --git a/dsls/pom.xml b/dsls/pom.xml index a741563..d932698 100644 --- a/dsls/pom.xml +++ b/dsls/pom.xml @@ -34,6 +34,20 @@ sql + + + release + + + + org.codehaus.mojo + findbugs-maven-plugin + + + + + + http://git-wip-us.apache.org/repos/asf/beam/blob/32fbc9ce/dsls/sql/pom.xml ---------------------------------------------------------------------- diff --git a/dsls/sql/pom.xml b/dsls/sql/pom.xml index d866313..a2279d5 100644 --- a/dsls/sql/pom.xml +++ b/dsls/sql/pom.xml @@ -18,11 +18,14 @@ + 4.0.0 + org.apache.beam beam-dsls-parent 2.1.0-SNAPSHOT + ../pom.xml beam-dsls-sql @@ -34,10 +37,30 @@ ${maven.build.timestamp} yyyy-MM-dd HH:mm - 1.12.0 - 1.9.0 + 1.13.0 + 1.10.0 + + + + direct-runner + + true + + + + org.apache.beam + beam-runners-direct-java + runtime + + + + + @@ -62,11 +85,6 @@ org.apache.maven.plugins - maven-compiler-plugin - - - - org.apache.maven.plugins maven-surefire-plugin -da @@ -75,11 +93,6 @@ org.apache.maven.plugins - maven-jar-plugin - - - - org.apache.maven.plugins maven-shade-plugin @@ -140,11 +153,6 @@ org.apache.beam - beam-runners-direct-java - provided - - - org.apache.beam beam-sdks-java-io-kafka provided @@ -195,5 +203,11 @@ auto-value provided + + + org.apache.beam + beam-runners-direct-java + test + http://git-wip-us.apache.org/repos/asf/beam/blob/32fbc9ce/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java index 5f90380..a0e7cbc 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java @@ -103,7 +103,7 @@ public class BeamSql { */ private static class QueryTransform extends PTransform> { - private BeamSqlEnv sqlEnv; + private transient BeamSqlEnv sqlEnv; private String sqlQuery; public QueryTransform(String sqlQuery) { @@ -153,7 +153,7 @@ public class BeamSql { private static class SimpleQueryTransform extends PTransform, PCollection> { private static final String PCOLLECTION_TABLE_NAME = "PCOLLECTION"; - BeamSqlEnv sqlEnv = new BeamSqlEnv(); + private transient BeamSqlEnv sqlEnv = new BeamSqlEnv(); private String sqlQuery; public SimpleQueryTransform(String sqlQuery) { http://git-wip-us.apache.org/repos/asf/beam/blob/32fbc9ce/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java index 5f09fdd..04fe451 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java @@ -45,7 +45,7 @@ class BeamSqlExample { private static final Logger LOG = LoggerFactory.getLogger(BeamSqlExample.class); public static void main(String[] args) throws Exception { - PipelineOptions options = PipelineOptionsFactory.create(); + PipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(PipelineOptions.class); Pipeline p = Pipeline.create(options); //define the input row format http://git-wip-us.apache.org/repos/asf/beam/blob/32fbc9ce/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java index 41dac76..33feb3e 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java @@ -30,7 +30,7 @@ import org.apache.calcite.sql.type.SqlTypeName; * as its operands, and return a value with type {@link SqlTypeName}. * */ -public abstract class BeamSqlExpression implements Serializable{ +public abstract class BeamSqlExpression implements Serializable { protected List operands; protected SqlTypeName outputType; http://git-wip-us.apache.org/repos/asf/beam/blob/32fbc9ce/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java index b9d1559..92d1263 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java @@ -30,7 +30,7 @@ import org.apache.calcite.util.NlsString; * It holds the value, and return it directly during {@link #evaluate(BeamSqlRow)}. * */ -public class BeamSqlPrimitive extends BeamSqlExpression{ +public class BeamSqlPrimitive extends BeamSqlExpression { private T value; private BeamSqlPrimitive() { @@ -44,7 +44,7 @@ public class BeamSqlPrimitive extends BeamSqlExpression{ * A builder function to create from Type and value directly. */ public static BeamSqlPrimitive of(SqlTypeName outputType, T value){ - BeamSqlPrimitive exp = new BeamSqlPrimitive(); + BeamSqlPrimitive exp = new BeamSqlPrimitive<>(); exp.outputType = outputType; exp.value = value; if (!exp.accept()) { http://git-wip-us.apache.org/repos/asf/beam/blob/32fbc9ce/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java index 69f6f10..f3fd68f 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java @@ -80,12 +80,11 @@ public abstract class BeamSqlArithmeticExpression extends BeamSqlExpression { private double getDouble(BeamSqlRow inputRecord, BeamSqlExpression op) { Object raw = op.evaluate(inputRecord).getValue(); - Double ret = null; if (SqlTypeName.NUMERIC_TYPES.contains(op.getOutputType())) { - ret = ((Number) raw).doubleValue(); + return ((Number) raw).doubleValue(); } - - return ret; + throw new IllegalStateException( + String.format("Can't build a valid arithmetic expression with argument %s", raw)); } /** http://git-wip-us.apache.org/repos/asf/beam/blob/32fbc9ce/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlDivideExpression.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlDivideExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlDivideExpression.java index c23f54c..907b1fc 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlDivideExpression.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlDivideExpression.java @@ -35,6 +35,9 @@ public class BeamSqlDivideExpression extends BeamSqlArithmeticExpression { } @Override public Double calc(Number left, Number right) { + if (right.doubleValue() == 0) { + throw new IllegalArgumentException("divisor cannot be 0"); + } return left.doubleValue() / right.doubleValue(); } } http://git-wip-us.apache.org/repos/asf/beam/blob/32fbc9ce/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlNotExpression.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlNotExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlNotExpression.java index 21b1111..ffa0184 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlNotExpression.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlNotExpression.java @@ -40,7 +40,6 @@ public class BeamSqlNotExpression extends BeamSqlLogicalExpression { if (numberOfOperands() != 1) { return false; } - return super.accept(); } http://git-wip-us.apache.org/repos/asf/beam/blob/32fbc9ce/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlAbsExpression.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlAbsExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlAbsExpression.java index 2c6e6b4..5677fc3 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlAbsExpression.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlAbsExpression.java @@ -66,6 +66,8 @@ public class BeamSqlAbsExpression extends BeamSqlMathUnaryExpression { result = BeamSqlPrimitive .of(SqlTypeName.DOUBLE, SqlFunctions.abs(op.getDouble())); break; + default: + break; } return result; } http://git-wip-us.apache.org/repos/asf/beam/blob/32fbc9ce/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlRoundExpression.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlRoundExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlRoundExpression.java index e03b9cb..21dc09e 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlRoundExpression.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlRoundExpression.java @@ -46,7 +46,6 @@ public class BeamSqlRoundExpression extends BeamSqlMathBinaryExpression { @Override public BeamSqlPrimitive calculate(BeamSqlPrimitive leftOp, BeamSqlPrimitive rightOp) { BeamSqlPrimitive result = null; - switch (leftOp.getOutputType()) { case SMALLINT: result = BeamSqlPrimitive.of(SqlTypeName.SMALLINT, @@ -72,6 +71,8 @@ public class BeamSqlRoundExpression extends BeamSqlMathBinaryExpression { result = BeamSqlPrimitive.of(SqlTypeName.DECIMAL, roundBigDecimal(toBigDecimal(leftOp.getValue()), toInt(rightOp.getValue()))); break; + default: + break; } return result; } http://git-wip-us.apache.org/repos/asf/beam/blob/32fbc9ce/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlSignExpression.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlSignExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlSignExpression.java index 3ca42e6..311c9a0 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlSignExpression.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlSignExpression.java @@ -55,6 +55,8 @@ public class BeamSqlSignExpression extends BeamSqlMathUnaryExpression { result = BeamSqlPrimitive .of(SqlTypeName.DECIMAL, SqlFunctions.sign(SqlFunctions.toBigDecimal(op.getValue()))); break; + default: + break; } return result; } http://git-wip-us.apache.org/repos/asf/beam/blob/32fbc9ce/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java index 6ae8a1e..93f9a2f 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java @@ -73,7 +73,7 @@ public class BeamQueryPlanner { RelDataTypeSystem.DEFAULT); public BeamQueryPlanner(SchemaPlus schema) { - final List traitDefs = new ArrayList(); + final List traitDefs = new ArrayList<>(); traitDefs.add(ConventionTraitDef.INSTANCE); traitDefs.add(RelCollationTraitDef.INSTANCE); http://git-wip-us.apache.org/repos/asf/beam/blob/32fbc9ce/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java index 58539f8..d70f94a 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java @@ -24,7 +24,6 @@ import org.apache.beam.dsls.sql.schema.BaseBeamTable; import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.beam.sdk.values.PDone; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelOptTable; import org.apache.calcite.plan.RelTraitSet; @@ -68,7 +67,7 @@ public class BeamIOSinkRel extends TableModify implements BeamRelNode { BaseBeamTable targetTable = sqlEnv.findTable(sourceName); - PDone streamEnd = upstream.apply(stageName, targetTable.buildIOWriter()); + upstream.apply(stageName, targetTable.buildIOWriter()); return upstream; } http://git-wip-us.apache.org/repos/asf/beam/blob/32fbc9ce/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java index a664ce1..d323d82 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java @@ -44,9 +44,7 @@ public class BeamIOSourceRel extends TableScan implements BeamRelNode { , BeamSqlEnv sqlEnv) throws Exception { String sourceName = Joiner.on('.').join(getTable().getQualifiedName()); - String stageName = BeamSqlRelUtils.getStageName(this); - - TupleTag sourceTupleTag = new TupleTag(sourceName); + TupleTag sourceTupleTag = new TupleTag<>(sourceName); if (inputPCollections.has(sourceTupleTag)) { //choose PCollection from input PCollectionTuple if exists there. PCollection sourceStream = inputPCollections http://git-wip-us.apache.org/repos/asf/beam/blob/32fbc9ce/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java index ecd0d67..8309097 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java @@ -29,7 +29,7 @@ import org.apache.beam.sdk.values.PDone; */ public class BeamPCollectionTable extends BaseBeamTable { private BeamIOType ioType; - private PCollection upstream; + private transient PCollection upstream; protected BeamPCollectionTable(BeamSqlRecordType beamSqlRecordType) { super(beamSqlRecordType); http://git-wip-us.apache.org/repos/asf/beam/blob/32fbc9ce/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java index 3a67303..213dcd5 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java @@ -342,7 +342,7 @@ public class BeamSqlRow implements Serializable { * Return data fields as key=value. */ public String valueInString() { - StringBuffer sb = new StringBuffer(); + StringBuilder sb = new StringBuilder(); for (int idx = 0; idx < size(); ++idx) { sb.append(String.format(",%s=%s", dataType.getFieldsName().get(idx), getFieldValue(idx))); } @@ -364,4 +364,7 @@ public class BeamSqlRow implements Serializable { return toString().equals(other.toString()); } + @Override public int hashCode() { + return toString().hashCode(); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/32fbc9ce/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java index 79a9cb2..7157793 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java @@ -45,7 +45,7 @@ public final class BeamTableUtils { if (rawRecord.size() != beamSqlRecordType.size()) { throw new IllegalArgumentException(String.format( - "Expect %d fields, but actually %d", line, + "Expect %d fields, but actually %d", beamSqlRecordType.size(), rawRecord.size() )); } else { @@ -75,7 +75,7 @@ public final class BeamTableUtils { public static void addFieldWithAutoTypeCasting(BeamSqlRow row, int idx, Object rawObj) { if (rawObj == null) { - row.addField(idx, rawObj); + row.addField(idx, null); return; } http://git-wip-us.apache.org/repos/asf/beam/blob/32fbc9ce/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSetOperatorsTransforms.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSetOperatorsTransforms.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSetOperatorsTransforms.java index 56b3e14..a983cf5 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSetOperatorsTransforms.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSetOperatorsTransforms.java @@ -85,9 +85,8 @@ public abstract class BeamSetOperatorsTransforms { case INTERSECT: if (leftRows.iterator().hasNext() && rightRows.iterator().hasNext()) { if (all) { - Iterator iter = leftRows.iterator(); - while (iter.hasNext()) { - ctx.output(iter.next()); + for (BeamSqlRow leftRow : leftRows) { + ctx.output(leftRow); } } else { ctx.output(ctx.element().getKey()); http://git-wip-us.apache.org/repos/asf/beam/blob/32fbc9ce/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTestBase.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTestBase.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTestBase.java index 739d548..5afd273 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTestBase.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTestBase.java @@ -79,7 +79,7 @@ public class BeamSqlFnExecutorTestBase { record.addField(3, 1234567L); SchemaPlus schema = Frameworks.createRootSchema(true); - final List traitDefs = new ArrayList(); + final List traitDefs = new ArrayList<>(); traitDefs.add(ConventionTraitDef.INSTANCE); traitDefs.add(RelCollationTraitDef.INSTANCE); FrameworkConfig config = Frameworks.newConfigBuilder()