Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 9166518AFA for ; Mon, 1 Feb 2016 18:40:52 +0000 (UTC) Received: (qmail 46832 invoked by uid 500); 1 Feb 2016 18:40:23 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 45458 invoked by uid 500); 1 Feb 2016 18:40:22 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 43252 invoked by uid 99); 1 Feb 2016 18:40:21 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 01 Feb 2016 18:40:21 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 5EF22E383C; Mon, 1 Feb 2016 18:40:21 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: aengineer@apache.org To: common-commits@hadoop.apache.org Date: Mon, 01 Feb 2016 18:41:08 -0000 Message-Id: In-Reply-To: <93b720fa24724542aac279387c3102ba@git.apache.org> References: <93b720fa24724542aac279387c3102ba@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [49/50] [abbrv] hadoop git commit: Merge branch 'trunk' into HDFS-7240 http://git-wip-us.apache.org/repos/asf/hadoop/blob/16440b83/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/storagecontainer/StorageContainerManager.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/storagecontainer/StorageContainerManager.java index 3147767,0000000..c85a554 mode 100644,000000..100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/storagecontainer/StorageContainerManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/storagecontainer/StorageContainerManager.java @@@ -1,323 -1,0 +1,323 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.storagecontainer; + +import com.google.protobuf.BlockingService; +import org.apache.hadoop.ha.HAServiceProtocol; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.DFSUtilClient; +import org.apache.hadoop.hdfs.protocol.*; +import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos; +import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolPB; +import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolServerSideTranslatorPB; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; +import org.apache.hadoop.hdfs.server.blockmanagement.BlocksMap; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType; +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.namenode.Namesystem; +import org.apache.hadoop.hdfs.server.protocol.*; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.WritableRpcEngine; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.ozone.OzoneConfiguration; +import org.apache.hadoop.storagecontainer.protocol.ContainerLocationProtocol; +import org.apache.hadoop.util.LightWeightGSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.List; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.*; + +/** + * Service that allocates storage containers and tracks their + * location. + */ +public class StorageContainerManager + implements DatanodeProtocol, ContainerLocationProtocol { + + public static final Logger LOG = + LoggerFactory.getLogger(StorageContainerManager.class); + + private final Namesystem ns = new StorageContainerNameService(); + private final BlockManager blockManager; + + private long txnId = 234; + + /** The RPC server that listens to requests from DataNodes. */ + private final RPC.Server serviceRpcServer; + private final InetSocketAddress serviceRPCAddress; + + /** The RPC server that listens to requests from clients. */ + private final RPC.Server clientRpcServer; + private final InetSocketAddress clientRpcAddress; + + public StorageContainerManager(OzoneConfiguration conf) + throws IOException { + BlocksMap containerMap = new BlocksMap( + LightWeightGSet.computeCapacity(2.0, "BlocksMap"), + new StorageContainerMap()); - this.blockManager = new BlockManager(ns, conf, containerMap); ++ this.blockManager = new BlockManager(ns, false, conf, containerMap); + + int handlerCount = + conf.getInt(DFS_NAMENODE_HANDLER_COUNT_KEY, + DFS_NAMENODE_HANDLER_COUNT_DEFAULT); + + RPC.setProtocolEngine(conf, DatanodeProtocolPB.class, + ProtobufRpcEngine.class); + + DatanodeProtocolServerSideTranslatorPB dnProtoPbTranslator = + new DatanodeProtocolServerSideTranslatorPB(this); + BlockingService dnProtoPbService = + DatanodeProtocolProtos.DatanodeProtocolService + .newReflectiveBlockingService(dnProtoPbTranslator); + + WritableRpcEngine.ensureInitialized(); + + InetSocketAddress serviceRpcAddr = NameNode.getServiceAddress(conf, false); + if (serviceRpcAddr != null) { + String bindHost = + conf.getTrimmed(DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY); + if (bindHost == null || bindHost.isEmpty()) { + bindHost = serviceRpcAddr.getHostName(); + } + LOG.info("Service RPC server is binding to " + bindHost + ":" + + serviceRpcAddr.getPort()); + + int serviceHandlerCount = + conf.getInt(DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY, + DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT); + serviceRpcServer = new RPC.Builder(conf) + .setProtocol( + org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolPB.class) + .setInstance(dnProtoPbService) + .setBindAddress(bindHost) + .setPort(serviceRpcAddr.getPort()) + .setNumHandlers(serviceHandlerCount) + .setVerbose(false) + .setSecretManager(null) + .build(); + + DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService, + serviceRpcServer); + + InetSocketAddress listenAddr = serviceRpcServer.getListenerAddress(); + serviceRPCAddress = new InetSocketAddress( + serviceRpcAddr.getHostName(), listenAddr.getPort()); + conf.set(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, + NetUtils.getHostPortString(serviceRPCAddress)); + } else { + serviceRpcServer = null; + serviceRPCAddress = null; + } + + InetSocketAddress rpcAddr = DFSUtilClient.getNNAddress(conf); + String bindHost = conf.getTrimmed(DFS_NAMENODE_RPC_BIND_HOST_KEY); + if (bindHost == null || bindHost.isEmpty()) { + bindHost = rpcAddr.getHostName(); + } + LOG.info("RPC server is binding to " + bindHost + ":" + rpcAddr.getPort()); + + clientRpcServer = new RPC.Builder(conf) + .setProtocol( + org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolPB.class) + .setInstance(dnProtoPbService) + .setBindAddress(bindHost) + .setPort(rpcAddr.getPort()) + .setNumHandlers(handlerCount) + .setVerbose(false) + .setSecretManager(null) + .build(); + + DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService, + clientRpcServer); + + // The rpc-server port can be ephemeral... ensure we have the correct info + InetSocketAddress listenAddr = clientRpcServer.getListenerAddress(); + clientRpcAddress = new InetSocketAddress( + rpcAddr.getHostName(), listenAddr.getPort()); + conf.set(FS_DEFAULT_NAME_KEY, + DFSUtilClient.getNNUri(clientRpcAddress).toString()); + } + + @Override + public DatanodeRegistration registerDatanode( + DatanodeRegistration registration) throws IOException { + ns.writeLock(); + try { + blockManager.getDatanodeManager().registerDatanode(registration); + } finally { + ns.writeUnlock(); + } + return registration; + } + + @Override + public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration, + StorageReport[] reports, long dnCacheCapacity, long dnCacheUsed, + int xmitsInProgress, int xceiverCount, int failedVolumes, + VolumeFailureSummary volumeFailureSummary, + boolean requestFullBlockReportLease) throws IOException { + ns.readLock(); + try { + final int maxTransfer = blockManager.getMaxReplicationStreams() + - xmitsInProgress; + DatanodeCommand[] cmds = blockManager.getDatanodeManager() + .handleHeartbeat(registration, reports, ns.getBlockPoolId(), 0, 0, + xceiverCount, maxTransfer, failedVolumes, volumeFailureSummary); + + return new HeartbeatResponse(cmds, + new NNHAStatusHeartbeat(HAServiceProtocol.HAServiceState.ACTIVE, + txnId), null, 0); + } finally { + ns.readUnlock(); + } + } + + @Override + public DatanodeCommand blockReport(DatanodeRegistration registration, + String poolId, StorageBlockReport[] reports, + BlockReportContext context) throws IOException { + for (int r = 0; r < reports.length; r++) { + final BlockListAsLongs storageContainerList = reports[r].getBlocks(); + blockManager.processReport(registration, reports[r].getStorage(), + storageContainerList, context, (r == reports.length - 1)); + } + return null; + } + + @Override + public DatanodeCommand cacheReport(DatanodeRegistration registration, + String poolId, List blockIds) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void blockReceivedAndDeleted(DatanodeRegistration registration, + String poolId, StorageReceivedDeletedBlocks[] rcvdAndDeletedBlocks) + throws IOException { + for(StorageReceivedDeletedBlocks r : rcvdAndDeletedBlocks) { + ns.writeLock(); + try { + blockManager.processIncrementalBlockReport(registration, r); + } finally { + ns.writeUnlock(); + } + } + } + + @Override + public void errorReport(DatanodeRegistration registration, + int errorCode, String msg) throws IOException { + String dnName = + (registration == null) ? "Unknown DataNode" : registration.toString(); + + if (errorCode == DatanodeProtocol.NOTIFY) { + LOG.info("Error report from " + dnName + ": " + msg); + return; + } + + if (errorCode == DatanodeProtocol.DISK_ERROR) { + LOG.warn("Disk error on " + dnName + ": " + msg); + } else if (errorCode == DatanodeProtocol.FATAL_DISK_ERROR) { + LOG.warn("Fatal disk error on " + dnName + ": " + msg); + blockManager.getDatanodeManager().removeDatanode(registration); + } else { + LOG.info("Error report from " + dnName + ": " + msg); + } + } + + @Override + public NamespaceInfo versionRequest() throws IOException { + ns.readLock(); + try { + return unprotectedGetNamespaceInfo(); + } finally { + ns.readUnlock(); + } + } + + private NamespaceInfo unprotectedGetNamespaceInfo() { + return new NamespaceInfo(1, "random", "random", 2, + NodeType.STORAGE_CONTAINER_SERVICE); + } + + @Override + public void reportBadBlocks(LocatedBlock[] blocks) throws IOException { + // It doesn't make sense to have LocatedBlock in this API. + ns.writeLock(); + try { + for (int i = 0; i < blocks.length; i++) { + ExtendedBlock blk = blocks[i].getBlock(); + DatanodeInfo[] nodes = blocks[i].getLocations(); + String[] storageIDs = blocks[i].getStorageIDs(); + for (int j = 0; j < nodes.length; j++) { + blockManager.findAndMarkBlockAsCorrupt(blk, nodes[j], + storageIDs == null ? null: storageIDs[j], + "client machine reported it"); + } + } + } finally { + ns.writeUnlock(); + } + } + + /** + * Start client and service RPC servers. + */ + void start() { + clientRpcServer.start(); + if (serviceRpcServer != null) { + serviceRpcServer.start(); + } + } + + /** + * Wait until the RPC servers have shutdown. + */ + void join() throws InterruptedException { + clientRpcServer.join(); + if (serviceRpcServer != null) { + serviceRpcServer.join(); + } + } + + @Override + public void commitBlockSynchronization(ExtendedBlock block, + long newgenerationstamp, long newlength, boolean closeFile, + boolean deleteblock, DatanodeID[] newtargets, String[] newtargetstorages) + throws IOException { + // Not needed for the purpose of object store + throw new UnsupportedOperationException(); + } + + public static void main(String[] argv) throws IOException { + OzoneConfiguration conf = new OzoneConfiguration(); + StorageContainerManager scm = new StorageContainerManager(conf); + scm.start(); + try { + scm.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/16440b83/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/storagecontainer/StorageContainerNameService.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/storagecontainer/StorageContainerNameService.java index 13cff36,0000000..76e0bb8 mode 100644,000000..100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/storagecontainer/StorageContainerNameService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/storagecontainer/StorageContainerNameService.java @@@ -1,177 -1,0 +1,154 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.storagecontainer; + ++import org.apache.commons.lang.NotImplementedException; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; +import org.apache.hadoop.hdfs.server.namenode.CacheManager; +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.namenode.Namesystem; +import org.apache.hadoop.hdfs.server.namenode.ha.HAContext; +import org.apache.hadoop.ipc.StandbyException; +import org.apache.hadoop.security.AccessControlException; + +import java.io.IOException; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * Namesystem implementation to be used by StorageContainerManager. + */ +public class StorageContainerNameService implements Namesystem { + + private ReentrantReadWriteLock coarseLock = new ReentrantReadWriteLock(); + private String blockPoolId; + private volatile boolean serviceRunning = true; + + public void shutdown() { + serviceRunning = false; + } + + @Override + public boolean isRunning() { + return serviceRunning; + } + + @Override + public void checkSuperuserPrivilege() throws AccessControlException { + // TBD + } + + @Override + public String getBlockPoolId() { + return blockPoolId; + } + + public void setBlockPoolId(String id) { + this.blockPoolId = id; + } + + @Override - public boolean isInStandbyState() { - // HA mode is not supported - return false; - } - - @Override - public boolean isGenStampInFuture(Block block) { - // HA mode is not supported - return false; - } - - @Override + public BlockCollection getBlockCollection(long id) { + return null; + } + + @Override - public void adjustSafeModeBlockTotals(int deltaSafe, int deltaTotal) { - // TBD - } - - @Override - public void checkOperation(NameNode.OperationCategory read) - throws StandbyException { - // HA mode is not supported ++ public void startSecretManagerIfNecessary() { ++ throw new NotImplementedException(); + } + + @Override + public ErasureCodingPolicy getErasureCodingPolicyForPath(String src) + throws IOException { + return null; + } + + @Override - public boolean isInSnapshot(BlockInfo blockInfo) { - // Snapshots not supported ++ public boolean isInSnapshot(long blockCollectionID) { + return false; + } + + @Override + public CacheManager getCacheManager() { + // Cache Management is not supported + return null; + } + + @Override + public HAContext getHAContext() { + return null; + } + ++ /** ++ * @return Whether the namenode is transitioning to active state and is in the ++ * middle of the starting active services. ++ */ ++ @Override ++ public boolean inTransitionToActive() { ++ return false; ++ } ++ + @Override + public void readLock() { + coarseLock.readLock().lock(); + } + + @Override + public void readUnlock() { + coarseLock.readLock().unlock(); + } + + @Override + public boolean hasReadLock() { + return coarseLock.getReadHoldCount() > 0 || hasWriteLock(); + } + + @Override + public void writeLock() { + coarseLock.writeLock().lock(); + } + + @Override + public void writeLockInterruptibly() throws InterruptedException { + coarseLock.writeLock().lockInterruptibly(); + } + + @Override + public void writeUnlock() { + coarseLock.writeLock().unlock(); + } + + @Override + public boolean hasWriteLock() { + return coarseLock.isWriteLockedByCurrentThread(); + } + + @Override - public void checkSafeMode() { - // TBD - } - - @Override + public boolean isInSafeMode() { + return false; + } + + @Override + public boolean isInStartupSafeMode() { + return false; + } + - @Override - public void incrementSafeBlockCount(int replication, BlockInfo storedBlock) { - // Do nothing - } - - @Override - public void decrementSafeBlockCount(BlockInfo b) { - // Do nothing - } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/16440b83/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/HdfsServer.proto ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/16440b83/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/16440b83/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/16440b83/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java index a9429c7,212d2e6..2e7e37c --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java @@@ -287,18 -289,28 +289,30 @@@ public class TestDataNodeHotSwapVolume // Verify the configuration value is appropriately set. String[] effectiveDataDirs = conf.get(DFS_DATANODE_DATA_DIR_KEY).split(","); String[] expectDataDirs = newDataDir.split(","); + Arrays.sort(effectiveDataDirs); + Arrays.sort(expectDataDirs); assertEquals(expectDataDirs.length, effectiveDataDirs.length); + List expectedStorageLocations = new ArrayList<>(); + List effectiveStorageLocations = new ArrayList<>(); for (int i = 0; i < expectDataDirs.length; i++) { StorageLocation expectLocation = StorageLocation.parse(expectDataDirs[i]); - StorageLocation effectiveLocation = - StorageLocation.parse(effectiveDataDirs[i]); - assertEquals(expectLocation.getStorageType(), - effectiveLocation.getStorageType()); - assertEquals(expectLocation.getFile().getCanonicalFile(), - effectiveLocation.getFile().getCanonicalFile()); + StorageLocation effectiveLocation = StorageLocation + .parse(effectiveDataDirs[i]); + expectedStorageLocations.add(expectLocation); + effectiveStorageLocations.add(effectiveLocation); } + Comparator comparator = new Comparator() { + + @Override + public int compare(StorageLocation o1, StorageLocation o2) { + return o1.toString().compareTo(o2.toString()); + } + + }; + Collections.sort(expectedStorageLocations, comparator); + Collections.sort(effectiveStorageLocations, comparator); + assertEquals("Effective volumes doesnt match expected", + expectedStorageLocations, effectiveStorageLocations); // Check that all newly created volumes are appropriately formatted. for (File volumeDir : newVolumeDirs) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/16440b83/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java index 48f8cef,3af959c..3935d2c --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java @@@ -25,9 -25,9 +25,10 @@@ import org.apache.hadoop.hdfs.net.* import org.apache.hadoop.hdfs.protocol.*; import org.apache.hadoop.hdfs.protocol.datatransfer.*; import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics; + import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.util.DataChecksum; import static org.apache.hadoop.hdfs.DFSConfigKeys.*; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.*; import org.junit.Rule; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/hadoop/blob/16440b83/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/16440b83/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java index 8781841,261a8b0..dc7366f --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java @@@ -440,4 -450,95 +450,5 @@@ public class TestFsDatasetImpl assertSame(replica, BlockPoolSlice.selectReplicaToDelete(replicaOtherNewer, replica)); } + - @Test - public void testLoadingDfsUsedForVolumes() throws IOException, - InterruptedException { - long waitIntervalTime = 5000; - // Initialize the cachedDfsUsedIntervalTime larger than waitIntervalTime - // to avoid cache-dfsused time expired - long cachedDfsUsedIntervalTime = waitIntervalTime + 1000; - conf.setLong(DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS, - cachedDfsUsedIntervalTime); - - long cacheDfsUsed = 1024; - long dfsUsed = getDfsUsedValueOfNewVolume(cacheDfsUsed, waitIntervalTime); - - assertEquals(cacheDfsUsed, dfsUsed); - } - - @Test - public void testLoadingDfsUsedForVolumesExpired() throws IOException, - InterruptedException { - long waitIntervalTime = 5000; - // Initialize the cachedDfsUsedIntervalTime smaller than waitIntervalTime - // to make cache-dfsused time expired - long cachedDfsUsedIntervalTime = waitIntervalTime - 1000; - conf.setLong(DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS, - cachedDfsUsedIntervalTime); - - long cacheDfsUsed = 1024; - long dfsUsed = getDfsUsedValueOfNewVolume(cacheDfsUsed, waitIntervalTime); - - // Because the cache-dfsused expired and the dfsUsed will be recalculated - assertTrue(cacheDfsUsed != dfsUsed); - } - - private long getDfsUsedValueOfNewVolume(long cacheDfsUsed, - long waitIntervalTime) throws IOException, InterruptedException { - List nsInfos = Lists.newArrayList(); - nsInfos.add(new NamespaceInfo(0, CLUSTER_ID, BLOCK_POOL_IDS[0], 1)); - - String CURRENT_DIR = "current"; - String DU_CACHE_FILE = BlockPoolSlice.DU_CACHE_FILE; - String path = BASE_DIR + "/newData0"; - String pathUri = new Path(path).toUri().toString(); - StorageLocation loc = StorageLocation.parse(pathUri); - Storage.StorageDirectory sd = createStorageDirectory(new File(path)); - DataStorage.VolumeBuilder builder = - new DataStorage.VolumeBuilder(storage, sd); - when( - storage.prepareVolume(eq(datanode), eq(loc.getFile()), - anyListOf(NamespaceInfo.class))).thenReturn(builder); - - String cacheFilePath = - String.format("%s/%s/%s/%s/%s", path, CURRENT_DIR, BLOCK_POOL_IDS[0], - CURRENT_DIR, DU_CACHE_FILE); - File outFile = new File(cacheFilePath); - - if (!outFile.getParentFile().exists()) { - outFile.getParentFile().mkdirs(); - } - - if (outFile.exists()) { - outFile.delete(); - } - - FakeTimer timer = new FakeTimer(); - try { - try (Writer out = - new OutputStreamWriter(new FileOutputStream(outFile), - StandardCharsets.UTF_8)) { - // Write the dfsUsed value and the time to cache file - out.write(Long.toString(cacheDfsUsed) + " " - + Long.toString(timer.now())); - out.flush(); - } - } catch (IOException ioe) { - } - - dataset.setTimer(timer); - timer.advance(waitIntervalTime); - dataset.addVolume(loc, nsInfos); - - // Get the last volume which was just added before - FsVolumeImpl newVolume; - try (FsDatasetSpi.FsVolumeReferences volumes = - dataset.getFsVolumeReferences()) { - newVolume = (FsVolumeImpl) volumes.get(volumes.size() - 1); - } - long dfsUsed = newVolume.getDfsUsed(); - - return dfsUsed; - } }