hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aengin...@apache.org
Subject [49/50] [abbrv] hadoop git commit: Merge branch 'trunk' into HDFS-7240
Date Mon, 01 Feb 2016 18:41:08 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/16440b83/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/storagecontainer/StorageContainerManager.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/storagecontainer/StorageContainerManager.java
index 3147767,0000000..c85a554
mode 100644,000000..100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/storagecontainer/StorageContainerManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/storagecontainer/StorageContainerManager.java
@@@ -1,323 -1,0 +1,323 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.hadoop.storagecontainer;
 +
 +import com.google.protobuf.BlockingService;
 +import org.apache.hadoop.ha.HAServiceProtocol;
 +import org.apache.hadoop.hdfs.DFSUtil;
 +import org.apache.hadoop.hdfs.DFSUtilClient;
 +import org.apache.hadoop.hdfs.protocol.*;
 +import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos;
 +import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolPB;
 +import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolServerSideTranslatorPB;
 +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 +import org.apache.hadoop.hdfs.server.blockmanagement.BlocksMap;
 +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
 +import org.apache.hadoop.hdfs.server.namenode.NameNode;
 +import org.apache.hadoop.hdfs.server.namenode.Namesystem;
 +import org.apache.hadoop.hdfs.server.protocol.*;
 +import org.apache.hadoop.ipc.ProtobufRpcEngine;
 +import org.apache.hadoop.ipc.RPC;
 +import org.apache.hadoop.ipc.WritableRpcEngine;
 +import org.apache.hadoop.net.NetUtils;
 +import org.apache.hadoop.ozone.OzoneConfiguration;
 +import org.apache.hadoop.storagecontainer.protocol.ContainerLocationProtocol;
 +import org.apache.hadoop.util.LightWeightGSet;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.io.IOException;
 +import java.net.InetSocketAddress;
 +import java.util.List;
 +
 +import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
 +
 +/**
 + * Service that allocates storage containers and tracks their
 + * location.
 + */
 +public class StorageContainerManager
 +    implements DatanodeProtocol, ContainerLocationProtocol {
 +
 +  public static final Logger LOG =
 +      LoggerFactory.getLogger(StorageContainerManager.class);
 +
 +  private final Namesystem ns = new StorageContainerNameService();
 +  private final BlockManager blockManager;
 +
 +  private long txnId = 234;
 +
 +  /** The RPC server that listens to requests from DataNodes. */
 +  private final RPC.Server serviceRpcServer;
 +  private final InetSocketAddress serviceRPCAddress;
 +
 +  /** The RPC server that listens to requests from clients. */
 +  private final RPC.Server clientRpcServer;
 +  private final InetSocketAddress clientRpcAddress;
 +
 +  public StorageContainerManager(OzoneConfiguration conf)
 +      throws IOException {
 +    BlocksMap containerMap = new BlocksMap(
 +        LightWeightGSet.computeCapacity(2.0, "BlocksMap"),
 +        new StorageContainerMap());
-     this.blockManager = new BlockManager(ns, conf, containerMap);
++    this.blockManager = new BlockManager(ns, false, conf, containerMap);
 +
 +    int handlerCount =
 +        conf.getInt(DFS_NAMENODE_HANDLER_COUNT_KEY,
 +            DFS_NAMENODE_HANDLER_COUNT_DEFAULT);
 +
 +    RPC.setProtocolEngine(conf, DatanodeProtocolPB.class,
 +        ProtobufRpcEngine.class);
 +
 +    DatanodeProtocolServerSideTranslatorPB dnProtoPbTranslator =
 +        new DatanodeProtocolServerSideTranslatorPB(this);
 +    BlockingService dnProtoPbService =
 +        DatanodeProtocolProtos.DatanodeProtocolService
 +            .newReflectiveBlockingService(dnProtoPbTranslator);
 +
 +    WritableRpcEngine.ensureInitialized();
 +
 +    InetSocketAddress serviceRpcAddr = NameNode.getServiceAddress(conf, false);
 +    if (serviceRpcAddr != null) {
 +      String bindHost =
 +          conf.getTrimmed(DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY);
 +      if (bindHost == null || bindHost.isEmpty()) {
 +        bindHost = serviceRpcAddr.getHostName();
 +      }
 +      LOG.info("Service RPC server is binding to " + bindHost + ":" +
 +          serviceRpcAddr.getPort());
 +
 +      int serviceHandlerCount =
 +          conf.getInt(DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
 +              DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
 +      serviceRpcServer = new RPC.Builder(conf)
 +          .setProtocol(
 +              org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolPB.class)
 +          .setInstance(dnProtoPbService)
 +          .setBindAddress(bindHost)
 +          .setPort(serviceRpcAddr.getPort())
 +          .setNumHandlers(serviceHandlerCount)
 +          .setVerbose(false)
 +          .setSecretManager(null)
 +          .build();
 +
 +      DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService,
 +          serviceRpcServer);
 +
 +      InetSocketAddress listenAddr = serviceRpcServer.getListenerAddress();
 +      serviceRPCAddress = new InetSocketAddress(
 +          serviceRpcAddr.getHostName(), listenAddr.getPort());
 +      conf.set(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,
 +          NetUtils.getHostPortString(serviceRPCAddress));
 +    } else {
 +      serviceRpcServer = null;
 +      serviceRPCAddress = null;
 +    }
 +
 +    InetSocketAddress rpcAddr = DFSUtilClient.getNNAddress(conf);
 +    String bindHost = conf.getTrimmed(DFS_NAMENODE_RPC_BIND_HOST_KEY);
 +    if (bindHost == null || bindHost.isEmpty()) {
 +      bindHost = rpcAddr.getHostName();
 +    }
 +    LOG.info("RPC server is binding to " + bindHost + ":" + rpcAddr.getPort());
 +
 +    clientRpcServer = new RPC.Builder(conf)
 +        .setProtocol(
 +            org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolPB.class)
 +        .setInstance(dnProtoPbService)
 +        .setBindAddress(bindHost)
 +        .setPort(rpcAddr.getPort())
 +        .setNumHandlers(handlerCount)
 +        .setVerbose(false)
 +        .setSecretManager(null)
 +        .build();
 +
 +    DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService,
 +        clientRpcServer);
 +
 +    // The rpc-server port can be ephemeral... ensure we have the correct info
 +    InetSocketAddress listenAddr = clientRpcServer.getListenerAddress();
 +    clientRpcAddress = new InetSocketAddress(
 +        rpcAddr.getHostName(), listenAddr.getPort());
 +    conf.set(FS_DEFAULT_NAME_KEY,
 +        DFSUtilClient.getNNUri(clientRpcAddress).toString());
 +  }
 +
 +  @Override
 +  public DatanodeRegistration registerDatanode(
 +      DatanodeRegistration registration) throws IOException {
 +    ns.writeLock();
 +    try {
 +      blockManager.getDatanodeManager().registerDatanode(registration);
 +    } finally {
 +      ns.writeUnlock();
 +    }
 +    return registration;
 +  }
 +
 +  @Override
 +  public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration,
 +      StorageReport[] reports, long dnCacheCapacity, long dnCacheUsed,
 +      int xmitsInProgress, int xceiverCount, int failedVolumes,
 +      VolumeFailureSummary volumeFailureSummary,
 +               boolean requestFullBlockReportLease) throws IOException {
 +    ns.readLock();
 +    try {
 +      final int maxTransfer = blockManager.getMaxReplicationStreams()
 +          - xmitsInProgress;
 +      DatanodeCommand[] cmds = blockManager.getDatanodeManager()
 +          .handleHeartbeat(registration, reports, ns.getBlockPoolId(), 0, 0,
 +              xceiverCount, maxTransfer, failedVolumes, volumeFailureSummary);
 +
 +      return new HeartbeatResponse(cmds,
 +          new NNHAStatusHeartbeat(HAServiceProtocol.HAServiceState.ACTIVE,
 +              txnId), null, 0);
 +    } finally {
 +      ns.readUnlock();
 +    }
 +  }
 +
 +  @Override
 +  public DatanodeCommand blockReport(DatanodeRegistration registration,
 +      String poolId, StorageBlockReport[] reports,
 +      BlockReportContext context) throws IOException {
 +    for (int r = 0; r < reports.length; r++) {
 +      final BlockListAsLongs storageContainerList = reports[r].getBlocks();
 +      blockManager.processReport(registration, reports[r].getStorage(),
 +          storageContainerList, context, (r == reports.length - 1));
 +    }
 +    return null;
 +  }
 +
 +  @Override
 +  public DatanodeCommand cacheReport(DatanodeRegistration registration,
 +      String poolId, List<Long> blockIds) throws IOException {
 +    throw new UnsupportedOperationException();
 +  }
 +
 +  @Override
 +  public void blockReceivedAndDeleted(DatanodeRegistration registration,
 +      String poolId, StorageReceivedDeletedBlocks[] rcvdAndDeletedBlocks)
 +      throws IOException {
 +    for(StorageReceivedDeletedBlocks r : rcvdAndDeletedBlocks) {
 +      ns.writeLock();
 +      try {
 +        blockManager.processIncrementalBlockReport(registration, r);
 +      } finally {
 +        ns.writeUnlock();
 +      }
 +    }
 +  }
 +
 +  @Override
 +  public void errorReport(DatanodeRegistration registration,
 +      int errorCode, String msg) throws IOException {
 +    String dnName =
 +        (registration == null) ? "Unknown DataNode" : registration.toString();
 +
 +    if (errorCode == DatanodeProtocol.NOTIFY) {
 +      LOG.info("Error report from " + dnName + ": " + msg);
 +      return;
 +    }
 +
 +    if (errorCode == DatanodeProtocol.DISK_ERROR) {
 +      LOG.warn("Disk error on " + dnName + ": " + msg);
 +    } else if (errorCode == DatanodeProtocol.FATAL_DISK_ERROR) {
 +      LOG.warn("Fatal disk error on " + dnName + ": " + msg);
 +      blockManager.getDatanodeManager().removeDatanode(registration);
 +    } else {
 +      LOG.info("Error report from " + dnName + ": " + msg);
 +    }
 +  }
 +
 +  @Override
 +  public NamespaceInfo versionRequest() throws IOException {
 +    ns.readLock();
 +    try {
 +      return unprotectedGetNamespaceInfo();
 +    } finally {
 +      ns.readUnlock();
 +    }
 +  }
 +
 +  private NamespaceInfo unprotectedGetNamespaceInfo() {
 +    return new NamespaceInfo(1, "random", "random", 2,
 +        NodeType.STORAGE_CONTAINER_SERVICE);
 +  }
 +
 +  @Override
 +  public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
 +    // It doesn't make sense to have LocatedBlock in this API.
 +    ns.writeLock();
 +    try {
 +      for (int i = 0; i < blocks.length; i++) {
 +        ExtendedBlock blk = blocks[i].getBlock();
 +        DatanodeInfo[] nodes = blocks[i].getLocations();
 +        String[] storageIDs = blocks[i].getStorageIDs();
 +        for (int j = 0; j < nodes.length; j++) {
 +          blockManager.findAndMarkBlockAsCorrupt(blk, nodes[j],
 +              storageIDs == null ? null: storageIDs[j],
 +              "client machine reported it");
 +        }
 +      }
 +    } finally {
 +      ns.writeUnlock();
 +    }
 +  }
 +
 +  /**
 +   * Start client and service RPC servers.
 +   */
 +  void start() {
 +    clientRpcServer.start();
 +    if (serviceRpcServer != null) {
 +      serviceRpcServer.start();
 +    }
 +  }
 +
 +  /**
 +   * Wait until the RPC servers have shutdown.
 +   */
 +  void join() throws InterruptedException {
 +    clientRpcServer.join();
 +    if (serviceRpcServer != null) {
 +      serviceRpcServer.join();
 +    }
 +  }
 +
 +  @Override
 +  public void commitBlockSynchronization(ExtendedBlock block,
 +      long newgenerationstamp, long newlength, boolean closeFile,
 +      boolean deleteblock, DatanodeID[] newtargets, String[] newtargetstorages)
 +      throws IOException {
 +    // Not needed for the purpose of object store
 +    throw new UnsupportedOperationException();
 +  }
 +
 +  public static void main(String[] argv) throws IOException {
 +    OzoneConfiguration conf = new OzoneConfiguration();
 +    StorageContainerManager scm = new StorageContainerManager(conf);
 +    scm.start();
 +    try {
 +      scm.join();
 +    } catch (InterruptedException e) {
 +      e.printStackTrace();
 +    }
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/16440b83/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/storagecontainer/StorageContainerNameService.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/storagecontainer/StorageContainerNameService.java
index 13cff36,0000000..76e0bb8
mode 100644,000000..100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/storagecontainer/StorageContainerNameService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/storagecontainer/StorageContainerNameService.java
@@@ -1,177 -1,0 +1,154 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.hadoop.storagecontainer;
 +
++import org.apache.commons.lang.NotImplementedException;
 +import org.apache.hadoop.hdfs.protocol.Block;
 +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 +import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
 +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 +import org.apache.hadoop.hdfs.server.namenode.CacheManager;
 +import org.apache.hadoop.hdfs.server.namenode.NameNode;
 +import org.apache.hadoop.hdfs.server.namenode.Namesystem;
 +import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
 +import org.apache.hadoop.ipc.StandbyException;
 +import org.apache.hadoop.security.AccessControlException;
 +
 +import java.io.IOException;
 +import java.util.concurrent.locks.ReentrantReadWriteLock;
 +
 +/**
 + * Namesystem implementation to be used by StorageContainerManager.
 + */
 +public class StorageContainerNameService implements Namesystem {
 +
 +  private ReentrantReadWriteLock coarseLock = new ReentrantReadWriteLock();
 +  private String blockPoolId;
 +  private volatile boolean serviceRunning = true;
 +
 +  public void shutdown() {
 +    serviceRunning = false;
 +  }
 +
 +  @Override
 +  public boolean isRunning() {
 +    return serviceRunning;
 +  }
 +
 +  @Override
 +  public void checkSuperuserPrivilege() throws AccessControlException {
 +    // TBD
 +  }
 +
 +  @Override
 +  public String getBlockPoolId() {
 +    return blockPoolId;
 +  }
 +
 +  public void setBlockPoolId(String id) {
 +    this.blockPoolId = id;
 +  }
 +
 +  @Override
-   public boolean isInStandbyState() {
-     // HA mode is not supported
-     return false;
-   }
- 
-   @Override
-   public boolean isGenStampInFuture(Block block) {
-     // HA mode is not supported
-     return false;
-   }
- 
-   @Override
 +  public BlockCollection getBlockCollection(long id) {
 +    return null;
 +  }
 +
 +  @Override
-   public void adjustSafeModeBlockTotals(int deltaSafe, int deltaTotal) {
-     // TBD
-   }
- 
-   @Override
-   public void checkOperation(NameNode.OperationCategory read)
-     throws StandbyException {
-     // HA mode is not supported
++  public void startSecretManagerIfNecessary() {
++     throw new NotImplementedException();
 +  }
 +
 +  @Override
 +  public ErasureCodingPolicy getErasureCodingPolicyForPath(String src)
 +      throws IOException {
 +    return null;
 +  }
 +
 +  @Override
-   public boolean isInSnapshot(BlockInfo blockInfo) {
-     // Snapshots not supported
++  public boolean isInSnapshot(long blockCollectionID) {
 +    return false;
 +  }
 +
 +  @Override
 +  public CacheManager getCacheManager() {
 +    // Cache Management is not supported
 +    return null;
 +  }
 +
 +  @Override
 +  public HAContext getHAContext() {
 +    return null;
 +  }
 +
++  /**
++   * @return Whether the namenode is transitioning to active state and is in the
++   * middle of the starting active services.
++   */
++  @Override
++  public boolean inTransitionToActive() {
++    return false;
++  }
++
 +  @Override
 +  public void readLock() {
 +    coarseLock.readLock().lock();
 +  }
 +
 +  @Override
 +  public void readUnlock() {
 +    coarseLock.readLock().unlock();
 +  }
 +
 +  @Override
 +  public boolean hasReadLock() {
 +    return coarseLock.getReadHoldCount() > 0 || hasWriteLock();
 +  }
 +
 +  @Override
 +  public void writeLock() {
 +    coarseLock.writeLock().lock();
 +  }
 +
 +  @Override
 +  public void writeLockInterruptibly() throws InterruptedException {
 +    coarseLock.writeLock().lockInterruptibly();
 +  }
 +
 +  @Override
 +  public void writeUnlock() {
 +    coarseLock.writeLock().unlock();
 +  }
 +
 +  @Override
 +  public boolean hasWriteLock() {
 +    return coarseLock.isWriteLockedByCurrentThread();
 +  }
 +
 +  @Override
-   public void checkSafeMode() {
-     // TBD
-   }
- 
-   @Override
 +  public boolean isInSafeMode() {
 +    return false;
 +  }
 +
 +  @Override
 +  public boolean isInStartupSafeMode() {
 +    return false;
 +  }
 +
-   @Override
-   public void incrementSafeBlockCount(int replication, BlockInfo storedBlock) {
-   // Do nothing
-   }
- 
-   @Override
-   public void decrementSafeBlockCount(BlockInfo b) {
-     // Do nothing
-   }
 +}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/16440b83/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/HdfsServer.proto
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/16440b83/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/16440b83/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/16440b83/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
index a9429c7,212d2e6..2e7e37c
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
@@@ -287,18 -289,28 +289,30 @@@ public class TestDataNodeHotSwapVolume
      // Verify the configuration value is appropriately set.
      String[] effectiveDataDirs = conf.get(DFS_DATANODE_DATA_DIR_KEY).split(",");
      String[] expectDataDirs = newDataDir.split(",");
 +    Arrays.sort(effectiveDataDirs);
 +    Arrays.sort(expectDataDirs);
      assertEquals(expectDataDirs.length, effectiveDataDirs.length);
+     List<StorageLocation> expectedStorageLocations = new ArrayList<>();
+     List<StorageLocation> effectiveStorageLocations = new ArrayList<>();
      for (int i = 0; i < expectDataDirs.length; i++) {
        StorageLocation expectLocation = StorageLocation.parse(expectDataDirs[i]);
-       StorageLocation effectiveLocation =
-           StorageLocation.parse(effectiveDataDirs[i]);
-       assertEquals(expectLocation.getStorageType(),
-           effectiveLocation.getStorageType());
-       assertEquals(expectLocation.getFile().getCanonicalFile(),
-           effectiveLocation.getFile().getCanonicalFile());
+       StorageLocation effectiveLocation = StorageLocation
+           .parse(effectiveDataDirs[i]);
+       expectedStorageLocations.add(expectLocation);
+       effectiveStorageLocations.add(effectiveLocation);
      }
+     Comparator<StorageLocation> comparator = new Comparator<StorageLocation>()
{
+ 
+       @Override
+       public int compare(StorageLocation o1, StorageLocation o2) {
+         return o1.toString().compareTo(o2.toString());
+       }
+ 
+     };
+     Collections.sort(expectedStorageLocations, comparator);
+     Collections.sort(effectiveStorageLocations, comparator);
+     assertEquals("Effective volumes doesnt match expected",
+         expectedStorageLocations, effectiveStorageLocations);
  
      // Check that all newly created volumes are appropriately formatted.
      for (File volumeDir : newVolumeDirs) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/16440b83/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java
index 48f8cef,3af959c..3935d2c
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java
@@@ -25,9 -25,9 +25,10 @@@ import org.apache.hadoop.hdfs.net.*
  import org.apache.hadoop.hdfs.protocol.*;
  import org.apache.hadoop.hdfs.protocol.datatransfer.*;
  import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
+ import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
  import org.apache.hadoop.util.DataChecksum;
  import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
 +import org.apache.hadoop.hdfs.server.datanode.fsdataset.*;
  
  import org.junit.Rule;
  import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/16440b83/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/16440b83/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index 8781841,261a8b0..dc7366f
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@@ -440,4 -450,95 +450,5 @@@ public class TestFsDatasetImpl 
      assertSame(replica,
          BlockPoolSlice.selectReplicaToDelete(replicaOtherNewer, replica));
    }
+ 
 -  @Test
 -  public void testLoadingDfsUsedForVolumes() throws IOException,
 -      InterruptedException {
 -    long waitIntervalTime = 5000;
 -    // Initialize the cachedDfsUsedIntervalTime larger than waitIntervalTime
 -    // to avoid cache-dfsused time expired
 -    long cachedDfsUsedIntervalTime = waitIntervalTime + 1000;
 -    conf.setLong(DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS,
 -        cachedDfsUsedIntervalTime);
 -
 -    long cacheDfsUsed = 1024;
 -    long dfsUsed = getDfsUsedValueOfNewVolume(cacheDfsUsed, waitIntervalTime);
 -
 -    assertEquals(cacheDfsUsed, dfsUsed);
 -  }
 -
 -  @Test
 -  public void testLoadingDfsUsedForVolumesExpired() throws IOException,
 -      InterruptedException {
 -    long waitIntervalTime = 5000;
 -    // Initialize the cachedDfsUsedIntervalTime smaller than waitIntervalTime
 -    // to make cache-dfsused time expired
 -    long cachedDfsUsedIntervalTime = waitIntervalTime - 1000;
 -    conf.setLong(DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS,
 -        cachedDfsUsedIntervalTime);
 -
 -    long cacheDfsUsed = 1024;
 -    long dfsUsed = getDfsUsedValueOfNewVolume(cacheDfsUsed, waitIntervalTime);
 -
 -    // Because the cache-dfsused expired and the dfsUsed will be recalculated
 -    assertTrue(cacheDfsUsed != dfsUsed);
 -  }
 -
 -  private long getDfsUsedValueOfNewVolume(long cacheDfsUsed,
 -      long waitIntervalTime) throws IOException, InterruptedException {
 -    List<NamespaceInfo> nsInfos = Lists.newArrayList();
 -    nsInfos.add(new NamespaceInfo(0, CLUSTER_ID, BLOCK_POOL_IDS[0], 1));
 -
 -    String CURRENT_DIR = "current";
 -    String DU_CACHE_FILE = BlockPoolSlice.DU_CACHE_FILE;
 -    String path = BASE_DIR + "/newData0";
 -    String pathUri = new Path(path).toUri().toString();
 -    StorageLocation loc = StorageLocation.parse(pathUri);
 -    Storage.StorageDirectory sd = createStorageDirectory(new File(path));
 -    DataStorage.VolumeBuilder builder =
 -        new DataStorage.VolumeBuilder(storage, sd);
 -    when(
 -        storage.prepareVolume(eq(datanode), eq(loc.getFile()),
 -            anyListOf(NamespaceInfo.class))).thenReturn(builder);
 -
 -    String cacheFilePath =
 -        String.format("%s/%s/%s/%s/%s", path, CURRENT_DIR, BLOCK_POOL_IDS[0],
 -            CURRENT_DIR, DU_CACHE_FILE);
 -    File outFile = new File(cacheFilePath);
 -
 -    if (!outFile.getParentFile().exists()) {
 -      outFile.getParentFile().mkdirs();
 -    }
 -
 -    if (outFile.exists()) {
 -      outFile.delete();
 -    }
 -
 -    FakeTimer timer = new FakeTimer();
 -    try {
 -      try (Writer out =
 -          new OutputStreamWriter(new FileOutputStream(outFile),
 -              StandardCharsets.UTF_8)) {
 -        // Write the dfsUsed value and the time to cache file
 -        out.write(Long.toString(cacheDfsUsed) + " "
 -            + Long.toString(timer.now()));
 -        out.flush();
 -      }
 -    } catch (IOException ioe) {
 -    }
 -
 -    dataset.setTimer(timer);
 -    timer.advance(waitIntervalTime);
 -    dataset.addVolume(loc, nsInfos);
 -
 -    // Get the last volume which was just added before
 -    FsVolumeImpl newVolume;
 -    try (FsDatasetSpi.FsVolumeReferences volumes =
 -        dataset.getFsVolumeReferences()) {
 -      newVolume = (FsVolumeImpl) volumes.get(volumes.size() - 1);
 -    }
 -    long dfsUsed = newVolume.getDfsUsed();
 -
 -    return dfsUsed;
 -  }
  }


Mime
View raw message