flink-user mailing list archives

From Polarisary <polaris...@gmail.com>
Subject Flink 1.9 Sql Rowtime Error
Date Fri, 01 Nov 2019 07:49:50 GMT
Hi all,
I have defined a Kafka connector descriptor and registered a table source:

tEnv.connect(new Kafka()
        .version("universal")
        .topic(tableName)
        .startFromEarliest()
        .property("zookeeper.connect", "xxx")
        .property("bootstrap.servers", "xxx")
        .property("group.id", "xxx"))
        .withFormat(new Json().deriveSchema())
        .withSchema(new Schema()
                .field("rowtime", Types.SQL_TIMESTAMP)
                    .rowtime(new Rowtime()
                            .timestampsFromField("createTime")
                            .watermarksPeriodicBounded(300_000))
                .field("data", Types.ROW(dataFieldTypes)))
        .inAppendMode().registerTableSource(tableName);


The Kafka input is:
{
   "data": [
      18140781,
   ],
   "createTime": 1572577137596
}
The exception is as follows:
Caused by: java.io.IOException: Failed to deserialize JSON object.
	at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:129)
	at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:72)
	at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45)
	at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:140)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:203)
Caused by: java.time.format.DateTimeParseException: Text '1553080631582' could not be parsed at index 0
	at java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
	at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
	at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$createTimestampConverter$1dee6515$1(JsonRowDeserializationSchema.java:334)
	at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:232)
	at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:403)
	at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:382)
	at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:232)
	at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:127)
	... 7 more
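
The inner exception can be reproduced outside Flink. The trace shows DateTimeFormatter.parse being called from createTimestampConverter, which suggests the JSON format expects the timestamp field to be a date-time string, while createTime arrives as epoch milliseconds. Below is a minimal sketch using only java.time. It assumes DateTimeFormatter.ISO_OFFSET_DATE_TIME as a stand-in for whatever formatter Flink uses internally, and the class and method names (EpochMillisDemo, parsesAsIsoTimestamp, epochMillisToIso) are hypothetical.

```java
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

public class EpochMillisDemo {

    // Returns true if the text parses as an ISO-8601 offset date-time.
    // ISO_OFFSET_DATE_TIME is an assumed stand-in for Flink's internal formatter.
    static boolean parsesAsIsoTimestamp(String text) {
        try {
            DateTimeFormatter.ISO_OFFSET_DATE_TIME.parse(text);
            return true;
        } catch (DateTimeParseException e) {
            return false;
        }
    }

    // Converts epoch milliseconds into a UTC date-time string.
    static String epochMillisToIso(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis).toString();
    }

    public static void main(String[] args) {
        String raw = "1553080631582"; // value taken from the exception message

        // The bare number is rejected by the date-time formatter.
        System.out.println(parsesAsIsoTimestamp(raw));

        // The same instant rendered as a date-time string is accepted.
        String iso = epochMillisToIso(Long.parseLong(raw));
        System.out.println(iso);
        System.out.println(parsesAsIsoTimestamp(iso));
    }
}
```

If this assumption holds, the producer would have to write createTime as a date-time string, or the field would have to be declared as a numeric type and converted to a timestamp afterwards. I have not verified either option against Flink 1.9.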

polarisary@gmail.com




