Date: Wed, 4 Oct 2017 13:36:38 +0300 (EEST)
From: "r. r."
To: Carst Tankink
Cc: "user@flink.apache.org"
Subject: Re: kafka consumer parallelism

Thanks Timo & Tovi, this helped me get a better idea of how it works.

@Carst, I have a rebalance after the map() (messageStream.map(...).rebalance()). Doesn't that mean the load will be redistributed across all task managers' slots anyway? Or is the map() spread out only if I do as you suggest, messageStream.rebalance().map(...)?
Best regards,
Rob

>-------- Original message --------
>From: Carst Tankink ctankink@bol.com
>Subject: Re: kafka consumer parallelism
>To: "user@flink.apache.org"
>Sent on: 03.10.2017 11:30
>
> (Accidentally sent this to Timo instead of to-list...)
>
> Hi,
>
> What Timo says is true, but in case you have a higher parallelism than the number of partitions (because you want to make use of it in a later operator), you could do a .rebalance() (see https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/datastream_api.html#physical-partitioning) after the Kafka source.
> This makes sure that all operators after the Kafka source get an even load, at the cost of having to redistribute the documents (so there is de/serialization + network overhead).
>
>
> Carst
>
> On 10/3/17, 09:34, "Sofer, Tovi" wrote:
>
> Hi Robert,
>
> I had a similar issue.
> For me the problem was that the topic was auto-created with one partition.
> You can alter it to have 5 partitions using the kafka-topics command.
> Example:
> kafka-topics --alter --partitions 5 --topic fix --zookeeper localhost:2181
>
> Regards,
> Tovi
> -----Original Message-----
> From: Timo Walther [mailto:twalthr@apache.org]
> Sent: Monday, 02 October 2017 20:59
> To: user@flink.apache.org
> Subject: Re: kafka consumer parallelism
>
> Hi,
>
> I'm not a Kafka expert, but I think you need to have more than 1 Kafka partition to process multiple documents at the same time. Also make sure to send the documents to different partitions.
>
> Regards,
> Timo
>
>
> On 10/2/17 at 6:46 PM, r. r. wrote:
> > Hello
> > I'm running a job with "flink run -p5" and additionally set env.setParallelism(5).
> > The source of the stream is Kafka; the job uses FlinkKafkaConsumer010.
> > In the Flink UI, though, I notice that if I send 3 documents to Kafka, only one 'instance' of the consumer seems to receive Kafka's records and send them to the next operators, which according to the Flink UI are properly parallelized.
> > What's the explanation of this behavior?
> >
> > According to the sources:
> >
> > To enable parallel execution, the user defined source should
> > implement {@link org.apache.flink.streaming.api.functions.source.ParallelSourceFunction}
> > or extend {@link org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction}
> >
> > which FlinkKafkaConsumer010 does.
> >
> > Please check the screenshot at https://imgur.com/a/E1H9r: you'll see that only one instance sends 3 records to the sinks.
> >
> > My code is here: https://pastebin.com/yjYCXAAR
> >
> > Thanks!
> >
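Timo's and Tovi's point, that a single Kafka partition is consumed by only one source instance no matter how high the job parallelism is, can be illustrated with a small stand-alone Java model. It is a sketch under one stated assumption: partitions are dealt out round-robin to the source subtasks. That is a simplification and not necessarily the exact rule FlinkKafkaConsumer010 applies. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model (assumption): Kafka partitions are dealt out
// round-robin to the parallel instances (subtasks) of the Kafka source.
// Whatever the exact rule, each partition is read by exactly one subtask.
class PartitionAssignmentSketch {

    // For each subtask index, the list of partition ids it reads.
    static List<List<Integer>> assignPartitions(int numPartitions, int parallelism) {
        List<List<Integer>> assignment = new ArrayList<>();
        for (int s = 0; s < parallelism; s++) {
            assignment.add(new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            assignment.get(p % parallelism).add(p);
        }
        return assignment;
    }

    // Number of subtasks that own at least one partition and can therefore emit records.
    static int activeSubtasks(int numPartitions, int parallelism) {
        int active = 0;
        for (List<Integer> owned : assignPartitions(numPartitions, parallelism)) {
            if (!owned.isEmpty()) {
                active++;
            }
        }
        return active;
    }

    public static void main(String[] args) {
        // Topic auto-created with a single partition, job started with parallelism 5.
        System.out.println("1 partition:  " + assignPartitions(1, 5));
        // After increasing the topic to 5 partitions.
        System.out.println("5 partitions: " + assignPartitions(5, 5));
    }
}
```

Running it prints `[[0], [], [], [], []]` for the single-partition topic (four of the five source instances never see a record) and `[[0], [1], [2], [3], [4]]` once the topic has 5 partitions. Even then, a source instance only gets work if the producer actually writes documents to its partition, which is why Timo also recommends spreading the documents across partitions.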
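Rob's follow-up question, messageStream.map(...).rebalance() versus messageStream.rebalance().map(...), can be modelled the same way. This hypothetical sketch assumes every operator has the same parallelism, so that without an explicit partitioner source subtask i forwards its records to map subtask i. It also treats rebalance() as plain round-robin starting at the first channel; Flink's own partitioner may start at a different channel. It only counts how many records each map() subtask processes when all records arrive at source subtask 0, as in the single-partition case.

```java
import java.util.Arrays;

// Hypothetical model (assumptions): every operator has the same parallelism,
// so without an explicit partitioner source subtask i forwards to map subtask i;
// rebalance() is modelled as round-robin starting at channel 0.
class RebalancePlacementSketch {

    // messageStream.map(...).rebalance(): map() is fed by forward partitioning,
    // so every record stays on the subtask index of the active source.
    static int[] mapLoadMapThenRebalance(int records, int parallelism, int activeSource) {
        int[] load = new int[parallelism];
        load[activeSource] = records;
        return load;
    }

    // messageStream.rebalance().map(...): records are spread round-robin before map() runs.
    static int[] mapLoadRebalanceThenMap(int records, int parallelism) {
        int[] load = new int[parallelism];
        int next = 0;
        for (int r = 0; r < records; r++) {
            load[next]++;
            next = (next + 1) % parallelism;
        }
        return load;
    }

    public static void main(String[] args) {
        int records = 10;
        int parallelism = 5;
        System.out.println("map().rebalance(): "
                + Arrays.toString(mapLoadMapThenRebalance(records, parallelism, 0)));
        System.out.println("rebalance().map(): "
                + Arrays.toString(mapLoadRebalanceThenMap(records, parallelism)));
    }
}
```

Under these assumptions, map().rebalance() evens out the load only for the operators after the rebalance (for example the sinks), while the map() itself still runs on one subtask (`[10, 0, 0, 0, 0]`). rebalance().map() moves the redistribution, with its serialization and network cost, in front of the map() (`[2, 2, 2, 2, 2]`).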