hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sha...@apache.org
Subject svn commit: r792435 - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapred/lib/ src/java/org/apache/hadoop/mapreduce/lib/partition/ src/test/mapred/org/apache/hadoop/mapreduce/ src/test/mapred/org/apache/hadoop/mapreduce/lib/partition/
Date Thu, 09 Jul 2009 07:53:59 GMT
Author: sharad
Date: Thu Jul  9 07:53:58 2009
New Revision: 792435

URL: http://svn.apache.org/viewvc?rev=792435&view=rev
Log:
MAPREDUCE-371. Change KeyFieldBasedComparator and KeyFieldBasedPartitioner to use new mapreduce api. Contributed by Amareshwari Sriramadasu.

Added:
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/partition/KeyFieldBasedComparator.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/partition/KeyFieldBasedPartitioner.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/partition/KeyFieldHelper.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/partition/TestMRKeyFieldBasedComparator.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/partition/TestMRKeyFieldBasedPartitioner.java
Removed:
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/KeyFieldHelper.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/KeyFieldBasedComparator.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/KeyFieldBasedPartitioner.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=792435&r1=792434&r2=792435&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Thu Jul  9 07:53:58 2009
@@ -81,6 +81,9 @@
     MAPREDUCE-675. Sqoop should allow user-defined class and package names.
     (Aaron Kimball via tomwhite)
 
+    MAPREDUCE-371. Change KeyFieldBasedComparator and KeyFieldBasedPartitioner
+    to use new api. (Amareshwari Sriramadasu via sharad)
+
   BUG FIXES
     MAPREDUCE-703. Sqoop requires dependency on hsqldb in ivy.
     (Aaron Kimball via matei)

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/KeyFieldBasedComparator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/KeyFieldBasedComparator.java?rev=792435&r1=792434&r2=792435&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/KeyFieldBasedComparator.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/KeyFieldBasedComparator.java Thu Jul  9 07:53:58 2009
@@ -18,14 +18,8 @@
 
 package org.apache.hadoop.mapred.lib;
 
-import java.util.List;
-
-import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobConfigurable;
-import org.apache.hadoop.mapred.lib.KeyFieldHelper.KeyDescription;
-import org.apache.hadoop.io.Text;
 
 /**
  * This comparator implementation provides a subset of the features provided
@@ -41,288 +35,16 @@
  *  field). opts are ordering options (any of 'nr' as described above). 
  * We assume that the fields in the key are separated by 
  * map.output.key.field.separator.
+ * @deprecated Use 
+ * {@link org.apache.hadoop.mapreduce.lib.partition.KeyFieldBasedComparator} 
+ * instead
  */
+@Deprecated
+public class KeyFieldBasedComparator<K, V> extends 
+    org.apache.hadoop.mapreduce.lib.partition.KeyFieldBasedComparator<K, V>
+    implements JobConfigurable {
 
-public class KeyFieldBasedComparator<K, V> extends WritableComparator 
-implements JobConfigurable {
-  private KeyFieldHelper keyFieldHelper = new KeyFieldHelper();
-  private static final byte NEGATIVE = (byte)'-';
-  private static final byte ZERO = (byte)'0';
-  private static final byte DECIMAL = (byte)'.';
-  
   public void configure(JobConf job) {
-    String option = job.getKeyFieldComparatorOption();
-    String keyFieldSeparator = job.get("map.output.key.field.separator","\t");
-    keyFieldHelper.setKeyFieldSeparator(keyFieldSeparator);
-    keyFieldHelper.parseOption(option);
-  }
-  
-  public KeyFieldBasedComparator() {
-    super(Text.class);
-  }
-    
-
-  public int compare(byte[] b1, int s1, int l1,
-      byte[] b2, int s2, int l2) {
-    int n1 = WritableUtils.decodeVIntSize(b1[s1]);
-    int n2 = WritableUtils.decodeVIntSize(b2[s2]);
-    List <KeyDescription> allKeySpecs = keyFieldHelper.keySpecs();
-    if (allKeySpecs.size() == 0) {
-      return compareBytes(b1, s1+n1, l1-n1, b2, s2+n2, l2-n2);
-    }
-    int []lengthIndicesFirst = keyFieldHelper.getWordLengths(b1, s1+n1, s1+l1);
-    int []lengthIndicesSecond = keyFieldHelper.getWordLengths(b2, s2+n2, s2+l2);
-    for (KeyDescription keySpec : allKeySpecs) {
-      int startCharFirst = keyFieldHelper.getStartOffset(b1, s1+n1, s1+l1, lengthIndicesFirst,
-          keySpec);
-      int endCharFirst = keyFieldHelper.getEndOffset(b1, s1+n1, s1+l1, lengthIndicesFirst,
-          keySpec);
-      int startCharSecond = keyFieldHelper.getStartOffset(b2, s2+n2, s2+l2, lengthIndicesSecond,
-          keySpec);
-      int endCharSecond = keyFieldHelper.getEndOffset(b2, s2+n2, s2+l2, lengthIndicesSecond,
-          keySpec);
-      int result;
-      if ((result = compareByteSequence(b1, startCharFirst, endCharFirst, b2, 
-          startCharSecond, endCharSecond, keySpec)) != 0) {
-        return result;
-      }
-    }
-    return 0;
-  }
-  
-  private int compareByteSequence(byte[] first, int start1, int end1, 
-      byte[] second, int start2, int end2, KeyDescription key) {
-    if (start1 == -1) {
-      if (key.reverse) {
-        return 1;
-      }
-      return -1;
-    }
-    if (start2 == -1) {
-      if (key.reverse) {
-        return -1; 
-      }
-      return 1;
-    }
-    int compareResult = 0;
-    if (!key.numeric) {
-      compareResult = compareBytes(first, start1, end1-start1+1, second, start2, end2-start2+1);
-    }
-    if (key.numeric) {
-      compareResult = numericalCompare (first, start1, end1, second, start2, end2);
-    }
-    if (key.reverse) {
-      return -compareResult;
-    }
-    return compareResult;
-  }
-  
-  private int numericalCompare (byte[] a, int start1, int end1, 
-      byte[] b, int start2, int end2) {
-    int i = start1;
-    int j = start2;
-    int mul = 1;
-    byte first_a = a[i];
-    byte first_b = b[j];
-    if (first_a == NEGATIVE) {
-      if (first_b != NEGATIVE) {
-        //check for cases like -0.0 and 0.0 (they should be declared equal)
-        return oneNegativeCompare(a,start1+1,end1,b,start2,end2);
-      }
-      i++;
-    }
-    if (first_b == NEGATIVE) {
-      if (first_a != NEGATIVE) {
-        //check for cases like 0.0 and -0.0 (they should be declared equal)
-        return -oneNegativeCompare(b,start2+1,end2,a,start1,end1);
-      }
-      j++;
-    }
-    if (first_b == NEGATIVE && first_a == NEGATIVE) {
-      mul = -1;
-    }
-
-    //skip over ZEROs
-    while (i <= end1) {
-      if (a[i] != ZERO) {
-        break;
-      }
-      i++;
-    }
-    while (j <= end2) {
-      if (b[j] != ZERO) {
-        break;
-      }
-      j++;
-    }
-    
-    //skip over equal characters and stopping at the first nondigit char
-    //The nondigit character could be '.'
-    while (i <= end1 && j <= end2) {
-      if (!isdigit(a[i]) || a[i] != b[j]) {
-        break;
-      }
-      i++; j++;
-    }
-    if (i <= end1) {
-      first_a = a[i];
-    }
-    if (j <= end2) {
-      first_b = b[j];
-    }
-    //store the result of the difference. This could be final result if the
-    //number of digits in the mantissa is the same in both the numbers 
-    int firstResult = first_a - first_b;
-    
-    //check whether we hit a decimal in the earlier scan
-    if ((first_a == DECIMAL && (!isdigit(first_b) || j > end2)) ||
-            (first_b == DECIMAL && (!isdigit(first_a) || i > end1))) {
-      return ((mul < 0) ? -decimalCompare(a,i,end1,b,j,end2) : 
-        decimalCompare(a,i,end1,b,j,end2));
-    }
-    //check the number of digits in the mantissa of the numbers
-    int numRemainDigits_a = 0;
-    int numRemainDigits_b = 0;
-    while (i <= end1) {
-      //if we encounter a non-digit treat the corresponding number as being 
-      //smaller      
-      if (isdigit(a[i++])) {
-        numRemainDigits_a++;
-      } else break;
-    }
-    while (j <= end2) {
-      //if we encounter a non-digit treat the corresponding number as being 
-      //smaller
-      if (isdigit(b[j++])) {
-        numRemainDigits_b++;
-      } else break;
-    }
-    int ret = numRemainDigits_a - numRemainDigits_b;
-    if (ret == 0) { 
-      return ((mul < 0) ? -firstResult : firstResult);
-    } else {
-      return ((mul < 0) ? -ret : ret);
-    }
-  }
-  private boolean isdigit(byte b) {
-    if ('0' <= b && b <= '9') {
-      return true;
-    }
-    return false;
-  }
-  private int decimalCompare(byte[] a, int i, int end1, 
-                             byte[] b, int j, int end2) {
-    if (i > end1) {
-      //if a[] has nothing remaining
-      return -decimalCompare1(b, ++j, end2);
-    }
-    if (j > end2) {
-      //if b[] has nothing remaining
-      return decimalCompare1(a, ++i, end1);
-    }
-    if (a[i] == DECIMAL && b[j] == DECIMAL) {
-      while (i <= end1 && j <= end2) {
-        if (a[i] != b[j]) {
-          if (isdigit(a[i]) && isdigit(b[j])) {
-            return a[i] - b[j];
-          }
-          if (isdigit(a[i])) {
-            return 1;
-          }
-          if (isdigit(b[j])) {
-            return -1;
-          }
-          return 0;
-        }
-        i++; j++;
-      }
-      if (i > end1 && j > end2) {
-        return 0;
-      }
-        
-      if (i > end1) {
-        //check whether there is a non-ZERO digit after potentially
-        //a number of ZEROs (e.g., a=.4444, b=.444400004)
-        return -decimalCompare1(b, j, end2);
-      }
-      if (j > end2) {
-        //check whether there is a non-ZERO digit after potentially
-        //a number of ZEROs (e.g., b=.4444, a=.444400004)
-        return decimalCompare1(a, i, end1);
-      }
-    }
-    else if (a[i] == DECIMAL) {
-      return decimalCompare1(a, ++i, end1);
-    }
-    else if (b[j] == DECIMAL) {
-      return -decimalCompare1(b, ++j, end2);
-    }
-    return 0;
-  }
-  
-  private int decimalCompare1(byte[] a, int i, int end) {
-    while (i <= end) {
-      if (a[i] == ZERO) {
-        i++;
-        continue;
-      }
-      if (isdigit(a[i])) {
-        return 1;
-      } else {
-        return 0;
-      }
-    }
-    return 0;
-  }
-  
-  private int oneNegativeCompare(byte[] a, int start1, int end1, 
-      byte[] b, int start2, int end2) {
-    //here a[] is negative and b[] is positive
-    //We have to ascertain whether the number contains any digits.
-    //If it does, then it is a smaller number for sure. If not,
-    //then we need to scan b[] to find out whether b[] has a digit
-    //If b[] does contain a digit, then b[] is certainly
-    //greater. If not, that is, both a[] and b[] don't contain
-    //digits then they should be considered equal.
-    if (!isZero(a, start1, end1)) {
-      return -1;
-    }
-    //reached here - this means that a[] is a ZERO
-    if (!isZero(b, start2, end2)) {
-      return -1;
-    }
-    //reached here - both numbers are basically ZEROs and hence
-    //they should compare equal
-    return 0;
-  }
-  
-  private boolean isZero(byte a[], int start, int end) {
-    //check for zeros in the significand part as well as the decimal part
-    //note that we treat the non-digit characters as ZERO
-    int i = start;
-    //we check the significand for being a ZERO
-    while (i <= end) {
-      if (a[i] != ZERO) {
-        if (a[i] != DECIMAL && isdigit(a[i])) {
-          return false;
-        }
-        break;
-      }
-      i++;
-    }
-
-    if (i != (end+1) && a[i++] == DECIMAL) {
-      //we check the decimal part for being a ZERO
-      while (i <= end) {
-        if (a[i] != ZERO) {
-          if (isdigit(a[i])) {
-            return false;
-          }
-          break;
-        }
-        i++;
-      }
-    }
-    return true;
+    super.setConf(job);
   }
 }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/KeyFieldBasedPartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/KeyFieldBasedPartitioner.java?rev=792435&r1=792434&r2=792435&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/KeyFieldBasedPartitioner.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/KeyFieldBasedPartitioner.java Thu Jul  9 07:53:58 2009
@@ -18,14 +18,8 @@
 
 package org.apache.hadoop.mapred.lib;
 
-import java.io.UnsupportedEncodingException;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Partitioner;
-import org.apache.hadoop.mapred.lib.KeyFieldHelper.KeyDescription;
 
  /**   
   *  Defines a way to partition keys based on certain key fields (also see
@@ -38,72 +32,16 @@
   *  field's last character. If '.c' is omitted from pos1, it defaults to 1
   *  (the beginning of the field); if omitted from pos2, it defaults to 0 
   *  (the end of the field).
-  * 
+  *  @deprecated Use 
+  *  {@link org.apache.hadoop.mapreduce.lib.partition.KeyFieldBasedPartitioner} 
+  *  instead
   */
-public class KeyFieldBasedPartitioner<K2, V2> implements Partitioner<K2, V2> {
-
-  private static final Log LOG = LogFactory.getLog(KeyFieldBasedPartitioner.class.getName());
-  private int numOfPartitionFields;
-  
-  private KeyFieldHelper keyFieldHelper = new KeyFieldHelper();
+@Deprecated
+public class KeyFieldBasedPartitioner<K2, V2> extends 
+  org.apache.hadoop.mapreduce.lib.partition.KeyFieldBasedPartitioner<K2, V2> 
+  implements Partitioner<K2, V2> {
 
   public void configure(JobConf job) {
-    String keyFieldSeparator = job.get("map.output.key.field.separator", "\t");
-    keyFieldHelper.setKeyFieldSeparator(keyFieldSeparator);
-    if (job.get("num.key.fields.for.partition") != null) {
-      LOG.warn("Using deprecated num.key.fields.for.partition. " +
-      		"Use mapred.text.key.partitioner.options instead");
-      this.numOfPartitionFields = job.getInt("num.key.fields.for.partition",0);
-      keyFieldHelper.setKeyFieldSpec(1,numOfPartitionFields);
-    } else {
-      String option = job.getKeyFieldPartitionerOption();
-      keyFieldHelper.parseOption(option);
-    }
-  }
-
-  public int getPartition(K2 key, V2 value,
-      int numReduceTasks) {
-    byte[] keyBytes;
-
-    List <KeyDescription> allKeySpecs = keyFieldHelper.keySpecs();
-    if (allKeySpecs.size() == 0) {
-      return (key.toString().hashCode() & Integer.MAX_VALUE) % numReduceTasks;
-    }
-
-    try {
-      keyBytes = key.toString().getBytes("UTF-8");
-    } catch (UnsupportedEncodingException e) {
-      throw new RuntimeException("The current system does not " +
-          "support UTF-8 encoding!", e);
-    }
-    // return 0 if the key is empty
-    if (keyBytes.length == 0) {
-      return 0;
-    }
-    
-    int []lengthIndicesFirst = keyFieldHelper.getWordLengths(keyBytes, 0, 
-        keyBytes.length);
-    int currentHash = 0;
-    for (KeyDescription keySpec : allKeySpecs) {
-      int startChar = keyFieldHelper.getStartOffset(keyBytes, 0, keyBytes.length, 
-          lengthIndicesFirst, keySpec);
-       // no key found! continue
-      if (startChar < 0) {
-        continue;
-      }
-      int endChar = keyFieldHelper.getEndOffset(keyBytes, 0, keyBytes.length, 
-          lengthIndicesFirst, keySpec);
-      currentHash = hashCode(keyBytes, startChar, endChar, 
-          currentHash);
-    }
-    return (currentHash & Integer.MAX_VALUE) % numReduceTasks;
+    super.setConf(job);
   }
-  
-  protected int hashCode(byte[] b, int start, int end, int currentHash) {
-    for (int i = start; i <= end; i++) {
-      currentHash = 31*currentHash + b[i];
-    }
-    return currentHash;
-  }
-
 }

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/partition/KeyFieldBasedComparator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/partition/KeyFieldBasedComparator.java?rev=792435&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/partition/KeyFieldBasedComparator.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/partition/KeyFieldBasedComparator.java Thu Jul  9 07:53:58 2009
@@ -0,0 +1,341 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.partition;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.partition.KeyFieldHelper.KeyDescription;
+
+
+/**
+ * This comparator implementation provides a subset of the features provided
+ * by the Unix/GNU Sort. In particular, the supported features are:
+ * -n, (Sort numerically)
+ * -r, (Reverse the result of comparison)
+ * -k pos1[,pos2], where pos is of the form f[.c][opts], where f is the number
+ *  of the field to use, and c is the number of the first character from the
+ *  beginning of the field. Fields and character posns are numbered starting
+ *  with 1; a character position of zero in pos2 indicates the field's last
+ *  character. If '.c' is omitted from pos1, it defaults to 1 (the beginning
+ *  of the field); if omitted from pos2, it defaults to 0 (the end of the
+ *  field). opts are ordering options (any of 'nr' as described above). 
+ * We assume that the fields in the key are separated by 
+ * map.output.key.field.separator.
+ */
+
+public class KeyFieldBasedComparator<K, V> extends WritableComparator 
+    implements Configurable {
+  private KeyFieldHelper keyFieldHelper = new KeyFieldHelper();
+  private static final byte NEGATIVE = (byte)'-';
+  private static final byte ZERO = (byte)'0';
+  private static final byte DECIMAL = (byte)'.';
+  private Configuration conf;
+
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+    String option = conf.get("mapred.text.key.comparator.options");
+    String keyFieldSeparator = conf.get("map.output.key.field.separator","\t");
+    keyFieldHelper.setKeyFieldSeparator(keyFieldSeparator);
+    keyFieldHelper.parseOption(option);
+  }
+
+  public Configuration getConf() {
+    return conf;
+  }
+  
+  public KeyFieldBasedComparator() {
+    super(Text.class);
+  }
+    
+  public int compare(byte[] b1, int s1, int l1,
+      byte[] b2, int s2, int l2) {
+    int n1 = WritableUtils.decodeVIntSize(b1[s1]);
+    int n2 = WritableUtils.decodeVIntSize(b2[s2]);
+    List <KeyDescription> allKeySpecs = keyFieldHelper.keySpecs();
+
+    if (allKeySpecs.size() == 0) {
+      return compareBytes(b1, s1 + n1, l1 - n1, b2, s2 + n2, l2 - n2);
+    }
+    
+    int []lengthIndicesFirst = 
+      keyFieldHelper.getWordLengths(b1, s1 + n1, s1 + l1);
+    int []lengthIndicesSecond = 
+      keyFieldHelper.getWordLengths(b2, s2 + n2, s2 + l2);
+    
+    for (KeyDescription keySpec : allKeySpecs) {
+      int startCharFirst = keyFieldHelper.getStartOffset(b1, s1 + n1, s1 + l1,
+        lengthIndicesFirst, keySpec);
+      int endCharFirst = keyFieldHelper.getEndOffset(b1, s1 + n1, s1 + l1, 
+        lengthIndicesFirst, keySpec);
+      int startCharSecond = keyFieldHelper.getStartOffset(b2, s2 + n2, s2 + l2,
+        lengthIndicesSecond, keySpec);
+      int endCharSecond = keyFieldHelper.getEndOffset(b2, s2 + n2, s2 + l2, 
+        lengthIndicesSecond, keySpec);
+      int result;
+      if ((result = compareByteSequence(b1, startCharFirst, endCharFirst, b2, 
+          startCharSecond, endCharSecond, keySpec)) != 0) {
+        return result;
+      }
+    }
+    return 0;
+  }
+  
+  private int compareByteSequence(byte[] first, int start1, int end1, 
+      byte[] second, int start2, int end2, KeyDescription key) {
+    if (start1 == -1) {
+      if (key.reverse) {
+        return 1;
+      }
+      return -1;
+    }
+    if (start2 == -1) {
+      if (key.reverse) {
+        return -1; 
+      }
+      return 1;
+    }
+    int compareResult = 0;
+    if (!key.numeric) {
+      compareResult = compareBytes(first, start1, end1-start1 + 1, second,
+        start2, end2 - start2 + 1);
+    }
+    if (key.numeric) {
+      compareResult = numericalCompare (first, start1, end1, second, start2,
+        end2);
+    }
+    if (key.reverse) {
+      return -compareResult;
+    }
+    return compareResult;
+  }
+  
+  private int numericalCompare (byte[] a, int start1, int end1, 
+      byte[] b, int start2, int end2) {
+    int i = start1;
+    int j = start2;
+    int mul = 1;
+    byte first_a = a[i];
+    byte first_b = b[j];
+    if (first_a == NEGATIVE) {
+      if (first_b != NEGATIVE) {
+        //check for cases like -0.0 and 0.0 (they should be declared equal)
+        return oneNegativeCompare(a, start1 + 1, end1, b, start2, end2);
+      }
+      i++;
+    }
+    if (first_b == NEGATIVE) {
+      if (first_a != NEGATIVE) {
+        //check for cases like 0.0 and -0.0 (they should be declared equal)
+        return -oneNegativeCompare(b, start2+1, end2, a, start1, end1);
+      }
+      j++;
+    }
+    if (first_b == NEGATIVE && first_a == NEGATIVE) {
+      mul = -1;
+    }
+
+    //skip over ZEROs
+    while (i <= end1) {
+      if (a[i] != ZERO) {
+        break;
+      }
+      i++;
+    }
+    while (j <= end2) {
+      if (b[j] != ZERO) {
+        break;
+      }
+      j++;
+    }
+    
+    //skip over equal characters and stopping at the first nondigit char
+    //The nondigit character could be '.'
+    while (i <= end1 && j <= end2) {
+      if (!isdigit(a[i]) || a[i] != b[j]) {
+        break;
+      }
+      i++; j++;
+    }
+    if (i <= end1) {
+      first_a = a[i];
+    }
+    if (j <= end2) {
+      first_b = b[j];
+    }
+    //store the result of the difference. This could be final result if the
+    //number of digits in the mantissa is the same in both the numbers 
+    int firstResult = first_a - first_b;
+    
+    //check whether we hit a decimal in the earlier scan
+    if ((first_a == DECIMAL && (!isdigit(first_b) || j > end2)) ||
+            (first_b == DECIMAL && (!isdigit(first_a) || i > end1))) {
+      return ((mul < 0) ? -decimalCompare(a, i, end1, b, j, end2) : 
+        decimalCompare(a, i, end1, b, j, end2));
+    }
+    //check the number of digits in the mantissa of the numbers
+    int numRemainDigits_a = 0;
+    int numRemainDigits_b = 0;
+    while (i <= end1) {
+      //if we encounter a non-digit treat the corresponding number as being 
+      //smaller      
+      if (isdigit(a[i++])) {
+        numRemainDigits_a++;
+      } else break;
+    }
+    while (j <= end2) {
+      //if we encounter a non-digit treat the corresponding number as being 
+      //smaller
+      if (isdigit(b[j++])) {
+        numRemainDigits_b++;
+      } else break;
+    }
+    int ret = numRemainDigits_a - numRemainDigits_b;
+    if (ret == 0) { 
+      return ((mul < 0) ? -firstResult : firstResult);
+    } else {
+      return ((mul < 0) ? -ret : ret);
+    }
+  }
+  private boolean isdigit(byte b) {
+    if ('0' <= b && b <= '9') {
+      return true;
+    }
+    return false;
+  }
+  private int decimalCompare(byte[] a, int i, int end1, 
+                             byte[] b, int j, int end2) {
+    if (i > end1) {
+      //if a[] has nothing remaining
+      return -decimalCompare1(b, ++j, end2);
+    }
+    if (j > end2) {
+      //if b[] has nothing remaining
+      return decimalCompare1(a, ++i, end1);
+    }
+    if (a[i] == DECIMAL && b[j] == DECIMAL) {
+      while (i <= end1 && j <= end2) {
+        if (a[i] != b[j]) {
+          if (isdigit(a[i]) && isdigit(b[j])) {
+            return a[i] - b[j];
+          }
+          if (isdigit(a[i])) {
+            return 1;
+          }
+          if (isdigit(b[j])) {
+            return -1;
+          }
+          return 0;
+        }
+        i++; j++;
+      }
+      if (i > end1 && j > end2) {
+        return 0;
+      }
+        
+      if (i > end1) {
+        //check whether there is a non-ZERO digit after potentially
+        //a number of ZEROs (e.g., a=.4444, b=.444400004)
+        return -decimalCompare1(b, j, end2);
+      }
+      if (j > end2) {
+        //check whether there is a non-ZERO digit after potentially
+        //a number of ZEROs (e.g., b=.4444, a=.444400004)
+        return decimalCompare1(a, i, end1);
+      }
+    }
+    else if (a[i] == DECIMAL) {
+      return decimalCompare1(a, ++i, end1);
+    }
+    else if (b[j] == DECIMAL) {
+      return -decimalCompare1(b, ++j, end2);
+    }
+    return 0;
+  }
+  
+  private int decimalCompare1(byte[] a, int i, int end) {
+    while (i <= end) {
+      if (a[i] == ZERO) {
+        i++;
+        continue;
+      }
+      if (isdigit(a[i])) {
+        return 1;
+      } else {
+        return 0;
+      }
+    }
+    return 0;
+  }
+  
+  private int oneNegativeCompare(byte[] a, int start1, int end1, 
+      byte[] b, int start2, int end2) {
+    //here a[] is negative and b[] is positive
+    //We have to ascertain whether the number contains any digits.
+    //If it does, then it is a smaller number for sure. If not,
+    //then we need to scan b[] to find out whether b[] has a digit
+    //If b[] does contain a digit, then b[] is certainly
+    //greater. If not, that is, both a[] and b[] don't contain
+    //digits then they should be considered equal.
+    if (!isZero(a, start1, end1)) {
+      return -1;
+    }
+    //reached here - this means that a[] is a ZERO
+    if (!isZero(b, start2, end2)) {
+      return -1;
+    }
+    //reached here - both numbers are basically ZEROs and hence
+    //they should compare equal
+    return 0;
+  }
+  
+  private boolean isZero(byte a[], int start, int end) {
+    //check for zeros in the significand part as well as the decimal part
+    //note that we treat the non-digit characters as ZERO
+    int i = start;
+    //we check the significand for being a ZERO
+    while (i <= end) {
+      if (a[i] != ZERO) {
+        if (a[i] != DECIMAL && isdigit(a[i])) {
+          return false;
+        }
+        break;
+      }
+      i++;
+    }
+
+    if (i != (end+1) && a[i++] == DECIMAL) {
+      //we check the decimal part for being a ZERO
+      while (i <= end) {
+        if (a[i] != ZERO) {
+          if (isdigit(a[i])) {
+            return false;
+          }
+          break;
+        }
+        i++;
+      }
+    }
+    return true;
+  }
+}

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/partition/KeyFieldBasedPartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/partition/KeyFieldBasedPartitioner.java?rev=792435&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/partition/KeyFieldBasedPartitioner.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/partition/KeyFieldBasedPartitioner.java Thu Jul  9 07:53:58 2009
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.partition;
+
+import java.io.UnsupportedEncodingException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.lib.partition.KeyFieldHelper.KeyDescription;
+
+ /**   
+  *  Defines a way to partition keys based on certain key fields (also see
+  *  {@link KeyFieldBasedComparator}.
+  *  The key specification supported is of the form -k pos1[,pos2], where,
+  *  pos is of the form f[.c][opts], where f is the number
+  *  of the key field to use, and c is the number of the first character from
+  *  the beginning of the field. Fields and character posns are numbered 
+  *  starting with 1; a character position of zero in pos2 indicates the
+  *  field's last character. If '.c' is omitted from pos1, it defaults to 1
+  *  (the beginning of the field); if omitted from pos2, it defaults to 0 
+  *  (the end of the field).
+  * 
+  */
+public class KeyFieldBasedPartitioner<K2, V2> extends Partitioner<K2, V2> 
+    implements Configurable {
+
+  private static final Log LOG = LogFactory.getLog(
+                                   KeyFieldBasedPartitioner.class.getName());
+  private int numOfPartitionFields;
+  
+  private KeyFieldHelper keyFieldHelper = new KeyFieldHelper();
+  
+  private Configuration conf;
+
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+    String keyFieldSeparator = 
+      conf.get("map.output.key.field.separator", "\t");
+    keyFieldHelper.setKeyFieldSeparator(keyFieldSeparator);
+    if (conf.get("num.key.fields.for.partition") != null) {
+      LOG.warn("Using deprecated num.key.fields.for.partition. " +
+      		"Use mapred.text.key.partitioner.options instead");
+      this.numOfPartitionFields = conf.getInt("num.key.fields.for.partition",0);
+      keyFieldHelper.setKeyFieldSpec(1,numOfPartitionFields);
+    } else {
+      String option = conf.get("mapred.text.key.partitioner.options");
+      keyFieldHelper.parseOption(option);
+    }
+  }
+
+  public Configuration getConf() {
+    return conf;
+  }
+  
+  public int getPartition(K2 key, V2 value, int numReduceTasks) {
+    byte[] keyBytes;
+
+    List <KeyDescription> allKeySpecs = keyFieldHelper.keySpecs();
+    if (allKeySpecs.size() == 0) {
+      return (key.toString().hashCode() & Integer.MAX_VALUE) % numReduceTasks;
+    }
+
+    try {
+      keyBytes = key.toString().getBytes("UTF-8");
+    } catch (UnsupportedEncodingException e) {
+      throw new RuntimeException("The current system does not " +
+          "support UTF-8 encoding!", e);
+    }
+    // return 0 if the key is empty
+    if (keyBytes.length == 0) {
+      return 0;
+    }
+    
+    int []lengthIndicesFirst = keyFieldHelper.getWordLengths(keyBytes, 0, 
+        keyBytes.length);
+    int currentHash = 0;
+    for (KeyDescription keySpec : allKeySpecs) {
+      int startChar = keyFieldHelper.getStartOffset(keyBytes, 0, 
+        keyBytes.length, lengthIndicesFirst, keySpec);
+       // no key found! continue
+      if (startChar < 0) {
+        continue;
+      }
+      int endChar = keyFieldHelper.getEndOffset(keyBytes, 0, keyBytes.length, 
+          lengthIndicesFirst, keySpec);
+      currentHash = hashCode(keyBytes, startChar, endChar, 
+          currentHash);
+    }
+    return (currentHash & Integer.MAX_VALUE) % numReduceTasks;
+  }
+  
+  protected int hashCode(byte[] b, int start, int end, int currentHash) {
+    for (int i = start; i <= end; i++) {
+      currentHash = 31*currentHash + b[i];
+    }
+    return currentHash;
+  }
+
+}

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/partition/KeyFieldHelper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/partition/KeyFieldHelper.java?rev=792435&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/partition/KeyFieldHelper.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/partition/KeyFieldHelper.java Thu Jul  9 07:53:58 2009
@@ -0,0 +1,289 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.partition;
+
+import java.io.UnsupportedEncodingException;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.util.UTF8ByteArrayUtils;
+
+/**
+ * This is used in {@link KeyFieldBasedComparator} & 
+ * {@link KeyFieldBasedPartitioner}. Defines all the methods
+ * for parsing key specifications. The key specification is of the form:
+ * -k pos1[,pos2], where pos is of the form f[.c][opts], where f is the number
+ *  of the field to use, and c is the number of the first character from the
+ *  beginning of the field. Fields and character posns are numbered starting
+ *  with 1; a character position of zero in pos2 indicates the field's last
+ *  character. If '.c' is omitted from pos1, it defaults to 1 (the beginning
+ *  of the field); if omitted from pos2, it defaults to 0 (the end of the
+ *  field). opts are ordering options (supported options are 'nr'). 
+ */
+
+class KeyFieldHelper {
+  
+  // One parsed "-k pos1[,pos2]" spec.  Field and character positions are
+  // 1-based; endFieldIdx == 0 means "no end field given" and endChar == 0
+  // means "through the last character of the field" (see class javadoc).
+  protected static class KeyDescription {
+    int beginFieldIdx = 1;
+    int beginChar = 1;
+    int endFieldIdx = 0;
+    int endChar = 0;
+    boolean numeric;
+    boolean reverse;
+  }
+  
+  private List<KeyDescription> allKeySpecs = new ArrayList<KeyDescription>();
+  private byte[] keyFieldSeparator;
+  // true once at least one explicit key spec (-k or setKeyFieldSpec) was seen
+  private boolean keySpecSeen = false;
+  
+  // Stores the field separator as UTF-8 bytes.  UTF-8 is a mandatory
+  // charset on every JVM, so the catch is effectively unreachable.
+  public void setKeyFieldSeparator(String keyFieldSeparator) {
+    try {
+      this.keyFieldSeparator =
+        keyFieldSeparator.getBytes("UTF-8");
+    } catch (UnsupportedEncodingException e) {
+      throw new RuntimeException("The current system does not " +
+          "support UTF-8 encoding!", e);
+    }    
+  }
+  
+  /** Required for backcompatibility with num.key.fields.for.partition in
+   * {@link KeyFieldBasedPartitioner} */
+  public void setKeyFieldSpec(int start, int end) {
+    // NOTE(review): end < start is silently ignored rather than rejected
+    if (end >= start) {
+      KeyDescription k = new KeyDescription();
+      k.beginFieldIdx = start;
+      k.endFieldIdx = end;
+      keySpecSeen = true;
+      allKeySpecs.add(k);
+    }
+  }
+  
+  // NOTE(review): returns the live internal list, not a copy; callers
+  // must not mutate it.
+  public List<KeyDescription> keySpecs() {
+    return allKeySpecs;
+  }
+    
+  public int[] getWordLengths(byte []b, int start, int end) {
+    //Given a string like "hello how are you", it returns an array
+    //like [4, 5, 3, 3, 3], where the first element is the number of
+    //fields
+    if (!keySpecSeen) {
+      //if there were no key specs, then the whole key is one word
+      return new int[] {1};
+    }
+    int[] lengths = new int[10];
+    int currLenLengths = lengths.length;
+    int idx = 1;
+    int pos;
+    // scan for each occurrence of the separator; lengths[i] holds the
+    // byte length of the i'th field (1-based), growing the array by
+    // doubling when capacity runs out
+    while ((pos = UTF8ByteArrayUtils.findBytes(b, start, end, 
+        keyFieldSeparator)) != -1) {
+      if (++idx == currLenLengths) {
+        int[] temp = lengths;
+        lengths = new int[(currLenLengths = currLenLengths*2)];
+        System.arraycopy(temp, 0, lengths, 0, temp.length);
+      }
+      lengths[idx - 1] = pos - start;
+      start = pos + 1;
+    }
+    
+    // trailing field; when the key ends with a separator this stays 0,
+    // representing an empty last field
+    if (start != end) {
+      lengths[idx] = end - start;
+    }
+    lengths[0] = idx; //number of words is the first element
+    return lengths;
+  }
+  // Returns the absolute byte offset where keyspec k begins inside
+  // b[start..end), or -1 when the key has fewer fields than
+  // k.beginFieldIdx or the char offset falls past the end of the key.
+  public int getStartOffset(byte[]b, int start, int end, 
+      int []lengthIndices, KeyDescription k) {
+    //if -k2.5,2 is the keyspec, the startChar is lengthIndices[1] + 5
+    //note that the [0]'th element is the number of fields in the key
+    if (lengthIndices[0] >= k.beginFieldIdx) {
+      int position = 0;
+      for (int i = 1; i < k.beginFieldIdx; i++) {
+        position += lengthIndices[i] + keyFieldSeparator.length; 
+      }
+      if (position + k.beginChar <= (end - start)) {
+        return start + position + k.beginChar - 1; 
+      }
+    }
+    return -1;
+  }
+  // Returns the inclusive byte offset where keyspec k ends; falls back to
+  // `end` whenever the spec reaches past the available fields or chars.
+  public int getEndOffset(byte[]b, int start, int end, 
+      int []lengthIndices, KeyDescription k) {
+    //if -k2,2.8 is the keyspec, the endChar is lengthIndices[1] + 8
+    //note that the [0]'th element is the number of fields in the key
+    if (k.endFieldIdx == 0) {
+      //there is no end field specified for this keyspec. So the remaining
+      //part of the key is considered in its entirety.
+      return end; 
+    }
+    if (lengthIndices[0] >= k.endFieldIdx) {
+      int position = 0;
+      int i;
+      for (i = 1; i < k.endFieldIdx; i++) {
+        position += lengthIndices[i] + keyFieldSeparator.length;
+      }
+      if (k.endChar == 0) { 
+        // endChar == 0 means "last char of the field": count the whole
+        // end field's length
+        position += lengthIndices[i];
+      }
+      if (position + k.endChar <= (end - start)) {
+        return start + position + k.endChar - 1;
+      }
+      return end;
+    }
+    return end;
+  }
+  // Parses a unix-sort-style option string such as "-k2.3,2.7nr -k4,4 -n".
+  // Global -n/-r/-nr flags apply to every spec that carries no flags of
+  // its own; with no -k specs at all, the global flags become the single
+  // whole-key spec.
+  public void parseOption(String option) {
+    if (option == null || option.equals("")) {
+      //we will have only default comparison
+      return;
+    }
+    StringTokenizer args = new StringTokenizer(option);
+    KeyDescription global = new KeyDescription();
+    while (args.hasMoreTokens()) {
+      String arg = args.nextToken();
+      if (arg.equals("-n")) {  
+        global.numeric = true;
+      }
+      if (arg.equals("-r")) {
+        global.reverse = true;
+      }
+      if (arg.equals("-nr")) {
+        global.numeric = true;
+        global.reverse = true;
+      }
+      if (arg.startsWith("-k")) {
+        KeyDescription k = parseKey(arg, args);
+        if (k != null) {
+          allKeySpecs.add(k);
+          keySpecSeen = true;
+        }
+      }
+    }
+    for (KeyDescription key : allKeySpecs) {
+      // non-short-circuit | is harmless here (both operands are plain
+      // booleans), though || would be conventional
+      if (!(key.reverse | key.numeric)) {
+        key.reverse = global.reverse;
+        key.numeric = global.numeric;
+      }
+    }
+    if (allKeySpecs.size() == 0) {
+      allKeySpecs.add(global);
+    }
+  }
+  
+  // Parses one "-k" argument (either "-k<spec>" or "-k <spec>") into a
+  // KeyDescription; returns null when no spec follows the flag.
+  // NOTE(review): malformed specs (e.g. trailing "." or ",") surface as
+  // NumberFormatException or NoSuchElementException rather than the
+  // IllegalArgumentException thrown for unknown option letters.
+  private KeyDescription parseKey(String arg, StringTokenizer args) {
+    //we allow for -k<arg> and -k <arg>
+    String keyArgs = null;
+    if (arg.length() == 2) {
+      if (args.hasMoreTokens()) {
+        keyArgs = args.nextToken();
+      }
+    } else {
+      keyArgs = arg.substring(2);
+    }
+    if (keyArgs == null || keyArgs.length() == 0) {
+      return null;
+    }
+    StringTokenizer st = new StringTokenizer(keyArgs,"nr.,",true);
+       
+    KeyDescription key = new KeyDescription();
+    
+    String token;
+    //the key is of the form 1[.3][nr][,1.5][nr]
+    if (st.hasMoreTokens()) {
+      token = st.nextToken();
+      //the first token must be a number
+      key.beginFieldIdx = Integer.parseInt(token);
+    }
+    if (st.hasMoreTokens()) {
+      token = st.nextToken();
+      if (token.equals(".")) {
+        token = st.nextToken();
+        key.beginChar = Integer.parseInt(token);
+        if (st.hasMoreTokens()) {
+          token = st.nextToken();
+        } else {
+          return key;
+        }
+      } 
+      do {
+        if (token.equals("n")) {
+          key.numeric = true;
+        }
+        else if (token.equals("r")) {
+          key.reverse = true;
+        }
+        else break;
+        if (st.hasMoreTokens()) {
+          token = st.nextToken();
+        } else {
+          return key;
+        }
+      } while (true);
+      if (token.equals(",")) {
+        token = st.nextToken();
+        //the first token must be a number
+        key.endFieldIdx = Integer.parseInt(token);
+        if (st.hasMoreTokens()) {
+          token = st.nextToken();
+          if (token.equals(".")) {
+            token = st.nextToken();
+            key.endChar = Integer.parseInt(token);
+            if (st.hasMoreTokens()) {
+              token = st.nextToken();
+            } else {
+              return key;
+            }
+          }
+          do {
+            if (token.equals("n")) {
+              key.numeric = true;
+            }
+            else if (token.equals("r")) {
+              key.reverse = true;
+            }
+            else { 
+              throw new IllegalArgumentException("Invalid -k argument. " +
+               "Must be of the form -k pos1,[pos2], where pos is of the form " +
+               "f[.c]nr");
+            }
+            if (st.hasMoreTokens()) {
+              token = st.nextToken();
+            } else {
+              break;
+            }
+          } while (true);
+        }
+        return key;
+      }
+      throw new IllegalArgumentException("Invalid -k argument. " +
+          "Must be of the form -k pos1,[pos2], where pos is of the form " +
+          "f[.c]nr");
+    }
+    return key;
+  }
+  // Debugging aid; currently unreferenced within this class.
+  private void printKey(KeyDescription key) {
+    System.out.println("key.beginFieldIdx: " + key.beginFieldIdx);
+    System.out.println("key.beginChar: " + key.beginChar);
+    System.out.println("key.endFieldIdx: " + key.endFieldIdx);
+    System.out.println("key.endChar: " + key.endChar);
+    System.out.println("key.numeric: " + key.numeric);
+    System.out.println("key.reverse: " + key.reverse);
+    System.out.println("parseKey over");
+  }  
+}

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java?rev=792435&r1=792434&r2=792435&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java Thu Jul  9 07:53:58 2009
@@ -231,6 +231,13 @@
 
   public static Job createJob(Configuration conf, Path inDir, Path outDir, 
       int numInputFiles, int numReds) throws IOException {
+    // Convenience overload: delegates to the String-input variant below
+    // with a fixed three-line sample input.
+    String input = "The quick brown fox\n" + "has many silly\n"
+      + "red fox sox\n";
+    return createJob(conf, inDir, outDir, numInputFiles, numReds, input);
+  }
+
+  public static Job createJob(Configuration conf, Path inDir, Path outDir, 
+      int numInputFiles, int numReds, String input) throws IOException {
     Job job = new Job(conf);
     FileSystem fs = FileSystem.get(conf);
     if (fs.exists(outDir)) {
@@ -240,8 +247,6 @@
       fs.delete(inDir, true);
     }
     fs.mkdirs(inDir);
-    String input = "The quick brown fox\n" + "has many silly\n"
-      + "red fox sox\n";
     for (int i = 0; i < numInputFiles; ++i) {
       DataOutputStream file = fs.create(new Path(inDir, "part-" + i));
       file.writeBytes(input);

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/partition/TestMRKeyFieldBasedComparator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/partition/TestMRKeyFieldBasedComparator.java?rev=792435&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/partition/TestMRKeyFieldBasedComparator.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/partition/TestMRKeyFieldBasedComparator.java Thu Jul  9 07:53:58 2009
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.partition;
+
+import java.io.*;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.HadoopTestCase;
+import org.apache.hadoop.mapred.OutputLogFilter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.map.InverseMapper;
+
+
+/**
+ * End-to-end test for the new-API KeyFieldBasedComparator: runs a local
+ * MR job over two fixed input lines and checks which line sorts first
+ * under various "-k" comparator specs.
+ */
+public class TestMRKeyFieldBasedComparator extends HadoopTestCase {
+  Configuration conf;
+  
+  String line1 = "123 -123 005120 123.9 0.01 0.18 010 10.0 4444.1 011 011 234";
+  String line2 = "134 -12 005100 123.10 -1.01 0.19 02 10.1 4444";
+
+  public TestMRKeyFieldBasedComparator() throws IOException {
+    super(HadoopTestCase.LOCAL_MR, HadoopTestCase.LOCAL_FS, 1, 1);
+    conf = createJobConf();
+    conf.set("map.output.key.field.separator", " ");
+  }
+  
+  // Runs one job with the given comparator spec; expect == 1 means line1
+  // must sort first, expect == 2 means line2 must.  The partitioner spec
+  // -k1.1,1.1 sends both lines to the same reducer so ordering is visible
+  // in a single output file.
+  private void testComparator(String keySpec, int expect) 
+      throws Exception {
+    String root = System.getProperty("test.build.data", "/tmp");
+    Path inDir = new Path(root, "test_cmp/in");
+    Path outDir = new Path(root, "test_cmp/out");
+    
+    conf.set("mapred.text.key.comparator.options", keySpec);
+    conf.set("mapred.text.key.partitioner.options", "-k1.1,1.1");
+    conf.set("map.output.key.field.separator", " ");
+
+    Job job = MapReduceTestUtil.createJob(conf, inDir, outDir, 1, 2,
+                line1 +"\n" + line2 + "\n"); 
+    job.setMapperClass(InverseMapper.class);
+    job.setReducerClass(Reducer.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(LongWritable.class);
+    job.setSortComparatorClass(KeyFieldBasedComparator.class);
+    job.setPartitionerClass(KeyFieldBasedPartitioner.class);
+
+    job.waitForCompletion(true);
+    assertTrue(job.isSuccessful());
+
+    // validate output
+    Path[] outputFiles = FileUtil.stat2Paths(getFileSystem().listStatus(outDir,
+        new OutputLogFilter()));
+    // NOTE(review): the reader/stream are not closed when an assertion
+    // fails, and nothing asserts that output files exist at all.
+    if (outputFiles.length > 0) {
+      InputStream is = getFileSystem().open(outputFiles[0]);
+      BufferedReader reader = new BufferedReader(new InputStreamReader(is));
+      String line = reader.readLine();
+      //make sure we get what we expect as the first line, and also
+      //that we have two lines (both the lines must end up in the same
+      //reducer since the partitioner takes the same key spec for all
+      //lines
+      if (expect == 1) {
+        assertTrue(line.startsWith(line1));
+      } else if (expect == 2) {
+        assertTrue(line.startsWith(line2));
+      }
+      line = reader.readLine();
+      if (expect == 1) {
+        assertTrue(line.startsWith(line2));
+      } else if (expect == 2) {
+        assertTrue(line.startsWith(line1));
+      }
+      reader.close();
+    }
+  }
+  
+  public void testBasicUnixComparator() throws Exception {
+    testComparator("-k1,1n", 1);
+    testComparator("-k2,2n", 1);
+    testComparator("-k2.2,2n", 2);
+    testComparator("-k3.4,3n", 2);
+    testComparator("-k3.2,3.3n -k4,4n", 2);
+    testComparator("-k3.2,3.3n -k4,4nr", 1);
+    testComparator("-k2.4,2.4n", 2);
+    testComparator("-k7,7", 1);
+    testComparator("-k7,7n", 2);
+    testComparator("-k8,8n", 1);
+    testComparator("-k9,9", 2);
+    testComparator("-k11,11",2);
+    testComparator("-k10,10",2);
+    
+    testWithoutMRJob("-k9,9", 1);
+  }
+  
+  // NOTE(review): getBytes() uses the platform default charset; the input
+  // here is pure ASCII, so the bytes match UTF-8 either way.
+  byte[] line1_bytes = line1.getBytes();
+  byte[] line2_bytes = line2.getBytes();
+
+  // Compares the two raw lines directly with the comparator (no MR job);
+  // the sign of `expect` must match the sign of the comparison result.
+  // Takes parameters, so JUnit 3 does not run it as a standalone test --
+  // it is invoked from testBasicUnixComparator above.
+  public void testWithoutMRJob(String keySpec, int expect) throws Exception {
+    KeyFieldBasedComparator<Void, Void> keyFieldCmp = 
+      new KeyFieldBasedComparator<Void, Void>();
+    conf.set("mapred.text.key.comparator.options", keySpec);
+    keyFieldCmp.setConf(conf);
+    int result = keyFieldCmp.compare(line1_bytes, 0, line1_bytes.length,
+        line2_bytes, 0, line2_bytes.length);
+    if ((expect >= 0 && result < 0) || (expect < 0 && result >= 0))
+      fail();
+  }
+}

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/partition/TestMRKeyFieldBasedPartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/partition/TestMRKeyFieldBasedPartitioner.java?rev=792435&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/partition/TestMRKeyFieldBasedPartitioner.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/partition/TestMRKeyFieldBasedPartitioner.java Thu Jul  9 07:53:58 2009
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.partition;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+
+import junit.framework.TestCase;
+
+public class TestMRKeyFieldBasedPartitioner extends TestCase {
+
+  /**
+   * Tests that the key-field-based partitioner maps an empty key to
+   * partition 0, even when multiple key fields are configured.
+   */
+  public void testEmptyKey() throws Exception {
+    KeyFieldBasedPartitioner<Text, Text> kfbp = 
+      new KeyFieldBasedPartitioner<Text, Text>();
+    Configuration conf = new Configuration();
+    conf.setInt("num.key.fields.for.partition", 10);
+    kfbp.setConf(conf);
+    assertEquals("Empty key should map to 0th partition", 
+                 0, kfbp.getPartition(new Text(), new Text(), 10));
+  }
+}



Mime
View raw message