From: dhalperi@apache.org
To: commits@beam.incubator.apache.org
Reply-To: dev@beam.incubator.apache.org
Date: Wed, 08 Jun 2016 19:45:40 -0000
Message-Id: <9022bfc173fe43a6b3ecf2c5445701f9@git.apache.org>
In-Reply-To: <926431c78bae4efc804d65860bca2ff8@git.apache.org>
Subject: [2/2] incubator-beam git commit: Improve UnboundedSource javadoc
archived-at: Wed, 08 Jun 2016 19:45:44 -0000

Improve UnboundedSource javadoc


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/269da952
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/269da952
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/269da952

Branch: refs/heads/master
Commit: 269da95272e8d6d35d3a7e6081b6059ca1bc2e37
Parents: 7fb21f2
Author: Mark Shields
Authored: Mon May 16 12:56:53 2016 -0700
Committer: Dan Halperin
Committed: Wed Jun 8 12:45:31 2016 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/UnboundedSource.java | 47 +++++++++++++++-----
 1 file changed, 35 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/269da952/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java
index 82c8db7..2c4a325 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java
@@ -104,14 +104,34 @@ public abstract class UnboundedSource<
    */
   public interface CheckpointMark {
     /**
-     * Perform any finalization that needs to happen after a bundle of data read from
-     * the source has been processed and committed.
+     * Called by the system to signal that this checkpoint mark has been committed along with
+     * all the records which have been read from the {@link UnboundedReader} since the
+     * previous finalized checkpoint was taken.
      *
-     * <p>For example, this could be sending acknowledgement requests to an external
-     * data source such as Pub/Sub.
+     * <p>For example, this method could send acknowledgements to an external data source
+     * such as Pubsub.
      *
-     * <p>This may be called from any thread, potentially at the same time as calls to the
-     * {@code UnboundedReader} that created it.
+     * <p>Note that:
+     * <ul>
+     * <li>This finalize method may be called from any thread, concurrently with calls to
+     * the {@link UnboundedReader} it was created from.
+     * <li>Checkpoints will not necessarily be finalized as soon as they are created.
+     * A checkpoint may be taken while a previous checkpoint from the same
+     * {@link UnboundedReader} has not yet been finalized.
+     * <li>In the absence of failures, all checkpoints will be finalized and they will be
+     * finalized in the same order they were taken from the {@link UnboundedReader}.
+     * <li>It is possible for a checkpoint to be taken but this method never called if
+     * the checkpoint could not be committed for any reason.
+     * <li>If this call throws an exception then the entire checkpoint will be abandoned and the
+     * reader restarted from an earlier, successfully-finalized checkpoint.
+     * <li>If a checkpoint fails for any reason then no later checkpoint will be allowed to be
+     * finalized without the reader first being restarted.
+     * <li>If an {@link UnboundedReader} is restarted from an earlier checkpoint, the checkpoint
+     * instance will be deserialized from the serialized form of the earlier checkpoint using
+     * {@link UnboundedSource#getCheckpointMarkCoder()}.
+     * <li>It is not safe to assume the {@link UnboundedReader} from which this checkpoint was
+     * created still exists at the time this method is called.
+     * </ul>
      */
     void finalizeCheckpoint() throws IOException;
   }
@@ -126,9 +146,11 @@ public abstract class UnboundedSource<
     private static final byte[] EMPTY = new byte[0];
 
     /**
-     * Initializes the reader and advances the reader to the first record.
+     * Initializes the reader and advances the reader to the first record. If the reader has been
+     * restored from a checkpoint then it should advance to the next unread record at the point
+     * the checkpoint was taken.
      *
-     * <p>This method should be called exactly once. The invocation should occur prior to calling
+     * <p>This method will be called exactly once. The invocation will occur prior to calling
      * {@link #advance} or {@link #getCurrent}. This method may perform expensive operations that
      * are needed to initialize the reader.
      *
@@ -205,10 +227,11 @@ public abstract class UnboundedSource<
     /**
      * Returns a {@link CheckpointMark} representing the progress of this {@code UnboundedReader}.
      *
-     * <p>The elements read up until this is called will be processed together as a bundle. Once
-     * the result of this processing has been durably committed,
-     * {@link CheckpointMark#finalizeCheckpoint} will be called on the {@link CheckpointMark}
-     * object.
+     * <p>All elements read up until this method is called will be processed together as a bundle.
+     * (An element is considered 'read' if it could be returned by a call to {@link #getCurrent}.)
+     * Once the result of processing those elements and the returned checkpoint have been durably
+     * committed, {@link CheckpointMark#finalizeCheckpoint} will (eventually) be called on the
+     * returned {@link CheckpointMark} object.
      *
      * <p>The returned object should not be modified.
      *
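The checkpoint contract documented in this commit (getCheckpointMark() closes a bundle; finalizeCheckpoint() runs later, possibly on another thread, in the order checkpoints were taken) can be illustrated with a small self-contained sketch. It does not use the Beam SDK: CheckpointMark is redeclared locally, and FakeQueue, AckingCheckpointMark, SketchReader and demo() are hypothetical names standing in for an external system such as Pubsub, a source's checkpoint mark, and its reader. The mark copies the ids read since the previous checkpoint, holds no reference to the reader, and acknowledges those ids only when finalized; two checkpoints are taken before either is finalized, and both are then finalized in order from a separate thread.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch: none of these types are Beam API. CheckpointMark is redeclared
// locally so the example compiles without the Beam SDK.
public class CheckpointSketch {

  /** Stand-in for an external system such as Pubsub; thread-safe since acks may come from any thread. */
  static final class FakeQueue {
    final ConcurrentLinkedQueue<String> acked = new ConcurrentLinkedQueue<>();

    void ack(List<String> ids) {
      acked.addAll(ids);
    }
  }

  /** Same shape as UnboundedSource.CheckpointMark. */
  interface CheckpointMark {
    void finalizeCheckpoint() throws IOException;
  }

  /**
   * Keeps an immutable copy of the ids read since the previous checkpoint, and no reference to
   * the reader, which may no longer exist when finalizeCheckpoint() runs.
   */
  static final class AckingCheckpointMark implements CheckpointMark {
    private final List<String> idsToAck;
    private final FakeQueue queue;

    AckingCheckpointMark(List<String> idsToAck, FakeQueue queue) {
      this.idsToAck = Collections.unmodifiableList(new ArrayList<>(idsToAck));
      this.queue = queue;
    }

    @Override
    public void finalizeCheckpoint() throws IOException {
      // Under the documented contract this runs only after the records and this mark have
      // been durably committed, so acknowledging them to the external system is now safe.
      queue.ack(idsToAck);
    }
  }

  /** Hypothetical reader that remembers which ids it has returned since the last checkpoint. */
  static final class SketchReader {
    private final FakeQueue queue;
    private final List<String> readSinceCheckpoint = new ArrayList<>();

    SketchReader(FakeQueue queue) {
      this.queue = queue;
    }

    void read(String id) {
      readSinceCheckpoint.add(id);
    }

    /** Everything read so far belongs to the bundle covered by the returned mark. */
    CheckpointMark getCheckpointMark() {
      CheckpointMark mark = new AckingCheckpointMark(readSinceCheckpoint, queue);
      readSinceCheckpoint.clear(); // the next mark covers only later reads
      return mark;
    }
  }

  /** Takes two checkpoints, then finalizes them in order on another thread. */
  static List<String> demo() {
    FakeQueue queue = new FakeQueue();
    SketchReader reader = new SketchReader(queue);
    reader.read("m1");
    reader.read("m2");
    CheckpointMark first = reader.getCheckpointMark();
    reader.read("m3");
    // A second checkpoint may be taken before the first has been finalized.
    CheckpointMark second = reader.getCheckpointMark();

    Thread finalizer = new Thread(() -> {
      try {
        first.finalizeCheckpoint();
        second.finalizeCheckpoint();
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    });
    finalizer.start();
    try {
      finalizer.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return new ArrayList<>(queue.acked);
  }

  public static void main(String[] args) {
    System.out.println(demo()); // prints [m1, m2, m3]
  }
}
```

The sketch leaves out two parts of the contract. A real mark must be encodable by the source's getCheckpointMarkCoder() so that a restarted reader can be restored from it, so it would likely store only serializable state (such as the ids) rather than a live client reference. A real reader restored from such a mark would also be expected to advance to the next unread record in start().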