hadoop-hdfs-commits mailing list archives

From: szets...@apache.org
Subject: svn commit: r1081580 - in /hadoop/hdfs/trunk: ./ src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/apache/hadoop/hdfs/server/datanode/ src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/
Date: Mon, 14 Mar 2011 22:04:11 GMT
Author: szetszwo
Date: Mon Mar 14 22:04:10 2011
New Revision: 1081580

URL: http://svn.apache.org/viewvc?rev=1081580&view=rev
Log:
HDFS-1675. Support transferring RBW between datanodes.
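
In outline, this lets pipeline recovery copy a replica-being-written (RBW) to a
new datanode: the source datanode sends the RBW using the new TRANSFER_RBW
stage, the target stores the copy as a TEMPORARY replica without finalizing it,
and convertTemporaryToRbw then promotes it to an RBW. A minimal end-to-end
sketch using the two DataNode hooks added below (srcDatanode, targetDatanode,
block and target are assumed to be set up by the caller; see TestTransferRbw
at the end of this commit for a working version):

    // Sketch: copy an RBW from srcDatanode to a new target datanode.
    DatanodeInfo[] targets = { target };
    // 1. Send the RBW via the new TRANSFER_RBW stage; returns the replica's
    //    visible length at the time of the transfer.
    long visible = srcDatanode.transferBlockForPipelineRecovery(block, targets);
    // 2. The copy landed as a TEMPORARY replica on the target; promote it,
    //    passing the id, visible length and generation stamp to be re-checked.
    Block b = new Block(block.getBlockId(), visible, block.getGenerationStamp());
    targetDatanode.convertTemporaryToRbw(b);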

Added:
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestTransferRbw.java
Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=1081580&r1=1081579&r2=1081580&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Mon Mar 14 22:04:10 2011
@@ -19,6 +19,8 @@ Trunk (unreleased changes)
 
     HDFS-1626. Make BLOCK_INVALIDATE_LIMIT configurable. (szetszwo)
 
+    HDFS-1675. Support transferring RBW between datanodes. (szetszwo)
+
   IMPROVEMENTS
 
     HDFS-1510. Added test-patch.properties required by test-patch.sh (nigel)

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java?rev=1081580&r1=1081579&r2=1081580&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java Mon Mar 14 22:04:10 2011
@@ -46,11 +46,10 @@ public interface DataTransferProtocol {
    * when protocol changes. It is not very obvious. 
    */
   /*
-   * Version 19:
-   *    Change the block packet ack protocol to include seqno,
-   *    numberOfReplies, reply0, reply1, ...
+   * Version 20:
+   *    Added TRANSFER_RBW
    */
-  public static final int DATA_TRANSFER_VERSION = 19;
+  public static final int DATA_TRANSFER_VERSION = 20;
 
   /** Operation */
   public enum Op {
@@ -144,7 +143,9 @@ public interface DataTransferProtocol {
     // Recover a failed PIPELINE_CLOSE
     PIPELINE_CLOSE_RECOVERY,
     // pipeline set up for block creation
-    PIPELINE_SETUP_CREATE;
+    PIPELINE_SETUP_CREATE,
+    // similar to replication but transfers an RBW instead of a finalized replica
+    TRANSFER_RBW;
     
     final static private byte RECOVERY_BIT = (byte)1;
     

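The new stage reuses the existing opWriteBlock call: only the stage argument
distinguishes an RBW transfer from an ordinary replication write. A hedged
sender-side sketch, following the call made in DataTransfer.run() below (out,
b, srcNode, targets and accessToken are assumed to be already set up):

    // Sketch: same wire call as replication, but with the TRANSFER_RBW stage.
    DataTransferProtocol.Sender.opWriteBlock(out,
        b, 0, BlockConstructionStage.TRANSFER_RBW, 0, 0, 0, "",
        srcNode, targets, accessToken);
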
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1081580&r1=1081579&r2=1081580&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Mon Mar 14 22:04:10 2011
@@ -80,6 +80,7 @@ class BlockReceiver implements java.io.C
   DatanodeInfo srcDataNode = null;
   private Checksum partialCrc = null;
   private final DataNode datanode;
+  private final BlockConstructionStage initialStage;
   final private ReplicaInPipelineInterface replicaInfo;
   volatile private boolean mirrorError;
 
@@ -96,6 +97,11 @@ class BlockReceiver implements java.io.C
       this.clientName = clientName;
       this.srcDataNode = srcDataNode;
       this.datanode = datanode;
+      
+      //for datanode, we have
+      //1: clientName.length() == 0, and
+      //2: stage == null, PIPELINE_SETUP_CREATE or TRANSFER_RBW
+      this.initialStage = stage;
       //
       // Open local disk out
       //
@@ -647,9 +653,11 @@ class BlockReceiver implements java.io.C
         // close the block/crc files
         close();
 
-        // Finalize the block. Does this fsync()?
-        block.setNumBytes(replicaInfo.getNumBytes());
-        datanode.data.finalizeBlock(block);
+        if (initialStage != BlockConstructionStage.TRANSFER_RBW) {
+          // Finalize the block. Does this fsync()?
+          block.setNumBytes(replicaInfo.getNumBytes());
+          datanode.data.finalizeBlock(block);
+        }
         datanode.myMetrics.blocksWritten.inc();
       }
 
@@ -680,7 +688,8 @@ class BlockReceiver implements java.io.C
    * if this write is for a replication request (and not from a client)
    */
   private void cleanupBlock() throws IOException {
-    if (clientName.length() == 0) { // not client write
+    if (clientName.length() == 0
+        && initialStage != BlockConstructionStage.TRANSFER_RBW) {
       datanode.data.unfinalizeBlock(block);
     }
   }

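Taken together, the BlockReceiver changes define the receive-side rules: a
datanode write (empty clientName) in stage TRANSFER_RBW is neither finalized on
success nor unfinalized on failure, so the copy survives as a TEMPORARY replica
for the later conversion. A summary of the outcomes (descriptive comments only,
not code from the patch):

    // Receive-side outcomes by write type:
    //   client write                 -> finalized by the PacketResponder
    //   datanode write, other stages -> finalizeBlock() on success,
    //                                   unfinalizeBlock() on cleanup
    //   datanode write, TRANSFER_RBW -> left TEMPORARY on success,
    //                                   kept (not unfinalized) on cleanup
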
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1081580&r1=1081579&r2=1081580&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Mon Mar 14 22:04:10 2011
@@ -1215,7 +1215,8 @@ public class DataNode extends Configured
                  block + " to " + xfersBuilder);                       
       }
 
-      new Daemon(new DataTransfer(xferTargets, block, this)).start();
+      new Daemon(new DataTransfer(xferTargets, block,
+          BlockConstructionStage.PIPELINE_SETUP_CREATE)).start();
     }
   }
 
@@ -1340,19 +1341,20 @@ public class DataNode extends Configured
    * Used for transferring a block of data.  This class
    * sends a piece of data to another DataNode.
    */
-  class DataTransfer implements Runnable {
-    DatanodeInfo targets[];
-    Block b;
-    DataNode datanode;
+  private class DataTransfer implements Runnable {
+    final DatanodeInfo[] targets;
+    final Block b;
+    final BlockConstructionStage stage;
 
     /**
      * Connect to the first item in the target list.  Pass along the 
      * entire target list, the block, and the data.
      */
-    public DataTransfer(DatanodeInfo targets[], Block b, DataNode datanode) throws IOException {
+    DataTransfer(DatanodeInfo targets[], Block b, BlockConstructionStage stage
+        ) throws IOException {
       this.targets = targets;
       this.b = b;
-      this.datanode = datanode;
+      this.stage = stage;
     }
 
     /**
@@ -1378,7 +1380,7 @@ public class DataNode extends Configured
                                                             SMALL_BUFFER_SIZE));
 
         blockSender = new BlockSender(b, 0, b.getNumBytes(), 
-            false, false, false, datanode);
+            false, false, false, DataNode.this);
         DatanodeInfo srcNode = new DatanodeInfo(dnRegistration);
 
         //
@@ -1389,9 +1391,9 @@ public class DataNode extends Configured
           accessToken = blockTokenSecretManager.generateToken(null, b,
           EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE));
         }
+
         DataTransferProtocol.Sender.opWriteBlock(out,
-            b, 0, BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0, 0, "",
-            srcNode, targets, accessToken);
+            b, 0, stage, 0, 0, 0, "", srcNode, targets, accessToken);
 
         // send data & checksum
         blockSender.sendBlock(out, baseStream, null);
@@ -1403,7 +1405,7 @@ public class DataNode extends Configured
         LOG.warn(dnRegistration + ":Failed to transfer " + b + " to " + targets[0].getName()
             + " got " + StringUtils.stringifyException(ie));
         // check if there are any disk problem
-        datanode.checkDiskError();
+        checkDiskError();
         
       } finally {
         xmitsInProgress.getAndDecrement();
@@ -1949,13 +1951,17 @@ public class DataNode extends Configured
   /** {@inheritDoc} */
   @Override // ClientDataNodeProtocol
   public long getReplicaVisibleLength(final Block block) throws IOException {
+    checkWriteAccess(block);
+    return data.getReplicaVisibleLength(block);
+  }
+
+  private void checkWriteAccess(final Block block) throws IOException {
     if (isBlockTokenEnabled) {
       Set<TokenIdentifier> tokenIds = UserGroupInformation.getCurrentUser()
           .getTokenIdentifiers();
       if (tokenIds.size() != 1) {
-        throw new IOException("Can't continue with getReplicaVisibleLength() "
-            + "authorization since none or more than one BlockTokenIdentifier "
-            + "is found.");
+        throw new IOException("Can't continue since none or more than one "
+            + "BlockTokenIdentifier is found.");
       }
       for (TokenIdentifier tokenId : tokenIds) {
         BlockTokenIdentifier id = (BlockTokenIdentifier) tokenId;
@@ -1966,10 +1972,53 @@ public class DataNode extends Configured
             BlockTokenSecretManager.AccessMode.WRITE);
       }
     }
+  }
 
-    return data.getReplicaVisibleLength(block);
+  /**
+   * Transfer a block to the datanode targets.
+   * @return rbw's visible length
+   */
+  long transferBlockForPipelineRecovery(final Block b,
+      final DatanodeInfo[] targets) throws IOException {
+    checkWriteAccess(b);
+    final Block stored;
+    final boolean isRbw;
+    final long visible;
+
+    //get replica information
+    synchronized(data) {
+      stored = data.getStoredBlock(b.getBlockId());
+      if (stored.getGenerationStamp() < b.getGenerationStamp()) {
+        throw new IOException(
+            "stored.getGenerationStamp() < b.getGenerationStamp(), stored="
+            + stored + ", b=" + b);        
+      }
+      isRbw = data.isValidRbw(b);
+      visible = data.getReplicaVisibleLength(b);
+    }
+
+    if (targets.length > 0) {
+      if (isRbw) {
+        //transfer rbw
+        new DataTransfer(targets, b, BlockConstructionStage.TRANSFER_RBW).run();
+      } else {
+        //transfer finalized replica
+        transferBlock(stored, targets);
+      }
+    }
+    //TODO: should return: visible + storedGS + isRbw
+    return visible;
   }
-  
+
+  /**
+   * Convert an existing temporary replica to an RBW.
+   * @param temporary specifies id, gs and visible bytes.
+   * @throws IOException
+   */
+  void convertTemporaryToRbw(final Block temporary) throws IOException {
+    data.convertTemporaryToRbw(temporary);
+  }
+
   // Determine a Datanode's streaming address
   public static InetSocketAddress getStreamingAddr(Configuration conf) {
     return NetUtils.createSocketAddr(

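Note the two invocation styles of DataTransfer after this change: background
replication still wraps it in a Daemon thread, while the RBW transfer calls
run() directly, so transferBlockForPipelineRecovery does not return until the
copy has been sent. Side by side (both lines are from the diff above):

    // Asynchronous: background replication of a finalized block.
    new Daemon(new DataTransfer(xferTargets, block,
        BlockConstructionStage.PIPELINE_SETUP_CREATE)).start();
    // Synchronous: RBW transfer during pipeline recovery.
    new DataTransfer(targets, b, BlockConstructionStage.TRANSFER_RBW).run();
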
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1081580&r1=1081579&r2=1081580&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Mon Mar 14 22:04:10 2011
@@ -401,7 +401,8 @@ class DataXceiver extends DataTransferPr
       // if this write is for a replication request or recovering
       // a failed close for client, then confirm block. For other client-writes,
       // the block is finalized in the PacketResponder.
-      if (client.length() == 0 || 
+      if ((client.length() == 0 && stage != BlockConstructionStage.TRANSFER_RBW)
+          ||
           stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
         datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
         LOG.info("Received block " + block + 

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=1081580&r1=1081579&r2=1081580&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Mon Mar 14 22:04:10 2011
@@ -126,21 +126,7 @@ public class FSDataset implements FSCons
     private File addBlock(Block b, File src, boolean createOk, 
                           boolean resetIdx) throws IOException {
       if (numBlocks < maxBlocksPerDir) {
-        File dest = new File(dir, b.getBlockName());
-        File metaData = getMetaFile( src, b );
-        File newmeta = getMetaFile(dest, b);
-        if ( ! metaData.renameTo( newmeta ) ||
-            ! src.renameTo( dest ) ) {
-          throw new IOException( "could not move files for " + b +
-                                 " from " + src + " to " + 
-                                 dest.getAbsolutePath() + " or from"
-                                 + metaData + " to " + newmeta);
-        }
-        if (DataNode.LOG.isDebugEnabled()) {
-          DataNode.LOG.debug("addBlock: Moved " + metaData + " to " + newmeta);
-          DataNode.LOG.debug("addBlock: Moved " + src + " to " + dest);
-        }
-
+        final File dest = moveBlockFiles(b, src, dir);
         numBlocks += 1;
         return dest;
       }
@@ -1011,6 +997,26 @@ public class FSDataset implements FSCons
    return info.unlinkBlock(numLinks);
   }
 
+  private static File moveBlockFiles(Block b, File srcfile, File destdir
+      ) throws IOException {
+    final File dstfile = new File(destdir, b.getBlockName());
+    final File srcmeta = getMetaFile(srcfile, b);
+    final File dstmeta = getMetaFile(dstfile, b);
+    if (!srcmeta.renameTo(dstmeta)) {
+      throw new IOException("Failed to move meta file for " + b
+          + " from " + srcmeta + " to " + dstmeta);
+    }
+    if (!srcfile.renameTo(dstfile)) {
+      throw new IOException("Failed to move block file for " + b
+          + " from " + srcfile + " to " + dstfile.getAbsolutePath());
+    }
+    if (DataNode.LOG.isDebugEnabled()) {
+      DataNode.LOG.debug("addBlock: Moved " + srcmeta + " to " + dstmeta);
+      DataNode.LOG.debug("addBlock: Moved " + srcfile + " to " + dstfile);
+    }
+    return dstfile;
+  }
+
   static private void truncateBlock(File blockFile, File metaFile,
       long oldlen, long newlen) throws IOException {
     DataNode.LOG.info("truncateBlock: blockFile=" + blockFile
@@ -1347,6 +1353,55 @@ public class FSDataset implements FSCons
   }
   
   @Override // FSDatasetInterface
+  public synchronized ReplicaInPipelineInterface convertTemporaryToRbw(
+      final Block b) throws IOException {
+    final long blockId = b.getBlockId();
+    final long expectedGs = b.getGenerationStamp();
+    final long visible = b.getNumBytes();
+    DataNode.LOG.info("Covert the temporary replica " + b
+        + " to RBW, visible length is " + visible);
+
+    // get replica
+    final ReplicaInfo r = volumeMap.get(blockId);
+    if (r == null) {
+      throw new ReplicaNotFoundException(
+          ReplicaNotFoundException.NON_EXISTENT_REPLICA + b);
+    }
+    // check the replica's state
+    if (r.getState() != ReplicaState.TEMPORARY) {
+      throw new ReplicaNotFoundException(
+          "r.getState() != ReplicaState.TEMPORARY, r=" + r);
+    }
+    // check generation stamp
+    if (r.getGenerationStamp() != expectedGs) {
+      throw new ReplicaNotFoundException(
+          "r.getGenerationStamp() != expectedGs = " + expectedGs + ", r=" + r);
+    }
+    // check length
+    final long numBytes = r.getNumBytes();
+    if (numBytes < visible) {
+      throw new ReplicaNotFoundException(numBytes + " = numBytes < visible = "
+          + visible + ", r=" + r);
+    }
+    // check volume
+    final FSVolume v = r.getVolume();
+    if (v == null) {
+      throw new IOException("r.getVolume() = null, temp="  + r);
+    }
+    
+    // move block files to the rbw directory
+    final File dest = moveBlockFiles(b, r.getBlockFile(), v.rbwDir);
+    // create RBW
+    final ReplicaBeingWritten rbw = new ReplicaBeingWritten(
+        blockId, numBytes, expectedGs,
+        v, dest.getParentFile(), Thread.currentThread());
+    rbw.setBytesAcked(visible);
+    // overwrite the RBW in the volume map
+    volumeMap.add(rbw);
+    return rbw;
+  }
+
+  @Override // FSDatasetInterface
   public synchronized ReplicaInPipelineInterface createTemporary(Block b)
       throws IOException {
     ReplicaInfo replicaInfo = volumeMap.get(b.getBlockId());
@@ -1550,12 +1605,23 @@ public class FSDataset implements FSCons
    */
   @Override // FSDatasetInterface
   public boolean isValidBlock(Block b) {
-    ReplicaInfo replicaInfo = volumeMap.get(b);
-    if (replicaInfo == null || 
-        replicaInfo.getState() != ReplicaState.FINALIZED) {
-      return false;
-    }
-    return replicaInfo.getBlockFile().exists();
+    return isValid(b, ReplicaState.FINALIZED);
+  }
+
+  /**
+   * Check whether the given block is a valid RBW.
+   */
+  @Override // {@link FSDatasetInterface}
+  public boolean isValidRbw(final Block b) {
+    return isValid(b, ReplicaState.RBW);
+  }
+
+  /** Does the block exist and have the given state? */
+  private boolean isValid(final Block b, final ReplicaState state) {
+    final ReplicaInfo replicaInfo = volumeMap.get(b);
+    return replicaInfo != null
+        && replicaInfo.getState() == state
+        && replicaInfo.getBlockFile().exists();
   }
 
   /**

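convertTemporaryToRbw checks, in order, that the replica exists, is in
TEMPORARY state, carries the expected generation stamp, is at least as long as
the visible length, and has a volume; only then are the block and meta files
moved into the volume's rbwDir and the volume-map entry replaced. A minimal
sketch of a successful call, with assumed example values (dataset is an
FSDataset):

    // Sketch: promote a temporary replica (id=1, gs=100) whose on-disk length
    // is at least the 4096 bytes already acknowledged as visible.
    Block temporary = new Block(1L, 4096L, 100L); // id, visible bytes, gen stamp
    ReplicaInPipelineInterface rbw = dataset.convertTemporaryToRbw(temporary);
    // rbw is now in RBW state with bytesAcked == 4096.
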
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java?rev=1081580&r1=1081579&r2=1081580&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java Mon Mar 14 22:04:10 2011
@@ -215,6 +215,14 @@ public interface FSDatasetInterface exte
   throws IOException;
 
   /**
+   * Convert a temporary replica to an RBW.
+   * @param temporary the temporary replica being converted
+   * @return the result RBW
+   */
+  public ReplicaInPipelineInterface convertTemporaryToRbw(
+      Block temporary) throws IOException;
+
+  /**
    * Append to a finalized replica and returns the meta info of the replica
    * 
    * @param b block
@@ -282,6 +290,13 @@ public interface FSDatasetInterface exte
   public boolean isValidBlock(Block b);
 
   /**
+   * Is the block a valid RBW?
+   * @param b
+   * @return - true if the specified block is a valid RBW
+   */
+  public boolean isValidRbw(Block b);
+
+  /**
    * Invalidates the specified blocks
    * @param invalidBlks - the blocks to be invalidated
    * @throws IOException

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=1081580&r1=1081579&r2=1081580&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java Mon Mar 14 22:04:10 2011
@@ -358,7 +358,7 @@ public class SimulatedFSDataset  impleme
 
   @Override
   public synchronized void unfinalizeBlock(Block b) throws IOException {
-    if (isBeingWritten(b)) {
+    if (isValidRbw(b)) {
       blockMap.remove(b);
     }
   }
@@ -452,15 +452,16 @@ public class SimulatedFSDataset  impleme
     return binfo.isFinalized();
   }
 
-  /* check if a block is created but not finalized */
-  private synchronized boolean isBeingWritten(Block b) {
+  @Override
+  public synchronized boolean isValidRbw(Block b) {
     BInfo binfo = blockMap.get(b);
     if (binfo == null) {
       return false;
     }
     return !binfo.isFinalized();  
   }
-  
+
+  @Override
   public String toString() {
     return getStorageInfo();
   }
@@ -541,7 +542,7 @@ public class SimulatedFSDataset  impleme
           throw new ReplicaAlreadyExistsException("Block " + b + 
               " is valid, and cannot be written to.");
       }
-    if (isBeingWritten(b)) {
+    if (isValidRbw(b)) {
         throw new ReplicaAlreadyExistsException("Block " + b + 
             " is being written, and cannot be written to.");
     }
@@ -828,4 +829,17 @@ public class SimulatedFSDataset  impleme
   public long getReplicaVisibleLength(Block block) throws IOException {
     return block.getNumBytes();
   }
+
+  @Override
+  public ReplicaInPipelineInterface convertTemporaryToRbw(Block temporary)
+      throws IOException {
+    final BInfo r = blockMap.get(temporary);
+    if (r == null) {
+      throw new IOException("Block not found, temporary=" + temporary);
+    } else if (r.isFinalized()) {
+      throw new IOException("Replica already finalized, temporary="
+          + temporary + ", r=" + r);
+    }
+    return r;
+  }
 }

Added: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestTransferRbw.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestTransferRbw.java?rev=1081580&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestTransferRbw.java (added)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestTransferRbw.java Mon Mar 14 22:04:10 2011
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.util.Collection;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Test transferring RBW between datanodes */
+public class TestTransferRbw {
+  private static final Log LOG = LogFactory.getLog(TestTransferRbw.class);
+
+  private static final Random RAN = new Random();
+  private static final short REPLICATION = (short)1;
+
+  private static ReplicaBeingWritten getRbw(final DataNode datanode
+      ) throws InterruptedException {
+    return (ReplicaBeingWritten)getReplica(datanode, ReplicaState.RBW);
+  }
+  private static ReplicaInPipeline getReplica(final DataNode datanode,
+      final ReplicaState expectedState) throws InterruptedException {
+    final FSDataset dataset = ((FSDataset)datanode.data);
+    final Collection<ReplicaInfo> replicas = dataset.volumeMap.replicas();
+    for(int i = 0; i < 5 && replicas.size() == 0; i++) {
+      LOG.info("wait since replicas.size() == 0; i=" + i);
+      Thread.sleep(1000);
+    }
+    Assert.assertEquals(1, replicas.size());
+    final ReplicaInfo r = replicas.iterator().next();
+    Assert.assertEquals(expectedState, r.getState());
+    return (ReplicaInPipeline)r;
+  }
+
+  @Test
+  public void testTransferRbw() throws Exception {
+    final HdfsConfiguration conf = new HdfsConfiguration();
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf
+        ).numDataNodes(REPLICATION).build();
+    try {
+      cluster.waitActive();
+      final DistributedFileSystem fs = (DistributedFileSystem)cluster.getFileSystem();
+
+      //create a file, write some data and leave it open. 
+      final Path p = new Path("/foo");
+      final int size = (1 << 16) + RAN.nextInt(1 << 16);
+      LOG.info("size = " + size);
+      final FSDataOutputStream out = fs.create(p, REPLICATION);
+      final byte[] bytes = new byte[1024];
+      for(int remaining = size; remaining > 0; ) {
+        RAN.nextBytes(bytes);
+        final int len = bytes.length < remaining? bytes.length: remaining;
+        out.write(bytes, 0, len);
+        out.hflush();
+        remaining -= len;
+      }
+
+      //get the RBW
+      final ReplicaBeingWritten oldrbw;
+      final DataNode newnode;
+      final DatanodeInfo newnodeinfo;
+      final long visible;
+      {
+        final DataNode oldnode = cluster.getDataNodes().get(0);
+        oldrbw = getRbw(oldnode);
+        LOG.info("oldrbw = " + oldrbw);
+        
+        //add a datanode
+        cluster.startDataNodes(conf, 1, true, null, null);
+        newnode = cluster.getDataNodes().get(REPLICATION);
+        
+        {
+          final DatanodeInfo[] datanodeinfos = cluster.getNameNode(
+              ).getDatanodeReport(DatanodeReportType.LIVE);
+          Assert.assertEquals(2, datanodeinfos.length);
+          int i = 0;
+          for(; i < datanodeinfos.length
+                && !datanodeinfos[i].equals(newnode.dnRegistration); i++);
+          Assert.assertTrue(i < datanodeinfos.length);
+          newnodeinfo = datanodeinfos[i];
+        }
+        
+        //transfer RBW
+        visible = oldnode.transferBlockForPipelineRecovery(oldrbw, new DatanodeInfo[]{newnodeinfo});
+      }
+
+      //check temporary
+      final ReplicaInPipeline temp = getReplica(newnode, ReplicaState.TEMPORARY);
+      LOG.info("temp = " + temp);
+      Assert.assertEquals(oldrbw.getBlockId(), temp.getBlockId());
+      Assert.assertEquals(oldrbw.getGenerationStamp(), temp.getGenerationStamp());
+      final Block b = new Block(oldrbw.getBlockId(), visible, oldrbw.getGenerationStamp());
+      //convert temporary to rbw
+      newnode.convertTemporaryToRbw(b);
+      //check new rbw
+      final ReplicaBeingWritten newrbw = getRbw(newnode);
+      LOG.info("newrbw = " + newrbw);
+      Assert.assertEquals(oldrbw.getBlockId(), newrbw.getBlockId());
+      Assert.assertEquals(oldrbw.getGenerationStamp(), newrbw.getGenerationStamp());
+      Assert.assertEquals(oldrbw.getVisibleLength(), newrbw.getVisibleLength());
+
+      LOG.info("DONE");
+    } finally {
+      cluster.shutdown();
+    }
+  }
+}


