mahout-commits mailing list archives

From: sro...@apache.org
Subject: svn commit: r823911 [1/2] - in /lucene/mahout/trunk: core/src/main/java/org/apache/mahout/classifier/bayes/datastore/ core/src/main/java/org/apache/mahout/classifier/bayes/io/ core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/ core/...
Date: Sat, 10 Oct 2009 18:17:31 GMT
Author: srowen
Date: Sat Oct 10 18:17:30 2009
New Revision: 823911

URL: http://svn.apache.org/viewvc?rev=823911&view=rev
Log:
Committing MAHOUT-148 on behalf of Robin

Added:
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesConstants.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/common/StringTuple.java
Removed:
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/cbayes/CBayesNormalizedWeightDriver.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/cbayes/CBayesNormalizedWeightMapper.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/cbayes/CBayesNormalizedWeightReducer.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/cbayes/CBayesThetaDriver.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/cbayes/CBayesThetaMapper.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/cbayes/CBayesThetaReducer.java
Modified:
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/datastore/HBaseBayesDatastore.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/datastore/InMemoryBayesDatastore.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/io/SequenceFileModelReader.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesClassifierDriver.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesClassifierMapper.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesClassifierReducer.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesDriver.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesThetaNormalizerDriver.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesThetaNormalizerMapper.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesThetaNormalizerReducer.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/cbayes/CBayesDriver.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/cbayes/CBayesThetaNormalizerDriver.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/cbayes/CBayesThetaNormalizerMapper.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/cbayes/CBayesThetaNormalizerReducer.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesFeatureDriver.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesFeatureMapper.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesFeatureOutputFormat.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesFeatureReducer.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesTfIdfDriver.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesTfIdfMapper.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesTfIdfOutputFormat.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesTfIdfReducer.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesWeightSummerDriver.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesWeightSummerMapper.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesWeightSummerOutputFormat.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesWeightSummerReducer.java
    lucene/mahout/trunk/core/src/test/java/org/apache/mahout/classifier/bayes/BayesFeatureMapperTest.java
    lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/classifier/bayes/TestClassifier.java

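[Editor's note] MAHOUT-148 replaces the old delimiter-encoded Text keys (prefixes such as '_', '*', ',' and the "____" separator) with the new StringTuple writable plus named markers from the added BayesConstants class. The snippet below is a minimal illustration of that pattern, not part of the commit; it uses only the StringTuple and BayesConstants calls visible in the diffs that follow, the label and weight values are made up, and the literal values of the BayesConstants fields live in the added BayesConstants.java, which is not shown in this part [1/2] of the diff.

    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.mahout.classifier.bayes.mapreduce.common.BayesConstants;
    import org.apache.mahout.common.StringTuple;

    // Sketch only: the key encoding the Bayes jobs appear to adopt below.
    public class StringTupleKeySketch {
      public static void main(String[] args) {
        // Old style (removed below): new Text('_' + label) or
        // new Text(correctLabel + "____" + classifiedLabel).
        // New style: a tuple whose first element is a BayesConstants marker.
        StringTuple key = new StringTuple(BayesConstants.LABEL_THETA_NORMALIZER);
        key.add("rec.sport.hockey");                        // hypothetical label
        DoubleWritable weight = new DoubleWritable(-12.34); // hypothetical weight

        // Consumers now dispatch on the marker instead of charAt(0) prefix checks:
        if (key.stringAt(0).equals(BayesConstants.LABEL_THETA_NORMALIZER)) {
          System.out.println(key.stringAt(1) + " => " + weight.get());
        }
      }
    }
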
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/datastore/HBaseBayesDatastore.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/datastore/HBaseBayesDatastore.java?rev=823911&r1=823910&r2=823911&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/datastore/HBaseBayesDatastore.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/datastore/HBaseBayesDatastore.java Sat Oct 10 18:17:30 2009
@@ -33,6 +33,7 @@
 import org.apache.mahout.common.Parameters;
 import org.apache.mahout.classifier.bayes.exceptions.InvalidDatastoreException;
 import org.apache.mahout.classifier.bayes.interfaces.Datastore;
+import org.apache.mahout.classifier.bayes.mapreduce.common.BayesConstants;
 import org.apache.mahout.common.cache.Cache;
 import org.apache.mahout.common.cache.HybridCache;
 import org.slf4j.Logger;
@@ -40,8 +41,9 @@
 
 public class HBaseBayesDatastore implements Datastore {
 
-  private static final Logger log = LoggerFactory.getLogger(HBaseBayesDatastore.class);
-  
+  private static final Logger log = LoggerFactory
+      .getLogger(HBaseBayesDatastore.class);
+
   protected HBaseConfiguration config = null;
 
   protected HTable table = null;
@@ -71,15 +73,17 @@
     Collection<String> labels = getKeys("thetaNormalizer");
     for (String label : labels) {
       thetaNormalizer = Math.max(thetaNormalizer, Math.abs(getWeightFromHbase(
-          "*thetaNormalizer", label)));
+          BayesConstants.LABEL_THETA_NORMALIZER, label)));
     }
     for (String label : labels) {
-      System.out.println( label + " " +getWeightFromHbase(
-          "*thetaNormalizer", label) +" " +thetaNormalizer + " " + getWeightFromHbase(
-          "*thetaNormalizer", label)/thetaNormalizer);
+      System.out.println(label + " "
+          + getWeightFromHbase(BayesConstants.LABEL_THETA_NORMALIZER, label)
+          + " " + thetaNormalizer + " "
+          + getWeightFromHbase(BayesConstants.LABEL_THETA_NORMALIZER, label)
+          / thetaNormalizer);
     }
   }
-
+  
   final Map<String, Set<String>> keys = new HashMap<String, Set<String>>();
 
   @Override
@@ -89,18 +93,18 @@
       return keys.get(name);
     Result r = null;
     if (name.equals("labelWeight")) {
-      r = getRowFromHbase("*labelWeight");
+      r = getRowFromHbase(BayesConstants.LABEL_SUM);
     } else if (name.equals("thetaNormalizer")) {
-      r = getRowFromHbase("*thetaNormalizer");
+      r = getRowFromHbase(BayesConstants.LABEL_THETA_NORMALIZER);
     } else
       r = getRowFromHbase(name);
-    
-    if (r == null){
+
+    if (r == null) {
       log.error("Encountered NULL");
       throw new InvalidDatastoreException("Encountered NULL");
     }
 
-    Set<byte[]> labelBytes = r.getNoVersionMap().get(Bytes.toBytes("label"))
+    Set<byte[]> labelBytes = r.getNoVersionMap().get(Bytes.toBytes(BayesConstants.HBASE_COLUMN_FAMILY))
         .keySet();
     Set<String> keySet = new HashSet<String>();
     for (byte[] key : labelBytes) {
@@ -134,9 +138,10 @@
         throw new InvalidDatastoreException();
 
     } else if (vectorName.equals("labelWeight")) {
-      return getWeightFromHbase("*labelWeight", index);
+      return getWeightFromHbase(BayesConstants.LABEL_SUM, index);
     } else if (vectorName.equals("thetaNormalizer")) {
-      return getWeightFromHbase("*thetaNormalizer", index) / thetaNormalizer;
+      return getWeightFromHbase(BayesConstants.LABEL_THETA_NORMALIZER, index)
+          / thetaNormalizer;
     } else {
 
       throw new InvalidDatastoreException();
@@ -156,7 +161,9 @@
       }
       tableCache.set(row, r);
     }
-    byte[] value = r.getValue(Bytes.toBytes("label"), Bytes.toBytes(column));
+    byte[] value = r.getValue(
+        Bytes.toBytes(BayesConstants.HBASE_COLUMN_FAMILY), Bytes
+            .toBytes(column));
     if (value == null)
       return 0.0d;
     return Bytes.toDouble(value);
@@ -164,7 +171,7 @@
   }
 
   protected double getWeightFromHbase(String feature, String label) {
-    return getCachedCell(feature, "label", label);
+    return getCachedCell(feature, BayesConstants.HBASE_COLUMN_FAMILY, label);
   }
 
   protected Result getRowFromHbase(String feature) {
@@ -172,7 +179,7 @@
     try {
       if ((r = tableCache.get(feature)) == null) {
         Get g = new Get(Bytes.toBytes(feature));
-        g.addFamily(Bytes.toBytes("label"));
+        g.addFamily(Bytes.toBytes(BayesConstants.HBASE_COLUMN_FAMILY));
         r = table.get(g);
         tableCache.set(feature, r);
         return r;
@@ -185,14 +192,15 @@
   }
 
   protected double getSigma_jFromHbase(String feature) {
-    return getCachedCell(feature, "label", "Sigma_j");
+    return getCachedCell(feature, BayesConstants.HBASE_COLUMN_FAMILY, BayesConstants.FEATURE_SUM);
   }
 
   protected double vocabCount = -1.0;
 
   protected double getVocabCountFromHbase() {
     if (vocabCount == -1.0) {
-      vocabCount = getCachedCell("*totalCounts", "label", "vocabCount");
+      vocabCount = getCachedCell(BayesConstants.HBASE_COUNTS_ROW,
+          BayesConstants.HBASE_COLUMN_FAMILY, BayesConstants.FEATURE_SET_SIZE);
       return vocabCount;
     } else {
       return vocabCount;
@@ -203,7 +211,8 @@
 
   protected double getSigma_jSigma_kFromHbase() {
     if (sigma_jSigma_k == -1.0) {
-      sigma_jSigma_k = getCachedCell("*totalCounts", "label", "sigma_jSigma_k");
+      sigma_jSigma_k = getCachedCell(BayesConstants.HBASE_COUNTS_ROW,
+          BayesConstants.HBASE_COLUMN_FAMILY, BayesConstants.TOTAL_SUM);
       return sigma_jSigma_k;
     } else {
       return sigma_jSigma_k;

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/datastore/InMemoryBayesDatastore.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/datastore/InMemoryBayesDatastore.java?rev=823911&r1=823910&r2=823911&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/datastore/InMemoryBayesDatastore.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/datastore/InMemoryBayesDatastore.java Sat Oct 10 18:17:30 2009
@@ -154,14 +154,6 @@
     return matrix.size();
   }
 
-  private long sizeOfVector(String vectorName) {
-    Map<String, Double> vector = vectors.get(vectorName);
-    if (vector == null) {
-      return 0;
-    }
-    return vector.size();
-  }
-
   public void loadFeatureWeight(String feature, String label, double weight) {
     matrixPutCell("weight", feature, label, weight);
   }

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/io/SequenceFileModelReader.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/io/SequenceFileModelReader.java?rev=823911&r1=823910&r2=823911&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/io/SequenceFileModelReader.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/io/SequenceFileModelReader.java Sat Oct 10 18:17:30 2009
@@ -23,10 +23,10 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
 import org.apache.mahout.classifier.bayes.datastore.InMemoryBayesDatastore;
+import org.apache.mahout.classifier.bayes.mapreduce.common.BayesConstants;
 import org.apache.mahout.common.Parameters;
+import org.apache.mahout.common.StringTuple;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,31 +35,33 @@
 import java.util.Map;
 
 /**
- * This Class reads the different interim  files created during the Training stage as well as the Model File during
- * testing.
+ * This Class reads the different interim files created during the Training
+ * stage as well as the Model File during testing.
  */
 public class SequenceFileModelReader {
 
-  private static final Logger log = LoggerFactory.getLogger(SequenceFileModelReader.class);
+  private static final Logger log = LoggerFactory
+      .getLogger(SequenceFileModelReader.class);
 
   private SequenceFileModelReader() {
   }
 
-  public static void loadModel(InMemoryBayesDatastore datastore, FileSystem fs, Parameters params,
-                               Configuration conf) throws IOException {
+  public static void loadModel(InMemoryBayesDatastore datastore, FileSystem fs,
+      Parameters params, Configuration conf) throws IOException {
 
     loadFeatureWeights(datastore, fs, new Path(params.get("sigma_j")), conf);
-    loadLabelWeights(datastore, fs, new Path(params.get("sigma_k")), conf); 
-    loadSumWeight(datastore, fs, new Path(params.get("sigma_kSigma_j")), conf); 
-    loadThetaNormalizer(datastore, fs, new Path(params.get("thetaNormalizer")), conf); 
+    loadLabelWeights(datastore, fs, new Path(params.get("sigma_k")), conf);
+    loadSumWeight(datastore, fs, new Path(params.get("sigma_kSigma_j")), conf);
+    loadThetaNormalizer(datastore, fs, new Path(params.get("thetaNormalizer")),
+        conf);
     loadWeightMatrix(datastore, fs, new Path(params.get("weight")), conf);
 
-
   }
 
-  public static void loadWeightMatrix(InMemoryBayesDatastore datastore, FileSystem fs, Path pathPattern, Configuration conf) throws IOException {
+  public static void loadWeightMatrix(InMemoryBayesDatastore datastore,
+      FileSystem fs, Path pathPattern, Configuration conf) throws IOException {
 
-    Writable key = new Text();
+    StringTuple key = new StringTuple();
     DoubleWritable value = new DoubleWritable();
 
     FileStatus[] outputFiles = fs.globStatus(pathPattern);
@@ -68,23 +70,20 @@
       log.info("{}", path);
       SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
 
-      // the key is either _label_ or label,feature
+      // the key is label,feature
       while (reader.next(key, value)) {
-        String keyStr = key.toString();
 
-        int idx = keyStr.indexOf(',');
-        if (idx != -1) {
-          datastore.loadFeatureWeight(keyStr.substring(idx + 1),keyStr.substring(0, idx), value.get());
-        }
+        datastore.loadFeatureWeight(key.stringAt(2), key.stringAt(1), value
+            .get());
 
       }
     }
   }
 
-  public static void loadFeatureWeights(InMemoryBayesDatastore datastore, FileSystem fs, Path pathPattern,
-                                        Configuration conf) throws IOException {
+  public static void loadFeatureWeights(InMemoryBayesDatastore datastore,
+      FileSystem fs, Path pathPattern, Configuration conf) throws IOException {
 
-    Writable key = new Text();
+    StringTuple key = new StringTuple();
     DoubleWritable value = new DoubleWritable();
 
     FileStatus[] outputFiles = fs.globStatus(pathPattern);
@@ -96,13 +95,13 @@
       // the key is either _label_ or label,feature
       long count = 0;
       while (reader.next(key, value)) {
-        String keyStr = key.toString();
 
-        if (keyStr.charAt(0) == ',') { // Sum of weights for a Feature
-          datastore.setSumFeatureWeight(keyStr.substring(1),
-              value.get());
+        if (key.stringAt(0).equals(BayesConstants.FEATURE_SUM)) { // Sum of
+                                                                  // weights for
+                                                                  // a Feature
+          datastore.setSumFeatureWeight(key.stringAt(1), value.get());
           count++;
-          if (count % 50000 == 0){
+          if (count % 50000 == 0) {
             log.info("Read {} feature weights", count);
           }
         }
@@ -110,10 +109,10 @@
     }
   }
 
-  public static void loadLabelWeights(InMemoryBayesDatastore datastore,FileSystem fs, Path pathPattern,
-      Configuration conf) throws IOException {
+  public static void loadLabelWeights(InMemoryBayesDatastore datastore,
+      FileSystem fs, Path pathPattern, Configuration conf) throws IOException {
 
-    Writable key = new Text();
+    StringTuple key = new StringTuple();
     DoubleWritable value = new DoubleWritable();
 
     FileStatus[] outputFiles = fs.globStatus(pathPattern);
@@ -122,16 +121,14 @@
       log.info("{}", path);
       SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
 
-      // the key is either _label_ or label,feature
       long count = 0;
       while (reader.next(key, value)) {
-        String keyStr = key.toString();
-
-        if (keyStr.charAt(0) == '_') { // Sum of weights in a Label
-          datastore.setSumLabelWeight(keyStr.substring(1), value
-              .get());
+        if (key.stringAt(0).equals(BayesConstants.LABEL_SUM)) { // Sum of
+                                                                // weights in a
+                                                                // Label
+          datastore.setSumLabelWeight(key.stringAt(1), value.get());
           count++;
-          if (count % 10000 == 0){
+          if (count % 10000 == 0) {
             log.info("Read {} label weights", count);
           }
         }
@@ -139,11 +136,10 @@
     }
   }
 
-  
-  public static void loadThetaNormalizer(InMemoryBayesDatastore datastore,FileSystem fs, Path pathPattern,
-      Configuration conf) throws IOException {
+  public static void loadThetaNormalizer(InMemoryBayesDatastore datastore,
+      FileSystem fs, Path pathPattern, Configuration conf) throws IOException {
 
-    Writable key = new Text();
+    StringTuple key = new StringTuple();
     DoubleWritable value = new DoubleWritable();
 
     FileStatus[] outputFiles = fs.globStatus(pathPattern);
@@ -152,15 +148,17 @@
       log.info("{}", path);
       SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
 
-      // the key is either _label_ or label,feature
       long count = 0;
       while (reader.next(key, value)) {
-        String keyStr = key.toString();
-        if (keyStr.charAt(0) == '_') { // Sum of weights in a Label
-          datastore.setThetaNormalizer(keyStr.substring(1), value
-              .get());
+        if (key.stringAt(0).equals(BayesConstants.LABEL_THETA_NORMALIZER)) { // Sum
+                                                                             // of
+                                                                             // weights
+                                                                             // in
+                                                                             // a
+                                                                             // Label
+          datastore.setThetaNormalizer(key.stringAt(1), value.get());
           count++;
-          if (count % 50000 == 0){
+          if (count % 50000 == 0) {
             log.info("Read {} theta norms", count);
           }
         }
@@ -168,10 +166,10 @@
     }
   }
 
-  public static void loadSumWeight(InMemoryBayesDatastore datastore, FileSystem fs, Path pathPattern,
-                                   Configuration conf) throws IOException {
+  public static void loadSumWeight(InMemoryBayesDatastore datastore,
+      FileSystem fs, Path pathPattern, Configuration conf) throws IOException {
 
-    Writable key = new Text();
+    StringTuple key = new StringTuple();
     DoubleWritable value = new DoubleWritable();
 
     FileStatus[] outputFiles = fs.globStatus(pathPattern);
@@ -180,12 +178,12 @@
       log.info("{}", path);
       SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
 
-      // the key is either _label_ or label,feature
+      // the key is _label
       while (reader.next(key, value)) {
-        String keyStr = key.toString();
 
-        if (keyStr.charAt(0) == '*') { // Sum of weights for all Feature
-          // and all Labels
+        if (key.stringAt(0).equals(BayesConstants.TOTAL_SUM)) { // Sum of
+                                                                // weights for
+          // all Features and all Labels
           datastore.setSigma_jSigma_k(value.get());
           log.info("{}", value.get());
         }
@@ -193,9 +191,10 @@
     }
   }
 
-  public static Map<String, Double> readLabelSums(FileSystem fs, Path pathPattern, Configuration conf) throws IOException {
+  public static Map<String, Double> readLabelSums(FileSystem fs,
+      Path pathPattern, Configuration conf) throws IOException {
     Map<String, Double> labelSum = new HashMap<String, Double>();
-    Writable key = new Text();
+    StringTuple key = new StringTuple();
     DoubleWritable value = new DoubleWritable();
 
     FileStatus[] outputFiles = fs.globStatus(pathPattern);
@@ -205,9 +204,9 @@
       SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
       // the key is either _label_ or label,feature
       while (reader.next(key, value)) {
-        String keyStr = key.toString();
-        if (keyStr.charAt(0) == '_') { // Sum of weights of labels
-          labelSum.put(keyStr.substring(1), value.get());
+        if (key.stringAt(0).equals(BayesConstants.LABEL_SUM)) { // Sum of counts
+                                                                // of labels
+          labelSum.put(key.stringAt(1), value.get());
         }
 
       }
@@ -216,10 +215,10 @@
     return labelSum;
   }
 
-  public static Map<String, Double> readLabelDocumentCounts(FileSystem fs, Path pathPattern, Configuration conf)
-      throws IOException {
+  public static Map<String, Double> readLabelDocumentCounts(FileSystem fs,
+      Path pathPattern, Configuration conf) throws IOException {
     Map<String, Double> labelDocumentCounts = new HashMap<String, Double>();
-    Writable key = new Text();
+    StringTuple key = new StringTuple();
     DoubleWritable value = new DoubleWritable();
 
     FileStatus[] outputFiles = fs.globStatus(pathPattern);
@@ -228,9 +227,10 @@
       SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
       // the key is either _label_ or label,feature
       while (reader.next(key, value)) {
-        String keyStr = key.toString();
-        if (keyStr.charAt(0) == '_') { // Count of Documents in a Label
-          labelDocumentCounts.put(keyStr.substring(1), value.get());
+        if (key.stringAt(0).equals(BayesConstants.LABEL_COUNT)) { // Count of
+                                                                  // Documents
+                                                                  // in a Label
+          labelDocumentCounts.put(key.stringAt(1), value.get());
         }
 
       }
@@ -240,9 +240,9 @@
   }
 
   public static double readSigma_jSigma_k(FileSystem fs, Path pathPattern,
-                                          Configuration conf) throws IOException {
+      Configuration conf) throws IOException {
     Map<String, Double> weightSum = new HashMap<String, Double>();
-    Writable key = new Text();
+    StringTuple key = new StringTuple();
     DoubleWritable value = new DoubleWritable();
 
     FileStatus[] outputFiles = fs.globStatus(pathPattern);
@@ -251,23 +251,22 @@
       SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
       // the key is *
       while (reader.next(key, value)) {
-        String keyStr = key.toString();
         if (weightSum.size() > 1) {
           throw new IOException("Incorrect Sum File");
-        } else if (keyStr.charAt(0) == '*') {
-          weightSum.put(keyStr, value.get());
+        } else if (key.stringAt(0).equals(BayesConstants.TOTAL_SUM)) {
+          weightSum.put(BayesConstants.TOTAL_SUM, value.get());
         }
 
       }
     }
 
-    return weightSum.get("*");
+    return weightSum.get(BayesConstants.TOTAL_SUM);
   }
 
   public static double readVocabCount(FileSystem fs, Path pathPattern,
-                                      Configuration conf) throws IOException {
+      Configuration conf) throws IOException {
     Map<String, Double> weightSum = new HashMap<String, Double>();
-    Writable key = new Text();
+    StringTuple key = new StringTuple();
     DoubleWritable value = new DoubleWritable();
 
     FileStatus[] outputFiles = fs.globStatus(pathPattern);
@@ -276,18 +275,16 @@
       SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
       // the key is *
       while (reader.next(key, value)) {
-        String keyStr = key.toString();
         if (weightSum.size() > 1) {
           throw new IOException("Incorrect vocabCount File");
         }
-        if (keyStr.charAt(0) == '*') {
-          weightSum.put(keyStr, value.get());
+        if (key.stringAt(0).equals(BayesConstants.FEATURE_SET_SIZE)) {
+          weightSum.put(BayesConstants.FEATURE_SET_SIZE, value.get());
         }
 
       }
     }
 
-    return weightSum.get("*vocabCount");
+    return weightSum.get(BayesConstants.FEATURE_SET_SIZE);
   }
-
 }

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesClassifierDriver.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesClassifierDriver.java?rev=823911&r1=823910&r2=823911&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesClassifierDriver.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesClassifierDriver.java Sat Oct 10 18:17:30 2009
@@ -23,8 +23,6 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobClient;
@@ -33,20 +31,18 @@
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.mahout.classifier.ConfusionMatrix;
 import org.apache.mahout.classifier.bayes.common.BayesParameters;
+import org.apache.mahout.common.StringTuple;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.regex.Pattern;
 
 /** Create and run the Bayes Classifier */
 public class BayesClassifierDriver {
 
   private static final Logger log = LoggerFactory.getLogger(BayesClassifierDriver.class);
-  private static final Pattern CHUNK_DEIMITER_PATTERN = Pattern.compile("____");
-
   private BayesClassifierDriver() {
   }
 
@@ -55,11 +51,11 @@
    * 
    * @param params The Job parameters containing the gramSize, input output folders, defaultCat, encoding
    */
-  public static void runJob(BayesParameters params) throws IOException {
+  public static final void runJob(BayesParameters params) throws IOException {
     JobClient client = new JobClient();
     JobConf conf = new JobConf(BayesClassifierDriver.class);
     conf.setJobName("Bayes Classifier Driver running over input: " + params.get("testDirPath"));
-    conf.setOutputKeyClass(Text.class);
+    conf.setOutputKeyClass(StringTuple.class);
     conf.setOutputValueClass(DoubleWritable.class);
 
     FileInputFormat.setInputPaths(conf, new Path(params.get("testDirPath")));
@@ -89,9 +85,9 @@
     log.info("{}",matrix.summarize());
   }
 
-  private static ConfusionMatrix readResult(FileSystem fs, Path pathPattern, Configuration conf, BayesParameters params) throws IOException{
+  private static final ConfusionMatrix readResult(FileSystem fs, Path pathPattern, Configuration conf, BayesParameters params) throws IOException{
    
-    Writable key = new Text();
+    StringTuple key = new StringTuple();
     DoubleWritable value = new DoubleWritable();
     String defaultLabel = params.get("defaultCat");
     FileStatus[] outputFiles = fs.globStatus(pathPattern);
@@ -100,12 +96,9 @@
     for (FileStatus fileStatus : outputFiles) {
       Path path = fileStatus.getPath();
       SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
-      // the key is correctLabel____classifiedLabel value is count
-      while (reader.next(key, value)) {
-        String keyStr = key.toString();
-        String[] chunks = CHUNK_DEIMITER_PATTERN.split(keyStr);
-        String correctLabel = chunks[0];
-        String classifiedLabel = chunks[1];
+      while (reader.next(key, value)) {        
+        String correctLabel = key.stringAt(1);
+        String classifiedLabel = key.stringAt(2);
         Map<String, Integer> rowMatrix = confusionMatrix.get(correctLabel);
         if(rowMatrix == null)
           rowMatrix = new HashMap<String, Integer>();        

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesClassifierMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesClassifierMapper.java?rev=823911&r1=823910&r2=823911&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesClassifierMapper.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesClassifierMapper.java Sat Oct 10 18:17:30 2009
@@ -34,8 +34,10 @@
 import org.apache.mahout.classifier.bayes.exceptions.InvalidDatastoreException;
 import org.apache.mahout.classifier.bayes.interfaces.Algorithm;
 import org.apache.mahout.classifier.bayes.interfaces.Datastore;
+import org.apache.mahout.classifier.bayes.mapreduce.common.BayesConstants;
 import org.apache.mahout.classifier.bayes.model.ClassifierContext;
 import org.apache.mahout.common.Parameters;
+import org.apache.mahout.common.StringTuple;
 import org.apache.mahout.common.nlp.NGrams;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,12 +47,10 @@
 
 /** Reads the input train set(preprocessed using the {@link BayesFileFormatter}). */
 public class BayesClassifierMapper extends MapReduceBase implements
-    Mapper<Text, Text, Text, DoubleWritable> {
+    Mapper<Text, Text, StringTuple, DoubleWritable> {
 
   private static final Logger log = LoggerFactory.getLogger(BayesClassifierMapper.class);
 
-  private static final DoubleWritable one = new DoubleWritable(1.0);
-
   private int gramSize = 1;
   
   ClassifierContext classifier = null;
@@ -66,7 +66,7 @@
    */
   @Override
   public void map(Text key, Text value,
-                  OutputCollector<Text, DoubleWritable> output, Reporter reporter)
+                  OutputCollector<StringTuple, DoubleWritable> output, Reporter reporter)
       throws IOException {
     //String line = value.toString();
     String label = key.toString();
@@ -80,9 +80,15 @@
     try {
       ClassifierResult result = classifier.classifyDocument( ngrams
           .toArray(new String[ngrams.size()]), defaultCategory);
+     
       String correctLabel = label;
       String classifiedLabel = result.getLabel();
-      output.collect(new Text(correctLabel+"____"+classifiedLabel), new DoubleWritable(1.0d));
+      
+      StringTuple outputTuple = new StringTuple(BayesConstants.CLASSIFIER_TUPLE);
+      outputTuple.add(correctLabel);
+      outputTuple.add(classifiedLabel);
+      
+      output.collect(outputTuple, new DoubleWritable(1.0d));
     } catch (InvalidDatastoreException e) {
       throw new IOException(e.toString());
     }

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesClassifierReducer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesClassifierReducer.java?rev=823911&r1=823910&r2=823911&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesClassifierReducer.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesClassifierReducer.java Sat Oct 10 18:17:30 2009
@@ -18,23 +18,23 @@
 package org.apache.mahout.classifier.bayes.mapreduce.bayes;
 
 import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.common.StringTuple;
 
 import java.io.IOException;
 import java.util.Iterator;
 
 /** Can also be used as a local Combiner. A simple summing reducer */
 public class BayesClassifierReducer extends MapReduceBase
-    implements Reducer<Text, DoubleWritable, Text, DoubleWritable> {
+    implements Reducer<StringTuple, DoubleWritable, StringTuple, DoubleWritable> {
 
   @Override
-  public void reduce(Text key,
+  public void reduce(StringTuple key,
                      Iterator<DoubleWritable> values,
-                     OutputCollector<Text, DoubleWritable> output,
+                     OutputCollector<StringTuple, DoubleWritable> output,
                      Reporter reporter) throws IOException {
     //Key is label,word, value is the number of times we've seen this label word per local node.  Output is the same
 

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesDriver.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesDriver.java?rev=823911&r1=823910&r2=823911&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesDriver.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesDriver.java Sat Oct 10 18:17:30 2009
@@ -25,7 +25,6 @@
 import org.apache.mahout.classifier.bayes.mapreduce.common.BayesJob;
 import org.apache.mahout.classifier.bayes.mapreduce.common.BayesTfIdfDriver;
 import org.apache.mahout.classifier.bayes.mapreduce.common.BayesWeightSummerDriver;
-import org.apache.mahout.classifier.bayes.mapreduce.common.JobExecutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,19 +36,6 @@
   private static final Logger log = LoggerFactory.getLogger(BayesDriver.class);
 
   /**
-   * Takes in two arguments: <ol> <li>The input {@link org.apache.hadoop.fs.Path} where the input documents live</li>
-   * <li>The output {@link org.apache.hadoop.fs.Path} where to write the Model as a
-   * {@link org.apache.hadoop.io.SequenceFile}</li> </ol>
-   *
-   * @param args The args
-   * @throws Exception in case of job execution problems. 
-   */
-  public static void main(String[] args) throws Exception {
-    JobExecutor executor = new JobExecutor();
-    executor.execute(args, new BayesDriver());
-  }
-
-  /**
    * Run the job
    *
    * @param input  the input pathname String
@@ -81,17 +67,11 @@
     BayesWeightSummerDriver summer = new BayesWeightSummerDriver();
     summer.runJob(input, output, params);
 
-    //Calculate the W_ij = log(Theta) for each label, feature. This step actually generates the complement class
-    //CBayesThetaDriver.runJob(input, output);
-
     log.info("Calculating the weight Normalisation factor for each class...");
     //Calculate the normalization factor Sigma_W_ij for each complement class.
     BayesThetaNormalizerDriver normalizer = new BayesThetaNormalizerDriver();
     normalizer.runJob(input, output, params);
 
-    //Calculate the normalization factor Sigma_W_ij for each complement class.
-    //CBayesNormalizedWeightDriver.runJob(input, output);
-
     Path docCountOutPath = new Path(output + "/trainer-docCount");
     if (dfs.exists(docCountOutPath)) {
       dfs.delete(docCountOutPath, true);
@@ -112,22 +92,10 @@
     if (dfs.exists(vocabCountPath)) {
       dfs.delete(vocabCountPath, true);
     }
-    /*Path tfIdfOutPath = new Path(output+ "/trainer-tfIdf");
-    if (dfs.exists(tfIdfOutPath))
-      dfs.delete(tfIdfOutPath, true);*/
     Path vocabCountOutPath = new Path(output + "/trainer-vocabCount");
     if (dfs.exists(vocabCountOutPath)) {
       dfs.delete(vocabCountOutPath, true);
     }
-    /* Path weightsOutPath = new Path(output+ "/trainer-weights");
- if (dfs.exists(weightsOutPath))
-   dfs.delete(weightsOutPath, true);*/
-    /*Path thetaOutPath = new Path(output+ "/trainer-theta");
-    if (dfs.exists(thetaOutPath))
-      dfs.delete(thetaOutPath, true);*/
-    /*Path thetaNormalizerOutPath = new Path(output+ "/trainer-thetaNormalizer");
-    if (dfs.exists(thetaNormalizerOutPath))
-      dfs.delete(thetaNormalizerOutPath, true);*/
 
   }
 }

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesThetaNormalizerDriver.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesThetaNormalizerDriver.java?rev=823911&r1=823910&r2=823911&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesThetaNormalizerDriver.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesThetaNormalizerDriver.java Sat Oct 10 18:17:30 2009
@@ -21,7 +21,6 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DefaultStringifier;
 import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobClient;
@@ -32,7 +31,7 @@
 import org.apache.mahout.classifier.bayes.common.BayesParameters;
 import org.apache.mahout.classifier.bayes.io.SequenceFileModelReader;
 import org.apache.mahout.classifier.bayes.mapreduce.common.BayesJob;
-import org.apache.mahout.classifier.bayes.mapreduce.common.JobExecutor;
+import org.apache.mahout.common.StringTuple;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,18 +44,6 @@
   private static final Logger log = LoggerFactory.getLogger(BayesThetaNormalizerDriver.class);
 
   /**
-   * Takes in two arguments: <ol> <li>The input {@link org.apache.hadoop.fs.Path} where the input documents live</li>
-   * <li>The output {@link org.apache.hadoop.fs.Path} where to write the the interim filesas a {@link
-   * org.apache.hadoop.io.SequenceFile}</li> </ol>
-   *
-   * @param args The args - should contain input and output path.
-   */
-  public static void main(String[] args) throws Exception {
-    JobExecutor executor = new JobExecutor();
-    executor.execute(args, new BayesThetaNormalizerDriver());
-  }
-
-  /**
    * Run the job
    *
    * @param input  the input pathname String
@@ -69,7 +56,7 @@
 
     conf.setJobName("Bayes Theta Normalizer Driver running over input: " +  input);
 
-    conf.setOutputKeyClass(Text.class);
+    conf.setOutputKeyClass(StringTuple.class);
     conf.setOutputValueClass(DoubleWritable.class);
     FileInputFormat.addInputPath(conf, new Path(output + "/trainer-tfIdf/trainer-tfIdf"));
     Path outPath = new Path(output + "/trainer-thetaNormalizer");
@@ -120,7 +107,8 @@
     conf.set("cnaivebayes.vocabCount", vocabCountString);
     double retvocabCount = stringifier.fromString(vocabCountString);
     log.info("{}", retvocabCount);
-
+    conf.set("bayes.parameters", params.toString());
+    conf.set("output.table", output);
     client.setConf(conf);
 
     JobClient.runJob(conf);

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesThetaNormalizerMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesThetaNormalizerMapper.java?rev=823911&r1=823910&r2=823911&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesThetaNormalizerMapper.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesThetaNormalizerMapper.java Sat Oct 10 18:17:30 2009
@@ -19,13 +19,14 @@
 
 import org.apache.hadoop.io.DefaultStringifier;
 import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.GenericsUtil;
+import org.apache.mahout.classifier.bayes.mapreduce.common.BayesConstants;
+import org.apache.mahout.common.StringTuple;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,14 +35,14 @@
 import java.util.Map;
 
 public class BayesThetaNormalizerMapper extends MapReduceBase implements
-    Mapper<Text, DoubleWritable, Text, DoubleWritable> {
+    Mapper<StringTuple, DoubleWritable, StringTuple, DoubleWritable> {
 
   private static final Logger log = LoggerFactory.getLogger(BayesThetaNormalizerMapper.class);
 
   private Map<String, Double> labelWeightSum = null;
   private double sigma_jSigma_k = 0.0;
   private double vocabCount = 0.0;
-
+  private double alpha_i = 1.0;    
   /**
    * We need to calculate the thetaNormalization factor of each label
    *
@@ -49,18 +50,18 @@
    * @param value The tfIdf of the pair
    */
   @Override
-  public void map(Text key, DoubleWritable value,
-                  OutputCollector<Text, DoubleWritable> output, Reporter reporter)
+  public void map(StringTuple key, DoubleWritable value,
+                  OutputCollector<StringTuple, DoubleWritable> output, Reporter reporter)
       throws IOException {
 
-    String labelFeaturePair = key.toString();
+    String label = key.stringAt(1);
 
-    int comma = labelFeaturePair.indexOf(',');
-    String label = comma < 0 ? labelFeaturePair : labelFeaturePair.substring(0, comma);
     reporter.setStatus("Bayes Theta Normalizer Mapper: " + label);
     double alpha_i = 1.0;
     double weight = Math.log((value.get() + alpha_i) / (labelWeightSum.get(label) + vocabCount));
-    output.collect(new Text(('_' + label).trim()), new DoubleWritable(weight));
+    StringTuple thetaNormalizerTuple = new StringTuple(BayesConstants.LABEL_THETA_NORMALIZER);
+    thetaNormalizerTuple.add(label);
+    output.collect(thetaNormalizerTuple, new DoubleWritable(weight));
   }
 
   @Override

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesThetaNormalizerReducer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesThetaNormalizerReducer.java?rev=823911&r1=823910&r2=823911&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesThetaNormalizerReducer.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/bayes/BayesThetaNormalizerReducer.java Sat Oct 10 18:17:30 2009
@@ -17,38 +17,92 @@
 
 package org.apache.mahout.classifier.bayes.mapreduce.bayes;
 
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.classifier.bayes.mapreduce.common.BayesConstants;
+import org.apache.mahout.common.Parameters;
+import org.apache.mahout.common.StringTuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Iterator;
 
-/** Can also be used as a local Combiner beacuse only two values should be there inside the values */
+/**
+ * Can also be used as a local Combiner beacuse only two values should be there
+ * inside the values
+ */
 public class BayesThetaNormalizerReducer extends MapReduceBase implements
-    Reducer<Text, DoubleWritable, Text, DoubleWritable> {
+    Reducer<StringTuple, DoubleWritable, StringTuple, DoubleWritable> {
+  private static final Logger log = LoggerFactory
+      .getLogger(BayesThetaNormalizerReducer.class);
+
+  private HTable table;
+
+  private HBaseConfiguration HBconf;
+
+  boolean useHbase = false;
 
   @Override
-  public void reduce(Text key, Iterator<DoubleWritable> values,
-                     OutputCollector<Text, DoubleWritable> output, Reporter reporter)
+  public void reduce(StringTuple key, Iterator<DoubleWritable> values,
+      OutputCollector<StringTuple, DoubleWritable> output, Reporter reporter)
       throws IOException {
     // Key is label,word, value is the number of times we've seen this label
     // word per local node. Output is the same
 
-    //String token = key.toString();
+    // String token = key.toString();
 
     double weightSumPerLabel = 0.0;
 
     while (values.hasNext()) {
-      reporter.setStatus("Bayes Theta Normalizer Reducer: " + key);	
+      reporter.setStatus("Bayes Theta Normalizer Reducer: " + key);
       weightSumPerLabel += values.next().get();
     }
-    reporter.setStatus("Bayes Theta Normalizer Reducer: " + key + " => "+ weightSumPerLabel);
+    reporter.setStatus("Bayes Theta Normalizer Reducer: " + key + " => "
+        + weightSumPerLabel);
+    if (useHbase) {    
+      if (key.stringAt(0).equals(BayesConstants.LABEL_THETA_NORMALIZER)) {
+       String label = key.stringAt(1);
+       Put bu = new Put(Bytes.toBytes(BayesConstants.LABEL_THETA_NORMALIZER));
+       bu.add(Bytes.toBytes(BayesConstants.HBASE_COLUMN_FAMILY), Bytes.toBytes(label), Bytes
+           .toBytes(weightSumPerLabel));
+       table.put(bu);
+     }
+   }
     output.collect(key, new DoubleWritable(weightSumPerLabel));
 
   }
 
+  @Override
+  public void configure(JobConf job) {
+    try {
+      Parameters params = Parameters
+          .fromString(job.get("bayes.parameters", ""));
+      if (params.get("dataSource").equals("hbase"))
+        useHbase = true;
+      else
+        return;
+
+      HBconf = new HBaseConfiguration(job);
+      table = new HTable(HBconf, job.get("output.table"));
+    } catch (IOException e) {
+      log.error("Unexpected error during configuration", e);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (useHbase) {
+      table.close();
+    }
+    super.close();
+  }
 }

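[Editor's note] The driver and reducer changes above connect through two new JobConf entries set in BayesThetaNormalizerDriver ("bayes.parameters" and "output.table") and read back in the reducer's configure(). Below is a condensed sketch of that hand-off, assuming only the calls visible in the diff; it is not part of the commit.

    import java.io.IOException;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.mahout.common.Parameters;

    // Sketch only: how the HBase settings appear to flow from driver to reducer.
    final class ThetaNormalizerHBaseWiring {

      // Driver side (mirrors the two conf.set(...) calls added above).
      static void configureJob(JobConf conf, Parameters params, String outputTable) {
        conf.set("bayes.parameters", params.toString());
        conf.set("output.table", outputTable);
      }

      // Reducer side (mirrors the new configure() method added above): open the
      // HBase table only when the parameters name "hbase" as the data source.
      static HTable openTableIfHBase(JobConf job) throws IOException {
        Parameters params = Parameters.fromString(job.get("bayes.parameters", ""));
        if (!"hbase".equals(params.get("dataSource"))) {
          return null; // sequence-file output only; nothing extra to write
        }
        return new HTable(new HBaseConfiguration(job), job.get("output.table"));
      }
    }
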
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/cbayes/CBayesDriver.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/cbayes/CBayesDriver.java?rev=823911&r1=823910&r2=823911&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/cbayes/CBayesDriver.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/cbayes/CBayesDriver.java Sat Oct 10 18:17:30 2009
@@ -25,7 +25,6 @@
 import org.apache.mahout.classifier.bayes.mapreduce.common.BayesJob;
 import org.apache.mahout.classifier.bayes.mapreduce.common.BayesTfIdfDriver;
 import org.apache.mahout.classifier.bayes.mapreduce.common.BayesWeightSummerDriver;
-import org.apache.mahout.classifier.bayes.mapreduce.common.JobExecutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,20 +34,7 @@
 public class CBayesDriver implements BayesJob{
 
   private static final Logger log = LoggerFactory.getLogger(CBayesDriver.class);
-
-  /**
-   * Takes in two arguments: <ol> <li>The input {@link Path} where the input documents live</li>
-   * <li>The output {@link Path} where to write the Model as a
-   * {@link org.apache.hadoop.io.SequenceFile}</li> </ol>
-   *
-   * @param args The args input and output path.
-   * @throws Exception in case of problems during job execution. 
-   */
-  public static void main(String[] args) throws Exception {
-    JobExecutor executor = new JobExecutor();
-    executor.execute(args, new CBayesDriver());
-  }
-
+  
   /**
    * Run the job
    *
@@ -81,17 +67,11 @@
     BayesWeightSummerDriver summer = new BayesWeightSummerDriver();
     summer.runJob(input, output, params);
 
-    //Calculate the W_ij = log(Theta) for each label, feature. This step actually generates the complement class
-    //CBayesThetaDriver.runJob(input, output);
-
     log.info("Calculating the weight Normalisation factor for each complement class...");
     //Calculate the normalization factor Sigma_W_ij for each complement class.
     CBayesThetaNormalizerDriver normalizer = new CBayesThetaNormalizerDriver();
     normalizer.runJob(input, output, params);
 
-    //Calculate the normalization factor Sigma_W_ij for each complement class.
-    //CBayesNormalizedWeightDriver.runJob(input, output);
-
     Path docCountOutPath = new Path(output + "/trainer-docCount");
     if (dfs.exists(docCountOutPath)) {
       dfs.delete(docCountOutPath, true);
@@ -112,22 +92,11 @@
     if (dfs.exists(vocabCountPath)) {
       dfs.delete(vocabCountPath, true);
     }
-    /*Path tfIdfOutPath = new Path(output+ "/trainer-tfIdf");
-    if (dfs.exists(tfIdfOutPath))
-      dfs.delete(tfIdfOutPath, true);*/
     Path vocabCountOutPath = new Path(output + "/trainer-vocabCount");
     if (dfs.exists(vocabCountOutPath)) {
       dfs.delete(vocabCountOutPath, true);
     }
-    /* Path weightsOutPath = new Path(output+ "/trainer-weights");
- if (dfs.exists(weightsOutPath))
-   dfs.delete(weightsOutPath, true);*/
-    /*Path thetaOutPath = new Path(output+ "/trainer-theta");
-    if (dfs.exists(thetaOutPath))
-      dfs.delete(thetaOutPath, true);*/
-    /*Path thetaNormalizerOutPath = new Path(output+ "/trainer-thetaNormalizer");
-    if (dfs.exists(thetaNormalizerOutPath))
-      dfs.delete(thetaNormalizerOutPath, true);*/
+
 
   }
 }

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/cbayes/CBayesThetaNormalizerDriver.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/cbayes/CBayesThetaNormalizerDriver.java?rev=823911&r1=823910&r2=823911&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/cbayes/CBayesThetaNormalizerDriver.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/cbayes/CBayesThetaNormalizerDriver.java Sat Oct 10 18:17:30 2009
@@ -21,7 +21,6 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DefaultStringifier;
 import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobClient;
@@ -32,7 +31,7 @@
 import org.apache.mahout.classifier.bayes.common.BayesParameters;
 import org.apache.mahout.classifier.bayes.io.SequenceFileModelReader;
 import org.apache.mahout.classifier.bayes.mapreduce.common.BayesJob;
-import org.apache.mahout.classifier.bayes.mapreduce.common.JobExecutor;
+import org.apache.mahout.common.StringTuple;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,18 +44,6 @@
   private static final Logger log = LoggerFactory.getLogger(CBayesThetaNormalizerDriver.class);
 
   /**
-   * Takes in two arguments: <ol> <li>The input {@link org.apache.hadoop.fs.Path} where the input documents live</li>
-   * <li>The output {@link org.apache.hadoop.fs.Path} where to write the Model as a
-   * {@link org.apache.hadoop.io.SequenceFile}</li> </ol>
-   *
-   * @param args The args input and output path.
-   */
-  public static void main(String[] args) throws Exception {
-    JobExecutor executor = new JobExecutor();
-    executor.execute(args, new CBayesThetaNormalizerDriver());
-  }
-
-  /**
    * Run the job
    *
    * @param input  the input pathname String
@@ -69,7 +56,7 @@
     conf.setJobName("Complementary Bayes Theta Normalizer Driver running over input: " +  input);
 
 
-    conf.setOutputKeyClass(Text.class);
+    conf.setOutputKeyClass(StringTuple.class);
     conf.setOutputValueClass(DoubleWritable.class);
     FileInputFormat.addInputPath(conf, new Path(output + "/trainer-weights/Sigma_j"));
     FileInputFormat.addInputPath(conf, new Path(output + "/trainer-tfIdf/trainer-tfIdf"));

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/cbayes/CBayesThetaNormalizerMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/cbayes/CBayesThetaNormalizerMapper.java?rev=823911&r1=823910&r2=823911&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/cbayes/CBayesThetaNormalizerMapper.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/cbayes/CBayesThetaNormalizerMapper.java Sat Oct 10 18:17:30 2009
@@ -19,13 +19,14 @@
 
 import org.apache.hadoop.io.DefaultStringifier;
 import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.GenericsUtil;
+import org.apache.mahout.classifier.bayes.mapreduce.common.BayesConstants;
+import org.apache.mahout.common.StringTuple;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,7 +35,7 @@
 import java.util.Map;
 
 public class CBayesThetaNormalizerMapper extends MapReduceBase implements
-    Mapper<Text, DoubleWritable, Text, DoubleWritable> {
+    Mapper<StringTuple, DoubleWritable, StringTuple, DoubleWritable> {
 
   private static final Logger log = LoggerFactory.getLogger(CBayesThetaNormalizerMapper.class);
 
@@ -48,34 +49,37 @@
    * @param key The label,feature pair (can either be the freq Count or the term Document count
    */
   @Override
-  public void map(Text key, DoubleWritable value,
-                  OutputCollector<Text, DoubleWritable> output, Reporter reporter)
+  public void map(StringTuple key, DoubleWritable value,
+                  OutputCollector<StringTuple, DoubleWritable> output, Reporter reporter)
       throws IOException {
 
-    String labelFeaturePair = key.toString();
-    if (labelFeaturePair.charAt(0) == ',') { // if it is from the Sigma_j folder
+    if (key.stringAt(0).equals(BayesConstants.FEATURE_SUM)) { // if it is from the Sigma_j folder
 
       double alpha_i = 1.0;
       for (Map.Entry<String, Double> stringDoubleEntry : labelWeightSum.entrySet()) {
+        String label = stringDoubleEntry.getKey();
         double weight = Math.log((value.get() + alpha_i) / (sigma_jSigma_k - stringDoubleEntry.getValue() + vocabCount));
         
         reporter.setStatus("Complementary Bayes Theta Normalizer Mapper: " + stringDoubleEntry + " => " + weight);
-        
-        output.collect(new Text(('_' + stringDoubleEntry.getKey()).trim()), new DoubleWritable(weight)); //output Sigma_j
+        StringTuple normalizerTuple = new StringTuple(BayesConstants.LABEL_THETA_NORMALIZER);
+        normalizerTuple.add(label);        
+        output.collect(normalizerTuple, new DoubleWritable(weight)); //output Sigma_j
 
       }
 
     } else {
-      int comma = labelFeaturePair.indexOf(',');
-      String label = comma < 0 ? labelFeaturePair : labelFeaturePair.substring(0, comma);
-
+      String label = key.stringAt(1);
+         
       double D_ij = value.get();
       double denominator = 0.5 * ((sigma_jSigma_k / vocabCount) + (D_ij * this.labelWeightSum.size()));
       double weight = Math.log(1.0 - D_ij / denominator);
 
       reporter.setStatus("Complementary Bayes Theta Normalizer Mapper: " + label + " => " + weight);
+     
+      StringTuple normalizerTuple = new StringTuple(BayesConstants.LABEL_THETA_NORMALIZER);
+      normalizerTuple.add(label);    
       
-      output.collect(new Text(('_' +label).trim()), new DoubleWritable(weight));//output -D_ij
+      output.collect(normalizerTuple, new DoubleWritable(weight));//output -D_ij
      
 
     }
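
To keep the two branches above readable, here is a small self-contained restatement of the arithmetic with invented numbers; sigma_jSigma_k, vocabCount and labelWeightSum mirror the mapper fields of the same names, and none of the values are taken from real data:

    public class ThetaNormalizerWeightExample {
      public static void main(String[] args) {
        double alphaI = 1.0;             // smoothing term, as in the mapper
        double sigmaJSigmaK = 10000.0;   // illustrative total weight over all labels/features
        double vocabCount = 5000.0;      // illustrative number of distinct features
        double sigmaKForLabel = 1200.0;  // illustrative labelWeightSum.get(label)

        // FEATURE_SUM ("__SJ") branch: one (_LTN, label) output per label
        double sigmaJ = 42.0;            // illustrative value.get()
        double sigmaJWeight = Math.log((sigmaJ + alphaI)
            / (sigmaJSigmaK - sigmaKForLabel + vocabCount));

        // tf-idf branch: the incoming key is (__WT, label, feature), the value is D_ij
        double dIj = 3.7;                // illustrative value.get()
        double numLabels = 20.0;         // illustrative labelWeightSum.size()
        double denominator = 0.5 * ((sigmaJSigmaK / vocabCount) + (dIj * numLabels));
        double dIjWeight = Math.log(1.0 - dIj / denominator);

        System.out.println("Sigma_j contribution: " + sigmaJWeight);
        System.out.println("-D_ij contribution:   " + dIjWeight);
      }
    }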

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/cbayes/CBayesThetaNormalizerReducer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/cbayes/CBayesThetaNormalizerReducer.java?rev=823911&r1=823910&r2=823911&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/cbayes/CBayesThetaNormalizerReducer.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/cbayes/CBayesThetaNormalizerReducer.java Sat Oct 10 18:17:30 2009
@@ -22,13 +22,14 @@
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.classifier.bayes.mapreduce.common.BayesConstants;
 import org.apache.mahout.common.Parameters;
+import org.apache.mahout.common.StringTuple;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,7 +38,7 @@
 
 /** Can also be used as a local Combiner because only two values should be there inside the values */
 public class CBayesThetaNormalizerReducer extends MapReduceBase implements
-    Reducer<Text, DoubleWritable, Text, DoubleWritable> {
+    Reducer<StringTuple, DoubleWritable, StringTuple, DoubleWritable> {
 
   private static final Logger log = LoggerFactory
       .getLogger(CBayesThetaNormalizerReducer.class);
@@ -49,8 +50,8 @@
   boolean useHbase = false;
 
   @Override
-  public void reduce(Text key, Iterator<DoubleWritable> values,
-                     OutputCollector<Text, DoubleWritable> output, Reporter reporter)
+  public void reduce(StringTuple key, Iterator<DoubleWritable> values,
+                     OutputCollector<StringTuple, DoubleWritable> output, Reporter reporter)
       throws IOException {
     // Key is label,word, value is the number of times we've seen this label
     // word per local node. Output is the same
@@ -62,12 +63,12 @@
       weightSumPerLabel += values.next().get();
     }
     reporter.setStatus("Complementary Bayes Theta Normalizer Reducer: " + key + " => " + weightSumPerLabel);
-    char firstChar = key.toString().charAt(0);
+
     if (useHbase) {    
-       if (firstChar == '_') {
-        String label = key.toString().substring(1);
-        Put bu = new Put(Bytes.toBytes("*thetaNormalizer"));
-        bu.add(Bytes.toBytes("label"), Bytes.toBytes(label), Bytes
+       if (key.stringAt(0).equals(BayesConstants.LABEL_THETA_NORMALIZER)) {
+        String label = key.stringAt(1);
+        Put bu = new Put(Bytes.toBytes(BayesConstants.LABEL_THETA_NORMALIZER));
+        bu.add(Bytes.toBytes(BayesConstants.HBASE_COLUMN_FAMILY), Bytes.toBytes(label), Bytes
             .toBytes(weightSumPerLabel));
         table.put(bu);
       }

Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesConstants.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesConstants.java?rev=823911&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesConstants.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesConstants.java Sat Oct 10 18:17:30 2009
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.classifier.bayes.mapreduce.common;
+
+public final class BayesConstants {
+
+  private BayesConstants() {
+  }
+
+  public static final String DOCUMENT_FREQUENCY = "__DF"; // -
+
+  public static final String LABEL_COUNT = "__LC"; // _
+
+  public static final String FEATURE_COUNT = "__FC"; // ,
+
+  public static final String WEIGHT = "__WT";
+
+  public static final String FEATURE_SET_SIZE = "__FS";
+
+  public static final String FEATURE_SUM = "__SJ";
+
+  public static final String LABEL_SUM = "__SK";
+
+  public static final String TOTAL_SUM = "_SJSK";
+
+  public static final String CLASSIFIER_TUPLE = "__CT";
+
+  public static final String LABEL_THETA_NORMALIZER = "_LTN";
+
+  public static final String HBASE_COUNTS_ROW = "_HBASE_COUNTS_ROW";
+
+  public static final String HBASE_COLUMN_FAMILY = "LABEL";
+
+}
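
These markers take the place of the single-character key prefixes ('_', '-', ',', '*') that the old Text keys carried, and they always sit in the first slot of a StringTuple so that downstream code can branch on key.stringAt(0). A short illustrative use (the label "sports" and feature "football" are made up):

    import org.apache.mahout.classifier.bayes.mapreduce.common.BayesConstants;
    import org.apache.mahout.common.StringTuple;

    public class BayesKeyExample {
      public static void main(String[] args) {
        // (__WT, label, feature): a per-label, per-feature weight record
        StringTuple weightKey = new StringTuple(BayesConstants.WEIGHT);
        weightKey.add("sports");
        weightKey.add("football");

        // (__LC, label): a per-label document count record
        StringTuple labelCountKey = new StringTuple(BayesConstants.LABEL_COUNT);
        labelCountKey.add("sports");

        // Consumers branch on the marker instead of inspecting charAt(0)
        if (weightKey.stringAt(0).equals(BayesConstants.WEIGHT)) {
          System.out.println("weight record for " + weightKey.stringAt(1)
              + "," + weightKey.stringAt(2));
        }
      }
    }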

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesFeatureDriver.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesFeatureDriver.java?rev=823911&r1=823910&r2=823911&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesFeatureDriver.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesFeatureDriver.java Sat Oct 10 18:17:30 2009
@@ -27,6 +27,7 @@
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.KeyValueTextInputFormat;
 import org.apache.mahout.classifier.bayes.common.BayesParameters;
+import org.apache.mahout.common.StringTuple;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,19 +39,6 @@
   private static final Logger log = LoggerFactory.getLogger(BayesFeatureDriver.class);
 
   /**
-   * Takes in two arguments: <ol> <li>The input {@link org.apache.hadoop.fs.Path} where the input documents live</li>
-   * <li>The output {@link org.apache.hadoop.fs.Path} where to write the interim files as a {@link
-   * org.apache.hadoop.io.SequenceFile}</li> </ol>
-   *
-   * @param args The args - input and output path.
-   * @throws Exception in case of problems during job execution.
-   */
-  public static void main(String[] args) throws Exception {
-    JobExecutor executor = new JobExecutor();
-    executor.execute(args, new BayesFeatureDriver());
-  }
-
-  /**
    * Run the job
    *
    * @param input  the input pathname String
@@ -61,14 +49,13 @@
     JobClient client = new JobClient();
     JobConf conf = new JobConf(BayesFeatureDriver.class);
     conf.setJobName("Bayes Feature Driver running over input: " +  input);
-    conf.setOutputKeyClass(Text.class);
+    conf.setOutputKeyClass(StringTuple.class);
     conf.setOutputValueClass(DoubleWritable.class);
 
     FileInputFormat.setInputPaths(conf, new Path(input));
     Path outPath = new Path(output);
     FileOutputFormat.setOutputPath(conf, outPath);
-    //conf.setNumMapTasks(100);
-    //conf.setNumReduceTasks(1);
+    
     conf.setMapperClass(BayesFeatureMapper.class);
 
     conf.setInputFormat(KeyValueTextInputFormat.class);
@@ -77,7 +64,7 @@
     conf.setOutputFormat(BayesFeatureOutputFormat.class);
     conf.set("io.serializations",
         "org.apache.hadoop.io.serializer.JavaSerialization,org.apache.hadoop.io.serializer.WritableSerialization");
-    // Dont ever forget this. People should keep track of how hadoop conf parameters and make or break a piece of code
+    // this conf parameter needs to be set to enable serialisation of conf values
 
     FileSystem dfs = FileSystem.get(outPath.toUri(), conf);
     if (dfs.exists(outPath)) {

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesFeatureMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesFeatureMapper.java?rev=823911&r1=823910&r2=823911&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesFeatureMapper.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesFeatureMapper.java Sat Oct 10 18:17:30 2009
@@ -26,6 +26,7 @@
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.mahout.classifier.BayesFileFormatter;
 import org.apache.mahout.common.Parameters;
+import org.apache.mahout.common.StringTuple;
 import org.apache.mahout.common.nlp.NGrams;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,14 +38,12 @@
 
 /** Reads the input train set(preprocessed using the {@link BayesFileFormatter}). */
 public class BayesFeatureMapper extends MapReduceBase implements
-    Mapper<Text, Text, Text, DoubleWritable> {
+    Mapper<Text, Text, StringTuple, DoubleWritable> {
 
   private static final Logger log = LoggerFactory.getLogger(BayesFeatureMapper.class);
 
   private static final DoubleWritable one = new DoubleWritable(1.0);
 
-  private final Text labelWord = new Text();
-
   private int gramSize = 1;
 
   /**
@@ -60,17 +59,13 @@
    */
   @Override
   public void map(Text key, Text value,
-                  OutputCollector<Text, DoubleWritable> output, Reporter reporter)
+                  OutputCollector<StringTuple, DoubleWritable> output, Reporter reporter)
       throws IOException {
     //String line = value.toString();
     String label = key.toString();
-    int keyLen = label.length();
 
     Map<String, int[]> wordList = new HashMap<String, int[]>(1000);
 
-    StringBuilder builder = new StringBuilder(label);
-    builder.ensureCapacity(32);// make sure we have a reasonably size buffer to
-                               // begin with
     List<String> ngrams  = new NGrams(value.toString(), gramSize).generateNGramsWithoutLabel(); 
 
     for (String ngram : ngrams) {
@@ -95,31 +90,39 @@
     for (Map.Entry<String, int[]> entry : wordList.entrySet()) {
       // key is label,word
       String token = entry.getKey();
-      builder.append(',').append(token);
-      labelWord.set(builder.toString());
+      StringTuple tuple = new StringTuple();
+      tuple.add(BayesConstants.WEIGHT);
+      tuple.add(label);
+      tuple.add(token);
       DoubleWritable f = new DoubleWritable(Math.log(1.0 + entry.getValue()[0]) / lengthNormalisation);
-      output.collect(labelWord, f);
-      builder.setLength(keyLen);// truncate back
+      output.collect(tuple, f);
     }
     reporter.setStatus("Bayes Feature Mapper: Document Label: " + label);  
     
     // Output Document Frequency per Word per Class
-    String dflabel = '-' + label;
-    int dfKeyLen = dflabel.length();
-    builder = new StringBuilder(dflabel);
+    
     for (String token : wordList.keySet()) {
       // key is label,word
-      builder.append(',').append(token);
-      labelWord.set(builder.toString());
-      output.collect(labelWord, one);
-      output.collect(new Text(',' + token), one);
-      builder.setLength(dfKeyLen);// truncate back
+      
+      StringTuple dfTuple = new StringTuple();
+      dfTuple.add(BayesConstants.DOCUMENT_FREQUENCY);
+      dfTuple.add(label);
+      dfTuple.add(token);      
+      output.collect(dfTuple, one);
+      
+      StringTuple tokenCountTuple = new StringTuple();
+      tokenCountTuple.add(BayesConstants.FEATURE_COUNT);
+      tokenCountTuple.add(token);
+      output.collect(tokenCountTuple, one);
 
     }
 
     // output that we have seen the label to calculate the Count of Document per
     // class
-    output.collect(new Text('_' + label), one);
+    StringTuple labelCountTuple = new StringTuple();
+    labelCountTuple.add(BayesConstants.LABEL_COUNT);
+    labelCountTuple.add(label);
+    output.collect(labelCountTuple, one);
   }
 
   @Override
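
As a concrete trace of the new key shapes (the document is invented for this example, and lengthNormalisation is computed earlier in the mapper, outside this hunk), a training document with label sports and tokens "ball ball goal" at gramSize 1 now produces:

    (__WT, sports, ball)   ->  log(1 + 2) / lengthNormalisation
    (__WT, sports, goal)   ->  log(1 + 1) / lengthNormalisation
    (__DF, sports, ball)   ->  1.0
    (__DF, sports, goal)   ->  1.0
    (__FC, ball)           ->  1.0
    (__FC, goal)           ->  1.0
    (__LC, sports)         ->  1.0

where __WT, __DF, __FC and __LC are BayesConstants.WEIGHT, DOCUMENT_FREQUENCY, FEATURE_COUNT and LABEL_COUNT.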

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesFeatureOutputFormat.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesFeatureOutputFormat.java?rev=823911&r1=823910&r2=823911&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesFeatureOutputFormat.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesFeatureOutputFormat.java Sat Oct 10 18:17:30 2009
@@ -18,7 +18,6 @@
 package org.apache.mahout.classifier.bayes.mapreduce.common;
 
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JobConf;
@@ -26,6 +25,7 @@
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.hadoop.mapred.lib.MultipleOutputFormat;
 import org.apache.hadoop.util.Progressable;
+import org.apache.mahout.common.StringTuple;
 
 import java.io.IOException;
 
@@ -49,16 +49,25 @@
 
   @Override
   protected String generateFileNameForKeyValue(WritableComparable<?> k, Writable v, String name) {
-    Text key = (Text) k;
-    char firstChar = key.toString().charAt(0);
-    if (firstChar == '_') {
-      return "trainer-docCount/" + name;
-    } else if (firstChar == '-') {
-      return "trainer-termDocCount/" + name;
-    } else if (firstChar == ',') {
-      return "trainer-featureCount/" + name;
+    StringTuple key = (StringTuple) k;
+    if(key.length() == 3)
+    {
+      if(key.stringAt(0).equals(BayesConstants.WEIGHT))
+        return "trainer-wordFreq/" + name;
+      else if(key.stringAt(0).equals(BayesConstants.DOCUMENT_FREQUENCY))
+        return "trainer-termDocCount/" + name;
+      else throw new RuntimeException("Unrecognized Tuple: " + key);
     }
-    return "trainer-wordFreq/" + name;
+    else if(key.length() == 2)
+    {
+      if(key.stringAt(0).equals(BayesConstants.FEATURE_COUNT))
+        return "trainer-featureCount/" + name;
+      else if(key.stringAt(0).equals(BayesConstants.LABEL_COUNT))
+        return "trainer-docCount/" + name;
+      else throw new RuntimeException("Unrecognized Tuple: " + key);
+    }
+    else
+      throw new RuntimeException("Unrecognized Tuple: " + key);    
   }
 
 }
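
Taken together with BayesTfIdfOutputFormat further down, the marker in the first tuple slot now determines the interim output directory (directory names as they appear in the two generateFileNameForKeyValue methods):

    (__WT, label, feature)  ->  trainer-wordFreq/
    (__DF, label, feature)  ->  trainer-termDocCount/
    (__FC, feature)         ->  trainer-featureCount/
    (__LC, label)           ->  trainer-docCount/
    (__FS)                  ->  trainer-vocabCount/   (BayesTfIdfOutputFormat)
    any other tuple         ->  trainer-tfIdf/        (BayesTfIdfOutputFormat)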

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesFeatureReducer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesFeatureReducer.java?rev=823911&r1=823910&r2=823911&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesFeatureReducer.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesFeatureReducer.java Sat Oct 10 18:17:30 2009
@@ -18,23 +18,23 @@
 package org.apache.mahout.classifier.bayes.mapreduce.common;
 
 import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.mahout.common.StringTuple;
 
 import java.io.IOException;
 import java.util.Iterator;
 
 /** Can also be used as a local Combiner. A simple summing reducer */
 public class BayesFeatureReducer extends MapReduceBase
-    implements Reducer<Text, DoubleWritable, Text, DoubleWritable> {
+    implements Reducer<StringTuple, DoubleWritable, StringTuple, DoubleWritable> {
 
   @Override
-  public void reduce(Text key,
+  public void reduce(StringTuple key,
                      Iterator<DoubleWritable> values,
-                     OutputCollector<Text, DoubleWritable> output,
+                     OutputCollector<StringTuple, DoubleWritable> output,
                      Reporter reporter) throws IOException {
     //Key is label,word, value is the number of times we've seen this label word per local node.  Output is the same
 

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesTfIdfDriver.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesTfIdfDriver.java?rev=823911&r1=823910&r2=823911&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesTfIdfDriver.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesTfIdfDriver.java Sat Oct 10 18:17:30 2009
@@ -26,7 +26,6 @@
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.io.DefaultStringifier;
 import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobClient;
@@ -35,6 +34,7 @@
 import org.apache.hadoop.util.GenericsUtil;
 import org.apache.mahout.classifier.bayes.common.BayesParameters;
 import org.apache.mahout.classifier.bayes.io.SequenceFileModelReader;
+import org.apache.mahout.common.StringTuple;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,23 +47,6 @@
   private static final Logger log = LoggerFactory.getLogger(BayesTfIdfDriver.class);
 
   /**
-   * Takes in two arguments:
-   * <ol>
-   * <li>The input {@link org.apache.hadoop.fs.Path} where the input documents
-   * live</li>
-   * <li>The output {@link org.apache.hadoop.fs.Path} where to write the interim
-   * files as a {@link org.apache.hadoop.io.SequenceFile}</li>
-   * </ol>
-   * 
-   * @param args The args - input and output path.
-   * @throws Exception in case of problems during job execution.
-   */
-  public static void main(String[] args) throws Exception {
-    JobExecutor executor = new JobExecutor();
-    executor.execute(args, new BayesTfIdfDriver());
-  }
-
-  /**
    * Run the job
    * 
    * @param input the input pathname String
@@ -77,7 +60,7 @@
     JobConf conf = new JobConf(BayesWeightSummerDriver.class);
     conf.setJobName("TfIdf Driver running over input: " + input);
 
-    conf.setOutputKeyClass(Text.class);
+    conf.setOutputKeyClass(StringTuple.class);
     conf.setOutputValueClass(DoubleWritable.class);
 
     FileInputFormat.addInputPath(conf, new Path(output + "/trainer-termDocCount"));
@@ -125,7 +108,7 @@
     if (params.get("dataSource").equals("hbase")) {
       HBaseConfiguration hc = new HBaseConfiguration(new Configuration());
       HTableDescriptor ht = new HTableDescriptor(output);
-      HColumnDescriptor hcd = new HColumnDescriptor("label:");
+      HColumnDescriptor hcd = new HColumnDescriptor(BayesConstants.HBASE_COLUMN_FAMILY+":");
       hcd.setBloomfilter(true);
       hcd.setInMemory(true);
       hcd.setMaxVersions(1);

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesTfIdfMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesTfIdfMapper.java?rev=823911&r1=823910&r2=823911&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesTfIdfMapper.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesTfIdfMapper.java Sat Oct 10 18:17:30 2009
@@ -19,13 +19,13 @@
 
 import org.apache.hadoop.io.DefaultStringifier;
 import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.GenericsUtil;
+import org.apache.mahout.common.StringTuple;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,13 +34,18 @@
 import java.util.Map;
 
 public class BayesTfIdfMapper extends MapReduceBase implements
+    Mapper<StringTuple, DoubleWritable, StringTuple, DoubleWritable> {
 
-Mapper<Text, DoubleWritable, Text, DoubleWritable> {
-
-  private static final Logger log = LoggerFactory.getLogger(BayesTfIdfMapper.class);
+  private static final Logger log = LoggerFactory
+      .getLogger(BayesTfIdfMapper.class);
 
   private Map<String, Double> labelDocumentCounts = null;
 
+  private static StringTuple vocabCount = new StringTuple(
+      BayesConstants.FEATURE_SET_SIZE);
+
+  private static DoubleWritable one = new DoubleWritable(1.0d);
+
   /**
    * We need to calculate the Tf-Idf of each feature in each label
    * 
@@ -48,44 +53,31 @@
    *        Document count
    */
   @Override
-  public void map(Text key, DoubleWritable value, OutputCollector<Text, DoubleWritable> output, Reporter reporter)
+  public void map(StringTuple key, DoubleWritable value,
+      OutputCollector<StringTuple, DoubleWritable> output, Reporter reporter)
       throws IOException {
 
-    String labelFeaturePair = key.toString();
-
-    char firstChar = labelFeaturePair.charAt(0);
-    switch (firstChar) {
-      case '-': {// if it is the termDocumentCount
-        labelFeaturePair = labelFeaturePair.substring(1);
-        // -17th_century_mathematicians_anderson__alexander,1582
-        int idx = labelFeaturePair.indexOf(',');
-        if (idx != -1) {
-          String label = labelFeaturePair.substring(0, idx);
-
-          Double labelDocumentCount = labelDocumentCounts.get(label);
-          if (labelDocumentCount == null) {
-            throw new IOException("Invalid label: " + label);
-          }
-          double logIdf = Math.log(labelDocumentCount / value.get());
-          output.collect(new Text(labelFeaturePair), new DoubleWritable(logIdf));
-          reporter.setStatus("Bayes TfIdf Mapper: log(Idf): " + labelFeaturePair);
-        } else {
-          throw new IOException("Invalid ");
-        }
-        break;
-      }
-
-      case ',': {
-        output.collect(new Text("*vocabCount"), new DoubleWritable(1.0));
-        reporter.setStatus("Bayes TfIdf Mapper: vocabCount");
-        break;
-      }
-      default: {
+    if (key.length() == 3) {
+      if (key.stringAt(0).equals(BayesConstants.WEIGHT)) {
         reporter.setStatus("Bayes TfIdf Mapper: Tf: " + key);
         output.collect(key, value);
-        break;
-      }
+      } else if (key.stringAt(0).equals(BayesConstants.DOCUMENT_FREQUENCY)) {
+        String label = key.stringAt(1);
+        Double labelDocumentCount = labelDocumentCounts.get(label);
+        double logIdf = Math.log(labelDocumentCount / value.get());
+        key.replaceAt(0, BayesConstants.WEIGHT);
+        output.collect(key, new DoubleWritable(logIdf));
+        reporter.setStatus("Bayes TfIdf Mapper: log(Idf): " + key);
+      } else
+        throw new RuntimeException("Unrecognized Tuple: " + key);
+    } else if (key.length() == 2) {
+      if (key.stringAt(0).equals(BayesConstants.FEATURE_COUNT)) {
+        output.collect(vocabCount, one);
+        reporter.setStatus("Bayes TfIdf Mapper: vocabCount");
+      } else
+        throw new RuntimeException("Unexpected Tuple: " + key);
     }
+
   }
 
   @Override
@@ -94,13 +86,16 @@
       if (labelDocumentCounts == null) {
         labelDocumentCounts = new HashMap<String, Double>();
 
-        DefaultStringifier<Map<String, Double>> mapStringifier = new DefaultStringifier<Map<String, Double>>(job,
-            GenericsUtil.getClass(labelDocumentCounts));
+        DefaultStringifier<Map<String, Double>> mapStringifier = new DefaultStringifier<Map<String, Double>>(
+            job, GenericsUtil.getClass(labelDocumentCounts));
 
-        String labelDocumentCountString = mapStringifier.toString(labelDocumentCounts);
-        labelDocumentCountString = job.get("cnaivebayes.labelDocumentCounts", labelDocumentCountString);
+        String labelDocumentCountString = mapStringifier
+            .toString(labelDocumentCounts);
+        labelDocumentCountString = job.get("cnaivebayes.labelDocumentCounts",
+            labelDocumentCountString);
 
-        labelDocumentCounts = mapStringifier.fromString(labelDocumentCountString);
+        labelDocumentCounts = mapStringifier
+            .fromString(labelDocumentCountString);
       }
     } catch (IOException ex) {
       log.warn(ex.toString(), ex);
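
The DOCUMENT_FREQUENCY branch is the same log-IDF computation as before, minus the string parsing: the (__DF, label, feature) key is rewritten in place to a (__WT, label, feature) key through replaceAt(0, BayesConstants.WEIGHT) and emitted with log(labelDocumentCount / documentFrequency). With invented numbers, a label seen in 200 training documents and a feature occurring in 50 of them gives log(200 / 50) = log 4, roughly 1.386.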

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesTfIdfOutputFormat.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesTfIdfOutputFormat.java?rev=823911&r1=823910&r2=823911&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesTfIdfOutputFormat.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/classifier/bayes/mapreduce/common/BayesTfIdfOutputFormat.java Sat Oct 10 18:17:30 2009
@@ -17,9 +17,7 @@
 
 package org.apache.mahout.classifier.bayes.mapreduce.common;
 
-
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JobConf;
@@ -27,14 +25,16 @@
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.hadoop.mapred.lib.MultipleOutputFormat;
 import org.apache.hadoop.util.Progressable;
+import org.apache.mahout.common.StringTuple;
 
 import java.io.IOException;
 
 /**
- * This class extends the MultipleOutputFormat, allowing to write the output data to different output files in sequence
- * file output format.
+ * This class extends the MultipleOutputFormat, allowing to write the output
+ * data to different output files in sequence file output format.
  */
-public class BayesTfIdfOutputFormat extends MultipleOutputFormat<WritableComparable<?>, Writable> {
+public class BayesTfIdfOutputFormat extends
+    MultipleOutputFormat<WritableComparable<?>, Writable> {
 
   private SequenceFileOutputFormat<WritableComparable<?>, Writable> theSequenceFileOutputFormat = null;
 
@@ -49,13 +49,15 @@
   }
 
   @Override
-  protected String generateFileNameForKeyValue(WritableComparable<?> k, Writable v, String name) {
-    Text key = (Text) k;
+  protected String generateFileNameForKeyValue(WritableComparable<?> k,
+      Writable v, String name) {
+    StringTuple key = (StringTuple) k;
 
-    if (key.toString().charAt(0) == '*') {
+    if (key.length() == 1
+        && key.stringAt(0).equals(BayesConstants.FEATURE_SET_SIZE))
       return "trainer-vocabCount/" + name;
-    }
-    return "trainer-tfIdf/" + name;
+    else
+      return "trainer-tfIdf/" + name;
   }
 
 }


