hadoop-hdfs-commits mailing list archives

From: szets...@apache.org
Subject: svn commit: r1399950 [11/27] - in /hadoop/common/branches/HDFS-2802/hadoop-hdfs-project: ./ hadoop-hdfs-httpfs/ hadoop-hdfs-httpfs/dev-support/ hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/ hadoop-hdfs-httpfs/src/main/java/org/apac...
Date: Fri, 19 Oct 2012 02:28:07 GMT
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java Fri Oct 19 02:25:55 2012
@@ -42,7 +42,6 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
-import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -74,7 +73,6 @@ class BPOfferService {
    */
   DatanodeRegistration bpRegistration;
   
-  UpgradeManagerDatanode upgradeManager = null;
   private final DataNode dn;
 
   /**
@@ -260,33 +258,6 @@ class BPOfferService {
     }
   }
 
-  synchronized UpgradeManagerDatanode getUpgradeManager() {
-    if(upgradeManager == null)
-      upgradeManager = 
-        new UpgradeManagerDatanode(dn, getBlockPoolId());
-    
-    return upgradeManager;
-  }
-  
-  void processDistributedUpgradeCommand(UpgradeCommand comm)
-  throws IOException {
-    UpgradeManagerDatanode upgradeManager = getUpgradeManager();
-    upgradeManager.processUpgradeCommand(comm);
-  }
-
-  /**
-   * Start distributed upgrade if it should be initiated by the data-node.
-   */
-  synchronized void startDistributedUpgradeIfNeeded() throws IOException {
-    UpgradeManagerDatanode um = getUpgradeManager();
-    
-    if(!um.getUpgradeState())
-      return;
-    um.setUpgradeState(false, um.getUpgradeVersion());
-    um.startUpgrade();
-    return;
-  }
-  
   DataNode getDataNode() {
     return dn;
   }
@@ -374,9 +345,6 @@ class BPOfferService {
 
     if (bpServices.isEmpty()) {
       dn.shutdownBlockPool(this);
-      
-      if(upgradeManager != null)
-        upgradeManager.shutdownUpgrade();
     }
   }
 
@@ -591,10 +559,6 @@ class BPOfferService {
 
       dn.finalizeUpgradeForPool(bp);
       break;
-    case UpgradeCommand.UC_ACTION_START_UPGRADE:
-      // start distributed upgrade here
-      processDistributedUpgradeCommand((UpgradeCommand)cmd);
-      break;
     case DatanodeProtocol.DNA_RECOVERBLOCK:
       String who = "NameNode at " + actor.getNNSocketAddress();
       dn.recoverBlocks(who, ((BlockRecoveryCommand)cmd).getRecoveringBlocks());

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java Fri Oct 19 02:25:55 2012
@@ -17,7 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
-import static org.apache.hadoop.hdfs.server.common.Util.now;
+import static org.apache.hadoop.util.Time.now;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -51,6 +51,7 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.VersionInfo;
 import org.apache.hadoop.util.VersionUtil;
 
@@ -226,7 +227,7 @@ class BPServiceActor implements Runnable
    */
   void scheduleBlockReport(long delay) {
     if (delay > 0) { // send BR after random delay
-      lastBlockReport = System.currentTimeMillis()
+      lastBlockReport = Time.now()
       - ( dnConf.blockReportInterval - DFSUtil.getRandom().nextInt((int)(delay)));
     } else { // send at next heartbeat
       lastBlockReport = lastHeartbeat - dnConf.blockReportInterval;
@@ -323,7 +324,7 @@ class BPServiceActor implements Runnable
    * Run an immediate block report on this thread. Used by tests.
    */
   @VisibleForTesting
-  void triggerBlockReportForTests() throws IOException {
+  void triggerBlockReportForTests() {
     synchronized (pendingIncrementalBR) {
       lastBlockReport = 0;
       lastHeartbeat = 0;
@@ -339,7 +340,7 @@ class BPServiceActor implements Runnable
   }
   
   @VisibleForTesting
-  void triggerHeartbeatForTests() throws IOException {
+  void triggerHeartbeatForTests() {
     synchronized (pendingIncrementalBR) {
       lastHeartbeat = 0;
       pendingIncrementalBR.notifyAll();
@@ -354,7 +355,7 @@ class BPServiceActor implements Runnable
   }
 
   @VisibleForTesting
-  void triggerDeletionReportForTests() throws IOException {
+  void triggerDeletionReportForTests() {
     synchronized (pendingIncrementalBR) {
       lastDeletedReport = 0;
       pendingIncrementalBR.notifyAll();
@@ -561,7 +562,7 @@ class BPServiceActor implements Runnable
         // or work arrives, and then iterate again.
         //
         long waitTime = dnConf.heartBeatInterval - 
-        (System.currentTimeMillis() - lastHeartbeat);
+        (Time.now() - lastHeartbeat);
         synchronized(pendingIncrementalBR) {
           if (waitTime > 0 && pendingReceivedRequests == 0) {
             try {
@@ -669,7 +670,6 @@ class BPServiceActor implements Runnable
       
       while (shouldRun()) {
         try {
-          bpos.startDistributedUpgradeIfNeeded();
           offerService();
         } catch (Exception ex) {
           LOG.error("Exception in BPOfferService for " + this, ex);

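The System.currentTimeMillis() call sites above (and in the scanner changes below) now go through the org.apache.hadoop.util.Time facade instead of calling the JDK directly. A minimal sketch of such a facade, assuming an implementation along these lines (only now() is confirmed by this diff; the monotonic variant is an assumption):

    // Sketch only -- the real class is org.apache.hadoop.util.Time in
    // hadoop-common; now() is the method this diff statically imports.
    public final class Time {
      private Time() {}

      // Wall-clock milliseconds since the epoch; a single choke point that
      // is easy to grep for and to stub or wrap in tests.
      public static long now() {
        return System.currentTimeMillis();
      }

      // Assumed monotonic variant for measuring elapsed intervals.
      public static long monotonicNow() {
        return System.nanoTime() / 1000000L;
      }
    }
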
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java Fri Oct 19 02:25:55 2012
@@ -122,6 +122,7 @@ class BlockPoolManager {
     try {
       UserGroupInformation.getLoginUser().doAs(
           new PrivilegedExceptionAction<Object>() {
+            @Override
             public Object run() throws Exception {
               for (BPOfferService bpos : offerServices) {
                 bpos.start();

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java Fri Oct 19 02:25:55 2012
@@ -49,6 +49,9 @@ import org.apache.hadoop.hdfs.server.dat
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.Time;
+
+import com.google.common.annotations.VisibleForTesting;
 
 /**
  * Scans the block files under a block pool and verifies that the
@@ -90,7 +93,7 @@ class BlockPoolSliceScanner {
   private long totalTransientErrors = 0;
   private final AtomicInteger totalBlocksScannedInLastRun = new AtomicInteger(); // Used for test only
   
-  private long currentPeriodStart = System.currentTimeMillis();
+  private long currentPeriodStart = Time.now();
   private long bytesLeft = 0; // Bytes to scan in this period
   private long totalBytesToScan = 0;
   
@@ -114,10 +117,12 @@ class BlockPoolSliceScanner {
       this.block = block;
     }
     
+    @Override
     public int hashCode() {
       return block.hashCode();
     }
     
+    @Override
     public boolean equals(Object other) {
       return other instanceof BlockScanInfo &&
              compareTo((BlockScanInfo)other) == 0;
@@ -127,6 +132,7 @@ class BlockPoolSliceScanner {
       return (lastScanType == ScanType.NONE) ? 0 : lastScanTime;
     }
     
+    @Override
     public int compareTo(BlockScanInfo other) {
       long t1 = lastScanTime;
       long t2 = other.lastScanTime;
@@ -224,7 +230,7 @@ class BlockPoolSliceScanner {
     long period = Math.min(scanPeriod, 
                            Math.max(blockMap.size(),1) * 600 * 1000L);
     int periodInt = Math.abs((int)period);
-    return System.currentTimeMillis() - scanPeriod + 
+    return Time.now() - scanPeriod + 
         DFSUtil.getRandom().nextInt(periodInt);
   }
 
@@ -251,6 +257,11 @@ class BlockPoolSliceScanner {
     }
   }
 
+  @VisibleForTesting
+  long getTotalScans() {
+    return totalScans;
+  }
+
   /** @return the last scan time for the block pool. */
   long getLastScanTime() {
     return lastScanTime.get();
@@ -281,7 +292,7 @@ class BlockPoolSliceScanner {
       info = new BlockScanInfo(block);
     }
     
-    long now = System.currentTimeMillis();
+    long now = Time.now();
     info.lastScanType = type;
     info.lastScanTime = now;
     info.lastScanOk = scanOk;
@@ -358,12 +369,13 @@ class BlockPoolSliceScanner {
   }
   
   private synchronized void adjustThrottler() {
-    long timeLeft = currentPeriodStart+scanPeriod - System.currentTimeMillis();
+    long timeLeft = currentPeriodStart+scanPeriod - Time.now();
     long bw = Math.max(bytesLeft*1000/timeLeft, MIN_SCAN_RATE);
     throttler.setBandwidth(Math.min(bw, MAX_SCAN_RATE));
   }
   
-  private void verifyBlock(ExtendedBlock block) {
+  @VisibleForTesting
+  void verifyBlock(ExtendedBlock block) {
     BlockSender blockSender = null;
 
     /* In case of failure, attempt to read second time to reduce
@@ -481,7 +493,7 @@ class BlockPoolSliceScanner {
   private boolean assignInitialVerificationTimes() {
     //First updates the last verification times from the log file.
     if (verificationLog != null) {
-      long now = System.currentTimeMillis();
+      long now = Time.now();
       RollingLogs.LineIterator logIterator = null;
       try {
         logIterator = verificationLog.logs.iterator(false);
@@ -529,7 +541,7 @@ class BlockPoolSliceScanner {
       // Initially spread the block reads over half of scan period
       // so that we don't keep scanning the blocks too quickly when restarted.
       long verifyInterval = Math.min(scanPeriod/(2L * numBlocks), 10*60*1000L);
-      long lastScanTime = System.currentTimeMillis() - scanPeriod;
+      long lastScanTime = Time.now() - scanPeriod;
 
       if (!blockInfoSet.isEmpty()) {
         BlockScanInfo info;
@@ -556,11 +568,27 @@ class BlockPoolSliceScanner {
 
     // reset the byte counts :
     bytesLeft = totalBytesToScan;
-    currentPeriodStart = System.currentTimeMillis();
+    currentPeriodStart = Time.now();
   }
   
+  private synchronized boolean workRemainingInCurrentPeriod() {
+    if (bytesLeft <= 0 && Time.now() < currentPeriodStart + scanPeriod) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Skipping scan since bytesLeft=" + bytesLeft + ", Start=" +
+                  currentPeriodStart + ", period=" + scanPeriod + ", now=" +
+                  Time.now() + " " + blockPoolId);
+      }
+      return false;
+    } else {
+      return true;
+    }
+  }
+
   void scanBlockPoolSlice() {
-    startNewPeriod();
+    if (!workRemainingInCurrentPeriod()) {
+      return;
+    }
+
     // Create a new processedBlocks structure
     processedBlocks = new HashMap<Long, Integer>();
     if (!assignInitialVerificationTimes()) {
@@ -571,7 +599,7 @@ class BlockPoolSliceScanner {
       scan();
     } finally {
       totalBlocksScannedInLastRun.set(processedBlocks.size());
-      lastScanTime.set(System.currentTimeMillis());
+      lastScanTime.set(Time.now());
     }
   }
   
@@ -584,7 +612,7 @@ class BlockPoolSliceScanner {
         
       while (datanode.shouldRun && !Thread.interrupted()
           && datanode.isBPServiceAlive(blockPoolId)) {
-        long now = System.currentTimeMillis();
+        long now = Time.now();
         synchronized (this) {
           if ( now >= (currentPeriodStart + scanPeriod)) {
             startNewPeriod();
@@ -605,14 +633,14 @@ class BlockPoolSliceScanner {
       LOG.warn("RuntimeException during BlockPoolScanner.scan()", e);
       throw e;
     } finally {
-      cleanUp();
+      rollVerificationLogs();
       if (LOG.isDebugEnabled()) {
         LOG.debug("Done scanning block pool: " + blockPoolId);
       }
     }
   }
   
-  private synchronized void cleanUp() {
+  private synchronized void rollVerificationLogs() {
     if (verificationLog != null) {
       try {
         verificationLog.logs.roll();
@@ -642,7 +670,7 @@ class BlockPoolSliceScanner {
     
     int total = blockInfoSet.size();
     
-    long now = System.currentTimeMillis();
+    long now = Time.now();
     
     Date date = new Date();
     

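With the hunk above, scanBlockPoolSlice() no longer unconditionally starts a new period; it first checks workRemainingInCurrentPeriod(), so a pool whose byte budget for the current period is already spent is skipped until the period elapses. A condensed sketch of that gate with the fields passed in explicitly (illustrative only, not the actual class):

    // Returns true if the slice should be scanned now: either bytes remain
    // in the current period's budget, or the period itself has elapsed.
    static boolean workRemaining(long bytesLeft, long periodStartMs,
                                 long scanPeriodMs, long nowMs) {
      return !(bytesLeft <= 0 && nowMs < periodStartMs + scanPeriodMs);
    }

    // Example: budget exhausted one day into a three-week period =>
    // workRemaining(0, t0, 21L*24*3600*1000, t0 + 24L*3600*1000) is false,
    // so scanBlockPoolSlice() returns without rescanning.
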
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java Fri Oct 19 02:25:55 2012
@@ -138,7 +138,7 @@ public class BlockPoolSliceStorage exten
     // During startup some of them can upgrade or roll back
     // while others could be up-to-date for the regular startup.
     for (int idx = 0; idx < getNumStorageDirs(); idx++) {
-      doTransition(datanode, getStorageDir(idx), nsInfo, startOpt);
+      doTransition(getStorageDir(idx), nsInfo, startOpt);
       assert getLayoutVersion() == nsInfo.getLayoutVersion() 
           : "Data-node and name-node layout versions must be the same.";
       assert getCTime() == nsInfo.getCTime() 
@@ -232,7 +232,7 @@ public class BlockPoolSliceStorage exten
    * @param startOpt startup option
    * @throws IOException
    */
-  private void doTransition(DataNode datanode, StorageDirectory sd,
+  private void doTransition(StorageDirectory sd,
       NamespaceInfo nsInfo, StartupOption startOpt) throws IOException {
     if (startOpt == StartupOption.ROLLBACK)
       doRollback(sd, nsInfo); // rollback if applicable
@@ -254,13 +254,9 @@ public class BlockPoolSliceStorage exten
           + blockpoolID);
     }
     if (this.layoutVersion == HdfsConstants.LAYOUT_VERSION
-        && this.cTime == nsInfo.getCTime())
+        && this.cTime == nsInfo.getCTime()) {
       return; // regular startup
-    
-    // verify necessity of a distributed upgrade
-    UpgradeManagerDatanode um = 
-      datanode.getUpgradeManagerDatanode(nsInfo.getBlockPoolID());
-    verifyDistributedUpgradeProgress(um, nsInfo);
+    }
     if (this.layoutVersion > HdfsConstants.LAYOUT_VERSION
         || this.cTime < nsInfo.getCTime()) {
       doUpgrade(sd, nsInfo); // upgrade
@@ -440,6 +436,7 @@ public class BlockPoolSliceStorage exten
 
     // delete finalized.tmp dir in a separate thread
     new Daemon(new Runnable() {
+      @Override
       public void run() {
         try {
           deleteDir(tmpDir);
@@ -449,6 +446,7 @@ public class BlockPoolSliceStorage exten
         LOG.info("Finalize upgrade for " + dataDirPath + " is complete.");
       }
 
+      @Override
       public String toString() {
         return "Finalize " + dataDirPath;
       }
@@ -474,13 +472,6 @@ public class BlockPoolSliceStorage exten
     LOG.info( hardLink.linkStats.report() );
   }
 
-  private void verifyDistributedUpgradeProgress(UpgradeManagerDatanode um,
-      NamespaceInfo nsInfo) throws IOException {
-    assert um != null : "DataNode.upgradeManager is null.";
-    um.setUpgradeState(false, getLayoutVersion());
-    um.initializeUpgrade(nsInfo);
-  }
-
   /**
    * gets the data node storage directory based on block pool storage
    * 

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Fri Oct 19 02:25:55 2012
@@ -23,7 +23,6 @@ import java.io.BufferedOutputStream;
 import java.io.Closeable;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
-import java.io.EOFException;
 import java.io.FileDescriptor;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -34,12 +33,14 @@ import java.util.LinkedList;
 import java.util.zip.Checksum;
 
 import org.apache.commons.logging.Log;
+import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.FSOutputSummer;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
@@ -77,9 +78,10 @@ class BlockReceiver implements Closeable
   private DataOutputStream checksumOut = null; // to crc file at local disk
   private int bytesPerChecksum;
   private int checksumSize;
-  private ByteBuffer buf; // contains one full packet.
-  private int bufRead; //amount of valid data in the buf
-  private int maxPacketReadLen;
+  
+  private PacketReceiver packetReceiver =
+      new PacketReceiver(false);
+  
   protected final String inAddr;
   protected final String myAddr;
   private String mirrorAddr;
@@ -246,7 +248,12 @@ class BlockReceiver implements Closeable
   /**
    * close files.
    */
+  @Override
   public void close() throws IOException {
+    if (packetReceiver != null) {
+      packetReceiver.close();
+    }
+    
     IOException ioe = null;
     if (syncOnClose && (out != null || checksumOut != null)) {
       datanode.metrics.incrFsyncCount();      
@@ -364,33 +371,24 @@ class BlockReceiver implements Closeable
   /**
    * Verify multiple CRC chunks. 
    */
-  private void verifyChunks( byte[] dataBuf, int dataOff, int len, 
-                             byte[] checksumBuf, int checksumOff ) 
-                             throws IOException {
-    while (len > 0) {
-      int chunkLen = Math.min(len, bytesPerChecksum);
-      
-      clientChecksum.update(dataBuf, dataOff, chunkLen);
-
-      if (!clientChecksum.compare(checksumBuf, checksumOff)) {
-        if (srcDataNode != null) {
-          try {
-            LOG.info("report corrupt block " + block + " from datanode " +
-                      srcDataNode + " to namenode");
-            datanode.reportRemoteBadBlock(srcDataNode, block);
-          } catch (IOException e) {
-            LOG.warn("Failed to report bad block " + block + 
-                      " from datanode " + srcDataNode + " to namenode");
-          }
+  private void verifyChunks(ByteBuffer dataBuf, ByteBuffer checksumBuf)
+      throws IOException {
+    try {
+      clientChecksum.verifyChunkedSums(dataBuf, checksumBuf, clientname, 0);
+    } catch (ChecksumException ce) {
+      LOG.warn("Checksum error in block " + block + " from " + inAddr, ce);
+      if (srcDataNode != null) {
+        try {
+          LOG.info("report corrupt block " + block + " from datanode " +
+                    srcDataNode + " to namenode");
+          datanode.reportRemoteBadBlock(srcDataNode, block);
+        } catch (IOException e) {
+          LOG.warn("Failed to report bad block " + block + 
+                    " from datanode " + srcDataNode + " to namenode");
         }
-        throw new IOException("Unexpected checksum mismatch " + 
-                              "while writing " + block + " from " + inAddr);
       }
-
-      clientChecksum.reset();
-      dataOff += chunkLen;
-      checksumOff += checksumSize;
-      len -= chunkLen;
+      throw new IOException("Unexpected checksum mismatch " + 
+                            "while writing " + block + " from " + inAddr);
     }
   }
   
@@ -402,163 +400,24 @@ class BlockReceiver implements Closeable
    * This does not verify the original checksums, under the assumption
    * that they have already been validated.
    */
-  private void translateChunks( byte[] dataBuf, int dataOff, int len,
-      byte[] checksumBuf, int checksumOff ) {
-    if (len == 0) return;
-    
-    int numChunks = (len - 1)/bytesPerChecksum + 1;
-    
-    diskChecksum.calculateChunkedSums(
-        ByteBuffer.wrap(dataBuf, dataOff, len),
-        ByteBuffer.wrap(checksumBuf, checksumOff, numChunks * checksumSize));
+  private void translateChunks(ByteBuffer dataBuf, ByteBuffer checksumBuf) {
+    diskChecksum.calculateChunkedSums(dataBuf, checksumBuf);
   }
 
-  /**
-   * Makes sure buf.position() is zero without modifying buf.remaining().
-   * It moves the data if position needs to be changed.
-   */
-  private void shiftBufData() {
-    if (bufRead != buf.limit()) {
-      throw new IllegalStateException("bufRead should be same as " +
-                                      "buf.limit()");
-    }
-    
-    //shift the remaining data on buf to the front
-    if (buf.position() > 0) {
-      int dataLeft = buf.remaining();
-      if (dataLeft > 0) {
-        byte[] b = buf.array();
-        System.arraycopy(b, buf.position(), b, 0, dataLeft);
-      }
-      buf.position(0);
-      bufRead = dataLeft;
-      buf.limit(bufRead);
-    }
-  }
-  
-  /**
-   * reads upto toRead byte to buf at buf.limit() and increments the limit.
-   * throws an IOException if read does not succeed.
-   */
-  private int readToBuf(int toRead) throws IOException {
-    if (toRead < 0) {
-      toRead = (maxPacketReadLen > 0 ? maxPacketReadLen : buf.capacity())
-               - buf.limit();
-    }
-    
-    int nRead = in.read(buf.array(), buf.limit(), toRead);
-    
-    if (nRead < 0) {
-      throw new EOFException("while trying to read " + toRead + " bytes");
-    }
-    bufRead = buf.limit() + nRead;
-    buf.limit(bufRead);
-    return nRead;
-  }
-  
-  
-  /**
-   * Reads (at least) one packet and returns the packet length.
-   * buf.position() points to the start of the packet and 
-   * buf.limit() point to the end of the packet. There could 
-   * be more data from next packet in buf.<br><br>
-   * 
-   * It tries to read a full packet with single read call.
-   * Consecutive packets are usually of the same length.
-   */
-  private void readNextPacket() throws IOException {
-    /* This dances around buf a little bit, mainly to read 
-     * full packet with single read and to accept arbitrary size  
-     * for next packet at the same time.
-     */
-    if (buf == null) {
-      /* initialize buffer to the best guess size:
-       * 'chunksPerPacket' calculation here should match the same 
-       * calculation in DFSClient to make the guess accurate.
-       */
-      int chunkSize = bytesPerChecksum + checksumSize;
-      int chunksPerPacket = (datanode.getDnConf().writePacketSize - PacketHeader.PKT_HEADER_LEN
-                             + chunkSize - 1)/chunkSize;
-      buf = ByteBuffer.allocate(PacketHeader.PKT_HEADER_LEN +
-                                Math.max(chunksPerPacket, 1) * chunkSize);
-      buf.limit(0);
-    }
-    
-    // See if there is data left in the buffer :
-    if (bufRead > buf.limit()) {
-      buf.limit(bufRead);
-    }
-    
-    while (buf.remaining() < HdfsConstants.BYTES_IN_INTEGER) {
-      if (buf.position() > 0) {
-        shiftBufData();
-      }
-      readToBuf(-1);
-    }
-    
-    /* We mostly have the full packet or at least enough for an int
-     */
-    buf.mark();
-    int payloadLen = buf.getInt();
-    buf.reset();
-    
-    // check corrupt values for pktLen, 100MB upper limit should be ok?
-    if (payloadLen < 0 || payloadLen > (100*1024*1024)) {
-      throw new IOException("Incorrect value for packet payload : " +
-                            payloadLen);
-    }
-    
-    // Subtract BYTES_IN_INTEGER since that accounts for the payloadLen that
-    // we read above.
-    int pktSize = payloadLen + PacketHeader.PKT_HEADER_LEN
-        - HdfsConstants.BYTES_IN_INTEGER;
-    
-    if (buf.remaining() < pktSize) {
-      //we need to read more data
-      int toRead = pktSize - buf.remaining();
-      
-      // first make sure buf has enough space.        
-      int spaceLeft = buf.capacity() - buf.limit();
-      if (toRead > spaceLeft && buf.position() > 0) {
-        shiftBufData();
-        spaceLeft = buf.capacity() - buf.limit();
-      }
-      if (toRead > spaceLeft) {
-        byte oldBuf[] = buf.array();
-        int toCopy = buf.limit();
-        buf = ByteBuffer.allocate(toCopy + toRead);
-        System.arraycopy(oldBuf, 0, buf.array(), 0, toCopy);
-        buf.limit(toCopy);
-      }
-      
-      //now read:
-      while (toRead > 0) {
-        toRead -= readToBuf(toRead);
-      }
-    }
-    
-    if (buf.remaining() > pktSize) {
-      buf.limit(buf.position() + pktSize);
-    }
-    
-    if (pktSize > maxPacketReadLen) {
-      maxPacketReadLen = pktSize;
-    }
-  }
-  
+
   /** 
    * Receives and processes a packet. It can contain many chunks.
    * returns the number of data bytes that the packet has.
    */
   private int receivePacket() throws IOException {
     // read the next packet
-    readNextPacket();
+    packetReceiver.receiveNextPacket(in);
 
-    buf.mark();
-    PacketHeader header = new PacketHeader();
-    header.readFields(buf);
-    int endOfHeader = buf.position();
-    buf.reset();
+    PacketHeader header = packetReceiver.getHeader();
+    if (LOG.isDebugEnabled()){
+      LOG.debug("Receiving one packet for block " + block +
+                ": " + header);
+    }
 
     // Sanity check the header
     if (header.getOffsetInBlock() > replicaInfo.getNumBytes()) {
@@ -573,38 +432,12 @@ class BlockReceiver implements Closeable
                             header.getDataLen()); 
     }
 
-    return receivePacket(
-      header.getOffsetInBlock(),
-      header.getSeqno(),
-      header.isLastPacketInBlock(),
-      header.getDataLen(),
-      header.getSyncBlock(),
-      endOfHeader);
-  }
+    long offsetInBlock = header.getOffsetInBlock();
+    long seqno = header.getSeqno();
+    boolean lastPacketInBlock = header.isLastPacketInBlock();
+    int len = header.getDataLen();
+    boolean syncBlock = header.getSyncBlock();
 
-  /**
-   * Write the received packet to disk (data only)
-   */
-  private void writePacketToDisk(byte[] pktBuf, int startByteToDisk, 
-      int numBytesToDisk) throws IOException {
-    out.write(pktBuf, startByteToDisk, numBytesToDisk);
-  }
-  
-  /** 
-   * Receives and processes a packet. It can contain many chunks.
-   * returns the number of data bytes that the packet has.
-   */
-  private int receivePacket(long offsetInBlock, long seqno,
-      boolean lastPacketInBlock, int len, boolean syncBlock,
-      int endOfHeader) throws IOException {
-    if (LOG.isDebugEnabled()){
-      LOG.debug("Receiving one packet for block " + block +
-                " of length " + len +
-                " seqno " + seqno +
-                " offsetInBlock " + offsetInBlock +
-                " syncBlock " + syncBlock +
-                " lastPacketInBlock " + lastPacketInBlock);
-    }
     // make sure the block gets sync'ed upon close
     this.syncOnClose |= syncBlock && lastPacketInBlock;
 
@@ -624,14 +457,15 @@ class BlockReceiver implements Closeable
     //First write the packet to the mirror:
     if (mirrorOut != null && !mirrorError) {
       try {
-        mirrorOut.write(buf.array(), buf.position(), buf.remaining());
+        packetReceiver.mirrorPacketTo(mirrorOut);
         mirrorOut.flush();
       } catch (IOException e) {
         handleMirrorOutError(e);
       }
     }
     
-    buf.position(endOfHeader);        
+    ByteBuffer dataBuf = packetReceiver.getDataSlice();
+    ByteBuffer checksumBuf = packetReceiver.getChecksumSlice();
     
     if (lastPacketInBlock || len == 0) {
       if(LOG.isDebugEnabled()) {
@@ -645,18 +479,11 @@ class BlockReceiver implements Closeable
       int checksumLen = ((len + bytesPerChecksum - 1)/bytesPerChecksum)*
                                                             checksumSize;
 
-      if ( buf.remaining() != (checksumLen + len)) {
-        throw new IOException("Data remaining in packet does not match" +
-                              "sum of checksumLen and dataLen " +
-                              " size remaining: " + buf.remaining() +
-                              " data len: " + len +
-                              " checksum Len: " + checksumLen);
-      }
-      int checksumOff = buf.position();
-      int dataOff = checksumOff + checksumLen;
-      byte pktBuf[] = buf.array();
-
-      buf.position(buf.limit()); // move to the end of the data.
+      if ( checksumBuf.capacity() != checksumLen) {
+        throw new IOException("Length of checksums in packet " +
+            checksumBuf.capacity() + " does not match calculated checksum " +
+            "length " + checksumLen);
+     }
 
       /* skip verifying checksum iff this is not the last one in the 
        * pipeline and clientName is non-null. i.e. Checksum is verified
@@ -666,11 +493,11 @@ class BlockReceiver implements Closeable
        * checksum.
        */
       if (mirrorOut == null || isDatanode || needsChecksumTranslation) {
-        verifyChunks(pktBuf, dataOff, len, pktBuf, checksumOff);
+        verifyChunks(dataBuf, checksumBuf);
         if (needsChecksumTranslation) {
           // overwrite the checksums in the packet buffer with the
           // appropriate polynomial for the disk storage.
-          translateChunks(pktBuf, dataOff, len, pktBuf, checksumOff);
+          translateChunks(dataBuf, checksumBuf);
         }
       }
       
@@ -699,9 +526,13 @@ class BlockReceiver implements Closeable
             computePartialChunkCrc(onDiskLen, offsetInChecksum, bytesPerChecksum);
           }
 
-          int startByteToDisk = dataOff+(int)(onDiskLen-firstByteInBlock);
+          int startByteToDisk = (int)(onDiskLen-firstByteInBlock) 
+              + dataBuf.arrayOffset() + dataBuf.position();
+
           int numBytesToDisk = (int)(offsetInBlock-onDiskLen);
-          writePacketToDisk(pktBuf, startByteToDisk, numBytesToDisk);
+          
+          // Write data to disk.
+          out.write(dataBuf.array(), startByteToDisk, numBytesToDisk);
 
           // If this is a partial chunk, then verify that this is the only
           // chunk in the packet. Calculate new crc for this chunk.
@@ -713,7 +544,7 @@ class BlockReceiver implements Closeable
                                     " len = " + len + 
                                     " bytesPerChecksum " + bytesPerChecksum);
             }
-            partialCrc.update(pktBuf, startByteToDisk, numBytesToDisk);
+            partialCrc.update(dataBuf.array(), startByteToDisk, numBytesToDisk);
             byte[] buf = FSOutputSummer.convertToByteStream(partialCrc, checksumSize);
             lastChunkChecksum = Arrays.copyOfRange(
               buf, buf.length - checksumSize, buf.length
@@ -725,11 +556,12 @@ class BlockReceiver implements Closeable
             partialCrc = null;
           } else {
             lastChunkChecksum = Arrays.copyOfRange(
-              pktBuf, 
-              checksumOff + checksumLen - checksumSize, 
-              checksumOff + checksumLen
-            );
-            checksumOut.write(pktBuf, checksumOff, checksumLen);
+                checksumBuf.array(),
+                checksumBuf.arrayOffset() + checksumBuf.position() + checksumLen - checksumSize,
+                checksumBuf.arrayOffset() + checksumBuf.position() + checksumLen);
+            checksumOut.write(checksumBuf.array(),
+                checksumBuf.arrayOffset() + checksumBuf.position(),
+                checksumLen);
           }
           /// flush entire packet, sync unless close() will sync
           flushOrSync(syncBlock && !lastPacketInBlock);
@@ -1033,6 +865,7 @@ class BlockReceiver implements Closeable
      * Thread to process incoming acks.
      * @see java.lang.Runnable#run()
      */
+    @Override
     public void run() {
       boolean lastPacketInBlock = false;
       final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;

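verifyChunks() above now hands the whole data/checksum pair to clientChecksum.verifyChunkedSums(...) instead of walking the chunks by hand. Conceptually that call performs per-chunk CRC verification over the two ByteBuffers; a self-contained sketch of the idea, assuming 4-byte CRC32 checksums (the real logic, including the offset carried in the ChecksumException, lives in org.apache.hadoop.util.DataChecksum):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.util.zip.CRC32;

    class ChunkedSumsSketch {
      // Verify the data against one 4-byte CRC per bytesPerChecksum chunk.
      // Throws on the first mismatch.
      static void verifyChunkedSums(ByteBuffer data, ByteBuffer checksums,
                                    int bytesPerChecksum) throws IOException {
        CRC32 crc = new CRC32();
        byte[] d = data.array();
        int off = data.arrayOffset() + data.position();
        int len = data.remaining();
        int sumPos = checksums.position();
        for (int chunk = 0; len > 0; chunk++) {
          int n = Math.min(len, bytesPerChecksum);
          crc.reset();
          crc.update(d, off, n);
          int expected = checksums.getInt(sumPos + chunk * 4); // CRC32 is 4 bytes
          if ((int) crc.getValue() != expected) {
            throw new IOException("Checksum mismatch in chunk " + chunk);
          }
          off += n;
          len -= n;
        }
      }
    }
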
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Fri Oct 19 02:25:55 2012
@@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs.server.da
 import java.io.BufferedInputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
-import java.io.EOFException;
 import java.io.FileDescriptor;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
@@ -38,10 +37,9 @@ import org.apache.hadoop.fs.ChecksumExce
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
-import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.net.SocketOutputStream;
@@ -63,40 +61,29 @@ import org.apache.hadoop.util.DataChecks
  * </pre>   
  * An empty packet is sent to mark the end of block and read completion.
  * 
- *  PACKET Contains a packet header, checksum and data. Amount of data
- *  carried is set by BUFFER_SIZE.
- *  <pre>
- *    +-----------------------------------------------------+
- *    | 4 byte packet length (excluding packet header)      |
- *    +-----------------------------------------------------+
- *    | 8 byte offset in the block | 8 byte sequence number |
- *    +-----------------------------------------------------+
- *    | 1 byte isLastPacketInBlock                          |
- *    +-----------------------------------------------------+
- *    | 4 byte Length of actual data                        |
- *    +-----------------------------------------------------+
- *    | x byte checksum data. x is defined below            |
- *    +-----------------------------------------------------+
- *    | actual data ......                                  |
- *    +-----------------------------------------------------+
- *    
- *    Data is made of Chunks. Each chunk is of length <= BYTES_PER_CHECKSUM.
- *    A checksum is calculated for each chunk.
- *    
- *    x = (length of data + BYTE_PER_CHECKSUM - 1)/BYTES_PER_CHECKSUM *
- *        CHECKSUM_SIZE
- *        
- *    CHECKSUM_SIZE depends on CHECKSUM_TYPE (usually, 4 for CRC32) 
- *    </pre>
+ * PACKET Contains a packet header, checksum and data. Amount of data
+ * carried is set by BUFFER_SIZE.
+ * <pre>
+ *   +-----------------------------------------------------+
+ *   | Variable length header. See {@link PacketHeader}    |
+ *   +-----------------------------------------------------+
+ *   | x byte checksum data. x is defined below            |
+ *   +-----------------------------------------------------+
+ *   | actual data ......                                  |
+ *   +-----------------------------------------------------+
+ * 
+ *   Data is made of Chunks. Each chunk is of length <= BYTES_PER_CHECKSUM.
+ *   A checksum is calculated for each chunk.
+ *  
+ *   x = (length of data + BYTE_PER_CHECKSUM - 1)/BYTES_PER_CHECKSUM *
+ *       CHECKSUM_SIZE
+ *  
+ *   CHECKSUM_SIZE depends on CHECKSUM_TYPE (usually, 4 for CRC32) 
+ *  </pre>
  *  
  *  The client reads data until it receives a packet with 
  *  "LastPacketInBlock" set to true or with a zero length. If there is 
- *  no checksum error, it replies to DataNode with OP_STATUS_CHECKSUM_OK: 
- *  <pre>
- *    +------------------------------+
- *    | 2 byte OP_STATUS_CHECKSUM_OK |
- *    +------------------------------+
- *  </pre>
+ *  no checksum error, it replies to DataNode with OP_STATUS_CHECKSUM_OK.
  */
 class BlockSender implements java.io.Closeable {
   static final Log LOG = DataNode.LOG;
@@ -162,8 +149,6 @@ class BlockSender implements java.io.Clo
    */
   private static final long LONG_READ_THRESHOLD_BYTES = 256 * 1024;
   
-  private static ReadaheadPool readaheadPool =
-    ReadaheadPool.getInstance();
 
   /**
    * Constructor
@@ -250,8 +235,7 @@ class BlockSender implements java.io.Clo
       } else {
         LOG.warn("Could not find metadata file for " + block);
         // This only decides the buffer size. Use BUFFER_SIZE?
-        csum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_NULL,
-            16 * 1024);
+        csum = DataChecksum.newDataChecksum(DataChecksum.Type.NULL, 16 * 1024);
       }
 
       /*
@@ -330,6 +314,7 @@ class BlockSender implements java.io.Clo
   /**
    * close opened files.
    */
+  @Override
   public void close() throws IOException {
     if (blockInFd != null && shouldDropCacheBehindRead && isLongRead()) {
       // drop the last few MB of the file from cache
@@ -450,8 +435,22 @@ class BlockSender implements java.io.Clo
     int packetLen = dataLen + checksumDataLen + 4;
     boolean lastDataPacket = offset + dataLen == endOffset && dataLen > 0;
 
-    writePacketHeader(pkt, dataLen, packetLen);
-
+    // The packet buffer is organized as follows:
+    // _______HHHHCCCCD?D?D?D?
+    //        ^   ^
+    //        |   \ checksumOff
+    //        \ headerOff
+    // _ padding, since the header is variable-length
+    // H = header and length prefixes
+    // C = checksums
+    // D? = data, if transferTo is false.
+    
+    int headerLen = writePacketHeader(pkt, dataLen, packetLen);
+    
+    // Per above, the header doesn't start at the beginning of the
+    // buffer
+    int headerOff = pkt.position() - headerLen;
+    
     int checksumOff = pkt.position();
     byte[] buf = pkt.array();
     
@@ -481,17 +480,21 @@ class BlockSender implements java.io.Clo
     try {
       if (transferTo) {
         SocketOutputStream sockOut = (SocketOutputStream)out;
-        sockOut.write(buf, 0, dataOff); // First write checksum
+        // First write header and checksums
+        sockOut.write(buf, headerOff, dataOff - headerOff);
         
         // no need to flush since we know out is not a buffered stream
         FileChannel fileCh = ((FileInputStream)blockIn).getChannel();
+        LongWritable waitTime = new LongWritable();
+        LongWritable transferTime = new LongWritable();
         sockOut.transferToFully(fileCh, blockInPosition, dataLen, 
-            datanode.metrics.getSendDataPacketBlockedOnNetworkNanos(),
-            datanode.metrics.getSendDataPacketTransferNanos());
+            waitTime, transferTime);
+        datanode.metrics.addSendDataPacketBlockedOnNetworkNanos(waitTime.get());
+        datanode.metrics.addSendDataPacketTransferNanos(transferTime.get());
         blockInPosition += dataLen;
-      } else { 
+      } else {
         // normal transfer
-        out.write(buf, 0, dataOff + dataLen);
+        out.write(buf, headerOff, dataOff + dataLen - headerOff);
       }
     } catch (IOException e) {
       if (e instanceof SocketTimeoutException) {
@@ -624,7 +627,7 @@ class BlockSender implements java.io.Clo
     final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
     try {
       int maxChunksPerPacket;
-      int pktSize = PacketHeader.PKT_HEADER_LEN;
+      int pktBufSize = PacketHeader.PKT_MAX_HEADER_LEN;
       boolean transferTo = transferToAllowed && !verifyChecksum
           && baseStream instanceof SocketOutputStream
           && blockIn instanceof FileInputStream;
@@ -635,15 +638,15 @@ class BlockSender implements java.io.Clo
         maxChunksPerPacket = numberOfChunks(TRANSFERTO_BUFFER_SIZE);
         
         // Smaller packet size to only hold checksum when doing transferTo
-        pktSize += checksumSize * maxChunksPerPacket;
+        pktBufSize += checksumSize * maxChunksPerPacket;
       } else {
         maxChunksPerPacket = Math.max(1,
             numberOfChunks(HdfsConstants.IO_FILE_BUFFER_SIZE));
         // Packet size includes both checksum and data
-        pktSize += (chunkSize + checksumSize) * maxChunksPerPacket;
+        pktBufSize += (chunkSize + checksumSize) * maxChunksPerPacket;
       }
 
-      ByteBuffer pktBuf = ByteBuffer.allocate(pktSize);
+      ByteBuffer pktBuf = ByteBuffer.allocate(pktBufSize);
 
       while (endOffset > offset) {
         manageOsCache();
@@ -686,8 +689,8 @@ class BlockSender implements java.io.Clo
     }
 
     // Perform readahead if necessary
-    if (readaheadLength > 0 && readaheadPool != null) {
-      curReadahead = readaheadPool.readaheadStream(
+    if (readaheadLength > 0 && datanode.readaheadPool != null) {
+      curReadahead = datanode.readaheadPool.readaheadStream(
           clientTraceFmt, blockInFd,
           offset, readaheadLength, Long.MAX_VALUE,
           curReadahead);
@@ -713,14 +716,19 @@ class BlockSender implements java.io.Clo
   }
 
   /**
-   * Write packet header into {@code pkt}
+   * Write packet header into {@code pkt},
+   * return the length of the header written.
    */
-  private void writePacketHeader(ByteBuffer pkt, int dataLen, int packetLen) {
+  private int writePacketHeader(ByteBuffer pkt, int dataLen, int packetLen) {
     pkt.clear();
     // both syncBlock and syncPacket are false
     PacketHeader header = new PacketHeader(packetLen, offset, seqno,
         (dataLen == 0), dataLen, false);
+    
+    int size = header.getSerializedSize();
+    pkt.position(PacketHeader.PKT_MAX_HEADER_LEN - size);
     header.putInBuffer(pkt);
+    return size;
   }
   
   boolean didSendEntireByteRange() {

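The writePacketHeader() change above sizes the packet buffer with PKT_MAX_HEADER_LEN and serializes the variable-length header at the tail of that reserved region, so the header, checksums and (when transferTo is off) data end up contiguous and can be sent with a single write starting at headerOff. A small worked example with made-up sizes (the real maximum comes from PacketHeader.PKT_MAX_HEADER_LEN, the actual size from header.getSerializedSize()):

    class PacketLayoutSketch {
      static void example() {
        int pktMaxHeaderLen = 33;  // assumed reserved region at buffer start
        int headerLen = 27;        // serialized size of this packet's header
        int headerOff = pktMaxHeaderLen - headerLen;  // = 6, i.e. pkt.position() - headerLen
        int checksumOff = pktMaxHeaderLen;            // = 33, pkt.position() after the header
        // Buffer layout: [ 6 bytes padding | 27-byte header | checksums | data? ]
        // The transferTo path then writes header + checksums in one call:
        //   sockOut.write(buf, headerOff, dataOff - headerOff);
        System.out.println("headerOff=" + headerOff + " checksumOff=" + checksumOff);
      }
    }
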
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java Fri Oct 19 02:25:55 2012
@@ -33,7 +33,9 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_DEFAULT;
-
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
@@ -52,7 +54,8 @@ class DNConf {
   final boolean syncBehindWrites;
   final boolean dropCacheBehindReads;
   final boolean syncOnClose;
-  
+  final boolean encryptDataTransfer;
+  final boolean connectToDnViaHostname;
 
   final long readaheadLength;
   final long heartBeatInterval;
@@ -62,6 +65,7 @@ class DNConf {
   final int writePacketSize;
   
   final String minimumNameNodeVersion;
+  final String encryptionAlgorithm;
 
   public DNConf(Configuration conf) {
     socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
@@ -93,7 +97,9 @@ class DNConf {
     dropCacheBehindReads = conf.getBoolean(
         DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY,
         DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_DEFAULT);
-    
+    connectToDnViaHostname = conf.getBoolean(
+        DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME,
+        DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
     this.blockReportInterval = conf.getLong(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
     DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT);
     
@@ -117,6 +123,10 @@ class DNConf {
 
     this.minimumNameNodeVersion = conf.get(DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_KEY,
         DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_DEFAULT);
+    
+    this.encryptDataTransfer = conf.getBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY,
+        DFS_ENCRYPT_DATA_TRANSFER_DEFAULT);
+    this.encryptionAlgorithm = conf.get(DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
   }
   
   // We get minimumNameNodeVersion via a method so it can be mocked out in tests.

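The new DNConf fields above are populated from keys declared in DFSConfigKeys (the statically imported DFS_ENCRYPT_DATA_TRANSFER_KEY and DFS_DATA_ENCRYPTION_ALGORITHM_KEY, plus DFS_DATANODE_USE_DN_HOSTNAME). A minimal sketch of setting them programmatically, e.g. from a test in the same package as DNConf; the "3des" cipher name is only an assumed example value:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    class DnConfExample {
      static DNConf buildEncryptedDnConf() {
        Configuration conf = new Configuration();
        // Enable encrypted block data transfer (off by default per the diff).
        conf.setBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY, true);
        // Cipher used for data transfer; "3des" is an assumed example value.
        conf.set(DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY, "3des");
        // Have the DataNode connect to peers by hostname rather than IP.
        conf.setBoolean(DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME, true);
        return new DNConf(conf);
      }
    }
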
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java Fri Oct 19 02:25:55 2012
@@ -34,6 +34,8 @@ import org.apache.hadoop.hdfs.protocol.E
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * DataBlockScanner manages block scanning for all the block pools. For each
  * block pool a {@link BlockPoolSliceScanner} is created which runs in a separate
@@ -47,6 +49,8 @@ public class DataBlockScanner implements
   private final FsDatasetSpi<? extends FsVolumeSpi> dataset;
   private final Configuration conf;
   
+  static final int SLEEP_PERIOD_MS = 5 * 1000;
+
   /**
    * Map to find the BlockPoolScanner for a given block pool id. This is updated
    * when a BPOfferService becomes alive or dies.
@@ -63,14 +67,15 @@ public class DataBlockScanner implements
     this.conf = conf;
   }
   
+  @Override
   public void run() {
     String currentBpId = "";
     boolean firstRun = true;
     while (datanode.shouldRun && !Thread.interrupted()) {
-      //Sleep everytime except in the first interation.
+      //Sleep everytime except in the first iteration.
       if (!firstRun) {
         try {
-          Thread.sleep(5000);
+          Thread.sleep(SLEEP_PERIOD_MS);
         } catch (InterruptedException ex) {
           // Interrupt itself again to set the interrupt status
           blockScannerThread.interrupt();
@@ -98,16 +103,11 @@ public class DataBlockScanner implements
   }
 
   // Wait for at least one block pool to be up
-  private void waitForInit(String bpid) {
-    UpgradeManagerDatanode um = null;
-    if(bpid != null && !bpid.equals(""))
-      um = datanode.getUpgradeManagerDatanode(bpid);
-    
-    while ((um != null && ! um.isUpgradeCompleted())
-        || (getBlockPoolSetSize() < datanode.getAllBpOs().length)
+  private void waitForInit() {
+    while ((getBlockPoolSetSize() < datanode.getAllBpOs().length)
         || (getBlockPoolSetSize() < 1)) {
       try {
-        Thread.sleep(5000);
+        Thread.sleep(SLEEP_PERIOD_MS);
       } catch (InterruptedException e) {
         blockScannerThread.interrupt();
         return;
@@ -128,7 +128,7 @@ public class DataBlockScanner implements
     String nextBpId = null;
     while ((nextBpId == null) && datanode.shouldRun
         && !blockScannerThread.isInterrupted()) {
-      waitForInit(currentBpId);
+      waitForInit();
       synchronized (this) {
         if (getBlockPoolSetSize() > 0) {          
           // Find nextBpId by the minimum of the last scan time
@@ -172,7 +172,8 @@ public class DataBlockScanner implements
     return blockPoolScannerMap.size();
   }
   
-  private synchronized BlockPoolSliceScanner getBPScanner(String bpid) {
+  @VisibleForTesting
+  synchronized BlockPoolSliceScanner getBPScanner(String bpid) {
     return blockPoolScannerMap.get(bpid);
   }
   
@@ -253,7 +254,7 @@ public class DataBlockScanner implements
     LOG.info("Removed bpid="+blockPoolId+" from blockPoolScannerMap");
   }
   
-  // This method is used for testing
+  @VisibleForTesting
   long getBlocksScannedInLastRun(String bpid) throws IOException {
     BlockPoolSliceScanner bpScanner = getBPScanner(bpid);
     if (bpScanner == null) {
@@ -263,6 +264,16 @@ public class DataBlockScanner implements
     }
   }
 
+  @VisibleForTesting
+  long getTotalScans(String bpid) throws IOException {
+    BlockPoolSliceScanner bpScanner = getBPScanner(bpid);
+    if (bpScanner == null) {
+      throw new IOException("Block Pool: "+bpid+" is not running");
+    } else {
+      return bpScanner.getTotalScans();
+    }
+  }
+
   public void start() {
     blockScannerThread = new Thread(this);
     blockScannerThread.setDaemon(true);
@@ -273,6 +284,7 @@ public class DataBlockScanner implements
   public static class Servlet extends HttpServlet {
     private static final long serialVersionUID = 1L;
 
+    @Override
     public void doGet(HttpServletRequest request, 
                       HttpServletResponse response) throws IOException {
       response.setContentType("text/plain");

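getTotalScans(String bpid) and the SLEEP_PERIOD_MS constant added above give tests a way to observe scanner progress instead of sleeping for fixed amounts of time. A hypothetical helper along those lines (assuming it lives in the same package so the package-private members are visible):

    import java.io.IOException;

    class ScanWaitHelper {
      // Poll the per-block-pool scan counter until it advances past
      // previousTotal, or fail after timeoutMs.
      static void waitForNextScan(DataBlockScanner scanner, String bpid,
                                  long previousTotal, long timeoutMs)
          throws IOException, InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (scanner.getTotalScans(bpid) <= previousTotal) {
          if (System.currentTimeMillis() > deadline) {
            throw new IOException("Timed out waiting for a scan of " + bpid);
          }
          Thread.sleep(DataBlockScanner.SLEEP_PERIOD_MS / 5);
        }
      }
    }
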
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Fri Oct 19 02:25:55 2012
@@ -46,6 +46,7 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTPS_ENABLE_KEY;
+import static org.apache.hadoop.util.ExitUtil.terminate;
 
 import java.io.BufferedOutputStream;
 import java.io.ByteArrayInputStream;
@@ -53,7 +54,9 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
+import java.io.PrintStream;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
@@ -93,11 +96,14 @@ import org.apache.hadoop.hdfs.protocol.C
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ClientDatanodeProtocolService;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DNTransferAckProto;
@@ -121,9 +127,6 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.server.common.JspHelper;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.common.Util;
-
-import static org.apache.hadoop.util.ExitUtil.terminate;
-
 import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
@@ -141,6 +144,7 @@ import org.apache.hadoop.hdfs.web.WebHdf
 import org.apache.hadoop.hdfs.web.resources.Param;
 import org.apache.hadoop.http.HttpServer;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.ReadaheadPool;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
@@ -163,12 +167,13 @@ import org.apache.hadoop.util.DiskChecke
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.ServicePlugin;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.VersionInfo;
 import org.mortbay.util.ajax.JSON;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
-import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.BlockingService;
 
 /**********************************************************
@@ -225,6 +230,8 @@ public class DataNode extends Configured
         
   static final Log ClientTraceLog =
     LogFactory.getLog(DataNode.class.getName() + ".clienttrace");
+  
+  private static final String USAGE = "Usage: java DataNode [-rollback | -regular]";
 
   /**
    * Use {@link NetUtils#createSocketAddr(String)} instead.
@@ -244,7 +251,7 @@ public class DataNode extends Configured
   Daemon dataXceiverServer = null;
   ThreadGroup threadGroup = null;
   private DNConf dnConf;
-  private boolean heartbeatsDisabledForTests = false;
+  private volatile boolean heartbeatsDisabledForTests = false;
   private DataStorage storage = null;
   private HttpServer infoServer = null;
   DataNodeMetrics metrics;
@@ -270,7 +277,10 @@ public class DataNode extends Configured
   private AbstractList<File> dataDirs;
   private Configuration conf;
 
-  private final String userWithLocalPathAccess;
+  private final List<String> usersWithLocalPathAccess;
+  private boolean connectToDnViaHostname;
+  ReadaheadPool readaheadPool;
+  private final boolean getHdfsBlockLocationsEnabled;
 
   /**
    * Create the DataNode given a configuration and an array of dataDirs.
@@ -290,8 +300,14 @@ public class DataNode extends Configured
            final SecureResources resources) throws IOException {
     super(conf);
 
-    this.userWithLocalPathAccess = conf
-        .get(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY);
+    this.usersWithLocalPathAccess = Arrays.asList(
+        conf.getTrimmedStrings(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY));
+    this.connectToDnViaHostname = conf.getBoolean(
+        DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME,
+        DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
+    this.getHdfsBlockLocationsEnabled = conf.getBoolean(
+        DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, 
+        DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
     try {
       hostName = getHostName(conf);
       LOG.info("Configured hostname is " + hostName);
@@ -401,10 +417,15 @@ public class DataNode extends Configured
           new ClientDatanodeProtocolServerSideTranslatorPB(this);
     BlockingService service = ClientDatanodeProtocolService
         .newReflectiveBlockingService(clientDatanodeProtocolXlator);
-    ipcServer = RPC.getServer(ClientDatanodeProtocolPB.class, service, ipcAddr
-        .getHostName(), ipcAddr.getPort(), conf.getInt(
-        DFS_DATANODE_HANDLER_COUNT_KEY, DFS_DATANODE_HANDLER_COUNT_DEFAULT),
-        false, conf, blockPoolTokenSecretManager);
+    ipcServer = new RPC.Builder(conf)
+        .setProtocol(ClientDatanodeProtocolPB.class)
+        .setInstance(service)
+        .setBindAddress(ipcAddr.getHostName())
+        .setPort(ipcAddr.getPort())
+        .setNumHandlers(
+            conf.getInt(DFS_DATANODE_HANDLER_COUNT_KEY,
+                DFS_DATANODE_HANDLER_COUNT_DEFAULT)).setVerbose(false)
+        .setSecretManager(blockPoolTokenSecretManager).build();
     
     InterDatanodeProtocolServerSideTranslatorPB interDatanodeProtocolXlator = 
         new InterDatanodeProtocolServerSideTranslatorPB(this);
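The hunk above replaces the long positional RPC.getServer(...) overload with builder-style construction; each setter names the parameter it configures, so the change reads as a chain rather than an argument list. A minimal sketch of the same pattern, assuming only the RPC.Builder methods visible in the hunk; the protocol class, blocking service, host, and port below are illustrative placeholders rather than the committed code:

    // Sketch only: builder-style IPC server setup as introduced in the hunk above.
    // The protocol class and BlockingService stand in for whatever the server exposes.
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.ipc.RPC;
    import com.google.protobuf.BlockingService;

    public class RpcBuilderSketch {
      public static RPC.Server build(Configuration conf, Class<?> protocol,
          BlockingService service, String host, int port, int handlers)
          throws IOException {
        return new RPC.Builder(conf)
            .setProtocol(protocol)      // protocol interface served by this endpoint
            .setInstance(service)       // protobuf blocking service implementation
            .setBindAddress(host)
            .setPort(port)
            .setNumHandlers(handlers)   // same value the handler-count key supplies
            .setVerbose(false)
            .build();
      }
    }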
@@ -486,7 +507,7 @@ public class DataNode extends Configured
       reason = "verifcation is not supported by SimulatedFSDataset";
     } 
     if (reason == null) {
-      directoryScanner = new DirectoryScanner(this, data, conf);
+      directoryScanner = new DirectoryScanner(data, conf);
       directoryScanner.start();
     } else {
       LOG.info("Periodic Directory Tree Verification scan is disabled because " +
@@ -665,6 +686,10 @@ public class DataNode extends Configured
 
     blockPoolManager = new BlockPoolManager(this);
     blockPoolManager.refreshNamenodes(conf);
+
+    // Create the ReadaheadPool from the DataNode context so we can
+    // exit without having to explicitly shutdown its thread pool.
+    readaheadPool = ReadaheadPool.getInstance();
   }
   
   /**
@@ -732,8 +757,6 @@ public class DataNode extends Configured
             + " tokens, or none may be.");
       }
     }
-    // TODO should we check that all federated nns are either enabled or
-    // disabled?
     if (!isBlockTokenEnabled) return;
     
     if (!blockPoolTokenSecretManager.isBlockPoolRegistered(blockPoolId)) {
@@ -745,7 +768,8 @@ public class DataNode extends Configured
           + " min(s), tokenLifetime=" + blockTokenLifetime / (60 * 1000)
           + " min(s)");
       final BlockTokenSecretManager secretMgr = 
-        new BlockTokenSecretManager(0, blockTokenLifetime);
+          new BlockTokenSecretManager(0, blockTokenLifetime, blockPoolId,
+              dnConf.encryptionAlgorithm);
       blockPoolTokenSecretManager.addBlockPool(blockPoolId, secretMgr);
     }
   }
@@ -869,7 +893,7 @@ public class DataNode extends Configured
   /**
    * NB: The datanode can perform data transfer on the streaming
    * address however clients are given the IPC IP address for data
-   * transfer, and that may be be a different address.
+   * transfer, and that may be a different address.
    * 
    * @return socket address for data transfer
    */
@@ -916,17 +940,18 @@ public class DataNode extends Configured
   }
 
   public static InterDatanodeProtocol createInterDataNodeProtocolProxy(
-      DatanodeID datanodeid, final Configuration conf, final int socketTimeout)
-    throws IOException {
-    final InetSocketAddress addr =
-      NetUtils.createSocketAddr(datanodeid.getIpcAddr());
-    if (InterDatanodeProtocol.LOG.isDebugEnabled()) {
-      InterDatanodeProtocol.LOG.debug("InterDatanodeProtocol addr=" + addr);
+      DatanodeID datanodeid, final Configuration conf, final int socketTimeout,
+      final boolean connectToDnViaHostname) throws IOException {
+    final String dnAddr = datanodeid.getIpcAddr(connectToDnViaHostname);
+    final InetSocketAddress addr = NetUtils.createSocketAddr(dnAddr);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Connecting to datanode " + dnAddr + " addr=" + addr);
     }
     final UserGroupInformation loginUgi = UserGroupInformation.getLoginUser();
     try {
       return loginUgi
           .doAs(new PrivilegedExceptionAction<InterDatanodeProtocol>() {
+            @Override
             public InterDatanodeProtocol run() throws IOException {
               return new InterDatanodeProtocolTranslatorPB(addr, loginUgi,
                   conf, NetUtils.getDefaultSocketFactory(conf), socketTimeout);
@@ -968,7 +993,7 @@ public class DataNode extends Configured
     
     int rand = DFSUtil.getSecureRandom().nextInt(Integer.MAX_VALUE);
     return "DS-" + rand + "-" + ip + "-" + port + "-"
-        + System.currentTimeMillis();
+        + Time.now();
   }
   
   /** Ensure the authentication method is kerberos */
@@ -987,7 +1012,7 @@ public class DataNode extends Configured
   private void checkBlockLocalPathAccess() throws IOException {
     checkKerberosAuthMethod("getBlockLocalPathInfo()");
     String currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
-    if (!currentUser.equals(this.userWithLocalPathAccess)) {
+    if (!usersWithLocalPathAccess.contains(currentUser)) {
       throw new AccessControlException(
           "Can't continue with getBlockLocalPathInfo() "
               + "authorization. The user " + currentUser
@@ -1018,6 +1043,25 @@ public class DataNode extends Configured
     metrics.incrBlocksGetLocalPathInfo();
     return info;
   }
+
+  @Override
+  public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks,
+      List<Token<BlockTokenIdentifier>> tokens) throws IOException, 
+      UnsupportedOperationException {
+    if (!getHdfsBlockLocationsEnabled) {
+      throw new UnsupportedOperationException("Datanode#getHdfsBlocksMetadata "
+          + " is not enabled in datanode config");
+    }
+    if (blocks.size() != tokens.size()) {
+      throw new IOException("Differing number of blocks and tokens");
+    }
+    // Check access for each block
+    for (int i = 0; i < blocks.size(); i++) {
+      checkBlockToken(blocks.get(i), tokens.get(i), 
+          BlockTokenSecretManager.AccessMode.READ);
+    }
+    return data.getHdfsBlocksMetadata(blocks);
+  }
   
   private void checkBlockToken(ExtendedBlock block, Token<BlockTokenIdentifier> token,
       AccessMode accessMode) throws IOException {
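The new getHdfsBlocksMetadata() override above is guarded twice: the feature must be switched on in the datanode configuration, and the blocks and tokens lists must be index-aligned so that token i authorizes block i. A hedged sketch of that guard on its own, using only the DFSConfigKeys constants referenced in the hunk; the class and method names here are illustrative:

    // Sketch only: the preconditions of the new metadata RPC, mirroring the hunk above.
    // "blocks" and "tokens" are the parallel lists the datanode receives.
    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
    import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
    import org.apache.hadoop.security.token.Token;

    public class BlocksMetadataGuardSketch {
      static void checkRequest(Configuration conf, List<ExtendedBlock> blocks,
          List<Token<BlockTokenIdentifier>> tokens) {
        boolean enabled = conf.getBoolean(
            DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
            DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
        if (!enabled) {
          throw new UnsupportedOperationException(
              "getHdfsBlocksMetadata is not enabled in the datanode config");
        }
        if (blocks.size() != tokens.size()) {
          throw new IllegalArgumentException(
              "blocks and tokens must have the same length");
        }
      }
    }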
@@ -1051,6 +1095,7 @@ public class DataNode extends Configured
       }
     }
     
+    this.shouldRun = false;
     shutdownPeriodicScanners();
     
     if (infoServer != null) {
@@ -1064,7 +1109,6 @@ public class DataNode extends Configured
       ipcServer.stop();
     }
     
-    this.shouldRun = false;
     if (dataXceiverServer != null) {
       ((DataXceiverServer) this.dataXceiverServer.getRunnable()).kill();
       this.dataXceiverServer.interrupt();
@@ -1179,17 +1223,8 @@ public class DataNode extends Configured
     return xmitsInProgress.get();
   }
     
-  UpgradeManagerDatanode getUpgradeManagerDatanode(String bpid) {
-    BPOfferService bpos = blockPoolManager.get(bpid);
-    if(bpos==null) {
-      return null;
-    }
-    return bpos.getUpgradeManager();
-  }
-
-  private void transferBlock( ExtendedBlock block, 
-                              DatanodeInfo xferTargets[] 
-                              ) throws IOException {
+  private void transferBlock(ExtendedBlock block, DatanodeInfo xferTargets[])
+      throws IOException {
     BPOfferService bpos = getBPOSForBlock(block);
     DatanodeRegistration bpReg = getDNRegistrationForBP(block.getBlockPoolId());
     
@@ -1366,6 +1401,7 @@ public class DataNode extends Configured
     /**
      * Do the deed, write the bytes
      */
+    @Override
     public void run() {
       xmitsInProgress.getAndIncrement();
       Socket sock = null;
@@ -1375,17 +1411,32 @@ public class DataNode extends Configured
       final boolean isClient = clientname.length() > 0;
       
       try {
-        InetSocketAddress curTarget = 
-          NetUtils.createSocketAddr(targets[0].getXferAddr());
+        final String dnAddr = targets[0].getXferAddr(connectToDnViaHostname);
+        InetSocketAddress curTarget = NetUtils.createSocketAddr(dnAddr);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Connecting to datanode " + dnAddr);
+        }
         sock = newSocket();
         NetUtils.connect(sock, curTarget, dnConf.socketTimeout);
         sock.setSoTimeout(targets.length * dnConf.socketTimeout);
 
         long writeTimeout = dnConf.socketWriteTimeout + 
                             HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * (targets.length-1);
-        OutputStream baseStream = NetUtils.getOutputStream(sock, writeTimeout);
-        out = new DataOutputStream(new BufferedOutputStream(baseStream,
+        OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
+        InputStream unbufIn = NetUtils.getInputStream(sock);
+        if (dnConf.encryptDataTransfer) {
+          IOStreamPair encryptedStreams =
+              DataTransferEncryptor.getEncryptedStreams(
+                  unbufOut, unbufIn,
+                  blockPoolTokenSecretManager.generateDataEncryptionKey(
+                      b.getBlockPoolId()));
+          unbufOut = encryptedStreams.out;
+          unbufIn = encryptedStreams.in;
+        }
+        
+        out = new DataOutputStream(new BufferedOutputStream(unbufOut,
             HdfsConstants.SMALL_BUFFER_SIZE));
+        in = new DataInputStream(unbufIn);
         blockSender = new BlockSender(b, 0, b.getNumBytes(), 
             false, false, DataNode.this, null);
         DatanodeInfo srcNode = new DatanodeInfo(bpReg);
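The rewritten transfer path above keeps the raw socket streams (unbufOut/unbufIn) around so that, when dnConf.encryptDataTransfer is set, both directions are wrapped by DataTransferEncryptor before any buffering is layered on top; the (possibly encrypted) unbuffered output stream is then what BlockSender writes through. A minimal sketch of that wrapping order, assuming only the IOStreamPair and DataTransferEncryptor API used in the hunk; the key argument is a placeholder for blockPoolTokenSecretManager.generateDataEncryptionKey(...):

    // Sketch only: encrypt first (on the raw streams), buffer second.
    import java.io.BufferedOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
    import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
    import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;

    public class EncryptedTransferSketch {
      static IOStreamPair wrap(OutputStream unbufOut, InputStream unbufIn,
          boolean encrypt, DataEncryptionKey key, int bufferSize) throws IOException {
        if (encrypt) {
          // The encryptor must see the raw bytes, so it wraps the unbuffered
          // streams; buffering added later only ever sees the wrapped streams.
          IOStreamPair encrypted =
              DataTransferEncryptor.getEncryptedStreams(unbufOut, unbufIn, key);
          unbufOut = encrypted.out;
          unbufIn = encrypted.in;
        }
        return new IOStreamPair(
            new DataInputStream(unbufIn),
            new DataOutputStream(new BufferedOutputStream(unbufOut, bufferSize)));
      }
    }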
@@ -1403,7 +1454,7 @@ public class DataNode extends Configured
             stage, 0, 0, 0, 0, blockSender.getChecksum());
 
         // send data & checksum
-        blockSender.sendBlock(out, baseStream, null);
+        blockSender.sendBlock(out, unbufOut, null);
 
         // no response necessary
         LOG.info(getClass().getSimpleName() + ": Transmitted " + b
@@ -1411,7 +1462,6 @@ public class DataNode extends Configured
 
         // read ack
         if (isClient) {
-          in = new DataInputStream(NetUtils.getInputStream(sock));
           DNTransferAckProto closeAck = DNTransferAckProto.parseFrom(
               HdfsProtoUtil.vintPrefixed(in));
           if (LOG.isDebugEnabled()) {
@@ -1512,7 +1562,7 @@ public class DataNode extends Configured
     }
     
     if (!parseArguments(args, conf)) {
-      printUsage();
+      printUsage(System.err);
       return null;
     }
     Collection<URI> dataDirs = getStorageDirs(conf);
@@ -1626,9 +1676,8 @@ public class DataNode extends Configured
         + xmitsInProgress.get() + "}";
   }
 
-  private static void printUsage() {
-    System.err.println("Usage: java DataNode");
-    System.err.println("           [-rollback]");
+  private static void printUsage(PrintStream out) {
+    out.println(USAGE + "\n");
   }
 
   /**
@@ -1701,7 +1750,7 @@ public class DataNode extends Configured
         datanode.join();
     } catch (Throwable e) {
       LOG.fatal("Exception in secureMain", e);
-      terminate(1);
+      terminate(1, e);
     } finally {
       // We need to terminate the process here because either shutdown was called
       // or some disk related conditions like volumes tolerated or volumes required
@@ -1713,6 +1762,10 @@ public class DataNode extends Configured
   }
   
   public static void main(String args[]) {
+    if (DFSUtil.parseHelpArgument(args, DataNode.USAGE, System.out, true)) {
+      System.exit(0);
+    }
+
     secureMain(args, null);
   }
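With the change above, main() now runs DFSUtil.parseHelpArgument() before secureMain(), so a help invocation prints the single-line USAGE constant (plus, when requested, the generic options help) and exits instead of falling through to startup. A small sketch of the same pre-parse pattern for a hypothetical daemon; only the parseHelpArgument call comes from the patch, the rest is illustrative:

    // Sketch only: print usage and exit early on a help flag, otherwise start.
    // ExampleDaemon and its USAGE string are hypothetical.
    import org.apache.hadoop.hdfs.DFSUtil;

    public class ExampleDaemon {
      private static final String USAGE =
          "Usage: java ExampleDaemon [-rollback | -regular]";

      public static void main(String[] args) {
        // Returns true (after printing USAGE to the given stream) when a help
        // argument such as -h or --help is present.
        if (DFSUtil.parseHelpArgument(args, USAGE, System.out, true)) {
          System.exit(0);
        }
        // ... normal daemon startup would go here ...
      }
    }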
 
@@ -1722,6 +1775,7 @@ public class DataNode extends Configured
     
     Daemon d = new Daemon(threadGroup, new Runnable() {
       /** Recover a list of blocks. It is run by the primary datanode. */
+      @Override
       public void run() {
         for(RecoveringBlock b : blocks) {
           try {
@@ -1808,8 +1862,7 @@ public class DataNode extends Configured
   private void recoverBlock(RecoveringBlock rBlock) throws IOException {
     ExtendedBlock block = rBlock.getBlock();
     String blookPoolId = block.getBlockPoolId();
-    DatanodeInfo[] targets = rBlock.getLocations();
-    DatanodeID[] datanodeids = (DatanodeID[])targets;
+    DatanodeID[] datanodeids = rBlock.getLocations();
     List<BlockRecord> syncList = new ArrayList<BlockRecord>(datanodeids.length);
     int errorCount = 0;
 
@@ -1820,7 +1873,7 @@ public class DataNode extends Configured
         DatanodeRegistration bpReg = bpos.bpRegistration;
         InterDatanodeProtocol datanode = bpReg.equals(id)?
             this: DataNode.createInterDataNodeProtocolProxy(id, getConf(),
-                dnConf.socketTimeout);
+                dnConf.socketTimeout, dnConf.connectToDnViaHostname);
         ReplicaRecoveryInfo info = callInitReplicaRecovery(datanode, rBlock);
         if (info != null &&
             info.getGenerationStamp() >= block.getGenerationStamp() &&

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java Fri Oct 19 02:25:55 2012
@@ -396,10 +396,6 @@ public class DataStorage extends Storage
     if (this.layoutVersion == HdfsConstants.LAYOUT_VERSION 
         && this.cTime == nsInfo.getCTime())
       return; // regular startup
-    // verify necessity of a distributed upgrade
-    UpgradeManagerDatanode um = 
-      datanode.getUpgradeManagerDatanode(nsInfo.getBlockPoolID());
-    verifyDistributedUpgradeProgress(um, nsInfo);
     
     // do upgrade
     if (this.layoutVersion > HdfsConstants.LAYOUT_VERSION
@@ -455,6 +451,8 @@ public class DataStorage extends Storage
     
     File curDir = sd.getCurrentDir();
     File prevDir = sd.getPreviousDir();
+    File bbwDir = new File(sd.getRoot(), Storage.STORAGE_1_BBW);
+
     assert curDir.exists() : "Data node current directory must exist.";
     // Cleanup directory "detach"
     cleanupDetachDir(new File(curDir, STORAGE_DIR_DETACHED));
@@ -475,7 +473,7 @@ public class DataStorage extends Storage
     BlockPoolSliceStorage bpStorage = new BlockPoolSliceStorage(nsInfo.getNamespaceID(), 
         nsInfo.getBlockPoolID(), nsInfo.getCTime(), nsInfo.getClusterID());
     bpStorage.format(curDir, nsInfo);
-    linkAllBlocks(tmpDir, new File(curBpDir, STORAGE_DIR_CURRENT));
+    linkAllBlocks(tmpDir, bbwDir, new File(curBpDir, STORAGE_DIR_CURRENT));
     
     // 4. Write version file under <SD>/current
     layoutVersion = HdfsConstants.LAYOUT_VERSION;
@@ -582,19 +580,27 @@ public class DataStorage extends Storage
              + "; cur CTime = " + this.getCTime());
     assert sd.getCurrentDir().exists() : "Current directory must exist.";
     final File tmpDir = sd.getFinalizedTmp();//finalized.tmp directory
+    final File bbwDir = new File(sd.getRoot(), Storage.STORAGE_1_BBW);
     // 1. rename previous to finalized.tmp
     rename(prevDir, tmpDir);
 
     // 2. delete finalized.tmp dir in a separate thread
+    // Also delete the blocksBeingWritten from HDFS 1.x and earlier, if
+    // it exists.
     new Daemon(new Runnable() {
+        @Override
         public void run() {
           try {
             deleteDir(tmpDir);
+            if (bbwDir.exists()) {
+              deleteDir(bbwDir);
+            }
           } catch(IOException ex) {
             LOG.error("Finalize upgrade for " + dataDirPath + " failed.", ex);
           }
           LOG.info("Finalize upgrade for " + dataDirPath + " is complete.");
         }
+        @Override
         public String toString() { return "Finalize " + dataDirPath; }
       }).start();
   }
@@ -622,11 +628,16 @@ public class DataStorage extends Storage
 
   /**
    * Hardlink all finalized and RBW blocks in fromDir to toDir
-   * @param fromDir directory where the snapshot is stored
-   * @param toDir the current data directory
-   * @throws IOException if error occurs during hardlink
+   *
+   * @param fromDir      The directory where the 'from' snapshot is stored
+   * @param fromBbwDir   In HDFS 1.x, the directory where blocks
+   *                     that are under construction are stored.
+   * @param toDir        The current data directory
+   *
+   * @throws IOException If error occurs during hardlink
    */
-  private void linkAllBlocks(File fromDir, File toDir) throws IOException {
+  private void linkAllBlocks(File fromDir, File fromBbwDir, File toDir)
+      throws IOException {
     HardLink hardLink = new HardLink();
     // do the link
     int diskLayoutVersion = this.getLayoutVersion();
@@ -634,13 +645,23 @@ public class DataStorage extends Storage
       // hardlink finalized blocks in tmpDir/finalized
       linkBlocks(new File(fromDir, STORAGE_DIR_FINALIZED), 
           new File(toDir, STORAGE_DIR_FINALIZED), diskLayoutVersion, hardLink);
-      // hardlink rbw blocks in tmpDir/finalized
+      // hardlink rbw blocks in tmpDir/rbw
       linkBlocks(new File(fromDir, STORAGE_DIR_RBW), 
           new File(toDir, STORAGE_DIR_RBW), diskLayoutVersion, hardLink);
     } else { // pre-RBW version
       // hardlink finalized blocks in tmpDir
       linkBlocks(fromDir, new File(toDir, STORAGE_DIR_FINALIZED), 
           diskLayoutVersion, hardLink);      
+      if (fromBbwDir.exists()) {
+        /*
+         * We need to put the 'blocksBeingWritten' from HDFS 1.x into the rbw
+         * directory.  It's a little messy, because the blocksBeingWritten was
+         * NOT underneath the 'current' directory in those releases.  See
+         * HDFS-3731 for details.
+         */
+        linkBlocks(fromBbwDir,
+            new File(toDir, STORAGE_DIR_RBW), diskLayoutVersion, hardLink);
+      }
     } 
     LOG.info( hardLink.linkStats.report() );
   }
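The branch added above exists because HDFS 1.x kept replicas that were still being written in a blocksBeingWritten directory beside (not under) current, so on upgrade from a pre-RBW layout those replicas must be hardlinked into the new rbw directory separately from the finalized ones (HDFS-3731). A rough sketch of the source-to-destination mapping linkAllBlocks() now performs; the directory-name strings and the supportsRbw flag are illustrative stand-ins for the real Storage constants and layout-version check:

    // Sketch only: which snapshot directories feed which current directories
    // during a datanode layout upgrade, mirroring the branching above.
    import java.io.File;
    import java.util.LinkedHashMap;
    import java.util.Map;

    public class UpgradeLinkPlanSketch {
      static Map<File, File> plan(File snapshotDir, File legacyBbwDir,
          File toDir, boolean supportsRbw) {
        Map<File, File> links = new LinkedHashMap<File, File>();
        if (supportsRbw) {
          // Newer layouts already split finalized and rbw replicas.
          links.put(new File(snapshotDir, "finalized"), new File(toDir, "finalized"));
          links.put(new File(snapshotDir, "rbw"), new File(toDir, "rbw"));
        } else {
          // Pre-RBW layouts: everything in the snapshot is finalized ...
          links.put(snapshotDir, new File(toDir, "finalized"));
          // ... and the 1.x blocksBeingWritten directory, if present, becomes rbw.
          if (legacyBbwDir.exists()) {
            links.put(legacyBbwDir, new File(toDir, "rbw"));
          }
        }
        return links;
      }
    }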
@@ -677,6 +698,7 @@ public class DataStorage extends Storage
       throw new IOException("Cannot create directory " + to);
     
     String[] blockNames = from.list(new java.io.FilenameFilter() {
+      @Override
       public boolean accept(File dir, String name) {
         return name.startsWith(BLOCK_FILE_PREFIX);
       }
@@ -694,6 +716,7 @@ public class DataStorage extends Storage
     
     // Now take care of the rest of the files and subdirectories
     String[] otherNames = from.list(new java.io.FilenameFilter() {
+        @Override
         public boolean accept(File dir, String name) {
           return name.startsWith(BLOCK_SUBDIR_PREFIX) 
             || name.startsWith(COPY_FILE_PREFIX);
@@ -704,14 +727,6 @@ public class DataStorage extends Storage
           new File(to, otherNames[i]), oldLV, hl);
   }
 
-  private void verifyDistributedUpgradeProgress(UpgradeManagerDatanode um,
-                  NamespaceInfo nsInfo
-                ) throws IOException {
-    assert um != null : "DataNode.upgradeManager is null.";
-    um.setUpgradeState(false, getLayoutVersion());
-    um.initializeUpgrade(nsInfo);
-  }
-  
   /**
    * Add bpStorage into bpStorageMap
    */


