hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From s..@apache.org
Subject [1/3] hadoop git commit: HDFS-7847. Modify NNThroughputBenchmark to be able to operate on a remote NameNode (Charles Lamb via Colin P. McCabe)
Date Mon, 03 Jul 2017 19:53:09 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-2.7 9c7585020 -> 53907b66c


HDFS-7847. Modify NNThroughputBenchmark to be able to operate on a remote NameNode (Charles
Lamb via Colin P. McCabe)

(cherry picked from commit ffce9a3413277a69444fcb890460c885de56db69)

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/51f012ae
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/51f012ae
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/51f012ae

Branch: refs/heads/branch-2.7
Commit: 51f012aec4f219823bb277e48b09a51d64f43977
Parents: 9c75850
Author: Colin Patrick Mccabe <cmccabe@cloudera.com>
Authored: Tue May 5 11:27:36 2015 -0700
Committer: Konstantin V Shvachko <shv@apache.org>
Committed: Mon Jul 3 11:32:45 2017 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../org/apache/hadoop/hdfs/DFSTestUtil.java     |  38 ++++++
 .../server/namenode/NNThroughputBenchmark.java  | 136 +++++++++++++------
 3 files changed, 135 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/51f012ae/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index b8ac822..3b708f0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -105,6 +105,9 @@ Release 2.7.4 - UNRELEASED
     HDFS-8549. Abort the balancer if an upgrade is in progress. (wang)
     Backport HDFS-11808 by Akira Ajisaka.
 
+    HDFS-7847. Modify NNThroughputBenchmark to be able to operate on a remote
+    NameNode (Charles Lamb via Colin P. McCabe)
+
   OPTIMIZATIONS
 
     HDFS-10896. Move lock logging logic from FSNamesystem into FSNamesystemLock.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/51f012ae/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index 04040fc..14d007e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -86,6 +86,7 @@ import org.apache.hadoop.hdfs.server.namenode.ha
         .ConfiguredFailoverProxyProvider;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
@@ -96,6 +97,7 @@ import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.net.unix.TemporarySocketDirectory;
+import org.apache.hadoop.security.RefreshUserMappingsProtocol;
 import org.apache.hadoop.security.ShellBasedUnixGroupsMapping;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
@@ -117,6 +119,7 @@ import java.security.NoSuchAlgorithmException;
 import java.security.PrivilegedExceptionAction;
 import java.util.*;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
 import static org.apache.hadoop.fs.CreateFlag.CREATE;
@@ -1744,6 +1747,41 @@ public class DFSTestUtil {
   }
 
   /**
+   * Get the NamenodeProtocol RPC proxy for the NN associated with this
+   * DFSClient object
+   *
+   * @param nameNodeUri the URI of the NN to get a proxy for.
+   *
+   * @return the Namenode RPC proxy associated with this DFSClient object
+   */
+  @VisibleForTesting
+  public static NamenodeProtocol getNamenodeProtocolProxy(Configuration conf,
+      URI nameNodeUri, UserGroupInformation ugi)
+      throws IOException {
+    return NameNodeProxies.createNonHAProxy(conf,
+        NameNode.getAddress(nameNodeUri), NamenodeProtocol.class, ugi, false).
+        getProxy();
+  }
+
+  /**
+   * Get the RefreshUserMappingsProtocol RPC proxy for the NN associated with
+   * this DFSClient object
+   *
+   * @param nameNodeUri the URI of the NN to get a proxy for.
+   *
+   * @return the RefreshUserMappingsProtocol RPC proxy associated with this
+   * DFSClient object
+   */
+  @VisibleForTesting
+  public static RefreshUserMappingsProtocol getRefreshUserMappingsProtocolProxy(
+      Configuration conf, URI nameNodeUri) throws IOException {
+    final AtomicBoolean nnFallbackToSimpleAuth = new AtomicBoolean(false);
+    return NameNodeProxies.createProxy(conf,
+        nameNodeUri, RefreshUserMappingsProtocol.class,
+        nnFallbackToSimpleAuth).getProxy();
+  }
+
+  /**
    * Set the datanode dead
    */
   public static void setDatanodeDead(DatanodeInfo dn) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/51f012ae/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
index d2013de..7f79fab 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.EnumSet;
@@ -30,19 +31,24 @@ import com.google.common.base.Preconditions;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.CryptoProtocolVersion;
 import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -53,6 +59,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
@@ -64,6 +71,8 @@ import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.security.Groups;
+import org.apache.hadoop.security.RefreshUserMappingsProtocol;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
@@ -97,6 +106,9 @@ import org.apache.log4j.LogManager;
  * By default the refresh is never called.</li>
  * <li>-keepResults do not clean up the name-space after execution.</li>
  * <li>-useExisting do not recreate the name-space, use existing data.</li>
+ * <li>-namenode will run the test against a namenode in another
+ * process or on another host. If you use this option, the namenode
+ * must have dfs.namenode.fs-limits.min-block-size set to 16.</li>
  * </ol>
  * 
  * The benchmark first generates inputs for each thread so that the
@@ -112,11 +124,20 @@ public class NNThroughputBenchmark implements Tool {
   private static final Log LOG = LogFactory.getLog(NNThroughputBenchmark.class);
   private static final int BLOCK_SIZE = 16;
   private static final String GENERAL_OPTIONS_USAGE = 
-    "     [-keepResults] | [-logLevel L] | [-UGCacheRefreshCount G]";
+    "     [-keepResults] | [-logLevel L] | [-UGCacheRefreshCount G] |" +
+    " [-namenode <namenode URI>]\n" +
+    "     If using -namenode, set the namenode's" +
+    "         dfs.namenode.fs-limits.min-block-size to 16.";
 
   static Configuration config;
   static NameNode nameNode;
-  static NamenodeProtocols nameNodeProto;
+  static NamenodeProtocol nameNodeProto;
+  static ClientProtocol clientProto;
+  static DatanodeProtocol dataNodeProto;
+  static RefreshUserMappingsProtocol refreshUserMappingsProto;
+  static String bpid = null;
+
+  private String namenodeUri = null; // NN URI to use, if specified
 
   NNThroughputBenchmark(Configuration conf) throws IOException {
     config = conf;
@@ -265,7 +286,7 @@ public class NNThroughputBenchmark implements Tool {
         for(StatsDaemon d : daemons)
           d.start();
       } finally {
-        while(isInPorgress()) {
+        while(isInProgress()) {
           // try {Thread.sleep(500);} catch (InterruptedException e) {}
         }
         elapsedTime = Time.now() - start;
@@ -276,7 +297,7 @@ public class NNThroughputBenchmark implements Tool {
       }
     }
 
-    private boolean isInPorgress() {
+    private boolean isInProgress() {
       for(StatsDaemon d : daemons)
         if(d.isInProgress())
           return true;
@@ -284,10 +305,10 @@ public class NNThroughputBenchmark implements Tool {
     }
 
     void cleanUp() throws IOException {
-      nameNodeProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE,
+      clientProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE,
           false);
       if(!keepResults)
-        nameNodeProto.delete(getBaseDir(), true);
+        clientProto.delete(getBaseDir(), true);
     }
 
     int getNumOpsExecuted() {
@@ -361,6 +382,12 @@ public class NNThroughputBenchmark implements Tool {
         args.remove(ugrcIndex);
       }
 
+      try {
+        namenodeUri = StringUtils.popOptionWithArgument("-namenode", args);
+      } catch (IllegalArgumentException iae) {
+        printUsage();
+      }
+
       String type = args.get(1);
       if(OP_ALL_NAME.equals(type)) {
         type = getOpName();
@@ -419,7 +446,7 @@ public class NNThroughputBenchmark implements Tool {
     void benchmarkOne() throws IOException {
       for(int idx = 0; idx < opsPerThread; idx++) {
         if((localNumOpsExecuted+1) % statsOp.ugcRefreshCount == 0)
-          nameNodeProto.refreshUserToGroupsMappings();
+          refreshUserMappingsProto.refreshUserToGroupsMappings();
         long stat = statsOp.executeOp(daemonId, idx, arg1);
         localNumOpsExecuted++;
         localCumulativeTime += stat;
@@ -485,10 +512,10 @@ public class NNThroughputBenchmark implements Tool {
     @Override
     long executeOp(int daemonId, int inputIdx, String ignore) 
     throws IOException {
-      nameNodeProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE,
+      clientProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE,
           false);
       long start = Time.now();
-      nameNodeProto.delete(BASE_DIR_NAME, true);
+      clientProto.delete(BASE_DIR_NAME, true);
       long end = Time.now();
       return end-start;
     }
@@ -554,7 +581,7 @@ public class NNThroughputBenchmark implements Tool {
     @Override
     void generateInputs(int[] opsPerThread) throws IOException {
       assert opsPerThread.length == numThreads : "Error opsPerThread.length"; 
-      nameNodeProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE,
+      clientProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE,
           false);
       // int generatedFileIdx = 0;
       LOG.info("Generate " + numOpsRequired + " intputs for " + getOpName());
@@ -589,13 +616,13 @@ public class NNThroughputBenchmark implements Tool {
     throws IOException {
       long start = Time.now();
       // dummyActionNoSynch(fileIdx);
-      nameNodeProto.create(fileNames[daemonId][inputIdx], FsPermission.getDefault(),
+      clientProto.create(fileNames[daemonId][inputIdx], FsPermission.getDefault(),
                       clientName, new EnumSetWritable<CreateFlag>(EnumSet
               .of(CreateFlag.CREATE, CreateFlag.OVERWRITE)), true, 
-          replication, BLOCK_SIZE, null);
+          replication, BLOCK_SIZE, CryptoProtocolVersion.supported());
       long end = Time.now();
       for(boolean written = !closeUponCreate; !written; 
-        written = nameNodeProto.complete(fileNames[daemonId][inputIdx],
+        written = clientProto.complete(fileNames[daemonId][inputIdx],
                                     clientName, null, INodeId.GRANDFATHER_INODE_ID));
       return end-start;
     }
@@ -658,7 +685,7 @@ public class NNThroughputBenchmark implements Tool {
     @Override
     void generateInputs(int[] opsPerThread) throws IOException {
       assert opsPerThread.length == numThreads : "Error opsPerThread.length";
-      nameNodeProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE,
+      clientProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE,
           false);
       LOG.info("Generate " + numOpsRequired + " inputs for " + getOpName());
       dirPaths = new String[numThreads][];
@@ -686,7 +713,7 @@ public class NNThroughputBenchmark implements Tool {
     long executeOp(int daemonId, int inputIdx, String clientName)
         throws IOException {
       long start = Time.now();
-      nameNodeProto.mkdirs(dirPaths[daemonId][inputIdx],
+      clientProto.mkdirs(dirPaths[daemonId][inputIdx],
           FsPermission.getDefault(), true);
       long end = Time.now();
       return end-start;
@@ -758,11 +785,11 @@ public class NNThroughputBenchmark implements Tool {
       }
       // use the same files for open
       super.generateInputs(opsPerThread);
-      if(nameNodeProto.getFileInfo(opCreate.getBaseDir()) != null
-          && nameNodeProto.getFileInfo(getBaseDir()) == null) {
-        nameNodeProto.rename(opCreate.getBaseDir(), getBaseDir());
+      if(clientProto.getFileInfo(opCreate.getBaseDir()) != null
+          && clientProto.getFileInfo(getBaseDir()) == null) {
+        clientProto.rename(opCreate.getBaseDir(), getBaseDir());
       }
-      if(nameNodeProto.getFileInfo(getBaseDir()) == null) {
+      if(clientProto.getFileInfo(getBaseDir()) == null) {
         throw new IOException(getBaseDir() + " does not exist.");
       }
     }
@@ -774,7 +801,7 @@ public class NNThroughputBenchmark implements Tool {
     long executeOp(int daemonId, int inputIdx, String ignore) 
     throws IOException {
       long start = Time.now();
-      nameNodeProto.getBlockLocations(fileNames[daemonId][inputIdx], 0L, BLOCK_SIZE);
+      clientProto.getBlockLocations(fileNames[daemonId][inputIdx], 0L, BLOCK_SIZE);
       long end = Time.now();
       return end-start;
     }
@@ -804,7 +831,7 @@ public class NNThroughputBenchmark implements Tool {
     long executeOp(int daemonId, int inputIdx, String ignore) 
     throws IOException {
       long start = Time.now();
-      nameNodeProto.delete(fileNames[daemonId][inputIdx], false);
+      clientProto.delete(fileNames[daemonId][inputIdx], false);
       long end = Time.now();
       return end-start;
     }
@@ -834,7 +861,7 @@ public class NNThroughputBenchmark implements Tool {
     long executeOp(int daemonId, int inputIdx, String ignore) 
     throws IOException {
       long start = Time.now();
-      nameNodeProto.getFileInfo(fileNames[daemonId][inputIdx]);
+      clientProto.getFileInfo(fileNames[daemonId][inputIdx]);
       long end = Time.now();
       return end-start;
     }
@@ -878,7 +905,7 @@ public class NNThroughputBenchmark implements Tool {
     long executeOp(int daemonId, int inputIdx, String ignore) 
     throws IOException {
       long start = Time.now();
-      nameNodeProto.rename(fileNames[daemonId][inputIdx],
+      clientProto.rename(fileNames[daemonId][inputIdx],
                       destNames[daemonId][inputIdx]);
       long end = Time.now();
       return end-start;
@@ -934,14 +961,14 @@ public class NNThroughputBenchmark implements Tool {
           new DataStorage(nsInfo),
           new ExportedBlockKeys(), VersionInfo.getVersion());
       // register datanode
-      dnRegistration = nameNodeProto.registerDatanode(dnRegistration);
+      dnRegistration = dataNodeProto.registerDatanode(dnRegistration);
+      dnRegistration.setNamespaceInfo(nsInfo);
       //first block reports
       storage = new DatanodeStorage(DatanodeStorage.generateUuid());
       final StorageBlockReport[] reports = {
           new StorageBlockReport(storage, BlockListAsLongs.EMPTY)
       };
-      nameNodeProto.blockReport(dnRegistration, 
-          nameNode.getNamesystem().getBlockPoolId(), reports,
+      dataNodeProto.blockReport(dnRegistration, bpid, reports,
               new BlockReportContext(1, 0, System.nanoTime()));
     }
 
@@ -954,7 +981,7 @@ public class NNThroughputBenchmark implements Tool {
       // TODO:FEDERATION currently a single block pool is supported
       StorageReport[] rep = { new StorageReport(storage, false,
           DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0L) };
-      DatanodeCommand[] cmds = nameNodeProto.sendHeartbeat(dnRegistration, rep,
+      DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration, rep,
           0L, 0L, 0, 0, 0, null).getCommands();
       if(cmds != null) {
         for (DatanodeCommand cmd : cmds ) {
@@ -1003,7 +1030,7 @@ public class NNThroughputBenchmark implements Tool {
       // register datanode
       StorageReport[] rep = { new StorageReport(storage,
           false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0L) };
-      DatanodeCommand[] cmds = nameNodeProto.sendHeartbeat(dnRegistration,
+      DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration,
           rep, 0L, 0L, 0, 0, 0, null).getCommands();
       if (cmds != null) {
         for (DatanodeCommand cmd : cmds) {
@@ -1042,8 +1069,7 @@ public class NNThroughputBenchmark implements Tool {
                   null) };
           StorageReceivedDeletedBlocks[] report = { new StorageReceivedDeletedBlocks(
               targetStorageID, rdBlocks) };
-          nameNodeProto.blockReceivedAndDeleted(receivedDNReg, nameNode
-              .getNamesystem().getBlockPoolId(), report);
+          dataNodeProto.blockReceivedAndDeleted(receivedDNReg, bpid, report);
         }
       }
       return blocks.length;
@@ -1134,15 +1160,15 @@ public class NNThroughputBenchmark implements Tool {
       FileNameGenerator nameGenerator;
       nameGenerator = new FileNameGenerator(getBaseDir(), 100);
       String clientName = getClientName(007);
-      nameNodeProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE,
+      clientProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE,
           false);
       for(int idx=0; idx < nrFiles; idx++) {
         String fileName = nameGenerator.getNextFileName("ThroughputBench");
-        nameNodeProto.create(fileName, FsPermission.getDefault(), clientName,
+        clientProto.create(fileName, FsPermission.getDefault(), clientName,
             new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)),
true, replication,
-            BLOCK_SIZE, null);
+            BLOCK_SIZE, CryptoProtocolVersion.supported());
         ExtendedBlock lastBlock = addBlocks(fileName, clientName);
-        nameNodeProto.complete(fileName, clientName, lastBlock, INodeId.GRANDFATHER_INODE_ID);
+        clientProto.complete(fileName, clientName, lastBlock, INodeId.GRANDFATHER_INODE_ID);
       }
       // prepare block reports
       for(int idx=0; idx < nrDatanodes; idx++) {
@@ -1165,8 +1191,8 @@ public class NNThroughputBenchmark implements Tool {
               ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, null) };
           StorageReceivedDeletedBlocks[] report = { new StorageReceivedDeletedBlocks(
               datanodes[dnIdx].storage.getStorageID(), rdBlocks) };
-          nameNodeProto.blockReceivedAndDeleted(datanodes[dnIdx].dnRegistration, loc
-              .getBlock().getBlockPoolId(), report);
+          dataNodeProto.blockReceivedAndDeleted(datanodes[dnIdx].dnRegistration,
+              bpid, report);
         }
         // IBRs are asynchronously processed by NameNode. The next
         // ClientProtocol#addBlock() may throw NotReplicatedYetException.
@@ -1184,7 +1210,7 @@ public class NNThroughputBenchmark implements Tool {
         String[] favoredNodes) throws IOException {
       for (int i = 0; i < 30; i++) {
         try {
-          return nameNodeProto.addBlock(src, clientName,
+          return clientProto.addBlock(src, clientName,
               previous, excludeNodes, fileId, favoredNodes);
         } catch (NotReplicatedYetException|RemoteException e) {
           if (e instanceof RemoteException) {
@@ -1218,8 +1244,7 @@ public class NNThroughputBenchmark implements Tool {
       long start = Time.now();
       StorageBlockReport[] report = { new StorageBlockReport(
           dn.storage, dn.getBlockReportList()) };
-      nameNodeProto.blockReport(dn.dnRegistration,
-          nameNode.getNamesystem().getBlockPoolId(), report,
+      dataNodeProto.blockReport(dn.dnRegistration, bpid, report,
           new BlockReportContext(1, 0, System.nanoTime()));
       long end = Time.now();
       return end-start;
@@ -1350,7 +1375,7 @@ public class NNThroughputBenchmark implements Tool {
         LOG.info("Datanode " + dn + " is decommissioned.");
       }
       excludeFile.close();
-      nameNodeProto.refreshNodes();
+      clientProto.refreshNodes();
     }
 
     /**
@@ -1446,8 +1471,6 @@ public class NNThroughputBenchmark implements Tool {
 
     // Start the NameNode
     String[] argv = new String[] {};
-    nameNode = NameNode.createNameNode(argv, config);
-    nameNodeProto = nameNode.getRpcServer();
 
     List<OperationStatsBase> ops = new ArrayList<OperationStatsBase>();
     OperationStatsBase opStat = null;
@@ -1488,6 +1511,29 @@ public class NNThroughputBenchmark implements Tool {
         opStat = new CleanAllStats(args);
         ops.add(opStat);
       }
+
+      if (namenodeUri == null) {
+        nameNode = NameNode.createNameNode(argv, config);
+        NamenodeProtocols nnProtos = nameNode.getRpcServer();
+        nameNodeProto = nnProtos;
+        clientProto = nnProtos;
+        dataNodeProto = nnProtos;
+        refreshUserMappingsProto = nnProtos;
+        bpid = nameNode.getNamesystem().getBlockPoolId();
+      } else {
+        FileSystem.setDefaultUri(getConf(), namenodeUri);
+        DistributedFileSystem dfs = (DistributedFileSystem)
+            FileSystem.get(getConf());
+        final URI nnUri = new URI(namenodeUri);
+        nameNodeProto = DFSTestUtil.getNamenodeProtocolProxy(config, nnUri,
+            UserGroupInformation.getCurrentUser());
+        clientProto = dfs.getClient().getNamenode();
+        dataNodeProto = new DatanodeProtocolClientSideTranslatorPB(
+            NameNode.getAddress(nnUri), config);
+        refreshUserMappingsProto =
+            DFSTestUtil.getRefreshUserMappingsProtocolProxy(config, nnUri);
+        getBlockPoolId(dfs);
+      }
       if(ops.size() == 0)
         printUsage();
       // run each benchmark
@@ -1508,6 +1554,12 @@ public class NNThroughputBenchmark implements Tool {
     return 0;
   }
 
+  private void getBlockPoolId(DistributedFileSystem unused)
+    throws IOException {
+    final NamespaceInfo nsInfo = nameNodeProto.versionRequest();
+    bpid = nsInfo.getBlockPoolID();
+  }
+
   public static void main(String[] args) throws Exception {
     NNThroughputBenchmark bench = null;
     try {


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message