flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m..@apache.org
Subject flink git commit: [FLINK-3972] subclasses of ResourceID may not be serializable
Date Fri, 27 May 2016 16:34:55 GMT
Repository: flink
Updated Branches:
  refs/heads/master 08eca761e -> 4d41bd8fa


[FLINK-3972] subclasses of ResourceID may not be serializable

WorkerTypes are currently subclasses of ResourceID. ResourceID
implements Serializable, but its subclasses are not necessarily
serializable. This may lead to problems when these subclasses are used
as ResourceIDs, i.e. serialization may fail with NotSerializableExceptions.
Currently, subclasses are never sent over the wire, but they might be in the future.

Instead of making the WorkerTypes subclasses of ResourceID, we let them
implement an interface (ResourceIDRetrievable) that returns the
ResourceID of a WorkerType.

This closes #2037


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4d41bd8f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4d41bd8f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4d41bd8f

Branch: refs/heads/master
Commit: 4d41bd8fa787315d203bb34973f8608d84c5b6ac
Parents: 08eca76
Author: Maximilian Michels <mxm@apache.org>
Authored: Wed May 25 17:20:34 2016 +0200
Committer: Maximilian Michels <mxm@apache.org>
Committed: Fri May 27 18:35:53 2016 +0200

----------------------------------------------------------------------
 .../clusterframework/FlinkResourceManager.java  | 12 ++++++---
 .../clusterframework/types/ResourceID.java      | 27 ++++++++++++++------
 .../types/ResourceIDRetrievable.java            | 27 ++++++++++++++++++++
 .../flink/yarn/RegisteredYarnWorkerNode.java    | 14 ++++++++--
 .../flink/yarn/YarnContainerInLaunch.java       | 13 ++++++++--
 .../flink/yarn/YarnFlinkResourceManager.java    | 18 ++++++++++---
 6 files changed, 91 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4d41bd8f/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
index 8766e15..631f8d0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
@@ -49,6 +49,7 @@ import org.apache.flink.runtime.clusterframework.messages.StopCluster;
 import org.apache.flink.runtime.clusterframework.messages.TriggerRegistrationAtJobManager;
 import org.apache.flink.runtime.clusterframework.messages.UnRegisterInfoMessageListener;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage;
@@ -93,7 +94,7 @@ import static java.util.Objects.requireNonNull;
  * </ol>
  *
  */
-public abstract class FlinkResourceManager<WorkerType extends ResourceID> extends FlinkUntypedActor
{
+public abstract class FlinkResourceManager<WorkerType extends ResourceIDRetrievable>
extends FlinkUntypedActor {
 
 	/** The exit code with which the process is stopped in case of a fatal error */
 	protected static final int EXIT_CODE_FATAL_ERROR = -13;
@@ -356,7 +357,9 @@ public abstract class FlinkResourceManager<WorkerType extends ResourceID> extend
 			WorkerType newWorker = workerRegistered(resourceID);
 			WorkerType oldWorker = registeredWorkers.put(resourceID, newWorker);
 			if (oldWorker != null) {
-				LOG.warn("Worker {} had been registered before.", resourceID);
+				LOG.warn("TaskManager {} had been registered before.", resourceID);
+			} else {
+				LOG.info("TaskManager {} has registered.", resourceID);
 			}
 			jobManager.tell(decorateMessage(
 				new RegisterResourceSuccessful(taskManager, msg)),
@@ -507,8 +510,9 @@ public abstract class FlinkResourceManager<WorkerType extends ResourceID> extend
 
 					// put the consolidated TaskManagers into our bookkeeping
 					for (WorkerType worker : consolidated) {
-						registeredWorkers.put(worker, worker);
-						toHandle.remove(worker);
+						ResourceID resourceID = worker.getResourceID();
+						registeredWorkers.put(resourceID, worker);
+						toHandle.remove(resourceID);
 					}
 				}
 				catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/flink/blob/4d41bd8f/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceID.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceID.java
index 9d82c76..8cf9ccb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceID.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceID.java
@@ -26,7 +26,7 @@ import java.io.Serializable;
 /**
  * Class for Resource Ids assigned at the FlinkResourceManager.
  */
-public class ResourceID implements Serializable {
+public final class ResourceID implements ResourceIDRetrievable, Serializable {
 
 	private static final long serialVersionUID = 42L;
 
@@ -47,9 +47,13 @@ public class ResourceID implements Serializable {
 
 	@Override
 	public final boolean equals(Object o) {
-		return this == o ||
-				(o != null && o.getClass() == ResourceID.class && 
-					this.resourceId.equals(((ResourceID) o).resourceId));
+		if (this == o) {
+			return true;
+		} else if (o == null || o.getClass() != getClass()) {
+			return false;
+		} else {
+			return resourceId.equals(((ResourceID) o).resourceId);
+		}
 	}
 
 	@Override
@@ -59,12 +63,19 @@ public class ResourceID implements Serializable {
 
 	@Override
 	public String toString() {
-		return "ResourceID (" + resourceId + ')';
+		return "ResourceID{" +
+			"resourceId='" + resourceId + '\'' +
+			'}';
 	}
 
-	// ------------------------------------------------------------------------
-	//  factory
-	// ------------------------------------------------------------------------
+	/**
+	 * A ResourceID can always retrieve a ResourceID.
+	 * @return This instance.
+	 */
+	@Override
+	public ResourceID getResourceID() {
+		return this;
+	}
 	
 	/**
 	 * Generate a random resource id.

http://git-wip-us.apache.org/repos/asf/flink/blob/4d41bd8f/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceIDRetrievable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceIDRetrievable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceIDRetrievable.java
new file mode 100644
index 0000000..b45d53c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceIDRetrievable.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.clusterframework.types;
+
+/**
+ * An interface to retrieve the ResourceID of an object, e.g. of a resource manager's WorkerType.
+ */
+public interface ResourceIDRetrievable {
+	/** Returns the ResourceID associated with this object. */
+	ResourceID getResourceID();
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4d41bd8f/flink-yarn/src/main/java/org/apache/flink/yarn/RegisteredYarnWorkerNode.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/RegisteredYarnWorkerNode.java b/flink-yarn/src/main/java/org/apache/flink/yarn/RegisteredYarnWorkerNode.java
index a6c094d..974c4df 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/RegisteredYarnWorkerNode.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/RegisteredYarnWorkerNode.java
@@ -20,6 +20,7 @@ package org.apache.flink.yarn;
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
 import org.apache.hadoop.yarn.api.records.Container;
 
 import static java.util.Objects.requireNonNull;
@@ -27,14 +28,18 @@ import static java.util.Objects.requireNonNull;
 /**
  * A representation of a registered Yarn container managed by the {@link YarnFlinkResourceManager}.
  */
-public class RegisteredYarnWorkerNode extends ResourceID {
+public class RegisteredYarnWorkerNode implements ResourceIDRetrievable {
+
 
 	/** The container on which the worker runs */
 	private final Container yarnContainer;
 
+	/** The resource id associated with this worker type */
+	private final ResourceID resourceID;
+
 	public RegisteredYarnWorkerNode(Container yarnContainer) {
-		super(yarnContainer.getId().toString());
 		this.yarnContainer = requireNonNull(yarnContainer);
+		this.resourceID = YarnFlinkResourceManager.extractResourceID(yarnContainer);
 	}
 
 	public Container yarnContainer() {
@@ -49,4 +54,9 @@ public class RegisteredYarnWorkerNode extends ResourceID {
 			"yarnContainer=" + yarnContainer +
 			'}';
 	}
+
+	@Override
+	public ResourceID getResourceID() {
+		return resourceID;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4d41bd8f/flink-yarn/src/main/java/org/apache/flink/yarn/YarnContainerInLaunch.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnContainerInLaunch.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnContainerInLaunch.java
index 03c5b3a..370df26 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnContainerInLaunch.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnContainerInLaunch.java
@@ -19,6 +19,7 @@
 package org.apache.flink.yarn;
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
 import org.apache.hadoop.yarn.api.records.Container;
 
 import static java.util.Objects.requireNonNull;
@@ -27,20 +28,23 @@ import static java.util.Objects.requireNonNull;
  * This class describes a container in which a TaskManager is being launched (or
  * has been launched) but where the TaskManager has not properly registered, yet.
  */
-public class YarnContainerInLaunch extends ResourceID {
+public class YarnContainerInLaunch implements ResourceIDRetrievable {
 
 	private final Container container;
 
 	private final long timestamp;
 
+	/** The resource id associated with this worker type */
+	private final ResourceID resourceID;
+
 	public YarnContainerInLaunch(Container container) {
 		this(container, System.currentTimeMillis());
 	}
 
 	public YarnContainerInLaunch(Container container, long timestamp) {
-		super(container.getId().toString());
 		this.container = requireNonNull(container);
 		this.timestamp = timestamp;
+		this.resourceID = YarnFlinkResourceManager.extractResourceID(container);
 	}
 
 	// ------------------------------------------------------------------------
@@ -59,4 +63,9 @@ public class YarnContainerInLaunch extends ResourceID {
 	public String toString() {
 		return "ContainerInLaunch @ " + timestamp + ": " + container;
 	}
+
+	@Override
+	public ResourceID getResourceID() {
+		return resourceID;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4d41bd8f/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
index 71fc371..da56ff8 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
@@ -215,7 +215,7 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar
 			final long now = System.currentTimeMillis();
 			for (Container c : containersFromPreviousAttempts) {
 				YarnContainerInLaunch containerInLaunch = new YarnContainerInLaunch(c, now);
-				containersInLaunch.put(containerInLaunch, containerInLaunch);
+				containersInLaunch.put(containerInLaunch.getResourceID(), containerInLaunch);
 			}
 
 			// adjust the progress indicator
@@ -388,7 +388,8 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar
 			if (numRegistered + containersInLaunch.size() < numRequired) {
 				// start a TaskManager
 				final YarnContainerInLaunch containerInLaunch = new YarnContainerInLaunch(container);
-				containersInLaunch.put(containerInLaunch, containerInLaunch);
+				final ResourceID resourceID = containerInLaunch.getResourceID();
+				containersInLaunch.put(resourceID, containerInLaunch);
 
 				String message = "Launching TaskManager in container " + containerInLaunch
 					+ " on host " + container.getNodeId().getHost();
@@ -398,12 +399,12 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar
 				try {
 					// set a special environment variable to uniquely identify this container
 					taskManagerLaunchContext.getEnvironment()
-						.put(ENV_FLINK_CONTAINER_ID, containerInLaunch.getResourceIdString());
+						.put(ENV_FLINK_CONTAINER_ID, resourceID.getResourceIdString());
 					nodeManagerClient.startContainer(container, taskManagerLaunchContext);
 				}
 				catch (Throwable t) {
 					// failed to launch the container
-					containersInLaunch.remove(containerInLaunch);
+					containersInLaunch.remove(resourceID);
 
 					// return container, a new one will be requested eventually
 					LOG.error("Could not start TaskManager in container " + containerInLaunch, t);
@@ -516,6 +517,15 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar
 	//  Utilities
 	// ------------------------------------------------------------------------
 
+	/**
+	 * Extracts a unique ResourceID from the Yarn Container.
+	 * @param container The Yarn container
+	 * @return The ResourceID for the container
+	 */
+	static ResourceID extractResourceID(Container container) {
+		return new ResourceID(container.getId().toString());
+	}
+
 	private void updateProgress() {
 		final int required = getDesignatedWorkerPoolSize();
 		final int available = getNumberOfRegisteredTaskManagers() + containersInLaunch.size();


Mime
View raw message