Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id AAEE4200B0F for ; Thu, 2 Jun 2016 09:10:01 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id AA5F6160A22; Thu, 2 Jun 2016 07:10:01 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id F1568160A3E for ; Thu, 2 Jun 2016 09:10:00 +0200 (CEST) Received: (qmail 90831 invoked by uid 500); 2 Jun 2016 07:09:59 -0000 Mailing-List: contact dev-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list dev@kafka.apache.org Received: (qmail 90618 invoked by uid 99); 2 Jun 2016 07:09:59 -0000 Received: from arcas.apache.org (HELO arcas) (140.211.11.28) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 02 Jun 2016 07:09:59 +0000 Received: from arcas.apache.org (localhost [127.0.0.1]) by arcas (Postfix) with ESMTP id 524EB2C1F6E for ; Thu, 2 Jun 2016 07:09:59 +0000 (UTC) Date: Thu, 2 Jun 2016 07:09:59 +0000 (UTC) From: "Michael Andre Pearce (IG) (JIRA)" To: dev@kafka.apache.org Message-ID: In-Reply-To: References: Subject: [jira] [Commented] (KAFKA-2260) Allow specifying expected offset on produce MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 7bit X-JIRA-FingerPrint: 30527f35849b9dde25b450d4833f0394 archived-at: Thu, 02 Jun 2016 07:10:01 -0000 [ https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15311853#comment-15311853 ] Michael Andre Pearce (IG) commented on KAFKA-2260: -------------------------------------------------- Is this still active/alive? would be a real shame if this was left to die and a shame it didn't make it into 0.10, the concept and neatness of the solution very useful. We would have many uses for this. > Allow specifying expected offset on produce > ------------------------------------------- > > Key: KAFKA-2260 > URL: https://issues.apache.org/jira/browse/KAFKA-2260 > Project: Kafka > Issue Type: Improvement > Reporter: Ben Kirwin > Assignee: Ewen Cheslack-Postava > Priority: Minor > Attachments: KAFKA-2260.patch, expected-offsets.patch > > > I'd like to propose a change that adds a simple CAS-like mechanism to the Kafka producer. This update has a small footprint, but enables a bunch of interesting uses in stream processing or as a commit log for process state. > h4. Proposed Change > In short: > - Allow the user to attach a specific offset to each message produced. > - The server assigns offsets to messages in the usual way. However, if the expected offset doesn't match the actual offset, the server should fail the produce request instead of completing the write. > This is a form of optimistic concurrency control, like the ubiquitous check-and-set -- but instead of checking the current value of some state, it checks the current offset of the log. > h4. Motivation > Much like check-and-set, this feature is only useful when there's very low contention. Happily, when Kafka is used as a commit log or as a stream-processing transport, it's common to have just one producer (or a small number) for a given partition -- and in many of these cases, predicting offsets turns out to be quite useful. > - We get the same benefits as the 'idempotent producer' proposal: a producer can retry a write indefinitely and be sure that at most one of those attempts will succeed; and if two producers accidentally write to the end of the partition at once, we can be certain that at least one of them will fail. > - It's possible to 'bulk load' Kafka this way -- you can write a list of n messages consecutively to a partition, even if the list is much larger than the buffer size or the producer has to be restarted. > - If a process is using Kafka as a commit log -- reading from a partition to bootstrap, then writing any updates to that same partition -- it can be sure that it's seen all of the messages in that partition at the moment it does its first (successful) write. > There's a bunch of other similar use-cases here, but they all have roughly the same flavour. > h4. Implementation > The major advantage of this proposal over other suggested transaction / idempotency mechanisms is its minimality: it gives the 'obvious' meaning to a currently-unused field, adds no new APIs, and requires very little new code or additional work from the server. > - Produced messages already carry an offset field, which is currently ignored by the server. This field could be used for the 'expected offset', with a sigil value for the current behaviour. (-1 is a natural choice, since it's already used to mean 'next available offset'.) > - We'd need a new error and error code for a 'CAS failure'. > - The server assigns offsets to produced messages in {{ByteBufferMessageSet.validateMessagesAndAssignOffsets}}. After this changed, this method would assign offsets in the same way -- but if they don't match the offset in the message, we'd return an error instead of completing the write. > - To avoid breaking existing clients, this behaviour would need to live behind some config flag. (Possibly global, but probably more useful per-topic?) > I understand all this is unsolicited and possibly strange: happy to answer questions, and if this seems interesting, I'd be glad to flesh this out into a full KIP or patch. (And apologies if this is the wrong venue for this sort of thing!) -- This message was sent by Atlassian JIRA (v6.3.4#6332)