beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From taki...@apache.org
Subject [1/2] beam git commit: Add NOT operator on DSL_SQL branch (plus some refactoring)
Date Wed, 14 Jun 2017 16:56:17 GMT
Repository: beam
Updated Branches:
  refs/heads/DSL_SQL 1e080e2ba -> 07fda7c2d


Add NOT operator on DSL_SQL branch (plus some refactoring)


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/447bed59
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/447bed59
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/447bed59

Branch: refs/heads/DSL_SQL
Commit: 447bed598da810f3c2fa9f1daa67f714799a2403
Parents: 1e080e2
Author: James Xu <xumingmingv@gmail.com>
Authored: Wed May 24 12:41:47 2017 +0800
Committer: Tyler Akidau <takidau@apache.org>
Committed: Wed Jun 14 09:48:20 2017 -0700

----------------------------------------------------------------------
 .../dsls/sql/interpreter/BeamSqlFnExecutor.java | 184 +++++++++++++------
 .../operator/BeamSqlAndExpression.java          |  60 ------
 .../operator/BeamSqlInputRefExpression.java     |   7 +-
 .../operator/BeamSqlOrExpression.java           |  60 ------
 .../interpreter/operator/BeamSqlPrimitive.java  |   1 -
 .../operator/logical/BeamSqlAndExpression.java  |  48 +++++
 .../logical/BeamSqlLogicalExpression.java       |  47 +++++
 .../operator/logical/BeamSqlNotExpression.java  |  55 ++++++
 .../operator/logical/BeamSqlOrExpression.java   |  48 +++++
 .../operator/logical/package-info.java          |  22 +++
 .../sql/interpreter/BeamSqlFnExecutorTest.java  |  78 +++++++-
 .../operator/BeamSqlAndOrExpressionTest.java    |   3 +
 .../logical/BeamSqlNotExpressionTest.java       |  48 +++++
 13 files changed, 480 insertions(+), 181 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/447bed59/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java
index 4cea280..2f5ae76 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java
@@ -17,12 +17,12 @@
  */
 package org.apache.beam.dsls.sql.interpreter;
 
+import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.List;
 
 import org.apache.beam.dsls.sql.exception.BeamSqlUnsupportedException;
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlAndExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlCaseExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlEqualExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
@@ -34,7 +34,6 @@ import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlLargerThanExpression
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlLessThanEqualExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlLessThanExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlNotEqualExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlOrExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlReinterpretExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlUdfExpression;
@@ -53,6 +52,9 @@ import org.apache.beam.dsls.sql.interpreter.operator.date.BeamSqlDateFloorExpres
 import org.apache.beam.dsls.sql.interpreter.operator.date.BeamSqlExtractExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.date.BeamSqlLocalTimeExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.date.BeamSqlLocalTimestampExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.logical.BeamSqlAndExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.logical.BeamSqlNotExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.logical.BeamSqlOrExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlAbsExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlRoundExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlSqrtExpression;
@@ -111,6 +113,7 @@ public class BeamSqlFnExecutor implements BeamSqlExpressionExecutor {
    * and represent each {@link SqlOperator} with a corresponding {@link BeamSqlExpression}.
    */
   static BeamSqlExpression buildExpression(RexNode rexNode) {
+    BeamSqlExpression ret = null;
     if (rexNode instanceof RexLiteral) {
       RexLiteral node = (RexLiteral) rexNode;
       SqlTypeName type = node.getTypeName();
@@ -126,11 +129,45 @@ public class BeamSqlFnExecutor implements BeamSqlExpressionExecutor
{
         // Calcite actually treat Calendar as the java type of Date Literal
         return BeamSqlPrimitive.of(type, ((Calendar) value).getTime());
       } else {
-        return BeamSqlPrimitive.of(type, value);
+        // node.getType().getSqlTypeName() and node.getSqlTypeName() can be different
+        // e.g. sql: "select 1"
+        // here the literal 1 will be parsed as a RexLiteral where:
+        //     node.getType().getSqlTypeName() = INTEGER (the display type)
+        //     node.getSqlTypeName() = DECIMAL (the actual internal storage format)
+        // So we need to do a convert here.
+        // check RexBuilder#makeLiteral for more information.
+        SqlTypeName realType = node.getType().getSqlTypeName();
+        Object realValue = value;
+        if (type == SqlTypeName.DECIMAL) {
+          BigDecimal rawValue = (BigDecimal) value;
+          switch (realType) {
+            case TINYINT:
+              realValue = (byte) rawValue.intValue();
+              break;
+            case SMALLINT:
+              realValue = (short) rawValue.intValue();
+              break;
+            case INTEGER:
+              realValue = rawValue.intValue();
+              break;
+            case BIGINT:
+              realValue = rawValue.longValue();
+              break;
+            default:
+              throw new IllegalStateException("type/realType mismatch: "
+                  + type + " VS " + realType);
+          }
+        } else if (type == SqlTypeName.DOUBLE) {
+          Double rawValue = (Double) value;
+          if (realType == SqlTypeName.FLOAT) {
+            realValue = rawValue.floatValue();
+          }
+        }
+        return BeamSqlPrimitive.of(realType, realValue);
       }
     } else if (rexNode instanceof RexInputRef) {
       RexInputRef node = (RexInputRef) rexNode;
-      return new BeamSqlInputRefExpression(node.getType().getSqlTypeName(), node.getIndex());
+      ret = new BeamSqlInputRefExpression(node.getType().getSqlTypeName(), node.getIndex());
     } else if (rexNode instanceof RexCall) {
       RexCall node = (RexCall) rexNode;
       String opName = node.op.getName();
@@ -139,64 +176,92 @@ public class BeamSqlFnExecutor implements BeamSqlExpressionExecutor
{
         subExps.add(buildExpression(subNode));
       }
       switch (opName) {
+        // logical operators
         case "AND":
-        return new BeamSqlAndExpression(subExps);
+          ret = new BeamSqlAndExpression(subExps);
+          break;
         case "OR":
-          return new BeamSqlOrExpression(subExps);
-
+          ret = new BeamSqlOrExpression(subExps);
+          break;
+        case "NOT":
+          ret = new BeamSqlNotExpression(subExps);
+          break;
         case "=":
-          return new BeamSqlEqualExpression(subExps);
-        case "<>=":
-          return new BeamSqlNotEqualExpression(subExps);
+          ret = new BeamSqlEqualExpression(subExps);
+          break;
+        case "<>":
+          ret = new BeamSqlNotEqualExpression(subExps);
+          break;
         case ">":
-          return new BeamSqlLargerThanExpression(subExps);
+          ret = new BeamSqlLargerThanExpression(subExps);
+          break;
         case ">=":
-          return new BeamSqlLargerThanEqualExpression(subExps);
+          ret = new BeamSqlLargerThanEqualExpression(subExps);
+          break;
         case "<":
-          return new BeamSqlLessThanExpression(subExps);
+          ret = new BeamSqlLessThanExpression(subExps);
+          break;
         case "<=":
-          return new BeamSqlLessThanEqualExpression(subExps);
+          ret = new BeamSqlLessThanEqualExpression(subExps);
+          break;
 
         // arithmetic operators
         case "+":
-          return new BeamSqlPlusExpression(subExps);
+          ret = new BeamSqlPlusExpression(subExps);
+          break;
         case "-":
-          return new BeamSqlMinusExpression(subExps);
+          ret = new BeamSqlMinusExpression(subExps);
+          break;
         case "*":
-          return new BeamSqlMultiplyExpression(subExps);
+          ret = new BeamSqlMultiplyExpression(subExps);
+          break;
         case "/":
         case "/INT":
-          return new BeamSqlDivideExpression(subExps);
+          ret = new BeamSqlDivideExpression(subExps);
+          break;
         case "MOD":
-          return new BeamSqlModExpression(subExps);
+          ret = new BeamSqlModExpression(subExps);
+          break;
 
         case "ABS":
-          return new BeamSqlAbsExpression(subExps);
+          ret = new BeamSqlAbsExpression(subExps);
+          break;
         case "SQRT":
-          return new BeamSqlSqrtExpression(subExps);
+          ret = new BeamSqlSqrtExpression(subExps);
+          break;
         case "ROUND":
-          return new BeamSqlRoundExpression(subExps);
+          ret = new BeamSqlRoundExpression(subExps);
+          break;
 
         // string operators
         case "||":
-          return new BeamSqlConcatExpression(subExps);
+          ret = new BeamSqlConcatExpression(subExps);
+          break;
         case "POSITION":
-          return new BeamSqlPositionExpression(subExps);
+          ret = new BeamSqlPositionExpression(subExps);
+          break;
         case "CHAR_LENGTH":
         case "CHARACTER_LENGTH":
-          return new BeamSqlCharLengthExpression(subExps);
+          ret = new BeamSqlCharLengthExpression(subExps);
+          break;
         case "UPPER":
-          return new BeamSqlUpperExpression(subExps);
+          ret = new BeamSqlUpperExpression(subExps);
+          break;
         case "LOWER":
-          return new BeamSqlLowerExpression(subExps);
+          ret = new BeamSqlLowerExpression(subExps);
+          break;
         case "TRIM":
-          return new BeamSqlTrimExpression(subExps);
+          ret = new BeamSqlTrimExpression(subExps);
+          break;
         case "SUBSTRING":
-          return new BeamSqlSubstringExpression(subExps);
+          ret = new BeamSqlSubstringExpression(subExps);
+          break;
         case "OVERLAY":
-          return new BeamSqlOverlayExpression(subExps);
+          ret = new BeamSqlOverlayExpression(subExps);
+          break;
         case "INITCAP":
-          return new BeamSqlInitCapExpression(subExps);
+          ret = new BeamSqlInitCapExpression(subExps);
+          break;
 
         // date functions
         case "REINTERPRET":
@@ -220,31 +285,37 @@ public class BeamSqlFnExecutor implements BeamSqlExpressionExecutor
{
 
 
         case "CASE":
-          return new BeamSqlCaseExpression(subExps);
+          ret = new BeamSqlCaseExpression(subExps);
+          break;
 
         case "IS NULL":
-          return new BeamSqlIsNullExpression(subExps.get(0));
-      case "IS NOT NULL":
-        return new BeamSqlIsNotNullExpression(subExps.get(0));
+          ret = new BeamSqlIsNullExpression(subExps.get(0));
+          break;
+        case "IS NOT NULL":
+          ret = new BeamSqlIsNotNullExpression(subExps.get(0));
+          break;
 
-      case "HOP":
-      case "TUMBLE":
-      case "SESSION":
-        return new BeamSqlWindowExpression(subExps, node.type.getSqlTypeName());
-      case "HOP_START":
-      case "TUMBLE_START":
-      case "SESSION_START":
-        return new BeamSqlWindowStartExpression();
-      case "HOP_END":
-      case "TUMBLE_END":
-      case "SESSION_END":
-        return new BeamSqlWindowEndExpression();
-      default:
-        //handle UDF
-        if (((RexCall) rexNode).getOperator() instanceof SqlUserDefinedFunction) {
-          SqlUserDefinedFunction udf = (SqlUserDefinedFunction) ((RexCall) rexNode).getOperator();
-          ScalarFunctionImpl fn = (ScalarFunctionImpl) udf.getFunction();
-          return new BeamSqlUdfExpression(fn.method, subExps,
+        case "HOP":
+        case "TUMBLE":
+        case "SESSION":
+          ret = new BeamSqlWindowExpression(subExps, node.type.getSqlTypeName());
+          break;
+        case "HOP_START":
+        case "TUMBLE_START":
+        case "SESSION_START":
+          ret = new BeamSqlWindowStartExpression();
+          break;
+        case "HOP_END":
+        case "TUMBLE_END":
+        case "SESSION_END":
+          ret = new BeamSqlWindowEndExpression();
+          break;
+        default:
+          //handle UDF
+          if (((RexCall) rexNode).getOperator() instanceof SqlUserDefinedFunction) {
+            SqlUserDefinedFunction udf = (SqlUserDefinedFunction) ((RexCall) rexNode).getOperator();
+            ScalarFunctionImpl fn = (ScalarFunctionImpl) udf.getFunction();
+            ret = new BeamSqlUdfExpression(fn.method, subExps,
               ((RexCall) rexNode).type.getSqlTypeName());
         } else {
           throw new BeamSqlUnsupportedException("Operator: " + opName + " not supported yet!");
@@ -254,6 +325,13 @@ public class BeamSqlFnExecutor implements BeamSqlExpressionExecutor {
       throw new BeamSqlUnsupportedException(
           String.format("%s is not supported yet", rexNode.getClass().toString()));
     }
+
+    if (ret != null && !ret.accept()) {
+      throw new IllegalStateException(ret.getClass().getSimpleName()
+          + " does not accept the operands.(" + rexNode + ")");
+    }
+
+    return ret;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/447bed59/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlAndExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlAndExpression.java
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlAndExpression.java
deleted file mode 100644
index d7dc7d7..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlAndExpression.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.interpreter.operator;
-
-import java.util.List;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * {@code BeamSqlExpression} for 'AND' operation.
- */
-public class BeamSqlAndExpression extends BeamSqlExpression {
-
-  private BeamSqlAndExpression(List<BeamSqlExpression> operands, SqlTypeName outputType)
{
-    super(operands, outputType);
-  }
-  public BeamSqlAndExpression(List<BeamSqlExpression> operands) {
-    this(operands, SqlTypeName.BOOLEAN);
-  }
-
-  @Override
-  public boolean accept() {
-    for (BeamSqlExpression exp : operands) {
-      // only accept BOOLEAN expression as operand
-      if (!exp.outputType.equals(SqlTypeName.BOOLEAN)) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  @Override
-  public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRecord) {
-    boolean result = true;
-    for (BeamSqlExpression exp : operands) {
-      BeamSqlPrimitive<Boolean> expOut = exp.evaluate(inputRecord);
-      result = result && expOut.getValue();
-      if (!result) {
-        break;
-      }
-    }
-    return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, result);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/447bed59/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpression.java
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpression.java
index 3e99caf..b6d2b0b 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpression.java
@@ -23,7 +23,7 @@ import org.apache.calcite.sql.type.SqlTypeName;
 /**
  * An primitive operation for direct field extraction.
  */
-public class BeamSqlInputRefExpression extends BeamSqlExpression{
+public class BeamSqlInputRefExpression extends BeamSqlExpression {
   private int inputRef;
 
   public BeamSqlInputRefExpression(SqlTypeName sqlTypeName, int inputRef) {
@@ -33,14 +33,11 @@ public class BeamSqlInputRefExpression extends BeamSqlExpression{
 
   @Override
   public boolean accept() {
-    // TODO Auto-generated method stub
-    return false;
+    return true;
   }
 
   @Override
   public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
     return BeamSqlPrimitive.of(outputType, inputRecord.getFieldValue(inputRef));
   }
-
-
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/447bed59/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlOrExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlOrExpression.java
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlOrExpression.java
deleted file mode 100644
index e47ed45..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlOrExpression.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.interpreter.operator;
-
-import java.util.List;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * {@code BeamSqlExpression} for 'OR' operation.
- */
-public class BeamSqlOrExpression extends BeamSqlExpression {
-
-  private BeamSqlOrExpression(List<BeamSqlExpression> operands, SqlTypeName outputType)
{
-    super(operands, outputType);
-  }
-  public BeamSqlOrExpression(List<BeamSqlExpression> operands) {
-    this(operands, SqlTypeName.BOOLEAN);
-  }
-
-  @Override
-  public boolean accept() {
-    for (BeamSqlExpression exp : operands) {
-      // only accept BOOLEAN expression as operand
-      if (!exp.outputType.equals(SqlTypeName.BOOLEAN)) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  @Override
-  public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRecord) {
-    boolean result = false;
-    for (BeamSqlExpression exp : operands) {
-      BeamSqlPrimitive<Boolean> expOut = exp.evaluate(inputRecord);
-        result = result || expOut.getValue();
-        if (result) {
-          break;
-        }
-    }
-    return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, result);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/447bed59/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
index a0b3a55..99f8473 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
@@ -34,7 +34,6 @@ import org.apache.calcite.util.NlsString;
  *
  */
 public class BeamSqlPrimitive<T> extends BeamSqlExpression{
-  private SqlTypeName outputType;
   private T value;
 
   private BeamSqlPrimitive() {

http://git-wip-us.apache.org/repos/asf/beam/blob/447bed59/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlAndExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlAndExpression.java
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlAndExpression.java
new file mode 100644
index 0000000..5da43f4
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlAndExpression.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator.logical;
+
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for 'AND' operation.
+ */
+public class BeamSqlAndExpression extends BeamSqlLogicalExpression {
+  public BeamSqlAndExpression(List<BeamSqlExpression> operands) {
+    super(operands);
+  }
+
+  @Override
+  public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRecord) {
+    boolean result = true;
+    for (BeamSqlExpression exp : operands) {
+      BeamSqlPrimitive<Boolean> expOut = exp.evaluate(inputRecord);
+      result = result && expOut.getValue();
+      if (!result) {
+        break;
+      }
+    }
+    return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, result);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/447bed59/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlLogicalExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlLogicalExpression.java
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlLogicalExpression.java
new file mode 100644
index 0000000..c9ff186
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlLogicalExpression.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.logical;
+
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for Logical operators.
+ */
+public abstract class BeamSqlLogicalExpression extends BeamSqlExpression {
+  private BeamSqlLogicalExpression(List<BeamSqlExpression> operands, SqlTypeName outputType)
{
+    super(operands, outputType);
+  }
+  public BeamSqlLogicalExpression(List<BeamSqlExpression> operands) {
+    this(operands, SqlTypeName.BOOLEAN);
+  }
+
+  @Override
+  public boolean accept() {
+    for (BeamSqlExpression exp : operands) {
+      // only accept BOOLEAN expression as operand
+      if (!exp.getOutputType().equals(SqlTypeName.BOOLEAN)) {
+        return false;
+      }
+    }
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/447bed59/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlNotExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlNotExpression.java
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlNotExpression.java
new file mode 100644
index 0000000..21b1111
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlNotExpression.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.logical;
+
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for logical operator: NOT.
+ *
+ * <p>Whether boolean is not TRUE; returns UNKNOWN if boolean is UNKNOWN.
+ */
+public class BeamSqlNotExpression extends BeamSqlLogicalExpression {
+  public BeamSqlNotExpression(List<BeamSqlExpression> operands) {
+    super(operands);
+  }
+
+  @Override
+  public boolean accept() {
+    if (numberOfOperands() != 1) {
+      return false;
+    }
+
+    return super.accept();
+  }
+
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
+    Boolean value = opValueEvaluated(0, inputRecord);
+    if (value == null) {
+      return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, null);
+    } else {
+      return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, !value);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/447bed59/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlOrExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlOrExpression.java
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlOrExpression.java
new file mode 100644
index 0000000..9ca57f0
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlOrExpression.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator.logical;
+
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for 'OR' operation.
+ */
+public class BeamSqlOrExpression extends BeamSqlLogicalExpression {
+  public BeamSqlOrExpression(List<BeamSqlExpression> operands) {
+    super(operands);
+  }
+
+  @Override
+  public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRecord) {
+    boolean result = false;
+    for (BeamSqlExpression exp : operands) {
+      BeamSqlPrimitive<Boolean> expOut = exp.evaluate(inputRecord);
+        result = result || expOut.getValue();
+        if (result) {
+          break;
+        }
+    }
+    return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, result);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/447bed59/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/package-info.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/package-info.java
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/package-info.java
new file mode 100644
index 0000000..7862045
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Logical operators.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator.logical;

http://git-wip-us.apache.org/repos/asf/beam/blob/447bed59/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTest.java
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTest.java
index 46d8326..d97acc7 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTest.java
@@ -25,7 +25,6 @@ import java.util.Calendar;
 import java.util.Date;
 import java.util.TimeZone;
 
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlAndExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlCaseExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlEqualExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
@@ -44,6 +43,9 @@ import org.apache.beam.dsls.sql.interpreter.operator.date.BeamSqlDateFloorExpres
 import org.apache.beam.dsls.sql.interpreter.operator.date.BeamSqlExtractExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.date.BeamSqlLocalTimeExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.date.BeamSqlLocalTimestampExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.logical.BeamSqlAndExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.logical.BeamSqlNotExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.logical.BeamSqlOrExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlCharLengthExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlConcatExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlInitCapExpression;
@@ -53,6 +55,7 @@ import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlPositionExpre
 import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlSubstringExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlTrimExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlUpperExpression;
+import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
 import org.apache.beam.dsls.sql.rel.BeamFilterRel;
 import org.apache.beam.dsls.sql.rel.BeamProjectRel;
 import org.apache.beam.dsls.sql.rel.BeamRelNode;
@@ -130,6 +133,76 @@ public class BeamSqlFnExecutorTest extends BeamSqlFnExecutorTestBase
{
 
 
   @Test
+  public void testBuildExpression_logical() {
+    RexNode rexNode;
+    BeamSqlExpression exp;
+    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.AND,
+        Arrays.asList(
+            rexBuilder.makeLiteral(true),
+            rexBuilder.makeLiteral(false)
+        )
+    );
+    exp = BeamSqlFnExecutor.buildExpression(rexNode);
+    assertTrue(exp instanceof BeamSqlAndExpression);
+
+    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.OR,
+        Arrays.asList(
+            rexBuilder.makeLiteral(true),
+            rexBuilder.makeLiteral(false)
+        )
+    );
+    exp = BeamSqlFnExecutor.buildExpression(rexNode);
+    assertTrue(exp instanceof BeamSqlOrExpression);
+
+    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.NOT,
+        Arrays.asList(
+            rexBuilder.makeLiteral(true)
+        )
+    );
+    exp = BeamSqlFnExecutor.buildExpression(rexNode);
+    assertTrue(exp instanceof BeamSqlNotExpression);
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testBuildExpression_logical_andOr_invalidOperand() {
+    RexNode rexNode;
+    BeamSqlExpression exp;
+    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.AND,
+        Arrays.asList(
+            rexBuilder.makeLiteral(true),
+            rexBuilder.makeLiteral("hello")
+        )
+    );
+    BeamSqlFnExecutor.buildExpression(rexNode);
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testBuildExpression_logical_not_invalidOperand() {
+    RexNode rexNode;
+    BeamSqlExpression exp;
+    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.NOT,
+        Arrays.asList(
+            rexBuilder.makeLiteral("hello")
+        )
+    );
+    BeamSqlFnExecutor.buildExpression(rexNode);
+  }
+
+
+  @Test(expected = IllegalStateException.class)
+  public void testBuildExpression_logical_not_invalidOperandCount() {
+    RexNode rexNode;
+    BeamSqlExpression exp;
+    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.NOT,
+        Arrays.asList(
+            rexBuilder.makeLiteral(true),
+            rexBuilder.makeLiteral(true)
+        )
+    );
+    BeamSqlFnExecutor.buildExpression(rexNode);
+  }
+
+  @Test
   public void testBuildExpression_arithmetic() {
     testBuildArithmeticExpression(SqlStdOperatorTable.PLUS, BeamSqlPlusExpression.class);
     testBuildArithmeticExpression(SqlStdOperatorTable.MINUS, BeamSqlMinusExpression.class);
@@ -177,7 +250,8 @@ public class BeamSqlFnExecutorTest extends BeamSqlFnExecutorTestBase {
         Arrays.asList(
             rexBuilder.makeLiteral("hello"),
             rexBuilder.makeLiteral("worldhello"),
-            rexBuilder.makeBigintLiteral(BigDecimal.ZERO)
+            rexBuilder.makeCast(BeamQueryPlanner.TYPE_FACTORY.createSqlType(SqlTypeName.INTEGER),
+                rexBuilder.makeBigintLiteral(BigDecimal.ONE))
         )
     );
     exp = BeamSqlFnExecutor.buildExpression(rexNode);

http://git-wip-us.apache.org/repos/asf/beam/blob/447bed59/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlAndOrExpressionTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlAndOrExpressionTest.java
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlAndOrExpressionTest.java
index 9c9d3d2..01c57a8 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlAndOrExpressionTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlAndOrExpressionTest.java
@@ -19,7 +19,10 @@ package org.apache.beam.dsls.sql.interpreter.operator;
 
 import java.util.ArrayList;
 import java.util.List;
+
 import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutorTestBase;
+import org.apache.beam.dsls.sql.interpreter.operator.logical.BeamSqlAndExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.logical.BeamSqlOrExpression;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.junit.Assert;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/beam/blob/447bed59/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlNotExpressionTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlNotExpressionTest.java
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlNotExpressionTest.java
new file mode 100644
index 0000000..1dd602b
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlNotExpressionTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.logical;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutorTestBase;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for {@code BeamSqlNotExpression}.
+ */
+public class BeamSqlNotExpressionTest extends BeamSqlFnExecutorTestBase {
+  @Test public void evaluate() throws Exception {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, false));
+    Assert.assertTrue(new BeamSqlNotExpression(operands).evaluate(record).getBoolean());
+
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true));
+    Assert.assertFalse(new BeamSqlNotExpression(operands).evaluate(record).getBoolean());
+
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, null));
+    Assert.assertNull(new BeamSqlNotExpression(operands).evaluate(record).getValue());
+  }
+}


Mime
View raw message