Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 89D67200CDD for ; Mon, 7 Aug 2017 23:06:32 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 1F6F51609C1; Mon, 7 Aug 2017 21:06:19 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 982C11609BC for ; Mon, 7 Aug 2017 23:06:10 +0200 (CEST) Received: (qmail 68173 invoked by uid 500); 7 Aug 2017 21:06:09 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 68164 invoked by uid 99); 7 Aug 2017 21:06:09 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 07 Aug 2017 21:06:09 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 02C11DFA3C; Mon, 7 Aug 2017 21:06:08 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: takidau@apache.org To: commits@beam.apache.org Date: Mon, 07 Aug 2017 21:06:10 -0000 Message-Id: <0690b181c7854ba19914b19243a245bf@git.apache.org> In-Reply-To: <38840e5e6043451f8569b50e724ebf04@git.apache.org> References: <38840e5e6043451f8569b50e724ebf04@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [3/4] beam git commit: Remove redundant windowing information from the BeamRecord itself element window information `BoundedWindow` is added in `BeamSqlExpression`. archived-at: Mon, 07 Aug 2017 21:06:32 -0000 Remove redundant windowing information from the BeamRecord itself element window information `BoundedWindow` is added in `BeamSqlExpression`. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c0b1fed1 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c0b1fed1 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c0b1fed1 Branch: refs/heads/DSL_SQL Commit: c0b1fed1c7687a1e44e8ee6997be40aa1785ea51 Parents: 8f922f7 Author: mingmxu Authored: Fri Aug 4 11:12:45 2017 -0700 Committer: mingmxu Committed: Fri Aug 4 11:41:56 2017 -0700 ---------------------------------------------------------------------- .../apache/beam/sdk/coders/BeamRecordCoder.java | 7 -- .../org/apache/beam/sdk/values/BeamRecord.java | 36 +------- .../interpreter/BeamSqlExpressionExecutor.java | 5 +- .../sql/impl/interpreter/BeamSqlFnExecutor.java | 5 +- .../operator/BeamSqlCaseExpression.java | 9 +- .../operator/BeamSqlCastExpression.java | 31 +++---- .../interpreter/operator/BeamSqlExpression.java | 9 +- .../operator/BeamSqlInputRefExpression.java | 3 +- .../interpreter/operator/BeamSqlPrimitive.java | 5 +- .../operator/BeamSqlReinterpretExpression.java | 7 +- .../operator/BeamSqlUdfExpression.java | 5 +- .../operator/BeamSqlWindowEndExpression.java | 12 ++- .../operator/BeamSqlWindowExpression.java | 5 +- .../operator/BeamSqlWindowStartExpression.java | 12 ++- .../arithmetic/BeamSqlArithmeticExpression.java | 8 +- .../comparison/BeamSqlCompareExpression.java | 7 +- .../comparison/BeamSqlIsNotNullExpression.java | 5 +- .../comparison/BeamSqlIsNullExpression.java | 5 +- .../date/BeamSqlCurrentDateExpression.java | 3 +- .../date/BeamSqlCurrentTimeExpression.java | 3 +- .../date/BeamSqlCurrentTimestampExpression.java | 3 +- .../date/BeamSqlDateCeilExpression.java | 5 +- .../date/BeamSqlDateFloorExpression.java | 5 +- .../operator/date/BeamSqlExtractExpression.java | 5 +- .../operator/logical/BeamSqlAndExpression.java | 5 +- .../operator/logical/BeamSqlNotExpression.java | 7 +- .../operator/logical/BeamSqlOrExpression.java | 5 +- .../math/BeamSqlMathBinaryExpression.java | 6 +- .../math/BeamSqlMathUnaryExpression.java | 6 +- .../operator/math/BeamSqlPiExpression.java | 3 +- .../operator/math/BeamSqlRandExpression.java | 5 +- .../math/BeamSqlRandIntegerExpression.java | 7 +- .../string/BeamSqlCharLengthExpression.java | 5 +- .../string/BeamSqlConcatExpression.java | 7 +- .../string/BeamSqlInitCapExpression.java | 5 +- .../operator/string/BeamSqlLowerExpression.java | 5 +- .../string/BeamSqlOverlayExpression.java | 11 +-- .../string/BeamSqlPositionExpression.java | 9 +- .../string/BeamSqlSubstringExpression.java | 9 +- .../operator/string/BeamSqlTrimExpression.java | 11 +-- .../operator/string/BeamSqlUpperExpression.java | 5 +- .../transform/BeamAggregationTransforms.java | 7 +- .../sql/impl/transform/BeamSqlFilterFn.java | 5 +- .../sql/impl/transform/BeamSqlProjectFn.java | 3 +- .../sql/BeamSqlDslAggregationTest.java | 17 ---- .../operator/BeamNullExperssionTest.java | 8 +- .../operator/BeamSqlAndOrExpressionTest.java | 8 +- .../operator/BeamSqlCaseExpressionTest.java | 6 +- .../operator/BeamSqlCastExpressionTest.java | 24 +++--- .../operator/BeamSqlCompareExpressionTest.java | 24 +++--- .../operator/BeamSqlInputRefExpressionTest.java | 12 +-- .../operator/BeamSqlPrimitiveTest.java | 10 +-- .../BeamSqlReinterpretExpressionTest.java | 2 +- .../operator/BeamSqlUdfExpressionTest.java | 2 +- .../BeamSqlArithmeticExpressionTest.java | 46 +++++----- .../date/BeamSqlCurrentDateExpressionTest.java | 2 +- .../date/BeamSqlCurrentTimeExpressionTest.java | 2 +- .../BeamSqlCurrentTimestampExpressionTest.java | 2 +- .../date/BeamSqlDateCeilExpressionTest.java | 4 +- .../date/BeamSqlDateFloorExpressionTest.java | 4 +- .../date/BeamSqlExtractExpressionTest.java | 14 +-- .../logical/BeamSqlNotExpressionTest.java | 6 +- .../math/BeamSqlMathBinaryExpressionTest.java | 58 ++++++++----- .../math/BeamSqlMathUnaryExpressionTest.java | 91 ++++++++++---------- .../string/BeamSqlCharLengthExpressionTest.java | 2 +- .../string/BeamSqlConcatExpressionTest.java | 2 +- .../string/BeamSqlInitCapExpressionTest.java | 6 +- .../string/BeamSqlLowerExpressionTest.java | 2 +- .../string/BeamSqlOverlayExpressionTest.java | 8 +- .../string/BeamSqlPositionExpressionTest.java | 6 +- .../string/BeamSqlSubstringExpressionTest.java | 14 +-- .../string/BeamSqlTrimExpressionTest.java | 8 +- .../string/BeamSqlUpperExpressionTest.java | 2 +- 73 files changed, 366 insertions(+), 352 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java index 06958a4..fe9c295 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java @@ -32,7 +32,6 @@ import org.apache.beam.sdk.values.BeamRecordType; @Experimental public class BeamRecordCoder extends CustomCoder { private static final BitSetCoder nullListCoder = BitSetCoder.of(); - private static final InstantCoder instantCoder = InstantCoder.of(); private BeamRecordType recordType; private List coderArray; @@ -64,9 +63,6 @@ public class BeamRecordCoder extends CustomCoder { coderArray.get(idx).encode(value.getFieldValue(idx), outStream); } - - instantCoder.encode(value.getWindowStart(), outStream); - instantCoder.encode(value.getWindowEnd(), outStream); } @Override @@ -82,9 +78,6 @@ public class BeamRecordCoder extends CustomCoder { record.addField(idx, coderArray.get(idx).decode(inStream)); } - record.setWindowStart(instantCoder.decode(inStream)); - record.setWindowEnd(instantCoder.decode(inStream)); - return record; } http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java index bac649e..8d0aa42 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java @@ -24,11 +24,7 @@ import java.util.BitSet; import java.util.Date; import java.util.GregorianCalendar; import java.util.List; -import java.util.concurrent.TimeUnit; import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import org.joda.time.Instant; /** * {@link org.apache.beam.sdk.values.BeamRecord}, self-described with @@ -42,9 +38,6 @@ public class BeamRecord implements Serializable { private BitSet nullFields; private BeamRecordType dataType; - private Instant windowStart = new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE)); - private Instant windowEnd = new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE)); - public BeamRecord(BeamRecordType dataType) { this.dataType = dataType; this.nullFields = new BitSet(dataType.size()); @@ -62,17 +55,6 @@ public class BeamRecord implements Serializable { } } - public void updateWindowRange(BeamRecord upstreamRecord, BoundedWindow window){ - windowStart = upstreamRecord.windowStart; - windowEnd = upstreamRecord.windowEnd; - - if (window instanceof IntervalWindow) { - IntervalWindow iWindow = (IntervalWindow) window; - windowStart = iWindow.start(); - windowEnd = iWindow.end(); - } - } - public void addField(String fieldName, Object fieldValue) { addField(dataType.getFieldsName().indexOf(fieldName), fieldValue); } @@ -211,26 +193,10 @@ public class BeamRecord implements Serializable { return nullFields.get(idx); } - public Instant getWindowStart() { - return windowStart; - } - - public Instant getWindowEnd() { - return windowEnd; - } - - public void setWindowStart(Instant windowStart) { - this.windowStart = windowStart; - } - - public void setWindowEnd(Instant windowEnd) { - this.windowEnd = windowEnd; - } - @Override public String toString() { return "BeamSqlRow [nullFields=" + nullFields + ", dataValues=" + dataValues + ", dataType=" - + dataType + ", windowStart=" + windowStart + ", windowEnd=" + windowEnd + "]"; + + dataType + "]"; } /** http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlExpressionExecutor.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlExpressionExecutor.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlExpressionExecutor.java index 3cd6d65..3aaf505 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlExpressionExecutor.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlExpressionExecutor.java @@ -19,6 +19,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter; import java.io.Serializable; import java.util.List; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.BeamRecord; /** @@ -34,10 +35,10 @@ public interface BeamSqlExpressionExecutor extends Serializable { void prepare(); /** - * apply transformation to input record {@link BeamRecord}. + * apply transformation to input record {@link BeamRecord} with {@link BoundedWindow}. * */ - List execute(BeamRecord inputRow); + List execute(BeamRecord inputRow, BoundedWindow window); void close(); } http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java index 0f77ed8..8f9797b 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java @@ -88,6 +88,7 @@ import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamS import org.apache.beam.sdk.extensions.sql.impl.rel.BeamFilterRel; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamProjectRel; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.rex.RexCall; import org.apache.calcite.rex.RexInputRef; @@ -427,10 +428,10 @@ public class BeamSqlFnExecutor implements BeamSqlExpressionExecutor { } @Override - public List execute(BeamRecord inputRow) { + public List execute(BeamRecord inputRow, BoundedWindow window) { List results = new ArrayList<>(); for (BeamSqlExpression exp : exps) { - results.add(exp.evaluate(inputRow).getValue()); + results.add(exp.evaluate(inputRow, window).getValue()); } return results; } http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCaseExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCaseExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCaseExpression.java index af48cbe..955444f 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCaseExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCaseExpression.java @@ -19,6 +19,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator; import java.util.List; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.sql.type.SqlTypeName; @@ -48,16 +49,16 @@ public class BeamSqlCaseExpression extends BeamSqlExpression { return true; } - @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) { + @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) { for (int i = 0; i < operands.size() - 1; i += 2) { - if (opValueEvaluated(i, inputRow)) { + if (opValueEvaluated(i, inputRow, window)) { return BeamSqlPrimitive.of( outputType, - opValueEvaluated(i + 1, inputRow) + opValueEvaluated(i + 1, inputRow, window) ); } } return BeamSqlPrimitive.of(outputType, - opValueEvaluated(operands.size() - 1, inputRow)); + opValueEvaluated(operands.size() - 1, inputRow, window)); } } http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCastExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCastExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCastExpression.java index 3786281..9ea66c1 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCastExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCastExpression.java @@ -21,6 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator; import java.sql.Date; import java.sql.Timestamp; import java.util.List; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.runtime.SqlFunctions; import org.apache.calcite.sql.type.SqlTypeName; @@ -71,40 +72,40 @@ public class BeamSqlCastExpression extends BeamSqlExpression { } @Override - public BeamSqlPrimitive evaluate(BeamRecord inputRow) { + public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) { SqlTypeName castOutputType = getOutputType(); switch (castOutputType) { case INTEGER: return BeamSqlPrimitive - .of(SqlTypeName.INTEGER, SqlFunctions.toInt(opValueEvaluated(index, inputRow))); + .of(SqlTypeName.INTEGER, SqlFunctions.toInt(opValueEvaluated(index, inputRow, window))); case DOUBLE: - return BeamSqlPrimitive - .of(SqlTypeName.DOUBLE, SqlFunctions.toDouble(opValueEvaluated(index, inputRow))); + return BeamSqlPrimitive.of(SqlTypeName.DOUBLE, + SqlFunctions.toDouble(opValueEvaluated(index, inputRow, window))); case SMALLINT: - return BeamSqlPrimitive - .of(SqlTypeName.SMALLINT, SqlFunctions.toShort(opValueEvaluated(index, inputRow))); + return BeamSqlPrimitive.of(SqlTypeName.SMALLINT, + SqlFunctions.toShort(opValueEvaluated(index, inputRow, window))); case TINYINT: - return BeamSqlPrimitive - .of(SqlTypeName.TINYINT, SqlFunctions.toByte(opValueEvaluated(index, inputRow))); + return BeamSqlPrimitive.of(SqlTypeName.TINYINT, + SqlFunctions.toByte(opValueEvaluated(index, inputRow, window))); case BIGINT: return BeamSqlPrimitive - .of(SqlTypeName.BIGINT, SqlFunctions.toLong(opValueEvaluated(index, inputRow))); + .of(SqlTypeName.BIGINT, SqlFunctions.toLong(opValueEvaluated(index, inputRow, window))); case DECIMAL: return BeamSqlPrimitive.of(SqlTypeName.DECIMAL, - SqlFunctions.toBigDecimal(opValueEvaluated(index, inputRow))); + SqlFunctions.toBigDecimal(opValueEvaluated(index, inputRow, window))); case FLOAT: return BeamSqlPrimitive - .of(SqlTypeName.FLOAT, SqlFunctions.toFloat(opValueEvaluated(index, inputRow))); + .of(SqlTypeName.FLOAT, SqlFunctions.toFloat(opValueEvaluated(index, inputRow, window))); case CHAR: case VARCHAR: return BeamSqlPrimitive - .of(SqlTypeName.VARCHAR, opValueEvaluated(index, inputRow).toString()); + .of(SqlTypeName.VARCHAR, opValueEvaluated(index, inputRow, window).toString()); case DATE: - return BeamSqlPrimitive - .of(SqlTypeName.DATE, toDate(opValueEvaluated(index, inputRow), outputDateFormat)); + return BeamSqlPrimitive.of(SqlTypeName.DATE, + toDate(opValueEvaluated(index, inputRow, window), outputDateFormat)); case TIMESTAMP: return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP, - toTimeStamp(opValueEvaluated(index, inputRow), outputTimestampFormat)); + toTimeStamp(opValueEvaluated(index, inputRow, window), outputTimestampFormat)); } throw new UnsupportedOperationException( String.format("Cast to type %s not supported", castOutputType)); http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlExpression.java index f42a365..d18b141 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlExpression.java @@ -19,6 +19,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator; import java.io.Serializable; import java.util.List; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.rex.RexNode; import org.apache.calcite.sql.type.SqlTypeName; @@ -49,8 +50,8 @@ public abstract class BeamSqlExpression implements Serializable { return op(idx).getOutputType(); } - public T opValueEvaluated(int idx, BeamRecord row) { - return (T) op(idx).evaluate(row).getValue(); + public T opValueEvaluated(int idx, BeamRecord row, BoundedWindow window) { + return (T) op(idx).evaluate(row, window).getValue(); } /** @@ -59,10 +60,10 @@ public abstract class BeamSqlExpression implements Serializable { public abstract boolean accept(); /** - * Apply input record {@link BeamRecord} to this expression, + * Apply input record {@link BeamRecord} with {@link BoundedWindow} to this expression, * the output value is wrapped with {@link BeamSqlPrimitive}. */ - public abstract BeamSqlPrimitive evaluate(BeamRecord inputRow); + public abstract BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window); public List getOperands() { return operands; http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlInputRefExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlInputRefExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlInputRefExpression.java index 8c3d4d4..a2d1624 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlInputRefExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlInputRefExpression.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.sql.type.SqlTypeName; @@ -37,7 +38,7 @@ public class BeamSqlInputRefExpression extends BeamSqlExpression { } @Override - public BeamSqlPrimitive evaluate(BeamRecord inputRow) { + public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) { return BeamSqlPrimitive.of(outputType, inputRow.getFieldValue(inputRef)); } } http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java index f763898..9175caa 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java @@ -21,13 +21,14 @@ import java.math.BigDecimal; import java.util.Date; import java.util.GregorianCalendar; import java.util.List; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.util.NlsString; /** * {@link BeamSqlPrimitive} is a special, self-reference {@link BeamSqlExpression}. - * It holds the value, and return it directly during {@link #evaluate(BeamRecord)}. + * It holds the value, and return it directly during {@link #evaluate(BeamRecord, BoundedWindow)}. * */ public class BeamSqlPrimitive extends BeamSqlExpression { @@ -145,7 +146,7 @@ public class BeamSqlPrimitive extends BeamSqlExpression { } @Override - public BeamSqlPrimitive evaluate(BeamRecord inputRow) { + public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) { return this; } http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpression.java index c1fa2c7..2ec4fb5 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpression.java @@ -21,6 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator; import java.util.Date; import java.util.GregorianCalendar; import java.util.List; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.sql.type.SqlTypeName; @@ -41,13 +42,13 @@ public class BeamSqlReinterpretExpression extends BeamSqlExpression { && SqlTypeName.DATETIME_TYPES.contains(opType(0)); } - @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) { + @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) { if (opType(0) == SqlTypeName.TIME) { - GregorianCalendar date = opValueEvaluated(0, inputRow); + GregorianCalendar date = opValueEvaluated(0, inputRow, window); return BeamSqlPrimitive.of(outputType, date.getTimeInMillis()); } else { - Date date = opValueEvaluated(0, inputRow); + Date date = opValueEvaluated(0, inputRow, window); return BeamSqlPrimitive.of(outputType, date.getTime()); } } http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java index da706f3..f1bcb66 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java @@ -20,6 +20,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator; import java.lang.reflect.Method; import java.util.ArrayList; import java.util.List; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.sql.type.SqlTypeName; @@ -51,14 +52,14 @@ public class BeamSqlUdfExpression extends BeamSqlExpression { } @Override - public BeamSqlPrimitive evaluate(BeamRecord inputRow) { + public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) { if (method == null) { reConstructMethod(); } try { List paras = new ArrayList<>(); for (BeamSqlExpression e : getOperands()) { - paras.add(e.evaluate(inputRow).getValue()); + paras.add(e.evaluate(inputRow, window).getValue()); } return BeamSqlPrimitive.of(getOutputType(), http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowEndExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowEndExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowEndExpression.java index 2f4c165..919612e 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowEndExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowEndExpression.java @@ -18,6 +18,8 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator; import java.util.Date; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.sql.type.SqlTypeName; @@ -34,9 +36,13 @@ public class BeamSqlWindowEndExpression extends BeamSqlExpression { } @Override - public BeamSqlPrimitive evaluate(BeamRecord inputRow) { - return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP, - new Date(inputRow.getWindowEnd().getMillis())); + public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) { + if (window instanceof IntervalWindow) { + return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP, ((IntervalWindow) window).end().toDate()); + } else { + throw new UnsupportedOperationException( + "Cannot run HOP_END|TUMBLE_END|SESSION_END on GlobalWindow."); + } } } http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowExpression.java index 2f3dd5c..0298f26 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowExpression.java @@ -19,6 +19,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator; import java.util.Date; import java.util.List; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.sql.type.SqlTypeName; @@ -42,9 +43,9 @@ public class BeamSqlWindowExpression extends BeamSqlExpression { } @Override - public BeamSqlPrimitive evaluate(BeamRecord inputRow) { + public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) { return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP, - (Date) operands.get(0).evaluate(inputRow).getValue()); + (Date) operands.get(0).evaluate(inputRow, window).getValue()); } } http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowStartExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowStartExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowStartExpression.java index 9186ec0..4b250a5 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowStartExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowStartExpression.java @@ -18,6 +18,8 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator; import java.util.Date; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.sql.type.SqlTypeName; @@ -35,9 +37,13 @@ public class BeamSqlWindowStartExpression extends BeamSqlExpression { } @Override - public BeamSqlPrimitive evaluate(BeamRecord inputRow) { - return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP, - new Date(inputRow.getWindowStart().getMillis())); + public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) { + if (window instanceof IntervalWindow) { + return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP, ((IntervalWindow) window).start().toDate()); + } else { + throw new UnsupportedOperationException( + "Cannot run HOP_START|TUMBLE_START|SESSION_START on GlobalWindow."); + } } } http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java index fd36457..cc15ff5 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.List; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.sql.type.SqlTypeName; @@ -50,11 +51,12 @@ public abstract class BeamSqlArithmeticExpression extends BeamSqlExpression { super(operands, outputType); } - @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) { + @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow, + BoundedWindow window) { BigDecimal left = BigDecimal.valueOf( - Double.valueOf(opValueEvaluated(0, inputRow).toString())); + Double.valueOf(opValueEvaluated(0, inputRow, window).toString())); BigDecimal right = BigDecimal.valueOf( - Double.valueOf(opValueEvaluated(1, inputRow).toString())); + Double.valueOf(opValueEvaluated(1, inputRow, window).toString())); BigDecimal result = calc(left, right); return getCorrectlyTypedResult(result); http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlCompareExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlCompareExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlCompareExpression.java index 93032ae..df8bd61 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlCompareExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlCompareExpression.java @@ -20,6 +20,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison; import java.util.List; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.sql.type.SqlTypeName; @@ -51,9 +52,9 @@ public abstract class BeamSqlCompareExpression extends BeamSqlExpression { } @Override - public BeamSqlPrimitive evaluate(BeamRecord inputRow) { - Object leftValue = operands.get(0).evaluate(inputRow).getValue(); - Object rightValue = operands.get(1).evaluate(inputRow).getValue(); + public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) { + Object leftValue = operands.get(0).evaluate(inputRow, window).getValue(); + Object rightValue = operands.get(1).evaluate(inputRow, window).getValue(); switch (operands.get(0).getOutputType()) { case BIGINT: case DECIMAL: http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlIsNotNullExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlIsNotNullExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlIsNotNullExpression.java index 7177d96..9a9739e 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlIsNotNullExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlIsNotNullExpression.java @@ -21,6 +21,7 @@ import java.util.Arrays; import java.util.List; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.sql.type.SqlTypeName; @@ -46,8 +47,8 @@ public class BeamSqlIsNotNullExpression extends BeamSqlExpression { } @Override - public BeamSqlPrimitive evaluate(BeamRecord inputRow) { - Object leftValue = operands.get(0).evaluate(inputRow).getValue(); + public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) { + Object leftValue = operands.get(0).evaluate(inputRow, window).getValue(); return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, leftValue != null); } } http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlIsNullExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlIsNullExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlIsNullExpression.java index c74fcd9..6034344 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlIsNullExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/comparison/BeamSqlIsNullExpression.java @@ -21,6 +21,7 @@ import java.util.Arrays; import java.util.List; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.sql.type.SqlTypeName; @@ -46,8 +47,8 @@ public class BeamSqlIsNullExpression extends BeamSqlExpression { } @Override - public BeamSqlPrimitive evaluate(BeamRecord inputRow) { - Object leftValue = operands.get(0).evaluate(inputRow).getValue(); + public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) { + Object leftValue = operands.get(0).evaluate(inputRow, window).getValue(); return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, leftValue == null); } } http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentDateExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentDateExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentDateExpression.java index 86abe43..336772d 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentDateExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentDateExpression.java @@ -22,6 +22,7 @@ import java.util.Collections; import java.util.Date; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.sql.type.SqlTypeName; @@ -38,7 +39,7 @@ public class BeamSqlCurrentDateExpression extends BeamSqlExpression { return getOperands().size() == 0; } - @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) { + @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) { return BeamSqlPrimitive.of(outputType, new Date()); } } http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimeExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimeExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimeExpression.java index d8de464..fe3feb8 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimeExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimeExpression.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.TimeZone; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.sql.type.SqlTypeName; @@ -44,7 +45,7 @@ public class BeamSqlCurrentTimeExpression extends BeamSqlExpression { return opCount <= 1; } - @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) { + @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) { GregorianCalendar ret = new GregorianCalendar(TimeZone.getDefault()); ret.setTime(new Date()); return BeamSqlPrimitive.of(outputType, ret); http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimestampExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimestampExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimestampExpression.java index 4736571..ca4b3ce 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimestampExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlCurrentTimestampExpression.java @@ -22,6 +22,7 @@ import java.util.Date; import java.util.List; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.sql.type.SqlTypeName; @@ -42,7 +43,7 @@ public class BeamSqlCurrentTimestampExpression extends BeamSqlExpression { return opCount <= 1; } - @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) { + @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) { return BeamSqlPrimitive.of(outputType, new Date()); } } http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateCeilExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateCeilExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateCeilExpression.java index 55767fa..0e1d3db 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateCeilExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateCeilExpression.java @@ -22,6 +22,7 @@ import java.util.Date; import java.util.List; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.avatica.util.DateTimeUtils; import org.apache.calcite.avatica.util.TimeUnitRange; @@ -41,8 +42,8 @@ public class BeamSqlDateCeilExpression extends BeamSqlExpression { && opType(1) == SqlTypeName.SYMBOL; } - @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) { - Date date = opValueEvaluated(0, inputRow); + @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) { + Date date = opValueEvaluated(0, inputRow, window); long time = date.getTime(); TimeUnitRange unit = ((BeamSqlPrimitive) op(1)).getValue(); http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateFloorExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateFloorExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateFloorExpression.java index 3310da5..2593629 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateFloorExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlDateFloorExpression.java @@ -22,6 +22,7 @@ import java.util.Date; import java.util.List; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.avatica.util.DateTimeUtils; import org.apache.calcite.avatica.util.TimeUnitRange; @@ -41,8 +42,8 @@ public class BeamSqlDateFloorExpression extends BeamSqlExpression { && opType(1) == SqlTypeName.SYMBOL; } - @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) { - Date date = opValueEvaluated(0, inputRow); + @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) { + Date date = opValueEvaluated(0, inputRow, window); long time = date.getTime(); TimeUnitRange unit = ((BeamSqlPrimitive) op(1)).getValue(); http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlExtractExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlExtractExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlExtractExpression.java index 47cd879..38afd0a 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlExtractExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/date/BeamSqlExtractExpression.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Map; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.avatica.util.DateTimeUtils; import org.apache.calcite.avatica.util.TimeUnitRange; @@ -61,8 +62,8 @@ public class BeamSqlExtractExpression extends BeamSqlExpression { && opType(1) == SqlTypeName.BIGINT; } - @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) { - Long time = opValueEvaluated(1, inputRow); + @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) { + Long time = opValueEvaluated(1, inputRow, window); TimeUnitRange unit = ((BeamSqlPrimitive) op(0)).getValue(); http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlAndExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlAndExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlAndExpression.java index b8964d5..2cae22b 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlAndExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlAndExpression.java @@ -20,6 +20,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical; import java.util.List; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.sql.type.SqlTypeName; @@ -32,10 +33,10 @@ public class BeamSqlAndExpression extends BeamSqlLogicalExpression { } @Override - public BeamSqlPrimitive evaluate(BeamRecord inputRow) { + public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) { boolean result = true; for (BeamSqlExpression exp : operands) { - BeamSqlPrimitive expOut = exp.evaluate(inputRow); + BeamSqlPrimitive expOut = exp.evaluate(inputRow, window); result = result && expOut.getValue(); if (!result) { break; http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlNotExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlNotExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlNotExpression.java index f9578b9..72a6982 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlNotExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlNotExpression.java @@ -21,6 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical; import java.util.List; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.sql.type.SqlTypeName; @@ -42,10 +43,10 @@ public class BeamSqlNotExpression extends BeamSqlLogicalExpression { return super.accept(); } - @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) { - Boolean value = opValueEvaluated(0, inputRow); + @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) { + Boolean value = opValueEvaluated(0, inputRow, window); if (value == null) { - return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, null); + return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, window); } else { return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, !value); } http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlOrExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlOrExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlOrExpression.java index 88a3916..74dde7a 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlOrExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/logical/BeamSqlOrExpression.java @@ -20,6 +20,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical; import java.util.List; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.sql.type.SqlTypeName; @@ -32,10 +33,10 @@ public class BeamSqlOrExpression extends BeamSqlLogicalExpression { } @Override - public BeamSqlPrimitive evaluate(BeamRecord inputRow) { + public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) { boolean result = false; for (BeamSqlExpression exp : operands) { - BeamSqlPrimitive expOut = exp.evaluate(inputRow); + BeamSqlPrimitive expOut = exp.evaluate(inputRow, window); result = result || expOut.getValue(); if (result) { break; http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathBinaryExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathBinaryExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathBinaryExpression.java index 8f6c00c..ed0aac0 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathBinaryExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathBinaryExpression.java @@ -21,6 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math; import java.util.List; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.sql.type.SqlTypeName; @@ -38,10 +39,11 @@ public abstract class BeamSqlMathBinaryExpression extends BeamSqlExpression { return numberOfOperands() == 2 && isOperandNumeric(opType(0)) && isOperandNumeric(opType(1)); } - @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) { + @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow, + BoundedWindow window) { BeamSqlExpression leftOp = op(0); BeamSqlExpression rightOp = op(1); - return calculate(leftOp.evaluate(inputRow), rightOp.evaluate(inputRow)); + return calculate(leftOp.evaluate(inputRow, window), rightOp.evaluate(inputRow, window)); } /** http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathUnaryExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathUnaryExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathUnaryExpression.java index b225b8e..b1a210e 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathUnaryExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlMathUnaryExpression.java @@ -21,6 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math; import java.util.List; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.sql.type.SqlTypeName; @@ -45,9 +46,10 @@ public abstract class BeamSqlMathUnaryExpression extends BeamSqlExpression { return acceptance; } - @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) { + @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow, + BoundedWindow window) { BeamSqlExpression operand = op(0); - return calculate(operand.evaluate(inputRow)); + return calculate(operand.evaluate(inputRow, window)); } /** http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlPiExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlPiExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlPiExpression.java index 676f859..3072ea0 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlPiExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlPiExpression.java @@ -20,6 +20,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.sql.type.SqlTypeName; @@ -36,7 +37,7 @@ public class BeamSqlPiExpression extends BeamSqlExpression { return true; } - @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) { + @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) { return BeamSqlPrimitive.of(SqlTypeName.DOUBLE, Math.PI); } } http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandExpression.java index 0575978..00f2693 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandExpression.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Random; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.sql.type.SqlTypeName; @@ -42,9 +43,9 @@ public class BeamSqlRandExpression extends BeamSqlExpression { } @Override - public BeamSqlPrimitive evaluate(BeamRecord inputRecord) { + public BeamSqlPrimitive evaluate(BeamRecord inputRecord, BoundedWindow window) { if (operands.size() == 1) { - int rowSeed = opValueEvaluated(0, inputRecord); + int rowSeed = opValueEvaluated(0, inputRecord, window); if (seed == null || seed != rowSeed) { rand.setSeed(rowSeed); } http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandIntegerExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandIntegerExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandIntegerExpression.java index 52f0cc1..d055de6 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandIntegerExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/math/BeamSqlRandIntegerExpression.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Random; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.sql.type.SqlTypeName; @@ -43,16 +44,16 @@ public class BeamSqlRandIntegerExpression extends BeamSqlExpression { } @Override - public BeamSqlPrimitive evaluate(BeamRecord inputRecord) { + public BeamSqlPrimitive evaluate(BeamRecord inputRecord, BoundedWindow window) { int numericIdx = 0; if (operands.size() == 2) { - int rowSeed = opValueEvaluated(0, inputRecord); + int rowSeed = opValueEvaluated(0, inputRecord, window); if (seed == null || seed != rowSeed) { rand.setSeed(rowSeed); } numericIdx = 1; } return BeamSqlPrimitive.of(SqlTypeName.INTEGER, - rand.nextInt((int) opValueEvaluated(numericIdx, inputRecord))); + rand.nextInt((int) opValueEvaluated(numericIdx, inputRecord, window))); } } http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlCharLengthExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlCharLengthExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlCharLengthExpression.java index 974e2bc..5146b14 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlCharLengthExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlCharLengthExpression.java @@ -21,6 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string; import java.util.List; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.sql.type.SqlTypeName; @@ -32,8 +33,8 @@ public class BeamSqlCharLengthExpression extends BeamSqlStringUnaryExpression { super(operands, SqlTypeName.INTEGER); } - @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) { - String str = opValueEvaluated(0, inputRow); + @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) { + String str = opValueEvaluated(0, inputRow, window); return BeamSqlPrimitive.of(SqlTypeName.INTEGER, str.length()); } } http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlConcatExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlConcatExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlConcatExpression.java index 14ef55d..c2f317f 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlConcatExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlConcatExpression.java @@ -21,6 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string; import java.util.List; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.sql.type.SqlTypeName; @@ -51,9 +52,9 @@ public class BeamSqlConcatExpression extends BeamSqlExpression { return true; } - @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) { - String left = opValueEvaluated(0, inputRow); - String right = opValueEvaluated(1, inputRow); + @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) { + String left = opValueEvaluated(0, inputRow, window); + String right = opValueEvaluated(1, inputRow, window); return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, new StringBuilder(left.length() + right.length()) http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlInitCapExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlInitCapExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlInitCapExpression.java index e50872b..bf0b8f5 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlInitCapExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlInitCapExpression.java @@ -21,6 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string; import java.util.List; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.sql.type.SqlTypeName; @@ -32,8 +33,8 @@ public class BeamSqlInitCapExpression extends BeamSqlStringUnaryExpression { super(operands, SqlTypeName.VARCHAR); } - @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) { - String str = opValueEvaluated(0, inputRow); + @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) { + String str = opValueEvaluated(0, inputRow, window); StringBuilder ret = new StringBuilder(str); boolean isInit = true; http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlLowerExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlLowerExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlLowerExpression.java index 0f9a501..55f8d6d 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlLowerExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlLowerExpression.java @@ -21,6 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string; import java.util.List; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.sql.type.SqlTypeName; @@ -32,8 +33,8 @@ public class BeamSqlLowerExpression extends BeamSqlStringUnaryExpression { super(operands, SqlTypeName.VARCHAR); } - @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) { - String str = opValueEvaluated(0, inputRow); + @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) { + String str = opValueEvaluated(0, inputRow, window); return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, str.toLowerCase()); } } http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlOverlayExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlOverlayExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlOverlayExpression.java index 2336876..62d5a64 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlOverlayExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlOverlayExpression.java @@ -21,6 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string; import java.util.List; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.sql.type.SqlTypeName; @@ -54,15 +55,15 @@ public class BeamSqlOverlayExpression extends BeamSqlExpression { return true; } - @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) { - String str = opValueEvaluated(0, inputRow); - String replaceStr = opValueEvaluated(1, inputRow); - int idx = opValueEvaluated(2, inputRow); + @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) { + String str = opValueEvaluated(0, inputRow, window); + String replaceStr = opValueEvaluated(1, inputRow, window); + int idx = opValueEvaluated(2, inputRow, window); // the index is 1 based. idx -= 1; int length = replaceStr.length(); if (operands.size() == 4) { - length = opValueEvaluated(3, inputRow); + length = opValueEvaluated(3, inputRow, window); } StringBuilder result = new StringBuilder( http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlPositionExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlPositionExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlPositionExpression.java index 06dce91..f97547e 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlPositionExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlPositionExpression.java @@ -21,6 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string; import java.util.List; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.sql.type.SqlTypeName; @@ -56,12 +57,12 @@ public class BeamSqlPositionExpression extends BeamSqlExpression { return true; } - @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) { - String targetStr = opValueEvaluated(0, inputRow); - String containingStr = opValueEvaluated(1, inputRow); + @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) { + String targetStr = opValueEvaluated(0, inputRow, window); + String containingStr = opValueEvaluated(1, inputRow, window); int from = -1; if (operands.size() == 3) { - Number tmp = opValueEvaluated(2, inputRow); + Number tmp = opValueEvaluated(2, inputRow, window); from = tmp.intValue(); } http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlSubstringExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlSubstringExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlSubstringExpression.java index f8582aa..a521ef0 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlSubstringExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlSubstringExpression.java @@ -21,6 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string; import java.util.List; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.sql.type.SqlTypeName; @@ -54,9 +55,9 @@ public class BeamSqlSubstringExpression extends BeamSqlExpression { return true; } - @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) { - String str = opValueEvaluated(0, inputRow); - int idx = opValueEvaluated(1, inputRow); + @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) { + String str = opValueEvaluated(0, inputRow, window); + int idx = opValueEvaluated(1, inputRow, window); int startIdx = idx; if (startIdx > 0) { // NOTE: SQL substring is 1 based(rather than 0 based) @@ -69,7 +70,7 @@ public class BeamSqlSubstringExpression extends BeamSqlExpression { } if (operands.size() == 3) { - int length = opValueEvaluated(2, inputRow); + int length = opValueEvaluated(2, inputRow, window); if (length < 0) { length = 0; } http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlTrimExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlTrimExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlTrimExpression.java index 9c2a7ae..3c3083c 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlTrimExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlTrimExpression.java @@ -21,6 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string; import java.util.List; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.sql.fun.SqlTrimFunction; import org.apache.calcite.sql.type.SqlTypeName; @@ -58,14 +59,14 @@ public class BeamSqlTrimExpression extends BeamSqlExpression { return true; } - @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) { + @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) { if (operands.size() == 1) { return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, - opValueEvaluated(0, inputRow).toString().trim()); + opValueEvaluated(0, inputRow, window).toString().trim()); } else { - SqlTrimFunction.Flag type = opValueEvaluated(0, inputRow); - String targetStr = opValueEvaluated(1, inputRow); - String containingStr = opValueEvaluated(2, inputRow); + SqlTrimFunction.Flag type = opValueEvaluated(0, inputRow, window); + String targetStr = opValueEvaluated(1, inputRow, window); + String containingStr = opValueEvaluated(2, inputRow, window); switch (type) { case LEADING: http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlUpperExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlUpperExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlUpperExpression.java index 94ac2e2..bc29ec8 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlUpperExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlUpperExpression.java @@ -21,6 +21,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string; import java.util.List; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.BeamRecord; import org.apache.calcite.sql.type.SqlTypeName; @@ -32,8 +33,8 @@ public class BeamSqlUpperExpression extends BeamSqlStringUnaryExpression { super(operands, SqlTypeName.VARCHAR); } - @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow) { - String str = opValueEvaluated(0, inputRow); + @Override public BeamSqlPrimitive evaluate(BeamRecord inputRow, BoundedWindow window) { + String str = opValueEvaluated(0, inputRow, window); return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, str.toUpperCase()); } } http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java index dab79a2..ce5444f 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java @@ -42,6 +42,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.values.BeamRecord; import org.apache.beam.sdk.values.KV; import org.apache.calcite.rel.core.AggregateCall; @@ -75,7 +76,6 @@ public class BeamAggregationTransforms implements Serializable{ @ProcessElement public void processElement(ProcessContext c, BoundedWindow window) { BeamRecord outRecord = new BeamRecord(outRowType); - outRecord.updateWindowRange(c.element().getKey(), window); KV kvRecord = c.element(); for (String f : kvRecord.getKey().getDataType().getFieldsName()) { @@ -85,7 +85,7 @@ public class BeamAggregationTransforms implements Serializable{ outRecord.addField(aggFieldNames.get(idx), kvRecord.getValue().getFieldValue(idx)); } if (windowStartFieldIdx != -1) { - outRecord.addField(windowStartFieldIdx, outRecord.getWindowStart().toDate()); + outRecord.addField(windowStartFieldIdx, ((IntervalWindow) window).start().toDate()); } c.output(outRecord); @@ -112,7 +112,6 @@ public class BeamAggregationTransforms implements Serializable{ public BeamRecord apply(BeamRecord input) { BeamSqlRecordType typeOfKey = exTypeOfKeyRecord(BeamSqlRecordHelper.getSqlRecordType(input)); BeamRecord keyOfRecord = new BeamRecord(typeOfKey); - keyOfRecord.updateWindowRange(input, null); for (int idx = 0; idx < groupByKeys.size(); ++idx) { keyOfRecord.addField(idx, input.getFieldValue(groupByKeys.get(idx))); @@ -223,7 +222,7 @@ public class BeamAggregationTransforms implements Serializable{ for (int idx = 0; idx < aggregators.size(); ++idx) { deltaAcc.accumulatorElements.add( aggregators.get(idx).add(accumulator.accumulatorElements.get(idx), - sourceFieldExps.get(idx).evaluate(input).getValue())); + sourceFieldExps.get(idx).evaluate(input, null).getValue())); } return deltaAcc; } http://git-wip-us.apache.org/repos/asf/beam/blob/c0b1fed1/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlFilterFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlFilterFn.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlFilterFn.java index 31efeb7..d3a3f7b 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlFilterFn.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlFilterFn.java @@ -21,6 +21,7 @@ import java.util.List; import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionExecutor; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamFilterRel; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.BeamRecord; /** @@ -44,10 +45,10 @@ public class BeamSqlFilterFn extends DoFn { } @ProcessElement - public void processElement(ProcessContext c) { + public void processElement(ProcessContext c, BoundedWindow window) { BeamRecord in = c.element(); - List result = executor.execute(in); + List result = executor.execute(in, window); if ((Boolean) result.get(0)) { c.output(in);