flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [3/3] flink git commit: [FLINK-5508] [mesos] Introduce ZooKeeperUtilityFactory to create ZooKeeper utility classes
Date Mon, 23 Jan 2017 11:05:40 GMT
[FLINK-5508] [mesos] Introduce ZooKeeperUtilityFactory to create ZooKeeper utility classes

This commit adds utility classes which abstract the CuratorFramework dependency away from the
ZooKeeper utility classes. That way, modules outside of flink-runtime can use these utility
classes without running into the problem of a relocated curator dependency.

Address PR comments

This closes #3157.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/df3f1197
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/df3f1197
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/df3f1197

Branch: refs/heads/master
Commit: df3f11979ed2895ed548766bac061e6cda8f6881
Parents: 04f4ecc
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Wed Jan 18 15:06:12 2017 +0100
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Mon Jan 23 10:20:54 2017 +0100

----------------------------------------------------------------------
 .../MesosApplicationMasterRunner.java           |  47 ++++---
 .../services/MesosServices.java                 |  51 ++++++++
 .../services/MesosServicesUtils.java            |  57 +++++++++
 .../services/StandaloneMesosServices.java       |  39 ++++++
 .../services/ZooKeeperMesosServices.java        |  70 +++++++++++
 .../store/ZooKeeperMesosWorkerStore.java        |  93 ++------------
 .../MesosFlinkResourceManagerTest.java          |  12 +-
 .../flink/runtime/util/ZooKeeperUtils.java      |  10 +-
 .../runtime/zookeeper/ZooKeeperSharedCount.java |  53 ++++++++
 .../runtime/zookeeper/ZooKeeperSharedValue.java |  53 ++++++++
 .../zookeeper/ZooKeeperUtilityFactory.java      | 123 +++++++++++++++++++
 .../zookeeper/ZooKeeperVersionedValue.java      |  43 +++++++
 12 files changed, 540 insertions(+), 111 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/df3f1197/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index c9b6eed..de76d8e 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -26,18 +26,16 @@ import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
-import org.apache.curator.framework.CuratorFramework;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
+import org.apache.flink.mesos.runtime.clusterframework.services.MesosServicesUtils;
 import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
-import org.apache.flink.mesos.runtime.clusterframework.store.StandaloneMesosWorkerStore;
-import org.apache.flink.mesos.runtime.clusterframework.store.ZooKeeperMesosWorkerStore;
 import org.apache.flink.mesos.util.MesosArtifactServer;
 import org.apache.flink.mesos.util.MesosConfiguration;
-import org.apache.flink.mesos.util.ZooKeeperUtils;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.ContainerSpecification;
@@ -48,7 +46,6 @@ import org.apache.flink.runtime.clusterframework.overlays.HadoopUserOverlay;
 import org.apache.flink.runtime.clusterframework.overlays.KeytabOverlay;
 import org.apache.flink.runtime.clusterframework.overlays.Krb5ConfOverlay;
 import org.apache.flink.runtime.clusterframework.overlays.SSLStoreOverlay;
-import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -73,7 +70,6 @@ import java.net.URL;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.Callable;
-import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -201,6 +197,7 @@ public class MesosApplicationMasterRunner {
 		MesosArtifactServer artifactServer = null;
 		ExecutorService futureExecutor = null;
 		ExecutorService ioExecutor = null;
+		MesosServices mesosServices = null;
 
 		try {
 			// ------- (1) load and parse / validate all configurations -------
@@ -224,6 +221,8 @@ public class MesosApplicationMasterRunner {
 				numberProcessors,
 				new NamedThreadFactory("mesos-jobmanager-io-", "-thread-"));
 
+			mesosServices = MesosServicesUtils.createMesosServices(config);
+
 			// TM configuration
 			final MesosTaskManagerParameters taskManagerParameters = MesosTaskManagerParameters.create(config);
 
@@ -316,7 +315,9 @@ public class MesosApplicationMasterRunner {
 			LOG.debug("Starting Mesos Flink Resource Manager");
 
 			// create the worker store to persist task information across restarts
-			MesosWorkerStore workerStore = createWorkerStore(config, ioExecutor);
+			MesosWorkerStore workerStore = mesosServices.createMesosWorkerStore(
+				config,
+				ioExecutor);
 
 			// we need the leader retrieval service here to be informed of new
 			// leader session IDs, even though there can be only one leader ever
@@ -394,6 +395,14 @@ public class MesosApplicationMasterRunner {
 				}
 			}
 
+			if (mesosServices != null) {
+				try {
+					mesosServices.close(false);
+				} catch (Throwable tt) {
+					LOG.error("Error closing the mesos services.", tt);
+				}
+			}
+
 			return INIT_ERROR_EXIT_CODE;
 		}
 
@@ -424,6 +433,12 @@ public class MesosApplicationMasterRunner {
 			futureExecutor,
 			ioExecutor);
 
+		try {
+			mesosServices.close(true);
+		} catch (Throwable t) {
+			LOG.error("Failed to clean up and close MesosServices.", t);
+		}
+
 		return 0;
 	}
 
@@ -498,24 +513,6 @@ public class MesosApplicationMasterRunner {
 		return mesos;
 	}
 
-	private static MesosWorkerStore createWorkerStore(Configuration flinkConfig, Executor executor)
throws Exception {
-		MesosWorkerStore workerStore;
-		HighAvailabilityMode recoveryMode = HighAvailabilityMode.fromConfig(flinkConfig);
-		if (recoveryMode == HighAvailabilityMode.NONE) {
-			workerStore = new StandaloneMesosWorkerStore();
-		}
-		else if (recoveryMode == HighAvailabilityMode.ZOOKEEPER) {
-			// note: the store is responsible for closing the client.
-			CuratorFramework client = ZooKeeperUtils.startCuratorFramework(flinkConfig);
-			workerStore = ZooKeeperMesosWorkerStore.createMesosWorkerStore(client, flinkConfig, executor);
-		}
-		else {
-			throw new IllegalConfigurationException("Unexpected recovery mode '" + recoveryMode +
".");
-		}
-
-		return workerStore;
-	}
-
 	/**
 	 * Generate a container specification as a TaskManager template.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/df3f1197/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServices.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServices.java
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServices.java
new file mode 100644
index 0000000..5655bfc
--- /dev/null
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServices.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework.services;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+
+import java.util.concurrent.Executor;
+
+/**
+ * Service factory interface for Mesos.
+ *
+ * <p>An implementation creates the {@link MesosWorkerStore} and releases whatever state it
+ * maintains when {@link #close(boolean)} is called.
+ */
+public interface MesosServices {
+
+	/**
+	 * Creates a {@link MesosWorkerStore} which is used to persist Mesos workers in high
+	 * availability mode.
+	 *
+	 * @param configuration to be used
+	 * @param executor to run asynchronous tasks
+	 * @return a Mesos worker store
+	 * @throws Exception if the Mesos worker store could not be created
+	 */
+	MesosWorkerStore createMesosWorkerStore(
+		Configuration configuration,
+		Executor executor) throws Exception;
+
+	/**
+	 * Closes all state maintained by the Mesos services implementation.
+	 *
+	 * @param cleanup is true if a cleanup shall be performed in addition to closing
+	 * @throws Exception if the closing operation failed
+	 */
+	void close(boolean cleanup) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/df3f1197/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServicesUtils.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServicesUtils.java
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServicesUtils.java
new file mode 100644
index 0000000..13eb759
--- /dev/null
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServicesUtils.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework.services;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.zookeeper.ZooKeeperUtilityFactory;
+
+public class MesosServicesUtils {
+
+	/**
+	 * Creates a {@link MesosServices} instance depending on the high availability settings.
+	 *
+	 * <p>The mode is read via {@link HighAvailabilityMode#fromConfig(Configuration)}:
+	 * <ul>
+	 *   <li>{@code NONE} yields a new {@link StandaloneMesosServices}.</li>
+	 *   <li>{@code ZOOKEEPER} yields a {@link ZooKeeperMesosServices} backed by a
+	 *       {@link ZooKeeperUtilityFactory} which is constructed with the configuration and the
+	 *       path stored under {@link ConfigConstants#HA_ZOOKEEPER_MESOS_WORKERS_PATH} (falling
+	 *       back to {@link ConfigConstants#DEFAULT_ZOOKEEPER_MESOS_WORKERS_PATH}).</li>
+	 *   <li>Every other mode is rejected with an exception.</li>
+	 * </ul>
+	 *
+	 * @param configuration containing the high availability settings
+	 * @return a Mesos services instance
+	 * @throws Exception if the high availability mode is not supported or the Mesos services
+	 *                   instance could not be created
+	 */
+	public static MesosServices createMesosServices(Configuration configuration) throws Exception
{
+		HighAvailabilityMode highAvailabilityMode = HighAvailabilityMode.fromConfig(configuration);
+
+		switch (highAvailabilityMode) {
+			case NONE:
+				return new StandaloneMesosServices();
+
+			case ZOOKEEPER:
+				final String zkMesosRootPath = configuration.getString(
+					ConfigConstants.HA_ZOOKEEPER_MESOS_WORKERS_PATH,
+					ConfigConstants.DEFAULT_ZOOKEEPER_MESOS_WORKERS_PATH);
+
+				ZooKeeperUtilityFactory zooKeeperUtilityFactory = new ZooKeeperUtilityFactory(
+					configuration,
+					zkMesosRootPath);
+
+				return new ZooKeeperMesosServices(zooKeeperUtilityFactory);
+
+			default:
+				throw new Exception("High availability mode " + highAvailabilityMode + " is not supported.");
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/df3f1197/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/StandaloneMesosServices.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/StandaloneMesosServices.java
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/StandaloneMesosServices.java
new file mode 100644
index 0000000..dfbc2c3
--- /dev/null
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/StandaloneMesosServices.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework.services;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import org.apache.flink.mesos.runtime.clusterframework.store.StandaloneMesosWorkerStore;
+
+import java.util.concurrent.Executor;
+
+/**
+ * {@link MesosServices} implementation for the standalone mode.
+ *
+ * <p>{@link #createMesosWorkerStore(Configuration, Executor)} ignores both of its arguments and
+ * returns a new {@link StandaloneMesosWorkerStore} on every call. This class holds no state of
+ * its own, so {@link #close(boolean)} does nothing regardless of the cleanup flag.
+ */
+public class StandaloneMesosServices implements MesosServices {
+
+	@Override
+	public MesosWorkerStore createMesosWorkerStore(Configuration configuration, Executor executor)
{
+		return new StandaloneMesosWorkerStore();
+	}
+
+	@Override
+	public void close(boolean cleanup) throws Exception {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/df3f1197/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java
new file mode 100644
index 0000000..2883e4f
--- /dev/null
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework.services;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import org.apache.flink.mesos.runtime.clusterframework.store.ZooKeeperMesosWorkerStore;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
+import org.apache.flink.runtime.zookeeper.ZooKeeperSharedCount;
+import org.apache.flink.runtime.zookeeper.ZooKeeperSharedValue;
+import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
+import org.apache.flink.runtime.zookeeper.ZooKeeperUtilityFactory;
+import org.apache.flink.util.Preconditions;
+
+import java.util.concurrent.Executor;
+
+/**
+ * {@link MesosServices} implementation for the ZooKeeper high availability based mode.
+ *
+ * <p>The worker store is assembled from three utilities obtained from the
+ * {@link ZooKeeperUtilityFactory}: a {@link ZooKeeperStateHandleStore} created for
+ * {@code "/workers"}, a {@link ZooKeeperSharedValue} created for {@code "/frameworkId"} (initial
+ * value: empty byte array) and a {@link ZooKeeperSharedCount} created for {@code "/taskCount"}
+ * (initial value: 0). Closing this service delegates to the factory.
+ *
+ * <p>NOTE(review): the constructor removed from {@link ZooKeeperMesosWorkerStore} in this commit
+ * created its shared count under {@code "/count"}; confirm that the different path used here is
+ * intended.
+ */
+public class ZooKeeperMesosServices implements MesosServices {
+
+	// Factory used to create every ZooKeeper utility class handed to the worker store
+	private final ZooKeeperUtilityFactory zooKeeperUtilityFactory;
+
+	public ZooKeeperMesosServices(ZooKeeperUtilityFactory zooKeeperUtilityFactory) {
+		this.zooKeeperUtilityFactory = Preconditions.checkNotNull(zooKeeperUtilityFactory);
+	}
+
+	@Override
+	public MesosWorkerStore createMesosWorkerStore(Configuration configuration, Executor executor)
throws Exception {
+		RetrievableStateStorageHelper<MesosWorkerStore.Worker> stateStorageHelper =
+			ZooKeeperUtils.createFileSystemStateStorage(configuration, "mesosWorkerStore");
+
+		ZooKeeperStateHandleStore<MesosWorkerStore.Worker> zooKeeperStateHandleStore = zooKeeperUtilityFactory.createZooKeeperStateHandleStore(
+			"/workers",
+			stateStorageHelper,
+			executor);
+
+		ZooKeeperSharedValue frameworkId = zooKeeperUtilityFactory.createSharedValue("/frameworkId",
new byte[0]);
+		ZooKeeperSharedCount totalTaskCount = zooKeeperUtilityFactory.createSharedCount("/taskCount",
0);
+
+		return new ZooKeeperMesosWorkerStore(
+			zooKeeperStateHandleStore,
+			frameworkId,
+			totalTaskCount);
+	}
+
+	@Override
+	public void close(boolean cleanup) throws Exception {
+		// this also closes the underlying CuratorFramework instance
+		zooKeeperUtilityFactory.close(cleanup);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/df3f1197/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
index cd88979..5246b94 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
@@ -18,17 +18,12 @@
 
 package org.apache.flink.mesos.runtime.clusterframework.store;
 
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.shared.SharedCount;
-import org.apache.curator.framework.recipes.shared.SharedValue;
-import org.apache.curator.framework.recipes.shared.VersionedValue;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
-import org.apache.flink.runtime.util.ZooKeeperUtils;
-import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
+import org.apache.flink.runtime.zookeeper.ZooKeeperSharedCount;
+import org.apache.flink.runtime.zookeeper.ZooKeeperSharedValue;
 import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
+import org.apache.flink.runtime.zookeeper.ZooKeeperVersionedValue;
 import org.apache.mesos.Protos;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
@@ -39,7 +34,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.ConcurrentModificationException;
 import java.util.List;
-import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -53,56 +47,26 @@ public class ZooKeeperMesosWorkerStore implements MesosWorkerStore {
 
 	private final Object startStopLock = new Object();
 
-	/** Root store path in ZK. */
-	private final String storePath;
-
-	/** Client (not a namespace facade) */
-	private final CuratorFramework client;
-
 	/** Flag indicating whether this instance is running. */
 	private boolean isRunning;
 
 	/** A persistent value of the assigned framework ID */
-	private final SharedValue frameworkIdInZooKeeper;
+	private final ZooKeeperSharedValue frameworkIdInZooKeeper;
 
 	/** A persistent count of all tasks created, for generating unique IDs */
-	private final SharedCount totalTaskCountInZooKeeper;
+	private final ZooKeeperSharedCount totalTaskCountInZooKeeper;
 
 	/** A persistent store of serialized workers */
 	private final ZooKeeperStateHandleStore<MesosWorkerStore.Worker> workersInZooKeeper;
 
 	@SuppressWarnings("unchecked")
-	ZooKeeperMesosWorkerStore(
-			CuratorFramework client,
-			String storePath,
-			RetrievableStateStorageHelper<Worker> stateStorage,
-			Executor executor) throws Exception {
-		checkNotNull(storePath, "storePath");
-		checkNotNull(stateStorage, "stateStorage");
-
-		// Keep a reference to the original client and not the namespace facade. The namespace
-		// facade cannot be closed.
-		this.client = checkNotNull(client, "client");
-		this.storePath = storePath;
-
-		// All operations will have the given path as root
-		client.newNamespaceAwareEnsurePath(storePath).ensure(client.getZookeeperClient());
-		CuratorFramework facade = client.usingNamespace(client.getNamespace() + storePath);
-
-		// Track the assignd framework ID.
-		frameworkIdInZooKeeper = new SharedValue(facade, "/frameworkId", new byte[0]);
-
-		// Keep a count of all tasks created ever, as the basis for a unique ID.
-		totalTaskCountInZooKeeper = new SharedCount(facade, "/count", 0);
-
-		// Keep track of the workers in state handle storage.
-		facade.newNamespaceAwareEnsurePath("/workers").ensure(client.getZookeeperClient());
-		CuratorFramework storeFacade = client.usingNamespace(facade.getNamespace() + "/workers");
-
-		// using late-binding as a workaround for shaded curator dependency of flink-runtime.
-		this.workersInZooKeeper = ZooKeeperStateHandleStore.class
-			.getConstructor(CuratorFramework.class, RetrievableStateStorageHelper.class, Executor.class)
-			.newInstance(storeFacade, stateStorage, executor);
+	public ZooKeeperMesosWorkerStore(
+		ZooKeeperStateHandleStore<MesosWorkerStore.Worker> workersInZooKeeper,
+		ZooKeeperSharedValue frameworkIdInZooKeeper,
+		ZooKeeperSharedCount totalTaskCountInZooKeeper) throws Exception {
+		this.workersInZooKeeper = checkNotNull(workersInZooKeeper, "workersInZooKeeper");
+		this.frameworkIdInZooKeeper = checkNotNull(frameworkIdInZooKeeper, "frameworkIdInZooKeeper");
+		this.totalTaskCountInZooKeeper= checkNotNull(totalTaskCountInZooKeeper, "totalTaskCountInZooKeeper");
 	}
 
 	@Override
@@ -124,10 +88,8 @@ public class ZooKeeperMesosWorkerStore implements MesosWorkerStore {
 
 				if(cleanup) {
 					workersInZooKeeper.removeAndDiscardAllState();
-					client.delete().deletingChildrenIfNeeded().forPath(storePath);
 				}
 
-				client.close();
 				isRunning = false;
 			}
 		}
@@ -188,7 +150,7 @@ public class ZooKeeperMesosWorkerStore implements MesosWorkerStore {
 			int nextCount;
 			boolean success;
 			do {
-				VersionedValue<Integer> count = totalTaskCountInZooKeeper.getVersionedValue();
+				ZooKeeperVersionedValue<Integer> count = totalTaskCountInZooKeeper.getVersionedValue();
 				nextCount = count.getValue() + 1;
 				success = totalTaskCountInZooKeeper.trySetCount(count, nextCount);
 			}
@@ -275,33 +237,4 @@ public class ZooKeeperMesosWorkerStore implements MesosWorkerStore {
 		checkNotNull(taskID, "taskID");
 		return String.format("/%s", taskID.getValue());
 	}
-
-	/**
-	 * Create the ZooKeeper-backed Mesos worker store.
-	 * @param client the curator client.
-	 * @param configuration the Flink configuration.
-	 * @return a worker store.
-	 * @throws Exception
-	 */
-	public static ZooKeeperMesosWorkerStore createMesosWorkerStore(
-			CuratorFramework client,
-			Configuration configuration,
-			Executor executor) throws Exception {
-
-		checkNotNull(configuration, "Configuration");
-
-		RetrievableStateStorageHelper<MesosWorkerStore.Worker> stateStorage =
-			ZooKeeperUtils.createFileSystemStateStorage(configuration, "mesosWorkerStore");
-
-		String zooKeeperMesosWorkerStorePath = configuration.getString(
-			ConfigConstants.HA_ZOOKEEPER_MESOS_WORKERS_PATH,
-			ConfigConstants.DEFAULT_ZOOKEEPER_MESOS_WORKERS_PATH
-		);
-
-		return new ZooKeeperMesosWorkerStore(
-			client,
-			zooKeeperMesosWorkerStorePath,
-			stateStorage,
-			executor);
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/df3f1197/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
index 93ccf68..dcf6a82 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
@@ -46,6 +46,7 @@ import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.util.TestLogger;
 import org.apache.mesos.SchedulerDriver;
 import org.apache.mesos.Protos;
 import org.apache.mesos.Scheduler;
@@ -73,15 +74,18 @@ import static org.mockito.Mockito.*;
 /**
  * General tests for the Mesos resource manager component.
  */
-public class MesosFlinkResourceManagerTest {
+public class MesosFlinkResourceManagerTest extends TestLogger {
 
 	private static final Logger LOG = LoggerFactory.getLogger(MesosFlinkResourceManagerTest.class);
 
 	private static ActorSystem system;
 
-	private static Configuration config = new Configuration() {{
-		setInteger(ConfigConstants.MESOS_MAX_FAILED_TASKS, -1);
-		setInteger(ConfigConstants.MESOS_INITIAL_TASKS, 0);
+	private static Configuration config = new Configuration() {
+		private static final long serialVersionUID = -952579203067648838L;
+
+		{
+			setInteger(ConfigConstants.MESOS_MAX_FAILED_TASKS, -1);
+			setInteger(ConfigConstants.MESOS_INITIAL_TASKS, 0);
 	}};
 
 	@BeforeClass

http://git-wip-us.apache.org/repos/asf/flink/blob/df3f1197/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index 621edcb..1b73dc7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService;
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
 import org.apache.flink.runtime.zookeeper.filesystem.FileSystemStateStorageHelper;
 import org.apache.flink.util.ConfigurationUtil;
+import org.apache.flink.util.Preconditions;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.ACL;
 import org.slf4j.Logger;
@@ -65,6 +66,7 @@ public class ZooKeeperUtils {
 	 * @return {@link CuratorFramework} instance
 	 */
 	public static CuratorFramework startCuratorFramework(Configuration configuration) {
+		Preconditions.checkNotNull(configuration, "configuration");
 		String zkQuorum = configuration.getValue(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM);
 
 		if (zkQuorum == null || StringUtils.isBlank(zkQuorum)) {
@@ -376,15 +378,19 @@ public class ZooKeeperUtils {
 		}
 	}
 
-	private static String generateZookeeperPath(String root, String namespace) {
+	public static String generateZookeeperPath(String root, String namespace) {
 		if (!namespace.startsWith("/")) {
-			namespace = "/" + namespace;
+			namespace = '/' + namespace;
 		}
 
 		if (namespace.endsWith("/")) {
 			namespace = namespace.substring(0, namespace.length() - 1);
 		}
 
+		if (root.endsWith("/")) {
+			root = root.substring(0, root.length() - 1);
+		}
+
 		return root + namespace;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/df3f1197/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperSharedCount.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperSharedCount.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperSharedCount.java
new file mode 100644
index 0000000..d6afbba
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperSharedCount.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.zookeeper;
+
+import org.apache.curator.framework.recipes.shared.SharedCount;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * Wrapper class for a {@link SharedCount} so that we don't expose a curator dependency in our
+ * internal APIs. Such an exposure is problematic due to the relocation of curator.
+ *
+ * <p>Every method delegates directly to the wrapped {@link SharedCount}. Versioned values are
+ * handed out wrapped in a {@link ZooKeeperVersionedValue} and unwrapped again before they are
+ * passed back to the shared count.
+ */
+public class ZooKeeperSharedCount {
+
+	private final SharedCount sharedCount;
+
+	public ZooKeeperSharedCount(SharedCount sharedCount) {
+		this.sharedCount = Preconditions.checkNotNull(sharedCount);
+	}
+
+	public void start() throws Exception {
+		sharedCount.start();
+	}
+
+	public void close() throws IOException {
+		sharedCount.close();
+	}
+
+	public ZooKeeperVersionedValue<Integer> getVersionedValue() {
+		return new ZooKeeperVersionedValue<>(sharedCount.getVersionedValue());
+	}
+
+	public boolean trySetCount(ZooKeeperVersionedValue<Integer> previous, int newCount)
throws Exception {
+		return sharedCount.trySetCount(previous.getVersionedValue(), newCount);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/df3f1197/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperSharedValue.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperSharedValue.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperSharedValue.java
new file mode 100644
index 0000000..fbe818d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperSharedValue.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.zookeeper;
+
+import org.apache.curator.framework.recipes.shared.SharedValue;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * Thin facade around Curator's {@link SharedValue}. Flink-internal APIs hand out this type
+ * instead of the Curator class itself, because exposing Curator types is problematic due to the
+ * relocation of curator.
+ */
+public class ZooKeeperSharedValue {
+
+	/** Wrapped Curator recipe; every method of this class forwards to it. */
+	private final SharedValue delegate;
+
+	/**
+	 * Wraps the given shared value.
+	 *
+	 * @param sharedValue Curator shared value to forward to; must not be null
+	 * @throws NullPointerException if {@code sharedValue} is null
+	 */
+	public ZooKeeperSharedValue(SharedValue sharedValue) {
+		Preconditions.checkNotNull(sharedValue);
+		this.delegate = sharedValue;
+	}
+
+	/**
+	 * Starts the wrapped shared value.
+	 *
+	 * @throws Exception if the wrapped {@link SharedValue#start()} call fails
+	 */
+	public void start() throws Exception {
+		delegate.start();
+	}
+
+	/**
+	 * Closes the wrapped shared value.
+	 *
+	 * @throws IOException if the wrapped {@link SharedValue#close()} call fails
+	 */
+	public void close() throws IOException {
+		delegate.close();
+	}
+
+	/**
+	 * Forwards the new value to the wrapped shared value.
+	 *
+	 * @param newValue bytes to store
+	 * @throws Exception if the wrapped {@code SharedValue#setValue} call fails
+	 */
+	public void setValue(byte[] newValue) throws Exception {
+		delegate.setValue(newValue);
+	}
+
+	/**
+	 * Reads the value from the wrapped shared value.
+	 *
+	 * @return the bytes reported by the wrapped {@link SharedValue#getValue()} call
+	 */
+	public byte[] getValue() {
+		final byte[] current = delegate.getValue();
+		return current;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/df3f1197/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperUtilityFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperUtilityFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperUtilityFactory.java
new file mode 100644
index 0000000..d3b7dc5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperUtilityFactory.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.zookeeper;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.shared.SharedCount;
+import org.apache.curator.framework.recipes.shared.SharedValue;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.concurrent.Executor;
+
+/**
+ * Creates ZooKeeper utility classes without exposing the {@link CuratorFramework} dependency. The
+ * curator framework is cached in this instance and shared among all created ZooKeeper utility
+ * instances. This requires that the utility classes DO NOT close the provided curator framework.
+ *
+ * <p>The curator framework is closed by calling the {@link #close(boolean)} method.
+ */
+public class ZooKeeperUtilityFactory {
+
+	// Client started from the configuration; owned by this factory and closed in close(boolean)
+	private final CuratorFramework root;
+
+	// Facade bound to the provided path
+	private final CuratorFramework facade;
+
+	/**
+	 * Starts a curator framework from the given configuration, ensures that the given path exists
+	 * and binds a namespaced facade to it. If anything fails after the client has been started,
+	 * the client is closed again before the exception is propagated, so that a failed construction
+	 * does not leak the client.
+	 *
+	 * @param configuration used to start the curator framework
+	 * @param path initial path under which all created utilities operate; must not be null
+	 * @throws Exception if the path could not be ensured or the facade could not be created
+	 */
+	public ZooKeeperUtilityFactory(Configuration configuration, String path) throws Exception {
+		Preconditions.checkNotNull(path, "path");
+
+		root = ZooKeeperUtils.startCuratorFramework(configuration);
+
+		try {
+			root.newNamespaceAwareEnsurePath(path).ensure(root.getZookeeperClient());
+			facade = root.usingNamespace(ZooKeeperUtils.generateZookeeperPath(root.getNamespace(), path));
+		} catch (Exception e) {
+			// The caller never receives this instance, so nobody else could close the started client.
+			try {
+				root.close();
+			} catch (Exception closeException) {
+				// keep the original failure as the primary exception
+				e.addSuppressed(closeException);
+			}
+			throw e;
+		}
+	}
+
+	/**
+	 * Closes the ZooKeeperUtilityFactory. This entails closing the cached {@link CuratorFramework}
+	 * instance. If cleanup is true, then the initial path and all its children are deleted.
+	 *
+	 * <p>The curator framework is closed even if deleting the znodes fails.
+	 *
+	 * @param cleanup deletes the initial path and all of its children to clean up
+	 * @throws Exception when deleting the znodes
+	 */
+	public void close(boolean cleanup) throws Exception {
+		try {
+			if (cleanup) {
+				facade.delete().deletingChildrenIfNeeded().forPath("/");
+			}
+		} finally {
+			// always release the client, otherwise a failed cleanup would leak it
+			root.close();
+		}
+	}
+
+	/**
+	 * Creates a {@link ZooKeeperStateHandleStore} instance with the provided arguments.
+	 *
+	 * @param zkStateHandleStorePath specifying the path in ZooKeeper to store the state handles to
+	 * @param stateStorageHelper storing the actual state data
+	 * @param executor to run asynchronous callbacks of the state handle store
+	 * @param <T> Type of the state to be stored
+	 * @return a ZooKeeperStateHandleStore instance
+	 * @throws Exception if ZooKeeper could not create the provided state handle store path in
+	 *     ZooKeeper
+	 */
+	public <T extends Serializable> ZooKeeperStateHandleStore<T> createZooKeeperStateHandleStore(
+			String zkStateHandleStorePath,
+			RetrievableStateStorageHelper<T> stateStorageHelper,
+			Executor executor) throws Exception {
+
+		facade.newNamespaceAwareEnsurePath(zkStateHandleStorePath).ensure(facade.getZookeeperClient());
+		CuratorFramework stateHandleStoreFacade = facade.usingNamespace(
+			ZooKeeperUtils.generateZookeeperPath(
+				facade.getNamespace(),
+				zkStateHandleStorePath));
+
+		return new ZooKeeperStateHandleStore<>(stateHandleStoreFacade, stateStorageHelper, executor);
+	}
+
+	/**
+	 * Creates a {@link ZooKeeperSharedValue} to store a shared value between multiple instances.
+	 *
+	 * @param path to the shared value in ZooKeeper
+	 * @param seedValue for the shared value
+	 * @return a shared value
+	 */
+	public ZooKeeperSharedValue createSharedValue(String path, byte[] seedValue) {
+		return new ZooKeeperSharedValue(
+			new SharedValue(
+				facade,
+				path,
+				seedValue));
+	}
+
+	/**
+	 * Creates a {@link ZooKeeperSharedCount} to store a shared count between multiple instances.
+	 *
+	 * @param path to the shared count in ZooKeeper
+	 * @param seedCount for the shared count
+	 * @return a shared count
+	 */
+	public ZooKeeperSharedCount createSharedCount(String path, int seedCount) {
+		return new ZooKeeperSharedCount(
+			new SharedCount(
+				facade,
+				path,
+				seedCount));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/df3f1197/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperVersionedValue.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperVersionedValue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperVersionedValue.java
new file mode 100644
index 0000000..d23cfc0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperVersionedValue.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.zookeeper;
+
+import org.apache.curator.framework.recipes.shared.VersionedValue;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Thin facade around Curator's {@link VersionedValue}. Flink-internal APIs hand out this type
+ * instead of the Curator class itself, because exposing Curator types is problematic due to the
+ * relocation of curator.
+ *
+ * @param <T> type of the value carried by the wrapped versioned value
+ */
+public class ZooKeeperVersionedValue<T> {
+
+	/** Wrapped Curator versioned value. */
+	private final VersionedValue<T> delegate;
+
+	/**
+	 * Wraps the given versioned value.
+	 *
+	 * @param versionedValue Curator versioned value to wrap; must not be null
+	 * @throws NullPointerException if {@code versionedValue} is null
+	 */
+	public ZooKeeperVersionedValue(VersionedValue<T> versionedValue) {
+		Preconditions.checkNotNull(versionedValue);
+		this.delegate = versionedValue;
+	}
+
+	/**
+	 * Returns the value carried by the wrapped versioned value.
+	 *
+	 * @return the result of the wrapped {@link VersionedValue#getValue()} call
+	 */
+	public T getValue() {
+		final T value = delegate.getValue();
+		return value;
+	}
+
+	/**
+	 * Package-private access to the wrapped Curator object, so that sibling wrappers in this
+	 * package can pass it back to Curator without it becoming part of the public API.
+	 *
+	 * @return the wrapped versioned value
+	 */
+	VersionedValue<T> getVersionedValue() {
+		return delegate;
+	}
+}


Mime
View raw message