From "Roman Wozniak (JIRA)" <j...@apache.org>
Subject [jira] [Created] (FLINK-9935) Batch Table API: grouping by window and attribute causes java.lang.ClassCastException:
Date Tue, 24 Jul 2018 10:23:00 GMT
Roman Wozniak created FLINK-9935:

             Summary: Batch Table API: grouping by window and attribute causes java.lang.ClassCastException:
                 Key: FLINK-9935
                 URL: https://issues.apache.org/jira/browse/FLINK-9935
             Project: Flink
          Issue Type: Bug
    Affects Versions: 1.5.1, 1.5.0
            Reporter: Roman Wozniak

 In the batch Table API, grouping by a window AND one or more other attributes seems broken. Test case:
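import org.apache.flink.api.scala._
import org.apache.flink.table.api.{Table, TableEnvironment, Types}
import org.apache.flink.table.api.scala._
import org.scalatest.{FlatSpec, Matchers}

class BatchStatisticsIntegrationTest extends FlatSpec with Matchers {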

  trait BatchContext {
    implicit lazy val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    implicit val tableEnv: BatchTableEnvironment = TableEnvironment.getTableEnvironment(env)

    val data = Seq(
      (1532424567000L, "id1", "location1"),
      (1532424567000L, "id2", "location1"),
      (1532424567000L, "id3", "location1"),
      (1532424568000L, "id1", "location2"),
      (1532424568000L, "id2", "location3")
    )

    val rawDataSet: DataSet[(Long, String, String)] = env.fromCollection(data)

    val table: Table = tableEnv.fromDataSet(rawDataSet, 'rowtime, 'id, 'location)
  }

  it should "be possible to run Table API queries with grouping by tumble window and column(s) on batch data" in new BatchContext {
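    val results = table
      .window(Tumble over 1.second on 'rowtime as 'w)
      .groupBy('w, 'location)
      // The select clause is missing from this copy of the report; it is
      // reconstructed here from the expected (start, end, location, count) tuples.
      .select('w.start.cast(Types.LONG), 'w.end.cast(Types.LONG), 'location, 'id.count)
      .toDataSet[(Long, Long, String, Long)]
      .collect()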

    results should contain theSameElementsAs Seq(
      (1532424567000L, 1532424568000L, "location1", 3L),
      (1532424568000L, 1532424569000L, "location2", 1L),
      (1532424568000L, 1532424569000L, "location3", 1L)
    )
  }
}
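For reference, the expected rows can be derived without Flink. The following is only a sketch in plain Scala collections, assuming a one-second tumbling window aligned to the epoch; the names TumbleExpectation, windowMillis and expected are made up for illustration and are not part of the test above.

```scala
// Plain-Scala sketch: derives the expected result rows from the input data.
// Assumption: 1-second tumbling windows aligned to the epoch (start = ts - ts % 1000).
object TumbleExpectation {
  val data: Seq[(Long, String, String)] = Seq(
    (1532424567000L, "id1", "location1"),
    (1532424567000L, "id2", "location1"),
    (1532424567000L, "id3", "location1"),
    (1532424568000L, "id1", "location2"),
    (1532424568000L, "id2", "location3")
  )

  val windowMillis: Long = 1000L

  // Group by (window start, location) and count the rows in each group.
  def expected: Seq[(Long, Long, String, Long)] =
    data
      .groupBy { case (ts, _, location) => (ts - ts % windowMillis, location) }
      .map { case ((start, location), rows) =>
        (start, start + windowMillis, location, rows.size.toLong)
      }
      .toSeq
      .sortBy(row => (row._1, row._3))

  def main(args: Array[String]): Unit =
    expected.foreach(println)
}
```

This yields one row per (window, location) pair, which is what the Table API query is expected to return.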

At execution time, the 'rowtime attribute seems to take the place of 'location in the row, and that causes:
[info]   Cause: java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.String
[info]   at org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:28)
[info]   at org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:160)
[info]   at org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:46)
[info]   at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
[info]   at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
[info]   at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129)
[info]   at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105)
[info]   at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
[info]   at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
[info]   at org.apache.flink.api.java.operators.translation.RichCombineToGroupCombineWrapper.combine(RichCombineToGroupCombineWrapper.java:52)

Here is some debug information that I was able to collect. The field serializers do not match
the runtime types of the Row fields:
this.instance = {Row@68451} "1532424567000,(3),1532424567000"
 fields = {Object[3]@68461} 
  0 = {Long@68462} 1532424567000
  1 = {CountAccumulator@68463} "(3)"
  2 = {Long@68462} 1532424567000
this.serializer = {RowSerializer@68452} 
 fieldSerializers = {TypeSerializer[3]@68455} 
  0 = {StringSerializer@68458} 
  1 = {TupleSerializer@68459} 
  2 = {LongSerializer@68460} 
 arity = 3
 nullMask = {boolean[3]@68457} 
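The mismatch in the dump above can be modeled without any Flink classes. This is only an illustrative sketch: FieldSlot, CountAcc and firstMismatch are hypothetical names, and the type checks are stand-ins for what the real serializers accept, not Flink's actual logic.

```scala
// Plain-Scala model of the debugger dump; no Flink classes are used.
object SerializerMismatchSketch {
  // Hypothetical stand-in for a field serializer: a name plus a type check.
  final case class FieldSlot(serializerName: String, accepts: Any => Boolean)

  // Hypothetical stand-in for the CountAccumulator shown in the dump.
  final case class CountAcc(count: Long)

  // Serializer layout as reported by the debugger: String, Tuple, Long.
  val slots: Seq[FieldSlot] = Seq(
    FieldSlot("StringSerializer", _.isInstanceOf[String]),
    FieldSlot("TupleSerializer", _.isInstanceOf[Product]),
    FieldSlot("LongSerializer", _.isInstanceOf[Long])
  )

  // Row contents as reported by the debugger: Long, accumulator, Long.
  val fields: Seq[Any] = Seq(1532424567000L, CountAcc(3L), 1532424567000L)

  // Returns the index and serializer name of the first field its slot rejects.
  def firstMismatch(fields: Seq[Any], slots: Seq[FieldSlot]): Option[(Int, String)] =
    fields.zip(slots).zipWithIndex.collectFirst {
      case ((value, slot), index) if !slot.accepts(value) => (index, slot.serializerName)
    }

  def main(args: Array[String]): Unit =
    println(firstMismatch(fields, slots))
}
```

In this model the first rejected field is index 0, where a Long value meets the String serializer, which is consistent with the ClassCastException in the stack trace.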

