Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 13124200D41 for ; Tue, 7 Nov 2017 17:54:22 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 11EFA160C0E; Tue, 7 Nov 2017 16:54:22 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id E07DA160C01 for ; Tue, 7 Nov 2017 17:54:19 +0100 (CET) Received: (qmail 38414 invoked by uid 500); 7 Nov 2017 16:54:18 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 38034 invoked by uid 99); 7 Nov 2017 16:54:18 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 07 Nov 2017 16:54:18 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 42001E78B1; Tue, 7 Nov 2017 16:54:18 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kenn@apache.org To: commits@beam.apache.org Date: Tue, 07 Nov 2017 16:54:28 -0000 Message-Id: <5325278ec3b44f52bd7bf44edc11f178@git.apache.org> In-Reply-To: <3df2cdcdb808479dba94f89e78aa2670@git.apache.org> References: <3df2cdcdb808479dba94f89e78aa2670@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [12/50] [abbrv] beam git commit: [BEAM-2203] Implement TIMESTAMPADD archived-at: Tue, 07 Nov 2017 16:54:22 -0000 [BEAM-2203] Implement TIMESTAMPADD Add support for TIMESTAMPADD(interval, multiplier, TIMESTAMP) fixup! 
[BEAM-2203] Implement TIMESTAMPADD Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/820f8aff Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/820f8aff Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/820f8aff Branch: refs/heads/mr-runner Commit: 820f8aff944d1eda69c9226086848d8b39fcf62f Parents: a33c717 Author: Anton Kedin Authored: Fri Oct 27 10:15:08 2017 -0700 Committer: mingmxu Committed: Wed Nov 1 16:29:07 2017 -0700 ---------------------------------------------------------------------- .../sql/impl/interpreter/BeamSqlFnExecutor.java | 19 ++- .../interpreter/operator/BeamSqlPrimitive.java | 8 +- .../date/BeamSqlDatetimePlusExpression.java | 129 +++++++++++++++ .../date/BeamSqlIntervalMultiplyExpression.java | 103 ++++++++++++ .../operator/date/TimeUnitUtils.java | 54 +++++++ .../extensions/sql/impl/utils/SqlTypeUtils.java | 46 ++++++ .../impl/interpreter/BeamSqlFnExecutorTest.java | 30 ++++ .../date/BeamSqlDateExpressionTestBase.java | 5 +- .../date/BeamSqlDatetimePlusExpressionTest.java | 155 +++++++++++++++++++ .../BeamSqlIntervalMultiplyExpressionTest.java | 107 +++++++++++++ .../operator/date/TimeUnitUtilsTest.java | 54 +++++++ .../sql/impl/utils/SqlTypeUtilsTest.java | 76 +++++++++ .../BeamSqlDateFunctionsIntegrationTest.java | 39 ++++- 13 files changed, 818 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/820f8aff/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java index 8f9797b..8770055 100644 --- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java @@ -21,6 +21,7 @@ import java.math.BigDecimal; import java.util.ArrayList; import java.util.Calendar; import java.util.List; + import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlCaseExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlCastExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; @@ -49,7 +50,9 @@ import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSql import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlCurrentTimestampExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDateCeilExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDateFloorExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDatetimePlusExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlExtractExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlIntervalMultiplyExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical.BeamSqlAndExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical.BeamSqlNotExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical.BeamSqlOrExpression; @@ -143,7 +146,7 @@ public class BeamSqlFnExecutor implements BeamSqlExpressionExecutor { // NlsString is not serializable, we need to convert // it to string explicitly. 
return BeamSqlPrimitive.of(type, ((NlsString) value).getValue()); - } else if (type == SqlTypeName.DATE && value instanceof Calendar) { + } else if (isDateNode(type, value)) { // does this actually make sense? // Calcite actually treat Calendar as the java type of Date Literal return BeamSqlPrimitive.of(type, ((Calendar) value).getTime()); @@ -235,7 +238,11 @@ public class BeamSqlFnExecutor implements BeamSqlExpressionExecutor { ret = new BeamSqlMinusExpression(subExps); break; case "*": - ret = new BeamSqlMultiplyExpression(subExps); + if (SqlTypeName.NUMERIC_TYPES.contains(node.type.getSqlTypeName())) { + ret = new BeamSqlMultiplyExpression(subExps); + } else { + ret = new BeamSqlIntervalMultiplyExpression(subExps); + } break; case "/": case "/INT": @@ -369,6 +376,9 @@ public class BeamSqlFnExecutor implements BeamSqlExpressionExecutor { case "CURRENT_DATE": return new BeamSqlCurrentDateExpression(); + case "DATETIME_PLUS": + return new BeamSqlDatetimePlusExpression(subExps); + case "CASE": ret = new BeamSqlCaseExpression(subExps); @@ -423,6 +433,11 @@ public class BeamSqlFnExecutor implements BeamSqlExpressionExecutor { return ret; } + private static boolean isDateNode(SqlTypeName type, Object value) { + return (type == SqlTypeName.DATE || type == SqlTypeName.TIMESTAMP) + && value instanceof Calendar; + } + @Override public void prepare() { } http://git-wip-us.apache.org/repos/asf/beam/blob/820f8aff/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java index 9175caa..21cbc80 100644 --- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java @@ -21,6 +21,7 @@ import java.math.BigDecimal; import java.util.Date; import java.util.GregorianCalendar; import java.util.List; + import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.sql.type.SqlTypeName; @@ -133,9 +134,12 @@ public class BeamSqlPrimitive extends BeamSqlExpression { case TIMESTAMP: case DATE: return value instanceof Date; - case INTERVAL_HOUR: - return value instanceof BigDecimal; + case INTERVAL_SECOND: case INTERVAL_MINUTE: + case INTERVAL_HOUR: + case INTERVAL_DAY: + case INTERVAL_MONTH: + case INTERVAL_YEAR: return value instanceof BigDecimal; case SYMBOL: // for SYMBOL, it supports anything... http://git-wip-us.apache.org/repos/asf/beam/blob/820f8aff/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimePlusExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimePlusExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimePlusExpression.java new file mode 100644 index 0000000..426cda0 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimePlusExpression.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date; + +import static org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.TimeUnitUtils.timeUnitInternalMultiplier; +import static org.apache.beam.sdk.extensions.sql.impl.utils.SqlTypeUtils.findExpressionOfType; + +import com.google.common.collect.ImmutableSet; + +import java.math.BigDecimal; +import java.util.List; +import java.util.Set; + +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.BeamRecord; +import org.apache.calcite.sql.type.SqlTypeName; +import org.joda.time.DateTime; + +/** + * DATETIME_PLUS operation. + * Calcite converts 'TIMESTAMPADD(..)' or 'DATE + INTERVAL' from the user input + * into DATETIME_PLUS. + * + *

Input and output are expected to be of type TIMESTAMP. + */ +public class BeamSqlDatetimePlusExpression extends BeamSqlExpression { + + private static final Set SUPPORTED_INTERVAL_TYPES = ImmutableSet.of( + SqlTypeName.INTERVAL_SECOND, + SqlTypeName.INTERVAL_MINUTE, + SqlTypeName.INTERVAL_HOUR, + SqlTypeName.INTERVAL_DAY, + SqlTypeName.INTERVAL_MONTH, + SqlTypeName.INTERVAL_YEAR); + + public BeamSqlDatetimePlusExpression(List operands) { + super(operands, SqlTypeName.TIMESTAMP); + } + + /** + * Requires exactly 2 operands. One should be a timestamp, another an interval + */ + @Override + public boolean accept() { + return operands.size() == 2 + && SqlTypeName.TIMESTAMP.equals(operands.get(0).getOutputType()) + && SUPPORTED_INTERVAL_TYPES.contains(operands.get(1).getOutputType()); + } + + /** + * Adds interval to the timestamp. + * + *

Interval has a value of 'multiplier * TimeUnit.multiplier'. + * + *

For example, '3 years' is going to have a type of INTERVAL_YEAR, and a value of 36. + * And '2 minutes' is going to be an INTERVAL_MINUTE with a value of 120000. This is the way + * Calcite handles interval expressions, and {@link BeamSqlIntervalMultiplyExpression} also works + * the same way. + */ + @Override + public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) { + DateTime timestamp = getTimestampOperand(inputRow, window); + BeamSqlPrimitive intervalOperandPrimitive = getIntervalOperand(inputRow, window); + SqlTypeName intervalOperandType = intervalOperandPrimitive.getOutputType(); + int intervalMultiplier = getIntervalMultiplier(intervalOperandPrimitive); + + DateTime newDate = addInterval(timestamp, intervalOperandType, intervalMultiplier); + return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP, newDate.toDate()); + } + + private int getIntervalMultiplier(BeamSqlPrimitive intervalOperandPrimitive) { + BigDecimal intervalOperandValue = intervalOperandPrimitive.getDecimal(); + BigDecimal multiplier = intervalOperandValue.divide( + timeUnitInternalMultiplier(intervalOperandPrimitive.getOutputType()), + BigDecimal.ROUND_CEILING); + return multiplier.intValueExact(); + } + + private BeamSqlPrimitive getIntervalOperand(BeamRecord inputRow, BoundedWindow window) { + return findExpressionOfType(operands, SUPPORTED_INTERVAL_TYPES).get() + .evaluate(inputRow, window); + } + + private DateTime getTimestampOperand(BeamRecord inputRow, BoundedWindow window) { + BeamSqlPrimitive timestampOperandPrimitive = + findExpressionOfType(operands, SqlTypeName.TIMESTAMP).get().evaluate(inputRow, window); + return new DateTime(timestampOperandPrimitive.getDate()); + } + + private DateTime addInterval( + DateTime dateTime, SqlTypeName intervalType, int numberOfIntervals) { + + switch (intervalType) { + case INTERVAL_SECOND: + return dateTime.plusSeconds(numberOfIntervals); + case INTERVAL_MINUTE: + return dateTime.plusMinutes(numberOfIntervals); + case 
INTERVAL_HOUR: + return dateTime.plusHours(numberOfIntervals); + case INTERVAL_DAY: + return dateTime.plusDays(numberOfIntervals); + case INTERVAL_MONTH: + return dateTime.plusMonths(numberOfIntervals); + case INTERVAL_YEAR: + return dateTime.plusYears(numberOfIntervals); + default: + throw new IllegalArgumentException("Adding " + + intervalType.getName() + " to date is not supported"); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/820f8aff/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlIntervalMultiplyExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlIntervalMultiplyExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlIntervalMultiplyExpression.java new file mode 100644 index 0000000..f4ddf71 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlIntervalMultiplyExpression.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date; + +import static org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.TimeUnitUtils.timeUnitInternalMultiplier; +import static org.apache.beam.sdk.extensions.sql.impl.utils.SqlTypeUtils.findExpressionOfType; + +import com.google.common.base.Optional; + +import java.math.BigDecimal; +import java.util.List; + +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.BeamRecord; +import org.apache.calcite.sql.type.SqlTypeName; + +/** + * Multiplication operator for intervals. + * For example, allows to express things like '3 years'. + * + *

One use case of this is implementation of TIMESTAMPADD(). + * Calcite converts TIMESTAMPADD(interval, multiplier, date) into + * DATETIME_PLUS(date, multiplier * interval). + * The 'multiplier * interval' part is what this class implements. It's not a regular + * numerical multiplication because the return type is expected to be an interval, and the value + * is expected to use corresponding TimeUnit's internal value (e.g. 12 for YEAR, 60000 for MINUTE). + */ +public class BeamSqlIntervalMultiplyExpression extends BeamSqlExpression { + public BeamSqlIntervalMultiplyExpression(List operands) { + super(operands, deduceOutputType(operands)); + } + + /** + * Output type is null if no operands found with matching types. + * Execution will later fail when calling accept() + */ + private static SqlTypeName deduceOutputType(List operands) { + Optional intervalOperand = + findExpressionOfType(operands, SqlTypeName.INTERVAL_TYPES); + + return intervalOperand.isPresent() + ? intervalOperand.get().getOutputType() + : null; + } + + /** + * Requires exactly 2 operands. One should be integer, another should be interval + */ + @Override + public boolean accept() { + return operands.size() == 2 + && findExpressionOfType(operands, SqlTypeName.INTEGER).isPresent() + && findExpressionOfType(operands, SqlTypeName.INTERVAL_TYPES).isPresent(); + } + /** + * Evaluates the number of times the interval should be repeated, times the TimeUnit multiplier. + * For example for '3 * YEAR' this will return an object with type INTERVAL_YEAR and value 36. + * + *

This is due to the fact that TimeUnit has different internal multipliers for each interval, + * e.g. YEAR is 12, but MINUTE is 60000. When Calcite parses SQL interval literals, it returns + * those internal multipliers. This means we need to do similar thing, so that this multiplication + * expression behaves the same way as literal interval expression. + * + *

That is, we need to make sure that this: + * "TIMESTAMP '1984-04-19 01:02:03' + INTERVAL '2' YEAR" + * is equivalent to this: + * "TIMESTAMPADD(YEAR, 2, TIMESTAMP '1984-04-19 01:02:03')" + */ + @Override + public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) { + BeamSqlPrimitive intervalOperandPrimitive = + findExpressionOfType(operands, SqlTypeName.INTERVAL_TYPES).get().evaluate(inputRow, window); + SqlTypeName intervalOperandType = intervalOperandPrimitive.getOutputType(); + + BeamSqlPrimitive integerOperandPrimitive = + findExpressionOfType(operands, SqlTypeName.INTEGER).get().evaluate(inputRow, window); + BigDecimal integerOperandValue = new BigDecimal(integerOperandPrimitive.getInteger()); + + BigDecimal multiplicationResult = + integerOperandValue.multiply( + timeUnitInternalMultiplier(intervalOperandType)); + + return BeamSqlPrimitive.of(outputType, multiplicationResult); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/820f8aff/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/TimeUnitUtils.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/TimeUnitUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/TimeUnitUtils.java new file mode 100644 index 0000000..b432d20 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/TimeUnitUtils.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date; + +import java.math.BigDecimal; + +import org.apache.calcite.avatica.util.TimeUnit; +import org.apache.calcite.sql.type.SqlTypeName; + +/** + * Utils to convert between Calcite's TimeUnit and Sql intervals. + */ +public abstract class TimeUnitUtils { + + /** + * @return internal multiplier of a TimeUnit, e.g. YEAR is 12, MINUTE is 60000 + * @throws IllegalArgumentException if interval type is not supported + */ + public static BigDecimal timeUnitInternalMultiplier(final SqlTypeName sqlIntervalType) { + switch (sqlIntervalType) { + case INTERVAL_SECOND: + return TimeUnit.SECOND.multiplier; + case INTERVAL_MINUTE: + return TimeUnit.MINUTE.multiplier; + case INTERVAL_HOUR: + return TimeUnit.HOUR.multiplier; + case INTERVAL_DAY: + return TimeUnit.DAY.multiplier; + case INTERVAL_MONTH: + return TimeUnit.MONTH.multiplier; + case INTERVAL_YEAR: + return TimeUnit.YEAR.multiplier; + default: + throw new IllegalArgumentException("Interval " + sqlIntervalType + + " cannot be converted to TimeUnit"); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/820f8aff/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/SqlTypeUtils.java ---------------------------------------------------------------------- diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/SqlTypeUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/SqlTypeUtils.java new file mode 100644 index 0000000..1ab703e --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/SqlTypeUtils.java @@ -0,0 +1,46 @@ +package org.apache.beam.sdk.extensions.sql.impl.utils; + +import com.google.common.base.Optional; + +import java.util.Collection; +import java.util.List; + +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; +import org.apache.calcite.sql.type.SqlTypeName; + +/** + * Utils to help with SqlTypes. + */ +public class SqlTypeUtils { + /** + * Finds an operand with provided type. + * Returns Optional.absent() if no operand found with matching type + */ + public static Optional findExpressionOfType( + List operands, SqlTypeName type) { + + for (BeamSqlExpression operand : operands) { + if (type.equals(operand.getOutputType())) { + return Optional.of(operand); + } + } + + return Optional.absent(); + } + + /** + * Finds an operand with the type in typesToFind. 
+ * Returns Optional.absent() if no operand found with matching type + */ + public static Optional findExpressionOfType( + List operands, Collection typesToFind) { + + for (BeamSqlExpression operand : operands) { + if (typesToFind.contains(operand.getOutputType())) { + return Optional.of(operand); + } + } + + return Optional.absent(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/820f8aff/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTest.java index f350087..c4583ec 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTest.java @@ -24,6 +24,7 @@ import java.util.Arrays; import java.util.Calendar; import java.util.Date; import java.util.TimeZone; + import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlCaseExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlInputRefExpression; @@ -40,7 +41,9 @@ import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSql import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlCurrentTimestampExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDateCeilExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDateFloorExpression; +import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDatetimePlusExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlExtractExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlIntervalMultiplyExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical.BeamSqlAndExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical.BeamSqlNotExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical.BeamSqlOrExpression; @@ -57,12 +60,15 @@ import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamFilterRel; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamProjectRel; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode; +import org.apache.calcite.avatica.util.TimeUnit; import org.apache.calcite.avatica.util.TimeUnitRange; import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlIntervalQualifier; import org.apache.calcite.sql.SqlOperator; import org.apache.calcite.sql.fun.SqlStdOperatorTable; import org.apache.calcite.sql.fun.SqlTrimFunction; +import org.apache.calcite.sql.parser.SqlParserPos; import org.apache.calcite.sql.type.SqlTypeName; import org.junit.Assert; import org.junit.Test; @@ -412,5 +418,29 @@ public class BeamSqlFnExecutorTest extends BeamSqlFnExecutorTestBase { ); exp = BeamSqlFnExecutor.buildExpression(rexNode); assertTrue(exp instanceof BeamSqlCurrentTimestampExpression); + + // DATETIME_PLUS + rexNode = rexBuilder.makeCall(SqlStdOperatorTable.DATETIME_PLUS, + Arrays.asList( + rexBuilder.makeDateLiteral(calendar), + rexBuilder.makeIntervalLiteral( + new BigDecimal(10), + new SqlIntervalQualifier(TimeUnit.DAY, TimeUnit.DAY, SqlParserPos.ZERO)) + ) + ); + exp = BeamSqlFnExecutor.buildExpression(rexNode); + assertTrue(exp instanceof 
BeamSqlDatetimePlusExpression); + + // * for intervals + rexNode = rexBuilder.makeCall(SqlStdOperatorTable.MULTIPLY, + Arrays.asList( + rexBuilder.makeExactLiteral(new BigDecimal(1)), + rexBuilder.makeIntervalLiteral( + new BigDecimal(10), + new SqlIntervalQualifier(TimeUnit.DAY, TimeUnit.DAY, SqlParserPos.ZERO)) + ) + ); + exp = BeamSqlFnExecutor.buildExpression(rexNode); + assertTrue(exp instanceof BeamSqlIntervalMultiplyExpression); } } http://git-wip-us.apache.org/repos/asf/beam/blob/820f8aff/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateExpressionTestBase.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateExpressionTestBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateExpressionTestBase.java index 0e57404..cb0b6ec 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateExpressionTestBase.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateExpressionTestBase.java @@ -22,13 +22,14 @@ import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.TimeZone; + import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlFnExecutorTestBase; /** * Base class for all date related expression test. 
*/ public class BeamSqlDateExpressionTestBase extends BeamSqlFnExecutorTestBase { - protected long str2LongTime(String dateStr) { + static long str2LongTime(String dateStr) { SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); try { Date date = format.parse(dateStr); @@ -38,7 +39,7 @@ public class BeamSqlDateExpressionTestBase extends BeamSqlFnExecutorTestBase { } } - protected Date str2DateTime(String dateStr) { + static Date str2DateTime(String dateStr) { SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); try { format.setTimeZone(TimeZone.getTimeZone("GMT")); http://git-wip-us.apache.org/repos/asf/beam/blob/820f8aff/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimePlusExpressionTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimePlusExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimePlusExpressionTest.java new file mode 100644 index 0000000..57e709f --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDatetimePlusExpressionTest.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.math.BigDecimal; +import java.util.Arrays; +import java.util.Date; + +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.BeamRecord; +import org.apache.calcite.avatica.util.TimeUnit; +import org.apache.calcite.sql.type.SqlTypeName; +import org.joda.time.DateTime; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +/** + * Test for {@link BeamSqlDatetimePlusExpression}. 
+ */ +public class BeamSqlDatetimePlusExpressionTest extends BeamSqlDateExpressionTestBase { + @Rule public ExpectedException thrown = ExpectedException.none(); + + private static final BeamRecord NULL_INPUT_ROW = null; + private static final BoundedWindow NULL_WINDOW = null; + private static final Date DATE = str2DateTime("1984-04-19 01:02:03"); + + private static final Date DATE_PLUS_15_SECONDS = new DateTime(DATE).plusSeconds(15).toDate(); + private static final Date DATE_PLUS_10_MINUTES = new DateTime(DATE).plusMinutes(10).toDate(); + private static final Date DATE_PLUS_7_HOURS = new DateTime(DATE).plusHours(7).toDate(); + private static final Date DATE_PLUS_3_DAYS = new DateTime(DATE).plusDays(3).toDate(); + private static final Date DATE_PLUS_2_MONTHS = new DateTime(DATE).plusMonths(2).toDate(); + private static final Date DATE_PLUS_11_YEARS = new DateTime(DATE).plusYears(11).toDate(); + + private static final BeamSqlExpression SQL_INTERVAL_15_SECONDS = + interval(SqlTypeName.INTERVAL_SECOND, 15); + private static final BeamSqlExpression SQL_INTERVAL_10_MINUTES = + interval(SqlTypeName.INTERVAL_MINUTE, 10); + private static final BeamSqlExpression SQL_INTERVAL_7_HOURS = + interval(SqlTypeName.INTERVAL_HOUR, 7); + private static final BeamSqlExpression SQL_INTERVAL_3_DAYS = + interval(SqlTypeName.INTERVAL_DAY, 3); + private static final BeamSqlExpression SQL_INTERVAL_2_MONTHS = + interval(SqlTypeName.INTERVAL_MONTH, 2); + private static final BeamSqlExpression SQL_INTERVAL_4_MONTHS = + interval(SqlTypeName.INTERVAL_MONTH, 4); + private static final BeamSqlExpression SQL_INTERVAL_11_YEARS = + interval(SqlTypeName.INTERVAL_YEAR, 11); + + private static final BeamSqlExpression SQL_TIMESTAMP = + BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP, DATE); + + @Test public void testHappyPath_outputTypeAndAccept() { + BeamSqlExpression plusExpression = dateTimePlus(SQL_TIMESTAMP, SQL_INTERVAL_3_DAYS); + + assertEquals(SqlTypeName.TIMESTAMP, plusExpression.getOutputType()); + 
assertTrue(plusExpression.accept()); + } + + @Test public void testDoesNotAcceptTreeOperands() { + BeamSqlDatetimePlusExpression plusExpression = + dateTimePlus(SQL_TIMESTAMP, SQL_INTERVAL_3_DAYS, SQL_INTERVAL_4_MONTHS); + + assertEquals(SqlTypeName.TIMESTAMP, plusExpression.getOutputType()); + assertFalse(plusExpression.accept()); + } + + @Test public void testDoesNotAcceptWithoutTimestampOperand() { + BeamSqlDatetimePlusExpression plusExpression = + dateTimePlus(SQL_INTERVAL_3_DAYS, SQL_INTERVAL_4_MONTHS); + + assertEquals(SqlTypeName.TIMESTAMP, plusExpression.getOutputType()); + assertFalse(plusExpression.accept()); + } + + @Test public void testDoesNotAcceptWithoutIntervalOperand() { + BeamSqlDatetimePlusExpression plusExpression = + dateTimePlus(SQL_TIMESTAMP, SQL_TIMESTAMP); + + assertEquals(SqlTypeName.TIMESTAMP, plusExpression.getOutputType()); + assertFalse(plusExpression.accept()); + } + + @Test public void testEvaluate() { + assertEquals(DATE_PLUS_15_SECONDS, evalDatetimePlus(SQL_TIMESTAMP, SQL_INTERVAL_15_SECONDS)); + assertEquals(DATE_PLUS_10_MINUTES, evalDatetimePlus(SQL_TIMESTAMP, SQL_INTERVAL_10_MINUTES)); + assertEquals(DATE_PLUS_7_HOURS, evalDatetimePlus(SQL_TIMESTAMP, SQL_INTERVAL_7_HOURS)); + assertEquals(DATE_PLUS_3_DAYS, evalDatetimePlus(SQL_TIMESTAMP, SQL_INTERVAL_3_DAYS)); + assertEquals(DATE_PLUS_2_MONTHS, evalDatetimePlus(SQL_TIMESTAMP, SQL_INTERVAL_2_MONTHS)); + assertEquals(DATE_PLUS_11_YEARS, evalDatetimePlus(SQL_TIMESTAMP, SQL_INTERVAL_11_YEARS)); + } + + @Test public void testEvaluateThrowsForUnsupportedIntervalType() { + thrown.expect(UnsupportedOperationException.class); + + BeamSqlPrimitive unsupportedInterval = BeamSqlPrimitive.of(SqlTypeName.INTERVAL_YEAR_MONTH, 3); + evalDatetimePlus(SQL_TIMESTAMP, unsupportedInterval); + } + + private static Date evalDatetimePlus(BeamSqlExpression date, BeamSqlExpression interval) { + return dateTimePlus(date, interval).evaluate(NULL_INPUT_ROW, NULL_WINDOW).getDate(); + } + + private static 
BeamSqlDatetimePlusExpression dateTimePlus(BeamSqlExpression ... operands) { + return new BeamSqlDatetimePlusExpression(Arrays.asList(operands)); + } + + private static BeamSqlExpression interval(SqlTypeName type, int multiplier) { + return BeamSqlPrimitive.of(type, + timeUnitInternalMultiplier(type) + .multiply(new BigDecimal(multiplier))); + } + + private static BigDecimal timeUnitInternalMultiplier(final SqlTypeName sqlIntervalType) { + switch (sqlIntervalType) { + case INTERVAL_SECOND: + return TimeUnit.SECOND.multiplier; + case INTERVAL_MINUTE: + return TimeUnit.MINUTE.multiplier; + case INTERVAL_HOUR: + return TimeUnit.HOUR.multiplier; + case INTERVAL_DAY: + return TimeUnit.DAY.multiplier; + case INTERVAL_MONTH: + return TimeUnit.MONTH.multiplier; + case INTERVAL_YEAR: + return TimeUnit.YEAR.multiplier; + default: + throw new IllegalArgumentException("Interval " + sqlIntervalType + + " cannot be converted to TimeUnit"); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/820f8aff/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlIntervalMultiplyExpressionTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlIntervalMultiplyExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlIntervalMultiplyExpressionTest.java new file mode 100644 index 0000000..0c91f40 --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlIntervalMultiplyExpressionTest.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date; + +import static org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.TimeUnitUtils.timeUnitInternalMultiplier; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.math.BigDecimal; +import java.util.Arrays; + +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.BeamRecord; +import org.apache.calcite.sql.type.SqlTypeName; +import org.junit.Test; + +/** + * Test for BeamSqlIntervalMultiplyExpression. 
+ */ +public class BeamSqlIntervalMultiplyExpressionTest { + private static final BeamRecord NULL_INPUT_ROW = null; + private static final BoundedWindow NULL_WINDOW = null; + private static final BigDecimal DECIMAL_THREE = new BigDecimal(3); + private static final BigDecimal DECIMAL_FOUR = new BigDecimal(4); + + private static final BeamSqlExpression SQL_INTERVAL_DAY = + BeamSqlPrimitive.of(SqlTypeName.INTERVAL_DAY, DECIMAL_THREE); + + private static final BeamSqlExpression SQL_INTERVAL_MONTH = + BeamSqlPrimitive.of(SqlTypeName.INTERVAL_MONTH, DECIMAL_FOUR); + + private static final BeamSqlExpression SQL_INTEGER_FOUR = + BeamSqlPrimitive.of(SqlTypeName.INTEGER, 4); + + private static final BeamSqlExpression SQL_INTEGER_FIVE = + BeamSqlPrimitive.of(SqlTypeName.INTEGER, 5); + + @Test public void testHappyPath_outputTypeAndAccept() { + BeamSqlExpression multiplyExpression = + newMultiplyExpression(SQL_INTERVAL_DAY, SQL_INTEGER_FOUR); + + assertEquals(SqlTypeName.INTERVAL_DAY, multiplyExpression.getOutputType()); + assertTrue(multiplyExpression.accept()); + } + + @Test public void testDoesNotAcceptTreeOperands() { + BeamSqlIntervalMultiplyExpression multiplyExpression = + newMultiplyExpression(SQL_INTERVAL_DAY, SQL_INTEGER_FIVE, SQL_INTEGER_FOUR); + + assertEquals(SqlTypeName.INTERVAL_DAY, multiplyExpression.getOutputType()); + assertFalse(multiplyExpression.accept()); + } + + @Test public void testDoesNotAcceptWithoutIntervalOperand() { + BeamSqlIntervalMultiplyExpression multiplyExpression = + newMultiplyExpression(SQL_INTEGER_FOUR, SQL_INTEGER_FIVE); + + assertNull(multiplyExpression.getOutputType()); + assertFalse(multiplyExpression.accept()); + } + + @Test public void testDoesNotAcceptWithoutIntegerOperand() { + BeamSqlIntervalMultiplyExpression multiplyExpression = + newMultiplyExpression(SQL_INTERVAL_DAY, SQL_INTERVAL_MONTH); + + assertEquals(SqlTypeName.INTERVAL_DAY, multiplyExpression.getOutputType()); + assertFalse(multiplyExpression.accept()); + } + + @Test 
public void testEvaluate_integerOperand() { + BeamSqlIntervalMultiplyExpression multiplyExpression = + newMultiplyExpression(SQL_INTERVAL_DAY, SQL_INTEGER_FOUR); + + BeamSqlPrimitive multiplicationResult = + multiplyExpression.evaluate(NULL_INPUT_ROW, NULL_WINDOW); + + BigDecimal expectedResult = + DECIMAL_FOUR.multiply(timeUnitInternalMultiplier(SqlTypeName.INTERVAL_DAY)); + + assertEquals(expectedResult, multiplicationResult.getDecimal()); + assertEquals(SqlTypeName.INTERVAL_DAY, multiplicationResult.getOutputType()); + } + + private BeamSqlIntervalMultiplyExpression newMultiplyExpression(BeamSqlExpression ... operands) { + return new BeamSqlIntervalMultiplyExpression(Arrays.asList(operands)); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/820f8aff/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/TimeUnitUtilsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/TimeUnitUtilsTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/TimeUnitUtilsTest.java new file mode 100644 index 0000000..91552ae --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/TimeUnitUtilsTest.java @@ -0,0 +1,54 @@ +package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date; + +import static org.junit.Assert.assertEquals; + +import org.apache.calcite.avatica.util.TimeUnit; +import org.apache.calcite.sql.type.SqlTypeName; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Unit tests for {@link TimeUnitUtils}. + */ +public class TimeUnitUtilsTest { + @Rule public ExpectedException thrown = ExpectedException.none(); + + @Test public void testReturnsInternalTimeUnitMultipliers() { + assertEquals(TimeUnit.SECOND.multiplier, + TimeUnitUtils.timeUnitInternalMultiplier(SqlTypeName.INTERVAL_SECOND)); + assertEquals(TimeUnit.MINUTE.multiplier, + TimeUnitUtils.timeUnitInternalMultiplier(SqlTypeName.INTERVAL_MINUTE)); + assertEquals(TimeUnit.HOUR.multiplier, + TimeUnitUtils.timeUnitInternalMultiplier(SqlTypeName.INTERVAL_HOUR)); + assertEquals(TimeUnit.DAY.multiplier, + TimeUnitUtils.timeUnitInternalMultiplier(SqlTypeName.INTERVAL_DAY)); + assertEquals(TimeUnit.MONTH.multiplier, + TimeUnitUtils.timeUnitInternalMultiplier(SqlTypeName.INTERVAL_MONTH)); + assertEquals(TimeUnit.YEAR.multiplier, + TimeUnitUtils.timeUnitInternalMultiplier(SqlTypeName.INTERVAL_YEAR)); + } + + @Test public void testThrowsForUnsupportedIntervalType() { + thrown.expect(IllegalArgumentException.class); + TimeUnitUtils.timeUnitInternalMultiplier(SqlTypeName.INTERVAL_DAY_MINUTE); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/820f8aff/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/utils/SqlTypeUtilsTest.java ---------------------------------------------------------------------- diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/utils/SqlTypeUtilsTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/utils/SqlTypeUtilsTest.java new file mode 100644 index 0000000..1a14256 --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/utils/SqlTypeUtilsTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.sql.impl.utils; + +import static org.apache.beam.sdk.extensions.sql.impl.utils.SqlTypeUtils.findExpressionOfType; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import com.google.common.base.Optional; + +import java.math.BigDecimal; +import java.util.Arrays; +import java.util.List; + +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive; +import org.apache.calcite.sql.type.SqlTypeName; +import org.junit.Test; + +/** + * Tests for {@link SqlTypeUtils}. 
+ */ +public class SqlTypeUtilsTest { + private static final BigDecimal DECIMAL_THREE = new BigDecimal(3); + private static final BigDecimal DECIMAL_FOUR = new BigDecimal(4); + + private static final List<BeamSqlExpression> EXPRESSIONS = Arrays.<BeamSqlExpression> asList( + BeamSqlPrimitive.of(SqlTypeName.INTERVAL_DAY, DECIMAL_THREE), + BeamSqlPrimitive.of(SqlTypeName.INTERVAL_MONTH, DECIMAL_FOUR), + BeamSqlPrimitive.of(SqlTypeName.INTEGER, 4), + BeamSqlPrimitive.of(SqlTypeName.INTEGER, 5)); + + @Test public void testFindExpressionOfType_success() { + Optional<BeamSqlExpression> typeName = findExpressionOfType(EXPRESSIONS, SqlTypeName.INTEGER); + + assertTrue(typeName.isPresent()); + assertEquals(SqlTypeName.INTEGER, typeName.get().getOutputType()); + } + + @Test public void testFindExpressionOfType_failure() { + Optional<BeamSqlExpression> typeName = findExpressionOfType(EXPRESSIONS, SqlTypeName.VARCHAR); + + assertFalse(typeName.isPresent()); + } + + @Test public void testFindExpressionOfTypes_success() { + Optional<BeamSqlExpression> typeName = findExpressionOfType(EXPRESSIONS, SqlTypeName.INT_TYPES); + + assertTrue(typeName.isPresent()); + assertEquals(SqlTypeName.INTEGER, typeName.get().getOutputType()); + } + + @Test public void testFindExpressionOfTypes_failure() { + Optional<BeamSqlExpression> typeName = + findExpressionOfType(EXPRESSIONS, SqlTypeName.CHAR_TYPES); + + assertFalse(typeName.isPresent()); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/820f8aff/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java index 1fdb35f..6937a18 100644 --- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue; import java.util.Date; import java.util.Iterator; + import org.apache.beam.sdk.extensions.sql.BeamSql; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.transforms.SerializableFunction; @@ -35,7 +36,7 @@ import org.junit.Test; */ public class BeamSqlDateFunctionsIntegrationTest extends BeamSqlBuiltinFunctionsIntegrationTestBase { - @Test public void testDateTimeFunctions() throws Exception { + @Test public void testBasicDateTimeFunctions() throws Exception { ExpressionChecker checker = new ExpressionChecker() .addExpr("EXTRACT(YEAR FROM ts)", 1986L) .addExpr("YEAR(ts)", 1986L) @@ -54,6 +55,42 @@ public class BeamSqlDateFunctionsIntegrationTest checker.buildRunAndCheck(); } + @Test public void testDatetimePlusFunction() throws Exception { + ExpressionChecker checker = new ExpressionChecker() + .addExpr("TIMESTAMPADD(SECOND, 3, TIMESTAMP '1984-04-19 01:02:03')", + parseDate("1984-04-19 01:02:06")) + .addExpr("TIMESTAMPADD(MINUTE, 3, TIMESTAMP '1984-04-19 01:02:03')", + parseDate("1984-04-19 01:05:03")) + .addExpr("TIMESTAMPADD(HOUR, 3, TIMESTAMP '1984-04-19 01:02:03')", + parseDate("1984-04-19 04:02:03")) + .addExpr("TIMESTAMPADD(DAY, 3, TIMESTAMP '1984-04-19 01:02:03')", + parseDate("1984-04-22 01:02:03")) + .addExpr("TIMESTAMPADD(MONTH, 2, TIMESTAMP '1984-01-19 01:02:03')", + parseDate("1984-03-19 01:02:03")) + .addExpr("TIMESTAMPADD(YEAR, 2, TIMESTAMP '1985-01-19 01:02:03')", + parseDate("1987-01-19 01:02:03")) + ; + checker.buildRunAndCheck(); + } + + @Test public void testDatetimeInfixPlus() throws Exception { + ExpressionChecker checker = new ExpressionChecker() + .addExpr("TIMESTAMP '1984-01-19 01:02:03' + INTERVAL 
'3' SECOND", + parseDate("1984-01-19 01:02:06")) + .addExpr("TIMESTAMP '1984-01-19 01:02:03' + INTERVAL '2' MINUTE", + parseDate("1984-01-19 01:04:03")) + .addExpr("TIMESTAMP '1984-01-19 01:02:03' + INTERVAL '2' HOUR", + parseDate("1984-01-19 03:02:03")) + .addExpr("TIMESTAMP '1984-01-19 01:02:03' + INTERVAL '2' DAY", + parseDate("1984-01-21 01:02:03")) + .addExpr("TIMESTAMP '1984-01-19 01:02:03' + INTERVAL '2' MONTH", + parseDate("1984-03-19 01:02:03")) + .addExpr("TIMESTAMP '1984-01-19 01:02:03' + INTERVAL '2' YEAR", + parseDate("1986-01-19 01:02:03")) + ; + checker.buildRunAndCheck(); + } + @Test public void testDateTimeFunctions_currentTime() throws Exception { String sql = "SELECT " + "LOCALTIME as l,"