flink-user mailing list archives

From Qi Kang <miraisen...@126.com>
Subject Re: timeout error while connecting to Kafka
Date Thu, 22 Aug 2019 10:01:09 GMT
The code itself is fine. Turning the app’s log level to DEBUG will give you more information.

BTW, please make sure that the addresses of Kafka brokers are properly resolved.
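
A minimal sketch of such a check, assuming the value below is replaced with the same string passed as bootstrap.servers. The class name BrokerAddressCheck and its methods are hypothetical helpers for illustration, not part of Flink or Kafka. It only tests DNS resolution and a plain TCP connect for each bootstrap address. The brokers may advertise other host names in the metadata they return, and those must be resolvable from the client as well.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: checks that each bootstrap.servers entry resolves and accepts TCP connections.
public class BrokerAddressCheck {

    /** Splits a bootstrap.servers value such as "host1:9092,host2:9092" into unresolved host/port pairs. */
    static List<InetSocketAddress> parse(String bootstrapServers) {
        List<InetSocketAddress> brokers = new ArrayList<>();
        for (String entry : bootstrapServers.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got: " + trimmed);
            }
            String host = trimmed.substring(0, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            brokers.add(InetSocketAddress.createUnresolved(host, port));
        }
        return brokers;
    }

    /** Returns "OK", "UNRESOLVED" (DNS lookup failed) or "UNREACHABLE" (TCP connect failed or timed out). */
    static String check(InetSocketAddress broker, int timeoutMs) {
        try {
            InetAddress resolved = InetAddress.getByName(broker.getHostString());
            try (Socket socket = new Socket()) {
                socket.connect(new InetSocketAddress(resolved, broker.getPort()), timeoutMs);
            }
            return "OK";
        } catch (UnknownHostException e) {
            return "UNRESOLVED";
        } catch (IOException e) {
            return "UNREACHABLE";
        }
    }

    public static void main(String[] args) {
        // Assumption: pass the real bootstrap.servers string as the first argument.
        String servers = args.length > 0 ? args[0] : "localhost:9092";
        for (InetSocketAddress broker : parse(servers)) {
            System.out.println(broker.getHostString() + ":" + broker.getPort()
                    + " -> " + check(broker, 3000));
        }
    }
}
```

Run it on the same machine (or container) that runs the Flink TaskManager, since that is where the Kafka consumer actually opens its connections.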


> On Aug 22, 2019, at 15:45, Eyal Pe'er <eyal.peer@startapp.com> wrote:
> 
> Hi,
> 
> I'm trying to consume events using Apache Flink.
> 
> The code is very basic: it connects to the topic, splits each message into words on spaces, and prints them to the console. The Kafka version is 0.9.
> 
> import org.apache.flink.api.common.functions.FlatMapFunction;
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
>  
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
> import org.apache.flink.util.Collector;
> import java.util.Properties;
>  
> public class KafkaStreaming {
>  
> public static void main(String[] args) throws Exception {
>     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>  
>     Properties props = new Properties();
>     props.setProperty("bootstrap.servers", "kafka servers:9092...");
>     props.setProperty("group.id", "flinkPOC");
>     FlinkKafkaConsumer09<String> consumer = new FlinkKafkaConsumer09<>("topic", new SimpleStringSchema(), props);
>  
>     DataStream<String> dataStream = env.addSource(consumer);
>  
>     DataStream<String> wordDataStream = dataStream.flatMap(new Splitter());
>     wordDataStream.print();
>     env.execute("Word Split");
>  
> }
>  
> public static class Splitter implements FlatMapFunction<String, String> {
>  
>     public void flatMap(String sentence, Collector<String> out) throws Exception {
>  
>         for (String word : sentence.split(" ")) {
>             out.collect(word);
>         }
>     }
>  
> }
> }
>  
> 
> The app does not print anything to the screen (although I produced events to Kafka).
> 
> I tried removing the Splitter FlatMap function, but still nothing happens. Kafka does not require SSL or any other kind of authentication.
> 
> This is the error that I found in the logs:
> 
> 2019-08-20 14:36:17,654 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> FlatMap -> Sink: Print to Std. Out (1/1) (02258a2cafab83afbc0f5650c088da2b) switched from RUNNING to FAILED.
> org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
>  
> 
> The Kafka topic has only one partition, so the topic metadata should be very basic.
> 
> I ran Kafka and Flink locally to rule out network-related issues, but the problem persists, so I assume I’m doing something wrong.
> 
> Has anyone encountered this issue? Does anyone have different code for consuming Kafka events?
>  
> Best regards
> Eyal Peer / Data Platform Developer

