hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhr...@apache.org
Subject svn commit: r923098 - in /hadoop/hdfs/trunk: CHANGES.txt src/java/org/apache/hadoop/hdfs/DFSOutputStream.java src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java
Date Mon, 15 Mar 2010 07:24:19 GMT
Author: dhruba
Date: Mon Mar 15 07:24:19 2010
New Revision: 923098

URL: http://svn.apache.org/viewvc?rev=923098&view=rev
Log:
HDFS-826. The DFSOutputStream has an API that returns the number of
active datanode(s) in the current pipeline. (dhruba)


Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=923098&r1=923097&r2=923098&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Mon Mar 15 07:24:19 2010
@@ -95,6 +95,9 @@ Trunk (unreleased changes)
     HDFS-850. The WebUI display more details about namenode memory usage.
     (Dmytro Molkov via dhruba)
 
+    HDFS-826. The DFSOutputStream has an API that returns the number of
+    active datanode(s) in the current pipeline. (dhruba)
+
   OPTIMIZATIONS
 
     HDFS-946. NameNode should not return full path name when lisitng a

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=923098&r1=923097&r2=923098&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java Mon Mar 15 07:24:19
2010
@@ -122,6 +122,7 @@ class DFSOutputStream extends FSOutputSu
   private volatile boolean appendChunk = false;   // appending to existing partial block
   private long initialFileSize = 0; // at time of file open
   private Progressable progress;
+  private short blockReplication; // replication factor of file
   
   private class Packet {
     ByteBuffer buffer;           // only one of buf and buffer is non-null
@@ -1025,12 +1026,13 @@ class DFSOutputStream extends FSOutputSu
   }
 
   private DFSOutputStream(DFSClient dfsClient, String src, long blockSize, Progressable progress,
-      int bytesPerChecksum) throws IOException {
+      int bytesPerChecksum, short replication) throws IOException {
     super(new PureJavaCrc32(), bytesPerChecksum, 4);
     this.dfsClient = dfsClient;
     this.conf = dfsClient.conf;
     this.src = src;
     this.blockSize = blockSize;
+    this.blockReplication = replication;
     this.progress = progress;
     if (progress != null) {
       DFSClient.LOG.debug("Set non-null progress callback on DFSOutputStream "+src);
@@ -1055,7 +1057,7 @@ class DFSOutputStream extends FSOutputSu
       boolean createParent, short replication, long blockSize, Progressable progress,
       int buffersize, int bytesPerChecksum) 
       throws IOException, UnresolvedLinkException {
-    this(dfsClient, src, blockSize, progress, bytesPerChecksum);
+    this(dfsClient, src, blockSize, progress, bytesPerChecksum, replication);
 
     computePacketChunkSize(dfsClient.writePacketSize, bytesPerChecksum);
 
@@ -1081,7 +1083,7 @@ class DFSOutputStream extends FSOutputSu
   DFSOutputStream(DFSClient dfsClient, String src, int buffersize, Progressable progress,
       LocatedBlock lastBlock, HdfsFileStatus stat,
       int bytesPerChecksum) throws IOException {
-    this(dfsClient, src, stat.getBlockSize(), progress, bytesPerChecksum);
+    this(dfsClient, src, stat.getBlockSize(), progress, bytesPerChecksum, stat.getReplication());
     initialFileSize = stat.getLen(); // length of file when opened
 
     //
@@ -1291,6 +1293,27 @@ class DFSOutputStream extends FSOutputSu
   public synchronized void hsync() throws IOException {
     hflush();
   }
+
+  /**
+   * Returns the number of replicas of current block. This can be different
+   * from the designated replication factor of the file because the NameNode
+   * does not replicate the block to which a client is currently writing to.
+   * The client continues to write to a block even if a few datanodes in the
+   * write pipeline have failed. 
+   * @return the number of valid replicas of the current block
+   */
+  public synchronized int getNumCurrentReplicas() throws IOException {
+    dfsClient.checkOpen();
+    isClosed();
+    if (streamer == null) {
+      return blockReplication; // no pipeline, return repl factor of file
+    }
+    DatanodeInfo[] currentNodes = streamer.getNodes();
+    if (currentNodes == null) {
+      return blockReplication; // no pipeline, return repl factor of file
+    }
+    return currentNodes.length;
+  }
   
   /**
    * Waits till all existing data is flushed and confirmations 
@@ -1446,4 +1469,4 @@ class DFSOutputStream extends FSOutputSu
     return streamer.getAccessToken();
   }
 
-}
\ No newline at end of file
+}

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java?rev=923098&r1=923097&r2=923098&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java Mon Mar 15
07:24:19 2010
@@ -460,10 +460,18 @@ public class TestFileCreation extends ju
       FSDataOutputStream stm = createFile(fs, file1, 1);
       System.out.println("testFileCreationNamenodeRestart: "
                          + "Created file " + file1);
+      int actualRepl = ((DFSOutputStream)(stm.getWrappedStream())).
+                        getNumCurrentReplicas();
+      assertTrue(file1 + " should be replicated to 1 datanodes.",
+                 actualRepl == 1);
 
       // write two full blocks.
       writeFile(stm, numBlocks * blockSize);
       stm.hflush();
+      actualRepl = ((DFSOutputStream)(stm.getWrappedStream())).
+                        getNumCurrentReplicas();
+      assertTrue(file1 + " should still be replicated to 1 datanodes.",
+                 actualRepl == 1);
 
       // rename file wile keeping it open.
       Path fileRenamed = new Path("/filestatusRenamed.dat");
@@ -857,6 +865,10 @@ public class TestFileCreation extends ju
       FSDataOutputStream out = TestFileCreation.createFile(dfs, fpath, DATANODE_NUM);
       out.write("something".getBytes());
       out.hflush();
+      int actualRepl = ((DFSOutputStream)(out.getWrappedStream())).
+                        getNumCurrentReplicas();
+      assertTrue(f + " should be replicated to " + DATANODE_NUM + " datanodes.",
+                 actualRepl == DATANODE_NUM);
 
       // set the soft and hard limit to be 1 second so that the
       // namenode triggers lease recovery



Mime
View raw message