hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From szets...@apache.org
Subject svn commit: r1294028 [2/6] - in /hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/protocol/ src/main/java/org/apache/hadoop/hdfs/protocolPB/...
Date Mon, 27 Feb 2012 04:54:45 GMT
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/ExportedBlockKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/ExportedBlockKeys.java?rev=1294028&r1=1294027&r2=1294028&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/ExportedBlockKeys.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/ExportedBlockKeys.java Mon Feb 27 04:54:33 2012
@@ -43,7 +43,7 @@ public class ExportedBlockKeys implement
     this(false, 0, 0, new BlockKey(), new BlockKey[0]);
   }
 
-  ExportedBlockKeys(boolean isBlockTokenEnabled, long keyUpdateInterval,
+  public ExportedBlockKeys(boolean isBlockTokenEnabled, long keyUpdateInterval,
       long tokenLifetime, BlockKey currentKey, BlockKey[] allKeys) {
     this.isBlockTokenEnabled = isBlockTokenEnabled;
     this.keyUpdateInterval = keyUpdateInterval;

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java?rev=1294028&r1=1294027&r2=1294028&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java Mon Feb 27 04:54:33 2012
@@ -36,6 +36,8 @@ import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
+import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
@@ -46,6 +48,7 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
@@ -206,14 +209,15 @@ class NameNodeConnector {
     methodNameToPolicyMap.put("getBlocks", methodPolicy);
     methodNameToPolicyMap.put("getAccessKeys", methodPolicy);
 
-    return (NamenodeProtocol) RetryProxy.create(NamenodeProtocol.class,
-        RPC.getProxy(NamenodeProtocol.class,
-            NamenodeProtocol.versionID,
-            address,
-            UserGroupInformation.getCurrentUser(),
-            conf,
-            NetUtils.getDefaultSocketFactory(conf)),
-        methodNameToPolicyMap);
+    RPC.setProtocolEngine(conf, NamenodeProtocolPB.class,
+        ProtobufRpcEngine.class);
+    NamenodeProtocolPB proxy = RPC.getProxy(NamenodeProtocolPB.class,
+            RPC.getProtocolVersion(NamenodeProtocolPB.class), address,
+            UserGroupInformation.getCurrentUser(), conf,
+            NetUtils.getDefaultSocketFactory(conf));
+    NamenodeProtocolPB retryProxy = (NamenodeProtocolPB) RetryProxy.create(
+        NamenodeProtocolPB.class, proxy, methodNameToPolicyMap);
+    return new NamenodeProtocolTranslatorPB(retryProxy);
   }
 
   /**

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java?rev=1294028&r1=1294027&r2=1294028&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java Mon Feb 27 04:54:33 2012
@@ -112,7 +112,6 @@ public class BlockInfoUnderConstruction 
       return (this == obj) || super.equals(obj);
     }
 
-    /** {@inheritDoc} */
     @Override
     public String toString() {
       final StringBuilder b = new StringBuilder(getClass().getSimpleName());
@@ -268,7 +267,6 @@ public class BlockInfoUnderConstruction 
     return (this == obj) || super.equals(obj);
   }
 
-  /** {@inheritDoc} */
   @Override
   public String toString() {
     final StringBuilder b = new StringBuilder(super.toString());

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1294028&r1=1294027&r2=1294028&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Mon Feb 27 04:54:33 2012
@@ -59,10 +59,12 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.INodeFileUnderConstruction;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
+import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.util.Daemon;
 
@@ -2082,7 +2084,7 @@ public class BlockManager {
    * Modify (block-->datanode) map. Possibly generate replication tasks, if the
    * removed block is still valid.
    */
-  private void removeStoredBlock(Block block, DatanodeDescriptor node) {
+  public void removeStoredBlock(Block block, DatanodeDescriptor node) {
     if(NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("BLOCK* removeStoredBlock: "
           + block + " from " + node.getName());
@@ -2201,27 +2203,48 @@ public class BlockManager {
     }
   }
 
-  /** The given node is reporting that it received a certain block. */
-  public void blockReceived(final DatanodeID nodeID, final String poolId,
-      final Block block, final String delHint) throws IOException {
+  /** The given node is reporting that it received/deleted certain blocks. */
+  public void blockReceivedAndDeleted(final DatanodeID nodeID, 
+     final String poolId, 
+     final ReceivedDeletedBlockInfo receivedAndDeletedBlocks[]
+  ) throws IOException {
     namesystem.writeLock();
+    int received = 0;
+    int deleted = 0;
     try {
       final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
       if (node == null || !node.isAlive) {
-        final String s = block + " is received from dead or unregistered node "
-            + nodeID.getName();
-        NameNode.stateChangeLog.warn("BLOCK* blockReceived: " + s);
-        throw new IOException(s);
-      } 
-
-      if (NameNode.stateChangeLog.isDebugEnabled()) {
-        NameNode.stateChangeLog.debug("BLOCK* blockReceived: " + block
-            + " is received from " + nodeID.getName());
+        NameNode.stateChangeLog
+            .warn("BLOCK* blockReceivedDeleted"
+                + " is received from dead or unregistered node "
+                + nodeID.getName());
+        throw new IOException(
+            "Got blockReceivedDeleted message from unregistered or dead node");
       }
 
-      addBlock(node, block, delHint);
+      for (int i = 0; i < receivedAndDeletedBlocks.length; i++) {
+        if (receivedAndDeletedBlocks[i].isDeletedBlock()) {
+          removeStoredBlock(
+              receivedAndDeletedBlocks[i].getBlock(), node);
+          deleted++;
+        } else {
+          addBlock(node, receivedAndDeletedBlocks[i].getBlock(),
+              receivedAndDeletedBlocks[i].getDelHints());
+          received++;
+        }
+        if (NameNode.stateChangeLog.isDebugEnabled()) {
+          NameNode.stateChangeLog.debug("BLOCK* block"
+              + (receivedAndDeletedBlocks[i].isDeletedBlock() ? "Deleted"
+                  : "Received") + ": " + receivedAndDeletedBlocks[i].getBlock()
+              + " is received from " + nodeID.getName());
+        }
+      }
     } finally {
       namesystem.writeUnlock();
+      NameNode.stateChangeLog
+          .debug("*BLOCK* NameNode.blockReceivedAndDeleted: " + "from "
+              + nodeID.getName() + " received: " + received + ", "
+              + " deleted: " + deleted);
     }
   }
 
@@ -2396,6 +2419,7 @@ public class BlockManager {
   }
 
   public void removeBlock(Block block) {
+    block.setNumBytes(BlockCommand.NO_ACK);
     addToInvalidates(block);
     corruptReplicas.removeFromCorruptReplicasMap(block);
     blocksMap.removeBlock(block);

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java?rev=1294028&r1=1294027&r2=1294028&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java Mon Feb 27 04:54:33 2012
@@ -66,7 +66,7 @@ public class BlockPlacementPolicyDefault
   BlockPlacementPolicyDefault() {
   }
     
-  /** {@inheritDoc} */
+  @Override
   public void initialize(Configuration conf,  FSClusterStats stats,
                          NetworkTopology clusterMap) {
     this.considerLoad = conf.getBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, true);
@@ -82,7 +82,7 @@ public class BlockPlacementPolicyDefault
     }
   };
 
-  /** {@inheritDoc} */
+  @Override
   public DatanodeDescriptor[] chooseTarget(String srcPath,
                                     int numOfReplicas,
                                     DatanodeDescriptor writer,
@@ -92,7 +92,6 @@ public class BlockPlacementPolicyDefault
         null, blocksize);
   }
 
-  /** {@inheritDoc} */
   @Override
   public DatanodeDescriptor[] chooseTarget(String srcPath,
                                     int numOfReplicas,
@@ -528,7 +527,7 @@ public class BlockPlacementPolicyDefault
     return nodes;
   }
 
-  /** {@inheritDoc} */
+  @Override
   public int verifyBlockPlacement(String srcPath,
                                   LocatedBlock lBlk,
                                   int minRacks) {
@@ -547,7 +546,7 @@ public class BlockPlacementPolicyDefault
     return minRacks - racks.size();
   }
 
-  /** {@inheritDoc} */
+  @Override
   public DatanodeDescriptor chooseReplicaToDelete(FSInodeInfo inode,
                                                  Block block,
                                                  short replicationFactor,

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1294028&r1=1294027&r2=1294028&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Mon Feb 27 04:54:33 2012
@@ -60,6 +60,7 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
+import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
 import org.apache.hadoop.hdfs.util.CyclicIteration;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.CachedDNSToSwitchMapping;
@@ -862,7 +863,7 @@ public class DatanodeManager {
         try {
           nodeinfo = getDatanode(nodeReg);
         } catch(UnregisteredNodeException e) {
-          return new DatanodeCommand[]{DatanodeCommand.REGISTER};
+          return new DatanodeCommand[]{RegisterCommand.REGISTER};
         }
         
         // Check if this datanode should actually be shutdown instead. 
@@ -872,7 +873,7 @@ public class DatanodeManager {
         }
          
         if (nodeinfo == null || !nodeinfo.isAlive) {
-          return new DatanodeCommand[]{DatanodeCommand.REGISTER};
+          return new DatanodeCommand[]{RegisterCommand.REGISTER};
         }
 
         heartbeatManager.updateHeartbeat(nodeinfo, capacity, dfsUsed,

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java?rev=1294028&r1=1294027&r2=1294028&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java Mon Feb 27 04:54:33 2012
@@ -36,6 +36,7 @@ import org.apache.hadoop.hdfs.protocol.E
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
@@ -45,11 +46,17 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
+import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand;
 import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
+import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
+import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
-import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.util.StringUtils;
 
@@ -86,15 +93,17 @@ class BPOfferService implements Runnable
   DatanodeRegistration bpRegistration;
   
   long lastBlockReport = 0;
+  long lastDeletedReport = 0;
 
   boolean resetBlockReportTime = true;
 
   Thread bpThread;
-  DatanodeProtocol bpNamenode;
+  DatanodeProtocolClientSideTranslatorPB bpNamenode;
   private long lastHeartbeat = 0;
   private volatile boolean initialized = false;
-  private final LinkedList<Block> receivedBlockList = new LinkedList<Block>();
-  private final LinkedList<String> delHints = new LinkedList<String>();
+  private final LinkedList<ReceivedDeletedBlockInfo> receivedAndDeletedBlockList 
+  	= new LinkedList<ReceivedDeletedBlockInfo>();
+  private volatile int pendingReceivedRequests = 0;
   private volatile boolean shouldServiceRun = true;
   UpgradeManagerDatanode upgradeManager = null;
   private final DataNode dn;
@@ -160,7 +169,7 @@ class BPOfferService implements Runnable
    * Used to inject a spy NN in the unit tests.
    */
   @VisibleForTesting
-  void setNameNode(DatanodeProtocol dnProtocol) {
+  void setNameNode(DatanodeProtocolClientSideTranslatorPB dnProtocol) {
     bpNamenode = dnProtocol;
   }
 
@@ -220,8 +229,8 @@ class BPOfferService implements Runnable
 
   private void connectToNNAndHandshake() throws IOException {
     // get NN proxy
-    bpNamenode = (DatanodeProtocol)RPC.waitForProxy(DatanodeProtocol.class,
-          DatanodeProtocol.versionID, nnAddr, dn.getConf());
+    bpNamenode = new DatanodeProtocolClientSideTranslatorPB(nnAddr,
+        dn.getConf());
 
     // First phase of the handshake with NN - get the namespace
     // info.
@@ -270,39 +279,32 @@ class BPOfferService implements Runnable
    * Report received blocks and delete hints to the Namenode
    * @throws IOException
    */
-  private void reportReceivedBlocks() throws IOException {
-    //check if there are newly received blocks
-    Block [] blockArray=null;
-    String [] delHintArray=null;
-    synchronized(receivedBlockList) {
-      synchronized(delHints){
-        int numBlocks = receivedBlockList.size();
-        if (numBlocks > 0) {
-          if(numBlocks!=delHints.size()) {
-            LOG.warn("Panic: receiveBlockList and delHints are not of " +
-            "the same length" );
-          }
-          //
-          // Send newly-received blockids to namenode
-          //
-          blockArray = receivedBlockList.toArray(new Block[numBlocks]);
-          delHintArray = delHints.toArray(new String[numBlocks]);
-        }
+  private void reportReceivedDeletedBlocks() throws IOException {
+
+    // check if there are newly received or deleted blocks
+    ReceivedDeletedBlockInfo[] receivedAndDeletedBlockArray = null;
+    int currentReceivedRequestsCounter;
+    synchronized (receivedAndDeletedBlockList) {
+      currentReceivedRequestsCounter = pendingReceivedRequests;
+      int numBlocks = receivedAndDeletedBlockList.size();
+      if (numBlocks > 0) {
+        //
+        // Send newly-received and deleted blockids to namenode
+        //
+        receivedAndDeletedBlockArray = receivedAndDeletedBlockList
+            .toArray(new ReceivedDeletedBlockInfo[numBlocks]);
       }
     }
-    if (blockArray != null) {
-      if(delHintArray == null || delHintArray.length != blockArray.length ) {
-        LOG.warn("Panic: block array & delHintArray are not the same" );
-      }
-      bpNamenode.blockReceived(bpRegistration, getBlockPoolId(), blockArray,
-          delHintArray);
-      synchronized(receivedBlockList) {
-        synchronized(delHints){
-          for(int i=0; i<blockArray.length; i++) {
-            receivedBlockList.remove(blockArray[i]);
-            delHints.remove(delHintArray[i]);
-          }
+    if (receivedAndDeletedBlockArray != null) {
+      StorageReceivedDeletedBlocks[] report = { new StorageReceivedDeletedBlocks(
+          bpRegistration.getStorageID(), receivedAndDeletedBlockArray) };
+      bpNamenode.blockReceivedAndDeleted(bpRegistration, getBlockPoolId(),
+          report);
+      synchronized (receivedAndDeletedBlockList) {
+        for (int i = 0; i < receivedAndDeletedBlockArray.length; i++) {
+          receivedAndDeletedBlockList.remove(receivedAndDeletedBlockArray[i]);
         }
+        pendingReceivedRequests -= currentReceivedRequestsCounter;
       }
     }
   }
@@ -313,26 +315,42 @@ class BPOfferService implements Runnable
    * client? For now we don't.
    */
   void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint) {
-    if(block==null || delHint==null) {
-      throw new IllegalArgumentException(
-          block==null?"Block is null":"delHint is null");
+    if (block == null || delHint == null) {
+      throw new IllegalArgumentException(block == null ? "Block is null"
+          : "delHint is null");
     }
-    
+
     if (!block.getBlockPoolId().equals(getBlockPoolId())) {
       LOG.warn("BlockPool mismatch " + block.getBlockPoolId() + " vs. "
           + getBlockPoolId());
       return;
     }
-    
-    synchronized (receivedBlockList) {
-      synchronized (delHints) {
-        receivedBlockList.add(block.getLocalBlock());
-        delHints.add(delHint);
-        receivedBlockList.notifyAll();
-      }
+
+    synchronized (receivedAndDeletedBlockList) {
+      receivedAndDeletedBlockList.add(new ReceivedDeletedBlockInfo(block
+          .getLocalBlock(), delHint));
+      pendingReceivedRequests++;
+      receivedAndDeletedBlockList.notifyAll();
     }
   }
 
+  void notifyNamenodeDeletedBlock(ExtendedBlock block) {
+    if (block == null) {
+      throw new IllegalArgumentException("Block is null");
+    }
+
+    if (!block.getBlockPoolId().equals(getBlockPoolId())) {
+      LOG.warn("BlockPool mismatch " + block.getBlockPoolId() + " vs. "
+          + getBlockPoolId());
+      return;
+    }
+
+    synchronized (receivedAndDeletedBlockList) {
+      receivedAndDeletedBlockList.add(new ReceivedDeletedBlockInfo(block
+          .getLocalBlock(), ReceivedDeletedBlockInfo.TODELETE_HINT));
+    }
+  }
+  
 
   /**
    * Report the list blocks to the Namenode
@@ -350,8 +368,9 @@ class BPOfferService implements Runnable
 
       // Send block report
       long brSendStartTime = now();
-      cmd = bpNamenode.blockReport(bpRegistration, getBlockPoolId(), bReport
-          .getBlockListAsLongs());
+      StorageBlockReport[] report = { new StorageBlockReport(
+          bpRegistration.getStorageID(), bReport.getBlockListAsLongs()) };
+      cmd = bpNamenode.blockReport(bpRegistration, getBlockPoolId(), report);
 
       // Log the block report processing stats from Datanode perspective
       long brSendCost = now() - brSendStartTime;
@@ -383,11 +402,11 @@ class BPOfferService implements Runnable
   
   
   DatanodeCommand [] sendHeartBeat() throws IOException {
-    return bpNamenode.sendHeartbeat(bpRegistration,
-        dn.data.getCapacity(),
-        dn.data.getDfsUsed(),
-        dn.data.getRemaining(),
-        dn.data.getBlockPoolUsed(getBlockPoolId()),
+    // report storage capacity/usage; the failed-volume count is passed separately below
+    StorageReport[] report = { new StorageReport(bpRegistration.getStorageID(),
+        false, dn.data.getCapacity(), dn.data.getDfsUsed(),
+        dn.data.getRemaining(), dn.data.getBlockPoolUsed(getBlockPoolId())) };
+    return bpNamenode.sendHeartbeat(bpRegistration, report,
         dn.xmitsInProgress.get(),
         dn.getXceiverCount(), dn.data.getNumFailedVolumes());
   }
@@ -433,7 +452,7 @@ class BPOfferService implements Runnable
     if(upgradeManager != null)
       upgradeManager.shutdownUpgrade();
     shouldServiceRun = false;
-    RPC.stopProxy(bpNamenode);
+    IOUtils.cleanup(LOG, bpNamenode);
     dn.shutdownBlockPool(this);
   }
 
@@ -442,7 +461,8 @@ class BPOfferService implements Runnable
    * forever calling remote NameNode functions.
    */
   private void offerService() throws Exception {
-    LOG.info("For namenode " + nnAddr + " using BLOCKREPORT_INTERVAL of "
+    LOG.info("For namenode " + nnAddr + " using DELETEREPORT_INTERVAL of "
+        + dnConf.deleteReportInterval + " msec " + " BLOCKREPORT_INTERVAL of "
         + dnConf.blockReportInterval + "msec" + " Initial delay: "
         + dnConf.initialBlockReportDelay + "msec" + "; heartBeatInterval="
         + dnConf.heartBeatInterval);
@@ -480,8 +500,11 @@ class BPOfferService implements Runnable
             }
           }
         }
-
-        reportReceivedBlocks();
+        if (pendingReceivedRequests > 0
+            || (startTime - lastDeletedReport > dnConf.deleteReportInterval)) {
+          reportReceivedDeletedBlocks();
+          lastDeletedReport = startTime;
+        }
 
         DatanodeCommand cmd = blockReport();
         processCommand(cmd);
@@ -497,10 +520,10 @@ class BPOfferService implements Runnable
         //
         long waitTime = dnConf.heartBeatInterval - 
         (System.currentTimeMillis() - lastHeartbeat);
-        synchronized(receivedBlockList) {
-          if (waitTime > 0 && receivedBlockList.size() == 0) {
+        synchronized(receivedAndDeletedBlockList) {
+          if (waitTime > 0 && pendingReceivedRequests == 0) {
             try {
-              receivedBlockList.wait(waitTime);
+            	receivedAndDeletedBlockList.wait(waitTime);
             } catch (InterruptedException ie) {
               LOG.warn("BPOfferService for " + this + " interrupted");
             }
@@ -553,7 +576,8 @@ class BPOfferService implements Runnable
     while (shouldRun()) {
       try {
         // Use returned registration from namenode with updated machine name.
-        bpRegistration = bpNamenode.registerDatanode(bpRegistration);
+        bpRegistration = bpNamenode.registerDatanode(bpRegistration,
+            new DatanodeStorage[0]);
         break;
       } catch(SocketTimeoutException e) {  // namenode is busy
         LOG.info("Problem connecting to server: " + nnAddr);
@@ -699,7 +723,7 @@ class BPOfferService implements Runnable
       }
       break;
     case DatanodeProtocol.DNA_FINALIZE:
-      String bp = ((DatanodeCommand.Finalize) cmd).getBlockPoolId(); 
+      String bp = ((FinalizeCommand) cmd).getBlockPoolId(); 
       assert getBlockPoolId().equals(bp) :
         "BP " + getBlockPoolId() + " received DNA_FINALIZE " +
         "for other block pool " + bp;
@@ -764,12 +788,12 @@ class BPOfferService implements Runnable
   }
 
   @VisibleForTesting
-  DatanodeProtocol getBpNamenode() {
+  DatanodeProtocolClientSideTranslatorPB getBpNamenode() {
     return bpNamenode;
   }
 
   @VisibleForTesting
-  void setBpNamenode(DatanodeProtocol bpNamenode) {
+  void setBpNamenode(DatanodeProtocolClientSideTranslatorPB bpNamenode) {
     this.bpNamenode = bpNamenode;
   }
 }

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java?rev=1294028&r1=1294027&r2=1294028&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java Mon Feb 27 04:54:33 2012
@@ -55,6 +55,7 @@ class DNConf {
   final long readaheadLength;
   final long heartBeatInterval;
   final long blockReportInterval;
+  final long deleteReportInterval;
   final long initialBlockReportDelay;
   final int writePacketSize;
 
@@ -105,6 +106,7 @@ class DNConf {
     heartBeatInterval = conf.getLong(DFS_HEARTBEAT_INTERVAL_KEY,
         DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000L;
     
+    this.deleteReportInterval = 100 * heartBeatInterval;
     // do we need to sync block file contents to disk when blockfile is closed?
     this.syncOnClose = conf.getBoolean(DFS_DATANODE_SYNCONCLOSE_KEY, 
         DFS_DATANODE_SYNCONCLOSE_DEFAULT);

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1294028&r1=1294027&r2=1294028&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Mon Feb 27 04:54:33 2012
@@ -100,8 +100,16 @@ import org.apache.hadoop.hdfs.protocol.R
 import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ClientDatanodeProtocolService;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DNTransferAckProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.InterDatanodeProtocolService;
+import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolPB;
+import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolPB;
+import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolTranslatorPB;
 import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
@@ -124,11 +132,13 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
 import org.apache.hadoop.hdfs.web.resources.Param;
 import org.apache.hadoop.http.HttpServer;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.ProtocolSignature;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
@@ -154,6 +164,7 @@ import org.apache.hadoop.util.StringUtil
 import org.apache.hadoop.util.VersionInfo;
 import org.mortbay.util.ajax.JSON;
 
+import com.google.protobuf.BlockingService;
 
 
 /**********************************************************
@@ -385,7 +396,7 @@ public class DataNode extends Configured
   private List<ServicePlugin> plugins;
   
   // For InterDataNodeProtocol
-  public Server ipcServer;
+  public RPC.Server ipcServer;
 
   private SecureResources secureResources = null;
   private AbstractList<File> dataDirs;
@@ -507,11 +518,26 @@ public class DataNode extends Configured
   private void initIpcServer(Configuration conf) throws IOException {
     InetSocketAddress ipcAddr = NetUtils.createSocketAddr(
         conf.get("dfs.datanode.ipc.address"));
-    ipcServer = RPC.getServer(DataNode.class, this, ipcAddr.getHostName(),
-                              ipcAddr.getPort(), 
-                              conf.getInt(DFS_DATANODE_HANDLER_COUNT_KEY, 
-                                          DFS_DATANODE_HANDLER_COUNT_DEFAULT), 
-                              false, conf, blockPoolTokenSecretManager);
+    
+    // Add all the RPC protocols that the Datanode implements    
+    RPC.setProtocolEngine(conf, ClientDatanodeProtocolPB.class,
+        ProtobufRpcEngine.class);
+    ClientDatanodeProtocolServerSideTranslatorPB clientDatanodeProtocolXlator = 
+          new ClientDatanodeProtocolServerSideTranslatorPB(this);
+    BlockingService service = ClientDatanodeProtocolService
+        .newReflectiveBlockingService(clientDatanodeProtocolXlator);
+    ipcServer = RPC.getServer(ClientDatanodeProtocolPB.class, service, ipcAddr
+        .getHostName(), ipcAddr.getPort(), conf.getInt(
+        DFS_DATANODE_HANDLER_COUNT_KEY, DFS_DATANODE_HANDLER_COUNT_DEFAULT),
+        false, conf, blockPoolTokenSecretManager);
+    
+    InterDatanodeProtocolServerSideTranslatorPB interDatanodeProtocolXlator = 
+        new InterDatanodeProtocolServerSideTranslatorPB(this);
+    service = InterDatanodeProtocolService
+        .newReflectiveBlockingService(interDatanodeProtocolXlator);
+    DFSUtil.addPBProtocol(conf, InterDatanodeProtocolPB.class, service,
+        ipcServer);
+    
     // set service-level authorization security policy
     if (conf.getBoolean(
         CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) {
@@ -634,6 +660,17 @@ public class DataNode extends Configured
     }
   }
   
+  // calls specific to BP
+  protected void notifyNamenodeDeletedBlock(ExtendedBlock block) {
+    BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
+    if (bpos != null) {
+      bpos.notifyNamenodeDeletedBlock(block);
+    } else {
+      LOG.warn("Cannot find BPOfferService for reporting block deleted for bpid="
+          + block.getBlockPoolId());
+    }
+  }
+  
   public void reportBadBlocks(ExtendedBlock block) throws IOException{
     BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
     if(bpos == null || bpos.bpNamenode == null) {
@@ -952,15 +989,13 @@ public class DataNode extends Configured
     if (InterDatanodeProtocol.LOG.isDebugEnabled()) {
       InterDatanodeProtocol.LOG.debug("InterDatanodeProtocol addr=" + addr);
     }
-    UserGroupInformation loginUgi = UserGroupInformation.getLoginUser();
+    final UserGroupInformation loginUgi = UserGroupInformation.getLoginUser();
     try {
       return loginUgi
           .doAs(new PrivilegedExceptionAction<InterDatanodeProtocol>() {
             public InterDatanodeProtocol run() throws IOException {
-              return (InterDatanodeProtocol) RPC.getProxy(
-                  InterDatanodeProtocol.class, InterDatanodeProtocol.versionID,
-                  addr, UserGroupInformation.getCurrentUser(), conf,
-                  NetUtils.getDefaultSocketFactory(conf), socketTimeout);
+              return new InterDatanodeProtocolTranslatorPB(addr, loginUgi,
+                  conf, NetUtils.getDefaultSocketFactory(conf), socketTimeout);
             }
           });
     } catch (InterruptedException ie) {
@@ -1207,7 +1242,7 @@ public class DataNode extends Configured
 
     //inform NameNodes
     for(BPOfferService bpos: blockPoolManager.getAllNamenodeThreads()) {
-      DatanodeProtocol nn = bpos.bpNamenode;
+      DatanodeProtocolClientSideTranslatorPB nn = bpos.bpNamenode;
       try {
         nn.errorReport(bpos.bpRegistration, dpError, errMsgr);
       } catch(IOException e) {
@@ -1241,7 +1276,8 @@ public class DataNode extends Configured
   private void transferBlock( ExtendedBlock block, 
                               DatanodeInfo xferTargets[] 
                               ) throws IOException {
-    DatanodeProtocol nn = getBPNamenode(block.getBlockPoolId());
+    DatanodeProtocolClientSideTranslatorPB nn = getBPNamenode(block
+        .getBlockPoolId());
     DatanodeRegistration bpReg = getDNRegistrationForBP(block.getBlockPoolId());
     
     if (!data.isValidBlock(block)) {
@@ -1819,7 +1855,7 @@ public class DataNode extends Configured
     return new ExtendedBlock(oldBlock.getBlockPoolId(), r);
   }
 
-  /** {@inheritDoc} */
+  @Override
   public long getProtocolVersion(String protocol, long clientVersion
       ) throws IOException {
     if (protocol.equals(InterDatanodeProtocol.class.getName())) {
@@ -1852,7 +1888,7 @@ public class DataNode extends Configured
       this.rInfo = rInfo;
     }
 
-    /** {@inheritDoc} */
+    @Override
     public String toString() {
       return "block:" + rInfo + " node:" + id;
     }
@@ -1909,7 +1945,8 @@ public class DataNode extends Configured
    * @return Namenode corresponding to the bpid
    * @throws IOException
    */
-  public DatanodeProtocol getBPNamenode(String bpid) throws IOException {
+  public DatanodeProtocolClientSideTranslatorPB getBPNamenode(String bpid)
+      throws IOException {
     BPOfferService bpos = blockPoolManager.get(bpid);
     if(bpos == null || bpos.bpNamenode == null) {
       throw new IOException("cannot find a namnode proxy for bpid=" + bpid);
@@ -1921,7 +1958,8 @@ public class DataNode extends Configured
   void syncBlock(RecoveringBlock rBlock,
                          List<BlockRecord> syncList) throws IOException {
     ExtendedBlock block = rBlock.getBlock();
-    DatanodeProtocol nn = getBPNamenode(block.getBlockPoolId());
+    DatanodeProtocolClientSideTranslatorPB nn = getBPNamenode(block
+        .getBlockPoolId());
     
     long recoveryId = rBlock.getNewGenerationStamp();
     if (LOG.isDebugEnabled()) {
@@ -2036,7 +2074,6 @@ public class DataNode extends Configured
   }
 
   // ClientDataNodeProtocol implementation
-  /** {@inheritDoc} */
   @Override // ClientDataNodeProtocol
   public long getReplicaVisibleLength(final ExtendedBlock block) throws IOException {
     checkWriteAccess(block);

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=1294028&r1=1294027&r2=1294028&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Mon Feb 27 04:54:33 2012
@@ -59,7 +59,6 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
-import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.metrics2.util.MBeans;
@@ -1122,7 +1121,7 @@ class FSDataset implements FSDatasetInte
     for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
       roots[idx] = storage.getStorageDir(idx).getCurrentDir();
     }
-    asyncDiskService = new FSDatasetAsyncDiskService(roots);
+    asyncDiskService = new FSDatasetAsyncDiskService(this, roots);
     registerMBean(storage.getStorageID());
   }
 
@@ -1202,8 +1201,8 @@ class FSDataset implements FSDatasetInte
   File getBlockFile(String bpid, Block b) throws IOException {
     File f = validateBlockFile(bpid, b);
     if(f == null) {
-      if (InterDatanodeProtocol.LOG.isDebugEnabled()) {
-        InterDatanodeProtocol.LOG.debug("b=" + b + ", volumeMap=" + volumeMap);
+      if (DataNode.LOG.isDebugEnabled()) {
+        DataNode.LOG.debug("b=" + b + ", volumeMap=" + volumeMap);
       }
       throw new IOException("Block " + b + " is not valid.");
     }
@@ -1964,8 +1963,8 @@ class FSDataset implements FSDatasetInte
       datanode.checkDiskError();
     }
     
-    if (InterDatanodeProtocol.LOG.isDebugEnabled()) {
-      InterDatanodeProtocol.LOG.debug("b=" + b + ", f=" + f);
+    if (DataNode.LOG.isDebugEnabled()) {
+      DataNode.LOG.debug("b=" + b + ", f=" + f);
     }
     return null;
   }
@@ -2049,15 +2048,19 @@ class FSDataset implements FSDatasetInte
         volumeMap.remove(bpid, invalidBlks[i]);
       }
       File metaFile = DatanodeUtil.getMetaFile(f, invalidBlks[i].getGenerationStamp());
-      
+
       // Delete the block asynchronously to make sure we can do it fast enough
-      asyncDiskService.deleteAsync(v, bpid, f, metaFile,
-          invalidBlks[i].toString());
+      asyncDiskService.deleteAsync(v, f, metaFile,
+          new ExtendedBlock(bpid, invalidBlks[i]));
     }
     if (error) {
       throw new IOException("Error in deleting blocks.");
     }
   }
+  
+  public void notifyNamenodeDeletedBlock(ExtendedBlock block){
+    datanode.notifyNamenodeDeletedBlock(block);
+  }
 
   @Override // {@link FSDatasetInterface}
   public synchronized boolean contains(final ExtendedBlock block) {

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java?rev=1294028&r1=1294027&r2=1294028&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java Mon Feb 27 04:54:33 2012
@@ -28,6 +28,8 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 
 /*
  * This class is a container of multiple thread pools, each for a volume,
@@ -47,6 +49,8 @@ import org.apache.commons.logging.LogFac
  */
 class FSDatasetAsyncDiskService {
   
+  final FSDataset dataset;
+  
   public static final Log LOG = LogFactory.getLog(FSDatasetAsyncDiskService.class);
   
   // ThreadPool core pool size
@@ -70,8 +74,8 @@ class FSDatasetAsyncDiskService {
    * 
    * @param volumes The roots of the data volumes.
    */
-  FSDatasetAsyncDiskService(File[] volumes) {
-
+  FSDatasetAsyncDiskService(FSDataset dataset, File[] volumes) {
+    this.dataset = dataset;
     // Create one ThreadPool per volume
     for (int v = 0 ; v < volumes.length; v++) {
       final File vol = volumes[v];
@@ -147,13 +151,12 @@ class FSDatasetAsyncDiskService {
    * Delete the block file and meta file from the disk asynchronously, adjust
    * dfsUsed statistics accordingly.
    */
-  void deleteAsync(FSDataset.FSVolume volume, String bpid, File blockFile,
-      File metaFile, String blockName) {
-    DataNode.LOG.info("Scheduling block " + blockName + " file " + blockFile
-        + " for deletion");
-    ReplicaFileDeleteTask deletionTask = 
-        new ReplicaFileDeleteTask(volume, bpid, blockFile, metaFile,
-            blockName);
+  void deleteAsync(FSDataset.FSVolume volume, File blockFile, File metaFile,
+      ExtendedBlock block) {
+    DataNode.LOG.info("Scheduling block " + block.getLocalBlock().toString()
+        + " file " + blockFile + " for deletion");
+    ReplicaFileDeleteTask deletionTask = new ReplicaFileDeleteTask(dataset,
+        volume, blockFile, metaFile, block);
     execute(volume.getCurrentDir(), deletionTask);
   }
   
@@ -161,19 +164,19 @@ class FSDatasetAsyncDiskService {
    *  as decrement the dfs usage of the volume. 
    */
   static class ReplicaFileDeleteTask implements Runnable {
+    final FSDataset dataset;
     final FSDataset.FSVolume volume;
-    final String blockPoolId;
     final File blockFile;
     final File metaFile;
-    final String blockName;
+    final ExtendedBlock block;
     
-    ReplicaFileDeleteTask(FSDataset.FSVolume volume, String bpid,
-        File blockFile, File metaFile, String blockName) {
+    ReplicaFileDeleteTask(FSDataset dataset, FSDataset.FSVolume volume, File blockFile,
+        File metaFile, ExtendedBlock block) {
+      this.dataset = dataset;
       this.volume = volume;
-      this.blockPoolId = bpid;
       this.blockFile = blockFile;
       this.metaFile = metaFile;
-      this.blockName = blockName;
+      this.block = block;
     }
     
     FSDataset.FSVolume getVolume() {
@@ -183,9 +186,9 @@ class FSDatasetAsyncDiskService {
     @Override
     public String toString() {
       // Called in AsyncDiskService.execute for displaying error messages.
-      return "deletion of block " + blockPoolId + " " + blockName
-          + " with block file " + blockFile + " and meta file " + metaFile
-          + " from volume " + volume;
+      return "deletion of block " + block.getBlockPoolId() + " "
+          + block.getLocalBlock().toString() + " with block file " + blockFile
+          + " and meta file " + metaFile + " from volume " + volume;
     }
 
     @Override
@@ -193,12 +196,15 @@ class FSDatasetAsyncDiskService {
       long dfsBytes = blockFile.length() + metaFile.length();
       if ( !blockFile.delete() || ( !metaFile.delete() && metaFile.exists() ) ) {
         DataNode.LOG.warn("Unexpected error trying to delete block "
-            + blockPoolId + " " + blockName + " at file " + blockFile
-            + ". Ignored.");
+            + block.getBlockPoolId() + " " + block.getLocalBlock().toString()
+            + " at file " + blockFile + ". Ignored.");
       } else {
-        volume.decDfsUsed(blockPoolId, dfsBytes);
-        DataNode.LOG.info("Deleted block " + blockPoolId + " " + blockName
-            + " at file " + blockFile);
+        if(block.getLocalBlock().getNumBytes() != BlockCommand.NO_ACK){
+          dataset.notifyNamenodeDeletedBlock(block);
+        }
+        volume.decDfsUsed(block.getBlockPoolId(), dfsBytes);
+        DataNode.LOG.info("Deleted block " + block.getBlockPoolId() + " "
+            + block.getLocalBlock().toString() + " at file " + blockFile);
       }
     }
   };

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java?rev=1294028&r1=1294027&r2=1294028&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java Mon Feb 27 04:54:33 2012
@@ -257,7 +257,7 @@ public interface FSDatasetInterface exte
       this.checksumIn = checksumIn;
     }
 
-    /** {@inheritDoc} */
+    @Override
     public void close() {
       IOUtils.closeStream(dataIn);
       IOUtils.closeStream(checksumIn);

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java?rev=1294028&r1=1294027&r2=1294028&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java Mon Feb 27 04:54:33 2012
@@ -18,18 +18,21 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Iterator;
+import java.util.List;
+import java.util.zip.Checksum;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageState;
-import org.apache.hadoop.hdfs.server.namenode.FSImageTransactionalStorageInspector.LogLoadPlan;
-import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 
 /**
  * Extension of FSImage for the backup node.
@@ -77,6 +80,8 @@ public class BackupImage extends FSImage
    * {@see #freezeNamespaceAtNextRoll()}
    */
   private boolean stopApplyingEditsOnNextRoll = false;
+  
+  private FSNamesystem namesystem;
 
   /**
    * Construct a backup image.
@@ -88,6 +93,10 @@ public class BackupImage extends FSImage
     storage.setDisablePreUpgradableLayoutCheck(true);
     bnState = BNState.DROP_UNTIL_NEXT_ROLL;
   }
+  
+  void setNamesystem(FSNamesystem fsn) {
+    this.namesystem = fsn;
+  }
 
   /**
    * Analyze backup storage directories for consistency.<br>
@@ -136,7 +145,7 @@ public class BackupImage extends FSImage
    * and create empty edits.
    */
   void saveCheckpoint() throws IOException {
-    saveNamespace();
+    saveNamespace(namesystem);
   }
 
   /**
@@ -219,7 +228,7 @@ public class BackupImage extends FSImage
       }
       lastAppliedTxId += numTxns;
       
-      getFSNamesystem().dir.updateCountForINodeWithQuota(); // inefficient!
+      namesystem.dir.updateCountForINodeWithQuota(); // inefficient!
     } finally {
       backupInputStream.clear();
     }
@@ -257,11 +266,18 @@ public class BackupImage extends FSImage
         new FSImageTransactionalStorageInspector();
       
       storage.inspectStorageDirs(inspector);
-      LogLoadPlan logLoadPlan = inspector.createLogLoadPlan(lastAppliedTxId,
-          target - 1);
-  
-      logLoadPlan.doRecovery();
-      loadEdits(logLoadPlan.getEditsFiles());
+
+      editLog.recoverUnclosedStreams();
+      Iterable<EditLogInputStream> editStreamsAll 
+        = editLog.selectInputStreams(lastAppliedTxId, target - 1);
+      // remove inprogress
+      List<EditLogInputStream> editStreams = Lists.newArrayList();
+      for (EditLogInputStream s : editStreamsAll) {
+        if (s.getFirstTxId() != editLog.getCurSegmentTxId()) {
+          editStreams.add(s);
+        }
+      }
+      loadEdits(editStreams, namesystem);
     }
     
     // now, need to load the in-progress file
@@ -271,7 +287,24 @@ public class BackupImage extends FSImage
         return false; // drop lock and try again to load local logs
       }
       
-      EditLogInputStream stream = getEditLog().getInProgressFileInputStream();
+      EditLogInputStream stream = null;
+      Collection<EditLogInputStream> editStreams
+        = getEditLog().selectInputStreams(
+            getEditLog().getCurSegmentTxId(),
+            getEditLog().getCurSegmentTxId());
+      
+      for (EditLogInputStream s : editStreams) {
+        if (s.getFirstTxId() == getEditLog().getCurSegmentTxId()) {
+          stream = s;
+          break; // stop at the first stream that starts at the current segment
+        }
+      }
+      if (stream == null) {
+        LOG.warn("Unable to find stream starting with " + editLog.getCurSegmentTxId()
+                 + ". This indicates that there is an error in synchronization in BackupImage");
+        return false;
+      }
+
       try {
         long remainingTxns = getEditLog().getLastWrittenTxId() - lastAppliedTxId;
         
@@ -285,7 +318,7 @@ public class BackupImage extends FSImage
           "expected to load " + remainingTxns + " but loaded " +
           numLoaded + " from " + stream;
       } finally {
-        IOUtils.closeStream(stream);
+        FSEditLog.closeAllStreams(editStreams);
       }
 
       LOG.info("Successfully synced BackupNode with NameNode at txnid " +

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java?rev=1294028&r1=1294027&r2=1294028&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java Mon Feb 27 04:54:33 2012
@@ -57,12 +57,31 @@ class BackupJournalManager implements Jo
       throws IOException {
   }
 
+  @Override
+  public long getNumberOfTransactions(long fromTxnId) 
+      throws IOException, CorruptionException {
+    // This JournalManager is never used for input. Therefore it cannot
+    // return any transactions
+    return 0;
+  }
+  
+  @Override
+  public EditLogInputStream getInputStream(long fromTxnId) throws IOException {
+    // This JournalManager is never used for input. Therefore it cannot
+    // return any transactions
+    throw new IOException("Unsupported operation");
+  }
+
+  @Override
+  public void recoverUnfinalizedSegments() throws IOException {
+  }
+
   public boolean matchesRegistration(NamenodeRegistration bnReg) {
     return bnReg.getAddress().equals(this.bnReg.getAddress());
   }
 
   @Override
-  public EditLogInputStream getInProgressInputStream(long segmentStartsAtTxId) {
-    return null;
+  public String toString() {
+    return "BackupJournalManager";
   }
 }

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java?rev=1294028&r1=1294027&r2=1294028&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java Mon Feb 27 04:54:33 2012
@@ -28,16 +28,24 @@ import org.apache.hadoop.hdfs.DFSConfigK
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalProtocolService;
+import org.apache.hadoop.hdfs.protocolPB.JournalProtocolPB;
+import org.apache.hadoop.hdfs.protocolPB.JournalProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.Storage;
-import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import com.google.protobuf.BlockingService;
 
 /**
  * BackupNode.
@@ -61,7 +69,7 @@ public class BackupNode extends NameNode
   private static final String BN_SERVICE_RPC_ADDRESS_KEY = DFSConfigKeys.DFS_NAMENODE_BACKUP_SERVICE_RPC_ADDRESS_KEY;
 
   /** Name-node proxy */
-  NamenodeProtocol namenode;
+  NamenodeProtocolTranslatorPB namenode;
   /** Name-node RPC address */
   String nnRpcAddress;
   /** Name-node HTTP address */
@@ -123,6 +131,7 @@ public class BackupNode extends NameNode
   protected void loadNamesystem(Configuration conf) throws IOException {
     BackupImage bnImage = new BackupImage(conf);
     this.namesystem = new FSNamesystem(conf, bnImage);
+    bnImage.setNamesystem(namesystem);
     bnImage.recoverCreateRead();
   }
 
@@ -175,7 +184,9 @@ public class BackupNode extends NameNode
       }
     }
     // Stop the RPC client
-    RPC.stopProxy(namenode);
+    if (namenode != null) {
+      IOUtils.cleanup(LOG, namenode);
+    }
     namenode = null;
     // Stop the checkpoint manager
     if(checkpointManager != null) {
@@ -186,12 +197,19 @@ public class BackupNode extends NameNode
     super.stop();
   }
 
-  static class BackupNodeRpcServer extends NameNodeRpcServer implements JournalProtocol {
+  static class BackupNodeRpcServer extends NameNodeRpcServer implements
+      JournalProtocol {
     private final String nnRpcAddress;
     
     private BackupNodeRpcServer(Configuration conf, BackupNode nn)
         throws IOException {
       super(conf, nn);
+      JournalProtocolServerSideTranslatorPB journalProtocolTranslator = 
+          new JournalProtocolServerSideTranslatorPB(this);
+      BlockingService service = JournalProtocolService
+          .newReflectiveBlockingService(journalProtocolTranslator);
+      DFSUtil.addPBProtocol(conf, JournalProtocolPB.class, service,
+          this.clientRpcServer);
       nnRpcAddress = nn.nnRpcAddress;
     }
 
@@ -200,9 +218,8 @@ public class BackupNode extends NameNode
         throws IOException {
       if (protocol.equals(JournalProtocol.class.getName())) {
         return JournalProtocol.versionID;
-      } else {
-        return super.getProtocolVersion(protocol, clientVersion);
       }
+      return super.getProtocolVersion(protocol, clientVersion);
     }
   
     /////////////////////////////////////////////////////
@@ -244,7 +261,7 @@ public class BackupNode extends NameNode
       verifyRequest(nnReg);
       if(!nnRpcAddress.equals(nnReg.getAddress()))
         throw new IOException("Journal request from unexpected name-node: "
-            + nnReg.getAddress() + " expecting " + rpcAddress);
+            + nnReg.getAddress() + " expecting " + clientRpcAddress);
       getBNImage().journal(firstTxId, numTxns, records);
     }
   
@@ -278,9 +295,8 @@ public class BackupNode extends NameNode
   private NamespaceInfo handshake(Configuration conf) throws IOException {
     // connect to name node
     InetSocketAddress nnAddress = NameNode.getServiceAddress(conf, true);
-    this.namenode =
-      (NamenodeProtocol) RPC.waitForProxy(NamenodeProtocol.class,
-          NamenodeProtocol.versionID, nnAddress, conf);
+    this.namenode = new NamenodeProtocolTranslatorPB(nnAddress, conf,
+        UserGroupInformation.getCurrentUser());
     this.nnRpcAddress = getHostPortString(nnAddress);
     this.nnHttpAddress = getHostPortString(super.getHttpServerAddress(conf));
     // get version and id info from the name-node
@@ -293,7 +309,9 @@ public class BackupNode extends NameNode
         LOG.info("Problem connecting to server: " + nnAddress);
         try {
           Thread.sleep(1000);
-        } catch (InterruptedException ie) {}
+        } catch (InterruptedException ie) {
+          LOG.warn("Encountered exception ", ie); // log the interrupt itself, not the outer connect failure
+        }
       }
     }
     return nsInfo;
@@ -342,7 +360,9 @@ public class BackupNode extends NameNode
         LOG.info("Problem connecting to name-node: " + nnRpcAddress);
         try {
           Thread.sleep(1000);
-        } catch (InterruptedException ie) {}
+        } catch (InterruptedException ie) {
+          LOG.warn("Encountered exception ", ie); // log the interrupt itself, not the outer connect failure
+        }
       }
     }
 

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java?rev=1294028&r1=1294027&r2=1294028&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java Mon Feb 27 04:54:33 2012
@@ -37,9 +37,7 @@ public class CheckpointSignature extends
                       implements WritableComparable<CheckpointSignature> {
   private static final String FIELD_SEPARATOR = ":";
   private static final int NUM_FIELDS = 7;
-
   String blockpoolID = "";
-  
   long mostRecentCheckpointTxId;
   long curSegmentTxId;
 
@@ -67,6 +65,14 @@ public class CheckpointSignature extends
     blockpoolID = fields[i++];
   }
 
+  public CheckpointSignature(StorageInfo info, String blockpoolID,
+      long mostRecentCheckpointTxId, long curSegmentTxId) {
+    super(info);
+    this.blockpoolID = blockpoolID;
+    this.mostRecentCheckpointTxId = mostRecentCheckpointTxId;
+    this.curSegmentTxId = curSegmentTxId;
+  }
+
   /**
    * Get the cluster id from CheckpointSignature
    * @return the cluster id
@@ -83,6 +89,14 @@ public class CheckpointSignature extends
     return blockpoolID;
   }
 
+  public long getMostRecentCheckpointTxId() {
+    return mostRecentCheckpointTxId;
+  }
+
+  public long getCurSegmentTxId() {
+    return curSegmentTxId;
+  }
+
   /**
    * Set the block pool id of CheckpointSignature.
    * 

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java?rev=1294028&r1=1294027&r2=1294028&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java Mon Feb 27 04:54:33 2012
@@ -224,7 +224,7 @@ class Checkpointer extends Daemon {
         
         LOG.info("Loading image with txid " + sig.mostRecentCheckpointTxId);
         File file = bnStorage.findImageFile(sig.mostRecentCheckpointTxId);
-        bnImage.reloadFromImageFile(file);
+        bnImage.reloadFromImageFile(file, backupNode.getNamesystem());
       }
       
       lastApplied = bnImage.getLastAppliedTxId();
@@ -238,11 +238,11 @@ class Checkpointer extends Daemon {
             backupNode.nnHttpAddress, log, bnStorage);
       }
   
-      rollForwardByApplyingLogs(manifest, bnImage);
+      rollForwardByApplyingLogs(manifest, bnImage, backupNode.getNamesystem());
     }
 
     long txid = bnImage.getLastAppliedTxId();
-    bnImage.saveFSImageInAllDirs(txid);
+    bnImage.saveFSImageInAllDirs(backupNode.getNamesystem(), txid);
     bnStorage.writeAll();
 
     if(cpCmd.needToReturnImage()) {
@@ -272,19 +272,21 @@ class Checkpointer extends Daemon {
 
   static void rollForwardByApplyingLogs(
       RemoteEditLogManifest manifest,
-      FSImage dstImage) throws IOException {
+      FSImage dstImage,
+      FSNamesystem dstNamesystem) throws IOException {
     NNStorage dstStorage = dstImage.getStorage();
   
-    List<File> editsFiles = Lists.newArrayList();
+    List<EditLogInputStream> editsStreams = Lists.newArrayList();    
     for (RemoteEditLog log : manifest.getLogs()) {
       File f = dstStorage.findFinalizedEditsFile(
           log.getStartTxId(), log.getEndTxId());
       if (log.getStartTxId() > dstImage.getLastAppliedTxId()) {
-        editsFiles.add(f);
-      }
+        editsStreams.add(new EditLogFileInputStream(f, log.getStartTxId(), 
+                                                    log.getEndTxId()));
+       }
     }
     LOG.info("Checkpointer about to load edits from " +
-        editsFiles.size() + " file(s).");
-    dstImage.loadEdits(editsFiles);
+        editsStreams.size() + " stream(s).");
+    dstImage.loadEdits(editsStreams, dstNamesystem);
   }
 }

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ContentSummaryServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ContentSummaryServlet.java?rev=1294028&r1=1294027&r2=1294028&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ContentSummaryServlet.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ContentSummaryServlet.java Mon Feb 27 04:54:33 2012
@@ -40,7 +40,7 @@ public class ContentSummaryServlet exten
   /** For java.io.Serializable */
   private static final long serialVersionUID = 1L;
   
-  /** {@inheritDoc} */
+  @Override
   public void doGet(final HttpServletRequest request,
       final HttpServletResponse response) throws ServletException, IOException {
     final Configuration conf = 

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java?rev=1294028&r1=1294027&r2=1294028&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java Mon Feb 27 04:54:33 2012
@@ -21,6 +21,7 @@ import java.io.DataInputStream;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import com.google.common.base.Preconditions;
 
 /**
@@ -122,4 +123,14 @@ class EditLogBackupInputStream extends E
     reader = null;
     this.version = 0;
   }
+
+  @Override
+  public long getFirstTxId() throws IOException {
+    return HdfsConstants.INVALID_TXID;
+  }
+
+  @Override
+  public long getLastTxId() throws IOException {
+    return HdfsConstants.INVALID_TXID;
+  }
 }

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java?rev=1294028&r1=1294027&r2=1294028&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java Mon Feb 27 04:54:33 2012
@@ -22,11 +22,11 @@ import java.net.InetSocketAddress;
 import java.util.Arrays;
 
 import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.protocolPB.JournalProtocolTranslatorPB;
 import org.apache.hadoop.hdfs.server.common.Storage;
-import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.NetUtils;
 
 /**
@@ -40,7 +40,7 @@ import org.apache.hadoop.net.NetUtils;
 class EditLogBackupOutputStream extends EditLogOutputStream {
   static int DEFAULT_BUFFER_SIZE = 256;
 
-  private JournalProtocol backupNode;        // RPC proxy to backup node
+  private JournalProtocolTranslatorPB backupNode;  // RPC proxy to backup node
   private NamenodeRegistration bnRegistration;  // backup node registration
   private NamenodeRegistration nnRegistration;  // active node registration
   private EditsDoubleBuffer doubleBuf;
@@ -57,8 +57,7 @@ class EditLogBackupOutputStream extends 
     Storage.LOG.info("EditLogBackupOutputStream connects to: " + bnAddress);
     try {
       this.backupNode =
-        RPC.getProxy(JournalProtocol.class,
-            JournalProtocol.versionID, bnAddress, new HdfsConfiguration());
+          new JournalProtocolTranslatorPB(bnAddress, new HdfsConfiguration());
     } catch(IOException e) {
       Storage.LOG.error("Error connecting to: " + bnAddress, e);
       throw e;
@@ -105,14 +104,14 @@ class EditLogBackupOutputStream extends 
       throw new IOException("BackupEditStream has " + size +
                           " records still to be flushed and cannot be closed.");
     } 
-    RPC.stopProxy(backupNode); // stop the RPC threads
+    IOUtils.cleanup(Storage.LOG, backupNode); // stop the RPC threads
     doubleBuf.close();
     doubleBuf = null;
   }
 
   @Override
   public void abort() throws IOException {
-    RPC.stopProxy(backupNode);
+    IOUtils.cleanup(Storage.LOG, backupNode);
     doubleBuf = null;
   }
 

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java?rev=1294028&r1=1294027&r2=1294028&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java Mon Feb 27 04:54:33 2012
@@ -27,6 +27,7 @@ import java.io.DataInputStream;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -37,12 +38,15 @@ import com.google.common.annotations.Vis
 class EditLogFileInputStream extends EditLogInputStream {
   private final File file;
   private final FileInputStream fStream;
+  final private long firstTxId;
+  final private long lastTxId;
   private final int logVersion;
   private final FSEditLogOp.Reader reader;
   private final FSEditLogLoader.PositionTrackingInputStream tracker;
   
   /**
    * Open an EditLogInputStream for the given file.
+   * The file is pretransactional, so has no txids
    * @param name filename to open
    * @throws LogHeaderCorruptException if the header is either missing or
    *         appears to be corrupt/truncated
@@ -51,6 +55,21 @@ class EditLogFileInputStream extends Edi
    */
   EditLogFileInputStream(File name)
       throws LogHeaderCorruptException, IOException {
+    this(name, HdfsConstants.INVALID_TXID, HdfsConstants.INVALID_TXID);
+  }
+
+  /**
+   * Open an EditLogInputStream for the given file.
+   * @param name filename to open
+   * @param firstTxId first transaction found in file
+   * @param lastTxId last transaction id found in file
+   * @throws LogHeaderCorruptException if the header is either missing or
+   *         appears to be corrupt/truncated
+   * @throws IOException if an actual IO error occurs while reading the
+   *         header
+   */
+  EditLogFileInputStream(File name, long firstTxId, long lastTxId)
+      throws LogHeaderCorruptException, IOException {
     file = name;
     fStream = new FileInputStream(name);
 
@@ -65,6 +84,18 @@ class EditLogFileInputStream extends Edi
     }
 
     reader = new FSEditLogOp.Reader(in, logVersion);
+    this.firstTxId = firstTxId;
+    this.lastTxId = lastTxId;
+  }
+
+  @Override
+  public long getFirstTxId() throws IOException {
+    return firstTxId;
+  }
+  
+  @Override
+  public long getLastTxId() throws IOException {
+    return lastTxId;
   }
 
   @Override // JournalStream
@@ -116,7 +147,8 @@ class EditLogFileInputStream extends Edi
       // If it's missing its header, this is equivalent to no transactions
       FSImage.LOG.warn("Log at " + file + " has no valid header",
           corrupt);
-      return new FSEditLogLoader.EditLogValidation(0, 0);
+      return new FSEditLogLoader.EditLogValidation(0, HdfsConstants.INVALID_TXID, 
+                                                   HdfsConstants.INVALID_TXID);
     }
     
     try {

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java?rev=1294028&r1=1294027&r2=1294028&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java Mon Feb 27 04:54:33 2012
@@ -83,7 +83,6 @@ class EditLogFileOutputStream extends Ed
     return JournalType.FILE;
   }
 
-  /** {@inheritDoc} */
   @Override
   void write(FSEditLogOp op) throws IOException {
     doubleBuf.writeOp(op);

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java?rev=1294028&r1=1294027&r2=1294028&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java Mon Feb 27 04:54:33 2012
@@ -28,6 +28,17 @@ import java.io.IOException;
  * into the #{@link EditLogOutputStream}.
  */
 abstract class EditLogInputStream implements JournalStream, Closeable {
+  /** 
+   * @return the first transaction which will be found in this stream
+   */
+  public abstract long getFirstTxId() throws IOException;
+  
+  /** 
+   * @return the last transaction which will be found in this stream
+   */
+  public abstract long getLastTxId() throws IOException;
+
+
   /**
    * Close the stream.
    * @throws IOException if an error occurred while closing

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1294028&r1=1294027&r2=1294028&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Mon Feb 27 04:54:33 2012
@@ -57,9 +57,10 @@ import org.apache.hadoop.hdfs.server.blo
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.util.ByteArray;
 
+import com.google.common.base.Preconditions;
+
 /*************************************************
  * FSDirectory stores the filesystem directory state.
  * It handles writing/loading values to disk, and logging
@@ -73,6 +74,7 @@ public class FSDirectory implements Clos
 
   INodeDirectoryWithQuota rootDir;
   FSImage fsImage;  
+  private final FSNamesystem namesystem;
   private volatile boolean ready = false;
   private static final long UNKNOWN_DISK_SPACE = -1;
   private final int maxComponentLength;
@@ -114,15 +116,9 @@ public class FSDirectory implements Clos
    */
   private final NameCache<ByteArray> nameCache;
 
-  /** Access an existing dfs name directory. */
-  FSDirectory(FSNamesystem ns, Configuration conf) throws IOException {
-    this(new FSImage(conf), ns, conf);
-  }
-
   FSDirectory(FSImage fsImage, FSNamesystem ns, Configuration conf) {
     this.dirLock = new ReentrantReadWriteLock(true); // fair
     this.cond = dirLock.writeLock().newCondition();
-    fsImage.setFSNamesystem(ns);
     rootDir = new INodeDirectoryWithQuota(INodeDirectory.ROOT_NAME,
         ns.createFsOwnerPermissions(new FsPermission((short)0755)),
         Integer.MAX_VALUE, UNKNOWN_DISK_SPACE);
@@ -146,10 +142,11 @@ public class FSDirectory implements Clos
     NameNode.LOG.info("Caching file names occuring more than " + threshold
         + " times ");
     nameCache = new NameCache<ByteArray>(threshold);
+    namesystem = ns;
   }
     
   private FSNamesystem getFSNamesystem() {
-    return fsImage.getFSNamesystem();
+    return namesystem;
   }
 
   private BlockManager getBlockManager() {
@@ -157,33 +154,11 @@ public class FSDirectory implements Clos
   }
 
   /**
-   * Load the filesystem image into memory.
-   *
-   * @param startOpt Startup type as specified by the user.
-   * @throws IOException If image or editlog cannot be read.
+   * Notify that loading of this FSDirectory is complete, and
+   * it is ready for use 
    */
-  void loadFSImage(StartupOption startOpt) 
-      throws IOException {
-    // format before starting up if requested
-    if (startOpt == StartupOption.FORMAT) {
-      fsImage.format(fsImage.getStorage().determineClusterId());// reuse current id
-
-      startOpt = StartupOption.REGULAR;
-    }
-    boolean success = false;
-    try {
-      if (fsImage.recoverTransitionRead(startOpt)) {
-        fsImage.saveNamespace();
-      }
-      fsImage.openEditLog();
-      
-      fsImage.setCheckpointDirectories(null, null);
-      success = true;
-    } finally {
-      if (!success) {
-        fsImage.close();
-      }
-    }
+  void imageLoadComplete() {
+    Preconditions.checkState(!ready, "FSDirectory already loaded");
     writeLock();
     try {
       setReady(true);

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1294028&r1=1294027&r2=1294028&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Mon Feb 27 04:54:33 2012
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -35,11 +36,13 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import static org.apache.hadoop.hdfs.server.common.Util.now;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
+import org.apache.hadoop.hdfs.server.namenode.JournalManager.CorruptionException;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.io.IOUtils;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -1069,6 +1072,112 @@ public class FSEditLog  {
   }
 
   /**
+   * Find the best editlog input stream to read from txid. In this case
+   * best means the editlog which has the largest continuous range of 
+   * transactions starting from the transaction id, fromTxId.
+   *
+   * If a journal throws a CorruptionException while reading from a txn id,
+   * it means that it has more transactions, but can't find any from fromTxId. 
+   * If this is the case and no other journal has transactions, we should throw
+   * an exception as it means more transactions exist, we just can't load them.
+   *
+   * @param fromTxId Transaction id to start from.
+   * @return an edit log input stream with transactions starting at fromTxId
+   *         or null if no more exist
+   */
+  private EditLogInputStream selectStream(long fromTxId) 
+      throws IOException {
+    JournalManager bestjm = null;
+    long bestjmNumTxns = 0;
+    CorruptionException corruption = null;
+
+    for (JournalAndStream jas : journals) {
+      JournalManager candidate = jas.getManager();
+      long candidateNumTxns = 0;
+      try {
+        candidateNumTxns = candidate.getNumberOfTransactions(fromTxId);
+      } catch (CorruptionException ce) {
+        corruption = ce;
+      } catch (IOException ioe) {
+        LOG.warn("Error reading number of transactions from " + candidate);
+        continue; // error reading disk, just skip
+      }
+      
+      if (candidateNumTxns > bestjmNumTxns) {
+        bestjm = candidate;
+        bestjmNumTxns = candidateNumTxns;
+      }
+    }
+    
+    
+    if (bestjm == null) {
+      /**
+       * If all candidates either threw a CorruptionException or
+       * found 0 transactions, then a gap exists. 
+       */
+      if (corruption != null) {
+        throw new IOException("Gap exists in logs from " 
+                              + fromTxId, corruption);
+      } else {
+        return null;
+      }
+    }
+
+    return bestjm.getInputStream(fromTxId);
+  }
+
+  /**
+   * Run recovery on all journals to recover any unclosed segments
+   */
+  void recoverUnclosedStreams() {
+    mapJournalsAndReportErrors(new JournalClosure() {
+        @Override
+        public void apply(JournalAndStream jas) throws IOException {
+          jas.manager.recoverUnfinalizedSegments();
+        }
+      }, "recovering unclosed streams");
+  }
+
+  /**
+   * Select a list of input streams to load.
+   * @param fromTxId first transaction in the selected streams
+   * @param toAtLeastTxId the selected streams must contain this transaction
+   */
+  Collection<EditLogInputStream> selectInputStreams(long fromTxId, long toAtLeastTxId) 
+      throws IOException {
+    List<EditLogInputStream> streams = Lists.newArrayList();
+    
+    boolean gapFound = false;
+    EditLogInputStream stream = selectStream(fromTxId);
+    while (stream != null) {
+      fromTxId = stream.getLastTxId() + 1;
+      streams.add(stream);
+      try {
+        stream = selectStream(fromTxId);
+      } catch (IOException ioe) {
+        gapFound = true;
+        break;
+      }
+    }
+    if (fromTxId <= toAtLeastTxId || gapFound) {
+      closeAllStreams(streams);
+      throw new IOException("No non-corrupt logs for txid " 
+                            + fromTxId);
+    }
+    return streams;
+  }
+
+  /** 
+   * Close all the streams in a collection
+   * @param streams The list of streams to close
+   */
+  static void closeAllStreams(Iterable<EditLogInputStream> streams) {
+    for (EditLogInputStream s : streams) {
+      IOUtils.closeStream(s);
+    }
+  }
+
+  /**
    * Container for a JournalManager paired with its currently
    * active stream.
    * 
@@ -1137,30 +1246,5 @@ public class FSEditLog  {
     JournalManager getManager() {
       return manager;
     }
-
-    private EditLogInputStream getInProgressInputStream() throws IOException {
-      return manager.getInProgressInputStream(segmentStartsAtTxId);
-    }
-  }
-
-  /**
-   * @return an EditLogInputStream that reads from the same log that
-   * the edit log is currently writing. This is used from the BackupNode
-   * during edits synchronization.
-   * @throws IOException if no valid logs are available.
-   */
-  synchronized EditLogInputStream getInProgressFileInputStream()
-      throws IOException {
-    for (JournalAndStream jas : journals) {
-      if (!jas.isActive()) continue;
-      try {
-        EditLogInputStream in = jas.getInProgressInputStream();
-        if (in != null) return in;
-      } catch (IOException ioe) {
-        LOG.warn("Unable to get the in-progress input stream from " + jas,
-            ioe);
-      }
-    }
-    throw new IOException("No in-progress stream provided edits");
   }
 }



Mime
View raw message