hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zjs...@apache.org
Subject [18/27] hadoop git commit: HDFS-7746. Add a test randomly mixing append, truncate and snapshot operations.
Date Thu, 05 Mar 2015 23:04:25 GMT
HDFS-7746. Add a test randomly mixing append, truncate and snapshot operations.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ded0200e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ded0200e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ded0200e

Branch: refs/heads/YARN-2928
Commit: ded0200e9c98dea960db756bb208ff475d710e28
Parents: 22426a1
Author: Tsz-Wo Nicholas Sze <szetszwo@hortonworks.com>
Authored: Thu Mar 5 10:21:29 2015 +0800
Committer: Tsz-Wo Nicholas Sze <szetszwo@hortonworks.com>
Committed: Thu Mar 5 10:21:29 2015 +0800

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../hadoop/hdfs/TestAppendSnapshotTruncate.java | 478 +++++++++++++++++++
 2 files changed, 481 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ded0200e/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index d9008d9..f9541e0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -709,6 +709,9 @@ Release 2.7.0 - UNRELEASED
     HDFS-1522. Combine two BLOCK_FILE_PREFIX constants into one.
     (Dongming Liang via shv)
 
+    HDFS-7746. Add a test randomly mixing append, truncate and snapshot
+    operations. (szetszwo)
+
   OPTIMIZATIONS
 
     HDFS-7454. Reduce memory footprint for AclEntries in NameNode.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ded0200e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAppendSnapshotTruncate.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAppendSnapshotTruncate.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAppendSnapshotTruncate.java
new file mode 100644
index 0000000..5c4c7b4
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAppendSnapshotTruncate.java
@@ -0,0 +1,478 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.TestFileTruncate;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Test randomly mixing append, snapshot and truncate operations.
+ * Use local file system to simulate the each operation and verify
+ * the correctness.
+ */
+public class TestAppendSnapshotTruncate {
+  static {
+    GenericTestUtils.setLogLevel(NameNode.stateChangeLog, Level.ALL);
+  }
+  private static final Log LOG = LogFactory.getLog(TestAppendSnapshotTruncate.class);
+  private static final int BLOCK_SIZE = 1024;
+  private static final int DATANODE_NUM = 3;
+  private static final short REPLICATION = 3;
+
+  static final int SHORT_HEARTBEAT = 1;
+  static final String[] EMPTY_STRINGS = {};
+
+  static Configuration conf;
+  static MiniDFSCluster cluster;
+  static DistributedFileSystem dfs;
+
+  @BeforeClass
+  public static void startUp() throws IOException {
+    conf = new HdfsConfiguration();
+    conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, BLOCK_SIZE);
+    conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, SHORT_HEARTBEAT);
+    conf.setLong(
+        DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 1);
+    cluster = new MiniDFSCluster.Builder(conf)
+        .format(true)
+        .numDataNodes(DATANODE_NUM)
+        .nameNodePort(NameNode.DEFAULT_PORT)
+        .waitSafeMode(true)
+        .build();
+    dfs = cluster.getFileSystem();
+  }
+
+  @AfterClass
+  public static void tearDown() throws IOException {
+    if(dfs != null) {
+      dfs.close();
+    }
+    if(cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+
+  /** Test randomly mixing append, snapshot and truncate operations. */
+  @Test
+  public void testAST() throws Exception {
+    final String dirPathString = "/dir";
+    final Path dir = new Path(dirPathString);
+    dfs.mkdirs(dir);
+    dfs.allowSnapshot(dir);
+
+    final File localDir = new File(
+        System.getProperty("test.build.data", "target/test/data")
+        + dirPathString);
+    if (localDir.exists()) {
+      FileUtil.fullyDelete(localDir);
+    }
+    localDir.mkdirs();
+
+    final DirWorker w = new DirWorker(dir, localDir, 3);
+    w.startAllFiles();
+    w.start();
+    Worker.sleep(10L*1000);
+    w.stop();
+    w.stoptAllFiles();
+    w.checkEverything();
+  }
+
+  static final FileFilter FILE_ONLY = new FileFilter() {
+    @Override
+    public boolean accept(File f) {
+      return f.isFile();
+    }
+  };
+
+  static class DirWorker extends Worker {
+    final Path dir;
+    final File localDir;
+    
+    final FileWorker[] files;
+
+    private Map<String, Path> snapshotPaths = new HashMap<String, Path>();
+    private AtomicInteger snapshotCount = new AtomicInteger();
+
+    DirWorker(Path dir, File localDir, int nFiles) throws IOException {
+      super(dir.getName());
+      this.dir = dir;
+      this.localDir = localDir;
+
+      this.files = new FileWorker[nFiles];
+      for(int i = 0; i < files.length; i++) {
+        files[i] = new FileWorker(dir, localDir, String.format("file%02d", i));
+      }
+    }
+
+    static String getSnapshotName(int n) {
+      return String.format("s%02d", n);
+    }
+
+    String createSnapshot(String snapshot) throws IOException {
+      final StringBuilder b = new StringBuilder("createSnapshot: ")
+          .append(snapshot).append(" for ").append(dir);
+
+      {
+        //copy all local files to a sub dir to simulate snapshot. 
+        final File subDir = new File(localDir, snapshot);
+        Assert.assertFalse(subDir.exists());
+        subDir.mkdir();
+
+        for(File f : localDir.listFiles(FILE_ONLY)) {
+          FileUtils.copyFile(f, new File(subDir, f.getName()));
+        }
+      }
+      
+      final Path p = dfs.createSnapshot(dir, snapshot);
+      snapshotPaths.put(snapshot, p);
+      return b.toString();
+    }
+
+    String checkSnapshot(String snapshot) throws IOException {
+      final StringBuilder b = new StringBuilder("checkSnapshot: ")
+          .append(snapshot);
+
+      final File subDir = new File(localDir, snapshot);
+      Assert.assertTrue(subDir.exists());
+      
+      final File[] localFiles = subDir.listFiles(FILE_ONLY);
+      final Path p = snapshotPaths.get(snapshot);
+      final FileStatus[] statuses = dfs.listStatus(p);
+      Assert.assertEquals(localFiles.length, statuses.length);
+      b.append(p).append(" vs ").append(subDir).append(", ")
+       .append(statuses.length).append(" entries");
+      
+      Arrays.sort(localFiles);
+      Arrays.sort(statuses);
+      for(int i = 0; i < statuses.length; i++) {
+        FileWorker.checkFullFile(statuses[i].getPath(), localFiles[i]);
+      }
+      return b.toString();
+    }
+
+    String deleteSnapshot(String snapshot) throws IOException {
+      final StringBuilder b = new StringBuilder("deleteSnapshot: ")
+          .append(snapshot).append(" from ").append(dir);
+      FileUtil.fullyDelete(new File(localDir, snapshot));
+      dfs.deleteSnapshot(dir, snapshot);
+      snapshotPaths.remove(snapshot);
+      return b.toString();
+    }
+
+    
+    @Override
+    public String call() throws Exception {
+      final Random r = DFSUtil.getRandom();
+      final int op = r.nextInt(6);
+      if (op <= 1) {
+        pauseAllFiles();
+        try {
+          final String snapshot = getSnapshotName(snapshotCount.getAndIncrement());
+          return createSnapshot(snapshot);
+        } finally {
+          startAllFiles();
+        }
+      } else if (op <= 3) {
+        final String[] keys = snapshotPaths.keySet().toArray(EMPTY_STRINGS);
+        if (keys.length == 0) {
+          return "NO-OP";
+        }
+        final String snapshot = keys[r.nextInt(keys.length)];
+        final String s = checkSnapshot(snapshot);
+        
+        if (op == 2) {
+          return deleteSnapshot(snapshot);
+        }
+        return s;
+      } else {
+        return "NO-OP";
+      }
+    }
+
+    void pauseAllFiles() {
+      for(FileWorker f : files) { 
+        f.pause();
+      }
+
+      for(int i = 0; i < files.length; ) {
+        sleep(100);
+        for(; i < files.length && files[i].isPaused(); i++);
+      }
+    }
+    
+    void startAllFiles() {
+      for(FileWorker f : files) { 
+        f.start();
+      }
+    }
+    
+    void stoptAllFiles() throws InterruptedException {
+      for(FileWorker f : files) { 
+        f.stop();
+      }
+    }
+
+    void checkEverything() throws IOException {
+      LOG.info("checkEverything");
+      for(FileWorker f : files) { 
+        f.checkFullFile();
+        Preconditions.checkState(f.state.get() != State.ERROR);
+      }
+      for(String snapshot : snapshotPaths.keySet()) {
+        checkSnapshot(snapshot);
+      }
+      Preconditions.checkState(state.get() != State.ERROR);
+    }
+  }
+
+  static class FileWorker extends Worker {
+    final Path file;
+    final File localFile;
+
+    FileWorker(Path dir, File localDir, String filename) throws IOException {
+      super(filename);
+      this.file = new Path(dir, filename);
+      this.localFile = new File(localDir, filename);
+
+      localFile.createNewFile();
+      dfs.create(file, false, 4096, REPLICATION, BLOCK_SIZE).close();
+    }
+
+    @Override
+    public String call() throws IOException {
+      final Random r = DFSUtil.getRandom();
+      final int op = r.nextInt(9);
+      if (op == 0) {
+        return checkFullFile();
+      } else {
+        final int nBlocks = r.nextInt(4) + 1;
+        final int lastBlockSize = r.nextInt(BLOCK_SIZE) + 1;
+        final int nBytes = nBlocks*BLOCK_SIZE + lastBlockSize;
+
+        if (op <= 4) {
+          return append(nBytes);
+        } else if (op <= 6) {
+          return truncateArbitrarily(nBytes);
+        } else {
+          return truncateToBlockBoundary(nBlocks);
+        }
+      }
+    }
+
+    String append(int n) throws IOException {
+      final StringBuilder b = new StringBuilder("append ")
+          .append(n).append(" bytes to ").append(file.getName());
+
+      final byte[] bytes = new byte[n];
+      DFSUtil.getRandom().nextBytes(bytes);
+      
+      { // write to local file
+        final FileOutputStream out = new FileOutputStream(localFile, true);
+        out.write(bytes, 0, bytes.length);
+        out.close();
+      }
+
+      {
+        final FSDataOutputStream out = dfs.append(file);
+        out.write(bytes, 0, bytes.length);
+        out.close();
+      }
+      return b.toString();
+    }
+    
+    String truncateArbitrarily(int nBytes) throws IOException {
+      Preconditions.checkArgument(nBytes > 0);
+      final int length = checkLength();
+      final StringBuilder b = new StringBuilder("truncateArbitrarily: ")
+          .append(nBytes).append(" bytes from ").append(file.getName())
+          .append(", length=" + length);
+
+      truncate(length > nBytes? length - nBytes: 0, b);
+      return b.toString();
+    }
+
+    String truncateToBlockBoundary(int nBlocks) throws IOException {
+      Preconditions.checkArgument(nBlocks > 0);
+      final int length = checkLength();
+      final StringBuilder b = new StringBuilder("truncateToBlockBoundary: ")
+          .append(nBlocks).append(" blocks from ").append(file.getName())
+          .append(", length=" + length);
+      final int n =  (nBlocks - 1)*BLOCK_SIZE + (length%BLOCK_SIZE);
+      Preconditions.checkState(truncate(length > n? length - n: 0, b), b);
+      return b.toString();
+    }
+
+    private boolean truncate(long newLength, StringBuilder b) throws IOException {
+      final RandomAccessFile raf = new RandomAccessFile(localFile, "rw");
+      raf.setLength(newLength);
+      raf.close();
+
+      final boolean isReady = dfs.truncate(file, newLength);
+      b.append(", newLength=").append(newLength)
+       .append(", isReady=").append(isReady);
+      if (!isReady) {
+        TestFileTruncate.checkBlockRecovery(file, dfs);
+      }
+      return isReady;
+    }
+    
+    int checkLength() throws IOException {
+      return checkLength(file, localFile);
+    }
+
+    static int checkLength(Path file, File localFile) throws IOException {
+      final long length = dfs.getFileStatus(file).getLen();
+      Assert.assertEquals(localFile.length(), length);
+      Assert.assertTrue(length <= Integer.MAX_VALUE);
+      return (int)length;
+    }
+    
+    String checkFullFile() throws IOException {
+      return checkFullFile(file, localFile);
+    }
+
+    static String checkFullFile(Path file, File localFile) throws IOException {
+      final StringBuilder b = new StringBuilder("checkFullFile: ")
+          .append(file.getName()).append(" vs ").append(localFile);
+      final byte[] bytes = new byte[checkLength(file, localFile)];
+      b.append(", length=").append(bytes.length);
+      
+      final FileInputStream in = new FileInputStream(localFile); 
+      for(int n = 0; n < bytes.length; ) {
+        n += in.read(bytes, n, bytes.length - n);
+      }
+      in.close();
+      
+      AppendTestUtil.checkFullFile(dfs, file, bytes.length, bytes,
+          "File content mismatch: " + b, false);
+      return b.toString();
+    }
+  }
+  
+  static abstract class Worker implements Callable<String> {
+    enum State {
+      IDLE(false), RUNNING(false), STOPPED(true), ERROR(true);
+      
+      final boolean isTerminated;
+      State(boolean isTerminated) {
+        this.isTerminated = isTerminated;
+      }
+    };
+
+    final String name;
+    final AtomicReference<State> state = new AtomicReference<State>(State.IDLE);
+    final AtomicBoolean isCalling = new AtomicBoolean();
+    final AtomicReference<Thread> thread = new AtomicReference<Thread>();
+    
+    Worker(String name) {
+      this.name = name;
+    }
+
+    void start() {
+      Preconditions.checkState(state.compareAndSet(State.IDLE, State.RUNNING));
+      
+      if (thread.get() == null) {
+        final Thread t = new Thread(null, new Runnable() {
+          @Override
+          public void run() {
+            final Random r = DFSUtil.getRandom();
+            for(State s; (s = state.get()) == State.RUNNING || s == State.IDLE;) {
+              if (s == State.RUNNING) {
+                isCalling.set(true);
+                try {
+                  LOG.info(call());
+                } catch (Exception e) {
+                  LOG.error("Worker " + name + " failed.", e);
+                  state.set(State.ERROR);
+                  return;
+                }
+                isCalling.set(false);
+              }
+              sleep(r.nextInt(100) + 50);
+            }
+          }
+        }, name);
+        Preconditions.checkState(thread.compareAndSet(null, t));
+        t.start();
+      }
+    }
+
+    boolean isPaused() {
+      return state.get() == State.IDLE && !isCalling.get();
+    }
+
+    void pause() {
+      Preconditions.checkState(state.compareAndSet(State.RUNNING, State.IDLE));
+    }
+
+    void stop() throws InterruptedException {
+      if (state.get() == State.ERROR) {
+        return;
+      }
+
+      state.set(State.STOPPED);
+      thread.get().join();
+    }
+
+    static void sleep(final long sleepTimeMs) {
+      try {
+        Thread.sleep(sleepTimeMs);
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+}


Mime
View raw message