flink-user-zh mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ddwcg <3149768...@qq.com>
Subject flink 1.9 消费kafka报错
Date Mon, 26 Aug 2019 06:56:43 GMT
大家好,
升级到1.9后有几个问题:
1.现在的线上版本是1.7.2,以前的代码的kafka consumer是使用的FlinkKafkaConsumer011

val consumer = new FlinkKafkaConsumer011[String](kafkaTopic, new SimpleStringSchema, properties)
但是现在这个类已经找不到了

2.所以我使用了 FlinkKafkaConsumer
val consumer = new FlinkKafkaConsumer[String](kafkaTopic, new SimpleStringSchema(), properties)
不知道这个consumer背后对应的kafka版本是多少

3.使用FlinkKafkaConsumer后报错,而且必须要引入flink-table-api-java-bridge_${scala.binary.version}

     不然会提示找不到类:Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.factories.StreamTableSourceFactory

   

引入flink-table-api-java-bridge_${scala.binary.version}后还是报错:
Exception in thread "main" org.apache.flink.table.api.TableException: Could not instantiate
the executor. Make sure a planner module is on the classpath
	at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.lookupExecutor(StreamTableEnvironmentImpl.scala:329)
	at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$.create(StreamTableEnvironmentImpl.scala:286)
	at org.apache.flink.table.api.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:366)
	at org.apache.flink.table.api.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:336)
	at com.test.StreamingJob$.main(StreamingJob.scala:52)
	at com.test.StreamingJob.main(StreamingJob.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable
table factory for 'org.apache.flink.table.delegation.ExecutorFactory' in
the classpath.

Reason: No factory implements 'org.apache.flink.table.delegation.ExecutorFactory'.

The following properties are requested:
batch-mode=false

The following factories have been considered:
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
org.apache.flink.table.catalog.GenericInMemoryCatalogFactory


我的pom文件如下:

<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-scala_${scala.binary.version}</artifactId>
   <version>${flink.version}</version>
   <scope>provided</scope>
</dependency>
<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
   <version>${flink.version}</version>
   <scope>provided</scope>
</dependency>

<!-- Scala Library, provided by Flink as well. -->
<dependency>
   <groupId>org.scala-lang</groupId>
   <artifactId>scala-library</artifactId>
   <version>${scala.version}</version>
   <scope>provided</scope>
</dependency>

<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-connector-kafka_2.11</artifactId>
   <version>${flink.version}</version>
   <scope>compile</scope>
</dependency>
<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-table-common</artifactId>
   <version>${flink.version}</version>
</dependency>


<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
   <version>${flink.version}</version>
</dependency>
<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
   <version>${flink.version}</version>
</dependency>
<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
   <version>${flink.version}</version>
</dependency>


谢谢大家
Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message