beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lc...@apache.org
Subject [1/2] beam git commit: [BEAM-1491]Identify HADOOP_CONF_DIR(or YARN_CONF_DIR) environment variables
Date Thu, 04 May 2017 15:48:38 GMT
Repository: beam
Updated Branches:
  refs/heads/master e1d4aa963 -> 588f57a1e


[BEAM-1491]Identify HADOOP_CONF_DIR(or YARN_CONF_DIR) environment variables


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/16717083
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/16717083
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/16717083

Branch: refs/heads/master
Commit: 1671708340fb9fc57cdc91c3bbacdff3ae6af4af
Parents: e1d4aa9
Author: yangping.wu <yangping.wu@qunar.com>
Authored: Thu May 4 14:04:08 2017 +0800
Committer: Luke Cwik <lcwik@google.com>
Committed: Thu May 4 08:45:04 2017 -0700

----------------------------------------------------------------------
 sdks/java/io/hadoop-file-system/pom.xml         |   6 +
 .../sdk/io/hdfs/HadoopFileSystemOptions.java    |  73 +++++++++--
 .../io/hdfs/HadoopFileSystemOptionsTest.java    | 125 +++++++++++++++++++
 3 files changed, 197 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/16717083/sdks/java/io/hadoop-file-system/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/pom.xml b/sdks/java/io/hadoop-file-system/pom.xml
index 3ec9848..562277e 100644
--- a/sdks/java/io/hadoop-file-system/pom.xml
+++ b/sdks/java/io/hadoop-file-system/pom.xml
@@ -187,6 +187,12 @@
     </dependency>
 
     <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/beam/blob/16717083/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java
index 31250bc..45f43e2 100644
--- a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java
+++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java
@@ -17,12 +17,22 @@
  */
 package org.apache.beam.sdk.io.hdfs;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import java.io.File;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.DefaultValueFactory;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * {@link PipelineOptions} which encapsulate {@link Configuration Hadoop Configuration}
@@ -30,20 +40,69 @@ import org.apache.hadoop.conf.Configuration;
  */
 public interface HadoopFileSystemOptions extends PipelineOptions {
  @Description("A list of Hadoop configurations used to configure zero or more Hadoop filesystems. "
-      + "To specify on the command-line, represent the value as a JSON list of JSON maps, where "
-      + "each map represents the entire configuration for a single Hadoop filesystem. For example "
-      + "--hdfsConfiguration='[{\"fs.default.name\": \"hdfs://localhost:9998\", ...},"
+      + "By default, Hadoop configuration is loaded from 'core-site.xml' and 'hdfs-site.xml' "
+      + "based upon the HADOOP_CONF_DIR and YARN_CONF_DIR environment variables. "
+      + "To specify configuration on the command-line, represent the value as a JSON list of JSON "
+      + "maps, where each map represents the entire configuration for a single Hadoop filesystem. "
+      + "For example --hdfsConfiguration='[{\"fs.default.name\": \"hdfs://localhost:9998\", ...},"
       + "{\"fs.default.name\": \"s3a://\", ...},...]'")
   @Default.InstanceFactory(ConfigurationLocator.class)
   List<Configuration> getHdfsConfiguration();
   void setHdfsConfiguration(List<Configuration> value);
 
   /** A {@link DefaultValueFactory} which locates a Hadoop {@link Configuration}. */
-  class ConfigurationLocator implements DefaultValueFactory<Configuration> {
+  class ConfigurationLocator implements DefaultValueFactory<List<Configuration>> {
+    private static final Logger LOG = LoggerFactory.getLogger(ConfigurationLocator.class);
     @Override
-    public Configuration create(PipelineOptions options) {
-      // TODO: Find default configuration to use
-      return null;
+    public List<Configuration> create(PipelineOptions options) {
+      // Find default configuration when HADOOP_CONF_DIR or YARN_CONF_DIR is set.
+      List<Configuration> configurationList = readConfigurationFromHadoopYarnConfigDirs();
+      return configurationList.size() > 0 ? configurationList : null;
+    }
+
+    private List<Configuration> readConfigurationFromHadoopYarnConfigDirs() {
+      List<Configuration> configurationList = Lists.newArrayList();
+
+      /*
+      * If we find a configuration in HADOOP_CONF_DIR and YARN_CONF_DIR,
+      * we should be returning them both separately.
+      *
+      * Also, ensure that we only load one configuration if both
+      * HADOOP_CONF_DIR and YARN_CONF_DIR point to the same location.
+      */
+      Set<String> confDirs = Sets.newHashSet();
+      for (String confDir : Lists.newArrayList("HADOOP_CONF_DIR", "YARN_CONF_DIR")) {
+        if (getEnvironment().containsKey(confDir)) {
+          String hadoopConfDir = getEnvironment().get(confDir);
+          if (!Strings.isNullOrEmpty(hadoopConfDir)) {
+            confDirs.add(hadoopConfDir);
+          }
+        }
+      }
+
+      for (String confDir : confDirs) {
+        if (new File(confDir).exists()) {
+          Configuration conf = new Configuration(false);
+          boolean confLoaded = false;
+          for (String confName : Lists.newArrayList("core-site.xml", "hdfs-site.xml")) {
+            File confFile = new File(confDir, confName);
+            if (confFile.exists()) {
+              LOG.debug("Adding {} to hadoop configuration", confFile.getAbsolutePath());
+              conf.addResource(new Path(confFile.getAbsolutePath()));
+              confLoaded = true;
+            }
+          }
+          if (confLoaded) {
+            configurationList.add(conf);
+          }
+        }
+      }
+      return configurationList;
+    }
+
+    @VisibleForTesting
+    Map<String, String> getEnvironment() {
+      return System.getenv();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/16717083/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsTest.java b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsTest.java
index 634528b..c99fc5b 100644
--- a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsTest.java
+++ b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsTest.java
@@ -18,13 +18,26 @@
 package org.apache.beam.sdk.io.hdfs;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
 
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.google.common.io.Files;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.AbstractMap;
+import java.util.List;
 import java.util.Map;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.hamcrest.Matchers;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
@@ -33,6 +46,8 @@ import org.junit.runners.JUnit4;
  */
 @RunWith(JUnit4.class)
 public class HadoopFileSystemOptionsTest {
+  @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
+
   @Test
   public void testParsingHdfsConfiguration() {
     HadoopFileSystemOptions options = PipelineOptionsFactory.fromArgs(
@@ -45,4 +60,114 @@ public class HadoopFileSystemOptionsTest {
     assertThat(options.getHdfsConfiguration().get(1), Matchers.<Map.Entry<String, String>>contains(
         new AbstractMap.SimpleEntry("propertyB", "B")));
   }
+
+  @Test
+  public void testDefaultUnsetEnvHdfsConfiguration() {
+    HadoopFileSystemOptions.ConfigurationLocator projectFactory =
+            spy(new HadoopFileSystemOptions.ConfigurationLocator());
+    when(projectFactory.getEnvironment()).thenReturn(ImmutableMap.<String, String>of());
+    assertNull(projectFactory.create(PipelineOptionsFactory.create()));
+  }
+
+  @Test
+  public void testDefaultJustSetHadoopConfDirConfiguration() throws IOException {
+    Files.write(createPropertyData("A"),
+        tmpFolder.newFile("core-site.xml"), StandardCharsets.UTF_8);
+    Files.write(createPropertyData("B"),
+        tmpFolder.newFile("hdfs-site.xml"), StandardCharsets.UTF_8);
+    HadoopFileSystemOptions.ConfigurationLocator configurationLocator =
+            spy(new HadoopFileSystemOptions.ConfigurationLocator());
+    Map<String, String> environment = Maps.newHashMap();
+    environment.put("HADOOP_CONF_DIR", tmpFolder.getRoot().getAbsolutePath());
+    when(configurationLocator.getEnvironment()).thenReturn(environment);
+
+    List<Configuration> configurationList =
+        configurationLocator.create(PipelineOptionsFactory.create());
+    assertEquals(1, configurationList.size());
+    assertThat(configurationList.get(0).get("propertyA"), Matchers.equalTo("A"));
+    assertThat(configurationList.get(0).get("propertyB"), Matchers.equalTo("B"));
+  }
+
+  @Test
+  public void testDefaultJustSetYarnConfDirConfiguration() throws IOException {
+    Files.write(createPropertyData("A"),
+        tmpFolder.newFile("core-site.xml"), StandardCharsets.UTF_8);
+    Files.write(createPropertyData("B"),
+        tmpFolder.newFile("hdfs-site.xml"), StandardCharsets.UTF_8);
+    HadoopFileSystemOptions.ConfigurationLocator configurationLocator =
+            spy(new HadoopFileSystemOptions.ConfigurationLocator());
+    Map<String, String> environment = Maps.newHashMap();
+    environment.put("YARN_CONF_DIR", tmpFolder.getRoot().getAbsolutePath());
+    when(configurationLocator.getEnvironment()).thenReturn(environment);
+
+    List<Configuration> configurationList =
+        configurationLocator.create(PipelineOptionsFactory.create());
+    assertEquals(1, configurationList.size());
+    assertThat(configurationList.get(0).get("propertyA"), Matchers.equalTo("A"));
+    assertThat(configurationList.get(0).get("propertyB"), Matchers.equalTo("B"));
+  }
+
+  @Test
+  public void testDefaultSetYarnConfDirAndHadoopConfDirAndSameConfiguration() throws IOException {
+    Files.write(createPropertyData("A"),
+        tmpFolder.newFile("core-site.xml"), StandardCharsets.UTF_8);
+    Files.write(createPropertyData("B"),
+        tmpFolder.newFile("hdfs-site.xml"), StandardCharsets.UTF_8);
+    HadoopFileSystemOptions.ConfigurationLocator configurationLocator =
+            spy(new HadoopFileSystemOptions.ConfigurationLocator());
+    Map<String, String> environment = Maps.newHashMap();
+    environment.put("YARN_CONF_DIR", tmpFolder.getRoot().getAbsolutePath());
+    environment.put("HADOOP_CONF_DIR", tmpFolder.getRoot().getAbsolutePath());
+    when(configurationLocator.getEnvironment()).thenReturn(environment);
+
+    List<Configuration> configurationList =
+        configurationLocator.create(PipelineOptionsFactory.create());
+    assertEquals(1, configurationList.size());
+    assertThat(configurationList.get(0).get("propertyA"), Matchers.equalTo("A"));
+    assertThat(configurationList.get(0).get("propertyB"), Matchers.equalTo("B"));
+  }
+
+  @Test
+  public void testDefaultSetYarnConfDirAndHadoopConfDirNotSameConfiguration() throws IOException {
+    File hadoopConfDir = tmpFolder.newFolder("hadoop");
+    File yarnConfDir = tmpFolder.newFolder("yarn");
+    Files.write(createPropertyData("A"),
+        new File(hadoopConfDir, "core-site.xml"), StandardCharsets.UTF_8);
+    Files.write(createPropertyData("B"),
+        new File(hadoopConfDir, "hdfs-site.xml"), StandardCharsets.UTF_8);
+    Files.write(createPropertyData("C"),
+        new File(yarnConfDir, "core-site.xml"), StandardCharsets.UTF_8);
+    Files.write(createPropertyData("D"),
+        new File(yarnConfDir, "hdfs-site.xml"), StandardCharsets.UTF_8);
+    HadoopFileSystemOptions.ConfigurationLocator configurationLocator =
+              spy(new HadoopFileSystemOptions.ConfigurationLocator());
+    Map<String, String> environment = Maps.newHashMap();
+    environment.put("YARN_CONF_DIR", hadoopConfDir.getAbsolutePath());
+    environment.put("HADOOP_CONF_DIR", yarnConfDir.getAbsolutePath());
+    when(configurationLocator.getEnvironment()).thenReturn(environment);
+
+    List<Configuration> configurationList =
+        configurationLocator.create(PipelineOptionsFactory.create());
+    assertEquals(2, configurationList.size());
+    int hadoopConfIndex = configurationList.get(0).get("propertyA") != null ? 0 : 1;
+    assertThat(
+        configurationList.get(hadoopConfIndex).get("propertyA"), Matchers.equalTo("A"));
+    assertThat(
+        configurationList.get(hadoopConfIndex).get("propertyB"), Matchers.equalTo("B"));
+    assertThat(
+        configurationList.get(1 - hadoopConfIndex).get("propertyC"), Matchers.equalTo("C"));
+    assertThat(
+        configurationList.get(1 - hadoopConfIndex).get("propertyD"), Matchers.equalTo("D"));
+  }
+
+  private static String createPropertyData(String property) {
+    return "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n"
+        + "<?xml-stylesheet type=\"text/xsl\" href=\"configuration.xsl\"?>\n"
+        + "<configuration>\n"
+        + "    <property>\n"
+        + "        <name>property" + property + "</name>\n"
+        + "        <value>" + property + "</value>\n"
+        + "    </property>\n"
+        + "</configuration>";
+  }
 }


Mime
View raw message