hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sc...@apache.org
Subject svn commit: r1058374 - in /hadoop/mapreduce/trunk: ./ src/contrib/raid/src/java/org/apache/hadoop/hdfs/ src/contrib/raid/src/java/org/apache/hadoop/raid/ src/contrib/raid/src/test/org/apache/hadoop/hdfs/
Date Thu, 13 Jan 2011 00:31:41 GMT
Author: schen
Date: Thu Jan 13 00:31:41 2011
New Revision: 1058374

URL: http://svn.apache.org/viewvc?rev=1058374&view=rev
Log:
MAPREDUCE-2248. DistributedRaidFileSystem should unraid only the corrupt block
(Ramkumar Vadali via schen)

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/DistributedRaidFileSystem.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=1058374&r1=1058373&r2=1058374&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Thu Jan 13 00:31:41 2011
@@ -473,6 +473,9 @@ Release 0.22.0 - Unreleased
     classes themselves are already deprecated. This removes an Eclipse error.
     (tomwhite via nigel)
 
+    MAPREDUCE-2248. DistributedRaidFileSystem should unraid only the corrupt
+    block (Ramkumar Vadali via schen)
+
 Release 0.21.1 - Unreleased
 
   NEW FEATURES

Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/DistributedRaidFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/DistributedRaidFileSystem.java?rev=1058374&r1=1058373&r2=1058374&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/DistributedRaidFileSystem.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/DistributedRaidFileSystem.java Thu Jan 13 00:31:41 2011
@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.permission.F
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FilterFileSystem;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -155,18 +156,40 @@ public class DistributedRaidFileSystem e
    * from alternate locations if it encoumters read errors in the primary location.
    */
   private static class ExtFSDataInputStream extends FSDataInputStream {
+
+    private static class UnderlyingBlock {
+      // File that holds this block. Need not be the same as outer file.
+      public Path path;
+      // Offset within path where this block starts.
+      public long actualFileOffset;
+      // Offset within the outer file where this block starts.
+      public long originalFileOffset;
+      // Length of the block (length <= blk sz of outer file).
+      public long length;
+      public UnderlyingBlock(Path path, long actualFileOffset,
+          long originalFileOffset, long length) {
+        this.path = path;
+        this.actualFileOffset = actualFileOffset;
+        this.originalFileOffset = originalFileOffset;
+        this.length = length;
+      }
+    }
+
     /**
      * Create an input stream that wraps all the reads/positions/seeking.
      */
     private static class ExtFsInputStream extends FSInputStream {
 
-      //The underlying data input stream that the
-      // underlying filesystem will return.
-      private FSDataInputStream underLyingStream;
+      // Extents of "good" underlying data that can be read.
+      private UnderlyingBlock[] underlyingBlocks;
+      private long currentOffset;
+      private FSDataInputStream currentStream;
+      private UnderlyingBlock currentBlock;
       private byte[] oneBytebuff = new byte[1];
       private int nextLocation;
       private DistributedRaidFileSystem lfs;
       private Path path;
+      private FileStatus stat;
       private final DecodeInfo[] alternates;
       private final int buffersize;
       private final Configuration conf;
@@ -175,86 +198,133 @@ public class DistributedRaidFileSystem e
       ExtFsInputStream(Configuration conf, DistributedRaidFileSystem lfs,
           DecodeInfo[] alternates, Path path, int stripeLength, int buffersize)
           throws IOException {
-        this.underLyingStream = lfs.fs.open(path, buffersize);
         this.path = path;
         this.nextLocation = 0;
+        // Construct array of blocks in file.
+        this.stat = lfs.getFileStatus(path);
+        long numBlocks = (this.stat.getLen() % this.stat.getBlockSize() == 0) ?
+                        this.stat.getLen() / this.stat.getBlockSize() :
+                        1 + this.stat.getLen() / this.stat.getBlockSize();
+        this.underlyingBlocks = new UnderlyingBlock[(int)numBlocks];
+        for (int i = 0; i < numBlocks; i++) {
+          long actualFileOffset = i * stat.getBlockSize();
+          long originalFileOffset = i * stat.getBlockSize();
+          long length = Math.min(
+            stat.getBlockSize(), stat.getLen() - originalFileOffset);
+          this.underlyingBlocks[i] = new UnderlyingBlock(
+            path, actualFileOffset, originalFileOffset, length);
+        }
+        this.currentOffset = 0;
+        this.currentBlock = null;
         this.alternates = alternates;
         this.buffersize = buffersize;
         this.conf = conf;
         this.lfs = lfs;
         this.stripeLength = stripeLength;
+        // Open a stream to the first block.
+        openCurrentStream();
       }
-      
+
+      private void closeCurrentStream() throws IOException {
+        if (currentStream != null) {
+          currentStream.close();
+          currentStream = null;
+        }
+      }
+
+      /**
+       * Open a stream to the file containing the current block
+       * and seek to the appropriate offset
+       */
+      private void openCurrentStream() throws IOException {
+        int blockIdx = (int)(currentOffset/stat.getBlockSize());
+        UnderlyingBlock block = underlyingBlocks[blockIdx];
+        // If the current path is the same as we want.
+        if (currentBlock == block ||
+           currentBlock != null && currentBlock.path == block.path) {
+          // If we have a valid stream, nothing to do.
+          if (currentStream != null) {
+            currentBlock = block;
+            return;
+          }
+        } else {
+          closeCurrentStream();
+        }
+        currentBlock = block;
+        currentStream = lfs.fs.open(currentBlock.path, buffersize);
+        long offset = block.actualFileOffset +
+          (currentOffset - block.originalFileOffset);
+        currentStream.seek(offset);
+      }
+
+      /**
+       * Returns the number of bytes available in the current block.
+       */
+      private int blockAvailable() {
+        return (int) (currentBlock.length -
+                (currentOffset - currentBlock.originalFileOffset));
+      }
+
       @Override
       public synchronized int available() throws IOException {
-        int value = underLyingStream.available();
+        // Application should not assume that any bytes are buffered here.
         nextLocation = 0;
-        return value;
+        return Math.min(blockAvailable(), currentStream.available());
       }
-      
+
       @Override
       public synchronized  void close() throws IOException {
-        underLyingStream.close();
+        closeCurrentStream();
         super.close();
       }
-      
+
+      @Override
+      public boolean markSupported() { return false; }
+
       @Override
       public void mark(int readLimit) {
-        underLyingStream.mark(readLimit);
+        // Mark and reset are not supported.
         nextLocation = 0;
       }
-      
+
       @Override
       public void reset() throws IOException {
-        underLyingStream.reset();
+        // Mark and reset are not supported.
         nextLocation = 0;
       }
-      
+
       @Override
       public synchronized int read() throws IOException {
-        long pos = underLyingStream.getPos();
-        while (true) {
-          try {
-            int value = underLyingStream.read();
-            nextLocation = 0;
-            return value;
-          } catch (BlockMissingException e) {
-            setAlternateLocations(e, pos);
-          } catch (ChecksumException e) {
-            setAlternateLocations(e, pos);
-          }
+        int value = read(oneBytebuff);
+        if (value < 0) {
+          return value;
+        } else {
+          return oneBytebuff[0];
         }
       }
-      
+
       @Override
       public synchronized int read(byte[] b) throws IOException {
-        long pos = underLyingStream.getPos();
-        while (true) {
-          try{
-            int value = underLyingStream.read(b);
-            nextLocation = 0;
-            return value;
-          } catch (BlockMissingException e) {
-            setAlternateLocations(e, pos);
-          } catch (ChecksumException e) {
-            setAlternateLocations(e, pos);
-          }
-        }
+        int value = read(b, 0, b.length);
+        nextLocation = 0;
+        return value;
       }
 
       @Override
       public synchronized int read(byte[] b, int offset, int len) 
         throws IOException {
-        long pos = underLyingStream.getPos();
         while (true) {
+          openCurrentStream();
           try{
-            int value = underLyingStream.read(b, offset, len);
+            int limit = Math.min(blockAvailable(), len);
+            int value = currentStream.read(b, offset, limit);
+            currentOffset += value;
             nextLocation = 0;
             return value;
           } catch (BlockMissingException e) {
-            setAlternateLocations(e, pos);
+            setAlternateLocations(e, currentOffset);
           } catch (ChecksumException e) {
-            setAlternateLocations(e, pos);
+            setAlternateLocations(e, currentOffset);
           }
         }
       }
@@ -262,43 +332,49 @@ public class DistributedRaidFileSystem e
       @Override
       public synchronized int read(long position, byte[] b, int offset, int len) 
         throws IOException {
-        long pos = underLyingStream.getPos();
-        while (true) {
-          try {
-            int value = underLyingStream.read(position, b, offset, len);
-            nextLocation = 0;
-            return value;
-          } catch (BlockMissingException e) {
-            setAlternateLocations(e, pos);
-          } catch (ChecksumException e) {
-            setAlternateLocations(e, pos);
-          }
+        long oldPos = currentOffset;
+        seek(position);
+        try {
+          return read(b, offset, len);
+        } finally {
+          seek(oldPos);
         }
       }
       
       @Override
       public synchronized long skip(long n) throws IOException {
-        long value = underLyingStream.skip(n);
+        long skipped = 0;
+        while (skipped < n) {
+          int val = read();
+          if (val < 0) {
+            break;
+          }
+          skipped++;
+        }
         nextLocation = 0;
-        return value;
+        return skipped;
       }
       
       @Override
       public synchronized long getPos() throws IOException {
-        long value = underLyingStream.getPos();
         nextLocation = 0;
-        return value;
+        return currentOffset;
       }
       
       @Override
       public synchronized void seek(long pos) throws IOException {
-        underLyingStream.seek(pos);
+        if (pos != currentOffset) {
+          closeCurrentStream();
+          currentOffset = pos;
+          openCurrentStream();
+        }
         nextLocation = 0;
       }
 
       @Override
       public boolean seekToNewSource(long targetPos) throws IOException {
-        boolean value = underLyingStream.seekToNewSource(targetPos);
+        seek(targetPos);
+        boolean value = currentStream.seekToNewSource(currentStream.getPos());
         nextLocation = 0;
         return value;
       }
@@ -309,49 +385,53 @@ public class DistributedRaidFileSystem e
       @Override
       public void readFully(long pos, byte[] b, int offset, int length) 
         throws IOException {
-        long post = underLyingStream.getPos();
-        while (true) {
-          try {
-            underLyingStream.readFully(pos, b, offset, length);
-            nextLocation = 0;
-            return;
-          } catch (BlockMissingException e) {
-            setAlternateLocations(e, post);
-          } catch (ChecksumException e) {
-            setAlternateLocations(e, pos);
+        long oldPos = currentOffset;
+        seek(pos);
+        try {
+          while (true) {
+            // This loop retries reading until successful. Unrecoverable errors
+            // cause exceptions.
+            // currentOffset is changed by read().
+            try {
+              while (length > 0) {
+                int n = read(b, offset, length);
+                if (n < 0) {
+                  throw new IOException("Premature EOF");
+                }
+                offset += n;
+                length -= n;
+              }
+              nextLocation = 0;
+              return;
+            } catch (BlockMissingException e) {
+              setAlternateLocations(e, currentOffset);
+            } catch (ChecksumException e) {
+              setAlternateLocations(e, currentOffset);
+            }
           }
+        } finally {
+          seek(oldPos);
         }
       }
-      
+
       @Override
       public void readFully(long pos, byte[] b) throws IOException {
-        long post = underLyingStream.getPos();
-        while (true) {
-          try {
-            underLyingStream.readFully(pos, b);
-            nextLocation = 0;
-            return;
-          } catch (BlockMissingException e) {
-            setAlternateLocations(e, post);
-          } catch (ChecksumException e) {
-            setAlternateLocations(e, pos);
-          }
-        }
+        readFully(pos, b, 0, b.length);
+        nextLocation = 0;
       }
 
       /**
-       * Extract good file from RAID
-       * @param curpos curexp the current exception
-       * @param curpos the position of the current operation to be retried
+       * Extract good block from RAID
        * @throws IOException if all alternate locations are exhausted
        */
-      private void setAlternateLocations(IOException curexp, long curpos) 
+      private void setAlternateLocations(IOException curexp, long offset) 
         throws IOException {
         while (alternates != null && nextLocation < alternates.length) {
           try {
             int idx = nextLocation++;
-            long corruptOffset = underLyingStream.getPos();
-
+            // Start offset of block.
+            long corruptOffset =
+              (offset / stat.getBlockSize()) * stat.getBlockSize();
             // Make sure we use DFS and not DistributedRaidFileSystem for unRaid.
             Configuration clientConf = new Configuration(conf);
             Class<?> clazz = conf.getClass("fs.raid.underlyingfs.impl",
@@ -359,22 +439,21 @@ public class DistributedRaidFileSystem e
             clientConf.set("fs.hdfs.impl", clazz.getName());
             // Disable caching so that a previously cached RaidDfs is not used.
             clientConf.setBoolean("fs.hdfs.impl.disable.cache", true);
-            Path npath = RaidNode.unRaid(clientConf, path,
+            Path npath = RaidNode.unRaidCorruptBlock(clientConf, path,
                          alternates[idx].destPath,
                          alternates[idx].createDecoder(),
                          stripeLength, corruptOffset);
             if (npath == null)
               continue;
 
-            FileSystem fs1 = getUnderlyingFileSystem(conf);
-            fs1.initialize(npath.toUri(), conf);
-            LOG.info("Opening alternate path " + npath + " at offset " + curpos);
-            FSDataInputStream fd = fs1.open(npath, buffersize);
-            fd.seek(curpos);
-            underLyingStream.close();
-            underLyingStream = fd;
-            lfs.fs = fs1;
-            path = npath;
+            closeCurrentStream();
+            LOG.info("Using block at offset " + corruptOffset + " from " +
+              npath);
+            currentBlock.path = npath;
+            currentBlock.actualFileOffset = 0;  // Single block in file.
+            // Dont change currentOffset, in case the user had done a seek?
+            openCurrentStream();
+
             return;
           } catch (Exception e) {
             LOG.info("Error in using alternate path " + path + ". " + e +

Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java?rev=1058374&r1=1058373&r2=1058374&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java Thu Jan 13 00:31:41 2011
@@ -738,7 +738,7 @@ public abstract class RaidNode implement
   /**
    * RAID an individual file
    */
-  static private void doRaid(Configuration conf, FileStatus stat, Path destPath,
+  static public void doRaid(Configuration conf, FileStatus stat, Path destPath,
                       PolicyInfo.ErasureCodeType code, Statistics statistics,
                       Progressable reporter, boolean doSimulate,
                       int targetRepl, int metaRepl, int stripeLength)
@@ -870,6 +870,35 @@ public abstract class RaidNode implement
     return recoveredPath;
   }
 
+  public static Path unRaidCorruptBlock(Configuration conf, Path srcPath,
+    Path destPathPrefix, Decoder decoder, int stripeLength,
+      long corruptOffset) throws IOException {
+    // Test if parity file exists
+    ParityFilePair ppair = getParityFile(destPathPrefix, srcPath, conf);
+    if (ppair == null) {
+      LOG.error("Could not find parity file for " + srcPath);
+      return null;
+    }
+
+    final Path recoveryDestination = new Path(RaidNode.xorTempPrefix(conf));
+    FileSystem destFs = recoveryDestination.getFileSystem(conf);
+    final Path recoveredPrefix = 
+      destFs.makeQualified(new Path(recoveryDestination, makeRelative(srcPath)));
+    final Path recoveredBlock = 
+      new Path(recoveredPrefix + "." + new Random().nextLong() + ".recovered");
+    LOG.info("Creating recovered Block " + recoveredBlock);
+
+    FileSystem srcFs = srcPath.getFileSystem(conf);
+    FileStatus stat = srcFs.getFileStatus(srcPath);
+    long limit = Math.min(stat.getBlockSize(), stat.getLen() - corruptOffset);
+    java.io.OutputStream out = ppair.getFileSystem().create(recoveredBlock);
+    decoder.fixErasedBlock(srcFs, srcPath,
+        ppair.getFileSystem(), ppair.getPath(),
+        stat.getBlockSize(), corruptOffset, 0, limit, out);
+    out.close();
+    return recoveredBlock;
+  }
+
   /**
    * Periodically delete orphaned parity files.
    */

Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java?rev=1058374&r1=1058373&r2=1058374&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java Thu Jan 13 00:31:41 2011
@@ -54,13 +54,12 @@ import org.apache.hadoop.hdfs.protocol.B
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.DistributedRaidFileSystem;
 import org.apache.hadoop.raid.RaidNode;
-import org.apache.hadoop.raid.protocol.PolicyInfo;
+import org.apache.hadoop.raid.RaidUtils;
+import org.apache.hadoop.raid.protocol.PolicyInfo.ErasureCodeType;
 
 public class TestRaidDfs extends TestCase {
   final static String TEST_DIR = new File(System.getProperty("test.build.data",
       "build/contrib/raid/test/data")).getAbsolutePath();
-  final static String CONFIG_FILE = new File(TEST_DIR, 
-      "test-raid.xml").getAbsolutePath();
   final static long RELOAD_INTERVAL = 1000;
   final static Log LOG = LogFactory.getLog("org.apache.hadoop.raid.TestRaidDfs");
   final static int NUM_DATANODES = 3;
@@ -70,18 +69,16 @@ public class TestRaidDfs extends TestCas
   String hftp = null;
   MiniDFSCluster dfs = null;
   FileSystem fileSys = null;
-  RaidNode cnode = null;
   String jobTrackerName = null;
+  ErasureCodeType code;
+  int stripeLength;
 
-  private void mySetup(String erasureCode, int stripeLength,
-      int rsParityLength) throws Exception {
+  private void mySetup(
+      String erasureCode, int rsParityLength) throws Exception {
 
     new File(TEST_DIR).mkdirs(); // Make sure data directory exists
     conf = new Configuration();
 
-    conf.set("raid.config.file", CONFIG_FILE);
-    conf.setBoolean("raid.config.reload", true);
-    conf.setLong("raid.config.reload.interval", RELOAD_INTERVAL);
     conf.setInt(RaidNode.RS_PARITY_LENGTH_KEY, rsParityLength);
 
     // scan all policies once every 5 second
@@ -105,41 +102,9 @@ public class TestRaidDfs extends TestCas
     hftp = "hftp://localhost.localdomain:" + dfs.getNameNodePort();
 
     FileSystem.setDefaultUri(conf, namenode);
-    
-    FileWriter fileWriter = new FileWriter(CONFIG_FILE);
-    fileWriter.write("<?xml version=\"1.0\"?>\n");
-    String str = "<configuration> " +
-                   "<srcPath prefix=\"/user/dhruba/raidtest\"> " +
-                     "<policy name = \"RaidTest1\"> " +
-                        "<erasureCode>" + erasureCode + "</erasureCode> " +
-                        "<property> " +
-                          "<name>targetReplication</name> " +
-                          "<value>1</value> " + 
-                          "<description>after RAIDing, decrease the replication factor of a file to this value." +
-                          "</description> " + 
-                        "</property> " +
-                        "<property> " +
-                          "<name>metaReplication</name> " +
-                          "<value>1</value> " + 
-                          "<description> replication factor of parity file" +
-                          "</description> " + 
-                        "</property> " +
-                        "<property> " +
-                          "<name>modTimePeriod</name> " +
-                          "<value>2000</value> " + 
-                          "<description> time (milliseconds) after a file is modified to make it " +
-                                         "a candidate for RAIDing " +
-                          "</description> " + 
-                        "</property> " +
-                     "</policy>" +
-                   "</srcPath>" +
-                 "</configuration>";
-    fileWriter.write(str);
-    fileWriter.close();
   }
 
   private void myTearDown() throws Exception {
-    if (cnode != null) { cnode.stop(); cnode.join(); }
     if (dfs != null) { dfs.shutdown(); }
   }
   
@@ -218,7 +183,9 @@ public class TestRaidDfs extends TestCas
                   numBlocks, blockSize);
     long length = fileSys.getFileStatus(srcFile).getLen();
 
-    waitForFileRaided(LOG, fileSys, srcFile, destPath);
+    RaidNode.doRaid(conf, fileSys.getFileStatus(srcFile),
+      destPath, code, new RaidNode.Statistics(), new RaidUtils.DummyProgressable(),
+      false, repl, repl, stripeLength);
 
     // Delete first block of file
     for (int blockNumToCorrupt : listBlockNumToCorrupt) {
@@ -240,29 +207,25 @@ public class TestRaidDfs extends TestCas
   public void testRaidDfsRs() throws Exception {
     LOG.info("Test testRaidDfs started.");
 
+    code = ErasureCodeType.RS;
     long blockSize = 8192L;
     int numBlocks = 8;
-    int stripeLength = 3;
-    mySetup("rs", stripeLength, 3);
+    stripeLength = 3;
+    mySetup("rs", 3);
 
-    // Create an instance of the RaidNode
-    Configuration localConf = new Configuration(conf);
-    localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
-    cnode = RaidNode.createRaidNode(null, localConf);
     Path destPath = new Path("/destraid/user/dhruba/raidtest");
     int[][] corrupt = {{1, 2, 3}, {1, 4, 7}, {3, 6, 7}};
     try {
       for (int i = 0; i < corrupt.length; i++) {
         Path file = new Path("/user/dhruba/raidtest/file" + i);
         corruptBlockAndValidate(
-            file, destPath, corrupt[0], blockSize, numBlocks);
+            file, new Path("/destraid"), corrupt[i], blockSize, numBlocks);
       }
     } catch (Exception e) {
       LOG.info("testRaidDfs Exception " + e +
                 StringUtils.stringifyException(e));
       throw e;
     } finally {
-      if (cnode != null) { cnode.stop(); cnode.join(); }
       myTearDown();
     }
     LOG.info("Test testRaidDfs completed.");
@@ -272,31 +235,40 @@ public class TestRaidDfs extends TestCas
    * Test DistributedRaidFileSystem.readFully()
    */
   public void testReadFully() throws Exception {
-    mySetup("xor", 3, 1);
+    code = ErasureCodeType.XOR;
+    stripeLength = 3;
+    mySetup("xor", 1);
 
     try {
       Path file = new Path("/user/raid/raidtest/file1");
-      createTestFile(fileSys, file, 1, 7, 8192L);
-
-      // filter all filesystem calls from client
-      Configuration clientConf = new Configuration(conf);
-      clientConf.set("fs.hdfs.impl",
-                      "org.apache.hadoop.hdfs.DistributedRaidFileSystem");
-      clientConf.set("fs.raid.underlyingfs.impl",
-                      "org.apache.hadoop.hdfs.DistributedFileSystem");
-      URI dfsUri = dfs.getFileSystem().getUri();
-      FileSystem.closeAll();
-      FileSystem raidfs = FileSystem.get(dfsUri, clientConf);
+      long crc = createTestFile(fileSys, file, 1, 8, 8192L);
+      FileStatus stat = fileSys.getFileStatus(file);
+      LOG.info("Created " + file + ", crc=" + crc + ", len=" + stat.getLen());
 
-      FileStatus stat = raidfs.getFileStatus(file);
       byte[] filebytes = new byte[(int)stat.getLen()];
+      // Test that readFully returns the correct CRC when there are no errors.
+      DistributedRaidFileSystem raidfs = getRaidFS();
       FSDataInputStream stm = raidfs.open(file);
-      // Test that readFully returns.
-      stm.readFully(filebytes, 0, (int)stat.getLen());
-
+      stm.readFully(0, filebytes);
+      assertEquals(crc, bufferCRC(filebytes));
+      stm.close();
+
+      // Generate parity.
+      RaidNode.doRaid(conf, fileSys.getFileStatus(file),
+        new Path("/destraid"), code, new RaidNode.Statistics(),
+        new RaidUtils.DummyProgressable(),
+        false, 1, 1, stripeLength);
+      int[] corrupt = {0, 4, 7}; // first, last and middle block
+      for (int blockIdx : corrupt) {
+        LOG.info("Corrupt block " + blockIdx + " of file " + file);
+        LocatedBlocks locations = getBlockLocations(file);
+        corruptBlock(file, locations.get(blockIdx).getBlock(),
+            NUM_DATANODES, true);
+      }
+      // Test that readFully returns the correct CRC when there are errors.
       stm = raidfs.open(file);
-      // Test that readFully returns.
-      stm.readFully(filebytes);
+      stm.readFully(0, filebytes);
+      assertEquals(crc, bufferCRC(filebytes));
     } finally {
       myTearDown();
     }
@@ -309,24 +281,23 @@ public class TestRaidDfs extends TestCas
   public void testAccessTime() throws Exception {
     LOG.info("Test testAccessTime started.");
 
+    code = ErasureCodeType.XOR;
     long blockSize = 8192L;
     int numBlocks = 8;
     int repl = 1;
-    mySetup("xor", 3, 1);
+    stripeLength = 3;
+    mySetup("xor", 1);
 
     Path file = new Path("/user/dhruba/raidtest/file");
     Path destPath = new Path("/destraid/user/dhruba/raidtest");
     createTestFilePartialLastBlock(fileSys, file, repl, numBlocks, blockSize);
     FileStatus stat = fileSys.getFileStatus(file);
 
-    int[][] corrupt = {{0}, {4}, {7}}; // first, last and middle block
     try {
-      // Create an instance of the RaidNode
-      Configuration localConf = new Configuration(conf);
-      localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
-      cnode = RaidNode.createRaidNode(null, localConf);
+      RaidNode.doRaid(conf, fileSys.getFileStatus(file),
+        new Path("/destraid"), code, new RaidNode.Statistics(),
+        new RaidUtils.DummyProgressable(), false, repl, repl, stripeLength);
 
-      waitForFileRaided(LOG, fileSys, file, destPath);
       FileStatus newStat = fileSys.getFileStatus(file);
 
       assertEquals(stat.getModificationTime(), newStat.getModificationTime());
@@ -335,6 +306,7 @@ public class TestRaidDfs extends TestCas
       myTearDown();
     }
   }
+
   /**
    * Create a file, corrupt a block in it and ensure that the file can be
    * read through DistributedRaidFileSystem by XOR code.
@@ -342,15 +314,11 @@ public class TestRaidDfs extends TestCas
   public void testRaidDfsXor() throws Exception {
     LOG.info("Test testRaidDfs started.");
 
+    code = ErasureCodeType.XOR;
     long blockSize = 8192L;
     int numBlocks = 8;
-    int stripeLength = 3;
-    mySetup("xor", stripeLength, 1);
-
-    // Create an instance of the RaidNode
-    Configuration localConf = new Configuration(conf);
-    localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
-    cnode = RaidNode.createRaidNode(null, localConf);
+    stripeLength = 3;
+    mySetup("xor", 1);
 
     Path destPath = new Path("/destraid/user/dhruba/raidtest");
     int[][] corrupt = {{0}, {4}, {7}}; // first, last and middle block
@@ -358,14 +326,13 @@ public class TestRaidDfs extends TestCas
       for (int i = 0; i < corrupt.length; i++) {
         Path file = new Path("/user/dhruba/raidtest/" + i);
         corruptBlockAndValidate(
-            file, destPath, corrupt[0], blockSize, numBlocks);
+            file, new Path("/destraid"), corrupt[i], blockSize, numBlocks);
       }
     } catch (Exception e) {
       LOG.info("testRaidDfs Exception " + e +
                 StringUtils.stringifyException(e));
       throw e;
     } finally {
-      if (cnode != null) { cnode.stop(); cnode.join(); }
       myTearDown();
     }
     LOG.info("Test testRaidDfs completed.");
@@ -421,6 +388,13 @@ public class TestRaidDfs extends TestCas
     stm.close();
     return crc.getValue();
   }
+
+  static long bufferCRC(byte[] buf) {
+    CRC32 crc = new CRC32();
+    crc.update(buf, 0, buf.length);
+    return crc.getValue();
+  }
+
   //
   // validates that file matches the crc.
   //



Mime
View raw message