Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 38A8E17A07 for ; Tue, 5 May 2015 18:35:06 +0000 (UTC) Received: (qmail 18654 invoked by uid 500); 5 May 2015 18:35:06 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 18587 invoked by uid 500); 5 May 2015 18:35:06 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 18577 invoked by uid 99); 5 May 2015 18:35:05 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 05 May 2015 18:35:05 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id CC40EE00E0; Tue, 5 May 2015 18:35:05 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: cmccabe@apache.org To: common-commits@hadoop.apache.org Message-Id: <621f700a7678478e9b270cd15f898c39@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: hadoop git commit: HDFS-7847. Modify NNThroughputBenchmark to be able to operate on a remote NameNode (Charles Lamb via Colin P. McCabe) Date: Tue, 5 May 2015 18:35:05 +0000 (UTC) Repository: hadoop Updated Branches: refs/heads/trunk e4c3b52c8 -> ffce9a341 HDFS-7847. Modify NNThroughputBenchmark to be able to operate on a remote NameNode (Charles Lamb via Colin P. McCabe) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ffce9a34 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ffce9a34 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ffce9a34 Branch: refs/heads/trunk Commit: ffce9a3413277a69444fcb890460c885de56db69 Parents: e4c3b52 Author: Colin Patrick Mccabe Authored: Tue May 5 11:27:36 2015 -0700 Committer: Colin Patrick Mccabe Committed: Tue May 5 11:34:58 2015 -0700 ---------------------------------------------------------------------- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../org/apache/hadoop/hdfs/DFSTestUtil.java | 40 ++++++ .../server/namenode/NNThroughputBenchmark.java | 136 +++++++++++++------ 3 files changed, 137 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/ffce9a34/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index c89e6fe..01de9b1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -510,6 +510,9 @@ Release 2.8.0 - UNRELEASED HDFS-7758. Retire FsDatasetSpi#getVolumes() and use FsDatasetSpi#getVolumeRefs() instead (Lei (Eddy) Xu via Colin P. McCabe) + HDFS-7847. Modify NNThroughputBenchmark to be able to operate on a remote + NameNode (Charles Lamb via Colin P. McCabe) + OPTIMIZATIONS HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than http://git-wip-us.apache.org/repos/asf/hadoop/blob/ffce9a34/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java index a8df991..cfee997 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java @@ -48,6 +48,7 @@ import java.lang.reflect.Modifier; import java.net.HttpURLConnection; import java.net.InetSocketAddress; import java.net.Socket; +import java.net.URI; import java.net.URL; import java.net.URLConnection; import java.nio.ByteBuffer; @@ -64,6 +65,7 @@ import java.util.Random; import java.util.Set; import java.util.UUID; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.io.FileUtils; import org.apache.commons.logging.Log; @@ -129,12 +131,14 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; +import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; import org.apache.hadoop.hdfs.tools.DFSAdmin; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.unix.DomainSocket; import org.apache.hadoop.net.unix.TemporarySocketDirectory; +import org.apache.hadoop.security.RefreshUserMappingsProtocol; import org.apache.hadoop.security.ShellBasedUnixGroupsMapping; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; @@ -147,6 +151,7 @@ import org.apache.log4j.Level; import org.junit.Assume; import org.mockito.internal.util.reflection.Whitebox; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Charsets; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; @@ -1756,6 +1761,41 @@ public class DFSTestUtil { } /** + * Get the NamenodeProtocol RPC proxy for the NN associated with this + * DFSClient object + * + * @param nameNodeUri the URI of the NN to get a proxy for. + * + * @return the Namenode RPC proxy associated with this DFSClient object + */ + @VisibleForTesting + public static NamenodeProtocol getNamenodeProtocolProxy(Configuration conf, + URI nameNodeUri, UserGroupInformation ugi) + throws IOException { + return NameNodeProxies.createNonHAProxy(conf, + NameNode.getAddress(nameNodeUri), NamenodeProtocol.class, ugi, false). + getProxy(); + } + + /** + * Get the RefreshUserMappingsProtocol RPC proxy for the NN associated with + * this DFSClient object + * + * @param nameNodeUri the URI of the NN to get a proxy for. + * + * @return the RefreshUserMappingsProtocol RPC proxy associated with this + * DFSClient object + */ + @VisibleForTesting + public static RefreshUserMappingsProtocol getRefreshUserMappingsProtocolProxy( + Configuration conf, URI nameNodeUri) throws IOException { + final AtomicBoolean nnFallbackToSimpleAuth = new AtomicBoolean(false); + return NameNodeProxies.createProxy(conf, + nameNodeUri, RefreshUserMappingsProtocol.class, + nnFallbackToSimpleAuth).getProxy(); + } + + /** * Set the datanode dead */ public static void setDatanodeDead(DatanodeInfo dn) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/ffce9a34/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java index db0185d..2964f9a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.net.URI; import java.util.ArrayList; import java.util.Arrays; import java.util.EnumSet; @@ -30,19 +31,24 @@ import com.google.common.base.Preconditions; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.crypto.CryptoProtocolVersion; import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil; import org.apache.hadoop.hdfs.server.datanode.DataNode; @@ -53,6 +59,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; +import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; @@ -63,6 +70,8 @@ import org.apache.hadoop.io.EnumSetWritable; import org.apache.hadoop.net.DNS; import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.security.Groups; +import org.apache.hadoop.security.RefreshUserMappingsProtocol; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Time; @@ -96,6 +105,9 @@ import org.apache.log4j.LogManager; * By default the refresh is never called. *
  • -keepResults do not clean up the name-space after execution.
  • *
  • -useExisting do not recreate the name-space, use existing data.
  • + *
  • -namenode will run the test against a namenode in another + * process or on another host. If you use this option, the namenode + * must have dfs.namenode.fs-limits.min-block-size set to 16.
  • * * * The benchmark first generates inputs for each thread so that the @@ -111,11 +123,20 @@ public class NNThroughputBenchmark implements Tool { private static final Log LOG = LogFactory.getLog(NNThroughputBenchmark.class); private static final int BLOCK_SIZE = 16; private static final String GENERAL_OPTIONS_USAGE = - " [-keepResults] | [-logLevel L] | [-UGCacheRefreshCount G]"; + " [-keepResults] | [-logLevel L] | [-UGCacheRefreshCount G] |" + + " [-namenode ]\n" + + " If using -namenode, set the namenode's" + + " dfs.namenode.fs-limits.min-block-size to 16."; static Configuration config; static NameNode nameNode; - static NamenodeProtocols nameNodeProto; + static NamenodeProtocol nameNodeProto; + static ClientProtocol clientProto; + static DatanodeProtocol dataNodeProto; + static RefreshUserMappingsProtocol refreshUserMappingsProto; + static String bpid = null; + + private String namenodeUri = null; // NN URI to use, if specified NNThroughputBenchmark(Configuration conf) throws IOException { config = conf; @@ -264,7 +285,7 @@ public class NNThroughputBenchmark implements Tool { for(StatsDaemon d : daemons) d.start(); } finally { - while(isInPorgress()) { + while(isInProgress()) { // try {Thread.sleep(500);} catch (InterruptedException e) {} } elapsedTime = Time.now() - start; @@ -275,7 +296,7 @@ public class NNThroughputBenchmark implements Tool { } } - private boolean isInPorgress() { + private boolean isInProgress() { for(StatsDaemon d : daemons) if(d.isInProgress()) return true; @@ -283,10 +304,10 @@ public class NNThroughputBenchmark implements Tool { } void cleanUp() throws IOException { - nameNodeProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE, + clientProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE, false); if(!keepResults) - nameNodeProto.delete(getBaseDir(), true); + clientProto.delete(getBaseDir(), true); } int getNumOpsExecuted() { @@ -360,6 +381,12 @@ public class NNThroughputBenchmark implements Tool { args.remove(ugrcIndex); } + try { + namenodeUri = StringUtils.popOptionWithArgument("-namenode", args); + } catch (IllegalArgumentException iae) { + printUsage(); + } + String type = args.get(1); if(OP_ALL_NAME.equals(type)) { type = getOpName(); @@ -418,7 +445,7 @@ public class NNThroughputBenchmark implements Tool { void benchmarkOne() throws IOException { for(int idx = 0; idx < opsPerThread; idx++) { if((localNumOpsExecuted+1) % statsOp.ugcRefreshCount == 0) - nameNodeProto.refreshUserToGroupsMappings(); + refreshUserMappingsProto.refreshUserToGroupsMappings(); long stat = statsOp.executeOp(daemonId, idx, arg1); localNumOpsExecuted++; localCumulativeTime += stat; @@ -484,10 +511,10 @@ public class NNThroughputBenchmark implements Tool { @Override long executeOp(int daemonId, int inputIdx, String ignore) throws IOException { - nameNodeProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE, + clientProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE, false); long start = Time.now(); - nameNodeProto.delete(BASE_DIR_NAME, true); + clientProto.delete(BASE_DIR_NAME, true); long end = Time.now(); return end-start; } @@ -553,7 +580,7 @@ public class NNThroughputBenchmark implements Tool { @Override void generateInputs(int[] opsPerThread) throws IOException { assert opsPerThread.length == numThreads : "Error opsPerThread.length"; - nameNodeProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE, + clientProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE, false); // int generatedFileIdx = 0; LOG.info("Generate " + numOpsRequired + " intputs for " + getOpName()); @@ -588,13 +615,13 @@ public class NNThroughputBenchmark implements Tool { throws IOException { long start = Time.now(); // dummyActionNoSynch(fileIdx); - nameNodeProto.create(fileNames[daemonId][inputIdx], FsPermission.getDefault(), + clientProto.create(fileNames[daemonId][inputIdx], FsPermission.getDefault(), clientName, new EnumSetWritable(EnumSet .of(CreateFlag.CREATE, CreateFlag.OVERWRITE)), true, - replication, BLOCK_SIZE, null); + replication, BLOCK_SIZE, CryptoProtocolVersion.supported()); long end = Time.now(); for(boolean written = !closeUponCreate; !written; - written = nameNodeProto.complete(fileNames[daemonId][inputIdx], + written = clientProto.complete(fileNames[daemonId][inputIdx], clientName, null, HdfsConstants.GRANDFATHER_INODE_ID)); return end-start; } @@ -657,7 +684,7 @@ public class NNThroughputBenchmark implements Tool { @Override void generateInputs(int[] opsPerThread) throws IOException { assert opsPerThread.length == numThreads : "Error opsPerThread.length"; - nameNodeProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE, + clientProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE, false); LOG.info("Generate " + numOpsRequired + " inputs for " + getOpName()); dirPaths = new String[numThreads][]; @@ -685,7 +712,7 @@ public class NNThroughputBenchmark implements Tool { long executeOp(int daemonId, int inputIdx, String clientName) throws IOException { long start = Time.now(); - nameNodeProto.mkdirs(dirPaths[daemonId][inputIdx], + clientProto.mkdirs(dirPaths[daemonId][inputIdx], FsPermission.getDefault(), true); long end = Time.now(); return end-start; @@ -757,11 +784,11 @@ public class NNThroughputBenchmark implements Tool { } // use the same files for open super.generateInputs(opsPerThread); - if(nameNodeProto.getFileInfo(opCreate.getBaseDir()) != null - && nameNodeProto.getFileInfo(getBaseDir()) == null) { - nameNodeProto.rename(opCreate.getBaseDir(), getBaseDir()); + if(clientProto.getFileInfo(opCreate.getBaseDir()) != null + && clientProto.getFileInfo(getBaseDir()) == null) { + clientProto.rename(opCreate.getBaseDir(), getBaseDir()); } - if(nameNodeProto.getFileInfo(getBaseDir()) == null) { + if(clientProto.getFileInfo(getBaseDir()) == null) { throw new IOException(getBaseDir() + " does not exist."); } } @@ -773,7 +800,7 @@ public class NNThroughputBenchmark implements Tool { long executeOp(int daemonId, int inputIdx, String ignore) throws IOException { long start = Time.now(); - nameNodeProto.getBlockLocations(fileNames[daemonId][inputIdx], 0L, BLOCK_SIZE); + clientProto.getBlockLocations(fileNames[daemonId][inputIdx], 0L, BLOCK_SIZE); long end = Time.now(); return end-start; } @@ -803,7 +830,7 @@ public class NNThroughputBenchmark implements Tool { long executeOp(int daemonId, int inputIdx, String ignore) throws IOException { long start = Time.now(); - nameNodeProto.delete(fileNames[daemonId][inputIdx], false); + clientProto.delete(fileNames[daemonId][inputIdx], false); long end = Time.now(); return end-start; } @@ -833,7 +860,7 @@ public class NNThroughputBenchmark implements Tool { long executeOp(int daemonId, int inputIdx, String ignore) throws IOException { long start = Time.now(); - nameNodeProto.getFileInfo(fileNames[daemonId][inputIdx]); + clientProto.getFileInfo(fileNames[daemonId][inputIdx]); long end = Time.now(); return end-start; } @@ -877,7 +904,7 @@ public class NNThroughputBenchmark implements Tool { long executeOp(int daemonId, int inputIdx, String ignore) throws IOException { long start = Time.now(); - nameNodeProto.rename(fileNames[daemonId][inputIdx], + clientProto.rename(fileNames[daemonId][inputIdx], destNames[daemonId][inputIdx]); long end = Time.now(); return end-start; @@ -933,14 +960,14 @@ public class NNThroughputBenchmark implements Tool { new DataStorage(nsInfo), new ExportedBlockKeys(), VersionInfo.getVersion()); // register datanode - dnRegistration = nameNodeProto.registerDatanode(dnRegistration); + dnRegistration = dataNodeProto.registerDatanode(dnRegistration); + dnRegistration.setNamespaceInfo(nsInfo); //first block reports storage = new DatanodeStorage(DatanodeStorage.generateUuid()); final StorageBlockReport[] reports = { new StorageBlockReport(storage, BlockListAsLongs.EMPTY) }; - nameNodeProto.blockReport(dnRegistration, - nameNode.getNamesystem().getBlockPoolId(), reports, + dataNodeProto.blockReport(dnRegistration, bpid, reports, new BlockReportContext(1, 0, System.nanoTime())); } @@ -953,7 +980,7 @@ public class NNThroughputBenchmark implements Tool { // TODO:FEDERATION currently a single block pool is supported StorageReport[] rep = { new StorageReport(storage, false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED) }; - DatanodeCommand[] cmds = nameNodeProto.sendHeartbeat(dnRegistration, rep, + DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration, rep, 0L, 0L, 0, 0, 0, null).getCommands(); if(cmds != null) { for (DatanodeCommand cmd : cmds ) { @@ -1002,7 +1029,7 @@ public class NNThroughputBenchmark implements Tool { // register datanode StorageReport[] rep = { new StorageReport(storage, false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED) }; - DatanodeCommand[] cmds = nameNodeProto.sendHeartbeat(dnRegistration, + DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration, rep, 0L, 0L, 0, 0, 0, null).getCommands(); if (cmds != null) { for (DatanodeCommand cmd : cmds) { @@ -1041,8 +1068,7 @@ public class NNThroughputBenchmark implements Tool { null) }; StorageReceivedDeletedBlocks[] report = { new StorageReceivedDeletedBlocks( targetStorageID, rdBlocks) }; - nameNodeProto.blockReceivedAndDeleted(receivedDNReg, nameNode - .getNamesystem().getBlockPoolId(), report); + dataNodeProto.blockReceivedAndDeleted(receivedDNReg, bpid, report); } } return blocks.length; @@ -1133,15 +1159,15 @@ public class NNThroughputBenchmark implements Tool { FileNameGenerator nameGenerator; nameGenerator = new FileNameGenerator(getBaseDir(), 100); String clientName = getClientName(007); - nameNodeProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE, + clientProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE, false); for(int idx=0; idx < nrFiles; idx++) { String fileName = nameGenerator.getNextFileName("ThroughputBench"); - nameNodeProto.create(fileName, FsPermission.getDefault(), clientName, + clientProto.create(fileName, FsPermission.getDefault(), clientName, new EnumSetWritable(EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)), true, replication, - BLOCK_SIZE, null); + BLOCK_SIZE, CryptoProtocolVersion.supported()); ExtendedBlock lastBlock = addBlocks(fileName, clientName); - nameNodeProto.complete(fileName, clientName, lastBlock, HdfsConstants.GRANDFATHER_INODE_ID); + clientProto.complete(fileName, clientName, lastBlock, HdfsConstants.GRANDFATHER_INODE_ID); } // prepare block reports for(int idx=0; idx < nrDatanodes; idx++) { @@ -1153,7 +1179,7 @@ public class NNThroughputBenchmark implements Tool { throws IOException { ExtendedBlock prevBlock = null; for(int jdx = 0; jdx < blocksPerFile; jdx++) { - LocatedBlock loc = nameNodeProto.addBlock(fileName, clientName, + LocatedBlock loc = clientProto.addBlock(fileName, clientName, prevBlock, null, HdfsConstants.GRANDFATHER_INODE_ID, null); prevBlock = loc.getBlock(); for(DatanodeInfo dnInfo : loc.getLocations()) { @@ -1164,8 +1190,8 @@ public class NNThroughputBenchmark implements Tool { ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, null) }; StorageReceivedDeletedBlocks[] report = { new StorageReceivedDeletedBlocks( datanodes[dnIdx].storage.getStorageID(), rdBlocks) }; - nameNodeProto.blockReceivedAndDeleted(datanodes[dnIdx].dnRegistration, loc - .getBlock().getBlockPoolId(), report); + dataNodeProto.blockReceivedAndDeleted(datanodes[dnIdx].dnRegistration, + bpid, report); } } return prevBlock; @@ -1186,8 +1212,7 @@ public class NNThroughputBenchmark implements Tool { long start = Time.now(); StorageBlockReport[] report = { new StorageBlockReport( dn.storage, dn.getBlockReportList()) }; - nameNodeProto.blockReport(dn.dnRegistration, - nameNode.getNamesystem().getBlockPoolId(), report, + dataNodeProto.blockReport(dn.dnRegistration, bpid, report, new BlockReportContext(1, 0, System.nanoTime())); long end = Time.now(); return end-start; @@ -1318,7 +1343,7 @@ public class NNThroughputBenchmark implements Tool { LOG.info("Datanode " + dn + " is decommissioned."); } excludeFile.close(); - nameNodeProto.refreshNodes(); + clientProto.refreshNodes(); } /** @@ -1414,8 +1439,6 @@ public class NNThroughputBenchmark implements Tool { // Start the NameNode String[] argv = new String[] {}; - nameNode = NameNode.createNameNode(argv, config); - nameNodeProto = nameNode.getRpcServer(); List ops = new ArrayList(); OperationStatsBase opStat = null; @@ -1456,6 +1479,29 @@ public class NNThroughputBenchmark implements Tool { opStat = new CleanAllStats(args); ops.add(opStat); } + + if (namenodeUri == null) { + nameNode = NameNode.createNameNode(argv, config); + NamenodeProtocols nnProtos = nameNode.getRpcServer(); + nameNodeProto = nnProtos; + clientProto = nnProtos; + dataNodeProto = nnProtos; + refreshUserMappingsProto = nnProtos; + bpid = nameNode.getNamesystem().getBlockPoolId(); + } else { + FileSystem.setDefaultUri(getConf(), namenodeUri); + DistributedFileSystem dfs = (DistributedFileSystem) + FileSystem.get(getConf()); + final URI nnUri = new URI(namenodeUri); + nameNodeProto = DFSTestUtil.getNamenodeProtocolProxy(config, nnUri, + UserGroupInformation.getCurrentUser()); + clientProto = dfs.getClient().getNamenode(); + dataNodeProto = new DatanodeProtocolClientSideTranslatorPB( + NameNode.getAddress(nnUri), config); + refreshUserMappingsProto = + DFSTestUtil.getRefreshUserMappingsProtocolProxy(config, nnUri); + getBlockPoolId(dfs); + } if(ops.size() == 0) printUsage(); // run each benchmark @@ -1476,6 +1522,12 @@ public class NNThroughputBenchmark implements Tool { return 0; } + private void getBlockPoolId(DistributedFileSystem unused) + throws IOException { + final NamespaceInfo nsInfo = nameNodeProto.versionRequest(); + bpid = nsInfo.getBlockPoolID(); + } + public static void main(String[] args) throws Exception { NNThroughputBenchmark bench = null; try {