hadoop-hdfs-commits mailing list archives

From sur...@apache.org
Subject svn commit: r1390763 [2/4] - in /hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project: hadoop-hdfs-httpfs/ hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/servlet/ hadoop-hdfs-httpfs/src/site/apt/ hadoop-hdfs-httpfs/src/test/java/org/apac...
Date Wed, 26 Sep 2012 22:55:20 GMT
Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java?rev=1390763&r1=1390762&r2=1390763&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java Wed Sep 26 22:55:00 2012
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hdfs;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -201,7 +202,7 @@ class BlockStorageLocationUtil {
       ArrayList<VolumeId> l = new ArrayList<VolumeId>(b.getLocations().length);
       // Start off all IDs as invalid, fill it in later with results from RPCs
       for (int i = 0; i < b.getLocations().length; i++) {
-        l.add(new HdfsVolumeId((byte)-1, false));
+        l.add(new HdfsVolumeId(null, false));
       }
       blockVolumeIds.put(b, l);
     }
@@ -234,8 +235,8 @@ class BlockStorageLocationUtil {
         }
         // Get the VolumeId by indexing into the list of VolumeIds
         // provided by the datanode
-        HdfsVolumeId id = new HdfsVolumeId(metaVolumeIds.get(volumeIndex)[0],
-            true);
+        byte[] volumeId = metaVolumeIds.get(volumeIndex);
+        HdfsVolumeId id = new HdfsVolumeId(volumeId, true);
         // Find out which index we are in the LocatedBlock's replicas
         LocatedBlock locBlock = extBlockToLocBlock.get(extBlock);
         DatanodeInfo[] dnInfos = locBlock.getLocations();
@@ -255,8 +256,8 @@ class BlockStorageLocationUtil {
         }
         // Place VolumeId at the same index as the DN's index in the list of
         // replicas
-        List<VolumeId> VolumeIds = blockVolumeIds.get(locBlock);
-        VolumeIds.set(index, id);
+        List<VolumeId> volumeIds = blockVolumeIds.get(locBlock);
+        volumeIds.set(index, id);
       }
     }
     return blockVolumeIds;

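The hunks above change HdfsVolumeId to carry a byte[] payload instead of a single byte, with null marking a volume that has not been resolved yet. A minimal sketch of the two call sites, assuming only the HdfsVolumeId(byte[], boolean) constructor shown in this diff:

    // Placeholder before the per-datanode RPCs return: no bytes, marked invalid.
    VolumeId invalid = new HdfsVolumeId(null, false);

    // Once a datanode replies, its raw volume-id bytes are wrapped as a valid ID.
    byte[] rawId = metaVolumeIds.get(volumeIndex);  // lookup as in the hunk above
    VolumeId valid = new HdfsVolumeId(rawId, true);
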
Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1390763&r1=1390762&r2=1390763&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Wed Sep 26 22:55:00 2012
@@ -39,6 +39,8 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT;
@@ -77,6 +79,7 @@ import javax.net.SocketFactory;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.BlockStorageLocation;
@@ -208,6 +211,7 @@ public class DFSClient implements java.i
     final int writePacketSize;
     final int socketTimeout;
     final int socketCacheCapacity;
+    final long socketCacheExpiry;
     /** Wait time window (in msec) if BlockMissingException is caught */
     final int timeWindow;
     final int nCachedConnRetry;
@@ -256,6 +260,8 @@ public class DFSClient implements java.i
       taskId = conf.get("mapreduce.task.attempt.id", "NONMAPREDUCE");
       socketCacheCapacity = conf.getInt(DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY,
           DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT);
+      socketCacheExpiry = conf.getLong(DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY,
+          DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT);
       prefetchSize = conf.getLong(DFS_CLIENT_READ_PREFETCH_SIZE_KEY,
           10 * defaultBlockSize);
       timeWindow = conf
@@ -426,7 +432,7 @@ public class DFSClient implements java.i
       Joiner.on(',').join(localInterfaceAddrs) + "]");
     }
     
-    this.socketCache = new SocketCache(dfsClientConf.socketCacheCapacity);
+    this.socketCache = SocketCache.getInstance(dfsClientConf.socketCacheCapacity, dfsClientConf.socketCacheExpiry);
   }
 
   /**
@@ -640,7 +646,6 @@ public class DFSClient implements java.i
   void abort() {
     clientRunning = false;
     closeAllFilesBeingWritten(true);
-    socketCache.clear();
 
     try {
       // remove reference to this client and stop the renewer,
@@ -687,7 +692,6 @@ public class DFSClient implements java.i
   public synchronized void close() throws IOException {
     if(clientRunning) {
       closeAllFilesBeingWritten(false);
-      socketCache.clear();
       clientRunning = false;
       getLeaseRenewer().closeClient(this);
       // close connections to the namenode
@@ -696,6 +700,17 @@ public class DFSClient implements java.i
   }
 
   /**
+   * Close all open streams, abandoning all of the leases and files being
+   * created.
+   * @param abort whether streams should be aborted rather than closed gracefully
+   */
+  public void closeOutputStreams(boolean abort) {
+    if (clientRunning) {
+      closeAllFilesBeingWritten(abort);
+    }
+  }
+
+  /**
    * Get the default block size for this cluster
    * @return the default block size in bytes
    */
@@ -1870,6 +1885,20 @@ public class DFSClient implements java.i
       throw re.unwrapRemoteException(AccessControlException.class);
     }
   }
+
+  /**
+   * Rolls the edit log on the active NameNode.
+   * @return the txid of the new log segment 
+   *
+   * @see ClientProtocol#rollEdits()
+   */
+  long rollEdits() throws AccessControlException, IOException {
+    try {
+      return namenode.rollEdits();
+    } catch(RemoteException re) {
+      throw re.unwrapRemoteException(AccessControlException.class);
+    }
+  }
   
   /**
    * enable/disable restore failed storage.
@@ -1942,34 +1971,29 @@ public class DFSClient implements java.i
    */
   public boolean mkdirs(String src, FsPermission permission,
       boolean createParent) throws IOException {
-    checkOpen();
     if (permission == null) {
       permission = FsPermission.getDefault();
     }
     FsPermission masked = permission.applyUMask(dfsClientConf.uMask);
-    if(LOG.isDebugEnabled()) {
-      LOG.debug(src + ": masked=" + masked);
-    }
-    try {
-      return namenode.mkdirs(src, masked, createParent);
-    } catch(RemoteException re) {
-      throw re.unwrapRemoteException(AccessControlException.class,
-                                     InvalidPathException.class,
-                                     FileAlreadyExistsException.class,
-                                     FileNotFoundException.class,
-                                     ParentNotDirectoryException.class,
-                                     SafeModeException.class,
-                                     NSQuotaExceededException.class,
-                                     UnresolvedPathException.class);
-    }
+    return primitiveMkdir(src, masked, createParent);
   }
-  
+
   /**
    * Same as {@link #mkdirs(String, FsPermission, boolean)} except
    * that the permission has already been masked against the umask.
    */
   public boolean primitiveMkdir(String src, FsPermission absPermission)
     throws IOException {
+    return primitiveMkdir(src, absPermission, true);
+  }
+
+  /**
+   * Same as {@link #mkdirs(String, FsPermission, boolean)} except
+   * that the permission has already been masked against the umask.
+   */
+  public boolean primitiveMkdir(String src, FsPermission absPermission, 
+    boolean createParent)
+    throws IOException {
     checkOpen();
     if (absPermission == null) {
       absPermission = 
@@ -1980,15 +2004,20 @@ public class DFSClient implements java.i
       LOG.debug(src + ": masked=" + absPermission);
     }
     try {
-      return namenode.mkdirs(src, absPermission, true);
+      return namenode.mkdirs(src, absPermission, createParent);
     } catch(RemoteException re) {
       throw re.unwrapRemoteException(AccessControlException.class,
+                                     InvalidPathException.class,
+                                     FileAlreadyExistsException.class,
+                                     FileNotFoundException.class,
+                                     ParentNotDirectoryException.class,
+                                     SafeModeException.class,
                                      NSQuotaExceededException.class,
                                      DSQuotaExceededException.class,
                                      UnresolvedPathException.class);
     }
   }
-
+  
   /**
    * Get {@link ContentSummary} rooted at the specified directory.
    * @param path The string representation of the path
@@ -2060,10 +2089,7 @@ public class DFSClient implements java.i
   }
   
   boolean shouldTryShortCircuitRead(InetSocketAddress targetAddr) {
-    if (shortCircuitLocalReads && isLocalAddress(targetAddr)) {
-      return true;
-    }
-    return false;
+    return shortCircuitLocalReads && isLocalAddress(targetAddr);
   }
 
   void reportChecksumFailure(String file, ExtendedBlock blk, DatanodeInfo dn) {

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1390763&r1=1390762&r2=1390763&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Wed Sep 26 22:55:00 2012
@@ -74,6 +74,8 @@ public class DFSConfigKeys extends Commo
   public static final String  DFS_CLIENT_FAILOVER_CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_KEY = "dfs.client.failover.connection.retries.on.timeouts";
   public static final int     DFS_CLIENT_FAILOVER_CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT = 0;
   
+  public static final String  DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY = "dfs.client.socketcache.expiryMsec";
+  public static final long    DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT = 2 * 60 * 1000;
   public static final String  DFS_NAMENODE_BACKUP_ADDRESS_KEY = "dfs.namenode.backup.address";
   public static final String  DFS_NAMENODE_BACKUP_ADDRESS_DEFAULT = "localhost:50100";
   public static final String  DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY = "dfs.namenode.backup.http-address";
@@ -174,6 +176,13 @@ public class DFSConfigKeys extends Commo
   public static final boolean DFS_DATANODE_SYNCONCLOSE_DEFAULT = false;
   public static final String  DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY = "dfs.datanode.socket.reuse.keepalive";
   public static final int     DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT = 1000;
+  
+  // Whether to enable datanode's stale state detection and usage
+  public static final String DFS_NAMENODE_CHECK_STALE_DATANODE_KEY = "dfs.namenode.check.stale.datanode";
+  public static final boolean DFS_NAMENODE_CHECK_STALE_DATANODE_DEFAULT = false;
+  // The time interval for marking datanodes as stale
+  public static final String DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY = "dfs.namenode.stale.datanode.interval";
+  public static final long DFS_NAMENODE_STALE_DATANODE_INTERVAL_MILLI_DEFAULT = 30 * 1000; // 30s
 
   // Replication monitoring related keys
   public static final String DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION =
@@ -329,6 +338,10 @@ public class DFSConfigKeys extends Commo
                                            "dfs.image.transfer.bandwidthPerSec";
   public static final long DFS_IMAGE_TRANSFER_RATE_DEFAULT = 0;  //no throttling
 
+  // Image transfer timeout
+  public static final String DFS_IMAGE_TRANSFER_TIMEOUT_KEY = "dfs.image.transfer.timeout";
+  public static final int DFS_IMAGE_TRANSFER_TIMEOUT_DEFAULT = 60 * 1000;
+
   //Keys with no defaults
   public static final String  DFS_DATANODE_PLUGINS_KEY = "dfs.datanode.plugins";
   public static final String  DFS_DATANODE_FSDATASET_FACTORY_KEY = "dfs.datanode.fsdataset.factory";

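A hedged sketch of setting the new keys through the standard Configuration API; the values are illustrative only, not recommendations:

    Configuration conf = new HdfsConfiguration();
    // Expire pooled client sockets after 5 minutes instead of the 2-minute default.
    conf.setLong(DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY, 5 * 60 * 1000L);
    // Opt in to stale-datanode detection (off by default) and keep the 30s interval.
    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY, true);
    conf.setLong(DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY, 30 * 1000L);
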
Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1390763&r1=1390762&r2=1390763&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java Wed Sep 26 22:55:00 2012
@@ -243,6 +243,10 @@ public class DFSInputStream extends FSIn
         locatedBlocks.getFileLength() + lastBlockBeingWrittenLength;
   }
 
+  private synchronized boolean blockUnderConstruction() {
+    return locatedBlocks.isUnderConstruction();
+  }
+
   /**
    * Returns the datanode from which the stream is currently reading.
    */
@@ -878,7 +882,9 @@ public class DFSInputStream extends FSIn
                                        String clientName)
       throws IOException {
     
-    if (dfsClient.shouldTryShortCircuitRead(dnAddr)) {
+    // Can't local read a block under construction, see HDFS-2757
+    if (dfsClient.shouldTryShortCircuitRead(dnAddr) &&
+        !blockUnderConstruction()) {
       return DFSClient.getLocalBlockReader(dfsClient.conf, src, block,
           blockToken, chosenNode, dfsClient.hdfsTimeout, startOffset,
           dfsClient.connectToDnViaHostname());

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java?rev=1390763&r1=1390762&r2=1390763&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java Wed Sep 26 22:55:00 2012
@@ -127,6 +127,43 @@ public class DFSUtil {
           a.isDecommissioned() ? 1 : -1;
       }
     };
+    
+      
+  /**
+   * Comparator for sorting DatanodeInfo[] based on decommissioned/stale states.
+   * Decommissioned/stale nodes are moved to the end of the array when sorting
+   * with this comparator.
+   */ 
+  @InterfaceAudience.Private 
+  public static class DecomStaleComparator implements Comparator<DatanodeInfo> {
+    private long staleInterval;
+
+    /**
+     * Constructor of DecomStaleComparator
+     * 
+     * @param interval
+     *          The time interval for marking datanodes as stale is passed from
+     *          outside, since the interval may be changed dynamically
+     */
+    public DecomStaleComparator(long interval) {
+      this.staleInterval = interval;
+    }
+
+    @Override
+    public int compare(DatanodeInfo a, DatanodeInfo b) {
+      // Decommissioned nodes will still be moved to the end of the list
+      if (a.isDecommissioned()) {
+        return b.isDecommissioned() ? 0 : 1;
+      } else if (b.isDecommissioned()) {
+        return -1;
+      }
+      // Stale nodes will be moved behind the normal nodes
+      boolean aStale = a.isStale(staleInterval);
+      boolean bStale = b.isStale(staleInterval);
+      return aStale == bStale ? 0 : (aStale ? 1 : -1);
+    }
+  }    
+    
   /**
    * Address matcher for matching an address to local address
    */

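A short usage sketch for the new comparator, assuming a hypothetical LocatedBlock named locatedBlock; sorting pushes decommissioned nodes last and stale nodes behind live ones:

    DatanodeInfo[] replicas = locatedBlock.getLocations();
    long staleInterval = 30 * 1000L;  // DFS_NAMENODE_STALE_DATANODE_INTERVAL_MILLI_DEFAULT
    Arrays.sort(replicas, new DFSUtil.DecomStaleComparator(staleInterval));
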
Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=1390763&r1=1390762&r2=1390763&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java Wed Sep 26 22:55:00 2012
@@ -535,10 +535,10 @@ public class DistributedFileSystem exten
   @Override
   public void close() throws IOException {
     try {
-      super.processDeleteOnExit();
-      dfs.close();
-    } finally {
+      dfs.closeOutputStreams(false);
       super.close();
+    } finally {
+      dfs.close();
     }
   }
 
@@ -624,6 +624,16 @@ public class DistributedFileSystem exten
   public void saveNamespace() throws AccessControlException, IOException {
     dfs.saveNamespace();
   }
+  
+  /**
+   * Rolls the edit log on the active NameNode.
+   * Requires super-user privileges.
+   * @see org.apache.hadoop.hdfs.protocol.ClientProtocol#rollEdits()
+   * @return the transaction ID of the newly created segment
+   */
+  public long rollEdits() throws AccessControlException, IOException {
+    return dfs.rollEdits();
+  }
 
   /**
    * enable/disable/check restoreFailedStorage

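A sketch of the new client-side call; rollEdits() requires superuser privileges, and conf is assumed to point at an HDFS cluster:

    FileSystem fs = FileSystem.get(conf);
    if (fs instanceof DistributedFileSystem) {
      // Ask the active NameNode to start a fresh edit log segment.
      long txid = ((DistributedFileSystem) fs).rollEdits();
      System.out.println("new segment starts at txid " + txid);
    }
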
Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java?rev=1390763&r1=1390762&r2=1390763&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java Wed Sep 26 22:55:00 2012
@@ -105,4 +105,9 @@ public class HdfsConfiguration extends C
     deprecate("dfs.federation.nameservices", DFSConfigKeys.DFS_NAMESERVICES);
     deprecate("dfs.federation.nameservice.id", DFSConfigKeys.DFS_NAMESERVICE_ID);
   }
+
+  public static void main(String[] args) {
+    init();
+    Configuration.dumpDeprecatedKeys();
+  }
 }

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java?rev=1390763&r1=1390762&r2=1390763&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java Wed Sep 26 22:55:00 2012
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.net.ConnectException;
 import java.net.HttpURLConnection;
 import java.net.InetSocketAddress;
 import java.net.URI;
@@ -247,14 +248,13 @@ public class HftpFileSystem extends File
           Credentials c;
           try {
             c = DelegationTokenFetcher.getDTfromRemote(nnHttpUrl, renewer);
-          } catch (Exception e) {
-            LOG.info("Couldn't get a delegation token from " + nnHttpUrl + 
-            " using http.");
-            if(LOG.isDebugEnabled()) {
-              LOG.debug("error was ", e);
+          } catch (IOException e) {
+            if (e.getCause() instanceof ConnectException) {
+              LOG.warn("Couldn't connect to " + nnHttpUrl +
+                  ", assuming security is disabled");
+              return null;
             }
-            //Maybe the server is in unsecure mode (that's bad but okay)
-            return null;
+            throw e;
           }
           for (Token<? extends TokenIdentifier> t : c.getAllTokens()) {
             if(LOG.isDebugEnabled()) {

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java?rev=1390763&r1=1390762&r2=1390763&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java Wed Sep 26 22:55:00 2012
@@ -57,6 +57,7 @@ import org.apache.hadoop.io.retry.Failov
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.io.retry.RetryUtils;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
@@ -68,7 +69,6 @@ import org.apache.hadoop.security.author
 import org.apache.hadoop.tools.GetUserMappingsProtocol;
 
 import com.google.common.base.Preconditions;
-import com.google.protobuf.ServiceException;
 
 /**
  * Create proxy objects to communicate with a remote NN. All remote access to an
@@ -243,106 +243,20 @@ public class NameNodeProxies {
     return new NamenodeProtocolTranslatorPB(proxy);
   }
   
-  /**
-   * Return the default retry policy used in RPC.
-   * 
-   * If dfs.client.retry.policy.enabled == false, use TRY_ONCE_THEN_FAIL.
-   * 
-   * Otherwise, first unwrap ServiceException if possible, and then 
-   * (1) use multipleLinearRandomRetry for
-   *     - SafeModeException, or
-   *     - IOException other than RemoteException, or
-   *     - ServiceException; and
-   * (2) use TRY_ONCE_THEN_FAIL for
-   *     - non-SafeMode RemoteException, or
-   *     - non-IOException.
-   *     
-   * Note that dfs.client.retry.max < 0 is not allowed.
-   */
-  public static RetryPolicy getDefaultRetryPolicy(Configuration conf) {
-    final RetryPolicy multipleLinearRandomRetry = getMultipleLinearRandomRetry(conf);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("multipleLinearRandomRetry = " + multipleLinearRandomRetry);
-    }
-    if (multipleLinearRandomRetry == null) {
-      //no retry
-      return RetryPolicies.TRY_ONCE_THEN_FAIL;
-    } else {
-      return new RetryPolicy() {
-        @Override
-        public RetryAction shouldRetry(Exception e, int retries, int failovers,
-            boolean isMethodIdempotent) throws Exception {
-          if (e instanceof ServiceException) {
-            //unwrap ServiceException
-            final Throwable cause = e.getCause();
-            if (cause != null && cause instanceof Exception) {
-              e = (Exception)cause;
-            }
-          }
-
-          //see (1) and (2) in the javadoc of this method.
-          final RetryPolicy p;
-          if (e instanceof RemoteException) {
-            final RemoteException re = (RemoteException)e;
-            p = SafeModeException.class.getName().equals(re.getClassName())?
-                multipleLinearRandomRetry: RetryPolicies.TRY_ONCE_THEN_FAIL;
-          } else if (e instanceof IOException || e instanceof ServiceException) {
-            p = multipleLinearRandomRetry;
-          } else { //non-IOException
-            p = RetryPolicies.TRY_ONCE_THEN_FAIL;
-          }
-
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("RETRY " + retries + ") policy="
-                + p.getClass().getSimpleName() + ", exception=" + e);
-          }
-          LOG.info("RETRY " + retries + ") policy="
-              + p.getClass().getSimpleName() + ", exception=" + e);
-          return p.shouldRetry(e, retries, failovers, isMethodIdempotent);
-        }
-
-        @Override
-        public String toString() {
-          return "RetryPolicy[" + multipleLinearRandomRetry + ", "
-              + RetryPolicies.TRY_ONCE_THEN_FAIL.getClass().getSimpleName()
-              + "]";
-        }
-      };
-    }
-  }
-
-  /**
-   * Return the MultipleLinearRandomRetry policy specified in the conf,
-   * or null if the feature is disabled.
-   * If the policy is specified in the conf but the policy cannot be parsed,
-   * the default policy is returned.
-   * 
-   * Conf property: N pairs of sleep-time and number-of-retries
-   *   dfs.client.retry.policy = "s1,n1,s2,n2,..."
-   */
-  private static RetryPolicy getMultipleLinearRandomRetry(Configuration conf) {
-    final boolean enabled = conf.getBoolean(
-        DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY,
-        DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_DEFAULT);
-    if (!enabled) {
-      return null;
-    }
-
-    final String policy = conf.get(
-        DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_KEY,
-        DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT);
-
-    final RetryPolicy r = RetryPolicies.MultipleLinearRandomRetry.parseCommaSeparatedString(policy);
-    return r != null? r: RetryPolicies.MultipleLinearRandomRetry.parseCommaSeparatedString(
-        DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT);
-  }
-
   private static ClientProtocol createNNProxyWithClientProtocol(
       InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
       boolean withRetries) throws IOException {
     RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class);
 
-    final RetryPolicy defaultPolicy = getDefaultRetryPolicy(conf);
+    final RetryPolicy defaultPolicy = 
+        RetryUtils.getDefaultRetryPolicy(
+            conf, 
+            DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY, 
+            DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_DEFAULT, 
+            DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_KEY,
+            DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT,
+            SafeModeException.class);
+    
     final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class);
     ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy(
         ClientNamenodeProtocolPB.class, version, address, ugi, conf,

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java?rev=1390763&r1=1390762&r2=1390763&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java Wed Sep 26 22:55:00 2012
@@ -26,51 +26,131 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
 
+import java.io.IOException;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.LinkedListMultimap;
 import org.apache.commons.logging.Log;
+import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.StringUtils;
 
 /**
- * A cache of sockets.
+ * A cache of input stream sockets to DataNodes.
  */
 class SocketCache {
-  static final Log LOG = LogFactory.getLog(SocketCache.class);
+  private static final Log LOG = LogFactory.getLog(SocketCache.class);
 
-  private final LinkedListMultimap<SocketAddress, SocketAndStreams> multimap;
-  private final int capacity;
+  @InterfaceAudience.Private
+  static class SocketAndStreams implements Closeable {
+    public final Socket sock;
+    public final IOStreamPair ioStreams;
+    long createTime;
+    
+    public SocketAndStreams(Socket s, IOStreamPair ioStreams) {
+      this.sock = s;
+      this.ioStreams = ioStreams;
+      this.createTime = System.currentTimeMillis();
+    }
+    
+    @Override
+    public void close() {
+      if (ioStreams != null) { 
+        IOUtils.closeStream(ioStreams.in);
+        IOUtils.closeStream(ioStreams.out);
+      }
+      IOUtils.closeSocket(sock);
+    }
 
-  /**
-   * Create a SocketCache with the given capacity.
-   * @param capacity  Max cache size.
-   */
-  public SocketCache(int capacity) {
-    multimap = LinkedListMultimap.create();
-    this.capacity = capacity;
-    if (capacity <= 0) {
-      LOG.debug("SocketCache disabled in configuration.");
+    public long getCreateTime() {
+      return this.createTime;
     }
   }
 
+  private Daemon daemon;
+  /** Cached sockets and streams, keyed by remote datanode address. */
+  private static LinkedListMultimap<SocketAddress, SocketAndStreams> multimap =
+    LinkedListMultimap.create();
+  private static int capacity;
+  private static long expiryPeriod;
+  private static SocketCache scInstance = new SocketCache();
+  private static boolean isInitedOnce = false;
+ 
+  public static synchronized SocketCache getInstance(int c, long e) {
+    // capacity is only initialized once
+    if (!isInitedOnce) {
+      capacity = c;
+      expiryPeriod = e;
+
+      if (capacity == 0) {
+        LOG.info("SocketCache disabled.");
+      } else if (expiryPeriod == 0) {
+        throw new IllegalStateException("Cannot initialize expiryPeriod to " +
+            expiryPeriod + " when cache is enabled.");
+      }
+      isInitedOnce = true;
+    } else { //already initialized once
+      if (capacity != c || expiryPeriod != e) {
+        LOG.info("capacity and expiry period already set to " + capacity +
+          " and " + expiryPeriod + " respectively; cannot change them to " + c +
+          " and " + e);
+      }
+    }
+
+    return scInstance;
+  }
+
+  private boolean isDaemonStarted() {
+    return daemon != null;
+  }
+
+  private synchronized void startExpiryDaemon() {
+    // start daemon only if not already started
+    if (isDaemonStarted()) {
+      return;
+    }
+    
+    daemon = new Daemon(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          SocketCache.this.run();
+        } catch(InterruptedException e) {
+          //noop
+        } finally {
+          SocketCache.this.clear();
+        }
+      }
+
+      @Override
+      public String toString() {
+        return String.valueOf(SocketCache.this);
+      }
+    });
+    daemon.start();
+  }
+
   /**
    * Get a cached socket to the given address.
    * @param remote  Remote address the socket is connected to.
    * @return  A socket with unknown state, possibly closed underneath. Or null.
    */
   public synchronized SocketAndStreams get(SocketAddress remote) {
+
     if (capacity <= 0) { // disabled
       return null;
     }
-    
-    List<SocketAndStreams> socklist = multimap.get(remote);
-    if (socklist == null) {
+
+    List<SocketAndStreams> sockStreamList = multimap.get(remote);
+    if (sockStreamList == null) {
       return null;
     }
 
-    Iterator<SocketAndStreams> iter = socklist.iterator();
+    Iterator<SocketAndStreams> iter = sockStreamList.iterator();
     while (iter.hasNext()) {
       SocketAndStreams candidate = iter.next();
       iter.remove();
@@ -86,14 +166,16 @@ class SocketCache {
    * @param sock socket not used by anyone.
    */
   public synchronized void put(Socket sock, IOStreamPair ioStreams) {
+
+    Preconditions.checkNotNull(sock);
     SocketAndStreams s = new SocketAndStreams(sock, ioStreams);
     if (capacity <= 0) {
       // Cache disabled.
       s.close();
       return;
     }
-    
-    Preconditions.checkNotNull(sock);
+ 
+    startExpiryDaemon();
 
     SocketAddress remoteAddr = sock.getRemoteSocketAddress();
     if (remoteAddr == null) {
@@ -106,7 +188,7 @@ class SocketCache {
     if (capacity == multimap.size()) {
       evictOldest();
     }
-    multimap.put(remoteAddr, new SocketAndStreams(sock, ioStreams));
+    multimap.put(remoteAddr, s);
   }
 
   public synchronized int size() {
@@ -114,13 +196,34 @@ class SocketCache {
   }
 
   /**
+   * Evict and close sockets older than expiry period from the cache.
+   */
+  private synchronized void evictExpired(long expiryPeriod) {
+    while (multimap.size() != 0) {
+      Iterator<Entry<SocketAddress, SocketAndStreams>> iter =
+        multimap.entries().iterator();
+      Entry<SocketAddress, SocketAndStreams> entry = iter.next();
+      // if oldest socket expired, remove it
+      if (entry == null || 
+        System.currentTimeMillis() - entry.getValue().getCreateTime() < 
+        expiryPeriod) {
+        break;
+      }
+      iter.remove();
+      SocketAndStreams s = entry.getValue();
+      s.close();
+    }
+  }
+
+  /**
    * Evict the oldest entry in the cache.
    */
   private synchronized void evictOldest() {
     Iterator<Entry<SocketAddress, SocketAndStreams>> iter =
       multimap.entries().iterator();
     if (!iter.hasNext()) {
-      throw new IllegalStateException("Cannot evict from empty cache!");
+      throw new IllegalStateException("Cannot evict from empty cache! " +
+        "capacity: " + capacity);
     }
     Entry<SocketAddress, SocketAndStreams> entry = iter.next();
     iter.remove();
@@ -129,38 +232,31 @@ class SocketCache {
   }
 
   /**
-   * Empty the cache, and close all sockets.
+   * Periodically check the cache and expire entries
+   * older than expiryPeriod milliseconds.
    */
-  public synchronized void clear() {
-    for (SocketAndStreams s : multimap.values()) {
-      s.close();
+  private void run() throws InterruptedException {
+    for(long lastExpiryTime = System.currentTimeMillis();
+        !Thread.interrupted();
+        Thread.sleep(expiryPeriod)) {
+      final long elapsed = System.currentTimeMillis() - lastExpiryTime;
+      if (elapsed >= expiryPeriod) {
+        evictExpired(expiryPeriod);
+        lastExpiryTime = System.currentTimeMillis();
+      }
     }
-    multimap.clear();
-  }
-
-  @Override
-  protected void finalize() {
     clear();
+    throw new InterruptedException("Daemon Interrupted");
   }
-  
-  @InterfaceAudience.Private
-  static class SocketAndStreams implements Closeable {
-    public final Socket sock;
-    public final IOStreamPair ioStreams;
-    
-    public SocketAndStreams(Socket s, IOStreamPair ioStreams) {
-      this.sock = s;
-      this.ioStreams = ioStreams;
-    }
-    
-    @Override
-    public void close() {
-      if (ioStreams != null) { 
-        IOUtils.closeStream(ioStreams.in);
-        IOUtils.closeStream(ioStreams.out);
-      }
-      IOUtils.closeSocket(sock);
+
+  /**
+   * Empty the cache, and close all sockets.
+   */
+  private synchronized void clear() {
+    for (SocketAndStreams sockAndStream : multimap.values()) {
+      sockAndStream.close();
     }
+    multimap.clear();
   }
 
 }

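A minimal, package-local sketch of the new singleton lifecycle (SocketCache is package-private, so this only compiles inside org.apache.hadoop.hdfs); the first getInstance() call fixes capacity and expiry, and later calls with different values only log:

    SocketCache cache = SocketCache.getInstance(16, 2 * 60 * 1000L);

    SocketCache.SocketAndStreams cached = cache.get(dnAddr);  // dnAddr: remote SocketAddress
    if (cached == null) {
      // Cache miss: dial the datanode; when the connection goes idle, return it
      // with cache.put(socket, ioStreams) so the expiry daemon can manage it.
    }
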
Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=1390763&r1=1390762&r2=1390763&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Wed Sep 26 22:55:00 2012
@@ -667,6 +667,18 @@ public interface ClientProtocol {
    */
   public void saveNamespace() throws AccessControlException, IOException;
 
+  
+  /**
+   * Roll the edit log.
+   * Requires superuser privileges.
+   * 
+   * @throws AccessControlException if the superuser privilege is violated
+   * @throws IOException if log roll fails
+   * @return the txid of the new segment
+   */
+  @Idempotent
+  public long rollEdits() throws AccessControlException, IOException;
+
   /**
    * Enable/Disable restore failed storage.
    * <p>

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java?rev=1390763&r1=1390762&r2=1390763&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java Wed Sep 26 22:55:00 2012
@@ -27,6 +27,7 @@ import org.apache.hadoop.net.NetworkTopo
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Time;
 
 /** 
  * This class extends the primary identifier of a Datanode with ephemeral
@@ -321,7 +322,24 @@ public class DatanodeInfo extends Datano
     }
     return adminState;
   }
-
+ 
+  /**
+   * Check if the datanode is in a stale state: if the namenode has not
+   * received a heartbeat from the datanode for more than staleInterval
+   * (default value
+   * {@link DFSConfigKeys#DFS_NAMENODE_STALE_DATANODE_INTERVAL_MILLI_DEFAULT}),
+   * the datanode is treated as stale.
+   * 
+   * @param staleInterval
+   *          the time interval for marking the node as stale. If the last
+   *          update time is beyond the given time interval, the node will be
+   *          marked as stale.
+   * @return true if the node is stale
+   */
+  public boolean isStale(long staleInterval) {
+    return (Time.now() - lastUpdate) >= staleInterval;
+  }
+  
   /**
    * Sets the admin state of this node.
    */

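For example, with the 30-second default interval, a replica whose last heartbeat is 45 seconds old is reported stale; a one-line sketch assuming a DatanodeInfo instance named dn:

    boolean stale = dn.isStale(30 * 1000L);  // true iff (Time.now() - lastUpdate) >= 30s
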
Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java?rev=1390763&r1=1390762&r2=1390763&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java Wed Sep 26 22:55:00 2012
@@ -30,6 +30,8 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 
+import com.google.protobuf.TextFormat;
+
 /** Pipeline Acknowledgment **/
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
@@ -120,6 +122,6 @@ public class PipelineAck {
   
   @Override //Object
   public String toString() {
-    return proto.toString();
+    return TextFormat.shortDebugString(proto);
   }
 }

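TextFormat.shortDebugString renders the proto on a single line, which keeps ack logging compact; a sketch of the difference (the example output is approximate, not taken from a real run):

    // proto.toString() prints one field per line; the short form is roughly:
    //   seqno: 17 status: SUCCESS status: SUCCESS
    String oneLine = TextFormat.shortDebugString(proto);
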
Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java?rev=1390763&r1=1390762&r2=1390763&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java Wed Sep 26 22:55:00 2012
@@ -103,6 +103,8 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ReportBadBlocksResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RestoreFailedStorageRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RestoreFailedStorageResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollEditsRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollEditsResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SaveNamespaceRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SaveNamespaceResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetBalancerBandwidthRequestProto;
@@ -537,6 +539,20 @@ public class ClientNamenodeProtocolServe
     }
 
   }
+  
+  @Override
+  public RollEditsResponseProto rollEdits(RpcController controller,
+      RollEditsRequestProto request) throws ServiceException {
+    try {
+      long txid = server.rollEdits();
+      return RollEditsResponseProto.newBuilder()
+          .setNewSegmentTxId(txid)
+          .build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
 
   static final RefreshNodesResponseProto VOID_REFRESHNODES_RESPONSE = 
       RefreshNodesResponseProto.newBuilder().build();

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java?rev=1390763&r1=1390762&r2=1390763&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java Wed Sep 26 22:55:00 2012
@@ -87,6 +87,8 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenewLeaseRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ReportBadBlocksRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RestoreFailedStorageRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollEditsRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollEditsResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SaveNamespaceRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetBalancerBandwidthRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetOwnerRequestProto;
@@ -525,6 +527,17 @@ public class ClientNamenodeProtocolTrans
       throw ProtobufHelper.getRemoteException(e);
     }
   }
+  
+  @Override
+  public long rollEdits() throws AccessControlException, IOException {
+    RollEditsRequestProto req = RollEditsRequestProto.getDefaultInstance();
+    try {
+      RollEditsResponseProto resp = rpcProxy.rollEdits(null, req);
+      return resp.getNewSegmentTxId();
+    } catch (ServiceException se) {
+      throw ProtobufHelper.getRemoteException(se);
+    }
+  }
 
   @Override
   public boolean restoreFailedStorage(String arg) 

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1390763&r1=1390762&r2=1390763&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Wed Sep 26 22:55:00 2012
@@ -363,11 +363,10 @@ public class BlockManager {
         replicationThread.join(3000);
       }
     } catch (InterruptedException ie) {
-    } finally {
-      if (pendingReplications != null) pendingReplications.stop();
-      blocksMap.close();
-      datanodeManager.close();
     }
+    datanodeManager.close();
+    pendingReplications.stop();
+    blocksMap.close();
   }
 
   /** @return the datanodeManager */
@@ -1315,8 +1314,9 @@ public class BlockManager {
       final HashMap<Node, Node> excludedNodes,
       final long blocksize) throws IOException {
     // choose targets for the new block to be allocated.
-    final DatanodeDescriptor targets[] = blockplacement.chooseTarget(
-        src, numOfReplicas, client, excludedNodes, blocksize);
+    final DatanodeDescriptor targets[] = blockplacement.chooseTarget(src,
+        numOfReplicas, client, new ArrayList<DatanodeDescriptor>(), false,
+        excludedNodes, blocksize);
     if (targets.length < minReplication) {
       throw new IOException("File " + src + " could only be replicated to "
           + targets.length + " nodes instead of minReplication (="

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java?rev=1390763&r1=1390762&r2=1390763&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java Wed Sep 26 22:55:00 2012
@@ -71,21 +71,6 @@ public abstract class BlockPlacementPoli
                                              long blocksize);
 
   /**
-   * Same as
-   * {{@link #chooseTarget(String, int, DatanodeDescriptor, List, boolean, HashMap, long)}
-   * with returnChosenNodes equal to false.
-   */
-  final DatanodeDescriptor[] chooseTarget(String srcPath,
-                                          int numOfReplicas,
-                                          DatanodeDescriptor writer,
-                                          List<DatanodeDescriptor> chosenNodes,
-                                          HashMap<Node, Node> excludedNodes,
-                                          long blocksize) {
-    return chooseTarget(srcPath, numOfReplicas, writer, chosenNodes, false,
-        excludedNodes, blocksize);
-  }
-
-  /**
    * choose <i>numOfReplicas</i> data nodes for <i>writer</i> 
    * to re-replicate a block with size <i>blocksize</i> 
    * If not, return as many as we can.
@@ -131,7 +116,7 @@ public abstract class BlockPlacementPoli
                                     HashMap<Node, Node> excludedNodes,
                                     long blocksize) {
     return chooseTarget(srcBC.getName(), numOfReplicas, writer,
-                        chosenNodes, excludedNodes, blocksize);
+                        chosenNodes, false, excludedNodes, blocksize);
   }
 
   /**
@@ -198,51 +183,6 @@ public abstract class BlockPlacementPoli
     replicator.initialize(conf, stats, clusterMap);
     return replicator;
   }
-
-  /**
-   * choose <i>numOfReplicas</i> nodes for <i>writer</i> to replicate
-   * a block with size <i>blocksize</i> 
-   * If not, return as many as we can.
-   * 
-   * @param srcPath a string representation of the file for which chooseTarget is invoked
-   * @param numOfReplicas number of replicas wanted.
-   * @param writer the writer's machine, null if not in the cluster.
-   * @param blocksize size of the data to be written.
-   * @return array of DatanodeDescriptor instances chosen as targets
-   * and sorted as a pipeline.
-   */
-  DatanodeDescriptor[] chooseTarget(String srcPath,
-                                    int numOfReplicas,
-                                    DatanodeDescriptor writer,
-                                    long blocksize) {
-    return chooseTarget(srcPath, numOfReplicas, writer,
-                        new ArrayList<DatanodeDescriptor>(),
-                        blocksize);
-  }
-
-  /**
-   * choose <i>numOfReplicas</i> nodes for <i>writer</i> to replicate
-   * a block with size <i>blocksize</i>
-   * If not, return as many as we can.
-   *
-   * @param srcPath a string representation of the file for which chooseTarget is invoked
-   * @param numOfReplicas number of replicas wanted.
-   * @param writer the writer's machine, null if not in the cluster.
-   * @param blocksize size of the data to be written.
-   * @param excludedNodes datanodes that should not be considered as targets.
-   * @return array of DatanodeDescriptor instances chosen as targets
-   * and sorted as a pipeline.
-   */
-  public DatanodeDescriptor[] chooseTarget(String srcPath,
-                                    int numOfReplicas,
-                                    DatanodeDescriptor writer,
-                                    HashMap<Node, Node> excludedNodes,
-                                    long blocksize) {
-    return chooseTarget(srcPath, numOfReplicas, writer,
-                        new ArrayList<DatanodeDescriptor>(),
-                        excludedNodes,
-                        blocksize);
-  }
   
   /**
    * Adjust rackmap, moreThanOne, and exactlyOne after removing replica on cur.

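With the convenience overloads removed, callers spell out the full argument list; a sketch mirroring the BlockManager change above (names taken from that hunk):

    DatanodeDescriptor[] targets = blockplacement.chooseTarget(
        src,                                  // file path the block belongs to
        numOfReplicas,
        client,                               // writer, may be null
        new ArrayList<DatanodeDescriptor>(),  // no previously chosen nodes
        false,                                // returnChosenNodes
        excludedNodes,
        blocksize);
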
Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java?rev=1390763&r1=1390762&r2=1390763&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java Wed Sep 26 22:55:00 2012
@@ -27,8 +27,6 @@ import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -55,9 +53,6 @@ import com.google.common.annotations.Vis
 @InterfaceAudience.Private
 public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
 
-  private static final Log LOG =
-    LogFactory.getLog(BlockPlacementPolicyDefault.class.getName());
-
   private static final String enableDebugLogging =
     "For more information, please enable DEBUG log level on "
     + LOG.getClass().getName();
@@ -124,7 +119,6 @@ public class BlockPlacementPolicyDefault
         excludedNodes, blocksize);
   }
 
-
   /** This is the implementation. */
   DatanodeDescriptor[] chooseTarget(int numOfReplicas,
                                     DatanodeDescriptor writer,
@@ -162,7 +156,8 @@ public class BlockPlacementPolicyDefault
     }
       
     DatanodeDescriptor localNode = chooseTarget(numOfReplicas, writer, 
-                                                excludedNodes, blocksize, maxNodesPerRack, results);
+                                                excludedNodes, blocksize, 
+                                                maxNodesPerRack, results);
     if (!returnChosenNodes) {  
       results.removeAll(chosenNodes);
     }
@@ -455,14 +450,29 @@ public class BlockPlacementPolicyDefault
    * does not have too much load, and the rack does not have too many nodes
    */
   private boolean isGoodTarget(DatanodeDescriptor node,
-                               long blockSize, int maxTargetPerLoc,
+                               long blockSize, int maxTargetPerRack,
                                List<DatanodeDescriptor> results) {
-    return isGoodTarget(node, blockSize, maxTargetPerLoc,
+    return isGoodTarget(node, blockSize, maxTargetPerRack,
                         this.considerLoad, results);
   }
-    
+  
+  /**
+   * Determine if a node is a good target. 
+   * 
+   * @param node The target node
+   * @param blockSize Size of block
+   * @param maxTargetPerRack Maximum number of targets per rack. The value of 
+   *                       this parameter depends on the number of racks in 
+   *                       the cluster and total number of replicas for a block
+   * @param considerLoad whether or not to consider load of the target node
+   * @param results A list containing currently chosen nodes. Used to check if 
+   *                too many nodes have been chosen in the target rack.
+   * @return Return true if <i>node</i> has enough space, 
+   *         does not have too much load, 
+   *         and the rack does not have too many nodes.
+   */
   protected boolean isGoodTarget(DatanodeDescriptor node,
-                               long blockSize, int maxTargetPerLoc,
+                               long blockSize, int maxTargetPerRack,
                                boolean considerLoad,
                                List<DatanodeDescriptor> results) {
     // check if the node is (being) decommissioned
@@ -514,7 +524,7 @@ public class BlockPlacementPolicyDefault
         counter++;
       }
     }
-    if (counter>maxTargetPerLoc) {
+    if (counter>maxTargetPerRack) {
       if(LOG.isDebugEnabled()) {
         threadLocalBuilder.get().append(node.toString()).append(": ")
           .append("Node ").append(NodeBase.getPath(node))

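The rename from maxTargetPerLoc to maxTargetPerRack matches what the parameter actually bounds: how many chosen targets may share one rack. Below is a minimal standalone sketch of that check, assuming plain rack-path strings in place of DatanodeDescriptor/NetworkTopology (RackCapCheck and underRackCap are illustrative names, not HDFS code):

    import java.util.Arrays;
    import java.util.List;

    class RackCapCheck {
      // Mirrors the counter logic in isGoodTarget: the candidate counts toward
      // its own rack, and it is rejected once counter > maxTargetPerRack.
      static boolean underRackCap(String candidateRack, List<String> chosenRacks,
                                  int maxTargetPerRack) {
        int counter = 1; // the candidate itself
        for (String rack : chosenRacks) {
          if (rack.equals(candidateRack)) {
            counter++;
          }
        }
        return counter <= maxTargetPerRack;
      }

      public static void main(String[] args) {
        List<String> chosen = Arrays.asList("/rack1", "/rack2");
        System.out.println(underRackCap("/rack1", chosen, 2)); // true: 2 on /rack1 allowed
        System.out.println(underRackCap("/rack1", chosen, 1)); // false: /rack1 already full
      }
    }
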
Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java?rev=1390763&r1=1390762&r2=1390763&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java Wed Sep 26 22:55:00 2012
@@ -94,7 +94,7 @@ class BlocksMap {
   }
 
   void close() {
-    blocks = null;
+    // Empty the blocks map once GSet#clear is implemented (HDFS-3940)
   }
 
   BlockCollection getBlockCollection(Block b) {

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1390763&r1=1390762&r2=1390763&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Wed Sep 26 22:55:00 2012
@@ -25,6 +25,7 @@ import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -132,6 +133,11 @@ public class DatanodeManager {
    */
   private boolean hasClusterEverBeenMultiRack = false;
   
+  /** Whether or not to check for stale datanodes */
+  private volatile boolean checkForStaleNodes;
+  /** The time interval for detecting stale datanodes */
+  private volatile long staleInterval;
+  
   DatanodeManager(final BlockManager blockManager,
       final Namesystem namesystem, final Configuration conf
       ) throws IOException {
@@ -175,6 +181,21 @@ public class DatanodeManager {
         DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY, blockInvalidateLimit);
     LOG.info(DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY
         + "=" + this.blockInvalidateLimit);
+    // set the value of stale interval based on configuration
+    this.checkForStaleNodes = conf.getBoolean(
+        DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY,
+        DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_DEFAULT);
+    if (this.checkForStaleNodes) {
+      this.staleInterval = conf.getLong(
+          DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
+          DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_MILLI_DEFAULT);
+      if (this.staleInterval < DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_MILLI_DEFAULT) {
+        LOG.warn("The given interval for marking stale datanodes = "
+            + this.staleInterval + ", which is smaller than the default value "
+            + DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_MILLI_DEFAULT
+            + ".");
+      }
+    }
   }
 
   private Daemon decommissionthread = null;
@@ -192,7 +213,13 @@ public class DatanodeManager {
   }
 
   void close() {
-    if (decommissionthread != null) decommissionthread.interrupt();
+    if (decommissionthread != null) {
+      decommissionthread.interrupt();
+      try {
+        decommissionthread.join(3000);
+      } catch (InterruptedException e) {
+      }
+    }
     heartbeatManager.close();
   }
 
@@ -225,14 +252,17 @@ public class DatanodeManager {
       if (rName != null)
         client = new NodeBase(rName + NodeBase.PATH_SEPARATOR_STR + targethost);
     }
+    
+    Comparator<DatanodeInfo> comparator = checkForStaleNodes ? 
+                    new DFSUtil.DecomStaleComparator(staleInterval) : 
+                    DFSUtil.DECOM_COMPARATOR;
     for (LocatedBlock b : locatedblocks) {
       networktopology.pseudoSortByDistance(client, b.getLocations());
-      
-      // Move decommissioned datanodes to the bottom
-      Arrays.sort(b.getLocations(), DFSUtil.DECOM_COMPARATOR);
+      // Move decommissioned/stale datanodes to the bottom
+      Arrays.sort(b.getLocations(), comparator);
     }
   }
-
+  
   CyclicIteration<String, DatanodeDescriptor> getDatanodeCyclicIteration(
       final String firstkey) {
     return new CyclicIteration<String, DatanodeDescriptor>(

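The sort above now pushes stale replicas to the bottom when DFS_NAMENODE_CHECK_STALE_DATANODE_KEY is enabled. DFSUtil.DecomStaleComparator itself is not part of this diff; the sketch below approximates its stale-last ordering under the assumption that a replica is stale when its last heartbeat is older than staleInterval (Replica is a stand-in for DatanodeInfo):

    import java.util.Arrays;
    import java.util.Comparator;

    class Replica {
      final String name;
      final long lastUpdateMillis; // last heartbeat time
      Replica(String name, long lastUpdateMillis) {
        this.name = name;
        this.lastUpdateMillis = lastUpdateMillis;
      }
    }

    class StaleLastComparator implements Comparator<Replica> {
      private final long staleIntervalMillis;
      private final long nowMillis = System.currentTimeMillis(); // fixed per sort

      StaleLastComparator(long staleIntervalMillis) {
        this.staleIntervalMillis = staleIntervalMillis;
      }

      private boolean isStale(Replica r) {
        return nowMillis - r.lastUpdateMillis > staleIntervalMillis;
      }

      @Override
      public int compare(Replica a, Replica b) {
        boolean aStale = isStale(a);
        boolean bStale = isStale(b);
        if (aStale == bStale) {
          return 0; // equal rank: a stable sort keeps the existing order
        }
        return aStale ? 1 : -1; // stale replicas sort last
      }

      public static void main(String[] args) {
        long now = System.currentTimeMillis();
        Replica[] replicas = {
            new Replica("dn1", now - 60000L), // stale at a 30s interval
            new Replica("dn2", now)
        };
        Arrays.sort(replicas, new StaleLastComparator(30000L));
        System.out.println(replicas[0].name + " " + replicas[1].name); // dn2 dn1
      }
    }

Because Arrays.sort is stable and equal-rank replicas compare as 0, the network-distance order produced by pseudoSortByDistance survives within the fresh group and within the stale group.
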
Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java?rev=1390763&r1=1390762&r2=1390763&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java Wed Sep 26 22:55:00 2012
@@ -74,6 +74,11 @@ class HeartbeatManager implements Datano
 
   void close() {
     heartbeatThread.interrupt();
+    try {
+      // This has no effect if the thread hasn't yet been started.
+      heartbeatThread.join(3000);
+    } catch (InterruptedException e) {
+    }
   }
   
   synchronized int getLiveDatanodeCount() {

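This is the same shutdown idiom the DatanodeManager.close() change uses: interrupt the worker thread, then join with a bound so shutdown cannot hang on a stuck worker. A self-contained sketch of the pattern (Worker is illustrative; the 3000 ms bound is taken from the hunks):

    class Worker implements Runnable {
      private final Thread thread = new Thread(this, "worker");

      void start() {
        thread.start();
      }

      @Override
      public void run() {
        while (!Thread.currentThread().isInterrupted()) {
          try {
            Thread.sleep(1000); // stand-in for periodic work
          } catch (InterruptedException e) {
            return; // interrupted mid-sleep: exit promptly
          }
        }
      }

      void close() {
        thread.interrupt();
        try {
          // Bounded wait; join() on a thread that was never started returns
          // immediately, which is why the unconditional join is safe here.
          thread.join(3000);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt(); // restore interrupt status
        }
      }
    }
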
Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java?rev=1390763&r1=1390762&r2=1390763&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java Wed Sep 26 22:55:00 2012
@@ -51,6 +51,8 @@ import org.apache.hadoop.hdfs.util.DataT
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.Time;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * Scans the block files under a block pool and verifies that the
  * files are not corrupt.
@@ -255,6 +257,11 @@ class BlockPoolSliceScanner {
     }
   }
 
+  @VisibleForTesting
+  long getTotalScans() {
+    return totalScans;
+  }
+
   /** @return the last scan time for the block pool. */
   long getLastScanTime() {
     return lastScanTime.get();
@@ -367,7 +374,8 @@ class BlockPoolSliceScanner {
     throttler.setBandwidth(Math.min(bw, MAX_SCAN_RATE));
   }
   
-  private void verifyBlock(ExtendedBlock block) {
+  @VisibleForTesting
+  void verifyBlock(ExtendedBlock block) {
     BlockSender blockSender = null;
 
     /* In case of failure, attempt to read second time to reduce
@@ -563,7 +571,24 @@ class BlockPoolSliceScanner {
     currentPeriodStart = Time.now();
   }
   
+  private synchronized boolean workRemainingInCurrentPeriod() {
+    if (bytesLeft <= 0 && Time.now() < currentPeriodStart + scanPeriod) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Skipping scan since bytesLeft=" + bytesLeft + ", Start=" +
+                  currentPeriodStart + ", period=" + scanPeriod + ", now=" +
+                  Time.now() + " " + blockPoolId);
+      }
+      return false;
+    } else {
+      return true;
+    }
+  }
+
   void scanBlockPoolSlice() {
+    if (!workRemainingInCurrentPeriod()) {
+      return;
+    }
+
     // Create a new processedBlocks structure
     processedBlocks = new HashMap<Long, Integer>();
     if (!assignInitialVerificationTimes()) {
@@ -608,14 +633,14 @@ class BlockPoolSliceScanner {
       LOG.warn("RuntimeException during BlockPoolScanner.scan()", e);
       throw e;
     } finally {
-      cleanUp();
+      rollVerificationLogs();
       if (LOG.isDebugEnabled()) {
         LOG.debug("Done scanning block pool: " + blockPoolId);
       }
     }
   }
   
-  private synchronized void cleanUp() {
+  private synchronized void rollVerificationLogs() {
     if (verificationLog != null) {
       try {
         verificationLog.logs.roll();

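workRemainingInCurrentPeriod() lets scanBlockPoolSlice() return early once the byte budget for the current period is spent. A reduced model of that predicate, with the clock passed in so both branches are easy to exercise (the class and constructor are invented; the field names follow the hunk):

    class ScanPeriodGate {
      private final long bytesLeft;          // remaining scan budget
      private final long currentPeriodStart; // millis when the period began
      private final long scanPeriodMillis;   // length of one scan period

      ScanPeriodGate(long bytesLeft, long currentPeriodStart, long scanPeriodMillis) {
        this.bytesLeft = bytesLeft;
        this.currentPeriodStart = currentPeriodStart;
        this.scanPeriodMillis = scanPeriodMillis;
      }

      boolean workRemaining(long nowMillis) {
        // Budget spent and the period has not elapsed yet: skip this pass.
        if (bytesLeft <= 0 && nowMillis < currentPeriodStart + scanPeriodMillis) {
          return false;
        }
        return true;
      }

      public static void main(String[] args) {
        ScanPeriodGate gate = new ScanPeriodGate(0, 0, 1000);
        System.out.println(gate.workRemaining(500));  // false: mid-period, no budget
        System.out.println(gate.workRemaining(1500)); // true: a new period is due
      }
    }
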
Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java?rev=1390763&r1=1390762&r2=1390763&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java Wed Sep 26 22:55:00 2012
@@ -34,6 +34,8 @@ import org.apache.hadoop.hdfs.protocol.E
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * DataBlockScanner manages block scanning for all the block pools. For each
  * block pool a {@link BlockPoolSliceScanner} is created which runs in a separate
@@ -47,6 +49,8 @@ public class DataBlockScanner implements
   private final FsDatasetSpi<? extends FsVolumeSpi> dataset;
   private final Configuration conf;
   
+  static final int SLEEP_PERIOD_MS = 5 * 1000;
+
   /**
    * Map to find the BlockPoolScanner for a given block pool id. This is updated
    * when a BPOfferService becomes alive or dies.
@@ -68,10 +72,10 @@ public class DataBlockScanner implements
     String currentBpId = "";
     boolean firstRun = true;
     while (datanode.shouldRun && !Thread.interrupted()) {
-      //Sleep everytime except in the first interation.
+      // Sleep every time except in the first iteration.
       if (!firstRun) {
         try {
-          Thread.sleep(5000);
+          Thread.sleep(SLEEP_PERIOD_MS);
         } catch (InterruptedException ex) {
           // Interrupt itself again to set the interrupt status
           blockScannerThread.interrupt();
@@ -103,7 +107,7 @@ public class DataBlockScanner implements
     while ((getBlockPoolSetSize() < datanode.getAllBpOs().length)
         || (getBlockPoolSetSize() < 1)) {
       try {
-        Thread.sleep(5000);
+        Thread.sleep(SLEEP_PERIOD_MS);
       } catch (InterruptedException e) {
         blockScannerThread.interrupt();
         return;
@@ -168,7 +172,8 @@ public class DataBlockScanner implements
     return blockPoolScannerMap.size();
   }
   
-  private synchronized BlockPoolSliceScanner getBPScanner(String bpid) {
+  @VisibleForTesting
+  synchronized BlockPoolSliceScanner getBPScanner(String bpid) {
     return blockPoolScannerMap.get(bpid);
   }
   
@@ -249,7 +254,7 @@ public class DataBlockScanner implements
     LOG.info("Removed bpid="+blockPoolId+" from blockPoolScannerMap");
   }
   
-  // This method is used for testing
+  @VisibleForTesting
   long getBlocksScannedInLastRun(String bpid) throws IOException {
     BlockPoolSliceScanner bpScanner = getBPScanner(bpid);
     if (bpScanner == null) {
@@ -259,6 +264,16 @@ public class DataBlockScanner implements
     }
   }
 
+  @VisibleForTesting
+  long getTotalScans(String bpid) throws IOException {
+    BlockPoolSliceScanner bpScanner = getBPScanner(bpid);
+    if (bpScanner == null) {
+      throw new IOException("Block Pool: "+bpid+" is not running");
+    } else {
+      return bpScanner.getTotalScans();
+    }
+  }
+
   public void start() {
     blockScannerThread = new Thread(this);
     blockScannerThread.setDaemon(true);

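getBPScanner() and getBlocksScannedInLastRun() swap a "used for testing" comment (or private visibility) for package-private access plus Guava's @VisibleForTesting. The annotation has no runtime effect; it documents that the widened visibility exists only so tests in the same package can reach the member. A minimal illustration (Counter is not an HDFS class):

    import com.google.common.annotations.VisibleForTesting;

    class Counter {
      private long total;

      void increment() {
        total++;
      }

      // Package-private so a same-package test can assert on it; the annotation
      // signals that production code should not depend on this accessor.
      @VisibleForTesting
      long getTotal() {
        return total;
      }
    }
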
Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1390763&r1=1390762&r2=1390763&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Wed Sep 26 22:55:00 2012
@@ -277,7 +277,7 @@ public class DataNode extends Configured
   private AbstractList<File> dataDirs;
   private Configuration conf;
 
-  private final String userWithLocalPathAccess;
+  private final List<String> usersWithLocalPathAccess;
   private boolean connectToDnViaHostname;
   ReadaheadPool readaheadPool;
   private final boolean getHdfsBlockLocationsEnabled;
@@ -300,8 +300,8 @@ public class DataNode extends Configured
            final SecureResources resources) throws IOException {
     super(conf);
 
-    this.userWithLocalPathAccess =
-        conf.get(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY);
+    this.usersWithLocalPathAccess = Arrays.asList(
+        conf.getTrimmedStrings(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY));
     this.connectToDnViaHostname = conf.getBoolean(
         DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME,
         DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
@@ -417,10 +417,15 @@ public class DataNode extends Configured
           new ClientDatanodeProtocolServerSideTranslatorPB(this);
     BlockingService service = ClientDatanodeProtocolService
         .newReflectiveBlockingService(clientDatanodeProtocolXlator);
-    ipcServer = RPC.getServer(ClientDatanodeProtocolPB.class, service, ipcAddr
-        .getHostName(), ipcAddr.getPort(), conf.getInt(
-        DFS_DATANODE_HANDLER_COUNT_KEY, DFS_DATANODE_HANDLER_COUNT_DEFAULT),
-        false, conf, blockPoolTokenSecretManager);
+    ipcServer = new RPC.Builder(conf)
+        .setProtocol(ClientDatanodeProtocolPB.class)
+        .setInstance(service)
+        .setBindAddress(ipcAddr.getHostName())
+        .setPort(ipcAddr.getPort())
+        .setNumHandlers(
+            conf.getInt(DFS_DATANODE_HANDLER_COUNT_KEY,
+                DFS_DATANODE_HANDLER_COUNT_DEFAULT)).setVerbose(false)
+        .setSecretManager(blockPoolTokenSecretManager).build();
     
     InterDatanodeProtocolServerSideTranslatorPB interDatanodeProtocolXlator = 
         new InterDatanodeProtocolServerSideTranslatorPB(this);
@@ -1007,7 +1012,7 @@ public class DataNode extends Configured
   private void checkBlockLocalPathAccess() throws IOException {
     checkKerberosAuthMethod("getBlockLocalPathInfo()");
     String currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
-    if (!currentUser.equals(this.userWithLocalPathAccess)) {
+    if (!usersWithLocalPathAccess.contains(currentUser)) {
       throw new AccessControlException(
           "Can't continue with getBlockLocalPathInfo() "
               + "authorization. The user " + currentUser

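The property behind DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY is now read with getTrimmedStrings(), so it accepts a comma-separated list of users instead of a single name. Below is a plain-JDK sketch of the parse-and-check (parseUsers is a hypothetical stand-in for Configuration.getTrimmedStrings):

    import java.util.ArrayList;
    import java.util.List;

    class LocalPathAccessCheck {
      // Stand-in for Configuration.getTrimmedStrings: split on commas,
      // trim whitespace, and drop empty entries.
      static List<String> parseUsers(String propertyValue) {
        List<String> users = new ArrayList<String>();
        for (String part : propertyValue.split(",")) {
          String user = part.trim();
          if (!user.isEmpty()) {
            users.add(user);
          }
        }
        return users;
      }

      static void checkAccess(List<String> allowed, String currentUser) {
        if (!allowed.contains(currentUser)) {
          throw new SecurityException("The user " + currentUser
              + " is not configured for local path access");
        }
      }

      public static void main(String[] args) {
        List<String> users = parseUsers(" hbase , impala ");
        checkAccess(users, "hbase"); // passes
        try {
          checkAccess(users, "guest"); // not listed
        } catch (SecurityException expected) {
          System.out.println(expected.getMessage());
        }
      }
    }
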
Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/SecureDataNodeStarter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/SecureDataNodeStarter.java?rev=1390763&r1=1390762&r2=1390763&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/SecureDataNodeStarter.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/SecureDataNodeStarter.java Wed Sep 26 22:55:00 2012
@@ -16,9 +16,11 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
+import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.nio.channels.ServerSocketChannel;
+import java.security.GeneralSecurityException;
 
 import org.apache.commons.daemon.Daemon;
 import org.apache.commons.daemon.DaemonContext;
@@ -26,9 +28,15 @@ import org.apache.hadoop.conf.Configurat
 
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.http.HttpServer;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.ssl.SSLFactory;
+import org.mortbay.jetty.Connector;
 import org.mortbay.jetty.nio.SelectChannelConnector;
+import org.mortbay.jetty.security.SslSocketConnector;
+
+import javax.net.ssl.SSLServerSocketFactory;
 
 /**
  * Utility class to start a datanode in a secure cluster, first obtaining 
@@ -40,9 +48,9 @@ public class SecureDataNodeStarter imple
    */
   public static class SecureResources {
     private final ServerSocket streamingSocket;
-    private final SelectChannelConnector listener;
+    private final Connector listener;
     public SecureResources(ServerSocket streamingSocket,
-        SelectChannelConnector listener) {
+        Connector listener) {
 
       this.streamingSocket = streamingSocket;
       this.listener = listener;
@@ -50,12 +58,13 @@ public class SecureDataNodeStarter imple
 
     public ServerSocket getStreamingSocket() { return streamingSocket; }
 
-    public SelectChannelConnector getListener() { return listener; }
+    public Connector getListener() { return listener; }
   }
   
   private String [] args;
   private SecureResources resources;
-  
+  private SSLFactory sslFactory;
+
   @Override
   public void init(DaemonContext context) throws Exception {
     System.err.println("Initializing secure datanode resources");
@@ -80,13 +89,30 @@ public class SecureDataNodeStarter imple
     }
 
     // Obtain secure listener for web server
-    SelectChannelConnector listener = 
-                   (SelectChannelConnector)HttpServer.createDefaultChannelConnector();
+    Connector listener;
+    if (HttpConfig.isSecure()) {
+      sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf);
+      try {
+        sslFactory.init();
+      } catch (GeneralSecurityException ex) {
+        throw new IOException(ex);
+      }
+      SslSocketConnector sslListener = new SslSocketConnector() {
+        @Override
+        protected SSLServerSocketFactory createFactory() throws Exception {
+          return sslFactory.createSSLServerSocketFactory();
+        }
+      };
+      listener = sslListener;
+    } else {
+      listener = HttpServer.createDefaultChannelConnector();
+    }
+
     InetSocketAddress infoSocAddr = DataNode.getInfoAddr(conf);
     listener.setHost(infoSocAddr.getHostName());
     listener.setPort(infoSocAddr.getPort());
     // Open listener here in order to bind to port as root
-    listener.open(); 
+    listener.open();
     if (listener.getPort() != infoSocAddr.getPort()) {
       throw new RuntimeException("Unable to bind on specified info port in secure " +
           "context. Needed " + streamingAddr.getPort() + ", got " + ss.getLocalPort());
@@ -109,6 +135,9 @@ public class SecureDataNodeStarter imple
     DataNode.secureMain(args, resources);
   }
   
-  @Override public void destroy() { /* Nothing to do */ }
+  @Override public void destroy() {
+    sslFactory.destroy();
+  }
+
   @Override public void stop() throws Exception { /* Nothing to do */ }
 }

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java?rev=1390763&r1=1390762&r2=1390763&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java Wed Sep 26 22:55:00 2012
@@ -24,6 +24,7 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -1676,10 +1677,10 @@ class FsDatasetImpl implements FsDataset
     List<byte[]> blocksVolumeIds = new ArrayList<byte[]>(volumes.volumes.size());
     // List of indexes into the list of VolumeIds, pointing at the VolumeId of
     // the volume that the block is on
-    List<Integer> blocksVolumendexes = new ArrayList<Integer>(blocks.size());
+    List<Integer> blocksVolumeIndexes = new ArrayList<Integer>(blocks.size());
     // Initialize the list of VolumeIds simply by enumerating the volumes
     for (int i = 0; i < volumes.volumes.size(); i++) {
-      blocksVolumeIds.add(new byte[] { (byte) i });
+      blocksVolumeIds.add(ByteBuffer.allocate(4).putInt(i).array());
     }
     // Determine the index of the VolumeId of each block's volume, by comparing 
     // the block's volume against the enumerated volumes
@@ -1700,10 +1701,10 @@ class FsDatasetImpl implements FsDataset
       if (!isValid) {
         volumeIndex = Integer.MAX_VALUE;
       }
-      blocksVolumendexes.add(volumeIndex);
+      blocksVolumeIndexes.add(volumeIndex);
     }
     return new HdfsBlocksMetadata(blocks.toArray(new ExtendedBlock[] {}), 
-        blocksVolumeIds, blocksVolumendexes);
+        blocksVolumeIds, blocksVolumeIndexes);
   }
 
   @Override

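The volume identifier handed back in HdfsBlocksMetadata changes from a single byte to a four-byte, big-endian encoding of the volume index, so indexes above 127 (and the Integer.MAX_VALUE invalid marker used above) round-trip cleanly. A small codec sketch (the decode side is assumed for illustration; this hunk only shows encoding):

    import java.nio.ByteBuffer;

    class VolumeIdCodec {
      // Matches the hunk: 4 bytes, ByteBuffer's default big-endian order.
      static byte[] encode(int volumeIndex) {
        return ByteBuffer.allocate(4).putInt(volumeIndex).array();
      }

      // Assumed inverse, not shown in this commit.
      static int decode(byte[] volumeId) {
        return ByteBuffer.wrap(volumeId).getInt();
      }

      public static void main(String[] args) {
        byte[] id = encode(300); // would not fit the old single-byte scheme
        System.out.println(decode(id)); // prints 300
      }
    }
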
Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java?rev=1390763&r1=1390762&r2=1390763&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java Wed Sep 26 22:55:00 2012
@@ -283,8 +283,9 @@ public class JournalService implements J
         new JournalProtocolServerSideTranslatorPB(impl);
     BlockingService service = 
         JournalProtocolService.newReflectiveBlockingService(xlator);
-    return RPC.getServer(JournalProtocolPB.class, service,
-        address.getHostName(), address.getPort(), 1, false, conf, null);
+    return new RPC.Builder(conf).setProtocol(JournalProtocolPB.class)
+        .setInstance(service).setBindAddress(address.getHostName())
+        .setPort(address.getPort()).setNumHandlers(1).setVerbose(false).build();
   }
   
   private void verifyEpoch(long e) throws FencedException {


