From: cutting@apache.org
To: hadoop-commits@lucene.apache.org
Reply-To: hadoop-dev@lucene.apache.org
Subject: svn commit: r483588 - in /lucene/hadoop/trunk: CHANGES.txt src/examples/org/apache/hadoop/examples/NNBench.java
Date: Thu, 07 Dec 2006 18:12:20 -0000
Message-Id: <20061207181220.F1A921A984E@eris.apache.org>

Author: cutting
Date: Thu Dec 7 10:12:19 2006
New Revision: 483588

URL: http://svn.apache.org/viewvc?view=rev&rev=483588
Log:
HADOOP-763. Change DFS namenode benchmark to not use MapReduce. Contributed by Nigel.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/NNBench.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=483588&r1=483587&r2=483588
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Thu Dec 7 10:12:19 2006
@@ -8,6 +8,9 @@
  2. HADOOP-779. Fix contrib/streaming to work correctly with gzipped
     input files. (Hairong Kuang via cutting)
 
+ 3. HADOOP-763. Change DFS namenode benchmark to not use MapReduce.
+    (Nigel Daley via cutting)
+
 Release 0.9.0 - 2006-12-01


Modified: lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/NNBench.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/NNBench.java?view=diff&rev=483588&r1=483587&r2=483588
==============================================================================
--- lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/NNBench.java (original)
+++ lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/NNBench.java Thu Dec 7 10:12:19 2006
@@ -20,206 +20,269 @@
 import java.io.IOException;
 import java.util.Date;
-import java.util.Iterator;
-import java.util.Random;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FSOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.mapred.ClusterStatus;
-import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.util.Progressable;
 
 /**
- * This program uses map/reduce to run a distributed job where there is
- * no interaction between the tasks. Each task creates a configurable
- * number of files. Each file has a configurable number of bytes
- * written to it, then it is closed, re-opened, and read from, and
- * re-closed. This program functions as a stress-test and benchmark
- * for namenode, especially when the number of bytes written to
- * each file is small.
+ * This program executes a specified operation that applies load to
+ * the NameNode. Possible operations include create/writing files,
+ * opening/reading files, renaming files, and deleting files.
  *
- * @author Milind Bhandarkar
+ * When run simultaneously on multiple nodes, this program functions
+ * as a stress-test and benchmark for namenode, especially when
+ * the number of bytes written to each file is small.
+ *
+ * @author Nigel Daley
 */
-public class NNBench extends MapReduceBase implements Reducer {
-  
-  public static class Map extends MapReduceBase implements Mapper {
-    private FileSystem fileSys = null;
-    private int numBytesToWrite;
-    private Random random = new Random();
-    private String taskId = null;
-    private Path topDir = null;
-    
-    private void randomizeBytes(byte[] data, int offset, int length) {
-      for(int i=offset + length - 1; i >= offset; --i) {
-        data[i] = (byte) random.nextInt(256);
+public class NNBench {
+  // variables initialized from command line arguments
+  private static long startTime = 0;
+  private static int numFiles = 0;
+  private static long bytesPerBlock = 1;
+  private static long blocksPerFile = 0;
+  private static long bytesPerFile = 1;
+  private static Path baseDir = null;
+
+  // variables initialized in main()
+  private static FileSystem fileSys = null;
+  private static Path taskDir = null;
+  private static String uniqueId = null;
+  private static byte[] buffer;
+
+  /**
+   * Returns when the current number of seconds from the epoch equals
+   * the command line argument given by -startTime.
+   * This allows multiple instances of this program, running on clock
+   * synchronized nodes, to start at roughly the same time.
+   */
+  static void barrier() {
+    long sleepTime;
+    while ((sleepTime = startTime - System.currentTimeMillis()) > 0) {
+      try {
+        Thread.sleep(sleepTime);
+      } catch (InterruptedException ex) {
+      }
     }
   }
 
   /**
-   * Given a number of files to create, create and open those files
-   * for both writing and reading a given number of bytes.
+   * Create and write to a given number of files. Repeat each remote
+   * operation until it succeeds (does not throw an exception).
+   *
+   * @return the number of exceptions caught
    */
-  public void map(WritableComparable key,
-                  Writable value,
-                  OutputCollector output,
-                  Reporter reporter) throws IOException {
-    int nFiles = ((IntWritable) value).get();
-    Path taskDir = new Path(topDir, taskId);
-    if (!fileSys.mkdirs(taskDir)) {
-      throw new IOException("Mkdirs failed to create " + taskDir.toString());
-    }
-    byte[] buffer = new byte[32768];
-    for (int index = 0; index < nFiles; index++) {
-      FSDataOutputStream out = fileSys.create(
-        new Path(taskDir, Integer.toString(index)));
-      int toBeWritten = numBytesToWrite;
+  static int createWrite() {
+    int exceptions = 0;
+    FSOutputStream out = null;
+    boolean success = false;
+    for (int index = 0; index < numFiles; index++) {
+      do { // create file until it succeeds
+        try {
+          out = fileSys.createRaw(
+            new Path(taskDir, "" + index), false, (short)1, bytesPerBlock);
+          success = true;
+        } catch (IOException ioe) { success=false; exceptions++; }
+      } while (!success);
+      long toBeWritten = bytesPerFile;
       while (toBeWritten > 0) {
-        int nbytes = Math.min(buffer.length, toBeWritten);
-        randomizeBytes(buffer, 0, nbytes);
+        int nbytes = (int) Math.min(buffer.length, toBeWritten);
         toBeWritten -= nbytes;
-        out.write(buffer, 0, nbytes);
-        reporter.setStatus("wrote " + (numBytesToWrite-toBeWritten) +
-                           " bytes for "+ index +"th file.");
+        try { // only try once
+          out.write(buffer, 0, nbytes);
+        } catch (IOException ioe) {
+          exceptions++;
+        }
       }
-      out.close();
+      do { // close file until it succeeds
+        try {
+          out.close();
+          success = true;
+        } catch (IOException ioe) { success=false; exceptions++; }
+      } while (!success);
     }
-    for (int index = 0; index < nFiles; index++) {
-      FSDataInputStream in = fileSys.open(
-        new Path(taskDir, Integer.toString(index)));
-      int toBeRead = numBytesToWrite;
-      while (toBeRead > 0) {
-        int nbytes = Math.min(buffer.length, toBeRead);
-        toBeRead -= nbytes;
-        in.read(buffer, 0, nbytes);
-        reporter.setStatus("read " + (numBytesToWrite-toBeRead) +
-                           " bytes for "+ index +"th file.");
+    return exceptions;
+  }
+
+  /**
+   * Open and read a given number of files.
+   *
+   * @return the number of exceptions caught
+   */
+  static int openRead() {
+    int exceptions = 0;
+    FSInputStream in = null;
+    for (int index = 0; index < numFiles; index++) {
+      try {
+        in = fileSys.openRaw(new Path(taskDir, "" + index));
+        long toBeRead = bytesPerFile;
+        while (toBeRead > 0) {
+          int nbytes = (int) Math.min(buffer.length, toBeRead);
+          toBeRead -= nbytes;
+          try { // only try once
+            in.read(buffer, 0, nbytes);
+          } catch (IOException ioe) {
+            exceptions++;
+          }
+        }
+        in.close();
+      } catch (IOException ioe) {
+        exceptions++;
       }
-      in.close();
     }
-    fileSys.delete(taskDir); // clean up after yourself
-  }
+    return exceptions;
+  }
+
+  /**
+   * Rename a given number of files. Repeat each remote
+   * operation until it succeeds (does not throw an exception).
+   *
+   * @return the number of exceptions caught
+   */
+  static int rename() {
+    int exceptions = 0;
+    boolean success = false;
+    for (int index = 0; index < numFiles; index++) {
+      do { // rename file until it succeeds
+        try {
+          boolean result = fileSys.renameRaw(
+            new Path(taskDir, "" + index), new Path(taskDir, "A" + index));
+          success = true;
+        } catch (IOException ioe) { success=false; exceptions++; }
+      } while (!success);
+    }
+    return exceptions;
+  }
 
   /**
-   * Save the values out of the configuaration that we need to write
-   * the data.
+   * Delete a given number of files. Repeat each remote
+   * operation until it succeeds (does not throw an exception).
+   *
+   * @return the number of exceptions caught
    */
-  public void configure(JobConf job) {
-    try {
-      fileSys = FileSystem.get(job);
-    } catch (IOException e) {
-      throw new RuntimeException("Can't get default file system", e);
-    }
-    numBytesToWrite = job.getInt("test.nnbench.bytes_per_file", 0);
-    topDir = new Path(job.get("test.nnbench.topdir", "/nnbench"));
-    taskId = job.get("mapred.task.id", (new Long(random.nextLong())).toString());
-  }
-
-  }
-
-  public void reduce(WritableComparable key,
-                     Iterator values,
-                     OutputCollector output,
-                     Reporter reporter) throws IOException {
-    // nothing
-  }
-
+  static int delete() {
+    int exceptions = 0;
+    boolean success = false;
+    for (int index = 0; index < numFiles; index++) {
+      do { // delete file until it succeeds
+        try {
+          boolean result = fileSys.deleteRaw(new Path(taskDir, "A" + index));
+          success = true;
+        } catch (IOException ioe) { success=false; exceptions++; }
+      } while (!success);
    }
+    return exceptions;
+  }
+
   /**
-   * This is the main routine for launching a distributed namenode stress test.
-   * It runs 10 maps/node. The reduce doesn't do anything.
-   *
-   * @throws IOException
+   * This launches a given namenode operation (-operation),
+   * starting at a given time (-startTime). The files used
+   * by the openRead, rename, and delete operations are the same files
+   * created by the createWrite operation. Typically, the program
+   * would be run four times, once for each operation in this order:
+   * createWrite, openRead, rename, delete.
+   *
+   *
+   * Usage: nnbench 
+   *          -operation 
+   *          -baseDir 
+   *          -startTime 
+   *
+   * @throws IOException indicates a problem with test startup
   */
  public static void main(String[] args) throws IOException {
-    Configuration defaults = new Configuration();
-    if (args.length != 3) {
-      System.out.println("Usage: nnbench ");
-      return;
-    }
-    Path outDir = new Path(args[0]);
-    int filesPerMap = Integer.parseInt(args[1]);
-    int numBytesPerFile = Integer.parseInt(args[2]);
-
-    JobConf jobConf = new JobConf(defaults, NNBench.class);
-    jobConf.setJobName("nnbench");
-    jobConf.setInt("test.nnbench.bytes_per_file", numBytesPerFile);
-    jobConf.set("test.nnbench.topdir", args[0]);
-
-    // turn off speculative execution, because DFS doesn't handle
-    // multiple writers to the same file.
-    jobConf.setSpeculativeExecution(false);
-    jobConf.setInputFormat(SequenceFileInputFormat.class);
-    jobConf.setOutputKeyClass(BytesWritable.class);
-    jobConf.setOutputValueClass(BytesWritable.class);
-
-    jobConf.setMapperClass(Map.class);
-    jobConf.setReducerClass(NNBench.class);
-
-    JobClient client = new JobClient(jobConf);
-    ClusterStatus cluster = client.getClusterStatus();
-    int numMaps = cluster.getTaskTrackers() *
-      jobConf.getInt("test.nnbench.maps_per_host", 10);
-    jobConf.setNumMapTasks(numMaps);
-    System.out.println("Running " + numMaps + " maps.");
-    jobConf.setNumReduceTasks(1);
-
-    Path tmpDir = new Path("random-work");
-    Path inDir = new Path(tmpDir, "in");
-    Path fakeOutDir = new Path(tmpDir, "out");
-    FileSystem fileSys = FileSystem.get(jobConf);
-    if (fileSys.exists(outDir)) {
-      System.out.println("Error: Output directory " + outDir +
-                         " already exists.");
-      return;
-    }
-    fileSys.delete(tmpDir);
-    if (!fileSys.mkdirs(inDir)) {
-      System.out.println("Error: Mkdirs failed to create " +
-                         inDir.toString());
-      return;
-    }
-
-    for(int i=0; i < numMaps; ++i) {
-      Path file = new Path(inDir, "part"+i);
-      SequenceFile.Writer writer = SequenceFile.createWriter(fileSys,
-        jobConf, file,
-        IntWritable.class, IntWritable.class,
-        CompressionType.NONE,
-        (Progressable)null);
-      writer.append(new IntWritable(0), new IntWritable(filesPerMap));
-      writer.close();
+    String version = "NameNodeBenchmark.0.3";
+    System.out.println(version);
+
+    String usage =
+      "Usage: nnbench " +
+        "-operation " +
+        "-baseDir " +
+        "-startTime
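
For reference, the usage description above implies one run of the program per operation on each
participating node, in the order createWrite, openRead, rename, delete. The command lines below are
only an illustrative sketch: the launcher invocation, base directory, and start-time values are
assumptions (and they assume -operation accepts the four method names verbatim); only the flag
names and the operation names themselves come from the patch.

    bin/hadoop org.apache.hadoop.examples.NNBench -operation createWrite -baseDir /benchmarks/NNBench -startTime <t1>
    bin/hadoop org.apache.hadoop.examples.NNBench -operation openRead    -baseDir /benchmarks/NNBench -startTime <t2>
    bin/hadoop org.apache.hadoop.examples.NNBench -operation rename      -baseDir /benchmarks/NNBench -startTime <t3>
    bin/hadoop org.apache.hadoop.examples.NNBench -operation delete      -baseDir /benchmarks/NNBench -startTime <t4>

Each <tN> is a start time far enough in the future that every node reaches barrier() before it
passes; the barrier() javadoc above describes the value as a time relative to the epoch, so that
clock-synchronized nodes begin the same operation at roughly the same moment.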