flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [2/4] flink git commit: [FLINK-7497][streaming] Introduce user context in TwoPhaseCommitSinkFunction
Date Tue, 29 Aug 2017 12:46:58 GMT
[FLINK-7497][streaming] Introduce user context in TwoPhaseCommitSinkFunction


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/959d54fc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/959d54fc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/959d54fc

Branch: refs/heads/master
Commit: 959d54fc828691759f15f2e83c0c123e9da6e782
Parents: ac72360
Author: Piotr Nowojski <piotr.nowojski@gmail.com>
Authored: Mon Aug 21 16:53:07 2017 +0200
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Tue Aug 29 14:45:34 2017 +0200

----------------------------------------------------------------------
 .../sink/TwoPhaseCommitSinkFunction.java        | 57 ++++++++++++++++----
 .../sink/TwoPhaseCommitSinkFunctionTest.java    |  4 +-
 2 files changed, 48 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/959d54fc/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
index b73272d..77f74fe 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
@@ -37,6 +37,7 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import static java.util.Objects.requireNonNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -49,22 +50,25 @@ import static org.apache.flink.util.Preconditions.checkState;
  *
  * @param <IN> Input type for {@link SinkFunction}.
  * @param <TXN> Transaction to store all of the information required to handle a transaction.
+ * @param <CONTEXT> Context that will be shared across all invocations for the given {@link TwoPhaseCommitSinkFunction}
+ *                 instance. Context is created once
  */
 @PublicEvolving
-public abstract class TwoPhaseCommitSinkFunction<IN, TXN>
+public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT>
 		extends RichSinkFunction<IN>
 		implements CheckpointedFunction, CheckpointListener {
 
 	private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class);
 
-	protected final ListStateDescriptor<State<TXN>> stateDescriptor;
+	protected final ListStateDescriptor<State<TXN, CONTEXT>> stateDescriptor;
 
 	protected final LinkedHashMap<Long, TXN> pendingCommitTransactions = new LinkedHashMap<>();
 
 	@Nullable
 	protected TXN currentTransaction;
+	protected Optional<CONTEXT> userContext;
 
-	protected ListState<State<TXN>> state;
+	protected ListState<State<TXN, CONTEXT>> state;
 
 	/**
 	 * Use default {@link ListStateDescriptor} for internal state serialization. Helpful utilities for using this
@@ -77,8 +81,8 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN>
 	 * </pre>
 	 * @param stateTypeInformation {@link TypeInformation} for POJO holding state of opened transactions.
 	 */
-	public TwoPhaseCommitSinkFunction(TypeInformation<State<TXN>> stateTypeInformation) {
-		this(new ListStateDescriptor<State<TXN>>("state", stateTypeInformation));
+	public TwoPhaseCommitSinkFunction(TypeInformation<State<TXN, CONTEXT>> stateTypeInformation) {
+		this(new ListStateDescriptor<State<TXN, CONTEXT>>("state", stateTypeInformation));
 	}
 
 	/**
@@ -86,10 +90,18 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN>
 	 *
 	 * @param stateDescriptor descriptor for transactions POJO.
 	 */
-	public TwoPhaseCommitSinkFunction(ListStateDescriptor<State<TXN>> stateDescriptor) {
+	public TwoPhaseCommitSinkFunction(ListStateDescriptor<State<TXN, CONTEXT>> stateDescriptor) {
 		this.stateDescriptor = requireNonNull(stateDescriptor, "stateDescriptor is null");
 	}
 
+	protected Optional<CONTEXT> initializeUserContext() {
+		return Optional.empty();
+	}
+
+	protected Optional<CONTEXT> getUserContext() {
+		return userContext;
+	}
+
 	// ------ methods that should be implemented in child class to support two phase commit algorithm ------
 
 	/**
@@ -142,6 +154,9 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN>
 		abort(transaction);
 	}
 
+	protected void finishRecoveringContext() {
+	}
+
 	// ------ entry points for above methods implementing {@CheckPointedFunction} and {@CheckpointListener} ------
 
 	@Override
@@ -226,7 +241,8 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN>
 		state.clear();
 		state.add(new State<>(
 			this.currentTransaction,
-			new ArrayList<>(pendingCommitTransactions.values())));
+			new ArrayList<>(pendingCommitTransactions.values()),
+			userContext));
 	}
 
 	@Override
@@ -250,7 +266,8 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN>
 		if (context.isRestored()) {
 			LOG.info("{} - restoring state", name());
 
-			for (State<TXN> operatorState : state.get()) {
+			for (State<TXN, CONTEXT> operatorState : state.get()) {
+				userContext = operatorState.getContext();
 				List<TXN> recoveredTransactions = operatorState.getPendingCommitTransactions();
 				for (TXN recoveredTransaction : recoveredTransactions) {
 					// If this fails, there is actually a data loss
@@ -260,9 +277,17 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN>
 
 				recoverAndAbort(operatorState.getPendingTransaction());
 				LOG.info("{} aborted recovered transaction {}", name(), operatorState.getPendingTransaction());
+
+				if (userContext.isPresent()) {
+					finishRecoveringContext();
+				}
 			}
-		} else {
+		}
+		// if in restore we didn't get any userContext or we are initializing from scratch
+		if (userContext == null) {
 			LOG.info("{} - no state to restore {}", name());
+
+			userContext = initializeUserContext();
 		}
 		this.pendingCommitTransactions.clear();
 
@@ -291,14 +316,16 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN>
 	/**
 	 * State POJO class coupling pendingTransaction, context and pendingCommitTransactions.
 	 */
-	public static class State<TXN> {
+	public static class State<TXN, CONTEXT> {
 		protected TXN pendingTransaction;
 		protected List<TXN> pendingCommitTransactions = new ArrayList<>();
+		protected Optional<CONTEXT> context;
 
 		public State() {
 		}
 
-		public State(TXN pendingTransaction, List<TXN> pendingCommitTransactions) {
+		public State(TXN pendingTransaction, List<TXN> pendingCommitTransactions, Optional<CONTEXT> context) {
+			this.context = requireNonNull(context, "context is null");
 			this.pendingTransaction = requireNonNull(pendingTransaction, "pendingTransaction is null");
 			this.pendingCommitTransactions = requireNonNull(pendingCommitTransactions, "pendingCommitTransactions is null");
 		}
@@ -318,5 +345,13 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN>
 		public void setPendingCommitTransactions(List<TXN> pendingCommitTransactions) {
 			this.pendingCommitTransactions = pendingCommitTransactions;
 		}
+
+		public Optional<CONTEXT> getContext() {
+			return context;
+		}
+
+		public void setContext(Optional<CONTEXT> context) {
+			this.context = context;
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/959d54fc/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
index f2fcb96..b9097d7 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
@@ -120,12 +120,12 @@ public class TwoPhaseCommitSinkFunctionTest {
 		assertEquals(expectedValues, actualValues);
 	}
 
-	private static class FileBasedSinkFunction extends TwoPhaseCommitSinkFunction<String, FileTransaction> {
+	private static class FileBasedSinkFunction extends TwoPhaseCommitSinkFunction<String, FileTransaction, Void> {
 		private final File tmpDirectory;
 		private final File targetDirectory;
 
 		public FileBasedSinkFunction(File tmpDirectory, File targetDirectory) {
-			super(TypeInformation.of(new TypeHint<State<FileTransaction>>() {}));
+			super(TypeInformation.of(new TypeHint<State<FileTransaction, Void>>() {}));
 
 			if (!tmpDirectory.isDirectory() || !targetDirectory.isDirectory()) {
 				throw new IllegalArgumentException();


Mime
View raw message