flink-user mailing list archives

From Eyal Pe'er <eyal.p...@startapp.com>
Subject RE: timeout error while connecting to Kafka
Date Sun, 25 Aug 2019 11:20:02 GMT
Hi,
I removed that dependency, but it still fails.
The reason I used Flink 1.5.0 is that I followed a training that used it (https://www.baeldung.com/kafka-flink-data-pipeline).
If needed, I can change it.

I’m not sure, but maybe in order to consume events from Kafka 0.9 I need to connect through ZooKeeper
instead of the bootstrap servers?
I know that in Spark Streaming we consume via ZooKeeper ("zookeeper.connect").
I saw that in the Apache Flink Kafka connector, zookeeper.connect is only required for Kafka 0.8,
but maybe I still need to use it?
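
For the record: with Flink's Kafka 0.9+ connectors the client talks to the brokers directly, so only bootstrap.servers is required; zookeeper.connect is read only by the 0.8 connector. A minimal sketch of the properties FlinkKafkaConsumer09 needs (the broker address and group id below are placeholders, not values from this thread):

```java
import java.util.Properties;

public class ConsumerProps {

    // Builds the properties for FlinkKafkaConsumer09.
    // Unlike the 0.8 connector, no "zookeeper.connect" entry is needed:
    // the 0.9+ clients fetch topic metadata from the brokers themselves.
    public static Properties buildConsumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers); // broker list, host:port
        props.setProperty("group.id", groupId);                   // consumer group id
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildConsumerProps("localhost:9092", "flinkPOC");
        System.out.println(props.getProperty("bootstrap.servers"));
    }
}
```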
Best regards
Eyal Peer / Data Platform Developer

From: miki haiat <miko5054@gmail.com>
Sent: Thursday, August 22, 2019 2:29 PM
To: Eyal Pe'er <eyal.peer@startapp.com>
Cc: user@flink.apache.org
Subject: Re: timeout error while connecting to Kafka

Can you try removing this from your pom file?
 <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.7.0</version>
        </dependency>


Is there any reason why you are using Flink 1.5 and not the latest release?


Best,

Miki

On Thu, Aug 22, 2019 at 2:19 PM Eyal Pe'er <eyal.peer@startapp.com> wrote:
Hi Miki,
First, I would like to thank you for the fast response.
I rechecked Kafka and it is up and running fine.
I’m still getting the same error (Timeout expired while fetching topic metadata).
Maybe my Flink version is wrong (the Kafka version is 0.9)?

<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>1.5.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
            <version>1.5.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.5.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.5.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.10</artifactId>
            <version>1.1.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.7.0</version>
        </dependency>
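
A side note on the pom above: it mixes Flink versions (1.1.4, 1.5.0, 1.7.0) and Scala suffixes (_2.10, _2.11), and pairs the 0.11 connector with a 0.9 broker; newer connector clients cannot fetch metadata from a 0.9 broker, which can surface exactly as this timeout. A consistent sketch for Flink 1.5.0 against Kafka 0.9 might look like the following (flink-core comes in transitively via flink-java, so it needs no explicit entry):

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.5.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.5.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.5.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.9_2.11</artifactId>
    <version>1.5.0</version>
</dependency>
```

The key points are a single Flink version and a single Scala suffix across all Flink artifacts, and the connector variant matching the broker version.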


Best regards
Eyal Peer / Data Platform Developer

From: miki haiat <miko5054@gmail.com>
Sent: Thursday, August 22, 2019 11:03 AM
To: Eyal Pe'er <eyal.peer@startapp.com>
Cc: user@flink.apache.org
Subject: Re: timeout error while connecting to Kafka

Can you double-check that the Kafka instance is up?
The code looks fine.


Best,

Miki

On Thu, Aug 22, 2019 at 10:45 AM Eyal Pe'er <eyal.peer@startapp.com> wrote:
Hi,
I'm trying to consume events using Apache Flink.
The code is very basic: connect to the topic, split the words by space, and print them to the
console. The Kafka version is 0.9.
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.util.Collector;
import java.util.Properties;

public class KafkaStreaming {

public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "kafka servers:9092...");
    props.setProperty("group.id", "flinkPOC");
    FlinkKafkaConsumer09<String> consumer =
            new FlinkKafkaConsumer09<>("topic", new SimpleStringSchema(), props);

    DataStream<String> dataStream = env.addSource(consumer);

    DataStream<String> wordDataStream = dataStream.flatMap(new Splitter());
    wordDataStream.print();
    env.execute("Word Split");

}

public static class Splitter implements FlatMapFunction<String, String> {

    public void flatMap(String sentence, Collector<String> out) throws Exception {

        for (String word : sentence.split(" ")) {
            out.collect(word);
        }
    }

}
}
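
Independent of Kafka, the core of the Splitter above is just String.split, and that part can be checked in plain Java with no cluster involved; the sample sentence below is illustrative only:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SplitCheck {

    // Mirrors what Splitter.flatMap emits for one input line:
    // the sentence broken on single spaces, in order.
    public static List<String> splitWords(String sentence) {
        return new ArrayList<>(Arrays.asList(sentence.split(" ")));
    }

    public static void main(String[] args) {
        System.out.println(splitWords("to be or not to be")); // → [to, be, or, not, to, be]
    }
}
```

If this produces the expected words, the remaining suspects are the Kafka connection and the connector/broker version pairing rather than the pipeline logic.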

The app does not print anything to the screen (although I produced events to Kafka).
I tried to skip the Splitter FlatMap function, but still nothing happens. Kafka does not
require SSL or any kind of authentication.
This is the error that I found in the logs:
2019-08-20 14:36:17,654 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> Flat Map -> Sink: Print to Std. Out (1/1) (02258a2cafab83afbc0f5650c088da2b) switched from RUNNING to FAILED.
org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata

The Kafka topic has only one partition, so the topic metadata is supposed to be very basic.
I ran Kafka and Flink locally in order to eliminate network-related issues, but the issue
persists, so my assumption is that I’m doing something wrong…
Did you encounter such an issue? Does someone have different code for consuming Kafka events?
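
One thing worth ruling out when the metadata fetch times out even locally is the client-side timeout budget. A sketch of the relevant Kafka 0.9 consumer settings follows; the concrete values are assumptions for illustration, and raising them only helps if the broker is reachable at all — a client/broker version mismatch will still time out regardless:

```java
import java.util.Properties;

public class TimeoutProps {

    // Sketch: consumer properties with the client-side timeouts that bound
    // the metadata fetch. In the 0.9 consumer, request.timeout.ms must be
    // larger than session.timeout.ms.
    public static Properties timeoutProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.setProperty("group.id", "flinkPOC");
        props.setProperty("request.timeout.ms", "60000");         // per-request cap (assumed value)
        props.setProperty("session.timeout.ms", "30000");         // kept below request.timeout.ms
        return props;
    }

    public static void main(String[] args) {
        System.out.println(timeoutProps().getProperty("request.timeout.ms"));
    }
}
```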

Best regards
Eyal Peer / Data Platform Developer
