From: szetszwo@apache.org
To: hdfs-commits@hadoop.apache.org
Reply-To: hdfs-dev@hadoop.apache.org
Subject: svn commit: r1399950 [7/27] - in /hadoop/common/branches/HDFS-2802/hadoop-hdfs-project: ./ hadoop-hdfs-httpfs/ hadoop-hdfs-httpfs/dev-support/ hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/ hadoop-hdfs-httpfs/src/main/java/org/apach...
Date: Fri, 19 Oct 2012 02:28:07 -0000
Message-Id: <20121019022842.25A572388C79@eris.apache.org>

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Fri Oct 19 02:25:55 2012
@@ -31,7 +31,7 @@ import org.apache.hadoop.fs.CommonConfig
 public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_BLOCK_SIZE_KEY = "dfs.blocksize";
-  public static final long DFS_BLOCK_SIZE_DEFAULT = 64*1024*1024;
+  public static final long DFS_BLOCK_SIZE_DEFAULT = 128*1024*1024;
   public static final String DFS_REPLICATION_KEY = "dfs.replication";
   public static final short DFS_REPLICATION_DEFAULT = 3;
   public static final String DFS_STREAM_BUFFER_SIZE_KEY = "dfs.stream-buffer-size";
@@ -52,6 +52,14 @@ public class DFSConfigKeys extends Commo
   public static final String DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_DEFAULT = "DEFAULT";
   public static final String DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY = "dfs.client.socketcache.capacity";
   public static final int DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT = 16;
+  public static final String
DFS_CLIENT_USE_DN_HOSTNAME = "dfs.client.use.datanode.hostname"; + public static final boolean DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT = false; + public static final String DFS_HDFS_BLOCKS_METADATA_ENABLED = "dfs.datanode.hdfs-blocks-metadata.enabled"; + public static final boolean DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT = false; + public static final String DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS = "dfs.client.file-block-storage-locations.num-threads"; + public static final int DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS_DEFAULT = 10; + public static final String DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT = "dfs.client.file-block-storage-locations.timeout"; + public static final int DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_DEFAULT = 60; // HA related configuration public static final String DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX = "dfs.client.failover.proxy.provider"; @@ -66,6 +74,8 @@ public class DFSConfigKeys extends Commo public static final String DFS_CLIENT_FAILOVER_CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_KEY = "dfs.client.failover.connection.retries.on.timeouts"; public static final int DFS_CLIENT_FAILOVER_CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT = 0; + public static final String DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY = "dfs.client.socketcache.expiryMsec"; + public static final long DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT = 2 * 60 * 1000; public static final String DFS_NAMENODE_BACKUP_ADDRESS_KEY = "dfs.namenode.backup.address"; public static final String DFS_NAMENODE_BACKUP_ADDRESS_DEFAULT = "localhost:50100"; public static final String DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY = "dfs.namenode.backup.http-address"; @@ -74,13 +84,15 @@ public class DFSConfigKeys extends Commo public static final String DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY = "dfs.datanode.balance.bandwidthPerSec"; public static final long DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_DEFAULT = 1024*1024; public static final String DFS_DATANODE_READAHEAD_BYTES_KEY = "dfs.datanode.readahead.bytes"; - public static final long DFS_DATANODE_READAHEAD_BYTES_DEFAULT = 0; + public static final long DFS_DATANODE_READAHEAD_BYTES_DEFAULT = 4 * 1024 * 1024; // 4MB public static final String DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_KEY = "dfs.datanode.drop.cache.behind.writes"; public static final boolean DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_DEFAULT = false; public static final String DFS_DATANODE_SYNC_BEHIND_WRITES_KEY = "dfs.datanode.sync.behind.writes"; public static final boolean DFS_DATANODE_SYNC_BEHIND_WRITES_DEFAULT = false; public static final String DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY = "dfs.datanode.drop.cache.behind.reads"; public static final boolean DFS_DATANODE_DROP_CACHE_BEHIND_READS_DEFAULT = false; + public static final String DFS_DATANODE_USE_DN_HOSTNAME = "dfs.datanode.use.datanode.hostname"; + public static final boolean DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT = false; public static final String DFS_NAMENODE_HTTP_PORT_KEY = "dfs.http.port"; public static final int DFS_NAMENODE_HTTP_PORT_DEFAULT = 50070; @@ -150,6 +162,8 @@ public class DFSConfigKeys extends Commo public static final int DFS_NAMENODE_NUM_CHECKPOINTS_RETAINED_DEFAULT = 2; public static final String DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_KEY = "dfs.namenode.num.extra.edits.retained"; public static final int DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_DEFAULT = 1000000; //1M + public static final String DFS_NAMENODE_MAX_EXTRA_EDITS_SEGMENTS_RETAINED_KEY = "dfs.namenode.max.extra.edits.segments.retained"; + public static 
final int DFS_NAMENODE_MAX_EXTRA_EDITS_SEGMENTS_RETAINED_DEFAULT = 10000; // 10k
  public static final String DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_KEY = "dfs.namenode.min.supported.datanode.version";
  public static final String DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_DEFAULT = "3.0.0-SNAPSHOT";
@@ -164,11 +178,30 @@ public class DFSConfigKeys extends Commo
   public static final boolean DFS_DATANODE_SYNCONCLOSE_DEFAULT = false;
   public static final String DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY = "dfs.datanode.socket.reuse.keepalive";
   public static final int DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT = 1000;
+
+  // Whether to enable stale-state detection for datanodes
+  public static final String DFS_NAMENODE_CHECK_STALE_DATANODE_KEY = "dfs.namenode.check.stale.datanode";
+  public static final boolean DFS_NAMENODE_CHECK_STALE_DATANODE_DEFAULT = false;
+  // Whether to avoid writing to datanodes that are marked stale
+  public static final String DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY = "dfs.namenode.avoid.write.stale.datanode";
+  public static final boolean DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT = false;
+  // The default time interval after which a datanode is marked stale
+  public static final String DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY = "dfs.namenode.stale.datanode.interval";
+  public static final long DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT = 30 * 1000; // 30s
+  // The stale interval cannot be too small, since otherwise it may cause too-frequent churn in stale state.
+  // This value is a multiple of the heartbeat interval and defines the minimum allowed stale interval.
+  public static final String DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_KEY = "dfs.namenode.stale.datanode.minimum.interval";
+  public static final int DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_DEFAULT = 3; // i.e. min_interval is 3 * heartbeat_interval = 9s
+
+  // When the ratio of datanodes marked stale reaches this threshold,
+  // stop avoiding writes to stale nodes so as to prevent creating hotspots.
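For context, a minimal sketch of how a deployment might wire these stale-datanode keys together, including the write-ratio key declared just below. The class name is hypothetical, and in practice the values belong in the NameNode's hdfs-site.xml rather than in code; the settings shown simply mirror the defaults above.

import org.apache.hadoop.conf.Configuration;

public class StaleDatanodeConfigSketch {
  public static Configuration staleAwareConf() {
    Configuration conf = new Configuration();
    // Track datanodes that have missed heartbeats as "stale" (off by default).
    conf.setBoolean("dfs.namenode.check.stale.datanode", true);
    // Prefer non-stale datanodes when choosing write targets.
    conf.setBoolean("dfs.namenode.avoid.write.stale.datanode", true);
    // A datanode is considered stale after 30 seconds without a heartbeat.
    conf.setLong("dfs.namenode.stale.datanode.interval", 30 * 1000L);
    // Stop avoiding stale nodes once half of the cluster is marked stale.
    conf.setFloat("dfs.namenode.write.stale.datanode.ratio", 0.5f);
    return conf;
  }
}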
+ public static final String DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY = "dfs.namenode.write.stale.datanode.ratio"; + public static final float DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_DEFAULT = 0.5f; // Replication monitoring related keys public static final String DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION = "dfs.namenode.invalidate.work.pct.per.iteration"; - public static final int DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION_DEFAULT = 32; + public static final float DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION_DEFAULT = 0.32f; public static final String DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION = "dfs.namenode.replication.work.multiplier.per.iteration"; public static final int DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION_DEFAULT = 2; @@ -203,6 +236,7 @@ public class DFSConfigKeys extends Commo public static final String DFS_CLIENT_READ_PREFETCH_SIZE_KEY = "dfs.client.read.prefetch.size"; public static final String DFS_CLIENT_RETRY_WINDOW_BASE= "dfs.client.retry.window.base"; public static final String DFS_METRICS_SESSION_ID_KEY = "dfs.metrics.session-id"; + public static final String DFS_METRICS_PERCENTILES_INTERVALS_KEY = "dfs.metrics.percentiles.intervals"; public static final String DFS_DATANODE_HOST_NAME_KEY = "dfs.datanode.hostname"; public static final String DFS_NAMENODE_HOSTS_KEY = "dfs.namenode.hosts"; public static final String DFS_NAMENODE_HOSTS_EXCLUDE_KEY = "dfs.namenode.hosts.exclude"; @@ -240,7 +274,7 @@ public class DFSConfigKeys extends Commo public static final String DFS_DATANODE_DU_RESERVED_KEY = "dfs.datanode.du.reserved"; public static final long DFS_DATANODE_DU_RESERVED_DEFAULT = 0; public static final String DFS_DATANODE_HANDLER_COUNT_KEY = "dfs.datanode.handler.count"; - public static final int DFS_DATANODE_HANDLER_COUNT_DEFAULT = 3; + public static final int DFS_DATANODE_HANDLER_COUNT_DEFAULT = 10; public static final String DFS_DATANODE_HTTP_ADDRESS_KEY = "dfs.datanode.http.address"; public static final int DFS_DATANODE_HTTP_DEFAULT_PORT = 50075; public static final String DFS_DATANODE_HTTP_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_DATANODE_HTTP_DEFAULT_PORT; @@ -318,6 +352,10 @@ public class DFSConfigKeys extends Commo "dfs.image.transfer.bandwidthPerSec"; public static final long DFS_IMAGE_TRANSFER_RATE_DEFAULT = 0; //no throttling + // Image transfer timeout + public static final String DFS_IMAGE_TRANSFER_TIMEOUT_KEY = "dfs.image.transfer.timeout"; + public static final int DFS_IMAGE_TRANSFER_TIMEOUT_DEFAULT = 60 * 1000; + //Keys with no defaults public static final String DFS_DATANODE_PLUGINS_KEY = "dfs.datanode.plugins"; public static final String DFS_DATANODE_FSDATASET_FACTORY_KEY = "dfs.datanode.fsdataset.factory"; @@ -366,4 +404,47 @@ public class DFSConfigKeys extends Commo public static final boolean DFS_HA_AUTO_FAILOVER_ENABLED_DEFAULT = false; public static final String DFS_HA_ZKFC_PORT_KEY = "dfs.ha.zkfc.port"; public static final int DFS_HA_ZKFC_PORT_DEFAULT = 8019; + + // Security-related configs + public static final String DFS_ENCRYPT_DATA_TRANSFER_KEY = "dfs.encrypt.data.transfer"; + public static final boolean DFS_ENCRYPT_DATA_TRANSFER_DEFAULT = false; + public static final String DFS_DATA_ENCRYPTION_ALGORITHM_KEY = "dfs.encrypt.data.transfer.algorithm"; + + // Journal-node related configs. These are read on the JN side. 
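Before the declarations that follow, a brief sketch of how a quorum-journal deployment might override the journal-node keys declared below. The directory and the class name are illustrative assumptions; real clusters would set these in hdfs-site.xml on the JournalNodes.

import org.apache.hadoop.conf.Configuration;

public class JournalNodeConfigSketch {
  public static Configuration journalNodeConf() {
    Configuration conf = new Configuration();
    // Keep edits somewhere more durable than the /tmp default below.
    conf.set("dfs.journalnode.edits.dir", "/data/1/dfs/journalnode");
    // Bind the JournalNode RPC and HTTP endpoints explicitly.
    conf.set("dfs.journalnode.rpc-address", "0.0.0.0:8485");
    conf.set("dfs.journalnode.http-address", "0.0.0.0:8480");
    // Client-side quorum write timeout; 20 seconds is the default declared below.
    conf.setInt("dfs.qjournal.write-txns.timeout.ms", 20000);
    return conf;
  }
}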
+ public static final String DFS_JOURNALNODE_EDITS_DIR_KEY = "dfs.journalnode.edits.dir"; + public static final String DFS_JOURNALNODE_EDITS_DIR_DEFAULT = "/tmp/hadoop/dfs/journalnode/"; + public static final String DFS_JOURNALNODE_RPC_ADDRESS_KEY = "dfs.journalnode.rpc-address"; + public static final int DFS_JOURNALNODE_RPC_PORT_DEFAULT = 8485; + public static final String DFS_JOURNALNODE_RPC_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_JOURNALNODE_RPC_PORT_DEFAULT; + + public static final String DFS_JOURNALNODE_HTTP_ADDRESS_KEY = "dfs.journalnode.http-address"; + public static final int DFS_JOURNALNODE_HTTP_PORT_DEFAULT = 8480; + public static final String DFS_JOURNALNODE_HTTP_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_JOURNALNODE_HTTP_PORT_DEFAULT; + + public static final String DFS_JOURNALNODE_KEYTAB_FILE_KEY = "dfs.journalnode.keytab.file"; + public static final String DFS_JOURNALNODE_USER_NAME_KEY = "dfs.journalnode.kerberos.principal"; + public static final String DFS_JOURNALNODE_INTERNAL_SPNEGO_USER_NAME_KEY = "dfs.journalnode.kerberos.internal.spnego.principal"; + + // Journal-node related configs for the client side. + public static final String DFS_QJOURNAL_QUEUE_SIZE_LIMIT_KEY = "dfs.qjournal.queued-edits.limit.mb"; + public static final int DFS_QJOURNAL_QUEUE_SIZE_LIMIT_DEFAULT = 10; + + // Quorum-journal timeouts for various operations. Unlikely to need + // to be tweaked, but configurable just in case. + public static final String DFS_QJOURNAL_START_SEGMENT_TIMEOUT_KEY = "dfs.qjournal.start-segment.timeout.ms"; + public static final String DFS_QJOURNAL_PREPARE_RECOVERY_TIMEOUT_KEY = "dfs.qjournal.prepare-recovery.timeout.ms"; + public static final String DFS_QJOURNAL_ACCEPT_RECOVERY_TIMEOUT_KEY = "dfs.qjournal.accept-recovery.timeout.ms"; + public static final String DFS_QJOURNAL_FINALIZE_SEGMENT_TIMEOUT_KEY = "dfs.qjournal.finalize-segment.timeout.ms"; + public static final String DFS_QJOURNAL_SELECT_INPUT_STREAMS_TIMEOUT_KEY = "dfs.qjournal.select-input-streams.timeout.ms"; + public static final String DFS_QJOURNAL_GET_JOURNAL_STATE_TIMEOUT_KEY = "dfs.qjournal.get-journal-state.timeout.ms"; + public static final String DFS_QJOURNAL_NEW_EPOCH_TIMEOUT_KEY = "dfs.qjournal.new-epoch.timeout.ms"; + public static final String DFS_QJOURNAL_WRITE_TXNS_TIMEOUT_KEY = "dfs.qjournal.write-txns.timeout.ms"; + public static final int DFS_QJOURNAL_START_SEGMENT_TIMEOUT_DEFAULT = 20000; + public static final int DFS_QJOURNAL_PREPARE_RECOVERY_TIMEOUT_DEFAULT = 120000; + public static final int DFS_QJOURNAL_ACCEPT_RECOVERY_TIMEOUT_DEFAULT = 120000; + public static final int DFS_QJOURNAL_FINALIZE_SEGMENT_TIMEOUT_DEFAULT = 120000; + public static final int DFS_QJOURNAL_SELECT_INPUT_STREAMS_TIMEOUT_DEFAULT = 20000; + public static final int DFS_QJOURNAL_GET_JOURNAL_STATE_TIMEOUT_DEFAULT = 120000; + public static final int DFS_QJOURNAL_NEW_EPOCH_TIMEOUT_DEFAULT = 120000; + public static final int DFS_QJOURNAL_WRITE_TXNS_TIMEOUT_DEFAULT = 20000; } Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1399950&r1=1399949&r2=1399950&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java (original) +++ 
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java Fri Oct 19 02:25:55 2012 @@ -37,11 +37,14 @@ import org.apache.hadoop.fs.ChecksumExce import org.apache.hadoop.fs.ByteBufferReadable; import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.UnresolvedLinkException; +import org.apache.hadoop.hdfs.SocketCache.SocketAndStreams; import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; +import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException; @@ -196,7 +199,8 @@ public class DFSInputStream extends FSIn try { cdp = DFSUtil.createClientDatanodeProtocolProxy( - datanode, dfsClient.conf, dfsClient.getConf().socketTimeout, locatedblock); + datanode, dfsClient.conf, dfsClient.getConf().socketTimeout, + dfsClient.getConf().connectToDnViaHostname, locatedblock); final long n = cdp.getReplicaVisibleLength(locatedblock.getBlock()); @@ -239,6 +243,10 @@ public class DFSInputStream extends FSIn locatedBlocks.getFileLength() + lastBlockBeingWrittenLength; } + private synchronized boolean blockUnderConstruction() { + return locatedBlocks.isUnderConstruction(); + } + /** * Returns the datanode from which the stream is currently reading. */ @@ -425,6 +433,7 @@ public class DFSInputStream extends FSIn // DatanodeInfo chosenNode = null; int refetchToken = 1; // only need to get a new access token once + int refetchEncryptionKey = 1; // only need to get a new encryption key once boolean connectFailedOnce = false; @@ -452,7 +461,14 @@ public class DFSInputStream extends FSIn } return chosenNode; } catch (IOException ex) { - if (ex instanceof InvalidBlockTokenException && refetchToken > 0) { + if (ex instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) { + DFSClient.LOG.info("Will fetch a new encryption key and retry, " + + "encryption key was invalid when connecting to " + targetAddr + + " : " + ex); + // The encryption key used is invalid. 
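This retry path is only reachable when encrypted data transfer is enabled; the refetch bookkeeping continues just below. As a hedged sketch, the cluster-side switches involved are the two keys added to DFSConfigKeys above (the cipher value is only an assumed example, and the class name is hypothetical):

import org.apache.hadoop.conf.Configuration;

public class EncryptedTransferConfigSketch {
  public static Configuration encryptedTransferConf() {
    Configuration conf = new Configuration();
    // Encrypt block data on the wire between clients and datanodes
    // (dfs.encrypt.data.transfer defaults to false).
    conf.setBoolean("dfs.encrypt.data.transfer", true);
    // Optional cipher selection; "rc4" is an assumed example value.
    conf.set("dfs.encrypt.data.transfer.algorithm", "rc4");
    return conf;
  }
}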
+ refetchEncryptionKey--; + dfsClient.clearDataEncryptionKey(); + } else if (ex instanceof InvalidBlockTokenException && refetchToken > 0) { DFSClient.LOG.info("Will fetch a new access token and retry, " + "access token was invalid when connecting to " + targetAddr + " : " + ex); @@ -705,8 +721,12 @@ public class DFSInputStream extends FSIn DatanodeInfo[] nodes = block.getLocations(); try { DatanodeInfo chosenNode = bestNode(nodes, deadNodes); - InetSocketAddress targetAddr = - NetUtils.createSocketAddr(chosenNode.getXferAddr()); + final String dnAddr = + chosenNode.getXferAddr(dfsClient.connectToDnViaHostname()); + if (DFSClient.LOG.isDebugEnabled()) { + DFSClient.LOG.debug("Connecting to datanode " + dnAddr); + } + InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr); return new DNAddrPair(chosenNode, targetAddr); } catch (IOException ie) { String blockInfo = block.getBlock() + " file=" + src; @@ -754,6 +774,7 @@ public class DFSInputStream extends FSIn // Connect to best DataNode for desired Block, with potential offset // int refetchToken = 1; // only need to get a new access token once + int refetchEncryptionKey = 1; // only need to get a new encryption key once while (true) { // cached block locations may have been updated by chooseDataNode() @@ -789,7 +810,14 @@ public class DFSInputStream extends FSIn dfsClient.disableShortCircuit(); continue; } catch (IOException e) { - if (e instanceof InvalidBlockTokenException && refetchToken > 0) { + if (e instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) { + DFSClient.LOG.info("Will fetch a new encryption key and retry, " + + "encryption key was invalid when connecting to " + targetAddr + + " : " + e); + // The encryption key used is invalid. + refetchEncryptionKey--; + dfsClient.clearDataEncryptionKey(); + } else if (e instanceof InvalidBlockTokenException && refetchToken > 0) { DFSClient.LOG.info("Will get a new access token and retry, " + "access token was invalid when connecting to " + targetAddr + " : " + e); @@ -818,8 +846,9 @@ public class DFSInputStream extends FSIn */ private void closeBlockReader(BlockReader reader) throws IOException { if (reader.hasSentStatusCode()) { + IOStreamPair ioStreams = reader.getStreams(); Socket oldSock = reader.takeSocket(); - socketCache.put(oldSock); + socketCache.put(oldSock, ioStreams); } reader.close(); } @@ -853,9 +882,12 @@ public class DFSInputStream extends FSIn String clientName) throws IOException { - if (dfsClient.shouldTryShortCircuitRead(dnAddr)) { + // Can't local read a block under construction, see HDFS-2757 + if (dfsClient.shouldTryShortCircuitRead(dnAddr) && + !blockUnderConstruction()) { return DFSClient.getLocalBlockReader(dfsClient.conf, src, block, - blockToken, chosenNode, dfsClient.hdfsTimeout, startOffset); + blockToken, chosenNode, dfsClient.hdfsTimeout, startOffset, + dfsClient.connectToDnViaHostname()); } IOException err = null; @@ -864,14 +896,15 @@ public class DFSInputStream extends FSIn // Allow retry since there is no way of knowing whether the cached socket // is good until we actually use it. for (int retries = 0; retries <= nCachedConnRetry && fromCache; ++retries) { - Socket sock = null; + SocketAndStreams sockAndStreams = null; // Don't use the cache on the last attempt - it's possible that there // are arbitrarily many unusable sockets in the cache, but we don't // want to fail the read. 
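The cache lookup that follows is bounded by the client socket-cache keys introduced in DFSConfigKeys (capacity and expiry). A small sketch of tuning them, with illustrative values and a hypothetical class name; the exact eviction behaviour depends on the SocketCache implementation, which is not shown in this diff:

import org.apache.hadoop.conf.Configuration;

public class SocketCacheConfigSketch {
  public static Configuration socketCacheConf() {
    Configuration conf = new Configuration();
    // Keep at most 32 idle datanode connections (the default is 16).
    conf.setInt("dfs.client.socketcache.capacity", 32);
    // Expire cached sockets after two minutes, matching the default above.
    conf.setLong("dfs.client.socketcache.expiryMsec", 2 * 60 * 1000L);
    return conf;
  }
}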
if (retries < nCachedConnRetry) { - sock = socketCache.get(dnAddr); + sockAndStreams = socketCache.get(dnAddr); } - if (sock == null) { + Socket sock; + if (sockAndStreams == null) { fromCache = false; sock = dfsClient.socketFactory.createSocket(); @@ -895,6 +928,8 @@ public class DFSInputStream extends FSIn dfsClient.getRandomLocalInterfaceAddr(), dfsClient.getConf().socketTimeout); sock.setSoTimeout(dfsClient.getConf().socketTimeout); + } else { + sock = sockAndStreams.sock; } try { @@ -905,12 +940,18 @@ public class DFSInputStream extends FSIn blockToken, startOffset, len, bufferSize, verifyChecksum, - clientName); + clientName, + dfsClient.getDataEncryptionKey(), + sockAndStreams == null ? null : sockAndStreams.ioStreams); return reader; } catch (IOException ex) { // Our socket is no good. DFSClient.LOG.debug("Error making BlockReader. Closing stale " + sock, ex); - sock.close(); + if (sockAndStreams != null) { + sockAndStreams.close(); + } else { + sock.close(); + } err = ex; } } @@ -1154,7 +1195,7 @@ public class DFSInputStream extends FSIn throw new IOException("No live nodes contain current block"); } - /** Utility class to encapsulate data node info and its ip address. */ + /** Utility class to encapsulate data node info and its address. */ static class DNAddrPair { DatanodeInfo info; InetSocketAddress addr; Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1399950&r1=1399949&r2=1399950&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java Fri Oct 19 02:25:55 2012 @@ -24,11 +24,12 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.FileNotFoundException; import java.io.IOException; +import java.io.InputStream; import java.io.InterruptedIOException; +import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.Socket; import java.nio.BufferOverflowException; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.EnumSet; @@ -55,7 +56,10 @@ import org.apache.hadoop.hdfs.protocol.L import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException; import org.apache.hadoop.hdfs.protocol.UnresolvedPathException; import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; +import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol; +import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; +import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; @@ -74,6 +78,9 @@ import org.apache.hadoop.security.token. 
import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.Progressable; +import org.apache.hadoop.util.Time; + +import com.google.common.annotations.VisibleForTesting; /**************************************************************** @@ -100,8 +107,8 @@ import org.apache.hadoop.util.Progressab ****************************************************************/ @InterfaceAudience.Private public class DFSOutputStream extends FSOutputSummer implements Syncable { - private final DFSClient dfsClient; private static final int MAX_PACKETS = 80; // each packet 64K, total 5MB + private final DFSClient dfsClient; private Socket s; // closed is accessed by different threads under different locks. private volatile boolean closed = false; @@ -118,7 +125,7 @@ public class DFSOutputStream extends FSO private long lastQueuedSeqno = -1; private long lastAckedSeqno = -1; private long bytesCurBlock = 0; // bytes writen in current block - private int packetSize = 0; // write packet size, including the header. + private int packetSize = 0; // write packet size, not including the header. private int chunksPerPacket = 0; private volatile IOException lastException = null; private long artificialSlowdown = 0; @@ -131,80 +138,75 @@ public class DFSOutputStream extends FSO private final short blockReplication; // replication factor of file private boolean shouldSyncBlock = false; // force blocks to disk upon close - private class Packet { - long seqno; // sequencenumber of buffer in block - long offsetInBlock; // offset in block - private boolean lastPacketInBlock; // is this the last packet in block? - boolean syncBlock; // this packet forces the current block to disk - int numChunks; // number of chunks currently in packet - int maxChunks; // max chunks in packet - - /** buffer for accumulating packet checksum and data */ - ByteBuffer buffer; // wraps buf, only one of these two may be non-null + private static class Packet { + private static final long HEART_BEAT_SEQNO = -1L; + long seqno; // sequencenumber of buffer in block + final long offsetInBlock; // offset in block + boolean syncBlock; // this packet forces the current block to disk + int numChunks; // number of chunks currently in packet + final int maxChunks; // max chunks in packet byte[] buf; + private boolean lastPacketInBlock; // is this the last packet in block? /** * buf is pointed into like follows: * (C is checksum data, D is payload data) * - * [HHHHHCCCCC________________DDDDDDDDDDDDDDDD___] - * ^ ^ ^ ^ - * | checksumPos dataStart dataPos - * checksumStart + * [_________CCCCCCCCC________________DDDDDDDDDDDDDDDD___] + * ^ ^ ^ ^ + * | checksumPos dataStart dataPos + * checksumStart + * + * Right before sending, we move the checksum data to immediately precede + * the actual data, and then insert the header into the buffer immediately + * preceding the checksum data, so we make sure to keep enough space in + * front of the checksum data to support the largest conceivable header. */ int checksumStart; - int dataStart; - int dataPos; int checksumPos; - - private static final long HEART_BEAT_SEQNO = -1L; + final int dataStart; + int dataPos; /** - * create a heartbeat packet + * Create a heartbeat packet. 
*/ - Packet() { - this.lastPacketInBlock = false; - this.numChunks = 0; - this.offsetInBlock = 0; - this.seqno = HEART_BEAT_SEQNO; - - buffer = null; - int packetSize = PacketHeader.PKT_HEADER_LEN + HdfsConstants.BYTES_IN_INTEGER; - buf = new byte[packetSize]; - - checksumStart = dataStart = packetSize; - checksumPos = checksumStart; - dataPos = dataStart; - maxChunks = 0; + Packet(int checksumSize) { + this(0, 0, 0, HEART_BEAT_SEQNO, checksumSize); } - // create a new packet - Packet(int pktSize, int chunksPerPkt, long offsetInBlock) { + /** + * Create a new packet. + * + * @param pktSize maximum size of the packet, + * including checksum data and actual data. + * @param chunksPerPkt maximum number of chunks per packet. + * @param offsetInBlock offset in bytes into the HDFS block. + */ + Packet(int pktSize, int chunksPerPkt, long offsetInBlock, + long seqno, int checksumSize) { this.lastPacketInBlock = false; this.numChunks = 0; this.offsetInBlock = offsetInBlock; - this.seqno = currentSeqno; - currentSeqno++; + this.seqno = seqno; - buffer = null; - buf = new byte[pktSize]; + buf = new byte[PacketHeader.PKT_MAX_HEADER_LEN + pktSize]; - checksumStart = PacketHeader.PKT_HEADER_LEN; + checksumStart = PacketHeader.PKT_MAX_HEADER_LEN; checksumPos = checksumStart; - dataStart = checksumStart + chunksPerPkt * checksum.getChecksumSize(); + dataStart = checksumStart + (chunksPerPkt * checksumSize); dataPos = dataStart; maxChunks = chunksPerPkt; } void writeData(byte[] inarray, int off, int len) { - if ( dataPos + len > buf.length) { + if (dataPos + len > buf.length) { throw new BufferOverflowException(); } System.arraycopy(inarray, off, buf, dataPos, len); dataPos += len; } - void writeChecksum(byte[] inarray, int off, int len) { + void writeChecksum(byte[] inarray, int off, int len) { if (checksumPos + len > dataStart) { throw new BufferOverflowException(); } @@ -213,45 +215,38 @@ public class DFSOutputStream extends FSO } /** - * Returns ByteBuffer that contains one full packet, including header. + * Write the full packet, including the header, to the given output stream. */ - ByteBuffer getBuffer() { - /* Once this is called, no more data can be added to the packet. - * setting 'buf' to null ensures that. - * This is called only when the packet is ready to be sent. - */ - if (buffer != null) { - return buffer; - } - - //prepare the header and close any gap between checksum and data. - - int dataLen = dataPos - dataStart; - int checksumLen = checksumPos - checksumStart; + void writeTo(DataOutputStream stm) throws IOException { + final int dataLen = dataPos - dataStart; + final int checksumLen = checksumPos - checksumStart; + final int pktLen = HdfsConstants.BYTES_IN_INTEGER + dataLen + checksumLen; + + PacketHeader header = new PacketHeader( + pktLen, offsetInBlock, seqno, lastPacketInBlock, dataLen, syncBlock); if (checksumPos != dataStart) { - /* move the checksum to cover the gap. - * This can happen for the last packet. - */ + // Move the checksum to cover the gap. This can happen for the last + // packet or during an hflush/hsync call. 
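To make the reserved-header layout concrete, here is a small worked sketch of the offset arithmetic from the constructor and writeTo(); the arraycopy just below is the step that closes the checksum gap. The sizes are assumptions for illustration only (PKT_MAX_HEADER_LEN and the serialized header length come from the protobuf header and are not given in this diff):

public class PacketLayoutSketch {
  public static void main(String[] args) {
    int pktMaxHeaderLen = 33;     // stand-in for PacketHeader.PKT_MAX_HEADER_LEN
    int chunksPerPkt = 127;       // chunks that fit in one packet
    int checksumSize = 4;         // e.g. CRC32 per 512-byte chunk
    int serializedHeaderLen = 25; // actual header size at send time (assumed)

    // Layout established in the constructor: checksums first, then data.
    int checksumStart = pktMaxHeaderLen;
    int dataStart = checksumStart + chunksPerPkt * checksumSize;

    // At send time writeTo() slides the checksums up against the data and
    // drops the header into the slack reserved in front of them.
    int checksumLen = chunksPerPkt * checksumSize; // assume a full packet
    int newChecksumStart = dataStart - checksumLen;
    int headerStart = newChecksumStart - serializedHeaderLen;

    System.out.println("headerStart=" + headerStart
        + ", checksumStart=" + newChecksumStart + ", dataStart=" + dataStart);
  }
}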
System.arraycopy(buf, checksumStart, buf, dataStart - checksumLen , checksumLen); + checksumPos = dataStart; + checksumStart = checksumPos - checksumLen; } - int pktLen = HdfsConstants.BYTES_IN_INTEGER + dataLen + checksumLen; + final int headerStart = checksumStart - header.getSerializedSize(); + assert checksumStart + 1 >= header.getSerializedSize(); + assert checksumPos == dataStart; + assert headerStart >= 0; + assert headerStart + header.getSerializedSize() == checksumStart; - //normally dataStart == checksumPos, i.e., offset is zero. - buffer = ByteBuffer.wrap( - buf, dataStart - checksumPos, - PacketHeader.PKT_HEADER_LEN + pktLen - HdfsConstants.BYTES_IN_INTEGER); - buf = null; - buffer.mark(); - - PacketHeader header = new PacketHeader( - pktLen, offsetInBlock, seqno, lastPacketInBlock, dataLen, syncBlock); - header.putInBuffer(buffer); + // Copy the header data into the buffer immediately preceding the checksum + // data. + System.arraycopy(header.getBytes(), 0, buf, headerStart, + header.getSerializedSize()); - buffer.reset(); - return buffer; + // Write the now contiguous full packet to the output stream. + stm.write(buf, headerStart, header.getSerializedSize() + checksumLen + dataLen); } // get the packet's last byte's offset in the block @@ -267,6 +262,7 @@ public class DFSOutputStream extends FSO return seqno == HEART_BEAT_SEQNO; } + @Override public String toString() { return "packet seqno:" + this.seqno + " offsetInBlock:" + this.offsetInBlock + @@ -395,8 +391,9 @@ public class DFSOutputStream extends FSO * streamer thread is the only thread that opens streams to datanode, * and closes them. Any error recovery is also done by this thread. */ + @Override public void run() { - long lastPacket = System.currentTimeMillis(); + long lastPacket = Time.now(); while (!streamerClosed && dfsClient.clientRunning) { // if the Responder encountered an error, shutdown Responder @@ -406,6 +403,7 @@ public class DFSOutputStream extends FSO response.join(); response = null; } catch (InterruptedException e) { + DFSClient.LOG.warn("Caught exception ", e); } } @@ -420,7 +418,7 @@ public class DFSOutputStream extends FSO synchronized (dataQueue) { // wait for a packet to be sent. - long now = System.currentTimeMillis(); + long now = Time.now(); while ((!streamerClosed && !hasError && dfsClient.clientRunning && dataQueue.size() == 0 && (stage != BlockConstructionStage.DATA_STREAMING || @@ -433,16 +431,17 @@ public class DFSOutputStream extends FSO try { dataQueue.wait(timeout); } catch (InterruptedException e) { + DFSClient.LOG.warn("Caught exception ", e); } doSleep = false; - now = System.currentTimeMillis(); + now = Time.now(); } if (streamerClosed || hasError || !dfsClient.clientRunning) { continue; } // get packet to be sent. 
if (dataQueue.isEmpty()) { - one = new Packet(); // heartbeat packet + one = new Packet(checksum.getChecksumSize()); // heartbeat packet } else { one = dataQueue.getFirst(); // regular data packet } @@ -482,6 +481,7 @@ public class DFSOutputStream extends FSO // wait for acks to arrive from datanodes dataQueue.wait(1000); } catch (InterruptedException e) { + DFSClient.LOG.warn("Caught exception ", e); } } } @@ -492,8 +492,6 @@ public class DFSOutputStream extends FSO } // send the packet - ByteBuffer buf = one.getBuffer(); - synchronized (dataQueue) { // move packet from dataQueue to ackQueue if (!one.isHeartbeatPacket()) { @@ -509,16 +507,16 @@ public class DFSOutputStream extends FSO } // write out data to remote datanode - try { - blockStream.write(buf.array(), buf.position(), buf.remaining()); + try { + one.writeTo(blockStream); blockStream.flush(); } catch (IOException e) { // HDFS-3398 treat primary DN is down since client is unable to - // write to primary DN + // write to primary DN errorIndex = 0; throw e; } - lastPacket = System.currentTimeMillis(); + lastPacket = Time.now(); if (one.isHeartbeatPacket()) { //heartbeat packet } @@ -603,6 +601,7 @@ public class DFSOutputStream extends FSO response.close(); response.join(); } catch (InterruptedException e) { + DFSClient.LOG.warn("Caught exception ", e); } finally { response = null; } @@ -653,6 +652,7 @@ public class DFSOutputStream extends FSO this.targets = targets; } + @Override public void run() { setName("ResponseProcessor for block " + block); @@ -861,16 +861,26 @@ public class DFSOutputStream extends FSO try { sock = createSocketForPipeline(src, 2, dfsClient); final long writeTimeout = dfsClient.getDatanodeWriteTimeout(2); - out = new DataOutputStream(new BufferedOutputStream( - NetUtils.getOutputStream(sock, writeTimeout), + + OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout); + InputStream unbufIn = NetUtils.getInputStream(sock); + if (dfsClient.shouldEncryptData()) { + IOStreamPair encryptedStreams = + DataTransferEncryptor.getEncryptedStreams( + unbufOut, unbufIn, dfsClient.getDataEncryptionKey()); + unbufOut = encryptedStreams.out; + unbufIn = encryptedStreams.in; + } + out = new DataOutputStream(new BufferedOutputStream(unbufOut, HdfsConstants.SMALL_BUFFER_SIZE)); + in = new DataInputStream(unbufIn); //send the TRANSFER_BLOCK request new Sender(out).transferBlock(block, blockToken, dfsClient.clientName, targets); + out.flush(); //ack - in = new DataInputStream(NetUtils.getInputStream(sock)); BlockOpResponseProto response = BlockOpResponseProto.parseFrom(HdfsProtoUtil.vintPrefixed(in)); if (SUCCESS != response.getStatus()) { @@ -981,7 +991,7 @@ public class DFSOutputStream extends FSO errorIndex = -1; success = false; - long startTime = System.currentTimeMillis(); + long startTime = Time.now(); DatanodeInfo[] excluded = excludedNodes.toArray( new DatanodeInfo[excludedNodes.size()]); block = oldBlock; @@ -1028,77 +1038,99 @@ public class DFSOutputStream extends FSO // persist blocks on namenode on next flush persistBlocks.set(true); - boolean result = false; - DataOutputStream out = null; - try { - assert null == s : "Previous socket unclosed"; - s = createSocketForPipeline(nodes[0], nodes.length, dfsClient); - long writeTimeout = dfsClient.getDatanodeWriteTimeout(nodes.length); - - // - // Xmit header info to datanode - // - out = new DataOutputStream(new BufferedOutputStream( - NetUtils.getOutputStream(s, writeTimeout), - HdfsConstants.SMALL_BUFFER_SIZE)); - - assert null == blockReplyStream : 
"Previous blockReplyStream unclosed"; - blockReplyStream = new DataInputStream(NetUtils.getInputStream(s)); - - // send the request - new Sender(out).writeBlock(block, accessToken, dfsClient.clientName, - nodes, null, recoveryFlag? stage.getRecoveryStage() : stage, - nodes.length, block.getNumBytes(), bytesSent, newGS, checksum); - - // receive ack for connect - BlockOpResponseProto resp = BlockOpResponseProto.parseFrom( - HdfsProtoUtil.vintPrefixed(blockReplyStream)); - pipelineStatus = resp.getStatus(); - firstBadLink = resp.getFirstBadLink(); - - if (pipelineStatus != SUCCESS) { - if (pipelineStatus == Status.ERROR_ACCESS_TOKEN) { - throw new InvalidBlockTokenException( - "Got access token error for connect ack with firstBadLink as " - + firstBadLink); - } else { - throw new IOException("Bad connect ack with firstBadLink as " - + firstBadLink); + int refetchEncryptionKey = 1; + while (true) { + boolean result = false; + DataOutputStream out = null; + try { + assert null == s : "Previous socket unclosed"; + assert null == blockReplyStream : "Previous blockReplyStream unclosed"; + s = createSocketForPipeline(nodes[0], nodes.length, dfsClient); + long writeTimeout = dfsClient.getDatanodeWriteTimeout(nodes.length); + + OutputStream unbufOut = NetUtils.getOutputStream(s, writeTimeout); + InputStream unbufIn = NetUtils.getInputStream(s); + if (dfsClient.shouldEncryptData()) { + IOStreamPair encryptedStreams = + DataTransferEncryptor.getEncryptedStreams(unbufOut, + unbufIn, dfsClient.getDataEncryptionKey()); + unbufOut = encryptedStreams.out; + unbufIn = encryptedStreams.in; + } + out = new DataOutputStream(new BufferedOutputStream(unbufOut, + HdfsConstants.SMALL_BUFFER_SIZE)); + blockReplyStream = new DataInputStream(unbufIn); + + // + // Xmit header info to datanode + // + + // send the request + new Sender(out).writeBlock(block, accessToken, dfsClient.clientName, + nodes, null, recoveryFlag? stage.getRecoveryStage() : stage, + nodes.length, block.getNumBytes(), bytesSent, newGS, checksum); + + // receive ack for connect + BlockOpResponseProto resp = BlockOpResponseProto.parseFrom( + HdfsProtoUtil.vintPrefixed(blockReplyStream)); + pipelineStatus = resp.getStatus(); + firstBadLink = resp.getFirstBadLink(); + + if (pipelineStatus != SUCCESS) { + if (pipelineStatus == Status.ERROR_ACCESS_TOKEN) { + throw new InvalidBlockTokenException( + "Got access token error for connect ack with firstBadLink as " + + firstBadLink); + } else { + throw new IOException("Bad connect ack with firstBadLink as " + + firstBadLink); + } } - } - assert null == blockStream : "Previous blockStream unclosed"; - blockStream = out; - result = true; // success - - } catch (IOException ie) { - - DFSClient.LOG.info("Exception in createBlockOutputStream", ie); - - // find the datanode that matches - if (firstBadLink.length() != 0) { - for (int i = 0; i < nodes.length; i++) { - if (nodes[i].getXferAddr().equals(firstBadLink)) { - errorIndex = i; - break; + assert null == blockStream : "Previous blockStream unclosed"; + blockStream = out; + result = true; // success + + } catch (IOException ie) { + DFSClient.LOG.info("Exception in createBlockOutputStream", ie); + if (ie instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) { + DFSClient.LOG.info("Will fetch a new encryption key and retry, " + + "encryption key was invalid when connecting to " + + nodes[0] + " : " + ie); + // The encryption key used is invalid. 
+ refetchEncryptionKey--; + dfsClient.clearDataEncryptionKey(); + // Don't close the socket/exclude this node just yet. Try again with + // a new encryption key. + continue; + } + + // find the datanode that matches + if (firstBadLink.length() != 0) { + for (int i = 0; i < nodes.length; i++) { + // NB: Unconditionally using the xfer addr w/o hostname + if (firstBadLink.equals(nodes[i].getXferAddr())) { + errorIndex = i; + break; + } } + } else { + errorIndex = 0; + } + hasError = true; + setLastException(ie); + result = false; // error + } finally { + if (!result) { + IOUtils.closeSocket(s); + s = null; + IOUtils.closeStream(out); + out = null; + IOUtils.closeStream(blockReplyStream); + blockReplyStream = null; } - } else { - errorIndex = 0; - } - hasError = true; - setLastException(ie); - result = false; // error - } finally { - if (!result) { - IOUtils.closeSocket(s); - s = null; - IOUtils.closeStream(out); - out = null; - IOUtils.closeStream(blockReplyStream); - blockReplyStream = null; } + return result; } - return result; } private LocatedBlock locateFollowingBlock(long start, @@ -1107,7 +1139,7 @@ public class DFSOutputStream extends FSO int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry; long sleeptime = 400; while (true) { - long localstart = System.currentTimeMillis(); + long localstart = Time.now(); while (true) { try { return dfsClient.namenode.addBlock(src, dfsClient.clientName, block, excludedNodes); @@ -1130,9 +1162,9 @@ public class DFSOutputStream extends FSO } else { --retries; DFSClient.LOG.info("Exception while adding a block", e); - if (System.currentTimeMillis() - localstart > 5000) { + if (Time.now() - localstart > 5000) { DFSClient.LOG.info("Waiting for replication for " - + (System.currentTimeMillis() - localstart) / 1000 + + (Time.now() - localstart) / 1000 + " seconds"); } try { @@ -1141,6 +1173,7 @@ public class DFSOutputStream extends FSO Thread.sleep(sleeptime); sleeptime *= 2; } catch (InterruptedException ie) { + DFSClient.LOG.warn("Caught exception ", ie); } } } else { @@ -1180,11 +1213,11 @@ public class DFSOutputStream extends FSO */ static Socket createSocketForPipeline(final DatanodeInfo first, final int length, final DFSClient client) throws IOException { - if(DFSClient.LOG.isDebugEnabled()) { - DFSClient.LOG.debug("Connecting to datanode " + first); + final String dnAddr = first.getXferAddr(client.connectToDnViaHostname()); + if (DFSClient.LOG.isDebugEnabled()) { + DFSClient.LOG.debug("Connecting to datanode " + dnAddr); } - final InetSocketAddress isa = - NetUtils.createSocketAddr(first.getXferAddr()); + final InetSocketAddress isa = NetUtils.createSocketAddr(dnAddr); final Socket sock = client.socketFactory.createSocket(); final int timeout = client.getDatanodeReadTimeout(length); NetUtils.connect(sock, isa, client.getRandomLocalInterfaceAddr(), timeout); @@ -1206,7 +1239,8 @@ public class DFSOutputStream extends FSO // // returns the list of targets, if any, that is being currently used. 
// - synchronized DatanodeInfo[] getPipeline() { + @VisibleForTesting + public synchronized DatanodeInfo[] getPipeline() { if (streamer == null) { return null; } @@ -1315,9 +1349,8 @@ public class DFSOutputStream extends FSO private void computePacketChunkSize(int psize, int csize) { int chunkSize = csize + checksum.getChecksumSize(); - int n = PacketHeader.PKT_HEADER_LEN; - chunksPerPacket = Math.max((psize - n + chunkSize-1)/chunkSize, 1); - packetSize = n + chunkSize*chunksPerPacket; + chunksPerPacket = Math.max(psize/chunkSize, 1); + packetSize = chunkSize*chunksPerPacket; if (DFSClient.LOG.isDebugEnabled()) { DFSClient.LOG.debug("computePacketChunkSize: src=" + src + ", chunkSize=" + chunkSize + @@ -1384,7 +1417,7 @@ public class DFSOutputStream extends FSO if (currentPacket == null) { currentPacket = new Packet(packetSize, chunksPerPacket, - bytesCurBlock); + bytesCurBlock, currentSeqno++, this.checksum.getChecksumSize()); if (DFSClient.LOG.isDebugEnabled()) { DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" + currentPacket.seqno + @@ -1431,8 +1464,8 @@ public class DFSOutputStream extends FSO // indicate the end of block and reset bytesCurBlock. // if (bytesCurBlock == blockSize) { - currentPacket = new Packet(PacketHeader.PKT_HEADER_LEN, 0, - bytesCurBlock); + currentPacket = new Packet(0, 0, bytesCurBlock, + currentSeqno++, this.checksum.getChecksumSize()); currentPacket.lastPacketInBlock = true; currentPacket.syncBlock = shouldSyncBlock; waitAndQueueCurrentPacket(); @@ -1504,7 +1537,7 @@ public class DFSOutputStream extends FSO // but sync was requested. // Send an empty packet currentPacket = new Packet(packetSize, chunksPerPacket, - bytesCurBlock); + bytesCurBlock, currentSeqno++, this.checksum.getChecksumSize()); } } else { // We already flushed up to this offset. @@ -1521,7 +1554,7 @@ public class DFSOutputStream extends FSO // and sync was requested. // So send an empty sync packet. currentPacket = new Packet(packetSize, chunksPerPacket, - bytesCurBlock); + bytesCurBlock, currentSeqno++, this.checksum.getChecksumSize()); } else { // just discard the current packet since it is already been sent. currentPacket = null; @@ -1658,6 +1691,7 @@ public class DFSOutputStream extends FSO streamer.setLastException(new IOException("Lease timeout of " + (dfsClient.hdfsTimeout/1000) + " seconds expired.")); closeThreads(true); + dfsClient.endFileLease(src); } // shutdown datastreamer and responseprocessor threads. 
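One consequence of the computePacketChunkSize() change above is that the packet size no longer budgets for the header; the header slack now lives at the front of the packet buffer instead. A worked sketch with assumed typical defaults (64 KB write-packet size, 512-byte checksum chunks, 4-byte CRC32 checksums):

public class PacketSizeSketch {
  public static void main(String[] args) {
    int psize = 64 * 1024;      // dfs.client-write-packet-size (assumed default)
    int bytesPerChecksum = 512; // dfs.bytes-per-checksum (assumed default)
    int checksumSize = 4;       // CRC32

    // Mirrors the updated computePacketChunkSize(): no header term any more.
    int chunkSize = bytesPerChecksum + checksumSize;      // 516
    int chunksPerPacket = Math.max(psize / chunkSize, 1); // 127
    int packetSize = chunkSize * chunksPerPacket;         // 65532

    System.out.println("chunksPerPacket=" + chunksPerPacket
        + ", packetSize=" + packetSize);
  }
}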
@@ -1701,8 +1735,8 @@ public class DFSOutputStream extends FSO if (bytesCurBlock != 0) { // send an empty packet to mark the end of the block - currentPacket = new Packet(PacketHeader.PKT_HEADER_LEN, 0, - bytesCurBlock); + currentPacket = new Packet(0, 0, bytesCurBlock, + currentSeqno++, this.checksum.getChecksumSize()); currentPacket.lastPacketInBlock = true; currentPacket.syncBlock = shouldSyncBlock; } @@ -1712,7 +1746,7 @@ public class DFSOutputStream extends FSO ExtendedBlock lastBlock = streamer.getBlock(); closeThreads(false); completeFile(lastBlock); - dfsClient.leaserenewer.closeFile(src, dfsClient); + dfsClient.endFileLease(src); } finally { closed = true; } @@ -1721,14 +1755,14 @@ public class DFSOutputStream extends FSO // should be called holding (this) lock since setTestFilename() may // be called during unit tests private void completeFile(ExtendedBlock last) throws IOException { - long localstart = System.currentTimeMillis(); + long localstart = Time.now(); boolean fileComplete = false; while (!fileComplete) { fileComplete = dfsClient.namenode.complete(src, dfsClient.clientName, last); if (!fileComplete) { if (!dfsClient.clientRunning || (dfsClient.hdfsTimeout > 0 && - localstart + dfsClient.hdfsTimeout < System.currentTimeMillis())) { + localstart + dfsClient.hdfsTimeout < Time.now())) { String msg = "Unable to close file because dfsclient " + " was unable to contact the HDFS servers." + " clientRunning " + dfsClient.clientRunning + @@ -1738,23 +1772,25 @@ public class DFSOutputStream extends FSO } try { Thread.sleep(400); - if (System.currentTimeMillis() - localstart > 5000) { + if (Time.now() - localstart > 5000) { DFSClient.LOG.info("Could not complete file " + src + " retrying..."); } } catch (InterruptedException ie) { + DFSClient.LOG.warn("Caught exception ", ie); } } } } - void setArtificialSlowdown(long period) { + @VisibleForTesting + public void setArtificialSlowdown(long period) { artificialSlowdown = period; } - synchronized void setChunksPerPacket(int value) { + @VisibleForTesting + public synchronized void setChunksPerPacket(int value) { chunksPerPacket = Math.min(chunksPerPacket, value); - packetSize = PacketHeader.PKT_HEADER_LEN + - (checksum.getBytesPerChecksum() + + packetSize = (checksum.getBytesPerChecksum() + checksum.getChecksumSize()) * chunksPerPacket; } Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java?rev=1399950&r1=1399949&r2=1399950&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java Fri Oct 19 02:25:55 2012 @@ -18,8 +18,21 @@ package org.apache.hadoop.hdfs; -import static org.apache.hadoop.hdfs.DFSConfigKeys.*; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY; +import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICES;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID;
+
 import java.io.IOException;
+import java.io.PrintStream;
 import java.io.UnsupportedEncodingException;
 import java.net.InetSocketAddress;
 import java.net.URI;
@@ -33,10 +46,17 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
-import java.util.StringTokenizer;
 import javax.net.SocketFactory;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
@@ -55,11 +75,13 @@ import org.apache.hadoop.ipc.ProtobufRpc
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NodeBase;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.ToolRunner;
 import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.protobuf.BlockingService;
@@ -106,10 +128,48 @@ public class DFSUtil {
           a.isDecommissioned() ? 1 : -1;
       }
     };
+
+
+  /**
+   * Comparator for sorting DatanodeInfo[] based on decommissioned and stale states.
+   * Decommissioned or stale nodes are moved to the end of the array on sorting
+   * with this comparator.
+   */
+  @InterfaceAudience.Private
+  public static class DecomStaleComparator implements Comparator<DatanodeInfo> {
+    private long staleInterval;
+
+    /**
+     * Constructor of DecomStaleComparator
+     *
+     * @param interval
+     *          The time interval for marking datanodes as stale; it is passed in
+     *          by the caller, since the interval may be changed dynamically.
+     */
+    public DecomStaleComparator(long interval) {
+      this.staleInterval = interval;
+    }
+
+    @Override
+    public int compare(DatanodeInfo a, DatanodeInfo b) {
+      // Decommissioned nodes will still be moved to the end of the list
+      if (a.isDecommissioned()) {
+        return b.isDecommissioned() ? 0 : 1;
+      } else if (b.isDecommissioned()) {
+        return -1;
+      }
+      // Stale nodes will be moved behind the normal nodes
+      boolean aStale = a.isStale(staleInterval);
+      boolean bStale = b.isStale(staleInterval);
+      return aStale == bStale ? 0 : (aStale ?
1 : -1); + } + } + /** * Address matcher for matching an address to local address */ static final AddressMatcher LOCAL_ADDRESS_MATCHER = new AddressMatcher() { + @Override public boolean match(InetSocketAddress s) { return NetUtils.isLocalAddress(s.getAddress()); }; @@ -117,7 +177,7 @@ public class DFSUtil { /** * Whether the pathname is valid. Currently prohibits relative paths, - * and names which contain a ":" or "/" + * names which contain a ":" or "//", or other non-canonical paths. */ public static boolean isValidName(String src) { // Path must be absolute. @@ -126,15 +186,22 @@ public class DFSUtil { } // Check for ".." "." ":" "/" - StringTokenizer tokens = new StringTokenizer(src, Path.SEPARATOR); - while(tokens.hasMoreTokens()) { - String element = tokens.nextToken(); + String[] components = StringUtils.split(src, '/'); + for (int i = 0; i < components.length; i++) { + String element = components[i]; if (element.equals("..") || element.equals(".") || (element.indexOf(":") >= 0) || (element.indexOf("/") >= 0)) { return false; } + + // The string may start or end with a /, but not have + // "//" in the middle. + if (element.isEmpty() && i != components.length - 1 && + i != 0) { + return false; + } } return true; } @@ -254,13 +321,25 @@ public class DFSUtil { if (blocks == null) { return new BlockLocation[0]; } - int nrBlocks = blocks.locatedBlockCount(); + return locatedBlocks2Locations(blocks.getLocatedBlocks()); + } + + /** + * Convert a List to BlockLocation[] + * @param blocks A List to be converted + * @return converted array of BlockLocation + */ + public static BlockLocation[] locatedBlocks2Locations(List blocks) { + if (blocks == null) { + return new BlockLocation[0]; + } + int nrBlocks = blocks.size(); BlockLocation[] blkLocations = new BlockLocation[nrBlocks]; if (nrBlocks == 0) { return blkLocations; } int idx = 0; - for (LocatedBlock blk : blocks.getLocatedBlocks()) { + for (LocatedBlock blk : blocks) { assert idx < nrBlocks : "Incorrect index"; DatanodeInfo[] locations = blk.getLocations(); String[] hosts = new String[locations.length]; @@ -410,12 +489,39 @@ public class DFSUtil { } /** + * @return a collection of all configured NN Kerberos principals. + */ + public static Set getAllNnPrincipals(Configuration conf) throws IOException { + Set principals = new HashSet(); + for (String nsId : DFSUtil.getNameServiceIds(conf)) { + if (HAUtil.isHAEnabled(conf, nsId)) { + for (String nnId : DFSUtil.getNameNodeIds(conf, nsId)) { + Configuration confForNn = new Configuration(conf); + NameNode.initializeGenericKeys(confForNn, nsId, nnId); + String principal = SecurityUtil.getServerPrincipal(confForNn + .get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY), + NameNode.getAddress(confForNn).getHostName()); + principals.add(principal); + } + } else { + Configuration confForNn = new Configuration(conf); + NameNode.initializeGenericKeys(confForNn, nsId, null); + String principal = SecurityUtil.getServerPrincipal(confForNn + .get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY), + NameNode.getAddress(confForNn).getHostName()); + principals.add(principal); + } + } + + return principals; + } + + /** * Returns list of InetSocketAddress corresponding to HA NN RPC addresses from * the configuration. 
* * @param conf configuration * @return list of InetSocketAddresses - * @throws IOException if no addresses are configured */ public static Map> getHaNnRpcAddresses( Configuration conf) { @@ -832,17 +938,17 @@ public class DFSUtil { /** Create a {@link ClientDatanodeProtocol} proxy */ public static ClientDatanodeProtocol createClientDatanodeProtocolProxy( DatanodeID datanodeid, Configuration conf, int socketTimeout, - LocatedBlock locatedBlock) throws IOException { + boolean connectToDnViaHostname, LocatedBlock locatedBlock) throws IOException { return new ClientDatanodeProtocolTranslatorPB(datanodeid, conf, socketTimeout, - locatedBlock); + connectToDnViaHostname, locatedBlock); } /** Create {@link ClientDatanodeProtocol} proxy using kerberos ticket */ static ClientDatanodeProtocol createClientDatanodeProtocolProxy( - DatanodeID datanodeid, Configuration conf, int socketTimeout) - throws IOException { + DatanodeID datanodeid, Configuration conf, int socketTimeout, + boolean connectToDnViaHostname) throws IOException { return new ClientDatanodeProtocolTranslatorPB( - datanodeid, conf, socketTimeout); + datanodeid, conf, socketTimeout, connectToDnViaHostname); } /** Create a {@link ClientDatanodeProtocol} proxy */ @@ -1064,4 +1170,82 @@ public class DFSUtil { return null; } } + + public static Options helpOptions = new Options(); + public static Option helpOpt = new Option("h", "help", false, + "get help information"); + + static { + helpOptions.addOption(helpOpt); + } + + /** + * Parse the arguments for commands + * + * @param args the argument to be parsed + * @param helpDescription help information to be printed out + * @param out Printer + * @param printGenericCommandUsage whether to print the + * generic command usage defined in ToolRunner + * @return true when the argument matches help option, false if not + */ + public static boolean parseHelpArgument(String[] args, + String helpDescription, PrintStream out, boolean printGenericCommandUsage) { + if (args.length == 1) { + try { + CommandLineParser parser = new PosixParser(); + CommandLine cmdLine = parser.parse(helpOptions, args); + if (cmdLine.hasOption(helpOpt.getOpt()) + || cmdLine.hasOption(helpOpt.getLongOpt())) { + // should print out the help information + out.println(helpDescription + "\n"); + if (printGenericCommandUsage) { + ToolRunner.printGenericCommandUsage(out); + } + return true; + } + } catch (ParseException pe) { + return false; + } + } + return false; + } + + /** + * Get DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION from configuration. + * + * @param conf Configuration + * @return Value of DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION + */ + public static float getInvalidateWorkPctPerIteration(Configuration conf) { + float blocksInvalidateWorkPct = conf.getFloat( + DFSConfigKeys.DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION, + DFSConfigKeys.DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION_DEFAULT); + Preconditions.checkArgument( + (blocksInvalidateWorkPct > 0 && blocksInvalidateWorkPct <= 1.0f), + DFSConfigKeys.DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION + + " = '" + blocksInvalidateWorkPct + "' is invalid. " + + "It should be a positive, non-zero float value, not greater than 1.0f, " + + "to indicate a percentage."); + return blocksInvalidateWorkPct; + } + + /** + * Get DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION from + * configuration. 
+ * + * @param conf Configuration + * @return Value of DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION + */ + public static int getReplWorkMultiplier(Configuration conf) { + int blocksReplWorkMultiplier = conf.getInt( + DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION, + DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION_DEFAULT); + Preconditions.checkArgument( + (blocksReplWorkMultiplier > 0), + DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION + + " = '" + blocksReplWorkMultiplier + "' is invalid. " + + "It should be a positive, non-zero integer value."); + return blocksReplWorkMultiplier; + } } Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=1399950&r1=1399949&r2=1399950&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java Fri Oct 19 02:25:55 2012 @@ -32,6 +32,8 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.BlockStorageLocation; +import org.apache.hadoop.fs.VolumeId; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -40,6 +42,7 @@ import org.apache.hadoop.fs.FsStatus; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum; import org.apache.hadoop.fs.Options; +import org.apache.hadoop.fs.Options.ChecksumOpt; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.RemoteIterator; @@ -52,12 +55,11 @@ import org.apache.hadoop.hdfs.protocol.E import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; -import org.apache.hadoop.hdfs.protocol.HdfsConstants.UpgradeAction; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; -import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.AccessControlException; @@ -189,6 +191,36 @@ public class DistributedFileSystem exten } + /** + * Used to query storage location information for a list of blocks. This list + * of blocks is normally constructed via a series of calls to + * {@link DistributedFileSystem#getFileBlockLocations(Path, long, long)} to + * get the blocks for ranges of a file. + * + * The returned array of {@link BlockStorageLocation} augments + * {@link BlockLocation} with a {@link VolumeId} per block replica. 
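Both configuration getters added above (getInvalidateWorkPctPerIteration and getReplWorkMultiplier) validate their values with Preconditions.checkArgument, so an out-of-range setting fails fast as an IllegalArgumentException instead of being silently accepted. A short sketch of how that surfaces to a caller (the 1.5f value is deliberately invalid, chosen for illustration):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.DFSUtil;

    public class ReplWorkConfigSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();

        // Defaults pass validation.
        float pct = DFSUtil.getInvalidateWorkPctPerIteration(conf);
        int multiplier = DFSUtil.getReplWorkMultiplier(conf);
        System.out.println(pct + " / " + multiplier);

        // A percentage above 1.0f trips the Preconditions check.
        conf.setFloat(
            DFSConfigKeys.DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION, 1.5f);
        try {
          DFSUtil.getInvalidateWorkPctPerIteration(conf);
        } catch (IllegalArgumentException expected) {
          System.out.println("rejected: " + expected.getMessage());
        }
      }
    }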
The + * VolumeId specifies the volume on the datanode on which the replica resides. + * The VolumeId has to be checked via {@link VolumeId#isValid()} before being + * used because volume information can be unavailable if the corresponding + * datanode is down or if the requested block is not found. + * + * This API is unstable, and datanode-side support is disabled by default. It + * can be enabled by setting "dfs.datanode.hdfs-blocks-metadata.enabled" to + * true. + * + * @param blocks + * List of target BlockLocations to query volume location information + * @return volumeBlockLocations Augmented array of + * {@link BlockStorageLocation}s containing additional volume location + * information for each replica of each block. + */ + @InterfaceStability.Unstable + public BlockStorageLocation[] getFileBlockStorageLocations( + List blocks) throws IOException, + UnsupportedOperationException, InvalidBlockTokenException { + return dfs.getBlockStorageLocations(blocks); + } + @Override public void setVerifyChecksum(boolean verifyChecksum) { this.verifyChecksum = verifyChecksum; @@ -225,19 +257,19 @@ public class DistributedFileSystem exten public HdfsDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException { - return create(f, permission, + return this.create(f, permission, overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE) : EnumSet.of(CreateFlag.CREATE), bufferSize, replication, - blockSize, progress); + blockSize, progress, null); } @Override public HdfsDataOutputStream create(Path f, FsPermission permission, EnumSet cflags, int bufferSize, short replication, long blockSize, - Progressable progress) throws IOException { + Progressable progress, ChecksumOpt checksumOpt) throws IOException { statistics.incrementWriteOps(1); final DFSOutputStream out = dfs.create(getPathName(f), permission, cflags, - replication, blockSize, progress, bufferSize); + replication, blockSize, progress, bufferSize, checksumOpt); return new HdfsDataOutputStream(out, statistics); } @@ -246,11 +278,11 @@ public class DistributedFileSystem exten protected HdfsDataOutputStream primitiveCreate(Path f, FsPermission absolutePermission, EnumSet flag, int bufferSize, short replication, long blockSize, Progressable progress, - int bytesPerChecksum) throws IOException { - statistics.incrementReadOps(1); + ChecksumOpt checksumOpt) throws IOException { + statistics.incrementWriteOps(1); return new HdfsDataOutputStream(dfs.primitiveCreate(getPathName(f), absolutePermission, flag, true, replication, blockSize, - progress, bufferSize, bytesPerChecksum),statistics); + progress, bufferSize, checksumOpt),statistics); } /** @@ -265,7 +297,8 @@ public class DistributedFileSystem exten flag.add(CreateFlag.CREATE); } return new HdfsDataOutputStream(dfs.create(getPathName(f), permission, flag, - false, replication, blockSize, progress, bufferSize), statistics); + false, replication, blockSize, progress, + bufferSize, null), statistics); } @Override @@ -502,10 +535,10 @@ public class DistributedFileSystem exten @Override public void close() throws IOException { try { - super.processDeleteOnExit(); - dfs.close(); - } finally { + dfs.closeOutputStreams(false); super.close(); + } finally { + dfs.close(); } } @@ -591,6 +624,16 @@ public class DistributedFileSystem exten public void saveNamespace() throws AccessControlException, IOException { dfs.saveNamespace(); } + + /** + * Rolls the edit log on the active NameNode. 
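To make the intended call pattern of getFileBlockStorageLocations() above concrete, a rough client-side sketch follows (the file path is hypothetical, the cast assumes fs.defaultFS points at HDFS, and dfs.datanode.hdfs-blocks-metadata.enabled must be true on the datanodes for volume information to come back):

    import java.util.Arrays;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.BlockLocation;
    import org.apache.hadoop.fs.BlockStorageLocation;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.VolumeId;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    public class VolumeIdSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(conf);

        Path file = new Path("/tmp/some-file");  // hypothetical path
        FileStatus stat = dfs.getFileStatus(file);

        // First fetch ordinary block locations for the whole file ...
        BlockLocation[] locs = dfs.getFileBlockLocations(stat, 0, stat.getLen());

        // ... then augment them with per-replica volume information.
        BlockStorageLocation[] volumeLocs =
            dfs.getFileBlockStorageLocations(Arrays.asList(locs));

        for (BlockStorageLocation loc : volumeLocs) {
          for (VolumeId id : loc.getVolumeIds()) {
            // Volume info can be missing when a datanode is down or the
            // block is not found, so check validity before using it.
            System.out.println(id.isValid() ? id : "unknown volume");
          }
        }
      }
    }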
+ * Requires super-user privileges. + * @see org.apache.hadoop.hdfs.protocol.ClientProtocol#rollEdits() + * @return the transaction ID of the newly created segment + */ + public long rollEdits() throws AccessControlException, IOException { + return dfs.rollEdits(); + } /** * enable/disable/check restoreFaileStorage @@ -619,11 +662,6 @@ public class DistributedFileSystem exten dfs.finalizeUpgrade(); } - public UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action - ) throws IOException { - return dfs.distributedUpgradeProgress(action); - } - /* * Requests the namenode to dump data strcutures into specified * file. @@ -765,14 +803,6 @@ public class DistributedFileSystem exten return getDelegationToken(renewer.toString()); } - @Override // FileSystem - public List> getDelegationTokens(String renewer) throws IOException { - List> tokenList = new ArrayList>(); - Token token = this.getDelegationToken(renewer); - tokenList.add(token); - return tokenList; - } - /** * Renew an existing delegation token. * Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java?rev=1399950&r1=1399949&r2=1399950&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java Fri Oct 19 02:25:55 2012 @@ -23,6 +23,7 @@ import org.apache.hadoop.ha.HAServicePro import org.apache.hadoop.ha.ZKFCProtocol; import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; @@ -46,6 +47,7 @@ public class HDFSPolicyProvider extends new Service("security.inter.datanode.protocol.acl", InterDatanodeProtocol.class), new Service("security.namenode.protocol.acl", NamenodeProtocol.class), + new Service("security.qjournal.service.protocol.acl", QJournalProtocol.class), new Service(CommonConfigurationKeys.SECURITY_HA_SERVICE_PROTOCOL_ACL, HAServiceProtocol.class), new Service(CommonConfigurationKeys.SECURITY_ZKFC_PROTOCOL_ACL, Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java?rev=1399950&r1=1399949&r2=1399950&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java Fri Oct 19 02:25:55 2012 @@ -105,4 +105,9 @@ public class HdfsConfiguration extends C deprecate("dfs.federation.nameservices", DFSConfigKeys.DFS_NAMESERVICES); 
deprecate("dfs.federation.nameservice.id", DFSConfigKeys.DFS_NAMESERVICE_ID); } + + public static void main(String[] args) { + init(); + Configuration.dumpDeprecatedKeys(); + } } Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java?rev=1399950&r1=1399949&r2=1399950&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java Fri Oct 19 02:25:55 2012 @@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; +import java.net.ConnectException; import java.net.HttpURLConnection; import java.net.InetSocketAddress; import java.net.URI; @@ -113,6 +114,7 @@ public class HftpFileSystem extends File protected static final ThreadLocal df = new ThreadLocal() { + @Override protected SimpleDateFormat initialValue() { return getDateFormat(); } @@ -240,19 +242,22 @@ public class HftpFileSystem extends File //Renew TGT if needed ugi.reloginFromKeytab(); return ugi.doAs(new PrivilegedExceptionAction>() { + @Override public Token run() throws IOException { final String nnHttpUrl = nnSecureUri.toString(); Credentials c; try { c = DelegationTokenFetcher.getDTfromRemote(nnHttpUrl, renewer); - } catch (Exception e) { - LOG.info("Couldn't get a delegation token from " + nnHttpUrl + - " using http."); - if(LOG.isDebugEnabled()) { - LOG.debug("error was ", e); + } catch (IOException e) { + if (e.getCause() instanceof ConnectException) { + LOG.warn("Couldn't connect to " + nnHttpUrl + + ", assuming security is disabled"); + return null; } - //Maybe the server is in unsecure mode (that's bad but okay) - return null; + if (LOG.isDebugEnabled()) { + LOG.debug("Exception getting delegation token", e); + } + throw e; } for (Token t : c.getAllTokens()) { if(LOG.isDebugEnabled()) { @@ -340,19 +345,28 @@ public class HftpFileSystem extends File super(url); } - @Override protected HttpURLConnection openConnection() throws IOException { return (HttpURLConnection)URLUtils.openConnection(url); } /** Use HTTP Range header for specifying offset. */ @Override - protected HttpURLConnection openConnection(final long offset) throws IOException { + protected HttpURLConnection connect(final long offset, + final boolean resolved) throws IOException { final HttpURLConnection conn = openConnection(); conn.setRequestMethod("GET"); if (offset != 0L) { conn.setRequestProperty("Range", "bytes=" + offset + "-"); } + conn.connect(); + + //Expects HTTP_OK or HTTP_PARTIAL response codes. + final int code = conn.getResponseCode(); + if (offset != 0L && code != HttpURLConnection.HTTP_PARTIAL) { + throw new IOException("HTTP_PARTIAL expected, received " + code); + } else if (offset == 0L && code != HttpURLConnection.HTTP_OK) { + throw new IOException("HTTP_OK expected, received " + code); + } return conn; } } @@ -366,22 +380,6 @@ public class HftpFileSystem extends File this(new RangeHeaderUrlOpener(url), new RangeHeaderUrlOpener(null)); } - /** Expects HTTP_OK and HTTP_PARTIAL response codes. 
*/ - @Override - protected void checkResponseCode(final HttpURLConnection connection - ) throws IOException { - final int code = connection.getResponseCode(); - if (startPos != 0 && code != HttpURLConnection.HTTP_PARTIAL) { - // We asked for a byte range but did not receive a partial content - // response... - throw new IOException("HTTP_PARTIAL expected, received " + code); - } else if (startPos == 0 && code != HttpURLConnection.HTTP_OK) { - // We asked for all bytes from the beginning but didn't receive a 200 - // response (none of the other 2xx codes are valid here) - throw new IOException("HTTP_OK expected, received " + code); - } - } - @Override protected URL getResolvedUrl(final HttpURLConnection connection) { return connection.getURL(); @@ -402,6 +400,7 @@ public class HftpFileSystem extends File ArrayList fslist = new ArrayList(); + @Override public void startElement(String ns, String localname, String qname, Attributes attrs) throws SAXException { if ("listing".equals(qname)) return; @@ -541,6 +540,7 @@ public class HftpFileSystem extends File public void setWorkingDirectory(Path f) { } /** This optional operation is not yet supported. */ + @Override public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException { throw new IOException("Not supported"); Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HsftpFileSystem.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HsftpFileSystem.java?rev=1399950&r1=1399949&r2=1399950&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HsftpFileSystem.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HsftpFileSystem.java Fri Oct 19 02:25:55 2012 @@ -23,7 +23,6 @@ import java.io.IOException; import java.net.HttpURLConnection; import java.net.InetSocketAddress; import java.net.URI; -import java.net.URISyntaxException; import java.net.URL; import java.security.KeyStore; import java.security.cert.X509Certificate; @@ -42,6 +41,7 @@ import org.apache.hadoop.classification. import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.web.URLUtils; +import org.apache.hadoop.util.Time; /** * An implementation of a protocol for accessing filesystems over HTTPS. 
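The response-code validation that previously lived in checkResponseCode() is now performed inside connect(), as shown above: a ranged read must come back 206 (HTTP_PARTIAL) and an unranged read 200 (HTTP_OK). A standalone illustration of that pattern with a plain HttpURLConnection (host, port and path are made up):

    import java.io.IOException;
    import java.net.HttpURLConnection;
    import java.net.URL;

    public class RangeCheckSketch {
      // Mirrors the check HftpFileSystem now applies right after connecting.
      static HttpURLConnection openWithRange(URL url, long offset) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");
        if (offset != 0L) {
          conn.setRequestProperty("Range", "bytes=" + offset + "-");
        }
        conn.connect();

        int code = conn.getResponseCode();
        if (offset != 0L && code != HttpURLConnection.HTTP_PARTIAL) {
          throw new IOException("HTTP_PARTIAL expected, received " + code);
        } else if (offset == 0L && code != HttpURLConnection.HTTP_OK) {
          throw new IOException("HTTP_OK expected, received " + code);
        }
        return conn;
      }

      public static void main(String[] args) throws IOException {
        // Hypothetical endpoint, for illustration only.
        openWithRange(new URL("http://namenode.example.com:50070/data/some-file"), 1024L);
      }
    }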
The @@ -164,8 +164,7 @@ public class HsftpFileSystem extends Hft final int warnDays = ExpWarnDays; if (warnDays > 0) { // make sure only check once ExpWarnDays = 0; - long expTimeThreshold = warnDays * MM_SECONDS_PER_DAY - + System.currentTimeMillis(); + long expTimeThreshold = warnDays * MM_SECONDS_PER_DAY + Time.now(); X509Certificate[] clientCerts = (X509Certificate[]) conn .getLocalCertificates(); if (clientCerts != null) { @@ -175,7 +174,7 @@ public class HsftpFileSystem extends Hft StringBuilder sb = new StringBuilder(); sb.append("\n Client certificate " + cert.getSubjectX500Principal().getName()); - int dayOffSet = (int) ((expTime - System.currentTimeMillis()) / MM_SECONDS_PER_DAY); + int dayOffSet = (int) ((expTime - Time.now()) / MM_SECONDS_PER_DAY); sb.append(" have " + dayOffSet + " days to expire"); LOG.warn(sb.toString()); } @@ -189,6 +188,7 @@ public class HsftpFileSystem extends Hft * Dummy hostname verifier that is used to bypass hostname checking */ protected static class DummyHostnameVerifier implements HostnameVerifier { + @Override public boolean verify(String hostname, SSLSession session) { return true; } @@ -198,12 +198,15 @@ public class HsftpFileSystem extends Hft * Dummy trustmanager that is used to trust all server certificates */ protected static class DummyTrustManager implements X509TrustManager { + @Override public void checkClientTrusted(X509Certificate[] chain, String authType) { } + @Override public void checkServerTrusted(X509Certificate[] chain, String authType) { } + @Override public X509Certificate[] getAcceptedIssuers() { return null; }
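The certificate-expiry warning above only swaps System.currentTimeMillis() for Time.now(); the arithmetic is unchanged. A small sketch of that calculation (warnDays, the fake expiry date and the MM_SECONDS_PER_DAY value are assumptions made for illustration):

    import org.apache.hadoop.util.Time;

    public class CertExpirySketch {
      // Milliseconds per day; the constant name mirrors HsftpFileSystem,
      // the value here is an assumption for the sketch.
      static final long MM_SECONDS_PER_DAY = 24L * 60 * 60 * 1000;

      public static void main(String[] args) {
        int warnDays = 30;                                    // hypothetical warning window
        long expTime = Time.now() + 12 * MM_SECONDS_PER_DAY;  // pretend the cert expires in 12 days

        // Warn when the certificate expires inside the warning window.
        long expTimeThreshold = warnDays * MM_SECONDS_PER_DAY + Time.now();
        if (expTime < expTimeThreshold) {
          int dayOffSet = (int) ((expTime - Time.now()) / MM_SECONDS_PER_DAY);
          System.out.println("Client certificate has " + dayOffSet + " days to expire");
        }
      }
    }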