flink-user-zh mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hb <343122...@163.com>
Subject Re:Re: flink1.9 blink planner table ddl 使用问题
Date Mon, 26 Aug 2019 05:57:43 GMT
使用了你的ddl语句,还是报一样的错误.
我是在idea里面执行的,maven 配置的依赖.

在 2019-08-26 11:22:20,"Jark Wu" <imjark@gmail.com> 写道:
>Hi,
>
>初步看下来你的 DDL 中有这几部分定义的有问题。
>
>1. 缺少format properties
>2. 缺少 connector.version
>3. bootstrap.severs 的配置方式写的不对...
>
>
>你可以参考下面这个作为example:
>
>
>CREATE TABLE kafka_json_source (
>    rowtime TIMESTAMP,
>    user_name VARCHAR,
>    event ROW<message_type VARCHAR, message VARCHAR>
>) WITH (
>    'connector.type' = 'kafka',
>    'connector.version' = 'universal',
>    'connector.topic' = 'test-json',
>    'connector.startup-mode' = 'earliest-offset',
>    'connector.properties.0.key' = 'zookeeper.connect',
>    'connector.properties.0.value' = 'localhost:2181',
>    'connector.properties.1.key' = 'bootstrap.servers',
>    'connector.properties.1.value' = 'localhost:9092',
>    'update-mode' = 'append',
>    'format.type' = 'json',
>    'format.derive-schema' = 'true'
>);
>
>
>Kafka 中的数据长这个样子:
>
>{"rowtime": "2018-03-12T08:00:00Z", "user_name": "Alice", "event": { "message_type": "WARNING",
"message": "This is a warning."}}
>
>
>Best,
>Jark
>
>
>> 在 2019年8月26日,09:52,hb <343122422@163.com> 写道:
>> 
>> flink1.9 blink planner  table  使用ddl 语句,创建表不成功,不知道是少了
定义属性还是 需要实现TableSourceFactory,还是其他什么.
>> 
>> 
>> 提示:  
>> Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation
failed. findAndCreateTableSource failed.
>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not
find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in
>> the classpath.
>> 
>> 
>> 
>> 
>> 代码:
>> ```
>> import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
>> import org.apache.flink.table.api.{EnvironmentSettings, Types}
>> import org.apache.flink.table.api.scala.{StreamTableEnvironment, _}
>> import org.apache.flink.types.Row
>> 
>> 
>> object KafkaInDDL extends App {
>>  val env = StreamExecutionEnvironment.getExecutionEnvironment
>>  val settings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>>  val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings)
>> 
>> 
>>  val sourceDDL =
>>    """create table sourceTable(
>>                            id int,
>>                            name varchar
>>                          ) with (
>>                            'connector.type' = 'kafka',
>>                            'connector.property-version' = '1',
>>                            'update-mode' = 'append',
>>                            'bootstrap.servers' = '192.168.1.160:19092',
>>                            'connector.topic' = 'hbtest1',
>>                            'connector.startup-mode' = 'earliest-offset'
>>                          )
>>    """
>>  tEnv.sqlUpdate(sourceDDL)
>>  tEnv.sqlQuery("select * from sourceTable").toAppendStream[Row].print()
>>  tEnv.execute("")
>> }
>> ```
>
Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message