flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-10516) YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink Configuration during setup
Date Fri, 12 Oct 2018 17:02:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-10516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16648131#comment-16648131 ]

ASF GitHub Bot commented on FLINK-10516:
----------------------------------------

yanyan300300 closed pull request #6828: [FLINK-10516] [yarn] fix YarnApplicationMasterRunner
fail to initialize FileSystem with correct Flink Configuration during setup
URL: https://github.com/apache/flink/pull/6828
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 462682f7e5f..f3dd27b5ef0 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -27,6 +27,7 @@
 import org.apache.flink.configuration.ResourceManagerOptions;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
@@ -67,6 +68,7 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.Callable;
@@ -160,6 +162,13 @@ protected int run(String[] args) {
 
 			final Configuration flinkConfig = createConfiguration(currDir, dynamicProperties);
 
+			// configure the filesystems
+			try {
+				FileSystem.initialize(flinkConfig);
+			} catch (IOException e) {
+				throw new IOException("Error while configuring the filesystems.", e);
+			}
+
 			File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
 			if (remoteKeytabPrincipal != null && f.exists()) {
 				String keytabPath = f.getAbsolutePath();
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java
index b15374b2a4b..929dbdbce3b 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java
@@ -19,9 +19,12 @@
 package org.apache.flink.yarn;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
 import org.apache.flink.util.OperatingSystem;
 
+import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.junit.Assume;
@@ -29,9 +32,14 @@
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,6 +56,7 @@
 import static org.apache.flink.yarn.YarnConfigKeys.FLINK_JAR_PATH;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doAnswer;
@@ -56,6 +65,8 @@
 /**
  * Tests for the {@link YarnApplicationMasterRunner}.
  */
+@PrepareForTest(FileSystem.class)
+@RunWith(PowerMockRunner.class)
 public class YarnApplicationMasterRunnerTest {
 	private static final Logger LOG = LoggerFactory.getLogger(YarnApplicationMasterRunnerTest.class);
 
@@ -109,4 +120,37 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
 			taskManagerConf, workingDirectory, taskManagerMainClass, LOG);
 		assertEquals("file", ctx.getLocalResources().get("flink.jar").getResource().getScheme());
 	}
+
+	@Test
+	public void testRunAndInitializeFileSystem() throws Exception {
+		// Mock necessary system variables
+		Map<String, String> map = new HashMap<String, String>(System.getenv());
+		map.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, "foo");
+		// Create dynamic properties to be used in the Flink configuration
+		map.put(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES, "myKey=myValue");
+		CommonTestUtils.setEnv(map);
+
+		// Create a temporary flink-conf.yaml and to be deleted on JVM exits
+		File currDir = new File(System.getenv().get(ApplicationConstants.Environment.PWD.key()));
+		String path = String.format("%s/%s.%s", currDir, "flink-conf", "yaml");
+		File f = new File(path);
+		f.createNewFile();
+		f.deleteOnExit();
+
+		// Mock FileSystem.initialize()
+		PowerMockito.mockStatic(FileSystem.class);
+		PowerMockito.doNothing().when(FileSystem.class);
+		FileSystem.initialize(any(Configuration.class));
+
+		String[] args = new String[5];
+		YarnApplicationMasterRunner yarnApplicationMasterRunner = new YarnApplicationMasterRunner();
+		yarnApplicationMasterRunner.run(args);
+
+		// Verify FileSystem.initialize() is invoked with the correct Flink config
+		ArgumentCaptor<Configuration> propertiesCaptor =
+			ArgumentCaptor.forClass(Configuration.class);
+		PowerMockito.verifyStatic();
+		FileSystem.initialize(propertiesCaptor.capture());
+		assertEquals("myValue", propertiesCaptor.getValue().getString("myKey", ""));
+	}
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink Configuration during setup
> -------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-10516
>                 URL: https://issues.apache.org/jira/browse/FLINK-10516
>             Project: Flink
>          Issue Type: Bug
>          Components: YARN
>    Affects Versions: 1.4.0, 1.5.0, 1.6.0, 1.7.0
>            Reporter: Shuyi Chen
>            Assignee: Shuyi Chen
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.7.0, 1.6.2, 1.5.5
>
>
> Will add a fix, and refactor YarnApplicationMasterRunner to add a unit test to prevent future regressions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message