flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From srich...@apache.org
Subject [1/5] flink git commit: [FLINK-6534] [checkpoint] Use async IO to dispose state in SharedStateRegistry
Date Sun, 14 May 2017 11:50:08 GMT
Repository: flink
Updated Branches:
  refs/heads/master b54f44888 -> 44fb035e0


[FLINK-6534] [checkpoint] Use async IO to dispose state in SharedStateRegistry


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/44fb035e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/44fb035e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/44fb035e

Branch: refs/heads/master
Commit: 44fb035e021259986f0b1aac4126143510a758e5
Parents: 098e46f
Author: Stefan Richter <s.richter@data-artisans.com>
Authored: Fri May 12 16:01:05 2017 +0200
Committer: Stefan Richter <s.richter@data-artisans.com>
Committed: Sun May 14 13:49:50 2017 +0200

----------------------------------------------------------------------
 .../runtime/checkpoint/AbstractCompletedCheckpointStore.java   | 6 ++++++
 .../runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java  | 2 ++
 .../org/apache/flink/runtime/state/SharedStateRegistry.java    | 6 +++++-
 3 files changed, 13 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/44fb035e/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCompletedCheckpointStore.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCompletedCheckpointStore.java
index f42fd06..bf70501 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCompletedCheckpointStore.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.runtime.state.SharedStateRegistry;
 
+import java.util.concurrent.Executor;
+
 /**
  * This is the base class that provides implementation of some aspects common for all
  * {@link CompletedCheckpointStore}s.
@@ -34,4 +36,8 @@ public abstract class AbstractCompletedCheckpointStore implements CompletedCheck
 	public AbstractCompletedCheckpointStore() {
 		this.sharedStateRegistry = new SharedStateRegistry();
 	}
+
+	public AbstractCompletedCheckpointStore(Executor asyncIOExecutor) {
+		this.sharedStateRegistry = new SharedStateRegistry(asyncIOExecutor);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/44fb035e/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index 52a4eea..c8c68bc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -106,6 +106,8 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
 			RetrievableStateStorageHelper<CompletedCheckpoint> stateStorage,
 			Executor executor) throws Exception {
 
+		super(executor);
+
 		checkArgument(maxNumberOfCheckpointsToRetain >= 1, "Must retain at least one checkpoint.");
 		checkNotNull(stateStorage, "State storage");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/44fb035e/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
index 9cfdec7..f9161b0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
@@ -45,8 +45,12 @@ public class SharedStateRegistry {
 	private final Executor asyncDisposalExecutor;
 
 	public SharedStateRegistry() {
+		this(Executors.directExecutor());
+	}
+
+	public SharedStateRegistry(Executor asyncDisposalExecutor) {
 		this.registeredStates = new HashMap<>();
-		this.asyncDisposalExecutor = Executors.directExecutor(); //TODO: FLINK-6534
+		this.asyncDisposalExecutor = Preconditions.checkNotNull(asyncDisposalExecutor);
 	}
 
 	/**


Mime
View raw message