hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a..@apache.org
Subject hadoop git commit: HDFS-9895. Remove unnecessary conf cache from DataNode. Contributed by Xiaobing Zhou.
Date Thu, 15 Sep 2016 22:29:39 GMT
Repository: hadoop
Updated Branches:
  refs/heads/trunk fcbac0021 -> ec3ea1887


HDFS-9895. Remove unnecessary conf cache from DataNode. Contributed by Xiaobing Zhou.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ec3ea188
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ec3ea188
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ec3ea188

Branch: refs/heads/trunk
Commit: ec3ea188753e2c138a64e10303c8751116dc4e80
Parents: fcbac00
Author: Arpit Agarwal <arp@apache.org>
Authored: Thu Sep 15 15:25:47 2016 -0700
Committer: Arpit Agarwal <arp@apache.org>
Committed: Thu Sep 15 15:25:47 2016 -0700

----------------------------------------------------------------------
 .../hdfs/server/datanode/BlockScanner.java      |   4 +
 .../hadoop/hdfs/server/datanode/DNConf.java     |  95 +++++++------
 .../hadoop/hdfs/server/datanode/DataNode.java   | 133 ++++++++++---------
 .../server/datanode/TestBPOfferService.java     |   4 +-
 .../TestDataXceiverLazyPersistHint.java         |   5 +-
 .../fsdataset/impl/TestFsDatasetImpl.java       |   6 +-
 6 files changed, 130 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ec3ea188/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java
index be6aa83..456dcc1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java
@@ -165,6 +165,10 @@ public class BlockScanner {
     }
   }
 
+  public BlockScanner(DataNode datanode) {
+    this(datanode, datanode.getConf());
+  }
+
   public BlockScanner(DataNode datanode, Configuration conf) {
     this.datanode = datanode;
     this.conf = new Conf(conf);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ec3ea188/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
index 757d7f5..823d05c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
@@ -56,6 +56,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.IGNORE_SECURE_PORTS_FOR_TESTI
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BP_READY_TIMEOUT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BP_READY_TIMEOUT_DEFAULT;
 
+import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
@@ -72,7 +73,6 @@ import java.util.concurrent.TimeUnit;
  */
 @InterfaceAudience.Private
 public class DNConf {
-  final Configuration conf;
   final int socketTimeout;
   final int socketWriteTimeout;
   final int socketKeepaliveTimeout;
@@ -120,73 +120,77 @@ public class DNConf {
   private final int volFailuresTolerated;
   private final int volsConfigured;
   private final int maxDataLength;
+  private Configurable dn;
 
-  public DNConf(Configuration conf) {
-    this.conf = conf;
-    socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
+  public DNConf(final Configurable dn) {
+    this.dn = dn;
+    socketTimeout = getConf().getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
         HdfsConstants.READ_TIMEOUT);
-    socketWriteTimeout = conf.getInt(DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY,
+    socketWriteTimeout = getConf().getInt(DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY,
         HdfsConstants.WRITE_TIMEOUT);
-    socketKeepaliveTimeout = conf.getInt(
+    socketKeepaliveTimeout = getConf().getInt(
         DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY,
         DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT);
-    this.transferSocketSendBufferSize = conf.getInt(
+    this.transferSocketSendBufferSize = getConf().getInt(
         DFSConfigKeys.DFS_DATANODE_TRANSFER_SOCKET_SEND_BUFFER_SIZE_KEY,
         DFSConfigKeys.DFS_DATANODE_TRANSFER_SOCKET_SEND_BUFFER_SIZE_DEFAULT);
-    this.transferSocketRecvBufferSize = conf.getInt(
+    this.transferSocketRecvBufferSize = getConf().getInt(
         DFSConfigKeys.DFS_DATANODE_TRANSFER_SOCKET_RECV_BUFFER_SIZE_KEY,
         DFSConfigKeys.DFS_DATANODE_TRANSFER_SOCKET_RECV_BUFFER_SIZE_DEFAULT);
-    this.tcpNoDelay = conf.getBoolean(
+    this.tcpNoDelay = getConf().getBoolean(
         DFSConfigKeys.DFS_DATA_TRANSFER_SERVER_TCPNODELAY,
         DFSConfigKeys.DFS_DATA_TRANSFER_SERVER_TCPNODELAY_DEFAULT);
 
     /* Based on results on different platforms, we might need set the default
      * to false on some of them. */
-    transferToAllowed = conf.getBoolean(
+    transferToAllowed = getConf().getBoolean(
         DFS_DATANODE_TRANSFERTO_ALLOWED_KEY,
         DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT);
 
-    writePacketSize = conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY, 
+    writePacketSize = getConf().getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
         DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
     
-    readaheadLength = conf.getLong(
+    readaheadLength = getConf().getLong(
         HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_KEY,
         HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
-    maxDataLength = conf.getInt(DFSConfigKeys.IPC_MAXIMUM_DATA_LENGTH,
+    maxDataLength = getConf().getInt(DFSConfigKeys.IPC_MAXIMUM_DATA_LENGTH,
         DFSConfigKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
-    dropCacheBehindWrites = conf.getBoolean(
+    dropCacheBehindWrites = getConf().getBoolean(
         DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_KEY,
         DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_DEFAULT);
-    syncBehindWrites = conf.getBoolean(
+    syncBehindWrites = getConf().getBoolean(
         DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_KEY,
         DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_DEFAULT);
-    syncBehindWritesInBackground = conf.getBoolean(
+    syncBehindWritesInBackground = getConf().getBoolean(
         DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_IN_BACKGROUND_KEY,
         DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_IN_BACKGROUND_DEFAULT);
-    dropCacheBehindReads = conf.getBoolean(
+    dropCacheBehindReads = getConf().getBoolean(
         DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY,
         DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_DEFAULT);
-    connectToDnViaHostname = conf.getBoolean(
+    connectToDnViaHostname = getConf().getBoolean(
         DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME,
         DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
-    this.blockReportInterval = conf.getLong(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
+    this.blockReportInterval = getConf().getLong(
+        DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
         DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT);
-    this.ibrInterval = conf.getLong(
+    this.ibrInterval = getConf().getLong(
         DFSConfigKeys.DFS_BLOCKREPORT_INCREMENTAL_INTERVAL_MSEC_KEY,
         DFSConfigKeys.DFS_BLOCKREPORT_INCREMENTAL_INTERVAL_MSEC_DEFAULT);
-    this.blockReportSplitThreshold = conf.getLong(DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY,
-                                            DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT);
-    this.cacheReportInterval = conf.getLong(DFS_CACHEREPORT_INTERVAL_MSEC_KEY,
+    this.blockReportSplitThreshold = getConf().getLong(
+        DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY,
+        DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT);
+    this.cacheReportInterval = getConf().getLong(
+        DFS_CACHEREPORT_INTERVAL_MSEC_KEY,
         DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT);
 
-    this.dfsclientSlowIoWarningThresholdMs = conf.getLong(
+    this.dfsclientSlowIoWarningThresholdMs = getConf().getLong(
         HdfsClientConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY,
         HdfsClientConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT);
-    this.datanodeSlowIoWarningThresholdMs = conf.getLong(
+    this.datanodeSlowIoWarningThresholdMs = getConf().getLong(
         DFSConfigKeys.DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_KEY,
         DFSConfigKeys.DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_DEFAULT);
 
-    long initBRDelay = conf.getTimeDuration(
+    long initBRDelay = getConf().getTimeDuration(
         DFS_BLOCKREPORT_INITIAL_DELAY_KEY,
         DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT, TimeUnit.SECONDS) * 1000L;
     if (initBRDelay >= blockReportInterval) {
@@ -197,11 +201,11 @@ public class DNConf {
     }
     initialBlockReportDelayMs = initBRDelay;
     
-    heartBeatInterval = conf.getTimeDuration(DFS_HEARTBEAT_INTERVAL_KEY,
+    heartBeatInterval = getConf().getTimeDuration(DFS_HEARTBEAT_INTERVAL_KEY,
         DFS_HEARTBEAT_INTERVAL_DEFAULT, TimeUnit.SECONDS) * 1000L;
     long confLifelineIntervalMs =
-        conf.getLong(DFS_DATANODE_LIFELINE_INTERVAL_SECONDS_KEY,
-        3 * conf.getTimeDuration(DFS_HEARTBEAT_INTERVAL_KEY,
+        getConf().getLong(DFS_DATANODE_LIFELINE_INTERVAL_SECONDS_KEY,
+        3 * getConf().getTimeDuration(DFS_HEARTBEAT_INTERVAL_KEY,
         DFS_HEARTBEAT_INTERVAL_DEFAULT, TimeUnit.SECONDS)) * 1000L;
     if (confLifelineIntervalMs <= heartBeatInterval) {
       confLifelineIntervalMs = 3 * heartBeatInterval;
@@ -215,47 +219,50 @@ public class DNConf {
     lifelineIntervalMs = confLifelineIntervalMs;
     
     // do we need to sync block file contents to disk when blockfile is closed?
-    this.syncOnClose = conf.getBoolean(DFS_DATANODE_SYNCONCLOSE_KEY, 
+    this.syncOnClose = getConf().getBoolean(DFS_DATANODE_SYNCONCLOSE_KEY,
         DFS_DATANODE_SYNCONCLOSE_DEFAULT);
 
-    this.minimumNameNodeVersion = conf.get(DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_KEY,
+    this.minimumNameNodeVersion = getConf().get(
+        DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_KEY,
         DFS_DATANODE_MIN_SUPPORTED_NAMENODE_VERSION_DEFAULT);
     
-    this.encryptDataTransfer = conf.getBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY,
+    this.encryptDataTransfer = getConf().getBoolean(
+        DFS_ENCRYPT_DATA_TRANSFER_KEY,
         DFS_ENCRYPT_DATA_TRANSFER_DEFAULT);
-    this.encryptionAlgorithm = conf.get(DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
-    this.trustedChannelResolver = TrustedChannelResolver.getInstance(conf);
+    this.encryptionAlgorithm = getConf().get(DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
+    this.trustedChannelResolver = TrustedChannelResolver.getInstance(getConf());
     this.saslPropsResolver = DataTransferSaslUtil.getSaslPropertiesResolver(
-      conf);
-    this.ignoreSecurePortsForTesting = conf.getBoolean(
+      getConf());
+    this.ignoreSecurePortsForTesting = getConf().getBoolean(
         IGNORE_SECURE_PORTS_FOR_TESTING_KEY,
         IGNORE_SECURE_PORTS_FOR_TESTING_DEFAULT);
     
-    this.xceiverStopTimeout = conf.getLong(
+    this.xceiverStopTimeout = getConf().getLong(
         DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY,
         DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT);
 
-    this.maxLockedMemory = conf.getLong(
+    this.maxLockedMemory = getConf().getLong(
         DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
         DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT);
 
-    this.restartReplicaExpiry = conf.getLong(
+    this.restartReplicaExpiry = getConf().getLong(
         DFS_DATANODE_RESTART_REPLICA_EXPIRY_KEY,
         DFS_DATANODE_RESTART_REPLICA_EXPIRY_DEFAULT) * 1000L;
 
-    this.allowNonLocalLazyPersist = conf.getBoolean(
+    this.allowNonLocalLazyPersist = getConf().getBoolean(
         DFS_DATANODE_NON_LOCAL_LAZY_PERSIST,
         DFS_DATANODE_NON_LOCAL_LAZY_PERSIST_DEFAULT);
 
-    this.bpReadyTimeout = conf.getTimeDuration(
+    this.bpReadyTimeout = getConf().getTimeDuration(
         DFS_DATANODE_BP_READY_TIMEOUT_KEY,
         DFS_DATANODE_BP_READY_TIMEOUT_DEFAULT, TimeUnit.SECONDS);
 
     this.volFailuresTolerated =
-        conf.getInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY,
+        getConf().getInt(
+            DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY,
             DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT);
     String[] dataDirs =
-        conf.getTrimmedStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
+        getConf().getTrimmedStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
     this.volsConfigured = (dataDirs == null) ? 0 : dataDirs.length;
   }
 
@@ -270,7 +277,7 @@ public class DNConf {
    * @return Configuration the configuration
    */
   public Configuration getConf() {
-    return conf;
+    return this.dn.getConf();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ec3ea188/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 09ecac1..abff796 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -363,7 +363,6 @@ public class DataNode extends ReconfigurableBase
   private SecureResources secureResources = null;
   // dataDirs must be accessed while holding the DataNode lock.
   private List<StorageLocation> dataDirs;
-  private Configuration conf;
   private final String confVersion;
   private final long maxNumberOfBlocksToLog;
   private final boolean pipelineSupportECN;
@@ -419,7 +418,7 @@ public class DataNode extends ReconfigurableBase
     this.confVersion = null;
     this.usersWithLocalPathAccess = null;
     this.connectToDnViaHostname = false;
-    this.blockScanner = new BlockScanner(this, conf);
+    this.blockScanner = new BlockScanner(this, this.getConf());
     this.pipelineSupportECN = false;
     this.checkDiskErrorInterval =
         ThreadLocalRandom.current().nextInt(5000, (int) (5000 * 1.25));
@@ -438,7 +437,7 @@ public class DataNode extends ReconfigurableBase
     this.tracer = createTracer(conf);
     this.tracerConfigurationManager =
         new TracerConfigurationManager(DATANODE_HTRACE_PREFIX, conf);
-    this.blockScanner = new BlockScanner(this, conf);
+    this.blockScanner = new BlockScanner(this);
     this.lastDiskErrorCheck = 0;
     this.maxNumberOfBlocksToLog = conf.getLong(DFS_MAX_NUM_BLOCKS_TO_LOG_KEY,
         DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT);
@@ -487,7 +486,7 @@ public class DataNode extends ReconfigurableBase
     try {
       hostName = getHostName(conf);
       LOG.info("Configured hostname is " + hostName);
-      startDataNode(conf, dataDirs, resources);
+      startDataNode(dataDirs, resources);
     } catch (IOException ie) {
       shutdown();
       throw ie;
@@ -527,7 +526,7 @@ public class DataNode extends ReconfigurableBase
         try {
           LOG.info("Reconfiguring " + property + " to " + newVal);
           this.refreshVolumes(newVal);
-          return conf.get(DFS_DATANODE_DATA_DIR_KEY);
+          return getConf().get(DFS_DATANODE_DATA_DIR_KEY);
         } catch (IOException e) {
           rootException = e;
         } finally {
@@ -650,7 +649,7 @@ public class DataNode extends ReconfigurableBase
 
     // Use the existing StorageLocation to detect storage type changes.
     Map<String, StorageLocation> existingLocations = new HashMap<>();
-    for (StorageLocation loc : getStorageLocations(this.conf)) {
+    for (StorageLocation loc : getStorageLocations(getConf())) {
       existingLocations.put(loc.getFile().getCanonicalPath(), loc);
     }
 
@@ -846,7 +845,7 @@ public class DataNode extends ReconfigurableBase
         it.remove();
       }
     }
-    conf.set(DFS_DATANODE_DATA_DIR_KEY, Joiner.on(",").join(dataDirs));
+    getConf().set(DFS_DATANODE_DATA_DIR_KEY, Joiner.on(",").join(dataDirs));
 
     if (ioe != null) {
       throw ioe;
@@ -904,14 +903,14 @@ public class DataNode extends ReconfigurableBase
    * for information related to the different configuration options and
    * Http Policy is decided.
    */
-  private void startInfoServer(Configuration conf)
+  private void startInfoServer()
     throws IOException {
     // SecureDataNodeStarter will bind the privileged port to the channel if
     // the DN is started by JSVC, pass it along.
     ServerSocketChannel httpServerChannel = secureResources != null ?
         secureResources.getHttpServerChannel() : null;
 
-    this.httpServer = new DatanodeHttpServer(conf, this, httpServerChannel);
+    httpServer = new DatanodeHttpServer(getConf(), this, httpServerChannel);
     httpServer.start();
     if (httpServer.getHttpAddress() != null) {
       infoPort = httpServer.getHttpAddress().getPort();
@@ -933,24 +932,24 @@ public class DataNode extends ReconfigurableBase
     }
   }
 
-  private void initIpcServer(Configuration conf) throws IOException {
+  private void initIpcServer() throws IOException {
     InetSocketAddress ipcAddr = NetUtils.createSocketAddr(
-        conf.getTrimmed(DFS_DATANODE_IPC_ADDRESS_KEY));
+        getConf().getTrimmed(DFS_DATANODE_IPC_ADDRESS_KEY));
     
     // Add all the RPC protocols that the Datanode implements    
-    RPC.setProtocolEngine(conf, ClientDatanodeProtocolPB.class,
+    RPC.setProtocolEngine(getConf(), ClientDatanodeProtocolPB.class,
         ProtobufRpcEngine.class);
     ClientDatanodeProtocolServerSideTranslatorPB clientDatanodeProtocolXlator = 
           new ClientDatanodeProtocolServerSideTranslatorPB(this);
     BlockingService service = ClientDatanodeProtocolService
         .newReflectiveBlockingService(clientDatanodeProtocolXlator);
-    ipcServer = new RPC.Builder(conf)
+    ipcServer = new RPC.Builder(getConf())
         .setProtocol(ClientDatanodeProtocolPB.class)
         .setInstance(service)
         .setBindAddress(ipcAddr.getHostName())
         .setPort(ipcAddr.getPort())
         .setNumHandlers(
-            conf.getInt(DFS_DATANODE_HANDLER_COUNT_KEY,
+            getConf().getInt(DFS_DATANODE_HANDLER_COUNT_KEY,
                 DFS_DATANODE_HANDLER_COUNT_DEFAULT)).setVerbose(false)
         .setSecretManager(blockPoolTokenSecretManager).build();
 
@@ -958,29 +957,32 @@ public class DataNode extends ReconfigurableBase
         = new ReconfigurationProtocolServerSideTranslatorPB(this);
     service = ReconfigurationProtocolService
         .newReflectiveBlockingService(reconfigurationProtocolXlator);
-    DFSUtil.addPBProtocol(conf, ReconfigurationProtocolPB.class, service,
+    DFSUtil.addPBProtocol(getConf(), ReconfigurationProtocolPB.class, service,
         ipcServer);
 
     InterDatanodeProtocolServerSideTranslatorPB interDatanodeProtocolXlator = 
         new InterDatanodeProtocolServerSideTranslatorPB(this);
     service = InterDatanodeProtocolService
         .newReflectiveBlockingService(interDatanodeProtocolXlator);
-    DFSUtil.addPBProtocol(conf, InterDatanodeProtocolPB.class, service,
+    DFSUtil.addPBProtocol(getConf(), InterDatanodeProtocolPB.class, service,
         ipcServer);
 
     TraceAdminProtocolServerSideTranslatorPB traceAdminXlator =
         new TraceAdminProtocolServerSideTranslatorPB(this);
     BlockingService traceAdminService = TraceAdminService
         .newReflectiveBlockingService(traceAdminXlator);
-    DFSUtil.addPBProtocol(conf, TraceAdminProtocolPB.class, traceAdminService,
+    DFSUtil.addPBProtocol(
+        getConf(),
+        TraceAdminProtocolPB.class,
+        traceAdminService,
         ipcServer);
 
     LOG.info("Opened IPC server at " + ipcServer.getListenerAddress());
 
     // set service-level authorization security policy
-    if (conf.getBoolean(
+    if (getConf().getBoolean(
         CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) {
-      ipcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
+      ipcServer.refreshServiceAcl(getConf(), new HDFSPolicyProvider());
     }
   }
 
@@ -1071,17 +1073,17 @@ public class DataNode extends ReconfigurableBase
     }
   }
 
-  private void initDataXceiver(Configuration conf) throws IOException {
+  private void initDataXceiver() throws IOException {
     // find free port or use privileged port provided
     TcpPeerServer tcpPeerServer;
     if (secureResources != null) {
       tcpPeerServer = new TcpPeerServer(secureResources);
     } else {
-      int backlogLength = conf.getInt(
+      int backlogLength = getConf().getInt(
           CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_KEY,
           CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_DEFAULT);
       tcpPeerServer = new TcpPeerServer(dnConf.socketWriteTimeout,
-          DataNode.getStreamingAddr(conf), backlogLength);
+          DataNode.getStreamingAddr(getConf()), backlogLength);
     }
     if (dnConf.getTransferSocketRecvBufferSize() > 0) {
       tcpPeerServer.setReceiveBufferSize(
@@ -1090,24 +1092,27 @@ public class DataNode extends ReconfigurableBase
     streamingAddr = tcpPeerServer.getStreamingAddr();
     LOG.info("Opened streaming server at " + streamingAddr);
     this.threadGroup = new ThreadGroup("dataXceiverServer");
-    xserver = new DataXceiverServer(tcpPeerServer, conf, this);
+    xserver = new DataXceiverServer(tcpPeerServer, getConf(), this);
     this.dataXceiverServer = new Daemon(threadGroup, xserver);
     this.threadGroup.setDaemon(true); // auto destroy when empty
 
-    if (conf.getBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY,
-              HdfsClientConfigKeys.Read.ShortCircuit.DEFAULT) ||
-        conf.getBoolean(HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
-              HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT)) {
+    if (getConf().getBoolean(
+        HdfsClientConfigKeys.Read.ShortCircuit.KEY,
+        HdfsClientConfigKeys.Read.ShortCircuit.DEFAULT) ||
+        getConf().getBoolean(
+            HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
+            HdfsClientConfigKeys
+              .DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT)) {
       DomainPeerServer domainPeerServer =
-                getDomainPeerServer(conf, streamingAddr.getPort());
+                getDomainPeerServer(getConf(), streamingAddr.getPort());
       if (domainPeerServer != null) {
         this.localDataXceiverServer = new Daemon(threadGroup,
-            new DataXceiverServer(domainPeerServer, conf, this));
+            new DataXceiverServer(domainPeerServer, getConf(), this));
         LOG.info("Listening on UNIX domain socket: " +
             domainPeerServer.getBindPath());
       }
     }
-    this.shortCircuitRegistry = new ShortCircuitRegistry(conf);
+    this.shortCircuitRegistry = new ShortCircuitRegistry(getConf());
   }
 
   private static DomainPeerServer getDomainPeerServer(Configuration conf,
@@ -1288,26 +1293,23 @@ public class DataNode extends ReconfigurableBase
   /**
    * This method starts the data node with the specified conf.
    * 
-   * @param conf - the configuration
-   *  if conf's CONFIG_PROPERTY_SIMULATED property is set
-   *  then a simulated storage based data node is created.
+   * If conf's CONFIG_PROPERTY_SIMULATED property is set
+   * then a simulated storage based data node is created.
    * 
    * @param dataDirs - only for a non-simulated storage data node
    * @throws IOException
    */
-  void startDataNode(Configuration conf, 
-                     List<StorageLocation> dataDirs,
+  void startDataNode(List<StorageLocation> dataDirectories,
                      SecureResources resources
                      ) throws IOException {
 
     // settings global for all BPs in the Data Node
     this.secureResources = resources;
     synchronized (this) {
-      this.dataDirs = dataDirs;
+      this.dataDirs = dataDirectories;
     }
-    this.conf = conf;
-    this.dnConf = new DNConf(conf);
-    checkSecureConfig(dnConf, conf, resources);
+    this.dnConf = new DNConf(this);
+    checkSecureConfig(dnConf, getConf(), resources);
 
     if (dnConf.maxLockedMemory > 0) {
       if (!NativeIO.POSIX.getCacheManipulator().verifyCanMlock()) {
@@ -1347,10 +1349,10 @@ public class DataNode extends ReconfigurableBase
     
     // global DN settings
     registerMXBean();
-    initDataXceiver(conf);
-    startInfoServer(conf);
+    initDataXceiver();
+    startInfoServer();
     pauseMonitor = new JvmPauseMonitor();
-    pauseMonitor.init(conf);
+    pauseMonitor.init(getConf());
     pauseMonitor.start();
   
     // BlockPoolTokenSecretManager is required to create ipc server.
@@ -1360,24 +1362,24 @@ public class DataNode extends ReconfigurableBase
     dnUserName = UserGroupInformation.getCurrentUser().getShortUserName();
     LOG.info("dnUserName = " + dnUserName);
     LOG.info("supergroup = " + supergroup);
-    initIpcServer(conf);
+    initIpcServer();
 
-    metrics = DataNodeMetrics.create(conf, getDisplayName());
+    metrics = DataNodeMetrics.create(getConf(), getDisplayName());
     metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
 
-    ecWorker = new ErasureCodingWorker(conf, this);
+    ecWorker = new ErasureCodingWorker(getConf(), this);
     blockRecoveryWorker = new BlockRecoveryWorker(this);
 
     blockPoolManager = new BlockPoolManager(this);
-    blockPoolManager.refreshNamenodes(conf);
+    blockPoolManager.refreshNamenodes(getConf());
 
     // Create the ReadaheadPool from the DataNode context so we can
     // exit without having to explicitly shutdown its thread pool.
     readaheadPool = ReadaheadPool.getInstance();
-    saslClient = new SaslDataTransferClient(dnConf.conf, 
+    saslClient = new SaslDataTransferClient(dnConf.getConf(),
         dnConf.saslPropsResolver, dnConf.trustedChannelResolver);
     saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager);
-    startMetricsLogger(conf);
+    startMetricsLogger();
   }
 
   /**
@@ -1592,10 +1594,10 @@ public class DataNode extends ReconfigurableBase
     // failures.
     checkDiskError();
 
-    data.addBlockPool(nsInfo.getBlockPoolID(), conf);
+    data.addBlockPool(nsInfo.getBlockPoolID(), getConf());
     blockScanner.enableBlockPoolId(bpos.getBlockPoolId());
-    initDirectoryScanner(conf);
-    initDiskBalancer(data, conf);
+    initDirectoryScanner(getConf());
+    initDiskBalancer(data, getConf());
   }
 
   List<BPOfferService> getAllBpOs() {
@@ -1616,10 +1618,10 @@ public class DataNode extends ReconfigurableBase
    */
   private void initStorage(final NamespaceInfo nsInfo) throws IOException {
     final FsDatasetSpi.Factory<? extends FsDatasetSpi<?>> factory
-        = FsDatasetSpi.Factory.getFactory(conf);
+        = FsDatasetSpi.Factory.getFactory(getConf());
     
     if (!factory.isSimulated()) {
-      final StartupOption startOpt = getStartupOption(conf);
+      final StartupOption startOpt = getStartupOption(getConf());
       if (startOpt == null) {
         throw new IOException("Startup option not set.");
       }
@@ -1639,7 +1641,7 @@ public class DataNode extends ReconfigurableBase
 
     synchronized(this)  {
       if (data == null) {
-        data = factory.newInstance(this, storage, conf);
+        data = factory.newInstance(this, storage, getConf());
       }
     }
   }
@@ -1720,7 +1722,7 @@ public class DataNode extends ReconfigurableBase
    */
   DatanodeProtocolClientSideTranslatorPB connectToNN(
       InetSocketAddress nnAddr) throws IOException {
-    return new DatanodeProtocolClientSideTranslatorPB(nnAddr, conf);
+    return new DatanodeProtocolClientSideTranslatorPB(nnAddr, getConf());
   }
 
   /**
@@ -1733,7 +1735,7 @@ public class DataNode extends ReconfigurableBase
   DatanodeLifelineProtocolClientSideTranslatorPB connectToLifelineNN(
       InetSocketAddress lifelineNnAddr) throws IOException {
     return new DatanodeLifelineProtocolClientSideTranslatorPB(lifelineNnAddr,
-        conf);
+        getConf());
   }
 
   public static InterDatanodeProtocol createInterDataNodeProtocolProxy(
@@ -2388,7 +2390,7 @@ public class DataNode extends ReconfigurableBase
         unbufIn = saslStreams.in;
         
         out = new DataOutputStream(new BufferedOutputStream(unbufOut,
-            DFSUtilClient.getSmallBufferSize(conf)));
+            DFSUtilClient.getSmallBufferSize(getConf())));
         in = new DataInputStream(unbufIn);
         blockSender = new BlockSender(b, 0, b.getNumBytes(), 
             false, false, true, DataNode.this, null, cachingStrategy);
@@ -2508,7 +2510,7 @@ public class DataNode extends ReconfigurableBase
     }
     ipcServer.setTracer(tracer);
     ipcServer.start();
-    startPlugins(conf);
+    startPlugins(getConf());
   }
 
   /**
@@ -3051,8 +3053,8 @@ public class DataNode extends ReconfigurableBase
   @Override // ClientDatanodeProtocol
   public void refreshNamenodes() throws IOException {
     checkSuperuserPrivilege();
-    conf = new Configuration();
-    refreshNamenodes(conf);
+    setConf(new Configuration());
+    refreshNamenodes(getConf());
   }
   
   @Override // ClientDatanodeProtocol
@@ -3327,8 +3329,8 @@ public class DataNode extends ReconfigurableBase
                            Token<BlockTokenIdentifier> blockToken)
       throws IOException {
 
-    return DFSUtilClient.connectToDN(datanodeID, timeout, conf, saslClient,
-        NetUtils.getDefaultSocketFactory(getConf()), false,
+    return DFSUtilClient.connectToDN(datanodeID, timeout, getConf(),
+        saslClient, NetUtils.getDefaultSocketFactory(getConf()), false,
         getDataEncryptionKeyFactoryForBlock(block), blockToken);
   }
 
@@ -3341,7 +3343,7 @@ public class DataNode extends ReconfigurableBase
     final int numOobTypes = oobEnd - oobStart + 1;
     oobTimeouts = new long[numOobTypes];
 
-    final String[] ele = conf.get(DFS_DATANODE_OOB_TIMEOUT_KEY,
+    final String[] ele = getConf().get(DFS_DATANODE_OOB_TIMEOUT_KEY,
         DFS_DATANODE_OOB_TIMEOUT_DEFAULT).split(",");
     for (int i = 0; i < numOobTypes; i++) {
       oobTimeouts[i] = (i < ele.length) ? Long.parseLong(ele[i]) : 0;
@@ -3367,10 +3369,9 @@ public class DataNode extends ReconfigurableBase
    * Start a timer to periodically write DataNode metrics to the log file. This
    * behavior can be disabled by configuration.
    *
-   * @param metricConf
    */
-  protected void startMetricsLogger(Configuration metricConf) {
-    long metricsLoggerPeriodSec = metricConf.getInt(
+  protected void startMetricsLogger() {
+    long metricsLoggerPeriodSec = getConf().getInt(
         DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_KEY,
         DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_DEFAULT);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ec3ea188/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
index 29db702..2d50c75 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
@@ -116,7 +116,7 @@ public class TestBPOfferService {
     File dnDataDir = new File(new File(TEST_BUILD_DATA, "dfs"), "data");
     conf.set(DFS_DATANODE_DATA_DIR_KEY, dnDataDir.toURI().toString());
     Mockito.doReturn(conf).when(mockDn).getConf();
-    Mockito.doReturn(new DNConf(conf)).when(mockDn).getDnConf();
+    Mockito.doReturn(new DNConf(mockDn)).when(mockDn).getDnConf();
     Mockito.doReturn(DataNodeMetrics.create(conf, "fake dn"))
     .when(mockDn).getMetrics();
 
@@ -338,7 +338,7 @@ public class TestBPOfferService {
       new File(TEST_BUILD_DATA, "testBPInitErrorHandling"), "data");
     conf.set(DFS_DATANODE_DATA_DIR_KEY, dnDataDir.toURI().toString());
     Mockito.doReturn(conf).when(mockDn).getConf();
-    Mockito.doReturn(new DNConf(conf)).when(mockDn).getDnConf();
+    Mockito.doReturn(new DNConf(mockDn)).when(mockDn).getDnConf();
     Mockito.doReturn(DataNodeMetrics.create(conf, "fake dn")).
       when(mockDn).getMetrics();
     final AtomicInteger count = new AtomicInteger();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ec3ea188/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java
index c21cc86..b2bfe49 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java
@@ -171,12 +171,13 @@ public class TestDataXceiverLazyPersistHint {
     conf.setBoolean(
         DFS_DATANODE_NON_LOCAL_LAZY_PERSIST,
         nonLocalLazyPersist == NonLocalLazyPersist.ALLOWED);
-    DNConf dnConf = new DNConf(conf);
+
     DatanodeRegistration mockDnReg = mock(DatanodeRegistration.class);
     DataNodeMetrics mockMetrics = mock(DataNodeMetrics.class);
     DataNode mockDn = mock(DataNode.class);
-    when(mockDn.getDnConf()).thenReturn(dnConf);
     when(mockDn.getConf()).thenReturn(conf);
+    DNConf dnConf = new DNConf(mockDn);
+    when(mockDn.getDnConf()).thenReturn(dnConf);
     when(mockDn.getMetrics()).thenReturn(mockMetrics);
     when(mockDn.getDNRegistrationForBP("Dummy-pool")).thenReturn(mockDnReg);
     return mockDn;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ec3ea188/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index a330fbf..179b617 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -163,11 +163,11 @@ public class TestFsDatasetImpl {
     storage = mock(DataStorage.class);
     this.conf = new Configuration();
     this.conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 0);
-    final DNConf dnConf = new DNConf(conf);
 
     when(datanode.getConf()).thenReturn(conf);
+    final DNConf dnConf = new DNConf(datanode);
     when(datanode.getDnConf()).thenReturn(dnConf);
-    final BlockScanner disabledBlockScanner = new BlockScanner(datanode, conf);
+    final BlockScanner disabledBlockScanner = new BlockScanner(datanode);
     when(datanode.getBlockScanner()).thenReturn(disabledBlockScanner);
     final ShortCircuitRegistry shortCircuitRegistry =
         new ShortCircuitRegistry(conf);
@@ -326,7 +326,7 @@ public class TestFsDatasetImpl {
     RoundRobinVolumeChoosingPolicy<FsVolumeImpl> blockChooser =
         new RoundRobinVolumeChoosingPolicy<>();
     conf.setLong(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);
-    final BlockScanner blockScanner = new BlockScanner(datanode, conf);
+    final BlockScanner blockScanner = new BlockScanner(datanode);
     final FsVolumeList volumeList = new FsVolumeList(
         Collections.<VolumeFailureInfo>emptyList(), blockScanner, blockChooser);
     final List<FsVolumeImpl> oldVolumes = new ArrayList<>();


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message