flink-dev mailing list archives

From "Elias Levy (JIRA)" <j...@apache.org>
Subject [jira] [Created] (FLINK-4502) Cassandra connector documentation has misleading consistency guarantees
Date Thu, 25 Aug 2016 23:42:20 GMT
Elias Levy created FLINK-4502:

             Summary: Cassandra connector documentation has misleading consistency guarantees
                 Key: FLINK-4502
                 URL: https://issues.apache.org/jira/browse/FLINK-4502
             Project: Flink
          Issue Type: Bug
          Components: Documentation
    Affects Versions: 1.1.0
            Reporter: Elias Levy

The Cassandra connector documentation states that "enableWriteAheadLog() is an optional method,
that allows exactly-once processing for non-deterministic algorithms."  This claim appears
to be false.

From what I gather, the write-ahead log feature of the connector works as follows:
- The sink is replaced with a stateful operator that writes incoming messages to the state
backend, keyed by the checkpoint they belong to.
- When the operator is notified that a Flink checkpoint has completed, then for each pending
checkpoint older than and including the completed one, it:
  * reads its messages from the state backend,
  * writes them to Cassandra,
  * records that it has committed them to Cassandra for that specific checkpoint and operator,
  * and erases them from the state backend.
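The flow above can be sketched roughly as follows. This is a hypothetical simplification, not the actual GenericWriteAheadSink code: an in-memory TreeMap stands in for the state backend, a list stands in for Cassandra, and all names are illustrative.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of the commit loop described above; not the real
// GenericWriteAheadSink implementation.
class WalSinkSketch {
    // state backend stand-in: checkpoint id -> buffered messages
    final TreeMap<Long, List<String>> pending = new TreeMap<>();
    // external store stand-in for Cassandra
    final List<String> cassandra = new ArrayList<>();
    long lastCommitted = -1;

    void processElement(long checkpointId, String msg) {
        pending.computeIfAbsent(checkpointId, k -> new ArrayList<>()).add(msg);
    }

    void notifyCheckpointComplete(long completedId) {
        // commit every pending checkpoint up to and including the completed one
        Iterator<Map.Entry<Long, List<String>>> it =
                pending.headMap(completedId, true).entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<String>> e = it.next();
            cassandra.addAll(e.getValue()); // write the messages to Cassandra
            lastCommitted = e.getKey();     // record the commit
            it.remove();                    // erase from the state backend
        }
    }
}
```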

This process attempts to avoid resubmitting queries to Cassandra that would otherwise occur
when recovering a job from a checkpoint and having messages replayed.

Alas, this does not guarantee exactly-once semantics at the sink.  The writes to Cassandra
that occur when the operator is notified that a checkpoint has completed are not atomic, and
they are potentially non-idempotent.  If the job dies while writing to Cassandra, or before
committing the checkpoint via the committer, the queries will be replayed when the job recovers.
Thus the documentation appears to be incorrect in stating that this provides exactly-once
semantics.
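To illustrate the failure mode, here is a hedged, self-contained sketch (not Flink code; the batch-write method is named sendValues only by analogy) of a non-atomic batch write that crashes partway through and is then replayed on recovery:

```java
import java.util.ArrayList;
import java.util.List;

// Illustration of why the commit step is not exactly-once: the batch write
// is not atomic, so a crash mid-write followed by recovery replays queries
// that had already succeeded.
class ReplaySketch {
    static final List<String> cassandra = new ArrayList<>(); // external store stand-in

    // Write a batch, simulating a crash after `failAfter` records (-1 = no crash).
    static boolean sendValues(List<String> batch, int failAfter) {
        for (int i = 0; i < batch.size(); i++) {
            if (i == failAfter) {
                return false; // job dies before the committer records the checkpoint
            }
            cassandra.add(batch.get(i));
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> checkpointBatch = List.of("q1", "q2", "q3");
        sendValues(checkpointBatch, 2);  // crash after q1 and q2 are written
        sendValues(checkpointBatch, -1); // recovery replays the whole batch
        // q1 and q2 were written twice; harmless only if the queries are idempotent
        System.out.println(cassandra);   // [q1, q2, q1, q2, q3]
    }
}
```

If the replayed queries are non-idempotent (e.g. counter updates), the duplicated writes change the stored result, which is exactly the at-least-once behavior the documentation's claim rules out.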

There also seems to be an issue in GenericWriteAheadSink's notifyOfCompletedCheckpoint which
may result in incorrect output.  If sendValues returns false because a write failed, the method
does not bail out; it simply moves on to the next checkpoint to commit, if there is one, keeping
the previous one around to try again later.  That alone could result in newer data being
overwritten with older data when the previous checkpoint is retried.  But given that
CassandraCommitter implements isCheckpointCommitted as checkpointID <= this.lastCommittedCheckpointID,
what actually happens is worse: when it goes back to retry the uncommitted older checkpoint,
the check considers it already committed, even though some of its data may never have been
written out, and that data is silently discarded.
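A minimal sketch of that check, as described, makes the problem concrete. This assumes only the quoted <= comparison; the real CassandraCommitter persists lastCommittedCheckpointID in Cassandra per operator, and the method names here are illustrative.

```java
// Minimal sketch of the committed-checkpoint check described above; the real
// CassandraCommitter stores lastCommittedCheckpointID in Cassandra.
class CommitterSketch {
    long lastCommittedCheckpointID = -1;

    void commitCheckpoint(long checkpointID) {
        lastCommittedCheckpointID = checkpointID;
    }

    boolean isCheckpointCommitted(long checkpointID) {
        // The <= comparison means committing checkpoint N implicitly marks
        // every older checkpoint committed, including ones whose writes failed.
        return checkpointID <= lastCommittedCheckpointID;
    }
}
```

With this check, if the writes for checkpoint 5 fail but checkpoint 6 later commits, isCheckpointCommitted(5) returns true on the retry, so checkpoint 5's unwritten data is dropped.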

This message was sent by Atlassian JIRA
