flink-user-zh mailing list archives

From: Jark Wu <imj...@gmail.com>
Subject: Re: flink1.9 blink planner table DDL usage problem
Date: Mon, 26 Aug 2019 06:37:51 GMT
In Maven you need to declare dependencies on both flink-json and flink-connector-kafka_2.11.
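
For reference, a minimal sketch of the corresponding pom.xml entries follows; the 1.9.0 version is an assumption based on the "flink1.9" in the subject line, so adjust it to your actual Flink release:

```
<!-- Sketch only: 1.9.0 is assumed from the "flink1.9" mentioned in this thread. -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-json</artifactId>
  <version>1.9.0</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>1.9.0</version>
</dependency>
```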

Best,
Jark

> On Aug 26, 2019, at 13:57, hb <343122422@163.com> wrote:
> 
> I used your DDL statement, but it still reports the same error.
> I am running it in IDEA, with the dependencies configured via Maven.
> 
> On 2019-08-26 11:22:20, "Jark Wu" <imjark@gmail.com> wrote:
>> Hi,
>> 
>> At first glance, the following parts of your DDL are defined incorrectly.
>> 
>> 1. The format properties are missing.
>> 2. The connector.version property is missing.
>> 3. The way bootstrap.servers is configured is not right...
>> 
>> 
>> You can use the following as an example:
>> 
>> 
>> CREATE TABLE kafka_json_source (
>>   rowtime TIMESTAMP,
>>   user_name VARCHAR,
>>   event ROW<message_type VARCHAR, message VARCHAR>
>> ) WITH (
>>   'connector.type' = 'kafka',
>>   'connector.version' = 'universal',
>>   'connector.topic' = 'test-json',
>>   'connector.startup-mode' = 'earliest-offset',
>>   'connector.properties.0.key' = 'zookeeper.connect',
>>   'connector.properties.0.value' = 'localhost:2181',
>>   'connector.properties.1.key' = 'bootstrap.servers',
>>   'connector.properties.1.value' = 'localhost:9092',
>>   'update-mode' = 'append',
>>   'format.type' = 'json',
>>   'format.derive-schema' = 'true'
>> );
>> 
>> 
>> The data in Kafka looks like this:
>> 
>> {"rowtime": "2018-03-12T08:00:00Z", "user_name": "Alice", "event": { "message_type":
"WARNING", "message": "This is a warning."}}
>> 
>> 
>> Best,
>> Jark
>> 
>> 
>>> On Aug 26, 2019, at 09:52, hb <343122422@163.com> wrote:
>>> 
>>> With the flink1.9 blink planner table API, creating a table with a DDL statement fails. I don't know whether some property definitions are missing, whether I need to implement a TableSourceFactory, or something else.
>>> 
>>> 
>>> The error message:
>>> Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. findAndCreateTableSource failed.
>>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath.
>>> 
>>> 
>>> 
>>> 
>>> Code:
>>> ```
>>> import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
>>> import org.apache.flink.table.api.{EnvironmentSettings, Types}
>>> import org.apache.flink.table.api.scala.{StreamTableEnvironment, _}
>>> import org.apache.flink.types.Row
>>> 
>>> 
>>> object KafkaInDDL extends App {
>>>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>   val settings: EnvironmentSettings =
>>>     EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>>>   val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings)
>>> 
>>>   val sourceDDL =
>>>     """create table sourceTable(
>>>          id int,
>>>          name varchar
>>>        ) with (
>>>          'connector.type' = 'kafka',
>>>          'connector.property-version' = '1',
>>>          'update-mode' = 'append',
>>>          'bootstrap.servers' = '192.168.1.160:19092',
>>>          'connector.topic' = 'hbtest1',
>>>          'connector.startup-mode' = 'earliest-offset'
>>>        )
>>>     """
>>>   tEnv.sqlUpdate(sourceDDL)
>>>   tEnv.sqlQuery("select * from sourceTable").toAppendStream[Row].print()
>>>   tEnv.execute("")
>>> }
>>> ```
>> 

