flink-user-zh mailing list archives

From "pengchenglin@bonc.com.cn" <pengcheng...@bonc.com.cn>
Subject Re: Re: flink1.9 blink planner table ddl usage question
Date Mon, 26 Aug 2019 07:23:19 GMT
Check the Kafka version used in your code; the error may come from there.
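
One possible reading of this advice, offered as an assumption rather than a confirmed diagnosis: the pom quoted below pulls in both flink-connector-kafka-0.11_2.11 and flink-connector-kafka_2.11. In Flink 1.9 both connector families appear to ship a class named org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread with different constructor signatures, so the copy that is loaded first may not match the caller, which would explain the NoSuchMethodError. A minimal sketch of the Kafka-related part of the pom, assuming the DDL uses 'connector.version' = 'universal':

```xml
<!-- Sketch only: keep a single Kafka connector that matches 'connector.version' in the DDL. -->
<!-- Assumption: the DDL uses 'connector.version' = 'universal', so flink-connector-kafka-0.11_2.11 is dropped. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- Needed for 'format.type' = 'json' -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
</dependency>
```

If the 0.11-specific connector is wanted instead, the reverse should apply: keep flink-connector-kafka-0.11_2.11, remove flink-connector-kafka_2.11, and set 'connector.version' = '0.11'. Running mvn dependency:tree shows which Kafka connector artifacts actually end up on the classpath.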



pengchenglin@bonc.com.cn
 
From: hb
Sent: 2019-08-26 15:14
To: user-zh
Subject: Re:Re: flink1.9 blink planner table ddl usage question
I was previously missing the flink-connector-kafka_2.11 dependency.
Now the error has changed to: Caused by: java.lang.NoSuchMethodError: org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.<init>(Lorg/slf4j/Logger;Lorg/apache/flink/streaming/connectors/kafka/internal/Handover;Ljava/util/Properties;Lorg/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue;Ljava/lang/String;JZLorg/apache/flink/metrics/MetricGroup;Lorg/apache/flink/metrics/MetricGroup;)V
 
 
pom dependencies:
```
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table</artifactId>
            <version>1.9.0</version>
            <type>pom</type>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep-scala_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-filesystem_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope> -->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope> -->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope> -->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime-blink_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope> -->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
```
 
On 2019-08-26 13:37:51, "Jark Wu" <imjark@gmail.com> wrote:
>Your Maven project needs to depend on both flink-json and flink-connector-kafka_2.11.
>
>Best,
>Jark
>
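>> On Aug 26, 2019, at 13:57, hb <343122422@163.com> wrote:
>> 
>> I used your DDL statement, but it still reports the same error.
>> I am running it inside IDEA, with the dependencies configured through Maven.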
>> 
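>> On 2019-08-26 11:22:20, "Jark Wu" <imjark@gmail.com> wrote:
>>> Hi,
>>> 
>>> At first glance, the following parts of your DDL are defined incorrectly:
>>> 
>>> 1. The format properties are missing.
>>> 2. connector.version is missing.
>>> 3. bootstrap.servers is not configured the right way...
>>> 
>>> 
>>> You can use the following as an example: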
>>> 
>>> 
>>> CREATE TABLE kafka_json_source (
>>>   rowtime TIMESTAMP,
>>>   user_name VARCHAR,
>>>   event ROW<message_type VARCHAR, message VARCHAR>
>>> ) WITH (
>>>   'connector.type' = 'kafka',
>>>   'connector.version' = 'universal',
>>>   'connector.topic' = 'test-json',
>>>   'connector.startup-mode' = 'earliest-offset',
>>>   'connector.properties.0.key' = 'zookeeper.connect',
>>>   'connector.properties.0.value' = 'localhost:2181',
>>>   'connector.properties.1.key' = 'bootstrap.servers',
>>>   'connector.properties.1.value' = 'localhost:9092',
>>>   'update-mode' = 'append',
>>>   'format.type' = 'json',
>>>   'format.derive-schema' = 'true'
>>> );
>>> 
>>> 
>>> The data in Kafka looks like this:
>>> 
>>> {"rowtime": "2018-03-12T08:00:00Z", "user_name": "Alice", "event": {"message_type": "WARNING", "message": "This is a warning."}}
>>> 
>>> 
>>> Best,
>>> Jark
>>> 
>>> 
>>>> On Aug 26, 2019, at 09:52, hb <343122422@163.com> wrote:
>>>> 
>>>> With the flink1.9 blink planner, creating a table through a DDL statement fails. I am not sure whether I am missing some property definitions, whether I need to implement a TableSourceFactory, or whether it is something else.
>>>> 
>>>> 
>>>> Error message:
>>>> Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. findAndCreateTableSource failed.
>>>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath.
>>>> 
>>>> 
>>>> 
>>>> 
>>>> Code:
>>>> ```
>>>> import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
>>>> import org.apache.flink.table.api.{EnvironmentSettings, Types}
>>>> import org.apache.flink.table.api.scala.{StreamTableEnvironment, _}
>>>> import org.apache.flink.types.Row
>>>> 
>>>> 
>>>> object KafkaInDDL extends App {
>>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>> val settings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>>>> val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings)
>>>> 
>>>> 
>>>> val sourceDDL =
>>>>   """create table sourceTable(
>>>>                           id int,
>>>>                           name varchar
>>>>                         ) with (
>>>>                           'connector.type' = 'kafka',
>>>>                           'connector.property-version' = '1',
>>>>                           'update-mode' = 'append',
>>>>                           'bootstrap.servers' = '192.168.1.160:19092',
>>>>                           'connector.topic' = 'hbtest1',
>>>>                           'connector.startup-mode' = 'earliest-offset'
>>>>                         )
>>>>   """
>>>> tEnv.sqlUpdate(sourceDDL)
>>>> tEnv.sqlQuery("select * from sourceTable").toAppendStream[Row].print()
>>>> tEnv.execute("")
>>>> }
>>>> ```
>>> 