flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [5/5] flink git commit: [FLINK-7805][flip6] Recover YARN containers after AM restart.
Date Wed, 28 Feb 2018 22:02:44 GMT
[FLINK-7805][flip6] Recover YARN containers after AM restart.

Recover previously running containers after a restart of the ApplicationMaster.
This is a port of a feature that was already implemented prior to FLIP-6.
Extract RegisterApplicationMasterResponseReflector class into separate file.

This closes #5597.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/45397fe9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/45397fe9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/45397fe9

Branch: refs/heads/master
Commit: 45397fe974e1390cd39a34fc2eb216f3771ddf06
Parents: 035257e
Author: gyao <gary@data-artisans.com>
Authored: Wed Feb 28 13:20:23 2018 +0100
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Wed Feb 28 18:41:53 2018 +0100

----------------------------------------------------------------------
 ...isterApplicationMasterResponseReflector.java | 102 ++++++++++++++++
 .../flink/yarn/YarnFlinkResourceManager.java    |  52 ---------
 .../apache/flink/yarn/YarnResourceManager.java  |  17 ++-
 ...rApplicationMasterResponseReflectorTest.java | 117 +++++++++++++++++++
 4 files changed, 235 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/45397fe9/flink-yarn/src/main/java/org/apache/flink/yarn/RegisterApplicationMasterResponseReflector.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/RegisterApplicationMasterResponseReflector.java b/flink-yarn/src/main/java/org/apache/flink/yarn/RegisterApplicationMasterResponseReflector.java
new file mode 100644
index 0000000..13b5745
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/RegisterApplicationMasterResponseReflector.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.slf4j.Logger;
+
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Looks up the method {@link RegisterApplicationMasterResponse#getContainersFromPreviousAttempts()}
+ * once and saves the method. This saves computation time on subsequent calls.
+ */
+class RegisterApplicationMasterResponseReflector {
+
+	private final Logger logger;
+
+	/**
+	 * Reflected method {@link RegisterApplicationMasterResponse#getContainersFromPreviousAttempts()}.
+	 */
+	private Method method;
+
+	RegisterApplicationMasterResponseReflector(final Logger log) {
+		this(log, RegisterApplicationMasterResponse.class);
+	}
+
+	@VisibleForTesting
+	RegisterApplicationMasterResponseReflector(final Logger log, final Class<?> clazz) {
+		this.logger = requireNonNull(log);
+		requireNonNull(clazz);
+
+		try {
+			method = clazz.getMethod("getContainersFromPreviousAttempts");
+		} catch (NoSuchMethodException e) {
+			// that happens in earlier Hadoop versions (pre 2.2)
+			logger.info("Cannot reconnect to previously allocated containers. " +
+				"This YARN version does not support 'getContainersFromPreviousAttempts()'");
+		}
+	}
+
+	/**
+	 * Checks if a YARN application still has registered containers. If the application master
+	 * registered at the ResourceManager for the first time, this list will be empty. If the
+	 * application master registered a repeated time (after a failure and recovery), this list
+	 * will contain the containers that were previously allocated.
+	 *
+	 * @param response The response object from the registration at the ResourceManager.
+	 * @return A list with containers from previous application attempt.
+	 */
+	List<Container> getContainersFromPreviousAttempts(final RegisterApplicationMasterResponse response) {
+		return getContainersFromPreviousAttemptsUnsafe(response);
+	}
+
+	/**
+	 * Same as {@link #getContainersFromPreviousAttempts(RegisterApplicationMasterResponse)} but
+	 * allows to pass objects that are not of type {@link RegisterApplicationMasterResponse}.
+	 */
+	@VisibleForTesting
+	List<Container> getContainersFromPreviousAttemptsUnsafe(final Object response) {
+		if (method != null && response != null) {
+			try {
+				@SuppressWarnings("unchecked")
+				final List<Container> containers = (List<Container>) method.invoke(response);
+				if (containers != null && !containers.isEmpty()) {
+					return containers;
+				}
+			} catch (Exception t) {
+				logger.error("Error invoking 'getContainersFromPreviousAttempts()'", t);
+			}
+		}
+
+		return Collections.emptyList();
+	}
+
+	@VisibleForTesting
+	Method getMethod() {
+		return method;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/45397fe9/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
index 4d8142f..8e686bb 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
@@ -47,10 +47,8 @@ import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.slf4j.Logger;
 
-import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -616,56 +614,6 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar
 		}
 	}
 
-	/**
-	 * Looks up the getContainersFromPreviousAttempts method on RegisterApplicationMasterResponse
-	 * once and saves the method. This saves computation time on the sequent calls.
-	 */
-	private static class RegisterApplicationMasterResponseReflector {
-
-		private Logger logger;
-		private Method method;
-
-		public RegisterApplicationMasterResponseReflector(Logger log) {
-			this.logger = log;
-
-			try {
-				method = RegisterApplicationMasterResponse.class
-					.getMethod("getContainersFromPreviousAttempts");
-
-			} catch (NoSuchMethodException e) {
-				// that happens in earlier Hadoop versions
-				logger.info("Cannot reconnect to previously allocated containers. " +
-					"This YARN version does not support 'getContainersFromPreviousAttempts()'");
-			}
-		}
-
-		/**
-		 * Checks if a YARN application still has registered containers. If the application master
-		 * registered at the ResourceManager for the first time, this list will be empty. If the
-		 * application master registered a repeated time (after a failure and recovery), this list
-		 * will contain the containers that were previously allocated.
-		 *
-		 * @param response The response object from the registration at the ResourceManager.
-		 * @return A list with containers from previous application attempt.
-		 */
-		private List<Container> getContainersFromPreviousAttempts(RegisterApplicationMasterResponse response) {
-			if (method != null && response != null) {
-				try {
-					@SuppressWarnings("unchecked")
-					List<Container> list = (List<Container>) method.invoke(response);
-					if (list != null && !list.isEmpty()) {
-						return list;
-					}
-				} catch (Throwable t) {
-					logger.error("Error invoking 'getContainersFromPreviousAttempts()'", t);
-				}
-			}
-
-			return Collections.emptyList();
-		}
-
-	}
-
 	// ------------------------------------------------------------------------
 	//  Actor props factory
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/45397fe9/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 5380356..f3ec04b 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -41,6 +41,7 @@ import org.apache.flink.util.FlinkException;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
 import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -192,11 +193,24 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 			restPort = -1;
 		}
 
-		resourceManagerClient.registerApplicationMaster(hostPort.f0, restPort, webInterfaceUrl);
+		final RegisterApplicationMasterResponse registerApplicationMasterResponse =
+			resourceManagerClient.registerApplicationMaster(hostPort.f0, restPort, webInterfaceUrl);
+		getContainersFromPreviousAttempts(registerApplicationMasterResponse);
 
 		return resourceManagerClient;
 	}
 
+	private void getContainersFromPreviousAttempts(final RegisterApplicationMasterResponse registerApplicationMasterResponse) {
+		final List<Container> containersFromPreviousAttempts =
+			new RegisterApplicationMasterResponseReflector(log).getContainersFromPreviousAttempts(registerApplicationMasterResponse);
+
+		log.info("Recovered {} containers from previous attempts ({}).", containersFromPreviousAttempts.size(), containersFromPreviousAttempts);
+
+		for (final Container container : containersFromPreviousAttempts) {
+			workerNodeMap.put(new ResourceID(container.getId().toString()), new YarnWorkerNode(container));
+		}
+	}
+
 	protected NMClient createAndStartNodeManagerClient(YarnConfiguration yarnConfiguration) {
 		// create the client to communicate with the node managers
 		NMClient nodeManagerClient = NMClient.createNMClient();
@@ -315,6 +329,7 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 				closeTaskManagerConnection(new ResourceID(
 					container.getContainerId().toString()), new Exception(container.getDiagnostics()));
 			}
+			workerNodeMap.remove(new ResourceID(container.getContainerId().toString()));
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/45397fe9/flink-yarn/src/test/java/org/apache/flink/yarn/RegisterApplicationMasterResponseReflectorTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/RegisterApplicationMasterResponseReflectorTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/RegisterApplicationMasterResponseReflectorTest.java
new file mode 100644
index 0000000..af33e65
--- /dev/null
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/RegisterApplicationMasterResponseReflectorTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.util.TestLogger;
+
+import org.apache.hadoop.util.VersionInfo;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assume.assumeTrue;
+
+/**
+ * Tests for {@link RegisterApplicationMasterResponseReflector}.
+ */
+public class RegisterApplicationMasterResponseReflectorTest extends TestLogger {
+
+	private static final Logger LOG = LoggerFactory.getLogger(RegisterApplicationMasterResponseReflectorTest.class);
+
+	@Mock
+	private Container mockContainer;
+
+	@Before
+	public void setUp() {
+		MockitoAnnotations.initMocks(this);
+	}
+
+	@Test
+	public void testCallsMethodIfPresent() {
+		final RegisterApplicationMasterResponseReflector registerApplicationMasterResponseReflector =
+			new RegisterApplicationMasterResponseReflector(LOG, HasMethod.class);
+
+		final List<Container> containersFromPreviousAttemptsUnsafe =
+			registerApplicationMasterResponseReflector.getContainersFromPreviousAttemptsUnsafe(new
+				HasMethod());
+
+		assertThat(containersFromPreviousAttemptsUnsafe, hasSize(1));
+	}
+
+	@Test
+	public void testDoesntCallMethodIfAbsent() {
+		final RegisterApplicationMasterResponseReflector registerApplicationMasterResponseReflector =
+			new RegisterApplicationMasterResponseReflector(LOG, HasMethod.class);
+
+		final List<Container> containersFromPreviousAttemptsUnsafe =
+			registerApplicationMasterResponseReflector.getContainersFromPreviousAttemptsUnsafe(new
+				Object());
+
+		assertThat(containersFromPreviousAttemptsUnsafe, empty());
+	}
+
+	@Test
+	public void testGetMethodReflectiveHadoop22() {
+		assumeTrue(
+			"Method getContainersFromPreviousAttempts is not supported by Hadoop: " +
+				VersionInfo.getVersion(),
+			isHadoopVersionGreaterThanOrEquals(2, 2));
+
+		final RegisterApplicationMasterResponseReflector registerApplicationMasterResponseReflector =
+			new RegisterApplicationMasterResponseReflector(LOG);
+
+		final Method method = registerApplicationMasterResponseReflector.getMethod();
+		assertThat(method, notNullValue());
+	}
+
+	private static boolean isHadoopVersionGreaterThanOrEquals(final int major, final int minor) { // true iff running Hadoop >= major.minor
+		final String[] splitVersion = VersionInfo.getVersion().split("\\.");
+		final int[] versions = Arrays.stream(splitVersion).limit(2).mapToInt(Integer::parseInt).toArray(); // major and minor only; later parts may be non-numeric (e.g. "0-SNAPSHOT")
+		return versions[0] > major || (versions[0] == major && versions[1] >= minor); // lexicographic compare, so that e.g. 3.0 >= 2.2 holds
+	}
+
+	/**
+	 * Class which has a method with the same signature as
+	 * {@link RegisterApplicationMasterResponse#getContainersFromPreviousAttempts()}.
+	 */
+	private class HasMethod {
+
+		/**
+		 * Called from {@link #testCallsMethodIfPresent()}.
+		 */
+		@SuppressWarnings("unused")
+		public List<Container> getContainersFromPreviousAttempts() {
+			return Collections.singletonList(mockContainer);
+		}
+	}
+}


Mime
View raw message