hadoop-mapreduce-commits mailing list archives

From sha...@apache.org
Subject svn commit: r788608 [2/2] - in /hadoop/mapreduce/trunk: ./ src/examples/org/apache/hadoop/examples/ src/java/org/apache/hadoop/mapred/lib/aggregate/ src/java/org/apache/hadoop/mapreduce/lib/aggregate/ src/test/ src/test/mapred/org/apache/hadoop/mapred/...
Date Fri, 26 Jun 2009 06:43:34 GMT
Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/aggregate/ValueAggregatorDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/aggregate/ValueAggregatorDescriptor.java?rev=788608&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/aggregate/ValueAggregatorDescriptor.java
(added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/aggregate/ValueAggregatorDescriptor.java
Fri Jun 26 06:43:33 2009
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.aggregate;
+
+import java.util.ArrayList;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+
+/**
+ * This interface defines the contract a value aggregator descriptor must
+ * support. Such a descriptor can be configured with a {@link Configuration}
+ * object. Its main function is to generate a list of aggregation-id/value 
+ * pairs. An aggregation id encodes an aggregation type, which is used to 
+ * guide how the value is aggregated in the reduce/combine phase of an
+ * Aggregate based job. 
+ * The mapper in an Aggregate based map/reduce job may create one or more
+ * ValueAggregatorDescriptor objects at configuration time. For each input
+ * key/value pair, the mapper uses those objects to create aggregation
+ * id/value pairs.
+ * 
+ */
+public interface ValueAggregatorDescriptor {
+
+  public static final String TYPE_SEPARATOR = ":";
+
+  public static final Text ONE = new Text("1");
+
+  /**
+   * Generate a list of aggregation-id/value pairs for 
+   * the given key/value pair.
+   * This function is usually called by the mapper of an Aggregate based job.
+   * 
+   * @param key
+   *          input key
+   * @param val
+   *          input value
+   * @return a list of aggregation id/value pairs. An aggregation id encodes an
+   *         aggregation type, which is used to guide how the value is
+   *         aggregated in the reduce/combine phase of an Aggregate based job.
+   */
+  public ArrayList<Entry<Text, Text>> generateKeyValPairs(Object key,
+                                                          Object val);
+
+  /**
+   * Configure the object.
+   * 
+   * @param conf
+   *          a Configuration object that may contain information
+   *          used to configure the object
+   */
+  public void configure(Configuration conf);
+}
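
For illustration, a user plugin conforming to this contract would typically extend ValueAggregatorBaseDescriptor (added elsewhere in this commit) rather than implement the interface directly. The sketch below is illustrative only and not part of this commit: the package and class name are hypothetical, while generateEntry, LONG_VALUE_SUM and ONE come from the base class and this interface.

    package org.example;  // hypothetical package

    import java.util.ArrayList;
    import java.util.Map.Entry;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.lib.aggregate.ValueAggregatorBaseDescriptor;

    // Emits one LongValueSum aggregation pair per whitespace-separated token,
    // so the aggregate job ends up counting how often each token occurs.
    public class TokenCountDescriptor extends ValueAggregatorBaseDescriptor {
      public ArrayList<Entry<Text, Text>> generateKeyValPairs(Object key,
                                                              Object val) {
        ArrayList<Entry<Text, Text>> retv = new ArrayList<Entry<Text, Text>>();
        for (String token : val.toString().split("\\s+")) {
          // generateEntry encodes the aggregation type and id into the key.
          retv.add(generateEntry(LONG_VALUE_SUM, token, ONE));
        }
        return retv;
      }
      public void configure(Configuration conf) {}
    }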

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/aggregate/ValueAggregatorJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/aggregate/ValueAggregatorJob.java?rev=788608&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/aggregate/ValueAggregatorJob.java
(added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/aggregate/ValueAggregatorJob.java
Fri Jun 26 06:43:33 2009
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.aggregate;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
+import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.GenericOptionsParser;
+
+/**
+ * This is the main class for creating a map/reduce job using the Aggregate
+ * framework. Aggregate is a specialization of the map/reduce framework for
+ * performing various simple aggregations.
+ * 
+ * Generally speaking, in order to implement an application using the
+ * Map/Reduce model, the developer implements Map and Reduce functions (and
+ * possibly a Combine function). However, many applications related to
+ * counting and computing statistics have very similar characteristics.
+ * Aggregate abstracts out the general patterns of these functions and
+ * implements those patterns.
+ * In particular, the package provides generic mapper/reducer/combiner 
+ * classes, a set of built-in value aggregators, and a generic utility 
+ * class that helps users create map/reduce jobs using the generic classes. 
+ * The built-in aggregators include:
+ * 
+ * sum over numeric values, count of distinct values, histogram of values,
+ * and the minimum, maximum, median, average and standard deviation of
+ * numeric values.
+ * 
+ * The developer using Aggregate need only provide a plugin class
+ * conforming to the following interface:
+ * 
+ * public interface ValueAggregatorDescriptor { public ArrayList<Entry>
+ * generateKeyValPairs(Object key, Object value); public void
+ * configure(Configuration conf); }
+ * 
+ * The package also provides a base class, ValueAggregatorBaseDescriptor,
+ * implementing the above interface. The user can extend the base class and
+ * implement generateKeyValPairs accordingly.
+ * 
+ * The primary work of generateKeyValPairs is to emit one or more key/value
+ * pairs based on the input key/value pair. The key in an output key/value pair
+ * encodes two pieces of information: the aggregation type and the aggregation
+ * id. The value is aggregated onto the aggregation id according to the
+ * aggregation type.
+ * 
+ * This class offers a function to generate a map/reduce job using the
+ * Aggregate framework. The function takes the following parameters: the
+ * input directory spec, the input format (text or sequence file), the
+ * output directory, and a file specifying the user plugin class.
+ * 
+ */
+public class ValueAggregatorJob {
+
+  public static JobControl createValueAggregatorJobs(String args[],
+    Class<? extends ValueAggregatorDescriptor>[] descriptors) 
+  throws IOException {
+    
+    JobControl theControl = new JobControl("ValueAggregatorJobs");
+    ArrayList<ControlledJob> dependingJobs = new ArrayList<ControlledJob>();
+    Configuration conf = new Configuration();
+    if (descriptors != null) {
+      conf = setAggregatorDescriptors(descriptors);
+    }
+    Job job = createValueAggregatorJob(conf, args);
+    ControlledJob cjob = new ControlledJob(job, dependingJobs);
+    theControl.addJob(cjob);
+    return theControl;
+  }
+
+  public static JobControl createValueAggregatorJobs(String args[]) 
+      throws IOException {
+    return createValueAggregatorJobs(args, null);
+  }
+  
+  /**
+   * Create an Aggregate based map/reduce job.
+   * 
+   * @param conf The configuration for job
+   * @param args the arguments used for job creation. Generic hadoop
+   * arguments are accepted.
+   * @return a Job object ready for submission.
+   * 
+   * @throws IOException
+   * @see GenericOptionsParser
+   */
+  public static Job createValueAggregatorJob(Configuration conf, String args[])
+      throws IOException {
+
+    GenericOptionsParser genericParser 
+      = new GenericOptionsParser(conf, args);
+    args = genericParser.getRemainingArgs();
+    
+    if (args.length < 2) {
+      System.out.println("usage: inputDirs outDir "
+          + "[numOfReducer [textinputformat|seq [specfile [jobName]]]]");
+      GenericOptionsParser.printGenericCommandUsage(System.out);
+      System.exit(2);
+    }
+    String inputDir = args[0];
+    String outputDir = args[1];
+    int numOfReducers = 1;
+    if (args.length > 2) {
+      numOfReducers = Integer.parseInt(args[2]);
+    }
+
+    Class<? extends InputFormat> theInputFormat = null;
+    if (args.length > 3 && 
+        args[3].compareToIgnoreCase("textinputformat") == 0) {
+      theInputFormat = TextInputFormat.class;
+    } else {
+      theInputFormat = SequenceFileInputFormat.class;
+    }
+
+    Path specFile = null;
+
+    if (args.length > 4) {
+      specFile = new Path(args[4]);
+    }
+
+    String jobName = "";
+    
+    if (args.length > 5) {
+      jobName = args[5];
+    }
+
+    if (specFile != null) {
+      conf.addResource(specFile);
+    }
+    String userJarFile = conf.get("user.jar.file");
+    if (userJarFile != null) {
+      conf.set("mapred.jar", userJarFile);
+    }
+
+    Job theJob = new Job(conf);
+    if (userJarFile == null) {
+      theJob.setJarByClass(ValueAggregator.class);
+    } 
+    theJob.setJobName("ValueAggregatorJob: " + jobName);
+
+    FileInputFormat.addInputPaths(theJob, inputDir);
+
+    theJob.setInputFormatClass(theInputFormat);
+    
+    theJob.setMapperClass(ValueAggregatorMapper.class);
+    FileOutputFormat.setOutputPath(theJob, new Path(outputDir));
+    theJob.setOutputFormatClass(TextOutputFormat.class);
+    theJob.setMapOutputKeyClass(Text.class);
+    theJob.setMapOutputValueClass(Text.class);
+    theJob.setOutputKeyClass(Text.class);
+    theJob.setOutputValueClass(Text.class);
+    theJob.setReducerClass(ValueAggregatorReducer.class);
+    theJob.setCombinerClass(ValueAggregatorCombiner.class);
+    theJob.setNumReduceTasks(numOfReducers);
+    return theJob;
+  }
+
+  public static Job createValueAggregatorJob(String args[], 
+      Class<? extends ValueAggregatorDescriptor>[] descriptors) 
+      throws IOException {
+    return createValueAggregatorJob(
+             setAggregatorDescriptors(descriptors), args);
+  }
+  
+  public static Configuration setAggregatorDescriptors(
+      Class<? extends ValueAggregatorDescriptor>[] descriptors) {
+    Configuration conf = new Configuration();
+    conf.setInt("aggregator.descriptor.num", descriptors.length);
+    //specify the aggregator descriptors
+    for(int i=0; i< descriptors.length; i++) {
+      conf.set("aggregator.descriptor." + i, 
+               "UserDefined," + descriptors[i].getName());
+    }
+    return conf;
+  }
+  
+  /**
+   * Create and run an Aggregate based map/reduce job.
+   * 
+   * @param args the arguments used for job creation
+   * @throws IOException
+   */
+  public static void main(String args[]) 
+      throws IOException, InterruptedException, ClassNotFoundException {
+    Job job = ValueAggregatorJob.createValueAggregatorJob(
+                new Configuration(), args);
+    int ret = job.waitForCompletion(true) ? 0 : 1;
+    System.exit(ret);
+  }
+}
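
A driver using this class might look like the following sketch (illustrative only, not part of this commit; MyDescriptor and the paths are hypothetical, and the snippet belongs in a main method that declares the checked exceptions thrown by waitForCompletion):

    // Wire a user descriptor into the configuration, then build and submit
    // the job. The unchecked cast mirrors the raw descriptor array.
    @SuppressWarnings("unchecked")
    Class<? extends ValueAggregatorDescriptor>[] descriptors =
        new Class[] { MyDescriptor.class };  // hypothetical descriptor class
    Configuration conf = ValueAggregatorJob.setAggregatorDescriptors(descriptors);
    Job job = ValueAggregatorJob.createValueAggregatorJob(conf,
        new String[] { "inDir", "outDir", "2", "textinputformat" });
    System.exit(job.waitForCompletion(true) ? 0 : 1);

Alternatively, the class can be run directly from the command line via its main method, passing the positional arguments shown in the usage string above.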

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/aggregate/ValueAggregatorJobBase.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/aggregate/ValueAggregatorJobBase.java?rev=788608&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/aggregate/ValueAggregatorJobBase.java
(added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/aggregate/ValueAggregatorJobBase.java
Fri Jun 26 06:43:33 2009
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.aggregate;
+
+import java.util.ArrayList;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * This class implements some common functionality shared by the
+ * generic mapper, reducer and combiner classes of Aggregate.
+ */
+public class ValueAggregatorJobBase<K1 extends WritableComparable<?>,
+                                             V1 extends Writable>
+{
+
+  protected static ArrayList<ValueAggregatorDescriptor> aggregatorDescriptorList 
+    = null;
+
+  public static void setup(Configuration job) {
+    initializeMySpec(job);
+    logSpec();
+  }
+
+  protected static ValueAggregatorDescriptor getValueAggregatorDescriptor(
+      String spec, Configuration conf) {
+    if (spec == null)
+      return null;
+    String[] segments = spec.split(",", -1);
+    String type = segments[0];
+    if (type.compareToIgnoreCase("UserDefined") == 0) {
+      String className = segments[1];
+      return new UserDefinedValueAggregatorDescriptor(className, conf);
+    }
+    return null;
+  }
+
+  protected static ArrayList<ValueAggregatorDescriptor> getAggregatorDescriptors(
+      Configuration conf) {
+    String advn = "aggregator.descriptor";
+    int num = conf.getInt(advn + ".num", 0);
+    ArrayList<ValueAggregatorDescriptor> retv = 
+      new ArrayList<ValueAggregatorDescriptor>(num);
+    for (int i = 0; i < num; i++) {
+      String spec = conf.get(advn + "." + i);
+      ValueAggregatorDescriptor ad = getValueAggregatorDescriptor(spec, conf);
+      if (ad != null) {
+        retv.add(ad);
+      }
+    }
+    return retv;
+  }
+
+  private static void initializeMySpec(Configuration conf) {
+    aggregatorDescriptorList = getAggregatorDescriptors(conf);
+    if (aggregatorDescriptorList.size() == 0) {
+      aggregatorDescriptorList
+          .add(new UserDefinedValueAggregatorDescriptor(
+              ValueAggregatorBaseDescriptor.class.getCanonicalName(), conf));
+    }
+  }
+
+  protected static void logSpec() {
+  }
+}
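
The descriptor spec strings parsed by getValueAggregatorDescriptor have the form "UserDefined,<className>". A configuration naming two descriptors by hand (a sketch mirroring what ValueAggregatorJob.setAggregatorDescriptors produces; the class names are hypothetical) would look like:

    Configuration conf = new Configuration();
    conf.setInt("aggregator.descriptor.num", 2);
    conf.set("aggregator.descriptor.0",
             "UserDefined,org.example.TokenCountDescriptor");
    conf.set("aggregator.descriptor.1",
             "UserDefined,org.example.OtherDescriptor");

If no descriptor is configured, initializeMySpec falls back to ValueAggregatorBaseDescriptor, whose default generateKeyValPairs counts input records.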

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/aggregate/ValueAggregatorMapper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/aggregate/ValueAggregatorMapper.java?rev=788608&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/aggregate/ValueAggregatorMapper.java
(added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/aggregate/ValueAggregatorMapper.java
Fri Jun 26 06:43:33 2009
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.aggregate;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+
+/**
+ * This class implements the generic mapper of Aggregate.
+ */
+public class ValueAggregatorMapper<K1 extends WritableComparable<?>,
+                                   V1 extends Writable>
+  extends Mapper<K1, V1, Text, Text> {
+
+  public void setup(Context context) 
+      throws IOException, InterruptedException {
+    ValueAggregatorJobBase.setup(context.getConfiguration());
+  }
+  
+  /**
+   *  The map function. It iterates through the value aggregator descriptor 
+   *  list to generate aggregation id/value pairs and emits them.
+   */
+  public void map(K1 key, V1 value,
+      Context context) throws IOException, InterruptedException  {
+
+    Iterator<?> iter = 
+      ValueAggregatorJobBase.aggregatorDescriptorList.iterator();
+    while (iter.hasNext()) {
+      ValueAggregatorDescriptor ad = (ValueAggregatorDescriptor) iter.next();
+      Iterator<Entry<Text, Text>> ens =
+        ad.generateKeyValPairs(key, value).iterator();
+      while (ens.hasNext()) {
+        Entry<Text, Text> en = ens.next();
+        context.write(en.getKey(), en.getValue());
+      }
+    }
+  }
+}

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/aggregate/ValueAggregatorReducer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/aggregate/ValueAggregatorReducer.java?rev=788608&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/aggregate/ValueAggregatorReducer.java
(added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/aggregate/ValueAggregatorReducer.java
Fri Jun 26 06:43:33 2009
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.aggregate;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Reducer;
+
+/**
+ * This class implements the generic reducer of Aggregate.
+ */
+public class ValueAggregatorReducer<K1 extends WritableComparable<?>,
+                                    V1 extends Writable>
+  extends Reducer<Text, Text, Text, Text> {
+
+  public void setup(Context context) 
+      throws IOException, InterruptedException {
+    ValueAggregatorJobBase.setup(context.getConfiguration());
+  }
+
+  /**
+   * @param key
+   *        the key is expected to be a Text object whose prefix indicates
+   *        the type of aggregation to perform on the values. In effect,
+   *        data-driven computing is achieved. It is assumed that each
+   *        aggregator's getReport method emits appropriate output for the
+   *        aggregator. This may be further customized.
+   * @param values the values to be aggregated
+   * @param context 
+   */
+  public void reduce(Text key, Iterable<Text> values,
+      Context context) throws IOException, InterruptedException {
+    String keyStr = key.toString();
+    int pos = keyStr.indexOf(ValueAggregatorDescriptor.TYPE_SEPARATOR);
+    String type = keyStr.substring(0, pos);
+    keyStr = keyStr.substring(pos + 
+               ValueAggregatorDescriptor.TYPE_SEPARATOR.length());
+    long uniqCount = context.getConfiguration().
+      getLong("aggregate.max.num.unique.values", Long.MAX_VALUE);
+    ValueAggregator aggregator = ValueAggregatorBaseDescriptor
+      .generateValueAggregator(type, uniqCount);
+    for (Text value : values) {
+      aggregator.addNextValue(value);
+    }
+
+    String val = aggregator.getReport();
+    key = new Text(keyStr);
+    context.write(key, new Text(val));
+  }
+}
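
The keys arriving at this reducer carry the aggregation type as a prefix, separated from the aggregation id by TYPE_SEPARATOR. A sketch with an illustrative key shows the split performed above:

    String keyStr = "LongValueSum:record_count";  // illustrative incoming key
    int pos = keyStr.indexOf(":");                // TYPE_SEPARATOR is ":"
    String type = keyStr.substring(0, pos);       // "LongValueSum" selects the aggregator
    String id = keyStr.substring(pos + 1);        // "record_count" becomes the output key
    // The output record is then: record_count <TAB> <aggregated report>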

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/aggregate/ValueHistogram.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/aggregate/ValueHistogram.java?rev=788608&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/aggregate/ValueHistogram.java
(added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/aggregate/ValueHistogram.java
Fri Jun 26 06:43:33 2009
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.aggregate;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.TreeMap;
+import java.util.Map.Entry;
+import java.util.Arrays;
+
+
+/**
+ * This class implements a value aggregator that computes the 
+ * histogram of a sequence of strings.
+ * 
+ */
+public class ValueHistogram implements ValueAggregator<String> {
+
+  TreeMap<Object, Object> items = null;
+
+  public ValueHistogram() {
+    items = new TreeMap<Object, Object>();
+  }
+
+  /**
+   * add the given val to the aggregator.
+   * 
+   * @param val the value to be added. It is expected to be a string
+   * in the form of xxxx\tnum, meaning xxxx has num occurrences.
+   */
+  public void addNextValue(Object val) {
+    String valCountStr = val.toString();
+    int pos = valCountStr.lastIndexOf("\t");
+    String valStr = valCountStr;
+    String countStr = "1";
+    if (pos >= 0) {
+      valStr = valCountStr.substring(0, pos);
+      countStr = valCountStr.substring(pos + 1);
+    }
+    
+    Long count = (Long) this.items.get(valStr);
+    long inc = Long.parseLong(countStr);
+
+    if (count == null) {
+      count = inc;
+    } else {
+      count = count.longValue() + inc;
+    }
+    items.put(valStr, count);
+  }
+
+  /**
+   * @return the string representation of this aggregator.
+   * It includes the following basic statistics of the histogram:
+   *    the number of unique values
+   *    the minimum value
+   *    the median value
+   *    the maximum value
+   *    the average value
+   *    the standard deviation
+   */
+  public String getReport() {
+    long[] counts = new long[items.size()];
+
+    StringBuffer sb = new StringBuffer();
+    Iterator<Object> iter = items.values().iterator();
+    int i = 0;
+    while (iter.hasNext()) {
+      Long count = (Long) iter.next();
+      counts[i] = count.longValue();
+      i += 1;
+    }
+    Arrays.sort(counts);
+    sb.append(counts.length);
+    i = 0;
+    long acc = 0;
+    while (i < counts.length) {
+      long nextVal = counts[i];
+      int j = i + 1;
+      while (j < counts.length && counts[j] == nextVal) {
+        j++;
+      }
+      acc += nextVal * (j - i);
+      i = j;
+    }
+    double average = 0.0;
+    double sd = 0.0;
+    if (counts.length > 0) {
+      sb.append("\t").append(counts[0]);
+      sb.append("\t").append(counts[counts.length / 2]);
+      sb.append("\t").append(counts[counts.length - 1]);
+
+      average = acc * 1.0 / counts.length;
+      sb.append("\t").append(average);
+
+      i = 0;
+      while (i < counts.length) {
+        double nextDiff = counts[i] - average;
+        sd += nextDiff * nextDiff;
+        i += 1;
+      }
+      sd = Math.sqrt(sd / counts.length);
+      sb.append("\t").append(sd);
+
+    }
+    return sb.toString();
+  }
+
+  /** 
+   * 
+   * @return a string representation of the list of value/frequency pairs of 
+   * the histogram
+   */
+  public String getReportDetails() {
+    StringBuffer sb = new StringBuffer();
+    Iterator<Entry<Object,Object>> iter = items.entrySet().iterator();
+    while (iter.hasNext()) {
+      Entry<Object,Object> en = (Entry<Object,Object>) iter.next();
+      Object val = en.getKey();
+      Long count = (Long) en.getValue();
+      sb.append("\t").append(val.toString()).append("\t").
+         append(count.longValue()).append("\n");
+    }
+    return sb.toString();
+  }
+
+  /**
+   *  @return a list of value/frequency pairs.
+   *  The return value is expected to be used by the reducer.
+   */
+  public ArrayList<String> getCombinerOutput() {
+    ArrayList<String> retv = new ArrayList<String>();
+    Iterator<Entry<Object,Object>> iter = items.entrySet().iterator();
+
+    while (iter.hasNext()) {
+      Entry<Object,Object> en = (Entry<Object,Object>) iter.next();
+      Object val = en.getKey();
+      Long count = (Long) en.getValue();
+      retv.add(val.toString() + "\t" + count.longValue());
+    }
+    return retv;
+  }
+
+  /** 
+   * 
+   * @return a TreeMap representation of the histogram
+   */
+  public TreeMap<Object,Object> getReportItems() {
+    return items;
+  }
+
+  /** 
+   * reset the aggregator
+   */
+  public void reset() {
+    items = new TreeMap<Object, Object>();
+  }
+
+}
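
A usage sketch (illustrative, not part of this commit) showing the input format addNextValue expects and the shape of the reports:

    ValueHistogram hist = new ValueHistogram();
    hist.addNextValue("apple\t3");  // "apple" occurred 3 times
    hist.addNextValue("apple");     // no tab: counts as a single occurrence
    hist.addNextValue("pear\t2");
    // getReport() returns: uniqueValues \t min \t median \t max \t average
    // \t stddev of the per-value counts; here: 2, 2, 4, 4, 3.0, 1.0
    System.out.println(hist.getReport());
    // getCombinerOutput() returns ["apple\t4", "pear\t2"], each entry
    // re-parseable by addNextValue on the reduce side.
    System.out.println(hist.getCombinerOutput());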

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/aggregate/package.html
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/aggregate/package.html?rev=788608&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/aggregate/package.html
(added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/aggregate/package.html
Fri Jun 26 06:43:33 2009
@@ -0,0 +1,199 @@
+<html>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<body>
+
+Classes for performing various counting and aggregations.
+<p />
+<h2><a name="Aggregate"></a>Aggregate framework </h2>
+<p />
+Generally speaking, in order to implement an application using the Map/Reduce
+model, the developer needs to implement Map and Reduce functions (and possibly
+a Combine function). However, for many applications related to counting and
+computing statistics, these functions have very similar
+characteristics. This package implements 
+those patterns. In particular, the package provides a generic mapper class,
+a reducer class and a combiner class, and a set of built-in value aggregators.
+It also provides a generic utility class, ValueAggregatorJob, that offers 
+a static function that creates map/reduce jobs:
+<blockquote>
+<pre>
+public static Job createValueAggregatorJob(String args&#91;]) throws IOException;
+</pre>
+</blockquote>
+To call this function, the user needs to pass in arguments specifying the input
+directories, the output directory, the number of reducers, the input data format 
+(textinputformat or sequencefileinputformat), and a file specifying the user
+plugin class(es) to be loaded by the mapper. 
+A user plugin class is responsible for specifying which 
+aggregators to use and which values go to which aggregators. 
+A plugin class must implement the following interface:
+<blockquote>
+<pre>
+ public interface ValueAggregatorDescriptor { 
+     public ArrayList&#60;Entry&#62; generateKeyValPairs(Object key, Object value);

+     public void configure(Configuration conf); 
+} 
+</pre>
+</blockquote>
+Function generateKeyValPairs will generate aggregation key/value pairs for the
+input key/value pair. Each aggregation key encodes two pieces of information: 
+the aggregation type and aggregation ID.
+The value is the value to be aggregated onto the aggregation ID according 
+to the aggregation type. Here is a simple example user plugin class for 
+counting the words in the input texts:
+<blockquote>
+<pre>
+public class WordCountAggregatorDescriptor extends ValueAggregatorBaseDescriptor { 
+    public ArrayList&#60;Entry&#62; generateKeyValPairs(Object key, Object val) {
+        String words &#91;] &#61; val.toString().split(&#34; &#124;\t&#34;);
+        ArrayList&#60;Entry&#62; retv &#61; new ArrayList&#60;Entry&#62;();
+        for (int i &#61; 0; i &#60; words.length; i++) {
+            retv.add(generateEntry(LONG&#95;VALUE&#95;SUM, words&#91;i], ONE));
+        }
+        return retv;
+    }
+    public void configure(Configuration conf) {}
+} 
+</pre>
+</blockquote>
+In the above code, LONG_VALUE_SUM is a string denoting the aggregation type 
+LongValueSum, which sums over long values. ONE denotes a string "1". 
+Function generateEntry(LONG_VALUE_SUM, words[i], ONE) will interpret the first
+argument as an aggregation type, the second as an aggregation ID, and the 
+third argument as the value to be aggregated. The output will look like: 
+"LongValueSum:xxxx", where xxxx is the string value of words[i]. The value will
+be "1". The mapper will call generateKeyValPairs(Object key, Object val) for 
+each input key/value pair to generate the desired aggregation id/value pairs. 
+The downstream combiner/reducer will interpret these pairs 
+as adding one to the aggregator xxxx.
+<p />
+Class ValueAggregatorBaseDescriptor is a base class that user plugin classes 
+can extend. Here is the XML fragment specifying the user plugin class:
+<blockquote>
+<pre>
+&#60;property&#62;
+    &#60;name&#62;aggregator.descriptor.num&#60;/name&#62;
+    &#60;value&#62;1&#60;/value&#62;
+&#60;/property&#62;
+&#60;property&#62;
+   &#60;name&#62;aggregator.descriptor.0&#60;/name&#62;
+   &#60;value&#62;UserDefined,org.apache.hadoop.mapreduce.lib.aggregate.examples.WordCountAggregatorDescriptor&#60;/value&#62;
+&#60;/property&#62; 
+</pre>
+</blockquote>
+Class ValueAggregatorBaseDescriptor itself provides a 
+default implementation for  generateKeyValPairs:
+<blockquote>
+<pre>
+public ArrayList&#60;Entry&#62; generateKeyValPairs(Object key, Object val) {
+   ArrayList&#60;Entry&#62; retv &#61; new ArrayList&#60;Entry&#62;();
+   String countType &#61; LONG&#95;VALUE&#95;SUM;
+   String id &#61; &#34;record&#95;count&#34;;
+   retv.add(generateEntry(countType, id, ONE));
+   return retv;
+}
+</pre>
+</blockquote>
+Thus, if no user plugin class is specified, the default behavior of the
+map/reduce job is to count the number of records (lines) in the input files.
+<p />
+During runtime, the mapper will invoke the generateKeyValPairs function for 
+each input key/value pair, and emit the generated key/value pairs:
+<blockquote>
+<pre>
+public void map(WritableComparable key, Writable value,
+            Context context) throws IOException {
+   Iterator iter &#61; this.aggregatorDescriptorList.iterator();
+   while (iter.hasNext()) {
+       ValueAggregatorDescriptor ad &#61; (ValueAggregatorDescriptor) iter.next();
+       Iterator&#60;Entry&#62; ens &#61; ad.generateKeyValPairs(key, value).iterator();
+       while (ens.hasNext()) {
+           Entry en &#61; ens.next();
+           context.write((WritableComparable)en.getKey(), (Writable)en.getValue());
+       }
+   }
+}
+</pre>
+</blockquote>
+The reducer will create an aggregator object for each key/value list pair, 
+and perform the appropriate aggregation.
+At the end, it will emit the aggregator's results:
+<blockquote>
+<pre>
+public void reduce(WritableComparable key, Iterable&#60;Text&#62; values,
+            Context context) throws IOException {
+   String keyStr &#61; key.toString();
+   int pos &#61; keyStr.indexOf(ValueAggregatorDescriptor.TYPE&#95;SEPARATOR);
+   String type &#61; keyStr.substring(0,pos);
+   keyStr &#61; keyStr.substring(pos+ValueAggregatorDescriptor.TYPE&#95;SEPARATOR.length());
+   long uniqCount &#61; context.getConfiguration().
+       getLong(&#34;aggregate.max.num.unique.values&#34;, Long.MAX&#95;VALUE);
+   ValueAggregator aggregator &#61; 
+       ValueAggregatorBaseDescriptor.generateValueAggregator(type, uniqCount);
+   for (Text value : values) {
+     aggregator.addNextValue(value);
+   }
+   String val &#61; aggregator.getReport();
+   key &#61; new Text(keyStr);
+   context.write(key, new Text(val)); 
+}
+</pre>
+</blockquote>
+In order to be able to use a combiner, all the aggregators used 
+must be associative and commutative.
+The following are the types supported: <ul>
+<li> LongValueSum: sum over long values 
+</li> <li> DoubleValueSum: sum over float/double values 
+</li> <li> uniqValueCount: count the number of distinct values 
+</li> <li> ValueHistogram: compute the histogram of values, and the minimum,
+ maximum, median, average, and standard deviation of numeric values
+</li></ul> 
+<p />
+<h2><a name="Create_and_run"></a> Create and run an application </h2>
+<p />
+To create an application, the user needs to do the following things:
+<p />
+1. Implement a user plugin:
+<blockquote>
+<pre>
+import org.apache.hadoop.mapreduce.lib.aggregate.ValueAggregatorBaseDescriptor;
+
+public class WordCountAggregatorDescriptor extends ValueAggregatorBaseDescriptor {
+   public ArrayList&#60;Entry&#62; generateKeyValPairs(Object key, Object val) {
+      // generate aggregation id/value pairs here, as in the example above
+      return new ArrayList&#60;Entry&#62;();
+   }
+   public void configure(Configuration conf) {}
+}
+</pre>
+</blockquote>
+
+2. Create an XML file specifying the user plugin (see the fragment above).
+<p />
+3. Compile your Java class and create a jar file, say wc.jar.
+
+<p />
+Finally, run the job:
+<blockquote>
+<pre>
+        hadoop jar wc.jar org.apache.hadoop.mapreduce.lib.aggregate.ValueAggregatorJob indirs outdir numofreducers textinputformat|sequencefileinputformat spec_file
+</pre>
+</blockquote>
+<p />
+
+
+</body>
+</html>

Modified: hadoop/mapreduce/trunk/src/test/findbugsExcludeFile.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/findbugsExcludeFile.xml?rev=788608&r1=788607&r2=788608&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/findbugsExcludeFile.xml (original)
+++ hadoop/mapreduce/trunk/src/test/findbugsExcludeFile.xml Fri Jun 26 06:43:33 2009
@@ -48,6 +48,10 @@
        <Class name="~org.apache.hadoop.mapred.*" />
        <Bug pattern="NM_SAME_SIMPLE_NAME_AS_SUPERCLASS" />
      </Match>
+     <Match>
+       <Class name="~org.apache.hadoop.mapred.lib.aggregate.*" />
+       <Bug pattern="NM_SAME_SIMPLE_NAME_AS_INTERFACE" />
+     </Match>
      <!--
        Ignore warnings for usage of System.exit. This is
        required and have been well thought out

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/lib/aggregate/AggregatorTests.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/lib/aggregate/AggregatorTests.java?rev=788608&r1=788607&r2=788608&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/lib/aggregate/AggregatorTests.java
(original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/lib/aggregate/AggregatorTests.java
Fri Jun 26 06:43:33 2009
@@ -36,7 +36,7 @@
       long numVal = Long.parseLong(word);
       countType = LONG_VALUE_SUM;
       id = "count_" + word;
-      e = generateEntry(countType, id, ONE);
+      e = generateEntry(countType, id, ValueAggregatorDescriptor.ONE);
       if (e != null) {
         retv.add(e);
       }

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/aggregate/AggregatorTests.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/aggregate/AggregatorTests.java?rev=788608&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/aggregate/AggregatorTests.java
(added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/aggregate/AggregatorTests.java
Fri Jun 26 06:43:33 2009
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.aggregate;
+
+import org.apache.hadoop.io.Text;
+import java.util.ArrayList;
+import java.util.Map.Entry;
+
+public class AggregatorTests extends ValueAggregatorBaseDescriptor {
+  
+  public ArrayList<Entry<Text, Text>> generateKeyValPairs(Object key, 
+                                                          Object val) {
+    ArrayList<Entry<Text, Text>> retv = new ArrayList<Entry<Text, Text>>();
+    String [] words = val.toString().split(" ");
+    
+    String countType;
+    String id;
+    Entry<Text, Text> e;
+    
+    for (String word: words) {
+      long numVal = Long.parseLong(word);
+      countType = LONG_VALUE_SUM;
+      id = "count_" + word;
+      e = generateEntry(countType, id, ONE);
+      if (e != null) {
+        retv.add(e);
+      }
+      countType = LONG_VALUE_MAX;
+      id = "max";
+      e = generateEntry(countType, id, new Text(word));
+      if (e != null) {
+        retv.add(e);
+      }
+      
+      countType = LONG_VALUE_MIN;
+      id = "min";
+      e = generateEntry(countType, id, new Text(word));
+      if (e != null) {
+        retv.add(e);
+      }
+      
+      countType = STRING_VALUE_MAX;
+      id = "value_as_string_max";
+      e = generateEntry(countType, id, new Text(""+numVal));
+      if (e != null) {
+        retv.add(e);
+      }
+      
+      countType = STRING_VALUE_MIN;
+      id = "value_as_string_min";
+      e = generateEntry(countType, id, new Text(""+numVal));
+      if (e != null) {
+        retv.add(e);
+      }
+      
+      countType = UNIQ_VALUE_COUNT;
+      id = "uniq_count";
+      e = generateEntry(countType, id, new Text(word));
+      if (e != null) {
+        retv.add(e);
+      }
+      
+      countType = VALUE_HISTOGRAM;
+      id = "histogram";
+      e = generateEntry(countType, id, new Text(word));
+      if (e != null) {
+        retv.add(e);
+      }
+    }
+    return retv;
+  }
+
+}

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/aggregate/TestMapReduceAggregates.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/aggregate/TestMapReduceAggregates.java?rev=788608&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/aggregate/TestMapReduceAggregates.java
(added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/aggregate/TestMapReduceAggregates.java
Fri Jun 26 06:43:33 2009
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.aggregate;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapred.OutputLogFilter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+
+import junit.framework.TestCase;
+import java.io.*;
+import java.text.NumberFormat;
+
+public class TestMapReduceAggregates extends TestCase {
+
+  private static NumberFormat idFormat = NumberFormat.getInstance();
+  static {
+    idFormat.setMinimumIntegerDigits(4);
+    idFormat.setGroupingUsed(false);
+  }
+
+
+  public void testAggregates() throws Exception {
+    launch();
+  }
+
+  public static void launch() throws Exception {
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(conf);
+    int numOfInputLines = 20;
+
+    Path OUTPUT_DIR = new Path("build/test/output_for_aggregates_test");
+    Path INPUT_DIR = new Path("build/test/input_for_aggregates_test");
+    String inputFile = "input.txt";
+    fs.delete(INPUT_DIR, true);
+    fs.mkdirs(INPUT_DIR);
+    fs.delete(OUTPUT_DIR, true);
+
+    StringBuffer inputData = new StringBuffer();
+    StringBuffer expectedOutput = new StringBuffer();
+    expectedOutput.append("max\t19\n");
+    expectedOutput.append("min\t1\n"); 
+
+    FSDataOutputStream fileOut = fs.create(new Path(INPUT_DIR, inputFile));
+    for (int i = 1; i < numOfInputLines; i++) {
+      expectedOutput.append("count_").append(idFormat.format(i));
+      expectedOutput.append("\t").append(i).append("\n");
+
+      inputData.append(idFormat.format(i));
+      for (int j = 1; j < i; j++) {
+        inputData.append(" ").append(idFormat.format(i));
+      }
+      inputData.append("\n");
+    }
+    expectedOutput.append("value_as_string_max\t9\n");
+    expectedOutput.append("value_as_string_min\t1\n");
+    expectedOutput.append("uniq_count\t15\n");
+
+
+    fileOut.write(inputData.toString().getBytes("utf-8"));
+    fileOut.close();
+
+    System.out.println("inputData:");
+    System.out.println(inputData.toString());
+
+    conf.setInt("aggregator.descriptor.num", 1);
+    conf.set("aggregator.descriptor.0", 
+      "UserDefined,org.apache.hadoop.mapreduce.lib.aggregate.AggregatorTests");
+    conf.setLong("aggregate.max.num.unique.values", 14);
+    
+    Job job = new Job(conf);
+    FileInputFormat.setInputPaths(job, INPUT_DIR);
+    job.setInputFormatClass(TextInputFormat.class);
+    FileOutputFormat.setOutputPath(job, OUTPUT_DIR);
+    job.setOutputFormatClass(TextOutputFormat.class);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(Text.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Text.class);
+    job.setNumReduceTasks(1);
+    job.setMapperClass(ValueAggregatorMapper.class);
+    job.setReducerClass(ValueAggregatorReducer.class);
+    job.setCombinerClass(ValueAggregatorCombiner.class);
+
+
+    job.waitForCompletion(true);
+
+    assertTrue(job.isSuccessful());
+    //
+    // Finally, we compare the reconstructed answer key with the
+    // original one.  Remember, we need to ignore zero-count items
+    // in the original key.
+    //
+    String outdata = readOutput(OUTPUT_DIR, conf);
+    System.out.println("full out data:");
+    System.out.println(outdata.toString());
+    outdata = outdata.substring(0, expectedOutput.toString().length());
+
+    assertEquals(expectedOutput.toString(),outdata);
+    fs.delete(OUTPUT_DIR, true);
+    fs.delete(INPUT_DIR, true);
+  }
+
+  public static String readOutput(Path outDir, Configuration conf) 
+    throws IOException {
+    FileSystem fs = outDir.getFileSystem(conf);
+    StringBuffer result = new StringBuffer();
+    Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir,
+                        new OutputLogFilter()));
+    for(int i=0; i < fileList.length; ++i) {
+      BufferedReader file = 
+        new BufferedReader(new InputStreamReader(fs.open(fileList[i])));
+      String line = file.readLine();
+      while (line != null) {
+        result.append(line);
+        result.append("\n");
+        line = file.readLine();
+      }
+      file.close();
+    }
+    return result.toString();
+  }
+  
+  /**
+   * Launches all the tasks in order.
+   */
+  public static void main(String[] argv) throws Exception {
+    launch();
+  }
+}


