flink-commits mailing list archives

From j...@apache.org
Subject [flink] branch master updated: [FLINK-24081][table-planner] Fix StreamExecOverAggregate and StreamExecTemporalSort operator didn't support TIMESTAMP_LTZ rowtime (#17074)
Date Wed, 01 Sep 2021 02:13:43 GMT
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e68b3a4  [FLINK-24081][table-planner] Fix StreamExecOverAggregate and StreamExecTemporalSort operator didn't support TIMESTAMP_LTZ rowtime (#17074)
e68b3a4 is described below

commit e68b3a4e6ba3ca8f6b9256a2a34c622d5ef7cda0
Author: Leonard Xu <xbjtdcq@163.com>
AuthorDate: Wed Sep 1 10:13:18 2021 +0800

    [FLINK-24081][table-planner] Fix StreamExecOverAggregate and StreamExecTemporalSort operator didn't support TIMESTAMP_LTZ rowtime (#17074)
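
The root cause is visible in the hunks below: the planner recognized a rowtime order key only through an instanceof TimestampType check, so a rowtime attribute typed as TIMESTAMP_LTZ (LocalZonedTimestampType) fell through to the TableException branch. A minimal sketch of the difference, using the public LogicalType constructors and the LogicalTypeChecks helper referenced in the patch (the class name RowtimeCheckSketch and the println calls are illustrative, not part of this commit):

    import org.apache.flink.table.types.logical.LocalZonedTimestampType;
    import org.apache.flink.table.types.logical.LogicalType;
    import org.apache.flink.table.types.logical.TimestampKind;
    import org.apache.flink.table.types.logical.TimestampType;

    import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isRowtimeAttribute;

    public class RowtimeCheckSketch {
        public static void main(String[] args) {
            // A TIMESTAMP(3) rowtime attribute vs. a TIMESTAMP_LTZ(3) rowtime attribute.
            LogicalType ts = new TimestampType(true, TimestampKind.ROWTIME, 3);
            LogicalType tsLtz = new LocalZonedTimestampType(true, TimestampKind.ROWTIME, 3);

            // The removed instanceof-based check matches only the first type:
            System.out.println(ts instanceof TimestampType);    // true
            System.out.println(tsLtz instanceof TimestampType); // false -> LTZ rowtime was rejected

            // The helper that replaces it dispatches on the timestamp kind instead:
            System.out.println(isRowtimeAttribute(ts));    // true
            System.out.println(isRowtimeAttribute(tsLtz)); // true
        }
    }

Accordingly, the patch swaps the per-class checks in all three exec nodes for isRowtimeAttribute/isProctimeAttribute (note the new static imports in each file).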
---
 .../nodes/exec/stream/StreamExecOverAggregate.java |  11 +--
 .../exec/stream/StreamExecPythonOverAggregate.java |  11 +--
 .../nodes/exec/stream/StreamExecTemporalSort.java  |  32 +++---
 .../runtime/stream/sql/OverAggregateITCase.scala   | 107 +++++++++++++++++----
 .../runtime/stream/sql/TemporalSortITCase.scala    | 106 ++++++++++++++++----
 5 files changed, 197 insertions(+), 70 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecOverAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecOverAggregate.java
index 86bc2d8..d0fd7f3 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecOverAggregate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecOverAggregate.java
@@ -51,11 +51,8 @@ import org.apache.flink.table.runtime.operators.over.RowTimeRowsBoundedPreceding
 import org.apache.flink.table.runtime.operators.over.RowTimeRowsUnboundedPrecedingFunction;
 import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
-import org.apache.flink.table.types.logical.LocalZonedTimestampType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.logical.TimestampKind;
-import org.apache.flink.table.types.logical.TimestampType;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
@@ -74,6 +71,8 @@ import java.util.Collections;
 import java.util.List;
 import java.util.stream.IntStream;
 
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isProctimeAttribute;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isRowtimeAttribute;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -149,11 +148,9 @@ public class StreamExecOverAggregate extends ExecNodeBase<RowData>
         final LogicalType orderKeyType = inputRowType.getFields().get(orderKey).getType();
         // check time field && identify window rowtime attribute
         final int rowTimeIdx;
-        if (orderKeyType instanceof TimestampType
-                && ((TimestampType) orderKeyType).getKind() == TimestampKind.ROWTIME) {
+        if (isRowtimeAttribute(orderKeyType)) {
             rowTimeIdx = orderKey;
-        } else if (orderKeyType instanceof LocalZonedTimestampType
-                && ((LocalZonedTimestampType) orderKeyType).getKind() == TimestampKind.PROCTIME) {
+        } else if (isProctimeAttribute(orderKeyType)) {
             rowTimeIdx = -1;
         } else {
             throw new TableException(
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonOverAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonOverAggregate.java
index f566f6f..5f9bd11 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonOverAggregate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonOverAggregate.java
@@ -40,11 +40,8 @@ import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
 import org.apache.flink.table.planner.plan.utils.OverAggregateUtil;
 import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
-import org.apache.flink.table.types.logical.LocalZonedTimestampType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.logical.TimestampKind;
-import org.apache.flink.table.types.logical.TimestampType;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
@@ -59,6 +56,8 @@ import java.math.BigDecimal;
 import java.util.Collections;
 import java.util.List;
 
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isProctimeAttribute;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isRowtimeAttribute;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -150,11 +149,9 @@ public class StreamExecPythonOverAggregate extends ExecNodeBase<RowData>
         final LogicalType orderKeyType = inputRowType.getFields().get(orderKey).getType();
         // check time field && identify window rowtime attribute
         final int rowTimeIdx;
-        if (orderKeyType instanceof TimestampType
-                && ((TimestampType) orderKeyType).getKind() == TimestampKind.ROWTIME) {
+        if (isRowtimeAttribute(orderKeyType)) {
             rowTimeIdx = orderKey;
-        } else if (orderKeyType instanceof LocalZonedTimestampType
-                && ((LocalZonedTimestampType) orderKeyType).getKind() == TimestampKind.PROCTIME) {
+        } else if (isProctimeAttribute(orderKeyType)) {
             rowTimeIdx = -1;
         } else {
             throw new TableException(
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTemporalSort.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTemporalSort.java
index 4d34521..a177011 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTemporalSort.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTemporalSort.java
@@ -35,11 +35,8 @@ import org.apache.flink.table.runtime.keyselector.EmptyRowDataKeySelector;
 import org.apache.flink.table.runtime.operators.sort.ProcTimeSortOperator;
 import org.apache.flink.table.runtime.operators.sort.RowTimeSortOperator;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
-import org.apache.flink.table.types.logical.LocalZonedTimestampType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.logical.TimestampKind;
-import org.apache.flink.table.types.logical.TimestampType;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
@@ -47,6 +44,8 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonPro
 import java.util.Collections;
 import java.util.List;
 
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isProctimeAttribute;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isRowtimeAttribute;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -101,24 +100,17 @@ public class StreamExecTemporalSort extends ExecNodeBase<RowData>
         RowType inputType = (RowType) inputEdge.getOutputType();
         LogicalType timeType = inputType.getTypeAt(sortSpec.getFieldSpec(0).getFieldIndex());
         TableConfig config = planner.getTableConfig();
-        if (timeType instanceof TimestampType) {
-            TimestampType keyType = (TimestampType) timeType;
-            if (keyType.getKind() == TimestampKind.ROWTIME) {
-                return createSortRowTime(inputType, inputTransform, config);
-            }
-        }
-        if (timeType instanceof LocalZonedTimestampType) {
-            LocalZonedTimestampType keyType = (LocalZonedTimestampType) timeType;
-            if (keyType.getKind() == TimestampKind.PROCTIME) {
-                return createSortProcTime(inputType, inputTransform, config);
-            }
+        if (isRowtimeAttribute(timeType)) {
+            return createSortRowTime(inputType, inputTransform, config);
+        } else if (isProctimeAttribute(timeType)) {
+            return createSortProcTime(inputType, inputTransform, config);
+        } else {
+            throw new TableException(
+                    String.format(
+                            "Sort: Internal Error\n"
+                                    + "First field in temporal sort is not a time attribute,
%s is given.",
+                            timeType));
         }
-
-        throw new TableException(
-                String.format(
-                        "Sort: Internal Error\n"
-                                + "First field in temporal sort is not a time attribute,
%s is given.",
-                        timeType));
     }
 
     /** Create Sort logic based on processing time. */
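
End to end, the operator-side changes above mean both an OVER aggregate and a temporal sort may now be ordered by a TIMESTAMP_LTZ rowtime attribute, which the new ITCases below verify through the values connector. A hedged Table API sketch of the same user-facing scenario (the table name, columns, and the 'datagen' connector are placeholders I chose, not part of this commit; any source producing a TIMESTAMP_LTZ rowtime works):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class TimestampLtzRowtimeSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

            // A TIMESTAMP_LTZ(3) rowtime attribute, mirroring the test tables below.
            tEnv.executeSql(
                    "CREATE TABLE events ("
                            + "  rowtime TIMESTAMP_LTZ(3),"
                            + "  v INT,"
                            + "  WATERMARK FOR rowtime AS rowtime"
                            + ") WITH ('connector' = 'datagen')");

            // Before this commit, translating either plan threw the
            // "... is not a time attribute" TableException; explain() forces the translation.
            System.out.println(tEnv.sqlQuery(
                    "SELECT v, COUNT(*) OVER ("
                            + "PARTITION BY v ORDER BY rowtime RANGE UNBOUNDED PRECEDING) FROM events")
                    .explain());
            System.out.println(tEnv.sqlQuery(
                    "SELECT v FROM events ORDER BY rowtime").explain());
        }
    }
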
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/OverAggregateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/OverAggregateITCase.scala
index acc3ba0..e43ce4c 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/OverAggregateITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/OverAggregateITCase.scala
@@ -22,6 +22,8 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api._
 import org.apache.flink.table.api.bridge.scala._
+import org.apache.flink.table.planner.factories.TestValuesTableFactory
+import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
 import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
 import org.apache.flink.table.planner.runtime.utils.TimeTestUtil.EventTimeProcessOperator
 import org.apache.flink.table.planner.runtime.utils.UserDefinedFunctionTestUtils.{CountNullNonNull, CountPairs, LargerThanCount}
@@ -33,7 +35,10 @@ import org.junit.Assert._
 import org.junit._
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
-import scala.collection.mutable
+
+import java.time.{Instant, LocalDateTime}
+
+import scala.collection.{Seq, mutable}
 
 @RunWith(classOf[Parameterized])
 class OverAggregateITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode) {
@@ -1004,28 +1009,94 @@ class OverAggregateITCase(mode: StateBackendMode) extends StreamingWithStateTest
   }
 
   @Test
-  def testRowTimeDistinctUnboundedPartitionedRangeOverWithNullValues(): Unit = {
-    val data = List(
-      (1L, 1, null),
-      (2L, 1, null),
-      (3L, 2, null),
-      (4L, 1, "Hello"),
-      (5L, 1, "Hello"),
-      (6L, 2, "Hello"),
-      (7L, 1, "Hello World"),
-      (8L, 2, "Hello World"),
-      (9L, 2, "Hello World"),
-      (10L, 1, null))
+  def testTimestampRowTimeDistinctUnboundedPartitionedRangeOverWithNullValues(): Unit = {
+    val rows = Seq(
+      row(LocalDateTime.parse("1970-01-01T00:00:01"), 1, null),
+      row(LocalDateTime.parse("1970-01-01T00:00:02"), 1, null),
+      row(LocalDateTime.parse("1970-01-01T00:00:03"), 2, null),
+      row(LocalDateTime.parse("1970-01-01T00:00:04"), 1, "Hello"),
+      row(LocalDateTime.parse("1970-01-01T00:00:05"), 1, "Hello"),
+      row(LocalDateTime.parse("1970-01-01T00:00:06"), 2, "Hello"),
+      row(LocalDateTime.parse("1970-01-01T00:00:07"), 1, "Hello World"),
+      row(LocalDateTime.parse("1970-01-01T00:00:08"), 2, "Hello World"),
+      row(LocalDateTime.parse("1970-01-01T00:00:09"), 2, "Hello World"),
+      row(LocalDateTime.parse("1970-01-01T00:00:10"), 1, null))
+
+    val tableId = TestValuesTableFactory.registerData(rows)
 
     // for sum aggregation ensure that every time the order of each element is consistent
     env.setParallelism(1)
+    tEnv.executeSql(
+      s"""
+         |CREATE TABLE MyTable (
+         |  rowtime TIMESTAMP(3),
+         |  b INT,
+         |  c STRING,
+         |  WATERMARK FOR rowtime AS rowtime
+         |) WITH (
+         |  'connector' = 'values',
+         |  'data-id' = '$tableId',
+         |  'bounded' = 'true'
+         |)
+         |""".stripMargin)
+
+    tEnv.createTemporaryFunction("CntNullNonNull", new CountNullNonNull)
 
-    val table = failingDataSource(data)
-      .assignAscendingTimestamps(_._1)
-      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+    val sqlQuery = "SELECT " +
+      "  c, " +
+      "  b, " +
+      "  COUNT(DISTINCT c) " +
+      "    OVER (PARTITION BY b ORDER BY rowtime RANGE UNBOUNDED preceding), " +
+      "  CntNullNonNull(DISTINCT c) " +
+      "    OVER (PARTITION BY b ORDER BY rowtime RANGE UNBOUNDED preceding)" +
+      "FROM " +
+      "  MyTable"
 
-    tEnv.registerTable("MyTable", table)
-    tEnv.registerFunction("CntNullNonNull", new CountNullNonNull)
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val expected = List(
+      "null,1,0,0|1", "null,1,0,0|1", "null,2,0,0|1", "null,1,2,2|1",
+      "Hello,1,1,1|1", "Hello,1,1,1|1", "Hello,2,1,1|1",
+      "Hello World,1,2,2|1", "Hello World,2,2,2|1", "Hello World,2,2,2|1")
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  @Test
+  def testTimestampLtzRowTimeDistinctUnboundedPartitionedRangeOverWithNullValues(): Unit = {
+    val rows = Seq(
+      row(Instant.ofEpochSecond(1L), 1, null),
+      row(Instant.ofEpochSecond(2L), 1, null),
+      row(Instant.ofEpochSecond(3L), 2, null),
+      row(Instant.ofEpochSecond(4L), 1, "Hello"),
+      row(Instant.ofEpochSecond(5L), 1, "Hello"),
+      row(Instant.ofEpochSecond(6L), 2, "Hello"),
+      row(Instant.ofEpochSecond(7L), 1, "Hello World"),
+      row(Instant.ofEpochSecond(8L), 2, "Hello World"),
+      row(Instant.ofEpochSecond(9L), 2, "Hello World"),
+      row(Instant.ofEpochSecond(10L), 1, null))
+
+    val tableId = TestValuesTableFactory.registerData(rows)
+
+    // for sum aggregation ensure that every time the order of each element is consistent
+    env.setParallelism(1)
+    tEnv.executeSql(
+      s"""
+         |CREATE TABLE MyTable (
+         |  rowtime TIMESTAMP_LTZ(3),
+         |  b INT,
+         |  c STRING,
+         |  WATERMARK FOR rowtime AS rowtime
+         |) WITH (
+         |  'connector' = 'values',
+         |  'data-id' = '$tableId',
+         |  'bounded' = 'true'
+         |)
+         |""".stripMargin)
+
+    tEnv.createTemporaryFunction("CntNullNonNull", new CountNullNonNull)
 
     val sqlQuery = "SELECT " +
       "  c, " +
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalSortITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalSortITCase.scala
index ed90898..e8179d3 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalSortITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalSortITCase.scala
@@ -21,6 +21,8 @@ package org.apache.flink.table.planner.runtime.stream.sql
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api._
 import org.apache.flink.table.api.bridge.scala._
+import org.apache.flink.table.planner.factories.TestValuesTableFactory
+import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
 import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
 import org.apache.flink.table.planner.runtime.utils.TimeTestUtil.TimestampAndWatermarkWithOffset
 import org.apache.flink.table.planner.runtime.utils._
@@ -31,6 +33,10 @@ import org.junit._
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 
+import java.time.{Instant, LocalDateTime}
+
+import scala.collection.Seq
+
 @RunWith(classOf[Parameterized])
 class TemporalSortITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode) {
 
@@ -122,25 +128,89 @@ class TemporalSortITCase(mode: StateBackendMode) extends StreamingWithStateTestB
   }
 
   @Test
-  def testEventTimeAndOtherFieldOrderBy(): Unit = {
-    val data = List(
-      (3L, 2L, "Hello world", 3),
-      (2L, 2L, "Hello", 2),
-      (6L, 3L, "Luke Skywalker", 6),
-      (5L, 3L, "I am fine.", 5),
-      (7L, 4L, "Comment#1", 7),
-      (9L, 4L, "Comment#3", 9),
-      (10L, 4L, "Comment#4", 10),
-      (8L, 4L, "Comment#2", 8),
-      (1L, 1L, "Hi", 2),
-      (1L, 1L, "Hi", 1),
-      (4L, 3L, "Helloworld, how are you?", 4))
+  def testTimestampEventTimeAndOtherFieldOrderBy(): Unit = {
+    val rows = Seq(
+      row(LocalDateTime.parse("1970-01-01T00:00:03"), 2L, "Hello world", 3),
+      row(LocalDateTime.parse("1970-01-01T00:00:02"), 2L, "Hello", 2),
+      row(LocalDateTime.parse("1970-01-01T00:00:06"), 3L, "Luke Skywalker", 6),
+      row(LocalDateTime.parse("1970-01-01T00:00:05"), 3L, "I am fine.", 5),
+      row(LocalDateTime.parse("1970-01-01T00:00:07"), 4L, "Comment#1", 7),
+      row(LocalDateTime.parse("1970-01-01T00:00:09"), 4L, "Comment#3", 9),
+      row(LocalDateTime.parse("1970-01-01T00:00:10"), 4L, "Comment#4", 10),
+      row(LocalDateTime.parse("1970-01-01T00:00:08"), 4L, "Comment#2", 8),
+      row(LocalDateTime.parse("1970-01-01T00:00:01"), 1L, "Hi", 2),
+      row(LocalDateTime.parse("1970-01-01T00:00:01"), 1L, "Hi", 1),
+      row(LocalDateTime.parse("1970-01-01T00:00:04"), 3L, "Helloworld, how are you?", 4))
 
-    val t = failingDataSource(data)
-      .assignTimestampsAndWatermarks(
-        new TimestampAndWatermarkWithOffset[(Long, Long, String, Int)](10L))
-      .toTable(tEnv, 'rowtime.rowtime, 'key, 'str, 'int)
-    tEnv.registerTable("T", t)
+    val tableId = TestValuesTableFactory.registerData(rows)
+    tEnv.executeSql(
+      s"""
+         |CREATE TABLE T (
+         |  rowtime TIMESTAMP(3),
+         |  key BIGINT,
+         |  str STRING,
+         |  `int` INT,
+         |  WATERMARK FOR rowtime AS rowtime - interval '10' SECOND
+         |) WITH (
+         |  'connector' = 'values',
+         |  'data-id' = '$tableId',
+         |  'bounded' = 'true'
+         |)
+         |""".stripMargin)
+
+    val sqlQuery = "SELECT key, str, `int` FROM T ORDER BY rowtime, `int`"
+
+    val sink = new TestingRetractSink
+    val results = tEnv.sqlQuery(sqlQuery).toRetractStream[Row]
+    results.addSink(sink).setParallelism(1)
+    env.execute()
+
+    val expected = Seq(
+      "1,Hi,1",
+      "1,Hi,2",
+      "2,Hello,2",
+      "2,Hello world,3",
+      "3,Helloworld, how are you?,4",
+      "3,I am fine.,5",
+      "3,Luke Skywalker,6",
+      "4,Comment#1,7",
+      "4,Comment#2,8",
+      "4,Comment#3,9",
+      "4,Comment#4,10")
+
+    assertEquals(expected, sink.getRetractResults)
+  }
+
+  @Test
+  def testTimestampLtzEventTimeAndOtherFieldOrderBy(): Unit = {
+    val rows = Seq(
+      row(Instant.ofEpochSecond(3L), 2L, "Hello world", 3),
+      row(Instant.ofEpochSecond(2L), 2L, "Hello", 2),
+      row(Instant.ofEpochSecond(6L), 3L, "Luke Skywalker", 6),
+      row(Instant.ofEpochSecond(5L), 3L, "I am fine.", 5),
+      row(Instant.ofEpochSecond(7L), 4L, "Comment#1", 7),
+      row(Instant.ofEpochSecond(9L), 4L, "Comment#3", 9),
+      row(Instant.ofEpochSecond(10L), 4L, "Comment#4", 10),
+      row(Instant.ofEpochSecond(8L), 4L, "Comment#2", 8),
+      row(Instant.ofEpochSecond(1L), 1L, "Hi", 2),
+      row(Instant.ofEpochSecond(1L), 1L, "Hi", 1),
+      row(Instant.ofEpochSecond(4L), 3L, "Helloworld, how are you?", 4))
+
+    val tableId = TestValuesTableFactory.registerData(rows)
+    tEnv.executeSql(
+      s"""
+         |CREATE TABLE T (
+         |  rowtime TIMESTAMP_LTZ(3),
+         |  key BIGINT,
+         |  str STRING,
+         |  `int` INT,
+         |  WATERMARK FOR rowtime AS rowtime - interval '10' SECOND
+         |) WITH (
+         |  'connector' = 'values',
+         |  'data-id' = '$tableId',
+         |  'bounded' = 'true'
+         |)
+         |""".stripMargin)
 
     val sqlQuery = "SELECT key, str, `int` FROM T ORDER BY rowtime, `int`"
 
