flink-issues mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] hequn8128 commented on a change in pull request #6648: [FLINK-10261][table] fix insert into with order by
Date Tue, 04 Sep 2018 02:22:06 GMT
hequn8128 commented on a change in pull request #6648: [FLINK-10261][table] fix insert into
with order by
URL: https://github.com/apache/flink/pull/6648#discussion_r214773269
 
 

 ##########
 File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
 ##########
 @@ -742,6 +742,36 @@ class SqlITCase extends StreamingWithStateTestBase {
     assertEquals(expected.sorted, MemoryTableSourceSinkUtil.tableDataStrings.sorted)
   }
 
+  @Test
+  def testInsertIntoMemoryTableOrderBy(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    MemoryTableSourceSinkUtil.clear()
+
+    val t = StreamTestData.getSmall3TupleDataStream(env)
+        .assignAscendingTimestamps(x => x._2)
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+    tEnv.registerTable("sourceTable", t)
+
+    val fieldNames = Array("d", "e", "f", "t")
+    val fieldTypes = Array(Types.INT, Types.LONG, Types.STRING, Types.SQL_TIMESTAMP)
+      .asInstanceOf[Array[TypeInformation[_]]]
+    val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
+    tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+    val sql = "INSERT INTO targetTable SELECT a, b, c, rowtime " +
+      "FROM sourceTable order by rowtime, a desc"
+    tEnv.sqlUpdate(sql)
+    env.execute()
+
+    val expected = List(
+      "1,1,Hi,1970-01-01 00:00:00.001",
+      "3,2,Hello world,1970-01-01 00:00:00.002",
+      "2,2,Hello,1970-01-01 00:00:00.002")
+    assertEquals(expected.sorted, MemoryTableSourceSinkUtil.tableDataStrings.sorted)
 
 Review comment:
   The output order is random because the sink tasks are not chained with the sort task.
We can fix this by changing `emitDataStream` in `UnsafeMemoryAppendTableSink` to:
   ```
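       override def emitDataStream(dataStream: DataStream[Row]): Unit = {
         // Reuse the input's parallelism so the sink can be chained with the
         // upstream sort task instead of receiving redistributed records.
         val inputParallelism = dataStream.getParallelism
         dataStream
           .addSink(new MemoryAppendSink)
           .setParallelism(inputParallelism)
           .name(TableConnectorUtil.generateRuntimeName(this.getClass, getFieldNames))
       }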
   ```
   Once that is fixed, remove `sorted` from both sides of the `assertEquals` call so the test actually verifies the output order.
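
   To illustrate why `sorted` should go, here is a minimal standalone sketch. The object name and the `unordered` list are made up for illustration and are not part of the PR. It shows that sorting both sides lets a wrongly ordered result pass, while a direct comparison catches it:

   ```scala
   // Hypothetical sketch: not code from the PR, only plain Scala collections.
   object OrderAssertionSketch {
     // Order the query should produce: rowtime ascending, then a descending.
     val expected: List[String] = List(
       "1,1,Hi,1970-01-01 00:00:00.001",
       "3,2,Hello world,1970-01-01 00:00:00.002",
       "2,2,Hello,1970-01-01 00:00:00.002")

     // Same rows in an order an unchained sink might emit (assumed example).
     val unordered: List[String] = List(
       "2,2,Hello,1970-01-01 00:00:00.002",
       "1,1,Hi,1970-01-01 00:00:00.001",
       "3,2,Hello world,1970-01-01 00:00:00.002")

     def main(args: Array[String]): Unit = {
       // Sorting both sides hides the ordering difference.
       assert(expected.sorted == unordered.sorted)
       // Comparing the lists directly exposes it.
       assert(expected != unordered)
     }
   }
   ```

   With `sorted` on both sides, the test would pass even if ORDER BY had no effect on the sink output.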

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services
