hadoop-hdfs-commits mailing list archives

From ste...@apache.org
Subject svn commit: r808037 [3/3] - in /hadoop/hdfs/branches/HDFS-326: ./ ivy/ lib/ src/contrib/hdfsproxy/ src/java/ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/apache/hadoop/hdfs/server/common/ src/java/org/apache/h...
Date Wed, 26 Aug 2009 15:03:35 GMT
Propchange: hadoop/hdfs/branches/HDFS-326/CHANGES.txt
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Wed Aug 26 15:03:33 2009
@@ -0,0 +1,3 @@
+/hadoop/core/branches/branch-0.18/CHANGES.txt:727226
+/hadoop/core/branches/branch-0.19/CHANGES.txt:713112
+/hadoop/core/trunk/CHANGES.txt:776175-785643,785929-786278

Modified: hadoop/hdfs/branches/HDFS-326/build.xml
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/build.xml?rev=808037&r1=808036&r2=808037&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/build.xml (original)
+++ hadoop/hdfs/branches/HDFS-326/build.xml Wed Aug 26 15:03:33 2009
@@ -636,6 +636,7 @@
         </batchtest>
         <batchtest todir="${test.build.dir}" if="tests.testcase.fi">
           <fileset dir="${test.src.dir}/aop" includes="**/${testcase}.java"/>
+          <fileset dir="${test.src.dir}/hdfs" includes="**/${testcase}.java"/>
         </batchtest>
       </junit>
       <antcall target="checkfailure"/>
@@ -695,6 +696,7 @@
       </batchtest>
       <batchtest todir="${test.build.dir}" if="tests.testcase.fi">
         <fileset dir="${test.src.dir}/aop" includes="**/${testcase}.java"/>
+        <fileset dir="${test.src.dir}/hdfs-with-mr" includes="**/${testcase}.java"/>
       </batchtest>
     </junit>
     <antcall target="checkfailure"/>

Propchange: hadoop/hdfs/branches/HDFS-326/build.xml
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Aug 26 15:03:33 2009
@@ -1,2 +1,3 @@
 /hadoop/core/branches/branch-0.19/hdfs/build.xml:713112
 /hadoop/core/trunk/build.xml:779102
+/hadoop/hdfs/trunk/build.xml:804973-807690

Modified: hadoop/hdfs/branches/HDFS-326/ivy/ivysettings.xml
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/ivy/ivysettings.xml?rev=808037&r1=808036&r2=808037&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/ivy/ivysettings.xml (original)
+++ hadoop/hdfs/branches/HDFS-326/ivy/ivysettings.xml Wed Aug 26 15:03:33 2009
@@ -74,7 +74,7 @@
     rather than look for them online.
 
     -->
-    <module organisation="org.apache.hadoop" name="Hadoop.*" resolver="internal"/>
+   
     <!--until commons cli is external, we need to pull it in from the snapshot repository -if present -->
     <module organisation="org.apache.commons" name=".*" resolver="external-and-snapshots"/>
   </modules>

Modified: hadoop/hdfs/branches/HDFS-326/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/ivy/libraries.properties?rev=808037&r1=808036&r2=808037&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/ivy/libraries.properties (original)
+++ hadoop/hdfs/branches/HDFS-326/ivy/libraries.properties Wed Aug 26 15:03:33 2009
@@ -1,3 +1,4 @@
+
 #   Licensed under the Apache License, Version 2.0 (the "License");
 #   you may not use this file except in compliance with the License.
 #   You may obtain a copy of the License at

Modified: hadoop/hdfs/branches/HDFS-326/ivybuild.xml
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/ivybuild.xml?rev=808037&r1=808036&r2=808037&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/ivybuild.xml (original)
+++ hadoop/hdfs/branches/HDFS-326/ivybuild.xml Wed Aug 26 15:03:33 2009
@@ -262,8 +262,10 @@
       </exec>
     </presetdef>
     <property name="issue" value="HDFS-326"/>
+    <property name="issue" value="MAPREDUCE-233"/>
+    <property name="hadoop.svn.host" value="svn.apache.org" />
     <property name="hadoop-svn"
-      value="https://svn.apache.org/repos/asf/hadoop/common"/>
+      value="https://${hadoop.svn.host}/repos/asf/hadoop/hdfs"/>    
     <property name="trunk"
       value="${hadoop-svn}/trunk"/>
     <property name="branch"

Modified: hadoop/hdfs/branches/HDFS-326/lib/hadoop-core-0.21.0-dev.jar
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/lib/hadoop-core-0.21.0-dev.jar?rev=808037&r1=808036&r2=808037&view=diff
==============================================================================
Binary files - no diff available.

Modified: hadoop/hdfs/branches/HDFS-326/lib/hadoop-core-test-0.21.0-dev.jar
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/lib/hadoop-core-test-0.21.0-dev.jar?rev=808037&r1=808036&r2=808037&view=diff
==============================================================================
Binary files - no diff available.

Modified: hadoop/hdfs/branches/HDFS-326/lib/hadoop-mapred-0.21.0-dev.jar
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/lib/hadoop-mapred-0.21.0-dev.jar?rev=808037&r1=808036&r2=808037&view=diff
==============================================================================
Binary files - no diff available.

Modified: hadoop/hdfs/branches/HDFS-326/lib/hadoop-mapred-examples-0.21.0-dev.jar
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/lib/hadoop-mapred-examples-0.21.0-dev.jar?rev=808037&r1=808036&r2=808037&view=diff
==============================================================================
Binary files - no diff available.

Modified: hadoop/hdfs/branches/HDFS-326/lib/hadoop-mapred-test-0.21.0-dev.jar
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/lib/hadoop-mapred-test-0.21.0-dev.jar?rev=808037&r1=808036&r2=808037&view=diff
==============================================================================
Binary files - no diff available.

Propchange: hadoop/hdfs/branches/HDFS-326/src/contrib/hdfsproxy/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Aug 26 15:03:33 2009
@@ -1,2 +1,3 @@
 /hadoop/core/branches/branch-0.19/hdfs/src/contrib/hdfsproxy:713112
 /hadoop/core/trunk/src/contrib/hdfsproxy:776175-784663
+/hadoop/hdfs/trunk/src/contrib/hdfsproxy:804973-807690

Propchange: hadoop/hdfs/branches/HDFS-326/src/java/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Aug 26 15:03:33 2009
@@ -1,2 +1,3 @@
 /hadoop/core/branches/branch-0.19/hdfs/src/java:713112
 /hadoop/core/trunk/src/hdfs:776175-785643,785929-786278
+/hadoop/hdfs/trunk/src/java:804973-807690

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=808037&r1=808036&r2=808037&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/DFSClient.java Wed Aug 26 15:03:33 2009
@@ -2346,7 +2346,7 @@
     // it. When all the packets for a block are sent out and acks for each
     // if them are received, the DataStreamer closes the current block.
     //
-    private class DataStreamer extends Daemon {
+    class DataStreamer extends Daemon {
       private static final int MAX_RECOVERY_ERROR_COUNT = 5; // try block recovery 5 times
       private int recoveryErrorCount = 0; // number of times block recovery failed
       private volatile boolean streamerClosed = false;
@@ -2356,8 +2356,8 @@
       private DataInputStream blockReplyStream;
       private ResponseProcessor response = null;
       private volatile DatanodeInfo[] nodes = null; // list of targets for current block
-      private volatile boolean hasError = false;
-      private volatile int errorIndex = 0;
+      volatile boolean hasError = false;
+      volatile int errorIndex = 0;
   
       /*
        * streamer thread is the only thread that opens streams to datanode, 
@@ -2838,7 +2838,8 @@
           LOG.debug("Connecting to " + nodes[0].getName());
           InetSocketAddress target = NetUtils.createSocketAddr(nodes[0].getName());
           s = socketFactory.createSocket();
-          int timeoutValue = (socketTimeout > 0) ? (3000 * nodes.length + socketTimeout) : 0;
+          int timeoutValue = (socketTimeout > 0) ? (HdfsConstants.READ_TIMEOUT_EXTENSION
+              * nodes.length + socketTimeout) : 0;
           NetUtils.connect(s, target, timeoutValue);
           s.setSoTimeout(timeoutValue);
           s.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE);

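The hunk above replaces the hard-coded 3000 ms per-node padding with the named HdfsConstants.READ_TIMEOUT_EXTENSION constant introduced later in this commit, so the connect timeout grows with the length of the write pipeline. A minimal sketch of the arithmetic, assuming the 5000 ms extension this patch defines and a typical 60000 ms socket timeout (both concrete values are assumptions, not part of this hunk):

    int socketTimeout = 60 * 1000;        // e.g. HdfsConstants.READ_TIMEOUT
    int readTimeoutExtension = 5 * 1000;  // HdfsConstants.READ_TIMEOUT_EXTENSION
    int pipelineLength = 3;               // nodes.length with replication = 3
    int timeoutValue = (socketTimeout > 0)
        ? readTimeoutExtension * pipelineLength + socketTimeout  // 75000 ms
        : 0;                                                     // 0 disables the timeout
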
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=808037&r1=808036&r2=808037&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java Wed Aug 26 15:03:33 2009
@@ -258,7 +258,9 @@
   @Override
   public FileStatus[] listStatus(Path p) throws IOException {
     FileStatus[] infos = dfs.listPaths(getPathName(p));
-    if (infos == null) return null;
+    if (infos == null) 
+      throw new FileNotFoundException("File " + p + " does not exist.");
+    
     FileStatus[] stats = new FileStatus[infos.length];
     for (int i = 0; i < infos.length; i++) {
       stats[i] = makeQualified(infos[i]);

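This hunk changes the listStatus contract: a missing path now raises FileNotFoundException instead of returning null, so callers that tested for null must catch the exception instead (the DistributedFSCheck change later in this commit does exactly that). A minimal caller sketch under the new contract; listOrEmpty is a hypothetical helper name:

    import java.io.FileNotFoundException;
    import java.io.IOException;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    static FileStatus[] listOrEmpty(FileSystem fs, Path p) throws IOException {
      try {
        return fs.listStatus(p);
      } catch (FileNotFoundException fnfe) {
        return new FileStatus[0]; // previously signalled by a null return
      }
    }
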
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java?rev=808037&r1=808036&r2=808037&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java Wed Aug 26 15:03:33 2009
@@ -56,6 +56,7 @@
   public static final int DEFAULT_DATA_SOCKET_SIZE = 128 * 1024;
 
   public static final int SIZE_OF_INTEGER = Integer.SIZE / Byte.SIZE;
+  public static final int MIN_NUM_OF_VALID_VOLUMES = 1; // for a DN to run
 
   // SafeMode actions
   public enum SafeModeAction{ SAFEMODE_LEAVE, SAFEMODE_ENTER, SAFEMODE_GET; }

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java?rev=808037&r1=808036&r2=808037&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java Wed Aug 26 15:03:33 2009
@@ -60,6 +60,7 @@
 
   // Timeouts for communicating with DataNode for streaming writes/reads
   public static int READ_TIMEOUT = 60 * 1000;
+  public static int READ_TIMEOUT_EXTENSION = 5 * 1000;
   public static int WRITE_TIMEOUT = 8 * 60 * 1000;
   public static int WRITE_TIMEOUT_EXTENSION = 5 * 1000; //for write pipeline
 

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=808037&r1=808036&r2=808037&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Wed Aug 26 15:03:33 2009
@@ -119,6 +119,9 @@
       
       // check if there is a disk error
       IOException cause = FSDataset.getCauseIfDiskError(ioe);
+      DataNode.LOG.warn("IOException in BlockReceiver constructor. Cause is ",
+          cause);
+      
       if (cause != null) { // possible disk error
         ioe = cause;
         datanode.checkDiskError(ioe); // may throw an exception here
@@ -833,7 +836,14 @@
             SUCCESS.write(replyOut);
             replyOut.flush();
         } catch (Exception e) {
+          LOG.warn("Exception in BlockReceiver.lastDataNodeRun: ", e);
           if (running) {
+            try {
+              datanode.checkDiskError(e); // may throw an exception here
+            } catch (IOException ioe) {
+              LOG.warn("DataNode.checkDiskError failed in lastDataNodeRun with: ",
+                  ioe);
+            }
             LOG.info("PacketResponder " + block + " " + numTargets + 
                      " Exception " + StringUtils.stringifyException(e));
             running = false;
@@ -993,7 +1003,13 @@
               running = false;
             }
         } catch (IOException e) {
+          LOG.warn("IOException in BlockReceiver.run(): ", e);
           if (running) {
+            try {
+              datanode.checkDiskError(e); // may throw an exception here
+            } catch (IOException ioe) {
+              LOG.warn("DataNode.checkDiskError failed in run() with: ", ioe);
+            }
             LOG.info("PacketResponder " + block + " " + numTargets + 
                      " Exception " + StringUtils.stringifyException(e));
             running = false;

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=808037&r1=808036&r2=808037&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Wed Aug 26 15:03:33 2009
@@ -290,8 +290,9 @@
           int dLen = Math.min(dLeft, bytesPerChecksum);
           checksum.update(buf, dOff, dLen);
           if (!checksum.compare(buf, cOff)) {
+            long failedPos = offset + len - dLeft;
             throw new ChecksumException("Checksum failed at " + 
-                                        (offset + len - dLeft), len);
+                                        failedPos, failedPos);
           }
           dLeft -= dLen;
           dOff += dLen;

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=808037&r1=808036&r2=808037&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Wed Aug 26 15:03:33 2009
@@ -764,11 +764,14 @@
   }
   
   
-  /* Check if there is no space in disk or the disk is read-only
-   *  when IOException occurs. 
-   * If so, handle the error */
-  protected void checkDiskError( IOException e ) throws IOException {
-    if (e.getMessage() != null && 
+  /** Check if the disk is out of space when an exception occurs.
+   *  @param e the exception that caused this checkDiskError call
+   **/
+  protected void checkDiskError(Exception e ) throws IOException {
+    
+    LOG.warn("checkDiskError: exception: ", e);
+    
+    if (e.getMessage() != null &&
         e.getMessage().startsWith("No space left on device")) {
       throw new DiskOutOfSpaceException("No space left on device");
     } else {
@@ -776,8 +779,11 @@
     }
   }
   
-  /* Check if there is no disk space and if so, handle the error*/
-  protected void checkDiskError( ) throws IOException {
+  /**
+   *  Check if there is a disk failure and, if so, handle the error.
+   *
+   **/
+  protected void checkDiskError( ) {
     try {
       data.checkDataDir();
     } catch(DiskErrorException de) {
@@ -786,13 +792,31 @@
   }
   
   private void handleDiskError(String errMsgr) {
-    LOG.warn("DataNode is shutting down.\n" + errMsgr);
-    shouldRun = false;
+    boolean hasEnoughResource = data.hasEnoughResource();
+    LOG.warn("DataNode.handleDiskError: Keep Running: " + hasEnoughResource);
+    
+    // if hasEnoughResource is true, more volumes are available, so we don't
+    // want to shut down the DN completely and don't want the NN to remove it.
+    int dp_error = DatanodeProtocol.DISK_ERROR;
+    if(hasEnoughResource == false) {
+      // DN will be shutdown and NN should remove it
+      dp_error = DatanodeProtocol.FATAL_DISK_ERROR;
+    }
+    //inform NameNode
     try {
       namenode.errorReport(
-                           dnRegistration, DatanodeProtocol.DISK_ERROR, errMsgr);
+                           dnRegistration, dp_error, errMsgr);
     } catch(IOException ignored) {              
     }
+    
+    
+    if(hasEnoughResource) {
+      scheduleBlockReport(0);
+      return; // do not shutdown
+    }
+    
+    LOG.warn("DataNode is shutting down.\n" + errMsgr);
+    shouldRun = false; 
   }
     
   /** Number of concurrent xceivers per node. */
@@ -1308,6 +1332,9 @@
       } catch (IOException ie) {
         LOG.warn(dnRegistration + ":Failed to transfer " + b + " to " + targets[0].getName()
             + " got " + StringUtils.stringifyException(ie));
+        // check if there are any disk problems
+        datanode.checkDiskError();
+        
       } finally {
         xmitsInProgress.getAndDecrement();
         IOUtils.closeStream(blockSender);
@@ -1465,8 +1492,10 @@
   public String toString() {
     return getServiceName() + " {" +
       "data=" + data +
-      ", localName='" + dnRegistration.getName() + "'" +
-      ", storageID='" + dnRegistration.getStorageID() + "'" +
+      (dnRegistration != null?       
+       (", localName='" + dnRegistration.getName() + "'" +
+       ", storageID='" + dnRegistration.getStorageID() + "'" )
+       : "") +
       ", xmitsInProgress=" + xmitsInProgress.get() +
       ", state=" + getServiceState() +
       "}";

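Taken together, the DataNode changes above route a disk failure by whether usable volumes remain: if FSDataset.hasEnoughResource() still sees at least MIN_NUM_OF_VALID_VOLUMES, the node reports a recoverable DISK_ERROR, schedules an immediate block report, and keeps running; otherwise it reports FATAL_DISK_ERROR and shuts down so the NameNode can remove it. A condensed sketch of that decision, using only names from this patch:

    // Recoverable: volumes remain  -> NN just logs it, DN keeps serving.
    // Fatal: no valid volumes left -> NN removes the node, DN shuts down.
    int dpError = data.hasEnoughResource()
        ? DatanodeProtocol.DISK_ERROR
        : DatanodeProtocol.FATAL_DISK_ERROR;
    namenode.errorReport(dnRegistration, dpError, errMsgr);
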
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=808037&r1=808036&r2=808037&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Wed Aug 26 15:03:33 2009
@@ -267,7 +267,8 @@
         mirrorTarget = NetUtils.createSocketAddr(mirrorNode);
         mirrorSock = datanode.newSocket();
         try {
-          int timeoutValue = targets.length * datanode.socketTimeout;
+          int timeoutValue = datanode.socketTimeout
+              + (HdfsConstants.READ_TIMEOUT_EXTENSION * targets.length);
           int writeTimeout = datanode.socketWriteTimeout + 
                       (HdfsConstants.WRITE_TIMEOUT_EXTENSION * targets.length);
           NetUtils.connect(mirrorSock, mirrorTarget, timeoutValue);

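With this change the mirror-connect timeout scales with the number of downstream targets, mirroring the existing write-timeout formula. Worked numbers, assuming the defaults visible in this patch (READ_TIMEOUT = 60 s, WRITE_TIMEOUT = 8 min, both extensions 5 s) and two downstream targets:

    int timeoutValue = 60 * 1000 + (5 * 1000 * 2);      // 70000 ms to reach the mirror
    int writeTimeout = 8 * 60 * 1000 + (5 * 1000 * 2);  // 490000 ms for pipeline writes
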
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=808037&r1=808036&r2=808037&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Wed Aug 26 15:03:33 2009
@@ -17,27 +17,42 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
-import java.io.*;
-import java.util.*;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.RandomAccessFile;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
 
 import javax.management.NotCompliantMBeanException;
 import javax.management.ObjectName;
 import javax.management.StandardMBean;
 
-import org.apache.hadoop.fs.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.DF;
+import org.apache.hadoop.fs.DU;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
+import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
 import org.apache.hadoop.metrics.util.MBeanUtil;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
-import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
+import org.mortbay.log.Log;
 
 /**************************************************
  * FSDataset manages a set of data blocks.  Each block
@@ -462,9 +477,25 @@
     FSVolumeSet(FSVolume[] volumes) {
       this.volumes = volumes;
     }
+    
+    private int numberOfVolumes() {
+      return volumes.length;
+    }
       
     synchronized FSVolume getNextVolume(long blockSize) throws IOException {
+      
+      if(volumes.length < 1) {
+        throw new DiskOutOfSpaceException("No more available volumes");
+      }
+      
+      // since volumes could've been removed because of the failure
+      // make sure we are not out of bounds
+      if(curVolume >= volumes.length) {
+        curVolume = 0;
+      }
+      
       int startVolume = curVolume;
+      
       while (true) {
         FSVolume volume = volumes[curVolume];
         curVolume = (curVolume + 1) % volumes.length;
@@ -505,10 +536,46 @@
       }
     }
       
-    synchronized void checkDirs() throws DiskErrorException {
+    /**
+     * Goes over all the volumes and calls checkDir on each of them;
+     * if one throws DiskErrorException, it is removed from the list of
+     * active volumes.
+     * @return list of all the removed volumes
+     */
+    synchronized List<FSVolume> checkDirs() {
+      
+      ArrayList<FSVolume> removed_vols = null;  
+      
       for (int idx = 0; idx < volumes.length; idx++) {
-        volumes[idx].checkDirs();
+        FSVolume fsv = volumes[idx];
+        try {
+          fsv.checkDirs();
+        } catch (DiskErrorException e) {
+          DataNode.LOG.warn("Removing failed volume " + fsv + ": ",e);
+          if(removed_vols == null) {
+            removed_vols = new ArrayList<FSVolume>(1);
+          }
+          removed_vols.add(volumes[idx]);
+          volumes[idx] = null; //remove the volume
+        }
+      }
+      
+      // repair array - copy non null elements
+      int removed_size = (removed_vols==null)? 0 : removed_vols.size();
+      if(removed_size > 0) {
+        FSVolume fsvs[] = new FSVolume [volumes.length-removed_size];
+        for(int idx=0,idy=0; idx<volumes.length; idx++) {
+          if(volumes[idx] != null) {
+            fsvs[idy] = volumes[idx];
+            idy++;
+          }
+        }
+        volumes = fsvs; // replace array of volumes
       }
+      Log.info("Completed FSVolumeSet.checkDirs. Removed " + removed_size + 
+          " volumes. List of current volumes: " + toString());
+      
+      return removed_vols;
     }
       
     public String toString() {
@@ -685,7 +752,14 @@
       return volumes.getDfsUsed();
     }
   }
-  
+  /**
+   * Return true if there are still valid volumes
+   * on the DataNode.
+   */
+  public boolean hasEnoughResource() {
+    return volumes.numberOfVolumes() >= MIN_NUM_OF_VALID_VOLUMES;
+  }
+
   /**
    * Return total capacity, used and unused
    */
@@ -1232,17 +1306,32 @@
    * Check whether the given block is a valid one.
    */
   public boolean isValidBlock(Block b) {
-    return validateBlockFile(b) != null;
+    File f = null;
+    try {
+      f = validateBlockFile(b);
+    } catch(IOException e) {
+      Log.warn("Block " + b + " is not valid: ", e);
+    }
+    
+    return f != null;
   }
 
   /**
    * Find the file corresponding to the block and return it if it exists.
    */
-  File validateBlockFile(Block b) {
+  File validateBlockFile(Block b) throws IOException {
     //Should we check for metadata file too?
     File f = getFile(b);
-    if(f != null && f.exists())
-      return f;
+    
+    if(f != null ) {
+      if(f.exists())
+        return f;
+   
+      // if file is not null, but doesn't exist - possibly disk failed
+      DataNode datanode = DataNode.getDataNode();
+      datanode.checkDiskError();
+    }
+    
     if (InterDatanodeProtocol.LOG.isDebugEnabled()) {
       InterDatanodeProtocol.LOG.debug("b=" + b + ", f=" + f);
     }
@@ -1381,10 +1470,51 @@
 
   /**
    * check if a data directory is healthy
+   * if some volumes failed, make sure to remove all the blocks that belong
+   * to those volumes
    * @throws DiskErrorException
    */
   public void checkDataDir() throws DiskErrorException {
-    volumes.checkDirs();
+    long total_blocks=0, removed_blocks=0;
+    List<FSVolume> failed_vols =  volumes.checkDirs();
+    
+    // if there are no failed volumes, return
+    if(failed_vols == null) 
+      return;
+    
+    // else 
+    // remove related blocks
+    long mlsec = System.currentTimeMillis();
+    synchronized (this) {
+      Iterator<Block> ib = volumeMap.keySet().iterator();
+      while(ib.hasNext()) {
+        Block b = ib.next();
+        total_blocks ++;
+        // check if the volume this block belongs to is still valid
+        FSVolume vol = volumeMap.get(b).getVolume();
+        for(FSVolume fv: failed_vols) {
+          if(vol == fv) {
+            DataNode.LOG.warn("removing block " + b.getBlockId() + " from vol " 
+                + vol.dataDir.dir.getAbsolutePath());
+            ib.remove();
+            removed_blocks++;
+            break;
+          }
+        }
+      }
+    } // end of sync
+    mlsec = System.currentTimeMillis() - mlsec;
+    DataNode.LOG.warn(">>>>>>>>>>>>Removed " + removed_blocks + " out of " + total_blocks +
+        " (took " + mlsec + " millisecs)");
+
+    // report the error
+    StringBuilder sb = new StringBuilder();
+    for(FSVolume fv : failed_vols) {
+      sb.append(fv.dataDir.dir.getAbsolutePath() + ";");
+    }
+
+    throw new DiskErrorException("DataNode failed volumes: " + sb);
+  
   }
     
 

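FSVolumeSet.checkDirs now repairs the volume array in place: failed entries are nulled out during the scan and the survivors are copied into a smaller array, after which checkDataDir prunes every volumeMap entry that lived on a failed volume and reports the failed paths through a DiskErrorException. A standalone sketch of the compaction idiom, using String in place of FSVolume; removeFailed is a hypothetical helper and assumes each failed volume appears exactly once in the array:

    import java.util.List;

    static String[] removeFailed(String[] volumes, List<String> failed) {
      for (int i = 0; i < volumes.length; i++) {
        if (failed.contains(volumes[i])) volumes[i] = null; // mark the failed slot
      }
      String[] survivors = new String[volumes.length - failed.size()];
      for (int i = 0, j = 0; i < volumes.length; i++) {
        if (volumes[i] != null) survivors[j++] = volumes[i]; // keep non-null entries
      }
      return survivors;
    }
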
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java?rev=808037&r1=808036&r2=808037&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java Wed Aug 26 15:03:33 2009
@@ -264,4 +264,10 @@
    * @throws IOException
    */
   public void validateBlockMetadata(Block b) throws IOException;
+
+  /**
+   * Checks how many valid storage volumes there are in the DataNode.
+   * @return true if more than the minimum number of valid volumes are left in the FSDataSet
+   */
+  public boolean hasEnoughResource();
 }

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=808037&r1=808036&r2=808037&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Wed Aug 26 15:03:33 2009
@@ -431,6 +431,17 @@
   public String getServiceName() {
     return "NameNode";
   }
+
+  /**
+   * Get the current number of workers
+   * @return the worker count
+   */
+  @Override
+  public int getLiveWorkerCount() {
+    return getNamesystem() != null?
+            getNamesystem().heartbeats.size()
+            : 0;
+  }
   
   /**
    * This method does all the startup. It is invoked from {@link #start()} when
@@ -1048,6 +1059,8 @@
     }
     verifyRequest(nodeReg);
     if (errorCode == DatanodeProtocol.DISK_ERROR) {
+      LOG.warn("Volume failed on " + dnName); 
+    } else if (errorCode == DatanodeProtocol.FATAL_DISK_ERROR) {
       namesystem.removeDatanode(nodeReg);            
     }
   }

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java?rev=808037&r1=808036&r2=808037&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java Wed Aug 26 15:03:33 2009
@@ -42,8 +42,9 @@
   
   // error code
   final static int NOTIFY = 0;
-  final static int DISK_ERROR = 1;
+  final static int DISK_ERROR = 1; // there are still valid volumes on DN
   final static int INVALID_BLOCK = 2;
+  final static int FATAL_DISK_ERROR = 3; // no valid volumes left on DN
 
   /**
    * Determines actions that data node should perform 

Modified: hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/fi/DataTransferTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/fi/DataTransferTestUtil.java?rev=808037&r1=808036&r2=808037&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/fi/DataTransferTestUtil.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/fi/DataTransferTestUtil.java Wed Aug 26 15:03:33 2009
@@ -17,88 +17,97 @@
  */
 package org.apache.hadoop.fi;
 
-import java.util.ArrayList;
-import java.util.List;
-
 import org.apache.hadoop.fi.FiTestUtil.Action;
 import org.apache.hadoop.fi.FiTestUtil.ActionContainer;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 
+import java.util.ArrayList;
+import java.util.List;
+
 /**
  * Utilities for DataTransferProtocol related tests,
  * e.g. TestFiDataTransferProtocol.
  */
 public class DataTransferTestUtil {
-  private static DataTransferTest thepipelinetest;
+  protected static PipelineTest thepipelinetest;
   /** initialize pipeline test */
-  public static DataTransferTest initTest() {
+  public static PipelineTest initTest() {
     return thepipelinetest = new DataTransferTest();
   }
   /** get the pipeline test object */
-  public static DataTransferTest getPipelineTest() {
+  public static PipelineTest getPipelineTest() {
     return thepipelinetest;
   }
+  /** get the pipeline test object cast to DataTransferTest */
+  public static DataTransferTest getDataTransferTest() {
+    return (DataTransferTest)getPipelineTest();
+  }
 
   /**
    * The DataTransferTest class includes a pipeline
    * and some actions.
    */
-  public static class DataTransferTest {
-    private Pipeline thepipeline;
+  public static class DataTransferTest implements PipelineTest {
+    private List<Pipeline> pipelines = new ArrayList<Pipeline>();
+    private volatile boolean isSuccess = false;
+
     /** Simulate action for the receiverOpWriteBlock pointcut */
-    public final ActionContainer<DataNode> fiReceiverOpWriteBlock
-        = new ActionContainer<DataNode>();
+    public final ActionContainer<DatanodeID> fiReceiverOpWriteBlock
+        = new ActionContainer<DatanodeID>();
     /** Simulate action for the callReceivePacket pointcut */
-    public final ActionContainer<DataNode> fiCallReceivePacket
-        = new ActionContainer<DataNode>();
+    public final ActionContainer<DatanodeID> fiCallReceivePacket
+        = new ActionContainer<DatanodeID>();
     /** Simulate action for the statusRead pointcut */
-    public final ActionContainer<DataNode> fiStatusRead
-        = new ActionContainer<DataNode>();
+    public final ActionContainer<DatanodeID> fiStatusRead
+        = new ActionContainer<DatanodeID>();
+    /** Verification action for the pipelineInitNonAppend pointcut */
+    public final ActionContainer<Integer> fiPipelineInitErrorNonAppend
+        = new ActionContainer<Integer>();
+    /** Verification action for the pipelineErrorAfterInit pointcut */
+    public final ActionContainer<Integer> fiPipelineErrorAfterInit
+        = new ActionContainer<Integer>();
+
+    /** Get test status */
+    public boolean isSuccess() {
+      return this.isSuccess;
+    }
+
+    /** Set test status */
+    public void markSuccess() {
+      this.isSuccess = true;
+    }
 
     /** Initialize the pipeline. */
     public Pipeline initPipeline(LocatedBlock lb) {
-      if (thepipeline != null) {
+      final Pipeline pl = new Pipeline(lb);
+      if (pipelines.contains(pl)) {
         throw new IllegalStateException("thepipeline != null");
       }
-      return thepipeline = new Pipeline(lb);
+      pipelines.add(pl);
+      return pl;
     }
 
     /** Return the pipeline. */
-    public Pipeline getPipeline() {
-      if (thepipeline == null) {
+    public Pipeline getPipeline(DatanodeID id) {
+      if (pipelines == null) {
         throw new IllegalStateException("thepipeline == null");
       }
-      return thepipeline;
-    }
-  }
-
-  /** A pipeline contains a list of datanodes. */
-  public static class Pipeline {
-    private final List<String> datanodes = new ArrayList<String>();
-    
-    private Pipeline(LocatedBlock lb) {
-      for(DatanodeInfo d : lb.getLocations()) {
-        datanodes.add(d.getName());
+      StringBuilder dnString = new StringBuilder();
+      for (Pipeline pipeline : pipelines) {
+        for (DatanodeInfo dni : pipeline.getDataNodes())
+          dnString.append(dni.getStorageID());
+        if (dnString.toString().contains(id.getStorageID()))
+          return pipeline;
       }
-    }
-
-    /** Does the pipeline contains d at the n th position? */
-    public boolean contains(int n, DatanodeID d) {
-      return d.getName().equals(datanodes.get(n));
-    }
-
-    /** {@inheritDoc} */
-    public String toString() {
-      return getClass().getSimpleName() + datanodes;
+      return null;
     }
   }
 
   /** Action for DataNode */
-  public static abstract class DataNodeAction implements Action<DataNode> {
+  public static abstract class DataNodeAction implements Action<DatanodeID> {
     /** The name of the test */
     final String currentTest;
     /** The index of the datanode */
@@ -108,7 +117,7 @@
      * @param currentTest The name of the test
      * @param index The index of the datanode
      */
-    private DataNodeAction(String currentTest, int index) {
+    protected DataNodeAction(String currentTest, int index) {
       this.currentTest = currentTest;
       this.index = index;
     }
@@ -118,10 +127,11 @@
       return currentTest + ", index=" + index;
     }
 
-    /** {@inheritDoc} */
-    String toString(DataNode datanode) {
+    /** {@inheritDoc}
+     * @param datanodeID*/
+    String toString(DatanodeID datanodeID) {
       return "FI: " + this + ", datanode="
-          + datanode.getDatanodeRegistration().getName();
+          + datanodeID.getName();
     }
   }
 
@@ -133,10 +143,11 @@
     }
 
     @Override
-    public void run(DataNode datanode) {
-      final Pipeline p = getPipelineTest().getPipeline();
-      if (p.contains(index, datanode.getDatanodeRegistration())) {
-        final String s = toString(datanode);
+    public void run(DatanodeID id) {
+      final DataTransferTest test = getDataTransferTest();
+      final Pipeline p = test.getPipeline(id);
+      if (!test.isSuccess() && p.contains(index, id)) {
+        final String s = toString(id);
         FiTestUtil.LOG.info(s);
         throw new OutOfMemoryError(s);
       }
@@ -151,10 +162,11 @@
     }
 
     @Override
-    public void run(DataNode datanode) throws DiskOutOfSpaceException {
-      final Pipeline p = getPipelineTest().getPipeline();
-      if (p.contains(index, datanode.getDatanodeRegistration())) {
-        final String s = toString(datanode);
+    public void run(DatanodeID id) throws DiskOutOfSpaceException {
+      final DataTransferTest test = getDataTransferTest();
+      final Pipeline p = test.getPipeline(id);
+      if (p.contains(index, id)) {
+        final String s = toString(id);
         FiTestUtil.LOG.info(s);
         throw new DiskOutOfSpaceException(s);
       }
@@ -179,10 +191,11 @@
     }
 
     @Override
-    public void run(DataNode datanode) {
-      final Pipeline p = getPipelineTest().getPipeline();
-      if (p.contains(index, datanode.getDatanodeRegistration())) {
-        final String s = toString(datanode) + ", duration=" + duration;
+    public void run(DatanodeID id) {
+      final DataTransferTest test = getDataTransferTest();
+      final Pipeline p = test.getPipeline(id);
+      if (!test.isSuccess() && p.contains(index, id)) {
+        final String s = toString(id) + ", duration=" + duration;
         FiTestUtil.LOG.info(s);
         if (duration <= 0) {
           for(; true; FiTestUtil.sleep(1000)); //sleep forever
@@ -192,4 +205,36 @@
       }
     }
   }
+
+  /** Action for pipeline error verification */
+  public static class VerificationAction implements Action<Integer> {
+    /** The name of the test */
+    final String currentTest;
+    /** The error index of the datanode */
+    final int errorIndex;
+
+    /**
+     * Create a verification action for errors at datanode i in the pipeline.
+     * 
+     * @param currentTest The name of the test
+     * @param i The error index of the datanode
+     */
+    public VerificationAction(String currentTest, int i) {
+      this.currentTest = currentTest;
+      this.errorIndex = i;
+    }
+
+    /** {@inheritDoc} */
+    public String toString() {
+      return currentTest + ", errorIndex=" + errorIndex;
+    }
+
+    @Override
+    public void run(Integer i) {
+      if (i == errorIndex) {
+        FiTestUtil.LOG.info(this + ", successfully verified.");
+        getDataTransferTest().markSuccess();
+      }
+    }
+  }
 }
\ No newline at end of file

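The rewritten utility supports multiple pipelines, keys actions by DatanodeID rather than DataNode, and adds a success flag that VerificationAction sets once the error surfaces at the expected datanode index. A minimal wiring sketch following the pattern the updated TestFiDataTransferProtocol uses later in this commit (the test name string is arbitrary):

    // Install a never-responding fault at datanode 1 and verify that the
    // client pinpoints error index 1 during pipeline setup.
    DataTransferTest t = (DataTransferTest) DataTransferTestUtil.initTest();
    t.fiReceiverOpWriteBlock.set(new SleepAction("pipeline_Fi_02", 1, 0));
    t.fiPipelineInitErrorNonAppend.set(new VerificationAction("pipeline_Fi_02", 1));
    // ... perform a one-byte write against a MiniDFSCluster, then:
    Assert.assertTrue(t.isSuccess());
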
Modified: hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/hdfs/protocol/ClientProtocolAspects.aj
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/hdfs/protocol/ClientProtocolAspects.aj?rev=808037&r1=808036&r2=808037&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/hdfs/protocol/ClientProtocolAspects.aj (original)
+++ hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/hdfs/protocol/ClientProtocolAspects.aj Wed Aug 26 15:03:33 2009
@@ -20,6 +20,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fi.DataTransferTestUtil;
+import org.apache.hadoop.fi.PipelineTest;
 
 /** Aspect for ClientProtocol */
 public aspect ClientProtocolAspects {
@@ -29,7 +30,9 @@
     call(LocatedBlock ClientProtocol.addBlock(String, String));
 
   after() returning(LocatedBlock lb): addBlock() {
-    LOG.info("FI: addBlock "
-        + DataTransferTestUtil.getPipelineTest().initPipeline(lb));
+    PipelineTest pipelineTest = DataTransferTestUtil.getPipelineTest();
+    if (pipelineTest != null)
+      LOG.info("FI: addBlock "
+          + pipelineTest.initPipeline(lb));
   }
-}
\ No newline at end of file
+}

Modified: hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj?rev=808037&r1=808036&r2=808037&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj (original)
+++ hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj Wed Aug 26 15:03:33 2009
@@ -22,6 +22,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fi.DataTransferTestUtil.DataTransferTest;
 import org.apache.hadoop.fi.DataTransferTestUtil;
 import org.apache.hadoop.fi.ProbabilityModel;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
@@ -44,8 +45,10 @@
   before(BlockReceiver blockreceiver
       ) throws IOException : callReceivePacket(blockreceiver) {
     LOG.info("FI: callReceivePacket");
-    DataTransferTestUtil.getPipelineTest().fiCallReceivePacket.run(
-        blockreceiver.getDataNode());
+    DataTransferTest dtTest = DataTransferTestUtil.getDataTransferTest();
+    if (dtTest != null)
+      dtTest.fiCallReceivePacket.run(
+          blockreceiver.getDataNode().getDatanodeRegistration());
 
     if (ProbabilityModel.injectCriteria(BlockReceiver.class.getSimpleName())) {
       LOG.info("Before the injection point");

Modified: hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/hdfs/server/datanode/DataTransferProtocolAspects.aj
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/hdfs/server/datanode/DataTransferProtocolAspects.aj?rev=808037&r1=808036&r2=808037&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/hdfs/server/datanode/DataTransferProtocolAspects.aj (original)
+++ hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/hdfs/server/datanode/DataTransferProtocolAspects.aj Wed Aug 26 15:03:33 2009
@@ -24,6 +24,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fi.DataTransferTestUtil;
+import org.apache.hadoop.fi.DataTransferTestUtil.DataTransferTest;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Op;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Receiver;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status;
@@ -59,7 +60,8 @@
     final DataNode d = dataxceiver.getDataNode();
     LOG.info("FI: statusRead " + status + ", datanode="
         + d.getDatanodeRegistration().getName());    
-    DataTransferTestUtil.getPipelineTest().fiStatusRead.run(d);
+    DataTransferTestUtil.getDataTransferTest().fiStatusRead.run(
+        d.getDatanodeRegistration());
   }
 
   pointcut receiverOpWriteBlock(DataXceiver dataxceiver):
@@ -68,7 +70,9 @@
   before(DataXceiver dataxceiver
       ) throws IOException: receiverOpWriteBlock(dataxceiver) {
     LOG.info("FI: receiverOpWriteBlock");
-    DataTransferTestUtil.getPipelineTest().fiReceiverOpWriteBlock.run(
-        dataxceiver.getDataNode());
+    DataTransferTest dtTest = DataTransferTestUtil.getDataTransferTest();
+    if (dtTest != null)
+      dtTest.fiReceiverOpWriteBlock.run(
+          dataxceiver.getDataNode().getDatanodeRegistration());
   }
 }

Modified: hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java?rev=808037&r1=808036&r2=808037&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java Wed Aug 26 15:03:33 2009
@@ -24,16 +24,22 @@
 import org.apache.hadoop.fi.FiTestUtil;
 import org.apache.hadoop.fi.DataTransferTestUtil.DataTransferTest;
 import org.apache.hadoop.fi.DataTransferTestUtil.DoosAction;
+import org.apache.hadoop.fi.DataTransferTestUtil.OomAction;
 import org.apache.hadoop.fi.DataTransferTestUtil.SleepAction;
+import org.apache.hadoop.fi.DataTransferTestUtil.VerificationAction;
 import org.apache.hadoop.fi.FiTestUtil.Action;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+
+import org.junit.Assert;
+import org.junit.Test;
 
 /** Test DataTransferProtocol with fault injection. */
-public class TestFiDataTransferProtocol extends junit.framework.TestCase {
+public class TestFiDataTransferProtocol {
   static final short REPLICATION = 3;
   static final long BLOCKSIZE = 1L * (1L << 20);
 
@@ -41,6 +47,7 @@
   static {
     conf.setInt("dfs.datanode.handler.count", 1);
     conf.setInt("dfs.replication", REPLICATION);
+    conf.setInt("dfs.socket.timeout", 5000);
   }
 
   static private FSDataOutputStream createFile(FileSystem fs, Path p
@@ -59,8 +66,8 @@
   private static void write1byte(String methodName) throws IOException {
     final MiniDFSCluster cluster = new MiniDFSCluster(conf, REPLICATION, true,
         null);
+    final FileSystem dfs = cluster.getFileSystem();
     try {
-      final FileSystem dfs = cluster.getFileSystem();
       final Path p = new Path("/" + methodName + "/foo");
       final FSDataOutputStream out = createFile(dfs, p);
       out.write(1);
@@ -69,9 +76,10 @@
       final FSDataInputStream in = dfs.open(p);
       final int b = in.read();
       in.close();
-      assertEquals(1, b);
+      Assert.assertEquals(1, b);
     }
     finally {
+      dfs.close();
       cluster.shutdown();
     }
   }
@@ -79,18 +87,106 @@
   private static void runSlowDatanodeTest(String methodName, SleepAction a
                   ) throws IOException {
     FiTestUtil.LOG.info("Running " + methodName + " ...");
-    final DataTransferTest t = DataTransferTestUtil.initTest();
+    final DataTransferTest t = (DataTransferTest)DataTransferTestUtil.initTest();
     t.fiCallReceivePacket.set(a);
     t.fiReceiverOpWriteBlock.set(a);
     t.fiStatusRead.set(a);
     write1byte(methodName);
   }
   
+  private static void runReceiverOpWriteBlockTest(String methodName,
+      int errorIndex, Action<DatanodeID> a) throws IOException {
+    FiTestUtil.LOG.info("Running " + methodName + " ...");
+    final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
+        .initTest();
+    t.fiReceiverOpWriteBlock.set(a);
+    t.fiPipelineInitErrorNonAppend.set(new VerificationAction(methodName,
+        errorIndex));
+    write1byte(methodName);
+    Assert.assertTrue(t.isSuccess());
+  }
+  
+  private static void runStatusReadTest(String methodName, int errorIndex,
+      Action<DatanodeID> a) throws IOException {
+    FiTestUtil.LOG.info("Running " + methodName + " ...");
+    final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
+        .initTest();
+    t.fiStatusRead.set(a);
+    t.fiPipelineInitErrorNonAppend.set(new VerificationAction(methodName,
+        errorIndex));
+    write1byte(methodName);
+    Assert.assertTrue(t.isSuccess());
+  }
+
+  private static void runCallReceivePacketTest(String methodName,
+      int errorIndex, Action<DatanodeID> a) throws IOException {
+    FiTestUtil.LOG.info("Running " + methodName + " ...");
+    final DataTransferTest t = (DataTransferTest)DataTransferTestUtil.initTest();
+    t.fiCallReceivePacket.set(a);
+    t.fiPipelineErrorAfterInit.set(new VerificationAction(methodName, errorIndex));
+    write1byte(methodName);
+    Assert.assertTrue(t.isSuccess());
+  }
+
+  /**
+   * Pipeline setup:
+   * DN0 never responds after receiving the setup request from the client.
+   * The client gets an IOException and determines DN0 is bad.
+   */
+  @Test
+  public void pipeline_Fi_01() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    runReceiverOpWriteBlockTest(methodName, 0, new SleepAction(methodName, 0, 0));
+  }
+
+  /**
+   * Pipeline setup:
+   * DN1 never responds after receiving the setup request from the client.
+   * The client gets an IOException and determines DN1 is bad.
+   */
+  @Test
+  public void pipeline_Fi_02() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    runReceiverOpWriteBlockTest(methodName, 1, new SleepAction(methodName, 1, 0));
+  }
+
+  /**
+   * Pipeline setup:
+   * DN2 never responds after receiving the setup request from the client.
+   * The client gets an IOException and determines DN2 is bad.
+   */
+  @Test
+  public void pipeline_Fi_03() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    runReceiverOpWriteBlockTest(methodName, 2, new SleepAction(methodName, 2, 0));
+  }
+
+  /**
+   * Pipeline setup, DN1 never responds after receiving the setup ack from DN2.
+   * The client gets an IOException and determines DN1 is bad.
+   */
+  @Test
+  public void pipeline_Fi_04() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    runStatusReadTest(methodName, 1, new SleepAction(methodName, 1, 0));
+  }
+
+  /**
+   * Pipeline setup, DN0 never responds after receiving the setup ack from DN1.
+   * The client gets an IOException and determines DN0 is bad.
+   */
+  @Test
+  public void pipeline_Fi_05() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    runStatusReadTest(methodName, 0, new SleepAction(methodName, 0, 0));
+  }
+
   /**
    * Pipeline setup with DN0 very slow but it won't lead to timeout.
    * Client finishes setup successfully.
    */
-  public void testPipelineFi06() throws IOException {
+  @Test
+  public void pipeline_Fi_06() throws IOException {
     final String methodName = FiTestUtil.getMethodName();
     runSlowDatanodeTest(methodName, new SleepAction(methodName, 0, 3000));
   }
@@ -99,16 +195,75 @@
    * Pipeline setup with DN1 very slow but it won't lead to timeout.
    * Client finishes setup successfully.
    */
-  public void testPipelineFi07() throws IOException {
+  @Test
+  public void pipeline_Fi_07() throws IOException {
     final String methodName = FiTestUtil.getMethodName();
     runSlowDatanodeTest(methodName, new SleepAction(methodName, 1, 3000));
   }
 
-  private static void runCallReceivePacketTest(String methodName,
-      Action<DataNode> a) throws IOException {
-    FiTestUtil.LOG.info("Running " + methodName + " ...");
-    DataTransferTestUtil.initTest().fiCallReceivePacket.set(a);
-    write1byte(methodName);
+  /**
+   * Pipeline setup with DN2 very slow but it won't lead to timeout.
+   * Client finishes setup successfully.
+   */
+  @Test
+  public void pipeline_Fi_08() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    runSlowDatanodeTest(methodName, new SleepAction(methodName, 2, 3000));
+  }
+
+  /**
+   * Pipeline setup, DN0 throws an OutOfMemoryException right after it
+   * received a setup request from the client.
+   * The client gets an IOException and determines DN0 is bad.
+   */
+  @Test
+  public void pipeline_Fi_09() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    runReceiverOpWriteBlockTest(methodName, 0, new OomAction(methodName, 0));
+  }
+
+  /**
+   * Pipeline setup, DN1 throws an OutOfMemoryException right after it
+   * received a setup request from DN0.
+   * The client gets an IOException and determines DN1 is bad.
+   */
+  @Test
+  public void pipeline_Fi_10() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    runReceiverOpWriteBlockTest(methodName, 1, new OomAction(methodName, 1));
+  }
+
+  /**
+   * Pipeline setup, DN2 throws an OutOfMemoryException right after it
+   * received a setup request from DN1.
+   * The client gets an IOException and determines DN2 is bad.
+   */
+  @Test
+  public void pipeline_Fi_11() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    runReceiverOpWriteBlockTest(methodName, 2, new OomAction(methodName, 2));
+  }
+
+  /**
+   * Pipeline setup, DN1 throws an OutOfMemoryException right after it
+   * received a setup ack from DN2.
+   * The client gets an IOException and determines DN1 is bad.
+   */
+  @Test
+  public void pipeline_Fi_12() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    runStatusReadTest(methodName, 1, new OomAction(methodName, 1));
+  }
+
+  /**
+   * Pipeline setup, DN0 throws an OutOfMemoryException right after it
+   * received a setup ack from DN1.
+   * The client gets an IOException and determines DN0 is bad.
+   */
+  @Test
+  public void pipeline_Fi_13() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    runStatusReadTest(methodName, 0, new OomAction(methodName, 0));
   }
 
   /**
@@ -116,9 +271,10 @@
    * when it writes the data to disk.
    * Client gets an IOException and determine DN0 bad.
    */
-  public void testPipelineFi14() throws IOException {
+  @Test
+  public void pipeline_Fi_14() throws IOException {
     final String methodName = FiTestUtil.getMethodName();
-    runCallReceivePacketTest(methodName, new DoosAction(methodName, 0));
+    runCallReceivePacketTest(methodName, 0, new DoosAction(methodName, 0));
   }
 
   /**
@@ -126,8 +282,20 @@
    * when it writes the data to disk.
    * Client gets an IOException and determine DN1 bad.
    */
-  public void testPipelineFi15() throws IOException {
+  @Test
+  public void pipeline_Fi_15() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    runCallReceivePacketTest(methodName, 1, new DoosAction(methodName, 1));
+  }
+  
+  /**
+   * Streaming: Write a packet, DN2 throws a DiskOutOfSpaceError
+   * when it writes the data to disk.
+   * The client gets an IOException and determines DN2 is bad.
+   */
+  @Test
+  public void pipeline_Fi_16() throws IOException {
     final String methodName = FiTestUtil.getMethodName();
-    runCallReceivePacketTest(methodName, new DoosAction(methodName, 1));
+    runCallReceivePacketTest(methodName, 2, new DoosAction(methodName, 2));
   }
 }
\ No newline at end of file

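The new conf.setInt("dfs.socket.timeout", 5000) presumably exists to keep the sleep-forever cases fast: a faulted datanode is only detected when the client times out. Rough arithmetic from the DFSClient hunk earlier in this commit:

    int socketTimeout = 5000;                    // dfs.socket.timeout set by the test
    int timeout = 5 * 1000 * 3 + socketTimeout;  // READ_TIMEOUT_EXTENSION * 3 nodes
    // timeout == 20000 ms: a datanode that sleeps forever fails the setup in
    // about 20 s, instead of about 75 s with the default 60 s socket timeout.
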
Propchange: hadoop/hdfs/branches/HDFS-326/src/test/hdfs/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Aug 26 15:03:33 2009
@@ -1,2 +1,3 @@
 /hadoop/core/branches/branch-0.19/hdfs/src/test/hdfs:713112
 /hadoop/core/trunk/src/test/hdfs:776175-785643
+/hadoop/hdfs/trunk/src/test/hdfs:804973-807690

Propchange: hadoop/hdfs/branches/HDFS-326/src/test/hdfs-with-mr/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Aug 26 15:03:33 2009
@@ -1,2 +1,3 @@
 /hadoop/core/branches/branch-0.19/hdfs/src/test/hdfs-with-mr:713112
 /hadoop/core/trunk/src/test/hdfs-with-mr:776175-784663
+/hadoop/hdfs/trunk/src/test/hdfs-with-mr:804973-807690

Modified: hadoop/hdfs/branches/HDFS-326/src/test/hdfs-with-mr/org/apache/hadoop/fs/DistributedFSCheck.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/test/hdfs-with-mr/org/apache/hadoop/fs/DistributedFSCheck.java?rev=808037&r1=808036&r2=808037&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/test/hdfs-with-mr/org/apache/hadoop/fs/DistributedFSCheck.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/test/hdfs-with-mr/org/apache/hadoop/fs/DistributedFSCheck.java Wed Aug 26 15:03:33 2009
@@ -21,6 +21,7 @@
 import java.io.BufferedReader;
 import java.io.DataInputStream;
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
@@ -132,9 +133,13 @@
       return;
     }
     
-    FileStatus children[] = fs.listStatus(rootFile);
-    if (children == null)
+    FileStatus[] children = null;
+    try {
+      children = fs.listStatus(rootFile);
+    } catch (FileNotFoundException fnfe) {
       throw new IOException("Could not get listing for " + rootFile);
+    }
+
     for (int i = 0; i < children.length; i++)
       listSubtree(children[i], writer);
   }

Modified: hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=808037&r1=808036&r2=808037&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java Wed Aug 26 15:03:33 2009
@@ -655,4 +655,8 @@
   public String getStorageInfo() {
     return "Simulated FSDataset-" + storageId;
   }
+  
+  public boolean hasEnoughResource() {
+    return true;
+  }
 }

Modified: hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFsck.java?rev=808037&r1=808036&r2=808037&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFsck.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFsck.java Wed Aug 26 15:03:33 2009
@@ -162,6 +162,7 @@
     try {
       Configuration conf = new Configuration();
       conf.setLong("dfs.blockreport.intervalMsec", 10000L);
+      conf.setInt("dfs.datanode.directoryscan.interval", 1);
       cluster = new MiniDFSCluster(conf, 4, true, null);
       String topDir = "/srcdat";
       fs = cluster.getFileSystem();

Propchange: hadoop/hdfs/branches/HDFS-326/src/webapps/datanode/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Aug 26 15:03:33 2009
@@ -1,2 +1,3 @@
 /hadoop/core/branches/branch-0.19/hdfs/src/webapps/datanode:713112
 /hadoop/core/trunk/src/webapps/datanode:776175-784663
+/hadoop/hdfs/trunk/src/webapps/datanode:804973-807690

Propchange: hadoop/hdfs/branches/HDFS-326/src/webapps/hdfs/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Aug 26 15:03:33 2009
@@ -1,2 +1,3 @@
 /hadoop/core/branches/branch-0.19/hdfs/src/webapps/hdfs:713112
 /hadoop/core/trunk/src/webapps/hdfs:776175-784663
+/hadoop/hdfs/trunk/src/webapps/hdfs:804973-807690

Propchange: hadoop/hdfs/branches/HDFS-326/src/webapps/secondary/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Aug 26 15:03:33 2009
@@ -1,2 +1,3 @@
 /hadoop/core/branches/branch-0.19/hdfs/src/webapps/secondary:713112
 /hadoop/core/trunk/src/webapps/secondary:776175-784663
+/hadoop/hdfs/trunk/src/webapps/secondary:804973-807690


