flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-7210) Add TwoPhaseCommitSinkFunction (implementing exactly-once semantic in generic way)
Date Tue, 25 Jul 2017 08:02:01 GMT

    [ https://issues.apache.org/jira/browse/FLINK-7210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16099667#comment-16099667 ]

ASF GitHub Bot commented on FLINK-7210:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4368#discussion_r129234789
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java ---
    @@ -0,0 +1,342 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions.sink;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.api.common.state.ListState;
    +import org.apache.flink.api.common.state.ListStateDescriptor;
    +import org.apache.flink.api.common.typeinfo.TypeHint;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.state.CheckpointListener;
    +import org.apache.flink.runtime.state.FunctionInitializationContext;
    +import org.apache.flink.runtime.state.FunctionSnapshotContext;
    +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import javax.annotation.Nullable;
    +
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Objects;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +/**
    + * This is a recommended base class for all of the {@link SinkFunction} that intend to implement exactly-once semantic.
    + * It does that by implementing two phase commit algorithm on top of the {@link CheckpointedFunction} and
    + * {@link CheckpointListener}. User should provide custom TXN (transaction handle) and implement abstract methods
    + * handling this transaction handle.
    + *
    + * @param <IN> Input type for {@link SinkFunction}
    + * @param <TXN> Transaction to store all of the information required to handle a transaction (must be Serializable)
    + */
    +@PublicEvolving
    +public abstract class TwoPhaseCommitSinkFunction<IN, TXN extends Serializable>
    +		extends RichSinkFunction<IN>
    +		implements CheckpointedFunction, CheckpointListener {
    +
    +	private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class);
    +
    +	protected final ListStateDescriptor<TransactionAndCheckpoint<TXN>> pendingCommitTransactionsDescriptor;
    +	protected final ListStateDescriptor<TXN> pendingTransactionsDescriptor;
    +
    +	protected final List<TransactionAndCheckpoint<TXN>> pendingCommitTransactions = new ArrayList<>();
    +
    +	@Nullable
    +	protected TXN currentTransaction;
    +	protected ListState<TXN> pendingTransactionsState;
    +	protected ListState<TransactionAndCheckpoint<TXN>> pendingCommitTransactionsState;
    +
    +	public TwoPhaseCommitSinkFunction(Class<TXN> txnClass) {
    +		this(
    +			TypeInformation.of(txnClass),
    +			TypeInformation.of(new TypeHint<TransactionAndCheckpoint<TXN>>() {}));
    +	}
    +
    +	public TwoPhaseCommitSinkFunction(
    +			TypeInformation<TXN> txnTypeInformation,
    +			TypeInformation<TransactionAndCheckpoint<TXN>> txnAndCheckpointTypeInformation) {
    +		this(
    +			new ListStateDescriptor<>("pendingTransactions", txnTypeInformation),
    +			new ListStateDescriptor<>("pendingCommitTransactions", txnAndCheckpointTypeInformation));
    +	}
    +
    +	public TwoPhaseCommitSinkFunction(
    +			ListStateDescriptor<TXN> pendingTransactionsDescriptor,
    +			ListStateDescriptor<TransactionAndCheckpoint<TXN>> pendingCommitTransactionsDescriptor) {
    +		this.pendingTransactionsDescriptor = requireNonNull(pendingTransactionsDescriptor, "pendingTransactionsDescriptor is null");
    +		this.pendingCommitTransactionsDescriptor = requireNonNull(pendingCommitTransactionsDescriptor, "pendingCommitTransactionsDescriptor is null");
    +	}
    +
    +	// ------ methods that should be implemented in child class to support two phase commit algorithm ------
    +
    +	/**
    +	 * Write value within a transaction.
    +	 */
    +	protected abstract void invoke(TXN transaction, IN value) throws Exception;
    +
    +	/**
    +	 * Method that starts a new transaction.
    +	 *
    +	 * @return newly created transaction.
    +	 */
    +	protected abstract TXN beginTransaction() throws Exception;
    +
    +	/**
    +	 * Pre commit previously created transaction. Pre commit must make all of the necessary steps to prepare the
    +	 * transaction for a commit that might happen in the future. After this point the transaction might still be
    +	 * aborted, but underlying implementation must ensure that commit calls on already pre committed transactions
    +	 * will always succeed.
    +	 *
    +	 * <p>Usually implementation involves flushing the data.
    +	 */
    +	protected abstract void preCommit(TXN transaction) throws Exception;
    +
    +	/**
    +	 * Commit a pre-committed transaction. If this method fails, the Flink application will be
    +	 * restarted and {@link TwoPhaseCommitSinkFunction#recoverAndCommit(Serializable)} will be called again for the
    +	 * same transaction.
    +	 */
    +	protected abstract void commit(TXN transaction);
    +
    +	/**
    +	 * Invoked on recovered transactions after a failure. Must eventually succeed. If it fails, Flink application will
    +	 * be restarted and it will be invoked again. If it does not succeed it means a data loss will occur.
    +	 */
    +	protected void recoverAndCommit(TXN transaction) {
    +		commit(transaction);
    +	}
    +
    +	/**
    +	 * Abort a transaction.
    +	 */
    +	protected abstract void abort(TXN transaction);
    +
    +	/**
    +	 * Abort a transaction that was rejected by a coordinator after a failure.
    +	 */
    +	protected void recoverAndAbort(TXN transaction) {
    +		abort(transaction);
    +	}
    +
    +	// ------ entry points for above methods implementing {@link CheckpointedFunction} and {@link CheckpointListener} ------
    +
    +	@Override
    +	public final void invoke(IN value) throws Exception {
    +		invoke(currentTransaction, value);
    +	}
    +
    +	@Override
    +	public final void notifyCheckpointComplete(long checkpointId) throws Exception {
    +		// the following scenarios are possible here
    +		//
    +		//  (1) there is exactly one transaction from the latest checkpoint that
    +		//      was triggered and completed. That should be the common case.
    +		//      Simply commit that transaction in that case.
    +		//
    +		//  (2) there are multiple pending transactions because one previous
    +		//      checkpoint was skipped. That is a rare case, but can happen
    +		//      for example when:
    +		//
    +		//        - the master cannot persist the metadata of the last
    +		//          checkpoint (temporary outage in the storage system) but
    +		//          could persist a successive checkpoint (the one notified here)
    +		//
    +		//        - other tasks could not persist their status during
    +		//          the previous checkpoint, but did not trigger a failure because they
    +		//          could hold onto their state and could successfully persist it in
    +		//          a successive checkpoint (the one notified here)
    +		//
    +		//      In both cases, the prior checkpoint never reached a committed state, but
    +		//      this checkpoint is always expected to subsume the prior one and cover all
    +		//      changes since the last successful one. As a consequence, we need to commit
    +		//      all pending transactions.
    +		//
    +		//  (3) Multiple transactions are pending, but the checkpoint complete notification
    +		//      relates not to the latest. That is possible, because notification messages
    +		//      can be delayed (in an extreme case until they arrive after a succeeding checkpoint
    +		//      was triggered) and because there can be concurrent overlapping checkpoints
    +		//      (a new one is started before the previous fully finished).
    +		//
    +		// ==> There should never be a case where we have no pending transaction here
    +		//
    +
    +		Iterator<TransactionAndCheckpoint<TXN>> pendingTransactionsIterator = pendingCommitTransactions.iterator();
    +		checkState(pendingTransactionsIterator.hasNext(), "checkpoint completed, but no transaction pending");
    +
    +		List<TransactionAndCheckpoint<TXN>> remainingTransactions = new ArrayList<>();
    +
    +		for (TransactionAndCheckpoint<TXN> pendingTransaction : pendingCommitTransactions) {
    +			if (pendingTransaction.checkpointId > checkpointId) {
    +				remainingTransactions.add(pendingTransaction);
    +				continue;
    +			}
    +
    +			LOG.info("{} - checkpoint {} complete, committing completed checkpoint transaction {}",
    +				name(), checkpointId, pendingTransaction);
    +
    +			// If this fails, there is actually a data loss
    +			commit(pendingTransaction.transaction);
    +
    +			LOG.debug("{} - committed checkpoint transaction {}", name(), pendingTransaction);
    +		}
    +
    +		pendingCommitTransactions.clear();
    +		for (TransactionAndCheckpoint<TXN> remainingTransaction : remainingTransactions) {
    +			pendingCommitTransactions.add(remainingTransaction);
    --- End diff --
    
    This seems redundant. Couldn't the committed entries simply be removed while iterating?
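
    For illustration only, removal while iterating could look roughly like the sketch below. This is not code from the pull request: `PendingCommitSketch`, its simplified `TransactionAndCheckpoint`, and `commitUpTo` are hypothetical stand-ins, and the actual commit is simulated by appending to a list. The point is just that `Iterator.remove()` drops committed entries in place, so no temporary `remainingTransactions` list is needed.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class PendingCommitSketch {

    /** Hypothetical, simplified stand-in for the class used in the diff. */
    static final class TransactionAndCheckpoint<TXN> {
        final TXN transaction;
        final long checkpointId;

        TransactionAndCheckpoint(TXN transaction, long checkpointId) {
            this.transaction = transaction;
            this.checkpointId = checkpointId;
        }
    }

    /**
     * Commits every pending transaction whose checkpoint id is not newer than
     * the completed checkpoint and removes it from the list in place.
     * Returns the committed transactions in commit order.
     */
    static <TXN> List<TXN> commitUpTo(
            List<TransactionAndCheckpoint<TXN>> pending, long checkpointId) {
        List<TXN> committed = new ArrayList<>();
        Iterator<TransactionAndCheckpoint<TXN>> it = pending.iterator();
        while (it.hasNext()) {
            TransactionAndCheckpoint<TXN> candidate = it.next();
            if (candidate.checkpointId > checkpointId) {
                continue; // belongs to a later checkpoint, keep it pending
            }
            committed.add(candidate.transaction); // stand-in for commit(...)
            it.remove(); // safe removal during iteration
        }
        return committed;
    }

    public static void main(String[] args) {
        List<TransactionAndCheckpoint<String>> pending = new ArrayList<>();
        pending.add(new TransactionAndCheckpoint<>("txn-1", 1L));
        pending.add(new TransactionAndCheckpoint<>("txn-2", 2L));
        pending.add(new TransactionAndCheckpoint<>("txn-3", 3L));

        List<String> committed = commitUpTo(pending, 2L);
        System.out.println(committed + ", still pending: " + pending.size());
    }
}
```

    One caveat with this shape: if the commit call throws, entries committed earlier in the loop have already been removed, which may or may not be the desired failure behaviour.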


> Add TwoPhaseCommitSinkFunction (implementing exactly-once semantic in generic way)
> ----------------------------------------------------------------------------------
>
>                 Key: FLINK-7210
>                 URL: https://issues.apache.org/jira/browse/FLINK-7210
>             Project: Flink
>          Issue Type: Improvement
>          Components: Streaming Connectors
>            Reporter: Piotr Nowojski
>            Assignee: Piotr Nowojski
>
> To implement an exactly-once sink there is a recurring pattern: the two-phase commit algorithm. It is used both in `BucketingSink` and in the `Pravega` sink, and it will be used in the `Kafka 0.11` connector. It would be good to extract this common logic into one class, both to improve existing implementations (for example, `Pravega`'s sink doesn't abort interrupted transactions) and to make it easier for users to implement their own custom exactly-once sinks.
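
As a rough illustration of the pattern described in the issue, the sketch below simulates the two phases in plain Java without any Flink dependency. Everything in it is an assumption made for the example: `InMemoryTwoPhaseSink`, its method names (chosen to echo the checkpoint hooks), and the use of an in-memory list as the "external system". Writes go into an open transaction, a checkpoint snapshot pre-commits it, and only the checkpoint-complete notification makes the data visible.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class InMemoryTwoPhaseSink {

    // Records visible to readers; only the commit phase writes here.
    final List<String> committed = new ArrayList<>();

    // Pre-committed transactions keyed by the checkpoint that sealed them.
    private final Map<Long, List<String>> pendingCommit = new LinkedHashMap<>();

    // The currently open transaction (here simply a buffer of records).
    private List<String> current = beginTransaction();

    private List<String> beginTransaction() {
        return new ArrayList<>();
    }

    /** Writes a value into the open transaction; nothing is visible yet. */
    void invoke(String value) {
        current.add(value);
    }

    /** Phase one: seal (pre-commit) the open transaction and start a new one. */
    void snapshotState(long checkpointId) {
        pendingCommit.put(checkpointId, current);
        current = beginTransaction();
    }

    /** Phase two: commit every transaction sealed at or before this checkpoint. */
    void notifyCheckpointComplete(long checkpointId) {
        Iterator<Map.Entry<Long, List<String>>> it = pendingCommit.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<String>> entry = it.next();
            if (entry.getKey() <= checkpointId) {
                committed.addAll(entry.getValue());
                it.remove();
            }
        }
    }

    /** Abort: discard whatever was written since the last snapshot. */
    void abortOpenTransaction() {
        current = beginTransaction();
    }

    public static void main(String[] args) {
        InMemoryTwoPhaseSink sink = new InMemoryTwoPhaseSink();
        sink.invoke("a");
        sink.invoke("b");
        sink.snapshotState(1L);            // pre-commit "a", "b"
        sink.invoke("c");                  // goes into the next transaction
        sink.notifyCheckpointComplete(1L); // commit "a", "b"
        sink.abortOpenTransaction();       // simulated failure: "c" is dropped
        System.out.println(sink.committed);
    }
}
```

A real sink would additionally persist the pending transactions in checkpointed state so they can be committed or aborted after a restart; that part is left out here.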



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
