flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [2/3] flink git commit: [FLINK-5495] [mesos] Provide executor to ZooKeeperMesosWorkerStore
Date Mon, 23 Jan 2017 12:26:27 GMT
[FLINK-5495] [mesos] Provide executor to ZooKeeperMesosWorkerStore

The ZooKeeperMesosWorkerStore instantiates a ZooKeeperStateHandleStore which requires an
Executor instance. This executor is now given to the ZooKeeperMesosWorkerStore.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/404d4252
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/404d4252
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/404d4252

Branch: refs/heads/release-1.2
Commit: 404d425294184978979f79713727d871bf6516dd
Parents: 099cdd0
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Mon Jan 16 14:14:18 2017 +0100
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Mon Jan 23 10:24:24 2017 +0100

----------------------------------------------------------------------
 .../MesosApplicationMasterRunner.java           |  7 ++++---
 .../store/ZooKeeperMesosWorkerStore.java        | 21 ++++++++++++--------
 2 files changed, 17 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/404d4252/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index 689c26a..c9b6eed 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -73,6 +73,7 @@ import java.net.URL;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -315,7 +316,7 @@ public class MesosApplicationMasterRunner {
 			LOG.debug("Starting Mesos Flink Resource Manager");
 
 			// create the worker store to persist task information across restarts
-			MesosWorkerStore workerStore = createWorkerStore(config);
+			MesosWorkerStore workerStore = createWorkerStore(config, ioExecutor);
 
 			// we need the leader retrieval service here to be informed of new
 			// leader session IDs, even though there can be only one leader ever
@@ -497,7 +498,7 @@ public class MesosApplicationMasterRunner {
 		return mesos;
 	}
 
-	private static MesosWorkerStore createWorkerStore(Configuration flinkConfig) throws Exception
{
+	private static MesosWorkerStore createWorkerStore(Configuration flinkConfig, Executor executor)
throws Exception {
 		MesosWorkerStore workerStore;
 		HighAvailabilityMode recoveryMode = HighAvailabilityMode.fromConfig(flinkConfig);
 		if (recoveryMode == HighAvailabilityMode.NONE) {
@@ -506,7 +507,7 @@ public class MesosApplicationMasterRunner {
 		else if (recoveryMode == HighAvailabilityMode.ZOOKEEPER) {
 			// note: the store is responsible for closing the client.
 			CuratorFramework client = ZooKeeperUtils.startCuratorFramework(flinkConfig);
-			workerStore = ZooKeeperMesosWorkerStore.createMesosWorkerStore(client, flinkConfig);
+			workerStore = ZooKeeperMesosWorkerStore.createMesosWorkerStore(client, flinkConfig, executor);
 		}
 		else {
 			throw new IllegalConfigurationException("Unexpected recovery mode '" + recoveryMode +
".");

http://git-wip-us.apache.org/repos/asf/flink/blob/404d4252/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
index 551852e..cd88979 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
@@ -39,6 +39,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.ConcurrentModificationException;
 import java.util.List;
+import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -72,10 +73,10 @@ public class ZooKeeperMesosWorkerStore implements MesosWorkerStore {
 
 	@SuppressWarnings("unchecked")
 	ZooKeeperMesosWorkerStore(
-		CuratorFramework client,
-		String storePath,
-		RetrievableStateStorageHelper<Worker> stateStorage
-	) throws Exception {
+			CuratorFramework client,
+			String storePath,
+			RetrievableStateStorageHelper<Worker> stateStorage,
+			Executor executor) throws Exception {
 		checkNotNull(storePath, "storePath");
 		checkNotNull(stateStorage, "stateStorage");
 
@@ -100,8 +101,8 @@ public class ZooKeeperMesosWorkerStore implements MesosWorkerStore {
 
 		// using late-binding as a workaround for shaded curator dependency of flink-runtime.
 		this.workersInZooKeeper = ZooKeeperStateHandleStore.class
-			.getConstructor(CuratorFramework.class, RetrievableStateStorageHelper.class)
-			.newInstance(storeFacade, stateStorage);
+			.getConstructor(CuratorFramework.class, RetrievableStateStorageHelper.class, Executor.class)
+			.newInstance(storeFacade, stateStorage, executor);
 	}
 
 	@Override
@@ -284,7 +285,8 @@ public class ZooKeeperMesosWorkerStore implements MesosWorkerStore {
 	 */
 	public static ZooKeeperMesosWorkerStore createMesosWorkerStore(
 			CuratorFramework client,
-			Configuration configuration) throws Exception {
+			Configuration configuration,
+			Executor executor) throws Exception {
 
 		checkNotNull(configuration, "Configuration");
 
@@ -297,6 +299,9 @@ public class ZooKeeperMesosWorkerStore implements MesosWorkerStore {
 		);
 
 		return new ZooKeeperMesosWorkerStore(
-			client, zooKeeperMesosWorkerStorePath, stateStorage);
+			client,
+			zooKeeperMesosWorkerStorePath,
+			stateStorage,
+			executor);
 	}
 }


Mime
View raw message