hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sc...@apache.org
Subject svn commit: r1351548 - in /hadoop/common/trunk/hadoop-hdfs-project: ./ hadoop-hdfs-raid/ hadoop-hdfs-raid/src/ hadoop-hdfs-raid/src/main/ hadoop-hdfs-raid/src/main/conf/ hadoop-hdfs-raid/src/main/java/ hadoop-hdfs-raid/src/main/java/org/ hadoop-hdfs-ra...
Date Tue, 19 Jun 2012 00:55:30 GMT
Author: schen
Date: Tue Jun 19 00:55:28 2012
New Revision: 1351548

URL: http://svn.apache.org/viewvc?rev=1351548&view=rev
Log:
MAPREDUCE-3868. Make Raid Compile. (Weiyan Wang via schen)


Added:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/pom.xml
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/conf/
      - copied from r1350823, hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/raid/conf/
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/
      - copied from r1350823, hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/raid/src/java/org/
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/sbin/
      - copied from r1350823, hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/raid/bin/
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/
      - copied from r1350823, hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/raid/src/test/org/
Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/hdfs/server/datanode/RaidBlockSender.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRaidUtil.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/BlockFixer.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/DirectoryTraversal.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/DistRaid.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/GaloisField.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/JobMonitor.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/RaidNode.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/RaidShell.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/ReedSolomonCode.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/hdfs/TestRaidDfs.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestBlockFixer.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestDirectoryTraversal.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestErasureCodes.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestRaidFilter.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestRaidHar.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestRaidNode.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestRaidPurge.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestRaidShell.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestRaidShellFsck.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestReedSolomonDecoder.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestReedSolomonEncoder.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
    hadoop/common/trunk/hadoop-hdfs-project/pom.xml

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/pom.xml?rev=1351548&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/pom.xml (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/pom.xml Tue Jun 19 00:55:28 2012
@@ -0,0 +1,202 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+
+-->
+<project>
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.hadoop</groupId>
+    <artifactId>hadoop-project-dist</artifactId>
+    <version>3.0.0-SNAPSHOT</version>
+    <relativePath>../../hadoop-project-dist</relativePath>
+  </parent>
+  <groupId>org.apache.hadoop</groupId>
+  <artifactId>hadoop-hdfs-raid</artifactId>
+  <version>3.0.0-SNAPSHOT</version>
+  <packaging>jar</packaging>
+
+  <name>Apache Hadoop HDFS Raid</name>
+  <description>Apache Hadoop HDFS Raid</description>
+
+
+  <properties>
+    <hadoop.component>raid</hadoop.component>
+    <is.hadoop.component>false</is.hadoop.component>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-annotations</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-minicluster</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-archives</artifactId>
+      <scope>provided</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+
+    <plugins>
+      <plugin>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>create-mrapp-generated-classpath</id>
+            <phase>generate-test-resources</phase>
+            <goals>
+              <goal>build-classpath</goal>
+            </goals>
+            <configuration>
+              <!--
+              This is needed to run the unit tests. It generates the required classpath
+              that is required in the env of the launch container in the mini mr/yarn cluster.
+              -->
+              <outputFile>${project.build.directory}/test-classes/mrapp-generated-classpath</outputFile>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+      <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+        <configuration>
+          <excludes>
+          </excludes>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>findbugs-maven-plugin</artifactId>
+        <configuration>
+          <excludeFilterFile>${basedir}/dev-support/findbugsExcludeFile.xml</excludeFilterFile>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+  <profiles>
+    <profile>
+      <id>docs</id>
+      <activation>
+        <activeByDefault>false</activeByDefault>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-site-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>docs</id>
+                <phase>prepare-package</phase>
+                <goals>
+                  <goal>site</goal>
+                </goals>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+
+    <profile>
+      <id>dist</id>
+      <activation>
+        <activeByDefault>false</activeByDefault>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-assembly-plugin</artifactId>
+            <dependencies>
+              <dependency>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-assemblies</artifactId>
+                <version>${project.version}</version>
+              </dependency>
+            </dependencies>
+            <executions>
+              <execution>
+                <id>dist</id>
+                <phase>package</phase>
+                <goals>
+                  <goal>single</goal>
+                </goals>
+                <configuration>
+                  <finalName>${project.artifactId}-${project.version}</finalName>
+                  <appendAssemblyId>false</appendAssemblyId>
+                  <attach>false</attach>
+                  <descriptorRefs>
+                    <descriptorRef>hadoop-raid-dist</descriptorRef>
+                  </descriptorRefs>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-antrun-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>tar</id>
+                <phase>package</phase>
+                <goals>
+                  <goal>run</goal>
+                </goals>
+                <configuration>
+                  <target if="tar">
+                    <!-- Using Unix script to preserve symlinks -->
+                    <echo file="${project.build.directory}/dist-maketar.sh">
+
+                      which cygpath 2> /dev/null
+                      if [ $? = 1 ]; then
+                      BUILD_DIR="${project.build.directory}"
+                      else
+                      BUILD_DIR=`cygpath --unix '${project.build.directory}'`
+                      fi
+                      cd $BUILD_DIR
+                      tar czf ${project.artifactId}-${project.version}.tar.gz ${project.artifactId}-${project.version}
+                    </echo>
+                    <exec executable="sh" dir="${project.build.directory}" failonerror="true">
+                      <arg line="./dist-maketar.sh"/>
+                    </exec>
+                  </target>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
+</project>

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/hdfs/server/datanode/RaidBlockSender.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/hdfs/server/datanode/RaidBlockSender.java?rev=1351548&r1=1350823&r2=1351548&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/hdfs/server/datanode/RaidBlockSender.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/hdfs/server/datanode/RaidBlockSender.java Tue Jun 19 00:55:28 2012
@@ -34,7 +34,9 @@ import org.apache.hadoop.hdfs.protocol.E
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.net.SocketOutputStream;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.StringUtils;
@@ -56,8 +58,10 @@ public class RaidBlockSender implements 
   private DataInputStream checksumIn; // checksum datastream
   private DataChecksum checksum; // checksum stream
   private long offset; // starting position to read
+  /** Initial position to read */
+  private long initialOffset;
   private long endOffset; // ending position
-  private int bytesPerChecksum; // chunk size
+  private int chunkSize; // chunk size
   private int checksumSize; // checksum size
   private boolean corruptChecksumOk; // if need to verify checksum
   private boolean chunkOffsetOK; // if need to send chunk offset
@@ -74,6 +78,8 @@ public class RaidBlockSender implements 
    * not sure if there will be much more improvement.
    */
   private static final int MIN_BUFFER_WITH_TRANSFERTO = 64*1024;
+  private static final int TRANSFERTO_BUFFER_SIZE = Math.max(
+      HdfsConstants.IO_FILE_BUFFER_SIZE, MIN_BUFFER_WITH_TRANSFERTO);
   private volatile ChunkChecksum lastChunkChecksum = null;
 
   
@@ -125,12 +131,13 @@ public class RaidBlockSender implements 
        * is mostly corrupted. For now just truncate bytesPerchecksum to
        * blockLength.
        */        
-      bytesPerChecksum = checksum.getBytesPerChecksum();
-      if (bytesPerChecksum > 10*1024*1024 && bytesPerChecksum > replicaVisibleLength) {
+      int size = checksum.getBytesPerChecksum();
+      if (size > 10*1024*1024 && size > replicaVisibleLength) {
         checksum = DataChecksum.newDataChecksum(checksum.getChecksumType(),
             Math.max((int)replicaVisibleLength, 10*1024*1024));
-        bytesPerChecksum = checksum.getBytesPerChecksum();        
+        size = checksum.getBytesPerChecksum();        
       }
+      chunkSize = size;
       checksumSize = checksum.getChecksumSize();
 
       if (length < 0) {
@@ -147,12 +154,12 @@ public class RaidBlockSender implements 
         throw new IOException(msg);
       }
       
-      offset = (startOffset - (startOffset % bytesPerChecksum));
+      offset = (startOffset - (startOffset % chunkSize));
       if (length >= 0) {
         // Make sure endOffset points to end of a checksumed chunk.
         long tmpLen = startOffset + length;
-        if (tmpLen % bytesPerChecksum != 0) {
-          tmpLen += (bytesPerChecksum - tmpLen % bytesPerChecksum);
+        if (tmpLen % chunkSize != 0) {
+          tmpLen += (chunkSize - tmpLen % chunkSize);
         }
         if (tmpLen < endOffset) {
           // will use on-disk checksum here since the end is a stable chunk
@@ -162,7 +169,7 @@ public class RaidBlockSender implements 
 
       // seek to the right offsets
       if (offset > 0) {
-        long checksumSkip = (offset / bytesPerChecksum) * checksumSize;
+        long checksumSkip = (offset / chunkSize) * checksumSize;
         // note blockInStream is seeked when created below
         if (checksumSkip > 0) {
           // Should we use seek() for checksum file as well?
@@ -178,7 +185,7 @@ public class RaidBlockSender implements 
       throw ioe;
     }
   }
-
+  
   /**
    * close opened files.
    */
@@ -227,57 +234,85 @@ public class RaidBlockSender implements 
     // otherwise just return the same exception.
     return ioe;
   }
-
+  
   /**
-   * Sends upto maxChunks chunks of data.
-   * 
-   * When blockInPosition is >= 0, assumes 'out' is a 
-   * {@link SocketOutputStream} and tries 
-   * {@link SocketOutputStream#transferToFully(FileChannel, long, int)} to
-   * send data (and updates blockInPosition).
+   * @param datalen Length of data 
+   * @return number of chunks for data of given size
+   */
+  private int numberOfChunks(long datalen) {
+    return (int) ((datalen + chunkSize - 1)/chunkSize);
+  }
+  
+  /**
+   * Write packet header into {@code pkt}
    */
-  private int sendChunks(ByteBuffer pkt, int maxChunks, OutputStream out) 
-                         throws IOException {
-    // Sends multiple chunks in one packet with a single write().
-
-    int len = (int) Math.min(endOffset - offset,
-                             (((long) bytesPerChecksum) * ((long) maxChunks)));
-    int numChunks = (len + bytesPerChecksum - 1)/bytesPerChecksum;
-    int packetLen = len + numChunks*checksumSize + 4;
-    boolean lastDataPacket = offset + len == endOffset && len > 0;
+  private void writePacketHeader(ByteBuffer pkt, int dataLen, int packetLen) {
     pkt.clear();
-
-
-    PacketHeader header = new PacketHeader(
-      packetLen, offset, seqno, (len == 0), len);
+    PacketHeader header = new PacketHeader(packetLen, offset, seqno,
+        (dataLen == 0), dataLen, false);
     header.putInBuffer(pkt);
+  }
+  
+  /**
+   * Read checksum into given buffer
+   * @param buf buffer to read the checksum into
+   * @param checksumOffset offset at which to write the checksum into buf
+   * @param checksumLen length of checksum to write
+   * @throws IOException on error
+   */
+  private void readChecksum(byte[] buf, final int checksumOffset,
+      final int checksumLen) throws IOException {
+    if (checksumSize <= 0 && checksumIn == null) {
+      return;
+    }
+    try {
+      checksumIn.readFully(buf, checksumOffset, checksumLen);
+    } catch (IOException e) {
+      LOG.warn(" Could not read or failed to veirfy checksum for data"
+          + " at offset " + offset + " for block " + block, e);
+      IOUtils.closeStream(checksumIn);
+      checksumIn = null;
+      if (corruptChecksumOk) {
+        if (checksumOffset < checksumLen) {
+          // Just fill the array with zeros.
+          Arrays.fill(buf, checksumOffset, checksumLen, (byte) 0);
+        }
+      } else {
+        throw e;
+      }
+    }
+  }
+  
+  /**
+   * Sends a packet with up to maxChunks chunks of data.
+   * 
+   * @param pkt buffer used for writing packet data
+   * @param maxChunks maximum number of chunks to send
+   * @param out stream to send data to
+   * @param transferTo use transferTo to send data
+   * @param throttler used for throttling data transfer bandwidth
+   */
+  private int sendPacket(ByteBuffer pkt, int maxChunks, OutputStream out,
+      boolean transferTo, DataTransferThrottler throttler) throws IOException {
+    int dataLen = (int) Math.min(endOffset - offset,
+                             (chunkSize * (long) maxChunks));
+    
+    int numChunks = numberOfChunks(dataLen); // Number of chunks be sent in the packet
+    int checksumDataLen = numChunks * checksumSize;
+    int packetLen = dataLen + checksumDataLen + 4;
+    boolean lastDataPacket = offset + dataLen == endOffset && dataLen > 0;
+
+    writePacketHeader(pkt, dataLen, packetLen);
 
     int checksumOff = pkt.position();
-    int checksumLen = numChunks * checksumSize;
     byte[] buf = pkt.array();
     
     if (checksumSize > 0 && checksumIn != null) {
-      try {
-        checksumIn.readFully(buf, checksumOff, checksumLen);
-      } catch (IOException e) {
-        LOG.warn(" Could not read or failed to veirfy checksum for data" +
-                 " at offset " + offset + " for block " + block + " got : "
-                 + StringUtils.stringifyException(e));
-        IOUtils.closeStream(checksumIn);
-        checksumIn = null;
-        if (corruptChecksumOk) {
-          if (checksumOff < checksumLen) {
-            // Just fill the array with zeros.
-            Arrays.fill(buf, checksumOff, checksumLen, (byte) 0);
-          }
-        } else {
-          throw e;
-        }
-      }
+      readChecksum(buf, checksumOff, checksumDataLen);
 
       // write in progress that we need to use to get last checksum
       if (lastDataPacket && lastChunkChecksum != null) {
-        int start = checksumOff + checksumLen - checksumSize;
+        int start = checksumOff + checksumDataLen - checksumSize;
         byte[] updatedChecksum = lastChunkChecksum.getChecksum();
         
         if (updatedChecksum != null) {
@@ -286,62 +321,86 @@ public class RaidBlockSender implements 
       }
     }
     
-    int dataOff = checksumOff + checksumLen;
-    
-    if (blockInPosition < 0) {
-      //normal transfer
-      IOUtils.readFully(blockIn, buf, dataOff, len);
+    int dataOff = checksumOff + checksumDataLen;
+    if (!transferTo) { // normal transfer
+      IOUtils.readFully(blockIn, buf, dataOff, dataLen);
 
       if (verifyChecksum) {
-        int dOff = dataOff;
-        int cOff = checksumOff;
-        int dLeft = len;
-
-        for (int i=0; i<numChunks; i++) {
-          checksum.reset();
-          int dLen = Math.min(dLeft, bytesPerChecksum);
-          checksum.update(buf, dOff, dLen);
-          if (!checksum.compare(buf, cOff)) {
-            long failedPos = offset + len -dLeft;
-            throw new ChecksumException("Checksum failed at " + 
-                                        failedPos, failedPos);
-          }
-          dLeft -= dLen;
-          dOff += dLen;
-          cOff += checksumSize;
-        }
+        verifyChecksum(buf, dataOff, dataLen, numChunks, checksumOff);
       }
-      //writing is done below (mainly to handle IOException)
     }
     
     try {
-      if (blockInPosition >= 0) {
-        //use transferTo(). Checks on out and blockIn are already done. 
-
+      if (transferTo) {
         SocketOutputStream sockOut = (SocketOutputStream)out;
-        //first write the packet
-        sockOut.write(buf, 0, dataOff);
+        sockOut.write(buf, 0, dataOff); // First write checksum
+        
         // no need to flush. since we know out is not a buffered stream. 
-
         sockOut.transferToFully(((FileInputStream)blockIn).getChannel(), 
-                                blockInPosition, len);
-
-        blockInPosition += len;
-      } else {
+                                blockInPosition, dataLen);
+        blockInPosition += dataLen;
+      } else { 
         // normal transfer
-        out.write(buf, 0, dataOff + len);
+        out.write(buf, 0, dataOff + dataLen);
       }
-      
     } catch (IOException e) {
-      /* exception while writing to the client (well, with transferTo(),
-       * it could also be while reading from the local file).
+      /* Exception while writing to the client. Connection closure from
+       * the other end is mostly the case and we do not care much about
+       * it. But other things can go wrong, especially in transferTo(),
+       * which we do not want to ignore.
+       *
+       * The message parsing below should not be considered as a good
+       * coding example. NEVER do it to drive a program logic. NEVER.
+       * It was done here because the NIO throws an IOException for EPIPE.
        */
+      String ioem = e.getMessage();
+      if (!ioem.startsWith("Broken pipe") && !ioem.startsWith("Connection reset")) {
+        LOG.error("BlockSender.sendChunks() exception: ", e);
+      }
       throw ioeToSocketException(e);
     }
 
-    return len;
+    if (throttler != null) { // rebalancing so throttle
+      throttler.throttle(packetLen);
+    }
+
+    return dataLen;
+  }
+  
+  /**
+   * Compute checksum for chunks and verify the checksum that is read from
+   * the metadata file is correct.
+   * 
+   * @param buf buffer that has checksum and data
+   * @param dataOffset position where data is written in the buf
+   * @param datalen length of data
+   * @param numChunks number of chunks corresponding to data
+   * @param checksumOffset offset where checksum is written in the buf
+   * @throws ChecksumException on failed checksum verification
+   */
+  public void verifyChecksum(final byte[] buf, final int dataOffset,
+      final int datalen, final int numChunks, final int checksumOffset)
+      throws ChecksumException {
+    int dOff = dataOffset;
+    int cOff = checksumOffset;
+    int dLeft = datalen;
+
+    for (int i = 0; i < numChunks; i++) {
+      checksum.reset();
+      int dLen = Math.min(dLeft, chunkSize);
+      checksum.update(buf, dOff, dLen);
+      if (!checksum.compare(buf, cOff)) {
+        long failedPos = offset + datalen - dLeft;
+        throw new ChecksumException("Checksum failed at " + failedPos,
+            failedPos);
+      }
+      dLeft -= dLen;
+      dOff += dLen;
+      cOff += checksumSize;
+    }
   }
 
+
   /**
    * sendBlock() is used to read block and its metadata and stream the data to
    * either a client or to another datanode. 
@@ -356,79 +415,61 @@ public class RaidBlockSender implements 
    */
   public long sendBlock(DataOutputStream out, OutputStream baseStream)
       throws IOException {
-    if( out == null ) {
+    if (out == null) {
       throw new IOException( "out stream is null" );
     }
-
-    long initialOffset = offset;
+    initialOffset = offset;
     long totalRead = 0;
     OutputStream streamForSendChunks = out;
     
     final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
     try {
-      try {
-        checksum.writeHeader(out);
-        if ( chunkOffsetOK ) {
-          out.writeLong( offset );
-        }
-        out.flush();
-      } catch (IOException e) { //socket error
-        throw ioeToSocketException(e);
-      }
-      
       int maxChunksPerPacket;
       int pktSize = PacketHeader.PKT_HEADER_LEN;
-      
-      if (transferToAllowed && !verifyChecksum && 
-          baseStream instanceof SocketOutputStream && 
-          blockIn instanceof FileInputStream) {
-        
+      boolean transferTo = transferToAllowed && !verifyChecksum
+          && baseStream instanceof SocketOutputStream
+          && blockIn instanceof FileInputStream;
+      if (transferTo) {
         FileChannel fileChannel = ((FileInputStream)blockIn).getChannel();
-        
-        // blockInPosition also indicates sendChunks() uses transferTo.
         blockInPosition = fileChannel.position();
         streamForSendChunks = baseStream;
+        maxChunksPerPacket = numberOfChunks(TRANSFERTO_BUFFER_SIZE);
         
-        // assure a mininum buffer size.
-        maxChunksPerPacket = (Math.max(HdfsConstants.IO_FILE_BUFFER_SIZE, 
-                                       MIN_BUFFER_WITH_TRANSFERTO)
-                              + bytesPerChecksum - 1)/bytesPerChecksum;
-        
-        // allocate smaller buffer while using transferTo(). 
+        // Smaller packet size to only hold checksum when doing transferTo
         pktSize += checksumSize * maxChunksPerPacket;
       } else {
         maxChunksPerPacket = Math.max(1,
-            (HdfsConstants.IO_FILE_BUFFER_SIZE + bytesPerChecksum - 1)/bytesPerChecksum);
-        pktSize += (bytesPerChecksum + checksumSize) * maxChunksPerPacket;
+            numberOfChunks(HdfsConstants.IO_FILE_BUFFER_SIZE));
+        // Packet size includes both checksum and data
+        pktSize += (chunkSize + checksumSize) * maxChunksPerPacket;
       }
 
       ByteBuffer pktBuf = ByteBuffer.allocate(pktSize);
 
       while (endOffset > offset) {
-        long len = sendChunks(pktBuf, maxChunksPerPacket, 
-                              streamForSendChunks);
+        long len = sendPacket(pktBuf, maxChunksPerPacket, streamForSendChunks,
+            transferTo, null);
         offset += len;
-        totalRead += len + ((len + bytesPerChecksum - 1)/bytesPerChecksum*
-                            checksumSize);
+        totalRead += len + (numberOfChunks(len) * checksumSize);
         seqno++;
       }
       try {
         // send an empty packet to mark the end of the block
-        sendChunks(pktBuf, maxChunksPerPacket, streamForSendChunks);        
+        sendPacket(pktBuf, maxChunksPerPacket, streamForSendChunks, transferTo,
+            null);
         out.flush();
       } catch (IOException e) { //socket error
         throw ioeToSocketException(e);
       }
+      blockReadFully = true;
     } finally {
       if (clientTraceFmt != null) {
         final long endTime = System.nanoTime();
-        ClientTraceLog.info(String.format(clientTraceFmt, totalRead, initialOffset, endTime - startTime));
+        ClientTraceLog.info(String.format(clientTraceFmt, totalRead,
+            initialOffset, endTime - startTime));
       }
       close();
     }
-
-    blockReadFully = initialOffset == 0 && offset >= replicaVisibleLength;
-
     return totalRead;
   }
   
@@ -440,6 +481,13 @@ public class RaidBlockSender implements 
     public InputStream createStream(long offset) throws IOException; 
   }
   
+  /**
+   * @return the checksum type that will be used with this block transfer.
+   */
+  public DataChecksum getChecksum() {
+    return checksum;
+  }
+  
   private static class BlockInputStreamFactory implements InputStreamFactory {
     private final ExtendedBlock block;
     private final FsDatasetSpi<?> data;

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRaidUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRaidUtil.java?rev=1351548&r1=1350823&r2=1351548&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRaidUtil.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRaidUtil.java Tue Jun 19 00:55:28 2012
@@ -50,7 +50,7 @@ public class NameNodeRaidUtil {
       final boolean doAccessTime, final boolean needBlockToken
       ) throws FileNotFoundException, UnresolvedLinkException, IOException {
     return namesystem.getBlockLocations(src, offset, length,
-        doAccessTime, needBlockToken);
+        doAccessTime, needBlockToken, true);
   }
 }
 

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/BlockFixer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/BlockFixer.java?rev=1351548&r1=1350823&r2=1351548&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/BlockFixer.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/BlockFixer.java Tue Jun 19 00:55:28 2012
@@ -18,6 +18,9 @@
 
 package org.apache.hadoop.raid;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
+
 import java.io.BufferedOutputStream;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -44,14 +47,17 @@ import java.lang.reflect.InvocationTarge
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.protocol.datatransfer.*;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.RaidBlockSender;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -61,6 +67,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hdfs.RaidDFSUtil;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.net.NetUtils;
 
@@ -649,7 +656,7 @@ public abstract class BlockFixer extends
       mdOut.writeShort(BlockMetadataHeader.VERSION);
       
       // Create a summer and write out its header.
-      int bytesPerChecksum = conf.getInt("io.bytes.per.checksum", 512);
+      int bytesPerChecksum = conf.getInt("dfs.bytes-per-checksum", 512);
       DataChecksum sum =
         DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32,
                                      bytesPerChecksum);
@@ -709,8 +716,8 @@ public abstract class BlockFixer extends
         blockContents.close();
         // Reopen
         blockContents = new FileInputStream(localBlockFile);
-        sendFixedBlock(datanode, blockContents, blockMetadata, block,
-                       blockSize);
+        sendFixedBlock(datanode, blockContents, blockMetadata, block, 
+            blockSize);
       } finally {
         if (blockContents != null) {
           blockContents.close();
@@ -780,9 +787,11 @@ public abstract class BlockFixer extends
                               });
         
         DatanodeInfo[] nodes = new DatanodeInfo[]{datanode};
+        DataChecksum checksum = blockSender.getChecksum();
         new Sender(out).writeBlock(block.getBlock(), block.getBlockToken(), "",
             nodes, null, BlockConstructionStage.PIPELINE_SETUP_CREATE,
-            1, 0L, blockSize, 0L, DataChecksum.newDataChecksum(metadataIn));
+            1, 0L, blockSize, 0L, DataChecksum.newDataChecksum(
+                checksum.getChecksumType(), checksum.getBytesPerChecksum()));
         blockSender.sendBlock(out, baseStream);
         
         LOG.info("Sent block " + block.getBlock() + " to " + datanode.getName());

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/DirectoryTraversal.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/DirectoryTraversal.java?rev=1351548&r1=1350823&r2=1351548&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/DirectoryTraversal.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/DirectoryTraversal.java Tue Jun 19 00:55:28 2012
@@ -112,6 +112,8 @@ public class DirectoryTraversal {
 
   public List<FileStatus> getFilteredFiles(FileFilter filter, int limit) {
     List<FileStatus> filtered = new ArrayList<FileStatus>();
+    if (limit == 0) 
+      return filtered;
 
     // We need this semaphore to block when the number of running workitems
     // is equal to the number of threads. FixedThreadPool limits the number
@@ -120,20 +122,26 @@ public class DirectoryTraversal {
     Semaphore slots = new Semaphore(numThreads);
 
     while (true) {
-      synchronized(filtered) {
-        if (filtered.size() >= limit) break;
-      }
       FilterFileWorkItem work = null;
       try {
+        slots.acquire();
+        synchronized(filtered) {
+          if (filtered.size() >= limit) {
+            slots.release();
+            break;
+          }
+        }
         Node next = getNextDirectoryNode();
         if (next == null) {
+          slots.release();
           break;
         }
         work = new FilterFileWorkItem(filter, next, filtered, slots);
-        slots.acquire();
       } catch (InterruptedException ie) {
+        slots.release();
         break;
       } catch (IOException e) {
+        slots.release();
         break;
       }
       executor.execute(work);

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/DistRaid.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/DistRaid.java?rev=1351548&r1=1350823&r2=1351548&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/DistRaid.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/DistRaid.java Tue Jun 19 00:55:28 2012
@@ -277,6 +277,7 @@ public class DistRaid extends Configured
     */
    public boolean checkComplete() throws IOException {
      JobID jobID = runningJob.getJobID();
+     LOG.info("Checking job " + jobID);
      try {
       if (runningJob.isComplete()) {
          // delete job directory

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/GaloisField.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/GaloisField.java?rev=1351548&r1=1350823&r2=1351548&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/GaloisField.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/GaloisField.java Tue Jun 19 00:55:28 2012
@@ -208,7 +208,7 @@ public class GaloisField {
    * @param len consider x and y only from 0...len-1
    */
   public void solveVandermondeSystem(int[] x, int[] y, int len) {
-    assert(x.length <= len && y.length <= len);
+    assert(y.length <= len);
     for (int i = 0; i < len - 1; i++) {
       for (int j = len - 1; j > i; j--) {
         y[j] = y[j] ^ mulTable[x[i]][y[j - 1]];
@@ -302,4 +302,49 @@ public class GaloisField {
     }
     return result;
   }
+
+  /**
+   * Perform Gaussian elimination with back substitution on the given matrix,
+   * in place. It has to be a fat matrix (number of rows < number of columns).
+   */
+  public void gaussianElimination(int[][] matrix) {
+    assert(matrix != null && matrix.length > 0 && matrix[0].length > 0
+           && matrix.length < matrix[0].length);
+    int height = matrix.length;
+    int width = matrix[0].length;
+    for (int i = 0; i < height; i++) {
+      boolean pivotFound = false;
+      // scan column i (rows i..height-1) for a nonzero pivot; swap it to row i
+      for (int j = i; j < height; j++) {
+        if (matrix[j][i] != 0) {
+          int[] tmp = matrix[i];
+          matrix[i] = matrix[j];
+          matrix[j] = tmp;
+          pivotFound = true;
+          break;
+        }
+      }
+      if (!pivotFound) {
+        continue; // column i is all zero from row i down; nothing to reduce
+      }
+      int pivot = matrix[i][i];
+      for (int j = i; j < width; j++) {
+        matrix[i][j] = divide(matrix[i][j], pivot);
+      }
+      for (int j = i + 1; j < height; j++) {
+        int lead = matrix[j][i];
+        for (int k = i; k < width; k++) {
+          matrix[j][k] = add(matrix[j][k], multiply(lead, matrix[i][k]));
+        }
+      }
+    }
+    for (int i = height - 1; i >=0; i--) { // clear entries above each pivot
+      for (int j = 0; j < i; j++) {
+        int lead = matrix[j][i];
+        for (int k = i; k < width; k++) {
+          matrix[j][k] = add(matrix[j][k], multiply(lead, matrix[i][k]));
+        }
+      }
+    }
+  }
 }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/JobMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/JobMonitor.java?rev=1351548&r1=1350823&r2=1351548&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/JobMonitor.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/JobMonitor.java Tue Jun 19 00:55:28 2012
@@ -44,12 +44,13 @@ class JobMonitor implements Runnable {
   volatile boolean running = true;
 
   private Map<String, List<DistRaid>> jobs;
+  public static final String JOBMONITOR_INTERVAL_KEY = "raid.jobmonitor.interval";
   private long jobMonitorInterval;
   private volatile long jobsMonitored = 0;
   private volatile long jobsSucceeded = 0;
 
   public JobMonitor(Configuration conf) {
-    jobMonitorInterval = conf.getLong("raid.jobmonitor.interval", 60000);
+    jobMonitorInterval = conf.getLong(JOBMONITOR_INTERVAL_KEY, 60000);
     jobs = new java.util.HashMap<String, List<DistRaid>>();
   }
 
@@ -112,6 +113,7 @@ class JobMonitor implements Runnable {
           } catch (IOException ioe) {
             // If there was an error, consider the job finished.
             addJob(finishedJobs, key, job);
+            LOG.error("JobMonitor exception", ioe);
           }
         }
       }
@@ -159,6 +161,17 @@ class JobMonitor implements Runnable {
   public long jobsSucceeded() {
     return this.jobsSucceeded;
   }
+  
+  // For test code: total number of DistRaid entries across all keys of jobs.
+  int runningJobsCount() {
+    int total = 0;
+    synchronized(jobs) {
+      for (List<DistRaid> jobList : jobs.values()) {
+        total += jobList.size();
+      }
+    }
+    return total;
+  }
 
   private static void addJob(Map<String, List<DistRaid>> jobsMap,
                               String jobName, DistRaid job) {

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/RaidNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/RaidNode.java?rev=1351548&r1=1350823&r2=1351548&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/RaidNode.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/RaidNode.java Tue Jun 19 00:55:28 2012
@@ -80,6 +80,8 @@ public abstract class RaidNode implement
   }
   public static final Log LOG = LogFactory.getLog( "org.apache.hadoop.raid.RaidNode");
   public static final long SLEEP_TIME = 10000L; // 10 seconds
+  public static final String TRIGGER_MONITOR_SLEEP_TIME_KEY = 
+      "hdfs.raid.trigger.monitor.sleep.time";
   public static final int DEFAULT_PORT = 60000;
   // Default stripe length = 5, parity length for RS code = 3
   public static final int DEFAULT_STRIPE_LENGTH = 5;
@@ -126,6 +128,7 @@ public abstract class RaidNode implement
 
   /** Deamon thread to trigger policies */
   Daemon triggerThread = null;
+  public static long triggerMonitorSleepTime = SLEEP_TIME;
 
   /** Deamon thread to delete obsolete parity files */
   PurgeMonitor purgeMonitor = null;
@@ -299,6 +302,10 @@ public abstract class RaidNode implement
     this.blockFixer = BlockFixer.createBlockFixer(conf);
     this.blockFixerThread = new Daemon(this.blockFixer);
     this.blockFixerThread.start();
+    // read the trigger monitor sleep interval, defaulting to SLEEP_TIME
+    RaidNode.triggerMonitorSleepTime = conf.getLong(
+        TRIGGER_MONITOR_SLEEP_TIME_KEY, 
+        SLEEP_TIME);
 
     // start the deamon thread to fire polcies appropriately
     this.triggerThread = new Daemon(new TriggerMonitor());
@@ -503,7 +510,7 @@ public abstract class RaidNode implement
         }
       }
       while (running) {
-        Thread.sleep(SLEEP_TIME);
+        Thread.sleep(RaidNode.triggerMonitorSleepTime);
 
         boolean reloaded = configMgr.reloadConfigsIfNecessary();
         if (reloaded) {
@@ -542,7 +549,7 @@ public abstract class RaidNode implement
 
           // Apply the action on accepted paths
           LOG.info("Triggering Policy Action " + info.getName() +
-                   " " + info.getSrcPath());
+                   " " + info.getSrcPath() + " raid " + filteredPaths.size() + " files");
           try {
             raidFiles(info, filteredPaths);
           } catch (Exception e) {

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/RaidShell.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/RaidShell.java?rev=1351548&r1=1350823&r2=1351548&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/RaidShell.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/RaidShell.java Tue Jun 19 00:55:28 2012
@@ -43,6 +43,7 @@ import org.apache.hadoop.io.retry.RetryP
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryProxy;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -296,9 +297,22 @@ public class RaidShell extends Configure
     for (int i = startindex; i < argv.length; i = i + 2) {
       String path = argv[i];
       long corruptOffset = Long.parseLong(argv[i+1]);
-      LOG.debug("RaidShell recoverFile for " + path + " corruptOffset " + corruptOffset);
-      paths[j] = new Path(raidnode.recoverFile(path, corruptOffset));
-      LOG.debug("Raidshell created recovery file " + paths[j]);
+      LOG.info("RaidShell recoverFile for " + path + " corruptOffset " + corruptOffset);
+      Path recovered = new Path("/tmp/recovered." + System.currentTimeMillis());
+      FileSystem fs = recovered.getFileSystem(conf);
+      DistributedFileSystem dfs = (DistributedFileSystem)fs;
+      Configuration raidConf = new Configuration(conf);
+      raidConf.set("fs.hdfs.impl",
+                     "org.apache.hadoop.hdfs.DistributedRaidFileSystem");
+      raidConf.set("fs.raid.underlyingfs.impl",
+                     "org.apache.hadoop.hdfs.DistributedFileSystem");
+      raidConf.setBoolean("fs.hdfs.impl.disable.cache", true);
+      java.net.URI dfsUri = dfs.getUri();
+      FileSystem raidFs = FileSystem.get(dfsUri, raidConf);
+      FileUtil.copy(raidFs, new Path(path), fs, recovered, false, conf);
+
+      paths[j] = recovered;
+      LOG.info("Raidshell created recovery file " + paths[j]);
       j++;
     }
     return paths;

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/ReedSolomonCode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/ReedSolomonCode.java?rev=1351548&r1=1350823&r2=1351548&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/ReedSolomonCode.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/main/java/org/apache/hadoop/raid/ReedSolomonCode.java Tue Jun 19 00:55:28 2012
@@ -17,6 +17,8 @@
  */
 
 package org.apache.hadoop.raid;
+import java.util.Set;
+
 
 public class ReedSolomonCode implements ErasureCode {
 
@@ -103,4 +105,79 @@ public class ReedSolomonCode implements 
   public int symbolSize() {
     return (int) Math.round(Math.log(GF.getFieldSize()) / Math.log(2));
   }
+
+  /**
+   * Given parity symbols followed by message symbols, find the locations of
+   * corrupted symbols. Can resolve up to (parity length / 2) error locations.
+   * @param data The parity followed by the message, stripeSize + paritySize
+   *             ints in total; only the least significant bits of each int
+   *             are the symbol. <b>If the parity check fails, the symbols at
+   *             the located positions are overwritten with decoded values.</b>
+   * @param errorLocations Cleared on entry, then filled with the indices into
+   *             data whose symbols were found to be corrupted.
+   * @return true if data passes the parity check, either as passed in or after
+   *         correcting the located symbols; false otherwise.
+   */
+  public boolean computeErrorLocations(int[] data,
+      Set<Integer> errorLocations) {
+    assert(data.length == paritySize + stripeSize && errorLocations != null);
+    errorLocations.clear();
+    int maxError = paritySize / 2; // NOTE(review): 0 if paritySize < 2, and gaussianElimination cannot handle an empty matrix
+    int[][] syndromeMatrix = new int[maxError][];
+    for (int i = 0; i < syndromeMatrix.length; ++i) {
+      syndromeMatrix[i] = new int[maxError + 1];
+    }
+    int[] syndrome = new int[paritySize];
+
+    if (computeSyndrome(data, syndrome)) {
+      // Parity check OK. No error location added.
+      return true;
+    }
+    for (int i = 0; i < maxError; ++i) {
+      for (int j = 0; j < maxError + 1; ++j) {
+        syndromeMatrix[i][j] = syndrome[i + j];
+      }
+    }
+    GF.gaussianElimination(syndromeMatrix); // last column is read as coefficients below
+    int[] polynomial = new int[maxError + 1];
+    polynomial[0] = 1;
+    for (int i = 0; i < maxError; ++i) {
+      polynomial[i + 1] = syndromeMatrix[maxError - 1 - i][maxError];
+    }
+    for (int i = 0; i < paritySize + stripeSize; ++i) {
+      int possibleRoot = GF.divide(1, primitivePower[i]);
+      if (GF.substitute(polynomial, possibleRoot) == 0) { // root => symbol i corrupted
+        errorLocations.add(i);
+      }
+    }
+    // Now recover with error locations and check the syndrome again
+    int[] locations = new int[errorLocations.size()];
+    int k = 0;
+    for (int loc : errorLocations) {
+      locations[k++] = loc;
+    }
+    int [] erasedValue = new int[locations.length];
+    decode(data, locations, erasedValue);
+    for (int i = 0; i < locations.length; ++i) {
+      data[locations[i]] = erasedValue[i];
+    }
+    return computeSyndrome(data, syndrome);
+  }
+
+  /**
+   * Fill syndrome[i] with GF.substitute(data, primitivePower[i]) for every
+   * i below paritySize.
+   * @param data [parity, message]
+   * @param syndrome Output array; one entry is written per parity symbol
+   * @return true if every syndrome entry is zero (no corruption detected)
+   */
+  private boolean computeSyndrome(int[] data, int [] syndrome) {
+    boolean allZero = true;
+    for (int idx = 0; idx < paritySize; ++idx) {
+      int value = GF.substitute(data, primitivePower[idx]);
+      syndrome[idx] = value;
+      allZero &= (value == 0);
+    }
+    return allZero;
+  }
 }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/hdfs/TestRaidDfs.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/hdfs/TestRaidDfs.java?rev=1351548&r1=1350823&r2=1351548&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/hdfs/TestRaidDfs.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/hdfs/TestRaidDfs.java Tue Jun 19 00:55:28 2012
@@ -47,8 +47,8 @@ import org.apache.hadoop.util.StringUtil
 
 public class TestRaidDfs extends TestCase {
   final static String TEST_DIR = new File(System.getProperty("test.build.data",
-      "build/contrib/raid/test/data")).getAbsolutePath();
-  final static String LOG_DIR = "/raidlog";
+      "target/test-data")).getAbsolutePath();
+  final static String LOG_DIR = "target/raidlog";
   final static long RELOAD_INTERVAL = 1000;
   final static Log LOG = LogFactory.getLog("org.apache.hadoop.raid.TestRaidDfs");
   final static int NUM_DATANODES = 3;
@@ -414,6 +414,7 @@ public class TestRaidDfs extends TestCas
     LOG.info(" Newcrc " + newcrc.getValue() + " old crc " + crc);
     if (newcrc.getValue() != crc) {
       LOG.info("CRC mismatch of file " + name + ": " + newcrc + " vs. " + crc);
+      return false;
     }
     return true;
   }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestBlockFixer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestBlockFixer.java?rev=1351548&r1=1350823&r2=1351548&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestBlockFixer.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestBlockFixer.java Tue Jun 19 00:55:28 2012
@@ -26,12 +26,14 @@ import java.util.List;
 import java.util.Random;
 import java.util.zip.CRC32;
 
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.junit.Test;
 import static org.junit.Assert.*;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+import org.apache.hadoop.util.JarFinder;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -39,6 +41,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
 import org.apache.hadoop.mapred.MiniMRCluster;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
@@ -53,9 +57,11 @@ public class TestBlockFixer {
   final static Log LOG = LogFactory.getLog(
                             "org.apache.hadoop.raid.TestBlockFixer");
   final static String TEST_DIR = new File(System.getProperty("test.build.data",
-      "build/contrib/raid/test/data")).getAbsolutePath();
+      "target/test-data")).getAbsolutePath();
   final static String CONFIG_FILE = new File(TEST_DIR, 
       "test-raid.xml").getAbsolutePath();
+  public static final String DistBlockFixer_JAR =
+      JarFinder.getJar(DistBlockFixer.class);
   final static long RELOAD_INTERVAL = 1000;
   final static int NUM_DATANODES = 3;
   Configuration conf;
@@ -546,6 +552,8 @@ public class TestBlockFixer {
 
     conf.setBoolean("dfs.permissions", false);
 
+    conf.set("mapreduce.framework.name", "yarn");
+
     dfs = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
     dfs.waitActive();
     fileSys = dfs.getFileSystem();
@@ -553,11 +561,28 @@ public class TestBlockFixer {
 
     FileSystem.setDefaultUri(conf, namenode);
     mr = new MiniMRCluster(4, namenode, 3);
-    jobTrackerName = "localhost:" + mr.getJobTrackerPort();
+    JobConf jobConf = mr.createJobConf();
+    jobTrackerName = "localhost:" + jobConf.get(JTConfig.JT_IPC_ADDRESS);
     hftp = "hftp://localhost.localdomain:" + dfs.getNameNodePort();
 
     FileSystem.setDefaultUri(conf, namenode);
     conf.set("mapred.job.tracker", jobTrackerName);
+    conf.set("mapreduce.framework.name", "yarn");
+    String rmAdress = jobConf.get("yarn.resourcemanager.address");
+    if (rmAdress != null) {
+      conf.set("yarn.resourcemanager.address", rmAdress);
+    }
+    String schedulerAdress =
+      jobConf.get("yarn.resourcemanager.scheduler.address");
+    if (schedulerAdress != null) {
+      conf.set("yarn.resourcemanager.scheduler.address", schedulerAdress);
+    }
+    String jobHistoryAddress = 
+        jobConf.get("mapreduce.jobhistory.address");
+    if (jobHistoryAddress != null) {
+      conf.set("mapreduce.jobhistory.address", jobHistoryAddress);
+    }
+    conf.set(JobContext.JAR, TestBlockFixer.DistBlockFixer_JAR);
     
     FileWriter fileWriter = new FileWriter(CONFIG_FILE);
     fileWriter.write("<?xml version=\"1.0\"?>\n");
@@ -609,10 +634,11 @@ public class TestBlockFixer {
     if (dfs != null) { dfs.shutdown(); }
   }
 
-  private long getCRC(FileSystem fs, Path p) throws IOException {
+  public static long getCRC(FileSystem fs, Path p) throws IOException {
     CRC32 crc = new CRC32();
     FSDataInputStream stm = fs.open(p);
-    for (int b = 0; b > 0; b = stm.read()) {
+    int b;
+    while ((b = stm.read())>=0) {
       crc.update(b);
     }
     stm.close();

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestDirectoryTraversal.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestDirectoryTraversal.java?rev=1351548&r1=1350823&r2=1351548&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestDirectoryTraversal.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestDirectoryTraversal.java Tue Jun 19 00:55:28 2012
@@ -40,7 +40,7 @@ public class TestDirectoryTraversal exte
   final static Log LOG = LogFactory.getLog(
                             "org.apache.hadoop.raid.TestDirectoryTraversal");
   final static String TEST_DIR = new File(System.getProperty("test.build.data",
-      "build/contrib/raid/test/data")).getAbsolutePath();
+      "target/test-data")).getAbsolutePath();
 
   MiniDFSCluster dfs = null;
   FileSystem fs = null;
@@ -211,7 +211,7 @@ public class TestDirectoryTraversal exte
 
   private void mySetup() throws IOException {
     conf = new Configuration();
-    dfs = new MiniDFSCluster(conf, 6, true, null);
+    dfs = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
     dfs.waitActive();
     fs = dfs.getFileSystem();
   }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestErasureCodes.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestErasureCodes.java?rev=1351548&r1=1350823&r2=1351548&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestErasureCodes.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestErasureCodes.java Tue Jun 19 00:55:28 2012
@@ -169,6 +169,57 @@ public class TestErasureCodes extends Te
     assertTrue("Decode failed", java.util.Arrays.equals(copy, message[0]));
   }
 
+  public void testComputeErrorLocations() {
+    for (int trial = 0; trial < TEST_TIMES; trial++) {
+      verifyErrorLocations(10, 4, 1); // one corrupted symbol
+      verifyErrorLocations(10, 4, 2); // two corrupted symbols
+    }
+  }
+
+  public void verifyErrorLocations(int stripeSize, int paritySize, int errors) {
+    int[] message = new int[stripeSize];
+    int[] parity = new int[paritySize];
+    Set<Integer> errorLocations = new HashSet<Integer>();
+    for (int i = 0; i < message.length; ++i) {
+      message[i] = RAND.nextInt(256);
+    }
+    while (errorLocations.size() < errors) { // pick 'errors' distinct positions
+      int loc = RAND.nextInt(stripeSize + paritySize);
+      errorLocations.add(loc);
+    }
+    ReedSolomonCode codec = new ReedSolomonCode(stripeSize, paritySize);
+    codec.encode(message, parity);
+    int[] data = combineArrays(parity, message); // [parity, message] layout
+    for (Integer i : errorLocations) {
+      data[i] = randError(data[i]); // always differs from the original symbol
+    }
+    Set<Integer> recoveredLocations = new HashSet<Integer>();
+    boolean resolved = codec.computeErrorLocations(data, recoveredLocations);
+    if (resolved) { // NOTE(review): unresolved results pass silently
+      assertEquals(errorLocations, recoveredLocations);
+    }
+  }
+
+  /** Returns a random value in [0, 256) that differs from actual. */
+  private int randError(int actual) {
+    int candidate;
+    do {
+      candidate = RAND.nextInt(256);
+    } while (candidate == actual);
+    return candidate;
+  }
+
+  /**
+   * Concatenates two int arrays: array1's elements first, then array2's.
+   * @return a new array of length array1.length + array2.length
+   */
+  private int[] combineArrays(int[] array1, int[] array2) {
+    int[] result = new int[array1.length + array2.length];
+    System.arraycopy(array1, 0, result, 0, array1.length);
+    System.arraycopy(array2, 0, result, array1.length, array2.length);
+    return result;
+  }
+
   private int[] randomErasedLocation(int erasedLen, int dataLen) {
     int[] erasedLocations = new int[erasedLen];
     for (int i = 0; i < erasedLen; i++) {

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestRaidFilter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestRaidFilter.java?rev=1351548&r1=1350823&r2=1351548&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestRaidFilter.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestRaidFilter.java Tue Jun 19 00:55:28 2012
@@ -36,7 +36,7 @@ import org.apache.hadoop.raid.protocol.P
 
 public class TestRaidFilter extends TestCase {
   final static String TEST_DIR = new File(System.getProperty("test.build.data",
-      "build/contrib/raid/test/data")).getAbsolutePath();
+      "target/test-data")).getAbsolutePath();
   final static Log LOG =
     LogFactory.getLog("org.apache.hadoop.raid.TestRaidFilter");
 

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestRaidHar.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestRaidHar.java?rev=1351548&r1=1350823&r2=1351548&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestRaidHar.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestRaidHar.java Tue Jun 19 00:55:28 2012
@@ -22,6 +22,7 @@ import java.io.FileWriter;
 import java.io.FileNotFoundException;
 import java.util.Random;
 
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import junit.framework.TestCase;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -34,6 +35,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MiniMRCluster;
 
 /**
@@ -41,7 +43,7 @@ import org.apache.hadoop.mapred.MiniMRCl
  */
 public class TestRaidHar extends TestCase {
   final static String TEST_DIR = new File(System.getProperty("test.build.data",
-      "build/contrib/raid/test/data")).getAbsolutePath();
+     "target/test-data")).getAbsolutePath();
   final static String CONFIG_FILE = new File(TEST_DIR, 
       "test-raid.xml").getAbsolutePath();
   final static long RELOAD_INTERVAL = 1000;
@@ -96,11 +98,27 @@ public class TestRaidHar extends TestCas
     fileSys = dfs.getFileSystem();
     namenode = fileSys.getUri().toString();
     mr = new MiniMRCluster(taskTrackers, namenode, 3);
-    jobTrackerName = "localhost:" + mr.getJobTrackerPort();
+    JobConf jobConf = mr.createJobConf();
+    jobTrackerName = "localhost:" + jobConf.get(JTConfig.JT_IPC_ADDRESS);
     hftp = "hftp://localhost.localdomain:" + dfs.getNameNodePort();
 
     FileSystem.setDefaultUri(conf, namenode);
     conf.set("mapred.job.tracker", jobTrackerName);
+    conf.set("mapreduce.framework.name", "yarn");
+    String rmAdress = jobConf.get("yarn.resourcemanager.address");
+    if (rmAdress != null) {
+      conf.set("yarn.resourcemanager.address", rmAdress);
+    }
+    String schedulerAdress =
+      jobConf.get("yarn.resourcemanager.scheduler.address");
+    if (schedulerAdress != null) {
+      conf.set("yarn.resourcemanager.scheduler.address", schedulerAdress);
+    }
+    String jobHistoryAddress = 
+        jobConf.get("mapreduce.jobhistory.address");
+    if (jobHistoryAddress != null) {
+      conf.set("mapreduce.jobhistory.address", jobHistoryAddress);
+    }
   }
     
   /**

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestRaidNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestRaidNode.java?rev=1351548&r1=1350823&r2=1351548&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestRaidNode.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestRaidNode.java Tue Jun 19 00:55:28 2012
@@ -37,9 +37,13 @@ import org.apache.hadoop.fs.FSDataOutput
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
 import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.raid.protocol.PolicyInfo;
 import org.apache.hadoop.raid.protocol.PolicyList;
+import org.apache.hadoop.util.JarFinder;
 import org.apache.hadoop.raid.protocol.PolicyInfo.ErasureCodeType;
 
 /**
@@ -49,7 +53,8 @@ import org.apache.hadoop.raid.protocol.P
   */
 public class TestRaidNode extends TestCase {
   final static String TEST_DIR = new File(System.getProperty("test.build.data",
-      "build/contrib/raid/test/data")).getAbsolutePath();
+      "target/test-data")).getAbsolutePath();
+  public static final String DistRaid_JAR = JarFinder.getJar(DistRaid.class);
   final static String CONFIG_FILE = new File(TEST_DIR, 
       "test-raid.xml").getAbsolutePath();
   final static long RELOAD_INTERVAL = 1000;
@@ -76,6 +81,8 @@ public class TestRaidNode extends TestCa
     conf.setBoolean("raid.config.reload", true);
     conf.setLong("raid.config.reload.interval", RELOAD_INTERVAL);
     conf.setBoolean("dfs.permissions.enabled", true);
+    conf.setLong(JobMonitor.JOBMONITOR_INTERVAL_KEY, 20000);
+    conf.setLong(RaidNode.TRIGGER_MONITOR_SLEEP_TIME_KEY, 3000L);
 
     // scan all policies once every 5 second
     conf.setLong("raid.policy.rescan.interval", 5000);
@@ -103,11 +110,27 @@ public class TestRaidNode extends TestCa
     namenode = fileSys.getUri().toString();
     final int taskTrackers = 4;
     mr = new MiniMRCluster(taskTrackers, namenode, 3);
-    jobTrackerName = "localhost:" + mr.getJobTrackerPort();
+    JobConf jobConf = mr.createJobConf();
+    jobTrackerName = "localhost:" + jobConf.get(JTConfig.JT_IPC_ADDRESS);
     hftp = "hftp://localhost.localdomain:" + dfs.getNameNodePort();
 
     FileSystem.setDefaultUri(conf, namenode);
     conf.set("mapred.job.tracker", jobTrackerName);
+    conf.set("mapreduce.framework.name", "yarn");
+    String rmAdress = jobConf.get("yarn.resourcemanager.address");
+    if (rmAdress != null) {
+      conf.set("yarn.resourcemanager.address", rmAdress);
+    }
+    String schedulerAdress =
+      jobConf.get("yarn.resourcemanager.scheduler.address");
+    if (schedulerAdress != null) {
+      conf.set("yarn.resourcemanager.scheduler.address", schedulerAdress);
+    }
+    String jobHistoryAddress = 
+        jobConf.get("mapreduce.jobhistory.address");
+    if (jobHistoryAddress != null) {
+      conf.set("mapreduce.jobhistory.address", jobHistoryAddress);
+    }
   }
 
   class ConfigBuilder {
@@ -238,9 +261,9 @@ public class TestRaidNode extends TestCa
     LOG.info("Test testPathFilter started.");
 
     long blockSizes    []  = {1024L};
-    long stripeLengths []  = {1, 2, 5, 6, 10, 11, 12};
-    long targetReplication = 1;
-    long metaReplication   = 1;
+    int stripeLengths []  = {5, 6, 10, 11, 12};
+    int targetReplication = 1;
+    int metaReplication   = 1;
     int  numBlock          = 11;
     int  iter = 0;
 
@@ -284,7 +307,8 @@ public class TestRaidNode extends TestCa
       LOG.info("doTestPathFilter created test files for iteration " + iter);
 
       // create an instance of the RaidNode
-      cnode = RaidNode.createRaidNode(null, conf);
+      Configuration localConf = new Configuration(conf);
+      cnode = RaidNode.createRaidNode(null, localConf);
       FileStatus[] listPaths = null;
 
       // wait till file is raided
@@ -314,7 +338,6 @@ public class TestRaidNode extends TestCa
       }
       // assertEquals(listPaths.length, 1); // all files raided
       LOG.info("doTestPathFilter all files found in Raid.");
-      Thread.sleep(20000); // Without this wait, unit test crashes
 
       // check for error at beginning of file
       shell = new RaidShell(conf);
@@ -466,16 +489,23 @@ public class TestRaidNode extends TestCa
     LOG.info("doCheckPolicy completed:");
   }
 
-  private void createTestFiles(String path, String destpath) throws IOException {
+  static public void createTestFiles(FileSystem fileSys, 
+      String path, String destpath, int nfile,
+      int nblock) throws IOException {
+    createTestFiles(fileSys, path, destpath, nfile, nblock, (short)1);
+  }
+
+  static void createTestFiles(FileSystem fileSys, String path, String destpath, int nfile,
+      int nblock, short repl) throws IOException {
     long blockSize         = 1024L;
     Path dir = new Path(path);
     Path destPath = new Path(destpath);
     fileSys.delete(dir, true);
     fileSys.delete(destPath, true);
    
-    for(int i = 0 ; i < 10; i++){
+    for(int i = 0 ; i < nfile; i++){
       Path file = new Path(path + "file" + i);
-      createOldFile(fileSys, file, 1, 7, blockSize);
+      createOldFile(fileSys, file, repl, nblock, blockSize);
     }
   }
 
@@ -499,12 +529,15 @@ public class TestRaidNode extends TestCa
 
     RaidNode cnode = null;
     try {
-      createTestFiles("/user/dhruba/raidtest/", "/destraid/user/dhruba/raidtest");
-      createTestFiles("/user/dhruba/raidtest2/", "/destraid/user/dhruba/raidtest2");
+      createTestFiles(fileSys, "/user/dhruba/raidtest/",
+          "/destraid/user/dhruba/raidtest", 5, 7);
+      createTestFiles(fileSys, "/user/dhruba/raidtest2/",
+          "/destraid/user/dhruba/raidtest2", 5, 7);
       LOG.info("Test testDistRaid created test files");
 
       Configuration localConf = new Configuration(conf);
       localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
+      localConf.set(JobContext.JAR, TestRaidNode.DistRaid_JAR);
       cnode = RaidNode.createRaidNode(null, localConf);
       // Verify the policies are parsed correctly
       for (PolicyList policyList : cnode.getAllPolicies()) {
@@ -540,15 +573,13 @@ public class TestRaidNode extends TestCa
              System.currentTimeMillis() - start < MAX_WAITTIME) {
         Thread.sleep(1000);
       }
-      assertEquals(dcnode.jobMonitor.jobsMonitored(), 2);
-
+      
       start = System.currentTimeMillis();
       while (dcnode.jobMonitor.jobsSucceeded() < 2 &&
              System.currentTimeMillis() - start < MAX_WAITTIME) {
         Thread.sleep(1000);
       }
-      assertEquals(dcnode.jobMonitor.jobsSucceeded(), 2);
-
+      assertEquals(dcnode.jobMonitor.jobsSucceeded(), dcnode.jobMonitor.jobsMonitored());
       LOG.info("Test testDistRaid successful.");
       
     } catch (Exception e) {
@@ -647,24 +678,19 @@ public class TestRaidNode extends TestCa
 
     RaidNode cnode = null;
     try {
-      createTestFiles(
-        "/user/dhruba/raidtest/1/", "/destraid/user/dhruba/raidtest/1");
-      createTestFiles(
-        "/user/dhruba/raidtest/2/", "/destraid/user/dhruba/raidtest/2");
-      createTestFiles(
-        "/user/dhruba/raidtest/3/", "/destraid/user/dhruba/raidtest/3");
-      createTestFiles(
-        "/user/dhruba/raidtest/4/", "/destraid/user/dhruba/raidtest/4");
+      for(int i = 0; i < 4; i++){
+        Path file = new Path("/user/dhruba/raidtest/dir" + i + "/file" + i);
+        createOldFile(fileSys, file, 1, 7, 1024L);
+      }
+
       LOG.info("Test testSuspendTraversal created test files");
 
       Configuration localConf = new Configuration(conf);
-      localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
-      localConf.setInt("raid.distraid.max.files", 3);
+      localConf.setInt("raid.distraid.max.jobs", 2);
+      localConf.setInt("raid.distraid.max.files", 2);
       localConf.setInt("raid.directorytraversal.threads", 1);
-      // This is too dependent on the implementation of getFilteredFiles().
-      // It relies on the threading behavior where two directories are traversed
-      // before returning because the list of files is modified in a separate
-      // thread from the one that decides if there are enough files.
+      localConf.set(JobContext.JAR, TestRaidNode.DistRaid_JAR);
+      // 4 test files: 2 jobs with 2 files each.
       final int numJobsExpected = 2;
       cnode = RaidNode.createRaidNode(null, localConf);
 
@@ -677,10 +703,20 @@ public class TestRaidNode extends TestCa
       start = System.currentTimeMillis();
       while (dcnode.jobMonitor.jobsSucceeded() < numJobsExpected &&
              System.currentTimeMillis() - start < MAX_WAITTIME) {
+        LOG.info("Waiting for num jobs succeeded " + dcnode.jobMonitor.jobsSucceeded() + 
+         " to reach " + numJobsExpected);
+        Thread.sleep(3000);
+      }
+      // Wait for any running jobs to finish.
+      start = System.currentTimeMillis();
+      while (dcnode.jobMonitor.runningJobsCount() > 0 &&
+             System.currentTimeMillis() - start < MAX_WAITTIME) {
+        LOG.info("Waiting for zero running jobs: " +
+             dcnode.jobMonitor.runningJobsCount());
         Thread.sleep(1000);
       }
-      assertEquals(dcnode.jobMonitor.jobsMonitored(), numJobsExpected);
-      assertEquals(dcnode.jobMonitor.jobsSucceeded(), numJobsExpected);
+      assertEquals(numJobsExpected, dcnode.jobMonitor.jobsMonitored());
+      assertEquals(numJobsExpected, dcnode.jobMonitor.jobsSucceeded());
 
       LOG.info("Test testSuspendTraversal successful.");
 

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestRaidPurge.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestRaidPurge.java?rev=1351548&r1=1350823&r2=1351548&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestRaidPurge.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestRaidPurge.java Tue Jun 19 00:55:28 2012
@@ -51,6 +51,7 @@ import org.apache.hadoop.raid.protocol.P
 import org.apache.hadoop.raid.protocol.PolicyList;
 import org.apache.hadoop.hdfs.TestRaidDfs;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.raid.protocol.PolicyInfo;
 
 /**
@@ -58,7 +59,7 @@ import org.apache.hadoop.raid.protocol.P
  */
 public class TestRaidPurge extends TestCase {
   final static String TEST_DIR = new File(System.getProperty("test.build.data",
-      "build/contrib/raid/test/data")).getAbsolutePath();
+      "target/test-data")).getAbsolutePath();
   final static String CONFIG_FILE = new File(TEST_DIR, 
       "test-raid.xml").getAbsolutePath();
   final static long RELOAD_INTERVAL = 1000;
@@ -113,11 +114,27 @@ public class TestRaidPurge extends TestC
     fileSys = dfs.getFileSystem();
     namenode = fileSys.getUri().toString();
     mr = new MiniMRCluster(taskTrackers, namenode, 3);
-    jobTrackerName = "localhost:" + mr.getJobTrackerPort();
+    JobConf jobConf = mr.createJobConf();
+    jobTrackerName = "localhost:" + jobConf.get(JTConfig.JT_IPC_ADDRESS);
     hftp = "hftp://localhost.localdomain:" + dfs.getNameNodePort();
 
     FileSystem.setDefaultUri(conf, namenode);
     conf.set("mapred.job.tracker", jobTrackerName);
+    conf.set("mapreduce.framework.name", "yarn");
+    String rmAdress = jobConf.get("yarn.resourcemanager.address");
+    if (rmAdress != null) {
+      conf.set("yarn.resourcemanager.address", rmAdress);
+    }
+    String schedulerAdress =
+      jobConf.get("yarn.resourcemanager.scheduler.address");
+    if (schedulerAdress != null) {
+      conf.set("yarn.resourcemanager.scheduler.address", schedulerAdress);
+    }
+    String jobHistoryAddress = 
+        jobConf.get("mapreduce.jobhistory.address");
+    if (jobHistoryAddress != null) {
+      conf.set("mapreduce.jobhistory.address", jobHistoryAddress);
+    }
   }
     
   /**
@@ -235,6 +252,7 @@ public class TestRaidPurge extends TestC
 
       // create an instance of the RaidNode
       Configuration localConf = new Configuration(conf);
+      
       localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
       cnode = RaidNode.createRaidNode(null, localConf);
       FileStatus[] listPaths = null;
@@ -299,7 +317,7 @@ public class TestRaidPurge extends TestC
     createClusters(true);
     mySetup(1, 1, 5, harDelay);
     Path dir = new Path("/user/dhruba/raidtest/");
-    Path destPath = new Path("/destraid/user/dhruba/raidtest");
+    Path destPath = new Path("/raid/user/dhruba/raidtest");
     Path file1 = new Path(dir + "/file");
     RaidNode cnode = null;
     try {
@@ -308,7 +326,6 @@ public class TestRaidPurge extends TestC
 
       // create an instance of the RaidNode
       Configuration localConf = new Configuration(conf);
-      localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
       cnode = RaidNode.createRaidNode(null, localConf);
 
       // Wait till har is created.
@@ -334,14 +351,7 @@ public class TestRaidPurge extends TestC
       boolean found = false;
       FileStatus[] listPaths = null;
       while (!found || listPaths == null || listPaths.length > 1) {
-        try {
-          listPaths = fileSys.listStatus(destPath);
-        } catch (FileNotFoundException e) {
-          // If the parent directory is deleted because the har is deleted
-          // and the parent is empty, try again.
-          Thread.sleep(1000);
-          continue;
-        }
+        listPaths = fileSys.listStatus(destPath);
         if (listPaths != null) {
           for (FileStatus s: listPaths) {
             LOG.info("testPurgeHar waiting for parity file to be recreated" +

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestRaidShell.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestRaidShell.java?rev=1351548&r1=1350823&r2=1351548&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestRaidShell.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestRaidShell.java Tue Jun 19 00:55:28 2012
@@ -47,7 +47,7 @@ public class TestRaidShell extends TestC
   final static Log LOG = LogFactory.getLog(
                             "org.apache.hadoop.raid.TestRaidShell");
   final static String TEST_DIR = new File(System.getProperty("test.build.data",
-      "build/contrib/raid/test/data")).getAbsolutePath();
+      "target/test-data")).getAbsolutePath();
   final static String CONFIG_FILE = new File(TEST_DIR,
       "test-raid.xml").getAbsolutePath();
   final static long RELOAD_INTERVAL = 1000;
@@ -249,7 +249,8 @@ public class TestRaidShell extends TestC
   private long getCRC(FileSystem fs, Path p) throws IOException {
     CRC32 crc = new CRC32();
     FSDataInputStream stm = fs.open(p);
-    for (int b = 0; b > 0; b = stm.read()) {
+    int b;
+    while ((b = stm.read())>=0) {
       crc.update(b);
     }
     stm.close();

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestRaidShellFsck.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestRaidShellFsck.java?rev=1351548&r1=1350823&r2=1351548&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestRaidShellFsck.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestRaidShellFsck.java Tue Jun 19 00:55:28 2012
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *     http:www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -51,8 +51,8 @@ public class TestRaidShellFsck {
     LogFactory.getLog("org.apache.hadoop.raid.TestRaidShellFsck");
   final static String TEST_DIR = 
     new File(System.
-             getProperty("test.build.data", "build/contrib/raid/test/data")).
-    getAbsolutePath();
+             getProperty("test.build.data", "target/test-data")).getAbsolutePath();
+
   final static String CONFIG_FILE = new File(TEST_DIR, "test-raid.xml").
     getAbsolutePath();
   final static long RELOAD_INTERVAL = 1000;
@@ -262,7 +262,7 @@ public class TestRaidShellFsck {
               }
               
             } else {
-              // case without HAR
+               // case without HAR
               for (FileStatus f : listPaths) {
                 Path found = new Path(f.getPath().toUri().getPath());
                 if (parityFilePath.equals(found)) {

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestReedSolomonDecoder.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestReedSolomonDecoder.java?rev=1351548&r1=1350823&r2=1351548&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestReedSolomonDecoder.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestReedSolomonDecoder.java Tue Jun 19 00:55:28 2012
@@ -42,7 +42,7 @@ public class TestReedSolomonDecoder exte
   final static Log LOG = LogFactory.getLog(
                             "org.apache.hadoop.raid.TestReedSolomonDecoder");
   final static String TEST_DIR = new File(System.getProperty("test.build.data",
-      "build/contrib/raid/test/data")).getAbsolutePath();
+        "target/test-data")).getAbsolutePath();
   final static int NUM_DATANODES = 3;
 
   Configuration conf;

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestReedSolomonEncoder.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestReedSolomonEncoder.java?rev=1351548&r1=1350823&r2=1351548&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestReedSolomonEncoder.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-raid/src/test/java/org/apache/hadoop/raid/TestReedSolomonEncoder.java Tue Jun 19 00:55:28 2012
@@ -49,7 +49,7 @@ public class TestReedSolomonEncoder exte
   final static Log LOG = LogFactory.getLog(
                             "org.apache.hadoop.raid.TestReedSolomonEncoder");
   final static String TEST_DIR = new File(System.getProperty("test.build.data",
-      "build/contrib/raid/test/data")).getAbsolutePath();
+        "target/test-data")).getAbsolutePath();
   final static int NUM_DATANODES = 3;
 
   Configuration conf;

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java?rev=1351548&r1=1351547&r2=1351548&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java Tue Jun 19 00:55:28 2012
@@ -31,7 +31,7 @@ import org.apache.hadoop.hdfs.server.blo
 
 /** I-node for closed file. */
 @InterfaceAudience.Private
-class INodeFile extends INode implements BlockCollection {
+public class INodeFile extends INode implements BlockCollection {
   static final FsPermission UMASK = FsPermission.createImmutable((short)0111);
 
   //Number of bits for Block size

Modified: hadoop/common/trunk/hadoop-hdfs-project/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/pom.xml?rev=1351548&r1=1351547&r2=1351548&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/pom.xml (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/pom.xml Tue Jun 19 00:55:28 2012
@@ -34,6 +34,7 @@ http://maven.apache.org/xsd/maven-4.0.0.
     <module>hadoop-hdfs</module>
     <module>hadoop-hdfs-httpfs</module>
     <module>hadoop-hdfs/src/contrib/bkjournal</module>
+    <module>hadoop-hdfs-raid</module>
   </modules>
 
   <build>



Mime
View raw message