flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Fabian Hueske <fhue...@gmail.com>
Subject Re: [Table API/SQL] Finding missing spots to resolve why 'no watermark' is presented in Flink UI
Date Wed, 04 Jul 2018 11:17:41 GMT
Hi Jungtaek,

If it is "only" about the missing support to parse a string as timestamp,
you could also implement a custom TimestampExtractor that works similar to
the ExistingField extractor [1].
You would need to adjust a few things and use the expression
"Cast(Cast('tsString, SqlTimeTypeInfo.TIMESTAMP), Types.LONG)" to convert
the String to a Long.
So far this works only if the date is formatted like "2018-05-28
12:34:56.000"

Regarding the side outputs, these would not be handled as results but just
redirect late records into separate data streams. We would offer a
configuration to write them to a sink like HDFS or Kafka.

Best, Fabian

[1] https://github.com/apache/flink/blob/master/flink-
libraries/flink-table/src/main/scala/org/apache/flink/
table/sources/tsextractors/ExistingField.scala

2018-07-04 11:54 GMT+02:00 Jungtaek Lim <kabhwan@gmail.com>:

> Thanks Chesnay! Great news to hear. I'll try out with latest master branch.
>
> Thanks Fabian for providing the docs!
>
> I guess I already tried out with KafkaJsonTableSource and failed back to
> custom TableSource since the type of rowtime field is string unfortunately,
> and I needed to parse and map to new SQL timestamp field in order to use it
> to rowtime attribute.
>
> I guess JSON -> table fields mapping is provided only for renaming, and
> "withRowtimeAttribute" doesn't help defining new field to use it as rowtime.
>
> Are there better approaches on this scenario? Or would we be better to
> assume the type of rowtime field is always timestamp?
>
> Btw, providing late-data side output in Table API might be just a matter
> of how to define it correctly (not a technical or syntactic issue), though
> providing in SQL might be tricky (as the semantic of SQL query is not for
> multiple outputs).
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
> 2018년 7월 4일 (수) 오후 5:49, Fabian Hueske <fhueske@gmail.com>님이 작성:
>
>> Hi Jungtaek,
>>
>> Flink 1.5.0 features a TableSource for Kafka and JSON [1], incl.
>> timestamp & watemark generation [2].
>> It would be great if you could let us know, if that addresses your use
>> case and if not what's missing or not working.
>>
>> So far Table API / SQL does not have support for late-data side outputs.
>> However, that's on the road map. The idea is to filter streams during
>> ingestion for late events and passing them to a side output.
>> Currently, operators just drop late events.
>>
>> Best, Fabian
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-
>> release-1.5/dev/table/sourceSinks.html#kafkajsontablesource
>> [2] https://ci.apache.org/projects/flink/flink-docs-
>> release-1.5/dev/table/sourceSinks.html#configuring-a-rowtime-attribute
>>
>> 2018-07-04 10:39 GMT+02:00 Chesnay Schepler <chesnay@apache.org>:
>>
>>> The watermark display in the UI is bugged in 1.5.0.
>>>
>>> It is fixed on master and the release-1.5 branch, and will be included
>>> in 1.5.1 that is slated to be released next week.
>>>
>>>
>>> On 04.07.2018 10:22, Jungtaek Lim wrote:
>>>
>>> Sorry I forgot to mention the version: Flink 1.5.0, and I ran the app in
>>> IntelliJ, not tried from cluster.
>>>
>>> 2018년 7월 4일 (수) 오후 5:15, Jungtaek Lim <kabhwan@gmail.com>님이
작성:
>>>
>>>> Hi Flink users,
>>>>
>>>> I'm new to Flink and trying to evaluate couple of streaming frameworks
>>>> via implementing same apps.
>>>>
>>>> While implementing apps with both Table API and SQL, I found there's
>>>> 'no watermark' presented in Flink UI, whereas I had been struggling to
>>>> apply row time attribute.
>>>>
>>>> For example, below is one of TableSource implementation which wraps
>>>> DataStream reading from Kafka.
>>>>
>>>> https://github.com/HeartSaVioR/iot-trucking-app-
>>>> flink/blob/master/src/main/scala/net/heartsavior/flink/
>>>> datasource/TruckSpeedSource.scala
>>>>
>>>> (Actually I ended up implementing TableSource to address adding rowtime
>>>> attribute as well as reading and parsing JSON. I'd be really happy if
>>>> someone can guide a way to get rid of needed of custom implementation of
>>>> TableSource.)
>>>>
>>>> and below is one of app I implemented:
>>>>
>>>> https://github.com/HeartSaVioR/iot-trucking-app-
>>>> flink/blob/master/src/main/scala/net/heartsavior/flink/app/sql/
>>>> IotTruckingAppMovingAggregationsOnSpeedSql.scala
>>>>
>>>> Btw, I'm about to experiment side-output with late events, but is it
>>>> possible to leverage side-output with Table API / SQL? Looks like
>>>> DataStream exposes late events only when it's converted to
>>>> AllWindowedStream.
>>>>
>>>> Thanks in advance!
>>>>
>>>> Best Regards,
>>>> Jungtaek Lim (HeartSaVioR)
>>>>
>>>
>>>
>>

Mime
View raw message