flink-issues mailing list archives

From "Roman Wozniak (JIRA)" <j...@apache.org>
Subject [jira] [Created] (FLINK-9935) Batch Table API: grouping by window and attribute causes java.lang.ClassCastException:
Date Tue, 24 Jul 2018 10:23:00 GMT
Roman Wozniak created FLINK-9935:
------------------------------------

             Summary: Batch Table API: grouping by window and attribute causes java.lang.ClassCastException:
                 Key: FLINK-9935
                 URL: https://issues.apache.org/jira/browse/FLINK-9935
             Project: Flink
          Issue Type: Bug
    Affects Versions: 1.5.1, 1.5.0
            Reporter: Roman Wozniak


Grouping by a window AND some other attribute(s) seems to be broken. Test case below:
{code}
// Imports added for completeness; assuming Flink 1.5's Scala Table API and ScalaTest:
import org.apache.flink.api.scala._
import org.apache.flink.table.api.{Table, TableEnvironment, Tumble, Types}
import org.apache.flink.table.api.scala._
import org.scalatest.{FlatSpec, Matchers}

class BatchStatisticsIntegrationTest extends FlatSpec with Matchers {

  trait BatchContext {
    implicit lazy val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    implicit val tableEnv: BatchTableEnvironment = TableEnvironment.getTableEnvironment(env)

    val data = Seq(
      (1532424567000L, "id1", "location1"),
      (1532424567000L, "id2", "location1"),
      (1532424567000L, "id3", "location1"),
      (1532424568000L, "id1", "location2"),
      (1532424568000L, "id2", "location3")
    )

    val rawDataSet: DataSet[(Long, String, String)] = env.fromCollection(data)

    val table: Table = tableEnv.fromDataSet(rawDataSet, 'rowtime, 'id, 'location)
  }

  it should "be possible to run Table API queries with grouping by tumble window and column(s) on batch data" in new BatchContext {
    val results = table
      .window(Tumble over 1.second on 'rowtime as 'w)
      .groupBy('w, 'location)
      .select(
        'w.start.cast(Types.LONG),
        'w.end.cast(Types.LONG),
        'location,
        'id.count
      )
      .toDataSet[(Long, Long, String, Long)]
      .collect()

    results should contain theSameElementsAs Seq(
      (1532424567000L, 1532424568000L, "location1", 3L),
      (1532424568000L, 1532424569000L, "location2", 1L),
      (1532424568000L, 1532424569000L, "location3", 1L)
    )
  }
}
{code}
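For contrast, here is a minimal sketch of my own (not part of the original report) that runs the same aggregation grouped by the tumble window alone, reusing the BatchContext trait from the test above; the report implies this variant is unaffected:
{code}
// Sketch only (my addition): the same windowed count grouped by the tumble window alone,
// reusing the BatchContext trait defined in the test case above.
it should "group by the tumble window alone on batch data" in new BatchContext {
  val windowOnlyResults = table
    .window(Tumble over 1.second on 'rowtime as 'w)
    .groupBy('w)
    .select(
      'w.start.cast(Types.LONG),
      'w.end.cast(Types.LONG),
      'id.count
    )
    .toDataSet[(Long, Long, Long)]
    .collect()

  // The sample data spans two one-second windows (rowtimes ...567000 and ...568000).
  windowOnlyResults should have size 2
}
{code}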
It seems that at execution time the 'rowtime attribute replaces 'location, which causes the ClassCastException:
{code:java}
[info]   Cause: java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.String
[info]   at org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:28)
[info]   at org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:160)
[info]   at org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:46)
[info]   at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
[info]   at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
[info]   at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129)
[info]   at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105)
[info]   at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
[info]   at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
[info]   at org.apache.flink.api.java.operators.translation.RichCombineToGroupCombineWrapper.combine(RichCombineToGroupCombineWrapper.java:52)
{code}

Here is some debug information I was able to gather: the field serializers do not match the types of the Row fields:
{code}
this.instance = {Row@68451} "1532424567000,(3),1532424567000"
 fields = {Object[3]@68461} 
  0 = {Long@68462} 1532424567000
  1 = {CountAccumulator@68463} "(3)"
  2 = {Long@68462} 1532424567000
this.serializer = {RowSerializer@68452} 
 fieldSerializers = {TypeSerializer[3]@68455} 
  0 = {StringSerializer@68458} 
  1 = {TupleSerializer@68459} 
  2 = {LongSerializer@68460} 
 arity = 3
 nullMask = {boolean[3]@68457} 
{code}
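The following standalone sketch of mine (not from the report; the object name and values are illustrative) reproduces the mismatch shown in the dump: a RowSerializer whose first field serializer is a StringSerializer fails with the same ClassCastException when the Row actually holds a Long in that position:
{code}
// Standalone sketch (my addition): reproduces the serializer/field type mismatch
// shown in the debug dump above.
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.api.common.typeutils.base.{LongSerializer, StringSerializer}
import org.apache.flink.api.java.typeutils.runtime.RowSerializer
import org.apache.flink.core.memory.DataOutputSerializer
import org.apache.flink.types.Row

object SerializerMismatchSketch extends App {
  // Serializer declared for a (String, Long) row, e.g. ('location, 'rowtime).
  val rowSerializer = new RowSerializer(
    Array[TypeSerializer[_]](StringSerializer.INSTANCE, LongSerializer.INSTANCE))

  // But the actual row carries a Long in field 0, as in the debug dump.
  val row = new Row(2)
  row.setField(0, java.lang.Long.valueOf(1532424567000L))
  row.setField(1, java.lang.Long.valueOf(1532424568000L))

  // Throws: java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.String
  rowSerializer.serialize(row, new DataOutputSerializer(64))
}
{code}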
 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
