flink-user-zh mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ddwcg <3149768...@qq.com>
Subject Re: flink 1.9 消费kafka报错
Date Mon, 26 Aug 2019 09:56:40 GMT
都加了,还是不行,下面是我的pom文件和 libraires的截图

<repositories>
   <repository>
      <id>apache.snapshots</id>
      <name>Apache Development Snapshot Repository</name>
      <url>https://repository.apache.org/content/repositories/snapshots/</url>
      <releases>
         <enabled>false</enabled>
      </releases>
      <snapshots>
         <enabled>true</enabled>
      </snapshots>
   </repository>
</repositories>

<properties>
   <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
   <flink.version>1.9.0</flink.version>
   <scala.binary.version>2.11</scala.binary.version>
   <scala.version>2.11.8</scala.version>
</properties>

<dependencies>
   <!-- Apache Flink dependencies -->
   <!-- These dependencies are provided, because they should not be packaged into the JAR
file. -->
   <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-scala_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
   </dependency>
   <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
   </dependency>

   <!-- Scala Library, provided by Flink as well. -->
   <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
   </dependency>

   <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>compile</scope>
   </dependency>
   <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
   </dependency>
   <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
   </dependency>

   <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-common</artifactId>
      <version>${flink.version}</version>
   </dependency>



   <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
   </dependency>
   <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
   </dependency>

   <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.7</version>
      <scope>runtime</scope>
   </dependency>
   <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
      <scope>runtime</scope>
   </dependency>
</dependencies>


> 在 2019年8月26日,17:39,Jark Wu <imjark@gmail.com> 写道:
> 
> pom.xml依赖中有flink-table-planner-blink_2.11的依赖么? 确认下版本号。如果要再
IDE 中运行的话,确保没有加 <scope>provided</scope>. 
> 
> 
>> 在 2019年8月26日,17:25,ddwcg <3149768603@qq.com> 写道:
>> 
>> 
>> hi,我指定了使用blinkplanner,还是报一样的错
>> object StreamingJob {
>> def main(args: Array[String]) {
>> 
>>   val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
>>   val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>>   val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)
>>   //val bsTableEnv2 = TableEnvironment.create(bsSettings)
>>   bsEnv.execute("jobname")
>> }
>> 
>> 
>> Exception in thread "main" org.apache.flink.table.api.TableException: Could not instantiate
the executor. Make sure a planner module is on the classpath
>> 	at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.lookupExecutor(StreamTableEnvironmentImpl.scala:329)
>> 	at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.create(StreamTableEnvironmentImpl.scala:286)
>> 	at org.apache.flink.table.api.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:366)
>> 	at com.test.StreamingJob$.main(StreamingJob.scala:13)
>> 	at com.test.StreamingJob.main(StreamingJob.scala)
>> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> 	at java.lang.reflect.Method.invoke(Method.java:498)
>> 	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not
find a suitable table factory for 'org.apache.flink.table.delegation.ExecutorFactory' in
>> the classpath.
>> 
>> Reason: No factory implements 'org.apache.flink.table.delegation.ExecutorFactory'.
>> 
>> The following properties are requested:
>> batch-mode=false
>> class-name=org.apache.flink.table.executor.BlinkExecutorFactory
>> 
>> The following factories have been considered:
>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
>> org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
>> 
>> 
>> 
>>> 在 2019年8月26日,15:07,Jark Wu <imjark@gmail.com> 写道:
>>> 
>>> Hi,
>>> 
>>> 你这个错误不是 kafka 的问题。 是 planner 使用的问题,如果要使用
blink planner,需要用 EnvironmentSetting 声明 blink planner。
>>> 详细请见: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment
<https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment>
>>> 
>>> 
>>> Best,
>>> Jark
>>> 
>>>> 在 2019年8月26日,14:56,ddwcg <3149768603@qq.com> 写道:
>>>> 
>>>> 大家好,
>>>> 升级到1.9后有几个问题:
>>>> 1.现在的线上版本是1.7.2,以前的代码的kafka consumer是使用的FlinkKafkaConsumer011
>>>> 
>>>> val consumer = new FlinkKafkaConsumer011[String](kafkaTopic, new SimpleStringSchema,
properties)
>>>> 但是现在这个类已经找不到了
>>>> 
>>>> 2.所以我使用了 FlinkKafkaConsumer
>>>> val consumer = new FlinkKafkaConsumer[String](kafkaTopic, new SimpleStringSchema(),
properties)
>>>> 不知道这个consumer背后对应的kafka版本是多少
>>>> 
>>>> 3.使用FlinkKafkaConsumer后报错,而且必须要引入flink-table-api-java-bridge_${scala.binary.version}
>>>> 
>>>>  不然会提示找不到类:Caused by: java.lang.ClassNotFoundException:
org.apache.flink.table.factories.StreamTableSourceFactory
>>>> 
>>>> 
>>>> 
>>>> 引入flink-table-api-java-bridge_${scala.binary.version}后还是报错:
>>>> Exception in thread "main" org.apache.flink.table.api.TableException: Could
not instantiate the executor. Make sure a planner module is on the classpath
>>>> 	at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.lookupExecutor(StreamTableEnvironmentImpl.scala:329)
>>>> 	at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.create(StreamTableEnvironmentImpl.scala:286)
>>>> 	at org.apache.flink.table.api.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:366)
>>>> 	at org.apache.flink.table.api.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:336)
>>>> 	at com.test.StreamingJob$.main(StreamingJob.scala:52)
>>>> 	at com.test.StreamingJob.main(StreamingJob.scala)
>>>> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>> 	at java.lang.reflect.Method.invoke(Method.java:498)
>>>> 	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
>>>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could
not find a suitable table factory for 'org.apache.flink.table.delegation.ExecutorFactory'
in
>>>> the classpath.
>>>> 
>>>> Reason: No factory implements 'org.apache.flink.table.delegation.ExecutorFactory'.
>>>> 
>>>> The following properties are requested:
>>>> batch-mode=false
>>>> 
>>>> The following factories have been considered:
>>>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
>>>> org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
>>>> 
>>>> 
>>>> 我的pom文件如下:
>>>> 
>>>> <dependency>
>>>> <groupId>org.apache.flink</groupId>
>>>> <artifactId>flink-scala_${scala.binary.version}</artifactId>
>>>> <version>${flink.version}</version>
>>>> <scope>provided</scope>
>>>> </dependency>
>>>> <dependency>
>>>> <groupId>org.apache.flink</groupId>
>>>> <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
>>>> <version>${flink.version}</version>
>>>> <scope>provided</scope>
>>>> </dependency>
>>>> 
>>>> <!-- Scala Library, provided by Flink as well. -->
>>>> <dependency>
>>>> <groupId>org.scala-lang</groupId>
>>>> <artifactId>scala-library</artifactId>
>>>> <version>${scala.version}</version>
>>>> <scope>provided</scope>
>>>> </dependency>
>>>> 
>>>> <dependency>
>>>> <groupId>org.apache.flink</groupId>
>>>> <artifactId>flink-connector-kafka_2.11</artifactId>
>>>> <version>${flink.version}</version>
>>>> <scope>compile</scope>
>>>> </dependency>
>>>> <dependency>
>>>> <groupId>org.apache.flink</groupId>
>>>> <artifactId>flink-table-common</artifactId>
>>>> <version>${flink.version}</version>
>>>> </dependency>
>>>> 
>>>> 
>>>> <dependency>
>>>> <groupId>org.apache.flink</groupId>
>>>> <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
>>>> <version>${flink.version}</version>
>>>> </dependency>
>>>> <dependency>
>>>> <groupId>org.apache.flink</groupId>
>>>> <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
>>>> <version>${flink.version}</version>
>>>> </dependency>
>>>> <dependency>
>>>> <groupId>org.apache.flink</groupId>
>>>> <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
>>>> <version>${flink.version}</version>
>>>> </dependency>
>>>> 
>>>> 
>>>> 谢谢大家
>>> 
>> 
> 
> 


Mime
View raw message