Return-Path:
X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io
Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io
Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183])
    by cust-asf2.ponee.io (Postfix) with ESMTP id 70374200B44
    for ; Thu, 14 Jul 2016 17:01:40 +0200 (CEST)
Received: by cust-asf.ponee.io (Postfix)
    id 6F0A3160A63; Thu, 14 Jul 2016 15:01:40 +0000 (UTC)
Delivered-To: archive-asf-public@cust-asf.ponee.io
Received: from mail.apache.org (hermes.apache.org [140.211.11.3])
    by cust-asf.ponee.io (Postfix) with SMTP id 8FC12160A60
    for ; Thu, 14 Jul 2016 17:01:39 +0200 (CEST)
Received: (qmail 71200 invoked by uid 500); 14 Jul 2016 15:01:33 -0000
Mailing-List: contact dev-help@flink.apache.org; run by ezmlm
Precedence: bulk
List-Help:
List-Unsubscribe:
List-Post:
List-Id:
Reply-To: dev@flink.apache.org
Delivered-To: mailing list dev@flink.apache.org
Received: (qmail 71184 invoked by uid 99); 14 Jul 2016 15:01:33 -0000
Received: from mail-relay.apache.org (HELO mail-relay.apache.org) (140.211.11.15)
    by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 14 Jul 2016 15:01:33 +0000
Received: from mail-wm0-f51.google.com (mail-wm0-f51.google.com [74.125.82.51])
    by mail-relay.apache.org (ASF Mail Server at mail-relay.apache.org) with ESMTPSA id 1485B1A01E3
    for ; Thu, 14 Jul 2016 15:01:33 +0000 (UTC)
Received: by mail-wm0-f51.google.com with SMTP id f126so69793574wma.1
    for ; Thu, 14 Jul 2016 08:01:32 -0700 (PDT)
X-Gm-Message-State: ALyK8tLh90udqPPBUFMMxIHRwvTPff2McNvXb97hoyB2qmXbBKY3cU+0yqueOwrwW2rLPiNp/INhZnmw3XmWyg==
X-Received: by 10.28.197.68 with SMTP id v65mr18004636wmf.2.1468508491560;
    Thu, 14 Jul 2016 08:01:31 -0700 (PDT)
MIME-Version: 1.0
Received: by 10.194.5.68 with HTTP; Thu, 14 Jul 2016 08:01:11 -0700 (PDT)
In-Reply-To:
References:
From: Robert Metzger
Date: Thu, 14 Jul 2016 17:01:11 +0200
X-Gmail-Original-Message-ID:
Message-ID:
Subject: Re: [Discussion] Query Regarding Operator chaining
To: Aljoscha Krettek
Cc: "dev@flink.apache.org"
Content-Type: multipart/alternative; boundary=94eb2c0d7c60fd7bfa053799c6c0
archived-at: Thu, 14 Jul 2016 15:01:40 -0000

--94eb2c0d7c60fd7bfa053799c6c0
Content-Type: text/plain; charset=UTF-8

Aljoscha is right. Multiple consumers in the same consumer group can not
read from the same partition.
You'll need to create a Kafka topic with more partitions to have higher
parallelism.

On Wed, Jul 6, 2016 at 10:45 AM, Aljoscha Krettek wrote:

> Hi,
> unfortunately the reading of one Kafka partition cannot be split among
> several parallel instances of the Kafka source. So if you have only 2
> partitions your reading parallelism is limited to that. You are right that
> this can lead to bad performance and underutilization. The only solution I
> see right now is to have more partitions in Kafka so that more readers can
> read in parallel.
>
> +Robert Adding Robert directly because he might have something more to say
> about this.
>
> Cheers,
> Aljoscha
>
> On Tue, 5 Jul 2016 at 15:48 Vinay Patil wrote:
>
>> Hi,
>>
>> The re-balance actually distributes it to all the task managers, and now
>> all TM's are getting utilized. You were right, I am seeing two
>> boxes (Tasks) now.
>>
>> I have one question regarding the task slots:
>>
>> For the source the parallelism is set to 56, now when we see on the UI and
>> click on source sub-task, I see 56 entries, out of which only two are
>> getting the data from Kafka (this may be because I have two kafka
>> partitions)
>>
>> The 56 entries that I am seeing for a sub-task on UI are the total task
>> slots of all TM's, right?
>>
>> If yes, only two slots are getting utilized, how do I ensure enough task
>> slots are getting utilized at the source? I have 7 task managers (8 cores
>> per TM), so if only 1 core each of two task manager is performing the
>> consume operation, wouldn't it hamper the performance.
>>
>> Even if two Task managers are utilized, all 16 slots should have been
>> used, right?
>>
>> For the other sub-task, for all 56 entries I am seeing bytes received.
>> (this may be because of applying rebalance after the source)
>>
>> P.S: I am reading over million records from Kafka, so need to utilize
>> enough resources [Performance is the key here].
>>
>>
>> Regards,
>> Vinay Patil
>>
>> On Mon, Jul 4, 2016 at 8:55 PM, Vinay Patil wrote:
>>
>> > Thanks a lot guys, this helps to understand better
>> >
>> > Regards,
>> > Vinay Patil
>> >
>> > On Mon, Jul 4, 2016 at 8:43 PM, Stephan Ewen wrote:
>> >
>> >> Just to be sure: Each *subtask* has one thread - so for each task,
>> >> there are as many parallel threads (distributed across nodes) as your
>> >> parallelism indicates.
>> >>
>> >> For most cases, having long chains and then a higher parallelism is a
>> >> good choice.
>> >> Cases where individual functions (MapFunction, etc) do something very
>> >> CPU intensive are cases where you may want to not chain them, so they
>> >> get a separate thread.
>> >>
>> >> If you see all tasks in one box in the UI, it probably means you have
>> >> only "Filter" and "Map" as a function? In that case it is fine to have
>> >> just one box (=Task) in the UI. The box still has parallelism via
>> >> subtasks.
>> >>
>> >> If you insert a "rebalance()" between the Kafka Source and the
>> >> Map/Filter/etc it makes sure that the data distribution in the
>> >> Map/Filter/etc operators has best utilization independent of how the
>> >> data was partitioned in Kafka.
>> >> You should then also see two boxes in the UI - one for the Kafka
>> >> Source, one for the actual processing.
>> >>
>> >>
>> >> On Mon, Jul 4, 2016 at 5:00 PM, Aljoscha Krettek wrote:
>> >>
>> >> > Hi,
>> >> > chaining is useful to minimize communication overhead. But in your
>> >> > case you might benefit more from having good cluster utilization.
>> >> > There seems to be a tradeoff.
>> >> > Maybe you can run some easy tests to see how it behaves for you.
>> >> >
>> >> > Cheers,
>> >> > Aljoscha
>> >> >
>> >> > On Mon, 4 Jul 2016 at 16:28 Vinay Patil wrote:
>> >> >
>> >> > > Thanks,
>> >> > >
>> >> > > so is operator chaining useful in terms of utilizing the resources
>> >> > > or we should keep the chaining to minimal use, say 3-4 operators
>> >> > > and disable chaining?
>> >> > > I am worried because I am seeing all the operators in one box on
>> >> > > flink UI.
>> >> > >
>> >> > >
>> >> > > Regards,
>> >> > > Vinay Patil
>> >> > >
>> >> > > On Mon, Jul 4, 2016 at 7:13 PM, Aljoscha Krettek
>> >> > > <aljoscha@apache.org> wrote:
>> >> > >
>> >> > > > Hi,
>> >> > > > this is true, yes. If the number of Kafka partitions is less
>> >> > > > than the parallelism then some of the sources might not be
>> >> > > > utilized. If you insert a rebalance after the sources you should
>> >> > > > be able to utilize all the downstream operations equally.
>> >> > > >
>> >> > > > Cheers,
>> >> > > > Aljoscha
>> >> > > >
>> >> > > > On Mon, 4 Jul 2016 at 11:13 Vinay Patil wrote:
>> >> > > >
>> >> > > > > Just an update, the task will be executed by multiple
>> >> > > > > threads, my bad I asked the wrong way.
>> >> > > > > Can you please clarify other things.
>> >> > > > >
>> >> > > > > Out of 8 node only 3 of them are getting utilized, reading
>> >> > > > > the data from Kafka, does it mean that the Kafka partitions
>> >> > > > > are set to less number?
>> >> > > > >
>> >> > > > > What if we use rescale or rebalance since it evenly
>> >> > > > > distributes, would that ensure maximum use of resources?
>> >> > > > >
>> >> > > > > Regards,
>> >> > > > > Vinay Patil
>> >> > > > >
>> >> > > > > On Fri, Jul 1, 2016 at 11:09 PM, Vinay Patil
>> >> > > > > <vinay18.patil@gmail.com> wrote:
>> >> > > > >
>> >> > > > > > Hi,
>> >> > > > > >
>> >> > > > > > According to the documentation:
>> >> > > > > > *"Each task is executed by one thread, Chaining operators
>> >> > > > > > together into tasks is a useful optimization: it reduces
>> >> > > > > > the overhead of thread-to-thread handover and buffering,
>> >> > > > > > and increases overall throughput while decreasing
>> >> > > > > > latency"*
>> >> > > > > > So does it mean that the single box (refer below mails)
>> >> > > > > > represent it as a *single task* and the task will be
>> >> > > > > > executed by single thread only?
>> >> > > > > >
>> >> > > > > > I am having 8 node cluster (parallelism set to 56), so
>> >> > > > > > what is the correct way to achieve maximum CPU utilization
>> >> > > > > > and parallelism? Does complete stream chaining into a
>> >> > > > > > single box achieve maximum parallelism?
>> >> > > > > >
>> >> > > > > > The data we are processing is huge volume of data (60,000
>> >> > > > > > records per second), so wanted to be sure what we can
>> >> > > > > > correct to achieve better results.
>> >> > > > > >
>> >> > > > > > Regards,
>> >> > > > > > Vinay Patil
>> >> > > > > >
>> >> > > > > >
>> >> > > > > > On Fri, Jul 1, 2016 at 9:23 PM, Aljoscha Krettek
>> >> > > > > > <aljoscha@apache.org> wrote:
>> >> > > > > >
>> >> > > > > >> Hi,
>> >> > > > > >> yes, the window operator is stateful, which means that
>> >> > > > > >> it will pick up where it left in case of a failure and
>> >> > > > > >> restore.
>> >> > > > > >>
>> >> > > > > >> You're right about the graph, chained operators are
>> >> > > > > >> shown as one box.
>> >> > > > > >>
>> >> > > > > >> Cheers,
>> >> > > > > >> Aljoscha
>> >> > > > > >>
>> >> > > > > >> On Fri, 1 Jul 2016 at 04:52 Vinay Patil
>> >> > > > > >> <vinay18.patil@gmail.com> wrote:
>> >> > > > > >>
>> >> > > > > >> > Hi,
>> >> > > > > >> >
>> >> > > > > >> > Just watched the video on Robust Stream Processing.
>> >> > > > > >> > So when we say Window is a stateful operator, does it
>> >> > > > > >> > mean that even if the task manager doing the window
>> >> > > > > >> > operation fails, will it pick up from the state left
>> >> > > > > >> > earlier when it comes up? (Have not read more on
>> >> > > > > >> > state for now)
>> >> > > > > >> >
>> >> > > > > >> >
>> >> > > > > >> > Also in one of our project when we deploy on cluster
>> >> > > > > >> > and check the Job Graph, everything is shown in one
>> >> > > > > >> > box, why this happens? Is it because of chaining of
>> >> > > > > >> > streams?
>> >> > > > > >> > So the box here represent the function flow, right?
>> >> > > > > >> >
>> >> > > > > >> >
>> >> > > > > >> > Regards,
>> >> > > > > >> > Vinay Patil
>> >> > > > > >> >

--94eb2c0d7c60fd7bfa053799c6c0--
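
The numbers discussed in this thread (2 Kafka partitions, source parallelism
56) can be illustrated with a small stand-alone simulation. This is only a
sketch and is not Flink or Kafka code: the class name, the method names, the
modulo assignment rule and the figure of 280 records per partition are all
assumptions made up for the illustration. It shows two things: a partition is
read by exactly one source subtask, so at most 2 of the 56 source subtasks
receive data, and a round-robin redistribution after the source (the idea
behind rebalance()) spreads the records over all downstream subtasks.

```java
import java.util.Arrays;

public class PartitionAssignmentSketch {

    /**
     * Assumed assignment rule (hypothetical, for illustration only):
     * partition p is read by source subtask (p % parallelism).
     * Returns how many partitions each source subtask reads.
     */
    public static int[] partitionsPerSubtask(int numPartitions, int parallelism) {
        int[] counts = new int[parallelism];
        for (int p = 0; p < numPartitions; p++) {
            counts[p % parallelism]++;
        }
        return counts;
    }

    /** Counts the source subtasks that read at least one partition. */
    public static int activeSubtasks(int[] partitionCounts) {
        int active = 0;
        for (int c : partitionCounts) {
            if (c > 0) {
                active++;
            }
        }
        return active;
    }

    /**
     * Round-robin redistribution: every source subtask hands its records
     * to the downstream subtasks in turn. Returns records per downstream subtask.
     */
    public static long[] rebalance(long[] recordsPerSource, int downstreamParallelism) {
        long[] received = new long[downstreamParallelism];
        for (long records : recordsPerSource) {
            for (long r = 0; r < records; r++) {
                received[(int) (r % downstreamParallelism)]++;
            }
        }
        return received;
    }

    public static void main(String[] args) {
        int parallelism = 56;   // from the thread
        int partitions = 2;     // from the thread

        int[] counts = partitionsPerSubtask(partitions, parallelism);
        System.out.println("Source subtasks reading data: "
                + activeSubtasks(counts) + " of " + parallelism);

        // Assumption: each partition holds 280 records.
        long[] recordsPerSource = new long[parallelism];
        for (int i = 0; i < parallelism; i++) {
            recordsPerSource[i] = counts[i] * 280L;
        }
        long[] received = rebalance(recordsPerSource, parallelism);
        System.out.println("Records per downstream subtask: "
                + Arrays.toString(received));
    }
}
```

In this model the only way to get more than 2 active source subtasks is to
raise the partition count, which matches the advice given at the top of this
mail; the round-robin step only helps the operators after the source.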