From: Sagar
Date: Wed, 3 Jan 2018 23:55:30 +0530
Subject: Question Regarding seek() in KafkaConsumer
To: dev@kafka.apache.org

Hi,

I have a use case wherein I need exactly-once processing for data across various topics. This means that in case of errors, the next time the poll() method runs, it should start off from the exact same record.

The approach I have used is (a simplified sketch is in the P.S. below):

1) Call poll() and get a list of records.
2) For each record:
   2.1) Get the processing status of the record.
   2.2) In case of success, add it to the list of records to be committed.
   2.3) In case of failure:
        a) Commit whatever records were collected till now.
        b) For each topic partition, get the last committed offset using the committed() method in KafkaConsumer.
        c) Seek the topic partition to the offset obtained above, and break out of the loop.

In my mind this should have worked, but I experienced some data loss along the way when I ran it from "earliest" (across 17 topics, 5 partitions each).

The javadocs for seek() say:

*If this API is invoked for the same partition more than once, the latest offset will be used on the next poll(). Note that you may lose data if this API is arbitrarily used in the middle of consumption, to reset the fetch offsets.*

I am committing offsets manually (sync). I just wanted to understand: is it not safe to call this API this way? If I want to retry reliably, what patterns should be used? A couple of other approaches I read about in Kafka: The Definitive Guide were to push the retriable records to a buffer or to a separate topic. Are those the correct ways of retrying, and not this?

Thanks!
Sagar.
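
P.S. In case it helps to see it concretely, here is a stripped-down sketch of the loop I described above. processRecord() is just a stand-in for my actual per-record processing, and consumer setup and error handling are omitted:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.HashMap;
import java.util.Map;

public class RetryWithSeekLoop {

    // Stand-in for the real per-record processing; returns true on success.
    static boolean processRecord(ConsumerRecord<String, String> record) {
        return true;
    }

    static void runLoop(KafkaConsumer<String, String> consumer) {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(500);
            Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
            boolean failed = false;

            for (ConsumerRecord<String, String> record : records) {
                if (processRecord(record)) {
                    // 2.2) Remember the *next* offset to read for this partition.
                    toCommit.put(new TopicPartition(record.topic(), record.partition()),
                                 new OffsetAndMetadata(record.offset() + 1));
                } else {
                    // 2.3a) Commit whatever succeeded so far.
                    if (!toCommit.isEmpty()) {
                        consumer.commitSync(toCommit);
                    }
                    // 2.3b/c) Rewind each assigned partition to its last
                    // committed offset so the next poll() re-reads from there.
                    for (TopicPartition tp : consumer.assignment()) {
                        OffsetAndMetadata committed = consumer.committed(tp);
                        if (committed != null) {
                            consumer.seek(tp, committed.offset());
                        }
                        // If nothing was ever committed for tp, I leave its
                        // position alone (auto.offset.reset applies instead).
                    }
                    failed = true;
                    break;
                }
            }

            if (!failed && !toCommit.isEmpty()) {
                consumer.commitSync(toCommit);
            }
        }
    }
}

Note that after a failure I seek every assigned partition, not just the failing one, so already-fetched but uncommitted records from the other partitions get re-polled as well.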