hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ka...@apache.org
Subject hadoop git commit: HDFS-9858. RollingFileSystemSink can throw an NPE on non-secure clusters. (Daniel Templeton via kasha)
Date Fri, 26 Feb 2016 00:31:13 GMT
Repository: hadoop
Updated Branches:
  refs/heads/trunk b2951f9fb -> c2460dad6


HDFS-9858. RollingFileSystemSink can throw an NPE on non-secure clusters. (Daniel Templeton
via kasha)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c2460dad
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c2460dad
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c2460dad

Branch: refs/heads/trunk
Commit: c2460dad642feee1086442d33c30c24ec77236b9
Parents: b2951f9
Author: Karthik Kambatla <kasha@cloudera.com>
Authored: Thu Feb 25 16:31:01 2016 -0800
Committer: Karthik Kambatla <kasha@cloudera.com>
Committed: Thu Feb 25 16:31:09 2016 -0800

----------------------------------------------------------------------
 .../metrics2/sink/RollingFileSystemSink.java    | 143 +++++++++++++------
 .../sink/RollingFileSystemSinkTestBase.java     |   7 +-
 .../sink/TestRollingFileSystemSink.java         |  10 +-
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../sink/TestRollingFileSystemSinkWithHdfs.java |  34 +++--
 ...TestRollingFileSystemSinkWithSecureHdfs.java |   3 +-
 6 files changed, 138 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2460dad/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSink.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSink.java
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSink.java
index 2c0a26a..9a43901 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSink.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSink.java
@@ -132,6 +132,9 @@ public class RollingFileSystemSink implements MetricsSink, Closeable {
   private static final FastDateFormat DATE_FORMAT =
       FastDateFormat.getInstance("yyyyMMddHH", TimeZone.getTimeZone("GMT"));
   private final Object lock = new Object();
+  private boolean initialized = false;
+  private SubsetConfiguration properties;
+  private Configuration conf;
   private String source;
   private boolean ignoreError;
   private boolean allowAppend;
@@ -163,63 +166,102 @@ public class RollingFileSystemSink implements MetricsSink, Closeable
{
   protected static FileSystem suppliedFilesystem = null;
 
   @Override
-  public void init(SubsetConfiguration conf) {
-    basePath = new Path(conf.getString(BASEPATH_KEY, BASEPATH_DEFAULT));
-    source = conf.getString(SOURCE_KEY, SOURCE_DEFAULT);
-    ignoreError = conf.getBoolean(IGNORE_ERROR_KEY, false);
-    allowAppend = conf.getBoolean(ALLOW_APPEND_KEY, false);
+  public void init(SubsetConfiguration metrics2Properties) {
+    properties = metrics2Properties;
+    basePath = new Path(properties.getString(BASEPATH_KEY, BASEPATH_DEFAULT));
+    source = properties.getString(SOURCE_KEY, SOURCE_DEFAULT);
+    ignoreError = properties.getBoolean(IGNORE_ERROR_KEY, false);
+    allowAppend = properties.getBoolean(ALLOW_APPEND_KEY, false);
 
-    Configuration configuration = loadConf();
-
-    UserGroupInformation.setConfiguration(configuration);
+    conf = loadConf();
+    UserGroupInformation.setConfiguration(conf);
 
     // Don't do secure setup if it's not needed.
     if (UserGroupInformation.isSecurityEnabled()) {
       // Validate config so that we don't get an NPE
-      checkForProperty(conf, KEYTAB_PROPERTY_KEY);
-      checkForProperty(conf, USERNAME_PROPERTY_KEY);
+      checkForProperty(properties, KEYTAB_PROPERTY_KEY);
+      checkForProperty(properties, USERNAME_PROPERTY_KEY);
 
 
       try {
         // Login as whoever we're supposed to be and let the hostname be pulled
         // from localhost. If security isn't enabled, this does nothing.
-        SecurityUtil.login(configuration, conf.getString(KEYTAB_PROPERTY_KEY),
-            conf.getString(USERNAME_PROPERTY_KEY));
+        SecurityUtil.login(conf, properties.getString(KEYTAB_PROPERTY_KEY),
+            properties.getString(USERNAME_PROPERTY_KEY));
       } catch (IOException ex) {
         throw new MetricsException("Error logging in securely: ["
             + ex.toString() + "]", ex);
       }
     }
+  }
+
+  /**
+   * Initialize the connection to HDFS and create the base directory. Also
+   * launch the flush thread.
+   */
+  private boolean initFs() {
+    boolean success = false;
 
-    fileSystem = getFileSystem(configuration);
+    fileSystem = getFileSystem();
 
     // This step isn't strictly necessary, but it makes debugging issues much
     // easier. We try to create the base directory eagerly and fail with
     // copious debug info if it fails.
     try {
       fileSystem.mkdirs(basePath);
+      success = true;
     } catch (Exception ex) {
-      throw new MetricsException("Failed to create " + basePath + "["
-          + SOURCE_KEY + "=" + source + ", "
-          + IGNORE_ERROR_KEY + "=" + ignoreError + ", "
-          + ALLOW_APPEND_KEY + "=" + allowAppend + ", "
-          + KEYTAB_PROPERTY_KEY + "="
-          + conf.getString(KEYTAB_PROPERTY_KEY) + ", "
-          + conf.getString(KEYTAB_PROPERTY_KEY) + "="
-          + configuration.get(conf.getString(KEYTAB_PROPERTY_KEY)) + ", "
-          + USERNAME_PROPERTY_KEY + "="
-          + conf.getString(USERNAME_PROPERTY_KEY) + ", "
-          + conf.getString(USERNAME_PROPERTY_KEY) + "="
-          + configuration.get(conf.getString(USERNAME_PROPERTY_KEY))
-          + "] -- " + ex.toString(), ex);
+      if (!ignoreError) {
+        throw new MetricsException("Failed to create " + basePath + "["
+            + SOURCE_KEY + "=" + source + ", "
+            + ALLOW_APPEND_KEY + "=" + allowAppend + ", "
+            + stringifySecurityProperty(KEYTAB_PROPERTY_KEY) + ", "
+            + stringifySecurityProperty(USERNAME_PROPERTY_KEY)
+            + "] -- " + ex.toString(), ex);
+      }
     }
 
-    // If we're permitted to append, check if we actually can
-    if (allowAppend) {
-      allowAppend = checkAppend(fileSystem);
+    if (success) {
+      // If we're permitted to append, check if we actually can
+      if (allowAppend) {
+        allowAppend = checkAppend(fileSystem);
+      }
+
+      flushTimer = new Timer("RollingFileSystemSink Flusher", true);
     }
 
-    flushTimer = new Timer("RollingFileSystemSink Flusher", true);
+    return success;
+  }
+
+  /**
+   * Turn a security property into a nicely formatted set of <i>name=value</i>
+   * strings, allowing for either the property or the configuration not to be
+   * set.
+   *
+   * @param properties the sink properties
+   * @param conf the conf
+   * @param property the property to stringify
+   * @return the stringified property
+   */
+  private String stringifySecurityProperty(String property) {
+    String securityProperty;
+
+    if (properties.containsKey(property)) {
+      String propertyValue = properties.getString(property);
+      String confValue = conf.get(properties.getString(property));
+
+      if (confValue != null) {
+        securityProperty = property + "=" + propertyValue
+            + ", " + properties.getString(property) + "=" + confValue;
+      } else {
+        securityProperty = property + "=" + propertyValue
+            + ", " + properties.getString(property) + "=<NOT SET>";
+      }
+    } else {
+      securityProperty = property + "=<NOT SET>";
+    }
+
+    return securityProperty;
   }
 
   /**
@@ -242,17 +284,17 @@ public class RollingFileSystemSink implements MetricsSink, Closeable
{
    * @return the configuration to use
    */
   private Configuration loadConf() {
-    Configuration conf;
+    Configuration c;
 
     if (suppliedConf != null) {
-      conf = suppliedConf;
+      c = suppliedConf;
     } else {
       // The config we're handed in init() isn't the one we want here, so we
       // create a new one to pick up the full settings.
-      conf = new Configuration();
+      c = new Configuration();
     }
 
-    return conf;
+    return c;
   }
 
   /**
@@ -263,7 +305,7 @@ public class RollingFileSystemSink implements MetricsSink, Closeable {
    * @return the file system to use
    * @throws MetricsException thrown if the file system could not be retrieved
    */
-  private FileSystem getFileSystem(Configuration conf) throws MetricsException {
+  private FileSystem getFileSystem() throws MetricsException {
     FileSystem fs = null;
 
     if (suppliedFilesystem != null) {
@@ -317,22 +359,29 @@ public class RollingFileSystemSink implements MetricsSink, Closeable
{
     // because if currentDirPath is null, then currentOutStream is null, but
     // currentOutStream can be null for other reasons.
     if ((currentOutStream == null) || !path.equals(currentDirPath)) {
-      // Close the stream. This step could have been handled already by the
-      // flusher thread, but if it has, the PrintStream will just swallow the
-      // exception, which is fine.
-      if (currentOutStream != null) {
-        currentOutStream.close();
+      // If we're not yet connected to HDFS, create the connection
+      if (!initialized) {
+        initialized = initFs();
       }
 
-      currentDirPath = path;
+      if (initialized) {
+        // Close the stream. This step could have been handled already by the
+        // flusher thread, but if it has, the PrintStream will just swallow the
+        // exception, which is fine.
+        if (currentOutStream != null) {
+          currentOutStream.close();
+        }
 
-      try {
-        rollLogDir();
-      } catch (IOException ex) {
-        throwMetricsException("Failed to create new log file", ex);
-      }
+        currentDirPath = path;
 
-      scheduleFlush(now);
+        try {
+          rollLogDir();
+        } catch (IOException ex) {
+          throwMetricsException("Failed to create new log file", ex);
+        }
+
+        scheduleFlush(now);
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2460dad/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSinkTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSinkTestBase.java
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSinkTestBase.java
index f1ad058..9914c5e 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSinkTestBase.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSinkTestBase.java
@@ -175,7 +175,7 @@ public class RollingFileSystemSinkTestBase {
     String prefix = methodName.getMethodName().toLowerCase();
 
     ConfigBuilder builder = new ConfigBuilder().add("*.period", 10000)
-        .add(prefix + ".sink.mysink0.class", ErrorSink.class.getName())
+        .add(prefix + ".sink.mysink0.class", MockSink.class.getName())
         .add(prefix + ".sink.mysink0.basepath", path)
         .add(prefix + ".sink.mysink0.source", "testsrc")
         .add(prefix + ".sink.mysink0.context", "test1")
@@ -503,8 +503,9 @@ public class RollingFileSystemSinkTestBase {
    * This class is a {@link RollingFileSystemSink} wrapper that tracks whether
    * an exception has been thrown during operations.
    */
-  public static class ErrorSink extends RollingFileSystemSink {
+  public static class MockSink extends RollingFileSystemSink {
     public static volatile boolean errored = false;
+    public static volatile boolean initialized = false;
 
     @Override
     public void init(SubsetConfiguration conf) {
@@ -515,6 +516,8 @@ public class RollingFileSystemSinkTestBase {
 
         throw new MetricsException(ex);
       }
+
+      initialized = true;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2460dad/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSink.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSink.java
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSink.java
index da63235..3c6cd27 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSink.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSink.java
@@ -23,6 +23,8 @@ import org.apache.hadoop.metrics2.MetricsSystem;
 import org.junit.Test;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Test the {@link RollingFileSystemSink} class in the context of the local file
@@ -106,7 +108,7 @@ public class TestRollingFileSystemSink extends RollingFileSystemSinkTestBase
{
     new MyMetrics1().registerWith(ms);
 
     methodDir.setWritable(false);
-    ErrorSink.errored = false;
+    MockSink.errored = false;
 
     try {
       // publish the metrics
@@ -114,7 +116,7 @@ public class TestRollingFileSystemSink extends RollingFileSystemSinkTestBase
{
 
       assertTrue("No exception was generated while writing metrics "
           + "even though the target directory was not writable",
-          ErrorSink.errored);
+          MockSink.errored);
 
       ms.stop();
       ms.shutdown();
@@ -135,7 +137,7 @@ public class TestRollingFileSystemSink extends RollingFileSystemSinkTestBase
{
     new MyMetrics1().registerWith(ms);
 
     methodDir.setWritable(false);
-    ErrorSink.errored = false;
+    MockSink.errored = false;
 
     try {
       // publish the metrics
@@ -144,7 +146,7 @@ public class TestRollingFileSystemSink extends RollingFileSystemSinkTestBase
{
       assertFalse("An exception was generated while writing metrics "
           + "when the target directory was not writable, even though the "
           + "sink is set to ignore errors",
-          ErrorSink.errored);
+          MockSink.errored);
 
       ms.stop();
       ms.shutdown();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2460dad/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index e3990ea..104b46d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -1047,6 +1047,9 @@ Release 2.9.0 - UNRELEASED
     HDFS-9608. Disk IO imbalance in HDFS with heterogeneous storages.
     (Wei Zhou via wang)
 
+    HDFS-9858. RollingFileSystemSink can throw an NPE on non-secure clusters. 
+    (Daniel Templeton via kasha)
+
 Release 2.8.0 - UNRELEASED
 
   NEW FEATURES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2460dad/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSinkWithHdfs.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSinkWithHdfs.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSinkWithHdfs.java
index 0f3725b..9984b34 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSinkWithHdfs.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSinkWithHdfs.java
@@ -151,12 +151,12 @@ public class TestRollingFileSystemSinkWithHdfs
     new MyMetrics1().registerWith(ms);
 
     shutdownHdfs();
-    ErrorSink.errored = false;
+    MockSink.errored = false;
 
     ms.publishMetricsNow(); // publish the metrics
 
     assertTrue("No exception was generated while writing metrics "
-        + "even though HDFS was unavailable", ErrorSink.errored);
+        + "even though HDFS was unavailable", MockSink.errored);
 
     ms.stop();
     ms.shutdown();
@@ -178,12 +178,12 @@ public class TestRollingFileSystemSinkWithHdfs
     ms.publishMetricsNow(); // publish the metrics
 
     shutdownHdfs();
-    ErrorSink.errored = false;
+    MockSink.errored = false;
 
     ms.stop();
 
     assertTrue("No exception was generated while stopping sink "
-        + "even though HDFS was unavailable", ErrorSink.errored);
+        + "even though HDFS was unavailable", MockSink.errored);
 
     ms.shutdown();
   }
@@ -203,13 +203,13 @@ public class TestRollingFileSystemSinkWithHdfs
     new MyMetrics1().registerWith(ms);
 
     shutdownHdfs();
-    ErrorSink.errored = false;
+    MockSink.errored = false;
 
     ms.publishMetricsNow(); // publish the metrics
 
     assertFalse("An exception was generated writing metrics "
         + "while HDFS was unavailable, even though the sink is set to "
-        + "ignore errors", ErrorSink.errored);
+        + "ignore errors", MockSink.errored);
 
     ms.stop();
     ms.shutdown();
@@ -231,13 +231,13 @@ public class TestRollingFileSystemSinkWithHdfs
     ms.publishMetricsNow(); // publish the metrics
 
     shutdownHdfs();
-    ErrorSink.errored = false;
+    MockSink.errored = false;
 
     ms.stop();
 
     assertFalse("An exception was generated stopping sink "
         + "while HDFS was unavailable, even though the sink is set to "
-        + "ignore errors", ErrorSink.errored);
+        + "ignore errors", MockSink.errored);
 
     ms.shutdown();
   }
@@ -283,4 +283,22 @@ public class TestRollingFileSystemSinkWithHdfs
 
     ms.stop();
   }
+
+  /**
+   * Test that a failure to connect to HDFS does not cause the init() method
+   * to fail.
+   */
+  @Test
+  public void testInitWithNoHDFS() {
+    String path = "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp";
+
+    shutdownHdfs();
+    MockSink.errored = false;
+    initMetricsSystem(path, true, false);
+
+    assertTrue("The sink was not initialized as expected",
+        MockSink.initialized);
+    assertFalse("The sink threw an unexpected error on initialization",
+        MockSink.errored);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2460dad/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSinkWithSecureHdfs.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSinkWithSecureHdfs.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSinkWithSecureHdfs.java
index 12b71f8..dce4fdc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSinkWithSecureHdfs.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSinkWithSecureHdfs.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
 import org.junit.Test;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Test the {@link RollingFileSystemSink} class in the context of HDFS with
@@ -147,7 +148,7 @@ public class TestRollingFileSystemSinkWithSecureHdfs
 
       assertTrue("No exception was generated initializing the sink against a "
           + "secure cluster even though the principal and keytab properties "
-          + "were missing", ErrorSink.errored);
+          + "were missing", MockSink.errored);
     } finally {
       if (cluster != null) {
         cluster.shutdown();


Mime
View raw message