From: "Elias Levy (JIRA)"
To: issues@flink.apache.org
Date: Fri, 26 Aug 2016 17:37:20 +0000 (UTC)
Subject: [jira] [Commented] (FLINK-4502) Cassandra connector documentation has misleading consistency guarantees

    [ https://issues.apache.org/jira/browse/FLINK-4502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15439445#comment-15439445 ]

Elias Levy commented on FLINK-4502:
-----------------------------------

Chesnay, thanks for correcting me about the overwriting issue.
I missed the implication of setting the timestamp on the queries.

I still think the documentation does not make clear that idempotent queries are a prerequisite for exactly-once semantics when the WAL is enabled, as shown by the portion of the documentation I quoted. The connector documentation page mentions "idempotent" once, in the paragraph before the one describing the WAL.

The WAL functionality description could also be clearer. For instance, it says "The write-ahead log guarantees that the replayed checkpoint is identical to the first attempt." But that does not seem accurate. The WAL doesn't appear to guarantee that a replayed checkpoint is identical. Rather, it guarantees that no output associated with a checkpoint is written to the sink until the checkpoint is complete, which avoids possibly incompatible duplicate outputs when data from non-completed checkpoints is replayed on recovery.

> Cassandra connector documentation has misleading consistency guarantees
> -----------------------------------------------------------------------
>
>                 Key: FLINK-4502
>                 URL: https://issues.apache.org/jira/browse/FLINK-4502
>             Project: Flink
>          Issue Type: Bug
>          Components: Cassandra Connector, Documentation
>    Affects Versions: 1.1.0
>            Reporter: Elias Levy
>
> The Cassandra connector documentation states that "enableWriteAheadLog() is an optional method, that allows exactly-once processing for non-deterministic algorithms." This claim appears to be false.
>
> From what I gather, the write-ahead log feature of the connector works as follows:
> - The sink is replaced with a stateful operator that writes incoming messages to the state backend, grouped by the checkpoint they belong to.
> - When the operator is notified that a Flink checkpoint has been completed, then for each pending checkpoint up to and including the completed one, it:
>   * reads its messages from the state backend
>   * writes them to Cassandra
>   * records that it has committed them to Cassandra for the specific checkpoint and operator instance
>   * and erases them from the state backend.
>
> This process attempts to avoid the resubmission of queries to Cassandra that would otherwise occur when a job recovers from a checkpoint and messages are replayed.
>
> Alas, this does not guarantee exactly-once semantics at the sink. The writes to Cassandra that occur when the operator is notified that a checkpoint is completed are not atomic, and they are potentially non-idempotent. If the job dies while writing to Cassandra, or before committing the checkpoint via the committer, queries will be replayed when the job recovers. Thus the documentation appears to be incorrect in stating that this provides exactly-once semantics.
>
> There also seems to be an issue in GenericWriteAheadSink's notifyOfCompletedCheckpoint which may result in incorrect output. If sendValues returns false because a write failed, instead of bailing, it simply moves on to the next checkpoint to commit if there is one, keeping the previous one around to try again later. But that can result in newer data being overwritten with older data when the previous checkpoint is retried. In practice, though, given that CassandraCommitter implements isCheckpointCommitted as checkpointID <= this.lastCommittedCheckpointID, when it goes back to retry the uncommitted older checkpoint it will consider it committed, even though some of its data may not have been written out, and the data will be discarded.
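
The last point in the issue description can be illustrated with a toy model. The sketch below is not the Flink source: the class WalCommitSketch, its fields, and the writeSucceeds predicate are hypothetical stand-ins, and a failed write is simplified to all-or-nothing. It only mimics the behaviour described above, namely that a failed checkpoint is kept for a later retry, the loop moves on to newer checkpoints, and the committed check is checkpointID <= lastCommittedCheckpointID.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeMap;
import java.util.function.Predicate;

/** Toy model of the commit loop described in the issue; NOT the Flink implementation. */
class WalCommitSketch {
    // Buffered values per checkpoint id (stand-in for the state backend), ascending by id.
    final TreeMap<Long, List<String>> pending = new TreeMap<>();
    // Stand-in for the external sink (Cassandra in the issue).
    final List<String> written = new ArrayList<>();
    // Stand-in for the committer's record of the newest committed checkpoint.
    long lastCommittedCheckpointId = -1;

    // Same comparison the issue attributes to CassandraCommitter.
    boolean isCheckpointCommitted(long checkpointId) {
        return checkpointId <= lastCommittedCheckpointId;
    }

    // writeSucceeds is a hypothetical hook deciding whether a checkpoint's write succeeds.
    void notifyOfCompletedCheckpoint(long completedId, Predicate<Long> writeSucceeds) {
        // Copy the ids so entries can be removed from the map while looping.
        List<Long> ids = new ArrayList<>(pending.headMap(completedId, true).keySet());
        for (long id : ids) {
            if (isCheckpointCommitted(id)) {
                pending.remove(id); // considered done: buffered values are dropped
            } else if (writeSucceeds.test(id)) {
                written.addAll(pending.remove(id));
                lastCommittedCheckpointId = id;
            }
            // Otherwise the write failed: keep the values and move on to the next id.
        }
    }

    static List<String> demo() {
        WalCommitSketch sink = new WalCommitSketch();
        sink.pending.put(1L, Arrays.asList("a"));
        sink.pending.put(2L, Arrays.asList("b"));
        // The write for checkpoint 1 fails, the write for checkpoint 2 succeeds.
        sink.notifyOfCompletedCheckpoint(2, id -> id != 1);
        sink.pending.put(3L, Arrays.asList("c"));
        // On the next notification checkpoint 1 already looks committed (1 <= 2).
        sink.notifyOfCompletedCheckpoint(3, id -> true);
        return sink.written;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this model the value "a" from checkpoint 1 never reaches the sink: the retry finds 1 <= 2 and drops it, which matches the data-loss scenario described in the issue rather than the overwrite scenario.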