Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 31052200D1B for ; Thu, 12 Oct 2017 16:43:56 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 2F2A51609E8; Thu, 12 Oct 2017 14:43:56 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 4E20C1609CD for ; Thu, 12 Oct 2017 16:43:55 +0200 (CEST) Received: (qmail 63066 invoked by uid 500); 12 Oct 2017 14:43:54 -0000 Mailing-List: contact user-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list user@flink.apache.org Received: (qmail 63055 invoked by uid 99); 12 Oct 2017 14:43:54 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 12 Oct 2017 14:43:54 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id 53FD6183250 for ; Thu, 12 Oct 2017 14:43:53 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.792 X-Spam-Level: * X-Spam-Status: No, score=1.792 tagged_above=-999 required=6.31 tests=[DKIM_SIGNED=0.1, DKIM_VALID=-0.1, RCVD_IN_DNSWL_NONE=-0.0001, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RCVD_IN_SORBS_SPAM=0.5, SPF_PASS=-0.001, URI_HEX=1.313] autolearn=disabled Authentication-Results: spamd3-us-west.apache.org (amavisd-new); dkim=pass (2048-bit key) header.d=data-artisans-com.20150623.gappssmtp.com Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id D22hg4cqGVYX for ; Thu, 12 Oct 2017 14:43:49 +0000 (UTC) Received: from mail-wm0-f51.google.com (mail-wm0-f51.google.com [74.125.82.51]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with ESMTPS id 8732F5FE4F for ; Thu, 12 Oct 2017 14:43:49 +0000 (UTC) Received: by mail-wm0-f51.google.com with SMTP id b189so13754221wmd.4 for ; Thu, 12 Oct 2017 07:43:49 -0700 (PDT) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=data-artisans-com.20150623.gappssmtp.com; s=20150623; h=mime-version:subject:from:in-reply-to:date:cc :content-transfer-encoding:message-id:references:to; bh=VjsAz4oUWNjSI6vCDw46TSxXob5nVnYmvdqDNO1WRGk=; b=muCik7yAMdDNUQyF+SfIqZyqXWchWo3KYzSViU6+2qL0z5kpVAETDjbz7QjgGomA8s zNHmTuK7bfpxYIAMveLHhf6gJie57ym1JudsAxfJUMFYiHkuZf0KX8d/xtWw9aZroMiT bB2niFCX6geO0KDakAr2DLZ92HHUi0OeBEalA0S+Nc4h4A7j9Ai0Pr5Xhwyw7cp/lOay XmqNWoG8aVJOwBeZJ6lyyMJCpv5j2AcZVp1NSMao2Al5byam3EVNBJbug9t+fjiNle9o QgxqdJdF+jgFLJhBfJH/iBOTgnPM5CkQvzMj1lzFmRePxhwqESG0GPjthk0cf/JigQrf EZCw== X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20161025; h=x-gm-message-state:mime-version:subject:from:in-reply-to:date:cc :content-transfer-encoding:message-id:references:to; bh=VjsAz4oUWNjSI6vCDw46TSxXob5nVnYmvdqDNO1WRGk=; b=AVIIWdQwypAgcrF8WItvy3BbpB/dwV18KZocVt8cno2TE4zFM+kCjRUesWXfSyD/7g dxiWPftc4fDfEVHGT35t8GmOnu9CkHe2LgduCbWRFkeXDuSRBM1s6NU4O5ey/ziqHT/8 MtyYUVMdoeeSF5iBlcgsyFWyfONAFK2YRJtLmKbFsgxiiJI4L/UrGblvArIWvJX/MdjW yUfulu50N/v42fjdJNmwB0+Tz0CLGgioDXj5jPsb3AgQ+95+fSQpegljAWQ8U7ZCSqh6 OUl8AGA/WMYgxWk/c2EFhXhoW/ZYtJnZ5LBCNob5lOVx/l1S5yCFwWAQdk9sd2hoSvxd 4Vbw== X-Gm-Message-State: AMCzsaVKLrviAdgwVClj0cJhRGLzznPuRPbJKnC7sXypbyWbxYXgtsup o5KUSNQwZGsEMlJQCo8CcttVUw== X-Google-Smtp-Source: AOwi7QDUDnOKcDihhr7XzHVMvU7Vj282TInpZMSGEOpwGHeEsiPuFWqKzdPSimbcoB1qGAkk0cs2Eg== X-Received: by 10.28.60.68 with SMTP id j65mr1952607wma.16.1507819428463; Thu, 12 Oct 2017 07:43:48 -0700 (PDT) Received: from piotrs-mbp.fritz.box (ip-2-205-81-99.web.vodafone.de. [2.205.81.99]) by smtp.gmail.com with ESMTPSA id 188sm405955wmg.45.2017.10.12.07.43.46 (version=TLS1_2 cipher=ECDHE-RSA-AES128-GCM-SHA256 bits=128/128); Thu, 12 Oct 2017 07:43:47 -0700 (PDT) Content-Type: text/plain; charset=utf-8 Mime-Version: 1.0 (Mac OS X Mail 10.3 \(3273\)) Subject: Re: Beam Application run on cluster setup (Kafka+Flink) From: Piotr Nowojski In-Reply-To: <1507816161074-0.post@n4.nabble.com> Date: Thu, 12 Oct 2017 16:43:44 +0200 Cc: user@flink.apache.org Content-Transfer-Encoding: quoted-printable Message-Id: <221CCD52-8850-41EA-B772-98ADFE616342@data-artisans.com> References: <1507816161074-0.post@n4.nabble.com> To: Shankara X-Mailer: Apple Mail (2.3273) archived-at: Thu, 12 Oct 2017 14:43:56 -0000 Hi, What do you mean by: > With standalone beam application kafka can receive the message, But in cluster setup it is not working. In your example you are reading the data from Kafka and printing them to = console. There doesn=E2=80=99t seems to be anything that writes back to = Kafka, so what do you mean by =E2=80=9CKafka can not receive the = message=E2=80=9D? Did you check the output file of your application in the log directory? = Did you check Flink logs if there are any errors? Piotrek > On 12 Oct 2017, at 15:49, Shankara wrote: >=20 > Below is my setup=20 > 1. Kafka zookeeper and server in one machine (192.168.1.116) = and > producer (192.168.1.100) and consumer (192.168.1.117) in another = machine. =20 > --> This work fine no issue=20 > 2. Running standalone beam application with kafka consumer --> = This > work fine > 3. Running beam application in flink cluster with kafka = consumer --> > This is not working > Not receiving message from kafka producer. >=20 > Same program works fine with standalone with flink runner. > Below is my code snippet. >=20 > public static void main(String[] args) { > Pipeline p =3D initializePipeline(args); > Map> intelliOmIms =3D new TreeMap<>(); >=20 > PTransform>> reader; > reader =3D KafkaIO.read() > .withBootstrapServers("192.168.1.116:9092") --->Kafka > zookeeper and server running > .withTopic("kafkatest") > .withKeyDeserializer(IntegerDeserializer.class) > .withValueDeserializer(IntelliOmImsKpiDataUtil.class) > .withoutMetadata(); >=20 > PCollection> output =3D p.apply(reader); > output.apply(ParDo.of(new PrintMsg())); >=20 > p.run().waitUntilFinish(); > } >=20 > In IntelliOmImsKpiDataUtil deserializer I am just printing message = saying > that kafka is received the message. >=20 > public static class PrintMsg extends DoFn, = Void> { >=20 > @ProcessElement > public void processElement(ProcessContext c) { > System.out.println("Received Message .... from kafkatest Topic = "); > } > } >=20 > Started Zookeeper in 192.168.1.116 like below : > bin/zookeeper-server-start.sh config/zookeeper.properties >=20 > Started Server in 192.168.1.116 like below : > bin/kafka-server-start.sh config/server.properties >=20 > Started Producer in 192.168.1.100 like below : > bin/kafka-console-producer.sh --broker-list 192.168.1.116:9092 = --topic > kafkatest >=20 > Started Consumer in 192.168.1.117 like below : > bin/kafka-console-consumer.sh --zookeeper 192.168.1.116:2181 = --topic > kafkatest --from-beginning >=20 > With standalone beam application kafka can receive the message, But = in > cluster setup it is not working. >=20 > Can you please help me to check it.=20 >=20 >=20 >=20 >=20 > -- > Sent from: = http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/