hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From s..@apache.org
Subject svn commit: r607126 [3/3] - in /lucene/hadoop/trunk/src: java/org/apache/hadoop/dfs/FSNamesystem.java test/org/apache/hadoop/dfs/NNThroughputBenchmark.java test/org/apache/hadoop/dfs/TestNNThroughputBenchmark.java
Date Thu, 27 Dec 2007 21:39:13 GMT
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/NNThroughputBenchmark.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/NNThroughputBenchmark.java?rev=607126&r1=607125&r2=607126&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/NNThroughputBenchmark.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/NNThroughputBenchmark.java Thu Dec 27 13:39:12 2007
@@ -1,787 +1,787 @@
-package org.apache.hadoop.dfs;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.ArrayList;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.commons.logging.impl.Log4JLogger;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.net.DNS;
-import org.apache.hadoop.net.NetworkTopology;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.log4j.Level;
-
-/**
- * Main class for a series of name-node benchmarks.
- * 
- * Each benchmark measures throughput and average execution time 
- * of a specific name-node operation, e.g. file creation or block reports.
- * 
- * The benchmark does not involve any other hadoop components
- * except for the name-node. Each operation is executed
- * by calling directly the respective name-node method.
- * The name-node here is real all other components are simulated.
- * 
- * Command line arguments for the benchmark include:<br>
- * 1) total number of operations to be performed,<br>
- * 2) number of threads to run these operations,<br>
- * 3) followed by operation specific input parameters.
- * 
- * Then the benchmark generates inputs for each thread so that the
- * input generation overhead does not effect the resulting statistics.
- * The number of operations performed by threads practically is the same. 
- * Precisely, the difference between the number of operations 
- * performed by any two threads does not exceed 1.
- * 
- * Then the benchmark executes the specified number of operations using 
- * the specified number of threads and outputs the resulting stats.
- */
-public class NNThroughputBenchmark {
-  private static final Log LOG = LogFactory.getLog("org.apache.hadoop.dfs.NNThroughputBenchmark");
-  private static final int BLOCK_SIZE = 16;
-
-  static Configuration config;
-  static NameNode nameNode;
-
-  NNThroughputBenchmark(Configuration conf) throws IOException {
-    config = conf;
-    // We do not need many handlers, since each thread simulates a handler
-    // by calling name-node methods directly
-    config.setInt("dfs.namenode.handler.count", 1);
-    // Start the NameNode
-    String[] args = new String[] {};
-    nameNode = NameNode.createNameNode(args, config);
-  }
-
-  void close() throws IOException {
-    nameNode.stop();
-  }
-
-  static void turnOffNameNodeLogging() {
-    // change log level to ERROR: NameNode.LOG & NameNode.stateChangeLog
-    ((Log4JLogger)NameNode.LOG).getLogger().setLevel(Level.ERROR);
-    ((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.ERROR);
-    ((Log4JLogger)NetworkTopology.LOG).getLogger().setLevel(Level.ERROR);
-    ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ERROR);
-  }
-
-  /**
-   * Base class for collecting operation statistics.
-   * 
-   * Overload this class in order to run statistics for a 
-   * specific name-node operation.
-   */
-  abstract class OperationStatsBase {
-    protected static final String BASE_DIR_NAME = "/nnThroughputBenchmark";
-    protected static final String OP_ALL_NAME = "all";
-    protected static final String OP_ALL_USAGE = "-op all <other ops options>";
-
-    protected String baseDir;
-    protected short replication;
-    protected int  numThreads = 0;        // number of threads
-    protected int  numOpsRequired = 0;    // number of operations requested
-    protected int  numOpsExecuted = 0;    // number of operations executed
-    protected long cumulativeTime = 0;    // sum of times for each op
-    protected long elapsedTime = 0;       // time from start to finish
-
-    /**
-     * Operation name.
-     */
-    abstract String getOpName();
-
-    /**
-     * Parse command line arguments.
-     * 
-     * @param args arguments
-     * @throws IOException
-     */
-    abstract void parseArguments(String[] args) throws IOException;
-
-    /**
-     * Generate inputs for each daemon thread.
-     * 
-     * @param opsPerThread number of inputs for each thread.
-     * @throws IOException
-     */
-    abstract void generateInputs(int[] opsPerThread) throws IOException;
-
-    /**
-     * This corresponds to the arg1 argument of 
-     * {@link #executeOp(int, int, String)}, which can have different meanings
-     * depending on the operation performed.
-     * 
-     * @param daemonId
-     * @return the argument
-     */
-    abstract String getExecutionArgument(int daemonId);
-
-    /**
-     * Execute name-node operation.
-     * 
-     * @param daemonId id of the daemon calling this method.
-     * @param inputIdx serial index of the operation called by the deamon.
-     * @param arg1 operation specific argument.
-     * @return time of the individual name-node call.
-     * @throws IOException
-     */
-    abstract long executeOp(int daemonId, int inputIdx, String arg1) throws IOException;
-
-    OperationStatsBase() {
-      baseDir = BASE_DIR_NAME + "/" + getOpName();
-      replication = (short) config.getInt("dfs.replication", 3);
-      numOpsRequired = 10;
-      numThreads = 3;
-    }
-
-    void benchmark() throws IOException {
-      List<StatsDaemon> daemons = new ArrayList<StatsDaemon>();
-      long start = 0;
-      try {
-        numOpsExecuted = 0;
-        cumulativeTime = 0;
-        if(numThreads < 1)
-          return;
-        int tIdx = 0; // thread index < nrThreads
-        int opsPerThread[] = new int[numThreads];
-        for(int opsScheduled = 0; opsScheduled < numOpsRequired; 
-                                  opsScheduled += opsPerThread[tIdx++]) {
-          // execute  in a separate thread
-          opsPerThread[tIdx] = (numOpsRequired-opsScheduled)/(numThreads-tIdx);
-          if(opsPerThread[tIdx] == 0)
-            opsPerThread[tIdx] = 1;
-        }
-        // if numThreads > numOpsRequired then the remaining threads will do nothing
-        for(; tIdx < numThreads; tIdx++)
-          opsPerThread[tIdx] = 0;
-        turnOffNameNodeLogging();
-        generateInputs(opsPerThread);
-        for(tIdx=0; tIdx < numThreads; tIdx++)
-          daemons.add(new StatsDaemon(tIdx, opsPerThread[tIdx], this));
-        start = System.currentTimeMillis();
-        LOG.info("Starting " + numOpsRequired + " " + getOpName() + "(s).");
-        for(StatsDaemon d : daemons)
-          d.start();
-      } finally {
-        while(isInPorgress(daemons)) {
-          // try {Thread.sleep(500);} catch (InterruptedException e) {}
-        }
-        elapsedTime = System.currentTimeMillis() - start;
-        for(StatsDaemon d : daemons) {
-          incrementStats(d.localNumOpsExecuted, d.localCumulativeTime);
-          // System.out.println(d.toString() + ": ops Exec = " + d.localNumOpsExecuted);
-        }
-      }
-    }
-
-    private boolean isInPorgress(List<StatsDaemon> daemons) {
-      for(StatsDaemon d : daemons)
-        if(d.isInPorgress())
-          return true;
-      return false;
-    }
-
-    void cleanUp() throws IOException {
-      nameNode.setSafeMode(FSConstants.SafeModeAction.SAFEMODE_LEAVE);
-      nameNode.delete(getBaseDir());
-    }
-
-    int getNumOpsExecuted() {
-      return numOpsExecuted;
-    }
-
-    long getCumulativeTime() {
-      return cumulativeTime;
-    }
-
-    long getElapsedTime() {
-      return elapsedTime;
-    }
-
-    long getAverageTime() {
-      return numOpsExecuted == 0 ? 0 : cumulativeTime / numOpsExecuted;
-    }
-
-    double getOpsPerSecond() {
-      return elapsedTime == 0 ? 0 : 1000*(double)numOpsExecuted / elapsedTime;
-    }
-
-    String getBaseDir() {
-      return baseDir;
-    }
-
-    String getClientName(int idx) {
-      return getOpName() + "-client-" + idx;
-    }
-
-    void incrementStats(int ops, long time) {
-      numOpsExecuted += ops;
-      cumulativeTime += time;
-    }
-
-    /**
-     * Parse first 2 arguments, corresponding to the "-op" option.
-     * 
-     * @param args
-     * @return true if operation is all, which means that options not related
-     * to this operation should be ignored, or false otherwise, meaning
-     * that usage should be printed when an unrelated option is encountered.
-     * @throws IOException
-     */
-    protected boolean verifyOpArgument(String[] args) {
-      if(args.length < 2 || ! args[0].startsWith("-op"))
-        printUsage();
-      String type = args[1];
-      if(OP_ALL_NAME.equals(type)) {
-        type = getOpName();
-        return true;
-      }
-      if(!getOpName().equals(type))
-        printUsage();
-      return false;
-    }
-
-    void printResults() {
-      LOG.info("--- " + getOpName() + " stats  ---");
-      LOG.info("# operations: " + getNumOpsExecuted());
-      LOG.info("Elapsed Time: " + getElapsedTime());
-      LOG.info(" Ops per sec: " + getOpsPerSecond());
-      LOG.info("Average Time: " + getAverageTime());
-    }
-  }
-
-  /**
-   * One of the threads that perform stats operations.
-   */
-  private static class StatsDaemon extends Thread {
-    private int daemonId;
-    private int opsPerThread;
-    private String arg1;      // argument passed to executeOp()
-    private volatile int  localNumOpsExecuted = 0;
-    private volatile long localCumulativeTime = 0;
-    private OperationStatsBase statsOp;
-
-    StatsDaemon(int daemonId, int nrOps, OperationStatsBase op) {
-      this.daemonId = daemonId;
-      this.opsPerThread = nrOps;
-      this.statsOp = op;
-      // this.clientName = statsOp.getClientName(daemonId);
-      setName(toString());
-    }
-
-    public void run() {
-      localNumOpsExecuted = 0;
-      localCumulativeTime = 0;
-      arg1 = statsOp.getExecutionArgument(daemonId);
-      try {
-        benchmarkOne();
-      } catch(IOException ex) {
-        LOG.error("StatsDaemon " + daemonId + " failed: \n" 
-            + StringUtils.stringifyException(ex));
-      }
-    }
-
-    public String toString() {
-      return "StatsDaemon-" + daemonId;
-    }
-
-    void benchmarkOne() throws IOException {
-      for(int idx = 0; idx < opsPerThread; idx++) {
-        long stat = statsOp.executeOp(daemonId, idx, arg1);
-        localNumOpsExecuted++;
-        localCumulativeTime += stat;
-      }
-    }
-
-    boolean isInPorgress() {
-      return localNumOpsExecuted < opsPerThread;
-    }
-  }
-
-  /**
-   * File name generator.
-   * 
-   * Each directory contains not more than a fixed number (filesPerDir) 
-   * of files and directories.
-   * When the number of files in one directory reaches the maximum,
-   * the generator creates a new directory and proceeds generating files in it.
-   * The generated namespace tree is balanced that is any path to a leaf
-   * file is not less than the height of the tree minus one.
-   */
-  private static class FileGenerator {
-    private static final int DEFAULT_FILES_PER_DIRECTORY = 32;
-    // Average file name size is 16.5 bytes
-    private static final String FILE_NAME_PREFFIX ="ThrouputBenchfile"; // 17 bytes
-    private static final String DIR_NAME_PREFFIX = "ThrouputBenchDir";  // 16 bytes
-    // private static final int NUM_CLIENTS = 100;
-
-    private int[] pathIndecies = new int[20]; // this will support up to 32**20 = 2**100 = 10**30 files
-    private String baseDir;
-    private String currentDir;
-    private int filesPerDirectory = DEFAULT_FILES_PER_DIRECTORY;
-    private long fileCount;
-
-    FileGenerator(String baseDir, int filesPerDir) {
-      this.baseDir = baseDir;
-      this.filesPerDirectory = filesPerDir;
-      reset();
-    }
-
-    String getNextDirName() {
-      int depth = 0;
-      while(pathIndecies[depth] >= 0)
-        depth++;
-      int level;
-      for(level = depth-1; 
-          level >= 0 && pathIndecies[level] == filesPerDirectory-1; level--)
-        pathIndecies[level] = 0;
-      if(level < 0)
-        pathIndecies[depth] = 0;
-      else
-        pathIndecies[level]++;
-      level = 0;
-      String next = baseDir;
-      while(pathIndecies[level] >= 0)
-        next = next + "/" + DIR_NAME_PREFFIX + pathIndecies[level++];
-      return next; 
-    }
-
-    synchronized String getNextFileName() {
-      long fNum = fileCount % filesPerDirectory;
-      if(fNum == 0) {
-        currentDir = getNextDirName();
-        // System.out.println("currentDir: " + currentDir);
-      }
-      String fn = currentDir + "/" + FILE_NAME_PREFFIX + fileCount;
-      // System.out.println("getNextFileName(): " + fn + " fileCount = " + fileCount);
-      fileCount++;
-      return fn;
-    }
-
-    private synchronized void reset() {
-      Arrays.fill(pathIndecies, -1);
-      fileCount = 0L;
-      currentDir = "";
-    }
-  }
-
-  /**
-   * File creation statistics.
-   * 
-   * Each thread creates the same (+ or -1) number of files.
-   * File names are pre-generated during initialization.
-   * The created files do not have blocks.
-   */
-  class CreateFileStats extends OperationStatsBase {
-    // Operation types
-    static final String OP_CREATE_NAME = "create";
-    static final String OP_CREATE_USAGE = 
-      "-op create [-threads T] [-files N] [-filesPerDir P]";
-
-    protected FileGenerator nameGenerator;
-    protected String[][] fileNames;
-
-    CreateFileStats(String[] args) {
-      super();
-      parseArguments(args);
-    }
-
-    String getOpName() {
-      return OP_CREATE_NAME;
-    }
-
-    void parseArguments(String[] args) {
-      boolean ignoreUnrelatedOptions = verifyOpArgument(args);
-      int nrFilesPerDir = 4;
-      for (int i = 2; i < args.length; i++) {       // parse command line
-        if(args[i].equals("-files")) {
-          if(i+1 == args.length)  printUsage();
-          numOpsRequired = Integer.parseInt(args[++i]);
-        } else if(args[i].equals("-threads")) {
-          if(i+1 == args.length)  printUsage();
-          numThreads = Integer.parseInt(args[++i]);
-        } else if(args[i].equals("-filesPerDir")) {
-          if(i+1 == args.length)  printUsage();
-          nrFilesPerDir = Integer.parseInt(args[++i]);
-        } else if(!ignoreUnrelatedOptions)
-          printUsage();
-      }
-      nameGenerator = new FileGenerator(getBaseDir(), nrFilesPerDir);
-    }
-
-    void generateInputs(int[] opsPerThread) throws IOException {
-      assert opsPerThread.length == numThreads : "Error opsPerThread.length"; 
-      nameNode.setSafeMode(FSConstants.SafeModeAction.SAFEMODE_LEAVE);
-      // int generatedFileIdx = 0;
-      fileNames = new String[numThreads][];
-      for(int idx=0; idx < numThreads; idx++) {
-        int threadOps = opsPerThread[idx];
-        fileNames[idx] = new String[threadOps];
-        for(int jdx=0; jdx < threadOps; jdx++)
-          fileNames[idx][jdx] = nameGenerator.getNextFileName();
-      }
-    }
-
-    void dummyActionNoSynch(int daemonId, int fileIdx) {
-      for(int i=0; i < 2000; i++)
-        fileNames[daemonId][fileIdx].contains(""+i);
-    }
-
-    /**
-     * returns client name
-     */
-    String getExecutionArgument(int daemonId) {
-      return getClientName(daemonId);
-    }
-
-    /**
-     * Do file create.
-     */
-    long executeOp(int daemonId, int inputIdx, String clientName) 
-    throws IOException {
-      long start = System.currentTimeMillis();
-      // dummyActionNoSynch(fileIdx);
-      nameNode.create(fileNames[daemonId][inputIdx], clientName, 
-                      true, replication, BLOCK_SIZE);
-      long end = System.currentTimeMillis();
-      return end-start;
-    }
-
-    void printResults() {
-      LOG.info("--- " + getOpName() + " inputs ---");
-      LOG.info("nrFiles = " + numOpsRequired);
-      LOG.info("nrThreads = " + numThreads);
-      LOG.info("nrFilesPerDir = " + nameGenerator.filesPerDirectory);
-      super.printResults();
-    }
-  }
-
-  /**
-   * Open file statistics.
-   * 
-   * Each thread creates the same (+ or -1) number of files.
-   * File names are pre-generated during initialization.
-   * The created files do not have blocks.
-   */
-  class OpenFileStats extends CreateFileStats {
-    // Operation types
-    static final String OP_OPEN_NAME = "open";
-    static final String OP_OPEN_USAGE = 
-      "-op open [-threads T] [-files N] [-filesPerDir P]";
-
-    OpenFileStats(String[] args) {
-      super(args);
-    }
-
-    String getOpName() {
-      return OP_OPEN_NAME;
-    }
-
-    void generateInputs(int[] opsPerThread) throws IOException {
-      // create files using opsPerThread
-      String[] createArgs = new String[] {
-              "-op", "create", 
-              "-threads", String.valueOf(this.numThreads), 
-              "-files", String.valueOf(numOpsRequired),
-              "-filesPerDir", String.valueOf(nameGenerator.filesPerDirectory)};
-      CreateFileStats opCreate =  new CreateFileStats(createArgs);
-      opCreate.benchmark();
-      nameNode.rename(opCreate.getBaseDir(), getBaseDir());
-      // use the same files for open
-      super.generateInputs(opsPerThread);
-    }
-
-    /**
-     * Do file open.
-     */
-    long executeOp(int daemonId, int inputIdx, String ignore) 
-    throws IOException {
-      long start = System.currentTimeMillis();
-      nameNode.open(fileNames[daemonId][inputIdx], 0L, BLOCK_SIZE);
-      long end = System.currentTimeMillis();
-      return end-start;
-    }
-  }
-
-  /**
-   * Minimal datanode simulator.
-   */
-  private static class TinyDatanode implements Comparable<String> {
-    private static final long DF_CAPACITY = 100*1024*1024;
-    private static final long DF_USED = 0;
-    DatanodeRegistration dnRegistration;
-    Block[] blocks;
-    int nrBlocks; // actual number of blocks
-
-    /**
-     * Get data-node in the form 
-     * <host name> : <port>
-     * where port is a 6 digit integer.
-     * This is necessary in order to provide lexocographic ordering.
-     * Host names are all the same, the ordering goes by port numbers.
-     */
-    private static String getNodeName(int port) throws IOException {
-      String machineName = DNS.getDefaultHost("default", "default");
-      String sPort = String.valueOf(100000 + port);
-      if(sPort.length() > 6)
-        throw new IOException("Too many data-nodes.");
-      return machineName + ":" + sPort;
-    }
-
-    TinyDatanode(int dnIdx, int blockCapacity) throws IOException {
-      dnRegistration = new DatanodeRegistration(getNodeName(dnIdx));
-      this.blocks = new Block[blockCapacity];
-      this.nrBlocks = 0;
-    }
-
-    void register() throws IOException {
-      // get versions from the namenode
-      NamespaceInfo nsInfo = nameNode.versionRequest();
-      dnRegistration.setStorageInfo(new DataStorage(nsInfo, ""));
-      DataNode.setNewStorageID(dnRegistration);
-      // get network location
-      String networkLoc = NetworkTopology.DEFAULT_RACK;
-      // register datanode
-      dnRegistration = nameNode.register(dnRegistration, networkLoc);
-    }
-
-    void sendHeartbeat() throws IOException {
-      // register datanode
-      DatanodeCommand cmd = nameNode.sendHeartbeat(
-          dnRegistration, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, 0, 0);
-      if(cmd != null)
-        LOG.info("sendHeartbeat Name-node reply: " + cmd.getAction());
-    }
-
-    boolean addBlock(Block blk) {
-      if(nrBlocks == blocks.length) {
-        LOG.debug("Cannot add block: datanode capacity = " + blocks.length);
-        return false;
-      }
-      blocks[nrBlocks] = blk;
-      nrBlocks++;
-      return true;
-    }
-
-    void formBlockReport() {
-      // fill remaining slots with blocks that do not exist
-      for(int idx = blocks.length-1; idx >= nrBlocks; idx--)
-        blocks[idx] = new Block(blocks.length - idx, 0);
-    }
-
-    public int compareTo(String name) {
-      return dnRegistration.getName().compareTo(name);
-    }
-  }
-
-  /**
-   * Block report statistics.
-   * 
-   * Each thread here represents its own data-node.
-   * Data-nodes send the same block report each time.
-   * The block report may contain missing or non-existing blocks.
-   */
-  class BlockReportStats extends OperationStatsBase {
-    static final String OP_BLOCK_REPORT_NAME = "blockReport";
-    static final String OP_BLOCK_REPORT_USAGE = 
-      "-op blockReport [-datanodes T] [-reports R] [-blocksPerReport B] [-blocksPerFile F]";
-
-    private int blocksPerReport;
-    private int blocksPerFile;
-    private TinyDatanode[] datanodes; // array of data-nodes sorted by name
-
-    BlockReportStats(String[] args) {
-      super();
-      this.blocksPerReport = 100;
-      this.blocksPerFile = 10;
-      // set heartbeat interval to 3 min, so that expiration were 40 min
-      config.setLong("dfs.heartbeat.interval", 3 * 60);
-      parseArguments(args);
-      // adjust replication to the number of data-nodes
-      this.replication = (short)Math.min((int)replication, getNumDatanodes());
-    }
-
-    /**
-     * Each thread pretends its a data-node here.
-     */
-    private int getNumDatanodes() {
-      return numThreads;
-    }
-
-    String getOpName() {
-      return OP_BLOCK_REPORT_NAME;
-    }
-
-    void parseArguments(String[] args) {
-      boolean ignoreUnrelatedOptions = verifyOpArgument(args);
-      for (int i = 2; i < args.length; i++) {       // parse command line
-        if(args[i].equals("-reports")) {
-          if(i+1 == args.length)  printUsage();
-          numOpsRequired = Integer.parseInt(args[++i]);
-        } else if(args[i].equals("-datanodes")) {
-          if(i+1 == args.length)  printUsage();
-          numThreads = Integer.parseInt(args[++i]);
-        } else if(args[i].equals("-blocksPerReport")) {
-          if(i+1 == args.length)  printUsage();
-          blocksPerReport = Integer.parseInt(args[++i]);
-        } else if(args[i].equals("-blocksPerFile")) {
-          if(i+1 == args.length)  printUsage();
-          blocksPerFile = Integer.parseInt(args[++i]);
-        } else if(!ignoreUnrelatedOptions)
-          printUsage();
-      }
-    }
-
-    void generateInputs(int[] ignore) throws IOException {
-      int nrDatanodes = getNumDatanodes();
-      int nrBlocks = (int)Math.ceil((double)blocksPerReport * nrDatanodes 
-                                    / replication);
-      int nrFiles = (int)Math.ceil((double)nrBlocks / blocksPerFile);
-      datanodes = new TinyDatanode[nrDatanodes];
-      // create data-nodes
-      String prevDNName = "";
-      for(int idx=0; idx < nrDatanodes; idx++) {
-        datanodes[idx] = new TinyDatanode(idx, blocksPerReport);
-        datanodes[idx].register();
-        assert datanodes[idx].dnRegistration.getName().compareTo(prevDNName) > 0
-          : "Data-nodes must be sorted lexicographically.";
-        datanodes[idx].sendHeartbeat();
-        prevDNName = datanodes[idx].dnRegistration.getName();
-      }
-      // create files 
-      FileGenerator nameGenerator;
-      nameGenerator = new FileGenerator(getBaseDir(), 100);
-      String clientName = getClientName(007);
-      for(int idx=0; idx < nrFiles; idx++) {
-        String fileName = nameGenerator.getNextFileName();
-        nameNode.create(fileName, clientName, true, replication, BLOCK_SIZE);
-        addBlocks(fileName, clientName);
-        nameNode.complete(fileName, clientName);
-      }
-      // prepare block reports
-      for(int idx=0; idx < nrDatanodes; idx++) {
-        datanodes[idx].formBlockReport();
-      }
-    }
-
-    private void addBlocks(String fileName, String clientName) throws IOException {
-      for(int jdx = 0; jdx < blocksPerFile; jdx++) {
-        LocatedBlock loc = nameNode.addBlock(fileName, clientName);
-        for(DatanodeInfo dnInfo : loc.getLocations()) {
-          int dnIdx = Arrays.binarySearch(datanodes, dnInfo.getName());
-          datanodes[dnIdx].addBlock(loc.getBlock());
-          nameNode.blockReceived(
-              datanodes[dnIdx].dnRegistration, 
-              new Block[] {loc.getBlock()},
-              new String[] {""});
-        }
-      }
-    }
-
-    /**
-     * Does not require the argument
-     */
-    String getExecutionArgument(int daemonId) {
-      return null;
-    }
-
-    long executeOp(int daemonId, int inputIdx, String ignore) throws IOException {
-      assert daemonId < numThreads : "Wrong daemonId.";
-      TinyDatanode dn = datanodes[daemonId];
-      long start = System.currentTimeMillis();
-      nameNode.blockReport(dn.dnRegistration, dn.blocks);
-      long end = System.currentTimeMillis();
-      return end-start;
-    }
-
-    /**
-     * Defines data-node name since client are data-nodes in this case.
-     */
-    @Override
-    String getClientName(int idx) {
-      return getOpName() + "-client-" + idx;
-    }
-
-    void printResults() {
-      String blockDistribution = "";
-      String delim = "(";
-      for(int idx=0; idx < getNumDatanodes(); idx++) {
-        blockDistribution += delim + datanodes[idx].nrBlocks;
-        delim = ", ";
-      }
-      blockDistribution += ")";
-      LOG.info("--- " + getOpName() + " inputs ---");
-      LOG.info("reports = " + numOpsRequired);
-      LOG.info("datanodes = " + numThreads + " " + blockDistribution);
-      LOG.info("blocksPerReport = " + blocksPerReport);
-      LOG.info("blocksPerFile = " + blocksPerFile);
-      super.printResults();
-    }
-  }
-
-  static void printUsage() {
-    System.err.println("Usage: NNThroughputBenchmark"
-        + "\n\t"    + OperationStatsBase.OP_ALL_USAGE
-        + " | \n\t" + CreateFileStats.OP_CREATE_USAGE
-        + " | \n\t" + OpenFileStats.OP_OPEN_USAGE
-        + " | \n\t" + BlockReportStats.OP_BLOCK_REPORT_USAGE
-    );
-    System.exit(-1);
-  }
-
-  /**
-   * Main method of the benchmark.
-   * @param args command line parameters
-   */
-  public static void runBenchmark(Configuration conf, String[] args) throws Exception {
-    if(args.length < 2 || ! args[0].startsWith("-op"))
-      printUsage();
-
-    String type = args[1];
-    boolean runAll = OperationStatsBase.OP_ALL_NAME.equals(type);
-
-    NNThroughputBenchmark bench = null;
-    List<OperationStatsBase> ops = new ArrayList<OperationStatsBase>();
-    OperationStatsBase opStat = null;
-    try {
-      bench = new NNThroughputBenchmark(conf);
-      if(runAll || CreateFileStats.OP_CREATE_NAME.equals(type)) {
-        opStat = bench.new CreateFileStats(args);
-        ops.add(opStat);
-      }
-      if(runAll || OpenFileStats.OP_OPEN_NAME.equals(type)) {
-        opStat = bench.new OpenFileStats(args);
-        ops.add(opStat);
-      }
-      if(runAll || BlockReportStats.OP_BLOCK_REPORT_NAME.equals(type)) {
-        opStat = bench.new BlockReportStats(args);
-        ops.add(opStat);
-      }
-      if(ops.size() == 0)
-        printUsage();
-      // run each bencmark
-      for(OperationStatsBase op : ops) {
-        LOG.info("Starting benchmark: " + op.getOpName());
-        op.benchmark();
-        op.cleanUp();
-      }
-      // print statistics
-      for(OperationStatsBase op : ops) {
-        LOG.info("");
-        op.printResults();
-      }
-    } catch(Exception e) {
-      LOG.error(StringUtils.stringifyException(e));
-      throw e;
-    } finally {
-      if(bench != null)
-        bench.close();
-    }
-  }
-
-  public static void main(String[] args) throws Exception {
-    runBenchmark(new Configuration(), args);
-  }
-}
+package org.apache.hadoop.dfs;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.DNS;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.Level;
+
+/**
+ * Main class for a series of name-node benchmarks.
+ * 
+ * Each benchmark measures throughput and average execution time 
+ * of a specific name-node operation, e.g. file creation or block reports.
+ * 
+ * The benchmark does not involve any other hadoop components
+ * except for the name-node. Each operation is executed
+ * by calling directly the respective name-node method.
+ * The name-node here is real all other components are simulated.
+ * 
+ * Command line arguments for the benchmark include:<br>
+ * 1) total number of operations to be performed,<br>
+ * 2) number of threads to run these operations,<br>
+ * 3) followed by operation specific input parameters.
+ * 
+ * Then the benchmark generates inputs for each thread so that the
+ * input generation overhead does not effect the resulting statistics.
+ * The number of operations performed by threads practically is the same. 
+ * Precisely, the difference between the number of operations 
+ * performed by any two threads does not exceed 1.
+ * 
+ * Then the benchmark executes the specified number of operations using 
+ * the specified number of threads and outputs the resulting stats.
+ */
+public class NNThroughputBenchmark {
+  private static final Log LOG = LogFactory.getLog("org.apache.hadoop.dfs.NNThroughputBenchmark");
+  private static final int BLOCK_SIZE = 16;
+
+  static Configuration config;
+  static NameNode nameNode;
+
+  NNThroughputBenchmark(Configuration conf) throws IOException {
+    config = conf;
+    // We do not need many handlers, since each thread simulates a handler
+    // by calling name-node methods directly
+    config.setInt("dfs.namenode.handler.count", 1);
+    // Start the NameNode
+    String[] args = new String[] {};
+    nameNode = NameNode.createNameNode(args, config);
+  }
+
+  void close() throws IOException {
+    nameNode.stop();
+  }
+
+  static void turnOffNameNodeLogging() {
+    // change log level to ERROR: NameNode.LOG & NameNode.stateChangeLog
+    ((Log4JLogger)NameNode.LOG).getLogger().setLevel(Level.ERROR);
+    ((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.ERROR);
+    ((Log4JLogger)NetworkTopology.LOG).getLogger().setLevel(Level.ERROR);
+    ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ERROR);
+  }
+
+  /**
+   * Base class for collecting operation statistics.
+   * 
+   * Overload this class in order to run statistics for a 
+   * specific name-node operation.
+   */
+  abstract class OperationStatsBase {
+    protected static final String BASE_DIR_NAME = "/nnThroughputBenchmark";
+    protected static final String OP_ALL_NAME = "all";
+    protected static final String OP_ALL_USAGE = "-op all <other ops options>";
+
+    protected String baseDir;
+    protected short replication;
+    protected int  numThreads = 0;        // number of threads
+    protected int  numOpsRequired = 0;    // number of operations requested
+    protected int  numOpsExecuted = 0;    // number of operations executed
+    protected long cumulativeTime = 0;    // sum of times for each op
+    protected long elapsedTime = 0;       // time from start to finish
+
+    /**
+     * Operation name.
+     */
+    abstract String getOpName();
+
+    /**
+     * Parse command line arguments.
+     * 
+     * @param args arguments
+     * @throws IOException
+     */
+    abstract void parseArguments(String[] args) throws IOException;
+
+    /**
+     * Generate inputs for each daemon thread.
+     * 
+     * @param opsPerThread number of inputs for each thread.
+     * @throws IOException
+     */
+    abstract void generateInputs(int[] opsPerThread) throws IOException;
+
+    /**
+     * This corresponds to the arg1 argument of 
+     * {@link #executeOp(int, int, String)}, which can have different meanings
+     * depending on the operation performed.
+     * 
+     * @param daemonId
+     * @return the argument
+     */
+    abstract String getExecutionArgument(int daemonId);
+
+    /**
+     * Execute name-node operation.
+     * 
+     * @param daemonId id of the daemon calling this method.
+     * @param inputIdx serial index of the operation called by the deamon.
+     * @param arg1 operation specific argument.
+     * @return time of the individual name-node call.
+     * @throws IOException
+     */
+    abstract long executeOp(int daemonId, int inputIdx, String arg1) throws IOException;
+
+    OperationStatsBase() {
+      baseDir = BASE_DIR_NAME + "/" + getOpName();
+      replication = (short) config.getInt("dfs.replication", 3);
+      numOpsRequired = 10;
+      numThreads = 3;
+    }
+
+    void benchmark() throws IOException {
+      List<StatsDaemon> daemons = new ArrayList<StatsDaemon>();
+      long start = 0;
+      try {
+        numOpsExecuted = 0;
+        cumulativeTime = 0;
+        if(numThreads < 1)
+          return;
+        int tIdx = 0; // thread index < nrThreads
+        int opsPerThread[] = new int[numThreads];
+        for(int opsScheduled = 0; opsScheduled < numOpsRequired; 
+                                  opsScheduled += opsPerThread[tIdx++]) {
+          // execute  in a separate thread
+          opsPerThread[tIdx] = (numOpsRequired-opsScheduled)/(numThreads-tIdx);
+          if(opsPerThread[tIdx] == 0)
+            opsPerThread[tIdx] = 1;
+        }
+        // if numThreads > numOpsRequired then the remaining threads will do nothing
+        for(; tIdx < numThreads; tIdx++)
+          opsPerThread[tIdx] = 0;
+        turnOffNameNodeLogging();
+        generateInputs(opsPerThread);
+        for(tIdx=0; tIdx < numThreads; tIdx++)
+          daemons.add(new StatsDaemon(tIdx, opsPerThread[tIdx], this));
+        start = System.currentTimeMillis();
+        LOG.info("Starting " + numOpsRequired + " " + getOpName() + "(s).");
+        for(StatsDaemon d : daemons)
+          d.start();
+      } finally {
+        while(isInPorgress(daemons)) {
+          // try {Thread.sleep(500);} catch (InterruptedException e) {}
+        }
+        elapsedTime = System.currentTimeMillis() - start;
+        for(StatsDaemon d : daemons) {
+          incrementStats(d.localNumOpsExecuted, d.localCumulativeTime);
+          // System.out.println(d.toString() + ": ops Exec = " + d.localNumOpsExecuted);
+        }
+      }
+    }
+
+    private boolean isInPorgress(List<StatsDaemon> daemons) {
+      for(StatsDaemon d : daemons)
+        if(d.isInPorgress())
+          return true;
+      return false;
+    }
+
+    void cleanUp() throws IOException {
+      nameNode.setSafeMode(FSConstants.SafeModeAction.SAFEMODE_LEAVE);
+      nameNode.delete(getBaseDir());
+    }
+
+    int getNumOpsExecuted() {
+      return numOpsExecuted;
+    }
+
+    long getCumulativeTime() {
+      return cumulativeTime;
+    }
+
+    long getElapsedTime() {
+      return elapsedTime;
+    }
+
+    long getAverageTime() {
+      return numOpsExecuted == 0 ? 0 : cumulativeTime / numOpsExecuted;
+    }
+
+    double getOpsPerSecond() {
+      return elapsedTime == 0 ? 0 : 1000*(double)numOpsExecuted / elapsedTime;
+    }
+
+    String getBaseDir() {
+      return baseDir;
+    }
+
+    String getClientName(int idx) {
+      return getOpName() + "-client-" + idx;
+    }
+
+    void incrementStats(int ops, long time) {
+      numOpsExecuted += ops;
+      cumulativeTime += time;
+    }
+
+    /**
+     * Parse first 2 arguments, corresponding to the "-op" option.
+     * 
+     * @param args
+     * @return true if operation is all, which means that options not related
+     * to this operation should be ignored, or false otherwise, meaning
+     * that usage should be printed when an unrelated option is encountered.
+     * @throws IOException
+     */
+    protected boolean verifyOpArgument(String[] args) {
+      if(args.length < 2 || ! args[0].startsWith("-op"))
+        printUsage();
+      String type = args[1];
+      if(OP_ALL_NAME.equals(type)) {
+        type = getOpName();
+        return true;
+      }
+      if(!getOpName().equals(type))
+        printUsage();
+      return false;
+    }
+
+    void printResults() {
+      LOG.info("--- " + getOpName() + " stats  ---");
+      LOG.info("# operations: " + getNumOpsExecuted());
+      LOG.info("Elapsed Time: " + getElapsedTime());
+      LOG.info(" Ops per sec: " + getOpsPerSecond());
+      LOG.info("Average Time: " + getAverageTime());
+    }
+  }
+
+  /**
+   * One of the threads that perform stats operations.
+   */
+  private static class StatsDaemon extends Thread {
+    private final int daemonId;                // index of this daemon among all daemons
+    private final int opsPerThread;            // operations this daemon must execute
+    private final OperationStatsBase statsOp;  // operation being benchmarked
+    private String arg1;      // argument passed to executeOp()
+    // Written by this thread only; read by the coordinating thread while it
+    // polls isInPorgress() and aggregates results, hence volatile.
+    private volatile int  localNumOpsExecuted = 0;
+    private volatile long localCumulativeTime = 0;
+
+    StatsDaemon(int daemonId, int nrOps, OperationStatsBase op) {
+      this.daemonId = daemonId;
+      this.opsPerThread = nrOps;
+      this.statsOp = op;
+      // Build the thread name directly rather than calling the overridable
+      // toString() from within the constructor.
+      setName("StatsDaemon-" + daemonId);
+    }
+
+    public void run() {
+      localNumOpsExecuted = 0;
+      localCumulativeTime = 0;
+      arg1 = statsOp.getExecutionArgument(daemonId);
+      try {
+        benchmarkOne();
+      } catch(IOException ex) {
+        LOG.error("StatsDaemon " + daemonId + " failed: \n" 
+            + StringUtils.stringifyException(ex));
+      }
+    }
+
+    public String toString() {
+      return "StatsDaemon-" + daemonId;
+    }
+
+    // Execute this daemon's share of operations, accumulating the time
+    // reported by each individual name-node call.
+    void benchmarkOne() throws IOException {
+      for(int idx = 0; idx < opsPerThread; idx++) {
+        long stat = statsOp.executeOp(daemonId, idx, arg1);
+        localNumOpsExecuted++;
+        localCumulativeTime += stat;
+      }
+    }
+
+    // True until all assigned operations have completed.
+    // (Historical misspelling of "isInProgress" kept; callers use this name.)
+    boolean isInPorgress() {
+      return localNumOpsExecuted < opsPerThread;
+    }
+  }
+
+  /**
+   * File name generator.
+   * 
+   * Each directory contains not more than a fixed number (filesPerDir) 
+   * of files and directories.
+   * When the number of files in one directory reaches the maximum,
+   * the generator creates a new directory and proceeds generating files in it.
+   * The generated namespace tree is balanced that is any path to a leaf
+   * file is not less than the height of the tree minus one.
+   */
+  private static class FileGenerator {
+    private static final int DEFAULT_FILES_PER_DIRECTORY = 32;
+    // Average file name size is 16.5 bytes
+    private static final String FILE_NAME_PREFFIX ="ThrouputBenchfile"; // 17 bytes
+    private static final String DIR_NAME_PREFFIX = "ThrouputBenchDir";  // 16 bytes
+
+    // pathIndecies[level] is the directory index in use at tree depth
+    // <level>, or -1 for levels beyond the current tree depth.
+    private int[] pathIndecies = new int[20]; // this will support up to 32**20 = 2**100 = 10**30 files
+    private String baseDir;
+    private String currentDir;      // directory currently being filled with files
+    private int filesPerDirectory = DEFAULT_FILES_PER_DIRECTORY;
+    private long fileCount;         // total number of file names handed out so far
+
+    FileGenerator(String baseDir, int filesPerDir) {
+      this.baseDir = baseDir;
+      this.filesPerDirectory = filesPerDir;
+      reset();
+    }
+
+    /**
+     * Advance to the next directory of the balanced tree and return its path.
+     * Works like incrementing a base-filesPerDirectory counter over the
+     * per-level indices: carry over levels that reached the maximum, and
+     * grow the tree one level deeper when every level overflows.
+     */
+    String getNextDirName() {
+      int depth = 0;
+      while(pathIndecies[depth] >= 0)
+        depth++;
+      int level;
+      for(level = depth-1; 
+          level >= 0 && pathIndecies[level] == filesPerDirectory-1; level--)
+        pathIndecies[level] = 0;
+      if(level < 0)
+        pathIndecies[depth] = 0;   // all levels overflowed: add a new level
+      else
+        pathIndecies[level]++;
+      // assemble the path from the per-level indices
+      level = 0;
+      String next = baseDir;
+      while(pathIndecies[level] >= 0)
+        next = next + "/" + DIR_NAME_PREFFIX + pathIndecies[level++];
+      return next; 
+    }
+
+    /**
+     * Return the next unique file name.
+     * A new directory is started whenever the current one has received
+     * filesPerDirectory files. Synchronized: shared by the worker threads.
+     */
+    synchronized String getNextFileName() {
+      long fNum = fileCount % filesPerDirectory;
+      if(fNum == 0) {
+        currentDir = getNextDirName();
+      }
+      String fn = currentDir + "/" + FILE_NAME_PREFFIX + fileCount;
+      fileCount++;
+      return fn;
+    }
+
+    // Restore the initial state: empty tree, no files generated yet.
+    private synchronized void reset() {
+      Arrays.fill(pathIndecies, -1);
+      fileCount = 0L;
+      currentDir = "";
+    }
+  }
+
+  /**
+   * File creation statistics.
+   * 
+   * Each thread creates the same (+ or -1) number of files.
+   * File names are pre-generated during initialization.
+   * The created files do not have blocks.
+   */
+  class CreateFileStats extends OperationStatsBase {
+    // Operation types
+    static final String OP_CREATE_NAME = "create";
+    static final String OP_CREATE_USAGE = 
+      "-op create [-threads T] [-files N] [-filesPerDir P]";
+
+    protected FileGenerator nameGenerator;  // produces unique target file names
+    protected String[][] fileNames;         // [thread][op] file names, pre-generated
+
+    CreateFileStats(String[] args) {
+      super();
+      parseArguments(args);
+    }
+
+    String getOpName() {
+      return OP_CREATE_NAME;
+    }
+
+    /**
+     * Parse -files, -threads and -filesPerDir; any unrelated option
+     * terminates the VM via printUsage() unless "-op all" was given.
+     */
+    void parseArguments(String[] args) {
+      boolean ignoreUnrelatedOptions = verifyOpArgument(args);
+      int nrFilesPerDir = 4;
+      for (int i = 2; i < args.length; i++) {       // parse command line
+        if(args[i].equals("-files")) {
+          if(i+1 == args.length)  printUsage();
+          numOpsRequired = Integer.parseInt(args[++i]);
+        } else if(args[i].equals("-threads")) {
+          if(i+1 == args.length)  printUsage();
+          numThreads = Integer.parseInt(args[++i]);
+        } else if(args[i].equals("-filesPerDir")) {
+          if(i+1 == args.length)  printUsage();
+          nrFilesPerDir = Integer.parseInt(args[++i]);
+        } else if(!ignoreUnrelatedOptions)
+          printUsage();
+      }
+      nameGenerator = new FileGenerator(getBaseDir(), nrFilesPerDir);
+    }
+
+    /**
+     * Pre-generate all file names so name generation is not part of the
+     * measured operation time. Also leaves safe mode so creates succeed.
+     */
+    void generateInputs(int[] opsPerThread) throws IOException {
+      assert opsPerThread.length == numThreads : "Error opsPerThread.length"; 
+      nameNode.setSafeMode(FSConstants.SafeModeAction.SAFEMODE_LEAVE);
+      fileNames = new String[numThreads][];
+      for(int idx=0; idx < numThreads; idx++) {
+        int threadOps = opsPerThread[idx];
+        fileNames[idx] = new String[threadOps];
+        for(int jdx=0; jdx < threadOps; jdx++)
+          fileNames[idx][jdx] = nameGenerator.getNextFileName();
+      }
+    }
+
+    // CPU-burning no-op used only for manual overhead experiments;
+    // intentionally kept although nothing calls it at the moment.
+    void dummyActionNoSynch(int daemonId, int fileIdx) {
+      for(int i=0; i < 2000; i++)
+        fileNames[daemonId][fileIdx].contains(""+i);
+    }
+
+    /**
+     * returns client name
+     */
+    String getExecutionArgument(int daemonId) {
+      return getClientName(daemonId);
+    }
+
+    /**
+     * Do file create.
+     * @return duration of the name-node call in milliseconds
+     */
+    long executeOp(int daemonId, int inputIdx, String clientName) 
+    throws IOException {
+      long start = System.currentTimeMillis();
+      nameNode.create(fileNames[daemonId][inputIdx], clientName, 
+                      true, replication, BLOCK_SIZE);
+      long end = System.currentTimeMillis();
+      return end-start;
+    }
+
+    void printResults() {
+      LOG.info("--- " + getOpName() + " inputs ---");
+      LOG.info("nrFiles = " + numOpsRequired);
+      LOG.info("nrThreads = " + numThreads);
+      LOG.info("nrFilesPerDir = " + nameGenerator.filesPerDirectory);
+      super.printResults();
+    }
+  }
+
+  /**
+   * Open file statistics.
+   * 
+   * Each thread creates the same (+ or -1) number of files.
+   * File names are pre-generated during initialization.
+   * The created files do not have blocks.
+   */
+  class OpenFileStats extends CreateFileStats {
+    // Operation types
+    static final String OP_OPEN_NAME = "open";
+    static final String OP_OPEN_USAGE = 
+      "-op open [-threads T] [-files N] [-filesPerDir P]";
+
+    OpenFileStats(String[] args) {
+      super(args);
+    }
+
+    String getOpName() {
+      return OP_OPEN_NAME;
+    }
+
+    /**
+     * Populate the name space by running the create benchmark first,
+     * move its output under this operation's base directory, and reuse
+     * the very same file names as the open targets.
+     */
+    void generateInputs(int[] opsPerThread) throws IOException {
+      String[] createArgs = {
+          "-op", "create",
+          "-threads", String.valueOf(this.numThreads),
+          "-files", String.valueOf(numOpsRequired),
+          "-filesPerDir", String.valueOf(nameGenerator.filesPerDirectory) };
+      CreateFileStats createOp = new CreateFileStats(createArgs);
+      createOp.benchmark();
+      nameNode.rename(createOp.getBaseDir(), getBaseDir());
+      super.generateInputs(opsPerThread);
+    }
+
+    /**
+     * Do file open and report how long the name-node call took.
+     */
+    long executeOp(int daemonId, int inputIdx, String ignore) 
+    throws IOException {
+      String fileName = fileNames[daemonId][inputIdx];
+      long before = System.currentTimeMillis();
+      nameNode.open(fileName, 0L, BLOCK_SIZE);
+      return System.currentTimeMillis() - before;
+    }
+  }
+
+  /**
+   * Minimal datanode simulator.
+   */
+  private static class TinyDatanode implements Comparable<String> {
+    private static final long DF_CAPACITY = 100*1024*1024;  // reported capacity: 100 MB
+    private static final long DF_USED = 0;                  // reported used space
+    DatanodeRegistration dnRegistration;
+    Block[] blocks;   // fixed-capacity block report buffer
+    int nrBlocks; // actual number of blocks
+
+    /**
+     * Get data-node in the form 
+     * <host name> : <port>
+     * where port is a 6 digit integer.
+     * This is necessary in order to provide lexocographic ordering.
+     * Host names are all the same, the ordering goes by port numbers.
+     */
+    private static String getNodeName(int port) throws IOException {
+      String machineName = DNS.getDefaultHost("default", "default");
+      String sPort = String.valueOf(100000 + port);
+      if(sPort.length() > 6)
+        throw new IOException("Too many data-nodes.");
+      return machineName + ":" + sPort;
+    }
+
+    TinyDatanode(int dnIdx, int blockCapacity) throws IOException {
+      dnRegistration = new DatanodeRegistration(getNodeName(dnIdx));
+      this.blocks = new Block[blockCapacity];
+      this.nrBlocks = 0;
+    }
+
+    /**
+     * Obtain version and storage information from the name-node
+     * and register this simulated data-node with it.
+     */
+    void register() throws IOException {
+      // get versions from the namenode
+      NamespaceInfo nsInfo = nameNode.versionRequest();
+      dnRegistration.setStorageInfo(new DataStorage(nsInfo, ""));
+      DataNode.setNewStorageID(dnRegistration);
+      // get network location
+      String networkLoc = NetworkTopology.DEFAULT_RACK;
+      // register datanode
+      dnRegistration = nameNode.register(dnRegistration, networkLoc);
+    }
+
+    void sendHeartbeat() throws IOException {
+      // report a fixed capacity with no load; log any command the name-node replies with
+      DatanodeCommand cmd = nameNode.sendHeartbeat(
+          dnRegistration, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, 0, 0);
+      if(cmd != null)
+        LOG.info("sendHeartbeat Name-node reply: " + cmd.getAction());
+    }
+
+    // Append a block to the report buffer; returns false if the buffer is full.
+    boolean addBlock(Block blk) {
+      if(nrBlocks == blocks.length) {
+        LOG.debug("Cannot add block: datanode capacity = " + blocks.length);
+        return false;
+      }
+      blocks[nrBlocks] = blk;
+      nrBlocks++;
+      return true;
+    }
+
+    void formBlockReport() {
+      // fill remaining slots with blocks that do not exist
+      for(int idx = blocks.length-1; idx >= nrBlocks; idx--)
+        blocks[idx] = new Block(blocks.length - idx, 0);
+    }
+
+    // Compares against a plain data-node name string; this asymmetric
+    // Comparable exists so Arrays.binarySearch(datanodes, name) works
+    // in BlockReportStats.addBlocks().
+    public int compareTo(String name) {
+      return dnRegistration.getName().compareTo(name);
+    }
+  }
+
+  /**
+   * Block report statistics.
+   * 
+   * Each thread here represents its own data-node.
+   * Data-nodes send the same block report each time.
+   * The block report may contain missing or non-existing blocks.
+   */
+  class BlockReportStats extends OperationStatsBase {
+    static final String OP_BLOCK_REPORT_NAME = "blockReport";
+    static final String OP_BLOCK_REPORT_USAGE = 
+      "-op blockReport [-datanodes T] [-reports R] [-blocksPerReport B] [-blocksPerFile F]";
+
+    private int blocksPerReport;   // size of each simulated block report
+    private int blocksPerFile;     // blocks allocated per created file
+    private TinyDatanode[] datanodes; // array of data-nodes sorted by name
+
+    BlockReportStats(String[] args) {
+      super();
+      this.blocksPerReport = 100;
+      this.blocksPerFile = 10;
+      // set heartbeat interval to 3 min, so that expiration were 40 min
+      config.setLong("dfs.heartbeat.interval", 3 * 60);
+      parseArguments(args);
+      // adjust replication to the number of data-nodes
+      this.replication = (short)Math.min((int)replication, getNumDatanodes());
+    }
+
+    /**
+     * Each thread pretends its a data-node here.
+     */
+    private int getNumDatanodes() {
+      return numThreads;
+    }
+
+    String getOpName() {
+      return OP_BLOCK_REPORT_NAME;
+    }
+
+    /**
+     * Parse -reports, -datanodes, -blocksPerReport and -blocksPerFile;
+     * any unrelated option terminates the VM via printUsage() unless
+     * "-op all" was given.
+     */
+    void parseArguments(String[] args) {
+      boolean ignoreUnrelatedOptions = verifyOpArgument(args);
+      for (int i = 2; i < args.length; i++) {       // parse command line
+        if(args[i].equals("-reports")) {
+          if(i+1 == args.length)  printUsage();
+          numOpsRequired = Integer.parseInt(args[++i]);
+        } else if(args[i].equals("-datanodes")) {
+          if(i+1 == args.length)  printUsage();
+          numThreads = Integer.parseInt(args[++i]);
+        } else if(args[i].equals("-blocksPerReport")) {
+          if(i+1 == args.length)  printUsage();
+          blocksPerReport = Integer.parseInt(args[++i]);
+        } else if(args[i].equals("-blocksPerFile")) {
+          if(i+1 == args.length)  printUsage();
+          blocksPerFile = Integer.parseInt(args[++i]);
+        } else if(!ignoreUnrelatedOptions)
+          printUsage();
+      }
+    }
+
+    /**
+     * Register the simulated data-nodes, create enough files and blocks to
+     * give every data-node about blocksPerReport real blocks, then pad each
+     * data-node's report with non-existing blocks.
+     */
+    void generateInputs(int[] ignore) throws IOException {
+      int nrDatanodes = getNumDatanodes();
+      int nrBlocks = (int)Math.ceil((double)blocksPerReport * nrDatanodes 
+                                    / replication);
+      int nrFiles = (int)Math.ceil((double)nrBlocks / blocksPerFile);
+      datanodes = new TinyDatanode[nrDatanodes];
+      // create data-nodes
+      String prevDNName = "";
+      for(int idx=0; idx < nrDatanodes; idx++) {
+        datanodes[idx] = new TinyDatanode(idx, blocksPerReport);
+        datanodes[idx].register();
+        assert datanodes[idx].dnRegistration.getName().compareTo(prevDNName) > 0
+          : "Data-nodes must be sorted lexicographically.";
+        datanodes[idx].sendHeartbeat();
+        prevDNName = datanodes[idx].dnRegistration.getName();
+      }
+      // create files 
+      FileGenerator nameGenerator;
+      nameGenerator = new FileGenerator(getBaseDir(), 100);
+      String clientName = getClientName(007); // NOTE: octal literal, value 7
+      for(int idx=0; idx < nrFiles; idx++) {
+        String fileName = nameGenerator.getNextFileName();
+        nameNode.create(fileName, clientName, true, replication, BLOCK_SIZE);
+        addBlocks(fileName, clientName);
+        nameNode.complete(fileName, clientName);
+      }
+      // prepare block reports
+      for(int idx=0; idx < nrDatanodes; idx++) {
+        datanodes[idx].formBlockReport();
+      }
+    }
+
+    // Allocate blocksPerFile blocks for the file and record each block on
+    // every data-node it was placed on, confirming receipt to the name-node.
+    private void addBlocks(String fileName, String clientName) throws IOException {
+      for(int jdx = 0; jdx < blocksPerFile; jdx++) {
+        LocatedBlock loc = nameNode.addBlock(fileName, clientName);
+        for(DatanodeInfo dnInfo : loc.getLocations()) {
+          // find the target data-node via the asymmetric TinyDatanode.compareTo
+          int dnIdx = Arrays.binarySearch(datanodes, dnInfo.getName());
+          datanodes[dnIdx].addBlock(loc.getBlock());
+          nameNode.blockReceived(
+              datanodes[dnIdx].dnRegistration, 
+              new Block[] {loc.getBlock()},
+              new String[] {""});
+        }
+      }
+    }
+
+    /**
+     * Does not require the argument
+     */
+    String getExecutionArgument(int daemonId) {
+      return null;
+    }
+
+    /**
+     * Send this daemon's (data-node's) block report and measure the call.
+     */
+    long executeOp(int daemonId, int inputIdx, String ignore) throws IOException {
+      assert daemonId < numThreads : "Wrong daemonId.";
+      TinyDatanode dn = datanodes[daemonId];
+      long start = System.currentTimeMillis();
+      nameNode.blockReport(dn.dnRegistration, dn.blocks);
+      long end = System.currentTimeMillis();
+      return end-start;
+    }
+
+    /**
+     * Defines data-node name since client are data-nodes in this case.
+     * NOTE(review): currently identical to the superclass implementation.
+     */
+    @Override
+    String getClientName(int idx) {
+      return getOpName() + "-client-" + idx;
+    }
+
+    void printResults() {
+      String blockDistribution = "";
+      String delim = "(";
+      // per-datanode real-block counts, e.g. "(34, 33, 33)"
+      for(int idx=0; idx < getNumDatanodes(); idx++) {
+        blockDistribution += delim + datanodes[idx].nrBlocks;
+        delim = ", ";
+      }
+      blockDistribution += ")";
+      LOG.info("--- " + getOpName() + " inputs ---");
+      LOG.info("reports = " + numOpsRequired);
+      LOG.info("datanodes = " + numThreads + " " + blockDistribution);
+      LOG.info("blocksPerReport = " + blocksPerReport);
+      LOG.info("blocksPerFile = " + blocksPerFile);
+      super.printResults();
+    }
+  }
+
+  /**
+   * Print the usage message for all supported operations
+   * and terminate the VM.
+   */
+  static void printUsage() {
+    String[] opUsages = new String[] {
+        OperationStatsBase.OP_ALL_USAGE,
+        CreateFileStats.OP_CREATE_USAGE,
+        OpenFileStats.OP_OPEN_USAGE,
+        BlockReportStats.OP_BLOCK_REPORT_USAGE };
+    StringBuilder usage = new StringBuilder("Usage: NNThroughputBenchmark");
+    String delim = "\n\t";
+    for(String opUsage : opUsages) {
+      usage.append(delim).append(opUsage);
+      delim = " | \n\t";
+    }
+    System.err.println(usage.toString());
+    System.exit(-1);
+  }
+
+  /**
+   * Main method of the benchmark.
+   * Parses the "-op" argument, instantiates the requested operation
+   * statistics (or all of them for "-op all"), runs each benchmark,
+   * prints the results and finally shuts the name-node down.
+   *
+   * @param conf configuration to start the name-node with
+   * @param args command line parameters
+   */
+  public static void runBenchmark(Configuration conf, String[] args) throws Exception {
+    if(args.length < 2 || ! args[0].startsWith("-op"))
+      printUsage();
+
+    String type = args[1];
+    boolean runAll = OperationStatsBase.OP_ALL_NAME.equals(type);
+
+    NNThroughputBenchmark bench = null;
+    List<OperationStatsBase> ops = new ArrayList<OperationStatsBase>();
+    OperationStatsBase opStat = null;
+    try {
+      bench = new NNThroughputBenchmark(conf);
+      if(runAll || CreateFileStats.OP_CREATE_NAME.equals(type)) {
+        opStat = bench.new CreateFileStats(args);
+        ops.add(opStat);
+      }
+      if(runAll || OpenFileStats.OP_OPEN_NAME.equals(type)) {
+        opStat = bench.new OpenFileStats(args);
+        ops.add(opStat);
+      }
+      if(runAll || BlockReportStats.OP_BLOCK_REPORT_NAME.equals(type)) {
+        opStat = bench.new BlockReportStats(args);
+        ops.add(opStat);
+      }
+      if(ops.isEmpty())
+        printUsage();
+      // run each benchmark
+      for(OperationStatsBase op : ops) {
+        LOG.info("Starting benchmark: " + op.getOpName());
+        op.benchmark();
+        op.cleanUp();
+      }
+      // print statistics
+      for(OperationStatsBase op : ops) {
+        LOG.info("");
+        op.printResults();
+      }
+    } catch(Exception e) {
+      LOG.error(StringUtils.stringifyException(e));
+      throw e;
+    } finally {
+      // always stop the name-node, even if a benchmark failed
+      if(bench != null)
+        bench.close();
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    // Run with a default configuration; the command line selects the operations.
+    runBenchmark(new Configuration(), args);
+  }
+}

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestNNThroughputBenchmark.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestNNThroughputBenchmark.java?rev=607126&r1=607125&r2=607126&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestNNThroughputBenchmark.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestNNThroughputBenchmark.java Thu Dec 27 13:39:12 2007
@@ -1,17 +1,17 @@
-package org.apache.hadoop.dfs;
-
-import junit.framework.TestCase;
-import org.apache.hadoop.conf.Configuration;
-
-public class TestNNThroughputBenchmark extends TestCase {
-
-  /**
-   * This test runs all benchmarks defined in {@link NNThroughputBenchmark}.
-   */
-  public void testNNThroughput() throws Exception {
-    Configuration conf = new Configuration();
-    conf.set("fs.default.name", "localhost:"+Integer.toString(50017));
-    NameNode.format(conf);
-    NNThroughputBenchmark.runBenchmark(conf, new String[] {"-op", "all"});
-  }
-}
+package org.apache.hadoop.dfs;
+
+import junit.framework.TestCase;
+import org.apache.hadoop.conf.Configuration;
+
+public class TestNNThroughputBenchmark extends TestCase {
+
+  /**
+   * This test runs all benchmarks defined in {@link NNThroughputBenchmark}.
+   */
+  public void testNNThroughput() throws Exception {
+    Configuration conf = new Configuration();
+    // run the benchmark name-node on a fixed local port
+    conf.set("fs.default.name", "localhost:"+Integer.toString(50017));
+    // format a fresh name space before benchmarking
+    NameNode.format(conf);
+    NNThroughputBenchmark.runBenchmark(conf, new String[] {"-op", "all"});
+  }
+}



Mime
View raw message