flink-user-zh mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hb <343122...@163.com>
Subject Re:Re:回复: Re: flink1.9 blink planner table ddl 使用问题
Date Mon, 26 Aug 2019 07:33:48 GMT
感谢,解决了, 指定 'connector.version' = '0.11' 就可以了.
Blink SQL这方面的官方资料和文档好少啊,开发容易遇到问题.

在 2019-08-26 14:26:15,"hb" <343122422@163.com> 写道:
>kafka版本是 kafka_2.11-1.1.0,
>支持的kafka版本有哪些
>在 2019-08-26 14:23:19,"pengchenglin@bonc.com.cn" <pengchenglin@bonc.com.cn>
写道:
>>检查一下代码的kafka版本,可能是这方面的错误
>>
>>
>>
>>pengchenglin@bonc.com.cn
>> 
>>发件人: hb
>>发送时间: 2019-08-26 15:14
>>收件人: user-zh
>>主题: Re:Re: flink1.9 blink planner table ddl 使用问题
>>之前少了 flink-connector-kafka_2.11 依赖,
>>现在错误变成  Caused by: java.lang.NoSuchMethodError: org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.<init>(Lorg/slf4j/Logger;Lorg/apache/flink/streaming/connectors/kafka/internal/Handover;Ljava/util/Properties;Lorg/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue;Ljava/lang/String;JZLorg/apache/flink/metrics/MetricGroup;Lorg/apache/flink/metrics/MetricGroup;)V
>>了
>> 
>> 
>>pom依赖:
>>```
>>    <dependencies>
>> 
>> 
>>        <dependency>
>>            <groupId>org.apache.flink</groupId>
>>            <artifactId>flink-core</artifactId>
>>            <version>${flink.version}</version>
>> 
>> 
>>        </dependency>
>> 
>> 
>>        <dependency>
>>            <groupId>org.apache.flink</groupId>
>>            <artifactId>flink-clients_2.11</artifactId>
>>            <version>${flink.version}</version>
>> 
>> 
>>        </dependency>
>> 
>> 
>>        <dependency>
>>            <groupId>org.apache.flink</groupId>
>>            <artifactId>flink-scala_2.11</artifactId>
>>            <version>${flink.version}</version>
>> 
>> 
>>        </dependency>
>> 
>> 
>>        <dependency>
>>            <groupId>org.apache.flink</groupId>
>>            <artifactId>flink-streaming-scala_2.11</artifactId>
>>            <version>${flink.version}</version>
>> 
>> 
>>        </dependency>
>> 
>> 
>>        <dependency>
>>            <groupId>org.apache.flink</groupId>
>>            <artifactId>flink-table</artifactId>
>>            <version>1.9.0</version>
>>            <type>pom</type>
>>            <scope>provided</scope>
>>        </dependency>
>> 
>> 
>>        <dependency>
>>            <groupId>org.apache.flink</groupId>
>>            <artifactId>flink-table-common</artifactId>
>>            <version>${flink.version}</version>
>>        </dependency>
>> 
>> 
>>        <dependency>
>>            <groupId>org.apache.flink</groupId>
>>            <artifactId>flink-cep-scala_2.11</artifactId>
>>            <version>${flink.version}</version>
>>        </dependency>
>> 
>> 
>>        <dependency>
>>            <groupId>org.apache.flink</groupId>
>>            <artifactId>flink-connector-filesystem_2.11</artifactId>
>>            <version>${flink.version}</version>
>>        </dependency>
>> 
>> 
>> 
>> 
>>        <dependency>
>>            <groupId>org.apache.flink</groupId>
>>            <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
>>            <version>${flink.version}</version>
>><!--            <scope>provided</scope>-->
>>        </dependency>
>> 
>> 
>>        <dependency>
>>            <groupId>org.apache.flink</groupId>
>>            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
>>            <version>${flink.version}</version>
>>            <!--            <scope>provided</scope>-->
>>        </dependency>
>> 
>> 
>>        <dependency>
>>            <groupId>org.apache.flink</groupId>
>>            <artifactId>flink-table-planner_2.11</artifactId>
>>            <version>${flink.version}</version>
>>            <!--            <scope>provided</scope>-->
>>        </dependency>
>> 
>> 
>>        <dependency>
>>            <groupId>org.apache.flink</groupId>
>>            <artifactId>flink-table-runtime-blink_2.11</artifactId>
>>            <version>${flink.version}</version>
>>        </dependency>
>> 
>> 
>> 
>> 
>>        <dependency>
>>            <groupId>org.apache.flink</groupId>
>>            <artifactId>flink-table-planner-blink_2.11</artifactId>
>>            <version>${flink.version}</version>
>>            <!--            <scope>provided</scope>-->
>>        </dependency>
>> 
>> 
>>        <dependency>
>>            <groupId>org.apache.flink</groupId>
>>            <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
>>            <version>${flink.version}</version>
>>        </dependency>
>> 
>> 
>>        <dependency>
>>            <groupId>org.apache.flink</groupId>
>>            <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
>>            <version>${flink.version}</version>
>>        </dependency>
>> 
>> 
>>        <dependency>
>>            <groupId>org.apache.flink</groupId>
>>            <artifactId>flink-connector-kafka_2.11</artifactId>
>>            <version>${flink.version}</version>
>>        </dependency>
>>        <dependency>
>>            <groupId>org.apache.flink</groupId>
>>            <artifactId>flink-json</artifactId>
>>            <version>${flink.version}</version>
>>        </dependency>
>>        <dependency>
>>            <groupId>org.apache.flink</groupId>
>>            <artifactId>flink-runtime-web_2.11</artifactId>
>>            <version>${flink.version}</version>
>>        </dependency>
>>    </dependencies>
>> 
>> 
>>```
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>>在 2019-08-26 13:37:51,"Jark Wu" <imjark@gmail.com> 写道:
>>>Maven 需要同时依赖 flink-json 和 flink-connector-kafka_2.11
>>>
>>>Best,
>>>Jark
>>>
>>>> 在 2019年8月26日,13:57,hb <343122422@163.com> 写道:
>>>> 
>>>> 使用了你的ddl语句,还是报一样的错误.
>>>> 我是在idea里面执行的,maven 配置的依赖.
>>>> 
>>>> 在 2019-08-26 11:22:20,"Jark Wu" <imjark@gmail.com> 写道:
>>>>> Hi,
>>>>> 
>>>>> 初步看下来你的 DDL 中有这几部分定义的有问题。
>>>>> 
>>>>> 1. 缺少format properties
>>>>> 2. 缺少 connector.version
>>>>> 3. bootstrap.severs 的配置方式写的不对...
>>>>> 
>>>>> 
>>>>> 你可以参考下面这个作为example:
>>>>> 
>>>>> 
>>>>> CREATE TABLE kafka_json_source (
>>>>>   rowtime TIMESTAMP,
>>>>>   user_name VARCHAR,
>>>>>   event ROW<message_type VARCHAR, message VARCHAR>
>>>>> ) WITH (
>>>>>   'connector.type' = 'kafka',
>>>>>   'connector.version' = 'universal',
>>>>>   'connector.topic' = 'test-json',
>>>>>   'connector.startup-mode' = 'earliest-offset',
>>>>>   'connector.properties.0.key' = 'zookeeper.connect',
>>>>>   'connector.properties.0.value' = 'localhost:2181',
>>>>>   'connector.properties.1.key' = 'bootstrap.servers',
>>>>>   'connector.properties.1.value' = 'localhost:9092',
>>>>>   'update-mode' = 'append',
>>>>>   'format.type' = 'json',
>>>>>   'format.derive-schema' = 'true'
>>>>> );
>>>>> 
>>>>> 
>>>>> Kafka 中的数据长这个样子:
>>>>> 
>>>>> {"rowtime": "2018-03-12T08:00:00Z", "user_name": "Alice", "event": {
"message_type": "WARNING", "message": "This is a warning."}}
>>>>> 
>>>>> 
>>>>> Best,
>>>>> Jark
>>>>> 
>>>>> 
>>>>>> 在 2019年8月26日,09:52,hb <343122422@163.com> 写道:
>>>>>> 
>>>>>> flink1.9 blink planner  table  使用ddl 语句,创建表不成功,不知道是少了
定义属性还是 需要实现TableSourceFactory,还是其他什么.
>>>>>> 
>>>>>> 
>>>>>> 提示:  
>>>>>> Exception in thread "main" org.apache.flink.table.api.ValidationException:
SQL validation failed. findAndCreateTableSource failed.
>>>>>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory'
in
>>>>>> the classpath.
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 代码:
>>>>>> ```
>>>>>> import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment,
_}
>>>>>> import org.apache.flink.table.api.{EnvironmentSettings, Types}
>>>>>> import org.apache.flink.table.api.scala.{StreamTableEnvironment,
_}
>>>>>> import org.apache.flink.types.Row
>>>>>> 
>>>>>> 
>>>>>> object KafkaInDDL extends App {
>>>>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>> val settings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>>>>>> val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env,
settings)
>>>>>> 
>>>>>> 
>>>>>> val sourceDDL =
>>>>>>   """create table sourceTable(
>>>>>>                           id int,
>>>>>>                           name varchar
>>>>>>                         ) with (
>>>>>>                           'connector.type' = 'kafka',
>>>>>>                           'connector.property-version' = '1',
>>>>>>                           'update-mode' = 'append',
>>>>>>                           'bootstrap.servers' = '192.168.1.160:19092',
>>>>>>                           'connector.topic' = 'hbtest1',
>>>>>>                           'connector.startup-mode' = 'earliest-offset'
>>>>>>                         )
>>>>>>   """
>>>>>> tEnv.sqlUpdate(sourceDDL)
>>>>>> tEnv.sqlQuery("select * from sourceTable").toAppendStream[Row].print()
>>>>>> tEnv.execute("")
>>>>>> }
>>>>>> ```
>>>>> 
Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message