From: Guozhang Wang
Date: Fri, 3 Jun 2016 11:33:08 -0700
Subject: Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
To: "dev@kafka.apache.org"

Sorry for being late on this thread.

The assign() function is auto-triggered during the rebalance by one of the
consumers when it receives all subscription information collected from the
server-side coordinator.

More details can be found here:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal#KafkaClient-sideAssignmentProposal-ConsumerEmbeddedProtocol

As for Kafka Streams, the way it achieves "stickiness" is:

1) all consumers put their currently assigned topic-partitions and server ids
   into the "metadata" field of the JoinGroupRequest;
2) when the selected consumer triggers assign() with all the subscriptions
   and their metadata, it parses the metadata to learn the existing
   assignment map, and when making the new assignment it tries to assign
   partitions to their current owners "with best effort".

Hope this helps.

Guozhang

On Thu, May 26, 2016 at 4:56 PM, Vahid S Hashemian <
vahidhashemian@us.ibm.com> wrote:

> Hi Guozhang,
>
> I was looking at the implementation of StreamsPartitionAssignor through
> its unit tests and expected to find some tests that
> - verify stickiness by making at least two calls to the assign() method
>   (so we check that the second assign() call output preserves the
>   assignments coming from the first assign() call output); or
> - start off with a preset assignment, call assign() after some subscription
>   change, and verify the previous assignments are preserved.
> But none of the methods seem to do these. Did I overlook them, or is
> stickiness being tested in some other fashion?
>
> Also, if there is a high-level write-up about how this assignor works
> could you please point me to it? Thanks.
>
> Regards.
> --Vahid
>
>
> From: Guozhang Wang
> To: "dev@kafka.apache.org"
> Date: 05/02/2016 10:34 AM
> Subject: Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
>
>
> Just FYI, the StreamsPartitionAssignor in Kafka Streams is already doing
> some sort of sticky partitioning. This is done through the userData field
> though; i.e. all group members send their current "assigned partitions" in
> their join group request, which are grouped and sent to the leader; the
> leader then makes a best-effort sticky assignment.
>
>
> Guozhang
>
> On Fri, Apr 29, 2016 at 9:48 PM, Ewen Cheslack-Postava wrote:
>
> > I think I'm unclear how we leverage the
> > onPartitionsRevoked/onPartitionsAssigned here in any way that's different
> > from our normal usage -- certainly you can use them to generate a diff, but
> > you still need to commit when partitions are revoked and that has a
> > non-trivial cost. Are we just saying that you might be able to save some
> > overhead, e.g. closing/reopening some other resources by doing a flush but
> > not a close() or something? You still need to flush any output and commit
> > offsets before returning from onPartitionsRevoked, right? Otherwise you
> > couldn't guarantee clean handoff of partitions.
> >
> > In terms of the rebalancing, the basic requirements in the KIP seem sound.
> > Passing previous assignment data via UserData also seems reasonable since
> > it avoids redistributing all assignment data to all members and doesn't
> > rely on the next generation leader being a member of the current
> > generation.
> > Hopefully this shouldn't be surprising since I think I
> > discussed this w/ Jason before he updated the relevant wiki pages :)
> >
> > -Ewen
> >
> >
> > On Mon, Apr 18, 2016 at 9:34 AM, Vahid S Hashemian <
> > vahidhashemian@us.ibm.com> wrote:
> >
> > > Hi Jason,
> > >
> > > Thanks for your feedback.
> > >
> > > I believe your suggestion on how to take advantage of this assignor is
> > > valid. We can leverage the onPartitionsRevoked() and
> > > onPartitionsAssigned() callbacks, compare the assigned partitions before
> > > and after the rebalance, and do the cleanup only if there is a change
> > > (e.g., if some previously assigned partition is not in the new
> > > assignment).
> > >
> > > On your second question, a number of tests that I ran show that the old
> > > assignments are preserved in the current implementation, except when
> > > the consumer group leader is killed, in which case a fresh assignment is
> > > performed. This is something that needs to be fixed. I tried to use your
> > > pointers to find out where the best place is to preserve the old
> > > assignment in such circumstances but have not been able to pinpoint it.
> > > If you have any suggestion on this please share. Thanks.
> > >
> > > Regards,
> > > Vahid Hashemian
> > >
> > >
> > > From: Jason Gustafson
> > > To: dev@kafka.apache.org
> > > Date: 04/14/2016 11:37 AM
> > > Subject: Re: [DISCUSS] KIP-54 Sticky Partition Assignment Strategy
> > >
> > >
> > > Hi Vahid,
> > >
> > > Thanks for the proposal. I think one of the advantages of having sticky
> > > assignment would be to reduce the need to clean up local partition state
> > > between rebalances. Do you have any thoughts on how the user would take
> > > advantage of this assignor in the consumer to do this? Maybe one approach
> > > is to delay cleanup until you detect a change from the previous
> > > assignment in the onPartitionsAssigned() callback?
> > >
> > > Also, can you provide some detail on how the sticky assignor works at
> > > the group protocol level? For example, do you pass old assignments
> > > through the "UserData" field in the consumer's JoinGroup?
> > >
> > > Thanks,
> > > Jason
> > >
> > > On Thu, Apr 14, 2016 at 11:05 AM, Vahid S Hashemian <
> > > vahidhashemian@us.ibm.com> wrote:
> > >
> > > > Hi all,
> > > >
> > > > I have started a new KIP under
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy
> > > >
> > > > The corresponding JIRA is at
> > > > https://issues.apache.org/jira/browse/KAFKA-2273
> > > > The corresponding PR is at https://github.com/apache/kafka/pull/1020
> > > >
> > > > Your feedback is much appreciated.
> > > >
> > > > Regards,
> > > > Vahid Hashemian
> > >
> >
> > --
> > Thanks,
> > Ewen
>
> --
> -- Guozhang

--
-- Guozhang
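
The best-effort stickiness discussed in this thread (each member reports its current partitions in the JoinGroup metadata, and the leader tries to keep every partition with its current owner) can be sketched in plain Java. This is only an illustration under stated assumptions, not the StreamsPartitionAssignor or the KIP-54 implementation: the class name StickyAssignSketch, the assign() signature, the string partition ids, and the ceil(partitions / members) cap are all hypothetical, and the metadata is assumed to have been parsed into the previousOwners map already.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of a best-effort sticky assignment; it does not use the Kafka client API.
class StickyAssignSketch {

    /**
     * @param members        ids of the consumers in the new generation
     * @param partitions     all partitions to distribute, e.g. "t-0"
     * @param previousOwners partition -> member that owned it before the rebalance
     *                       (assumed to be rebuilt by the leader from the members' metadata)
     * @return member -> partitions assigned in the new generation
     */
    static Map<String, List<String>> assign(List<String> members,
                                            List<String> partitions,
                                            Map<String, String> previousOwners) {
        Map<String, List<String>> result = new TreeMap<>();
        for (String m : members) {
            result.put(m, new ArrayList<>());
        }
        if (members.isEmpty()) {
            return result;
        }

        // Assumed cap per member: ceil(partitions / members). It bounds the load
        // of any single member but does not guarantee a perfectly even split.
        int quota = (partitions.size() + members.size() - 1) / members.size();

        // Pass 1: keep a partition with its previous owner if that owner is
        // still in the group and has not reached the cap.
        List<String> unassigned = new ArrayList<>();
        for (String p : partitions) {
            String owner = previousOwners.get(p);
            List<String> owned = (owner == null) ? null : result.get(owner);
            if (owned != null && owned.size() < quota) {
                owned.add(p);
            } else {
                unassigned.add(p);
            }
        }

        // Pass 2: give each remaining partition to the currently least-loaded member.
        for (String p : unassigned) {
            String target = null;
            for (String m : result.keySet()) {
                if (target == null || result.get(m).size() < result.get(target).size()) {
                    target = m;
                }
            }
            result.get(target).add(p);
        }
        return result;
    }
}
```

In this sketch, a partition moves only when its previous owner has left the group or is already at the cap, which is the kind of diff a consumer could detect in onPartitionsAssigned() before deciding whether any local cleanup is needed.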