hadoop-hdfs-commits mailing list archives

From: szets...@apache.org
Subject: svn commit: r1414456 - in /hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/server/datanode/ src/main/java/org/apache/hadoop/hdfs/server/namen...
Date: Tue, 27 Nov 2012 23:02:15 GMT
Author: szetszwo
Date: Tue Nov 27 23:02:00 2012
New Revision: 1414456

URL: http://svn.apache.org/viewvc?rev=1414456&view=rev
Log:
Merge r1412283 through r1414454 from trunk.

Modified:
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/native/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/secondary/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/hdfs/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/testHDFSConf.xml

Propchange: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs:r1412283-1414454

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1414456&r1=1414455&r2=1414456&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Tue Nov 27 23:02:00 2012
@@ -167,6 +167,12 @@ Trunk (Unreleased)
     HDFS-4215. Remove locking from addToParent(..) since it is used in image
     loading, and add INode.isFile().  (szetszwo)
 
+    HDFS-4200. Reduce the size of synchronized sections in PacketResponder.
+    (suresh)
+
+    HDFS-4209. Clean up the addNode/addChild/addChildNoQuotaCheck methods in
+    FSDirectory and INodeDirectory. (szetszwo)
+
   OPTIMIZATIONS
 
   BUG FIXES

Propchange: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1412283-1414454

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java?rev=1414456&r1=1414455&r2=1414456&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java Tue Nov 27 23:02:00 2012
@@ -233,7 +233,7 @@ public class DFSUtil {
   /**
    * Given a list of path components returns a path as a UTF8 String
    */
-  public static String byteArray2String(byte[][] pathComponents) {
+  public static String byteArray2PathString(byte[][] pathComponents) {
     if (pathComponents.length == 0)
       return "";
     if (pathComponents.length == 1 && pathComponents[0].length == 0) {
@@ -254,6 +254,14 @@ public class DFSUtil {
     return null;
   }
 
+  /** Convert an object representing a path to a string. */
+  public static String path2String(final Object path) {
+    return path == null? null
+        : path instanceof String? (String)path
+        : path instanceof byte[][]? byteArray2PathString((byte[][])path)
+        : path.toString();
+  }
+
   /**
    * Splits the array of bytes into array of arrays of bytes
    * on byte separator
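
For reference, a minimal self-contained sketch of how the renamed byteArray2PathString and the new path2String fit together. The joining details below are illustrative stand-ins for the real DFSUtil code (SEPARATOR stands in for Path.SEPARATOR):

import java.nio.charset.StandardCharsets;

public class PathStringSketch {
  static final String SEPARATOR = "/"; // stand-in for Path.SEPARATOR

  /** Join byte[][] path components into a UTF-8 path string. */
  static String byteArray2PathString(byte[][] pathComponents) {
    if (pathComponents.length == 0) {
      return "";
    }
    if (pathComponents.length == 1 && pathComponents[0].length == 0) {
      return SEPARATOR; // the root directory
    }
    final StringBuilder result = new StringBuilder();
    for (int i = 0; i < pathComponents.length; i++) {
      result.append(new String(pathComponents[i], StandardCharsets.UTF_8));
      if (i < pathComponents.length - 1) {
        result.append(SEPARATOR);
      }
    }
    return result.toString();
  }

  /** Dispatch on the runtime type of the path representation. */
  static String path2String(final Object path) {
    return path == null ? null
        : path instanceof String ? (String) path
        : path instanceof byte[][] ? byteArray2PathString((byte[][]) path)
        : path.toString();
  }

  public static void main(String[] args) {
    final byte[][] components = {
        new byte[0],                              // empty root component
        "user".getBytes(StandardCharsets.UTF_8),
        "foo".getBytes(StandardCharsets.UTF_8) };
    System.out.println(path2String(components));  // /user/foo
    System.out.println(path2String("/user/foo")); // /user/foo
  }
}

path2String lets error messages accept whichever path form the caller happens to hold, which INodeDirectory.valueOf exploits later in this commit.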

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1414456&r1=1414455&r2=1414456&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Tue Nov 27 23:02:00 2012
@@ -638,10 +638,7 @@ class BlockReceiver implements Closeable
         responder.start(); // start thread to processes responses
       }
 
-      /* 
-       * Receive until the last packet.
-       */
-      while (receivePacket() >= 0) {}
+      while (receivePacket() >= 0) { /* Receive until the last packet */ }
 
       // wait for all outstanding packet responses. And then
       // indicate responder to gracefully shutdown.
@@ -724,7 +721,7 @@ class BlockReceiver implements Closeable
   static private long checksum2long(byte[] checksum) {
     long crc = 0L;
     for(int i=0; i<checksum.length; i++) {
-      crc |= (0xffL&(long)checksum[i])<<((checksum.length-i-1)*8);
+      crc |= (0xffL&checksum[i])<<((checksum.length-i-1)*8);
     }
     return crc;
   }
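
The dropped (long) cast above is redundant: in "0xffL & checksum[i]" the byte operand is promoted to long automatically, and the 0xffL mask discards the sign-extension bits either way. A tiny standalone demonstration (not part of the patch):

public class ChecksumCastSketch {
  public static void main(String[] args) {
    final byte b = (byte) 0x9c;              // -100 as a signed byte
    final long withCast = 0xffL & (long) b;  // old form
    final long withoutCast = 0xffL & b;      // new form
    System.out.println(withCast == withoutCast); // true
    System.out.println(withoutCast);             // 156
  }
}
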
@@ -783,24 +780,23 @@ class BlockReceiver implements Closeable
     NON_PIPELINE, LAST_IN_PIPELINE, HAS_DOWNSTREAM_IN_PIPELINE
   }
   
+  private static Status[] MIRROR_ERROR_STATUS = {Status.SUCCESS, Status.ERROR};
+  
   /**
    * Processed responses from downstream datanodes in the pipeline
    * and sends back replies to the originator.
    */
   class PacketResponder implements Runnable, Closeable {   
-
-    /** queue for packets waiting for ack */
+    /** queue for packets waiting for ack - synchronization using monitor lock */
     private final LinkedList<Packet> ackQueue = new LinkedList<Packet>(); 
     /** the thread that spawns this responder */
     private final Thread receiverThread = Thread.currentThread();
-    /** is this responder running? */
+    /** is this responder running? - synchronization using monitor lock */
     private volatile boolean running = true;
-
     /** input from the next downstream datanode */
     private final DataInputStream downstreamIn;
     /** output to upstream datanode/client */
     private final DataOutputStream upstreamOut;
-
     /** The type of this responder */
     private final PacketResponderType type;
     /** for log and error messages */
@@ -812,8 +808,7 @@ class BlockReceiver implements Closeable
     }
 
     PacketResponder(final DataOutputStream upstreamOut,
-        final DataInputStream downstreamIn,
-        final DatanodeInfo[] downstreams) {
+        final DataInputStream downstreamIn, final DatanodeInfo[] downstreams) {
       this.downstreamIn = downstreamIn;
       this.upstreamOut = upstreamOut;
 
@@ -830,31 +825,49 @@ class BlockReceiver implements Closeable
       this.myString = b.toString();
     }
 
+    private boolean isRunning() {
+      return running && datanode.shouldRun;
+    }
+    
     /**
      * enqueue the seqno that is still be to acked by the downstream datanode.
      * @param seqno
      * @param lastPacketInBlock
      * @param offsetInBlock
      */
-    synchronized void enqueue(final long seqno,
-        final boolean lastPacketInBlock, final long offsetInBlock) {
-      if (running) {
-        final Packet p = new Packet(seqno, lastPacketInBlock, offsetInBlock,
-            System.nanoTime());
-        if(LOG.isDebugEnabled()) {
-          LOG.debug(myString + ": enqueue " + p);
+    void enqueue(final long seqno, final boolean lastPacketInBlock,
+        final long offsetInBlock) {
+      final Packet p = new Packet(seqno, lastPacketInBlock, offsetInBlock,
+          System.nanoTime());
+      if(LOG.isDebugEnabled()) {
+        LOG.debug(myString + ": enqueue " + p);
+      }
+      synchronized(this) {
+        if (running) {
+          ackQueue.addLast(p);
+          notifyAll();
         }
-        ackQueue.addLast(p);
-        notifyAll();
       }
     }
+    
+    /** Wait for a packet with given {@code seqno} to be enqueued to ackQueue */
+    synchronized Packet waitForAckHead(long seqno) throws InterruptedException {
+      while (isRunning() && ackQueue.size() == 0) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(myString + ": seqno=" + seqno +
+                    " waiting for local datanode to finish write.");
+        }
+        wait();
+      }
+      return isRunning() ? ackQueue.getFirst() : null;
+    }
 
     /**
      * wait for all pending packets to be acked. Then shutdown thread.
      */
     @Override
     public synchronized void close() {
-      while (running && ackQueue.size() != 0 && datanode.shouldRun) {
+      while (isRunning() && ackQueue.size() != 0) {
         try {
           wait();
         } catch (InterruptedException e) {
@@ -877,147 +890,97 @@ class BlockReceiver implements Closeable
     public void run() {
       boolean lastPacketInBlock = false;
       final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
-      while (running && datanode.shouldRun && !lastPacketInBlock) {
-
+      while (isRunning() && !lastPacketInBlock) {
         long totalAckTimeNanos = 0;
         boolean isInterrupted = false;
         try {
-            Packet pkt = null;
-            long expected = -2;
-            PipelineAck ack = new PipelineAck();
-            long seqno = PipelineAck.UNKOWN_SEQNO;
-            long ackRecvNanoTime = 0;
-            try {
-              if (type != PacketResponderType.LAST_IN_PIPELINE
-                  && !mirrorError) {
-                // read an ack from downstream datanode
-                ack.readFields(downstreamIn);
-                ackRecvNanoTime = System.nanoTime();
-                if (LOG.isDebugEnabled()) {
-                  LOG.debug(myString + " got " + ack);
-                }
-                seqno = ack.getSeqno();
+          Packet pkt = null;
+          long expected = -2;
+          PipelineAck ack = new PipelineAck();
+          long seqno = PipelineAck.UNKOWN_SEQNO;
+          long ackRecvNanoTime = 0;
+          try {
+            if (type != PacketResponderType.LAST_IN_PIPELINE && !mirrorError) {
+              // read an ack from downstream datanode
+              ack.readFields(downstreamIn);
+              ackRecvNanoTime = System.nanoTime();
+              if (LOG.isDebugEnabled()) {
+                LOG.debug(myString + " got " + ack);
               }
-              if (seqno != PipelineAck.UNKOWN_SEQNO
-                  || type == PacketResponderType.LAST_IN_PIPELINE) {
-                synchronized (this) {
-                  while (running && datanode.shouldRun && ackQueue.size() == 0) {
-                    if (LOG.isDebugEnabled()) {
-                      LOG.debug(myString + ": seqno=" + seqno +
-                                " waiting for local datanode to finish write.");
-                    }
-                    wait();
-                  }
-                  if (!running || !datanode.shouldRun) {
-                    break;
-                  }
-                  pkt = ackQueue.getFirst();
-                  expected = pkt.seqno;
-                  if (type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE
-                      && seqno != expected) {
-                    throw new IOException(myString + "seqno: expected="
-                        + expected + ", received=" + seqno);
-                  }
-                  if (type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE) {
-                    // The total ack time includes the ack times of downstream nodes.
-                    // The value is 0 if this responder doesn't have a downstream
-                    // DN in the pipeline.
-                    totalAckTimeNanos = ackRecvNanoTime - pkt.ackEnqueueNanoTime;
-                    // Report the elapsed time from ack send to ack receive minus
-                    // the downstream ack time.
-                    long ackTimeNanos = totalAckTimeNanos - ack.getDownstreamAckTimeNanos();
-                    if (ackTimeNanos < 0) {
-                      if (LOG.isDebugEnabled()) {
-                        LOG.debug("Calculated invalid ack time: " + ackTimeNanos + "ns.");
-                      }
-                    } else {
-                      datanode.metrics.addPacketAckRoundTripTimeNanos(ackTimeNanos);
-                    }
+              seqno = ack.getSeqno();
+            }
+            if (seqno != PipelineAck.UNKOWN_SEQNO
+                || type == PacketResponderType.LAST_IN_PIPELINE) {
+              pkt = waitForAckHead(seqno);
+              if (!isRunning()) {
+                break;
+              }
+              expected = pkt.seqno;
+              if (type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE
+                  && seqno != expected) {
+                throw new IOException(myString + "seqno: expected=" + expected
+                    + ", received=" + seqno);
+              }
+              if (type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE) {
+                // The total ack time includes the ack times of downstream
+                // nodes.
+                // The value is 0 if this responder doesn't have a downstream
+                // DN in the pipeline.
+                totalAckTimeNanos = ackRecvNanoTime - pkt.ackEnqueueNanoTime;
+                // Report the elapsed time from ack send to ack receive minus
+                // the downstream ack time.
+                long ackTimeNanos = totalAckTimeNanos
+                    - ack.getDownstreamAckTimeNanos();
+                if (ackTimeNanos < 0) {
+                  if (LOG.isDebugEnabled()) {
+                    LOG.debug("Calculated invalid ack time: " + ackTimeNanos
+                        + "ns.");
                   }
-                  lastPacketInBlock = pkt.lastPacketInBlock;
+                } else {
+                  datanode.metrics.addPacketAckRoundTripTimeNanos(ackTimeNanos);
                 }
               }
-            } catch (InterruptedException ine) {
+              lastPacketInBlock = pkt.lastPacketInBlock;
+            }
+          } catch (InterruptedException ine) {
+            isInterrupted = true;
+          } catch (IOException ioe) {
+            if (Thread.interrupted()) {
               isInterrupted = true;
-            } catch (IOException ioe) {
-              if (Thread.interrupted()) {
-                isInterrupted = true;
-              } else {
-                // continue to run even if can not read from mirror
-                // notify client of the error
-                // and wait for the client to shut down the pipeline
-                mirrorError = true;
-                LOG.info(myString, ioe);
-              }
+            } else {
+              // continue to run even if can not read from mirror
+              // notify client of the error
+              // and wait for the client to shut down the pipeline
+              mirrorError = true;
+              LOG.info(myString, ioe);
             }
+          }
 
-            if (Thread.interrupted() || isInterrupted) {
-              /* The receiver thread cancelled this thread. 
-               * We could also check any other status updates from the 
-               * receiver thread (e.g. if it is ok to write to replyOut). 
-               * It is prudent to not send any more status back to the client
-               * because this datanode has a problem. The upstream datanode
-               * will detect that this datanode is bad, and rightly so.
-               */
-              LOG.info(myString + ": Thread is interrupted.");
-              running = false;
-              continue;
-            }
-            
-            // If this is the last packet in block, then close block
-            // file and finalize the block before responding success
-            if (lastPacketInBlock) {
-              BlockReceiver.this.close();
-              final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
-              block.setNumBytes(replicaInfo.getNumBytes());
-              datanode.data.finalizeBlock(block);
-              datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
-              if (ClientTraceLog.isInfoEnabled() && isClient) {
-                long offset = 0;
-                DatanodeRegistration dnR = 
-                  datanode.getDNRegistrationForBP(block.getBlockPoolId());
-                ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT,
-                      inAddr, myAddr, block.getNumBytes(),
-                      "HDFS_WRITE", clientname, offset,
-                      dnR.getStorageID(), block, endTime-startTime));
-              } else {
-                LOG.info("Received " + block + " size "
-                    + block.getNumBytes() + " from " + inAddr);
-              }
-            }
+          if (Thread.interrupted() || isInterrupted) {
+            /*
+             * The receiver thread cancelled this thread. We could also check
+             * any other status updates from the receiver thread (e.g. if it is
+             * ok to write to replyOut). It is prudent to not send any more
+             * status back to the client because this datanode has a problem.
+             * The upstream datanode will detect that this datanode is bad, and
+             * rightly so.
+             */
+            LOG.info(myString + ": Thread is interrupted.");
+            running = false;
+            continue;
+          }
 
-            // construct my ack message
-            Status[] replies = null;
-            if (mirrorError) { // ack read error
-              replies = new Status[2];
-              replies[0] = Status.SUCCESS;
-              replies[1] = Status.ERROR;
-            } else {
-              short ackLen = type == PacketResponderType.LAST_IN_PIPELINE? 0
-                  : ack.getNumOfReplies();
-              replies = new Status[1+ackLen];
-              replies[0] = Status.SUCCESS;
-              for (int i=0; i<ackLen; i++) {
-                replies[i+1] = ack.getReply(i);
-              }
-            }
-            PipelineAck replyAck = new PipelineAck(expected, replies, totalAckTimeNanos);
-            
-            if (replyAck.isSuccess() && 
-                 pkt.offsetInBlock > replicaInfo.getBytesAcked())
-                replicaInfo.setBytesAcked(pkt.offsetInBlock);
-
-            // send my ack back to upstream datanode
-            replyAck.write(upstreamOut);
-            upstreamOut.flush();
-            if (LOG.isDebugEnabled()) {
-              LOG.debug(myString + ", replyAck=" + replyAck);
-            }
-            if (pkt != null) {
-              // remove the packet from the ack queue
-              removeAckHead();
-              // update bytes acked
-            }
+          if (lastPacketInBlock) {
+            // Finalize the block and close the block file
+            finalizeBlock(startTime);
+          }
+
+          sendAckUpstream(ack, expected, totalAckTimeNanos,
+              (pkt != null ? pkt.offsetInBlock : 0));
+          if (pkt != null) {
+            // remove the packet from the ack queue
+            removeAckHead();
+          }
         } catch (IOException e) {
           LOG.warn("IOException in BlockReceiver.run(): ", e);
           if (running) {
@@ -1044,6 +1007,66 @@ class BlockReceiver implements Closeable
     }
     
     /**
+     * Finalize the block and close the block file
+     * @param startTime time when BlockReceiver started receiving the block
+     */
+    private void finalizeBlock(long startTime) throws IOException {
+      BlockReceiver.this.close();
+      final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime()
+          : 0;
+      block.setNumBytes(replicaInfo.getNumBytes());
+      datanode.data.finalizeBlock(block);
+      datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
+      if (ClientTraceLog.isInfoEnabled() && isClient) {
+        long offset = 0;
+        DatanodeRegistration dnR = datanode.getDNRegistrationForBP(block
+            .getBlockPoolId());
+        ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT, inAddr,
+            myAddr, block.getNumBytes(), "HDFS_WRITE", clientname, offset,
+            dnR.getStorageID(), block, endTime - startTime));
+      } else {
+        LOG.info("Received " + block + " size " + block.getNumBytes()
+            + " from " + inAddr);
+      }
+    }
+    
+    /**
+     * @param ack Ack received from downstream
+     * @param seqno sequence number of ack to be sent upstream
+     * @param totalAckTimeNanos total ack time including all the downstream
+     *          nodes
+     * @param offsetInBlock offset in block for the data in packet
+     */
+    private void sendAckUpstream(PipelineAck ack, long seqno,
+        long totalAckTimeNanos, long offsetInBlock) throws IOException {
+      Status[] replies = null;
+      if (mirrorError) { // ack read error
+        replies = MIRROR_ERROR_STATUS;
+      } else {
+        short ackLen = type == PacketResponderType.LAST_IN_PIPELINE ? 0 : ack
+            .getNumOfReplies();
+        replies = new Status[1 + ackLen];
+        replies[0] = Status.SUCCESS;
+        for (int i = 0; i < ackLen; i++) {
+          replies[i + 1] = ack.getReply(i);
+        }
+      }
+      PipelineAck replyAck = new PipelineAck(seqno, replies,
+          totalAckTimeNanos);
+      if (replyAck.isSuccess()
+          && offsetInBlock > replicaInfo.getBytesAcked()) {
+        replicaInfo.setBytesAcked(offsetInBlock);
+      }
+
+      // send my ack back to upstream datanode
+      replyAck.write(upstreamOut);
+      upstreamOut.flush();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(myString + ", replyAck=" + replyAck);
+      }
+    }
+    
+    /**
      * Remove a packet from the head of the ack queue
      * 
      * This should be called only when the ack queue is not empty
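
The heart of the HDFS-4200 change above is shrinking each synchronized section to the bare queue operations. A minimal sketch of the pattern, with the Packet type and the surrounding fields simplified to stand-ins:

import java.util.LinkedList;

class ResponderSketch {
  private final LinkedList<Long> ackQueue = new LinkedList<Long>();
  private volatile boolean running = true;

  void enqueue(final long seqno) {
    // Allocation and logging happen outside the lock ...
    final Long p = Long.valueOf(seqno);
    synchronized (this) {          // ... so the critical section is two calls
      if (running) {               // checked under the lock, as in the patch,
        ackQueue.addLast(p);       // so nothing is queued after shutdown
        notifyAll();               // wake waitForAckHead()
      }
    }
  }

  /** Block until a packet is queued; return null once shut down. */
  synchronized Long waitForAckHead() throws InterruptedException {
    while (running && ackQueue.isEmpty()) {
      wait();
    }
    return running ? ackQueue.getFirst() : null;
  }
}

The same motivation explains the hoisted MIRROR_ERROR_STATUS array: the {SUCCESS, ERROR} reply pair is only ever read, so one shared instance avoids allocating a fresh array per ack.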

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1414456&r1=1414455&r2=1414456&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Tue Nov 27 23:02:00 2012
@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIsNotDirectoryException;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -88,7 +89,6 @@ public class FSDirectory implements Clos
   FSImage fsImage;  
   private final FSNamesystem namesystem;
   private volatile boolean ready = false;
-  private static final long UNKNOWN_DISK_SPACE = -1;
   private final int maxComponentLength;
   private final int maxDirItems;
   private final int lsLimit;  // max list limit
@@ -263,13 +263,14 @@ public class FSDirectory implements Clos
                                  permissions,replication,
                                  preferredBlockSize, modTime, clientName, 
                                  clientMachine, clientNode);
+    boolean added = false;
     writeLock();
     try {
-      newNode = addNode(path, newNode, UNKNOWN_DISK_SPACE);
+      added = addINode(path, newNode);
     } finally {
       writeUnlock();
     }
-    if (newNode == null) {
+    if (!added) {
       NameNode.stateChangeLog.info("DIR* addFile: failed to add " + path);
       return null;
     }
@@ -289,7 +290,7 @@ public class FSDirectory implements Clos
                             boolean underConstruction,
                             String clientName,
                             String clientMachine) {
-    INode newNode;
+    final INode newNode;
     assert hasWriteLock();
     if (underConstruction) {
       newNode = new INodeFileUnderConstruction(
@@ -302,16 +303,17 @@ public class FSDirectory implements Clos
     }
 
     try {
-      newNode = addNode(path, newNode, UNKNOWN_DISK_SPACE);
+      if (addINode(path, newNode)) {
+        return newNode;
+      }
     } catch (IOException e) {
       if(NameNode.stateChangeLog.isDebugEnabled()) {
         NameNode.stateChangeLog.debug(
             "DIR* FSDirectory.unprotectedAddFile: exception when add " + path
                 + " to the file system", e);
       }
-      return null;
     }
-    return newNode;
+    return null;
   }
 
   /**
@@ -559,12 +561,12 @@ public class FSDirectory implements Clos
     // Ensure dst has quota to accommodate rename
     verifyQuotaForRename(srcInodes, dstInodes);
     
-    INode dstChild = null;
+    boolean added = false;
     INode srcChild = null;
     String srcChildName = null;
     try {
       // remove src
-      srcChild = removeChild(srcInodesInPath, srcInodes.length-1);
+      srcChild = removeLastINode(srcInodesInPath);
       if (srcChild == null) {
         NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
             + "failed to rename " + src + " to " + dst
@@ -575,9 +577,8 @@ public class FSDirectory implements Clos
       srcChild.setLocalName(dstComponents[dstInodes.length-1]);
       
       // add src to the destination
-      dstChild = addChildNoQuotaCheck(dstInodesInPath, dstInodes.length-1,
-          srcChild, UNKNOWN_DISK_SPACE);
-      if (dstChild != null) {
+      added = addLastINodeNoQuotaCheck(dstInodesInPath, srcChild);
+      if (added) {
         srcChild = null;
         if (NameNode.stateChangeLog.isDebugEnabled()) {
           NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedRenameTo: " 
@@ -589,11 +590,10 @@ public class FSDirectory implements Clos
         return true;
       }
     } finally {
-      if (dstChild == null && srcChild != null) {
+      if (!added && srcChild != null) {
         // put it back
         srcChild.setLocalName(srcChildName);
-        addChildNoQuotaCheck(srcInodesInPath, srcInodes.length - 1, srcChild, 
-            UNKNOWN_DISK_SPACE);
+        addLastINodeNoQuotaCheck(srcInodesInPath, srcChild);
       }
     }
     NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
@@ -724,7 +724,7 @@ public class FSDirectory implements Clos
 
     // Ensure dst has quota to accommodate rename
     verifyQuotaForRename(srcInodes, dstInodes);
-    INode removedSrc = removeChild(srcInodesInPath, srcInodes.length - 1);
+    INode removedSrc = removeLastINode(srcInodesInPath);
     if (removedSrc == null) {
       error = "Failed to rename " + src + " to " + dst
           + " because the source can not be removed";
@@ -737,18 +737,13 @@ public class FSDirectory implements Clos
     INode removedDst = null;
     try {
       if (dstInode != null) { // dst exists remove it
-        removedDst = removeChild(dstInodesInPath, dstInodes.length - 1);
+        removedDst = removeLastINode(dstInodesInPath);
         dstChildName = removedDst.getLocalName();
       }
 
-      INode dstChild = null;
       removedSrc.setLocalName(dstComponents[dstInodes.length - 1]);
       // add src as dst to complete rename
-      dstChild = addChildNoQuotaCheck(dstInodesInPath, dstInodes.length - 1,
-          removedSrc, UNKNOWN_DISK_SPACE);
-
-      int filesDeleted = 0;
-      if (dstChild != null) {
+      if (addLastINodeNoQuotaCheck(dstInodesInPath, removedSrc)) {
         removedSrc = null;
         if (NameNode.stateChangeLog.isDebugEnabled()) {
           NameNode.stateChangeLog.debug(
@@ -759,6 +754,7 @@ public class FSDirectory implements Clos
         dstInodes[dstInodes.length - 2].setModificationTime(timestamp);
 
         // Collect the blocks and remove the lease for previous dst
+        int filesDeleted = 0;
         if (removedDst != null) {
           INode rmdst = removedDst;
           removedDst = null;
@@ -772,14 +768,12 @@ public class FSDirectory implements Clos
       if (removedSrc != null) {
         // Rename failed - restore src
         removedSrc.setLocalName(srcChildName);
-        addChildNoQuotaCheck(srcInodesInPath, srcInodes.length - 1, removedSrc, 
-            UNKNOWN_DISK_SPACE);
+        addLastINodeNoQuotaCheck(srcInodesInPath, removedSrc);
       }
       if (removedDst != null) {
         // Rename failed - restore dst
         removedDst.setLocalName(dstChildName);
-        addChildNoQuotaCheck(dstInodesInPath, dstInodes.length - 1, removedDst, 
-            UNKNOWN_DISK_SPACE);
+        addLastINodeNoQuotaCheck(dstInodesInPath, removedDst);
       }
     }
     NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
@@ -1129,14 +1123,13 @@ public class FSDirectory implements Clos
 
     final INode[] inodes = inodesInPath.getINodes();
     INode targetNode = inodes[inodes.length-1];
-    int pos = inodes.length - 1;
     // Remove the node from the namespace
-    targetNode = removeChild(inodesInPath, pos);
+    targetNode = removeLastINode(inodesInPath);
     if (targetNode == null) {
       return 0;
     }
     // set the parent's modification time
-    inodes[pos-1].setModificationTime(mtime);
+    inodes[inodes.length - 2].setModificationTime(mtime);
     int filesRemoved = targetNode.collectSubtreeBlocksAndClear(collectedBlocks);
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedDelete: "
@@ -1183,7 +1176,7 @@ public class FSDirectory implements Clos
     } 
     
     //add the new node
-    rootDir.addNode(path, newnode); 
+    rootDir.addINode(path, newnode); 
   }
 
   /**
@@ -1332,22 +1325,6 @@ public class FSDirectory implements Clos
     }
   }
 
-  /**
-   * Get the parent node of path.
-   * 
-   * @param path the path to explore
-   * @return its parent node
-   */
-  INodeDirectory getParent(byte[][] path) 
-    throws FileNotFoundException, UnresolvedLinkException {
-    readLock();
-    try {
-      return rootDir.getParent(path);
-    } finally {
-      readUnlock();
-    }
-  }
-  
   /** 
    * Check whether the filepath could be created
    * @throws SnapshotAccessControlException if path is in RO snapshot
@@ -1405,20 +1382,17 @@ public class FSDirectory implements Clos
    * @param nsDelta the delta change of namespace
    * @param dsDelta the delta change of diskspace
    * @throws QuotaExceededException if the new count violates any quota limit
-   * @throws FileNotFound if path does not exist.
+   * @throws FileNotFoundException if path does not exist.
    */
   void updateSpaceConsumed(String path, long nsDelta, long dsDelta)
-                                         throws QuotaExceededException,
-                                                FileNotFoundException,
-                                                UnresolvedLinkException {
+      throws QuotaExceededException, FileNotFoundException, UnresolvedLinkException {
     writeLock();
     try {
       final INodesInPath inodesInPath = rootDir.getExistingPathINodes(path, false);
       final INode[] inodes = inodesInPath.getINodes();
       int len = inodes.length;
       if (inodes[len - 1] == null) {
-        throw new FileNotFoundException(path + 
-                                        " does not exist under rootDir.");
+        throw new FileNotFoundException("Path not found: " + path);
       }
       updateCount(inodesInPath, len-1, nsDelta, dsDelta, true);
     } finally {
@@ -1647,15 +1621,17 @@ public class FSDirectory implements Clos
       long timestamp) throws QuotaExceededException {
     assert hasWriteLock();
     final INodeDirectory dir = new INodeDirectory(name, permission, timestamp);
-    final INode inode = addChild(inodesInPath, pos, dir, -1, true);
-    inodesInPath.setINode(pos, inode);
+    if (addChild(inodesInPath, pos, dir, true)) {
+      inodesInPath.setINode(pos, dir);
+    }
   }
   
-  /** Add a node child to the namespace. The full path name of the node is src.
-   * childDiskspace should be -1, if unknown. 
+  /**
+   * Add the given child to the namespace.
+   * @param src The full path name of the child node.
    * @throw QuotaExceededException is thrown if it violates quota limit
    */
-  private <T extends INode> T addNode(String src, T child, long childDiskspace
+  private boolean addINode(String src, INode child
       ) throws QuotaExceededException, UnresolvedLinkException {
     byte[][] components = INode.getPathComponents(src);
     byte[] path = components[components.length-1];
@@ -1665,8 +1641,7 @@ public class FSDirectory implements Clos
     try {
       INodesInPath inodesInPath = rootDir.getExistingPathINodes(components,
           components.length, false);
-      return addChild(inodesInPath, inodesInPath.getINodes().length-1, child,
-          childDiskspace, true);
+      return addLastINode(inodesInPath, child, true);
     } finally {
       writeUnlock();
     }
@@ -1790,14 +1765,24 @@ public class FSDirectory implements Clos
     }
   }
   
+  /**
+   * The same as {@link #addChild(INodesInPath, int, INode, boolean)}
+   * with pos = length - 1.
+   */
+  private boolean addLastINode(INodesInPath inodesInPath,
+      INode inode, boolean checkQuota) throws QuotaExceededException {
+    final int pos = inodesInPath.getINodes().length - 1;
+    return addChild(inodesInPath, pos, inode, checkQuota);
+  }
+
   /** Add a node child to the inodes at index pos. 
    * Its ancestors are stored at [0, pos-1].
-   * @return the added node. 
+   * @return false if the child with this name already exists; 
+   *         otherwise return true;
    * @throw QuotaExceededException is thrown if it violates quota limit
    */
-  private <T extends INode> T addChild(INodesInPath inodesInPath, int pos,
-      T child, long childDiskspace,
-      boolean checkQuota) throws QuotaExceededException {
+  private boolean addChild(INodesInPath inodesInPath, int pos,
+      INode child, boolean checkQuota) throws QuotaExceededException {
     final INode[] inodes = inodesInPath.getINodes();
     // The filesystem limits are not really quotas, so this check may appear
     // odd. It's because a rename operation deletes the src, tries to add
@@ -1811,38 +1796,34 @@ public class FSDirectory implements Clos
     
     INode.DirCounts counts = new INode.DirCounts();
     child.spaceConsumedInTree(counts);
-    if (childDiskspace < 0) {
-      childDiskspace = counts.getDsCount();
-    }
-    updateCount(inodesInPath, pos, counts.getNsCount(), childDiskspace, checkQuota);
+    updateCount(inodesInPath, pos, counts.getNsCount(), counts.getDsCount(), checkQuota);
     if (inodes[pos-1] == null) {
       throw new NullPointerException("Panic: parent does not exist");
     }
-    final T addedNode = ((INodeDirectory)inodes[pos-1]).addChild(child, true);
-    if (addedNode == null) {
-      updateCount(inodesInPath, pos, -counts.getNsCount(), -childDiskspace, true);
+    final boolean added = ((INodeDirectory)inodes[pos-1]).addChild(child, true);
+    if (!added) {
+      updateCount(inodesInPath, pos, -counts.getNsCount(), -counts.getDsCount(), true);
     }
-    return addedNode;
+    return added;
   }
   
-  private <T extends INode> T addChildNoQuotaCheck(INodesInPath inodesInPath,
-      int pos, T child, long childDiskspace) {
-    T inode = null;
+  private boolean addLastINodeNoQuotaCheck(INodesInPath inodesInPath, INode i) {
     try {
-      inode = addChild(inodesInPath, pos, child, childDiskspace, false);
+      return addLastINode(inodesInPath, i, false);
     } catch (QuotaExceededException e) {
       NameNode.LOG.warn("FSDirectory.addChildNoQuotaCheck - unexpected", e); 
     }
-    return inode;
+    return false;
   }
   
-  /** Remove an inode at index pos from the namespace.
-   * Its ancestors are stored at [0, pos-1].
+  /**
+   * Remove the last inode in the path from the namespace.
    * Count of each ancestor with quota is also updated.
-   * Return the removed node; null if the removal fails.
+   * @return the removed node; null if the removal fails.
    */
-  private INode removeChild(final INodesInPath inodesInPath, int pos) {
+  private INode removeLastINode(final INodesInPath inodesInPath) {
     final INode[] inodes = inodesInPath.getINodes();
+    final int pos = inodes.length - 1;
     INode removedNode = ((INodeDirectory)inodes[pos-1]).removeChild(inodes[pos]);
     if (removedNode != null) {
       INode.DirCounts counts = new INode.DirCounts();
@@ -1961,15 +1942,17 @@ public class FSDirectory implements Clos
    * See {@link ClientProtocol#setQuota(String, long, long)} for the contract.
    * Sets quota for for a directory.
    * @returns INodeDirectory if any of the quotas have changed. null other wise.
-   * @throws FileNotFoundException if the path does not exist or is a file
+   * @throws FileNotFoundException if the path does not exist.
+   * @throws PathIsNotDirectoryException if the path is not a directory.
    * @throws QuotaExceededException if the directory tree size is 
    *                                greater than the given quota
    * @throws UnresolvedLinkException if a symlink is encountered in src.
    * @throws SnapshotAccessControlException if path is in RO snapshot
    */
   INodeDirectory unprotectedSetQuota(String src, long nsQuota, long dsQuota)
-      throws FileNotFoundException, QuotaExceededException,
-      UnresolvedLinkException, SnapshotAccessControlException {
+      throws FileNotFoundException, PathIsNotDirectoryException,
+      QuotaExceededException, UnresolvedLinkException,
+      SnapshotAccessControlException {
     assert hasWriteLock();
     // sanity check
     if ((nsQuota < 0 && nsQuota != HdfsConstants.QUOTA_DONT_SET && 
@@ -1984,15 +1967,10 @@ public class FSDirectory implements Clos
     String srcs = normalizePath(src);
     final INode[] inodes = rootDir.getMutableINodesInPath(srcs, true)
         .getINodes();
-    INode targetNode = inodes[inodes.length-1];
-    if (targetNode == null) {
-      throw new FileNotFoundException("Directory does not exist: " + srcs);
-    } else if (!targetNode.isDirectory()) {
-      throw new FileNotFoundException("Cannot set quota on a file: " + srcs);  
-    } else if (targetNode.isRoot() && nsQuota == HdfsConstants.QUOTA_RESET) {
+    INodeDirectory dirNode = INodeDirectory.valueOf(inodes[inodes.length-1], srcs);
+    if (dirNode.isRoot() && nsQuota == HdfsConstants.QUOTA_RESET) {
       throw new IllegalArgumentException("Cannot clear namespace quota on root.");
     } else { // a directory inode
-      INodeDirectory dirNode = (INodeDirectory)targetNode;
       long oldNsQuota = dirNode.getNsQuota();
       long oldDsQuota = dirNode.getDsQuota();
       if (nsQuota == HdfsConstants.QUOTA_DONT_SET) {
@@ -2026,14 +2004,14 @@ public class FSDirectory implements Clos
   }
   
   /**
-   * See {@link ClientProtocol#setQuota(String, long, long)} for the 
-   * contract.
+   * See {@link ClientProtocol#setQuota(String, long, long)} for the contract.
    * @throws SnapshotAccessControlException if path is in RO snapshot
    * @see #unprotectedSetQuota(String, long, long)
    */
-  void setQuota(String src, long nsQuota, long dsQuota)
-      throws FileNotFoundException, QuotaExceededException,
-      UnresolvedLinkException, SnapshotAccessControlException {
+  void setQuota(String src, long nsQuota, long dsQuota) 
+      throws FileNotFoundException, PathIsNotDirectoryException,
+      QuotaExceededException, UnresolvedLinkException,
+      SnapshotAccessControlException {
     writeLock();
     try {
       INodeDirectory dir = unprotectedSetQuota(src, nsQuota, dsQuota);
@@ -2242,7 +2220,7 @@ public class FSDirectory implements Clos
       throws UnresolvedLinkException, QuotaExceededException {
     assert hasWriteLock();
     final INodeSymlink symlink = new INodeSymlink(target, mtime, atime, perm);
-    return addNode(path, symlink, UNKNOWN_DISK_SPACE);
+    return addINode(path, symlink)? symlink: null;
   }
   
   /**
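
In sum, the FSDirectory refactoring replaces the nullable-generic addNode/addChild contract with booleans, charging quota counts up front and rolling them back on a duplicate name. A simplified stand-in for the pattern (the real code works on INodesInPath and INode.DirCounts, omitted here):

import java.util.TreeMap;

class QuotaExceededException extends Exception {}

class DirectorySketch {
  private final TreeMap<String, Object> children = new TreeMap<String, Object>();
  private long nsCount = 0;   // namespace usage charged against the quota
  private long nsQuota = 100;

  /** @return false if a child with this name already exists. */
  boolean addChild(String name, Object node, boolean checkQuota)
      throws QuotaExceededException {
    if (checkQuota && nsCount + 1 > nsQuota) {
      throw new QuotaExceededException();
    }
    nsCount++;                       // charge first, like updateCount(..)
    if (children.containsKey(name)) {
      nsCount--;                     // duplicate name: undo the charge
      return false;
    }
    children.put(name, node);
    return true;
  }

  /** No-quota-check variant, used e.g. when undoing a failed rename. */
  boolean addChildNoQuotaCheck(String name, Object node) {
    try {
      return addChild(name, node, false);
    } catch (QuotaExceededException e) {
      return false;                  // cannot happen with checkQuota == false
    }
  }
}

Callers such as unprotectedRenameTo then test the boolean ("if (added)") instead of comparing a returned node against null, and because addChild now always derives the diskspace delta from the child's own counts, the UNKNOWN_DISK_SPACE sentinel goes away.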

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java?rev=1414456&r1=1414455&r2=1414456&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java Tue Nov 27 23:02:00 2012
@@ -288,7 +288,7 @@ class FSImageFormat {
       }
       // check if the new inode belongs to the same parent
       if(!isParent(pathComponents, parentPath)) {
-        parentINode = fsDir.getParent(pathComponents);
+        parentINode = fsDir.rootDir.getParent(pathComponents);
         parentPath = getParent(pathComponents);
       }
 
@@ -305,7 +305,7 @@ class FSImageFormat {
    */
   void addToParent(INodeDirectory parent, INode child) {
     // NOTE: This does not update space counts for parents
-    if (parent.addChild(child, false) == null) {
+    if (!parent.addChild(child, false)) {
       return;
     }
     namesystem.dir.cacheName(child);

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java?rev=1414456&r1=1414455&r2=1414456&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java Tue Nov 27 23:02:00 2012
@@ -18,13 +18,13 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.FileNotFoundException;
-import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.hadoop.fs.PathIsNotDirectoryException;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.DFSUtil;
@@ -43,13 +43,14 @@ import com.google.common.annotations.Vis
  */
 public class INodeDirectory extends INode {
   /** Cast INode to INodeDirectory. */
-  public static INodeDirectory valueOf(INode inode, String path
-      ) throws IOException {
+  public static INodeDirectory valueOf(INode inode, Object path
+      ) throws FileNotFoundException, PathIsNotDirectoryException {
     if (inode == null) {
-      throw new IOException("Directory does not exist: " + path);
+      throw new FileNotFoundException("Directory does not exist: "
+          + DFSUtil.path2String(path));
     }
     if (!inode.isDirectory()) {
-      throw new IOException("Path is not a directory: " + path);
+      throw new PathIsNotDirectoryException(DFSUtil.path2String(path));
     }
     return (INodeDirectory)inode; 
   }
@@ -357,16 +358,17 @@ public class INodeDirectory extends INod
    * @param setModTime set modification time for the parent node
    *                   not needed when replaying the addition and 
    *                   the parent already has the proper mod time
-   * @return  null if the child with this name already exists; 
-   *          node, otherwise
+   * @return false if the child with this name already exists; 
+   *         otherwise, return true;
    */
-  public <T extends INode> T addChild(final T node, boolean setModTime) {
+  public boolean addChild(final INode node, final boolean setModTime) {
     if (children == null) {
       children = new ArrayList<INode>(DEFAULT_FILES_PER_DIRECTORY);
     }
     final int low = searchChildren(node);
-    if(low >= 0)
-      return null;
+    if (low >= 0) {
+      return false;
+    }
     node.parent = this;
     children.add(-low - 1, node);
     // update modification time of the parent directory
@@ -375,7 +377,7 @@ public class INodeDirectory extends INod
     if (node.getGroupName() == null) {
       node.setGroup(getGroupName());
     }
-    return node;
+    return true;
   }
 
   /**
@@ -384,53 +386,32 @@ public class INodeDirectory extends INod
    * 
    * @param path file path
    * @param newNode INode to be added
-   * @return null if the node already exists; inserted INode, otherwise
+   * @return false if the node already exists; otherwise, return true;
    * @throws FileNotFoundException if parent does not exist or 
    * @throws UnresolvedLinkException if any path component is a symbolic link
    * is not a directory.
    */
-  <T extends INode> T addNode(String path, T newNode
-      ) throws FileNotFoundException, UnresolvedLinkException  {
+  boolean addINode(String path, INode newNode
+      ) throws FileNotFoundException, PathIsNotDirectoryException,
+      UnresolvedLinkException {
     byte[][] pathComponents = getPathComponents(path);        
-    return addToParent(pathComponents, newNode, true) == null? null: newNode;
+    if (pathComponents.length < 2) { // add root
+      return false;
+    }
+    newNode.setLocalName(pathComponents[pathComponents.length - 1]);
+    // insert into the parent children list
+    INodeDirectory parent = getParent(pathComponents);
+    return parent.addChild(newNode, true);
   }
 
   INodeDirectory getParent(byte[][] pathComponents
-      ) throws FileNotFoundException, UnresolvedLinkException {
+      ) throws FileNotFoundException, PathIsNotDirectoryException,
+      UnresolvedLinkException {
     if (pathComponents.length < 2)  // add root
       return null;
     // Gets the parent INode
     INodesInPath inodes =  getExistingPathINodes(pathComponents, 2, false);
-    INode inode = inodes.inodes[0];
-    if (inode == null) {
-      throw new FileNotFoundException("Parent path does not exist: "+
-          DFSUtil.byteArray2String(pathComponents));
-    }
-    if (!inode.isDirectory()) {
-      throw new FileNotFoundException("Parent path is not a directory: "+
-          DFSUtil.byteArray2String(pathComponents));
-    }
-    return (INodeDirectory)inode;
-  }
-  
-  /**
-   * Add new inode 
-   * Optimized version of addNode()
-   * 
-   * @return  parent INode if new inode is inserted
-   *          or null if it already exists.
-   * @throws  FileNotFoundException if parent does not exist or 
-   *          is not a directory.
-   */
-  INodeDirectory addToParent(byte[][] pathComponents, INode newNode,
-      boolean propagateModTime) throws FileNotFoundException, UnresolvedLinkException {
-    if (pathComponents.length < 2) { // add root
-      return null;
-    }
-    newNode.setLocalName(pathComponents[pathComponents.length - 1]);
-    // insert into the parent children list
-    INodeDirectory parent = getParent(pathComponents);
-    return parent.addChild(newNode, propagateModTime) == null? null: parent;
+    return INodeDirectory.valueOf(inodes.inodes[0], pathComponents);
   }
 
   @Override
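
The narrowed valueOf makes the failure mode explicit to callers. A simplified stand-in for the pattern (NotADirectoryException plays the role of org.apache.hadoop.fs.PathIsNotDirectoryException, and String.valueOf the role of DFSUtil.path2String):

import java.io.FileNotFoundException;

abstract class Node {
  abstract boolean isDirectory();
}

class DirNode extends Node {
  boolean isDirectory() { return true; }
}

class NotADirectoryException extends Exception {
  NotADirectoryException(String path) { super(path + ": Is not a directory"); }
}

class ValueOfSketch {
  /** Cast a node to a directory node, or throw a precise exception. */
  static DirNode valueOf(Node inode, Object path)
      throws FileNotFoundException, NotADirectoryException {
    final String p = String.valueOf(path);   // path2String stand-in
    if (inode == null) {
      throw new FileNotFoundException("Directory does not exist: " + p);
    }
    if (!inode.isDirectory()) {
      throw new NotADirectoryException(p);
    }
    return (DirNode) inode;
  }
}

Because the exception type now encodes the condition, TestINodeFile below can assert on the type alone, and the shell's "Is not a directory" wording shows up in the testHDFSConf.xml expectation at the end of this commit.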

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java?rev=1414456&r1=1414455&r2=1414456&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java Tue Nov 27 23:02:00 2012
@@ -33,7 +33,8 @@ import org.apache.hadoop.hdfs.server.blo
 @InterfaceAudience.Private
 public class INodeFile extends INode implements BlockCollection {
   /** Cast INode to INodeFile. */
-  public static INodeFile valueOf(INode inode, String path) throws IOException {
+  public static INodeFile valueOf(INode inode, String path
+      ) throws FileNotFoundException {
     if (inode == null) {
       throw new FileNotFoundException("File does not exist: " + path);
     }

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java?rev=1414456&r1=1414455&r2=1414456&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java Tue Nov 27 23:02:00 2012
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Arrays;
 
@@ -36,10 +37,10 @@ import org.apache.hadoop.hdfs.server.com
 public class INodeFileUnderConstruction extends INodeFile implements MutableBlockCollection {
   /** Cast INode to INodeFileUnderConstruction. */
   public static INodeFileUnderConstruction valueOf(INode inode, String path
-      ) throws IOException {
+      ) throws FileNotFoundException {
     final INodeFile file = INodeFile.valueOf(inode, path);
     if (!file.isUnderConstruction()) {
-      throw new IOException("File is not under construction: " + path);
+      throw new FileNotFoundException("File is not under construction: " + path);
     }
     return (INodeFileUnderConstruction)file;
   }
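
Narrowing a declared checked exception from IOException to FileNotFoundException, as the two valueOf methods above do, is source-compatible for existing callers, since broader catch clauses still match. A minimal illustration (not from the patch):

import java.io.FileNotFoundException;
import java.io.IOException;

class NarrowingSketch {
  // before: declared "throws IOException"
  static Object valueOf(Object inode) throws FileNotFoundException {
    if (inode == null) {
      throw new FileNotFoundException("File is not under construction: null");
    }
    return inode;
  }

  public static void main(String[] args) {
    try {
      valueOf(new Object());
    } catch (IOException e) {   // pre-existing broad catch still compiles
      System.err.println(e);
    }
  }
}
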

Propchange: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/native/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native:r1412283-1414454

Propchange: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode:r1412283-1414454

Propchange: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs:r1412283-1414454

Propchange: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/secondary/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/secondary:r1412283-1414454

Propchange: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/hdfs/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/hdfs:r1412283-1414454

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java?rev=1414456&r1=1414455&r2=1414456&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java Tue Nov 27 23:02:00 2012
@@ -26,6 +26,7 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIsNotDirectoryException;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
@@ -239,8 +240,8 @@ public class TestINodeFile {
       try {
         INodeDirectory.valueOf(from, path);
         fail();
-      } catch(IOException ioe) {
-        assertTrue(ioe.getMessage().contains("Directory does not exist"));
+      } catch(FileNotFoundException e) {
+        assertTrue(e.getMessage().contains("Directory does not exist"));
       }
     }
 
@@ -264,8 +265,7 @@ public class TestINodeFile {
       try {
         INodeDirectory.valueOf(from, path);
         fail();
-      } catch(IOException ioe) {
-        assertTrue(ioe.getMessage().contains("Path is not a directory"));
+      } catch(PathIsNotDirectoryException e) {
       }
     }
 
@@ -286,8 +286,7 @@ public class TestINodeFile {
       try {
         INodeDirectory.valueOf(from, path);
         fail();
-      } catch(IOException ioe) {
-        assertTrue(ioe.getMessage().contains("Path is not a directory"));
+      } catch(PathIsNotDirectoryException e) {
       }
     }
 

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/testHDFSConf.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/testHDFSConf.xml?rev=1414456&r1=1414455&r2=1414456&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/testHDFSConf.xml (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/testHDFSConf.xml Tue Nov 27 23:02:00 2012
@@ -15488,7 +15488,7 @@
       <comparators>
         <comparator>
           <type>SubstringComparator</type>
-          <expected-output>Cannot set quota on a file: /test/file1</expected-output>
+          <expected-output>setQuota: `/test/file1': Is not a directory</expected-output>
         </comparator>
       </comparators>
     </test>


