From: shv@apache.org
To: mapreduce-commits@hadoop.apache.org
Reply-To: mapreduce-dev@hadoop.apache.org
Subject: svn commit: r946832 [1/3] - in /hadoop/mapreduce/trunk: ./ src/test/mapred/org/apache/hadoop/fs/slive/
Date: Fri, 21 May 2010 00:02:13 -0000
Message-Id: <20100521000214.7927F238897F@eris.apache.org>

Author: shv
Date: Fri May 21 00:02:12 2010
New Revision: 946832

URL: http://svn.apache.org/viewvc?rev=946832&view=rev
Log:
MAPREDUCE-1804. Stress-test tool for HDFS introduced in HDFS-708. Contributed by Joshua Harlow.
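For orientation before the file listing, the sketch below shows one way the new tool could be driven. It is not part of this commit: SliveTest itself arrives in part 2/3 of this mail, so the sketch assumes SliveTest implements org.apache.hadoop.util.Tool and takes a Configuration in its constructor; the option names come from the ConfigOption definitions in the diff below, and the values are arbitrary examples.

package org.apache.hadoop.fs.slive;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;

// Hypothetical driver; same package because the slive classes are
// package-private. SliveTest's constructor signature is an assumption.
class SliveDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] sliveArgs = {
        "-maps", "10",             // map tasks to run
        "-ops", "1000",            // max operations per map
        "-duration", "600",        // seconds each map may run
        "-files", "1024",          // max total files to create
        "-baseDir", "/test/slive"  // root directory for generated data
    };
    System.exit(ToolRunner.run(conf, new SliveTest(conf), sliveArgs));
  }
}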
Added:
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/AppendOp.java   (with props)
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ArgumentParser.java   (with props)
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/BadFileException.java   (with props)
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigExtractor.java   (with props)
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigMerger.java   (with props)
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigOption.java   (with props)
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/Constants.java   (with props)
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/CreateOp.java   (with props)
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/DataHasher.java   (with props)
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/DataVerifier.java   (with props)
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/DataWriter.java   (with props)
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/DeleteOp.java   (with props)
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/DummyInputFormat.java   (with props)
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/Formatter.java   (with props)
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/Helper.java   (with props)
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ListOp.java   (with props)
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/MkdirOp.java   (with props)
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ObserveableOp.java   (with props)
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/Operation.java   (with props)
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/OperationData.java   (with props)
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/OperationFactory.java   (with props)
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/OperationOutput.java   (with props)
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/OperationWeight.java   (with props)
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/PathFinder.java   (with props)
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/Range.java   (with props)
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ReadOp.java   (with props)
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/RenameOp.java   (with props)
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ReportWriter.java   (with props)
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/RouletteSelector.java   (with props)
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/SleepOp.java   (with props)
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/SliveMapper.java   (with props)
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/SliveReducer.java   (with props)
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/SliveTest.java   (with props)
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/TestSlive.java   (with props)
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/Timer.java   (with props)
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/WeightSelector.java   (with props)
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/Weights.java   (with props)
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=946832&r1=946831&r2=946832&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri May 21 00:02:12 2010
@@ -23,6 +23,9 @@ Trunk (unreleased changes)
     MAPREDUCE-1539. authorization checks for inter-server protocol
     (based on HADOOP-6600) (Boris Shkolnik via shv)
 
+    MAPREDUCE-1804. Stress-test tool for HDFS introduced in HDFS-708.
+    (Joshua Harlow via shv)
+
   OPTIMIZATIONS
 
   BUG FIXES

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/AppendOp.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/AppendOp.java?rev=946832&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/AppendOp.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/AppendOp.java Fri May 21 00:02:12 2010
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.slive;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.slive.DataWriter.GenerateOutput;
+import org.apache.hadoop.fs.slive.OperationOutput.OutputType;
+
+/**
+ * Operation which selects a random file and appends a random amount of bytes
+ * (selected from the configuration for append size) to that file if it exists.
+ *
+ * This operation will capture statistics on success for bytes written, time
+ * taken (milliseconds), and success count and on failure it will capture the
+ * number of failures and the time taken (milliseconds) to fail.
+ */
+class AppendOp extends Operation {
+
+  private static final Log LOG = LogFactory.getLog(AppendOp.class);
+
+  AppendOp(ConfigExtractor cfg, Random rnd) {
+    super(AppendOp.class.getSimpleName(), cfg, rnd);
+  }
+
+  /**
+   * Gets the file to append to
+   *
+   * @return Path
+   */
+  protected Path getAppendFile() {
+    Path fn = getFinder().getFile();
+    return fn;
+  }
+
+  @Override // Operation
+  List<OperationOutput> run(FileSystem fs) {
+    List<OperationOutput> out = super.run(fs);
+    OutputStream os = null;
+    try {
+      Path fn = getAppendFile();
+      // determine file status for file length requirement
+      // to know if should fill in partial bytes
+      Range<Long> appendSizeRange = getConfig().getAppendSize();
+      if (getConfig().shouldAppendUseBlockSize()) {
+        appendSizeRange = getConfig().getBlockSize();
+      }
+      long appendSize = Range.betweenPositive(getRandom(), appendSizeRange);
+      long timeTaken = 0, bytesAppended = 0;
+      DataWriter writer = new DataWriter(getRandom());
+      LOG.info("Attempting to append to file at " + fn + " of size "
+          + Helper.toByteInfo(appendSize));
+      {
+        // open
+        long startTime = Timer.now();
+        os = fs.append(fn);
+        timeTaken += Timer.elapsed(startTime);
+        // append given length
+        GenerateOutput stats = writer.writeSegment(appendSize, os);
+        timeTaken += stats.getTimeTaken();
+        bytesAppended += stats.getBytesWritten();
+        // capture close time
+        startTime = Timer.now();
+        os.close();
+        os = null;
+        timeTaken += Timer.elapsed(startTime);
+      }
+      out.add(new OperationOutput(OutputType.LONG, getType(),
+          ReportWriter.BYTES_WRITTEN, bytesAppended));
+      out.add(new OperationOutput(OutputType.LONG, getType(),
+          ReportWriter.OK_TIME_TAKEN, timeTaken));
+      out.add(new OperationOutput(OutputType.LONG, getType(),
+          ReportWriter.SUCCESSES, 1L));
+      LOG.info("Appended " + Helper.toByteInfo(bytesAppended) + " to file "
+          + fn + " in " + timeTaken + " milliseconds");
+    } catch (FileNotFoundException e) {
+      out.add(new OperationOutput(OutputType.LONG, getType(),
+          ReportWriter.NOT_FOUND, 1L));
+      LOG.warn("Error with appending", e);
+    } catch (IOException e) {
+      out.add(new OperationOutput(OutputType.LONG, getType(),
+          ReportWriter.FAILURES, 1L));
+      LOG.warn("Error with appending", e);
+    } finally {
+      if (os != null) {
+        try {
+          os.close();
+        } catch (IOException e) {
+          LOG.warn("Error with closing append stream", e);
+        }
+      }
+    }
+    return out;
+  }
+}

Propchange: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/AppendOp.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain
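As a rough guide to the Operation life cycle above: run(FileSystem) times each phase and reports its counters as OperationOutput records. The sketch below exercises AppendOp directly; in the real job SliveMapper (part 2/3 of this mail) is the caller, so the direct construction and the unconfigured Configuration here are illustrative only, and OperationOutput's printed form is assumed.

package org.apache.hadoop.fs.slive;

import java.util.List;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

// Hypothetical direct use of AppendOp; normally SliveMapper drives it.
class AppendOpDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    // ConfigExtractor reads the slive.* settings (defaults apply if unset)
    AppendOp op = new AppendOp(new ConfigExtractor(conf), new Random());
    List<OperationOutput> results = op.run(fs);
    for (OperationOutput out : results) {
      System.out.println(out); // bytes_written, ok_time_taken, successes, ...
    }
  }
}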
Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ArgumentParser.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ArgumentParser.java?rev=946832&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ArgumentParser.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ArgumentParser.java Fri May 21 00:02:12 2010
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.slive;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.hadoop.fs.slive.Constants.Distribution;
+import org.apache.hadoop.fs.slive.Constants.OperationType;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Class which abstracts the parsing of command line arguments for slive test
+ */
+class ArgumentParser {
+
+  private Options optList;
+  private String[] argumentList;
+  private ParsedOutput parsed;
+
+  /**
+   * Result of a parse is the following object
+   */
+  static class ParsedOutput {
+    private CommandLine parsedData;
+    private ArgumentParser source;
+    private boolean needHelp;
+
+    ParsedOutput(CommandLine parsedData, ArgumentParser source,
+        boolean needHelp) {
+      this.parsedData = parsedData;
+      this.source = source;
+      this.needHelp = needHelp;
+    }
+
+    /**
+     * @return whether the calling object should call output help and exit
+     */
+    boolean shouldOutputHelp() {
+      return needHelp;
+    }
+
+    /**
+     * Outputs the formatted help to standard out
+     */
+    void outputHelp() {
+      if (!shouldOutputHelp()) {
+        return;
+      }
+      if (source != null) {
+        HelpFormatter hlp = new HelpFormatter();
+        hlp.printHelp(Constants.PROG_NAME + " " + Constants.PROG_VERSION,
+            source.getOptionList());
+      }
+    }
+
+    /**
+     * @param optName
+     *          the option name to get the value for
+     *
+     * @return the option value or null if it does not exist
+     */
+    String getValue(String optName) {
+      if (parsedData == null) {
+        return null;
+      }
+      return parsedData.getOptionValue(optName);
+    }
+
+    public String toString() {
+      StringBuilder s = new StringBuilder();
+      if (parsedData != null) {
+        Option[] ops = parsedData.getOptions();
+        for (int i = 0; i < ops.length; ++i) {
+          s.append(ops[i].getOpt() + " = " + ops[i].getValue() + ",");
+        }
+      }
+      return s.toString();
+    }
+
+  }
+
+  ArgumentParser(String[] args) {
+    optList = getOptions();
+    if (args == null) {
+      args = new String[] {};
+    }
+    argumentList = args;
+    parsed = null;
+  }
+
+  private Options getOptionList() {
+    return optList;
+  }
+
+  /**
+   * Parses the command line options
+   *
+   * @return the parsed output, flagged when help should be printed
+   *
+   * @throws Exception
+   *           when parsing fails
+   */
+  ParsedOutput parse() throws Exception {
+    if (parsed == null) {
+      PosixParser parser = new PosixParser();
+      CommandLine popts = parser.parse(getOptionList(), argumentList, true);
+      if (popts.hasOption(ConfigOption.HELP.getOpt())) {
+        parsed = new ParsedOutput(null, this, true);
+      } else {
+        parsed = new ParsedOutput(popts, this, false);
+      }
+    }
+    return parsed;
+  }
+
+  /**
+   * @return the option set to be used in command line parsing
+   */
+  private Options getOptions() {
+    Options cliopt = new Options();
+    cliopt.addOption(ConfigOption.MAPS);
+    cliopt.addOption(ConfigOption.PACKET_SIZE);
+    cliopt.addOption(ConfigOption.OPS);
+    cliopt.addOption(ConfigOption.DURATION);
+    cliopt.addOption(ConfigOption.EXIT_ON_ERROR);
+    cliopt.addOption(ConfigOption.SLEEP_TIME);
+    cliopt.addOption(ConfigOption.FILES);
+    cliopt.addOption(ConfigOption.DIR_SIZE);
+    cliopt.addOption(ConfigOption.BASE_DIR);
+    cliopt.addOption(ConfigOption.RESULT_FILE);
+    cliopt.addOption(ConfigOption.CLEANUP);
+    {
+      String distStrs[] = new String[Distribution.values().length];
+      Distribution distValues[] = Distribution.values();
+      for (int i = 0; i < distValues.length; ++i) {
+        distStrs[i] = distValues[i].lowerName();
+      }
+      String opdesc = String.format(Constants.OP_DESCR, StringUtils
+          .arrayToString(distStrs));
+      for (OperationType type : OperationType.values()) {
+        String opname = type.lowerName();
+        cliopt.addOption(new Option(opname, true, opdesc));
+      }
+    }
+    cliopt.addOption(ConfigOption.REPLICATION_AM);
+    cliopt.addOption(ConfigOption.BLOCK_SIZE);
+    cliopt.addOption(ConfigOption.READ_SIZE);
+    cliopt.addOption(ConfigOption.WRITE_SIZE);
+    cliopt.addOption(ConfigOption.APPEND_SIZE);
+    cliopt.addOption(ConfigOption.RANDOM_SEED);
+    cliopt.addOption(ConfigOption.QUEUE_NAME);
+    cliopt.addOption(ConfigOption.HELP);
+    return cliopt;
+  }
+
+}

Propchange: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ArgumentParser.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain
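A short sketch of how the parser above might be consumed; SliveTest (in part 2/3 of this mail) is the real consumer, so this stand-alone use is illustrative only:

package org.apache.hadoop.fs.slive;

// Hypothetical stand-alone use of ArgumentParser.
class ParseDemo {
  public static void main(String[] args) throws Exception {
    ArgumentParser parser = new ArgumentParser(args);
    ArgumentParser.ParsedOutput out = parser.parse();
    if (out.shouldOutputHelp()) {
      out.outputHelp(); // prints usage for the slive options
      return;
    }
    // getValue returns null when the option was not supplied
    System.out.println("maps = " + out.getValue(ConfigOption.MAPS.getOpt()));
  }
}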
Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/BadFileException.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/BadFileException.java?rev=946832&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/BadFileException.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/BadFileException.java Fri May 21 00:02:12 2010
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.slive;
+
+import java.io.IOException;
+
+/**
+ * Exception used to signify file reading failures where headers are bad or an
+ * unexpected EOF occurs when it should not.
+ */
+class BadFileException extends IOException {
+
+  private static final long serialVersionUID = 463201983951298129L;
+
+  BadFileException(String msg) {
+    super(msg);
+  }
+
+  BadFileException(String msg, Throwable e) {
+    super(msg, e);
+  }
+
+}

Propchange: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/BadFileException.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigExtractor.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigExtractor.java?rev=946832&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigExtractor.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigExtractor.java Fri May 21 00:02:12 2010
@@ -0,0 +1,731 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.slive;
+
+import java.text.NumberFormat;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.slive.Constants.OperationType;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Simple access layer on top of a configuration object that extracts the
+ * slive specific configuration values needed for slive running
+ */
+class ConfigExtractor {
+
+  private static final Log LOG = LogFactory.getLog(ConfigExtractor.class);
+
+  private Configuration config;
+
+  ConfigExtractor(Configuration cfg) {
+    this.config = cfg;
+  }
+
+  /**
+   * @return the wrapped configuration that this extractor will use
+   */
+  Configuration getConfig() {
+    return this.config;
+  }
+
+  /**
+   * @return the location of where data should be written to
+   */
+  Path getDataPath() {
+    Path base = getBaseDirectory();
+    if (base == null) {
+      return null;
+    }
+    return new Path(base, Constants.DATA_DIR);
+  }
+
+  /**
+   * @return the location of where the reducer should write its data to
+   */
+  Path getOutputPath() {
+    Path base = getBaseDirectory();
+    if (base == null) {
+      return null;
+    }
+    return new Path(base, Constants.OUTPUT_DIR);
+  }
+
+  /**
+   * @param primary
+   *          the initial string to be used for the value of this
+   *          configuration option (if not provided then config and then the
+   *          default are used)
+   *
+   * @return the base directory where output & data should be stored using
+   *         primary,config,default (in that order)
+   */
+  Path getBaseDirectory(String primary) {
+    String path = primary;
+    if (path == null) {
+      path = config.get(ConfigOption.BASE_DIR.getCfgOption());
+    }
+    if (path == null) {
+      path = ConfigOption.BASE_DIR.getDefault();
+    }
+    if (path == null) {
+      return null;
+    }
+    return new Path(path);
+  }
+
+  /**
+   * @return the base directory using only config and default values
+   */
+  Path getBaseDirectory() {
+    return getBaseDirectory(null);
+  }
+
+  /**
+   * @return whether the mapper or reducer should exit when they get their
+   *         first error using only config and default values
+   */
+  boolean shouldExitOnFirstError() {
+    return shouldExitOnFirstError(null);
+  }
+
+  /**
+   * @param primary
+   *          the initial string to be used for the value of this
+   *          configuration option (if not provided then config and then the
+   *          default are used)
+   *
+   * @return the boolean of whether the mapper/reducer should exit when they
+   *         first error from primary,config,default (in that order)
+   */
+  boolean shouldExitOnFirstError(String primary) {
+    String val = primary;
+    if (val == null) {
+      val = config.get(ConfigOption.EXIT_ON_ERROR.getCfgOption());
+    }
+    if (val == null) {
+      return ConfigOption.EXIT_ON_ERROR.getDefault();
+    }
+    return Boolean.parseBoolean(val);
+  }
+
+  /**
+   * @return the number of reducers to use
+   */
+  Integer getReducerAmount() {
+    return 1;
+  }
+
+  /**
+   * @return the number of mappers to use using config and default values for
+   *         lookup
+   */
+  Integer getMapAmount() {
+    return getMapAmount(null);
+  }
+
+  /**
+   * @param primary
+   *          the initial string to be used for the value of this
+   *          configuration option (if not provided then config and then the
+   *          default are used)
+   * @return the map amount to use
+   */
+  Integer getMapAmount(String primary) {
+    return getInteger(primary, ConfigOption.MAPS);
+  }
+
+  /**
+   * @return the duration in seconds (or null or
Integer.MAX for no limit) using + * the configuration and default as lookup + */ + Integer getDuration() { + return getDuration(null); + } + + /** + * @return the duration in milliseconds or null if no limit using config and + * default as lookup + */ + Integer getDurationMilliseconds() { + Integer seconds = getDuration(); + if (seconds == null || seconds == Integer.MAX_VALUE) { + return Integer.MAX_VALUE; + } + int milliseconds = (seconds * 1000); + if (milliseconds < 0) { + milliseconds = 0; + } + return milliseconds; + } + + /** + * @param primary + * primary the initial string to be used for the value of this + * configuration option (if not provided then config and then the + * default are used) + * @return the duration in seconds (or null or Integer.MAX for no limit) + */ + Integer getDuration(String primary) { + return getInteger(primary, ConfigOption.DURATION); + } + + /** + * @return the total number of operations to run using config and default as + * lookup + */ + Integer getOpCount() { + return getOpCount(null); + } + + /** + * @param primary + * primary the initial string to be used for the value of this + * configuration option (if not provided then config and then the + * default are used) + * @return the total number of operations to run + */ + Integer getOpCount(String primary) { + return getInteger(primary, ConfigOption.OPS); + } + + /** + * @return the total number of files per directory using config and default as + * lookup + */ + Integer getDirSize() { + return getDirSize(null); + } + + /** + * @param primary + * primary the initial string to be used for the value of this + * configuration option (if not provided then config and then the + * default are used) + * @return the total number of files per directory + */ + Integer getDirSize(String primary) { + return getInteger(primary, ConfigOption.DIR_SIZE); + } + + /** + * @param primary + * the primary string to attempt to convert into a integer + * @param opt + * the option to use as secondary + default if no primary given + * @return a parsed integer + */ + private Integer getInteger(String primary, ConfigOption opt) { + String value = primary; + if (value == null) { + value = config.get(opt.getCfgOption()); + } + if (value == null) { + return opt.getDefault(); + } + return Integer.parseInt(value); + } + + /** + * @return the total number of files allowed using configuration and default + * for lookup + */ + Integer getTotalFiles() { + return getTotalFiles(null); + } + + /** + * @param primary + * primary the initial string to be used for the value of this + * configuration option (if not provided then config and then the + * default are used) + * @return the total number of files allowed + */ + Integer getTotalFiles(String primary) { + return getInteger(primary, ConfigOption.FILES); + } + + /** + * @param primary + * primary the initial string to be used for the value of this + * configuration option (if not provided then config and then the + * default are used) + * @return the random seed start point or null if none + */ + Long getRandomSeed(String primary) { + String seed = primary; + if (seed == null) { + seed = config.get(ConfigOption.RANDOM_SEED.getCfgOption()); + } + if (seed == null) { + return null; + } + return Long.parseLong(seed); + } + + /** + * @return the random seed start point or null if none using config and then + * default as lookup + */ + Long getRandomSeed() { + return getRandomSeed(null); + } + + /** + * @return the result file location or null if none using config and then + * default as 
lookup + */ + String getResultFile() { + return getResultFile(null); + } + + /** + * Gets the grid queue name to run on using config and default only + * + * @return String + */ + String getQueueName() { + return getQueueName(null); + } + + /** + * Gets the grid queue name to run on using the primary string or config or + * default + * + * @param primary + * + * @return String + */ + String getQueueName(String primary) { + String q = primary; + if (q == null) { + q = config.get(ConfigOption.QUEUE_NAME.getCfgOption()); + } + if (q == null) { + q = ConfigOption.QUEUE_NAME.getDefault(); + } + return q; + } + + /** + * @param primary + * primary the initial string to be used for the value of this + * configuration option (if not provided then config and then the + * default are used) + * @return the result file location + */ + String getResultFile(String primary) { + String fn = primary; + if (fn == null) { + fn = config.get(ConfigOption.RESULT_FILE.getCfgOption()); + } + if (fn == null) { + fn = ConfigOption.RESULT_FILE.getDefault(); + } + return fn; + } + + /** + * @param primary + * primary the initial string to be used for the value of this + * configuration option (if not provided then config and then the + * default are used) + * @return the integer range allowed for the block size + */ + Range getBlockSize(String primary) { + return getMinMaxBytes(ConfigOption.BLOCK_SIZE, primary); + } + + /** + * @return the integer range allowed for the block size using config and + * default for lookup + */ + Range getBlockSize() { + return getBlockSize(null); + } + + /** + * @param cfgopt + * the configuration option to use for config and default lookup + * @param primary + * the initial string to be used for the value of this configuration + * option (if not provided then config and then the default are used) + * @return the parsed short range from primary, config, default + */ + private Range getMinMaxShort(ConfigOption cfgopt, String primary) { + String sval = primary; + if (sval == null) { + sval = config.get(cfgopt.getCfgOption()); + } + Range range = null; + if (sval != null) { + String pieces[] = Helper.getTrimmedStrings(sval); + if (pieces.length == 2) { + String min = pieces[0]; + String max = pieces[1]; + short minVal = Short.parseShort(min); + short maxVal = Short.parseShort(max); + if (minVal > maxVal) { + short tmp = minVal; + minVal = maxVal; + maxVal = tmp; + } + range = new Range(minVal, maxVal); + } + } + if (range == null) { + Short def = cfgopt.getDefault(); + if (def != null) { + range = new Range(def, def); + } + } + return range; + } + + /** + * @param cfgopt + * the configuration option to use for config and default lookup + * @param primary + * the initial string to be used for the value of this configuration + * option (if not provided then config and then the default are used) + * @return the parsed long range from primary, config, default + */ + private Range getMinMaxLong(ConfigOption cfgopt, String primary) { + String sval = primary; + if (sval == null) { + sval = config.get(cfgopt.getCfgOption()); + } + Range range = null; + if (sval != null) { + String pieces[] = Helper.getTrimmedStrings(sval); + if (pieces.length == 2) { + String min = pieces[0]; + String max = pieces[1]; + long minVal = Long.parseLong(min); + long maxVal = Long.parseLong(max); + if (minVal > maxVal) { + long tmp = minVal; + minVal = maxVal; + maxVal = tmp; + } + range = new Range(minVal, maxVal); + } + } + if (range == null) { + Long def = cfgopt.getDefault(); + if (def != null) { + range = new 
Range(def, def); + } + } + return range; + } + + /** + * @param cfgopt + * the configuration option to use for config and default lookup + * @param primary + * the initial string to be used for the value of this configuration + * option (if not provided then config and then the default are used) + * @return the parsed integer byte range from primary, config, default + */ + private Range getMinMaxBytes(ConfigOption cfgopt, String primary) { + String sval = primary; + if (sval == null) { + sval = config.get(cfgopt.getCfgOption()); + } + Range range = null; + if (sval != null) { + String pieces[] = Helper.getTrimmedStrings(sval); + if (pieces.length == 2) { + String min = pieces[0]; + String max = pieces[1]; + long tMin = StringUtils.TraditionalBinaryPrefix.string2long(min); + long tMax = StringUtils.TraditionalBinaryPrefix.string2long(max); + if (tMin > tMax) { + long tmp = tMin; + tMin = tMax; + tMax = tmp; + } + range = new Range(tMin, tMax); + } + } + if (range == null) { + Long def = cfgopt.getDefault(); + if (def != null) { + range = new Range(def, def); + } + } + return range; + } + + /** + * @param primary + * the initial string to be used for the value of this configuration + * option (if not provided then config and then the default are used) + * @return the replication range + */ + Range getReplication(String primary) { + return getMinMaxShort(ConfigOption.REPLICATION_AM, primary); + } + + /** + * @return the replication range using config and default for lookup + */ + Range getReplication() { + return getReplication(null); + } + + /** + * @return the map of operations to perform using config (percent may be null + * if unspecified) + */ + Map getOperations() { + Map operations = new HashMap(); + for (OperationType type : OperationType.values()) { + String opname = type.lowerName(); + String keyname = String.format(Constants.OP, opname); + String kval = config.get(keyname); + if (kval == null) { + continue; + } + operations.put(type, new OperationData(kval)); + } + return operations; + } + + /** + * @param primary + * the initial string to be used for the value of this configuration + * option (if not provided then config and then the default are used) + * @return the append byte size range (or null if none) + */ + Range getAppendSize(String primary) { + return getMinMaxBytes(ConfigOption.APPEND_SIZE, primary); + } + + /** + * @return the append byte size range (or null if none) using config and + * default for lookup + */ + Range getAppendSize() { + return getAppendSize(null); + } + + /** + * @param primary + * the initial string to be used for the value of this configuration + * option (if not provided then config and then the default are used) + * @return the sleep range (or null if none) + */ + Range getSleepRange(String primary) { + return getMinMaxLong(ConfigOption.SLEEP_TIME, primary); + } + + /** + * @return the sleep range (or null if none) using config and default for + * lookup + */ + Range getSleepRange() { + return getSleepRange(null); + } + + /** + * @param primary + * the initial string to be used for the value of this configuration + * option (if not provided then config and then the default are used) + * @return the write byte size range (or null if none) + */ + Range getWriteSize(String primary) { + return getMinMaxBytes(ConfigOption.WRITE_SIZE, primary); + } + + /** + * @return the write byte size range (or null if none) using config and + * default for lookup + */ + Range getWriteSize() { + return getWriteSize(null); + } + + /** + * Returns whether the write 
range should use the block size range + * + * @return true|false + */ + boolean shouldWriteUseBlockSize() { + Range writeRange = getWriteSize(); + if (writeRange == null + || (writeRange.getLower() == writeRange.getUpper() && (writeRange + .getUpper() == Long.MAX_VALUE))) { + return true; + } + return false; + } + + /** + * Returns whether the append range should use the block size range + * + * @return true|false + */ + boolean shouldAppendUseBlockSize() { + Range appendRange = getAppendSize(); + if (appendRange == null + || (appendRange.getLower() == appendRange.getUpper() && (appendRange + .getUpper() == Long.MAX_VALUE))) { + return true; + } + return false; + } + + /** + * Returns whether the read range should use the entire file + * + * @return true|false + */ + boolean shouldReadFullFile() { + Range readRange = getReadSize(); + if (readRange == null + || (readRange.getLower() == readRange.getUpper() && (readRange + .getUpper() == Long.MAX_VALUE))) { + return true; + } + return false; + } + + /** + * @param primary + * the initial string to be used for the value of this configuration + * option (if not provided then config and then the default are used) + * @return the read byte size range (or null if none) + */ + Range getReadSize(String primary) { + return getMinMaxBytes(ConfigOption.READ_SIZE, primary); + } + + /** + * Gets the bytes per checksum (if it exists or null if not) + * + * @return Long + */ + Long getByteCheckSum() { + String val = config.get(Constants.BYTES_PER_CHECKSUM); + if(val == null) { + return null; + } + return Long.parseLong(val); + } + + /** + * @return the read byte size range (or null if none) using config and default + * for lookup + */ + Range getReadSize() { + return getReadSize(null); + } + + /** + * Dumps out the given options for the given config extractor + * + * @param cfg + * the config to write to the log + */ + static void dumpOptions(ConfigExtractor cfg) { + if (cfg == null) { + return; + } + LOG.info("Base directory = " + cfg.getBaseDirectory()); + LOG.info("Data directory = " + cfg.getDataPath()); + LOG.info("Output directory = " + cfg.getOutputPath()); + LOG.info("Result file = " + cfg.getResultFile()); + LOG.info("Grid queue = " + cfg.getQueueName()); + LOG.info("Should exit on first error = " + cfg.shouldExitOnFirstError()); + { + String duration = "Duration = "; + if (cfg.getDurationMilliseconds() == Integer.MAX_VALUE) { + duration += "unlimited"; + } else { + duration += cfg.getDurationMilliseconds() + " milliseconds"; + } + LOG.info(duration); + } + LOG.info("Map amount = " + cfg.getMapAmount()); + LOG.info("Operation amount = " + cfg.getOpCount()); + LOG.info("Total file limit = " + cfg.getTotalFiles()); + LOG.info("Total dir file limit = " + cfg.getDirSize()); + { + String read = "Read size = "; + if (cfg.shouldReadFullFile()) { + read += "entire file"; + } else { + read += cfg.getReadSize() + " bytes"; + } + LOG.info(read); + } + { + String write = "Write size = "; + if (cfg.shouldWriteUseBlockSize()) { + write += "blocksize"; + } else { + write += cfg.getWriteSize() + " bytes"; + } + LOG.info(write); + } + { + String append = "Append size = "; + if (cfg.shouldAppendUseBlockSize()) { + append += "blocksize"; + } else { + append += cfg.getAppendSize() + " bytes"; + } + LOG.info(append); + } + { + String bsize = "Block size = "; + bsize += cfg.getBlockSize() + " bytes"; + LOG.info(bsize); + } + if (cfg.getRandomSeed() != null) { + LOG.info("Random seed = " + cfg.getRandomSeed()); + } + if (cfg.getSleepRange() != null) { + LOG.info("Sleep 
range = " + cfg.getSleepRange() + " milliseconds"); + } + LOG.info("Replication amount = " + cfg.getReplication()); + LOG.info("Operations are:"); + NumberFormat percFormatter = Formatter.getPercentFormatter(); + Map operations = cfg.getOperations(); + for (OperationType type : operations.keySet()) { + String name = type.name(); + LOG.info(name); + OperationData opInfo = operations.get(type); + LOG.info(" " + opInfo.getDistribution().name()); + if (opInfo.getPercent() != null) { + LOG.info(" " + percFormatter.format(opInfo.getPercent())); + } else { + LOG.info(" ???"); + } + } + } + +} Propchange: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigExtractor.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigMerger.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigMerger.java?rev=946832&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigMerger.java (added) +++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigMerger.java Fri May 21 00:02:12 2010 @@ -0,0 +1,554 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.slive; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.slive.ArgumentParser.ParsedOutput; +import org.apache.hadoop.fs.slive.Constants.Distribution; +import org.apache.hadoop.fs.slive.Constants.OperationType; +import org.apache.hadoop.util.StringUtils; + +/** + * Class which merges options given from a config file and the command line and + * performs some basic verification of the data retrieved and sets the verified + * values back into the configuration object for return + */ +class ConfigMerger { + /** + * Exception that represents config problems... 
+   */
+  static class ConfigException extends IOException {
+
+    private static final long serialVersionUID = 2047129184917444550L;
+
+    ConfigException(String msg) {
+      super(msg);
+    }
+
+    ConfigException(String msg, Throwable e) {
+      super(msg, e);
+    }
+  }
+
+  /**
+   * Merges the given command line parsed output with the given configuration
+   * object and returns the new configuration object with the correct options
+   * overwritten
+   *
+   * @param opts
+   *          the parsed command line option output
+   * @param base
+   *          the base configuration to merge with
+   * @return merged configuration object
+   * @throws ConfigException
+   *           when configuration errors or verification occur
+   */
+  Configuration getMerged(ParsedOutput opts, Configuration base)
+      throws ConfigException {
+    return handleOptions(opts, base);
+  }
+
+  /**
+   * Gets the base set of operations to use
+   *
+   * @return Map
+   */
+  private Map<OperationType, OperationData> getBaseOperations() {
+    Map<OperationType, OperationData> base =
+        new HashMap<OperationType, OperationData>();
+    // add in all the operations
+    // since they will all be applied unless changed
+    OperationType[] types = OperationType.values();
+    for (OperationType type : types) {
+      base.put(type, new OperationData(Distribution.UNIFORM, null));
+    }
+    return base;
+  }
+
+  /**
+   * Handles the specific task of merging operations from the command line or
+   * extractor object into the base configuration provided
+   *
+   * @param opts
+   *          the parsed command line option output
+   * @param base
+   *          the base configuration to merge with
+   * @param extractor
+   *          the access object to fetch operations from if none from the
+   *          command line
+   * @return merged configuration object
+   * @throws ConfigException
+   *           when verification fails
+   */
+  private Configuration handleOperations(ParsedOutput opts, Configuration base,
+      ConfigExtractor extractor) throws ConfigException {
+    // get the base set to start off with
+    Map<OperationType, OperationData> operations = getBaseOperations();
+    // merge with what is coming from config
+    Map<OperationType, OperationData> cfgOperations = extractor.getOperations();
+    for (OperationType opType : cfgOperations.keySet()) {
+      operations.put(opType, cfgOperations.get(opType));
+    }
+    // see if any coming in from the command line
+    for (OperationType opType : OperationType.values()) {
+      String opName = opType.lowerName();
+      String opVal = opts.getValue(opName);
+      if (opVal != null) {
+        operations.put(opType, new OperationData(opVal));
+      }
+    }
+    // remove those with <= zero percent
+    {
+      Map<OperationType, OperationData> cleanedOps =
+          new HashMap<OperationType, OperationData>();
+      for (OperationType opType : operations.keySet()) {
+        OperationData data = operations.get(opType);
+        if (data.getPercent() == null || data.getPercent() > 0.0d) {
+          cleanedOps.put(opType, data);
+        }
+      }
+      operations = cleanedOps;
+    }
+    if (operations.isEmpty()) {
+      throw new ConfigException("No operations provided!");
+    }
+    // verify and adjust
+    double currPct = 0;
+    int needFill = 0;
+    for (OperationType type : operations.keySet()) {
+      OperationData op = operations.get(type);
+      if (op.getPercent() != null) {
+        currPct += op.getPercent();
+      } else {
+        needFill++;
+      }
+    }
+    if (currPct > 1) {
+      throw new ConfigException(
+          "Unable to have cumulative percent greater than 100%");
+    }
+    if (needFill > 0 && currPct < 1) {
+      double leftOver = 1.0 - currPct;
+      Map<OperationType, OperationData> mpcp =
+          new HashMap<OperationType, OperationData>();
+      for (OperationType type : operations.keySet()) {
+        OperationData op = operations.get(type);
+        if (op.getPercent() == null) {
+          op = new OperationData(op.getDistribution(), (leftOver / needFill));
+        }
+        mpcp.put(type, op);
+      }
+      operations = mpcp;
+    } else if (needFill == 0 && currPct < 1) {
+      // redistribute
+      double leftOver = 1.0 - currPct;
+      Map<OperationType, OperationData> mpcp =
+          new HashMap<OperationType, OperationData>();
+      double each = leftOver / operations.keySet().size();
+      for (OperationType t : operations.keySet()) {
+        OperationData op = operations.get(t);
+        op = new OperationData(op.getDistribution(), (op.getPercent() + each));
+        mpcp.put(t, op);
+      }
+      operations = mpcp;
+    } else if (needFill > 0 && currPct >= 1) {
+      throw new ConfigException(needFill
+          + " unfilled operations but no percentage left to fill with");
+    }
+    // save into base
+    for (OperationType opType : operations.keySet()) {
+      String opName = opType.lowerName();
+      OperationData opData = operations.get(opType);
+      String distr = opData.getDistribution().lowerName();
+      String ratio = new Double(opData.getPercent() * 100.0d).toString();
+      base.set(String.format(Constants.OP, opName), opData.toString());
+      base.set(String.format(Constants.OP_DISTR, opName), distr);
+      base.set(String.format(Constants.OP_PERCENT, opName), ratio);
+    }
+    return base;
+  }
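To make the fill and redistribute arithmetic in handleOperations() concrete, a small worked example (the inputs are invented for illustration):

// Worked example of the percentage fill above, with made-up inputs:
// read=25% and create=25% specified, delete left unspecified.
class FillMath {
  public static void main(String[] args) {
    double currPct = 0.25 + 0.25;      // sum of the specified percents
    int needFill = 1;                  // one op (delete) has no percent
    double leftOver = 1.0 - currPct;   // 0.5 remains
    double each = leftOver / needFill; // delete is filled in at 50%
    System.out.println("filled percent = " + each);
    // Had all ops been specified but summed to only 0.9 (needFill == 0),
    // the redistribute branch would add (1.0 - 0.9) / opCount to every op.
  }
}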
+
+  /**
+   * Handles merging all options and verifying from the given command line
+   * output and the given base configuration and returns the merged
+   * configuration
+   *
+   * @param opts
+   *          the parsed command line option output
+   * @param base
+   *          the base configuration to merge with
+   * @return the merged configuration
+   * @throws ConfigException
+   */
+  private Configuration handleOptions(ParsedOutput opts, Configuration base)
+      throws ConfigException {
+    // ensure variables are overwritten and verified
+    ConfigExtractor extractor = new ConfigExtractor(base);
+    // overwrite the map amount and check to ensure > 0
+    {
+      Integer mapAmount = null;
+      try {
+        mapAmount = extractor.getMapAmount(opts.getValue(ConfigOption.MAPS
+            .getOpt()));
+      } catch (Exception e) {
+        throw new ConfigException("Error extracting & merging map amount", e);
+      }
+      if (mapAmount != null) {
+        if (mapAmount <= 0) {
+          throw new ConfigException(
+              "Map amount can not be less than or equal to zero");
+        }
+        base.set(ConfigOption.MAPS.getCfgOption(), mapAmount.toString());
+      }
+    }
+    // overwrite the duration amount and ensure > 0
+    {
+      Integer duration = null;
+      try {
+        duration = extractor.getDuration(opts.getValue(ConfigOption.DURATION
+            .getOpt()));
+      } catch (Exception e) {
+        throw new ConfigException("Error extracting & merging duration", e);
+      }
+      if (duration != null) {
+        if (duration <= 0) {
+          throw new ConfigException(
+              "Duration can not be less than or equal to zero");
+        }
+        base.set(ConfigOption.DURATION.getCfgOption(), duration.toString());
+      }
+    }
+    // overwrite the operation amount and ensure > 0
+    {
+      Integer operationAmount = null;
+      try {
+        operationAmount = extractor.getOpCount(opts.getValue(ConfigOption.OPS
+            .getOpt()));
+      } catch (Exception e) {
+        throw new ConfigException(
+            "Error extracting & merging operation amount", e);
+      }
+      if (operationAmount != null) {
+        if (operationAmount <= 0) {
+          throw new ConfigException(
+              "Operation amount can not be less than or equal to zero");
+        }
+        base.set(ConfigOption.OPS.getCfgOption(), operationAmount.toString());
+      }
+    }
+    // overwrite the exit on error setting
+    {
+      try {
+        boolean exitOnError = extractor.shouldExitOnFirstError(opts
+            .getValue(ConfigOption.EXIT_ON_ERROR.getOpt()));
+        base.setBoolean(ConfigOption.EXIT_ON_ERROR.getCfgOption(), exitOnError);
+      } catch (Exception e) {
+        throw new ConfigException(
+            "Error extracting & merging exit on error value", e);
+      }
+    }
+    // verify and set file limit and ensure > 0
+    {
+      Integer fileAm = null;
+      try {
+        fileAm = extractor.getTotalFiles(opts.getValue(ConfigOption.FILES
+            .getOpt()));
+      } catch (Exception e) {
+        throw new ConfigException(
+            "Error extracting & merging total file limit amount", e);
+      }
+      if (fileAm != null) {
+        if (fileAm <= 0) {
+          throw new ConfigException(
+              "File amount can not be less than or equal to zero");
+        }
+        base.set(ConfigOption.FILES.getCfgOption(), fileAm.toString());
+      }
+    }
+    // set the grid queue to run on
+    {
+      try {
+        String qname = extractor.getQueueName(opts
+            .getValue(ConfigOption.QUEUE_NAME.getOpt()));
+        if (qname != null) {
+          base.set(ConfigOption.QUEUE_NAME.getCfgOption(), qname);
+        }
+      } catch (Exception e) {
+        throw new ConfigException("Error extracting & merging queue name", e);
+      }
+    }
+    // verify and set the directory limit and ensure > 0
+    {
+      Integer directoryLimit = null;
+      try {
+        directoryLimit = extractor.getDirSize(opts
+            .getValue(ConfigOption.DIR_SIZE.getOpt()));
+      } catch (Exception e) {
+        throw new ConfigException(
+            "Error extracting & merging directory file limit", e);
+      }
+      if (directoryLimit != null) {
+        if (directoryLimit <= 0) {
+          throw new ConfigException(
+              "Directory file limit can not be less than or equal to zero");
+        }
+        base.set(ConfigOption.DIR_SIZE.getCfgOption(), directoryLimit
+            .toString());
+      }
+    }
+    // set the base directory
+    {
+      Path basedir = null;
+      try {
+        basedir = extractor.getBaseDirectory(opts
+            .getValue(ConfigOption.BASE_DIR.getOpt()));
+      } catch (Exception e) {
+        throw new ConfigException("Error extracting & merging base directory",
+            e);
+      }
+      if (basedir != null) {
+        // always ensure in slive dir
+        basedir = new Path(basedir, Constants.BASE_DIR);
+        base.set(ConfigOption.BASE_DIR.getCfgOption(), basedir.toString());
+      }
+    }
+    // set the result file
+    {
+      String fn = null;
+      try {
+        fn = extractor.getResultFile(opts.getValue(ConfigOption.RESULT_FILE
+            .getOpt()));
+      } catch (Exception e) {
+        throw new ConfigException("Error extracting & merging result file", e);
+      }
+      if (fn != null) {
+        base.set(ConfigOption.RESULT_FILE.getCfgOption(), fn);
+      }
+    }
+    // set the operations
+    {
+      try {
+        base = handleOperations(opts, base, extractor);
+      } catch (Exception e) {
+        throw new ConfigException("Error extracting & merging operations", e);
+      }
+    }
+    // set the replication amount range
+    {
+      Range<Short> replicationAm = null;
+      try {
+        replicationAm = extractor.getReplication(opts
+            .getValue(ConfigOption.REPLICATION_AM.getOpt()));
+      } catch (Exception e) {
+        throw new ConfigException(
+            "Error extracting & merging replication amount range", e);
+      }
+      if (replicationAm != null) {
+        int minRepl = base.getInt(Constants.MIN_REPLICATION, 1);
+        if (replicationAm.getLower() < minRepl) {
+          throw new ConfigException(
+              "Replication amount minimum is less than property configured minimum "
+                  + minRepl);
+        }
+        if (replicationAm.getLower() > replicationAm.getUpper()) {
+          throw new ConfigException(
+              "Replication amount minimum is greater than its maximum");
+        }
+        if (replicationAm.getLower() <= 0) {
+          throw new ConfigException(
+              "Replication amount minimum must be greater than zero");
+        }
+        base.set(ConfigOption.REPLICATION_AM.getCfgOption(), replicationAm
+            .toString());
+      }
+    }
+    // set the sleep range
+    {
+      Range<Long> sleepRange = null;
try { + sleepRange = extractor.getSleepRange(opts + .getValue(ConfigOption.SLEEP_TIME.getOpt())); + } catch (Exception e) { + throw new ConfigException( + "Error extracting & merging sleep size range", e); + } + if (sleepRange != null) { + if (sleepRange.getLower() > sleepRange.getUpper()) { + throw new ConfigException( + "Sleep range minimum is greater than its maximum"); + } + if (sleepRange.getLower() <= 0) { + throw new ConfigException( + "Sleep range minimum must be greater than zero"); + } + base.set(ConfigOption.SLEEP_TIME.getCfgOption(), sleepRange.toString()); + } + } + // set the packet size if given + { + String pSize = opts.getValue(ConfigOption.PACKET_SIZE.getOpt()); + if (pSize == null) { + pSize = ConfigOption.PACKET_SIZE.getDefault(); + } + if (pSize != null) { + try { + Long packetSize = StringUtils.TraditionalBinaryPrefix + .string2long(pSize); + base.set(ConfigOption.PACKET_SIZE.getCfgOption(), packetSize + .toString()); + } catch (Exception e) { + throw new ConfigException( + "Error extracting & merging write packet size", e); + } + } + } + // set the block size range + { + Range blockSize = null; + try { + blockSize = extractor.getBlockSize(opts + .getValue(ConfigOption.BLOCK_SIZE.getOpt())); + } catch (Exception e) { + throw new ConfigException( + "Error extracting & merging block size range", e); + } + if (blockSize != null) { + if (blockSize.getLower() > blockSize.getUpper()) { + throw new ConfigException( + "Block size minimum is greater than its maximum"); + } + if (blockSize.getLower() <= 0) { + throw new ConfigException( + "Block size minimum must be greater than zero"); + } + // ensure block size is a multiple of BYTES_PER_CHECKSUM + // if a value is set in the configuration + Long bytesPerChecksum = extractor.getByteCheckSum(); + if (bytesPerChecksum != null) { + if ((blockSize.getLower() % bytesPerChecksum) != 0) { + throw new ConfigException( + "Blocksize lower bound must be a multiple of " + + bytesPerChecksum); + } + if ((blockSize.getUpper() % bytesPerChecksum) != 0) { + throw new ConfigException( + "Blocksize upper bound must be a multiple of " + + bytesPerChecksum); + } + } + base.set(ConfigOption.BLOCK_SIZE.getCfgOption(), blockSize.toString()); + } + } + // set the read size range + { + Range readSize = null; + try { + readSize = extractor.getReadSize(opts.getValue(ConfigOption.READ_SIZE + .getOpt())); + } catch (Exception e) { + throw new ConfigException("Error extracting & merging read size range", + e); + } + if (readSize != null) { + if (readSize.getLower() > readSize.getUpper()) { + throw new ConfigException( + "Read size minimum is greater than its maximum"); + } + if (readSize.getLower() < 0) { + throw new ConfigException( + "Read size minimum must be greater than or equal to zero"); + } + base.set(ConfigOption.READ_SIZE.getCfgOption(), readSize.toString()); + } + } + // set the write size range + { + Range writeSize = null; + try { + writeSize = extractor.getWriteSize(opts + .getValue(ConfigOption.WRITE_SIZE.getOpt())); + } catch (Exception e) { + throw new ConfigException( + "Error extracting & merging write size range", e); + } + if (writeSize != null) { + if (writeSize.getLower() > writeSize.getUpper()) { + throw new ConfigException( + "Write size minimum is greater than its maximum"); + } + if (writeSize.getLower() < 0) { + throw new ConfigException( + "Write size minimum must be greater than or equal to zero"); + } + base.set(ConfigOption.WRITE_SIZE.getCfgOption(), writeSize.toString()); + } + } + // set the append size range + { + Range 
appendSize = null; + try { + appendSize = extractor.getAppendSize(opts + .getValue(ConfigOption.APPEND_SIZE.getOpt())); + } catch (Exception e) { + throw new ConfigException( + "Error extracting & merging append size range", e); + } + if (appendSize != null) { + if (appendSize.getLower() > appendSize.getUpper()) { + throw new ConfigException( + "Append size minimum is greater than its maximum"); + } + if (appendSize.getLower() < 0) { + throw new ConfigException( + "Append size minimum must be greater than or equal to zero"); + } + base + .set(ConfigOption.APPEND_SIZE.getCfgOption(), appendSize.toString()); + } + } + // set the seed + { + Long seed = null; + try { + seed = extractor.getRandomSeed(opts.getValue(ConfigOption.RANDOM_SEED + .getOpt())); + } catch (Exception e) { + throw new ConfigException( + "Error extracting & merging random number seed", e); + } + if (seed != null) { + base.set(ConfigOption.RANDOM_SEED.getCfgOption(), seed.toString()); + } + } + return base; + } +} Propchange: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigMerger.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigOption.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigOption.java?rev=946832&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigOption.java (added) +++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigOption.java Fri May 21 00:02:12 2010 @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigOption.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigOption.java?rev=946832&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigOption.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigOption.java Fri May 21 00:02:12 2010
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.slive;
+
+import org.apache.commons.cli.Option;
+
+/**
+ * Class which extends the basic option object and adds a configuration key
+ * and a default value, so that a single central place can be used to
+ * retrieve these as needed
+ */
+class ConfigOption<T> extends Option {
+
+  private static final long serialVersionUID = 7218954906367671150L;
+
+  // config starts with this prefix
+  private static final String SLIVE_PREFIX = "slive";
+
+  // command line options and descriptions and config option name
+  static final ConfigOption<Integer> MAPS = new ConfigOption<Integer>(
+      "maps", true, "Number of maps", SLIVE_PREFIX + ".maps", 10);
+
+  static final ConfigOption<Integer> OPS = new ConfigOption<Integer>(
+      "ops", true, "Max number of operations per map", SLIVE_PREFIX
+          + ".map.ops", 1000);
+
+  static final ConfigOption<Integer> DURATION = new ConfigOption<Integer>(
+      "duration", true,
+      "Duration of a map task in seconds (MAX_INT for no limit)", SLIVE_PREFIX
+          + ".duration", Integer.MAX_VALUE);
+
+  static final ConfigOption<Boolean> EXIT_ON_ERROR = new ConfigOption<Boolean>(
+      "exitOnError", false, "Exit on first error", SLIVE_PREFIX
+          + ".exit.on.error", false);
+
+  static final ConfigOption<Integer> FILES = new ConfigOption<Integer>(
+      "files", true, "Max total number of files",
+      SLIVE_PREFIX + ".total.files", 10);
+
+  static final ConfigOption<Integer> DIR_SIZE = new ConfigOption<Integer>(
+      "dirSize", true, "Max files per directory", SLIVE_PREFIX + ".dir.size",
+      32);
+
+  static final ConfigOption<String> BASE_DIR = new ConfigOption<String>(
+      "baseDir", true, "Base directory path", SLIVE_PREFIX + ".base.dir",
+      "/test/slive");
+
+  static final ConfigOption<String> RESULT_FILE = new ConfigOption<String>(
+      "resFile", true, "Result file name", SLIVE_PREFIX + ".result.file",
+      "part-0000");
+
+  static final ConfigOption<Short> REPLICATION_AM = new ConfigOption<Short>(
+      "replication", true, "Min,max value for replication amount", SLIVE_PREFIX
+          + ".file.replication", (short) 3);
+
+  static final ConfigOption<Long> BLOCK_SIZE = new ConfigOption<Long>(
+      "blockSize", true, "Min,max for dfs file block size", SLIVE_PREFIX
+          + ".block.size", 64L * Constants.MEGABYTES);
+
+  static final ConfigOption<Long> READ_SIZE = new ConfigOption<Long>(
+      "readSize", true,
+      "Min,max for size to read (min=max=MAX_LONG=read entire file)",
+      SLIVE_PREFIX + ".op.read.size", null);
+
+  static final ConfigOption<Long> WRITE_SIZE = new ConfigOption<Long>(
+      "writeSize", true,
+      "Min,max for size to write (min=max=MAX_LONG=blocksize)", SLIVE_PREFIX
+          + ".op.write.size", null);
+
+  static final ConfigOption<Long> SLEEP_TIME = new ConfigOption<Long>(
+      "sleep",
+      true,
+      "Min,max for milliseconds of random sleep to perform (between operations)",
+      SLIVE_PREFIX + ".op.sleep.range", null);
+
+  static final ConfigOption<Long> APPEND_SIZE = new ConfigOption<Long>(
+      "appendSize", true,
+      "Min,max for size to append (min=max=MAX_LONG=blocksize)", SLIVE_PREFIX
+          + ".op.append.size", null);
+
+  static final ConfigOption<Long> RANDOM_SEED = new ConfigOption<Long>(
+      "seed", true, "Random number seed", SLIVE_PREFIX + ".seed", null);
+
+  // command line only options
+  static final Option HELP = new Option("help", false,
+      "Usage information");
+
+  static final Option CLEANUP = new Option("cleanup", true,
+      "Cleanup & remove directory after reporting");
+
+  // non slive specific settings
+  static final ConfigOption<String> QUEUE_NAME = new ConfigOption<String>(
+      "queue", true, "Queue name", "mapred.job.queue.name", "default");
+
+  static final ConfigOption<String> PACKET_SIZE = new ConfigOption<String>(
+      "packetSize", true, "Dfs write packet size", "dfs.write.packet.size",
+      null);
+
+  /**
+   * Hadoop configuration property name
+   */
+  private String cfgOption;
+
+  /**
+   * Default value if no value is located by other means
+   */
+  private T defaultValue;
+
+  ConfigOption(String cliOption, boolean hasArg, String description,
+      String cfgOption, T def) {
+    super(cliOption, hasArg, description);
+    this.cfgOption = cfgOption;
+    this.defaultValue = def;
+  }
+
+  /**
+   * @return the configuration option name to look up in Configuration objects
+   *         for this option
+   */
+  String getCfgOption() {
+    return cfgOption;
+  }
+
+  /**
+   * @return the default object for this option
+   */
+  T getDefault() {
+    return defaultValue;
+  }
+
+}
\ No newline at end of file

Propchange: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigOption.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain
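The value of carrying the CLI flag, the configuration key and a typed default on one object is that every consumer can resolve an option the same way: command line first, then the Hadoop configuration, then the compiled-in default. The sketch below illustrates that resolution order for MAPS; lookupMaps() is hypothetical (the real resolution lives in ConfigExtractor and ConfigMerger), and the sketch assumes it sits in the same package, since the ConfigOption constants are package-private.

    import org.apache.commons.cli.CommandLine;
    import org.apache.hadoop.conf.Configuration;

    class OptionLookupSketch {

      // hypothetical helper; shown only to illustrate the lookup order
      static int lookupMaps(CommandLine cli, Configuration cfg) {
        // 1) the command line flag (-maps N) wins if present
        String fromCli = cli.getOptionValue(ConfigOption.MAPS.getOpt());
        if (fromCli != null) {
          return Integer.parseInt(fromCli);
        }
        // 2) otherwise the configuration key ("slive.maps")
        String fromCfg = cfg.get(ConfigOption.MAPS.getCfgOption());
        if (fromCfg != null) {
          return Integer.parseInt(fromCfg);
        }
        // 3) finally the compiled-in default (10)
        return ConfigOption.MAPS.getDefault();
      }
    }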
Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/Constants.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/Constants.java?rev=946832&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/Constants.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/Constants.java Fri May 21 00:02:12 2010
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.slive;
+
+/**
+ * Constants used in various places in slive
+ */
+class Constants {
+
+  /**
+   * This class should be static members only - no construction allowed
+   */
+  private Constants() {
+  }
+
+  /**
+   * The distributions supported (or that may be supported)
+   */
+  enum Distribution {
+    BEG, END, UNIFORM, MID;
+    String lowerName() {
+      return this.name().toLowerCase();
+    }
+  }
+
+  /**
+   * Allowed operation types
+   */
+  enum OperationType {
+    READ, APPEND, RENAME, LS, MKDIR, DELETE, CREATE;
+    String lowerName() {
+      return this.name().toLowerCase();
+    }
+  }
+
+  // program info
+  static final String PROG_NAME = SliveTest.class.getSimpleName();
+  static final String PROG_VERSION = "0.0.1";
+
+  // useful constants
+  static final int MEGABYTES = 1048576;
+
+  // must be a multiple of
+  // BYTES_PER_LONG - used for reading and writing buffer sizes
+  static final int BUFFERSIZE = 64 * 1024;
+
+  // 8 bytes per long
+  static final int BYTES_PER_LONG = 8;
+
+  // used for finding the reducer file for a given number
+  static final String REDUCER_FILE = "part-%s";
+
+  // this is used to ensure the blocksize is a multiple of this config setting
+  static final String BYTES_PER_CHECKSUM = "io.bytes.per.checksum";
+
+  // min replication setting for verification
+  static final String MIN_REPLICATION = "dfs.namenode.replication.min";
+
+  // used for getting an option description given a set of distributions
+  // to substitute
+  static final String OP_DESCR = "pct,distribution where distribution is one of %s";
+
+  // keys for looking up a specific operation in the hadoop config
+  static final String OP_PERCENT = "slive.op.%s.pct";
+  static final String OP = "slive.op.%s";
+  static final String OP_DISTR = "slive.op.%s.dist";
+
+  // path constants
+  static final String BASE_DIR = "slive";
+  static final String DATA_DIR = "data";
+  static final String OUTPUT_DIR = "output";
+
+  // whether a flush should occur whenever data is written
+  static final boolean FLUSH_WRITES = false;
+
+}

Propchange: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/Constants.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain
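The OP_PERCENT, OP and OP_DISTR format strings expand into one configuration key per operation type, using the enum's lowercased name as the substitution. A small self-contained illustration (the enum is re-declared locally so the snippet compiles on its own):

    class OpKeySketch {

      // local copy of Constants.OperationType, just so this compiles alone
      enum OperationType {
        READ, APPEND, RENAME, LS, MKDIR, DELETE, CREATE
      }

      public static void main(String[] args) {
        for (OperationType op : OperationType.values()) {
          String lower = op.name().toLowerCase();
          // OP_PERCENT and OP_DISTR, e.g. slive.op.read.pct / slive.op.read.dist
          System.out.println(String.format("slive.op.%s.pct", lower));
          System.out.println(String.format("slive.op.%s.dist", lower));
        }
      }
    }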
Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/CreateOp.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/CreateOp.java?rev=946832&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/CreateOp.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/CreateOp.java Fri May 21 00:02:12 2010
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.slive;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.slive.DataWriter.GenerateOutput;
+import org.apache.hadoop.fs.slive.OperationOutput.OutputType;
+
+/**
+ * Operation which selects a random file, a random number of bytes to write
+ * to it (from the write size option), a random block size (from the block
+ * size option) and a random replication amount (from the replication
+ * option), and then attempts to create the file with those options.
+ *
+ * On success this operation captures statistics on bytes written, time taken
+ * (milliseconds) and success count; on failure it captures the number of
+ * failures and the time taken (milliseconds) to fail.
+ */
+class CreateOp extends Operation {
+
+  private static final Log LOG = LogFactory.getLog(CreateOp.class);
+
+  private static final int DEF_IO_BUFFER_SIZE = 4096;
+
+  private static final String IO_BUF_CONFIG = "io.file.buffer.size";
+
+  CreateOp(ConfigExtractor cfg, Random rnd) {
+    super(CreateOp.class.getSimpleName(), cfg, rnd);
+  }
+
+  /**
+   * Returns the block size to use, aligned to the nearest multiple of
+   * BYTES_PER_CHECKSUM if the configuration provides that value; misaligned
+   * block sizes cause warnings and the file creation itself to fail.
+   *
+   * @return long
+   */
+  private long determineBlockSize() {
+    Range<Long> blockSizeRange = getConfig().getBlockSize();
+    long blockSize = Range.betweenPositive(getRandom(), blockSizeRange);
+    Long byteChecksum = getConfig().getByteCheckSum();
+    if (byteChecksum == null) {
+      return blockSize;
+    }
+    // adjust to nearest multiple
+    long full = (blockSize / byteChecksum) * byteChecksum;
+    long toFull = blockSize - full;
+    if (toFull >= (byteChecksum / 2)) {
+      full += byteChecksum;
+    }
+    // adjust if over extended
+    if (full > blockSizeRange.getUpper()) {
+      full = blockSizeRange.getUpper();
+    }
+    if (full < blockSizeRange.getLower()) {
+      full = blockSizeRange.getLower();
+    }
+    return full;
+  }
+
+  /**
+   * Gets the replication amount
+   *
+   * @return short
+   */
+  private short determineReplication() {
+    Range<Short> replicationAmountRange = getConfig().getReplication();
+    Range<Long> repRange = new Range<Long>(replicationAmountRange.getLower()
+        .longValue(), replicationAmountRange.getUpper().longValue());
+    short replicationAmount = (short) Range.betweenPositive(getRandom(),
+        repRange);
+    return replicationAmount;
+  }
+
+  /**
+   * Gets the output buffering size to use
+   *
+   * @return int
+   */
+  private int getBufferSize() {
+    return getConfig().getConfig().getInt(IO_BUF_CONFIG, DEF_IO_BUFFER_SIZE);
+  }
+
+  /**
+   * Gets the file to create
+   *
+   * @return Path
+   */
+  protected Path getCreateFile() {
+    Path fn = getFinder().getFile();
+    return fn;
+  }
+
+  @Override // Operation
+  List<OperationOutput> run(FileSystem fs) {
+    List<OperationOutput> out = super.run(fs);
+    FSDataOutputStream os = null;
+    try {
+      Path fn = getCreateFile();
+      Range<Long> writeSizeRange = getConfig().getWriteSize();
+      long writeSize = 0;
+      long blockSize = determineBlockSize();
+      short replicationAmount = determineReplication();
+      if (getConfig().shouldWriteUseBlockSize()) {
+        writeSizeRange = getConfig().getBlockSize();
+      }
+      writeSize = Range.betweenPositive(getRandom(), writeSizeRange);
+      long bytesWritten = 0;
+      long timeTaken = 0;
+      int bufSize = getBufferSize();
+      boolean overWrite = false;
+      DataWriter writer = new DataWriter(getRandom());
+      LOG.info("Attempting to create file at " + fn + " of size "
+          + Helper.toByteInfo(writeSize) + " using blocksize "
+          + Helper.toByteInfo(blockSize) + " and replication amount "
+          + replicationAmount);
+      {
+        // open & create
+        long startTime = Timer.now();
+        os = fs.create(fn, overWrite, bufSize, replicationAmount, blockSize);
+        timeTaken += Timer.elapsed(startTime);
+        // write the given length
+        GenerateOutput stats = writer.writeSegment(writeSize, os);
+        bytesWritten += stats.getBytesWritten();
+        timeTaken += stats.getTimeTaken();
+        // capture close time
+        startTime = Timer.now();
+        os.close();
+        os = null;
+        timeTaken += Timer.elapsed(startTime);
+      }
+      LOG.info("Created file at " + fn + " of size "
+          + Helper.toByteInfo(bytesWritten) + " using blocksize "
+          + Helper.toByteInfo(blockSize) + " and replication amount "
+          + replicationAmount + " in " + timeTaken + " milliseconds");
+      // collect all the stats
+      out.add(new OperationOutput(OutputType.LONG, getType(),
+          ReportWriter.OK_TIME_TAKEN, timeTaken));
+      out.add(new OperationOutput(OutputType.LONG, getType(),
+          ReportWriter.BYTES_WRITTEN, bytesWritten));
+      out.add(new OperationOutput(OutputType.LONG, getType(),
+          ReportWriter.SUCCESSES, 1L));
+    } catch (IOException e) {
+      out.add(new OperationOutput(OutputType.LONG, getType(),
+          ReportWriter.FAILURES, 1L));
+      LOG.warn("Error with creating", e);
+    } finally {
+      if (os != null) {
+        try {
+          os.close();
+        } catch (IOException e) {
+          LOG.warn("Error closing create stream", e);
+        }
+      }
+    }
+    return out;
+  }
+}

Propchange: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/CreateOp.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain
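determineBlockSize() above rounds a randomly chosen size to the nearest multiple of the configured bytes-per-checksum value and then clamps the result back into the block size range. A standalone sketch of just that arithmetic, with two worked cases:

    class BlockAlignSketch {

      static long align(long blockSize, long bytesPerChecksum, long lower,
          long upper) {
        // round down to a multiple, then up if past the halfway point
        long full = (blockSize / bytesPerChecksum) * bytesPerChecksum;
        if (blockSize - full >= bytesPerChecksum / 2) {
          full += bytesPerChecksum;
        }
        // clamp back into the configured [lower, upper] range
        if (full > upper) {
          full = upper;
        }
        if (full < lower) {
          full = lower;
        }
        return full;
      }

      public static void main(String[] args) {
        // 1000 is 488 past 512, beyond the halfway point of 256: rounds up
        System.out.println(align(1000, 512, 512, 4096)); // prints 1024
        // 600 is only 88 past 512: rounds down
        System.out.println(align(600, 512, 512, 4096)); // prints 512
      }
    }

Note that the clamp could in principle land on a non-multiple if a range bound were itself not a multiple; ConfigMerger guards against that earlier by rejecting block size bounds that are not multiples of BYTES_PER_CHECKSUM.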
Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/DataHasher.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/DataHasher.java?rev=946832&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/DataHasher.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/DataHasher.java Fri May 21 00:02:12 2010
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.slive;
+
+import java.util.Random;
+
+/**
+ * Class which creates the data to write for a given path and offset into
+ * that file, and which later recreates the value expected to be read back at
+ * that byte offset for verification
+ */
+class DataHasher {
+
+  private Random rnd;
+
+  DataHasher(long mixIn) {
+    this.rnd = new Random(mixIn);
+  }
+
+  /**
+   * @param offSet
+   *          the byte offset into the file
+   *
+   * @return the data to be expected at that offset
+   */
+  long generate(long offSet) {
+    return ((offSet * 47) ^ (rnd.nextLong() * 97)) * 37;
+  }
+
+}

Propchange: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/DataHasher.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain
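Because DataHasher seeds its Random from a fixed mixIn value, a writer and a later verifier that make the same sequence of generate() calls observe identical values, which is what allows read-time verification without storing a copy of the written data. A small sketch demonstrating that reproducibility (the mixing arithmetic is copied from generate() above; the 42L seed is arbitrary):

    import java.util.Random;

    class HasherSketch {

      // same mixing function as DataHasher.generate()
      static long generate(Random rnd, long offset) {
        return ((offset * 47) ^ (rnd.nextLong() * 97)) * 37;
      }

      public static void main(String[] args) {
        Random writer = new Random(42L);
        Random verifier = new Random(42L);
        for (long off = 0; off < 3 * 8; off += 8) { // three longs, 8 bytes apart
          long written = generate(writer, off);
          long expected = generate(verifier, off);
          System.out.println(off + ": match=" + (written == expected)); // true
        }
      }
    }

Since rnd.nextLong() advances the generator's state on every call, the value at an offset also depends on call order, so a verifier presumably has to walk offsets in the same order they were generated.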