Date: Wed, 5 Oct 2016 16:47:20 +0000 (UTC)
From: "Amit Sela (JIRA)"
To: commits@beam.incubator.apache.org
Reply-To: dev@beam.incubator.apache.org
Subject: [jira] [Created] (BEAM-704) KafkaIO should handle "latest offset" evenly, and persist it as part of the CheckpointMark.

Amit Sela created BEAM-704:
------------------------------

             Summary: KafkaIO should handle "latest offset" evenly, and persist it as part of the CheckpointMark.
                 Key: BEAM-704
                 URL: https://issues.apache.org/jira/browse/BEAM-704
             Project: Beam
          Issue Type: Improvement
          Components: sdk-java-extensions
            Reporter: Amit Sela
            Assignee: James Malone

Currently, KafkaIO (when configured to "latest") checks the latest offset on the worker. This means that each worker sees a "different" latest, depending on when it checks the partitions assigned to it. It also means that if a worker fails before it starts reading, and new messages are added in the meantime, those messages are missed.

I think we should consider checking the offsets (this could be the same for "earliest") when running initialSplits. That is how Spark does it as well: one call from the driver for all topic-partitions.
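To illustrate the split-time idea, here is a minimal sketch in plain Java. It is not KafkaIO or Beam code: `SplitTimeOffsets`, `Split`, and this `initialSplits` signature are hypothetical names chosen for the example, and the `latestOffset` function stands in for whatever broker lookup a real implementation would use. The point it shows is that every partition's start offset is resolved in one pass before any work is handed out, so a worker that starts late (or restarts) still begins from the offset fixed at split time.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.ToLongFunction;

/** Sketch only: none of these names are KafkaIO or Beam APIs. */
class SplitTimeOffsets {

    /** Work for one worker: its partitions and the offset each must start from. */
    static final class Split {
        final Map<String, Long> startOffsets;

        Split(Map<String, Long> startOffsets) {
            this.startOffsets = startOffsets;
        }
    }

    /**
     * Resolves the start offset of every partition in a single pass,
     * then deals the partitions round-robin into at most desiredSplits splits.
     */
    static List<Split> initialSplits(
            List<String> partitions, ToLongFunction<String> latestOffset, int desiredSplits) {
        // One pass on the "driver": all partitions are resolved before any worker runs.
        Map<String, Long> resolved = new LinkedHashMap<>();
        for (String partition : partitions) {
            resolved.put(partition, latestOffset.applyAsLong(partition));
        }

        int n = Math.max(1, Math.min(desiredSplits, partitions.size()));
        List<Map<String, Long>> buckets = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            buckets.add(new LinkedHashMap<>());
        }
        int next = 0;
        for (Map.Entry<String, Long> e : resolved.entrySet()) {
            buckets.get(next++ % n).put(e.getKey(), e.getValue());
        }

        List<Split> splits = new ArrayList<>();
        for (Map<String, Long> bucket : buckets) {
            splits.add(new Split(bucket));
        }
        return splits;
    }

    public static void main(String[] args) {
        // Fake "broker" state: partition name -> current log-end offset.
        Map<String, Long> broker = new LinkedHashMap<>();
        broker.put("topic-0", 120L);
        broker.put("topic-1", 7L);
        broker.put("topic-2", 0L);

        List<Split> splits = initialSplits(new ArrayList<>(broker.keySet()), broker::get, 2);

        // Records that arrive after splitting do not move the start offsets.
        broker.put("topic-1", 50L);
        for (Split split : splits) {
            System.out.println(split.startOffsets);
        }
    }
}
```

In this sketch the offsets travel inside each `Split`, so no worker ever asks the broker for "latest" on its own.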

I'd also suggest we persist the latest offset as part of the CheckpointMark, so that once "latest" is set it is remembered until new messages arrive and doesn't need to be resolved again (and any new messages that became available won't be missed upon failure).

For Spark this is even more important: state is passed between micro-batches, and sparse partitions may skip messages until a message finally arrives within the read time-frame.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
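The CheckpointMark suggestion in this issue can be sketched in the same spirit. The classes below are hypothetical and are not Beam's `CheckpointMark` or KafkaIO internals; `latestOffset` is an assumed stand-in for a broker lookup. The sketch shows only the behaviour being asked for: the resolved offset is part of the persisted mark even before any record is read, and a restarted reader prefers the persisted value over resolving "latest" again.

```java
import java.util.function.LongSupplier;

/** Sketch only: hypothetical names, not Beam's CheckpointMark or KafkaIO internals. */
class OffsetCheckpointSketch {

    /** What is persisted for one partition: the next offset to read. */
    static final class PartitionMark {
        final long nextOffset;

        PartitionMark(long nextOffset) {
            this.nextOffset = nextOffset;
        }
    }

    /** Reader for a single partition. */
    static final class PartitionReader {
        private long nextOffset;

        PartitionReader(PartitionMark restored, LongSupplier latestOffset) {
            // Resolve "latest" only when nothing was persisted for this partition.
            this.nextOffset = restored != null ? restored.nextOffset : latestOffset.getAsLong();
        }

        /** Called after the record at the given offset has been consumed. */
        void recordConsumed(long offset) {
            nextOffset = offset + 1;
        }

        /** The mark carries the resolved offset even if no record was read yet. */
        PartitionMark checkpoint() {
            return new PartitionMark(nextOffset);
        }
    }

    public static void main(String[] args) {
        long[] logEnd = {10}; // fake broker log-end offset

        PartitionReader first = new PartitionReader(null, () -> logEnd[0]);
        PartitionMark mark = first.checkpoint(); // taken before any record arrived

        logEnd[0] = 13; // three records arrive, then the worker fails

        PartitionReader restarted = new PartitionReader(mark, () -> logEnd[0]);
        System.out.println(restarted.checkpoint().nextOffset); // prints 10
    }
}
```

Without the persisted mark, the restarted reader in this sketch would resolve "latest" again, start at 13, and skip offsets 10 through 12.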