Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id BC3881864E for ; Wed, 19 Aug 2015 18:43:49 +0000 (UTC) Received: (qmail 70418 invoked by uid 500); 19 Aug 2015 18:43:48 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 70223 invoked by uid 500); 19 Aug 2015 18:43:48 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 70060 invoked by uid 99); 19 Aug 2015 18:43:48 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 19 Aug 2015 18:43:48 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 4822AE7140; Wed, 19 Aug 2015 18:43:48 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: wheat9@apache.org To: common-commits@hadoop.apache.org Date: Wed, 19 Aug 2015 18:43:50 -0000 Message-Id: <3cf36ee80a0c4fef8c008f973872799d@git.apache.org> In-Reply-To: <4150006728b74c02b32c58a3b925b58e@git.apache.org> References: <4150006728b74c02b32c58a3b925b58e@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [3/3] hadoop git commit: HDFS-8803. Move DfsClientConf to hdfs-client. Contributed by Mingliang Liu. HDFS-8803. Move DfsClientConf to hdfs-client. Contributed by Mingliang Liu. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3aac4758 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3aac4758 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3aac4758 Branch: refs/heads/trunk Commit: 3aac4758b007a56e3d66998d457b2156effca528 Parents: f61120d Author: Haohui Mai Authored: Wed Aug 19 11:28:05 2015 -0700 Committer: Haohui Mai Committed: Wed Aug 19 11:28:05 2015 -0700 ---------------------------------------------------------------------- .../hdfs/client/HdfsClientConfigKeys.java | 73 +- .../hadoop/hdfs/client/impl/DfsClientConf.java | 741 +++++++++++++++++++ .../hadoop/hdfs/client/impl/package-info.java | 18 + .../hadoop/hdfs/protocol/HdfsConstants.java | 7 + .../hadoop/hdfs/util/ByteArrayManager.java | 422 +++++++++++ .../apache/hadoop/hdfs/util/package-info.java | 18 + hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 + .../apache/hadoop/hdfs/BlockReaderFactory.java | 8 +- .../org/apache/hadoop/hdfs/ClientContext.java | 5 +- .../java/org/apache/hadoop/hdfs/DFSClient.java | 14 +- .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 173 ++++- .../org/apache/hadoop/hdfs/DFSOutputStream.java | 3 +- .../apache/hadoop/hdfs/HdfsConfiguration.java | 8 +- .../hadoop/hdfs/client/impl/DfsClientConf.java | 714 ------------------ .../hadoop/hdfs/server/balancer/Dispatcher.java | 4 +- .../hdfs/server/common/HdfsServerConstants.java | 6 - .../hadoop/hdfs/server/datanode/DNConf.java | 17 +- .../hadoop/hdfs/server/datanode/DataNode.java | 11 +- .../hdfs/server/datanode/DataXceiver.java | 5 +- .../server/datanode/SecureDataNodeStarter.java | 4 +- .../hdfs/server/namenode/FSNamesystem.java | 8 +- .../hdfs/server/namenode/NamenodeFsck.java | 8 +- .../hdfs/shortcircuit/DomainSocketFactory.java | 4 +- .../hadoop/hdfs/util/ByteArrayManager.java | 418 ----------- .../org/apache/hadoop/hdfs/TestFiPipelines.java | 9 +- .../datanode/TestFiDataTransferProtocol.java | 3 +- .../datanode/TestFiDataTransferProtocol2.java | 5 +- .../hadoop/fs/TestEnhancedByteBufferAccess.java | 10 +- .../java/org/apache/hadoop/fs/TestUnbuffer.java | 7 +- .../fs/viewfs/TestViewFsDefaultValue.java | 8 +- .../apache/hadoop/hdfs/BlockReaderTestUtil.java | 6 +- .../org/apache/hadoop/hdfs/FileAppendTest4.java | 5 +- .../hadoop/hdfs/TestBlockReaderFactory.java | 4 +- .../hadoop/hdfs/TestBlockReaderLocal.java | 4 +- .../hadoop/hdfs/TestBlockReaderLocalLegacy.java | 6 +- .../TestClientProtocolForPipelineRecovery.java | 4 +- .../org/apache/hadoop/hdfs/TestConnCache.java | 5 +- .../hadoop/hdfs/TestDFSClientRetries.java | 8 +- .../apache/hadoop/hdfs/TestDFSInputStream.java | 2 +- .../hadoop/hdfs/TestDataTransferKeepalive.java | 8 +- .../hadoop/hdfs/TestDataTransferProtocol.java | 6 +- .../apache/hadoop/hdfs/TestDatanodeDeath.java | 5 +- .../hadoop/hdfs/TestDisableConnCache.java | 3 +- .../hadoop/hdfs/TestDistributedFileSystem.java | 7 +- .../org/apache/hadoop/hdfs/TestFileAppend2.java | 6 +- .../org/apache/hadoop/hdfs/TestFileAppend4.java | 5 +- .../apache/hadoop/hdfs/TestFileCreation.java | 11 +- .../java/org/apache/hadoop/hdfs/TestHFlush.java | 3 +- .../apache/hadoop/hdfs/TestParallelRead.java | 2 +- .../TestParallelShortCircuitLegacyRead.java | 4 +- .../TestParallelShortCircuitReadUnCached.java | 6 +- .../hadoop/hdfs/TestParallelUnixDomainRead.java | 2 +- .../org/apache/hadoop/hdfs/TestPipelines.java | 9 +- .../java/org/apache/hadoop/hdfs/TestPread.java | 2 +- .../java/org/apache/hadoop/hdfs/TestRead.java | 5 +- .../hadoop/hdfs/TestRemoteBlockReader.java | 4 +- .../blockmanagement/TestBlockTokenWithDFS.java | 6 +- .../server/datanode/TestBlockReplacement.java | 7 +- .../server/datanode/TestCachingStrategy.java | 7 +- .../datanode/TestDataNodeVolumeFailure.java | 6 +- .../fsdataset/impl/LazyPersistTestCase.java | 5 +- .../fsdataset/impl/TestDatanodeRestart.java | 7 +- .../shortcircuit/TestShortCircuitCache.java | 13 +- .../shortcircuit/TestShortCircuitLocalRead.java | 6 +- 64 files changed, 1591 insertions(+), 1331 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/3aac4758/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java index 600c7ca..ccf5f4a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java @@ -17,7 +17,12 @@ */ package org.apache.hadoop.hdfs.client; +import org.apache.hadoop.classification.InterfaceAudience; + +import java.util.concurrent.TimeUnit; + /** Client configuration properties */ +@InterfaceAudience.Private public interface HdfsClientConfigKeys { long SECOND = 1000L; long MINUTE = 60 * SECOND; @@ -31,7 +36,7 @@ public interface HdfsClientConfigKeys { String DFS_WEBHDFS_ACL_PERMISSION_PATTERN_DEFAULT = "^(default:)?(user|group|mask|other):[[A-Za-z_][A-Za-z0-9._-]]*:([rwx-]{3})?(,(default:)?(user|group|mask|other):[[A-Za-z_][A-Za-z0-9._-]]*:([rwx-]{3})?)*$"; - static final String PREFIX = "dfs.client."; + String PREFIX = "dfs.client."; String DFS_NAMESERVICES = "dfs.nameservices"; int DFS_NAMENODE_HTTP_PORT_DEFAULT = 50070; String DFS_NAMENODE_HTTP_ADDRESS_KEY = "dfs.namenode.http-address"; @@ -41,6 +46,72 @@ public interface HdfsClientConfigKeys { int DFS_NAMENODE_RPC_PORT_DEFAULT = 8020; String DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY = "dfs.namenode.kerberos.principal"; + String DFS_CLIENT_WRITE_PACKET_SIZE_KEY = "dfs.client-write-packet-size"; + int DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024; + String DFS_CLIENT_SOCKET_TIMEOUT_KEY = "dfs.client.socket-timeout"; + String DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY = + "dfs.client.socketcache.capacity"; + int DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT = 16; + String DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY = + "dfs.client.socketcache.expiryMsec"; + long DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT = 3000; + String DFS_CLIENT_USE_DN_HOSTNAME = "dfs.client.use.datanode.hostname"; + boolean DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT = false; + String DFS_CLIENT_CACHE_DROP_BEHIND_WRITES = + "dfs.client.cache.drop.behind.writes"; + String DFS_CLIENT_CACHE_DROP_BEHIND_READS = + "dfs.client.cache.drop.behind.reads"; + String DFS_CLIENT_CACHE_READAHEAD = "dfs.client.cache.readahead"; + String DFS_CLIENT_CACHED_CONN_RETRY_KEY = "dfs.client.cached.conn.retry"; + int DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT = 3; + String DFS_CLIENT_CONTEXT = "dfs.client.context"; + String DFS_CLIENT_CONTEXT_DEFAULT = "default"; + String DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS = + "dfs.client.file-block-storage-locations.num-threads"; + int DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS_DEFAULT = 10; + String DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS = + "dfs.client.file-block-storage-locations.timeout.millis"; + int DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS_DEFAULT = 1000; + String DFS_CLIENT_USE_LEGACY_BLOCKREADER = + "dfs.client.use.legacy.blockreader"; + boolean DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT = false; + String DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL = + "dfs.client.use.legacy.blockreader.local"; + boolean DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT = false; + String DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY = + "dfs.client.datanode-restart.timeout"; + long DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT = 30; + // Much code in hdfs is not yet updated to use these keys. + // the initial delay (unit is ms) for locateFollowingBlock, the delay time + // will increase exponentially(double) for each retry. + String DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY = + "dfs.client.max.block.acquire.failures"; + int DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT = 3; + String DFS_CHECKSUM_TYPE_KEY = "dfs.checksum.type"; + String DFS_CHECKSUM_TYPE_DEFAULT = "CRC32C"; + String DFS_BYTES_PER_CHECKSUM_KEY = "dfs.bytes-per-checksum"; + int DFS_BYTES_PER_CHECKSUM_DEFAULT = 512; + String DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY = + "dfs.datanode.socket.write.timeout"; + String DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC = + "dfs.client.domain.socket.data.traffic"; + boolean DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT = false; + String DFS_DOMAIN_SOCKET_PATH_KEY = "dfs.domain.socket.path"; + String DFS_DOMAIN_SOCKET_PATH_DEFAULT = ""; + String DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS = + "dfs.short.circuit.shared.memory.watcher.interrupt.check.ms"; + int DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT = + 60000; + String DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY = + "dfs.client.slow.io.warning.threshold.ms"; + long DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT = 30000; + String DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_MS = + "dfs.client.key.provider.cache.expiry"; + long DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT = + TimeUnit.DAYS.toMillis(10); // 10 days + String DFS_HDFS_BLOCKS_METADATA_ENABLED = + "dfs.datanode.hdfs-blocks-metadata.enabled"; + boolean DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT = false; /** dfs.client.retry configuration properties */ interface Retry { http://git-wip-us.apache.org/repos/asf/hadoop/blob/3aac4758/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java new file mode 100644 index 0000000..52aed2f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java @@ -0,0 +1,741 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.client.impl; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.HadoopIllegalArgumentException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.fs.Options.ChecksumOpt; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.util.ByteArrayManager; +import org.apache.hadoop.ipc.Client; +import org.apache.hadoop.util.DataChecksum; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BLOCK_SIZE_DEFAULT; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BLOCK_SIZE_KEY; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CHECKSUM_TYPE_KEY; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_KEY; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS_DEFAULT; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS_DEFAULT; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_MS; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DOMAIN_SOCKET_PATH_DEFAULT; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_REPLICATION_DEFAULT; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_REPLICATION_KEY; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Failover; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.HedgedRead; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Mmap; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Read; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Retry; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.ShortCircuit; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Write; + +/** + * DFSClient configuration. + */ +public class DfsClientConf { + private static final Logger LOG = LoggerFactory.getLogger(DfsClientConf + .class); + + private final int hdfsTimeout; // timeout value for a DFS operation. + + private final int maxFailoverAttempts; + private final int maxRetryAttempts; + private final int failoverSleepBaseMillis; + private final int failoverSleepMaxMillis; + private final int maxBlockAcquireFailures; + private final int datanodeSocketWriteTimeout; + private final int ioBufferSize; + private final ChecksumOpt defaultChecksumOpt; + private final int writePacketSize; + private final int writeMaxPackets; + private final ByteArrayManager.Conf writeByteArrayManagerConf; + private final int socketTimeout; + private final long excludedNodesCacheExpiry; + /** Wait time window (in msec) if BlockMissingException is caught. */ + private final int timeWindow; + private final int numCachedConnRetry; + private final int numBlockWriteRetry; + private final int numBlockWriteLocateFollowingRetry; + private final int blockWriteLocateFollowingInitialDelayMs; + private final long defaultBlockSize; + private final long prefetchSize; + private final short defaultReplication; + private final String taskId; + private final FsPermission uMask; + private final boolean connectToDnViaHostname; + private final int retryTimesForGetLastBlockLength; + private final int retryIntervalForGetLastBlockLength; + private final long datanodeRestartTimeout; + private final long slowIoWarningThresholdMs; + + private final ShortCircuitConf shortCircuitConf; + + private final long hedgedReadThresholdMillis; + private final int hedgedReadThreadpoolSize; + + public DfsClientConf(Configuration conf) { + // The hdfsTimeout is currently the same as the ipc timeout + hdfsTimeout = Client.getTimeout(conf); + + maxRetryAttempts = conf.getInt( + Retry.MAX_ATTEMPTS_KEY, + Retry.MAX_ATTEMPTS_DEFAULT); + timeWindow = conf.getInt( + Retry.WINDOW_BASE_KEY, + Retry.WINDOW_BASE_DEFAULT); + retryTimesForGetLastBlockLength = conf.getInt( + Retry.TIMES_GET_LAST_BLOCK_LENGTH_KEY, + Retry.TIMES_GET_LAST_BLOCK_LENGTH_DEFAULT); + retryIntervalForGetLastBlockLength = conf.getInt( + Retry.INTERVAL_GET_LAST_BLOCK_LENGTH_KEY, + Retry.INTERVAL_GET_LAST_BLOCK_LENGTH_DEFAULT); + + maxFailoverAttempts = conf.getInt( + Failover.MAX_ATTEMPTS_KEY, + Failover.MAX_ATTEMPTS_DEFAULT); + failoverSleepBaseMillis = conf.getInt( + Failover.SLEEPTIME_BASE_KEY, + Failover.SLEEPTIME_BASE_DEFAULT); + failoverSleepMaxMillis = conf.getInt( + Failover.SLEEPTIME_MAX_KEY, + Failover.SLEEPTIME_MAX_DEFAULT); + + maxBlockAcquireFailures = conf.getInt( + DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY, + DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT); + datanodeSocketWriteTimeout = conf.getInt( + DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY, + HdfsConstants.WRITE_TIMEOUT); + ioBufferSize = conf.getInt( + CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY, + CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT); + defaultChecksumOpt = getChecksumOptFromConf(conf); + socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, + HdfsConstants.READ_TIMEOUT); + /** dfs.write.packet.size is an internal config variable */ + writePacketSize = conf.getInt( + DFS_CLIENT_WRITE_PACKET_SIZE_KEY, + DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT); + writeMaxPackets = conf.getInt( + Write.MAX_PACKETS_IN_FLIGHT_KEY, + Write.MAX_PACKETS_IN_FLIGHT_DEFAULT); + + final boolean byteArrayManagerEnabled = conf.getBoolean( + Write.ByteArrayManager.ENABLED_KEY, + Write.ByteArrayManager.ENABLED_DEFAULT); + if (!byteArrayManagerEnabled) { + writeByteArrayManagerConf = null; + } else { + final int countThreshold = conf.getInt( + Write.ByteArrayManager.COUNT_THRESHOLD_KEY, + Write.ByteArrayManager.COUNT_THRESHOLD_DEFAULT); + final int countLimit = conf.getInt( + Write.ByteArrayManager.COUNT_LIMIT_KEY, + Write.ByteArrayManager.COUNT_LIMIT_DEFAULT); + final long countResetTimePeriodMs = conf.getLong( + Write.ByteArrayManager.COUNT_RESET_TIME_PERIOD_MS_KEY, + Write.ByteArrayManager.COUNT_RESET_TIME_PERIOD_MS_DEFAULT); + writeByteArrayManagerConf = new ByteArrayManager.Conf( + countThreshold, countLimit, countResetTimePeriodMs); + } + + defaultBlockSize = conf.getLongBytes(DFS_BLOCK_SIZE_KEY, + DFS_BLOCK_SIZE_DEFAULT); + defaultReplication = (short) conf.getInt( + DFS_REPLICATION_KEY, DFS_REPLICATION_DEFAULT); + taskId = conf.get("mapreduce.task.attempt.id", "NONMAPREDUCE"); + excludedNodesCacheExpiry = conf.getLong( + Write.EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_KEY, + Write.EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT); + prefetchSize = conf.getLong(Read.PREFETCH_SIZE_KEY, + 10 * defaultBlockSize); + numCachedConnRetry = conf.getInt(DFS_CLIENT_CACHED_CONN_RETRY_KEY, + DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT); + numBlockWriteRetry = conf.getInt( + BlockWrite.RETRIES_KEY, + BlockWrite.RETRIES_DEFAULT); + numBlockWriteLocateFollowingRetry = conf.getInt( + BlockWrite.LOCATEFOLLOWINGBLOCK_RETRIES_KEY, + BlockWrite.LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT); + blockWriteLocateFollowingInitialDelayMs = conf.getInt( + BlockWrite.LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_MS_KEY, + BlockWrite.LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_MS_DEFAULT); + uMask = FsPermission.getUMask(conf); + connectToDnViaHostname = conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME, + DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT); + + datanodeRestartTimeout = conf.getLong( + DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY, + DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT) * 1000; + slowIoWarningThresholdMs = conf.getLong( + DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY, + DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT); + + shortCircuitConf = new ShortCircuitConf(conf); + + hedgedReadThresholdMillis = conf.getLong( + HedgedRead.THRESHOLD_MILLIS_KEY, + HedgedRead.THRESHOLD_MILLIS_DEFAULT); + hedgedReadThreadpoolSize = conf.getInt( + HedgedRead.THREADPOOL_SIZE_KEY, + HedgedRead.THREADPOOL_SIZE_DEFAULT); + } + + private DataChecksum.Type getChecksumType(Configuration conf) { + final String checksum = conf.get( + DFS_CHECKSUM_TYPE_KEY, + DFS_CHECKSUM_TYPE_DEFAULT); + try { + return DataChecksum.Type.valueOf(checksum); + } catch(IllegalArgumentException iae) { + LOG.warn("Bad checksum type: {}. Using default {}", checksum, + DFS_CHECKSUM_TYPE_DEFAULT); + return DataChecksum.Type.valueOf( + DFS_CHECKSUM_TYPE_DEFAULT); + } + } + + // Construct a checksum option from conf + private ChecksumOpt getChecksumOptFromConf(Configuration conf) { + DataChecksum.Type type = getChecksumType(conf); + int bytesPerChecksum = conf.getInt(DFS_BYTES_PER_CHECKSUM_KEY, + DFS_BYTES_PER_CHECKSUM_DEFAULT); + return new ChecksumOpt(type, bytesPerChecksum); + } + + /** create a DataChecksum with the given option. */ + public DataChecksum createChecksum(ChecksumOpt userOpt) { + // Fill in any missing field with the default. + ChecksumOpt opt = ChecksumOpt.processChecksumOpt( + defaultChecksumOpt, userOpt); + DataChecksum dataChecksum = DataChecksum.newDataChecksum( + opt.getChecksumType(), + opt.getBytesPerChecksum()); + if (dataChecksum == null) { + throw new HadoopIllegalArgumentException("Invalid checksum type: userOpt=" + + userOpt + ", default=" + defaultChecksumOpt + + ", effective=null"); + } + return dataChecksum; + } + + @VisibleForTesting + public int getBlockWriteLocateFollowingInitialDelayMs() { + return blockWriteLocateFollowingInitialDelayMs; + } + + /** + * @return the hdfsTimeout + */ + public int getHdfsTimeout() { + return hdfsTimeout; + } + + /** + * @return the maxFailoverAttempts + */ + public int getMaxFailoverAttempts() { + return maxFailoverAttempts; + } + + /** + * @return the maxRetryAttempts + */ + public int getMaxRetryAttempts() { + return maxRetryAttempts; + } + + /** + * @return the failoverSleepBaseMillis + */ + public int getFailoverSleepBaseMillis() { + return failoverSleepBaseMillis; + } + + /** + * @return the failoverSleepMaxMillis + */ + public int getFailoverSleepMaxMillis() { + return failoverSleepMaxMillis; + } + + /** + * @return the maxBlockAcquireFailures + */ + public int getMaxBlockAcquireFailures() { + return maxBlockAcquireFailures; + } + + /** + * @return the datanodeSocketWriteTimeout + */ + public int getDatanodeSocketWriteTimeout() { + return datanodeSocketWriteTimeout; + } + + /** + * @return the ioBufferSize + */ + public int getIoBufferSize() { + return ioBufferSize; + } + + /** + * @return the defaultChecksumOpt + */ + public ChecksumOpt getDefaultChecksumOpt() { + return defaultChecksumOpt; + } + + /** + * @return the writePacketSize + */ + public int getWritePacketSize() { + return writePacketSize; + } + + /** + * @return the writeMaxPackets + */ + public int getWriteMaxPackets() { + return writeMaxPackets; + } + + /** + * @return the writeByteArrayManagerConf + */ + public ByteArrayManager.Conf getWriteByteArrayManagerConf() { + return writeByteArrayManagerConf; + } + + /** + * @return the socketTimeout + */ + public int getSocketTimeout() { + return socketTimeout; + } + + /** + * @return the excludedNodesCacheExpiry + */ + public long getExcludedNodesCacheExpiry() { + return excludedNodesCacheExpiry; + } + + /** + * @return the timeWindow + */ + public int getTimeWindow() { + return timeWindow; + } + + /** + * @return the numCachedConnRetry + */ + public int getNumCachedConnRetry() { + return numCachedConnRetry; + } + + /** + * @return the numBlockWriteRetry + */ + public int getNumBlockWriteRetry() { + return numBlockWriteRetry; + } + + /** + * @return the numBlockWriteLocateFollowingRetry + */ + public int getNumBlockWriteLocateFollowingRetry() { + return numBlockWriteLocateFollowingRetry; + } + + /** + * @return the defaultBlockSize + */ + public long getDefaultBlockSize() { + return defaultBlockSize; + } + + /** + * @return the prefetchSize + */ + public long getPrefetchSize() { + return prefetchSize; + } + + /** + * @return the defaultReplication + */ + public short getDefaultReplication() { + return defaultReplication; + } + + /** + * @return the taskId + */ + public String getTaskId() { + return taskId; + } + + /** + * @return the uMask + */ + public FsPermission getUMask() { + return uMask; + } + + /** + * @return the connectToDnViaHostname + */ + public boolean isConnectToDnViaHostname() { + return connectToDnViaHostname; + } + + /** + * @return the retryTimesForGetLastBlockLength + */ + public int getRetryTimesForGetLastBlockLength() { + return retryTimesForGetLastBlockLength; + } + + /** + * @return the retryIntervalForGetLastBlockLength + */ + public int getRetryIntervalForGetLastBlockLength() { + return retryIntervalForGetLastBlockLength; + } + + /** + * @return the datanodeRestartTimeout + */ + public long getDatanodeRestartTimeout() { + return datanodeRestartTimeout; + } + + /** + * @return the slowIoWarningThresholdMs + */ + public long getSlowIoWarningThresholdMs() { + return slowIoWarningThresholdMs; + } + + /** + * @return the hedgedReadThresholdMillis + */ + public long getHedgedReadThresholdMillis() { + return hedgedReadThresholdMillis; + } + + /** + * @return the hedgedReadThreadpoolSize + */ + public int getHedgedReadThreadpoolSize() { + return hedgedReadThreadpoolSize; + } + + /** + * @return the shortCircuitConf + */ + public ShortCircuitConf getShortCircuitConf() { + return shortCircuitConf; + } + + /** + * Configuration for short-circuit reads. + */ + public static class ShortCircuitConf { + private static final Logger LOG = DfsClientConf.LOG; + + private final int socketCacheCapacity; + private final long socketCacheExpiry; + + private final boolean useLegacyBlockReader; + private final boolean useLegacyBlockReaderLocal; + private final String domainSocketPath; + private final boolean skipShortCircuitChecksums; + + private final int shortCircuitBufferSize; + private final boolean shortCircuitLocalReads; + private final boolean domainSocketDataTraffic; + private final int shortCircuitStreamsCacheSize; + private final long shortCircuitStreamsCacheExpiryMs; + private final int shortCircuitSharedMemoryWatcherInterruptCheckMs; + + private final boolean shortCircuitMmapEnabled; + private final int shortCircuitMmapCacheSize; + private final long shortCircuitMmapCacheExpiryMs; + private final long shortCircuitMmapCacheRetryTimeout; + private final long shortCircuitCacheStaleThresholdMs; + + private final long keyProviderCacheExpiryMs; + + public ShortCircuitConf(Configuration conf) { + socketCacheCapacity = conf.getInt( + DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY, + DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT); + socketCacheExpiry = conf.getLong( + DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY, + DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT); + + useLegacyBlockReader = conf.getBoolean( + DFS_CLIENT_USE_LEGACY_BLOCKREADER, + DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT); + useLegacyBlockReaderLocal = conf.getBoolean( + DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, + DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT); + shortCircuitLocalReads = conf.getBoolean( + Read.ShortCircuit.KEY, + Read.ShortCircuit.DEFAULT); + domainSocketDataTraffic = conf.getBoolean( + DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, + DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT); + domainSocketPath = conf.getTrimmed( + DFS_DOMAIN_SOCKET_PATH_KEY, + DFS_DOMAIN_SOCKET_PATH_DEFAULT); + + LOG.debug(DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL + + " = {}", useLegacyBlockReaderLocal); + LOG.debug(Read.ShortCircuit.KEY + + " = {}", shortCircuitLocalReads); + LOG.debug(DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC + + " = {}", domainSocketDataTraffic); + LOG.debug(DFS_DOMAIN_SOCKET_PATH_KEY + + " = {}", domainSocketPath); + + skipShortCircuitChecksums = conf.getBoolean( + Read.ShortCircuit.SKIP_CHECKSUM_KEY, + Read.ShortCircuit.SKIP_CHECKSUM_DEFAULT); + shortCircuitBufferSize = conf.getInt( + Read.ShortCircuit.BUFFER_SIZE_KEY, + Read.ShortCircuit.BUFFER_SIZE_DEFAULT); + shortCircuitStreamsCacheSize = conf.getInt( + Read.ShortCircuit.STREAMS_CACHE_SIZE_KEY, + Read.ShortCircuit.STREAMS_CACHE_SIZE_DEFAULT); + shortCircuitStreamsCacheExpiryMs = conf.getLong( + Read.ShortCircuit.STREAMS_CACHE_EXPIRY_MS_KEY, + Read.ShortCircuit.STREAMS_CACHE_EXPIRY_MS_DEFAULT); + shortCircuitMmapEnabled = conf.getBoolean( + Mmap.ENABLED_KEY, + Mmap.ENABLED_DEFAULT); + shortCircuitMmapCacheSize = conf.getInt( + Mmap.CACHE_SIZE_KEY, + Mmap.CACHE_SIZE_DEFAULT); + shortCircuitMmapCacheExpiryMs = conf.getLong( + Mmap.CACHE_TIMEOUT_MS_KEY, + Mmap.CACHE_TIMEOUT_MS_DEFAULT); + shortCircuitMmapCacheRetryTimeout = conf.getLong( + Mmap.RETRY_TIMEOUT_MS_KEY, + Mmap.RETRY_TIMEOUT_MS_DEFAULT); + shortCircuitCacheStaleThresholdMs = conf.getLong( + ShortCircuit.REPLICA_STALE_THRESHOLD_MS_KEY, + ShortCircuit.REPLICA_STALE_THRESHOLD_MS_DEFAULT); + shortCircuitSharedMemoryWatcherInterruptCheckMs = conf.getInt( + DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS, + DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT); + + keyProviderCacheExpiryMs = conf.getLong( + DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_MS, + DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT); + } + + /** + * @return the socketCacheCapacity + */ + public int getSocketCacheCapacity() { + return socketCacheCapacity; + } + + /** + * @return the socketCacheExpiry + */ + public long getSocketCacheExpiry() { + return socketCacheExpiry; + } + + public boolean isUseLegacyBlockReaderLocal() { + return useLegacyBlockReaderLocal; + } + + public String getDomainSocketPath() { + return domainSocketPath; + } + + public boolean isShortCircuitLocalReads() { + return shortCircuitLocalReads; + } + + public boolean isDomainSocketDataTraffic() { + return domainSocketDataTraffic; + } + /** + * @return the useLegacyBlockReader + */ + public boolean isUseLegacyBlockReader() { + return useLegacyBlockReader; + } + + /** + * @return the skipShortCircuitChecksums + */ + public boolean isSkipShortCircuitChecksums() { + return skipShortCircuitChecksums; + } + + /** + * @return the shortCircuitBufferSize + */ + public int getShortCircuitBufferSize() { + return shortCircuitBufferSize; + } + + /** + * @return the shortCircuitStreamsCacheSize + */ + public int getShortCircuitStreamsCacheSize() { + return shortCircuitStreamsCacheSize; + } + + /** + * @return the shortCircuitStreamsCacheExpiryMs + */ + public long getShortCircuitStreamsCacheExpiryMs() { + return shortCircuitStreamsCacheExpiryMs; + } + + /** + * @return the shortCircuitSharedMemoryWatcherInterruptCheckMs + */ + public int getShortCircuitSharedMemoryWatcherInterruptCheckMs() { + return shortCircuitSharedMemoryWatcherInterruptCheckMs; + } + + /** + * @return the shortCircuitMmapEnabled + */ + public boolean isShortCircuitMmapEnabled() { + return shortCircuitMmapEnabled; + } + + /** + * @return the shortCircuitMmapCacheSize + */ + public int getShortCircuitMmapCacheSize() { + return shortCircuitMmapCacheSize; + } + + /** + * @return the shortCircuitMmapCacheExpiryMs + */ + public long getShortCircuitMmapCacheExpiryMs() { + return shortCircuitMmapCacheExpiryMs; + } + + /** + * @return the shortCircuitMmapCacheRetryTimeout + */ + public long getShortCircuitMmapCacheRetryTimeout() { + return shortCircuitMmapCacheRetryTimeout; + } + + /** + * @return the shortCircuitCacheStaleThresholdMs + */ + public long getShortCircuitCacheStaleThresholdMs() { + return shortCircuitCacheStaleThresholdMs; + } + + /** + * @return the keyProviderCacheExpiryMs + */ + public long getKeyProviderCacheExpiryMs() { + return keyProviderCacheExpiryMs; + } + + public String confAsString() { + StringBuilder builder = new StringBuilder(); + builder.append("shortCircuitStreamsCacheSize = "). + append(shortCircuitStreamsCacheSize). + append(", shortCircuitStreamsCacheExpiryMs = "). + append(shortCircuitStreamsCacheExpiryMs). + append(", shortCircuitMmapCacheSize = "). + append(shortCircuitMmapCacheSize). + append(", shortCircuitMmapCacheExpiryMs = "). + append(shortCircuitMmapCacheExpiryMs). + append(", shortCircuitMmapCacheRetryTimeout = "). + append(shortCircuitMmapCacheRetryTimeout). + append(", shortCircuitCacheStaleThresholdMs = "). + append(shortCircuitCacheStaleThresholdMs). + append(", socketCacheCapacity = "). + append(socketCacheCapacity). + append(", socketCacheExpiry = "). + append(socketCacheExpiry). + append(", shortCircuitLocalReads = "). + append(shortCircuitLocalReads). + append(", useLegacyBlockReaderLocal = "). + append(useLegacyBlockReaderLocal). + append(", domainSocketDataTraffic = "). + append(domainSocketDataTraffic). + append(", shortCircuitSharedMemoryWatcherInterruptCheckMs = "). + append(shortCircuitSharedMemoryWatcherInterruptCheckMs). + append(", keyProviderCacheExpiryMs = "). + append(keyProviderCacheExpiryMs); + + return builder.toString(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/3aac4758/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/package-info.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/package-info.java new file mode 100644 index 0000000..44a8b45 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/package-info.java @@ -0,0 +1,18 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.client.impl; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/3aac4758/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java index 443576d..d5f4d53 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java @@ -78,6 +78,13 @@ public final class HdfsConstants { public static final String CLIENT_NAMENODE_PROTOCOL_NAME = "org.apache.hadoop.hdfs.protocol.ClientProtocol"; + // Timeouts for communicating with DataNode for streaming writes/reads + public static final int READ_TIMEOUT = 60 * 1000; + public static final int READ_TIMEOUT_EXTENSION = 5 * 1000; + public static final int WRITE_TIMEOUT = 8 * 60 * 1000; + //for write pipeline + public static final int WRITE_TIMEOUT_EXTENSION = 5 * 1000; + // SafeMode actions public enum SafeModeAction { SAFEMODE_LEAVE, SAFEMODE_ENTER, SAFEMODE_GET http://git-wip-us.apache.org/repos/asf/hadoop/blob/3aac4758/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/ByteArrayManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/ByteArrayManager.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/ByteArrayManager.java new file mode 100644 index 0000000..a9adb7e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/ByteArrayManager.java @@ -0,0 +1,422 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.util; + +import java.util.HashMap; +import java.util.LinkedList; +import java.util.Map; +import java.util.Queue; + +import org.apache.hadoop.HadoopIllegalArgumentException; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.util.Time; + +import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Manage byte array creation and release. + */ +@InterfaceAudience.Private +public abstract class ByteArrayManager { + static final Logger LOG = LoggerFactory.getLogger(ByteArrayManager.class); + private static final ThreadLocal DEBUG_MESSAGE = + new ThreadLocal() { + protected StringBuilder initialValue() { + return new StringBuilder(); + } + }; + + private static void logDebugMessage() { + final StringBuilder b = DEBUG_MESSAGE.get(); + LOG.debug(b.toString()); + b.setLength(0); + } + + static final int MIN_ARRAY_LENGTH = 32; + static final byte[] EMPTY_BYTE_ARRAY = {}; + + /** + * @return the least power of two greater than or equal to n, i.e. return + * the least integer x with x >= n and x a power of two. + * + * @throws HadoopIllegalArgumentException + * if n <= 0. + */ + public static int leastPowerOfTwo(final int n) { + if (n <= 0) { + throw new HadoopIllegalArgumentException("n = " + n + " <= 0"); + } + + final int highestOne = Integer.highestOneBit(n); + if (highestOne == n) { + return n; // n is a power of two. + } + final int roundUp = highestOne << 1; + if (roundUp < 0) { + final long overflow = ((long) highestOne) << 1; + throw new ArithmeticException( + "Overflow: for n = " + n + ", the least power of two (the least" + + " integer x with x >= n and x a power of two) = " + + overflow + " > Integer.MAX_VALUE = " + Integer.MAX_VALUE); + } + return roundUp; + } + + /** + * A counter with a time stamp so that it is reset automatically + * if there is no increment for the time period. + */ + static class Counter { + private final long countResetTimePeriodMs; + private long count = 0L; + private long timestamp = Time.monotonicNow(); + + Counter(long countResetTimePeriodMs) { + this.countResetTimePeriodMs = countResetTimePeriodMs; + } + + synchronized long getCount() { + return count; + } + + /** + * Increment the counter, and reset it if there is no increment + * for a certain time period. + * + * @return the new count. + */ + synchronized long increment() { + final long now = Time.monotonicNow(); + if (now - timestamp > countResetTimePeriodMs) { + count = 0; // reset the counter + } + timestamp = now; + return ++count; + } + } + + /** A map from integers to counters. */ + static final class CounterMap { + /** @see ByteArrayManager.Conf#countResetTimePeriodMs */ + private final long countResetTimePeriodMs; + private final Map map = new HashMap<>(); + + private CounterMap(long countResetTimePeriodMs) { + this.countResetTimePeriodMs = countResetTimePeriodMs; + } + + /** + * @return the counter for the given key; + * and create a new counter if it does not exist. + */ + synchronized Counter get(final Integer key, final boolean + createIfNotExist) { + Counter count = map.get(key); + if (count == null && createIfNotExist) { + count = new Counter(countResetTimePeriodMs); + map.put(key, count); + } + return count; + } + } + + /** Manage byte arrays with the same fixed length. */ + static class FixedLengthManager { + private final int byteArrayLength; + private final int maxAllocated; + private final Queue freeQueue = new LinkedList<>(); + + private int numAllocated = 0; + + FixedLengthManager(int arrayLength, int maxAllocated) { + this.byteArrayLength = arrayLength; + this.maxAllocated = maxAllocated; + } + + /** + * Allocate a byte array. + * + * If the number of allocated arrays >= maximum, the current thread is + * blocked until the number of allocated arrays drops to below the maximum. + * + * The byte array allocated by this method must be returned for recycling + * via the {@link FixedLengthManager#recycle(byte[])} method. + */ + synchronized byte[] allocate() throws InterruptedException { + if (LOG.isDebugEnabled()) { + DEBUG_MESSAGE.get().append(", ").append(this); + } + for(; numAllocated >= maxAllocated;) { + if (LOG.isDebugEnabled()) { + DEBUG_MESSAGE.get().append(": wait ..."); + logDebugMessage(); + } + + wait(); + + if (LOG.isDebugEnabled()) { + DEBUG_MESSAGE.get().append("wake up: ").append(this); + } + } + numAllocated++; + + final byte[] array = freeQueue.poll(); + if (LOG.isDebugEnabled()) { + DEBUG_MESSAGE.get().append(", recycled? ").append(array != null); + } + return array != null? array : new byte[byteArrayLength]; + } + + /** + * Recycle the given byte array, which must have the same length as the + * array length managed by this object. + * + * The byte array may or may not be allocated + * by the {@link FixedLengthManager#allocate()} method. + */ + synchronized int recycle(byte[] array) { + Preconditions.checkNotNull(array); + Preconditions.checkArgument(array.length == byteArrayLength); + if (LOG.isDebugEnabled()) { + DEBUG_MESSAGE.get().append(", ").append(this); + } + + notify(); + numAllocated--; + if (numAllocated < 0) { + // it is possible to drop below 0 since + // some byte arrays may not be created by the allocate() method. + numAllocated = 0; + } + + if (freeQueue.size() < maxAllocated - numAllocated) { + if (LOG.isDebugEnabled()) { + DEBUG_MESSAGE.get().append(", freeQueue.offer"); + } + freeQueue.offer(array); + } + return freeQueue.size(); + } + + @Override + public synchronized String toString() { + return "[" + byteArrayLength + ": " + numAllocated + "/" + + maxAllocated + ", free=" + freeQueue.size() + "]"; + } + } + + /** A map from array lengths to byte array managers. */ + static class ManagerMap { + private final int countLimit; + private final Map map = new HashMap<>(); + + ManagerMap(int countLimit) { + this.countLimit = countLimit; + } + + /** @return the manager for the given array length. */ + synchronized FixedLengthManager get(final Integer arrayLength, + final boolean createIfNotExist) { + FixedLengthManager manager = map.get(arrayLength); + if (manager == null && createIfNotExist) { + manager = new FixedLengthManager(arrayLength, countLimit); + map.put(arrayLength, manager); + } + return manager; + } + } + + /** + * Configuration for ByteArrayManager. + */ + public static class Conf { + /** + * The count threshold for each array length so that a manager is created + * only after the allocation count exceeds the threshold. + */ + private final int countThreshold; + /** + * The maximum number of arrays allowed for each array length. + */ + private final int countLimit; + /** + * The time period in milliseconds that the allocation count for each array + * length is reset to zero if there is no increment. + */ + private final long countResetTimePeriodMs; + + public Conf(int countThreshold, int countLimit, long + countResetTimePeriodMs) { + this.countThreshold = countThreshold; + this.countLimit = countLimit; + this.countResetTimePeriodMs = countResetTimePeriodMs; + } + } + + /** + * Create a byte array for the given length, where the length of + * the returned array is larger than or equal to the given length. + * + * The current thread may be blocked if some resource is unavailable. + * + * The byte array created by this method must be released + * via the {@link ByteArrayManager#release(byte[])} method. + * + * @return a byte array with length larger than or equal to the given length. + */ + public abstract byte[] newByteArray(int size) throws InterruptedException; + + /** + * Release the given byte array. + * + * The byte array may or may not be created + * by the {@link ByteArrayManager#newByteArray(int)} method. + * + * @return the number of free array. + */ + public abstract int release(byte[] array); + + public static ByteArrayManager newInstance(Conf conf) { + return conf == null? new NewByteArrayWithoutLimit(): new Impl(conf); + } + + /** + * A dummy implementation which simply calls new byte[]. + */ + static class NewByteArrayWithoutLimit extends ByteArrayManager { + @Override + public byte[] newByteArray(int size) throws InterruptedException { + return new byte[size]; + } + + @Override + public int release(byte[] array) { + return 0; + } + } + + /** + * Manage byte array allocation and provide a mechanism for recycling the byte + * array objects. + */ + static class Impl extends ByteArrayManager { + private final Conf conf; + + private final CounterMap counters; + private final ManagerMap managers; + + Impl(Conf conf) { + this.conf = conf; + this.counters = new CounterMap(conf.countResetTimePeriodMs); + this.managers = new ManagerMap(conf.countLimit); + } + + /** + * Allocate a byte array, where the length of the allocated array + * is the least power of two of the given length + * unless the given length is less than {@link #MIN_ARRAY_LENGTH}. + * In such case, the returned array length is equal to {@link + * #MIN_ARRAY_LENGTH}. + * + * If the number of allocated arrays exceeds the capacity, + * the current thread is blocked until + * the number of allocated arrays drops to below the capacity. + * + * The byte array allocated by this method must be returned for recycling + * via the {@link Impl#release(byte[])} method. + * + * @return a byte array with length larger than or equal to the given + * length. + */ + @Override + public byte[] newByteArray(final int arrayLength) + throws InterruptedException { + Preconditions.checkArgument(arrayLength >= 0); + if (LOG.isDebugEnabled()) { + DEBUG_MESSAGE.get().append("allocate(").append(arrayLength).append(")"); + } + + final byte[] array; + if (arrayLength == 0) { + array = EMPTY_BYTE_ARRAY; + } else { + final int powerOfTwo = arrayLength <= MIN_ARRAY_LENGTH? + MIN_ARRAY_LENGTH: leastPowerOfTwo(arrayLength); + final long count = counters.get(powerOfTwo, true).increment(); + final boolean aboveThreshold = count > conf.countThreshold; + // create a new manager only if the count is above threshold. + final FixedLengthManager manager = + managers.get(powerOfTwo, aboveThreshold); + + if (LOG.isDebugEnabled()) { + DEBUG_MESSAGE.get().append(": count=").append(count) + .append(aboveThreshold? ", aboveThreshold": ", belowThreshold"); + } + array = manager != null? manager.allocate(): new byte[powerOfTwo]; + } + + if (LOG.isDebugEnabled()) { + DEBUG_MESSAGE.get().append(", return byte[") + .append(array.length).append("]"); + logDebugMessage(); + } + return array; + } + + /** + * Recycle the given byte array. + * + * The byte array may or may not be allocated + * by the {@link Impl#newByteArray(int)} method. + * + * This is a non-blocking call. + */ + @Override + public int release(final byte[] array) { + Preconditions.checkNotNull(array); + if (LOG.isDebugEnabled()) { + DEBUG_MESSAGE.get() + .append("recycle: array.length=").append(array.length); + } + + final int freeQueueSize; + if (array.length == 0) { + freeQueueSize = -1; + } else { + final FixedLengthManager manager = managers.get(array.length, false); + freeQueueSize = manager == null? -1: manager.recycle(array); + } + + if (LOG.isDebugEnabled()) { + DEBUG_MESSAGE.get().append(", freeQueueSize=").append(freeQueueSize); + logDebugMessage(); + } + return freeQueueSize; + } + + CounterMap getCounters() { + return counters; + } + + ManagerMap getManagers() { + return managers; + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/3aac4758/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/package-info.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/package-info.java new file mode 100644 index 0000000..5ba3de0 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/package-info.java @@ -0,0 +1,18 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.util; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/3aac4758/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index c463f12..b7fbc23 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -814,6 +814,8 @@ Release 2.8.0 - UNRELEASED HDFS-8911. NameNode Metric : Add Editlog counters as a JMX metric. (Anu Engineer via Arpit Agarwal) + HDFS-8803. Move DfsClientConf to hdfs-client. (Mingliang Liu via wheat9) + OPTIMIZATIONS HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than http://git-wip-us.apache.org/repos/asf/hadoop/blob/3aac4758/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java index 714cd68..96044e9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java @@ -91,7 +91,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator { /** * Injects failures into specific operations during unit tests. */ - private final FailureInjector failureInjector; + private static FailureInjector failureInjector = new FailureInjector(); /** * The file name, for logging and debugging purposes. @@ -187,7 +187,6 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator { public BlockReaderFactory(DfsClientConf conf) { this.conf = conf; - this.failureInjector = conf.getShortCircuitConf().brfFailureInjector; this.remainingCacheTries = conf.getNumCachedConnRetry(); } @@ -278,6 +277,11 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator { return this; } + @VisibleForTesting + public static void setFailureInjectorForTesting(FailureInjector injector) { + failureInjector = injector; + } + /** * Build a BlockReader with the given options. * http://git-wip-us.apache.org/repos/asf/hadoop/blob/3aac4758/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ClientContext.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ClientContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ClientContext.java index 6359def..bf11463 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ClientContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ClientContext.java @@ -23,6 +23,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.client.impl.DfsClientConf; import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf; import org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory; @@ -138,8 +139,8 @@ public class ClientContext { */ @VisibleForTesting public static ClientContext getFromConf(Configuration conf) { - return get(conf.get(DFSConfigKeys.DFS_CLIENT_CONTEXT, - DFSConfigKeys.DFS_CLIENT_CONTEXT_DEFAULT), + return get(conf.get(HdfsClientConfigKeys.DFS_CLIENT_CONTEXT, + HdfsClientConfigKeys.DFS_CLIENT_CONTEXT_DEFAULT), new DfsClientConf(conf)); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/3aac4758/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index e1c8a8a..99dbb19 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -18,11 +18,11 @@ package org.apache.hadoop.hdfs; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_CODEC_CLASSES_KEY_PREFIX; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_READS; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_WRITES; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_READAHEAD; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CONTEXT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CONTEXT_DEFAULT; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_READS; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_WRITES; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CACHE_READAHEAD; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONTEXT; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONTEXT_DEFAULT; import java.io.BufferedOutputStream; import java.io.DataInputStream; @@ -437,12 +437,12 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, */ int getDatanodeWriteTimeout(int numNodes) { final int t = dfsClientConf.getDatanodeSocketWriteTimeout(); - return t > 0? t + HdfsServerConstants.WRITE_TIMEOUT_EXTENSION*numNodes: 0; + return t > 0? t + HdfsConstants.WRITE_TIMEOUT_EXTENSION*numNodes: 0; } int getDatanodeReadTimeout(int numNodes) { final int t = dfsClientConf.getSocketTimeout(); - return t > 0? HdfsServerConstants.READ_TIMEOUT_EXTENSION*numNodes + t: 0; + return t > 0? HdfsConstants.READ_TIMEOUT_EXTENSION*numNodes + t: 0; } @VisibleForTesting http://git-wip-us.apache.org/repos/asf/hadoop/blob/3aac4758/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index f3fc037..9b14168 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -44,12 +44,20 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_STREAM_BUFFER_SIZE_KEY = "dfs.stream-buffer-size"; public static final int DFS_STREAM_BUFFER_SIZE_DEFAULT = 4096; - public static final String DFS_BYTES_PER_CHECKSUM_KEY = "dfs.bytes-per-checksum"; - public static final int DFS_BYTES_PER_CHECKSUM_DEFAULT = 512; + public static final String DFS_BYTES_PER_CHECKSUM_KEY = + HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY; + public static final int DFS_BYTES_PER_CHECKSUM_DEFAULT = + HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT; public static final String DFS_USER_HOME_DIR_PREFIX_KEY = "dfs.user.home.dir.prefix"; public static final String DFS_USER_HOME_DIR_PREFIX_DEFAULT = "/user"; - public static final String DFS_CHECKSUM_TYPE_KEY = "dfs.checksum.type"; - public static final String DFS_CHECKSUM_TYPE_DEFAULT = "CRC32C"; + public static final String DFS_CHECKSUM_TYPE_KEY = HdfsClientConfigKeys + .DFS_CHECKSUM_TYPE_KEY; + public static final String DFS_CHECKSUM_TYPE_DEFAULT = + HdfsClientConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT; + public static final String DFS_HDFS_BLOCKS_METADATA_ENABLED = + HdfsClientConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED; + public static final boolean DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT = + HdfsClientConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT; public static final String DFS_WEBHDFS_ACL_PERMISSION_PATTERN_DEFAULT = HdfsClientConfigKeys.DFS_WEBHDFS_ACL_PERMISSION_PATTERN_DEFAULT; @@ -487,7 +495,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final long DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_THRESHOLD_DEFAULT = 1024L * 1024L * 1024L * 10L; // 10 GB public static final String DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_KEY = "dfs.datanode.available-space-volume-choosing-policy.balanced-space-preference-fraction"; public static final float DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_DEFAULT = 0.75f; - public static final String DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY = "dfs.datanode.socket.write.timeout"; + public static final String DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY = + HdfsClientConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY; public static final String DFS_DATANODE_STARTUP_KEY = "dfs.datanode.startup"; public static final String DFS_NAMENODE_PLUGINS_KEY = "dfs.namenode.plugins"; public static final String DFS_WEB_UGI_KEY = "dfs.web.ugi"; @@ -498,8 +507,14 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_DATANODE_USER_NAME_KEY = DFS_DATANODE_KERBEROS_PRINCIPAL_KEY; public static final String DFS_DATANODE_SHARED_FILE_DESCRIPTOR_PATHS = "dfs.datanode.shared.file.descriptor.paths"; public static final String DFS_DATANODE_SHARED_FILE_DESCRIPTOR_PATHS_DEFAULT = "/dev/shm,/tmp"; - public static final String DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS = "dfs.short.circuit.shared.memory.watcher.interrupt.check.ms"; - public static final int DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT = 60000; + public static final String + DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS = + HdfsClientConfigKeys + .DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS; + public static final int + DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT = + HdfsClientConfigKeys + .DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT; public static final String DFS_NAMENODE_KEYTAB_FILE_KEY = "dfs.namenode.keytab.file"; public static final String DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY = HdfsClientConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY; @@ -540,8 +555,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys { 0.6f; public static final String DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY = "dfs.block.local-path-access.user"; - public static final String DFS_DOMAIN_SOCKET_PATH_KEY = "dfs.domain.socket.path"; - public static final String DFS_DOMAIN_SOCKET_PATH_DEFAULT = ""; + public static final String DFS_DOMAIN_SOCKET_PATH_KEY = + HdfsClientConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY; + public static final String DFS_DOMAIN_SOCKET_PATH_DEFAULT = + HdfsClientConfigKeys.DFS_DOMAIN_SOCKET_PATH_DEFAULT; public static final String DFS_STORAGE_POLICY_ENABLED_KEY = "dfs.storage.policy.enabled"; public static final boolean DFS_STORAGE_POLICY_ENABLED_DEFAULT = true; @@ -963,60 +980,136 @@ public class DFSConfigKeys extends CommonConfigurationKeys { = HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_DEFAULT; + @Deprecated + public static final String DFS_CLIENT_WRITE_PACKET_SIZE_KEY = + HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY; + @Deprecated + public static final int DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT = + HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT; - public static final String DFS_CLIENT_WRITE_PACKET_SIZE_KEY = "dfs.client-write-packet-size"; - public static final int DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024; + @Deprecated + public static final String DFS_CLIENT_SOCKET_TIMEOUT_KEY = + HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; + @Deprecated + public static final String DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY = + HdfsClientConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY; + @Deprecated + public static final int DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT = + HdfsClientConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT; + @Deprecated + public static final String DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY = + HdfsClientConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY; + @Deprecated + public static final long DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT = + HdfsClientConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT; - public static final String DFS_CLIENT_SOCKET_TIMEOUT_KEY = "dfs.client.socket-timeout"; - public static final String DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY = "dfs.client.socketcache.capacity"; - public static final int DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT = 16; - public static final String DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY = "dfs.client.socketcache.expiryMsec"; - public static final long DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT = 3000; + @Deprecated + public static final String DFS_CLIENT_USE_DN_HOSTNAME = + HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME; + @Deprecated + public static final boolean DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT = + HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT; + @Deprecated + public static final String DFS_CLIENT_CACHE_DROP_BEHIND_WRITES = + HdfsClientConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_WRITES; + @Deprecated + public static final String DFS_CLIENT_CACHE_DROP_BEHIND_READS = + HdfsClientConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_READS; + @Deprecated + public static final String DFS_CLIENT_CACHE_READAHEAD = + HdfsClientConfigKeys.DFS_CLIENT_CACHE_READAHEAD; + @Deprecated + public static final String DFS_CLIENT_CACHED_CONN_RETRY_KEY = + HdfsClientConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_KEY; + @Deprecated + public static final int DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT = + HdfsClientConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT; - public static final String DFS_CLIENT_USE_DN_HOSTNAME = "dfs.client.use.datanode.hostname"; - public static final boolean DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT = false; - public static final String DFS_CLIENT_CACHE_DROP_BEHIND_WRITES = "dfs.client.cache.drop.behind.writes"; - public static final String DFS_CLIENT_CACHE_DROP_BEHIND_READS = "dfs.client.cache.drop.behind.reads"; - public static final String DFS_CLIENT_CACHE_READAHEAD = "dfs.client.cache.readahead"; - public static final String DFS_CLIENT_CACHED_CONN_RETRY_KEY = "dfs.client.cached.conn.retry"; - public static final int DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT = 3; + @Deprecated + public static final String DFS_CLIENT_CONTEXT = HdfsClientConfigKeys + .DFS_CLIENT_CONTEXT; + @Deprecated + public static final String DFS_CLIENT_CONTEXT_DEFAULT = + HdfsClientConfigKeys.DFS_CLIENT_CONTEXT_DEFAULT; + @Deprecated + public static final String + DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS = + HdfsClientConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS; + @Deprecated + public static final int + DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS_DEFAULT = + HdfsClientConfigKeys + .DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS_DEFAULT; + @Deprecated + public static final String + DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS = + HdfsClientConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS; + @Deprecated + public static final int + DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS_DEFAULT = + HdfsClientConfigKeys + .DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS_DEFAULT; - public static final String DFS_CLIENT_CONTEXT = "dfs.client.context"; - public static final String DFS_CLIENT_CONTEXT_DEFAULT = "default"; - - public static final String DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY = "dfs.client.datanode-restart.timeout"; - public static final long DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT = 30; + @Deprecated + public static final String DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY = + HdfsClientConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY; + @Deprecated + public static final long DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT = + HdfsClientConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT; public static final String DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY = "dfs.client.https.keystore.resource"; public static final String DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_DEFAULT = "ssl-client.xml"; public static final String DFS_CLIENT_HTTPS_NEED_AUTH_KEY = "dfs.client.https.need-auth"; public static final boolean DFS_CLIENT_HTTPS_NEED_AUTH_DEFAULT = false; + // Much code in hdfs is not yet updated to use these keys. // the initial delay (unit is ms) for locateFollowingBlock, the delay time will increase exponentially(double) for each retry. - public static final String DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY = "dfs.client.max.block.acquire.failures"; - public static final int DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT = 3; + @Deprecated + public static final String DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY = + HdfsClientConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY; + @Deprecated + public static final int DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT = + HdfsClientConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT; - public static final String DFS_CLIENT_USE_LEGACY_BLOCKREADER = "dfs.client.use.legacy.blockreader"; - public static final boolean DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT = false; - public static final String DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL = "dfs.client.use.legacy.blockreader.local"; - public static final boolean DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT = false; + @Deprecated + public static final String DFS_CLIENT_USE_LEGACY_BLOCKREADER = + HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER; + @Deprecated + public static final boolean DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT = + HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT; + @Deprecated + public static final String DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL = + HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL; + @Deprecated + public static final boolean DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT + = HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL_DEFAULT; public static final String DFS_CLIENT_LOCAL_INTERFACES = "dfs.client.local.interfaces"; - public static final String DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC = "dfs.client.domain.socket.data.traffic"; - public static final boolean DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT = false; + @Deprecated + public static final String DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC = + HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC; + @Deprecated + public static final boolean DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT = + HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT; // The number of NN response dropped by client proactively in each RPC call. // For testing NN retry cache, we can set this property with positive value. public static final String DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY = "dfs.client.test.drop.namenode.response.number"; public static final int DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT = 0; + @Deprecated public static final String DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY = - "dfs.client.slow.io.warning.threshold.ms"; - public static final long DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT = 30000; + HdfsClientConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY; + + @Deprecated + public static final long DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT = + HdfsClientConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT; + @Deprecated public static final String DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_MS = - "dfs.client.key.provider.cache.expiry"; + HdfsClientConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_MS; + @Deprecated public static final long DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT = - TimeUnit.DAYS.toMillis(10); // 10 days + HdfsClientConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/3aac4758/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index c16aef2..acb24f3 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -35,6 +35,7 @@ import org.apache.hadoop.fs.FileEncryptionInfo; import org.apache.hadoop.fs.ParentNotDirectoryException; import org.apache.hadoop.fs.Syncable; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag; import org.apache.hadoop.hdfs.client.impl.DfsClientConf; @@ -202,7 +203,7 @@ public class DFSOutputStream extends FSOutputSummer } if (blockSize % bytesPerChecksum != 0) { throw new HadoopIllegalArgumentException("Invalid values: " - + DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY + " (=" + bytesPerChecksum + + HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY + " (=" + bytesPerChecksum + ") must divide block size (=" + blockSize + ")."); } this.byteArrayManager = dfsClient.getClientContext().getByteArrayManager(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/3aac4758/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java index 99e0e8e..ef9f27a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java @@ -93,7 +93,7 @@ public class HdfsConfiguration extends Configuration { new DeprecationDelta("dfs.secondary.http.address", DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY), new DeprecationDelta("dfs.socket.timeout", - DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY), + HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY), new DeprecationDelta("fs.checkpoint.dir", DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY), new DeprecationDelta("fs.checkpoint.edits.dir", @@ -127,17 +127,19 @@ public class HdfsConfiguration extends Configuration { new DeprecationDelta("dfs.permissions.supergroup", DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY), new DeprecationDelta("dfs.write.packet.size", - DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY), + HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY), new DeprecationDelta("dfs.block.size", DFSConfigKeys.DFS_BLOCK_SIZE_KEY), new DeprecationDelta("dfs.datanode.max.xcievers", DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_KEY), new DeprecationDelta("io.bytes.per.checksum", - DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY), + HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY), new DeprecationDelta("dfs.federation.nameservices", DFSConfigKeys.DFS_NAMESERVICES), new DeprecationDelta("dfs.federation.nameservice.id", DFSConfigKeys.DFS_NAMESERVICE_ID), + new DeprecationDelta("dfs.client.file-block-storage-locations.timeout", + HdfsClientConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS), }); }