hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From s..@apache.org
Subject svn commit: r946832 [3/3] - in /hadoop/mapreduce/trunk: ./ src/test/mapred/org/apache/hadoop/fs/slive/
Date Fri, 21 May 2010 00:02:13 GMT
Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ReportWriter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ReportWriter.java?rev=946832&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ReportWriter.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ReportWriter.java Fri May 21 00:02:12 2010
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.slive;
+
+import java.io.PrintWriter;
+import java.text.NumberFormat;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Class which provides a report for the given operation output
+ */
+class ReportWriter {
+
+  // simple measurement types
+  // expect long values
+  // these will be reported on + rates by this reporter
+  static final String OK_TIME_TAKEN = "milliseconds_taken";
+  static final String FAILURES = "failures";
+  static final String SUCCESSES = "successes";
+  static final String BYTES_WRITTEN = "bytes_written";
+  static final String FILES_CREATED = "files_created";
+  static final String DIR_ENTRIES = "dir_entries";
+  static final String OP_COUNT = "op_count";
+  static final String CHUNKS_VERIFIED = "chunks_verified";
+  static final String CHUNKS_UNVERIFIED = "chunks_unverified";
+  static final String BYTES_READ = "bytes_read";
+  static final String NOT_FOUND = "files_not_found";
+  static final String BAD_FILES = "bad_files";
+
+  private static final Log LOG = LogFactory.getLog(ReportWriter.class);
+
+  private static final String SECTION_DELIM = "-------------";
+
+  /**
+   * @return String to be used for as a section delimiter
+   */
+  private String getSectionDelimiter() {
+    return SECTION_DELIM;
+  }
+
+  /**
+   * Writes a message the the logging library and the given print writer (if it
+   * is not null)
+   * 
+   * @param msg
+   *          the message to write
+   * @param os
+   *          the print writer if specified to also write to
+   */
+  private void writeMessage(String msg, PrintWriter os) {
+    LOG.info(msg);
+    if (os != null) {
+      os.println(msg);
+    }
+  }
+
+  /**
+   * Provides a simple report showing only the input size, and for each
+   * operation the operation type, measurement type and its values.
+   * 
+   * @param input
+   *          the list of operations to report on
+   * @param os
+   *          any print writer for which output should be written to (along with
+   *          the logging library)
+   */
+  void basicReport(List<OperationOutput> input, PrintWriter os) {
+    writeMessage("Default report for " + input.size() + " operations ", os);
+    writeMessage(getSectionDelimiter(), os);
+    for (OperationOutput data : input) {
+      writeMessage("Operation \"" + data.getOperationType() + "\" measuring \""
+          + data.getMeasurementType() + "\" = " + data.getValue(), os);
+    }
+    writeMessage(getSectionDelimiter(), os);
+  }
+
+  /**
+   * Provides a more detailed report for a given operation. This will output the
+   * keys and values for all input and then sort based on measurement type and
+   * attempt to show rates for various metrics which have expected types to be
+   * able to measure there rate. Currently this will show rates for bytes
+   * written, success count, files created, directory entries, op count and
+   * bytes read if the variable for time taken is available for each measurement
+   * type.
+   * 
+   * @param operation
+   *          the operation that is being reported on.
+   * @param input
+   *          the set of data for that that operation.
+   * @param os
+   *          any print writer for which output should be written to (along with
+   *          the logging library)
+   */
+  void opReport(String operation, List<OperationOutput> input,
+      PrintWriter os) {
+    writeMessage("Basic report for operation type " + operation, os);
+    writeMessage(getSectionDelimiter(), os);
+    for (OperationOutput data : input) {
+      writeMessage("Measurement \"" + data.getMeasurementType() + "\" = "
+          + data.getValue(), os);
+    }
+    // split up into measurement types for rates...
+    Map<String, OperationOutput> combined = new TreeMap<String, OperationOutput>();
+    for (OperationOutput data : input) {
+      if (combined.containsKey(data.getMeasurementType())) {
+        OperationOutput curr = combined.get(data.getMeasurementType());
+        combined.put(data.getMeasurementType(), OperationOutput.merge(curr,
+            data));
+      } else {
+        combined.put(data.getMeasurementType(), data);
+      }
+    }
+    // handle the known types
+    OperationOutput timeTaken = combined.get(OK_TIME_TAKEN);
+    if (timeTaken != null) {
+      Long mTaken = Long.parseLong(timeTaken.getValue().toString());
+      if (mTaken > 0) {
+        NumberFormat formatter = Formatter.getDecimalFormatter();
+        for (String measurementType : combined.keySet()) {
+          Double rate = null;
+          String rateType = "";
+          if (measurementType.equals(BYTES_WRITTEN)) {
+            Long mbWritten = Long.parseLong(combined.get(measurementType)
+                .getValue().toString())
+                / (Constants.MEGABYTES);
+            rate = (double) mbWritten / (double) (mTaken / 1000.0d);
+            rateType = "MB/sec";
+          } else if (measurementType.equals(SUCCESSES)) {
+            Long succ = Long.parseLong(combined.get(measurementType).getValue()
+                .toString());
+            rate = (double) succ / (double) (mTaken / 1000.0d);
+            rateType = "successes/sec";
+          } else if (measurementType.equals(FILES_CREATED)) {
+            Long filesCreated = Long.parseLong(combined.get(measurementType)
+                .getValue().toString());
+            rate = (double) filesCreated / (double) (mTaken / 1000.0d);
+            rateType = "files created/sec";
+          } else if (measurementType.equals(DIR_ENTRIES)) {
+            Long entries = Long.parseLong(combined.get(measurementType)
+                .getValue().toString());
+            rate = (double) entries / (double) (mTaken / 1000.0d);
+            rateType = "directory entries/sec";
+          } else if (measurementType.equals(OP_COUNT)) {
+            Long opCount = Long.parseLong(combined.get(measurementType)
+                .getValue().toString());
+            rate = (double) opCount / (double) (mTaken / 1000.0d);
+            rateType = "operations/sec";
+          } else if (measurementType.equals(BYTES_READ)) {
+            Long mbRead = Long.parseLong(combined.get(measurementType)
+                .getValue().toString())
+                / (Constants.MEGABYTES);
+            rate = (double) mbRead / (double) (mTaken / 1000.0d);
+            rateType = "MB/sec";
+          }
+          if (rate != null) {
+            writeMessage("Rate for measurement \"" + measurementType + "\" = "
+                + formatter.format(rate) + " " + rateType, os);
+          }
+        }
+      }
+    }
+    writeMessage(getSectionDelimiter(), os);
+  }
+
+}

Propchange: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ReportWriter.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/RouletteSelector.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/RouletteSelector.java?rev=946832&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/RouletteSelector.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/RouletteSelector.java Fri May 21 00:02:12 2010
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.slive;
+
+import java.util.List;
+import java.util.Random;
+
+/**
+ * A selection object which simulates a roulette wheel whereby all operations
+ * have a weight and the total value of the wheel is the combined weight. During
+ * selection a random number in [0, total weight) is drawn and the operation
+ * whose slice of the wheel contains that value is selected. So for a set of
+ * operations with uniform weight they will all have the same probability of
+ * being selected. Operations which choose to have higher weights will have
+ * higher likelihood of being selected (and the same goes for lower weights).
+ * Operations with a weight of zero are never selected unless every weight is
+ * zero.
+ */
+class RouletteSelector {
+
+  private Random picker;
+
+  RouletteSelector(Random rnd) {
+    picker = rnd;
+  }
+
+  /**
+   * Selects an operation with probability proportional to its weight.
+   * 
+   * @param ops
+   *          the weighted operations to choose from
+   * @return the selected operation, or null if ops is empty; if every weight
+   *         is zero the first operation is returned
+   * @throws IllegalArgumentException
+   *           if any weight is negative (or NaN)
+   */
+  Operation select(List<OperationWeight> ops) {
+    if (ops.isEmpty()) {
+      return null;
+    }
+    double totalWeight = 0;
+    // fallback if round-off leaves the draw past the final slice
+    int lastPositive = 0;
+    for (int i = 0; i < ops.size(); ++i) {
+      double weight = ops.get(i).getWeight();
+      // negated comparison so that NaN weights are rejected as well
+      if (!(weight >= 0)) {
+        throw new IllegalArgumentException("Negative weights not allowed");
+      }
+      if (weight > 0) {
+        lastPositive = i;
+      }
+      totalWeight += weight;
+    }
+    // roulette wheel selection: spin lands somewhere in [0, totalWeight)
+    double spin = picker.nextDouble() * totalWeight;
+    double cumulative = 0;
+    for (int i = 0; i < ops.size(); ++i) {
+      double weight = ops.get(i).getWeight();
+      cumulative += weight;
+      // strict comparison: a zero-width slice can never contain the spin
+      if (weight > 0 && spin < cumulative) {
+        return ops.get(i).getOperation();
+      }
+    }
+    // reached only through floating point round-off or when all weights are 0
+    return ops.get(lastPositive).getOperation();
+  }
+
+}

Propchange: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/RouletteSelector.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/SleepOp.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/SleepOp.java?rev=946832&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/SleepOp.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/SleepOp.java Fri May 21 00:02:12 2010
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.slive;
+
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.slive.OperationOutput.OutputType;
+
+/**
+ * Operation which sleeps for a given number of milliseconds according to the
+ * config given, and reports on the sleep time overall
+ */
+class SleepOp extends Operation {
+
+  private static final Log LOG = LogFactory.getLog(SleepOp.class);
+
+  SleepOp(ConfigExtractor cfg, Random rnd) {
+    super(SleepOp.class.getSimpleName(), cfg, rnd);
+  }
+
+  /**
+   * Chooses how long to sleep, using this operation's random number generator
+   * and Range.betweenPositive on the given range.
+   * 
+   * @param sleepTime
+   *          the range of milliseconds to choose from
+   * 
+   * @return the number of milliseconds to sleep
+   */
+  protected long getSleepTime(Range<Long> sleepTime) {
+    long sleepMs = Range.betweenPositive(getRandom(), sleepTime);
+    return sleepMs;
+  }
+
+  /**
+   * Sleep for a random amount of time between a given positive range
+   * 
+   * @param sleepTime
+   *          positive long range for times to choose; if null no sleep occurs
+   * 
+   * @return output data on operation: the elapsed time and a success on a
+   *         completed sleep, or a failure if the sleep was interrupted
+   */
+  List<OperationOutput> run(Range<Long> sleepTime) {
+    List<OperationOutput> out = super.run(null);
+    try {
+      if (sleepTime != null) {
+        long sleepMs = getSleepTime(sleepTime);
+        long startTime = Timer.now();
+        sleep(sleepMs);
+        long elapsedTime = Timer.elapsed(startTime);
+        out.add(new OperationOutput(OutputType.LONG, getType(),
+            ReportWriter.OK_TIME_TAKEN, elapsedTime));
+        out.add(new OperationOutput(OutputType.LONG, getType(),
+            ReportWriter.SUCCESSES, 1L));
+      }
+    } catch (InterruptedException e) {
+      out.add(new OperationOutput(OutputType.LONG, getType(),
+          ReportWriter.FAILURES, 1L));
+      LOG.warn("Error with sleeping", e);
+      // restore the interrupt status (cleared by the throw) so that code
+      // further up the stack can still observe the interruption
+      Thread.currentThread().interrupt();
+    }
+    return out;
+  }
+
+  @Override // Operation
+  List<OperationOutput> run(FileSystem fs) {
+    Range<Long> sleepTime = getConfig().getSleepRange();
+    return run(sleepTime);
+  }
+
+  /**
+   * Sleeps the current thread for X milliseconds; non-positive values return
+   * immediately without sleeping
+   * 
+   * @param ms
+   *          milliseconds to sleep for
+   * 
+   * @throws InterruptedException
+   *           if the thread is interrupted while sleeping
+   */
+  private void sleep(long ms) throws InterruptedException {
+    if (ms <= 0) {
+      return;
+    }
+    Thread.sleep(ms);
+  }
+}

Propchange: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/SleepOp.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/SliveMapper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/SliveMapper.java?rev=946832&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/SliveMapper.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/SliveMapper.java Fri May 21 00:02:12 2010
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.slive;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.slive.OperationOutput.OutputType;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * The slive class which sets up the mapper to be used which itself will receive
+ * a single dummy key and value and then in a loop run the various operations
+ * that have been selected and upon operation completion output the collected
+ * output from that operation (and repeat until finished).
+ */
+@SuppressWarnings("deprecation")
+public class SliveMapper extends MapReduceBase implements
+    Mapper<Object, Object, Text, Text> {
+
+  private static final Log LOG = LogFactory.getLog(SliveMapper.class);
+
+  private static final String OP_TYPE = SliveMapper.class.getSimpleName();
+
+  private FileSystem filesystem;
+  private ConfigExtractor config;
+  private WeightSelector selector;
+  private Random rnd;
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see
+   * org.apache.hadoop.mapred.MapReduceBase#configure(org.apache.hadoop.mapred
+   * .JobConf)
+   */
+  @Override // MapReduceBase
+  public void configure(JobConf conf) {
+    try {
+      filesystem = FileSystem.get(conf);
+    } catch (Exception e) {
+      throw new RuntimeException(
+          "Unable to get the filesystem from provided configuration", e);
+    }
+    try {
+      config = new ConfigExtractor(conf);
+      Long rndSeed = config.getRandomSeed();
+      if (rndSeed != null) {
+        rnd = new Random(rndSeed);
+      } else {
+        rnd = new Random();
+      }
+      selector = new WeightSelector(config, rnd);
+      ConfigExtractor.dumpOptions(config);
+    } catch (Exception e) {
+      LOG.error("Unable to setup slive " + StringUtils.stringifyException(e));
+      throw new RuntimeException("Unable to setup slive configuration", e);
+    }
+
+  }
+
+  /**
+   * Fetches the config this object uses
+   * 
+   * @return ConfigExtractor
+   */
+  private ConfigExtractor getConfig() {
+    return config;
+  }
+
+  /**
+   * Gets the operation selector to use for this object
+   * 
+   * @return WeightSelector
+   */
+  private WeightSelector getSelector() {
+    return selector;
+  }
+
+  /**
+   * Logs to the given reporter and logs to the internal logger at info level
+   * 
+   * @param r
+   *          the reporter to set status on
+   * @param msg
+   *          the message to log
+   */
+  private void logAndSetStatus(Reporter r, String msg) {
+    r.setStatus(msg);
+    LOG.info(msg);
+  }
+
+  /**
+   * Runs the given operation and reports on its results
+   * 
+   * @param op
+   *          the operation to run
+   * @param reporter
+   *          the status reporter to notify
+   * @param output
+   *          the output to write to
+   * @throws IOException
+   */
+  private void runOperation(Operation op, Reporter reporter,
+      OutputCollector<Text, Text> output, long opNum) throws IOException {
+    if (op == null) {
+      return;
+    }
+    logAndSetStatus(reporter, "Running operation #" + opNum + " (" + op + ")");
+    List<OperationOutput> opOut = op.run(filesystem);
+    logAndSetStatus(reporter, "Finished operation #" + opNum + " (" + op + ")");
+    if (opOut != null && !opOut.isEmpty()) {
+      for (OperationOutput outData : opOut) {
+        output.collect(outData.getKey(), outData.getOutputValue());
+      }
+    }
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.apache.hadoop.mapred.Mapper#map(java.lang.Object,
+   * java.lang.Object, org.apache.hadoop.mapred.OutputCollector,
+   * org.apache.hadoop.mapred.Reporter)
+   */
+  @Override // Mapper
+  public void map(Object key, Object value, OutputCollector<Text, Text> output,
+      Reporter reporter) throws IOException {
+    logAndSetStatus(reporter, "Running slive mapper for dummy key " + key
+        + " and dummy value " + value);
+    long startTime = Timer.now();
+    long opAm = 0;
+    long sleepOps = 0;
+    int duration = getConfig().getDurationMilliseconds();
+    Range<Long> sleepRange = getConfig().getSleepRange();
+    Operation sleeper = null;
+    if (sleepRange != null) {
+      sleeper = new SleepOp(getConfig(), rnd);
+    }
+    WeightSelector selector = getSelector();
+    while (Timer.elapsed(startTime) < duration) {
+      try {
+        logAndSetStatus(reporter, "Attempting to select operation #"
+            + (opAm + 1));
+        int currElapsed = (int) (Timer.elapsed(startTime));
+        Operation op = selector.select(currElapsed, duration);
+        if (op == null) {
+          // no ops left
+          break;
+        } else {
+          // got a good op
+          ++opAm;
+          runOperation(op, reporter, output, opAm);
+        }
+        // do a sleep??
+        if (sleeper != null) {
+          // these don't count against the number of operations
+          ++sleepOps;
+          runOperation(sleeper, reporter, output, sleepOps);
+        }
+      } catch (Exception e) {
+        logAndSetStatus(reporter, "Failed at running due to "
+            + StringUtils.stringifyException(e));
+        if (getConfig().shouldExitOnFirstError()) {
+          break;
+        }
+      }
+    }
+    // write out any accumulated mapper stats
+    {
+      long timeTaken = Timer.elapsed(startTime);
+      OperationOutput opCount = new OperationOutput(OutputType.LONG, OP_TYPE,
+          ReportWriter.OP_COUNT, opAm);
+      output.collect(opCount.getKey(), opCount.getOutputValue());
+      OperationOutput overallTime = new OperationOutput(OutputType.LONG,
+          OP_TYPE, ReportWriter.OK_TIME_TAKEN, timeTaken);
+      output.collect(overallTime.getKey(), overallTime.getOutputValue());
+      logAndSetStatus(reporter, "Finished " + opAm + " operations in "
+          + timeTaken + " milliseconds");
+    }
+  }
+}

Propchange: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/SliveMapper.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/SliveReducer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/SliveReducer.java?rev=946832&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/SliveReducer.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/SliveReducer.java Fri May 21 00:02:12 2010
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.slive;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Reducer for the slive test. For each key it merges all of the given values
+ * into a single output value and emits it. Values that fail to parse or merge
+ * are counted as errors and skipped, or stop the reduction when the
+ * configuration asks to exit on the first error.
+ */
+@SuppressWarnings("deprecation")
+public class SliveReducer extends MapReduceBase implements
+    Reducer<Text, Text, Text, Text> {
+
+  private static final Log LOG = LogFactory.getLog(SliveReducer.class);
+
+  private ConfigExtractor config;
+
+  /**
+   * Builds the slive configuration from the job configuration and dumps the
+   * extracted options.
+   * 
+   * @param conf
+   *          the job configuration
+   */
+  @Override // MapReduceBase
+  public void configure(JobConf conf) {
+    config = new ConfigExtractor(conf);
+    ConfigExtractor.dumpOptions(config);
+  }
+
+  /**
+   * Sets the given message as the reporter status and logs it at info level.
+   * 
+   * @param r
+   *          the reporter to set status on
+   * @param msg
+   *          the message to report and log
+   */
+  private void logAndSetStatus(Reporter r, String msg) {
+    r.setStatus(msg);
+    LOG.info(msg);
+  }
+
+  /**
+   * Merges every value for the key into one operation output and writes it
+   * out; nothing is written if no value could be merged.
+   * 
+   * @param key
+   *          the operation output key
+   * @param values
+   *          the serialized operation output values for the key
+   * @param output
+   *          collector receiving the merged result
+   * @param reporter
+   *          status reporter
+   * @throws IOException
+   *           if writing the merged result fails
+   */
+  @Override // Reducer
+  public void reduce(Text key, Iterator<Text> values,
+      OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
+    logAndSetStatus(reporter, "Iterating over reduction values for key " + key);
+    OperationOutput merged = null;
+    int merges = 0;
+    int failures = 0;
+    while (values.hasNext()) {
+      Text value = values.next();
+      try {
+        OperationOutput current = new OperationOutput(key, value);
+        merged = (merged == null) ? current
+            : OperationOutput.merge(merged, current);
+        LOG.info("Combined " + current + " into/with " + merged);
+        merges++;
+      } catch (Exception e) {
+        failures++;
+        logAndSetStatus(reporter, "Error iterating over reduction input "
+            + value + " due to : " + StringUtils.stringifyException(e));
+        if (config.shouldExitOnFirstError()) {
+          break;
+        }
+      }
+    }
+    logAndSetStatus(reporter, "Reduced " + merges + " values with " + failures
+        + " errors");
+    if (merged == null) {
+      return;
+    }
+    logAndSetStatus(reporter, "Writing output " + merged.getKey() + " : "
+        + merged.getOutputValue());
+    output.collect(merged.getKey(), merged.getOutputValue());
+  }
+
+}

Propchange: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/SliveReducer.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/SliveTest.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/SliveTest.java?rev=946832&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/SliveTest.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/SliveTest.java Fri May 21 00:02:12 2010
@@ -0,0 +1,329 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.slive;
+
+import java.io.BufferedReader;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.slive.ArgumentParser.ParsedOutput;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TextOutputFormat;
+
+/**
+ * Slive test entry point + main program
+ * 
+ * This program will output a help message given -help which can be used to
+ * determine the program options and configuration which will affect the program
+ * runtime. The program will take these options, either from configuration or
+ * command line and process them (and merge) and then establish a job which will
+ * thereafter run a set of mappers & reducers and then the output of the
+ * reduction will be reported on.
+ */
+@SuppressWarnings("deprecation")
+public class SliveTest {
+
+  private static final Log LOG = LogFactory.getLog(SliveTest.class);
+
+  // ensures the hdfs configurations are loaded if they exist
+  static {
+    Configuration.addDefaultResource("hdfs-default.xml");
+    Configuration.addDefaultResource("hdfs-site.xml");
+  }
+
+  private String[] args;
+  private Configuration base;
+
+  public SliveTest(Configuration base) {
+    this.args = null;
+    this.base = base;
+  }
+
+  public SliveTest(String[] args, Configuration base) {
+    this.args = args;
+    this.base = base;
+  }
+
+  public int run() {
+    ParsedOutput parsedOpts = null;
+    try {
+      ArgumentParser argHolder = new ArgumentParser(args);
+      parsedOpts = argHolder.parse();
+      if (parsedOpts.shouldOutputHelp()) {
+        parsedOpts.outputHelp();
+        return 1;
+      }
+    } catch (Exception e) {
+      LOG.error("Unable to parse arguments due to error: " + e.getMessage());
+      e.printStackTrace();
+      return 1;
+    }
+    LOG.info("Running with option list " + Helper.stringifyArray(args, " "));
+    ConfigExtractor config = null;
+    try {
+      ConfigMerger cfgMerger = new ConfigMerger();
+      Configuration cfg = cfgMerger.getMerged(parsedOpts, getBaseConfig());
+      if (cfg != null) {
+        config = new ConfigExtractor(cfg);
+      }
+    } catch (Exception e) {
+      LOG.error("Unable to merge config due to error: " + e.getMessage());
+      e.printStackTrace();
+      return 1;
+    }
+    if (config == null) {
+      LOG.error("Unable to merge config & options!");
+      return 1;
+    }
+    try {
+      LOG.info("Options are:");
+      ConfigExtractor.dumpOptions(config);
+    } catch (Exception e) {
+      LOG.error("Unable to dump options due to error: " + e.getMessage());
+      e.printStackTrace();
+      return 1;
+    }
+    boolean jobOk = false;
+    try {
+      LOG.info("Running job:");
+      runJob(config);
+      jobOk = true;
+    } catch (Exception e) {
+      LOG.error("Unable to run job due to error: " + e.getMessage());
+      e.printStackTrace();
+    }
+    if (jobOk) {
+      try {
+        LOG.info("Reporting on job:");
+        writeReport(config);
+      } catch (Exception e) {
+        LOG.error("Unable to report on job due to error: " + e.getMessage());
+        e.printStackTrace();
+      }
+    }
+    // attempt cleanup (not critical)
+    boolean cleanUp = getBool(parsedOpts
+        .getValue(ConfigOption.CLEANUP.getOpt()));
+    if (cleanUp) {
+      try {
+        LOG.info("Cleaning up job:");
+        cleanup(config);
+      } catch (Exception e) {
+        LOG.error("Unable to cleanup job due to error: " + e.getMessage());
+        e.printStackTrace();
+      }
+    }
+    // all mostly worked
+    if (jobOk) {
+      return 0;
+    }
+    // maybe didn't work
+    return 1;
+  }
+
+  /**
+   * Checks if a string is a boolean or not and what type
+   * 
+   * @param val
+   *          val to check
+   * @return boolean
+   */
+  private boolean getBool(String val) {
+    if (val == null) {
+      return false;
+    }
+    String cleanupOpt = val.toLowerCase().trim();
+    if (cleanupOpt.equals("true") || cleanupOpt.equals("1")) {
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  /**
+   * Sets up a job conf for the given job using the given config object. Ensures
+   * that the correct input format is set, the mapper and and reducer class and
+   * the input and output keys and value classes along with any other job
+   * configuration.
+   * 
+   * @param config
+   * @return JobConf representing the job to be ran
+   * @throws IOException
+   */
+  private JobConf getJob(ConfigExtractor config) throws IOException {
+    JobConf job = new JobConf(config.getConfig(), SliveTest.class);
+    job.setInputFormat(DummyInputFormat.class);
+    FileOutputFormat.setOutputPath(job, config.getOutputPath());
+    job.setMapperClass(SliveMapper.class);
+    job.setReducerClass(SliveReducer.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Text.class);
+    job.setOutputFormat(TextOutputFormat.class);
+    TextOutputFormat.setCompressOutput(job, false);
+    job.setNumReduceTasks(config.getReducerAmount());
+    job.setNumMapTasks(config.getMapAmount());
+    return job;
+  }
+
+  /**
+   * Runs the job given the provided config
+   * 
+   * @param config
+   *          the config to run the job with
+   * 
+   * @throws IOException
+   *           if can not run the given job
+   */
+  private void runJob(ConfigExtractor config) throws IOException {
+    JobClient.runJob(getJob(config));
+  }
+
+  /**
+   * Attempts to write the report to the given output using the specified
+   * config. It will open up the expected reducer output file and read in its
+   * contents and then split up by operation output and sort by operation type
+   * and then for each operation type it will generate a report to the specified
+   * result file and the console.
+   * 
+   * @param cfg
+   *          the config specifying the files and output
+   * 
+   * @throws Exception
+   *           if files can not be opened/closed/read or invalid format
+   */
+  private void writeReport(ConfigExtractor cfg) throws Exception {
+    Path fn = new Path(cfg.getOutputPath(), String.format(
+        Constants.REDUCER_FILE, "00000"));
+    LOG.info("Writing report using contents of " + fn);
+    FileSystem fs = FileSystem.get(cfg.getConfig());
+    BufferedReader fileReader = null;
+    PrintWriter reportWriter = null;
+    try {
+      fileReader = new BufferedReader(new InputStreamReader(
+          new DataInputStream(fs.open(fn))));
+      String line;
+      Map<String, List<OperationOutput>> splitTypes = new TreeMap<String, List<OperationOutput>>();
+      List<OperationOutput> noOperations = new ArrayList<OperationOutput>();
+      while ((line = fileReader.readLine()) != null) {
+        String pieces[] = line.split("\t", 2);
+        if (pieces.length == 2) {
+          OperationOutput data = new OperationOutput(pieces[0], pieces[1]);
+          String op = (data.getOperationType());
+          if (op != null) {
+            List<OperationOutput> opList = splitTypes.get(op);
+            if (opList == null) {
+              opList = new ArrayList<OperationOutput>();
+            }
+            opList.add(data);
+            splitTypes.put(op, opList);
+          } else {
+            noOperations.add(data);
+          }
+        } else {
+          throw new IOException("Unparseable line " + line);
+        }
+      }
+      File resFile = null;
+      if (cfg.getResultFile() != null) {
+        resFile = new File(cfg.getResultFile());
+      }
+      if (resFile != null) {
+        LOG.info("Report results being placed to logging output and to file "
+            + resFile.getCanonicalPath());
+        reportWriter = new PrintWriter(new FileOutputStream(resFile));
+      } else {
+        LOG.info("Report results being placed to logging output");
+      }
+      ReportWriter reporter = new ReportWriter();
+      if (!noOperations.isEmpty()) {
+        reporter.basicReport(noOperations, reportWriter);
+      }
+      for (String opType : splitTypes.keySet()) {
+        reporter.opReport(opType, splitTypes.get(opType), reportWriter);
+      }
+    } finally {
+      if (fileReader != null) {
+        fileReader.close();
+      }
+      if (reportWriter != null) {
+        reportWriter.close();
+      }
+    }
+  }
+
+  /**
+   * Gets the base configuration to use for a "starting" configuration to be
+   * merged with.
+   * 
+   * @return Configuration starting configuration.
+   */
+  private Configuration getBaseConfig() {
+    // ensure a copy is made
+    return new Configuration(base);
+  }
+
+  /**
+   * Cleans up the base directory by removing it
+   * 
+   * @param cfg
+   *          ConfigExtractor which has location of base directory
+   * 
+   * @throws IOException
+   */
+  private void cleanup(ConfigExtractor cfg) throws IOException {
+    FileSystem fs = FileSystem.get(cfg.getConfig());
+    Path base = cfg.getBaseDirectory();
+    if (base != null) {
+      LOG.info("Attempting to recursively delete " + base);
+      fs.delete(base, true);
+    }
+  }
+
+  /**
+   * The main program entry point. Sets up and parses the command line options,
+   * then merges those options and then dumps those options and the runs the
+   * corresponding map/reduce job that those operations represent and then
+   * writes the report for the output of the run that occurred.
+   * 
+   * @param args
+   *          command line options
+   */
+  public static void main(String[] args) {
+    Configuration startCfg = new Configuration(true);
+    SliveTest runner = new SliveTest(args, startCfg);
+    int ec = runner.run();
+    System.exit(ec);
+  }
+}

Propchange: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/SliveTest.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/TestSlive.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/TestSlive.java?rev=946832&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/TestSlive.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/TestSlive.java Fri May 21 00:02:12 2010
@@ -0,0 +1,530 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.slive;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.slive.ArgumentParser.ParsedOutput;
+import org.apache.hadoop.fs.slive.Constants.OperationType;
+import org.apache.hadoop.fs.slive.DataVerifier.VerifyOutput;
+import org.apache.hadoop.fs.slive.DataWriter.GenerateOutput;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Junit 4 test for slive
+ */
+public class TestSlive {
+
+  private static final Log LOG = LogFactory.getLog(TestSlive.class);
+
+  private static final Random rnd = new Random(1L);
+
+  private static final String TEST_DATA_PROP = "test.build.data";
+
+  private static Configuration getBaseConfig() {
+    return new Configuration();
+  }
+
+  // gets the test write location according to the coding guidelines
+  private static File getWriteLoc() {
+    String writeLoc = System.getProperty(TEST_DATA_PROP);
+    if (writeLoc == null || writeLoc.isEmpty()) {
+      throw new RuntimeException("No " + TEST_DATA_PROP
+          + " system property specified");
+    }
+    return new File(writeLoc, "slive");
+  }
+
+  // gets where the MR job places its data + output + results
+  private static File getFlowLocation() {
+    return new File(getWriteLoc(), "flow");
+  }
+
+  // gets the test directory which is created
+  // by the mkdir op
+  private static File getTestDir() {
+    return new File(getWriteLoc(), "slivedir");
+  }
+
+  // gets the test file location
+  // which is used for reading, appending and created
+  private static File getTestFile() {
+    return new File(getWriteLoc(), "slivefile");
+  }
+
+  // gets the rename file which is used in combination
+  // with the test file to do a rename operation
+  private static File getTestRenameFile() {
+    return new File(getWriteLoc(), "slivefile1");
+  }
+
+  // gets the MR result file name
+  private static File getResultFile() {
+    return new File(getWriteLoc(), "sliveresfile");
+  }
+
+  // gets a file name that is never created (used to provoke failures)
+  private static File getImaginaryFile() {
+    return new File(getWriteLoc(), "slivenofile");
+  }
+
+  // gets the test program arguments
+  // used for merging and main MR running
+  private String[] getTestArgs(boolean sleep) {
+    List<String> args = new LinkedList<String>();
+    // setup the options
+    {
+      args.add("-" + ConfigOption.WRITE_SIZE.getOpt());
+      args.add("1M,2M");
+      args.add("-" + ConfigOption.OPS.getOpt());
+      args.add(Constants.OperationType.values().length + "");
+      args.add("-" + ConfigOption.MAPS.getOpt());
+      args.add("1");
+      args.add("-" + ConfigOption.APPEND_SIZE.getOpt());
+      args.add("1M,2M");
+      args.add("-" + ConfigOption.BLOCK_SIZE.getOpt());
+      args.add("1M,2M");
+      args.add("-" + ConfigOption.REPLICATION_AM.getOpt());
+      args.add("1,1");
+      if (sleep) {
+        args.add("-" + ConfigOption.SLEEP_TIME.getOpt());
+        args.add("10,10");
+      }
+      args.add("-" + ConfigOption.RESULT_FILE.getOpt());
+      args.add(getResultFile().toString());
+      args.add("-" + ConfigOption.BASE_DIR.getOpt());
+      args.add(getFlowLocation().toString());
+      args.add("-" + ConfigOption.DURATION.getOpt());
+      args.add("10");
+      args.add("-" + ConfigOption.DIR_SIZE.getOpt());
+      args.add("10");
+      args.add("-" + ConfigOption.FILES.getOpt());
+      args.add("10");
+    }
+    return args.toArray(new String[args.size()]);
+  }
+
+  @Test
+  public void testFinder() throws Exception {
+    ConfigExtractor extractor = getTestConfig(false);
+    PathFinder fr = new PathFinder(extractor, rnd);
+    // should only be able to select 10 files
+    // attempt for a given amount of iterations
+    int maxIterations = 10000;
+    Set<Path> files = new HashSet<Path>();
+    for (int i = 0; i < maxIterations; i++) {
+      files.add(fr.getFile());
+    }
+    assertEquals(10, files.size());
+    Set<Path> dirs = new HashSet<Path>();
+    for (int i = 0; i < maxIterations; i++) {
+      dirs.add(fr.getDirectory());
+    }
+    assertEquals(10, dirs.size());
+  }
+
+  @Test
+  public void testSelection() throws Exception {
+    ConfigExtractor extractor = getTestConfig(false);
+    WeightSelector selector = new WeightSelector(extractor, rnd);
+    // should be 1 of each type - uniform
+    int expected = OperationType.values().length;
+    Operation op = null;
+    Set<String> types = new HashSet<String>();
+    FileSystem fs = FileSystem.get(extractor.getConfig());
+    while (true) {
+      op = selector.select(1, 1);
+      if (op == null) {
+        break;
+      }
+      // doesn't matter if they work or not
+      op.run(fs);
+      types.add(op.getType());
+    }
+    assertEquals(expected, types.size());
+  }
+
+  // gets the config merged with the arguments
+  private ConfigExtractor getTestConfig(boolean sleep) throws Exception {
+    ArgumentParser parser = new ArgumentParser(getTestArgs(sleep));
+    ParsedOutput out = parser.parse();
+    assertTrue(!out.shouldOutputHelp());
+    ConfigMerger merge = new ConfigMerger();
+    Configuration cfg = merge.getMerged(out, getBaseConfig());
+    ConfigExtractor extractor = new ConfigExtractor(cfg);
+    return extractor;
+  }
+
+  @Before
+  public void ensureDeleted() throws Exception {
+    rDelete(getTestFile());
+    rDelete(getTestDir());
+    rDelete(getTestRenameFile());
+    rDelete(getResultFile());
+    rDelete(getFlowLocation());
+    rDelete(getImaginaryFile());
+  }
+
+  // cleans up a file or directory
+  // recursively if need be
+  private void rDelete(File place) throws Exception {
+    if (place.isFile()) {
+      LOG.info("Deleting file " + place);
+      assertTrue("Unable to delete file " + place, place.delete());
+    } else if (place.isDirectory()) {
+      deleteDir(place);
+    }
+  }
+
+  // deletes a dir and its contents
+  private void deleteDir(File dir) throws Exception {
+    String[] fns = dir.list();
+    // File.list() returns null on an I/O error instead of throwing
+    assertTrue("Unable to list contents of directory " + dir, fns != null);
+    // delete contents first
+    for (String afn : fns) {
+      File fn = new File(dir, afn);
+      rDelete(fn);
+    }
+    LOG.info("Deleting directory " + dir);
+    // now delete the dir
+    assertTrue("Unable to delete directory " + dir, dir.delete());
+  }
+
+  @Test
+  public void testArguments() throws Exception {
+    ConfigExtractor extractor = getTestConfig(true);
+    assertEquals(Constants.OperationType.values().length, extractor
+        .getOpCount().intValue());
+    assertEquals(1, extractor.getMapAmount().intValue());
+    Range<Long> apRange = extractor.getAppendSize();
+    assertEquals(Constants.MEGABYTES * 1, apRange.getLower().longValue());
+    assertEquals(Constants.MEGABYTES * 2, apRange.getUpper().longValue());
+    Range<Long> wRange = extractor.getWriteSize();
+    assertEquals(Constants.MEGABYTES * 1, wRange.getLower().longValue());
+    assertEquals(Constants.MEGABYTES * 2, wRange.getUpper().longValue());
+    Range<Long> bRange = extractor.getBlockSize();
+    assertEquals(Constants.MEGABYTES * 1, bRange.getLower().longValue());
+    assertEquals(Constants.MEGABYTES * 2, bRange.getUpper().longValue());
+    String resfile = extractor.getResultFile();
+    assertEquals(getResultFile().toString(), resfile);
+    int durationMs = extractor.getDurationMilliseconds();
+    assertEquals(10 * 1000, durationMs);
+  }
+
+  @Test
+  public void testDataWriting() throws Exception {
+    long byteAm = 100;
+    File fn = getTestFile();
+    DataWriter writer = new DataWriter(rnd);
+    GenerateOutput ostat;
+    FileOutputStream fs = new FileOutputStream(fn);
+    try {
+      ostat = writer.writeSegment(byteAm, fs);
+    } finally {
+      fs.close();
+    }
+    LOG.info(ostat);
+    assertTrue(ostat.getBytesWritten() == byteAm);
+    DataVerifier vf = new DataVerifier();
+    VerifyOutput vfout;
+    FileInputStream fin = new FileInputStream(fn);
+    try {
+      vfout = vf.verifyFile(byteAm, new DataInputStream(fin));
+    } finally {
+      fin.close();
+    }
+    LOG.info(vfout);
+    assertEquals(byteAm, vfout.getBytesRead());
+    assertTrue(vfout.getChunksDifferent() == 0);
+  }
+
+  @Test
+  public void testRange() {
+    Range<Long> r = new Range<Long>(10L, 20L);
+    assertEquals(10L, r.getLower().longValue());
+    assertEquals(20L, r.getUpper().longValue());
+  }
+
+  @Test
+  public void testCreateOp() throws Exception {
+    // setup a valid config
+    ConfigExtractor extractor = getTestConfig(false);
+    final Path fn = new Path(getTestFile().getCanonicalPath());
+    CreateOp op = new CreateOp(extractor, rnd) {
+      @Override
+      protected Path getCreateFile() {
+        return fn;
+      }
+    };
+    runOperationOk(extractor, op, true);
+  }
+
+  @Test
+  public void testOpFailures() throws Exception {
+    ConfigExtractor extractor = getTestConfig(false);
+    final Path fn = new Path(getImaginaryFile().getCanonicalPath());
+    ReadOp rop = new ReadOp(extractor, rnd) {
+      @Override
+      protected Path getReadFile() {
+        return fn;
+      }
+    };
+    runOperationBad(extractor, rop);
+
+    DeleteOp dop = new DeleteOp(extractor, rnd) {
+      @Override
+      protected Path getDeleteFile() {
+        return fn;
+      }
+    };
+    runOperationBad(extractor, dop);
+
+    RenameOp reop = new RenameOp(extractor, rnd) {
+      @Override
+      protected SrcTarget getRenames() {
+        return new SrcTarget(fn, fn);
+      }
+    };
+    runOperationBad(extractor, reop);
+
+    AppendOp aop = new AppendOp(extractor, rnd) {
+      @Override
+      protected Path getAppendFile() {
+        return fn;
+      }
+    };
+
+    runOperationBad(extractor, aop);
+  }
+
+  // runs the operation and asserts that it reported a failure or not-found
+  private void runOperationBad(ConfigExtractor cfg, Operation op)
+      throws Exception {
+    FileSystem fs = FileSystem.get(cfg.getConfig());
+    List<OperationOutput> data = op.run(fs);
+    assertTrue(!data.isEmpty());
+    boolean foundFail = false;
+    for (OperationOutput d : data) {
+      if (d.getMeasurementType().equals(ReportWriter.FAILURES)) {
+        foundFail = true;
+      }
+      if (d.getMeasurementType().equals(ReportWriter.NOT_FOUND)) {
+        foundFail = true;
+      }
+    }
+    assertTrue(foundFail);
+  }
+
+  // runs the operation and, when checkOk is set, asserts that it reported
+  // no failures but did report successes, an op count and a time taken
+  private void runOperationOk(ConfigExtractor cfg, Operation op, boolean checkOk)
+      throws Exception {
+    FileSystem fs = FileSystem.get(cfg.getConfig());
+    List<OperationOutput> data = op.run(fs);
+    assertTrue(!data.isEmpty());
+    if (checkOk) {
+      boolean foundSuc = false;
+      boolean foundOpCount = false;
+      boolean foundTime = false;
+      for (OperationOutput d : data) {
+        assertTrue(!d.getMeasurementType().equals(ReportWriter.FAILURES));
+        if (d.getMeasurementType().equals(ReportWriter.SUCCESSES)) {
+          foundSuc = true;
+        }
+        if (d.getMeasurementType().equals(ReportWriter.OP_COUNT)) {
+          foundOpCount = true;
+        }
+        if (d.getMeasurementType().equals(ReportWriter.OK_TIME_TAKEN)) {
+          foundTime = true;
+        }
+      }
+      assertTrue(foundSuc);
+      assertTrue(foundOpCount);
+      assertTrue(foundTime);
+    }
+  }
+
+  @Test
+  public void testDelete() throws Exception {
+    ConfigExtractor extractor = getTestConfig(false);
+    final Path fn = new Path(getTestFile().getCanonicalPath());
+    // ensure file created before delete
+    CreateOp op = new CreateOp(extractor, rnd) {
+      @Override
+      protected Path getCreateFile() {
+        return fn;
+      }
+    };
+    runOperationOk(extractor, op, true);
+    // now delete
+    DeleteOp dop = new DeleteOp(extractor, rnd) {
+      @Override
+      protected Path getDeleteFile() {
+        return fn;
+      }
+    };
+    runOperationOk(extractor, dop, true);
+  }
+
+  @Test
+  public void testRename() throws Exception {
+    ConfigExtractor extractor = getTestConfig(false);
+    final Path src = new Path(getTestFile().getCanonicalPath());
+    final Path tgt = new Path(getTestRenameFile().getCanonicalPath());
+    // ensure file created before rename
+    CreateOp op = new CreateOp(extractor, rnd) {
+      @Override
+      protected Path getCreateFile() {
+        return src;
+      }
+    };
+    runOperationOk(extractor, op, true);
+    RenameOp rop = new RenameOp(extractor, rnd) {
+      @Override
+      protected SrcTarget getRenames() {
+        return new SrcTarget(src, tgt);
+      }
+    };
+    runOperationOk(extractor, rop, true);
+  }
+
+  @Test
+  public void testMRFlow() throws Exception {
+    ConfigExtractor extractor = getTestConfig(false);
+    SliveTest s = new SliveTest(getTestArgs(false), getBaseConfig());
+    int ec = s.run();
+    assertEquals(0, ec);
+    String resFile = extractor.getResultFile();
+    File fn = new File(resFile);
+    assertTrue(fn.exists());
+    // can't validate completely since operations may fail (mainly any but
+    // create + mkdir) since they may not find their files
+  }
+
+  @Test
+  public void testRead() throws Exception {
+    ConfigExtractor extractor = getTestConfig(false);
+    final Path fn = new Path(getTestFile().getCanonicalPath());
+    // ensure file created before read
+    CreateOp op = new CreateOp(extractor, rnd) {
+      @Override
+      protected Path getCreateFile() {
+        return fn;
+      }
+    };
+    runOperationOk(extractor, op, true);
+    ReadOp rop = new ReadOp(extractor, rnd) {
+      @Override
+      protected Path getReadFile() {
+        return fn;
+      }
+    };
+    runOperationOk(extractor, rop, true);
+  }
+
+  @Test
+  public void testSleep() throws Exception {
+    ConfigExtractor extractor = getTestConfig(true);
+    SleepOp op = new SleepOp(extractor, rnd);
+    runOperationOk(extractor, op, true);
+  }
+
+  @Test
+  public void testList() throws Exception {
+    // ensure dir made
+    ConfigExtractor extractor = getTestConfig(false);
+    final Path dir = new Path(getTestDir().getCanonicalPath());
+    MkdirOp op = new MkdirOp(extractor, rnd) {
+      @Override
+      protected Path getDirectory() {
+        return dir;
+      }
+    };
+    runOperationOk(extractor, op, true);
+    // list it
+    ListOp lop = new ListOp(extractor, rnd) {
+      @Override
+      protected Path getDirectory() {
+        return dir;
+      }
+    };
+    runOperationOk(extractor, lop, true);
+  }
+
+  @Test
+  public void testBadChunks() throws Exception {
+    File fn = getTestFile();
+    int byteAm = 10000;
+    byte[] bytes = new byte[byteAm];
+    rnd.nextBytes(bytes);
+    FileOutputStream fout = new FileOutputStream(fn);
+    try {
+      fout.write(bytes);
+    } finally {
+      fout.close();
+    }
+    // attempt to read it
+    DataVerifier vf = new DataVerifier();
+    VerifyOutput vout = new VerifyOutput(0, 0, 0, 0);
+    DataInputStream in = new DataInputStream(new FileInputStream(fn));
+    try {
+      vout = vf.verifyFile(byteAm, in);
+    } catch (Exception ignored) {
+      // verifying random bytes may throw; vout then keeps its all-zero
+      // default, which still satisfies the "no matching chunks" check below
+    } finally {
+      in.close();
+    }
+    assertTrue(vout.getChunksSame() == 0);
+  }
+
+  @Test
+  public void testMkdir() throws Exception {
+    ConfigExtractor extractor = getTestConfig(false);
+    final Path dir = new Path(getTestDir().getCanonicalPath());
+    MkdirOp op = new MkdirOp(extractor, rnd) {
+      @Override
+      protected Path getDirectory() {
+        return dir;
+      }
+    };
+    runOperationOk(extractor, op, true);
+  }
+
+  @Test
+  public void testSelector() throws Exception {
+    ConfigExtractor extractor = getTestConfig(false);
+    RouletteSelector selector = new RouletteSelector(rnd);
+    List<OperationWeight> sList = new LinkedList<OperationWeight>();
+    Operation op = selector.select(sList);
+    assertTrue(op == null);
+    CreateOp cop = new CreateOp(extractor, rnd);
+    sList.add(new OperationWeight(cop, 1.0d));
+    AppendOp aop = new AppendOp(extractor, rnd);
+    sList.add(new OperationWeight(aop, 0.01d));
+    op = selector.select(sList);
+    assertTrue(op == cop);
+  }
+
+  @Test
+  public void testAppendOp() throws Exception {
+    // setup a valid config
+    ConfigExtractor extractor = getTestConfig(false);
+    // ensure file created before append
+    final Path fn = new Path(getTestFile().getCanonicalPath());
+    CreateOp op = new CreateOp(extractor, rnd) {
+      @Override
+      protected Path getCreateFile() {
+        return fn;
+      }
+    };
+    runOperationOk(extractor, op, true);
+    // local file system (ChecksumFileSystem) currently doesn't support append -
+    // but we'll leave this test here anyways but can't check the results..
+    AppendOp aop = new AppendOp(extractor, rnd) {
+      @Override
+      protected Path getAppendFile() {
+        return fn;
+      }
+    };
+    runOperationOk(extractor, aop, false);
+  }
+}

Propchange: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/TestSlive.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/Timer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/Timer.java?rev=946832&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/Timer.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/Timer.java Fri May 21 00:02:12 2010
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.slive;
+
+/**
+ * Tiny static helper that funnels all wall-clock reads through one place so
+ * timing code does not call {@link System#currentTimeMillis()} directly.
+ */
+class Timer {
+
+  // only static helpers live here, so instances are never created
+  private Timer() {
+  }
+
+  /**
+   * Reads the wall clock.
+   * 
+   * @return current time in milliseconds since the epoch
+   */
+  static long now() {
+    return System.currentTimeMillis();
+  }
+
+  /**
+   * Computes the milliseconds that have passed between the given start time
+   * and now, reporting zero instead of a negative amount (e.g. when the clock
+   * was adjusted backwards).
+   * 
+   * @param startTime
+   *          start timestamp in milliseconds, typically obtained from now()
+   * @return elapsed time in milliseconds, never negative
+   */
+  static long elapsed(long startTime) {
+    long delta = now() - startTime;
+    return (delta < 0) ? 0 : delta;
+  }
+
+}

Propchange: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/Timer.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/WeightSelector.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/WeightSelector.java?rev=946832&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/WeightSelector.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/WeightSelector.java Fri May 21 00:02:12 2010
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.slive;
+
+import java.text.NumberFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.slive.Constants.Distribution;
+import org.apache.hadoop.fs.slive.Constants.OperationType;
+import org.apache.hadoop.fs.slive.Weights.UniformWeight;
+import org.apache.hadoop.fs.slive.ObserveableOp.Observer;
+
+/**
+ * This class is the main handler that selects operations to run using the
+ * currently held selection object. It configures and weights each operation and
+ * then hands the operations + weights off to the selector object to determine
+ * which one should be ran. If no operations are left to be ran then it will
+ * return null.
+ */
+class WeightSelector {
+
+  // what a weight calculation means
+  interface Weightable {
+    Double weight(int elapsed, int duration);
+  }
+
+  private static final Log LOG = LogFactory.getLog(WeightSelector.class);
+
+  private static class OperationInfo {
+    Integer amountLeft;
+    Operation operation;
+    Distribution distribution;
+  }
+
+  private Map<OperationType, OperationInfo> operations;
+  private Map<Distribution, Weightable> weights;
+  private RouletteSelector selector;
+  private OperationFactory factory;
+
+  WeightSelector(ConfigExtractor cfg, Random rnd) {
+    selector = new RouletteSelector(rnd);
+    factory = new OperationFactory(cfg, rnd);
+    configureOperations(cfg);
+    configureWeights(cfg);
+  }
+
+  protected RouletteSelector getSelector() {
+    return selector;
+  }
+
+  private void configureWeights(ConfigExtractor e) {
+    weights = new HashMap<Distribution, Weightable>();
+    weights.put(Distribution.UNIFORM, new UniformWeight());
+    // weights.put(Distribution.BEG, new BeginWeight());
+    // weights.put(Distribution.END, new EndWeight());
+    // weights.put(Distribution.MID, new MidWeight());
+  }
+
+  /**
+   * Determines how many initial operations a given operation data should have
+   * 
+   * @param totalAm
+   *          the total amount of operations allowed
+   * 
+   * @param opData
+   *          the given operation information (with a valid percentage >= 0)
+   * 
+   * @return the number of items to allow to run
+   * 
+   * @throws IllegalArgumentException
+   *           if negative operations are determined
+   */
+  static int determineHowMany(int totalAm, OperationData opData,
+      OperationType type) {
+    if (totalAm <= 0) {
+      return 0;
+    }
+    int amLeft = (int) Math.floor(opData.getPercent() * totalAm);
+    if (amLeft < 0) {
+      throw new IllegalArgumentException("Invalid amount " + amLeft
+          + " determined for operation type " + type.name());
+    }
+    return amLeft;
+  }
+
+  /**
+   * Sets up the operation using the given configuration by setting up the
+   * number of operations to perform (and how many are left) and setting up the
+   * operation objects to be used throughout selection.
+   * 
+   * @param cfg
+   *          ConfigExtractor.
+   */
+  private void configureOperations(ConfigExtractor cfg) {
+    operations = new TreeMap<OperationType, OperationInfo>();
+    Map<OperationType, OperationData> opinfo = cfg.getOperations();
+    int totalAm = cfg.getOpCount();
+    int opsLeft = totalAm;
+    NumberFormat formatter = Formatter.getPercentFormatter();
+    for (final OperationType type : opinfo.keySet()) {
+      OperationData opData = opinfo.get(type);
+      OperationInfo info = new OperationInfo();
+      info.distribution = opData.getDistribution();
+      int amLeft = determineHowMany(totalAm, opData, type);
+      opsLeft -= amLeft;
+      LOG
+          .info(type.name() + " has " + amLeft + " initial operations out of "
+              + totalAm + " for its ratio "
+              + formatter.format(opData.getPercent()));
+      info.amountLeft = amLeft;
+      Operation op = factory.getOperation(type);
+      // wrap operation in finalizer so that amount left gets decrements when
+      // its done
+      if (op != null) {
+        Observer fn = new Observer() {
+          public void notifyFinished(Operation op) {
+            OperationInfo opInfo = operations.get(type);
+            if (opInfo != null) {
+              --opInfo.amountLeft;
+            }
+          }
+
+          public void notifyStarting(Operation op) {
+          }
+        };
+        info.operation = new ObserveableOp(op, fn);
+        operations.put(type, info);
+      }
+    }
+    if (opsLeft > 0) {
+      LOG
+          .info(opsLeft
+              + " left over operations found (due to inability to support partial operations)");
+    }
+  }
+
+  /**
+   * Selects an operation from the known operation set or returns null if none
+   * are available by applying the weighting algorithms and then handing off the
+   * weight operations to the selection object.
+   * 
+   * @param elapsed
+   *          the currently elapsed time (milliseconds) of the running program
+   * @param duration
+   *          the maximum amount of milliseconds of the running program
+   * 
+   * @return operation or null if none left
+   */
+  Operation select(int elapsed, int duration) {
+    List<OperationWeight> validOps = new ArrayList<OperationWeight>(operations
+        .size());
+    for (OperationType type : operations.keySet()) {
+      OperationInfo opinfo = operations.get(type);
+      if (opinfo == null || opinfo.amountLeft <= 0) {
+        continue;
+      }
+      Weightable weighter = weights.get(opinfo.distribution);
+      if (weighter != null) {
+        OperationWeight weightOp = new OperationWeight(opinfo.operation,
+            weighter.weight(elapsed, duration));
+        validOps.add(weightOp);
+      } else {
+        throw new RuntimeException("Unable to get weight for distribution "
+            + opinfo.distribution);
+      }
+    }
+    if (validOps.isEmpty()) {
+      return null;
+    }
+    return getSelector().select(validOps);
+  }
+}

Propchange: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/WeightSelector.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/Weights.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/Weights.java?rev=946832&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/Weights.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/Weights.java Fri May 21 00:02:12 2010
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.slive;
+
+import org.apache.hadoop.fs.slive.WeightSelector.Weightable;
+
+/**
+ * Class to isolate the various weight algorithms we use.
+ */
+class Weights {
+  private Weights() {
+
+  }
+
+  /**
+   * A weight which always returns the same weight (1/3). Which will have an
+   * overall area of (1/3) unless otherwise provided.
+   */
+  static class UniformWeight implements Weightable {
+
+    private static Double DEFAULT_WEIGHT = (1.0d / 3.0d);
+
+    private Double weight;
+
+    UniformWeight(double w) {
+      weight = w;
+    }
+
+    UniformWeight() {
+      this(DEFAULT_WEIGHT);
+    }
+
+    @Override // Weightable
+    public Double weight(int elapsed, int duration) {
+      return weight;
+    }
+
+  }
+
+  /**
+   * A weight which normalized the elapsed time and the duration to a value
+   * between 0 and 1 and applies the algorithm to form an output using the
+   * function (-2 * (x-0.5)^2) + 0.5 which initially (close to 0) has a value
+   * close to 0 and near input being 1 has a value close to 0 and near 0.5 has a
+   * value close to 0.5 (with overall area 0.3).
+   */
+  static class MidWeight implements Weightable {
+
+    @Override // Weightable
+    public Double weight(int elapsed, int duration) {
+      double normalized = (double) elapsed / (double) duration;
+      double result = (-2.0d * Math.pow(normalized - 0.5, 2)) + 0.5d;
+      if (result < 0) {
+        result = 0;
+      }
+      if (result > 1) {
+        result = 1;
+      }
+      return result;
+    }
+
+  }
+
+  /**
+   * A weight which normalized the elapsed time and the duration to a value
+   * between 0 and 1 and applies the algorithm to form an output using the
+   * function (x)^2 which initially (close to 0) has a value close to 0 and near
+   * input being 1 has a value close to 1 (with overall area 1/3).
+   */
+  static class EndWeight implements Weightable {
+
+    @Override // Weightable
+    public Double weight(int elapsed, int duration) {
+      double normalized = (double) elapsed / (double) duration;
+      double result = Math.pow(normalized, 2);
+      if (result < 0) {
+        result = 0;
+      }
+      if (result > 1) {
+        result = 1;
+      }
+      return result;
+    }
+
+  }
+
+  /**
+   * A weight which normalized the elapsed time and the duration to a value
+   * between 0 and 1 and applies the algorithm to form an output using the
+   * function (x-1)^2 which initially (close to 0) has a value close to 1 and
+   * near input being 1 has a value close to 0 (with overall area 1/3).
+   */
+  static class BeginWeight implements Weightable {
+
+    @Override // Weightable
+    public Double weight(int elapsed, int duration) {
+      double normalized = (double) elapsed / (double) duration;
+      double result = Math.pow((normalized - 1), 2);
+      if (result < 0) {
+        result = 0;
+      }
+      if (result > 1) {
+        result = 1;
+      }
+      return result;
+    }
+
+  }
+
+}

Propchange: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/Weights.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain



Mime
View raw message