hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ka...@apache.org
Subject hadoop git commit: HDFS-9637. Tests for RollingFileSystemSink. Addendum patch to add missing file.
Date Fri, 12 Feb 2016 04:41:16 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-2 fb57c01ea -> d3a0bb6d3


HDFS-9637. Tests for RollingFileSystemSink. Addendum patch to add missing file.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d3a0bb6d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d3a0bb6d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d3a0bb6d

Branch: refs/heads/branch-2
Commit: d3a0bb6d32495ce258694d68a5912795c49be8dc
Parents: fb57c01
Author: Karthik Kambatla <kasha@cloudera.com>
Authored: Thu Feb 11 20:41:06 2016 -0800
Committer: Karthik Kambatla <kasha@cloudera.com>
Committed: Thu Feb 11 20:41:06 2016 -0800

----------------------------------------------------------------------
 .../sink/TestRollingFileSystemSinkWithHdfs.java | 292 +++++++++++++++++++
 1 file changed, 292 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3a0bb6d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSinkWithHdfs.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSinkWithHdfs.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSinkWithHdfs.java
new file mode 100644
index 0000000..0fc04ac
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSinkWithHdfs.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Calendar;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.metrics2.MetricsException;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.sink.RollingFileSystemSinkTestBase.MyMetrics1;
+import org.junit.After;
+import org.junit.Test;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import org.junit.Before;
+
+/**
+ * Test the {@link RollingFileSystemSink} class in the context of HDFS.
+ */
+public class TestRollingFileSystemSinkWithHdfs
+    extends RollingFileSystemSinkTestBase {
+  private static final int  NUM_DATANODES = 4;
+  private MiniDFSCluster cluster;
+
+  /**
+   * Create a {@link MiniDFSCluster} instance with four nodes.  The
+   * node count is required to allow append to function. Also clear the
+   * sink's test flags.
+   *
+   * @throws IOException thrown if cluster creation fails
+   */
+  @Before
+  public void setupHdfs() throws IOException {
+    Configuration conf = new Configuration();
+
+    // It appears that since HDFS-265, append is always enabled.
+    cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
+
+    // Also clear sink flags
+    RollingFileSystemSink.isTest = false;
+    RollingFileSystemSink.hasFlushed = false;
+  }
+
+  /**
+   * Stop the {@link MiniDFSCluster}.
+   */
+  @After
+  public void shutdownHdfs() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Test writing logs to HDFS.
+   *
+   * @throws Exception thrown when things break
+   */
+  @Test
+  public void testWrite() throws Exception {
+    String path = "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp";
+    MetricsSystem ms = initMetricsSystem(path, false, true);
+
+    assertMetricsContents(doWriteTest(ms, path, 1));
+  }
+
+  /**
+   * Test writing logs to HDFS if append is enabled and the log file already
+   * exists.
+   *
+   * @throws Exception thrown when things break
+   */
+  @Test
+  public void testAppend() throws Exception {
+    String path = "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp";
+
+    assertExtraContents(doAppendTest(path, false, true, 1));
+  }
+
+  /**
+   * Test writing logs to HDFS if append is enabled, the log file already
+   * exists, and the sink is set to ignore errors.
+   *
+   * @throws Exception thrown when things break
+   */
+  @Test
+  public void testSilentAppend() throws Exception {
+    String path = "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp";
+
+    assertExtraContents(doAppendTest(path, false, true, 1));
+  }
+
+  /**
+   * Test writing logs to HDFS without append enabled, when the log file already
+   * exists.
+   *
+   * @throws Exception thrown when things break
+   */
+  @Test
+  public void testNoAppend() throws Exception {
+    String path = "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp";
+
+    assertMetricsContents(doAppendTest(path, false, false, 2));
+  }
+
+  /**
+   * Test writing logs to HDFS without append enabled, with ignore errors
+   * enabled, and when the log file already exists.
+   *
+   * @throws Exception thrown when things break
+   */
+  @Test
+  public void testSilentOverwrite() throws Exception {
+    String path = "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp";
+
+    assertMetricsContents(doAppendTest(path, true, false, 2));
+  }
+
+  /**
+   * Test that writing to HDFS fails when HDFS is unavailable.
+   *
+   * @throws IOException thrown when reading or writing log files
+   */
+  @Test
+  public void testFailedWrite() throws IOException {
+    final String path =
+        "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp";
+    MetricsSystem ms = initMetricsSystem(path, false, false);
+
+    new MyMetrics1().registerWith(ms);
+
+    shutdownHdfs();
+    ErrorSink.errored = false;
+
+    ms.publishMetricsNow(); // publish the metrics
+
+    assertTrue("No exception was generated while writing metrics "
+        + "even though HDFS was unavailable", ErrorSink.errored);
+
+    ms.stop();
+    ms.shutdown();
+  }
+
+  /**
+   * Test that closing a file in HDFS fails when HDFS is unavailable.
+   *
+   * @throws IOException thrown when reading or writing log files
+   */
+  @Test
+  public void testFailedClose() throws IOException {
+    final String path =
+        "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp";
+    MetricsSystem ms = initMetricsSystem(path, false, false);
+
+    new MyMetrics1().registerWith(ms);
+
+    ms.publishMetricsNow(); // publish the metrics
+
+    shutdownHdfs();
+    ErrorSink.errored = false;
+
+    try {
+      ms.stop();
+      fail("No exception was generated while stopping sink "
+          + "even though HDFS was unavailable");
+    } catch (MetricsException ex) {
+      // Expected
+    }
+
+    ms.shutdown();
+  }
+
+  /**
+   * Test that writing to HDFS fails silently when HDFS is unavailable.
+   *
+   * @throws IOException thrown when reading or writing log files
+   * @throws java.lang.InterruptedException thrown if interrupted
+   */
+  @Test
+  public void testSilentFailedWrite() throws IOException, InterruptedException {
+    final String path =
+        "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp";
+    MetricsSystem ms = initMetricsSystem(path, true, false);
+
+    new MyMetrics1().registerWith(ms);
+
+    shutdownHdfs();
+    ErrorSink.errored = false;
+
+    ms.publishMetricsNow(); // publish the metrics
+
+    assertFalse("An exception was generated writing metrics "
+        + "while HDFS was unavailable, even though the sink is set to "
+        + "ignore errors", ErrorSink.errored);
+
+    ms.stop();
+    ms.shutdown();
+  }
+
+  /**
+   * Test that closing a file in HDFS silently fails when HDFS is unavailable.
+   *
+   * @throws IOException thrown when reading or writing log files
+   */
+  @Test
+  public void testSilentFailedClose() throws IOException {
+    final String path =
+        "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp";
+    MetricsSystem ms = initMetricsSystem(path, true, false);
+
+    new MyMetrics1().registerWith(ms);
+
+    ms.publishMetricsNow(); // publish the metrics
+
+    shutdownHdfs();
+    ErrorSink.errored = false;
+
+    ms.stop();
+
+    assertFalse("An exception was generated stopping sink "
+        + "while HDFS was unavailable, even though the sink is set to "
+        + "ignore errors", ErrorSink.errored);
+
+    ms.shutdown();
+  }
+
+  /**
+   * This test specifically checks whether the flusher thread is automatically
+   * flushing the files.  It unfortunately can only test with the alternative
+   * flushing schedule (because of test timing), but it's better than nothing.
+   *
+   * @throws Exception thrown if something breaks
+   */
+  @Test
+  public void testFlushThread() throws Exception {
+    RollingFileSystemSink.isTest = true;
+    RollingFileSystemSink.hasFlushed = false;
+
+    String path = "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp";
+    MetricsSystem ms = initMetricsSystem(path, true, false);
+
+    new MyMetrics1().registerWith(ms);
+
+    // Publish the metrics
+    ms.publishMetricsNow();
+    // Pubish again because the first write seems to get properly flushed
+    // regardless.
+    ms.publishMetricsNow();
+
+    // Sleep until the flusher has run
+    while (!RollingFileSystemSink.hasFlushed) {
+      Thread.sleep(50L);
+    }
+
+    Calendar now = getNowNotTopOfHour();
+    FileSystem fs = FileSystem.newInstance(new URI(path), new Configuration());
+    Path currentDir = new Path(path, DATE_FORMAT.format(now.getTime()));
+    Path currentFile =
+        findMostRecentLogFile(fs, new Path(currentDir, getLogFilename()));
+    FileStatus status = fs.getFileStatus(currentFile);
+
+    // Each metrics record is 118+ bytes, depending on hostname
+    assertTrue("The flusher thread didn't flush the log contents. Expected "
+        + "at least 236 bytes in the log file, but got " + status.getLen(),
+        status.getLen() >= 236);
+
+    ms.stop();
+  }
+}


Mime
View raw message