From: Felix GV
Date: Tue, 15 Jan 2013 00:24:17 -0500
Subject: Re: Is this a good overview of kafka?
To: users@kafka.apache.org, stan.rosenberg@gmail.com

Sure, I'll try to give a better explanation :)

A little disclaimer, though: my knowledge is based on my reading of the Kafka design paper more than a year ago, so it's possible that I'm forgetting or assuming things that I shouldn't... Also, Kafka was pre-0.7 at the time, and we've been running 0.7.0-ish in prod for a while now, so some of my understanding may be outdated in the context of 0.7.2. A fair number of things definitely changed in 0.8, but I don't know what changed well enough to make informed statements about 0.8. All that to say that you should take your version of Kafka into account.
And it certainly doesn't hurt to read the design paper either ;)

So, my understanding is that when a Kafka broker comes online:

- The broker contacts the ZK ensemble and registers itself. It also registers partitions for each of the topics that exist in ZK (according to the settings in its own broker config file).
- Producers are watching the online partitions in ZK, and when they change, ZK fires off an event to them so that they can update their partition count. This partition count is used as a modulo on the hash returned by the producer's partitioning function. So even if you have a custom partitioning function that deterministically gives out the same hash for a given bucket of messages, if you apply a different modulo to that hash, then of course the messages of that bucket will go to a different partition. This is done so that all online partitions get some data.
- Consumers are also watching the online partitions in ZK. When they change, ZK fires off an event to them, and they start re-balancing so that the partitions are spread as fairly as possible between the consumers. In that process, partitions are assigned to consumers, and those partition assignments could (and may very well) be different from the ones that were in place before the re-balance.

When a Kafka broker goes offline, it also affects the online partition count, so the producers will again send their messages to different partitions (so that all messages have somewhere to go), and the consumers will re-balance again (to avoid starving a consumer whose partitions became unavailable).

When a consumer comes online:

- The consumer registers itself in ZK under its consumer group.
- If other consumers are watching that consumer group, they get notified and a re-balance of the whole group is triggered, just like in the case above.

When a consumer goes offline, a re-balance is triggered as well, for the same reasons.
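To make the producer and consumer behavior described above a bit more concrete, here is a small Python simulation. It is only a sketch and not Kafka code: `choose_partition` and `assign_range` are hypothetical helpers, crc32 is an arbitrary stand-in for whatever hash your partitioner actually uses, and the assignment is only loosely modeled on a range-style re-balance. It shows why the same key can land in a different partition once the partition count changes, and why a consumer's set of partitions can change after a re-balance even though the partitions' contents don't.

```python
import zlib
from typing import Dict, List


def choose_partition(key: str, num_partitions: int) -> int:
    # Hypothetical producer-side partitioner: a deterministic hash of the key
    # (crc32 is an arbitrary stand-in), taken modulo the current number of
    # online partitions.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def assign_range(partitions: List[int], consumers: List[str]) -> Dict[str, List[int]]:
    # Hypothetical range-style re-balance: sort partitions and consumers, then
    # give each consumer a contiguous slice; the first (len % n) consumers get
    # one extra partition.
    if not consumers:
        return {}
    parts = sorted(partitions)
    members = sorted(consumers)
    per_consumer, extra = divmod(len(parts), len(members))
    assignment: Dict[str, List[int]] = {}
    start = 0
    for i, member in enumerate(members):
        count = per_consumer + (1 if i < extra else 0)
        assignment[member] = parts[start:start + count]
        start += count
    return assignment


if __name__ == "__main__":
    users = [f"user-{i}" for i in range(10)]

    # Broker change: 4 online partitions drop to 3, so the modulo changes and
    # some users' messages start landing in a different partition.
    moved = [u for u in users if choose_partition(u, 4) != choose_partition(u, 3)]
    print(f"{len(moved)} of {len(users)} users map to a different partition")

    # Consumer change: a third consumer joins a group reading 6 partitions.
    print(assign_range(list(range(6)), ["c1", "c2"]))
    # {'c1': [0, 1, 2], 'c2': [3, 4, 5]}
    print(assign_range(list(range(6)), ["c1", "c2", "c3"]))
    # {'c1': [0, 1], 'c2': [2, 3], 'c3': [4, 5]}
```

In the consumer case, partition 2 moves from c1 to c2 without any message moving: only "which consumer reads which partition" changes. In the broker case, it's the messages themselves that start landing elsewhere.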
In the case of consumers going online or offline, this does not change the ordering guarantees within the partitions per se. BUT, if your consumers were keeping any sort of internal state related to the ordered data they were consuming, that state won't be relevant anymore, because they will start consuming from different partitions after the re-balance. Depending on the type of processing you're doing, that may or may not break the work your consumer is doing.

Thus, the only event that has no chance of affecting the stickiness of a (data bucket ==> consumer process) mapping is producers going online or offline. Broker changes definitely alter which message buckets go into which partitions. Consumer changes don't affect the content of partitions, but they do change which consumer is consuming which partition.

If ordering guarantees are important to you, then I guess the best thing to do might be to add watches on the same things that trigger the changes described above, and to act accordingly when those changes happen (by flushing the internal state, restarting the consumers, rolling back the ZK offset to some checkpoint in the past, or whatever else is relevant in your use case...)

Hopefully that was clear (and accurate) enough...!

--
Felix

On Mon, Jan 14, 2013 at 9:38 PM, Stan Rosenberg wrote:

> Hi Felix,
>
> Would you mind elaborating on what you said regarding the ordering
> guaranteed; inlined below.
>
> Thanks,
>
> stan
>
> On Mon, Jan 14, 2013 at 6:08 PM, Felix GV wrote:
>
> >
> > For example if you partitioned using a User ID field within the messages,
> > you would be guaranteed that all messages pertaining to a certain user
> > would end up in the same partition, and that they would be correctly
> > ordered. You should be aware, however, that this guarantee is only
> > maintained as long as there are no consumer re-balance (which happens
> > when adding or removing a consumer or a broker).
> >
>
> Why would consumer re-balance or broker failure alter the above partition
> invariant?