hadoop-hdfs-commits mailing list archives

From: szets...@apache.org
Subject: svn commit: r1399950 [7/27] - in /hadoop/common/branches/HDFS-2802/hadoop-hdfs-project: ./ hadoop-hdfs-httpfs/ hadoop-hdfs-httpfs/dev-support/ hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/ hadoop-hdfs-httpfs/src/main/java/org/apach...
Date: Fri, 19 Oct 2012 02:28:07 GMT
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Fri Oct 19 02:25:55 2012
@@ -31,7 +31,7 @@ import org.apache.hadoop.fs.CommonConfig
 public class DFSConfigKeys extends CommonConfigurationKeys {
 
   public static final String  DFS_BLOCK_SIZE_KEY = "dfs.blocksize";
-  public static final long    DFS_BLOCK_SIZE_DEFAULT = 64*1024*1024;
+  public static final long    DFS_BLOCK_SIZE_DEFAULT = 128*1024*1024;
   public static final String  DFS_REPLICATION_KEY = "dfs.replication";
   public static final short   DFS_REPLICATION_DEFAULT = 3;
   public static final String  DFS_STREAM_BUFFER_SIZE_KEY = "dfs.stream-buffer-size";
@@ -52,6 +52,14 @@ public class DFSConfigKeys extends Commo
   public static final String  DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_DEFAULT = "DEFAULT";
   public static final String  DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY = "dfs.client.socketcache.capacity";
   public static final int     DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT = 16;
+  public static final String  DFS_CLIENT_USE_DN_HOSTNAME = "dfs.client.use.datanode.hostname";
+  public static final boolean DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT = false;
+  public static final String  DFS_HDFS_BLOCKS_METADATA_ENABLED = "dfs.datanode.hdfs-blocks-metadata.enabled";
+  public static final boolean DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT = false;
+  public static final String  DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS = "dfs.client.file-block-storage-locations.num-threads";
+  public static final int     DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS_DEFAULT = 10;
+  public static final String  DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT = "dfs.client.file-block-storage-locations.timeout";
+  public static final int     DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_DEFAULT = 60;
 
   // HA related configuration
   public static final String  DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX = "dfs.client.failover.proxy.provider";
@@ -66,6 +74,8 @@ public class DFSConfigKeys extends Commo
   public static final String  DFS_CLIENT_FAILOVER_CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_KEY = "dfs.client.failover.connection.retries.on.timeouts";
   public static final int     DFS_CLIENT_FAILOVER_CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT = 0;
   
+  public static final String  DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY = "dfs.client.socketcache.expiryMsec";
+  public static final long    DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT = 2 * 60 * 1000;
   public static final String  DFS_NAMENODE_BACKUP_ADDRESS_KEY = "dfs.namenode.backup.address";
   public static final String  DFS_NAMENODE_BACKUP_ADDRESS_DEFAULT = "localhost:50100";
   public static final String  DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY = "dfs.namenode.backup.http-address";
@@ -74,13 +84,15 @@ public class DFSConfigKeys extends Commo
   public static final String  DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY = "dfs.datanode.balance.bandwidthPerSec";
   public static final long    DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_DEFAULT = 1024*1024;
   public static final String  DFS_DATANODE_READAHEAD_BYTES_KEY = "dfs.datanode.readahead.bytes";
-  public static final long    DFS_DATANODE_READAHEAD_BYTES_DEFAULT = 0;
+  public static final long    DFS_DATANODE_READAHEAD_BYTES_DEFAULT = 4 * 1024 * 1024; // 4MB
   public static final String  DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_KEY = "dfs.datanode.drop.cache.behind.writes";
   public static final boolean DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_DEFAULT = false;
   public static final String  DFS_DATANODE_SYNC_BEHIND_WRITES_KEY = "dfs.datanode.sync.behind.writes";
   public static final boolean DFS_DATANODE_SYNC_BEHIND_WRITES_DEFAULT = false;
   public static final String  DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY = "dfs.datanode.drop.cache.behind.reads";
   public static final boolean DFS_DATANODE_DROP_CACHE_BEHIND_READS_DEFAULT = false;
+  public static final String  DFS_DATANODE_USE_DN_HOSTNAME = "dfs.datanode.use.datanode.hostname";
+  public static final boolean DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT = false;
 
   public static final String  DFS_NAMENODE_HTTP_PORT_KEY = "dfs.http.port";
   public static final int     DFS_NAMENODE_HTTP_PORT_DEFAULT = 50070;
@@ -150,6 +162,8 @@ public class DFSConfigKeys extends Commo
   public static final int     DFS_NAMENODE_NUM_CHECKPOINTS_RETAINED_DEFAULT = 2;
   public static final String  DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_KEY = "dfs.namenode.num.extra.edits.retained";
   public static final int     DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_DEFAULT = 1000000; //1M
+  public static final String  DFS_NAMENODE_MAX_EXTRA_EDITS_SEGMENTS_RETAINED_KEY = "dfs.namenode.max.extra.edits.segments.retained";
+  public static final int     DFS_NAMENODE_MAX_EXTRA_EDITS_SEGMENTS_RETAINED_DEFAULT = 10000; // 10k
   public static final String  DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_KEY = "dfs.namenode.min.supported.datanode.version";
   public static final String  DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_DEFAULT = "3.0.0-SNAPSHOT";
 
@@ -164,11 +178,30 @@ public class DFSConfigKeys extends Commo
   public static final boolean DFS_DATANODE_SYNCONCLOSE_DEFAULT = false;
   public static final String  DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY = "dfs.datanode.socket.reuse.keepalive";
   public static final int     DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT = 1000;
+  
+  // Whether to enable datanode's stale state detection and usage
+  public static final String DFS_NAMENODE_CHECK_STALE_DATANODE_KEY = "dfs.namenode.check.stale.datanode";
+  public static final boolean DFS_NAMENODE_CHECK_STALE_DATANODE_DEFAULT = false;
+  // Whether to avoid writing to datanodes that are marked as stale
+  public static final String DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY = "dfs.namenode.avoid.write.stale.datanode";
+  public static final boolean DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT = false;
+  // The time interval for marking datanodes as stale
+  public static final String DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY = "dfs.namenode.stale.datanode.interval";
+  public static final long DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT = 30 * 1000; // 30s
+  // The stale interval cannot be too small, since otherwise the stale state would churn too frequently.
+  // This value defines the minimum stale interval as a multiple of the heartbeat interval.
+  public static final String DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_KEY = "dfs.namenode.stale.datanode.minimum.interval";
+  public static final int DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_DEFAULT = 3; // i.e. min_interval is 3 * heartbeat_interval = 9s
+  
+  // When the ratio of datanodes marked as stale reaches this value,
+  // stop avoiding writes to stale nodes so as to prevent hotspots.
+  public static final String DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY = "dfs.namenode.write.stale.datanode.ratio";
+  public static final float DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_DEFAULT = 0.5f;
 
   // Replication monitoring related keys
   public static final String DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION =
       "dfs.namenode.invalidate.work.pct.per.iteration";
-  public static final int DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION_DEFAULT = 32;
+  public static final float DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION_DEFAULT = 0.32f;
   public static final String DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION =
       "dfs.namenode.replication.work.multiplier.per.iteration";
   public static final int DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION_DEFAULT = 2;
@@ -203,6 +236,7 @@ public class DFSConfigKeys extends Commo
   public static final String  DFS_CLIENT_READ_PREFETCH_SIZE_KEY = "dfs.client.read.prefetch.size"; 
   public static final String  DFS_CLIENT_RETRY_WINDOW_BASE= "dfs.client.retry.window.base";
   public static final String  DFS_METRICS_SESSION_ID_KEY = "dfs.metrics.session-id";
+  public static final String  DFS_METRICS_PERCENTILES_INTERVALS_KEY = "dfs.metrics.percentiles.intervals";
   public static final String  DFS_DATANODE_HOST_NAME_KEY = "dfs.datanode.hostname";
   public static final String  DFS_NAMENODE_HOSTS_KEY = "dfs.namenode.hosts";
   public static final String  DFS_NAMENODE_HOSTS_EXCLUDE_KEY = "dfs.namenode.hosts.exclude";
@@ -240,7 +274,7 @@ public class DFSConfigKeys extends Commo
   public static final String  DFS_DATANODE_DU_RESERVED_KEY = "dfs.datanode.du.reserved";
   public static final long    DFS_DATANODE_DU_RESERVED_DEFAULT = 0;
   public static final String  DFS_DATANODE_HANDLER_COUNT_KEY = "dfs.datanode.handler.count";
-  public static final int     DFS_DATANODE_HANDLER_COUNT_DEFAULT = 3;
+  public static final int     DFS_DATANODE_HANDLER_COUNT_DEFAULT = 10;
   public static final String  DFS_DATANODE_HTTP_ADDRESS_KEY = "dfs.datanode.http.address";
   public static final int     DFS_DATANODE_HTTP_DEFAULT_PORT = 50075;
   public static final String  DFS_DATANODE_HTTP_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_DATANODE_HTTP_DEFAULT_PORT;
@@ -318,6 +352,10 @@ public class DFSConfigKeys extends Commo
                                            "dfs.image.transfer.bandwidthPerSec";
   public static final long DFS_IMAGE_TRANSFER_RATE_DEFAULT = 0;  //no throttling
 
+  // Image transfer timeout
+  public static final String DFS_IMAGE_TRANSFER_TIMEOUT_KEY = "dfs.image.transfer.timeout";
+  public static final int DFS_IMAGE_TRANSFER_TIMEOUT_DEFAULT = 60 * 1000;
+
   //Keys with no defaults
   public static final String  DFS_DATANODE_PLUGINS_KEY = "dfs.datanode.plugins";
   public static final String  DFS_DATANODE_FSDATASET_FACTORY_KEY = "dfs.datanode.fsdataset.factory";
@@ -366,4 +404,47 @@ public class DFSConfigKeys extends Commo
   public static final boolean DFS_HA_AUTO_FAILOVER_ENABLED_DEFAULT = false;
   public static final String DFS_HA_ZKFC_PORT_KEY = "dfs.ha.zkfc.port";
   public static final int DFS_HA_ZKFC_PORT_DEFAULT = 8019;
+  
+  // Security-related configs
+  public static final String DFS_ENCRYPT_DATA_TRANSFER_KEY = "dfs.encrypt.data.transfer";
+  public static final boolean DFS_ENCRYPT_DATA_TRANSFER_DEFAULT = false;
+  public static final String DFS_DATA_ENCRYPTION_ALGORITHM_KEY = "dfs.encrypt.data.transfer.algorithm";
+  
+  // Journal-node related configs. These are read on the JN side.
+  public static final String  DFS_JOURNALNODE_EDITS_DIR_KEY = "dfs.journalnode.edits.dir";
+  public static final String  DFS_JOURNALNODE_EDITS_DIR_DEFAULT = "/tmp/hadoop/dfs/journalnode/";
+  public static final String  DFS_JOURNALNODE_RPC_ADDRESS_KEY = "dfs.journalnode.rpc-address";
+  public static final int     DFS_JOURNALNODE_RPC_PORT_DEFAULT = 8485;
+  public static final String  DFS_JOURNALNODE_RPC_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_JOURNALNODE_RPC_PORT_DEFAULT;
+    
+  public static final String  DFS_JOURNALNODE_HTTP_ADDRESS_KEY = "dfs.journalnode.http-address";
+  public static final int     DFS_JOURNALNODE_HTTP_PORT_DEFAULT = 8480;
+  public static final String  DFS_JOURNALNODE_HTTP_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_JOURNALNODE_HTTP_PORT_DEFAULT;
+
+  public static final String  DFS_JOURNALNODE_KEYTAB_FILE_KEY = "dfs.journalnode.keytab.file";
+  public static final String  DFS_JOURNALNODE_USER_NAME_KEY = "dfs.journalnode.kerberos.principal";
+  public static final String  DFS_JOURNALNODE_INTERNAL_SPNEGO_USER_NAME_KEY = "dfs.journalnode.kerberos.internal.spnego.principal";
+
+  // Journal-node related configs for the client side.
+  public static final String  DFS_QJOURNAL_QUEUE_SIZE_LIMIT_KEY = "dfs.qjournal.queued-edits.limit.mb";
+  public static final int     DFS_QJOURNAL_QUEUE_SIZE_LIMIT_DEFAULT = 10;
+  
+  // Quorum-journal timeouts for various operations. Unlikely to need
+  // to be tweaked, but configurable just in case.
+  public static final String  DFS_QJOURNAL_START_SEGMENT_TIMEOUT_KEY = "dfs.qjournal.start-segment.timeout.ms";
+  public static final String  DFS_QJOURNAL_PREPARE_RECOVERY_TIMEOUT_KEY = "dfs.qjournal.prepare-recovery.timeout.ms";
+  public static final String  DFS_QJOURNAL_ACCEPT_RECOVERY_TIMEOUT_KEY = "dfs.qjournal.accept-recovery.timeout.ms";
+  public static final String  DFS_QJOURNAL_FINALIZE_SEGMENT_TIMEOUT_KEY = "dfs.qjournal.finalize-segment.timeout.ms";
+  public static final String  DFS_QJOURNAL_SELECT_INPUT_STREAMS_TIMEOUT_KEY = "dfs.qjournal.select-input-streams.timeout.ms";
+  public static final String  DFS_QJOURNAL_GET_JOURNAL_STATE_TIMEOUT_KEY = "dfs.qjournal.get-journal-state.timeout.ms";
+  public static final String  DFS_QJOURNAL_NEW_EPOCH_TIMEOUT_KEY = "dfs.qjournal.new-epoch.timeout.ms";
+  public static final String  DFS_QJOURNAL_WRITE_TXNS_TIMEOUT_KEY = "dfs.qjournal.write-txns.timeout.ms";
+  public static final int     DFS_QJOURNAL_START_SEGMENT_TIMEOUT_DEFAULT = 20000;
+  public static final int     DFS_QJOURNAL_PREPARE_RECOVERY_TIMEOUT_DEFAULT = 120000;
+  public static final int     DFS_QJOURNAL_ACCEPT_RECOVERY_TIMEOUT_DEFAULT = 120000;
+  public static final int     DFS_QJOURNAL_FINALIZE_SEGMENT_TIMEOUT_DEFAULT = 120000;
+  public static final int     DFS_QJOURNAL_SELECT_INPUT_STREAMS_TIMEOUT_DEFAULT = 20000;
+  public static final int     DFS_QJOURNAL_GET_JOURNAL_STATE_TIMEOUT_DEFAULT = 120000;
+  public static final int     DFS_QJOURNAL_NEW_EPOCH_TIMEOUT_DEFAULT = 120000;
+  public static final int     DFS_QJOURNAL_WRITE_TXNS_TIMEOUT_DEFAULT = 20000;
 }
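
For context (not part of the commit above): the new stale-datanode keys are ordinary Configuration entries, so a minimal, illustrative sketch of reading them could look like the following. The class name and the idea of printing the values are hypothetical; Configuration.getBoolean/getLong/getFloat are the standard Hadoop accessors, and the constants are the ones added in this hunk.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class StaleDatanodeSettings {
  // Reads the three new stale-datanode settings, falling back to the defaults
  // declared in DFSConfigKeys when the keys are absent from the configuration.
  public static void printSettings(Configuration conf) {
    boolean checkStale = conf.getBoolean(
        DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY,
        DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_DEFAULT);
    long staleIntervalMs = conf.getLong(
        DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
        DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);
    float writeRatio = conf.getFloat(
        DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY,
        DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_DEFAULT);
    System.out.println("check stale: " + checkStale
        + ", stale interval (ms): " + staleIntervalMs
        + ", stale-write ratio cutoff: " + writeRatio);
  }
}

Pairing each key string with a default constant, as DFSConfigKeys does, lets callers pass both to the typed getter in one place.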

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java Fri Oct 19 02:25:55 2012
@@ -37,11 +37,14 @@ import org.apache.hadoop.fs.ChecksumExce
 import org.apache.hadoop.fs.ByteBufferReadable;
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.hdfs.SocketCache.SocketAndStreams;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
@@ -196,7 +199,8 @@ public class DFSInputStream extends FSIn
       
       try {
         cdp = DFSUtil.createClientDatanodeProtocolProxy(
-        datanode, dfsClient.conf, dfsClient.getConf().socketTimeout, locatedblock);
+            datanode, dfsClient.conf, dfsClient.getConf().socketTimeout,
+            dfsClient.getConf().connectToDnViaHostname, locatedblock);
         
         final long n = cdp.getReplicaVisibleLength(locatedblock.getBlock());
         
@@ -239,6 +243,10 @@ public class DFSInputStream extends FSIn
         locatedBlocks.getFileLength() + lastBlockBeingWrittenLength;
   }
 
+  private synchronized boolean blockUnderConstruction() {
+    return locatedBlocks.isUnderConstruction();
+  }
+
   /**
    * Returns the datanode from which the stream is currently reading.
    */
@@ -425,6 +433,7 @@ public class DFSInputStream extends FSIn
     //
     DatanodeInfo chosenNode = null;
     int refetchToken = 1; // only need to get a new access token once
+    int refetchEncryptionKey = 1; // only need to get a new encryption key once
     
     boolean connectFailedOnce = false;
 
@@ -452,7 +461,14 @@ public class DFSInputStream extends FSIn
         }
         return chosenNode;
       } catch (IOException ex) {
-        if (ex instanceof InvalidBlockTokenException && refetchToken > 0) {
+        if (ex instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
+          DFSClient.LOG.info("Will fetch a new encryption key and retry, " 
+              + "encryption key was invalid when connecting to " + targetAddr
+              + " : " + ex);
+          // The encryption key used is invalid.
+          refetchEncryptionKey--;
+          dfsClient.clearDataEncryptionKey();
+        } else if (ex instanceof InvalidBlockTokenException && refetchToken > 0) {
           DFSClient.LOG.info("Will fetch a new access token and retry, " 
               + "access token was invalid when connecting to " + targetAddr
               + " : " + ex);
@@ -705,8 +721,12 @@ public class DFSInputStream extends FSIn
       DatanodeInfo[] nodes = block.getLocations();
       try {
         DatanodeInfo chosenNode = bestNode(nodes, deadNodes);
-        InetSocketAddress targetAddr = 
-          NetUtils.createSocketAddr(chosenNode.getXferAddr());
+        final String dnAddr =
+            chosenNode.getXferAddr(dfsClient.connectToDnViaHostname());
+        if (DFSClient.LOG.isDebugEnabled()) {
+          DFSClient.LOG.debug("Connecting to datanode " + dnAddr);
+        }
+        InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr);
         return new DNAddrPair(chosenNode, targetAddr);
       } catch (IOException ie) {
         String blockInfo = block.getBlock() + " file=" + src;
@@ -754,6 +774,7 @@ public class DFSInputStream extends FSIn
     // Connect to best DataNode for desired Block, with potential offset
     //
     int refetchToken = 1; // only need to get a new access token once
+    int refetchEncryptionKey = 1; // only need to get a new encryption key once
     
     while (true) {
       // cached block locations may have been updated by chooseDataNode()
@@ -789,7 +810,14 @@ public class DFSInputStream extends FSIn
         dfsClient.disableShortCircuit();
         continue;
       } catch (IOException e) {
-        if (e instanceof InvalidBlockTokenException && refetchToken > 0) {
+        if (e instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
+          DFSClient.LOG.info("Will fetch a new encryption key and retry, " 
+              + "encryption key was invalid when connecting to " + targetAddr
+              + " : " + e);
+          // The encryption key used is invalid.
+          refetchEncryptionKey--;
+          dfsClient.clearDataEncryptionKey();
+        } else if (e instanceof InvalidBlockTokenException && refetchToken > 0) {
           DFSClient.LOG.info("Will get a new access token and retry, "
               + "access token was invalid when connecting to " + targetAddr
               + " : " + e);
@@ -818,8 +846,9 @@ public class DFSInputStream extends FSIn
    */
   private void closeBlockReader(BlockReader reader) throws IOException {
     if (reader.hasSentStatusCode()) {
+      IOStreamPair ioStreams = reader.getStreams();
       Socket oldSock = reader.takeSocket();
-      socketCache.put(oldSock);
+      socketCache.put(oldSock, ioStreams);
     }
     reader.close();
   }
@@ -853,9 +882,12 @@ public class DFSInputStream extends FSIn
                                        String clientName)
       throws IOException {
     
-    if (dfsClient.shouldTryShortCircuitRead(dnAddr)) {
+    // Can't do a local read of a block under construction, see HDFS-2757
+    if (dfsClient.shouldTryShortCircuitRead(dnAddr) &&
+        !blockUnderConstruction()) {
       return DFSClient.getLocalBlockReader(dfsClient.conf, src, block,
-          blockToken, chosenNode, dfsClient.hdfsTimeout, startOffset);
+          blockToken, chosenNode, dfsClient.hdfsTimeout, startOffset,
+          dfsClient.connectToDnViaHostname());
     }
     
     IOException err = null;
@@ -864,14 +896,15 @@ public class DFSInputStream extends FSIn
     // Allow retry since there is no way of knowing whether the cached socket
     // is good until we actually use it.
     for (int retries = 0; retries <= nCachedConnRetry && fromCache; ++retries) {
-      Socket sock = null;
+      SocketAndStreams sockAndStreams = null;
       // Don't use the cache on the last attempt - it's possible that there
       // are arbitrarily many unusable sockets in the cache, but we don't
       // want to fail the read.
       if (retries < nCachedConnRetry) {
-        sock = socketCache.get(dnAddr);
+        sockAndStreams = socketCache.get(dnAddr);
       }
-      if (sock == null) {
+      Socket sock;
+      if (sockAndStreams == null) {
         fromCache = false;
 
         sock = dfsClient.socketFactory.createSocket();
@@ -895,6 +928,8 @@ public class DFSInputStream extends FSIn
             dfsClient.getRandomLocalInterfaceAddr(),
             dfsClient.getConf().socketTimeout);
         sock.setSoTimeout(dfsClient.getConf().socketTimeout);
+      } else {
+        sock = sockAndStreams.sock;
       }
 
       try {
@@ -905,12 +940,18 @@ public class DFSInputStream extends FSIn
                                        blockToken,
                                        startOffset, len,
                                        bufferSize, verifyChecksum,
-                                       clientName);
+                                       clientName,
+                                       dfsClient.getDataEncryptionKey(),
+                                       sockAndStreams == null ? null : sockAndStreams.ioStreams);
         return reader;
       } catch (IOException ex) {
         // Our socket is no good.
         DFSClient.LOG.debug("Error making BlockReader. Closing stale " + sock, ex);
-        sock.close();
+        if (sockAndStreams != null) {
+          sockAndStreams.close();
+        } else {
+          sock.close();
+        }
         err = ex;
       }
     }
@@ -1154,7 +1195,7 @@ public class DFSInputStream extends FSIn
     throw new IOException("No live nodes contain current block");
   }
 
-  /** Utility class to encapsulate data node info and its ip address. */
+  /** Utility class to encapsulate data node info and its address. */
   static class DNAddrPair {
     DatanodeInfo info;
     InetSocketAddress addr;
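
Aside (not part of the commit): both DFSInputStream hunks above follow the same retry-once pattern, a counter initialized to 1 per recoverable cause (invalid block token, invalid encryption key), decremented on the first matching failure so the expensive refetch happens at most once per read attempt. A self-contained sketch of that pattern, with hypothetical stand-in exception types and helper methods rather than the real HDFS classes, is:

import java.io.IOException;

// Local stand-ins for InvalidEncryptionKeyException / InvalidBlockTokenException,
// declared here only so the sketch compiles on its own.
class StaleKeyException extends IOException {}
class StaleTokenException extends IOException {}

abstract class RetryOnceSketch {
  abstract byte[] connectAndRead() throws IOException; // stands in for the real read path
  abstract void clearDataEncryptionKey();              // force a fresh key on the next attempt
  abstract void refetchBlockToken();                   // force a fresh token on the next attempt

  byte[] readWithSingleRetries() throws IOException {
    int refetchToken = 1;          // only fetch a new access token once
    int refetchEncryptionKey = 1;  // only fetch a new encryption key once
    while (true) {
      try {
        return connectAndRead();
      } catch (IOException ex) {
        if (ex instanceof StaleKeyException && refetchEncryptionKey > 0) {
          refetchEncryptionKey--;
          clearDataEncryptionKey();
        } else if (ex instanceof StaleTokenException && refetchToken > 0) {
          refetchToken--;
          refetchBlockToken();
        } else {
          throw ex; // not recoverable, or the single retry was already used
        }
      }
    }
  }
}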

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java Fri Oct 19 02:25:55 2012
@@ -24,11 +24,12 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.InterruptedIOException;
+import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.nio.BufferOverflowException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.EnumSet;
@@ -55,7 +56,10 @@ import org.apache.hadoop.hdfs.protocol.L
 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
 import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
@@ -74,6 +78,9 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.Time;
+
+import com.google.common.annotations.VisibleForTesting;
 
 
 /****************************************************************
@@ -100,8 +107,8 @@ import org.apache.hadoop.util.Progressab
 ****************************************************************/
 @InterfaceAudience.Private
 public class DFSOutputStream extends FSOutputSummer implements Syncable {
-  private final DFSClient dfsClient;
   private static final int MAX_PACKETS = 80; // each packet 64K, total 5MB
+  private final DFSClient dfsClient;
   private Socket s;
   // closed is accessed by different threads under different locks.
   private volatile boolean closed = false;
@@ -118,7 +125,7 @@ public class DFSOutputStream extends FSO
   private long lastQueuedSeqno = -1;
   private long lastAckedSeqno = -1;
   private long bytesCurBlock = 0; // bytes writen in current block
-  private int packetSize = 0; // write packet size, including the header.
+  private int packetSize = 0; // write packet size, not including the header.
   private int chunksPerPacket = 0;
   private volatile IOException lastException = null;
   private long artificialSlowdown = 0;
@@ -131,80 +138,75 @@ public class DFSOutputStream extends FSO
   private final short blockReplication; // replication factor of file
   private boolean shouldSyncBlock = false; // force blocks to disk upon close
   
-  private class Packet {
-    long    seqno;               // sequencenumber of buffer in block
-    long    offsetInBlock;       // offset in block
-    private boolean lastPacketInBlock;   // is this the last packet in block?
-    boolean syncBlock;          // this packet forces the current block to disk
-    int     numChunks;           // number of chunks currently in packet
-    int     maxChunks;           // max chunks in packet
-
-    /** buffer for accumulating packet checksum and data */
-    ByteBuffer buffer; // wraps buf, only one of these two may be non-null
+  private static class Packet {
+    private static final long HEART_BEAT_SEQNO = -1L;
+    long seqno; // sequencenumber of buffer in block
+    final long offsetInBlock; // offset in block
+    boolean syncBlock; // this packet forces the current block to disk
+    int numChunks; // number of chunks currently in packet
+    final int maxChunks; // max chunks in packet
     byte[]  buf;
+    private boolean lastPacketInBlock; // is this the last packet in block?
 
     /**
      * buf is pointed into like follows:
      *  (C is checksum data, D is payload data)
      *
-     * [HHHHHCCCCC________________DDDDDDDDDDDDDDDD___]
-     *       ^    ^               ^               ^
-     *       |    checksumPos     dataStart       dataPos
-     *   checksumStart
+     * [_________CCCCCCCCC________________DDDDDDDDDDDDDDDD___]
+     *           ^        ^               ^               ^
+     *           |        checksumPos     dataStart       dataPos
+     *           checksumStart
+     * 
+     * Right before sending, we move the checksum data to immediately precede
+     * the actual data, and then insert the header into the buffer immediately
+     * preceding the checksum data, so we make sure to keep enough space in
+     * front of the checksum data to support the largest conceivable header. 
      */
     int checksumStart;
-    int dataStart;
-    int dataPos;
     int checksumPos;
-
-    private static final long HEART_BEAT_SEQNO = -1L;
+    final int dataStart;
+    int dataPos;
 
     /**
-     *  create a heartbeat packet
+     * Create a heartbeat packet.
      */
-    Packet() {
-      this.lastPacketInBlock = false;
-      this.numChunks = 0;
-      this.offsetInBlock = 0;
-      this.seqno = HEART_BEAT_SEQNO;
-      
-      buffer = null;
-      int packetSize = PacketHeader.PKT_HEADER_LEN + HdfsConstants.BYTES_IN_INTEGER;
-      buf = new byte[packetSize];
-      
-      checksumStart = dataStart = packetSize;
-      checksumPos = checksumStart;
-      dataPos = dataStart;
-      maxChunks = 0;
+    Packet(int checksumSize) {
+      this(0, 0, 0, HEART_BEAT_SEQNO, checksumSize);
     }
     
-    // create a new packet
-    Packet(int pktSize, int chunksPerPkt, long offsetInBlock) {
+    /**
+     * Create a new packet.
+     * 
+     * @param pktSize maximum size of the packet, 
+     *                including checksum data and actual data.
+     * @param chunksPerPkt maximum number of chunks per packet.
+     * @param offsetInBlock offset in bytes into the HDFS block.
+     */
+    Packet(int pktSize, int chunksPerPkt, long offsetInBlock, 
+                              long seqno, int checksumSize) {
       this.lastPacketInBlock = false;
       this.numChunks = 0;
       this.offsetInBlock = offsetInBlock;
-      this.seqno = currentSeqno;
-      currentSeqno++;
+      this.seqno = seqno;
       
-      buffer = null;
-      buf = new byte[pktSize];
+      buf = new byte[PacketHeader.PKT_MAX_HEADER_LEN + pktSize];
       
-      checksumStart = PacketHeader.PKT_HEADER_LEN;
+      checksumStart = PacketHeader.PKT_MAX_HEADER_LEN;
       checksumPos = checksumStart;
-      dataStart = checksumStart + chunksPerPkt * checksum.getChecksumSize();
+      dataStart = checksumStart + (chunksPerPkt * checksumSize);
       dataPos = dataStart;
       maxChunks = chunksPerPkt;
     }
 
     void writeData(byte[] inarray, int off, int len) {
-      if ( dataPos + len > buf.length) {
+      if (dataPos + len > buf.length) {
         throw new BufferOverflowException();
       }
       System.arraycopy(inarray, off, buf, dataPos, len);
       dataPos += len;
     }
 
-    void  writeChecksum(byte[] inarray, int off, int len) {
+    void writeChecksum(byte[] inarray, int off, int len) {
       if (checksumPos + len > dataStart) {
         throw new BufferOverflowException();
       }
@@ -213,45 +215,38 @@ public class DFSOutputStream extends FSO
     }
     
     /**
-     * Returns ByteBuffer that contains one full packet, including header.
+     * Write the full packet, including the header, to the given output stream.
      */
-    ByteBuffer getBuffer() {
-      /* Once this is called, no more data can be added to the packet.
-       * setting 'buf' to null ensures that.
-       * This is called only when the packet is ready to be sent.
-       */
-      if (buffer != null) {
-        return buffer;
-      }
-      
-      //prepare the header and close any gap between checksum and data.
-      
-      int dataLen = dataPos - dataStart;
-      int checksumLen = checksumPos - checksumStart;
+    void writeTo(DataOutputStream stm) throws IOException {
+      final int dataLen = dataPos - dataStart;
+      final int checksumLen = checksumPos - checksumStart;
+      final int pktLen = HdfsConstants.BYTES_IN_INTEGER + dataLen + checksumLen;
+
+      PacketHeader header = new PacketHeader(
+        pktLen, offsetInBlock, seqno, lastPacketInBlock, dataLen, syncBlock);
       
       if (checksumPos != dataStart) {
-        /* move the checksum to cover the gap.
-         * This can happen for the last packet.
-         */
+        // Move the checksum to cover the gap. This can happen for the last
+        // packet or during an hflush/hsync call.
         System.arraycopy(buf, checksumStart, buf, 
                          dataStart - checksumLen , checksumLen); 
+        checksumPos = dataStart;
+        checksumStart = checksumPos - checksumLen;
       }
       
-      int pktLen = HdfsConstants.BYTES_IN_INTEGER + dataLen + checksumLen;
+      final int headerStart = checksumStart - header.getSerializedSize();
+      assert checksumStart + 1 >= header.getSerializedSize();
+      assert checksumPos == dataStart;
+      assert headerStart >= 0;
+      assert headerStart + header.getSerializedSize() == checksumStart;
       
-      //normally dataStart == checksumPos, i.e., offset is zero.
-      buffer = ByteBuffer.wrap(
-        buf, dataStart - checksumPos,
-        PacketHeader.PKT_HEADER_LEN + pktLen - HdfsConstants.BYTES_IN_INTEGER);
-      buf = null;
-      buffer.mark();
-
-      PacketHeader header = new PacketHeader(
-        pktLen, offsetInBlock, seqno, lastPacketInBlock, dataLen, syncBlock);
-      header.putInBuffer(buffer);
+      // Copy the header data into the buffer immediately preceding the checksum
+      // data.
+      System.arraycopy(header.getBytes(), 0, buf, headerStart,
+          header.getSerializedSize());
       
-      buffer.reset();
-      return buffer;
+      // Write the now contiguous full packet to the output stream.
+      stm.write(buf, headerStart, header.getSerializedSize() + checksumLen + dataLen);
     }
     
     // get the packet's last byte's offset in the block
@@ -267,6 +262,7 @@ public class DFSOutputStream extends FSO
       return seqno == HEART_BEAT_SEQNO;
     }
     
+    @Override
     public String toString() {
       return "packet seqno:" + this.seqno +
       " offsetInBlock:" + this.offsetInBlock + 
@@ -395,8 +391,9 @@ public class DFSOutputStream extends FSO
      * streamer thread is the only thread that opens streams to datanode, 
      * and closes them. Any error recovery is also done by this thread.
      */
+    @Override
     public void run() {
-      long lastPacket = System.currentTimeMillis();
+      long lastPacket = Time.now();
       while (!streamerClosed && dfsClient.clientRunning) {
 
         // if the Responder encountered an error, shutdown Responder
@@ -406,6 +403,7 @@ public class DFSOutputStream extends FSO
             response.join();
             response = null;
           } catch (InterruptedException  e) {
+            DFSClient.LOG.warn("Caught exception ", e);
           }
         }
 
@@ -420,7 +418,7 @@ public class DFSOutputStream extends FSO
 
           synchronized (dataQueue) {
             // wait for a packet to be sent.
-            long now = System.currentTimeMillis();
+            long now = Time.now();
             while ((!streamerClosed && !hasError && dfsClient.clientRunning 
                 && dataQueue.size() == 0 && 
                 (stage != BlockConstructionStage.DATA_STREAMING || 
@@ -433,16 +431,17 @@ public class DFSOutputStream extends FSO
               try {
                 dataQueue.wait(timeout);
               } catch (InterruptedException  e) {
+                DFSClient.LOG.warn("Caught exception ", e);
               }
               doSleep = false;
-              now = System.currentTimeMillis();
+              now = Time.now();
             }
             if (streamerClosed || hasError || !dfsClient.clientRunning) {
               continue;
             }
             // get packet to be sent.
             if (dataQueue.isEmpty()) {
-              one = new Packet();  // heartbeat packet
+              one = new Packet(checksum.getChecksumSize());  // heartbeat packet
             } else {
               one = dataQueue.getFirst(); // regular data packet
             }
@@ -482,6 +481,7 @@ public class DFSOutputStream extends FSO
                   // wait for acks to arrive from datanodes
                   dataQueue.wait(1000);
                 } catch (InterruptedException  e) {
+                  DFSClient.LOG.warn("Caught exception ", e);
                 }
               }
             }
@@ -492,8 +492,6 @@ public class DFSOutputStream extends FSO
           }
           
           // send the packet
-          ByteBuffer buf = one.getBuffer();
-
           synchronized (dataQueue) {
             // move packet from dataQueue to ackQueue
             if (!one.isHeartbeatPacket()) {
@@ -509,16 +507,16 @@ public class DFSOutputStream extends FSO
           }
 
           // write out data to remote datanode
-          try {            
-            blockStream.write(buf.array(), buf.position(), buf.remaining());
+          try {
+            one.writeTo(blockStream);
             blockStream.flush();   
           } catch (IOException e) {
             // HDFS-3398 treat primary DN is down since client is unable to 
-            // write to primary DN 
+            // write to primary DN
             errorIndex = 0;
             throw e;
           }
-          lastPacket = System.currentTimeMillis();
+          lastPacket = Time.now();
           
           if (one.isHeartbeatPacket()) {  //heartbeat packet
           }
@@ -603,6 +601,7 @@ public class DFSOutputStream extends FSO
           response.close();
           response.join();
         } catch (InterruptedException  e) {
+          DFSClient.LOG.warn("Caught exception ", e);
         } finally {
           response = null;
         }
@@ -653,6 +652,7 @@ public class DFSOutputStream extends FSO
         this.targets = targets;
       }
 
+      @Override
       public void run() {
 
         setName("ResponseProcessor for block " + block);
@@ -861,16 +861,26 @@ public class DFSOutputStream extends FSO
       try {
         sock = createSocketForPipeline(src, 2, dfsClient);
         final long writeTimeout = dfsClient.getDatanodeWriteTimeout(2);
-        out = new DataOutputStream(new BufferedOutputStream(
-            NetUtils.getOutputStream(sock, writeTimeout),
+        
+        OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
+        InputStream unbufIn = NetUtils.getInputStream(sock);
+        if (dfsClient.shouldEncryptData()) {
+          IOStreamPair encryptedStreams =
+              DataTransferEncryptor.getEncryptedStreams(
+                  unbufOut, unbufIn, dfsClient.getDataEncryptionKey());
+          unbufOut = encryptedStreams.out;
+          unbufIn = encryptedStreams.in;
+        }
+        out = new DataOutputStream(new BufferedOutputStream(unbufOut,
             HdfsConstants.SMALL_BUFFER_SIZE));
+        in = new DataInputStream(unbufIn);
 
         //send the TRANSFER_BLOCK request
         new Sender(out).transferBlock(block, blockToken, dfsClient.clientName,
             targets);
+        out.flush();
 
         //ack
-        in = new DataInputStream(NetUtils.getInputStream(sock));
         BlockOpResponseProto response =
           BlockOpResponseProto.parseFrom(HdfsProtoUtil.vintPrefixed(in));
         if (SUCCESS != response.getStatus()) {
@@ -981,7 +991,7 @@ public class DFSOutputStream extends FSO
         errorIndex = -1;
         success = false;
 
-        long startTime = System.currentTimeMillis();
+        long startTime = Time.now();
         DatanodeInfo[] excluded = excludedNodes.toArray(
             new DatanodeInfo[excludedNodes.size()]);
         block = oldBlock;
@@ -1028,77 +1038,99 @@ public class DFSOutputStream extends FSO
       // persist blocks on namenode on next flush
       persistBlocks.set(true);
 
-      boolean result = false;
-      DataOutputStream out = null;
-      try {
-        assert null == s : "Previous socket unclosed";
-        s = createSocketForPipeline(nodes[0], nodes.length, dfsClient);
-        long writeTimeout = dfsClient.getDatanodeWriteTimeout(nodes.length);
-
-        //
-        // Xmit header info to datanode
-        //
-        out = new DataOutputStream(new BufferedOutputStream(
-            NetUtils.getOutputStream(s, writeTimeout),
-            HdfsConstants.SMALL_BUFFER_SIZE));
-        
-        assert null == blockReplyStream : "Previous blockReplyStream unclosed";
-        blockReplyStream = new DataInputStream(NetUtils.getInputStream(s));
-
-        // send the request
-        new Sender(out).writeBlock(block, accessToken, dfsClient.clientName,
-            nodes, null, recoveryFlag? stage.getRecoveryStage() : stage, 
-            nodes.length, block.getNumBytes(), bytesSent, newGS, checksum);
-
-        // receive ack for connect
-        BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
-            HdfsProtoUtil.vintPrefixed(blockReplyStream));
-        pipelineStatus = resp.getStatus();
-        firstBadLink = resp.getFirstBadLink();
-        
-        if (pipelineStatus != SUCCESS) {
-          if (pipelineStatus == Status.ERROR_ACCESS_TOKEN) {
-            throw new InvalidBlockTokenException(
-                "Got access token error for connect ack with firstBadLink as "
-                    + firstBadLink);
-          } else {
-            throw new IOException("Bad connect ack with firstBadLink as "
-                + firstBadLink);
+      int refetchEncryptionKey = 1;
+      while (true) {
+        boolean result = false;
+        DataOutputStream out = null;
+        try {
+          assert null == s : "Previous socket unclosed";
+          assert null == blockReplyStream : "Previous blockReplyStream unclosed";
+          s = createSocketForPipeline(nodes[0], nodes.length, dfsClient);
+          long writeTimeout = dfsClient.getDatanodeWriteTimeout(nodes.length);
+          
+          OutputStream unbufOut = NetUtils.getOutputStream(s, writeTimeout);
+          InputStream unbufIn = NetUtils.getInputStream(s);
+          if (dfsClient.shouldEncryptData()) {
+            IOStreamPair encryptedStreams =
+                DataTransferEncryptor.getEncryptedStreams(unbufOut,
+                    unbufIn, dfsClient.getDataEncryptionKey());
+            unbufOut = encryptedStreams.out;
+            unbufIn = encryptedStreams.in;
+          }
+          out = new DataOutputStream(new BufferedOutputStream(unbufOut,
+              HdfsConstants.SMALL_BUFFER_SIZE));
+          blockReplyStream = new DataInputStream(unbufIn);
+  
+          //
+          // Xmit header info to datanode
+          //
+  
+          // send the request
+          new Sender(out).writeBlock(block, accessToken, dfsClient.clientName,
+              nodes, null, recoveryFlag? stage.getRecoveryStage() : stage, 
+              nodes.length, block.getNumBytes(), bytesSent, newGS, checksum);
+  
+          // receive ack for connect
+          BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
+              HdfsProtoUtil.vintPrefixed(blockReplyStream));
+          pipelineStatus = resp.getStatus();
+          firstBadLink = resp.getFirstBadLink();
+          
+          if (pipelineStatus != SUCCESS) {
+            if (pipelineStatus == Status.ERROR_ACCESS_TOKEN) {
+              throw new InvalidBlockTokenException(
+                  "Got access token error for connect ack with firstBadLink as "
+                      + firstBadLink);
+            } else {
+              throw new IOException("Bad connect ack with firstBadLink as "
+                  + firstBadLink);
+            }
           }
-        }
-        assert null == blockStream : "Previous blockStream unclosed";
-        blockStream = out;
-        result =  true; // success
-
-      } catch (IOException ie) {
-
-        DFSClient.LOG.info("Exception in createBlockOutputStream", ie);
-
-        // find the datanode that matches
-        if (firstBadLink.length() != 0) {
-          for (int i = 0; i < nodes.length; i++) {
-            if (nodes[i].getXferAddr().equals(firstBadLink)) {
-              errorIndex = i;
-              break;
+          assert null == blockStream : "Previous blockStream unclosed";
+          blockStream = out;
+          result =  true; // success
+  
+        } catch (IOException ie) {
+          DFSClient.LOG.info("Exception in createBlockOutputStream", ie);
+          if (ie instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
+            DFSClient.LOG.info("Will fetch a new encryption key and retry, " 
+                + "encryption key was invalid when connecting to "
+                + nodes[0] + " : " + ie);
+            // The encryption key used is invalid.
+            refetchEncryptionKey--;
+            dfsClient.clearDataEncryptionKey();
+            // Don't close the socket/exclude this node just yet. Try again with
+            // a new encryption key.
+            continue;
+          }
+  
+          // find the datanode that matches
+          if (firstBadLink.length() != 0) {
+            for (int i = 0; i < nodes.length; i++) {
+              // NB: Unconditionally using the xfer addr w/o hostname
+              if (firstBadLink.equals(nodes[i].getXferAddr())) {
+                errorIndex = i;
+                break;
+              }
             }
+          } else {
+            errorIndex = 0;
+          }
+          hasError = true;
+          setLastException(ie);
+          result =  false;  // error
+        } finally {
+          if (!result) {
+            IOUtils.closeSocket(s);
+            s = null;
+            IOUtils.closeStream(out);
+            out = null;
+            IOUtils.closeStream(blockReplyStream);
+            blockReplyStream = null;
           }
-        } else {
-          errorIndex = 0;
-        }
-        hasError = true;
-        setLastException(ie);
-        result =  false;  // error
-      } finally {
-        if (!result) {
-          IOUtils.closeSocket(s);
-          s = null;
-          IOUtils.closeStream(out);
-          out = null;
-          IOUtils.closeStream(blockReplyStream);
-          blockReplyStream = null;
         }
+        return result;
       }
-      return result;
     }
 
     private LocatedBlock locateFollowingBlock(long start,
@@ -1107,7 +1139,7 @@ public class DFSOutputStream extends FSO
       int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry;
       long sleeptime = 400;
       while (true) {
-        long localstart = System.currentTimeMillis();
+        long localstart = Time.now();
         while (true) {
           try {
             return dfsClient.namenode.addBlock(src, dfsClient.clientName, block, excludedNodes);
@@ -1130,9 +1162,9 @@ public class DFSOutputStream extends FSO
               } else {
                 --retries;
                 DFSClient.LOG.info("Exception while adding a block", e);
-                if (System.currentTimeMillis() - localstart > 5000) {
+                if (Time.now() - localstart > 5000) {
                   DFSClient.LOG.info("Waiting for replication for "
-                      + (System.currentTimeMillis() - localstart) / 1000
+                      + (Time.now() - localstart) / 1000
                       + " seconds");
                 }
                 try {
@@ -1141,6 +1173,7 @@ public class DFSOutputStream extends FSO
                   Thread.sleep(sleeptime);
                   sleeptime *= 2;
                 } catch (InterruptedException ie) {
+                  DFSClient.LOG.warn("Caught exception ", ie);
                 }
               }
             } else {
@@ -1180,11 +1213,11 @@ public class DFSOutputStream extends FSO
    */
   static Socket createSocketForPipeline(final DatanodeInfo first,
       final int length, final DFSClient client) throws IOException {
-    if(DFSClient.LOG.isDebugEnabled()) {
-      DFSClient.LOG.debug("Connecting to datanode " + first);
+    final String dnAddr = first.getXferAddr(client.connectToDnViaHostname());
+    if (DFSClient.LOG.isDebugEnabled()) {
+      DFSClient.LOG.debug("Connecting to datanode " + dnAddr);
     }
-    final InetSocketAddress isa =
-      NetUtils.createSocketAddr(first.getXferAddr());
+    final InetSocketAddress isa = NetUtils.createSocketAddr(dnAddr);
     final Socket sock = client.socketFactory.createSocket();
     final int timeout = client.getDatanodeReadTimeout(length);
     NetUtils.connect(sock, isa, client.getRandomLocalInterfaceAddr(), timeout);
@@ -1206,7 +1239,8 @@ public class DFSOutputStream extends FSO
   //
   // returns the list of targets, if any, that is being currently used.
   //
-  synchronized DatanodeInfo[] getPipeline() {
+  @VisibleForTesting
+  public synchronized DatanodeInfo[] getPipeline() {
     if (streamer == null) {
       return null;
     }
@@ -1315,9 +1349,8 @@ public class DFSOutputStream extends FSO
 
   private void computePacketChunkSize(int psize, int csize) {
     int chunkSize = csize + checksum.getChecksumSize();
-    int n = PacketHeader.PKT_HEADER_LEN;
-    chunksPerPacket = Math.max((psize - n + chunkSize-1)/chunkSize, 1);
-    packetSize = n + chunkSize*chunksPerPacket;
+    chunksPerPacket = Math.max(psize/chunkSize, 1);
+    packetSize = chunkSize*chunksPerPacket;
     if (DFSClient.LOG.isDebugEnabled()) {
       DFSClient.LOG.debug("computePacketChunkSize: src=" + src +
                 ", chunkSize=" + chunkSize +
@@ -1384,7 +1417,7 @@ public class DFSOutputStream extends FSO
 
     if (currentPacket == null) {
       currentPacket = new Packet(packetSize, chunksPerPacket, 
-          bytesCurBlock);
+          bytesCurBlock, currentSeqno++, this.checksum.getChecksumSize());
       if (DFSClient.LOG.isDebugEnabled()) {
         DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" + 
             currentPacket.seqno +
@@ -1431,8 +1464,8 @@ public class DFSOutputStream extends FSO
       // indicate the end of block and reset bytesCurBlock.
       //
       if (bytesCurBlock == blockSize) {
-        currentPacket = new Packet(PacketHeader.PKT_HEADER_LEN, 0, 
-            bytesCurBlock);
+        currentPacket = new Packet(0, 0, bytesCurBlock, 
+            currentSeqno++, this.checksum.getChecksumSize());
         currentPacket.lastPacketInBlock = true;
         currentPacket.syncBlock = shouldSyncBlock;
         waitAndQueueCurrentPacket();
@@ -1504,7 +1537,7 @@ public class DFSOutputStream extends FSO
             // but sync was requested.
             // Send an empty packet
             currentPacket = new Packet(packetSize, chunksPerPacket,
-                bytesCurBlock);
+                bytesCurBlock, currentSeqno++, this.checksum.getChecksumSize());
           }
         } else {
           // We already flushed up to this offset.
@@ -1521,7 +1554,7 @@ public class DFSOutputStream extends FSO
             // and sync was requested.
             // So send an empty sync packet.
             currentPacket = new Packet(packetSize, chunksPerPacket,
-                bytesCurBlock);
+                bytesCurBlock, currentSeqno++, this.checksum.getChecksumSize());
           } else {
             // just discard the current packet since it is already been sent.
             currentPacket = null;
@@ -1658,6 +1691,7 @@ public class DFSOutputStream extends FSO
     streamer.setLastException(new IOException("Lease timeout of " +
                              (dfsClient.hdfsTimeout/1000) + " seconds expired."));
     closeThreads(true);
+    dfsClient.endFileLease(src);
   }
 
   // shutdown datastreamer and responseprocessor threads.
@@ -1701,8 +1735,8 @@ public class DFSOutputStream extends FSO
 
       if (bytesCurBlock != 0) {
         // send an empty packet to mark the end of the block
-        currentPacket = new Packet(PacketHeader.PKT_HEADER_LEN, 0, 
-            bytesCurBlock);
+        currentPacket = new Packet(0, 0, bytesCurBlock, 
+            currentSeqno++, this.checksum.getChecksumSize());
         currentPacket.lastPacketInBlock = true;
         currentPacket.syncBlock = shouldSyncBlock;
       }
@@ -1712,7 +1746,7 @@ public class DFSOutputStream extends FSO
       ExtendedBlock lastBlock = streamer.getBlock();
       closeThreads(false);
       completeFile(lastBlock);
-      dfsClient.leaserenewer.closeFile(src, dfsClient);
+      dfsClient.endFileLease(src);
     } finally {
       closed = true;
     }
@@ -1721,14 +1755,14 @@ public class DFSOutputStream extends FSO
   // should be called holding (this) lock since setTestFilename() may 
   // be called during unit tests
   private void completeFile(ExtendedBlock last) throws IOException {
-    long localstart = System.currentTimeMillis();
+    long localstart = Time.now();
     boolean fileComplete = false;
     while (!fileComplete) {
       fileComplete = dfsClient.namenode.complete(src, dfsClient.clientName, last);
       if (!fileComplete) {
         if (!dfsClient.clientRunning ||
               (dfsClient.hdfsTimeout > 0 &&
-               localstart + dfsClient.hdfsTimeout < System.currentTimeMillis())) {
+               localstart + dfsClient.hdfsTimeout < Time.now())) {
             String msg = "Unable to close file because dfsclient " +
                           " was unable to contact the HDFS servers." +
                           " clientRunning " + dfsClient.clientRunning +
@@ -1738,23 +1772,25 @@ public class DFSOutputStream extends FSO
         }
         try {
           Thread.sleep(400);
-          if (System.currentTimeMillis() - localstart > 5000) {
+          if (Time.now() - localstart > 5000) {
             DFSClient.LOG.info("Could not complete file " + src + " retrying...");
           }
         } catch (InterruptedException ie) {
+          DFSClient.LOG.warn("Caught exception ", ie);
         }
       }
     }
   }
 
-  void setArtificialSlowdown(long period) {
+  @VisibleForTesting
+  public void setArtificialSlowdown(long period) {
     artificialSlowdown = period;
   }
 
-  synchronized void setChunksPerPacket(int value) {
+  @VisibleForTesting
+  public synchronized void setChunksPerPacket(int value) {
     chunksPerPacket = Math.min(chunksPerPacket, value);
-    packetSize = PacketHeader.PKT_HEADER_LEN +
-                 (checksum.getBytesPerChecksum() + 
+    packetSize = (checksum.getBytesPerChecksum() + 
                   checksum.getChecksumSize()) * chunksPerPacket;
   }
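
Aside (not part of the commit): with the new computePacketChunkSize() above, packetSize now counts only checksum plus payload bytes, while Packet reserves PKT_MAX_HEADER_LEN separately when allocating buf. Assuming the common defaults of 512 bytes per checksum chunk, a 4-byte CRC32 checksum, and a 64 KB client write packet size (assumptions for illustration, not values stated in this diff), the arithmetic works out as in this small sketch:

public class PacketSizeExample {
  public static void main(String[] args) {
    int psize = 64 * 1024;  // assumed client write packet size (64 KB)
    int csize = 512;        // assumed bytes per checksum chunk
    int checksumSize = 4;   // assumed CRC32 checksum width in bytes

    // Same formula as the new computePacketChunkSize():
    int chunkSize = csize + checksumSize;                  // 516
    int chunksPerPacket = Math.max(psize / chunkSize, 1);  // 127
    int packetSize = chunkSize * chunksPerPacket;          // 65532; header bytes are
                                                           // now reserved separately
    System.out.println("chunksPerPacket=" + chunksPerPacket
        + " packetSize=" + packetSize);
  }
}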
 

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java Fri Oct 19 02:25:55 2012
@@ -18,8 +18,21 @@
 
 package org.apache.hadoop.hdfs;
 
-import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICES;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID;
+
 import java.io.IOException;
+import java.io.PrintStream;
 import java.io.UnsupportedEncodingException;
 import java.net.InetSocketAddress;
 import java.net.URI;
@@ -33,10 +46,17 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
-import java.util.StringTokenizer;
 
 import javax.net.SocketFactory;
 
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
@@ -55,11 +75,13 @@ import org.apache.hadoop.ipc.ProtobufRpc
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NodeBase;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.ToolRunner;
 
 import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.protobuf.BlockingService;
@@ -106,10 +128,48 @@ public class DFSUtil {
           a.isDecommissioned() ? 1 : -1;
       }
     };
+    
+      
+  /**
+   * Comparator for sorting DatanodeInfo[] based on decommissioned/stale states.
+   * Decommissioned/stale nodes are moved to the end of the array when sorted
+   * with this comparator.
+   */ 
+  @InterfaceAudience.Private 
+  public static class DecomStaleComparator implements Comparator<DatanodeInfo> {
+    private long staleInterval;
+
+    /**
+     * Constructor of DecomStaleComparator
+     * 
+     * @param interval
+     *          The time interval for marking datanodes as stale is passed from
+     *          outside, since the interval may be changed dynamically
+     */
+    public DecomStaleComparator(long interval) {
+      this.staleInterval = interval;
+    }
+
+    @Override
+    public int compare(DatanodeInfo a, DatanodeInfo b) {
+      // Decommissioned nodes will still be moved to the end of the list
+      if (a.isDecommissioned()) {
+        return b.isDecommissioned() ? 0 : 1;
+      } else if (b.isDecommissioned()) {
+        return -1;
+      }
+      // Stale nodes will be moved behind the normal nodes
+      boolean aStale = a.isStale(staleInterval);
+      boolean bStale = b.isStale(staleInterval);
+      return aStale == bStale ? 0 : (aStale ? 1 : -1);
+    }
+  }    
+    
   /**
    * Address matcher for matching an address to local address
    */
   static final AddressMatcher LOCAL_ADDRESS_MATCHER = new AddressMatcher() {
+    @Override
     public boolean match(InetSocketAddress s) {
       return NetUtils.isLocalAddress(s.getAddress());
     };
@@ -117,7 +177,7 @@ public class DFSUtil {
   
   /**
    * Whether the pathname is valid.  Currently prohibits relative paths, 
-   * and names which contain a ":" or "/" 
+   * names which contain a ":" or "//", or other non-canonical paths.
    */
   public static boolean isValidName(String src) {
     // Path must be absolute.
@@ -126,15 +186,22 @@ public class DFSUtil {
     }
       
     // Check for ".." "." ":" "/"
-    StringTokenizer tokens = new StringTokenizer(src, Path.SEPARATOR);
-    while(tokens.hasMoreTokens()) {
-      String element = tokens.nextToken();
+    String[] components = StringUtils.split(src, '/');
+    for (int i = 0; i < components.length; i++) {
+      String element = components[i];
       if (element.equals("..") || 
           element.equals(".")  ||
           (element.indexOf(":") >= 0)  ||
           (element.indexOf("/") >= 0)) {
         return false;
       }
+      
+      // The string may start or end with a /, but not have
+      // "//" in the middle.
+      if (element.isEmpty() && i != components.length - 1 &&
+          i != 0) {
+        return false;
+      }
     }
     return true;
   }
@@ -254,13 +321,25 @@ public class DFSUtil {
     if (blocks == null) {
       return new BlockLocation[0];
     }
-    int nrBlocks = blocks.locatedBlockCount();
+    return locatedBlocks2Locations(blocks.getLocatedBlocks());
+  }
+  
+  /**
+   * Convert a List<LocatedBlock> to BlockLocation[]
+   * @param blocks A List<LocatedBlock> to be converted
+   * @return converted array of BlockLocation
+   */
+  public static BlockLocation[] locatedBlocks2Locations(List<LocatedBlock> blocks) {
+    if (blocks == null) {
+      return new BlockLocation[0];
+    }
+    int nrBlocks = blocks.size();
     BlockLocation[] blkLocations = new BlockLocation[nrBlocks];
     if (nrBlocks == 0) {
       return blkLocations;
     }
     int idx = 0;
-    for (LocatedBlock blk : blocks.getLocatedBlocks()) {
+    for (LocatedBlock blk : blocks) {
       assert idx < nrBlocks : "Incorrect index";
       DatanodeInfo[] locations = blk.getLocations();
       String[] hosts = new String[locations.length];
@@ -410,12 +489,39 @@ public class DFSUtil {
   }
 
   /**
+   * @return a collection of all configured NN Kerberos principals.
+   */
+  public static Set<String> getAllNnPrincipals(Configuration conf) throws IOException {
+    Set<String> principals = new HashSet<String>();
+    for (String nsId : DFSUtil.getNameServiceIds(conf)) {
+      if (HAUtil.isHAEnabled(conf, nsId)) {
+        for (String nnId : DFSUtil.getNameNodeIds(conf, nsId)) {
+          Configuration confForNn = new Configuration(conf);
+          NameNode.initializeGenericKeys(confForNn, nsId, nnId);
+          String principal = SecurityUtil.getServerPrincipal(confForNn
+              .get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY),
+              NameNode.getAddress(confForNn).getHostName());
+          principals.add(principal);
+        }
+      } else {
+        Configuration confForNn = new Configuration(conf);
+        NameNode.initializeGenericKeys(confForNn, nsId, null);
+        String principal = SecurityUtil.getServerPrincipal(confForNn
+            .get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY),
+            NameNode.getAddress(confForNn).getHostName());
+        principals.add(principal);
+      }
+    }
+
+    return principals;
+  }
+
+  /**
    * Returns list of InetSocketAddress corresponding to HA NN RPC addresses from
    * the configuration.
    * 
    * @param conf configuration
    * @return list of InetSocketAddresses
-   * @throws IOException if no addresses are configured
    */
   public static Map<String, Map<String, InetSocketAddress>> getHaNnRpcAddresses(
       Configuration conf) {
@@ -832,17 +938,17 @@ public class DFSUtil {
   /** Create a {@link ClientDatanodeProtocol} proxy */
   public static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
       DatanodeID datanodeid, Configuration conf, int socketTimeout,
-      LocatedBlock locatedBlock) throws IOException {
+      boolean connectToDnViaHostname, LocatedBlock locatedBlock) throws IOException {
     return new ClientDatanodeProtocolTranslatorPB(datanodeid, conf, socketTimeout,
-             locatedBlock);
+        connectToDnViaHostname, locatedBlock);
   }
   
   /** Create {@link ClientDatanodeProtocol} proxy using kerberos ticket */
   static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
-      DatanodeID datanodeid, Configuration conf, int socketTimeout)
-      throws IOException {
+      DatanodeID datanodeid, Configuration conf, int socketTimeout,
+      boolean connectToDnViaHostname) throws IOException {
     return new ClientDatanodeProtocolTranslatorPB(
-        datanodeid, conf, socketTimeout);
+        datanodeid, conf, socketTimeout, connectToDnViaHostname);
   }
   
   /** Create a {@link ClientDatanodeProtocol} proxy */
@@ -1064,4 +1170,82 @@ public class DFSUtil {
       return null;
     }
   }
+  
+  public static Options helpOptions = new Options();
+  public static Option helpOpt = new Option("h", "help", false,
+      "get help information");
+
+  static {
+    helpOptions.addOption(helpOpt);
+  }
+
+  /**
+   * Parse the arguments for commands
+   * 
+   * @param args the argument to be parsed
+   * @param helpDescription help information to be printed out
+   * @param out Printer
+   * @param printGenericCommandUsage whether to print the 
+   *              generic command usage defined in ToolRunner
+   * @return true when the argument matches help option, false if not
+   */
+  public static boolean parseHelpArgument(String[] args,
+      String helpDescription, PrintStream out, boolean printGenericCommandUsage) {
+    if (args.length == 1) {
+      try {
+        CommandLineParser parser = new PosixParser();
+        CommandLine cmdLine = parser.parse(helpOptions, args);
+        if (cmdLine.hasOption(helpOpt.getOpt())
+            || cmdLine.hasOption(helpOpt.getLongOpt())) {
+          // should print out the help information
+          out.println(helpDescription + "\n");
+          if (printGenericCommandUsage) {
+            ToolRunner.printGenericCommandUsage(out);
+          }
+          return true;
+        }
+      } catch (ParseException pe) {
+        return false;
+      }
+    }
+    return false;
+  }
+  
+  /**
+   * Get DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION from configuration.
+   * 
+   * @param conf Configuration
+   * @return Value of DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION
+   */
+  public static float getInvalidateWorkPctPerIteration(Configuration conf) {
+    float blocksInvalidateWorkPct = conf.getFloat(
+        DFSConfigKeys.DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION,
+        DFSConfigKeys.DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION_DEFAULT);
+    Preconditions.checkArgument(
+        (blocksInvalidateWorkPct > 0 && blocksInvalidateWorkPct <= 1.0f),
+        DFSConfigKeys.DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION +
+        " = '" + blocksInvalidateWorkPct + "' is invalid. " +
+        "It should be a positive, non-zero float value, not greater than 1.0f, " +
+        "to indicate a percentage.");
+    return blocksInvalidateWorkPct;
+  }
+
+  /**
+   * Get DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION from
+   * configuration.
+   * 
+   * @param conf Configuration
+   * @return Value of DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION
+   */
+  public static int getReplWorkMultiplier(Configuration conf) {
+    int blocksReplWorkMultiplier = conf.getInt(
+            DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION,
+            DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION_DEFAULT);
+    Preconditions.checkArgument(
+        (blocksReplWorkMultiplier > 0),
+        DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION +
+        " = '" + blocksReplWorkMultiplier + "' is invalid. " +
+        "It should be a positive, non-zero integer value.");
+    return blocksReplWorkMultiplier;
+  }
 }

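A minimal sketch of how a command-line entry point might call the new DFSUtil.parseHelpArgument() helper added above; the tool name and usage string are illustrative, not part of this commit.

    import org.apache.hadoop.hdfs.DFSUtil;

    public class ExampleTool {                  // hypothetical tool
      private static final String USAGE = "Usage: exampletool [-h | --help]";

      public static void main(String[] args) {
        // Returns true only when the single argument is -h/--help, after
        // printing the usage text plus the generic ToolRunner usage.
        if (DFSUtil.parseHelpArgument(args, USAGE, System.out, true)) {
          System.exit(0);
        }
        // ... normal tool logic would follow ...
      }
    }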
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java Fri Oct 19 02:25:55 2012
@@ -32,6 +32,8 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.BlockStorageLocation;
+import org.apache.hadoop.fs.VolumeId;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -40,6 +42,7 @@ import org.apache.hadoop.fs.FsStatus;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
 import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.RemoteIterator;
@@ -52,12 +55,11 @@ import org.apache.hadoop.hdfs.protocol.E
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants.UpgradeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.AccessControlException;
@@ -189,6 +191,36 @@ public class DistributedFileSystem exten
 
   }
 
+  /**
+   * Used to query storage location information for a list of blocks. This list
+   * of blocks is normally constructed via a series of calls to
+   * {@link DistributedFileSystem#getFileBlockLocations(Path, long, long)} to
+   * get the blocks for ranges of a file.
+   * 
+   * The returned array of {@link BlockStorageLocation} augments
+   * {@link BlockLocation} with a {@link VolumeId} per block replica. The
+   * VolumeId specifies the volume on the datanode on which the replica resides.
+   * The VolumeId has to be checked via {@link VolumeId#isValid()} before being
+   * used because volume information can be unavailable if the corresponding
+   * datanode is down or if the requested block is not found.
+   * 
+   * This API is unstable, and datanode-side support is disabled by default. It
+   * can be enabled by setting "dfs.datanode.hdfs-blocks-metadata.enabled" to
+   * true.
+   * 
+   * @param blocks
+   *          List of target BlockLocations to query volume location information
+   * @return volumeBlockLocations Augmented array of
+   *         {@link BlockStorageLocation}s containing additional volume location
+   *         information for each replica of each block.
+   */
+  @InterfaceStability.Unstable
+  public BlockStorageLocation[] getFileBlockStorageLocations(
+      List<BlockLocation> blocks) throws IOException, 
+      UnsupportedOperationException, InvalidBlockTokenException {
+    return dfs.getBlockStorageLocations(blocks);
+  }
+
   @Override
   public void setVerifyChecksum(boolean verifyChecksum) {
     this.verifyChecksum = verifyChecksum;
@@ -225,19 +257,19 @@ public class DistributedFileSystem exten
   public HdfsDataOutputStream create(Path f, FsPermission permission,
       boolean overwrite, int bufferSize, short replication, long blockSize,
       Progressable progress) throws IOException {
-    return create(f, permission,
+    return this.create(f, permission,
         overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
             : EnumSet.of(CreateFlag.CREATE), bufferSize, replication,
-        blockSize, progress);
+        blockSize, progress, null);
   }
   
   @Override
   public HdfsDataOutputStream create(Path f, FsPermission permission,
     EnumSet<CreateFlag> cflags, int bufferSize, short replication, long blockSize,
-    Progressable progress) throws IOException {
+    Progressable progress, ChecksumOpt checksumOpt) throws IOException {
     statistics.incrementWriteOps(1);
     final DFSOutputStream out = dfs.create(getPathName(f), permission, cflags,
-        replication, blockSize, progress, bufferSize);
+        replication, blockSize, progress, bufferSize, checksumOpt);
     return new HdfsDataOutputStream(out, statistics);
   }
   
@@ -246,11 +278,11 @@ public class DistributedFileSystem exten
   protected HdfsDataOutputStream primitiveCreate(Path f,
     FsPermission absolutePermission, EnumSet<CreateFlag> flag, int bufferSize,
     short replication, long blockSize, Progressable progress,
-    int bytesPerChecksum) throws IOException {
-    statistics.incrementReadOps(1);
+    ChecksumOpt checksumOpt) throws IOException {
+    statistics.incrementWriteOps(1);
     return new HdfsDataOutputStream(dfs.primitiveCreate(getPathName(f),
         absolutePermission, flag, true, replication, blockSize,
-        progress, bufferSize, bytesPerChecksum),statistics);
+        progress, bufferSize, checksumOpt),statistics);
    } 
 
   /**
@@ -265,7 +297,8 @@ public class DistributedFileSystem exten
       flag.add(CreateFlag.CREATE);
     }
     return new HdfsDataOutputStream(dfs.create(getPathName(f), permission, flag,
-        false, replication, blockSize, progress, bufferSize), statistics);
+        false, replication, blockSize, progress, 
+        bufferSize, null), statistics);
   }
 
   @Override
@@ -502,10 +535,10 @@ public class DistributedFileSystem exten
   @Override
   public void close() throws IOException {
     try {
-      super.processDeleteOnExit();
-      dfs.close();
-    } finally {
+      dfs.closeOutputStreams(false);
       super.close();
+    } finally {
+      dfs.close();
     }
   }
 
@@ -591,6 +624,16 @@ public class DistributedFileSystem exten
   public void saveNamespace() throws AccessControlException, IOException {
     dfs.saveNamespace();
   }
+  
+  /**
+   * Rolls the edit log on the active NameNode.
+   * Requires super-user privileges.
+   * @see org.apache.hadoop.hdfs.protocol.ClientProtocol#rollEdits()
+   * @return the transaction ID of the newly created segment
+   */
+  public long rollEdits() throws AccessControlException, IOException {
+    return dfs.rollEdits();
+  }
 
   /**
    * enable/disable/check restoreFaileStorage
@@ -619,11 +662,6 @@ public class DistributedFileSystem exten
     dfs.finalizeUpgrade();
   }
 
-  public UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action
-  ) throws IOException {
-    return dfs.distributedUpgradeProgress(action);
-  }
-
   /*
    * Requests the namenode to dump data strcutures into specified 
    * file.
@@ -765,14 +803,6 @@ public class DistributedFileSystem exten
     return getDelegationToken(renewer.toString());
   }
   
-  @Override // FileSystem
-  public List<Token<?>> getDelegationTokens(String renewer) throws IOException {
-    List<Token<?>> tokenList = new ArrayList<Token<?>>();
-    Token<DelegationTokenIdentifier> token = this.getDelegationToken(renewer);
-    tokenList.add(token);
-    return tokenList;
-  }
-
   /**
    * Renew an existing delegation token.
    * 

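A hedged sketch of calling the new getFileBlockStorageLocations() API above, assuming fs.defaultFS points at an HDFS cluster and that datanode-side support has been enabled; the file path is hypothetical, and the getVolumeIds()/getFileBlockLocations() calls are assumed from the surrounding FileSystem API rather than shown in this diff.

    import java.util.Arrays;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.BlockLocation;
    import org.apache.hadoop.fs.BlockStorageLocation;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.VolumeId;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    public class VolumeLocationSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Datanode-side support is off by default, as the javadoc above notes.
        conf.setBoolean("dfs.datanode.hdfs-blocks-metadata.enabled", true);

        Path file = new Path("/tmp/example.txt");             // hypothetical file
        DistributedFileSystem dfs =
            (DistributedFileSystem) file.getFileSystem(conf); // assumes HDFS default FS

        List<BlockLocation> blocks = Arrays.asList(
            dfs.getFileBlockLocations(dfs.getFileStatus(file), 0, Long.MAX_VALUE));
        BlockStorageLocation[] augmented = dfs.getFileBlockStorageLocations(blocks);

        for (BlockStorageLocation loc : augmented) {
          for (VolumeId id : loc.getVolumeIds()) {
            // A VolumeId may be invalid when the datanode was unreachable or
            // the block was not found, so check it before use.
            System.out.println(id.isValid() ? id.toString() : "<unknown volume>");
          }
        }
      }
    }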
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java Fri Oct 19 02:25:55 2012
@@ -23,6 +23,7 @@ import org.apache.hadoop.ha.HAServicePro
 import org.apache.hadoop.ha.ZKFCProtocol;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
@@ -46,6 +47,7 @@ public class HDFSPolicyProvider extends 
     new Service("security.inter.datanode.protocol.acl", 
                 InterDatanodeProtocol.class),
     new Service("security.namenode.protocol.acl", NamenodeProtocol.class),
+    new Service("security.qjournal.service.protocol.acl", QJournalProtocol.class),
     new Service(CommonConfigurationKeys.SECURITY_HA_SERVICE_PROTOCOL_ACL,
         HAServiceProtocol.class),
     new Service(CommonConfigurationKeys.SECURITY_ZKFC_PROTOCOL_ACL,

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java Fri Oct 19 02:25:55 2012
@@ -105,4 +105,9 @@ public class HdfsConfiguration extends C
     deprecate("dfs.federation.nameservices", DFSConfigKeys.DFS_NAMESERVICES);
     deprecate("dfs.federation.nameservice.id", DFSConfigKeys.DFS_NAMESERVICE_ID);
   }
+
+  public static void main(String[] args) {
+    init();
+    Configuration.dumpDeprecatedKeys();
+  }
 }

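A small sketch of the key-deprecation mapping registered above: once HdfsConfiguration has been loaded, the old federation key and the new dfs.nameservices key resolve to the same value. The nameservice names are made up for the example.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class DeprecationSketch {
      public static void main(String[] args) {
        Configuration conf = new HdfsConfiguration();
        conf.set("dfs.federation.nameservices", "ns1,ns2");  // deprecated spelling
        // The deprecation table maps the old key onto dfs.nameservices,
        // so both spellings now yield the same value.
        System.out.println(conf.get("dfs.nameservices"));    // prints ns1,ns2
      }
    }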
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java Fri Oct 19 02:25:55 2012
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.net.ConnectException;
 import java.net.HttpURLConnection;
 import java.net.InetSocketAddress;
 import java.net.URI;
@@ -113,6 +114,7 @@ public class HftpFileSystem extends File
 
   protected static final ThreadLocal<SimpleDateFormat> df =
     new ThreadLocal<SimpleDateFormat>() {
+    @Override
     protected SimpleDateFormat initialValue() {
       return getDateFormat();
     }
@@ -240,19 +242,22 @@ public class HftpFileSystem extends File
       //Renew TGT if needed
       ugi.reloginFromKeytab();
       return ugi.doAs(new PrivilegedExceptionAction<Token<?>>() {
+        @Override
         public Token<?> run() throws IOException {
           final String nnHttpUrl = nnSecureUri.toString();
           Credentials c;
           try {
             c = DelegationTokenFetcher.getDTfromRemote(nnHttpUrl, renewer);
-          } catch (Exception e) {
-            LOG.info("Couldn't get a delegation token from " + nnHttpUrl + 
-            " using http.");
-            if(LOG.isDebugEnabled()) {
-              LOG.debug("error was ", e);
+          } catch (IOException e) {
+            if (e.getCause() instanceof ConnectException) {
+              LOG.warn("Couldn't connect to " + nnHttpUrl +
+                  ", assuming security is disabled");
+              return null;
             }
-            //Maybe the server is in unsecure mode (that's bad but okay)
-            return null;
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Exception getting delegation token", e);
+            }
+            throw e;
           }
           for (Token<? extends TokenIdentifier> t : c.getAllTokens()) {
             if(LOG.isDebugEnabled()) {
@@ -340,19 +345,28 @@ public class HftpFileSystem extends File
       super(url);
     }
 
-    @Override
     protected HttpURLConnection openConnection() throws IOException {
       return (HttpURLConnection)URLUtils.openConnection(url);
     }
 
     /** Use HTTP Range header for specifying offset. */
     @Override
-    protected HttpURLConnection openConnection(final long offset) throws IOException {
+    protected HttpURLConnection connect(final long offset,
+        final boolean resolved) throws IOException {
       final HttpURLConnection conn = openConnection();
       conn.setRequestMethod("GET");
       if (offset != 0L) {
         conn.setRequestProperty("Range", "bytes=" + offset + "-");
       }
+      conn.connect();
+
+      //Expects HTTP_OK or HTTP_PARTIAL response codes. 
+      final int code = conn.getResponseCode();
+      if (offset != 0L && code != HttpURLConnection.HTTP_PARTIAL) {
+        throw new IOException("HTTP_PARTIAL expected, received " + code);
+      } else if (offset == 0L && code != HttpURLConnection.HTTP_OK) {
+        throw new IOException("HTTP_OK expected, received " + code);
+      }
       return conn;
     }  
   }
@@ -366,22 +380,6 @@ public class HftpFileSystem extends File
       this(new RangeHeaderUrlOpener(url), new RangeHeaderUrlOpener(null));
     }
 
-    /** Expects HTTP_OK and HTTP_PARTIAL response codes. */
-    @Override
-    protected void checkResponseCode(final HttpURLConnection connection
-        ) throws IOException {
-      final int code = connection.getResponseCode();
-      if (startPos != 0 && code != HttpURLConnection.HTTP_PARTIAL) {
-        // We asked for a byte range but did not receive a partial content
-        // response...
-        throw new IOException("HTTP_PARTIAL expected, received " + code);
-      } else if (startPos == 0 && code != HttpURLConnection.HTTP_OK) {
-        // We asked for all bytes from the beginning but didn't receive a 200
-        // response (none of the other 2xx codes are valid here)
-        throw new IOException("HTTP_OK expected, received " + code);
-      }
-    }
-
     @Override
     protected URL getResolvedUrl(final HttpURLConnection connection) {
       return connection.getURL();
@@ -402,6 +400,7 @@ public class HftpFileSystem extends File
 
     ArrayList<FileStatus> fslist = new ArrayList<FileStatus>();
 
+    @Override
     public void startElement(String ns, String localname, String qname,
                 Attributes attrs) throws SAXException {
       if ("listing".equals(qname)) return;
@@ -541,6 +540,7 @@ public class HftpFileSystem extends File
   public void setWorkingDirectory(Path f) { }
 
   /** This optional operation is not yet supported. */
+  @Override
   public FSDataOutputStream append(Path f, int bufferSize,
       Progressable progress) throws IOException {
     throw new IOException("Not supported");

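A self-contained sketch of the range-request validation that this patch folds into RangeHeaderUrlOpener.connect(): a non-zero offset sent via the Range header must come back as 206 (partial content), while an offset of zero must come back as 200. The URL below is hypothetical.

    import java.io.IOException;
    import java.net.HttpURLConnection;
    import java.net.URL;

    public class RangeCheckSketch {
      static HttpURLConnection open(URL url, long offset) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");
        if (offset != 0L) {
          conn.setRequestProperty("Range", "bytes=" + offset + "-");
        }
        conn.connect();

        // Mirror of the response-code check added in connect() above.
        int code = conn.getResponseCode();
        if (offset != 0L && code != HttpURLConnection.HTTP_PARTIAL) {
          throw new IOException("HTTP_PARTIAL expected, received " + code);
        } else if (offset == 0L && code != HttpURLConnection.HTTP_OK) {
          throw new IOException("HTTP_OK expected, received " + code);
        }
        return conn;
      }

      public static void main(String[] args) throws IOException {
        // Hypothetical address; running this requires a live HTTP endpoint.
        open(new URL("http://namenode.example.com:50070/data/somefile"), 1024L);
      }
    }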
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HsftpFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HsftpFileSystem.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HsftpFileSystem.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HsftpFileSystem.java Fri Oct 19 02:25:55 2012
@@ -23,7 +23,6 @@ import java.io.IOException;
 import java.net.HttpURLConnection;
 import java.net.InetSocketAddress;
 import java.net.URI;
-import java.net.URISyntaxException;
 import java.net.URL;
 import java.security.KeyStore;
 import java.security.cert.X509Certificate;
@@ -42,6 +41,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.web.URLUtils;
+import org.apache.hadoop.util.Time;
 
 /**
  * An implementation of a protocol for accessing filesystems over HTTPS. The
@@ -164,8 +164,7 @@ public class HsftpFileSystem extends Hft
     final int warnDays = ExpWarnDays;
     if (warnDays > 0) { // make sure only check once
       ExpWarnDays = 0;
-      long expTimeThreshold = warnDays * MM_SECONDS_PER_DAY
-          + System.currentTimeMillis();
+      long expTimeThreshold = warnDays * MM_SECONDS_PER_DAY + Time.now();
       X509Certificate[] clientCerts = (X509Certificate[]) conn
           .getLocalCertificates();
       if (clientCerts != null) {
@@ -175,7 +174,7 @@ public class HsftpFileSystem extends Hft
             StringBuilder sb = new StringBuilder();
             sb.append("\n Client certificate "
                 + cert.getSubjectX500Principal().getName());
-            int dayOffSet = (int) ((expTime - System.currentTimeMillis()) / MM_SECONDS_PER_DAY);
+            int dayOffSet = (int) ((expTime - Time.now()) / MM_SECONDS_PER_DAY);
             sb.append(" have " + dayOffSet + " days to expire");
             LOG.warn(sb.toString());
           }
@@ -189,6 +188,7 @@ public class HsftpFileSystem extends Hft
    * Dummy hostname verifier that is used to bypass hostname checking
    */
   protected static class DummyHostnameVerifier implements HostnameVerifier {
+    @Override
     public boolean verify(String hostname, SSLSession session) {
       return true;
     }
@@ -198,12 +198,15 @@ public class HsftpFileSystem extends Hft
    * Dummy trustmanager that is used to trust all server certificates
    */
   protected static class DummyTrustManager implements X509TrustManager {
+    @Override
     public void checkClientTrusted(X509Certificate[] chain, String authType) {
     }
 
+    @Override
     public void checkServerTrusted(X509Certificate[] chain, String authType) {
     }
 
+    @Override
     public X509Certificate[] getAcceptedIssuers() {
       return null;
     }

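Illustrative only: how the certificate-expiry warning window is computed after the switch to Time.now(). The 30-day warning threshold and the fake certificate expiry are assumptions for the example; System.currentTimeMillis() stands in for Time.now() so the snippet compiles without Hadoop on the classpath.

    public class CertExpirySketch {
      static final long MM_SECONDS_PER_DAY = 24L * 60 * 60 * 1000;

      public static void main(String[] args) {
        int warnDays = 30;                                // hypothetical ExpWarnDays
        long now = System.currentTimeMillis();            // stands in for Time.now()
        long expTimeThreshold = warnDays * MM_SECONDS_PER_DAY + now;

        long certExpiry = now + 10 * MM_SECONDS_PER_DAY;  // fake cert expiring in 10 days
        if (certExpiry < expTimeThreshold) {
          int dayOffSet = (int) ((certExpiry - now) / MM_SECONDS_PER_DAY);
          System.out.println("Client certificate has " + dayOffSet + " days to expire");
        }
      }
    }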

