hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sur...@apache.org
Subject svn commit: r1164685 - in /hadoop/common/branches/branch-0.20-security: CHANGES.txt src/hdfs/org/apache/hadoop/hdfs/DFSClient.java src/test/org/apache/hadoop/hdfs/TestFileCreation.java
Date Fri, 02 Sep 2011 20:10:44 GMT
Author: suresh
Date: Fri Sep  2 20:10:44 2011
New Revision: 1164685

URL: http://svn.apache.org/viewvc?rev=1164685&view=rev
Log:
Porting change from 0.20-append - HDFS-826. Allow a mechanism for an application to detect
that datanode(s) have died in the write pipeline. Contributed by Dhruba.


Modified:
    hadoop/common/branches/branch-0.20-security/CHANGES.txt
    hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestFileCreation.java

Modified: hadoop/common/branches/branch-0.20-security/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/CHANGES.txt?rev=1164685&r1=1164684&r2=1164685&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security/CHANGES.txt Fri Sep  2 20:10:44 2011
@@ -9,6 +9,9 @@ Release 0.20.205.0 - unreleased
 
     HDFS-200. Support append and sync for hadoop 0.20 branch. (dhruba)
 
+    HDFS-826. Allow a mechanism for an application to detect that 
+    datanode(s) have died in the write pipeline. (dhruba)
+
   BUG FIXES
 
     MAPREDUCE-2324. Removed usage of broken

Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=1164685&r1=1164684&r2=1164685&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
(original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
Fri Sep  2 20:10:44 2011
@@ -2340,6 +2340,8 @@ public class DFSClient implements FSCons
     private int maxRecoveryErrorCount = 5; // try block recovery 5 times
     private volatile boolean appendChunk = false;   // appending to existing partial block
     private long initialFileSize = 0; // at time of file open
+    private Progressable progress;
+    private short blockReplication; // replication factor of file
 
     Token<BlockTokenIdentifier> getAccessToken() {
       return accessToken;
@@ -2873,13 +2875,12 @@ public class DFSClient implements FSCons
       }
     }
 
-    private Progressable progress;
-
     private DFSOutputStream(String src, long blockSize, Progressable progress,
-        int bytesPerChecksum) throws IOException {
+        int bytesPerChecksum, short replication) throws IOException {
       super(new CRC32(), bytesPerChecksum, 4);
       this.src = src;
       this.blockSize = blockSize;
+      this.blockReplication = replication;
       this.progress = progress;
       if (progress != null) {
         LOG.debug("Set non-null progress callback on DFSOutputStream "+src);
@@ -2903,7 +2904,7 @@ public class DFSClient implements FSCons
     DFSOutputStream(String src, FsPermission masked, boolean overwrite,
         short replication, long blockSize, Progressable progress,
         int buffersize, int bytesPerChecksum) throws IOException {
-      this(src, blockSize, progress, bytesPerChecksum);
+      this(src, blockSize, progress, bytesPerChecksum, replication);
 
       computePacketChunkSize(writePacketSize, bytesPerChecksum);
 
@@ -2925,7 +2926,7 @@ public class DFSClient implements FSCons
     DFSOutputStream(String src, int buffersize, Progressable progress,
         LocatedBlock lastBlock, HdfsFileStatus stat,
         int bytesPerChecksum) throws IOException {
-      this(src, stat.getBlockSize(), progress, bytesPerChecksum);
+      this(src, stat.getBlockSize(), progress, bytesPerChecksum, stat.getReplication());
       initialFileSize = stat.getLen(); // length of file when opened
 
       //
@@ -3341,6 +3342,24 @@ public class DFSClient implements FSCons
     }
 
     /**
+     * Returns the number of replicas of current block. This can be different
+     * from the designated replication factor of the file because the NameNode
+     * does not replicate the block to which a client is currently writing to.
+     * The client continues to write to a block even if a few datanodes in the
+     * write pipeline have failed. If the current block is full and the next
+     * block is not yet allocated, then this API will return 0 because there are
+     * no replicas in the pipeline.
+     */
+    public int getNumCurrentReplicas() throws IOException {
+      synchronized(dataQueue) {
+        if (nodes == null) {
+          return blockReplication;
+        }
+        return nodes.length;
+      }
+    }
+
+    /**
      * Waits till all existing data is flushed and confirmations 
      * received from datanodes. 
      */

Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestFileCreation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestFileCreation.java?rev=1164685&r1=1164684&r2=1164685&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestFileCreation.java
(original)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestFileCreation.java
Fri Sep  2 20:10:44 2011
@@ -93,7 +93,7 @@ public class TestFileCreation extends ju
     byte[] buffer = AppendTestUtil.randomBytes(seed, size);
     stm.write(buffer, 0, size);
   }
-
+  
   //
   // verify that the data written to the full blocks are sane
   // 
@@ -478,7 +478,16 @@ public class TestFileCreation extends ju
                          + "Created file " + file1);
 
       // write two full blocks.
-      writeFile(stm, numBlocks * blockSize);
+      int remainingPiece = blockSize/2;
+      int blocksMinusPiece = numBlocks * blockSize - remainingPiece;
+      writeFile(stm, blocksMinusPiece);
+      stm.sync();
+      int actualRepl = ((DFSClient.DFSOutputStream)(stm.getWrappedStream())).
+                        getNumCurrentReplicas();
+      // if we sync on a block boundary, actualRepl will be 0
+      assertTrue(file1 + " should be replicated to 1 datanodes, not " + actualRepl,
+                 actualRepl == 1);
+      writeFile(stm, remainingPiece);
       stm.sync();
 
       // rename file wile keeping it open.
@@ -686,6 +695,10 @@ public class TestFileCreation extends ju
       FSDataOutputStream out = TestFileCreation.createFile(dfs, fpath, DATANODE_NUM);
       out.write("something".getBytes());
       out.sync();
+      int actualRepl = ((DFSClient.DFSOutputStream)(out.getWrappedStream())).
+                        getNumCurrentReplicas();
+      assertTrue(f + " should be replicated to " + DATANODE_NUM + " datanodes.",
+                 actualRepl == DATANODE_NUM);
 
       // set the soft and hard limit to be 1 second so that the
       // namenode triggers lease recovery



Mime
View raw message