From: Qi Kang <miraisenshi@126.com>
Date: Thu, 22 Aug 2019 18:01:09 +0800
Subject: Re: timeout error while connecting to Kafka
To: Eyal Pe'er <eyal.peer@startapp.com>
Cc: user@flink.apache.org
Message-Id: <52ED6948-3754-4095-9214-C71427658936@126.com>

The code itself is fine. Turning the app's log level to DEBUG will give you more information.

BTW, please make sure that the addresses of the Kafka brokers are properly resolved.

> On Aug 22, 2019, at 15:45, Eyal Pe'er <eyal.peer@startapp.com> wrote:
>
> Hi,
>
> I'm trying to consume events using Apache Flink.
> The code is very basic: it connects to the topic, splits words by space, and prints them to the console. Kafka version is 0.9.
>
> import org.apache.flink.api.common.functions.FlatMapFunction;
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
> import org.apache.flink.util.Collector;
> import java.util.Properties;
>
> public class KafkaStreaming {
>
>     public static void main(String[] args) throws Exception {
>         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>
>         Properties props = new Properties();
>         props.setProperty("bootstrap.servers", "kafka servers:9092...");
>         props.setProperty("group.id", "flinkPOC");
>         FlinkKafkaConsumer09<String> consumer = new FlinkKafkaConsumer09<>("topic", new SimpleStringSchema(), props);
>
>         DataStream<String> dataStream = env.addSource(consumer);
>
>         DataStream<String> wordDataStream = dataStream.flatMap(new Splitter());
>         wordDataStream.print();
>         env.execute("Word Split");
>     }
>
>     public static class Splitter implements FlatMapFunction<String, String> {
>
>         public void flatMap(String sentence, Collector<String> out) throws Exception {
>             for (String word : sentence.split(" ")) {
>                 out.collect(word);
>             }
>         }
>     }
> }
>
> The app does not print anything to the screen (although I produced events to Kafka).
>
> I tried to skip the Splitter FlatMap function, but still nothing happens. SSL or any other kind of authentication is not required by Kafka.
>
> This is the error that I found in the logs:
>
> 2019-08-20 14:36:17,654 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source -> FlatMap -> Sink: Print to Std. Out (1/1) (02258a2cafab83afbc0f5650c088da2b) switched from RUNNING to FAILED.
> org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
>
> The Kafka topic has only one partition, so the topic metadata is supposed to be very basic.
>
> I ran Kafka and Flink locally in order to eliminate network-related issues, but the issue persists. So my assumption is that I'm doing something wrong...
>
> Did you encounter such an issue? Does someone have different code for consuming Kafka events?
>
> Best regards
> Eyal Peer / Data Platform Developer
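The suggestion above to raise the log level can be done through Flink's log4j configuration. A minimal sketch, assuming a log4j 1.x `conf/log4j.properties` as shipped with Flink distributions of that era (the appender name `file` is illustrative; match it to your actual config):

```properties
# conf/log4j.properties -- raise Kafka connector and client logging to DEBUG
# Keep the root logger at INFO so only the relevant packages get verbose.
log4j.rootLogger=INFO, file
log4j.logger.org.apache.flink.streaming.connectors.kafka=DEBUG
log4j.logger.org.apache.kafka=DEBUG
```

With these loggers at DEBUG, the consumer's metadata requests and broker connection attempts show up in the TaskManager log, which usually reveals whether the client is stuck resolving or connecting to a broker.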
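The advice above about broker addresses can be checked outside Flink with a plain JDK DNS lookup, since "Timeout expired while fetching topic metadata" is commonly caused by broker hostnames (from the brokers' advertised listeners) that the client machine cannot resolve. A minimal sketch; the `BrokerCheck` class and the `localhost:9092` broker list are illustrative, not from the original thread:

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

public class BrokerCheck {

    // Returns the resolved IP for a host, or null if the DNS lookup fails.
    static String resolve(String host) {
        try {
            return InetAddress.getByName(host).getHostAddress();
        } catch (UnknownHostException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        // Substitute your actual bootstrap.servers value here.
        String bootstrapServers = "localhost:9092";
        for (String hostPort : bootstrapServers.split(",")) {
            String host = hostPort.split(":")[0];
            String ip = resolve(host);
            System.out.println(host + " -> " + (ip == null ? "UNRESOLVABLE" : ip));
        }
    }
}
```

If a host prints as UNRESOLVABLE here, Flink's Kafka consumer will hit the same wall; fixing `/etc/hosts` or the brokers' `advertised.listeners` is then the next step. Note this only checks name resolution, not that a broker is actually listening on the port.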