mahout-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jeast...@apache.org
Subject svn commit: r1211715 - in /mahout/trunk: core/src/main/java/org/apache/mahout/clustering/canopy/ core/src/main/java/org/apache/mahout/clustering/topdown/ core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ core/src/test/java/org/apac...
Date Wed, 07 Dec 2011 23:58:37 GMT
Author: jeastman
Date: Wed Dec  7 23:58:36 2011
New Revision: 1211715

URL: http://svn.apache.org/viewvc?rev=1211715&view=rev
Log:
MAHOUT-843: Final patch plus some integration fixes. All tests run

Added:
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/PathDirectory.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/TopDownClusteringPathConstants.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterCountReader.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessor.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorDriver.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorReducer.java
    mahout/trunk/core/src/test/java/org/apache/mahout/clustering/topdown/
    mahout/trunk/core/src/test/java/org/apache/mahout/clustering/topdown/PathDirectoryTest.java
    mahout/trunk/core/src/test/java/org/apache/mahout/clustering/topdown/postprocessor/
    mahout/trunk/core/src/test/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterCountReaderTest.java
    mahout/trunk/core/src/test/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorTest.java
    mahout/trunk/src/conf/clusterpp.props
Modified:
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java
    mahout/trunk/core/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java
    mahout/trunk/core/src/test/java/org/apache/mahout/clustering/kmeans/TestKmeansClustering.java
    mahout/trunk/integration/src/test/java/org/apache/mahout/clustering/TestClusterDumper.java
    mahout/trunk/integration/src/test/java/org/apache/mahout/clustering/TestClusterEvaluator.java
    mahout/trunk/integration/src/test/java/org/apache/mahout/clustering/cdbw/TestCDbwEvaluator.java
    mahout/trunk/src/conf/driver.classes.props

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java?rev=1211715&r1=1211714&r2=1211715&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java Wed Dec  7 23:58:36 2011
@@ -269,7 +269,7 @@ public class CanopyDriver extends Abstra
       clusterer.addPointToCanopies(vw.get(), canopies);
     }
 
-    Path canopyOutputDir = new Path(output, Cluster.CLUSTERS_DIR + '0');
+    Path canopyOutputDir = new Path(output, Cluster.CLUSTERS_DIR + '0'+ Cluster.FINAL_ITERATION_SUFFIX);
     Path path = new Path(canopyOutputDir, "part-r-00000");
     SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path,
         Text.class, Canopy.class);
@@ -343,7 +343,7 @@ public class CanopyDriver extends Abstra
     job.setJarByClass(CanopyDriver.class);
 
     FileInputFormat.addInputPath(job, input);
-    Path canopyOutputDir = new Path(output, Cluster.CLUSTERS_DIR + '0');
+    Path canopyOutputDir = new Path(output, Cluster.CLUSTERS_DIR + '0' + Cluster.FINAL_ITERATION_SUFFIX);
     FileOutputFormat.setOutputPath(job, canopyOutputDir);
     if (!job.waitForCompletion(true)) {
       throw new InterruptedException("Canopy Job failed processing " + input);

Added: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/PathDirectory.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/PathDirectory.java?rev=1211715&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/PathDirectory.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/PathDirectory.java Wed Dec  7 23:58:36 2011
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.topdown;
+
+import static org.apache.mahout.clustering.topdown.TopDownClusteringPathConstants.bottomLevelClusterDirectory;
+import static org.apache.mahout.clustering.topdown.TopDownClusteringPathConstants.clusteredPointsDirectory;
+import static org.apache.mahout.clustering.topdown.TopDownClusteringPathConstants.postProcessDirectory;
+import static org.apache.mahout.clustering.topdown.TopDownClusteringPathConstants.topLevelClusterDirectory;
+
+import java.io.File;
+
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Contains list of all internal paths used in top down clustering.
+ */
+public class PathDirectory {
+  
+  /**
+   * All output of top level clustering is stored in output directory/topLevelCluster.
+   * 
+   * @param output
+   *          the output path of clustering.
+   * @return The top level Cluster Directory.
+   */
+  public static Path getTopLevelClusterPath(Path output) {
+    return new Path(output + File.separator + topLevelClusterDirectory);
+  }
+  
+  /**
+   * The output of top level clusters is post processed and kept in this path.
+   * 
+   * @param outputPathProvidedByUser
+   *          the output path of clustering.
+   * @return the path where the output of top level cluster post processor is kept.
+   */
+  public static Path getClusterPostProcessorOutputDirectory(Path outputPathProvidedByUser) {
+    return new Path(outputPathProvidedByUser + File.separator + postProcessDirectory);
+  }
+  
+  /**
+   * The top level clustered points before post processing is generated here.
+   * 
+   * @param output
+   *          the output path of clustering.
+   * @return the clustered points directory
+   */
+  public static Path getClusterOutputClusteredPoints(Path output) {
+    return new Path(output + File.separator + clusteredPointsDirectory + File.separator, "*");
+  }
+  
+  /**
+   * Each cluster produced by top level clustering is processed in output/"bottomLevelCluster"/clusterId.
+   * 
+   * @param output
+   * @param clusterId
+   * @return the bottom level clustering path.
+   */
+  public static Path getBottomLevelClusterPath(Path output, String clusterId) {
+    return new Path(output + File.separator + bottomLevelClusterDirectory + File.separator + clusterId);
+  }
+  
+  /**
+   * Each clusters path name is its clusterId. The vectors reside in separate files inside it.
+   * 
+   * @param clusterPostProcessorOutput
+   *          the path of cluster post processor output.
+   * @param clusterId
+   *          the id of the cluster.
+   * @return the cluster path for cluster id.
+   */
+  public static Path getClusterPathForClusterId(Path clusterPostProcessorOutput, String clusterId) {
+    return new Path(clusterPostProcessorOutput + File.separator + clusterId);
+  }
+  
+}
\ No newline at end of file

Added: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/TopDownClusteringPathConstants.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/TopDownClusteringPathConstants.java?rev=1211715&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/TopDownClusteringPathConstants.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/TopDownClusteringPathConstants.java Wed Dec  7 23:58:36 2011
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.mahout.clustering.topdown;
+
+/**
+ * Constant directory paths used for top down clustering.
+ */
+public class TopDownClusteringPathConstants {
+  
+  public static final String topLevelClusterDirectory = "topLevelCluster";
+  
+  public static final String postProcessDirectory = "clusterPostProcessed";
+  
+  public static final String clusteredPointsDirectory = "clusteredPoints";
+  
+  public static final String bottomLevelClusterDirectory = "bottomLevelCluster";
+  
+}

Added: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterCountReader.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterCountReader.java?rev=1211715&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterCountReader.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterCountReader.java Wed Dec  7 23:58:36 2011
@@ -0,0 +1,88 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.mahout.clustering.topdown.postprocessor;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.mahout.common.iterator.sequencefile.PathFilters;

/**
 * Reads the number of clusters produced by the clustering algorithm, by counting the records in
 * the part files of the clusters-*-final directory.
 */
public final class ClusterCountReader {
  
  /** Utility class: no instances. */
  private ClusterCountReader() {
  }
  
  /**
   * Reads the number of clusters present by reading the clusters-*-final file.
   * 
   * @param clusterOutputPath
   *          The output path provided to the clustering algorithm.
   * @param conf
   *          The hadoop configuration.
   * @return the number of final clusters.
   * @throws IOException
   *           if the final clusters directory is missing or cannot be read.
   * @throws InstantiationException
   *           if a key/value class cannot be instantiated reflectively.
   * @throws IllegalAccessException
   *           if a key/value class constructor is not accessible.
   */
  public static int getNumberOfClusters(Path clusterOutputPath, Configuration conf) throws IOException,
                                                                                   InstantiationException,
                                                                                   IllegalAccessException {
    int numberOfClusters = 0;
    for (FileStatus fileStatus : getPartFiles(clusterOutputPath, conf)) {
      SequenceFile.Reader reader = new SequenceFile.Reader(FileSystem.get(conf), fileStatus.getPath(), conf);
      try {
        WritableComparable<?> key = (WritableComparable<?>) reader.getKeyClass().newInstance();
        Writable value = (Writable) reader.getValueClass().newInstance();
        // Each record in the final clusters file represents one cluster.
        while (reader.next(key, value)) {
          numberOfClusters++;
        }
      } finally {
        // Close even when reading fails, so the file handle is not leaked.
        reader.close();
      }
    }
    return numberOfClusters;
  }
  
  /**
   * Gets the part files of the final iteration directory: clusters-n-final.
   * 
   * @throws IOException
   *           if no clusters-*-final directory exists under the given path.
   */
  private static FileStatus[] getPartFiles(Path path, Configuration conf) throws IOException {
    FileSystem fileSystem = path.getFileSystem(conf);
    FileStatus[] clusterFiles = fileSystem.listStatus(path, CLUSTER_FINAL);
    if (clusterFiles.length == 0) {
      // Fail with a descriptive message instead of an ArrayIndexOutOfBoundsException.
      throw new IOException("No clusters-*-final directory found under " + path);
    }
    return fileSystem.listStatus(clusterFiles[0].getPath(), PathFilters.partFilter());
  }
  
  /**
   * Pathfilter to read the final clustering file.
   */
  private static final PathFilter CLUSTER_FINAL = new PathFilter() {
    @Override
    public boolean accept(Path path) {
      String name = path.getName();
      return name.startsWith("clusters-") && name.endsWith("-final");
    }
  };
}
\ No newline at end of file

Added: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessor.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessor.java?rev=1211715&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessor.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessor.java Wed Dec  7 23:58:36 2011
@@ -0,0 +1,180 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.mahout.clustering.topdown.postprocessor;

import static org.apache.mahout.clustering.topdown.PathDirectory.getClusterOutputClusteredPoints;
import static org.apache.mahout.clustering.topdown.PathDirectory.getClusterPathForClusterId;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.mahout.clustering.WeightedVectorWritable;
import org.apache.mahout.clustering.topdown.PathDirectory;
import org.apache.mahout.common.iterator.sequencefile.PathFilters;
import org.apache.mahout.math.VectorWritable;

/**
 * This class reads the output of any clustering algorithm, and, creates separate directories for different
 * clusters. Each cluster directory's name is its clusterId. Each and every point is written in the cluster
 * directory associated with that point.
 * 
 * This class incorporates a sequential algorithm and is appropriate for use for data which has been clustered
 * sequentially.
 * 
 * The sequential and non sequential version, both are being used from @ClusterOutputPostProcessorDriver.
 * 
 */
public class ClusterOutputPostProcessor {
  
  private Path clusteredPoints;
  private FileSystem fileSystem;
  private Configuration conf;
  private Path clusterPostProcessorOutput;
  // clusterId -> directory holding that cluster's post processed vectors.
  private Map<String,Path> postProcessedClusterDirectories = new HashMap<String,Path>();
  // Monotonically increasing key for the vectors written out.
  private long uniqueVectorId = 0;
  // One open writer per clusterId; closed in closeWriters().
  private Map<String,SequenceFile.Writer> writersForClusters;
  
  public ClusterOutputPostProcessor(Path clusterOutputToBeProcessed,
                                    Path output,
                                    Configuration hadoopConfiguration) {
    this.clusterPostProcessorOutput = output;
    this.clusteredPoints = getClusterOutputClusteredPoints(clusterOutputToBeProcessed);
    this.conf = hadoopConfiguration;
    this.writersForClusters = new HashMap<String,SequenceFile.Writer>();
  }
  
  /**
   * This method takes the clustered points output by the clustering algorithms as input and writes them into
   * their respective clusters.
   * 
   * @throws IOException
   *           if the clustered points cannot be read or the output cannot be written.
   * @throws InstantiationException
   *           if a key/value class cannot be instantiated reflectively.
   * @throws IllegalAccessException
   *           if a key/value class constructor is not accessible.
   */
  public void process() throws IOException, InstantiationException, IllegalAccessException {
    fileSystem = clusteredPoints.getFileSystem(conf);
    createPostProcessDirectory();
    try {
      for (FileStatus partFile : getAllClusteredPointPartFiles()) {
        SequenceFile.Reader clusteredPointsReader = new SequenceFile.Reader(fileSystem, partFile.getPath(),
            conf);
        try {
          WritableComparable<?> clusterIdAsKey = (WritableComparable<?>) clusteredPointsReader.getKeyClass()
              .newInstance();
          Writable vector = (Writable) clusteredPointsReader.getValueClass().newInstance();
          while (clusteredPointsReader.next(clusterIdAsKey, vector)) {
            String clusterId = clusterIdAsKey.toString().trim();
            putVectorInRespectiveCluster(clusterId, (WeightedVectorWritable) vector);
          }
        } finally {
          clusteredPointsReader.close();
        }
      }
    } finally {
      // Close writers once, after ALL part files are processed. Closing inside the loop would
      // leave closed writers cached in writersForClusters and break on the second part file.
      closeWriters();
    }
  }
  
  /**
   * Returns all the part files in the clusteredPoints directory.
   */
  private FileStatus[] getAllClusteredPointPartFiles() throws IOException {
    Path[] partFilePaths = FileUtil.stat2Paths(fileSystem.globStatus(clusteredPoints,
      PathFilters.partFilter()));
    return fileSystem.listStatus(partFilePaths, PathFilters.partFilter());
  }
  
  /**
   * Creates the directory to put post processed clusters.
   */
  private void createPostProcessDirectory() throws IOException {
    if (!fileSystem.exists(clusterPostProcessorOutput)
        && !fileSystem.mkdirs(clusterPostProcessorOutput)) {
      throw new IOException("Error creating cluster post processor directory "
          + clusterPostProcessorOutput);
    }
  }
  
  /**
   * Finds out the cluster directory of the vector and writes it into the specified cluster.
   */
  private void putVectorInRespectiveCluster(String clusterId, WeightedVectorWritable point) throws IOException {
    Writer writer = findWriterForVector(clusterId);
    postProcessedClusterDirectories.put(clusterId,
      getClusterPathForClusterId(clusterPostProcessorOutput, clusterId));
    writeVectorToCluster(writer, point);
  }
  
  /**
   * Finds out the path in cluster where the point is supposed to be written, creating and caching a
   * writer for the cluster on first use.
   */
  private Writer findWriterForVector(String clusterId) throws IOException {
    Writer writer = writersForClusters.get(clusterId);
    if (writer == null) {
      Path clusterDirectory = PathDirectory.getClusterPathForClusterId(clusterPostProcessorOutput, clusterId);
      Path pathToWrite = new Path(clusterDirectory, "part-m-0");
      writer = new SequenceFile.Writer(fileSystem, conf, pathToWrite,
          LongWritable.class, VectorWritable.class);
      writersForClusters.put(clusterId, writer);
    }
    return writer;
  }
  
  /**
   * Writes vector to the cluster directory, dropping the weight and keying by a unique id.
   */
  private void writeVectorToCluster(Writer writer, WeightedVectorWritable point) throws IOException {
    writer.append(new LongWritable(uniqueVectorId++), new VectorWritable(point.getVector()));
    writer.sync();
  }
  
  /**
   * Returns the set of all post processed cluster paths.
   */
  public Map<String,Path> getPostProcessedClusterDirectories() {
    return postProcessedClusterDirectories;
  }
  
  public void setClusteredPoints(Path clusteredPoints) {
    this.clusteredPoints = clusteredPoints;
  }
  
  public void closeWriters() throws IOException {
    for (Writer writer : writersForClusters.values()) {
      writer.close();
    }
  }
  
}
\ No newline at end of file

Added: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorDriver.java?rev=1211715&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorDriver.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorDriver.java Wed Dec  7 23:58:36 2011
@@ -0,0 +1,229 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.mahout.clustering.topdown.postprocessor;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.common.AbstractJob;
import org.apache.mahout.common.commandline.DefaultOptionCreator;
import org.apache.mahout.common.iterator.sequencefile.PathFilters;
import org.apache.mahout.math.VectorWritable;

/**
 * Post processes the output of clustering algorithms and groups them into respective clusters. Ideal to be
 * used for top down clustering. It can also be used if the clustering output needs to be grouped into their
 * respective clusters.
 */
public class ClusterOutputPostProcessorDriver extends AbstractJob {
  
  /**
   * CLI to run the clustering post processor. The input to the post processor is the output path specified
   * to the clustering.
   */
  @Override
  public int run(String[] args) throws Exception {
    
    addInputOption();
    addOutputOption();
    addOption(DefaultOptionCreator.methodOption().create());

    if (parseArguments(args) == null) {
      return -1;
    }
    
    Path input = getInputPath();
    Path output = getOutputPath();

    if (getConf() == null) {
      setConf(new Configuration());
    }
    boolean runSequential = getOption(DefaultOptionCreator.METHOD_OPTION).equalsIgnoreCase(
      DefaultOptionCreator.SEQUENTIAL_METHOD);
    run(input, output, runSequential);
    return 0;
    
  }
  
  /**
   * Constructor to be used by the ToolRunner.
   */
  private ClusterOutputPostProcessorDriver() {}
  
  public static void main(String[] args) throws Exception {
    ToolRunner.run(new Configuration(), new ClusterOutputPostProcessorDriver(), args);
  }
  
  /**
   * Post processes the output of clustering algorithms and groups them into respective clusters. Each
   * cluster's vectors are written into a directory named after its clusterId.
   * 
   * @param input
   *          The output path provided to the clustering algorithm, whose output would be post processed.
   *          Hint: the path of the directory containing clusters-*-final and clusteredPoints.
   * @param output
   *          The post processed data would be stored at this path.
   * @param runSequential
   *          If set to true, post processes it sequentially, else, uses MapReduce. Hint: if the clustering
   *          was done sequentially, make it sequential, else vice versa.
   * @throws IOException
   * @throws InterruptedException
   * @throws ClassNotFoundException
   * @throws IllegalAccessException
   * @throws InstantiationException
   */
  public static void run(Path input, Path output, boolean runSequential) throws IOException,
                                                                        InterruptedException,
                                                                        ClassNotFoundException,
                                                                        InstantiationException,
                                                                        IllegalAccessException {
    if (runSequential) {
      postProcessSeq(input, output);
    } else {
      Configuration conf = new Configuration();
      postProcessMR(conf, input, output);
      movePartFilesToRespectiveDirectories(conf, output);
    }
    
  }
  
  /**
   * Process sequentially. Reads the vectors one by one, and puts them into a respective directory, named
   * after their clusterId.
   * 
   * @param input
   *          The output path provided to the clustering algorithm, whose output would be post processed.
   *          Hint: the path of the directory containing clusters-*-final and clusteredPoints.
   * @param output
   *          The post processed data would be stored at this path.
   * @throws IOException
   * @throws IllegalAccessException
   * @throws InstantiationException
   */
  private static void postProcessSeq(Path input, Path output) throws IOException,
                                                             InstantiationException,
                                                             IllegalAccessException {
    ClusterOutputPostProcessor clusterOutputPostProcessor = new ClusterOutputPostProcessor(input, output,
        new Configuration());
    clusterOutputPostProcessor.process();
  }
  
  /**
   * Process as a map reduce job. The numberOfReduceTasks is set to the number of clusters present in the
   * output, so that each cluster's vector is written in its own part file.
   * 
   * @param conf
   *          The hadoop configuration.
   * @param input
   *          The output path provided to the clustering algorithm, whose output would be post processed.
   *          Hint: the path of the directory containing clusters-*-final and clusteredPoints.
   * @param output
   *          The post processed data would be stored at this path.
   * @throws IOException
   * @throws InstantiationException
   * @throws IllegalAccessException
   * @throws InterruptedException
   * @throws ClassNotFoundException
   */
  private static void postProcessMR(Configuration conf, Path input, Path output) throws IOException,
                                                                                InstantiationException,
                                                                                IllegalAccessException,
                                                                                InterruptedException,
                                                                                ClassNotFoundException {
    Job job = new Job(conf, "ClusterOutputPostProcessor Driver running over input: " + input);
    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    job.setMapperClass(ClusterOutputPostProcessorMapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(VectorWritable.class);
    job.setReducerClass(ClusterOutputPostProcessorReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(VectorWritable.class);
    // One reducer per cluster: each cluster ends up in its own part file.
    int numberOfClusters = ClusterCountReader.getNumberOfClusters(input, conf);
    job.setNumReduceTasks(numberOfClusters);
    job.setJarByClass(ClusterOutputPostProcessorDriver.class);
    
    FileInputFormat.addInputPath(job, new Path(input, new Path("clusteredPoints")));
    FileOutputFormat.setOutputPath(job, output);
    if (!job.waitForCompletion(true)) {
      throw new InterruptedException("ClusterOutputPostProcessor Job failed processing " + input);
    }
  }
  
  /**
   * The mapreduce version of the post processor writes different clusters into different part files. This
   * method reads the part files and moves them into directories named after their clusterIds.
   * 
   * @param conf
   *          The hadoop configuration.
   * @param output
   *          The post processed data would be stored at this path.
   * @throws IOException
   * @throws InstantiationException
   * @throws IllegalAccessException
   */
  private static void movePartFilesToRespectiveDirectories(Configuration conf, Path output) throws IOException,
                                                                                           InstantiationException,
                                                                                           IllegalAccessException {
    for (FileStatus fileStatus : getPartFiles(output, conf)) {
      SequenceFile.Reader reader = new SequenceFile.Reader(FileSystem.get(conf), fileStatus.getPath(), conf);
      boolean next;
      WritableComparable<?> key;
      try {
        key = (WritableComparable<?>) reader.getKeyClass().newInstance();
        Writable value = (Writable) reader.getValueClass().newInstance();
        // Only the first record's key is needed: every record in a part file shares one clusterId.
        next = reader.next(key, value);
      } finally {
        reader.close();
      }
      if (next) {
        renameFile(key, fileStatus, conf);
      }
    }
  }
  
  /**
   * Using the @FileSystem rename method to move the file into a directory named after its clusterId.
   */
  private static void renameFile(WritableComparable<?> key, FileStatus fileStatus, Configuration conf) throws IOException {
    Path path = fileStatus.getPath();
    FileSystem fileSystem = path.getFileSystem(conf);
    Path subDir = new Path(key.toString());
    Path renameTo = new Path(path.getParent(), subDir);
    // Fail loudly instead of silently dropping the cluster when the move cannot be done.
    if (!fileSystem.mkdirs(renameTo)) {
      throw new IOException("Unable to create cluster directory " + renameTo);
    }
    if (!fileSystem.rename(path, renameTo)) {
      throw new IOException("Unable to move " + path + " into " + renameTo);
    }
  }
  
  /**
   * Gets the part files of the post processor's MapReduce output directory.
   */
  private static FileStatus[] getPartFiles(Path output, Configuration conf) throws IOException {
    FileSystem fileSystem = output.getFileSystem(conf);
    return fileSystem.listStatus(output, PathFilters.partFilter());
  }
}

Added: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorMapper.java?rev=1211715&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorMapper.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorMapper.java Wed Dec  7 23:58:36 2011
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.topdown.postprocessor;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.clustering.WeightedVectorWritable;
+import org.apache.mahout.math.VectorWritable;
+
+/**
+ * Mapper for post processing cluster output. Re-keys each clustered point by
+ * its cluster id so the reducer can group all points of one cluster together.
+ */
+public class ClusterOutputPostProcessorMapper extends
+    Mapper<IntWritable,WeightedVectorWritable,Text,VectorWritable> {
+  
+  /**
+   * The key is the cluster id and the value is the vector.
+   */
+  @Override
+  protected void map(IntWritable key, WeightedVectorWritable vector, Context context) throws IOException,
+                                                                                     InterruptedException {
+    // Emit the cluster id as text and the bare vector (the weight is dropped).
+    // NOTE(review): trim() on IntWritable.toString() looks redundant — confirm.
+    context.write(new Text(key.toString().trim()), new VectorWritable(vector.getVector()));
+  }
+}

Added: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorReducer.java?rev=1211715&view=auto
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorReducer.java (added)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorReducer.java Wed Dec  7 23:58:36 2011
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.topdown.postprocessor;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.math.VectorWritable;
+
+/**
+ * Reducer for post processing cluster output. Behaves as an identity reducer:
+ * every vector is written out unchanged under its cluster id, so all points
+ * belonging to one cluster end up together in the reducer's output partition.
+ */
+public class ClusterOutputPostProcessorReducer extends Reducer<Text,VectorWritable,Text,VectorWritable> {
+  /**
+   * The key is the cluster id and the values contains the points in that cluster.
+   */
+  @Override
+  protected void reduce(Text key, Iterable<VectorWritable> values, Context context) throws IOException,
+                                                                                   InterruptedException {
+    // Pass each point through unchanged; grouping by key is done by the framework.
+    for (VectorWritable value : values) {
+      context.write(key, value);
+    }
+  }
+  
+}

Modified: mahout/trunk/core/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java?rev=1211715&r1=1211714&r2=1211715&view=diff
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java (original)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java Wed Dec  7 23:58:36 2011
@@ -333,7 +333,7 @@ public final class TestCanopyCreation ex
         manhattanDistanceMeasure, 3.1, 2.1, false, false);
 
     // verify output from sequence file
-    Path path = new Path(output, "clusters-0/part-r-00000");
+    Path path = new Path(output, "clusters-0-final/part-r-00000");
     FileSystem fs = FileSystem.get(path.toUri(), config);
     SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, config);
     try {
@@ -373,7 +373,7 @@ public final class TestCanopyCreation ex
         euclideanDistanceMeasure, 3.1, 2.1, false, false);
 
     // verify output from sequence file
-    Path path = new Path(output, "clusters-0/part-r-00000");
+    Path path = new Path(output, "clusters-0-final/part-r-00000");
     FileSystem fs = FileSystem.get(path.toUri(), config);
     SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, config);
     try {
@@ -495,7 +495,7 @@ public final class TestCanopyCreation ex
         manhattanDistanceMeasure, 3.1, 2.1, true, true);
 
     // verify output from sequence file
-    Path path = new Path(output, "clusters-0/part-r-00000");
+    Path path = new Path(output, "clusters-0-final/part-r-00000");
     int ix = 0;
     for (Canopy value : new SequenceFileValueIterable<Canopy>(path, true,
         config)) {
@@ -532,7 +532,7 @@ public final class TestCanopyCreation ex
     new CanopyDriver().run(args);
 
     // verify output from sequence file
-    Path path = new Path(output, "clusters-0/part-r-00000");
+    Path path = new Path(output, "clusters-0-final/part-r-00000");
 
     int ix = 0;
     for (Canopy value : new SequenceFileValueIterable<Canopy>(path, true,

Modified: mahout/trunk/core/src/test/java/org/apache/mahout/clustering/kmeans/TestKmeansClustering.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/kmeans/TestKmeansClustering.java?rev=1211715&r1=1211714&r2=1211715&view=diff
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/clustering/kmeans/TestKmeansClustering.java (original)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/clustering/kmeans/TestKmeansClustering.java Wed Dec  7 23:58:36 2011
@@ -491,7 +491,7 @@ public final class TestKmeansClustering 
 
     // now run the KMeans job
     KMeansDriver.run(pointsPath,
-                     new Path(outputPath, "clusters-0"),
+                     new Path(outputPath, "clusters-0-final"),
                      outputPath,
                      new EuclideanDistanceMeasure(),
                      0.001,

Added: mahout/trunk/core/src/test/java/org/apache/mahout/clustering/topdown/PathDirectoryTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/topdown/PathDirectoryTest.java?rev=1211715&view=auto
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/clustering/topdown/PathDirectoryTest.java (added)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/clustering/topdown/PathDirectoryTest.java Wed Dec  7 23:58:36 2011
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.mahout.clustering.topdown;
+
+import static java.io.File.separator;
+import static org.apache.mahout.clustering.topdown.PathDirectory.getClusterPostProcessorOutputDirectory;
+import static org.apache.mahout.clustering.topdown.TopDownClusteringPathConstants.clusteredPointsDirectory;
+import static org.apache.mahout.clustering.topdown.TopDownClusteringPathConstants.postProcessDirectory;
+import static org.apache.mahout.clustering.topdown.TopDownClusteringPathConstants.topLevelClusterDirectory;
+import junit.framework.Assert;
+
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link PathDirectory}, verifying that each helper builds the
+ * expected directory layout used by top-down clustering.
+ *
+ * NOTE(review): uses junit.framework.Assert (JUnit 3) rather than
+ * org.junit.Assert — consider aligning with the other JUnit 4 tests.
+ */
+public class PathDirectoryTest {
+  
+  // Common root under which all expected paths are built.
+  private Path output = new Path("output");
+  
+  @Test
+  public void shouldReturnTopLevelClusterPath() {
+    // Top-level clusters live directly under <output>/<topLevelClusterDirectory>.
+    Path expectedPath = new Path(output, topLevelClusterDirectory);
+    Assert.assertEquals(expectedPath, PathDirectory.getTopLevelClusterPath(output));
+  }
+  
+  @Test
+  public void shouldReturnClusterPostProcessorOutputDirectory() {
+    // Post-processor output goes to <output>/<postProcessDirectory>.
+    Path expectedPath = new Path(output, postProcessDirectory);
+    Assert.assertEquals(expectedPath, getClusterPostProcessorOutputDirectory(output));
+  }
+  
+  @Test
+  public void shouldReturnClusterOutputClusteredPoints() {
+    // A glob ("*") is appended so all clustered-points part files match.
+    Path expectedPath = new Path(output, clusteredPointsDirectory + separator + "*");
+    Assert.assertEquals(expectedPath, PathDirectory.getClusterOutputClusteredPoints(output));
+  }
+  
+  @Test
+  public void shouldReturnBottomLevelClusterPath() {
+    // Bottom-level clusters are nested per cluster id under the bottom-level dir.
+    Path expectedPath = new Path(output + separator
+                                 + TopDownClusteringPathConstants.bottomLevelClusterDirectory + separator
+                                 + "1");
+    Assert.assertEquals(expectedPath, PathDirectory.getBottomLevelClusterPath(output, "1"));
+  }
+  
+  @Test
+  public void shouldReturnClusterPathForClusterId() {
+    // Each cluster id gets its own subdirectory of the post-processor output.
+    Path expectedPath = new Path(getClusterPostProcessorOutputDirectory(output), new Path("1"));
+    Assert.assertEquals(expectedPath,
+      PathDirectory.getClusterPathForClusterId(getClusterPostProcessorOutputDirectory(output), "1"));
+  }
+  
+}

Added: mahout/trunk/core/src/test/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterCountReaderTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterCountReaderTest.java?rev=1211715&view=auto
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterCountReaderTest.java (added)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterCountReaderTest.java Wed Dec  7 23:58:36 2011
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.topdown.postprocessor;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.mahout.clustering.Cluster;
+import org.apache.mahout.clustering.ClusteringTestUtils;
+import org.apache.mahout.clustering.WeightedVectorWritable;
+import org.apache.mahout.clustering.canopy.CanopyDriver;
+import org.apache.mahout.clustering.kmeans.KMeansDriver;
+import org.apache.mahout.common.DummyOutputCollector;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.distance.ManhattanDistanceMeasure;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterable;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Test for {@link ClusterCountReader}: runs canopy clustering followed by
+ * k-means on a small fixed data set and checks that the reader reports the
+ * number of clusters found in the final iteration.
+ */
+public class ClusterCountReaderTest extends MahoutTestCase {
+  
+  // Seven 2-d points forming two well-separated groups of 3 and 4 points.
+  public static final double[][] REFERENCE = { {1, 1}, {2, 1}, {1, 2}, {4, 4}, {5, 4}, {4, 5}, {5, 5}};
+  
+  private FileSystem fs;
+  
+  private Path outputPathForCanopy;
+  
+  private Configuration conf;
+  
+  private Path outputPathForKMeans;
+  
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+    // NOTE(review): this local Configuration shadows the 'conf' field; the
+    // field is only assigned later inside testGetNumberOfClusters — confirm intended.
+    Configuration conf = new Configuration();
+    fs = FileSystem.get(conf);
+  }
+  
+  /** Converts raw double arrays into writable sparse vectors. */
+  public static List<VectorWritable> getPointsWritable(double[][] raw) {
+    List<VectorWritable> points = Lists.newArrayList();
+    for (double[] fr : raw) {
+      Vector vec = new RandomAccessSparseVector(fr.length);
+      vec.assign(fr);
+      points.add(new VectorWritable(vec));
+    }
+    return points;
+  }
+  
+  /**
+   * Story: User wants to use cluster post processor after canopy clustering and then run clustering on the
+   * output clusters
+   */
+  @Test
+  public void testGetNumberOfClusters() throws Exception {
+    List<VectorWritable> points = getPointsWritable(REFERENCE);
+    
+    Path pointsPath = getTestTempDirPath("points");
+    conf = new Configuration();
+    // Write the same points twice to exercise multiple input files.
+    ClusteringTestUtils.writePointsToFile(points, new Path(pointsPath, "file1"), fs, conf);
+    ClusteringTestUtils.writePointsToFile(points, new Path(pointsPath, "file2"), fs, conf);
+    
+    outputPathForCanopy = getTestTempDirPath("canopy");
+    outputPathForKMeans = getTestTempDirPath("kmeans");
+    
+    topLevelClustering(pointsPath, conf);
+    
+    // The reference data should split into exactly two clusters.
+    int numberOfClusters = ClusterCountReader.getNumberOfClusters(outputPathForKMeans, conf);
+    Assert.assertEquals(2, numberOfClusters);
+    verifyThatNumberOfClustersIsCorrect(conf, new Path(outputPathForKMeans, new Path("clusteredPoints")));
+    
+  }
+  
+  /** Runs canopy to seed clusters, then k-means from the final canopy iteration. */
+  private void topLevelClustering(Path pointsPath, Configuration conf) throws IOException,
+                                                                      InterruptedException,
+                                                                      ClassNotFoundException {
+    final ManhattanDistanceMeasure measure = new ManhattanDistanceMeasure();
+    CanopyDriver.run(conf, pointsPath, outputPathForCanopy, measure, 4.0, 3.0, true, true);
+    final Path clustersIn = new Path(outputPathForCanopy, new Path(Cluster.CLUSTERS_DIR + '0'
+                                                                   + Cluster.FINAL_ITERATION_SUFFIX));
+    KMeansDriver.run(conf, pointsPath, clustersIn, outputPathForKMeans, measure, 1, 1, true, true);
+  }
+  
+  /** Reads clustered points back and asserts that exactly two cluster ids appear. */
+  private void verifyThatNumberOfClustersIsCorrect(Configuration conf, Path clusteredPointsPath) {
+    DummyOutputCollector<IntWritable,WeightedVectorWritable> collector = new DummyOutputCollector<IntWritable,WeightedVectorWritable>();
+    
+    // The key is the clusterId, the value is the weighted vector
+    for (Pair<IntWritable,WeightedVectorWritable> record : new SequenceFileIterable<IntWritable,WeightedVectorWritable>(
+        new Path(clusteredPointsPath, "part-m-0"), conf)) {
+      collector.collect(record.getFirst(), record.getSecond());
+    }
+    final int clusterSize = collector.getKeys().size();
+    Assert.assertTrue(clusterSize == 2);
+  }
+  
+}

Added: mahout/trunk/core/src/test/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorTest.java?rev=1211715&view=auto
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorTest.java (added)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorTest.java Wed Dec  7 23:58:36 2011
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.mahout.clustering.topdown.postprocessor;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.mahout.clustering.ClusteringTestUtils;
+import org.apache.mahout.clustering.WeightedVectorWritable;
+import org.apache.mahout.clustering.canopy.CanopyDriver;
+import org.apache.mahout.clustering.topdown.PathDirectory;
+import org.apache.mahout.common.DummyOutputCollector;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.distance.ManhattanDistanceMeasure;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterable;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+/**
+ * End-to-end test for {@link ClusterOutputPostProcessor}: runs a top-level
+ * canopy clustering, post-processes its output into one directory per cluster,
+ * and then clusters each of those directories again (top-down clustering).
+ */
+public class ClusterOutputPostProcessorTest extends MahoutTestCase {
+  
+  // Seven 2-d points forming two well-separated groups of 3 and 4 points.
+  public static final double[][] REFERENCE = { {1, 1}, {2, 1}, {1, 2}, {4, 4}, {5, 4}, {4, 5}, {5, 5}};
+  
+  private FileSystem fs;
+  
+  private Path outputPath;
+  
+  private Configuration conf;
+  
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+    // NOTE(review): this local Configuration shadows the 'conf' field; the
+    // field is only assigned later inside testTopDownClustering — confirm intended.
+    Configuration conf = new Configuration();
+    fs = FileSystem.get(conf);
+  }
+  
+  /** Converts raw double arrays into writable sparse vectors. */
+  public static List<VectorWritable> getPointsWritable(double[][] raw) {
+    List<VectorWritable> points = Lists.newArrayList();
+    for (double[] fr : raw) {
+      Vector vec = new RandomAccessSparseVector(fr.length);
+      vec.assign(fr);
+      points.add(new VectorWritable(vec));
+    }
+    return points;
+  }
+  
+  /**
+   * Story: User wants to use cluster post processor after canopy clustering and then run clustering on the
+   * output clusters
+   */
+  @Test
+  public void testTopDownClustering() throws Exception {
+    List<VectorWritable> points = getPointsWritable(REFERENCE);
+    
+    Path pointsPath = getTestTempDirPath("points");
+    conf = new Configuration();
+    // Write the same points twice to exercise multiple input files.
+    ClusteringTestUtils.writePointsToFile(points, new Path(pointsPath, "file1"), fs, conf);
+    ClusteringTestUtils.writePointsToFile(points, new Path(pointsPath, "file2"), fs, conf);
+    
+    outputPath = getTestTempDirPath("output");
+    
+    topLevelClustering(pointsPath, conf);
+    
+    Map<String,Path> postProcessedClusterDirectories = outputPostProcessing(conf);
+    
+    assertPostProcessedOutput(postProcessedClusterDirectories);
+    
+    bottomLevelClustering(postProcessedClusterDirectories);
+  }
+  
+  /** Dispatches the per-cluster assertion according to the cluster id. */
+  private void assertTopLevelCluster(Entry<String,Path> cluster) {
+    String clusterId = cluster.getKey();
+    Path clusterPath = cluster.getValue();
+    
+    try {
+      if (clusterId.equals("0")) {
+        assertPointsInFirstTopLevelCluster(clusterPath);
+      } else if (clusterId.equals("1")) {
+        assertPointsInSecondTopLevelCluster(clusterPath);
+      }
+    } catch (IOException e) {
+      Assert.fail("Exception occurred while asserting top level cluster.");
+    }
+    
+  }
+  
+  /** Cluster 0 must contain exactly the three points near (1, 1). */
+  private void assertPointsInFirstTopLevelCluster(Path clusterPath) throws IOException {
+    List<Vector> vectorsInCluster = getVectorsInCluster(clusterPath);
+    for (Vector vector : vectorsInCluster) {
+      Assert.assertTrue(ArrayUtils.contains(new String[] {"{1:1.0,0:1.0}", "{1:1.0,0:2.0}", "{1:2.0,0:1.0}"},
+        vector.asFormatString()));
+    }
+  }
+  
+  /** Cluster 1 must contain exactly the four points near (4.5, 4.5). */
+  private void assertPointsInSecondTopLevelCluster(Path clusterPath) throws IOException {
+    List<Vector> vectorsInCluster = getVectorsInCluster(clusterPath);
+    for (Vector vector : vectorsInCluster) {
+      Assert.assertTrue(ArrayUtils.contains(new String[] {"{1:4.0,0:4.0}", "{1:4.0,0:5.0}", "{1:5.0,0:4.0}",
+                                                          "{1:5.0,0:5.0}"}, vector.asFormatString()));
+    }
+  }
+  
+  /** Reads every vector from all part files matching the given (glob) path. */
+  private List<Vector> getVectorsInCluster(Path clusterPath) throws IOException {
+    Path[] partFilePaths = FileUtil.stat2Paths(fs.globStatus(clusterPath));
+    FileStatus[] listStatus = fs.listStatus(partFilePaths);
+    List<Vector> vectors = new ArrayList<Vector>();
+    for (FileStatus partFile : listStatus) {
+      SequenceFile.Reader topLevelClusterReader = new SequenceFile.Reader(fs, partFile.getPath(), conf);
+      LongWritable clusterIdAsKey = new LongWritable();
+      VectorWritable point = new VectorWritable();
+      while (topLevelClusterReader.next(clusterIdAsKey, point)) {
+        vectors.add(point.get());
+      }
+    }
+    return vectors;
+  }
+  
+  /** Runs canopy clustering separately inside each post-processed cluster directory. */
+  private void bottomLevelClustering(Map<String,Path> postProcessedClusterDirectories) throws IOException,
+                                                                                      InterruptedException,
+                                                                                      ClassNotFoundException {
+    for (Entry<String,Path> topLevelCluster : postProcessedClusterDirectories.entrySet()) {
+      String clusterId = topLevelCluster.getKey();
+      Path topLevelclusterPath = topLevelCluster.getValue();
+      
+      Path bottomLevelCluster = PathDirectory.getBottomLevelClusterPath(outputPath, clusterId);
+      CanopyDriver.run(conf, topLevelclusterPath, bottomLevelCluster, new ManhattanDistanceMeasure(), 2.1,
+        2.0, true, true);
+      assertBottomLevelCluster(bottomLevelCluster);
+    }
+  }
+  
+  /** Checks how many sub-clusters the bottom-level clustering produced. */
+  private void assertBottomLevelCluster(Path bottomLevelCluster) {
+    Path clusteredPointsPath = new Path(bottomLevelCluster, "clusteredPoints");
+    
+    DummyOutputCollector<IntWritable,WeightedVectorWritable> collector = new DummyOutputCollector<IntWritable,WeightedVectorWritable>();
+    
+    // The key is the clusterId, the value is the weighted vector
+    for (Pair<IntWritable,WeightedVectorWritable> record : new SequenceFileIterable<IntWritable,WeightedVectorWritable>(
+        new Path(clusteredPointsPath, "part-m-0"), conf)) {
+      collector.collect(record.getFirst(), record.getSecond());
+    }
+    final int clusterSize = collector.getKeys().size();
+    // First top level cluster produces two more clusters, second top level cluster is not broken again
+    Assert.assertTrue(clusterSize == 1 || clusterSize == 2);
+    
+  }
+  
+  private void assertPostProcessedOutput(Map<String,Path> postProcessedClusterDirectories) {
+    for (Entry<String,Path> cluster : postProcessedClusterDirectories.entrySet()) {
+      assertTopLevelCluster(cluster);
+    }
+  }
+  
+  /**
+   * Runs the post processor over the top-level clustering output and returns
+   * the per-cluster-id output directories. (Renamed from the misspelled
+   * 'ouputPostProcessing'; both the definition and its only call site are in
+   * this class.)
+   */
+  private Map<String,Path> outputPostProcessing(Configuration conf) throws IOException,
+                                                                  InstantiationException, IllegalAccessException {
+    ClusterOutputPostProcessor clusterOutputPostProcessor = new ClusterOutputPostProcessor(outputPath,
+        outputPath, conf);
+    clusterOutputPostProcessor.process();
+    return clusterOutputPostProcessor.getPostProcessedClusterDirectories();
+  }
+  
+  /** Runs the initial (top-level) canopy clustering over the raw points. */
+  private void topLevelClustering(Path pointsPath, Configuration conf) throws IOException,
+                                                                      InterruptedException,
+                                                                      ClassNotFoundException {
+    CanopyDriver.run(conf, pointsPath, outputPath, new ManhattanDistanceMeasure(), 3.1, 2.1, true, true);
+  }
+  
+}
\ No newline at end of file

Modified: mahout/trunk/integration/src/test/java/org/apache/mahout/clustering/TestClusterDumper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/test/java/org/apache/mahout/clustering/TestClusterDumper.java?rev=1211715&r1=1211714&r2=1211715&view=diff
==============================================================================
--- mahout/trunk/integration/src/test/java/org/apache/mahout/clustering/TestClusterDumper.java (original)
+++ mahout/trunk/integration/src/test/java/org/apache/mahout/clustering/TestClusterDumper.java Wed Dec  7 23:58:36 2011
@@ -207,7 +207,7 @@ public final class TestClusterDumper ext
         4, false, true);
     // now run the KMeans job
     KMeansDriver.run(conf, getTestTempDirPath("testdata"), new Path(output,
-        "clusters-0"), output, measure, 0.001, 10, true, false);
+        "clusters-0-final"), output, measure, 0.001, 10, true, false);
     // run ClusterDumper
     ClusterDumper clusterDumper = new ClusterDumper(finalClusterPath(conf,
         output, 10), new Path(output, "clusteredPoints"));
@@ -224,7 +224,7 @@ public final class TestClusterDumper ext
         4, false, true);
     // now run the Fuzzy KMeans job
     FuzzyKMeansDriver.run(conf, getTestTempDirPath("testdata"), new Path(
-        output, "clusters-0"), output, measure, 0.001, 10, 1.1f, true,
+        output, "clusters-0-final"), output, measure, 0.001, 10, 1.1f, true,
         true, 0, true);
     // run ClusterDumper
     ClusterDumper clusterDumper = new ClusterDumper(finalClusterPath(conf,

Modified: mahout/trunk/integration/src/test/java/org/apache/mahout/clustering/TestClusterEvaluator.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/test/java/org/apache/mahout/clustering/TestClusterEvaluator.java?rev=1211715&r1=1211714&r2=1211715&view=diff
==============================================================================
--- mahout/trunk/integration/src/test/java/org/apache/mahout/clustering/TestClusterEvaluator.java (original)
+++ mahout/trunk/integration/src/test/java/org/apache/mahout/clustering/TestClusterEvaluator.java Wed Dec  7 23:58:36 2011
@@ -182,7 +182,7 @@ public final class TestClusterEvaluator 
     // run using MR reference point calculation
     CanopyDriver.run(conf, testdata, output, measure, 3.1, 1.1, true, true);
     int numIterations = 2;
-    Path clustersIn = new Path(output, "clusters-0");
+    Path clustersIn = new Path(output, "clusters-0-final");
     RepresentativePointsDriver.run(conf, clustersIn, new Path(output,
         "clusteredPoints"), output, measure, numIterations, false);
     printRepPoints(numIterations);
@@ -320,7 +320,7 @@ public final class TestClusterEvaluator 
     Configuration conf = new Configuration();
     CanopyDriver.run(conf, testdata, output, measure, 3.1, 1.1, true, true);
     int numIterations = 10;
-    Path clustersIn = new Path(output, "clusters-0");
+    Path clustersIn = new Path(output, "clusters-0-final");
     RepresentativePointsDriver.run(conf, clustersIn, new Path(output,
         "clusteredPoints"), output, measure, numIterations, true);
     ClusterEvaluator evaluator = new ClusterEvaluator(conf, clustersIn);
@@ -342,7 +342,7 @@ public final class TestClusterEvaluator 
     Configuration conf = new Configuration();
     CanopyDriver.run(conf, testdata, output, measure, 3.1, 1.1, false, true);
     // now run the KMeans job
-    KMeansDriver.run(testdata, new Path(output, "clusters-0"), output, measure,
+    KMeansDriver.run(testdata, new Path(output, "clusters-0-final"), output, measure,
         0.001, 10, true, true);
     int numIterations = 10;
     Path clustersIn = new Path(output, "clusters-2");
@@ -366,7 +366,7 @@ public final class TestClusterEvaluator 
     Configuration conf = new Configuration();
     CanopyDriver.run(conf, testdata, output, measure, 3.1, 1.1, false, true);
     // now run the KMeans job
-    FuzzyKMeansDriver.run(testdata, new Path(output, "clusters-0"), output,
+    FuzzyKMeansDriver.run(testdata, new Path(output, "clusters-0-final"), output,
         measure, 0.001, 10, 2, true, true, 0, true);
     int numIterations = 10;
     Path clustersIn = new Path(output, "clusters-4");

Modified: mahout/trunk/integration/src/test/java/org/apache/mahout/clustering/cdbw/TestCDbwEvaluator.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/test/java/org/apache/mahout/clustering/cdbw/TestCDbwEvaluator.java?rev=1211715&r1=1211714&r2=1211715&view=diff
==============================================================================
--- mahout/trunk/integration/src/test/java/org/apache/mahout/clustering/cdbw/TestCDbwEvaluator.java (original)
+++ mahout/trunk/integration/src/test/java/org/apache/mahout/clustering/cdbw/TestCDbwEvaluator.java Wed Dec  7 23:58:36 2011
@@ -329,7 +329,7 @@ public final class TestCDbwEvaluator ext
     CanopyDriver.run(new Configuration(), testdata, output, measure, 3.1, 2.1,
         true, true);
     int numIterations = 10;
-    Path clustersIn = new Path(output, "clusters-0");
+    Path clustersIn = new Path(output, "clusters-0-final");
     RepresentativePointsDriver.run(conf, clustersIn, new Path(output,
         "clusteredPoints"), output, measure, numIterations, true);
     CDbwEvaluator evaluator = new CDbwEvaluator(conf, clustersIn);
@@ -352,7 +352,7 @@ public final class TestCDbwEvaluator ext
     CanopyDriver.run(new Configuration(), testdata, output, measure, 3.1, 2.1,
         false, true);
     // now run the KMeans job
-    KMeansDriver.run(testdata, new Path(output, "clusters-0"), output, measure,
+    KMeansDriver.run(testdata, new Path(output, "clusters-0-final"), output, measure,
         0.001, 10, true, true);
     int numIterations = 10;
     Path clustersIn = new Path(output, "clusters-2");
@@ -378,7 +378,7 @@ public final class TestCDbwEvaluator ext
     CanopyDriver.run(new Configuration(), testdata, output, measure, 3.1, 2.1,
         false, true);
     // now run the Fuzzy KMeans job
-    FuzzyKMeansDriver.run(testdata, new Path(output, "clusters-0"), output,
+    FuzzyKMeansDriver.run(testdata, new Path(output, "clusters-0-final"), output,
         measure, 0.001, 10, 2, true, true, 0, true);
     int numIterations = 10;
     Path clustersIn = new Path(output, "clusters-4");

Added: mahout/trunk/src/conf/clusterpp.props
URL: http://svn.apache.org/viewvc/mahout/trunk/src/conf/clusterpp.props?rev=1211715&view=auto
==============================================================================
--- mahout/trunk/src/conf/clusterpp.props (added)
+++ mahout/trunk/src/conf/clusterpp.props Wed Dec  7 23:58:36 2011
@@ -0,0 +1,14 @@
+# The following parameters must be specified
+#i|input = /path/to/initial/cluster/output
+#o|output = /path/to/output
+
+
+
+
+# The following parameters must be specified
+#i|input = /path/to/initial/cluster/output
+#o|output = /path/to/output
+
+
+
+

Modified: mahout/trunk/src/conf/driver.classes.props
URL: http://svn.apache.org/viewvc/mahout/trunk/src/conf/driver.classes.props?rev=1211715&r1=1211714&r2=1211715&view=diff
==============================================================================
--- mahout/trunk/src/conf/driver.classes.props (original)
+++ mahout/trunk/src/conf/driver.classes.props Wed Dec  7 23:58:36 2011
@@ -34,6 +34,7 @@ org.apache.mahout.clustering.meanshift.M
 org.apache.mahout.clustering.canopy.CanopyDriver = canopy : Canopy clustering
 org.apache.mahout.clustering.spectral.eigencuts.EigencutsDriver = eigencuts : Eigencuts spectral clustering
 org.apache.mahout.clustering.spectral.kmeans.SpectralKMeansDriver = spectralkmeans : Spectral k-means clustering
+org.apache.mahout.clustering.topdown.postprocessor.ClusterOutputPostProcessorDriver = clusterpp : Groups clustering output by cluster
 #Freq. Itemset Mining
 org.apache.mahout.fpm.pfpgrowth.FPGrowthDriver = fpg : Frequent Pattern Growth
 #Classification



Mime
View raw message