flink-dev mailing list archives

From "Timo Walther (JIRA)" <j...@apache.org>
Subject [jira] [Created] (FLINK-7426) Table API does not support null values in keys
Date Fri, 11 Aug 2017 08:52:00 GMT
Timo Walther created FLINK-7426:

             Summary: Table API does not support null values in keys
                 Key: FLINK-7426
                 URL: https://issues.apache.org/jira/browse/FLINK-7426
             Project: Flink
          Issue Type: Bug
          Components: Table API & SQL
            Reporter: Timo Walther

The Table API uses {{keyBy}} internally, but the generated {{KeySelector}} returns instances
of {{Tuple}} as keys. The {{TupleSerializer}} cannot serialize null fields, so a null value in
a key causes failures during checkpointing or when using the RocksDB state backend. We need to
replace the key selector in all {{keyBy}} calls with a custom {{RowKeySelector}}.
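
To illustrate why the key type matters, here is a minimal, self-contained sketch that does not use any Flink classes. The object {{NullKeySketch}} and both methods are hypothetical stand-ins: one writes every field directly and therefore has to reject nulls (the behavior described above for tuple keys), the other writes a per-field null marker first, which is roughly the idea behind a row-based key. It is an assumption-laden simplification, not Flink's actual serializer code.

```scala
import java.io.{ByteArrayOutputStream, DataOutputStream}

// Hypothetical stand-ins for illustration only; these are NOT Flink classes.
object NullKeySketch {

  // Tuple-style encoding: each field is written directly as 8 bytes.
  // There is no way to represent a missing value, so null is rejected.
  def serializeTupleStyle(fields: Array[java.lang.Long]): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    fields.zipWithIndex.foreach { case (field, index) =>
      if (field == null) {
        throw new IllegalArgumentException(s"Field $index is null")
      }
      out.writeLong(field.longValue())
    }
    out.flush()
    bytes.toByteArray
  }

  // Row-style encoding: one null marker per field, then only the
  // non-null values. A null field is therefore a valid part of a key.
  def serializeRowStyle(fields: Array[java.lang.Long]): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    fields.foreach(field => out.writeBoolean(field == null))
    fields.filter(_ != null).foreach(field => out.writeLong(field.longValue()))
    out.flush()
    bytes.toByteArray
  }
}
```

With a key such as {{(1, null)}}, {{serializeTupleStyle}} throws, while {{serializeRowStyle}} produces two marker bytes followed by one 8-byte value.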

// Reproduces the problem: distinct() groups by both selected fields,
// and the second field is always null.
class AggregateITCase extends StreamingWithStateTestBase {
  private val queryConfig = new StreamQueryConfig()
  queryConfig.withIdleStateRetentionTime(Time.hours(1), Time.hours(2))

  @Test
  def testDistinct(): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
      .select('b, Null(Types.LONG)).distinct()

    val results = t.toRetractStream[Row](queryConfig)
    results.addSink(new StreamITCase.RetractingSink).setParallelism(1)
    // The job has to run before the collected sink results can be checked.
    env.execute()

    val expected = mutable.MutableList("1,null", "2,null", "3,null", "4,null", "5,null", "6,null")
    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
  }
}

This message was sent by Atlassian JIRA
