mahout-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jeast...@apache.org
Subject svn commit: r1090881 [1/2] - in /mahout/trunk: core/src/main/java/org/apache/mahout/clustering/canopy/ core/src/main/java/org/apache/mahout/common/commandline/ core/src/test/java/org/apache/mahout/clustering/canopy/ examples/src/main/java/org/apache/ma...
Date Sun, 10 Apr 2011 20:00:13 GMT
Author: jeastman
Date: Sun Apr 10 20:00:13 2011
New Revision: 1090881

URL: http://svn.apache.org/viewvc?rev=1090881&view=rev
Log:
MAHOUT-626: Added optional T3/T4 arguments to Canopy. Added new unit test. All tests run

Modified:
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyClusterer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyConfigKeys.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyReducer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/ClusterMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/common/commandline/DefaultOptionCreator.java
    mahout/trunk/core/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java
    mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/canopy/Job.java
    mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/fuzzykmeans/Job.java
    mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/kmeans/Job.java

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyClusterer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyClusterer.java?rev=1090881&r1=1090880&r2=1090881&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyClusterer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyClusterer.java Sun Apr 10 20:00:13 2011
@@ -35,198 +35,249 @@ import org.slf4j.LoggerFactory;
 
 public class CanopyClusterer {
 
-  private static final Logger log = LoggerFactory.getLogger(CanopyClusterer.class);
+	private static final Logger log = LoggerFactory
+			.getLogger(CanopyClusterer.class);
 
-  private int nextCanopyId;
+	private int nextCanopyId;
 
-  // the T1 distance threshold
-  private double t1;
+	// the T1 distance threshold
+	protected double t1;
 
-  // the T2 distance threshold
-  private double t2;
-
-  // the distance measure
-  private DistanceMeasure measure;
-
-  // private int nextClusterId = 0;
-
-  public CanopyClusterer(DistanceMeasure measure, double t1, double t2) {
-    this.t1 = t1;
-    this.t2 = t2;
-    this.measure = measure;
-  }
-
-  public CanopyClusterer(Configuration config) {
-    this.configure(config);
-  }
-
-  /**
-   * Configure the Canopy and its distance measure
-   */
-  public void configure(Configuration configuration) {
-    try {
-      ClassLoader ccl = Thread.currentThread().getContextClassLoader();
-      measure = ccl.loadClass(configuration.get(CanopyConfigKeys.DISTANCE_MEASURE_KEY))
-          .asSubclass(DistanceMeasure.class).newInstance();
-      measure.configure(configuration);
-    } catch (ClassNotFoundException e) {
-      throw new IllegalStateException(e);
-    } catch (IllegalAccessException e) {
-      throw new IllegalStateException(e);
-    } catch (InstantiationException e) {
-      throw new IllegalStateException(e);
-    }
-    t1 = Double.parseDouble(configuration.get(CanopyConfigKeys.T1_KEY));
-    t2 = Double.parseDouble(configuration.get(CanopyConfigKeys.T2_KEY));
-    nextCanopyId = 0;
-  }
-
-  /** Configure the Canopy for unit tests */
-  public void config(DistanceMeasure aMeasure, double aT1, double aT2) {
-    measure = aMeasure;
-    t1 = aT1;
-    t2 = aT2;
-  }
-
-  /**
-   * This is the same algorithm as the reference but inverted to iterate over existing canopies instead of the
-   * points. Because of this it does not need to actually store the points, instead storing a total points
-   * vector and the number of points. From this a centroid can be computed.
-   * <p/>
-   * This method is used by the CanopyMapper, CanopyReducer and CanopyDriver.
-   * 
-   * @param point
-   *          the point to be added
-   * @param canopies
-   *          the List<Canopy> to be appended
-   */
-  public void addPointToCanopies(Vector point, Collection<Canopy> canopies) {
-    boolean pointStronglyBound = false;
-    for (Canopy canopy : canopies) {
-      double dist = measure.distance(canopy.getCenter().getLengthSquared(), canopy.getCenter(), point);
-      if (dist < t1) {
-        log.debug("Added point: " + AbstractCluster.formatVector(point, null) + " to canopy: " + canopy.getIdentifier());
-        canopy.observe(point);
-      }
-      pointStronglyBound = pointStronglyBound || (dist < t2);
-    }
-    if (!pointStronglyBound) {
-      log.debug("Created new Canopy:" + nextCanopyId + " at center:" + AbstractCluster.formatVector(point, null));
-      canopies.add(new Canopy(point, nextCanopyId++, measure));
-    }
-  }
-
-  /**
-   * Emit the point to the closest Canopy
-   */
-  public void emitPointToClosestCanopy(Vector point,
-                                       Iterable<Canopy> canopies,
-                                       Mapper<?,?,IntWritable,WeightedVectorWritable>.Context context)
-    throws IOException, InterruptedException {
-    Canopy closest = findClosestCanopy(point, canopies);
-    context.write(new IntWritable(closest.getId()), new WeightedVectorWritable(1, point));
-    context.setStatus("Emit Closest Canopy ID:" + closest.getIdentifier());
-  }
-
-  protected Canopy findClosestCanopy(Vector point, Iterable<Canopy> canopies) {
-    double minDist = Double.MAX_VALUE;
-    Canopy closest = null;
-    // find closest canopy
-    for (Canopy canopy : canopies) {
-      double dist = measure.distance(canopy.getCenter().getLengthSquared(), canopy.getCenter(), point);
-      if (dist < minDist) {
-        minDist = dist;
-        closest = canopy;
-      }
-    }
-    return closest;
-  }
-
-  /**
-   * Return if the point is covered by the canopy
-   * 
-   * @param point
-   *          a point
-   * @return if the point is covered
-   */
-  public boolean canopyCovers(Canopy canopy, Vector point) {
-    return measure.distance(canopy.getCenter().getLengthSquared(), canopy.getCenter(), point) < t1;
-  }
-
-  /**
-   * Iterate through the points, adding new canopies. Return the canopies.
-   * 
-   * @param points
-   *          a list<Vector> defining the points to be clustered
-   * @param measure
-   *          a DistanceMeasure to use
-   * @param t1
-   *          the T1 distance threshold
-   * @param t2
-   *          the T2 distance threshold
-   * @return the List<Canopy> created
-   */
-  public static List<Canopy> createCanopies(List<Vector> points, DistanceMeasure measure, double t1, double t2) {
-    List<Canopy> canopies = new ArrayList<Canopy>();
-    /**
-     * Reference Implementation: Given a distance metric, one can create canopies as follows: Start with a
-     * list of the data points in any order, and with two distance thresholds, T1 and T2, where T1 > T2.
-     * (These thresholds can be set by the user, or selected by cross-validation.) Pick a point on the list
-     * and measure its distance to all other points. Put all points that are within distance threshold T1 into
-     * a canopy. Remove from the list all points that are within distance threshold T2. Repeat until the list
-     * is empty.
-     */
-    int nextCanopyId = 0;
-    while (!points.isEmpty()) {
-      Iterator<Vector> ptIter = points.iterator();
-      Vector p1 = ptIter.next();
-      ptIter.remove();
-      Canopy canopy = new Canopy(p1, nextCanopyId++, measure);
-      canopies.add(canopy);
-      while (ptIter.hasNext()) {
-        Vector p2 = ptIter.next();
-        double dist = measure.distance(p1, p2);
-        // Put all points that are within distance threshold T1 into the canopy
-        if (dist < t1) {
-          canopy.observe(p2);
-        }
-        // Remove from the list all points that are within distance threshold T2
-        if (dist < t2) {
-          ptIter.remove();
-        }
-      }
-      for (Canopy c : canopies) {
-        c.computeParameters();
-      }
-    }
-    return canopies;
-  }
-
-  /**
-   * Iterate through the canopies, adding their centroids to a list
-   * 
-   * @param canopies
-   *          a List<Canopy>
-   * @return the List<Vector>
-   */
-  public static List<Vector> getCenters(Iterable<Canopy> canopies) {
-    List<Vector> result = new ArrayList<Vector>();
-    for (Canopy canopy : canopies) {
-      result.add(canopy.getCenter());
-    }
-    return result;
-  }
-
-  /**
-   * Iterate through the canopies, resetting their center to their centroids
-   * 
-   * @param canopies
-   *          a List<Canopy>
-   */
-  public static void updateCentroids(Iterable<Canopy> canopies) {
-    for (Canopy canopy : canopies) {
-      canopy.computeParameters();
-    }
-  }
+	// the T2 distance threshold
+	protected double t2;
+
+	// the T3 distance threshold
+	private double t3;
+
+	// the T4 distance threshold
+	private double t4;
+
+	// the distance measure
+	private DistanceMeasure measure;
+
+	public CanopyClusterer(DistanceMeasure measure, double t1, double t2) {
+		this.t1 = t1;
+		this.t2 = t2;
+		this.t3 = t1;
+		this.t4 = t2;
+		this.measure = measure;
+	}
+
+	public CanopyClusterer(Configuration config) {
+		this.configure(config);
+	}
+
+	/**
+	 * Configure the Canopy and its distance measure
+	 * 
+	 * @param configuration
+	 *            the Configuration
+	 */
+	public void configure(Configuration configuration) {
+		try {
+			ClassLoader ccl = Thread.currentThread().getContextClassLoader();
+			measure = ccl.loadClass(
+					configuration.get(CanopyConfigKeys.DISTANCE_MEASURE_KEY))
+					.asSubclass(DistanceMeasure.class).newInstance();
+			measure.configure(configuration);
+		} catch (ClassNotFoundException e) {
+			throw new IllegalStateException(e);
+		} catch (IllegalAccessException e) {
+			throw new IllegalStateException(e);
+		} catch (InstantiationException e) {
+			throw new IllegalStateException(e);
+		}
+		t1 = Double.parseDouble(configuration.get(CanopyConfigKeys.T1_KEY));
+		t2 = Double.parseDouble(configuration.get(CanopyConfigKeys.T2_KEY));
+		t3 = t1;
+		String d = configuration.get(CanopyConfigKeys.T3_KEY);
+		if (d != null)
+			t3 = Double.parseDouble(d);
+		t4 = t2;
+		d = configuration.get(CanopyConfigKeys.T4_KEY);
+		if (d != null)
+			t4 = Double.parseDouble(d);
+		nextCanopyId = 0;
+	}
+
+	/**
+	 * Used by CanopyReducer to set t1=t3 and t2=t4 configuration values
+	 */
+	public void useT3T4() {
+		t1 = t3;
+		t2 = t4;
+	}
+
+	/**
+	 * Configure the Canopy for unit tests
+	 * 
+	 * @param aMeasure
+	 *            the DistanceMeasure
+	 * @param aT1
+	 *            the T1 distance threshold
+	 * @param aT2
+	 *            the T2 distance threshold
+	 * */
+	public void config(DistanceMeasure aMeasure, double aT1, double aT2) {
+		measure = aMeasure;
+		t1 = aT1;
+		t2 = aT2;
+		t3 = t1;
+		t4 = t2;
+	}
+
+	/**
+	 * This is the same algorithm as the reference but inverted to iterate over
+	 * existing canopies instead of the points. Because of this it does not need
+	 * to actually store the points, instead storing a total points vector and
+	 * the number of points. From this a centroid can be computed.
+	 * <p/>
+	 * This method is used by the CanopyMapper, CanopyReducer and CanopyDriver.
+	 * 
+	 * @param point
+	 *            the point to be added
+	 * @param canopies
+	 *            the List<Canopy> to be appended
+	 */
+	public void addPointToCanopies(Vector point, Collection<Canopy> canopies) {
+		boolean pointStronglyBound = false;
+		for (Canopy canopy : canopies) {
+			double dist = measure.distance(canopy.getCenter()
+					.getLengthSquared(), canopy.getCenter(), point);
+			if (dist < t1) {
+				log.debug("Added point: "
+						+ AbstractCluster.formatVector(point, null)
+						+ " to canopy: " + canopy.getIdentifier());
+				canopy.observe(point);
+			}
+			pointStronglyBound = pointStronglyBound || (dist < t2);
+		}
+		if (!pointStronglyBound) {
+			log.debug("Created new Canopy:" + nextCanopyId + " at center:"
+					+ AbstractCluster.formatVector(point, null));
+			canopies.add(new Canopy(point, nextCanopyId++, measure));
+		}
+	}
+
+	/**
+	 * Emit the point to the closest Canopy
+	 */
+	public void emitPointToClosestCanopy(Vector point,
+			Iterable<Canopy> canopies,
+			Mapper<?, ?, IntWritable, WeightedVectorWritable>.Context context)
+			throws IOException, InterruptedException {
+		Canopy closest = findClosestCanopy(point, canopies);
+		context.write(new IntWritable(closest.getId()),
+				new WeightedVectorWritable(1, point));
+		context.setStatus("Emit Closest Canopy ID:" + closest.getIdentifier());
+	}
+
+	protected Canopy findClosestCanopy(Vector point, Iterable<Canopy> canopies) {
+		double minDist = Double.MAX_VALUE;
+		Canopy closest = null;
+		// find closest canopy
+		for (Canopy canopy : canopies) {
+			double dist = measure.distance(canopy.getCenter()
+					.getLengthSquared(), canopy.getCenter(), point);
+			if (dist < minDist) {
+				minDist = dist;
+				closest = canopy;
+			}
+		}
+		return closest;
+	}
+
+	/**
+	 * Return if the point is covered by the canopy
+	 * 
+	 * @param point
+	 *            a point
+	 * @return if the point is covered
+	 */
+	public boolean canopyCovers(Canopy canopy, Vector point) {
+		return measure.distance(canopy.getCenter().getLengthSquared(), canopy
+				.getCenter(), point) < t1;
+	}
+
+	/**
+	 * Iterate through the points, adding new canopies. Return the canopies.
+	 * 
+	 * @param points
+	 *            a list<Vector> defining the points to be clustered
+	 * @param measure
+	 *            a DistanceMeasure to use
+	 * @param t1
+	 *            the T1 distance threshold
+	 * @param t2
+	 *            the T2 distance threshold
+	 * @return the List<Canopy> created
+	 */
+	public static List<Canopy> createCanopies(List<Vector> points,
+			DistanceMeasure measure, double t1, double t2) {
+		List<Canopy> canopies = new ArrayList<Canopy>();
+		/**
+		 * Reference Implementation: Given a distance metric, one can create
+		 * canopies as follows: Start with a list of the data points in any
+		 * order, and with two distance thresholds, T1 and T2, where T1 > T2.
+		 * (These thresholds can be set by the user, or selected by
+		 * cross-validation.) Pick a point on the list and measure its distance
+		 * to all other points. Put all points that are within distance
+		 * threshold T1 into a canopy. Remove from the list all points that are
+		 * within distance threshold T2. Repeat until the list is empty.
+		 */
+		int nextCanopyId = 0;
+		while (!points.isEmpty()) {
+			Iterator<Vector> ptIter = points.iterator();
+			Vector p1 = ptIter.next();
+			ptIter.remove();
+			Canopy canopy = new Canopy(p1, nextCanopyId++, measure);
+			canopies.add(canopy);
+			while (ptIter.hasNext()) {
+				Vector p2 = ptIter.next();
+				double dist = measure.distance(p1, p2);
+				// Put all points that are within distance threshold T1 into the
+				// canopy
+				if (dist < t1) {
+					canopy.observe(p2);
+				}
+				// Remove from the list all points that are within distance
+				// threshold T2
+				if (dist < t2) {
+					ptIter.remove();
+				}
+			}
+			for (Canopy c : canopies) {
+				c.computeParameters();
+			}
+		}
+		return canopies;
+	}
+
+	/**
+	 * Iterate through the canopies, adding their centroids to a list
+	 * 
+	 * @param canopies
+	 *            a List<Canopy>
+	 * @return the List<Vector>
+	 */
+	public static List<Vector> getCenters(Iterable<Canopy> canopies) {
+		List<Vector> result = new ArrayList<Vector>();
+		for (Canopy canopy : canopies) {
+			result.add(canopy.getCenter());
+		}
+		return result;
+	}
+
+	/**
+	 * Iterate through the canopies, resetting their center to their centroids
+	 * 
+	 * @param canopies
+	 *            a List<Canopy>
+	 */
+	public static void updateCentroids(Iterable<Canopy> canopies) {
+		for (Canopy canopy : canopies) {
+			canopy.computeParameters();
+		}
+	}
 
 }

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyConfigKeys.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyConfigKeys.java?rev=1090881&r1=1090880&r2=1090881&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyConfigKeys.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyConfigKeys.java Sun Apr 10 20:00:13 2011
@@ -18,11 +18,18 @@
 package org.apache.mahout.clustering.canopy;
 
 public interface CanopyConfigKeys {
-  
+
   String T1_KEY = "org.apache.mahout.clustering.canopy.t1";
+
   String CANOPY_PATH_KEY = "org.apache.mahout.clustering.canopy.path";
+
   String T2_KEY = "org.apache.mahout.clustering.canopy.t2";
+
+  String T3_KEY = "org.apache.mahout.clustering.canopy.t3";
+
+  String T4_KEY = "org.apache.mahout.clustering.canopy.t4";
+
   // keys used by Driver, Mapper, Combiner & Reducer
   String DISTANCE_MEASURE_KEY = "org.apache.mahout.clustering.canopy.measure";
-  
+
 }

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java?rev=1090881&r1=1090880&r2=1090881&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java Sun Apr 10 20:00:13 2011
@@ -29,6 +29,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.OutputLogFilter;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
@@ -68,6 +70,8 @@ public class CanopyDriver extends Abstra
     addOption(DefaultOptionCreator.distanceMeasureOption().create());
     addOption(DefaultOptionCreator.t1Option().create());
     addOption(DefaultOptionCreator.t2Option().create());
+    addOption(DefaultOptionCreator.t3Option().create());
+    addOption(DefaultOptionCreator.t4Option().create());
     addOption(DefaultOptionCreator.overwriteOption().create());
     addOption(DefaultOptionCreator.clusteringOption().create());
     addOption(DefaultOptionCreator.methodOption().create());
@@ -79,131 +83,212 @@ public class CanopyDriver extends Abstra
 
     Path input = getInputPath();
     Path output = getOutputPath();
+    Configuration conf = getConf();
     if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
-      HadoopUtil.delete(getConf(), output);
+      HadoopUtil.delete(conf, output);
     }
     String measureClass = getOption(DefaultOptionCreator.DISTANCE_MEASURE_OPTION);
     double t1 = Double.parseDouble(getOption(DefaultOptionCreator.T1_OPTION));
     double t2 = Double.parseDouble(getOption(DefaultOptionCreator.T2_OPTION));
+    double t3 = t1;
+    if (hasOption(DefaultOptionCreator.T3_OPTION))
+      t3 = Double.parseDouble(getOption(DefaultOptionCreator.T3_OPTION));
+    double t4 = t2;
+    if (hasOption(DefaultOptionCreator.T4_OPTION))
+      t4 = Double.parseDouble(getOption(DefaultOptionCreator.T4_OPTION));
     boolean runClustering = hasOption(DefaultOptionCreator.CLUSTERING_OPTION);
-    boolean runSequential = getOption(DefaultOptionCreator.METHOD_OPTION).equalsIgnoreCase(
-        DefaultOptionCreator.SEQUENTIAL_METHOD);
+    boolean runSequential = getOption(DefaultOptionCreator.METHOD_OPTION)
+        .equalsIgnoreCase(DefaultOptionCreator.SEQUENTIAL_METHOD);
     ClassLoader ccl = Thread.currentThread().getContextClassLoader();
-    DistanceMeasure measure = ccl.loadClass(measureClass).asSubclass(DistanceMeasure.class).newInstance();
+    DistanceMeasure measure = ccl.loadClass(measureClass).asSubclass(
+        DistanceMeasure.class).newInstance();
 
-    run(getConf(), input, output, measure, t1, t2, runClustering, runSequential);
+    run(conf, input, output, measure, t1, t2, t3, t4, runClustering,
+        runSequential);
     return 0;
   }
 
   /**
-   * Build a directory of Canopy clusters from the input arguments and, if requested,
-   * cluster the input vectors using these clusters
-   * @param input the Path to the directory containing input vectors
-   * @param output the Path for all output directories
-   * @param t1 the double T1 distance metric
-   * @param t2 the double T2 distance metric
-   * @param runClustering cluster the input vectors if true
-   * @param runSequential execute sequentially if true
+   * Build a directory of Canopy clusters from the input arguments and, if
+   * requested, cluster the input vectors using these clusters
+   * 
+   * @param conf
+   *          the Configuration
+   * @param input
+   *          the Path to the directory containing input vectors
+   * @param output
+   *          the Path for all output directories
+   * @param measure
+   *          the DistanceMeasure
+   * @param t1
+   *          the double T1 distance metric
+   * @param t2
+   *          the double T2 distance metric
+   * @param t3
+   *          the reducer's double T1 distance metric
+   * @param t4
+   *          the reducer's double T2 distance metric
+   * @param runClustering
+   *          cluster the input vectors if true
+   * @param runSequential
+   *          execute sequentially if true
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws ClassNotFoundException
+   * @throws InstantiationException
+   * @throws IllegalAccessException
    */
-  public static void run(Configuration conf,
-                         Path input,
-                         Path output,
-                         DistanceMeasure measure,
-                         double t1,
-                         double t2,
-                         boolean runClustering,
-                         boolean runSequential) throws IOException, InterruptedException, ClassNotFoundException {
-    Path clustersOut = buildClusters(conf, input, output, measure, t1, t2, runSequential);
+  public static void run(Configuration conf, Path input, Path output,
+      DistanceMeasure measure, double t1, double t2, double t3, double t4,
+      boolean runClustering, boolean runSequential) throws IOException,
+      InterruptedException, ClassNotFoundException, InstantiationException,
+      IllegalAccessException {
+    Path clustersOut = buildClusters(conf, input, output, measure, t1, t2, t3,
+        t4, runSequential);
     if (runClustering) {
-      clusterData(conf, input, clustersOut, output, measure, t1, t2, runSequential);
+      clusterData(conf, input, clustersOut, output, measure, t1, t2,
+          runSequential);
     }
   }
 
   /**
-   * Convenience method creates new Configuration()
-   * Build a directory of Canopy clusters from the input arguments and, if requested,
-   * cluster the input vectors using these clusters
-   * @param input the Path to the directory containing input vectors
-   * @param output the Path for all output directories
-   * @param t1 the double T1 distance metric
-   * @param t2 the double T2 distance metric
-   * @param runClustering cluster the input vectors if true
-   * @param runSequential execute sequentially if true
+   * Convenience method to provide backward compatibility
    */
-  public static void run(Path input,
-                         Path output,
-                         DistanceMeasure measure,
-                         double t1,
-                         double t2,
-                         boolean runClustering,
-                         boolean runSequential) throws IOException, InterruptedException, ClassNotFoundException {
-    run(new Configuration(), input, output, measure, t1, t2, runClustering, runSequential);
+  public static void run(Configuration conf, Path input, Path output,
+      DistanceMeasure measure, double t1, double t2, boolean runClustering,
+      boolean runSequential) throws IOException, InterruptedException,
+      ClassNotFoundException, InstantiationException, IllegalAccessException {
+    run(conf, input, output, measure, t1, t2, t1, t2, runClustering,
+        runSequential);
   }
 
   /**
-   * Build a directory of Canopy clusters from the input vectors and other arguments.
-   * Run sequential or mapreduce execution as requested
+   * Convenience method creates new Configuration() Build a directory of Canopy
+   * clusters from the input arguments and, if requested, cluster the input
+   * vectors using these clusters
    * 
-   * @param conf the Configuration to use
-   * @param input the Path to the directory containing input vectors
-   * @param output the Path for all output directories
-   * @param measure the DistanceMeasure 
-   * @param t1 the double T1 distance metric
-   * @param t2 the double T2 distance metric
-   * @param runSequential a boolean indicates to run the sequential (reference) algorithm
+   * @param input
+   *          the Path to the directory containing input vectors
+   * @param output
+   *          the Path for all output directories
+   * @param t1
+   *          the double T1 distance metric
+   * @param t2
+   *          the double T2 distance metric
+   * @param runClustering
+   *          cluster the input vectors if true
+   * @param runSequential
+   *          execute sequentially if true
+   */
+  public static void run(Path input, Path output, DistanceMeasure measure,
+      double t1, double t2, boolean runClustering, boolean runSequential)
+      throws IOException, InterruptedException, ClassNotFoundException,
+      InstantiationException, IllegalAccessException {
+    run(new Configuration(), input, output, measure, t1, t2, runClustering,
+        runSequential);
+  }
+
+  /**
+   * Convenience method for backwards compatibility
+   */
+  public static Path buildClusters(Configuration conf, Path input, Path output,
+      DistanceMeasure measure, double t1, double t2, boolean runSequential)
+      throws InstantiationException, IllegalAccessException, IOException,
+      InterruptedException, ClassNotFoundException {
+    return buildClusters(conf, output, output, measure, t1, t2, t1, t2,
+        runSequential);
+  }
+
+  /**
+   * Build a directory of Canopy clusters from the input vectors and other
+   * arguments. Run sequential or mapreduce execution as requested
+   * 
+   * @param conf
+   *          the Configuration to use
+   * @param input
+   *          the Path to the directory containing input vectors
+   * @param output
+   *          the Path for all output directories
+   * @param measure
+   *          the DistanceMeasure
+   * @param t1
+   *          the double T1 distance metric
+   * @param t2
+   *          the double T2 distance metric
+   * @param t3
+   *          the reducer's double T1 distance metric
+   * @param t4
+   *          the reducer's double T2 distance metric
+   * @param runSequential
+   *          a boolean indicates to run the sequential (reference) algorithm
    * @return the canopy output directory Path
    */
-  public static Path buildClusters(Configuration conf,
-                                   Path input,
-                                   Path output,
-                                   DistanceMeasure measure,
-                                   double t1,
-                                   double t2,
-                                   boolean runSequential)
-    throws IOException, InterruptedException, ClassNotFoundException {
+  public static Path buildClusters(Configuration conf, Path input, Path output,
+      DistanceMeasure measure, double t1, double t2, double t3, double t4,
+      boolean runSequential) throws InstantiationException,
+      IllegalAccessException, IOException, InterruptedException,
+      ClassNotFoundException {
     log.info("Build Clusters Input: {} Out: {} " + "Measure: {} t1: {} t2: {}",
-             new Object[] { input, output, measure, t1, t2 });
+        new Object[] { input, output, measure, t1, t2 });
     if (runSequential) {
       return buildClustersSeq(input, output, measure, t1, t2);
     } else {
-      return buildClustersMR(conf, input, output, measure, t1, t2);
+      return buildClustersMR(conf, input, output, measure, t1, t2, t3, t4);
     }
   }
 
   /**
-   * Build a directory of Canopy clusters from the input vectors and other arguments.
-   * Run sequential execution
+   * Build a directory of Canopy clusters from the input vectors and other
+   * arguments. Run sequential execution
    * 
-   * @param input the Path to the directory containing input vectors
-   * @param output the Path for all output directories
-   * @param measure the DistanceMeasure 
-   * @param t1 the double T1 distance metric
-   * @param t2 the double T2 distance metric
+   * @param input
+   *          the Path to the directory containing input vectors
+   * @param output
+   *          the Path for all output directories
+   * @param measure
+   *          the DistanceMeasure
+   * @param t1
+   *          the double T1 distance metric
+   * @param t2
+   *          the double T2 distance metric
    * @return the canopy output directory Path
    */
-  private static Path buildClustersSeq(Path input, Path output, DistanceMeasure measure, double t1, double t2)
-    throws IOException {
+  private static Path buildClustersSeq(Path input, Path output,
+      DistanceMeasure measure, double t1, double t2)
+      throws InstantiationException, IllegalAccessException, IOException {
     CanopyClusterer clusterer = new CanopyClusterer(measure, t1, t2);
     Collection<Canopy> canopies = new ArrayList<Canopy>();
     Configuration conf = new Configuration();
     FileSystem fs = FileSystem.get(input.toUri(), conf);
-    for (VectorWritable value :
-         new SequenceFileDirValueIterable<VectorWritable>(input, PathType.LIST, PathFilters.logsCRCFilter(), conf)) {
-      clusterer.addPointToCanopies(value.get(), canopies);
+    FileStatus[] status = fs.listStatus(input, new OutputLogFilter());
+    for (FileStatus s : status) {
+      SequenceFile.Reader reader = new SequenceFile.Reader(fs, s.getPath(),
+          conf);
+      try {
+        Writable key = reader.getKeyClass().asSubclass(Writable.class)
+            .newInstance();
+        VectorWritable vw = reader.getValueClass().asSubclass(
+            VectorWritable.class).newInstance();
+        while (reader.next(key, vw)) {
+          clusterer.addPointToCanopies(vw.get(), canopies);
+          vw = reader.getValueClass().asSubclass(VectorWritable.class)
+              .newInstance();
+        }
+      } finally {
+        reader.close();
+      }
     }
     Path canopyOutputDir = new Path(output, Cluster.CLUSTERS_DIR + '0');
     Path path = new Path(canopyOutputDir, "part-r-00000");
-    SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path, Text.class, Canopy.class);
+    SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path,
+        Text.class, Canopy.class);
     try {
       for (Canopy canopy : canopies) {
         canopy.computeParameters();
-        log.debug("Writing Canopy:{} center:{} numPoints:{} radius:{}",
-                  new Object[] {
-                      canopy.getIdentifier(),
-                      AbstractCluster.formatVector(canopy.getCenter(), null),
-                      canopy.getNumPoints(),
-                      AbstractCluster.formatVector(canopy.getRadius(), null)
-                  });
+        log.debug("Writing Canopy:" + canopy.getIdentifier() + " center:"
+            + AbstractCluster.formatVector(canopy.getCenter(), null)
+            + " numPoints:" + canopy.getNumPoints() + " radius:"
+            + AbstractCluster.formatVector(canopy.getRadius(), null));
         writer.append(new Text(canopy.getIdentifier()), canopy);
       }
     } finally {
@@ -213,28 +298,40 @@ public class CanopyDriver extends Abstra
   }
 
   /**
-   * Build a directory of Canopy clusters from the input vectors and other arguments.
-   * Run mapreduce execution
-   * @param conf 
+   * Build a directory of Canopy clusters from the input vectors and other
+   * arguments. Run mapreduce execution
+   * 
+   * @param conf
+   * @param input
+   *          the Path to the directory containing input vectors
+   * @param output
+   *          the Path for all output directories
+   * @param measure
+   *          the DistanceMeasure
+   * @param t1
+   *          the double T1 distance metric
+   * @param t2
+   *          the double T2 distance metric
+   * @param t3
+   *          the reducer's double T1 distance metric
+   * @param t4
+   *          the reducer's double T2 distance metric
    * 
-   * @param input the Path to the directory containing input vectors
-   * @param output the Path for all output directories
-   * @param measure the DistanceMeasure 
-   * @param t1 the double T1 distance metric
-   * @param t2 the double T2 distance metric
    * @return the canopy output directory Path
    */
-  private static Path buildClustersMR(Configuration conf,
-                                      Path input,
-                                      Path output,
-                                      DistanceMeasure measure,
-                                      double t1, double t2)
-    throws IOException, InterruptedException, ClassNotFoundException {
-    conf.set(CanopyConfigKeys.DISTANCE_MEASURE_KEY, measure.getClass().getName());
+  private static Path buildClustersMR(Configuration conf, Path input,
+      Path output, DistanceMeasure measure, double t1, double t2, double t3,
+      double t4) throws IOException, InterruptedException,
+      ClassNotFoundException {
+    conf.set(CanopyConfigKeys.DISTANCE_MEASURE_KEY, measure.getClass()
+        .getName());
     conf.set(CanopyConfigKeys.T1_KEY, String.valueOf(t1));
     conf.set(CanopyConfigKeys.T2_KEY, String.valueOf(t2));
+    conf.set(CanopyConfigKeys.T3_KEY, String.valueOf(t3));
+    conf.set(CanopyConfigKeys.T4_KEY, String.valueOf(t4));
 
-    Job job = new Job(conf, "Canopy Driver running buildClusters over input: " + input);
+    Job job = new Job(conf, "Canopy Driver running buildClusters over input: "
+        + input);
     job.setInputFormatClass(SequenceFileInputFormat.class);
     job.setOutputFormatClass(SequenceFileOutputFormat.class);
     job.setMapperClass(CanopyMapper.class);
@@ -250,20 +347,17 @@ public class CanopyDriver extends Abstra
     Path canopyOutputDir = new Path(output, Cluster.CLUSTERS_DIR + '0');
     FileOutputFormat.setOutputPath(job, canopyOutputDir);
     if (!job.waitForCompletion(true)) {
-      throw new InterruptedException("Canopy Job failed processing " + input.toString());
+      throw new InterruptedException("Canopy Job failed processing "
+          + input.toString());
     }
     return canopyOutputDir;
   }
 
-  public static void clusterData(Configuration conf,
-                                 Path points,
-                                 Path canopies,
-                                 Path output,
-                                 DistanceMeasure measure,
-                                 double t1,
-                                 double t2,
-                                 boolean runSequential)
-    throws IOException, InterruptedException, ClassNotFoundException {
+  public static void clusterData(Configuration conf, Path points,
+      Path canopies, Path output, DistanceMeasure measure, double t1,
+      double t2, boolean runSequential) throws InstantiationException,
+      IllegalAccessException, IOException, InterruptedException,
+      ClassNotFoundException {
     if (runSequential) {
       clusterDataSeq(points, canopies, output, measure, t1, t2);
     } else {
@@ -271,55 +365,73 @@ public class CanopyDriver extends Abstra
     }
   }
 
-  private static void clusterDataSeq(Path points,
-                                     Path canopies,
-                                     Path output,
-                                     DistanceMeasure measure,
-                                     double t1,
-                                     double t2) throws IOException {
+  private static void clusterDataSeq(Path points, Path canopies, Path output,
+      DistanceMeasure measure, double t1, double t2)
+      throws InstantiationException, IllegalAccessException, IOException {
     CanopyClusterer clusterer = new CanopyClusterer(measure, t1, t2);
 
     Collection<Canopy> clusters = new ArrayList<Canopy>();
     Configuration conf = new Configuration();
-    for (Canopy value :
-         new SequenceFileDirValueIterable<Canopy>(canopies, PathType.LIST, PathFilters.logsCRCFilter(), conf)) {
-      clusters.add(value);
-    }
-    // iterate over all points, assigning each to the closest canopy and outputing that clustering
-    FileSystem fs = FileSystem.get(points.toUri(), conf);
-    FileStatus[] status = fs.listStatus(points, PathFilters.logsCRCFilter());
+    FileSystem fs = FileSystem.get(canopies.toUri(), conf);
+    FileStatus[] status = fs.listStatus(canopies, new OutputLogFilter());
+    for (FileStatus s : status) {
+      SequenceFile.Reader reader = new SequenceFile.Reader(fs, s.getPath(),
+          conf);
+      try {
+        Writable key = reader.getKeyClass().asSubclass(Writable.class)
+            .newInstance();
+        Canopy value = reader.getValueClass().asSubclass(Canopy.class)
+            .newInstance();
+        while (reader.next(key, value)) {
+          clusters.add(value);
+          value = reader.getValueClass().asSubclass(Canopy.class).newInstance();
+        }
+      } finally {
+        reader.close();
+      }
+    }
+    // iterate over all points, assigning each to the closest canopy and
+    // outputting that clustering
+    fs = FileSystem.get(points.toUri(), conf);
+    status = fs.listStatus(points, new OutputLogFilter());
     Path outPath = new Path(output, DEFAULT_CLUSTERED_POINTS_DIRECTORY);
     int part = 0;
     for (FileStatus s : status) {
-      SequenceFile.Writer writer = new SequenceFile.Writer(fs,
-                                                           conf,
-                                                           new Path(outPath, "part-m-" + part++),
-                                                           IntWritable.class,
-                                                           WeightedVectorWritable.class);
+      SequenceFile.Reader reader = new SequenceFile.Reader(fs, s.getPath(),
+          conf);
+      SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, new Path(
+          outPath, "part-m-" + part), IntWritable.class,
+          WeightedVectorWritable.class);
       try {
-        for (VectorWritable value : new SequenceFileValueIterable<VectorWritable>(s.getPath(), conf)) {
-          Canopy closest = clusterer.findClosestCanopy(value.get(), clusters);
-          writer.append(new IntWritable(closest.getId()), new WeightedVectorWritable(1, value.get()));
+        Writable key = reader.getKeyClass().asSubclass(Writable.class)
+            .newInstance();
+        VectorWritable vw = reader.getValueClass().asSubclass(
+            VectorWritable.class).newInstance();
+        while (reader.next(key, vw)) {
+          Canopy closest = clusterer.findClosestCanopy(vw.get(), clusters);
+          writer.append(new IntWritable(closest.getId()),
+              new WeightedVectorWritable(1, vw.get()));
+          vw = reader.getValueClass().asSubclass(VectorWritable.class)
+              .newInstance();
         }
       } finally {
+        reader.close();
         writer.close();
       }
     }
   }
 
-  private static void clusterDataMR(Configuration conf,
-                                    Path points,
-                                    Path canopies,
-                                    Path output,
-                                    DistanceMeasure measure,
-                                    double t1,
-                                    double t2) throws IOException, InterruptedException, ClassNotFoundException {
-    conf.set(CanopyConfigKeys.DISTANCE_MEASURE_KEY, measure.getClass().getName());
+  private static void clusterDataMR(Configuration conf, Path points,
+      Path canopies, Path output, DistanceMeasure measure, double t1, double t2)
+      throws IOException, InterruptedException, ClassNotFoundException {
+    conf.set(CanopyConfigKeys.DISTANCE_MEASURE_KEY, measure.getClass()
+        .getName());
     conf.set(CanopyConfigKeys.T1_KEY, String.valueOf(t1));
     conf.set(CanopyConfigKeys.T2_KEY, String.valueOf(t2));
     conf.set(CanopyConfigKeys.CANOPY_PATH_KEY, canopies.toString());
 
-    Job job = new Job(conf, "Canopy Driver running clusterData over input: " + points);
+    Job job = new Job(conf, "Canopy Driver running clusterData over input: "
+        + points);
     job.setInputFormatClass(SequenceFileInputFormat.class);
     job.setOutputFormatClass(SequenceFileOutputFormat.class);
     job.setMapperClass(ClusterMapper.class);
@@ -334,7 +446,8 @@ public class CanopyDriver extends Abstra
     HadoopUtil.delete(conf, outPath);
 
     if (!job.waitForCompletion(true)) {
-      throw new InterruptedException("Canopy Clustering failed processing " + canopies.toString());
+      throw new InterruptedException("Canopy Clustering failed processing "
+          + canopies.toString());
     }
   }
 

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyMapper.java?rev=1090881&r1=1090880&r2=1090881&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyMapper.java Sun Apr 10 20:00:13 2011
@@ -26,27 +26,32 @@ import org.apache.hadoop.io.WritableComp
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.mahout.math.VectorWritable;
 
-class CanopyMapper extends Mapper<WritableComparable<?>, VectorWritable, Text, VectorWritable> {
+class CanopyMapper extends
+    Mapper<WritableComparable<?>, VectorWritable, Text, VectorWritable> {
 
   private final Collection<Canopy> canopies = new ArrayList<Canopy>();
+
   private CanopyClusterer canopyClusterer;
 
   @Override
-  protected void map(WritableComparable<?> key, VectorWritable point, Context context)
-    throws IOException, InterruptedException {
+  protected void map(WritableComparable<?> key, VectorWritable point,
+      Context context) throws IOException, InterruptedException {
     canopyClusterer.addPointToCanopies(point.get(), canopies);
   }
 
   @Override
-  protected void setup(Context context) throws IOException, InterruptedException {
+  protected void setup(Context context) throws IOException,
+      InterruptedException {
     super.setup(context);
     canopyClusterer = new CanopyClusterer(context.getConfiguration());
   }
 
   @Override
-  protected void cleanup(Context context) throws IOException, InterruptedException {
+  protected void cleanup(Context context) throws IOException,
+      InterruptedException {
     for (Canopy canopy : canopies) {
-      context.write(new Text("centroid"), new VectorWritable(canopy.computeCentroid()));
+      context.write(new Text("centroid"), new VectorWritable(canopy
+          .computeCentroid()));
     }
     super.cleanup(context);
   }

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyReducer.java?rev=1090881&r1=1090880&r2=1090881&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyReducer.java Sun Apr 10 20:00:13 2011
@@ -29,11 +29,12 @@ import org.apache.mahout.math.VectorWrit
 public class CanopyReducer extends Reducer<Text, VectorWritable, Text, Canopy> {
 
   private final Collection<Canopy> canopies = new ArrayList<Canopy>();
-  private CanopyClusterer canopyClusterer;
+
+  protected CanopyClusterer canopyClusterer;
 
   @Override
-  protected void reduce(Text arg0, Iterable<VectorWritable> values, Context context)
-    throws IOException, InterruptedException {
+  protected void reduce(Text arg0, Iterable<VectorWritable> values,
+      Context context) throws IOException, InterruptedException {
     for (VectorWritable value : values) {
       Vector point = value.get();
       canopyClusterer.addPointToCanopies(point, canopies);
@@ -45,9 +46,11 @@ public class CanopyReducer extends Reduc
   }
 
   @Override
-  protected void setup(Context context) throws IOException, InterruptedException {
+  protected void setup(Context context) throws IOException,
+      InterruptedException {
     super.setup(context);
     canopyClusterer = new CanopyClusterer(context.getConfiguration());
+    canopyClusterer.useT3T4();
   }
 
 }

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/ClusterMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/ClusterMapper.java?rev=1090881&r1=1090880&r2=1090881&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/ClusterMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/ClusterMapper.java Sun Apr 10 20:00:13 2011
@@ -35,13 +35,15 @@ import org.apache.mahout.common.iterator
 import org.apache.mahout.math.Vector;
 import org.apache.mahout.math.VectorWritable;
 
-public class ClusterMapper extends Mapper<WritableComparable<?>, VectorWritable, IntWritable, WeightedVectorWritable> {
+public class ClusterMapper
+    extends
+    Mapper<WritableComparable<?>, VectorWritable, IntWritable, WeightedVectorWritable> {
 
   private CanopyClusterer canopyClusterer;
 
   @Override
-  protected void map(WritableComparable<?> key, VectorWritable point, Context context)
-    throws IOException, InterruptedException {
+  protected void map(WritableComparable<?> key, VectorWritable point,
+      Context context) throws IOException, InterruptedException {
     canopyClusterer.emitPointToClosestCanopy(point.get(), canopies, context);
   }
 
@@ -56,7 +58,8 @@ public class ClusterMapper extends Mappe
   }
 
   @Override
-  protected void setup(Context context) throws IOException, InterruptedException {
+  protected void setup(Context context) throws IOException,
+      InterruptedException {
     super.setup(context);
 
     canopyClusterer = new CanopyClusterer(context.getConfiguration());
@@ -66,11 +69,13 @@ public class ClusterMapper extends Mappe
 
     // filter out the files
     if (clustersIn != null && clustersIn.length() > 0) {
-      Path clusterPath = new Path(clustersIn,"*");
+      Path clusterPath = new Path(clustersIn, "*");
       FileSystem fs = clusterPath.getFileSystem(conf);
-      Path[] paths = FileUtil.stat2Paths(fs.globStatus(clusterPath, PathFilters.partFilter()));
+      Path[] paths = FileUtil.stat2Paths(fs.globStatus(clusterPath, PathFilters
+          .partFilter()));
       for (FileStatus file : fs.listStatus(paths, PathFilters.partFilter())) {
-        for (Canopy value : new SequenceFileValueIterable<Canopy>(file.getPath(), conf)) {
+        for (Canopy value : new SequenceFileValueIterable<Canopy>(file
+            .getPath(), conf)) {
           canopies.add(value);
         }
       }

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/common/commandline/DefaultOptionCreator.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/common/commandline/DefaultOptionCreator.java?rev=1090881&r1=1090880&r2=1090881&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/common/commandline/DefaultOptionCreator.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/common/commandline/DefaultOptionCreator.java Sun Apr 10 20:00:13 2011
@@ -25,130 +25,211 @@ import org.apache.mahout.common.distance
 public final class DefaultOptionCreator {
 
   public static final String CLUSTERING_OPTION = "clustering";
+
   public static final String CLUSTERS_IN_OPTION = "clusters";
+
   public static final String CONVERGENCE_DELTA_OPTION = "convergenceDelta";
+
   public static final String DISTANCE_MEASURE_OPTION = "distanceMeasure";
+
   public static final String EMIT_MOST_LIKELY_OPTION = "emitMostLikely";
+
   public static final String INPUT_OPTION = "input";
+
   public static final String MAX_ITERATIONS_OPTION = "maxIter";
+
   public static final String MAX_REDUCERS_OPTION = "maxRed";
+
   public static final String METHOD_OPTION = "method";
+
   public static final String NUM_CLUSTERS_OPTION = "numClusters";
+
   public static final String OUTPUT_OPTION = "output";
+
   public static final String OVERWRITE_OPTION = "overwrite";
+
   public static final String T1_OPTION = "t1";
+
   public static final String T2_OPTION = "t2";
+
+  public static final String T3_OPTION = "t3";
+
+  public static final String T4_OPTION = "t4";
+
   public static final String THRESHOLD_OPTION = "threshold";
+
   public static final String SEQUENTIAL_METHOD = "sequential";
+
   public static final String MAPREDUCE_METHOD = "mapreduce";
 
   private DefaultOptionCreator() {
   }
 
   /**
-   * Returns a default command line option for help. Used by all clustering jobs and many others
+   * Returns a default command line option for help. Used by all clustering jobs
+   * and many others
    * */
   public static Option helpOption() {
-    return new DefaultOptionBuilder().withLongName("help").withDescription("Print out help")
-        .withShortName("h").create();
+    return new DefaultOptionBuilder().withLongName("help").withDescription(
+        "Print out help").withShortName("h").create();
   }
 
   /**
-   * Returns a default command line option for input directory specification. Used by all clustering jobs plus others
+   * Returns a default command line option for input directory specification.
+   * Used by all clustering jobs plus others
    */
   public static DefaultOptionBuilder inputOption() {
-    return new DefaultOptionBuilder().withLongName(INPUT_OPTION).withRequired(false).withShortName("i")
-        .withArgument(new ArgumentBuilder().withName(INPUT_OPTION).withMinimum(1).withMaximum(1).create())
-        .withDescription("Path to job input directory.");
+    return new DefaultOptionBuilder().withLongName(INPUT_OPTION).withRequired(
+        false).withShortName("i").withArgument(
+        new ArgumentBuilder().withName(INPUT_OPTION).withMinimum(1)
+            .withMaximum(1).create()).withDescription(
+        "Path to job input directory.");
   }
 
   /**
-   * Returns a default command line option for clusters input directory specification. Used by FuzzyKmeans, Kmeans
+   * Returns a default command line option for clusters input directory
+   * specification. Used by FuzzyKmeans, Kmeans
    */
   public static DefaultOptionBuilder clustersInOption() {
-    return new DefaultOptionBuilder().withLongName(CLUSTERS_IN_OPTION).withRequired(true)
-        .withArgument(new ArgumentBuilder().withName(CLUSTERS_IN_OPTION).withMinimum(1).withMaximum(1).create())
-        .withDescription("The path to the initial clusters directory. Must be a SequenceFile of some type of Cluster")
+    return new DefaultOptionBuilder()
+        .withLongName(CLUSTERS_IN_OPTION)
+        .withRequired(true)
+        .withArgument(
+            new ArgumentBuilder().withName(CLUSTERS_IN_OPTION).withMinimum(1)
+                .withMaximum(1).create())
+        .withDescription(
+            "The path to the initial clusters directory. Must be a SequenceFile of some type of Cluster")
         .withShortName("c");
   }
 
   /**
-   * Returns a default command line option for output directory specification. Used by all clustering jobs plus others
+   * Returns a default command line option for output directory specification.
+   * Used by all clustering jobs plus others
    */
   public static DefaultOptionBuilder outputOption() {
-    return new DefaultOptionBuilder().withLongName(OUTPUT_OPTION).withRequired(false).withShortName("o")
-        .withArgument(new ArgumentBuilder().withName(OUTPUT_OPTION).withMinimum(1).withMaximum(1).create())
-        .withDescription("The directory pathname for output.");
+    return new DefaultOptionBuilder().withLongName(OUTPUT_OPTION).withRequired(
+        false).withShortName("o").withArgument(
+        new ArgumentBuilder().withName(OUTPUT_OPTION).withMinimum(1)
+            .withMaximum(1).create()).withDescription(
+        "The directory pathname for output.");
   }
 
   /**
-   * Returns a default command line option for output directory overwriting. Used by all clustering jobs
+   * Returns a default command line option for output directory overwriting.
+   * Used by all clustering jobs
    */
   public static DefaultOptionBuilder overwriteOption() {
-    return new DefaultOptionBuilder().withLongName(OVERWRITE_OPTION).withRequired(false)
-        .withDescription("If present, overwrite the output directory before running job").withShortName("ow");
+    return new DefaultOptionBuilder().withLongName(OVERWRITE_OPTION)
+        .withRequired(false).withDescription(
+            "If present, overwrite the output directory before running job")
+        .withShortName("ow");
   }
 
   /**
-   * Returns a default command line option for specification of distance measure class to use.
-   * Used by Canopy, FuzzyKmeans, Kmeans, MeanShift
+   * Returns a default command line option for specification of distance measure
+   * class to use. Used by Canopy, FuzzyKmeans, Kmeans, MeanShift
    */
   public static DefaultOptionBuilder distanceMeasureOption() {
-    return new DefaultOptionBuilder().withLongName(DISTANCE_MEASURE_OPTION).withRequired(false).withShortName("dm")
-        .withArgument(new ArgumentBuilder().withName(DISTANCE_MEASURE_OPTION)
-            .withDefault(SquaredEuclideanDistanceMeasure.class.getName()).withMinimum(1).withMaximum(1).create())
-        .withDescription("The classname of the DistanceMeasure. Default is SquaredEuclidean");
+    return new DefaultOptionBuilder()
+        .withLongName(DISTANCE_MEASURE_OPTION)
+        .withRequired(false)
+        .withShortName("dm")
+        .withArgument(
+            new ArgumentBuilder().withName(DISTANCE_MEASURE_OPTION)
+                .withDefault(SquaredEuclideanDistanceMeasure.class.getName())
+                .withMinimum(1).withMaximum(1).create())
+        .withDescription(
+            "The classname of the DistanceMeasure. Default is SquaredEuclidean");
   }
 
   /**
-   * Returns a default command line option for specification of sequential or parallel operation.
-   * Used by Canopy, FuzzyKmeans, Kmeans, MeanShift, Dirichlet
+   * Returns a default command line option for specification of sequential or
+   * parallel operation. Used by Canopy, FuzzyKmeans, Kmeans, MeanShift,
+   * Dirichlet
    */
   public static DefaultOptionBuilder methodOption() {
-    return new DefaultOptionBuilder().withLongName(METHOD_OPTION).withRequired(false).withShortName("xm")
-        .withArgument(new ArgumentBuilder().withName(METHOD_OPTION).withDefault(MAPREDUCE_METHOD)
-            .withMinimum(1).withMaximum(1).create())
-        .withDescription("The execution method to use: sequential or mapreduce. Default is mapreduce");
+    return new DefaultOptionBuilder()
+        .withLongName(METHOD_OPTION)
+        .withRequired(false)
+        .withShortName("xm")
+        .withArgument(
+            new ArgumentBuilder().withName(METHOD_OPTION).withDefault(
+                MAPREDUCE_METHOD).withMinimum(1).withMaximum(1).create())
+        .withDescription(
+            "The execution method to use: sequential or mapreduce. Default is mapreduce");
   }
 
   /**
-   * Returns a default command line option for specification of T1. Used by Canopy, MeanShift
+   * Returns a default command line option for specification of T1. Used by
+   * Canopy, MeanShift
    */
   public static DefaultOptionBuilder t1Option() {
-    return new DefaultOptionBuilder().withLongName(T1_OPTION).withRequired(true).withArgument(new ArgumentBuilder()
-        .withName(T1_OPTION).withMinimum(1).withMaximum(1).create()).withDescription("T1 threshold value")
+    return new DefaultOptionBuilder().withLongName(T1_OPTION)
+        .withRequired(true).withArgument(
+            new ArgumentBuilder().withName(T1_OPTION).withMinimum(1)
+                .withMaximum(1).create()).withDescription("T1 threshold value")
         .withShortName(T1_OPTION);
   }
 
   /**
-   * Returns a default command line option for specification of T2. Used by Canopy, MeanShift
+   * Returns a default command line option for specification of T2. Used by
+   * Canopy, MeanShift
    */
   public static DefaultOptionBuilder t2Option() {
-    return new DefaultOptionBuilder().withLongName(T2_OPTION).withRequired(true).withArgument(new ArgumentBuilder()
-        .withName(T2_OPTION).withMinimum(1).withMaximum(1).create()).withDescription("T2 threshold value")
+    return new DefaultOptionBuilder().withLongName(T2_OPTION)
+        .withRequired(true).withArgument(
+            new ArgumentBuilder().withName(T2_OPTION).withMinimum(1)
+                .withMaximum(1).create()).withDescription("T2 threshold value")
         .withShortName(T2_OPTION);
   }
 
   /**
-   * Returns a default command line option for specification of max number of iterations.
-   * Used by Dirichlet, FuzzyKmeans, Kmeans, LDA
+   * Returns a default command line option for specification of T3 (Reducer T1).
+   * Used by Canopy, MeanShift
+   */
+  public static DefaultOptionBuilder t3Option() {
+    return new DefaultOptionBuilder().withLongName(T3_OPTION).withRequired(
+        false).withArgument(
+        new ArgumentBuilder().withName(T3_OPTION).withMinimum(1).withMaximum(1)
+            .create()).withDescription("T3 (Reducer T1) threshold value")
+        .withShortName(T3_OPTION);
+  }
+
+  /**
+   * Returns a default command line option for specification of T4 (Reducer T2).
+   * Used by Canopy, MeanShift
+   */
+  public static DefaultOptionBuilder t4Option() {
+    return new DefaultOptionBuilder().withLongName(T4_OPTION).withRequired(
+        false).withArgument(
+        new ArgumentBuilder().withName(T4_OPTION).withMinimum(1).withMaximum(1)
+            .create()).withDescription("T4 (Reducer T2) threshold value")
+        .withShortName(T4_OPTION);
+  }
+
+  /**
+   * Returns a default command line option for specification of max number of
+   * iterations. Used by Dirichlet, FuzzyKmeans, Kmeans, LDA
    */
   public static DefaultOptionBuilder maxIterationsOption() {
     // default value used by LDA which overrides withRequired(false)
-    return new DefaultOptionBuilder().withLongName(MAX_ITERATIONS_OPTION).withRequired(true).withShortName("x")
-        .withArgument(new ArgumentBuilder().withName(MAX_ITERATIONS_OPTION).withDefault("-1")
-            .withMinimum(1).withMaximum(1)
-            .create()).withDescription("The maximum number of iterations.");
+    return new DefaultOptionBuilder().withLongName(MAX_ITERATIONS_OPTION)
+        .withRequired(true).withShortName("x").withArgument(
+            new ArgumentBuilder().withName(MAX_ITERATIONS_OPTION).withDefault(
+                "-1").withMinimum(1).withMaximum(1).create()).withDescription(
+            "The maximum number of iterations.");
   }
 
   /**
-   * Returns a default command line option for specification of numbers of clusters to create.
-   * Used by Dirichlet, FuzzyKmeans, Kmeans
+   * Returns a default command line option for specification of numbers of
+   * clusters to create. Used by Dirichlet, FuzzyKmeans, Kmeans
    */
   public static DefaultOptionBuilder numClustersOption() {
-    return new DefaultOptionBuilder().withLongName(NUM_CLUSTERS_OPTION).withRequired(false)
-        .withArgument(new ArgumentBuilder().withName("k").withMinimum(1).withMaximum(1).create())
-        .withDescription("The number of clusters to create").withShortName("k");
+    return new DefaultOptionBuilder().withLongName(NUM_CLUSTERS_OPTION)
+        .withRequired(false).withArgument(
+            new ArgumentBuilder().withName("k").withMinimum(1).withMaximum(1)
+                .create()).withDescription("The number of clusters to create")
+        .withShortName("k");
   }
 
   /**
@@ -156,52 +237,63 @@ public final class DefaultOptionCreator 
    * Used by FuzzyKmeans, Kmeans, MeanShift
    */
   public static DefaultOptionBuilder convergenceOption() {
-    return new DefaultOptionBuilder().withLongName(CONVERGENCE_DELTA_OPTION).withRequired(false).withShortName("cd")
-        .withArgument(new ArgumentBuilder().withName(CONVERGENCE_DELTA_OPTION).withDefault("0.5")
-            .withMinimum(1).withMaximum(1).create()).withDescription("The convergence delta value. Default is 0.5");
+    return new DefaultOptionBuilder().withLongName(CONVERGENCE_DELTA_OPTION)
+        .withRequired(false).withShortName("cd").withArgument(
+            new ArgumentBuilder().withName(CONVERGENCE_DELTA_OPTION)
+                .withDefault("0.5").withMinimum(1).withMaximum(1).create())
+        .withDescription("The convergence delta value. Default is 0.5");
   }
 
   /**
-   * Returns a default command line option for specifying the max number of reducers.
-   * Used by Dirichlet, FuzzyKmeans, Kmeans and LDA
-   * @deprecated 
+   * Returns a default command line option for specifying the max number of
+   * reducers. Used by Dirichlet, FuzzyKmeans, Kmeans and LDA
+   * 
+   * @deprecated
    */
   @Deprecated
   public static DefaultOptionBuilder numReducersOption() {
-    return new DefaultOptionBuilder().withLongName(MAX_REDUCERS_OPTION).withRequired(false).withShortName("r")
-        .withArgument(new ArgumentBuilder().withName(MAX_REDUCERS_OPTION).withDefault("2")
-            .withMinimum(1).withMaximum(1).create())
+    return new DefaultOptionBuilder().withLongName(MAX_REDUCERS_OPTION)
+        .withRequired(false).withShortName("r").withArgument(
+            new ArgumentBuilder().withName(MAX_REDUCERS_OPTION)
+                .withDefault("2").withMinimum(1).withMaximum(1).create())
         .withDescription("The number of reduce tasks. Defaults to 2");
   }
 
   /**
-   * Returns a default command line option for clustering specification. Used by all clustering except LDA
+   * Returns a default command line option for clustering specification. Used by
+   * all clustering except LDA
    */
   public static DefaultOptionBuilder clusteringOption() {
-    return new DefaultOptionBuilder().withLongName(CLUSTERING_OPTION).withRequired(false)
-        .withDescription("If present, run clustering after the iterations have taken place").withShortName("cl");
+    return new DefaultOptionBuilder().withLongName(CLUSTERING_OPTION)
+        .withRequired(false).withDescription(
+            "If present, run clustering after the iterations have taken place")
+        .withShortName("cl");
   }
 
   /**
-   * Returns a default command line option for specifying the emitMostLikely flag. Used by Dirichlet and FuzzyKmeans
+   * Returns a default command line option for specifying the emitMostLikely
+   * flag. Used by Dirichlet and FuzzyKmeans
    */
   public static DefaultOptionBuilder emitMostLikelyOption() {
-    return new DefaultOptionBuilder().withLongName(EMIT_MOST_LIKELY_OPTION).withRequired(false).withShortName("e")
-        .withArgument(new ArgumentBuilder().withName(EMIT_MOST_LIKELY_OPTION).withDefault("true")
-            .withMinimum(1).withMaximum(1).create())
-        .withDescription("True if clustering should emit the most likely point only, "
-            + "false for threshold clustering. Default is true");
+    return new DefaultOptionBuilder().withLongName(EMIT_MOST_LIKELY_OPTION)
+        .withRequired(false).withShortName("e").withArgument(
+            new ArgumentBuilder().withName(EMIT_MOST_LIKELY_OPTION)
+                .withDefault("true").withMinimum(1).withMaximum(1).create())
+        .withDescription(
+            "True if clustering should emit the most likely point only, "
+                + "false for threshold clustering. Default is true");
   }
 
   /**
-   * Returns a default command line option for specifying the clustering threshold value.
-   * Used by Dirichlet and FuzzyKmeans
+   * Returns a default command line option for specifying the clustering
+   * threshold value. Used by Dirichlet and FuzzyKmeans
    */
   public static DefaultOptionBuilder thresholdOption() {
-    return new DefaultOptionBuilder().withLongName(THRESHOLD_OPTION).withRequired(false).withShortName("t")
-        .withArgument(new ArgumentBuilder().withName(THRESHOLD_OPTION).withDefault("0")
-            .withMinimum(1).withMaximum(1).create())
-        .withDescription("The pdf threshold used for cluster determination. Default is 0");
+    return new DefaultOptionBuilder().withLongName(THRESHOLD_OPTION)
+        .withRequired(false).withShortName("t").withArgument(
+            new ArgumentBuilder().withName(THRESHOLD_OPTION).withDefault("0")
+                .withMinimum(1).withMaximum(1).create()).withDescription(
+            "The pdf threshold used for cluster determination. Default is 0");
   }
 
 }



Mime
View raw message