flink-issues mailing list archives

From "Timo Walther (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-7426) Table API does not support null values in keys
Date Mon, 14 Aug 2017 08:04:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-7426?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16125352#comment-16125352 ]

Timo Walther commented on FLINK-7426:
-------------------------------------

As far as I know, the DataStream API does not support null values in several places for
performance reasons. Instead of Tuple, we would need to use Row or a special null-aware Tuple
type. [~aljoscha] what is your opinion here? Should we support null values in keys for the
DataStream API? I'm fine with fixing only the Table API, which should at least provide
full null support.
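
To illustrate what "null-aware" could mean for a key type, here is a minimal standalone sketch of a serializer that writes one null flag per field before the values. The object name NullAwareKeyCodec and the restriction to nullable Long fields are assumptions made for illustration; this is not Flink code and not the serializer Flink uses for Row.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Hypothetical helper, not part of Flink: serializes a key made of nullable
// Long fields by writing a null mask ahead of the field values.
object NullAwareKeyCodec {

  def serialize(key: Array[java.lang.Long]): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    out.writeInt(key.length)
    // Null mask: one boolean per field, true when the field is null.
    key.foreach(field => out.writeBoolean(field == null))
    // Only non-null fields contribute a value to the output.
    key.foreach(field => if (field != null) out.writeLong(field.longValue()))
    out.flush()
    bytes.toByteArray
  }

  def deserialize(data: Array[Byte]): Array[java.lang.Long] = {
    val in = new DataInputStream(new ByteArrayInputStream(data))
    val arity = in.readInt()
    // Read the null mask first, in field order.
    val isNull = Array.fill(arity)(in.readBoolean())
    // Reference arrays start out filled with null; read values only for non-null fields.
    val key = new Array[java.lang.Long](arity)
    for (i <- 0 until arity if !isNull(i)) key(i) = java.lang.Long.valueOf(in.readLong())
    key
  }
}
```

The cost is the extra mask per record, which is the kind of overhead a serializer that assumes non-null fields avoids.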

> Table API does not support null values in keys
> ----------------------------------------------
>
>                 Key: FLINK-7426
>                 URL: https://issues.apache.org/jira/browse/FLINK-7426
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>    Affects Versions: 1.3.2
>            Reporter: Timo Walther
>
> The Table API uses {{keyBy}} internally; however, the generated {{KeySelector}} uses
> instances of {{Tuple}}. The {{TupleSerializer}} is not able to serialize null values. This
> causes issues during checkpointing or when using the RocksDB state backend. We need to replace
> all {{keyBy}} calls with a custom {{RowKeySelector}}.
> {code}
> class AggregateITCase extends StreamingWithStateTestBase {
>   private val queryConfig = new StreamQueryConfig()
>   queryConfig.withIdleStateRetentionTime(Time.hours(1), Time.hours(2))
>   @Test
>   def testDistinct(): Unit = {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     env.setStateBackend(getStateBackend)
>     val tEnv = TableEnvironment.getTableEnvironment(env)
>     StreamITCase.clear
>     val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
>       .select('b, Null(Types.LONG)).distinct()
>     val results = t.toRetractStream[Row](queryConfig)
>     results.addSink(new StreamITCase.RetractingSink).setParallelism(1)
>     env.execute()
>     val expected = mutable.MutableList("1,null", "2,null", "3,null", "4,null", "5,null", "6,null")
>     assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
>   }
> }
> {code}
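
A rough sketch of what the RowKeySelector mentioned in the description could look like, assuming the key is a Row built by copying the key fields out of the input Row. The constructor parameters keyFields and keyType are assumptions for illustration, and this is not necessarily the code that was committed for this issue. It requires the Flink dependency on the classpath.

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.api.java.typeutils.ResultTypeQueryable
import org.apache.flink.types.Row

// Hypothetical sketch: extracts the key as a Row instead of a Tuple,
// so that null key fields are carried through unchanged.
class RowKeySelector(keyFields: Array[Int], keyType: TypeInformation[Row])
    extends KeySelector[Row, Row] with ResultTypeQueryable[Row] {

  override def getKey(value: Row): Row = {
    val key = new Row(keyFields.length)
    var i = 0
    while (i < keyFields.length) {
      // getField may return null; Row stores it as-is.
      key.setField(i, value.getField(keyFields(i)))
      i += 1
    }
    key
  }

  // Exposes the key's type information (assumed to describe a Row of the
  // key fields) so that it does not have to be extracted by reflection.
  override def getProducedType: TypeInformation[Row] = keyType
}
```

Such a selector would be passed to keyBy in place of the field-index variant that produces Tuple keys.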



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
