flink-user mailing list archives

From Robert Metzger <rmetz...@apache.org>
Subject Re: Kafka integration error
Date Mon, 14 Mar 2016 10:47:00 GMT
Hi Stefanos,

This looks like an issue with Kafka. Which version is your broker running?
Can you check the logs of the broker you are trying to connect to?
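
Since the job works on the laptop but fails on the cluster, it may also be worth checking from a cluster node whether the broker's address is reachable at all. Below is a minimal sketch of such a check using only the JDK. The class name BrokerReachability and the host/port arguments are hypothetical and not part of Flink or Kafka; 9092 is only Kafka's default port and may differ in your setup.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper for illustration; not part of Flink or Kafka.
class BrokerReachability {

    /** Returns true if a TCP connection to host:port can be opened within timeoutMs. */
    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers refused connections, timeouts, and unresolvable host names.
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed defaults; pass the real broker host and port as arguments.
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9092;
        boolean ok = canConnect(host, port, 3000);
        System.out.println(host + ":" + port + (ok ? " is reachable" : " is NOT reachable"));
    }
}
```

Run it on one of the machines that executes the Flink tasks, with the host name the broker advertises. A successful connection only shows that the port is open from that machine; it does not rule out other broker-side problems, so the broker logs are still the first place to look.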

On Fri, Mar 11, 2016 at 5:27 PM, Stefanos Antaris <
antaris.stefanos@gmail.com> wrote:

> Hi all,
>
> I am trying to get Flink to work with Kafka, but I always get the
> following exception. It works perfectly on my laptop, but when I try to run it
> on the cluster it always fails.
>
> java.lang.Exception
> 	at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:222)
> 	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.run(FlinkKafkaConsumer08.java:316)
> 	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
> 	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: java.nio.channels.ClosedChannelException
> 	at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
> 	at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
> 	at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
> 	at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:127)
> 	at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79)
> 	at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.getLastOffset(LegacyFetcher.java:759)
> 	at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.getMissingOffsetsFromKafka(LegacyFetcher.java:712)
> 	at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:462)
>
> Here is my pom.xml, in case it helps with the error:
>
> <dependencies>
>   <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-core</artifactId>
>     <version>1.0.0</version>
>   </dependency>
>
>   <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-clients_2.10</artifactId>
>     <version>1.0.0</version>
>   </dependency>
>
>   <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-streaming-java_2.10</artifactId>
>     <version>1.0.0</version>
>   </dependency>
>
>   <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-connector-kafka-0.8_2.10</artifactId>
>     <version>1.0.0</version>
>   </dependency>
> </dependencies>
>
> Best regards,
> Stefanos
>
