flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [4/4] flink git commit: [FLINK-7498][streaming] Bind together state fields of TwoPhaseCommitSinkFunction
Date Tue, 29 Aug 2017 12:47:00 GMT
[FLINK-7498][streaming] Bind together state fields of TwoPhaseCommitSinkFunction

Make sure that state fields are coupled together between checkpoints.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ac72360c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ac72360c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ac72360c

Branch: refs/heads/master
Commit: ac72360cc0e71d6f543d93c9c1f117babaa35799
Parents: 257a5a5
Author: Piotr Nowojski <piotr.nowojski@gmail.com>
Authored: Mon Aug 21 16:43:26 2017 +0200
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Tue Aug 29 14:45:34 2017 +0200

----------------------------------------------------------------------
 .../sink/TwoPhaseCommitSinkFunction.java        | 97 +++++++++++---------
 .../sink/TwoPhaseCommitSinkFunctionTest.java    |  4 +-
 2 files changed, 57 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ac72360c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
index 58532f5..b73272d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
@@ -57,15 +57,14 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN>
 
 	private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class);
 
-	protected final ListStateDescriptor<List<TXN>> pendingCommitTransactionsDescriptor;
-	protected final ListStateDescriptor<TXN> pendingTransactionsDescriptor;
+	protected final ListStateDescriptor<State<TXN>> stateDescriptor;
 
 	protected final LinkedHashMap<Long, TXN> pendingCommitTransactions = new LinkedHashMap<>();
 
 	@Nullable
 	protected TXN currentTransaction;
-	protected ListState<TXN> pendingTransactionsState;
-	protected ListState<List<TXN>> pendingCommitTransactionsState;
+
+	protected ListState<State<TXN>> state;
 
 	/**
 	 * Use default {@link ListStateDescriptor} for internal state serialization. Helpful utilities for using this
@@ -73,32 +72,22 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN>
 	 * {@link TypeInformation#of(TypeHint)}. Example:
 	 * <pre>
 	 * {@code
-	 * TwoPhaseCommitSinkFunction(
-	 *     TypeInformation.of(TXN.class),
-	 *     TypeInformation.of(new TypeHint<List<TXN>>() {}));
+	 * TwoPhaseCommitSinkFunction(TypeInformation.of(new TypeHint<State<TXN, CONTEXT>>() {}));
 	 * }
 	 * </pre>
-	 * @param txnTypeInformation {@link TypeInformation} for transaction POJO.
-	 * @param txnListTypeInformation {@link TypeInformation} for mapping between checkpointId and transaction.
+	 * @param stateTypeInformation {@link TypeInformation} for POJO holding state of opened transactions.
 	 */
-	public TwoPhaseCommitSinkFunction(
-			TypeInformation<TXN> txnTypeInformation,
-			TypeInformation<List<TXN>> txnListTypeInformation) {
-		this(new ListStateDescriptor<>("pendingTransactions", txnTypeInformation),
-			new ListStateDescriptor<>("pendingCommitTransactions", txnListTypeInformation));
+	public TwoPhaseCommitSinkFunction(TypeInformation<State<TXN>> stateTypeInformation) {
+		this(new ListStateDescriptor<State<TXN>>("state", stateTypeInformation));
 	}
 
 	/**
 	 * Instantiate {@link TwoPhaseCommitSinkFunction} with custom state descriptors.
 	 *
-	 * @param pendingTransactionsDescriptor descriptor for transaction POJO.
-	 * @param pendingCommitTransactionsDescriptor descriptor for mapping between checkpointId and transaction POJO.
+	 * @param stateDescriptor descriptor for transactions POJO.
 	 */
-	public TwoPhaseCommitSinkFunction(
-			ListStateDescriptor<TXN> pendingTransactionsDescriptor,
-			ListStateDescriptor<List<TXN>> pendingCommitTransactionsDescriptor) {
-		this.pendingTransactionsDescriptor = requireNonNull(pendingTransactionsDescriptor, "pendingTransactionsDescriptor is null");
-		this.pendingCommitTransactionsDescriptor = requireNonNull(pendingCommitTransactionsDescriptor, "pendingCommitTransactionsDescriptor is null");
+	public TwoPhaseCommitSinkFunction(ListStateDescriptor<State<TXN>> stateDescriptor) {
+		this.stateDescriptor = requireNonNull(stateDescriptor, "stateDescriptor is null");
 	}
 
 	// ------ methods that should be implemented in child class to support two phase commit algorithm ------
@@ -195,11 +184,11 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN>
 		// ==> There should never be a case where we have no pending transaction here
 		//
 
-		Iterator<Map.Entry<Long, TXN>> pendingTransactionsIterator = pendingCommitTransactions.entrySet().iterator();
-		checkState(pendingTransactionsIterator.hasNext(), "checkpoint completed, but no transaction pending");
+		Iterator<Map.Entry<Long, TXN>> pendingTransactionIterator = pendingCommitTransactions.entrySet().iterator();
+		checkState(pendingTransactionIterator.hasNext(), "checkpoint completed, but no transaction pending");
 
-		while (pendingTransactionsIterator.hasNext()) {
-			Map.Entry<Long, TXN> entry = pendingTransactionsIterator.next();
+		while (pendingTransactionIterator.hasNext()) {
+			Map.Entry<Long, TXN> entry = pendingTransactionIterator.next();
 			Long pendingTransactionCheckpointId = entry.getKey();
 			TXN pendingTransaction = entry.getValue();
 			if (pendingTransactionCheckpointId > checkpointId) {
@@ -213,7 +202,7 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN>
 
 			LOG.debug("{} - committed checkpoint transaction {}", name(), pendingTransaction);
 
-			pendingTransactionsIterator.remove();
+			pendingTransactionIterator.remove();
 		}
 	}
 
@@ -234,13 +223,10 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN>
 		currentTransaction = beginTransaction();
 		LOG.debug("{} - started new transaction '{}'", name(), currentTransaction);
 
-		pendingCommitTransactionsState.clear();
-		pendingCommitTransactionsState.add(new ArrayList<>(pendingCommitTransactions.values()));
-
-		pendingTransactionsState.clear();
-		// in case of failure we might not be able to abort currentTransaction. Let's store it into the state
-		// so it can be aborted after a restart/crash
-		pendingTransactionsState.add(currentTransaction);
+		state.clear();
+		state.add(new State<>(
+			this.currentTransaction,
+			new ArrayList<>(pendingCommitTransactions.values())));
 	}
 
 	@Override
@@ -259,24 +245,21 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN>
 		// we can have more than one transaction to check in case of a scale-in event, or
 		// for the reasons discussed in the 'notifyCheckpointComplete()' method.
 
-		pendingTransactionsState = context.getOperatorStateStore().getListState(pendingTransactionsDescriptor);
-		pendingCommitTransactionsState = context.getOperatorStateStore().getListState(pendingCommitTransactionsDescriptor);
+		state = context.getOperatorStateStore().getListState(stateDescriptor);
 
 		if (context.isRestored()) {
 			LOG.info("{} - restoring state", name());
 
-			for (List<TXN> recoveredTransactions : pendingCommitTransactionsState.get()) {
+			for (State<TXN> operatorState : state.get()) {
+				List<TXN> recoveredTransactions = operatorState.getPendingCommitTransactions();
 				for (TXN recoveredTransaction : recoveredTransactions) {
 					// If this fails, there is actually a data loss
 					recoverAndCommit(recoveredTransaction);
 					LOG.info("{} committed recovered transaction {}", name(), recoveredTransaction);
 				}
-			}
 
-			// Explicitly abort transactions that could be not closed cleanly
-			for (TXN pendingTransaction : pendingTransactionsState.get()) {
-				recoverAndAbort(pendingTransaction);
-				LOG.info("{} aborted recovered transaction {}", name(), pendingTransaction);
+				recoverAndAbort(operatorState.getPendingTransaction());
+				LOG.info("{} aborted recovered transaction {}", name(), operatorState.getPendingTransaction());
 			}
 		} else {
 			LOG.info("{} - no state to restore {}", name());
@@ -304,4 +287,36 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN>
 			getRuntimeContext().getIndexOfThisSubtask(),
 			getRuntimeContext().getNumberOfParallelSubtasks());
 	}
+
+	/**
+	 * State POJO class coupling pendingTransaction, context and pendingCommitTransactions.
+	 */
+	public static class State<TXN> {
+		protected TXN pendingTransaction;
+		protected List<TXN> pendingCommitTransactions = new ArrayList<>();
+
+		public State() {
+		}
+
+		public State(TXN pendingTransaction, List<TXN> pendingCommitTransactions) {
+			this.pendingTransaction = requireNonNull(pendingTransaction, "pendingTransaction is null");
+			this.pendingCommitTransactions = requireNonNull(pendingCommitTransactions, "pendingCommitTransactions is null");
+		}
+
+		public TXN getPendingTransaction() {
+			return pendingTransaction;
+		}
+
+		public void setPendingTransaction(TXN pendingTransaction) {
+			this.pendingTransaction = pendingTransaction;
+		}
+
+		public List<TXN> getPendingCommitTransactions() {
+			return pendingCommitTransactions;
+		}
+
+		public void setPendingCommitTransactions(List<TXN> pendingCommitTransactions) {
+			this.pendingCommitTransactions = pendingCommitTransactions;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ac72360c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
index 9d01e74..f2fcb96 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
@@ -125,9 +125,7 @@ public class TwoPhaseCommitSinkFunctionTest {
 		private final File targetDirectory;
 
 		public FileBasedSinkFunction(File tmpDirectory, File targetDirectory) {
-			super(
-				TypeInformation.of(FileTransaction.class),
-				TypeInformation.of(new TypeHint<List<FileTransaction>>() {}));
+			super(TypeInformation.of(new TypeHint<State<FileTransaction>>() {}));
 
 			if (!tmpDirectory.isDirectory() || !targetDirectory.isDirectory()) {
 				throw new IllegalArgumentException();


Mime
View raw message