hadoop-hdfs-commits mailing list archives

From: sur...@apache.org
Subject: svn commit: r1097905 [4/14] - in /hadoop/hdfs/trunk: ./ bin/ src/c++/libhdfs/ src/contrib/hdfsproxy/ src/contrib/hdfsproxy/src/java/org/apache/hadoop/hdfsproxy/ src/java/ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/ src/ja...
Date: Fri, 29 Apr 2011 18:16:38 GMT
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Fri Apr 29 18:16:32 2011
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
 import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR_ACCESS_TOKEN;
 import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.SUCCESS;
 import static org.apache.hadoop.hdfs.server.common.Util.now;
@@ -66,7 +67,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HDFSPolicyProvider;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -76,10 +77,12 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
+import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
@@ -96,7 +99,6 @@ import org.apache.hadoop.hdfs.server.dat
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.FileChecksumServlets;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.StreamFile;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
@@ -168,7 +170,7 @@ import org.mortbay.util.ajax.JSON;
 @InterfaceAudience.Private
 public class DataNode extends Configured 
     implements InterDatanodeProtocol, ClientDatanodeProtocol, FSConstants,
-    Runnable, DataNodeMXBean {
+    DataNodeMXBean {
   public static final Log LOG = LogFactory.getLog(DataNode.class);
   
   static{
@@ -199,45 +201,175 @@ public class DataNode extends Configured
     return NetUtils.createSocketAddr(target);
   }
   
-  public DatanodeProtocol namenode = null;
-  public FSDatasetInterface data = null;
-  public DatanodeRegistration dnRegistration = null;
+  /**
+   * Manages the BPOfferService objects for the data node.
+   * Creation, removal, starting, stopping, and shutdown of BPOfferService
+   * objects must be done via the APIs in this class.
+   */
+  @InterfaceAudience.Private
+  class BlockPoolManager {
+    private final Map<String, BPOfferService> bpMapping;
+    private final Map<InetSocketAddress, BPOfferService> nameNodeThreads;
+ 
+    //This lock is used only to ensure exclusion of refreshNamenodes
+    private final Object refreshNamenodesLock = new Object();
+    
+    BlockPoolManager(Configuration conf)
+        throws IOException {
+      bpMapping = new HashMap<String, BPOfferService>();
+      nameNodeThreads = new HashMap<InetSocketAddress, BPOfferService>();
+  
+      List<InetSocketAddress> isas = DFSUtil.getNNServiceRpcAddresses(conf);
+      for(InetSocketAddress isa : isas) {
+        BPOfferService bpos = new BPOfferService(isa);
+        nameNodeThreads.put(bpos.getNNSocketAddress(), bpos);
+      }
+    }
+    
+    synchronized void addBlockPool(BPOfferService t) {
+      if (nameNodeThreads.get(t.getNNSocketAddress()) == null) {
+        throw new IllegalArgumentException(
+            "Unknown BPOfferService thread for namenode address: "
+                + t.getNNSocketAddress());
+      }
+      if (t.getBlockPoolId() == null) {
+        throw new IllegalArgumentException("Null blockpool id");
+      }
+      bpMapping.put(t.getBlockPoolId(), t);
+    }
+    
+    /**
+     * Returns the array of BPOfferService objects. 
+     * Caution: the BPOfferService returned could be shut down at any time.
+     */
+    synchronized BPOfferService[] getAllNamenodeThreads() {
+      BPOfferService[] bposArray = new BPOfferService[nameNodeThreads.values()
+          .size()];
+      return nameNodeThreads.values().toArray(bposArray);
+    }
+    
+    synchronized BPOfferService get(InetSocketAddress addr) {
+      return nameNodeThreads.get(addr);
+    }
+    
+    synchronized BPOfferService get(String bpid) {
+      return bpMapping.get(bpid);
+    }
+    
+    synchronized void remove(BPOfferService t) {
+      nameNodeThreads.remove(t.getNNSocketAddress());
+      bpMapping.remove(t.getBlockPoolId());
+    }
+    
+    void shutDownAll() throws InterruptedException {
+      BPOfferService[] bposArray = this.getAllNamenodeThreads();
+      
+      for (BPOfferService bpos : bposArray) {
+        bpos.stop(); //interrupts the threads
+      }
+      //now join
+      for (BPOfferService bpos : bposArray) {
+        bpos.join();
+      }
+    }
+    
+    synchronized void startAll() throws IOException {
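+      // Start the threads inside a doAs so their namenode RPC connections
+      // are made as the daemon's login user (this matters chiefly when
+      // Kerberos security is enabled).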
+      try {
+        UserGroupInformation.getLoginUser().doAs(
+            new PrivilegedExceptionAction<Object>() {
+              public Object run() throws Exception {
+                for (BPOfferService bpos : nameNodeThreads.values()) {
+                  bpos.start();
+                }
+                return null;
+              }
+            });
+      } catch (InterruptedException ex) {
+        IOException ioe = new IOException();
+        ioe.initCause(ex.getCause());
+        throw ioe;
+      }
+    }
+    
+    void joinAll() throws InterruptedException {
+      for (BPOfferService bpos: this.getAllNamenodeThreads()) {
+        bpos.join();
+      }
+    }
+    
+    void refreshNamenodes(Configuration conf)
+        throws IOException, InterruptedException {
+      LOG.info("Refresh request received for nameservices: "
+          + conf.get(DFS_FEDERATION_NAMESERVICES));
+      List<InetSocketAddress> newAddresses = 
+        DFSUtil.getNNServiceRpcAddresses(conf);
+      List<BPOfferService> toShutdown = new ArrayList<BPOfferService>();
+      List<InetSocketAddress> toStart = new ArrayList<InetSocketAddress>();
+      synchronized (refreshNamenodesLock) {
+        synchronized (this) {
+          for (InetSocketAddress nnaddr : nameNodeThreads.keySet()) {
+            if (!(newAddresses.contains(nnaddr))) {
+              toShutdown.add(nameNodeThreads.get(nnaddr));
+            }
+          }
+          for (InetSocketAddress nnaddr : newAddresses) {
+            if (!(nameNodeThreads.containsKey(nnaddr))) {
+              toStart.add(nnaddr);
+            }
+          }
+
+          for (InetSocketAddress nnaddr : toStart) {
+            BPOfferService bpos = new BPOfferService(nnaddr);
+            nameNodeThreads.put(bpos.getNNSocketAddress(), bpos);
+          }
+
+          for (BPOfferService bpos : toShutdown) {
+            remove(bpos);
+          }
+        }
 
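+        // Note: stop() and join() happen outside the BlockPoolManager
+        // monitor on purpose. An exiting BP thread calls the synchronized
+        // remove() from its cleanUp(), so joining while holding the
+        // monitor could deadlock.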
+        for (BPOfferService bpos : toShutdown) {
+          bpos.stop();
+          bpos.join();
+        }
+        // Now start the threads that are not already running.
+        startAll();
+      }
+    }
+  }
+  
   volatile boolean shouldRun = true;
-  private LinkedList<Block> receivedBlockList = new LinkedList<Block>();
-  private LinkedList<String> delHints = new LinkedList<String>();
+  private BlockPoolManager blockPoolManager;
+  public volatile FSDatasetInterface data = null;
+  private String clusterId = null;
+
   public final static String EMPTY_DEL_HINT = "";
   AtomicInteger xmitsInProgress = new AtomicInteger();
   Daemon dataXceiverServer = null;
   ThreadGroup threadGroup = null;
   long blockReportInterval;
-  //disallow the sending of BR before instructed to do so
-  long lastBlockReport = 0;
   boolean resetBlockReportTime = true;
   long initialBlockReportDelay = BLOCKREPORT_INITIAL_DELAY * 1000L;
-  long lastHeartbeat = 0;
   long heartBeatInterval;
   private DataStorage storage = null;
   private HttpServer infoServer = null;
   DataNodeMetrics myMetrics;
-  private InetSocketAddress nameNodeAddr;
-  private InetSocketAddress nameNodeAddrForClient;
   private InetSocketAddress selfAddr;
-  private static DataNode datanodeObject = null;
-  private Thread dataNodeThread = null;
-  String machineName;
+  
+  private static volatile DataNode datanodeObject = null;
+  private volatile String hostName; // Host name of this datanode
+  
   private static String dnThreadName;
   int socketTimeout;
   int socketWriteTimeout = 0;  
   boolean transferToAllowed = true;
   int writePacketSize = 0;
   boolean isBlockTokenEnabled;
-  BlockTokenSecretManager blockTokenSecretManager;
-  boolean isBlockTokenInitialized = false;
+  BlockPoolTokenSecretManager blockPoolTokenSecretManager;
   boolean syncOnClose;
   
   public DataBlockScanner blockScanner = null;
-  public Daemon blockScannerThread = null;
+  private DirectoryScanner directoryScanner = null;
   
   /** Activated plug-ins. */
   private List<ServicePlugin> plugins;
@@ -247,8 +379,10 @@ public class DataNode extends Configured
   // For InterDataNodeProtocol
   public Server ipcServer;
 
-  private SecureResources secureResources = null;  
-  
+  private SecureResources secureResources = null;
+  private AbstractList<File> dataDirs;
+  private Configuration conf;
+
   /**
    * Create the DataNode given a configuration and an array of dataDirs.
    * 'dataDirs' is where the blocks are stored.
@@ -259,75 +393,74 @@ public class DataNode extends Configured
   }
   
   /**
-   * Start a Datanode with specified server sockets for secure environments
-   * where they are run with privileged ports and injected from a higher
-   * level of capability
-   */
-  DataNode(final Configuration conf,
-           final AbstractList<File> dataDirs, final SecureResources resources) throws IOException {  
-    this(conf, dataDirs, (DatanodeProtocol)RPC.waitForProxy(DatanodeProtocol.class,
-                       DatanodeProtocol.versionID,
-                       NameNode.getServiceAddress(conf, true), 
-                       conf), resources);
-  }
-  
-  /**
    * Create the DataNode given a configuration, an array of dataDirs,
    * and a namenode proxy
    */
   DataNode(final Configuration conf, 
            final AbstractList<File> dataDirs,
-           final DatanodeProtocol namenode, final SecureResources resources) throws IOException {
+           final SecureResources resources) throws IOException {
     super(conf);
 
     DataNode.setDataNode(this);
     
     try {
-      startDataNode(conf, dataDirs, namenode, resources);
+      hostName = getHostName(conf);
+      startDataNode(conf, dataDirs, resources);
     } catch (IOException ie) {
       shutdown();
-     throw ie;
-   }
+      throw ie;
+    }
   }
 
-  private void initConfig(Configuration conf) throws UnknownHostException {
+  private synchronized void setClusterId(String cid) throws IOException {
+    if(clusterId != null && !clusterId.equals(cid)) {
+      throw new IOException("cluster id doesn't match: old cid=" + clusterId
+          + " new cid=" + cid);
+    }
+    // else
+    clusterId = cid;    
+  }
+
+  private static String getHostName(Configuration config)
+      throws UnknownHostException {
+    String name = null;
     // use configured nameserver & interface to get local hostname
-    if (conf.get(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY) != null) {
-      machineName = conf.get(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY);   
+    if (config.get(DFS_DATANODE_HOST_NAME_KEY) != null) {
+      name = config.get(DFS_DATANODE_HOST_NAME_KEY);
     }
-    if (machineName == null) {
-      machineName = DNS.getDefaultHost(
-                                     conf.get("dfs.datanode.dns.interface","default"),
-                                     conf.get("dfs.datanode.dns.nameserver","default"));
+    if (name == null) {
+      name = DNS.getDefaultHost(config.get("dfs.datanode.dns.interface",
+          "default"), config.get("dfs.datanode.dns.nameserver", "default"));
     }
-    this.nameNodeAddr = NameNode.getServiceAddress(conf, true);
-    this.nameNodeAddrForClient = NameNode.getAddress(conf);
-    
-    this.socketTimeout =  conf.getInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY,
+    return name;
+  }
+
+  private void initConfig(Configuration conf) {
+    this.socketTimeout =  conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
                                       HdfsConstants.READ_TIMEOUT);
-    this.socketWriteTimeout = conf.getInt(DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY,
+    this.socketWriteTimeout = conf.getInt(DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY,
                                           HdfsConstants.WRITE_TIMEOUT);
     /* Based on results on different platforms, we might need set the default 
      * to false on some of them. */
     this.transferToAllowed = conf.getBoolean("dfs.datanode.transferTo.allowed", 
                                              true);
-    this.writePacketSize = conf.getInt(DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY, 
-                                       DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
+    this.writePacketSize = conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY, 
+                                       DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
 
     this.blockReportInterval =
-      conf.getLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, BLOCKREPORT_INTERVAL);
-    this.initialBlockReportDelay = conf.getLong(DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY,
+      conf.getLong(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, BLOCKREPORT_INTERVAL);
+    this.initialBlockReportDelay = conf.getLong(DFS_BLOCKREPORT_INITIAL_DELAY_KEY,
                                             BLOCKREPORT_INITIAL_DELAY)* 1000L; 
     if (this.initialBlockReportDelay >= blockReportInterval) {
       this.initialBlockReportDelay = 0;
       LOG.info("dfs.blockreport.initialDelay is greater than " +
         "dfs.blockreport.intervalMsec." + " Setting initial delay to 0 msec:");
     }
-    this.heartBeatInterval = conf.getLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL) * 1000L;
+    this.heartBeatInterval = conf.getLong(DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL) * 1000L;
 
     // do we need to sync block file contents to disk when blockfile is closed?
-    this.syncOnClose = conf.getBoolean(DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_KEY, 
-                                       DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_DEFAULT);
+    this.syncOnClose = conf.getBoolean(DFS_DATANODE_SYNCONCLOSE_KEY, 
+                                       DFS_DATANODE_SYNCONCLOSE_DEFAULT);
   }
   
   private void startInfoServer(Configuration conf) throws IOException {
@@ -337,16 +470,16 @@ public class DataNode extends Configured
     int tmpInfoPort = infoSocAddr.getPort();
     this.infoServer = (secureResources == null) 
        ? new HttpServer("datanode", infoHost, tmpInfoPort, tmpInfoPort == 0, 
-           conf, new AccessControlList(conf.get(DFSConfigKeys.DFS_ADMIN, " ")))
+           conf, new AccessControlList(conf.get(DFS_ADMIN, " ")))
        : new HttpServer("datanode", infoHost, tmpInfoPort, tmpInfoPort == 0,
-           conf, new AccessControlList(conf.get(DFSConfigKeys.DFS_ADMIN, " ")),
+           conf, new AccessControlList(conf.get(DFS_ADMIN, " ")),
            secureResources.getListener());
     if(LOG.isDebugEnabled()) {
       LOG.debug("Datanode listening on " + infoHost + ":" + tmpInfoPort);
     }
     if (conf.getBoolean("dfs.https.enable", false)) {
-      boolean needClientAuth = conf.getBoolean(DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_KEY,
-                                               DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_DEFAULT);
+      boolean needClientAuth = conf.getBoolean(DFS_CLIENT_HTTPS_NEED_AUTH_KEY,
+                                               DFS_CLIENT_HTTPS_NEED_AUTH_DEFAULT);
       InetSocketAddress secInfoSocAddr = NetUtils.createSocketAddr(conf.get(
           "dfs.datanode.https.address", infoHost + ":" + 0));
       Configuration sslConf = new HdfsConfiguration(false);
@@ -360,52 +493,14 @@ public class DataNode extends Configured
     this.infoServer.addInternalServlet(null, "/streamFile/*", StreamFile.class);
     this.infoServer.addInternalServlet(null, "/getFileChecksum/*",
         FileChecksumServlets.GetServlet.class);
-    this.infoServer.setAttribute("datanode.blockScanner", blockScanner);
+    
+    this.infoServer.setAttribute("datanode", this);
     this.infoServer.setAttribute(JspHelper.CURRENT_CONF, conf);
     this.infoServer.addServlet(null, "/blockScannerReport", 
                                DataBlockScanner.Servlet.class);
     this.infoServer.start();
-    // adjust info port
-    this.dnRegistration.setInfoPort(this.infoServer.getPort());
-  }
-  
-  private void initFsDataSet(Configuration conf, AbstractList<File> dataDirs)
-      throws IOException {
-    // get version and id info from the name-node
-    NamespaceInfo nsInfo = handshake();
-
-    StartupOption startOpt = getStartupOption(conf);
-    assert startOpt != null : "Startup option must be set.";
-    
-
-    boolean simulatedFSDataset = 
-        conf.getBoolean("dfs.datanode.simulateddatastorage", false);
-    if (simulatedFSDataset) {
-        setNewStorageID(dnRegistration);
-        dnRegistration.storageInfo.layoutVersion = FSConstants.LAYOUT_VERSION;
-        dnRegistration.storageInfo.namespaceID = nsInfo.namespaceID;
-        // it would have been better to pass storage as a parameter to
-        // constructor below - need to augment ReflectionUtils used below.
-        conf.set(DFSConfigKeys.DFS_DATANODE_STORAGEID_KEY, dnRegistration.getStorageID());
-        try {
-          //Equivalent of following (can't do because Simulated is in test dir)
-          //  this.data = new SimulatedFSDataset(conf);
-          this.data = (FSDatasetInterface) ReflectionUtils.newInstance(
-              Class.forName("org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset"), conf);
-        } catch (ClassNotFoundException e) {
-          throw new IOException(StringUtils.stringifyException(e));
-        }
-    } else { // real storage
-      // read storage info, lock data dirs and transition fs state if necessary
-      storage.recoverTransitionRead(nsInfo, dataDirs, startOpt);
-      // adjust
-      this.dnRegistration.setStorageInfo(storage);
-      // initialize data node internal structure
-      this.data = new FSDataset(storage, conf);
-    }
   }
   
-
   private void startPlugins(Configuration conf) {
     plugins = conf.getInstances("dfs.datanode.plugins", ServicePlugin.class);
     for (ServicePlugin p: plugins) {
@@ -424,41 +519,96 @@ public class DataNode extends Configured
         conf.get("dfs.datanode.ipc.address"));
     ipcServer = RPC.getServer(DataNode.class, this, ipcAddr.getHostName(),
                               ipcAddr.getPort(), 
-                              conf.getInt(DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_KEY, 
-                                          DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_DEFAULT), 
-                              false, conf, blockTokenSecretManager);
+                              conf.getInt(DFS_DATANODE_HANDLER_COUNT_KEY, 
+                                          DFS_DATANODE_HANDLER_COUNT_DEFAULT), 
+                              false, conf, blockPoolTokenSecretManager);
     // set service-level authorization security policy
     if (conf.getBoolean(
         CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) {
       ipcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
     }
-
-    dnRegistration.setIpcPort(ipcServer.getListenerAddress().getPort());
-    LOG.info("dnRegistration = " + dnRegistration);
   }
   
-
-  private void initBlockScanner(Configuration conf) {
+  /**
+   * Initialize the datanode's periodic scanners:
+   *     {@link DataBlockScanner}
+   *     {@link DirectoryScanner}
+   * They report results on a per-blockpool basis but do their scanning
+   * on a per-volume basis to minimize competition for disk iops.
+   * 
+   * @param conf - Configuration has the run intervals and other
+   *               parameters for these periodic scanners
+   */
+  private void initPeriodicScanners(Configuration conf) {
+    initDataBlockScanner(conf);
+    initDirectoryScanner(conf);
+  }
+  
+  private void shutdownPeriodicScanners() {
+    shutdownDirectoryScanner();
+    shutdownDataBlockScanner();
+  }
+  
+  /**
+   * See {@link DataBlockScanner}
+   */
+  private synchronized void initDataBlockScanner(Configuration conf) {
+    if (blockScanner != null) {
+      return;
+    }
     String reason = null;
-    if (conf.getInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 
-                    DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT) < 0) {
+    if (conf.getInt(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY,
+                    DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT) < 0) {
       reason = "verification is turned off by configuration";
-    } else if ( !(data instanceof FSDataset) ) {
+    } else if (!(data instanceof FSDataset)) {
      reason = "verification is supported only with FSDataset";
     } 
-    if ( reason == null ) {
+    if (reason == null) {
       blockScanner = new DataBlockScanner(this, (FSDataset)data, conf);
+      blockScanner.start();
+    } else {
+      LOG.info("Periodic Block Verification scan is disabled because " +
+               reason + ".");
+    }
+  }
+  
+  private void shutdownDataBlockScanner() {
+    if (blockScanner != null) {
+      blockScanner.shutdown();
+    }
+  }
+  
+  /**
+   * See {@link DirectoryScanner}
+   */
+  private synchronized void initDirectoryScanner(Configuration conf) {
+    if (directoryScanner != null) {
+      return;
+    }
+    String reason = null;
+    if (conf.getInt(DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, 
+                    DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT) < 0) {
+      reason = "verification is turned off by configuration";
+    } else if (!(data instanceof FSDataset)) {
+      reason = "verification is supported only with FSDataset";
+    } 
+    if (reason == null) {
+      directoryScanner = new DirectoryScanner((FSDataset) data, conf);
+      directoryScanner.start();
     } else {
-      LOG.info("Periodic Block Verification is disabled because " +
+      LOG.info("Periodic Directory Tree Verification scan is disabled because " +
                reason + ".");
     }
   }
   
+  private synchronized void shutdownDirectoryScanner() {
+    if (directoryScanner != null) {
+      directoryScanner.shutdown();
+    }
+  }
+  
   private void initDataXceiver(Configuration conf) throws IOException {
-    // construct registration
     InetSocketAddress socAddr = DataNode.getStreamingAddr(conf);
-    int tmpPort = socAddr.getPort();
-    this.dnRegistration = new DatanodeRegistration(machineName + ":" + tmpPort);
 
     // find free port or use privileged port provided
     ServerSocket ss;
@@ -471,10 +621,9 @@ public class DataNode extends Configured
     }
     ss.setReceiveBufferSize(DEFAULT_DATA_SOCKET_SIZE); 
     // adjust machine name with the actual port
-    tmpPort = ss.getLocalPort();
+    int tmpPort = ss.getLocalPort();
     selfAddr = new InetSocketAddress(ss.getInetAddress().getHostAddress(),
                                      tmpPort);
-    this.dnRegistration.setName(machineName + ":" + tmpPort);
     LOG.info("Opened info server at " + tmpPort);
       
     this.threadGroup = new ThreadGroup("dataXceiverServer");
@@ -483,6 +632,715 @@ public class DataNode extends Configured
     this.threadGroup.setDaemon(true); // auto destroy when empty
   }
   
+  // calls specific to BP
+  protected void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint) {
+    BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
+    if(bpos != null) {
+      bpos.notifyNamenodeReceivedBlock(block, delHint); 
+    } else {
+      LOG.warn("Cannot find BPOfferService for reporting block received for bpid="
+          + block.getBlockPoolId());
+    }
+  }
+  
+  public void reportBadBlocks(ExtendedBlock block) throws IOException{
+    BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
+    if(bpos == null || bpos.bpNamenode == null) {
+      throw new IOException("cannot locate OfferService thread for bp="+block.getBlockPoolId());
+    }
+    bpos.reportBadBlocks(block);
+  }
+  
+  /**
+   * A thread per namenode to perform:
+   * <ul>
+   * <li> Pre-registration handshake with namenode</li>
+   * <li> Registration with namenode</li>
+   * <li> Send periodic heartbeats to the namenode</li>
+   * <li> Handle commands received from the namenode</li>
+   * </ul>
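+   * Each thread runs setupBP() (handshake and storage setup), then
+   * register(), then loops in offerService() until stopped, and finally
+   * cleanUp().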
+   */
+  @InterfaceAudience.Private
+  class BPOfferService implements Runnable {
+    final InetSocketAddress nnAddr;
+    DatanodeRegistration bpRegistration;
+    NamespaceInfo bpNSInfo;
+    long lastBlockReport = 0;
+    private Thread bpThread;
+    private DatanodeProtocol bpNamenode;
+    private String blockPoolId;
+    private long lastHeartbeat = 0;
+    private volatile boolean initialized = false;
+    private final LinkedList<Block> receivedBlockList = new LinkedList<Block>();
+    private final LinkedList<String> delHints = new LinkedList<String>();
+    private volatile boolean shouldServiceRun = true;
+    private boolean isBlockTokenInitialized = false;
+    UpgradeManagerDatanode upgradeManager = null;
+
+    BPOfferService(InetSocketAddress isa) {
+      this.bpRegistration = new DatanodeRegistration(getMachineName());
+      bpRegistration.setInfoPort(infoServer.getPort());
+      bpRegistration.setIpcPort(getIpcPort());
+      this.nnAddr = isa;
+    }
+
+    /**
+     * Returns true if the BP thread has completed initialization of storage
+     * and has registered with the corresponding namenode.
+     * @return true if initialized
+     */
+    public boolean initialized() {
+      return initialized;
+    }
+    
+    public boolean isAlive() {
+      return shouldServiceRun && bpThread.isAlive();
+    }
+    
+    public String getBlockPoolId() {
+      return blockPoolId;
+    }
+    
+    private InetSocketAddress getNNSocketAddress() {
+      return nnAddr;
+    }
+ 
+    void setNamespaceInfo(NamespaceInfo nsinfo) {
+      bpNSInfo = nsinfo;
+      this.blockPoolId = nsinfo.getBlockPoolID();
+      blockPoolManager.addBlockPool(this);
+    }
+
+    void setNameNode(DatanodeProtocol dnProtocol) {
+      bpNamenode = dnProtocol;
+    }
+
+    private NamespaceInfo handshake() throws IOException {
+      NamespaceInfo nsInfo = new NamespaceInfo();
+      while (shouldRun && shouldServiceRun) {
+        try {
+          nsInfo = bpNamenode.versionRequest();
+          // verify build version
+          String nsVer = nsInfo.getBuildVersion();
+          String stVer = Storage.getBuildVersion();
+          LOG.info("handshake: namespace info = " + nsInfo);
+          
+          if (!nsVer.equals(stVer)) {
+            String errorMsg = "Incompatible build versions: bp = " + blockPoolId
+                + "; namenode BV = " + nsVer + "; datanode BV = " + stVer;
+            LOG.warn(errorMsg);
+            bpNamenode.errorReport( bpRegistration, 
+                DatanodeProtocol.NOTIFY, errorMsg );
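+            // fall through and retry; presumably the namenode will be
+            // restarted with a matching build version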
+          } else {
+            break;
+          }
+        } catch(SocketTimeoutException e) {  // namenode is busy
+          LOG.warn("Problem connecting to server: " + nnAddr);
+        } catch(IOException e ) {  // namenode is not available
+          LOG.warn("Problem connecting to server: " + nnAddr);
+        }
+        
+        // try again in a few seconds
+        try {
+          Thread.sleep(5000);
+        } catch (InterruptedException ie) {}
+      }
+      
+      assert FSConstants.LAYOUT_VERSION == nsInfo.getLayoutVersion() :
+        "Data-node and name-node layout versions must be the same."
+        + " Expected: " + FSConstants.LAYOUT_VERSION
+        + "; actual: " + nsInfo.getLayoutVersion();
+      return nsInfo;
+    }
+
+    void setupBP(Configuration conf, AbstractList<File> dataDirs) 
+    throws IOException {
+      // get NN proxy
+      DatanodeProtocol dnp = 
+        (DatanodeProtocol)RPC.waitForProxy(DatanodeProtocol.class,
+            DatanodeProtocol.versionID, nnAddr, conf);
+      setNameNode(dnp);
+
+      // handshake with NN
+      NamespaceInfo nsInfo = handshake();
+      setNamespaceInfo(nsInfo);
+      synchronized(DataNode.this) {
+        // we do not allow namenode from different cluster to register
+        if(clusterId != null && !clusterId.equals(nsInfo.clusterID)) {
+          throw new IOException(
+              "cannot register with the namenode because cluster IDs do not match:"
+              + " nn bpid=" + nsInfo.getBlockPoolID() + "; nn cid=" + nsInfo.clusterID
+              + "; dn cid=" + clusterId);
+        }
+
+        setupBPStorage();
+
+        setClusterId(nsInfo.clusterID);
+      }
+    
+      initPeriodicScanners(conf);
+    }
+    
+    void setupBPStorage() throws IOException {
+      StartupOption startOpt = getStartupOption(conf);
+      assert startOpt != null : "Startup option must be set.";
+
+      boolean simulatedFSDataset = 
+        conf.getBoolean("dfs.datanode.simulateddatastorage", false);
+      
+      if (simulatedFSDataset) {
+        initFsDataSet(conf, dataDirs);
+        bpRegistration.setStorageID(getStorageId()); //same as DN
+        bpRegistration.storageInfo.layoutVersion = FSConstants.LAYOUT_VERSION;
+        bpRegistration.storageInfo.namespaceID = bpNSInfo.namespaceID;
+        bpRegistration.storageInfo.clusterID = bpNSInfo.clusterID;
+      } else {
+        // read storage info, lock data dirs and transition fs state if necessary          
+        storage.recoverTransitionRead(blockPoolId, bpNSInfo, dataDirs, startOpt);
+        LOG.info("setting up storage: nsid=" + storage.namespaceID + ";bpid="
+            + blockPoolId + ";lv=" + storage.layoutVersion + ";nsInfo="
+            + bpNSInfo);
+
+        bpRegistration.setStorageID(getStorageId());
+        bpRegistration.setStorageInfo(storage.getBPStorage(blockPoolId));
+        initFsDataSet(conf, dataDirs);
+      }
+      data.addBlockPool(blockPoolId, conf);
+    }
+
+    /**
+     * This method arranges for the data node to send the block report after
+     * a random delay of up to {@code delay} msec, or at the next heartbeat
+     * if no delay is given.
+     */
+    void scheduleBlockReport(long delay) {
+      if (delay > 0) { // send BR after random delay
+        lastBlockReport = System.currentTimeMillis()
+        - ( blockReportInterval - R.nextInt((int)(delay)));
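+        // With this setting the next report becomes due after a uniformly
+        // random wait in [0, delay) msec, since a report is sent once
+        // (now - lastBlockReport) exceeds blockReportInterval.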
+      } else { // send at next heartbeat
+        lastBlockReport = lastHeartbeat - blockReportInterval;
+      }
+      resetBlockReportTime = true; // reset future BRs for randomness
+    }
+
+    private void reportBadBlocks(ExtendedBlock block) {
+      DatanodeInfo[] dnArr = { new DatanodeInfo(bpRegistration) };
+      LocatedBlock[] blocks = { new LocatedBlock(block, dnArr) }; 
+      
+      try {
+        bpNamenode.reportBadBlocks(blocks);  
+      } catch (IOException e){
+        /* One common reason is that NameNode could be in safe mode.
+         * Should we keep on retrying in that case?
+         */
+        LOG.warn("Failed to report bad block " + block + " to namenode: "
+                 + StringUtils.stringifyException(e));
+      }
+      
+    }
+    
+    /**
+     * Report received blocks and delete hints to the Namenode
+     * @throws IOException
+     */
+    private void reportReceivedBlocks() throws IOException {
+      //check if there are newly received blocks
+      Block [] blockArray=null;
+      String [] delHintArray=null;
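+      // Lock ordering: receivedBlockList is always acquired before delHints,
+      // here and in notifyNamenodeReceivedBlock(), so the two monitors
+      // cannot deadlock against each other.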
+      synchronized(receivedBlockList) {
+        synchronized(delHints){
+          int numBlocks = receivedBlockList.size();
+          if (numBlocks > 0) {
+            if(numBlocks!=delHints.size()) {
+              LOG.warn("Panic: receivedBlockList and delHints are not of " +
+                  "the same length");
+            }
+            //
+            // Send newly-received blockids to namenode
+            //
+            blockArray = receivedBlockList.toArray(new Block[numBlocks]);
+            delHintArray = delHints.toArray(new String[numBlocks]);
+          }
+        }
+      }
+      if (blockArray != null) {
+        if(delHintArray == null || delHintArray.length != blockArray.length ) {
+          LOG.warn("Panic: blockArray and delHintArray are not the same length");
+        }
+        bpNamenode.blockReceived(bpRegistration, blockPoolId, blockArray,
+            delHintArray);
+        synchronized(receivedBlockList) {
+          synchronized(delHints){
+            for(int i=0; i<blockArray.length; i++) {
+              receivedBlockList.remove(blockArray[i]);
+              delHints.remove(delHintArray[i]);
+            }
+          }
+        }
+      }
+    }
+
+    /*
+     * Informing the name node could take a long long time! Should we wait
+     * till namenode is informed before responding with success to the
+     * client? For now we don't.
+     */
+    void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint) {
+      if(block==null || delHint==null) {
+        throw new IllegalArgumentException(
+            block==null?"Block is null":"delHint is null");
+      }
+      
+      if (!block.getBlockPoolId().equals(blockPoolId)) {
+        LOG.warn("BlockPool mismatch " + block.getBlockPoolId() + 
+            " vs. " + blockPoolId);
+        return;
+      }
+      
+      synchronized (receivedBlockList) {
+        synchronized (delHints) {
+          receivedBlockList.add(block.getLocalBlock());
+          delHints.add(delHint);
+          receivedBlockList.notifyAll();
+        }
+      }
+    }
+
+
+    /**
+     * Report the list of blocks to the Namenode.
+     * @throws IOException
+     */
+    DatanodeCommand blockReport() throws IOException {
+      // send block report
+      DatanodeCommand cmd = null;
+      long startTime = now();
+      if (startTime - lastBlockReport > blockReportInterval) {
+        //
+        // Send latest block report if timer has expired.
+        // Get back a list of local block(s) that are obsolete
+        // and can be safely GC'ed.
+        //
+        long brStartTime = now();
+        BlockListAsLongs bReport = data.getBlockReport(blockPoolId);
+        cmd = bpNamenode.blockReport(bpRegistration, blockPoolId, bReport
+            .getBlockListAsLongs());
+        long brTime = now() - brStartTime;
+        myMetrics.blockReports.inc(brTime);
+        LOG.info("BlockReport of " + bReport.getNumberOfBlocks() +
+            " blocks got processed in " + brTime + " msecs");
+        //
+        // If we have sent the first block report, then wait a random
+        // time before we start the periodic block reports.
+        //
+        if (resetBlockReportTime) {
+          lastBlockReport = startTime - R.nextInt((int)(blockReportInterval));
+          resetBlockReportTime = false;
+        } else {
+          /* say the last block report was at 8:20:14. The current report
+           * should have started around 9:20:14 (default 1 hour interval).
+           * If current time is :
+           *   1) normal like 9:20:18, next report should be at 10:20:14
+           *   2) unexpected like 11:35:43, next report should be at 12:20:14
+           */
+          lastBlockReport += (now() - lastBlockReport) /
+          blockReportInterval * blockReportInterval;
+        }
+        LOG.info("sent block report, processed command:" + cmd);
+      }
+      return cmd;
+    }
+    
+    
+    DatanodeCommand [] sendHeartBeat() throws IOException {
+      return bpNamenode.sendHeartbeat(bpRegistration,
+          data.getCapacity(),
+          data.getDfsUsed(),
+          data.getRemaining(),
+          data.getBlockPoolUsed(blockPoolId),
+          xmitsInProgress.get(),
+          getXceiverCount());
+    }
+    
+    //This must be called only by blockPoolManager
+    void start() {
+      if ((bpThread != null) && (bpThread.isAlive())) {
+        //Thread is started already
+        return;
+      }
+      bpThread = new Thread(this, dnThreadName);
+      bpThread.setDaemon(true); // needed for JUnit testing
+      bpThread.start();
+    }
+    
+    //This must be called only by blockPoolManager.
+    void stop() {
+      shouldServiceRun = false;
+      if (bpThread != null) {
+        bpThread.interrupt();
+      }
+    }
+    
+    //This must be called only by blockPoolManager
+    void join() {
+      try {
+        if (bpThread != null) {
+          bpThread.join();
+        }
+      } catch (InterruptedException ie) { }
+    }
+    
+    //Cleanup method to be called by current thread before exiting.
+    private synchronized void cleanUp() {
+      
+      if(upgradeManager != null)
+        upgradeManager.shutdownUpgrade();
+      
+      blockPoolManager.remove(this);
+      shouldServiceRun = false;
+      RPC.stopProxy(bpNamenode);
+      if (blockScanner != null) {
+        blockScanner.removeBlockPool(this.getBlockPoolId());
+      }
+     
+      data.shutdownBlockPool(this.getBlockPoolId());
+      storage.removeBlockPoolStorage(this.getBlockPoolId());
+    }
+
+    /**
+     * Main loop for each BP thread. Run until shutdown,
+     * forever calling remote NameNode functions.
+     */
+    private void offerService() throws Exception {
+      LOG.info("For namenode " + nnAddr + " using BLOCKREPORT_INTERVAL of "
+          + blockReportInterval + " msec; initial delay: "
+          + initialBlockReportDelay + " msec; heartBeatInterval="
+          + heartBeatInterval);
+
+      //
+      // Now loop for a long time....
+      //
+      while (shouldRun && shouldServiceRun) {
+        try {
+          long startTime = now();
+
+          //
+          // Every so often, send heartbeat or block-report
+          //
+          if (startTime - lastHeartbeat > heartBeatInterval) {
+            //
+            // All heartbeat messages include the following info:
+            // -- Datanode name
+            // -- data transfer port
+            // -- Total capacity
+            // -- Bytes remaining
+            //
+            lastHeartbeat = startTime;
+            DatanodeCommand[] cmds = sendHeartBeat();
+            myMetrics.heartbeats.inc(now() - startTime);
+            if (!processCommand(cmds))
+              continue;
+          }
+
+          reportReceivedBlocks();
+
+          DatanodeCommand cmd = blockReport();
+          processCommand(cmd);
+
+          // Now safe to start scanning the block pool
+          if (blockScanner != null) {
+            blockScanner.addBlockPool(this.blockPoolId);
+          }
+
+          //
+          // There is no work to do; sleep until the heartbeat timer elapses,
+          // or work arrives, and then iterate again.
+          //
+          long waitTime = heartBeatInterval - 
+          (System.currentTimeMillis() - lastHeartbeat);
+          synchronized(receivedBlockList) {
+            if (waitTime > 0 && receivedBlockList.size() == 0) {
+              try {
+                receivedBlockList.wait(waitTime);
+              } catch (InterruptedException ie) {
+                LOG.warn("BPOfferService for block pool="
+                    + this.getBlockPoolId() + " received exception:" + ie);
+              }
+            }
+          } // synchronized
+        } catch(RemoteException re) {
+          String reClass = re.getClassName();
+          if (UnregisteredNodeException.class.getName().equals(reClass) ||
+              DisallowedDatanodeException.class.getName().equals(reClass) ||
+              IncorrectVersionException.class.getName().equals(reClass)) {
+            LOG.warn("blockpool " + blockPoolId + " is shutting down: " + 
+                StringUtils.stringifyException(re));
+            shouldServiceRun = false;
+            return;
+          }
+          LOG.warn(StringUtils.stringifyException(re));
+          try {
+            long sleepTime = Math.min(1000, heartBeatInterval);
+            Thread.sleep(sleepTime);
+          } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+          }
+        } catch (IOException e) {
+          LOG.warn(StringUtils.stringifyException(e));
+        }
+      } // while (shouldRun && shouldServiceRun)
+    } // offerService
+
+    /**
+     * Register one bp with the corresponding NameNode
+     * <p>
+     * The bpDatanode needs to register with the namenode on startup in order
+     * 1) to report which storage it is serving now and
+     * 2) to receive a registrationID issued by the namenode to recognize
+     *    registered datanodes.
+     * 
+     * @see FSNamesystem#registerDatanode(DatanodeRegistration)
+     * @throws IOException
+     */
+    void register() throws IOException {
+      LOG.info("in register: sid=" + bpRegistration.getStorageID() + ";SI="
+          + bpRegistration.storageInfo); 
+                
+      while(shouldRun && shouldServiceRun) {
+        try {
+          // Use returned registration from namenode with updated machine name.
+          bpRegistration = bpNamenode.registerDatanode(bpRegistration);
+
+          LOG.info("bpReg after =" + bpRegistration.storageInfo + 
+              ";sid=" + bpRegistration.storageID + ";name="+bpRegistration.getName());
+
+          NetUtils.getHostname();
+          hostName = bpRegistration.getHost();
+          break;
+        } catch(SocketTimeoutException e) {  // namenode is busy
+          LOG.info("Problem connecting to server: " + nnAddr);
+          try {
+            Thread.sleep(1000);
+          } catch (InterruptedException ie) {}
+        }
+      }
+
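+      // On the first registration the namenode assigns the storage ID;
+      // persist it. A later mismatch with the persisted ID is fatal.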
+      if (storage.getStorageID().equals("")) {
+        storage.setStorageID(bpRegistration.getStorageID());
+        storage.writeAll();
+        LOG.info("New storage id " + bpRegistration.getStorageID()
+            + " is assigned to data-node " + bpRegistration.getName());
+      } else if(!storage.getStorageID().equals(bpRegistration.getStorageID())) {
+        throw new IOException("Inconsistent storage IDs. Name-node returned "
+            + bpRegistration.getStorageID() 
+            + ". Expecting " + storage.getStorageID());
+      }
+
+      if (!isBlockTokenInitialized) {
+        /* first time registering with NN */
+        ExportedBlockKeys keys = bpRegistration.exportedKeys;
+        isBlockTokenEnabled = keys.isBlockTokenEnabled();
+        if (isBlockTokenEnabled) {
+          long blockKeyUpdateInterval = keys.getKeyUpdateInterval();
+          long blockTokenLifetime = keys.getTokenLifetime();
+          LOG.info("Block token params received from NN: for block pool " +
+              blockPoolId + " keyUpdateInterval="
+              + blockKeyUpdateInterval / (60 * 1000)
+              + " min(s), tokenLifetime=" + blockTokenLifetime / (60 * 1000)
+              + " min(s)");
+          final BlockTokenSecretManager secretMgr = 
+            new BlockTokenSecretManager(false, 0, blockTokenLifetime);
+          blockPoolTokenSecretManager.addBlockPool(blockPoolId, secretMgr);
+        }
+        isBlockTokenInitialized = true;
+      }
+
+      if (isBlockTokenEnabled) {
+        blockPoolTokenSecretManager.setKeys(blockPoolId,
+            bpRegistration.exportedKeys);
+        bpRegistration.exportedKeys = ExportedBlockKeys.DUMMY_KEYS;
+      }
+
+      LOG.info("in register: bpDNR=" + bpRegistration.storageInfo);
+
+      // random short delay - helps scatter the BR from all DNs
+      scheduleBlockReport(initialBlockReportDelay);
+    }
+
+
+    /**
+     * No matter what kind of exception we get, keep retrying offerService().
+     * That's the loop that connects to the NameNode and provides basic DataNode
+     * functionality.
+     *
+     * Only stop when "shouldRun" or "shouldServiceRun" is turned off, which can
+     * happen either at shutdown or due to refreshNamenodes.
+     */
+    @Override
+    public void run() {
+      LOG.info(bpRegistration + " In BPOfferService.run, data = " + data
+          + ";bp=" + blockPoolId);
+
+      try {
+        // init stuff
+        try {
+          // setup storage
+          setupBP(conf, dataDirs);
+          register();
+        } catch (IOException ioe) {
+          // Initial handshake, storage recovery or registration failed
+          // End BPOfferService thread
+          LOG.fatal(bpRegistration + " initialization failed for block pool "
+              + blockPoolId, ioe);
+          return;
+        }
+
+        initialized = true; // bp is initialized;
+        
+        while (shouldRun && shouldServiceRun) {
+          try {
+            startDistributedUpgradeIfNeeded();
+            offerService();
+          } catch (Exception ex) {
+            LOG.error("Exception: " + StringUtils.stringifyException(ex));
+            if (shouldRun && shouldServiceRun) {
+              try {
+                Thread.sleep(5000);
+              } catch (InterruptedException ie) {
+                LOG.warn("Received exception: ", ie);
+              }
+            }
+          }
+        }
+      } catch (Throwable ex) {
+        LOG.warn("Unexpected exception ", ex);
+      } finally {
+        LOG.warn(bpRegistration + " ending block pool service for: " 
+            + blockPoolId);
+        cleanUp();
+      }
+    }
+
+    /**
+     * Process an array of datanode commands
+     * 
+     * @param cmds an array of datanode commands
+     * @return true if further processing may be required or false otherwise. 
+     */
+    private boolean processCommand(DatanodeCommand[] cmds) {
+      if (cmds != null) {
+        for (DatanodeCommand cmd : cmds) {
+          try {
+            if (!processCommand(cmd)) {
+              return false;
+            }
+          } catch (IOException ioe) {
+            LOG.warn("Error processing datanode Command", ioe);
+          }
+        }
+      }
+      return true;
+    }
+
+    /**
+     * Process a single datanode command.
+     * @param cmd the command to process
+     * @return true if further processing may be required or false otherwise. 
+     * @throws IOException
+     */
+    private boolean processCommand(DatanodeCommand cmd) throws IOException {
+      if (cmd == null)
+        return true;
+      final BlockCommand bcmd = 
+        cmd instanceof BlockCommand? (BlockCommand)cmd: null;
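+      // bcmd is non-null only for commands that carry blocks; the cases
+      // below that dereference it (DNA_TRANSFER, DNA_INVALIDATE) are the
+      // ones the namenode sends as BlockCommands.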
+
+      switch(cmd.getAction()) {
+      case DatanodeProtocol.DNA_TRANSFER:
+        // Send a copy of a block to another datanode
+        transferBlocks(bcmd.getBlockPoolId(), bcmd.getBlocks(), bcmd.getTargets());
+        myMetrics.blocksReplicated.inc(bcmd.getBlocks().length);
+        break;
+      case DatanodeProtocol.DNA_INVALIDATE:
+        //
+        // Some local block(s) are obsolete and can be 
+        // safely garbage-collected.
+        //
+        Block toDelete[] = bcmd.getBlocks();
+        try {
+          if (blockScanner != null) {
+            blockScanner.deleteBlocks(bcmd.getBlockPoolId(), toDelete);
+          }
+          // using global fsdataset
+          data.invalidate(bcmd.getBlockPoolId(), toDelete);
+        } catch(IOException e) {
+          checkDiskError();
+          throw e;
+        }
+        myMetrics.blocksRemoved.inc(toDelete.length);
+        break;
+      case DatanodeProtocol.DNA_SHUTDOWN:
+        // shut down the data node
+        shouldServiceRun = false;
+        return false;
+      case DatanodeProtocol.DNA_REGISTER:
+        // namenode requested a registration - at start or if NN lost contact
+        LOG.info("DatanodeCommand action: DNA_REGISTER");
+        if (shouldRun && shouldServiceRun) {
+          register();
+        }
+        break;
+      case DatanodeProtocol.DNA_FINALIZE:
+        storage.finalizeUpgrade(((DatanodeCommand.Finalize) cmd)
+            .getBlockPoolId());
+        break;
+      case UpgradeCommand.UC_ACTION_START_UPGRADE:
+        // start distributed upgrade here
+        processDistributedUpgradeCommand((UpgradeCommand)cmd);
+        break;
+      case DatanodeProtocol.DNA_RECOVERBLOCK:
+        recoverBlocks(((BlockRecoveryCommand)cmd).getRecoveringBlocks());
+        break;
+      case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
+        LOG.info("DatanodeCommand action: DNA_ACCESSKEYUPDATE");
+        if (isBlockTokenEnabled) {
+          blockPoolTokenSecretManager.setKeys(blockPoolId, 
+              ((KeyUpdateCommand) cmd).getExportedKeys());
+        }
+        break;
+      default:
+        LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction());
+      }
+      return true;
+    }
+    
+    private void processDistributedUpgradeCommand(UpgradeCommand comm)
+    throws IOException {
+      UpgradeManagerDatanode upgradeManager = getUpgradeManager();
+      upgradeManager.processUpgradeCommand(comm);
+    }
+
+    synchronized UpgradeManagerDatanode getUpgradeManager() {
+      if(upgradeManager == null)
+        upgradeManager = 
+          new UpgradeManagerDatanode(DataNode.getDataNode(), blockPoolId);
+      
+      return upgradeManager;
+    }
+    
+    /**
+     * Start distributed upgrade if it should be initiated by the data-node.
+     */
+    private void startDistributedUpgradeIfNeeded() throws IOException {
+      UpgradeManagerDatanode um = getUpgradeManager();
+      
+      if(!um.getUpgradeState())
+        return;
+      um.setUpgradeState(false, um.getUpgradeVersion());
+      um.startUpgrade();
+      return;
+    }
+  }
+
   /**
    * This method starts the data node with the specified conf.
    * 
@@ -495,32 +1353,69 @@ public class DataNode extends Configured
    */
   void startDataNode(Configuration conf, 
                      AbstractList<File> dataDirs,
-                     DatanodeProtocol namenode, SecureResources resources
+                     SecureResources resources
                      ) throws IOException {
     if(UserGroupInformation.isSecurityEnabled() && resources == null)
       throw new RuntimeException("Cannot start secure cluster without " +
       "privileged resources.");
 
+    // settings global for all BPs in the Data Node
     this.secureResources = resources;
-    this.namenode = namenode;
+    this.dataDirs = dataDirs;
+    this.conf = conf;
+
     storage = new DataStorage();
     
+    // global DN settings
     initConfig(conf);
     registerMXBean();
     initDataXceiver(conf);
-    initFsDataSet(conf, dataDirs);
-    initBlockScanner(conf);
     startInfoServer(conf);
   
-    myMetrics = new DataNodeMetrics(conf, dnRegistration.getName());
-    // TODO check what code removed here
-
+    // BlockPoolTokenSecretManager is required to create ipc server.
+    this.blockPoolTokenSecretManager = new BlockPoolTokenSecretManager();
     initIpcServer(conf);
-    startPlugins(conf);
-    
-    // BlockTokenSecretManager is created here, but it shouldn't be
-    // used until it is initialized in register().
-    this.blockTokenSecretManager = new BlockTokenSecretManager(false, 0, 0);
+
+    myMetrics = new DataNodeMetrics(conf, getMachineName());
+
+    blockPoolManager = new BlockPoolManager(conf);
+  }
+  
+  BPOfferService[] getAllBpOs() {
+    return blockPoolManager.getAllNamenodeThreads();
+  }
+  
+  /**
+   * Initializes the {@link #data}. The initialization is done only once, when
+   * handshake with the first namenode is completed.
+   */
+  private synchronized void initFsDataSet(Configuration conf,
+      AbstractList<File> dataDirs) throws IOException {
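+    // Several BPOfferService threads may call this during startup; the
+    // method is synchronized and returns early once data is set, so only
+    // the first caller actually creates the dataset.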
+    if (data != null) { // Already initialized
+      return;
+    }
+
+    // get version and id info from the name-node
+    boolean simulatedFSDataset = 
+      conf.getBoolean("dfs.datanode.simulateddatastorage", false);
+
+    if (simulatedFSDataset) {
+      storage.createStorageID();
+      // it would have been better to pass storage as a parameter to
+      // constructor below - need to augment ReflectionUtils used below.
+      conf.set(DFS_DATANODE_STORAGEID_KEY, getStorageId());
+      try {
+        data = (FSDatasetInterface) ReflectionUtils.newInstance(
+            Class.forName(
+            "org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset"),
+            conf);
+      } catch (ClassNotFoundException e) {
+        throw new IOException(StringUtils.stringifyException(e));
+      }
+    } else {
+      data = new FSDataset(storage, conf);
+    }
   }
 
   /**
@@ -544,7 +1439,58 @@ public class DataNode extends Configured
       LOG.warn("Failed to register DataNode MXBean", e);
     }
   }
-
+  
+  int getPort() {
+    return selfAddr.getPort();
+  }
+  
+  String getStorageId() {
+    return storage.getStorageID();
+  }
+  
+  /** 
+   * Get host:port with host set to Datanode host and port set to the
+   * port {@link DataXceiver} is serving.
+   * @return host:port string
+   */
+  public String getMachineName() {
+    return hostName + ":" + getPort();
+  }
+  
+  public int getIpcPort() {
+    return ipcServer.getListenerAddress().getPort();
+  }
+  
+  /**
+   * get BP registration by blockPool id
+   * @param bpid
+   * @return BP registration object
+   * @throws IOException
+   */
+  DatanodeRegistration getDNRegistrationForBP(String bpid) 
+  throws IOException {
+    BPOfferService bpos = blockPoolManager.get(bpid);
+    if(bpos==null || bpos.bpRegistration==null) {
+      throw new IOException("cannot find BPOfferService for bpid="+bpid);
+    }
+    return bpos.bpRegistration;
+  }
+  
+  /**
+   * get BP registration by machine and port name (host:port)
+   * @param mName the machine name (host:port) to look up
+   * @return BP registration, or null if no block pool matches
+   */
+  DatanodeRegistration getDNRegistrationByMachineName(String mName) {
+    BPOfferService [] bposArray = blockPoolManager.getAllNamenodeThreads();
+    for (BPOfferService bpos : bposArray) {
+      if(bpos.bpRegistration.getName().equals(mName))
+        return bpos.bpRegistration;
+    }
+    return null;
+  }
+  
   /**
    * Creates either NIO or regular depending on socketWriteTimeout.
    */
@@ -552,48 +1498,12 @@ public class DataNode extends Configured
     return (socketWriteTimeout > 0) ? 
            SocketChannel.open().socket() : new Socket();                                   
   }
-  
-  private NamespaceInfo handshake() throws IOException {
-    NamespaceInfo nsInfo = new NamespaceInfo();
-    while (shouldRun) {
-      try {
-        nsInfo = namenode.versionRequest();
-        break;
-      } catch(IOException e) {  // namenode cannot be contacted
-        LOG.info("Problem connecting to server: " + getNameNodeAddr(), e);
-      }
-      try {
-        Thread.sleep(1000);
-      } catch (InterruptedException ie) {}
-    }
-    String errorMsg = null;
-    // verify build version
-    if( ! nsInfo.getBuildVersion().equals( Storage.getBuildVersion() )) {
-      errorMsg = "Incompatible build versions: namenode BV = " 
-        + nsInfo.getBuildVersion() + "; datanode BV = "
-        + Storage.getBuildVersion();
-      LOG.fatal( errorMsg );
-      try {
-        namenode.errorReport( dnRegistration,
-                              DatanodeProtocol.NOTIFY, errorMsg );
-      } catch( SocketTimeoutException e ) {  // namenode is busy
-        LOG.info("Problem connecting to server: " + getNameNodeAddr());
-      }
-      throw new IOException( errorMsg );
-    }
-    assert FSConstants.LAYOUT_VERSION == nsInfo.getLayoutVersion() :
-      "Data-node and name-node layout versions must be the same."
-      + "Expected: "+ FSConstants.LAYOUT_VERSION + " actual "+ nsInfo.getLayoutVersion();
-    return nsInfo;
-  }
 
   private static void setDataNode(DataNode node) {
     datanodeObject = node;
   }
 
-  /** Return the DataNode object
-   * 
-   */
+  /** Return the DataNode object */
   public static DataNode getDataNode() {
     return datanodeObject;
   } 
@@ -622,12 +1532,18 @@ public class DataNode extends Configured
     }
   }
 
-  public InetSocketAddress getNameNodeAddr() {
-    return nameNodeAddr;
-  }
-  
-  public InetSocketAddress getNameNodeAddrForClient() {
-    return nameNodeAddrForClient;
+  /**
+   * Get the namenode address for the given block pool ID.
+   * @param bpid block pool ID
+   * @return namenode address corresponding to the bpid
+   */
+  public InetSocketAddress getNameNodeAddr(String bpid) {
+    BPOfferService bp = blockPoolManager.get(bpid);
+    if (bp != null) {
+      return bp.getNNSocketAddress();
+    }
+    LOG.warn("No name node address found for block pool ID " + bpid);
+    return null;
   }
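
With federation a datanode serves several block pools at once, so the single nameNodeAddr field is gone and lookups are keyed by block pool ID. A hedged sketch of the bpid-keyed directory this method relies on; it is a simplified stand-in for blockPoolManager and BPOfferService, not the committed classes:

    import java.net.InetSocketAddress;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    /** Illustrative bpid -> namenode-address directory. */
    class BlockPoolDirectory {
      private final Map<String, InetSocketAddress> nnByBpid =
          new ConcurrentHashMap<String, InetSocketAddress>();

      void register(String bpid, InetSocketAddress nnAddr) {
        nnByBpid.put(bpid, nnAddr);
      }

      /** Returns null for an unknown pool, matching getNameNodeAddr above. */
      InetSocketAddress getNameNodeAddr(String bpid) {
        return nnByBpid.get(bpid);
      }
    }
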
   
   public InetSocketAddress getSelfAddr() {
@@ -638,12 +1554,16 @@ public class DataNode extends Configured
     return myMetrics;
   }
   
-  /** Return DatanodeRegistration */
-  public DatanodeRegistration getDatanodeRegistration() {
-    return dnRegistration;
+  public static void setNewStorageID(DatanodeID dnId) {
+    LOG.info("Datanode is " + dnId);
+    dnId.storageID = createNewStorageId(dnId.getPort());
   }
-
-  public static void setNewStorageID(DatanodeRegistration dnReg) {
+  
+  static String createNewStorageId() {
+    return createNewStorageId(datanodeObject.getPort());
+  }
+  
+  private static String createNewStorageId(int port) {
     /* Return 
      * "DS-randInt-ipaddr-currentTimeMillis"
     * It is considered extremely rare for all these numbers to match
@@ -670,84 +1590,8 @@ public class DataNode extends Configured
       LOG.warn("Could not use SecureRandom");
       rand = R.nextInt(Integer.MAX_VALUE);
     }
-    dnReg.storageID = "DS-" + rand + "-"+ ip + "-" + dnReg.getPort() + "-" + 
-                      System.currentTimeMillis();
-  }
-  /**
-   * Register datanode
-   * <p>
-   * The datanode needs to register with the namenode on startup in order
-   * 1) to report which storage it is serving now and 
-   * 2) to receive a registrationID 
-   * issued by the namenode to recognize registered datanodes.
-   * 
-   * @see FSNamesystem#registerDatanode(DatanodeRegistration)
-   * @throws IOException
-   */
-  private void register() throws IOException {
-    if (dnRegistration.getStorageID().equals("")) {
-      setNewStorageID(dnRegistration);
-    }
-    while(shouldRun) {
-      try {
-        // reset name to machineName. Mainly for web interface.
-        dnRegistration.name = machineName + ":" + dnRegistration.getPort();
-        dnRegistration = namenode.registerDatanode(dnRegistration);
-        break;
-      } catch(RemoteException re) {
-        IOException ue = re.unwrapRemoteException(
-                           UnregisteredNodeException.class,
-                           DisallowedDatanodeException.class,
-                           IncorrectVersionException.class);
-        if (ue != re) {
-          LOG.warn("DataNode is shutting down: ", re);
-          throw ue; 
-        }
-      } catch(IOException e) {  // namenode cannot be contacted
-        LOG.info("Problem connecting to server: " + getNameNodeAddr(), e);
-      }
-      try {
-        Thread.sleep(1000);
-      } catch (InterruptedException ie) {}
-    }
-    assert ("".equals(storage.getStorageID()) 
-            && !"".equals(dnRegistration.getStorageID()))
-            || storage.getStorageID().equals(dnRegistration.getStorageID()) :
-            "New storageID can be assigned only if data-node is not formatted";
-    if (storage.getStorageID().equals("")) {
-      storage.setStorageID(dnRegistration.getStorageID());
-      storage.writeAll();
-      LOG.info("New storage id " + dnRegistration.getStorageID()
-          + " is assigned to data-node " + dnRegistration.getName());
-    }
-    if(! storage.getStorageID().equals(dnRegistration.getStorageID())) {
-      throw new IOException("Inconsistent storage IDs. Name-node returned "
-          + dnRegistration.getStorageID() 
-          + ". Expecting " + storage.getStorageID());
-    }
-    
-    if (!isBlockTokenInitialized) {
-      /* first time registering with NN */
-      ExportedBlockKeys keys = dnRegistration.exportedKeys;
-      this.isBlockTokenEnabled = keys.isBlockTokenEnabled();
-      if (isBlockTokenEnabled) {
-        long blockKeyUpdateInterval = keys.getKeyUpdateInterval();
-        long blockTokenLifetime = keys.getTokenLifetime();
-        LOG.info("Block token params received from NN: keyUpdateInterval="
-            + blockKeyUpdateInterval / (60 * 1000) + " min(s), tokenLifetime="
-            + blockTokenLifetime / (60 * 1000) + " min(s)");
-        blockTokenSecretManager.setTokenLifetime(blockTokenLifetime);
-      }
-      isBlockTokenInitialized = true;
-    }
-
-    if (isBlockTokenEnabled) {
-      blockTokenSecretManager.setKeys(dnRegistration.exportedKeys);
-      dnRegistration.exportedKeys = ExportedBlockKeys.DUMMY_KEYS;
-    }
-
-    // random short delay - helps scatter the BR from all DNs
-    scheduleBlockReport(initialBlockReportDelay);
+    return "DS-" + rand + "-" + ip + "-" + port + "-"
+        + System.currentTimeMillis();
   }
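
The generated ID therefore has the shape DS-<randInt>-<ip>-<port>-<currentTimeMillis>, which makes collisions between datanodes vanishingly unlikely in practice. A self-contained sketch reproducing the format; the IP lookup and random-source fallback are simplified relative to the surrounding method:

    import java.net.InetAddress;
    import java.net.UnknownHostException;
    import java.security.NoSuchAlgorithmException;
    import java.security.SecureRandom;
    import java.util.Random;

    public class StorageIdFormat {
      static String newStorageId(int port) {
        String ip = "unknownIP";
        try {
          ip = InetAddress.getLocalHost().getHostAddress();
        } catch (UnknownHostException e) {
          // keep the placeholder, as the real method does when DNS fails
        }
        int rand;
        try {
          rand = SecureRandom.getInstance("SHA1PRNG").nextInt(Integer.MAX_VALUE);
        } catch (NoSuchAlgorithmException e) {
          rand = new Random().nextInt(Integer.MAX_VALUE);  // non-secure fallback
        }
        return "DS-" + rand + "-" + ip + "-" + port + "-" + System.currentTimeMillis();
      }

      public static void main(String[] args) {
        System.out.println(newStorageId(50010));
      }
    }
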
 
   /**
@@ -768,6 +1612,8 @@ public class DataNode extends Configured
       }
     }
     
+    shutdownPeriodicScanners();
+    
     if (infoServer != null) {
       try {
         infoServer.stop();
@@ -778,6 +1624,7 @@ public class DataNode extends Configured
     if (ipcServer != null) {
       ipcServer.stop();
     }
+    
     this.shouldRun = false;
     if (dataXceiverServer != null) {
       ((DataXceiverServer) this.dataXceiverServer.getRunnable()).kill();
@@ -809,17 +1656,14 @@ public class DataNode extends Configured
       }
     }
     
-    RPC.stopProxy(namenode); // stop the RPC threads
-    
-    if(upgradeManager != null)
-      upgradeManager.shutdownUpgrade();
-    if (blockScannerThread != null) { 
-      blockScannerThread.interrupt();
+    if(blockPoolManager != null) {
       try {
-        blockScannerThread.join(3600000L); // wait for at most 1 hour
+        this.blockPoolManager.shutDownAll();
       } catch (InterruptedException ie) {
+        LOG.warn("Received exception in BlockPoolManager#shutDownAll: ", ie);
       }
     }
+    
     if (storage != null) {
       try {
         this.storage.unlockAll();
@@ -827,13 +1671,6 @@ public class DataNode extends Configured
         LOG.warn("Exception when unlocking storage: " + ie, ie);
       }
     }
-    if (dataNodeThread != null) {
-      dataNodeThread.interrupt();
-      try {
-        dataNodeThread.join();
-      } catch (InterruptedException ie) {
-      }
-    }
     if (data != null) {
       data.shutdown();
     }
@@ -873,329 +1710,58 @@ public class DataNode extends Configured
   private void handleDiskError(String errMsgr) {
     final boolean hasEnoughResources = data.hasEnoughResource();
     LOG.warn("DataNode.handleDiskError: Keep Running: " + hasEnoughResources);
-
+    
     // If we have enough active valid volumes then we do not want to 
     // shutdown the DN completely.
     int dpError = hasEnoughResources ? DatanodeProtocol.DISK_ERROR  
                                      : DatanodeProtocol.FATAL_DISK_ERROR;  
-    
     myMetrics.volumesFailed.inc(1);
-    try {
-      namenode.errorReport(dnRegistration, dpError, errMsgr);
-    } catch (IOException e) {
-      LOG.warn("Error reporting disk failure to NameNode: " + 
-          StringUtils.stringifyException(e));
+
+    // inform the NameNodes
+    for(BPOfferService bpos: blockPoolManager.getAllNamenodeThreads()) {
+      DatanodeProtocol nn = bpos.bpNamenode;
+      try {
+        nn.errorReport(bpos.bpRegistration, dpError, errMsgr);
+      } catch(IOException e) {
+        LOG.warn("Error reporting disk failure to NameNode: " + 
+            StringUtils.stringifyException(e));
+      }
     }
     
-    if (hasEnoughResources) {
-      scheduleBlockReport(0);
+    if(hasEnoughResources) {
+      scheduleAllBlockReport(0);
       return; // do not shutdown
     }
     
     LOG.warn("DataNode is shutting down: " + errMsgr);
-    shouldRun = false; 
+    shouldRun = false;
   }
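
The error path is worth noting: the disk-error report now fans out to every namenode the datanode registers with, and each RPC failure is logged and swallowed so that one unreachable namenode cannot prevent the others from hearing about the failure. A compact, illustrative model of that fan-out; the proxy interface is a stand-in for DatanodeProtocol:

    import java.io.IOException;
    import java.util.List;

    class DiskErrorFanOut {
      interface NamenodeProxy {
        void errorReport(int errorCode, String msg) throws IOException;
      }

      /** Report to every namenode; never let one failure stop the loop. */
      static void reportAll(List<NamenodeProxy> namenodes, int errorCode, String msg) {
        for (NamenodeProxy nn : namenodes) {
          try {
            nn.errorReport(errorCode, msg);
          } catch (IOException e) {
            System.err.println("Error reporting disk failure: " + e);
          }
        }
      }
    }
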
     
   /** Number of concurrent xceivers per node. */
   int getXceiverCount() {
     return threadGroup == null ? 0 : threadGroup.activeCount();
   }
-    
-  /**
-   * Main loop for the DataNode.  Runs until shutdown,
-   * forever calling remote NameNode functions.
-   */
-  public void offerService() throws Exception {
-     
-    LOG.info("using BLOCKREPORT_INTERVAL of " + blockReportInterval + "msec" + 
-       " Initial delay: " + initialBlockReportDelay + "msec");
-
-    //
-    // Now loop for a long time....
-    //
-    while (shouldRun) {
-      try {
-        long startTime = now();
-
-        //
-        // Every so often, send heartbeat or block-report
-        //
-        
-        if (startTime - lastHeartbeat > heartBeatInterval) {
-          //
-          // All heartbeat messages include following info:
-          // -- Datanode name
-          // -- data transfer port
-          // -- Total capacity
-          // -- Bytes remaining
-          //
-          lastHeartbeat = startTime;
-          DatanodeCommand[] cmds = namenode.sendHeartbeat(dnRegistration,
-                                                       data.getCapacity(),
-                                                       data.getDfsUsed(),
-                                                       data.getRemaining(),
-                                                       xmitsInProgress.get(),
-                                                       getXceiverCount());
-          myMetrics.heartbeats.inc(now() - startTime);
-          if (!processCommand(cmds))
-            continue;
-        }
-            
-        reportReceivedBlocks();
-
-        DatanodeCommand cmd = blockReport();
-        processCommand(cmd);
-
-        // start block scanner
-        if (blockScanner != null && blockScannerThread == null &&
-            upgradeManager.isUpgradeCompleted()) {
-          LOG.info("Starting Periodic block scanner.");
-          blockScannerThread = new Daemon(blockScanner);
-          blockScannerThread.start();
-        }
-            
-        //
-        // There is no work to do;  sleep until hearbeat timer elapses, 
-        // or work arrives, and then iterate again.
-        //
-        long waitTime = heartBeatInterval - (System.currentTimeMillis() - lastHeartbeat);
-        synchronized(receivedBlockList) {
-          if (waitTime > 0 && receivedBlockList.size() == 0) {
-            try {
-              receivedBlockList.wait(waitTime);
-            } catch (InterruptedException ie) {
-            }
-          }
-        } // synchronized
-      } catch(RemoteException re) {
-        String reClass = re.getClassName();
-        if (UnregisteredNodeException.class.getName().equals(reClass) ||
-            DisallowedDatanodeException.class.getName().equals(reClass) ||
-            IncorrectVersionException.class.getName().equals(reClass)) {
-          LOG.warn("DataNode is shutting down: " + 
-                   StringUtils.stringifyException(re));
-          shutdown();
-          return;
-        }
-        LOG.warn(StringUtils.stringifyException(re));
-        try {
-          long sleepTime = Math.min(1000, heartBeatInterval);
-          Thread.sleep(sleepTime);
-        } catch (InterruptedException ie) {
-          Thread.currentThread().interrupt();
-        }
-      } catch (IOException e) {
-        LOG.warn(StringUtils.stringifyException(e));
-      }
-    } // while (shouldRun)
-  } // offerService
-
-  /**
-   * Process an array of datanode commands
-   * 
-   * @param cmds an array of datanode commands
-   * @return true if further processing may be required or false otherwise. 
-   */
-  private boolean processCommand(DatanodeCommand[] cmds) {
-    if (cmds != null) {
-      for (DatanodeCommand cmd : cmds) {
-        try {
-          if (processCommand(cmd) == false) {
-            return false;
-          }
-        } catch (IOException ioe) {
-          LOG.warn("Error processing datanode Command", ioe);
-        }
-      }
-    }
-    return true;
-  }
-  
-    /**
-     * 
-     * @param cmd
-     * @return true if further processing may be required or false otherwise. 
-     * @throws IOException
-     */
-  private boolean processCommand(DatanodeCommand cmd) throws IOException {
-    if (cmd == null)
-      return true;
-    final BlockCommand bcmd = cmd instanceof BlockCommand? (BlockCommand)cmd: null;
-
-    switch(cmd.getAction()) {
-    case DatanodeProtocol.DNA_TRANSFER:
-      // Send a copy of a block to another datanode
-      transferBlocks(bcmd.getBlocks(), bcmd.getTargets());
-      myMetrics.blocksReplicated.inc(bcmd.getBlocks().length);
-      break;
-    case DatanodeProtocol.DNA_INVALIDATE:
-      //
-      // Some local block(s) are obsolete and can be 
-      // safely garbage-collected.
-      //
-      Block toDelete[] = bcmd.getBlocks();
-      try {
-        if (blockScanner != null) {
-          blockScanner.deleteBlocks(toDelete);
-        }
-        data.invalidate(toDelete);
-      } catch(IOException e) {
-        checkDiskError();
-        throw e;
-      }
-      myMetrics.blocksRemoved.inc(toDelete.length);
-      break;
-    case DatanodeProtocol.DNA_SHUTDOWN:
-      // shut down the data node
-      this.shutdown();
-      return false;
-    case DatanodeProtocol.DNA_REGISTER:
-      // namenode requested a registration - at start or if NN lost contact
-      LOG.info("DatanodeCommand action: DNA_REGISTER");
-      if (shouldRun) {
-        register();
-      }
-      break;
-    case DatanodeProtocol.DNA_FINALIZE:
-      storage.finalizeUpgrade();
-      break;
-    case UpgradeCommand.UC_ACTION_START_UPGRADE:
-      // start distributed upgrade here
-      processDistributedUpgradeCommand((UpgradeCommand)cmd);
-      break;
-    case DatanodeProtocol.DNA_RECOVERBLOCK:
-      recoverBlocks(((BlockRecoveryCommand)cmd).getRecoveringBlocks());
-      break;
-    case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
-      LOG.info("DatanodeCommand action: DNA_ACCESSKEYUPDATE");
-      if (isBlockTokenEnabled) {
-        blockTokenSecretManager.setKeys(((KeyUpdateCommand) cmd).getExportedKeys());
-      }
-      break;
-    default:
-      LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction());
-    }
-    return true;
-  }
-
-  // Distributed upgrade manager
-  UpgradeManagerDatanode upgradeManager = new UpgradeManagerDatanode(this);
-
-  private void processDistributedUpgradeCommand(UpgradeCommand comm
-                                               ) throws IOException {
-    assert upgradeManager != null : "DataNode.upgradeManager is null.";
-    upgradeManager.processUpgradeCommand(comm);
-  }
-
-  /**
-   * Report received blocks and delete hints to the Namenode
-   * @throws IOException
-   */
-  private void reportReceivedBlocks() throws IOException {
-    //check if there are newly received blocks
-    Block [] blockArray=null;
-    String [] delHintArray=null;
-    synchronized(receivedBlockList) {
-      synchronized(delHints){
-        int numBlocks = receivedBlockList.size();
-        if (numBlocks > 0) {
-          if(numBlocks!=delHints.size()) {
-            LOG.warn("Panic: receiveBlockList and delHints are not of the same length" );
-          }
-          //
-          // Send newly-received blockids to namenode
-          //
-          blockArray = receivedBlockList.toArray(new Block[numBlocks]);
-          delHintArray = delHints.toArray(new String[numBlocks]);
-        }
-      }
-    }
-    if (blockArray != null) {
-      if(delHintArray == null || delHintArray.length != blockArray.length ) {
-        LOG.warn("Panic: block array & delHintArray are not the same" );
-      }
-      namenode.blockReceived(dnRegistration, blockArray, delHintArray);
-      synchronized(receivedBlockList) {
-        synchronized(delHints){
-          for(int i=0; i<blockArray.length; i++) {
-            receivedBlockList.remove(blockArray[i]);
-            delHints.remove(delHintArray[i]);
-          }
-        }
-      }
-    }
-  }
-
-  /**
-   * Report the list blocks to the Namenode
-   * @throws IOException
-   */
-  private DatanodeCommand blockReport() throws IOException {
-    // send block report if timer has expired.
-    DatanodeCommand cmd = null;
-    long startTime = now();
-    if (startTime - lastBlockReport > blockReportInterval) {
-
-      // Create block report
-      long brCreateStartTime = now();
-      BlockListAsLongs bReport = data.getBlockReport();
-
-      // Send block report
-      long brSendStartTime = now();
-      cmd = namenode.blockReport(dnRegistration, bReport.getBlockListAsLongs());
-
-      // Log the block report processing stats from Datanode perspective
-      long brSendCost = now() - brSendStartTime;
-      long brCreateCost = brSendStartTime - brCreateStartTime;
-      myMetrics.blockReports.inc(brSendCost);
-      LOG.info("BlockReport of " + bReport.getNumberOfBlocks()
-          + " blocks took " + brCreateCost + " msec to generate and "
-          + brSendCost + " msecs for RPC and NN processing");
 
-      //
-      // If we have sent the first block report, then wait a random
-      // time before we start the periodic block reports.
-      //
-      if (resetBlockReportTime) {
-        lastBlockReport = startTime - R.nextInt((int)(blockReportInterval));
-        resetBlockReportTime = false;
-      } else {
-        /* say the last block report was at 8:20:14. The current report
-         * should have started around 9:20:14 (default 1 hour interval).
-         * If current time is :
-         *   1) normal like 9:20:18, next report should be at 10:20:14
-         *   2) unexpected like 11:35:43, next report should be at 12:20:14
-         */
-        lastBlockReport += (now() - lastBlockReport) /
-                           blockReportInterval * blockReportInterval;
-      }
+  static UpgradeManagerDatanode getUpgradeManagerDatanode(String bpid) {
+    DataNode dn = getDataNode();
+    BPOfferService bpos = dn.blockPoolManager.get(bpid);
+    if(bpos==null) {
+      return null;
     }
-    return cmd;
-  }
-
-  /**
-   * Start distributed upgrade if it should be initiated by the data-node.
-   */
-  private void startDistributedUpgradeIfNeeded() throws IOException {
-    UpgradeManagerDatanode um = DataNode.getDataNode().upgradeManager;
-    assert um != null : "DataNode.upgradeManager is null.";
-    if(!um.getUpgradeState())
-      return;
-    um.setUpgradeState(false, um.getUpgradeVersion());
-    um.startUpgrade();
-    return;
+    return bpos.getUpgradeManager();
   }
 
-  private void transferBlock( Block block, 
+  private void transferBlock( ExtendedBlock block, 
                               DatanodeInfo xferTargets[] 
                               ) throws IOException {
+    DatanodeProtocol nn = getBPNamenode(block.getBlockPoolId());
+    DatanodeRegistration bpReg = getDNRegistrationForBP(block.getBlockPoolId());
+    
     if (!data.isValidBlock(block)) {
       // block does not exist or is under-construction
       String errStr = "Can't send invalid block " + block;
       LOG.info(errStr);
-      namenode.errorReport(dnRegistration, 
-                           DatanodeProtocol.INVALID_BLOCK, 
-                           errStr);
+      nn.errorReport(bpReg, DatanodeProtocol.INVALID_BLOCK, errStr);
       return;
     }
 
@@ -1203,9 +1769,9 @@ public class DataNode extends Configured
     long onDiskLength = data.getLength(block);
     if (block.getNumBytes() > onDiskLength) {
       // A shorter on-disk length indicates corruption, so report the corrupt block to the NN
-      namenode.reportBadBlocks(new LocatedBlock[]{
+      nn.reportBadBlocks(new LocatedBlock[]{
           new LocatedBlock(block, new DatanodeInfo[] {
-              new DatanodeInfo(dnRegistration)})});
+              new DatanodeInfo(bpReg)})});
       LOG.info("Can't replicate block " + block
           + " because on-disk length " + onDiskLength 
           + " is shorter than NameNode recorded length " + block.getNumBytes());
@@ -1220,7 +1786,7 @@ public class DataNode extends Configured
           xfersBuilder.append(xferTargets[i].getName());
           xfersBuilder.append(" ");
         }
-        LOG.info(dnRegistration + " Starting thread to transfer block " + 
+        LOG.info(bpReg + " Starting thread to transfer block " + 
                  block + " to " + xfersBuilder);                       
       }
 
@@ -1229,39 +1795,17 @@ public class DataNode extends Configured
     }
   }
 
-  private void transferBlocks( Block blocks[], 
-                               DatanodeInfo xferTargets[][] 
-                               ) {
+  private void transferBlocks(String poolId, Block blocks[],
+      DatanodeInfo xferTargets[][]) {
     for (int i = 0; i < blocks.length; i++) {
       try {
-        transferBlock(blocks[i], xferTargets[i]);
+        transferBlock(new ExtendedBlock(poolId, blocks[i]), xferTargets[i]);
       } catch (IOException ie) {
         LOG.warn("Failed to transfer block " + blocks[i], ie);
       }
     }
   }
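
ExtendedBlock is the federation-era pairing of a block pool ID with a pool-local block, which is why transferBlocks now takes a poolId and wraps each Block before delegating. A simplified, hypothetical model of that pairing; the real class is org.apache.hadoop.hdfs.protocol.ExtendedBlock, and the field and method names below are illustrative:

    /** Illustrative: a block is globally identified by (poolId, blockId). */
    class ExtendedBlockSketch {
      private final String poolId;
      private final long blockId;
      private final long numBytes;
      private final long generationStamp;

      ExtendedBlockSketch(String poolId, long blockId, long numBytes, long gs) {
        this.poolId = poolId;
        this.blockId = blockId;
        this.numBytes = numBytes;
        this.generationStamp = gs;
      }

      String getBlockPoolId() { return poolId; }
      long getNumBytes() { return numBytes; }

      @Override
      public String toString() {
        // logs must now identify the pool as well as the block
        return poolId + ":blk_" + blockId + "_" + generationStamp;
      }
    }
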
 
-  /*
-   * Informing the name node could take a long long time! Should we wait
-   * till namenode is informed before responding with success to the
-   * client? For now we don't.
-   */
-  protected void notifyNamenodeReceivedBlock(Block block, String delHint) {
-    if(block==null || delHint==null) {
-      throw new IllegalArgumentException(block==null?"Block is null":"delHint is null");
-    }
-    synchronized (receivedBlockList) {
-      synchronized (delHints) {
-        receivedBlockList.add(block);
-        delHints.add(delHint);
-        receivedBlockList.notifyAll();
-      }
-    }
-  }
-
-  
-
-
   /* ********************************************************************
   Protocol when a client reads data from Datanode (Cur Ver: 9):
   
@@ -1352,15 +1896,16 @@ public class DataNode extends Configured
    */
   private class DataTransfer implements Runnable {
     final DatanodeInfo[] targets;
-    final Block b;
+    final ExtendedBlock b;
     final BlockConstructionStage stage;
+    final private DatanodeRegistration bpReg;
     final String clientname;
 
     /**
      * Connect to the first item in the target list.  Pass along the 
      * entire target list, the block, and the data.
      */
-    DataTransfer(DatanodeInfo targets[], Block b, BlockConstructionStage stage,
+    DataTransfer(DatanodeInfo targets[], ExtendedBlock b, BlockConstructionStage stage,
         final String clientname) throws IOException {
       if (DataTransferProtocol.LOG.isDebugEnabled()) {
         DataTransferProtocol.LOG.debug(getClass().getSimpleName() + ": "
@@ -1372,6 +1917,8 @@ public class DataNode extends Configured
       this.targets = targets;
       this.b = b;
       this.stage = stage;
+      BPOfferService bpos = blockPoolManager.get(b.getBlockPoolId());
+      bpReg = bpos.bpRegistration;
       this.clientname = clientname;
     }
 
@@ -1400,15 +1947,15 @@ public class DataNode extends Configured
                                                             SMALL_BUFFER_SIZE));
         blockSender = new BlockSender(b, 0, b.getNumBytes(), 
             false, false, false, DataNode.this);
-        DatanodeInfo srcNode = new DatanodeInfo(dnRegistration);
+        DatanodeInfo srcNode = new DatanodeInfo(bpReg);
 
         //
         // Header info
         //
         Token<BlockTokenIdentifier> accessToken = BlockTokenSecretManager.DUMMY_TOKEN;
         if (isBlockTokenEnabled) {
-          accessToken = blockTokenSecretManager.generateToken(null, b,
-          EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE));
+          accessToken = blockPoolTokenSecretManager.generateToken(b, 
+              EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE));
         }
 
         DataTransferProtocol.Sender.opWriteBlock(out,
@@ -1440,7 +1987,7 @@ public class DataNode extends Configured
           }
         }
       } catch (IOException ie) {
-        LOG.warn(dnRegistration + ":Failed to transfer " + b + " to " + targets[0].getName()
+        LOG.warn(bpReg + ":Failed to transfer " + b + " to " + targets[0].getName()
             + " got " + StringUtils.stringifyException(ie));
         // check if there are any disk problem
         checkDiskError();
@@ -1461,66 +2008,46 @@ public class DataNode extends Configured
    * @param block
    * @param delHint
    */
-  void closeBlock(Block block, String delHint) {
+  void closeBlock(ExtendedBlock block, String delHint) {
     myMetrics.blocksWritten.inc();
-    notifyNamenodeReceivedBlock(block, delHint);
+    BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
+    if(bpos != null) {
+      bpos.notifyNamenodeReceivedBlock(block, delHint);
+    } else {
+      LOG.warn("Cannot find BPOfferService for reporting block received for bpid="
+          + block.getBlockPoolId());
+    }
     if (blockScanner != null) {
       blockScanner.addBlock(block);
     }
   }
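
closeBlock no longer touches a datanode-wide receivedBlockList; it routes the notification to the BPOfferService that owns the block's pool. The per-pool bookkeeping presumably mirrors the pre-federation logic removed earlier in this diff: paired block/hint lists with a notifyAll to wake the service loop. A hedged sketch:

    import java.util.ArrayList;
    import java.util.List;

    /** Illustrative per-pool queue of (block id, deletion hint) pairs. */
    class ReceivedBlockQueue {
      private final List<Long> blockIds = new ArrayList<Long>();
      private final List<String> delHints = new ArrayList<String>();

      synchronized void add(long blockId, String delHint) {
        blockIds.add(blockId);
        delHints.add(delHint);
        notifyAll();                       // wake the heartbeat/report loop
      }

      /** Drain everything queued so far for one blockReceived RPC. */
      synchronized List<Long> drain(List<String> hintsOut) {
        List<Long> ids = new ArrayList<Long>(blockIds);
        hintsOut.addAll(delHints);
        blockIds.clear();
        delHints.clear();
        return ids;
      }
    }
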
 
-  /**
-   * No matter what kind of exception we get, keep retrying to offerService().
-   * That's the loop that connects to the NameNode and provides basic DataNode
-   * functionality.
-   *
-   * Only stop when "shouldRun" is turned off (which can only happen at shutdown).
+  /** Start the datanode daemon: the block pool services, the data
+   *  transfer and IPC servers, and any configured plugins.
    */
-  public void run() {
-    LOG.info(dnRegistration + "In DataNode.run, data = " + data);
+  public void runDatanodeDaemon() throws IOException {
+    blockPoolManager.startAll();
 
     // start dataXceiveServer
     dataXceiverServer.start();
     ipcServer.start();
-        
-    while (shouldRun) {
-      try {
-        startDistributedUpgradeIfNeeded();
-        offerService();
-      } catch (Exception ex) {
-        LOG.error("Exception: " + StringUtils.stringifyException(ex));
-        if (shouldRun) {
-          try {
-            Thread.sleep(5000);
-          } catch (InterruptedException ie) {
-          }
-        }
-      }
-    }
-        
-    LOG.info(dnRegistration + ":Finishing DataNode in: "+data);
-    shutdown();
+    startPlugins(conf);
   }
-    
-  /** Start a single datanode daemon and wait for it to finish.
-   *  If this thread is specifically interrupted, it will stop waiting.
+
+  /**
+   * A datanode is considered up if at least one of its block pool services is up.
    */
-  public static void runDatanodeDaemon(DataNode dn) throws IOException {
-    if (dn != null) {
-      //register datanode
-      dn.register();
-      dn.dataNodeThread = new Thread(dn, dnThreadName);
-      dn.dataNodeThread.setDaemon(true); // needed for JUnit testing
-      dn.dataNodeThread.start();
+  public boolean isDatanodeUp() {
+    for (BPOfferService bp : blockPoolManager.getAllNamenodeThreads()) {
+      if (bp.isAlive()) {
+        return true;
+      }
     }
-  }
-  
-  static boolean isDatanodeUp(DataNode dn) {
-    return dn.dataNodeThread != null && dn.dataNodeThread.isAlive();
+    return false;
   }
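
Liveness is aggregated the same way: there is no single datanode thread left to poll, so "up" means at least one per-pool service is alive. An illustrative reduction over plain threads (BPOfferService.isAlive presumably wraps its service thread similarly):

    /** Illustrative any-alive check over per-pool service threads. */
    class Liveness {
      static boolean anyAlive(Thread[] serviceThreads) {
        for (Thread t : serviceThreads) {
          if (t != null && t.isAlive()) {
            return true;
          }
        }
        return false;
      }
    }
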
 
   /** Instantiate a single datanode object. This must be run by invoking
-   *  {@link DataNode#runDatanodeDaemon(DataNode)} subsequently. 
+   *  {@link DataNode#runDatanodeDaemon()} subsequently. 
    */
   public static DataNode instantiateDataNode(String args[],
                                       Configuration conf) throws IOException {
@@ -1528,7 +2055,7 @@ public class DataNode extends Configured
   }
   
   /** Instantiate a single datanode object, along with its secure resources. 
-   * This must be run by invoking{@link DataNode#runDatanodeDaemon(DataNode)} 
+   * This must be run by invoking {@link DataNode#runDatanodeDaemon()}
    * subsequently. 
    */
   public static DataNode instantiateDataNode(String args [], Configuration conf,
@@ -1555,14 +2082,14 @@ public class DataNode extends Configured
     dnThreadName = "DataNode: [" +
                     StringUtils.uriToString(dataDirs.toArray(new URI[0])) + "]";
    UserGroupInformation.setConfiguration(conf);
-    SecurityUtil.login(conf, DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY,
-        DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY);
+    SecurityUtil.login(conf, DFS_DATANODE_KEYTAB_FILE_KEY,
+        DFS_DATANODE_USER_NAME_KEY);
     return makeInstance(dataDirs, conf, resources);
   }
 
   static Collection<URI> getStorageDirs(Configuration conf) {
     Collection<String> dirNames =
-      conf.getTrimmedStringCollection(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
+      conf.getTrimmedStringCollection(DFS_DATANODE_DATA_DIR_KEY);
     return Util.stringCollectionAsURIs(dirNames);
   }
 
@@ -1581,15 +2108,20 @@ public class DataNode extends Configured
   public static DataNode createDataNode(String args[], Configuration conf,
       SecureResources resources) throws IOException {
     DataNode dn = instantiateDataNode(args, conf, resources);
-    runDatanodeDaemon(dn);
+    if (dn != null) {
+      dn.runDatanodeDaemon();
+    }
     return dn;
   }
 
   void join() {
-    if (dataNodeThread != null) {
+    while (shouldRun) {
       try {
-        dataNodeThread.join();
-      } catch (InterruptedException e) {}
+        blockPoolManager.joinAll();
+        Thread.sleep(2000);
+      } catch (InterruptedException ex) {
+        LOG.warn("Received exception in Datanode#join: " + ex);
+      }
     }
   }
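
Together with createDataNode above, the embedding pattern becomes: instantiate, start the daemon, then block in join() until shutdown() flips shouldRun. A hedged usage sketch against the methods in this diff; the null SecureResources argument assumes the non-secure startup path:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    class DataNodeDriver {
      static void runUntilShutdown(String[] args) throws IOException {
        Configuration conf = new HdfsConfiguration();
        DataNode dn = DataNode.createDataNode(args, conf, null);
        if (dn != null) {
          dn.join();  // polls blockPoolManager.joinAll() until shouldRun is false
        }
      }
    }
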
 
@@ -1609,8 +2141,8 @@ public class DataNode extends Configured
       SecureResources resources) throws IOException {
     LocalFileSystem localFS = FileSystem.getLocal(conf);
     FsPermission permission = new FsPermission(
-        conf.get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_KEY,
-                 DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT));
+        conf.get(DFS_DATANODE_DATA_DIR_PERMISSION_KEY,
+                 DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT));
     ArrayList<File> dirs = getDataDirsFromURIs(dataDirs, localFS, permission);
 
     assert dirs.size() > 0 : "number of data directories should be > 0";
@@ -1635,29 +2167,24 @@ public class DataNode extends Configured
         dirs.add(data);
       } catch (IOException e) {
         LOG.warn("Invalid directory in: "
-                 + DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY + ": ", e);
+                 + DFS_DATANODE_DATA_DIR_KEY + ": ", e);
         invalidDirs.append("\"").append(data.getCanonicalPath()).append("\" ");
       }
     }
     if (dirs.size() == 0)
       throw new IOException("All directories in "
-          + DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY + " are invalid: "
+          + DFS_DATANODE_DATA_DIR_KEY + " are invalid: "
           + invalidDirs);
     return dirs;
   }
 
   @Override
   public String toString() {
-    return "DataNode{" +
-      "data=" + data +
-      (dnRegistration != null ?
-          (", localName='" + dnRegistration.getName() + "'" +
-              ", storageID='" + dnRegistration.getStorageID() + "'")
-          : "") +
-      ", xmitsInProgress=" + xmitsInProgress.get() +
-      "}";
+    return "DataNode{data=" + data + ", localName='" + getMachineName()
+        + "', storageID='" + getStorageId() + "', xmitsInProgress="
+        + xmitsInProgress.get() + "}";
   }
-  
+
   private static void printUsage() {
     System.err.println("Usage: java DataNode");
     System.err.println("           [-rollback]");
@@ -1699,19 +2226,15 @@ public class DataNode extends Configured
   }
 
   /**
-   * This methods  arranges for the data node to send the block report at the next heartbeat.
+   * This method arranges for the data node to send
+   * a block report for every block pool at the next heartbeat.
    */
-  public void scheduleBlockReport(long delay) {
-    if (delay > 0) { // send BR after random delay
-      lastBlockReport = System.currentTimeMillis()
-                            - ( blockReportInterval - R.nextInt((int)(delay)));
-    } else { // send at next heartbeat
-      lastBlockReport = lastHeartbeat - blockReportInterval;
+  public void scheduleAllBlockReport(long delay) {
+    for(BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
+      bpos.scheduleBlockReport(delay);
     }
-    resetBlockReportTime = true; // reset future BRs for randomness
   }
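
The randomized scheduling itself now lives in BPOfferService, but the arithmetic is visible in the lines removed above: with a positive delay, lastBlockReport is back-dated so the next report fires at a random point within the delay window; with no delay it is back-dated a full interval so the report goes out at the next heartbeat. A standalone sketch of that computation:

    import java.util.Random;

    class BlockReportSchedule {
      private static final Random R = new Random();

      /** Back-date lastBlockReport, following the removed scheduleBlockReport logic. */
      static long schedule(long now, long lastHeartbeat,
                           long blockReportIntervalMs, long delayMs) {
        if (delayMs > 0) {
          // next report lands at a random point within the delay window
          return now - (blockReportIntervalMs - R.nextInt((int) delayMs));
        }
        // send at the next heartbeat: pretend a full interval has elapsed
        return lastHeartbeat - blockReportIntervalMs;
      }
    }
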
-  
-  
+
   /**
    * This method is used for testing. 
    * Examples are adding and deleting blocks directly.
@@ -1723,7 +2246,6 @@ public class DataNode extends Configured
     return data;
   }
 
-
   public static void secureMain(String args[], SecureResources resources) {
     try {
       StringUtils.startupShutdownMessage(DataNode.class, args, LOG);
@@ -1783,12 +2305,12 @@ public class DataNode extends Configured
    * Update replica with the new generation stamp and length.  
    */
   @Override // InterDatanodeProtocol
-  public Block updateReplicaUnderRecovery(Block oldBlock,
+  public ExtendedBlock updateReplicaUnderRecovery(ExtendedBlock oldBlock,
                                           long recoveryId,
                                           long newLength) throws IOException {
-    ReplicaInfo r =

[... 281 lines stripped ...]

