hadoop-mapreduce-commits mailing list archives

From: cdoug...@apache.org
Subject: svn commit: r807096 [2/2] - in /hadoop/mapreduce/trunk: ./ src/examples/org/apache/hadoop/examples/ src/java/org/apache/hadoop/mapred/join/ src/java/org/apache/hadoop/mapreduce/lib/join/ src/test/ src/test/mapred/org/apache/hadoop/mapreduce/ src/test/m...
Date: Mon, 24 Aug 2009 06:19:23 GMT
Copied: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/Parser.java (from r807076, hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/Parser.java)
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/Parser.java?p2=hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/Parser.java&p1=hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/Parser.java&r1=807076&r2=807096&rev=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/Parser.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/Parser.java Mon Aug 24 06:19:21 2009
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.mapred.join;
+package org.apache.hadoop.mapreduce.lib.join;
 
 import java.io.CharArrayReader;
 import java.io.IOException;
@@ -32,13 +32,16 @@
 import java.util.Map;
 import java.util.Stack;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.util.ReflectionUtils;
 
 /**
@@ -76,12 +79,15 @@
     }
 
     public TType getType() { return type; }
+    
     public Node getNode() throws IOException {
       throw new IOException("Expected nodetype");
     }
+    
     public double getNum() throws IOException {
       throw new IOException("Expected numtype");
     }
+    
     public String getStr() throws IOException {
       throw new IOException("Expected strtype");
     }
@@ -165,7 +171,8 @@
     }
   }
 
-  public abstract static class Node implements ComposableInputFormat {
+@SuppressWarnings("unchecked")
+public abstract static class Node extends ComposableInputFormat {
     /**
      * Return the node type registered for the particular identifier.
      * By default, this is a CNode for any composite node and a WNode
@@ -181,11 +188,11 @@
         }
         return nodeCstrMap.get(ident).newInstance(ident);
       } catch (IllegalAccessException e) {
-        throw (IOException)new IOException().initCause(e);
+        throw new IOException(e);
       } catch (InstantiationException e) {
-        throw (IOException)new IOException().initCause(e);
+        throw new IOException(e);
       } catch (InvocationTargetException e) {
-        throw (IOException)new IOException().initCause(e);
+        throw new IOException(e);
       }
     }
 
@@ -193,8 +200,8 @@
     private static final
         Map<String,Constructor<? extends Node>> nodeCstrMap =
         new HashMap<String,Constructor<? extends Node>>();
-    protected static final
-        Map<String,Constructor<? extends ComposableRecordReader>> rrCstrMap =
+    protected static final Map<String,Constructor<? extends 
+        ComposableRecordReader>> rrCstrMap =
         new HashMap<String,Constructor<? extends ComposableRecordReader>>();
 
     /**
@@ -231,10 +238,12 @@
       this.id = id;
     }
 
-    protected void setKeyComparator(Class<? extends WritableComparator> cmpcl) {
+    protected void setKeyComparator(
+        Class<? extends WritableComparator> cmpcl) {
       this.cmpcl = cmpcl;
     }
-    abstract void parse(List<Token> args, JobConf job) throws IOException;
+    abstract void parse(List<Token> args, Configuration conf) 
+        throws IOException;
   }
 
   /**
@@ -244,14 +253,15 @@
     private static final Class<?>[] cstrSig =
       { Integer.TYPE, RecordReader.class, Class.class };
 
-    static void addIdentifier(String ident,
+    @SuppressWarnings("unchecked")
+	static void addIdentifier(String ident,
                               Class<? extends ComposableRecordReader> cl)
         throws NoSuchMethodException {
       Node.addIdentifier(ident, cstrSig, WNode.class, cl);
     }
 
     private String indir;
-    private InputFormat inf;
+    private InputFormat<?, ?> inf;
 
     public WNode(String ident) {
       super(ident);
@@ -261,20 +271,20 @@
      * Let the first actual define the InputFormat and the second define
      * the <tt>mapred.input.dir</tt> property.
      */
-    public void parse(List<Token> ll, JobConf job) throws IOException {
+    @Override
+    public void parse(List<Token> ll, Configuration conf) throws IOException {
       StringBuilder sb = new StringBuilder();
       Iterator<Token> i = ll.iterator();
       while (i.hasNext()) {
         Token t = i.next();
         if (TType.COMMA.equals(t.getType())) {
           try {
-          	inf = (InputFormat)ReflectionUtils.newInstance(
-          			job.getClassByName(sb.toString()),
-                job);
+          	inf = (InputFormat<?, ?>)ReflectionUtils.newInstance(
+          			conf.getClassByName(sb.toString()), conf);
           } catch (ClassNotFoundException e) {
-            throw (IOException)new IOException().initCause(e);
+            throw new IOException(e);
           } catch (IllegalArgumentException e) {
-            throw (IOException)new IOException().initCause(e);
+            throw new IOException(e);
           }
           break;
         }
@@ -291,32 +301,36 @@
       // no check for ll.isEmpty() to permit extension
     }
 
-    private JobConf getConf(JobConf job) {
-      JobConf conf = new JobConf(job);
-      FileInputFormat.setInputPaths(conf, indir);
-      conf.setClassLoader(job.getClassLoader());
-      return conf;
+    private Configuration getConf(Configuration jconf) throws IOException {
+      Job job = new Job(jconf);
+      FileInputFormat.setInputPaths(job, indir);
+      return job.getConfiguration();
+    }
+    
+    public List<InputSplit> getSplits(JobContext context)
+        throws IOException, InterruptedException {
+      return inf.getSplits(new JobContext(
+        getConf(context.getConfiguration()), context.getJobID()));
     }
 
-    public InputSplit[] getSplits(JobConf job, int numSplits)
-        throws IOException {
-      return inf.getSplits(getConf(job), numSplits);
-    }
-
-    public ComposableRecordReader getRecordReader(
-        InputSplit split, JobConf job, Reporter reporter) throws IOException {
+    public ComposableRecordReader<?, ?> createRecordReader(InputSplit split, 
+        TaskAttemptContext taskContext) 
+        throws IOException, InterruptedException {
       try {
         if (!rrCstrMap.containsKey(ident)) {
           throw new IOException("No RecordReader for " + ident);
         }
+        Configuration conf = getConf(taskContext.getConfiguration());
+        TaskAttemptContext context = new TaskAttemptContext(conf, 
+          TaskAttemptID.forName(conf.get("mapred.task.id")));
         return rrCstrMap.get(ident).newInstance(id,
-            inf.getRecordReader(split, getConf(job), reporter), cmpcl);
+            inf.createRecordReader(split, context), cmpcl);
       } catch (IllegalAccessException e) {
-        throw (IOException)new IOException().initCause(e);
+        throw new IOException(e);
       } catch (InstantiationException e) {
-        throw (IOException)new IOException().initCause(e);
+        throw new IOException(e);
       } catch (InvocationTargetException e) {
-        throw (IOException)new IOException().initCause(e);
+        throw new IOException(e);
       }
     }
 
@@ -331,9 +345,10 @@
   static class CNode extends Node {
 
     private static final Class<?>[] cstrSig =
-      { Integer.TYPE, JobConf.class, Integer.TYPE, Class.class };
+      { Integer.TYPE, Configuration.class, Integer.TYPE, Class.class };
 
-    static void addIdentifier(String ident,
+    @SuppressWarnings("unchecked")
+	static void addIdentifier(String ident,
                               Class<? extends ComposableRecordReader> cl)
         throws NoSuchMethodException {
       Node.addIdentifier(ident, cstrSig, CNode.class, cl);
@@ -346,6 +361,7 @@
       super(ident);
     }
 
+    @Override
     public void setKeyComparator(Class<? extends WritableComparator> cmpcl) {
       super.setKeyComparator(cmpcl);
       for (Node n : kids) {
@@ -357,34 +373,38 @@
      * Combine InputSplits from child InputFormats into a
      * {@link CompositeInputSplit}.
      */
-    public InputSplit[] getSplits(JobConf job, int numSplits)
-        throws IOException {
-      InputSplit[][] splits = new InputSplit[kids.size()][];
+    @SuppressWarnings("unchecked")
+	public List<InputSplit> getSplits(JobContext job)
+        throws IOException, InterruptedException {
+      List<List<InputSplit>> splits = 
+        new ArrayList<List<InputSplit>>(kids.size());
       for (int i = 0; i < kids.size(); ++i) {
-        final InputSplit[] tmp = kids.get(i).getSplits(job, numSplits);
+        List<InputSplit> tmp = kids.get(i).getSplits(job);
         if (null == tmp) {
           throw new IOException("Error gathering splits from child RReader");
         }
-        if (i > 0 && splits[i-1].length != tmp.length) {
+        if (i > 0 && splits.get(i-1).size() != tmp.size()) {
           throw new IOException("Inconsistent split cardinality from child " +
-              i + " (" + splits[i-1].length + "/" + tmp.length + ")");
+              i + " (" + splits.get(i-1).size() + "/" + tmp.size() + ")");
         }
-        splits[i] = tmp;
+        splits.add(i, tmp);
       }
-      final int size = splits[0].length;
-      CompositeInputSplit[] ret = new CompositeInputSplit[size];
+      final int size = splits.get(0).size();
+      List<InputSplit> ret = new ArrayList<InputSplit>();
       for (int i = 0; i < size; ++i) {
-        ret[i] = new CompositeInputSplit(splits.length);
-        for (int j = 0; j < splits.length; ++j) {
-          ret[i].add(splits[j][i]);
+        CompositeInputSplit split = new CompositeInputSplit(splits.size());
+        for (int j = 0; j < splits.size(); ++j) {
+          split.add(splits.get(j).get(i));
         }
+        ret.add(split);
       }
       return ret;
     }
 
     @SuppressWarnings("unchecked") // child types unknowable
-    public ComposableRecordReader getRecordReader(
-        InputSplit split, JobConf job, Reporter reporter) throws IOException {
+    public ComposableRecordReader 
+        createRecordReader(InputSplit split, TaskAttemptContext taskContext) 
+        throws IOException, InterruptedException {
       if (!(split instanceof CompositeInputSplit)) {
         throw new IOException("Invalid split type:" +
                               split.getClass().getName());
@@ -396,17 +416,17 @@
         if (!rrCstrMap.containsKey(ident)) {
           throw new IOException("No RecordReader for " + ident);
         }
-        ret = (CompositeRecordReader)
-          rrCstrMap.get(ident).newInstance(id, job, capacity, cmpcl);
+        ret = (CompositeRecordReader)rrCstrMap.get(ident).
+          newInstance(id, taskContext.getConfiguration(), capacity, cmpcl);
       } catch (IllegalAccessException e) {
-        throw (IOException)new IOException().initCause(e);
+        throw new IOException(e);
       } catch (InstantiationException e) {
-        throw (IOException)new IOException().initCause(e);
+        throw new IOException(e);
       } catch (InvocationTargetException e) {
-        throw (IOException)new IOException().initCause(e);
+        throw new IOException(e);
       }
       for (int i = 0; i < capacity; ++i) {
-        ret.add(kids.get(i).getRecordReader(spl.get(i), job, reporter));
+        ret.add(kids.get(i).createRecordReader(spl.get(i), taskContext));
       }
       return (ComposableRecordReader)ret;
     }
@@ -414,7 +434,8 @@
     /**
      * Parse a list of comma-separated nodes.
      */
-    public void parse(List<Token> args, JobConf job) throws IOException {
+    public void parse(List<Token> args, Configuration conf) 
+        throws IOException {
       ListIterator<Token> i = args.listIterator();
       while (i.hasNext()) {
         Token t = i.next();
@@ -437,7 +458,8 @@
     }
   }
 
-  private static Token reduce(Stack<Token> st, JobConf job) throws IOException {
+  private static Token reduce(Stack<Token> st, Configuration conf) 
+      throws IOException {
     LinkedList<Token> args = new LinkedList<Token>();
     while (!st.isEmpty() && !TType.LPAREN.equals(st.peek().getType())) {
       args.addFirst(st.pop());
@@ -450,7 +472,7 @@
       throw new IOException("Identifier expected");
     }
     Node n = Node.forIdent(st.pop().getStr());
-    n.parse(args, job);
+    n.parse(args, conf);
     return new NodeToken(n);
   }
 
@@ -458,18 +480,18 @@
    * Given an expression and an optional comparator, build a tree of
    * InputFormats using the comparator to sort keys.
    */
-  static Node parse(String expr, JobConf job) throws IOException {
+  static Node parse(String expr, Configuration conf) throws IOException {
     if (null == expr) {
       throw new IOException("Expression is null");
     }
-    Class<? extends WritableComparator> cmpcl =
-      job.getClass("mapred.join.keycomparator", null, WritableComparator.class);
+    Class<? extends WritableComparator> cmpcl = conf.getClass(
+      CompositeInputFormat.JOIN_COMPARATOR, null, WritableComparator.class);
     Lexer lex = new Lexer(expr);
     Stack<Token> st = new Stack<Token>();
     Token tok;
     while ((tok = lex.next()) != null) {
       if (TType.RPAREN.equals(tok.getType())) {
-        st.push(reduce(st, job));
+        st.push(reduce(st, conf));
       } else {
         st.push(tok);
       }

Copied: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/ResetableIterator.java (from r807076, hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ResetableIterator.java)
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/ResetableIterator.java?p2=hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/ResetableIterator.java&p1=hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ResetableIterator.java&r1=807076&r2=807096&rev=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/ResetableIterator.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/ResetableIterator.java Mon Aug 24 06:19:21 2009
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.mapred.join;
+package org.apache.hadoop.mapreduce.lib.join;
 
 import java.io.IOException;
 

Copied: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/StreamBackedIterator.java (from r807076, hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/StreamBackedIterator.java)
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/StreamBackedIterator.java?p2=hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/StreamBackedIterator.java&p1=hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/StreamBackedIterator.java&r1=807076&r2=807096&rev=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/StreamBackedIterator.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/StreamBackedIterator.java Mon Aug 24 06:19:21 2009
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.mapred.join;
+package org.apache.hadoop.mapreduce.lib.join;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;

Copied: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/TupleWritable.java (from r807076, hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/TupleWritable.java)
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/TupleWritable.java?p2=hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/TupleWritable.java&p1=hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/TupleWritable.java&r1=807076&r2=807096&rev=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/TupleWritable.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/TupleWritable.java Mon Aug 24 06:19:21 2009
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.mapred.join;
+package org.apache.hadoop.mapreduce.lib.join;
 
 import java.io.DataOutput;
 import java.io.DataInput;
@@ -25,6 +25,7 @@
 import java.util.Iterator;
 import java.util.NoSuchElementException;
 
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
@@ -43,7 +44,7 @@
  */
 public class TupleWritable implements Writable, Iterable<Writable> {
 
-  private BitSet written;
+  protected BitSet written;
   private Writable[] values;
 
   /**
@@ -129,7 +130,8 @@
       }
       public void remove() {
         if (!written.get(bitIndex)) {
-          throw new IllegalStateException("Attempt to remove non-existent val");
+          throw new IllegalStateException(
+            "Attempt to remove non-existent val");
         }
         written.clear(bitIndex);
       }
@@ -188,17 +190,21 @@
         cls[i] = Class.forName(Text.readString(in)).asSubclass(Writable.class);
       }
       for (int i = 0; i < card; ++i) {
+        if (cls[i].equals(NullWritable.class)) {
+          values[i] = NullWritable.get();
+        } else {
           values[i] = cls[i].newInstance();
+        }
         if (has(i)) {
           values[i].readFields(in);
         }
       }
     } catch (ClassNotFoundException e) {
-      throw (IOException)new IOException("Failed tuple init").initCause(e);
+      throw new IOException("Failed tuple init", e);
     } catch (IllegalAccessException e) {
-      throw (IOException)new IOException("Failed tuple init").initCause(e);
+      throw new IOException("Failed tuple init", e);
     } catch (InstantiationException e) {
-      throw (IOException)new IOException("Failed tuple init").initCause(e);
+      throw new IOException("Failed tuple init", e);
     }
   }
 
@@ -226,13 +232,13 @@
   }
 
   /**
-   * Writes the bit set to the stream. The first 64 bit-positions of the bit set
-   * are written as a VLong for backwards-compatibility with older versions of
-   * TupleWritable. All bit-positions >= 64 are encoded as a byte for every 8
-   * bit-positions.
+   * Writes the bit set to the stream. The first 64 bit-positions of the bit
+   * set are written as a VLong for backwards-compatibility with older 
+   * versions of TupleWritable. All bit-positions >= 64 are encoded as a byte
+   * for every 8 bit-positions.
    */
-  private static final void writeBitSet(DataOutput stream, int nbits, BitSet bitSet)
-      throws IOException {
+  private static final void writeBitSet(DataOutput stream, int nbits,
+      BitSet bitSet) throws IOException {
     long bits = 0L;
         
     int bitSetIndex = bitSet.nextSetBit(0);

Copied: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/WrappedRecordReader.java (from r807076, hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/WrappedRecordReader.java)
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/WrappedRecordReader.java?p2=hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/WrappedRecordReader.java&p1=hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/WrappedRecordReader.java&r1=807076&r2=807096&rev=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/WrappedRecordReader.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/WrappedRecordReader.java Mon Aug 24 06:19:21 2009
@@ -16,58 +16,102 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.mapred.join;
+package org.apache.hadoop.mapreduce.lib.join;
 
 import java.io.IOException;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
 
 /**
  * Proxy class for a RecordReader participating in the join framework.
+ * 
  * This class keeps track of the &quot;head&quot; key-value pair for the
  * provided RecordReader and keeps a store of values matching a key when
  * this source is participating in a join.
  */
-public class WrappedRecordReader<K extends WritableComparable,
-                          U extends Writable>
-    implements ComposableRecordReader<K,U> {
+public class WrappedRecordReader<K extends WritableComparable<?>,
+    U extends Writable> extends ComposableRecordReader<K,U> {
 
-  private boolean empty = false;
+  protected boolean empty = false;
   private RecordReader<K,U> rr;
   private int id;  // index at which values will be inserted in collector
 
-  private K khead; // key at the top of this RR
-  private U vhead; // value assoc with khead
-  private WritableComparator cmp;
-
+  protected WritableComparator cmp = null;
+  private K key; // key at the top of this RR
+  private U value; // value assoc with key
   private ResetableIterator<U> vjoin;
-
+  private Configuration conf = new Configuration();
+  @SuppressWarnings("unchecked")
+  private Class<? extends WritableComparable> keyclass = null; 
+  private Class<? extends Writable> valueclass = null; 
+  
+  protected WrappedRecordReader(int id) {
+    this.id = id;
+    vjoin = new StreamBackedIterator<U>();
+  }
+  
   /**
    * For a given RecordReader rr, occupy position id in collector.
    */
   WrappedRecordReader(int id, RecordReader<K,U> rr,
-      Class<? extends WritableComparator> cmpcl) throws IOException {
+      Class<? extends WritableComparator> cmpcl) 
+  throws IOException, InterruptedException {
     this.id = id;
     this.rr = rr;
-    khead = rr.createKey();
-    vhead = rr.createValue();
-    try {
-      cmp = (null == cmpcl)
-        ? WritableComparator.get(khead.getClass())
-        : cmpcl.newInstance();
-    } catch (InstantiationException e) {
-      throw (IOException)new IOException().initCause(e);
-    } catch (IllegalAccessException e) {
-      throw (IOException)new IOException().initCause(e);
+    if (cmpcl != null) {
+      try {
+        this.cmp = cmpcl.newInstance();
+      } catch (InstantiationException e) {
+        throw new IOException(e);
+      } catch (IllegalAccessException e) {
+        throw new IOException(e);
+      }
     }
     vjoin = new StreamBackedIterator<U>();
-    next();
   }
 
+  public void initialize(InputSplit split,
+                         TaskAttemptContext context)
+  throws IOException, InterruptedException {
+    rr.initialize(split, context);
+    conf = context.getConfiguration();
+    nextKeyValue();
+    if (!empty) {
+      keyclass = key.getClass().asSubclass(WritableComparable.class);
+      valueclass = value.getClass();
+      if (cmp == null) {
+        cmp = WritableComparator.get(keyclass);
+      }
+    }
+  }
+
+  /**
+   * Request new key from proxied RR.
+   */
+  @SuppressWarnings("unchecked")
+  public K createKey() {
+    if (keyclass != null) {
+      return (K) ReflectionUtils.newInstance(keyclass, conf);
+    }
+    return (K) NullWritable.get();
+  }
+  
+  @SuppressWarnings("unchecked")
+  public U createValue() {
+    if (valueclass != null) {
+      return (U) ReflectionUtils.newInstance(valueclass, conf);
+    }
+    return (U) NullWritable.get();
+  }
+  
   /** {@inheritDoc} */
   public int id() {
     return id;
@@ -77,14 +121,14 @@
    * Return the key at the head of this RR.
    */
   public K key() {
-    return khead;
+    return key;
   }
 
   /**
    * Clone the key at the head of this RR into the object supplied.
    */
   public void key(K qkey) throws IOException {
-    WritableUtils.cloneInto(qkey, khead);
+    ReflectionUtils.copy(conf, key, qkey);
   }
 
   /**
@@ -98,48 +142,36 @@
   /**
    * Skip key-value pairs with keys less than or equal to the key provided.
    */
-  public void skip(K key) throws IOException {
+  public void skip(K key) throws IOException, InterruptedException {
     if (hasNext()) {
-      while (cmp.compare(khead, key) <= 0 && next());
+      while (cmp.compare(key(), key) <= 0 && next());
     }
   }
 
   /**
-   * Read the next k,v pair into the head of this object; return true iff
-   * the RR and this are exhausted.
-   */
-  protected boolean next() throws IOException {
-    empty = !rr.next(khead, vhead);
-    return hasNext();
-  }
-
-  /**
    * Add an iterator to the collector at the position occupied by this
    * RecordReader over the values in this stream paired with the key
    * provided (ie register a stream of values from this source matching K
    * with a collector).
    */
-                                 // JoinCollector comes from parent, which has
-  @SuppressWarnings("unchecked") // no static type for the slot this sits in
+  @SuppressWarnings("unchecked")
   public void accept(CompositeRecordReader.JoinCollector i, K key)
-      throws IOException {
+      throws IOException, InterruptedException {
     vjoin.clear();
-    if (0 == cmp.compare(key, khead)) {
+    if (key() != null && 0 == cmp.compare(key, key())) {
       do {
-        vjoin.add(vhead);
-      } while (next() && 0 == cmp.compare(key, khead));
+        vjoin.add(value);
+      } while (next() && 0 == cmp.compare(key, key()));
     }
     i.add(id, vjoin);
   }
 
   /**
-   * Write key-value pair at the head of this stream to the objects provided;
-   * get next key-value pair from proxied RR.
+   * Read the next k,v pair into the head of this object; return true iff
+   * the RR and this are exhausted.
    */
-  public boolean next(K key, U value) throws IOException {
+  public boolean nextKeyValue() throws IOException, InterruptedException {
     if (hasNext()) {
-      WritableUtils.cloneInto(key, khead);
-      WritableUtils.cloneInto(value, vhead);
       next();
       return true;
     }
@@ -147,31 +179,35 @@
   }
 
   /**
-   * Request new key from proxied RR.
+   * Read the next k,v pair into the head of this object; return true iff
+   * the RR and this are exhausted.
    */
-  public K createKey() {
-    return rr.createKey();
+  private boolean next() throws IOException, InterruptedException {
+    empty = !rr.nextKeyValue();
+    key = rr.getCurrentKey();
+    value = rr.getCurrentValue();
+    return !empty;
   }
 
   /**
-   * Request new value from proxied RR.
+   * Get current key 
    */
-  public U createValue() {
-    return rr.createValue();
+  public K getCurrentKey() throws IOException, InterruptedException {
+    return rr.getCurrentKey();
   }
 
   /**
-   * Request progress from proxied RR.
+   * Get current value
    */
-  public float getProgress() throws IOException {
-    return rr.getProgress();
+  public U getCurrentValue() throws IOException, InterruptedException {
+    return rr.getCurrentValue();
   }
 
   /**
-   * Request position from proxied RR.
+   * Request progress from proxied RR.
    */
-  public long getPos() throws IOException {
-    return rr.getPos();
+  public float getProgress() throws IOException, InterruptedException {
+    return rr.getProgress();
   }
 
   /**
@@ -202,5 +238,4 @@
     assert false : "hashCode not designed";
     return 42;
   }
-
 }

Copied: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/package.html (from r807076, hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/package.html)
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/package.html?p2=hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/package.html&p1=hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/package.html&r1=807076&r2=807096&rev=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/join/package.html (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/package.html Mon Aug 24 06:19:21 2009
@@ -31,11 +31,11 @@
 
 <table>
 <tr><th>property</th><th>required</th><th>value</th></tr>
-<tr><td>mapred.join.expr</td><td>yes</td>
+<tr><td>mapreduce.join.expr</td><td>yes</td>
     <td>Join expression to effect over input data</td></tr>
-<tr><td>mapred.join.keycomparator</td><td>no</td>
+<tr><td>mapreduce.join.keycomparator</td><td>no</td>
     <td><tt>WritableComparator</tt> class to use for comparing keys</td></tr>
-<tr><td>mapred.join.define.&lt;ident&gt;</td><td>no</td>
+<tr><td>mapreduce.join.define.&lt;ident&gt;</td><td>no</td>
     <td>Class mapped to identifier in join expression</td></tr>
 </table>
 
@@ -66,16 +66,16 @@
 <tt>CompositeInputFormat</tt> and define a join expression accepted by the
 preceding grammar. For example, both of the following are acceptable:</p>
 
-<pre>inner(tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
+<pre>inner(tbl(org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.class,
           "hdfs://host:8020/foo/bar"),
-      tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
+      tbl(org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.class,
           "hdfs://host:8020/foo/baz"))
 
-outer(override(tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
+outer(override(tbl(org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.class,
                    "hdfs://host:8020/foo/bar"),
-               tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
+               tbl(org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.class,
                    "hdfs://host:8020/foo/baz")),
-      tbl(org.apache.hadoop.mapred/SequenceFileInputFormat.class,
+      tbl(org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.class,
           "hdfs://host:8020/foo/rab"))
 </pre>
 
@@ -83,14 +83,14 @@
 aid construction of these verbose statements.</p>
 
 <p>As in the second example, joins may be nested. Users may provide a
-comparator class in the <tt>mapred.join.keycomparator</tt> property to specify
+comparator class in the <tt>mapreduce.join.keycomparator</tt> property to specify
 the ordering of their keys, or accept the default comparator as returned by
 <tt>WritableComparator.get(keyclass)</tt>.</p>
 
 <p>Users can specify their own join operations, typically by overriding
 <tt>JoinRecordReader</tt> or <tt>MultiFilterRecordReader</tt> and mapping that
 class to an identifier in the join expression using the
-<tt>mapred.join.define.<em>ident</em></tt> property, where <em>ident</em> is
+<tt>mapreduce.join.define.<em>ident</em></tt> property, where <em>ident</em> is
 the identifier appearing in the join expression. Users may elect to emit- or
 modify- values passing through their join operation. Consulting the existing
 operations for guidance is recommended. Adding arguments is considerably more
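
For reference, a minimal sketch of driving the relocated lib.join classes through the new org.apache.hadoop.mapreduce API, following the usage exercised by TestJoinDatamerge later in this patch; the input/output paths, key class, and the SequenceFile formats are illustrative assumptions, not part of the commit.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.join.CompositeInputFormat;
import org.apache.hadoop.mapreduce.lib.join.TupleWritable;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class JoinExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Illustrative source paths; both sides must be sorted/partitioned the same way.
    Path[] src = { new Path("hdfs://host:8020/foo/bar"),
                   new Path("hdfs://host:8020/foo/baz") };
    // mapreduce.join.expr is now set through the CompositeInputFormat.JOIN_EXPR constant.
    conf.set(CompositeInputFormat.JOIN_EXPR,
        CompositeInputFormat.compose("inner", SequenceFileInputFormat.class, src));
    Job job = new Job(conf);
    job.setInputFormatClass(CompositeInputFormat.class);
    job.setMapperClass(Mapper.class);            // identity map over the joined tuples
    job.setNumReduceTasks(0);
    job.setOutputKeyClass(IntWritable.class);    // key class of the sources (assumption)
    job.setOutputValueClass(TupleWritable.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    FileOutputFormat.setOutputPath(job, new Path("/joined"));  // illustrative output dir
    job.waitForCompletion(true);
  }
}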

Modified: hadoop/mapreduce/trunk/src/test/findbugsExcludeFile.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/findbugsExcludeFile.xml?rev=807096&r1=807095&r2=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/findbugsExcludeFile.xml (original)
+++ hadoop/mapreduce/trunk/src/test/findbugsExcludeFile.xml Mon Aug 24 06:19:21 2009
@@ -53,6 +53,10 @@
        <Bug pattern="NM_SAME_SIMPLE_NAME_AS_INTERFACE" />
      </Match>
      <Match>
+       <Class name="~org.apache.hadoop.mapred.join.*" />
+       <Bug pattern="NM_SAME_SIMPLE_NAME_AS_INTERFACE" />
+     </Match>
+     <Match>
        <Class name="org.apache.hadoop.mapred.SequenceFileInputFilter$Filter" />
        <Bug pattern="NM_SAME_SIMPLE_NAME_AS_INTERFACE" />
      </Match>

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java?rev=807096&r1=807095&r2=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java Mon Aug 24 06:19:21 2009
@@ -18,22 +18,28 @@
 
 package org.apache.hadoop.mapreduce;
 
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.BufferedReader;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.text.NumberFormat;
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Random;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -43,6 +49,7 @@
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
 
 /**
  * Utility methods used in various Job Control unit tests.
@@ -237,6 +244,99 @@
     }
   }
 
+  public static class IncomparableKey implements WritableComparable<Object> {
+    public void write(DataOutput out) { }
+    public void readFields(DataInput in) { }
+    public int compareTo(Object o) {
+      throw new RuntimeException("Should never see this.");
+    }
+  }
+
+  public static class FakeSplit extends InputSplit implements Writable {
+    public void write(DataOutput out) throws IOException { }
+    public void readFields(DataInput in) throws IOException { }
+    public long getLength() { return 0L; }
+    public String[] getLocations() { return new String[0]; }
+  }
+
+  public static class Fake_IF<K,V>
+    extends InputFormat<K, V> 
+    implements Configurable {
+
+    public Fake_IF() { }
+
+    public List<InputSplit> getSplits(JobContext context) {
+      List<InputSplit> ret = new ArrayList<InputSplit>(); 
+      ret.add(new FakeSplit());
+      return ret;
+    }
+    public static void setKeyClass(Configuration conf, Class<?> k) {
+      conf.setClass("test.fakeif.keyclass", k, WritableComparable.class);
+    }
+
+    public static void setValClass(Configuration job, Class<?> v) {
+      job.setClass("test.fakeif.valclass", v, Writable.class);
+    }
+
+    protected Class<? extends K> keyclass;
+    protected Class<? extends V> valclass;
+    Configuration conf = null;
+
+    @SuppressWarnings("unchecked")
+    public void setConf(Configuration conf) {
+      this.conf = conf;
+      keyclass = (Class<? extends K>) conf.getClass("test.fakeif.keyclass",
+          NullWritable.class, WritableComparable.class);
+      valclass = (Class<? extends V>) conf.getClass("test.fakeif.valclass",
+          NullWritable.class, WritableComparable.class);
+    }
+
+    public Configuration getConf() {
+      return conf;
+    }
+    
+    public RecordReader<K,V> createRecordReader(
+        InputSplit ignored, TaskAttemptContext context) {
+      return new RecordReader<K,V>() {
+        public boolean nextKeyValue() throws IOException { return false; }
+        public void initialize(InputSplit split, TaskAttemptContext context) 
+            throws IOException, InterruptedException {}
+        public K getCurrentKey() {
+        return null;
+        }
+        public V getCurrentValue() {
+          return null;
+        }
+        public void close() throws IOException { }
+        public float getProgress() throws IOException { return 0.0f; }
+      };
+    }
+  }
+  
+  public static class Fake_RR<K, V> extends RecordReader<K,V> {
+    private Class<? extends K> keyclass;
+    private Class<? extends V> valclass;
+    public boolean nextKeyValue() throws IOException { return false; }
+    @SuppressWarnings("unchecked")
+    public void initialize(InputSplit split, TaskAttemptContext context) 
+        throws IOException, InterruptedException {
+      Configuration conf = context.getConfiguration();
+      keyclass = (Class<? extends K>) conf.getClass("test.fakeif.keyclass",
+        NullWritable.class, WritableComparable.class);
+      valclass = (Class<? extends V>) conf.getClass("test.fakeif.valclass",
+        NullWritable.class, WritableComparable.class);
+      
+    }
+    public K getCurrentKey() {
+      return ReflectionUtils.newInstance(keyclass, null);
+    }
+    public V getCurrentValue() {
+      return ReflectionUtils.newInstance(valclass, null);
+    }
+    public void close() throws IOException { }
+    public float getProgress() throws IOException { return 0.0f; }
+  }
+
   public static Job createJob(Configuration conf, Path inDir, Path outDir, 
       int numInputFiles, int numReds) throws IOException {
     String input = "The quick brown fox\n" + "has many silly\n"

Copied: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/join/TestJoinDatamerge.java (from r807076, hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/join/TestDatamerge.java)
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/join/TestJoinDatamerge.java?p2=hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/join/TestJoinDatamerge.java&p1=hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/join/TestDatamerge.java&r1=807076&r2=807096&rev=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/join/TestDatamerge.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/join/TestJoinDatamerge.java Mon Aug 24 06:19:21 2009
@@ -15,12 +15,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.mapred.join;
+package org.apache.hadoop.mapreduce.lib.join;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
-import java.util.Iterator;
 
 import junit.framework.Test;
 import junit.framework.TestCase;
@@ -34,31 +31,17 @@
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobConfigurable;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
-import org.apache.hadoop.mapred.lib.IdentityMapper;
-import org.apache.hadoop.mapred.lib.IdentityReducer;
-import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 
-public class TestDatamerge extends TestCase {
+public class TestJoinDatamerge extends TestCase {
 
   private static MiniDFSCluster cluster = null;
   public static Test suite() {
-    TestSetup setup = new TestSetup(new TestSuite(TestDatamerge.class)) {
+    TestSetup setup = new TestSetup(new TestSuite(TestJoinDatamerge.class)) {
       protected void setUp() throws Exception {
         Configuration conf = new Configuration();
         cluster = new MiniDFSCluster(conf, 2, true, null);
@@ -123,36 +106,45 @@
     return sb.toString();
   }
 
-  private static abstract class SimpleCheckerBase<V extends Writable>
-      implements Mapper<IntWritable, V, IntWritable, IntWritable>,
-                 Reducer<IntWritable, IntWritable, Text, Text> {
+  private static abstract class SimpleCheckerMapBase<V extends Writable>
+      extends Mapper<IntWritable, V, IntWritable, IntWritable>{
     protected final static IntWritable one = new IntWritable(1);
     int srcs;
-    public void close() { }
-    public void configure(JobConf job) {
-      srcs = job.getInt("testdatamerge.sources", 0);
+    
+    public void setup(Context context) {
+      srcs = context.getConfiguration().getInt("testdatamerge.sources", 0);
       assertTrue("Invalid src count: " + srcs, srcs > 0);
     }
-    public abstract void map(IntWritable key, V val,
-        OutputCollector<IntWritable, IntWritable> out, Reporter reporter)
-        throws IOException;
-    public void reduce(IntWritable key, Iterator<IntWritable> values,
-                       OutputCollector<Text, Text> output,
-                       Reporter reporter) throws IOException {
+  }
+
+  private static abstract class SimpleCheckerReduceBase
+      extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
+    protected final static IntWritable one = new IntWritable(1);
+
+    int srcs;
+    
+    public void setup(Context context) {
+      srcs = context.getConfiguration().getInt("testdatamerge.sources", 0);
+      assertTrue("Invalid src count: " + srcs, srcs > 0);
+    }
+
+    public void reduce(IntWritable key, Iterable<IntWritable> values,
+        Context context) throws IOException, InterruptedException {
       int seen = 0;
-      while (values.hasNext()) {
-        seen += values.next().get();
+      for (IntWritable value : values) {
+        seen += value.get();
       }
       assertTrue("Bad count for " + key.get(), verify(key.get(), seen));
+      context.write(key, new IntWritable(seen));
     }
+    
     public abstract boolean verify(int key, int occ);
   }
 
-  private static class InnerJoinChecker
-      extends SimpleCheckerBase<TupleWritable> {
-    public void map(IntWritable key, TupleWritable val,
-        OutputCollector<IntWritable, IntWritable> out, Reporter reporter)
-        throws IOException {
+  private static class InnerJoinMapChecker
+      extends SimpleCheckerMapBase<TupleWritable> {
+    public void map(IntWritable key, TupleWritable val, Context context)
+        throws IOException, InterruptedException {
       int k = key.get();
       final String kvstr = "Unexpected tuple: " + stringify(key, val);
       assertTrue(kvstr, 0 == k % (srcs * srcs));
@@ -160,19 +152,28 @@
         final int vali = ((IntWritable)val.get(i)).get();
         assertTrue(kvstr, (vali - i) * srcs == 10 * k);
       }
-      out.collect(key, one);
+      context.write(key, one);
+      // If the user modifies the key or any of the values in the tuple, it
+      // should not affect the rest of the join.
+      key.set(-1);
+      if (val.has(0)) {
+        ((IntWritable)val.get(0)).set(0);
+      }
     }
+  }
+
+  private static class InnerJoinReduceChecker
+    extends SimpleCheckerReduceBase {
     public boolean verify(int key, int occ) {
       return (key == 0 && occ == 2) ||
-             (key != 0 && (key % (srcs * srcs) == 0) && occ == 1);
+         (key != 0 && (key % (srcs * srcs) == 0) && occ == 1);
     }
   }
-
-  private static class OuterJoinChecker
-      extends SimpleCheckerBase<TupleWritable> {
-    public void map(IntWritable key, TupleWritable val,
-        OutputCollector<IntWritable, IntWritable> out, Reporter reporter)
-        throws IOException {
+  
+  private static class OuterJoinMapChecker
+      extends SimpleCheckerMapBase<TupleWritable> {
+    public void map(IntWritable key, TupleWritable val, Context context)
+        throws IOException, InterruptedException {
       int k = key.get();
       final String kvstr = "Unexpected tuple: " + stringify(key, val);
       if (0 == k % (srcs * srcs)) {
@@ -192,20 +193,30 @@
           }
         }
       }
-      out.collect(key, one);
+      context.write(key, one);
+      //If the user modifies the key or any of the values in the tuple, it
+      // should not affect the rest of the join.
+      key.set(-1);
+      if (val.has(0)) {
+        ((IntWritable)val.get(0)).set(0);
+      }
     }
+  }
+
+  private static class OuterJoinReduceChecker
+      extends SimpleCheckerReduceBase {
     public boolean verify(int key, int occ) {
-      if (key < srcs * srcs && (key % (srcs + 1)) == 0)
+      if (key < srcs * srcs && (key % (srcs + 1)) == 0) {
         return 2 == occ;
+      }
       return 1 == occ;
     }
   }
-
-  private static class OverrideChecker
-      extends SimpleCheckerBase<IntWritable> {
-    public void map(IntWritable key, IntWritable val,
-        OutputCollector<IntWritable, IntWritable> out, Reporter reporter)
-        throws IOException {
+  
+  private static class OverrideMapChecker
+      extends SimpleCheckerMapBase<IntWritable> {
+    public void map(IntWritable key, IntWritable val, Context context)
+        throws IOException, InterruptedException {
       int k = key.get();
       final int vali = val.get();
       final String kvstr = "Unexpected tuple: " + stringify(key, val);
@@ -215,53 +226,110 @@
         final int i = k % srcs;
         assertTrue(kvstr, srcs * (vali - i) == 10 * (k - i));
       }
-      out.collect(key, one);
+      context.write(key, one);
+      //If the user modifies the key or any of the values in the tuple, it
+      // should not affect the rest of the join.
+      key.set(-1);
+      val.set(0);
     }
+  }
+  
+  private static class OverrideReduceChecker
+      extends SimpleCheckerReduceBase {
     public boolean verify(int key, int occ) {
-      if (key < srcs * srcs && (key % (srcs + 1)) == 0 && key != 0)
+      if (key < srcs * srcs && (key % (srcs + 1)) == 0 && key != 0) {
         return 2 == occ;
+      }
       return 1 == occ;
     }
   }
 
-  private static void joinAs(String jointype,
-      Class<? extends SimpleCheckerBase> c) throws Exception {
+  private static void joinAs(String jointype, 
+      Class<? extends SimpleCheckerMapBase<?>> map, 
+      Class<? extends SimpleCheckerReduceBase> reduce) throws Exception {
     final int srcs = 4;
     Configuration conf = new Configuration();
-    JobConf job = new JobConf(conf, c);
     Path base = cluster.getFileSystem().makeQualified(new Path("/"+jointype));
     Path[] src = writeSimpleSrc(base, conf, srcs);
-    job.set("mapred.join.expr", CompositeInputFormat.compose(jointype,
+    conf.set(CompositeInputFormat.JOIN_EXPR, CompositeInputFormat.compose(jointype,
         SequenceFileInputFormat.class, src));
-    job.setInt("testdatamerge.sources", srcs);
-    job.setInputFormat(CompositeInputFormat.class);
+    conf.setInt("testdatamerge.sources", srcs);
+    Job job = new Job(conf);
+    job.setInputFormatClass(CompositeInputFormat.class);
     FileOutputFormat.setOutputPath(job, new Path(base, "out"));
 
-    job.setMapperClass(c);
-    job.setReducerClass(c);
+    job.setMapperClass(map);
+    job.setReducerClass(reduce);
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
     job.setOutputKeyClass(IntWritable.class);
     job.setOutputValueClass(IntWritable.class);
-    JobClient.runJob(job);
-    base.getFileSystem(job).delete(base, true);
+    job.waitForCompletion(true);
+    assertTrue("Job failed", job.isSuccessful());
+    if ("outer".equals(jointype)) {
+      checkOuterConsistency(job, src);
+    }
+    base.getFileSystem(conf).delete(base, true);
   }
 
   public void testSimpleInnerJoin() throws Exception {
-    joinAs("inner", InnerJoinChecker.class);
+    joinAs("inner", InnerJoinMapChecker.class, InnerJoinReduceChecker.class);
   }
 
   public void testSimpleOuterJoin() throws Exception {
-    joinAs("outer", OuterJoinChecker.class);
+    joinAs("outer", OuterJoinMapChecker.class, OuterJoinReduceChecker.class);
+  }
+  
+  private static void checkOuterConsistency(Job job, Path[] src) 
+      throws IOException {
+    Path outf = FileOutputFormat.getOutputPath(job);
+    FileStatus[] outlist = cluster.getFileSystem().listStatus(outf);
+    assertEquals("number of part files is more than 1. It is" + outlist.length,
+      1, outlist.length);
+    assertTrue("output file with zero length" + outlist[0].getLen(),
+      0 < outlist[0].getLen());
+    SequenceFile.Reader r =
+      new SequenceFile.Reader(cluster.getFileSystem(),
+          outlist[0].getPath(), job.getConfiguration());
+    IntWritable k = new IntWritable();
+    IntWritable v = new IntWritable();
+    while (r.next(k, v)) {
+      assertEquals("counts does not match", v.get(),
+        countProduct(k, src, job.getConfiguration()));
+    }
+    r.close();
   }
 
+  private static int countProduct(IntWritable key, Path[] src, 
+      Configuration conf) throws IOException {
+    int product = 1;
+    for (Path p : src) {
+      int count = 0;
+      SequenceFile.Reader r = new SequenceFile.Reader(
+        cluster.getFileSystem(), p, conf);
+      IntWritable k = new IntWritable();
+      IntWritable v = new IntWritable();
+      while (r.next(k, v)) {
+        if (k.equals(key)) {
+          count++;
+        }
+      }
+      r.close();
+      if (count != 0) {
+        product *= count;
+      }
+    }
+    return product;
+  }
+  
   public void testSimpleOverride() throws Exception {
-    joinAs("override", OverrideChecker.class);
+    joinAs("override", OverrideMapChecker.class, OverrideReduceChecker.class);
   }
 
   public void testNestedJoin() throws Exception {
     // outer(inner(S1,...,Sn),outer(S1,...Sn))
     final int SOURCES = 3;
     final int ITEMS = (SOURCES + 1) * (SOURCES + 1);
-    JobConf job = new JobConf();
+    Configuration conf = new Configuration();
     Path base = cluster.getFileSystem().makeQualified(new Path("/nested"));
     int[][] source = new int[SOURCES][];
     for (int i = 0; i < SOURCES; ++i) {
@@ -271,7 +339,7 @@
       }
     }
     Path[] src = new Path[SOURCES];
-    SequenceFile.Writer out[] = createWriters(base, job, SOURCES, src);
+    SequenceFile.Writer out[] = createWriters(base, conf, SOURCES, src);
     IntWritable k = new IntWritable();
     for (int i = 0; i < SOURCES; ++i) {
       IntWritable v = new IntWritable();
@@ -287,13 +355,13 @@
     StringBuilder sb = new StringBuilder();
     sb.append("outer(inner(");
     for (int i = 0; i < SOURCES; ++i) {
-      sb.append(
-          CompositeInputFormat.compose(SequenceFileInputFormat.class,
-            src[i].toString()));
+      sb.append(CompositeInputFormat.compose(SequenceFileInputFormat.class,
+        src[i].toString()));
       if (i + 1 != SOURCES) sb.append(",");
     }
     sb.append("),outer(");
-    sb.append(CompositeInputFormat.compose(Fake_IF.class,"foobar"));
+    sb.append(CompositeInputFormat.compose(
+      MapReduceTestUtil.Fake_IF.class, "foobar"));
     sb.append(",");
     for (int i = 0; i < SOURCES; ++i) {
       sb.append(
@@ -301,28 +369,31 @@
             src[i].toString()));
       sb.append(",");
     }
-    sb.append(CompositeInputFormat.compose(Fake_IF.class,"raboof") + "))");
-    job.set("mapred.join.expr", sb.toString());
-    job.setInputFormat(CompositeInputFormat.class);
+    sb.append(CompositeInputFormat.compose(
+      MapReduceTestUtil.Fake_IF.class, "raboof") + "))");
+    conf.set(CompositeInputFormat.JOIN_EXPR, sb.toString());
+    MapReduceTestUtil.Fake_IF.setKeyClass(conf, IntWritable.class);
+    MapReduceTestUtil.Fake_IF.setValClass(conf, IntWritable.class);
+
+    Job job = new Job(conf);
     Path outf = new Path(base, "out");
     FileOutputFormat.setOutputPath(job, outf);
-    Fake_IF.setKeyClass(job, IntWritable.class);
-    Fake_IF.setValClass(job, IntWritable.class);
-
-    job.setMapperClass(IdentityMapper.class);
-    job.setReducerClass(IdentityReducer.class);
+    job.setInputFormatClass(CompositeInputFormat.class);
+    job.setMapperClass(Mapper.class);
+    job.setReducerClass(Reducer.class);
     job.setNumReduceTasks(0);
     job.setOutputKeyClass(IntWritable.class);
     job.setOutputValueClass(TupleWritable.class);
-    job.setOutputFormat(SequenceFileOutputFormat.class);
-    JobClient.runJob(job);
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
+    job.waitForCompletion(true);
+    assertTrue("Job failed", job.isSuccessful());
 
     FileStatus[] outlist = cluster.getFileSystem().listStatus(outf);
     assertEquals(1, outlist.length);
     assertTrue(0 < outlist[0].getLen());
     SequenceFile.Reader r =
       new SequenceFile.Reader(cluster.getFileSystem(),
-          outlist[0].getPath(), job);
+          outlist[0].getPath(), conf);
     TupleWritable v = new TupleWritable();
     while (r.next(k, v)) {
       assertFalse(((TupleWritable)v.get(1)).has(0));
@@ -344,77 +415,30 @@
       }
     }
     r.close();
-    base.getFileSystem(job).delete(base, true);
+    base.getFileSystem(conf).delete(base, true);
 
   }
 
   public void testEmptyJoin() throws Exception {
-    JobConf job = new JobConf();
+    Configuration conf = new Configuration();
     Path base = cluster.getFileSystem().makeQualified(new Path("/empty"));
     Path[] src = { new Path(base,"i0"), new Path("i1"), new Path("i2") };
-    job.set("mapred.join.expr", CompositeInputFormat.compose("outer",
-        Fake_IF.class, src));
-    job.setInputFormat(CompositeInputFormat.class);
+    conf.set(CompositeInputFormat.JOIN_EXPR, CompositeInputFormat.compose("outer",
+        MapReduceTestUtil.Fake_IF.class, src));
+    MapReduceTestUtil.Fake_IF.setKeyClass(conf, 
+      MapReduceTestUtil.IncomparableKey.class);
+    Job job = new Job(conf);
+    job.setInputFormatClass(CompositeInputFormat.class);
     FileOutputFormat.setOutputPath(job, new Path(base, "out"));
 
-    job.setMapperClass(IdentityMapper.class);
-    job.setReducerClass(IdentityReducer.class);
-    job.setOutputKeyClass(IncomparableKey.class);
+    job.setMapperClass(Mapper.class);
+    job.setReducerClass(Reducer.class);
+    job.setOutputKeyClass(MapReduceTestUtil.IncomparableKey.class);
     job.setOutputValueClass(NullWritable.class);
 
-    JobClient.runJob(job);
-    base.getFileSystem(job).delete(base, true);
+    job.waitForCompletion(true);
+    assertTrue(job.isSuccessful());
+    base.getFileSystem(conf).delete(base, true);
   }
 
-  public static class Fake_IF<K,V>
-      implements InputFormat<K,V>, JobConfigurable {
-
-    public static class FakeSplit implements InputSplit {
-      public void write(DataOutput out) throws IOException { }
-      public void readFields(DataInput in) throws IOException { }
-      public long getLength() { return 0L; }
-      public String[] getLocations() { return new String[0]; }
-    }
-
-    public static void setKeyClass(JobConf job, Class<?> k) {
-      job.setClass("test.fakeif.keyclass", k, WritableComparable.class);
-    }
-
-    public static void setValClass(JobConf job, Class<?> v) {
-      job.setClass("test.fakeif.valclass", v, Writable.class);
-    }
-
-    private Class<? extends K> keyclass;
-    private Class<? extends V> valclass;
-
-    @SuppressWarnings("unchecked")
-    public void configure(JobConf job) {
-      keyclass = (Class<? extends K>) job.getClass("test.fakeif.keyclass",
-    IncomparableKey.class, WritableComparable.class);
-      valclass = (Class<? extends V>) job.getClass("test.fakeif.valclass",
-    NullWritable.class, WritableComparable.class);
-    }
-
-    public Fake_IF() { }
-
-    public InputSplit[] getSplits(JobConf conf, int splits) {
-      return new InputSplit[] { new FakeSplit() };
-    }
-
-    public RecordReader<K,V> getRecordReader(
-        InputSplit ignored, JobConf conf, Reporter reporter) {
-      return new RecordReader<K,V>() {
-        public boolean next(K key, V value) throws IOException { return false; }
-        public K createKey() {
-          return ReflectionUtils.newInstance(keyclass, null);
-        }
-        public V createValue() {
-          return ReflectionUtils.newInstance(valclass, null);
-        }
-        public long getPos() throws IOException { return 0L; }
-        public void close() throws IOException { }
-        public float getProgress() throws IOException { return 0.0f; }
-      };
-    }
-  }
 }

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/join/TestJoinProperties.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/join/TestJoinProperties.java?rev=807096&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/join/TestJoinProperties.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/join/TestJoinProperties.java Mon Aug 24 06:19:21 2009
@@ -0,0 +1,395 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.join;
+
+import java.io.IOException;
+import java.util.List;
+
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+import junit.extensions.TestSetup;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+
+public class TestJoinProperties extends TestCase {
+
+  private static MiniDFSCluster cluster = null;
+  final static int SOURCES = 3;
+  final static int ITEMS = (SOURCES + 1) * (SOURCES + 1);
+  static int[][] source = new int[SOURCES][];
+  static Path[] src;
+  static Path base;
+
+  public static Test suite() {
+    TestSetup setup = new TestSetup(new TestSuite(TestJoinProperties.class)) {
+      protected void setUp() throws Exception {
+        Configuration conf = new Configuration();
+        cluster = new MiniDFSCluster(conf, 2, true, null);
+        base = cluster.getFileSystem().makeQualified(new Path("/nested"));
+        src = generateSources(conf);
+      }
+      protected void tearDown() throws Exception {
+        if (cluster != null) {
+          cluster.shutdown();
+        }
+      }
+    };
+    return setup;
+  }
+
+  // Sources 0 through srcs-2 have IntWritable keys and IntWritable values;
+  // source srcs-1 has IntWritable keys and LongWritable values.
+  private static SequenceFile.Writer[] createWriters(Path testdir,
+      Configuration conf, int srcs, Path[] src) throws IOException {
+    for (int i = 0; i < srcs; ++i) {
+      src[i] = new Path(testdir, Integer.toString(i + 10, 36));
+    }
+    SequenceFile.Writer out[] = new SequenceFile.Writer[srcs];
+    for (int i = 0; i < srcs - 1; ++i) {
+      out[i] = new SequenceFile.Writer(testdir.getFileSystem(conf), conf,
+          src[i], IntWritable.class, IntWritable.class);
+    }
+    out[srcs - 1] = new SequenceFile.Writer(testdir.getFileSystem(conf), conf,
+            src[srcs - 1], IntWritable.class, LongWritable.class);
+    return out;
+  }
+
+  private static String stringify(IntWritable key, Writable val) {
+    StringBuilder sb = new StringBuilder();
+    sb.append("(" + key);
+    sb.append("," + val + ")");
+    return sb.toString();
+  }
+
+  private static Path[] generateSources(Configuration conf) 
+      throws IOException {
+    for (int i = 0; i < SOURCES; ++i) {
+      source[i] = new int[ITEMS];
+      for (int j = 0; j < ITEMS; ++j) {
+        source[i][j] = (i + 2) * (j + 1);
+      }
+    }
+    Path[] src = new Path[SOURCES];
+    SequenceFile.Writer out[] = createWriters(base, conf, SOURCES, src);
+    IntWritable k = new IntWritable();
+    for (int i = 0; i < SOURCES; ++i) {
+      Writable v;
+      if (i != SOURCES - 1) {
+        v = new IntWritable();
+        ((IntWritable)v).set(i);
+      } else {
+        v = new LongWritable(); 
+        ((LongWritable)v).set(i);
+      }
+      for (int j = 0; j < ITEMS; ++j) {
+        k.set(source[i][j]);
+        out[i].append(k, v);
+      }
+      out[i].close();
+    }
+    return src;
+  }
+  
+  private String A() {
+    return CompositeInputFormat.compose(SequenceFileInputFormat.class,
+      src[0].toString());	  
+  }
+
+  private String B() {
+    return CompositeInputFormat.compose(SequenceFileInputFormat.class,
+      src[1].toString());	  
+  }
+  private String C() {
+    return CompositeInputFormat.compose(SequenceFileInputFormat.class,
+      src[2].toString());	  
+  }
+  
+  // construct op(op(A, B), C)
+  private String constructExpr1(String op) {
+    StringBuilder sb = new StringBuilder();
+    sb.append(op + "(" +op +"(");
+    sb.append(A());
+    sb.append(",");
+    sb.append(B());
+    sb.append("),");
+    sb.append(C());
+    sb.append(")");
+    return sb.toString();
+  }
+  
+  // construct op(A,op(B,C))
+  private String constructExpr2(String op) {
+    StringBuilder sb = new StringBuilder();
+    sb.append(op + "(");
+    sb.append(A());
+    sb.append(",");
+    sb.append(op +"(");
+    sb.append(B());
+    sb.append(",");
+    sb.append(C());
+    sb.append("))");
+    return sb.toString();
+  }
+
+  // construct op(A, B, C)
+  private String constructExpr3(String op) {
+    StringBuilder sb = new StringBuilder();
+    sb.append(op + "(");
+    sb.append(A());
+    sb.append(",");
+    sb.append(B());
+    sb.append(",");
+    sb.append(C());
+    sb.append(")");
+    return sb.toString();
+  }
+
+  // construct override(inner(A, B), A)
+  private String constructExpr4() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("override(inner(");
+    sb.append(A());
+    sb.append(",");
+    sb.append(B());
+    sb.append("),");
+    sb.append(A());
+    sb.append(")");
+    return sb.toString();
+  }
+
+  enum TestType {OUTER_ASSOCIATIVITY, INNER_IDENTITY, INNER_ASSOCIATIVITY}
+  
+  private void validateKeyValue(WritableComparable<?> k, Writable v,
+      int tupleSize, boolean firstTuple, boolean secondTuple,
+      TestType ttype) throws IOException {
+    System.out.println("out k:" + k + " v:" + v);
+    if (ttype.equals(TestType.OUTER_ASSOCIATIVITY)) {
+      validateOuterKeyValue((IntWritable)k, (TupleWritable)v, tupleSize,
+        firstTuple, secondTuple);
+    } else if (ttype.equals(TestType.INNER_ASSOCIATIVITY)) {
+      validateInnerKeyValue((IntWritable)k, (TupleWritable)v, tupleSize,
+        firstTuple, secondTuple);
+    }
+    if (ttype.equals(TestType.INNER_IDENTITY)) {
+      validateKeyValue_INNER_IDENTITY((IntWritable)k, (IntWritable)v);
+    }
+  }
+
+  private void testExpr1(Configuration conf, String op, TestType ttype,
+      int expectedCount) throws Exception {
+    String joinExpr = constructExpr1(op);
+    conf.set(CompositeInputFormat.JOIN_EXPR, joinExpr);
+    int count = testFormat(conf, 2, true, false, ttype);
+    assertTrue("not all keys present", count == expectedCount);
+  }
+
+  private void testExpr2(Configuration conf, String op, TestType ttype,
+      int expectedCount) throws Exception {
+    String joinExpr = constructExpr2(op);
+    conf.set(CompositeInputFormat.JOIN_EXPR, joinExpr);
+    int count = testFormat(conf, 2, false, true, ttype);
+    assertTrue("not all keys present", count == expectedCount);
+  }
+
+  private void testExpr3(Configuration conf, String op, TestType ttype,
+      int expectedCount) throws Exception {
+    String joinExpr = constructExpr3(op);
+    conf.set(CompositeInputFormat.JOIN_EXPR, joinExpr);
+    int count = testFormat(conf, 3, false, false, ttype);
+    assertTrue("not all keys present", count == expectedCount);
+  }
+
+  private void testExpr4(Configuration conf) throws Exception {
+    String joinExpr = constructExpr4();
+    conf.set(CompositeInputFormat.JOIN_EXPR, joinExpr);
+    int count = testFormat(conf, 0, false, false, TestType.INNER_IDENTITY);
+    assertTrue("not all keys present", count == ITEMS);
+  }
+
+  // outer(outer(A, B), C) == outer(A,outer(B, C)) == outer(A, B, C)
+  public void testOuterAssociativity() throws Exception {
+    Configuration conf = new Configuration();
+    testExpr1(conf, "outer", TestType.OUTER_ASSOCIATIVITY, 33);
+    testExpr2(conf, "outer", TestType.OUTER_ASSOCIATIVITY, 33);
+    testExpr3(conf, "outer", TestType.OUTER_ASSOCIATIVITY, 33);
+  }
+ 
+  // inner(inner(A, B), C) == inner(A,inner(B, C)) == inner(A, B, C)
+  public void testInnerAssociativity() throws Exception {
+    Configuration conf = new Configuration();
+    testExpr1(conf, "inner", TestType.INNER_ASSOCIATIVITY, 2);
+    testExpr2(conf, "inner", TestType.INNER_ASSOCIATIVITY, 2);
+    testExpr3(conf, "inner", TestType.INNER_ASSOCIATIVITY, 2);
+  }
+
+  // override(inner(A, B), A) == A
+  public void testIdentity() throws Exception {
+    Configuration conf = new Configuration();
+    testExpr4(conf);
+  }
+  
+  private void validateOuterKeyValue(IntWritable k, TupleWritable v, 
+      int tupleSize, boolean firstTuple, boolean secondTuple) {
+    final String kvstr = "Unexpected tuple: " + stringify(k, v);
+    assertTrue(kvstr, v.size() == tupleSize);
+    int key = k.get();
+    IntWritable val0 = null;
+    IntWritable val1 = null;
+    LongWritable val2 = null;
+    if (firstTuple) {
+      TupleWritable v0 = ((TupleWritable)v.get(0));
+      if (key % 2 == 0 && key / 2 <= ITEMS) {
+        val0 = (IntWritable)v0.get(0);
+      } else {
+        assertFalse(kvstr, v0.has(0));
+      }
+      if (key % 3 == 0 && key / 3 <= ITEMS) {
+        val1 = (IntWritable)v0.get(1);
+      } else {
+        assertFalse(kvstr, v0.has(1));
+      }
+      if (key % 4 == 0 && key / 4 <= ITEMS) {
+        val2 = (LongWritable)v.get(1);
+      } else {
+        assertFalse(kvstr, v.has(2));
+      }
+    } else if (secondTuple) {
+      if (key % 2 == 0 && key / 2 <= ITEMS) {
+        val0 = (IntWritable)v.get(0);
+      } else {
+        assertFalse(kvstr, v.has(0));
+      }
+      TupleWritable v1 = ((TupleWritable)v.get(1));
+      if (key % 3 == 0 && key / 3 <= ITEMS) {
+        val1 = (IntWritable)v1.get(0);
+      } else {
+        assertFalse(kvstr, v1.has(0));
+      }
+      if (key % 4 == 0 && key / 4 <= ITEMS) {
+        val2 = (LongWritable)v1.get(1);
+      } else {
+        assertFalse(kvstr, v1.has(1));
+      }
+    } else {
+      if (key % 2 == 0 && key / 2 <= ITEMS) {
+        val0 = (IntWritable)v.get(0);
+      } else {
+        assertFalse(kvstr, v.has(0));
+      }
+      if (key % 3 == 0 && key / 3 <= ITEMS) {
+        val1 = (IntWritable)v.get(1);
+      } else {
+        assertFalse(kvstr, v.has(1));
+      }
+      if (key % 4 == 0 && key / 4 <= ITEMS) {
+        val2 = (LongWritable)v.get(2);
+      } else {
+        assertFalse(kvstr, v.has(2));
+      }
+    }
+    if (val0 != null) {
+      assertTrue(kvstr, val0.get() == 0);
+    }
+    if (val1 != null) {
+      assertTrue(kvstr, val1.get() == 1);
+    }
+    if (val2 != null) {
+      assertTrue(kvstr, val2.get() == 2);
+    }
+  }
+
+  private void validateInnerKeyValue(IntWritable k, TupleWritable v,
+      int tupleSize, boolean firstTuple, boolean secondTuple) {
+    final String kvstr = "Unexpected tuple: " + stringify(k, v);
+    assertTrue(kvstr, v.size() == tupleSize);
+    int key = k.get();
+    IntWritable val0 = null;
+    IntWritable val1 = null;
+    LongWritable val2 = null;
+    assertTrue(kvstr, key % 2 == 0 && key / 2 <= ITEMS);
+    assertTrue(kvstr, key % 3 == 0 && key / 3 <= ITEMS);
+    assertTrue(kvstr, key % 4 == 0 && key / 4 <= ITEMS);
+    if (firstTuple) {
+      TupleWritable v0 = ((TupleWritable)v.get(0));
+      val0 = (IntWritable)v0.get(0);
+      val1 = (IntWritable)v0.get(1);
+      val2 = (LongWritable)v.get(1);
+    } else if (secondTuple) {
+      val0 = (IntWritable)v.get(0);
+      TupleWritable v1 = ((TupleWritable)v.get(1));
+      val1 = (IntWritable)v1.get(0);
+      val2 = (LongWritable)v1.get(1);
+    } else {
+      val0 = (IntWritable)v.get(0);
+      val1 = (IntWritable)v.get(1);
+      val2 = (LongWritable)v.get(2);
+    }
+    assertTrue(kvstr, val0.get() == 0);
+    assertTrue(kvstr, val1.get() == 1);
+    assertTrue(kvstr, val2.get() == 2);
+  }
+
+  private void validateKeyValue_INNER_IDENTITY(IntWritable k, IntWritable v) {
+    final String kvstr = "Unexpected tuple: " + stringify(k, v);
+    int key = k.get();
+    assertTrue(kvstr, (key % 2 == 0 && key / 2 <= ITEMS));
+    assertTrue(kvstr, v.get() == 0);
+  }
+  
+  @SuppressWarnings("unchecked")
+  public int testFormat(Configuration conf, int tupleSize,
+      boolean firstTuple, boolean secondTuple, TestType ttype)
+      throws Exception {
+    Job job = new Job(conf);
+    CompositeInputFormat format = new CompositeInputFormat();
+    int count = 0;
+    for (InputSplit split : (List<InputSplit>)format.getSplits(job)) {
+      TaskAttemptContext context = 
+        MapReduceTestUtil.createDummyMapTaskAttemptContext(conf);
+      RecordReader reader = format.createRecordReader(
+        split, context);
+      MapContext mcontext = 
+        new MapContext(conf, 
+        context.getTaskAttemptID(), reader, null, null, 
+        MapReduceTestUtil.createDummyReporter(), split);
+      reader.initialize(split, mcontext);
+
+      WritableComparable key = null;
+      Writable value = null;
+      while (reader.nextKeyValue()) {
+        key = (WritableComparable) reader.getCurrentKey();
+        value = (Writable) reader.getCurrentValue();
+        validateKeyValue(key, value, 
+          tupleSize, firstTuple, secondTuple, ttype);
+        count++;
+      }
+    }
+    return count;
+  }
+
+}

Copied: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/join/TestJoinTupleWritable.java (from r807076, hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/join/TestTupleWritable.java)
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/join/TestJoinTupleWritable.java?p2=hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/join/TestJoinTupleWritable.java&p1=hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/join/TestTupleWritable.java&r1=807076&r2=807096&rev=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/join/TestTupleWritable.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/join/TestJoinTupleWritable.java Mon Aug 24 06:19:21 2009
@@ -15,14 +15,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.mapred.join;
+package org.apache.hadoop.mapreduce.lib.join;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
-import java.io.DataOutput;
 import java.io.DataOutputStream;
-import java.io.IOException;
 import java.util.Arrays;
 import java.util.Random;
 
@@ -35,9 +33,8 @@
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
 
-public class TestTupleWritable extends TestCase {
+public class TestJoinTupleWritable extends TestCase {
 
   private TupleWritable makeTuple(Writable[] writs) {
     Writable[] sub1 = { writs[1], writs[2] };
@@ -178,7 +175,8 @@
     TupleWritable dTuple = new TupleWritable();
     dTuple.readFields(new DataInputStream(in));
     assertTrue("Failed to write/read tuple", sTuple.equals(dTuple));
-    assertEquals("All tuple data has not been read from the stream",-1,in.read());
+    assertEquals("All tuple data has not been read from the stream", 
+      -1, in.read());
   }
   
   public void testWideWritable2() throws Exception {
@@ -195,7 +193,8 @@
     TupleWritable dTuple = new TupleWritable();
     dTuple.readFields(new DataInputStream(in));
     assertTrue("Failed to write/read tuple", sTuple.equals(dTuple));
-    assertEquals("All tuple data has not been read from the stream",-1,in.read());
+    assertEquals("All tuple data has not been read from the stream", 
+      -1, in.read());
   }
   
   /**
@@ -218,7 +217,8 @@
     TupleWritable dTuple = new TupleWritable();
     dTuple.readFields(new DataInputStream(in));
     assertTrue("Failed to write/read tuple", sTuple.equals(dTuple));
-    assertEquals("All tuple data has not been read from the stream",-1,in.read());
+    assertEquals("All tuple data has not been read from the stream", 
+      -1, in.read());
   }
   
   public void testWideTuple() throws Exception {
@@ -236,7 +236,8 @@
         assertTrue(has);
       }
       else {
-        assertFalse("Tuple position is incorrectly labelled as set: " + pos, has);
+        assertFalse("Tuple position is incorrectly labelled as set: " + pos,
+          has);
       }
     }
   }
@@ -256,7 +257,8 @@
         assertTrue(has);
       }
       else {
-        assertFalse("Tuple position is incorrectly labelled as set: " + pos, has);
+        assertFalse("Tuple position is incorrectly labelled as set: " + pos,
+          has);
       }
     }
   }
@@ -279,97 +281,9 @@
         assertTrue(has);
       }
       else {
-        assertFalse("Tuple position is incorrectly labelled as set: " + pos, has);
+        assertFalse("Tuple position is incorrectly labelled as set: " + pos,
+          has);
       }
     }
   }
-  
-  /**
-   * Tests compatibility with pre-0.21 versions of TupleWritable
-   */
-  public void testPreVersion21Compatibility() throws Exception {
-    Writable[] manyWrits = makeRandomWritables(64);
-    PreVersion21TupleWritable oldTuple = new PreVersion21TupleWritable(manyWrits);
-    
-    for (int i =0; i<manyWrits.length; i++) {
-      if (i % 3 == 0) {
-        oldTuple.setWritten(i);
-      }
-    }
-    ByteArrayOutputStream out = new ByteArrayOutputStream();
-    oldTuple.write(new DataOutputStream(out));
-    ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
-    TupleWritable dTuple = new TupleWritable();
-    dTuple.readFields(new DataInputStream(in));
-    assertTrue("Tuple writable is unable to read pre-0.21 versions of TupleWritable", oldTuple.isCompatible(dTuple));
-    assertEquals("All tuple data has not been read from the stream",-1,in.read());
-  }
-  
-  public void testPreVersion21CompatibilityEmptyTuple() throws Exception {
-    Writable[] manyWrits = new Writable[0];
-    PreVersion21TupleWritable oldTuple = new PreVersion21TupleWritable(manyWrits);
-    // don't set any values written
-    ByteArrayOutputStream out = new ByteArrayOutputStream();
-    oldTuple.write(new DataOutputStream(out));
-    ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
-    TupleWritable dTuple = new TupleWritable();
-    dTuple.readFields(new DataInputStream(in));
-    assertTrue("Tuple writable is unable to read pre-0.21 versions of TupleWritable", oldTuple.isCompatible(dTuple));
-    assertEquals("All tuple data has not been read from the stream",-1,in.read());
-  }
-  
-  /**
-   * Writes to the DataOutput stream in the same way as pre-0.21 versions of
-   * {@link TupleWritable#write(DataOutput)}
-   */
-  private static class PreVersion21TupleWritable {
-    
-    private Writable[] values;
-    private long written = 0L;
-
-    private PreVersion21TupleWritable(Writable[] vals) {
-      written = 0L;
-      values = vals;
-    }
-        
-    private void setWritten(int i) {
-      written |= 1L << i;
-    }
-
-    private boolean has(int i) {
-      return 0 != ((1L << i) & written);
-    }
-    
-    private void write(DataOutput out) throws IOException {
-      WritableUtils.writeVInt(out, values.length);
-      WritableUtils.writeVLong(out, written);
-      for (int i = 0; i < values.length; ++i) {
-        Text.writeString(out, values[i].getClass().getName());
-      }
-      for (int i = 0; i < values.length; ++i) {
-        if (has(i)) {
-          values[i].write(out);
-        }
-      }
-    }
-    
-    public int size() {
-      return values.length;
-    }
-    
-    public boolean isCompatible(TupleWritable that) {
-      if (this.size() != that.size()) {
-        return false;
-      }      
-      for (int i = 0; i < values.length; ++i) {
-        if (has(i)!=that.has(i)) {
-          return false;
-        }
-        if (has(i) && !values[i].equals(that.get(i))) {
-          return false;
-        }
-      }
-      return true;
-    }
-  }
 }

Copied: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/join/TestWrappedRRClassloader.java (from r807076, hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/join/TestWrappedRecordReaderClassloader.java)
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/join/TestWrappedRRClassloader.java?p2=hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/join/TestWrappedRRClassloader.java&p1=hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/join/TestWrappedRecordReaderClassloader.java&r1=807076&r2=807096&rev=807096&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/join/TestWrappedRecordReaderClassloader.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/join/TestWrappedRRClassloader.java Mon Aug 24 06:19:21 2009
@@ -15,144 +15,71 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.mapred.join;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
+package org.apache.hadoop.mapreduce.lib.join;
 
 import junit.framework.TestCase;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobConfigurable;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil.Fake_RR;
 
-public class TestWrappedRecordReaderClassloader extends TestCase {
+public class TestWrappedRRClassloader extends TestCase {
   /**
-   * Tests the class loader set by {@link JobConf#setClassLoader(ClassLoader)}
+   * Tests the class loader set by 
+   * {@link Configuration#setClassLoader(ClassLoader)}
    * is inherited by any {@link WrappedRecordReader}s created by
    * {@link CompositeRecordReader}
    */
   public void testClassLoader() throws Exception {
-    JobConf job = new JobConf();
+    Configuration conf = new Configuration();
     Fake_ClassLoader classLoader = new Fake_ClassLoader();
-    job.setClassLoader(classLoader);
-    assertTrue(job.getClassLoader() instanceof Fake_ClassLoader);
+    conf.setClassLoader(classLoader);
+    assertTrue(conf.getClassLoader() instanceof Fake_ClassLoader);
 
-    FileSystem fs = FileSystem.get(job);
+    FileSystem fs = FileSystem.get(conf);
     Path testdir = new Path(System.getProperty("test.build.data", "/tmp"))
         .makeQualified(fs);
 
     Path base = new Path(testdir, "/empty");
     Path[] src = { new Path(base, "i0"), new Path("i1"), new Path("i2") };
-    job.set("mapred.join.expr", CompositeInputFormat.compose("outer",
-        IF_ClassLoaderChecker.class, src));
+    conf.set(CompositeInputFormat.JOIN_EXPR, 
+      CompositeInputFormat.compose("outer", IF_ClassLoaderChecker.class, src));
 
-    CompositeInputFormat<NullWritable> inputFormat = new CompositeInputFormat<NullWritable>();
-    inputFormat.getRecordReader(inputFormat.getSplits(job, 1)[0], job,
-        Reporter.NULL);
+    CompositeInputFormat<NullWritable> inputFormat = 
+      new CompositeInputFormat<NullWritable>();
+    // create dummy TaskAttemptID
+    TaskAttemptID tid = new TaskAttemptID("jt", 1, TaskType.MAP, 0, 0);
+    conf.set("mapred.task.id", tid.toString());
+    inputFormat.createRecordReader(inputFormat.getSplits(new Job(conf)).get(0), 
+      new TaskAttemptContext(conf, tid));
   }
 
   public static class Fake_ClassLoader extends ClassLoader {
   }
 
-  public static class IF_ClassLoaderChecker<K, V> implements InputFormat<K, V>,
-      JobConfigurable {
-
-    public static class FakeSplit implements InputSplit {
-      public void write(DataOutput out) throws IOException {
-      }
-
-      public void readFields(DataInput in) throws IOException {
-      }
-
-      public long getLength() {
-        return 0L;
-      }
-
-      public String[] getLocations() {
-        return new String[0];
-      }
-    }
-
-    public static void setKeyClass(JobConf job, Class<?> k) {
-      job.setClass("test.fakeif.keyclass", k, WritableComparable.class);
-    }
-
-    public static void setValClass(JobConf job, Class<?> v) {
-      job.setClass("test.fakeif.valclass", v, Writable.class);
-    }
-
-    protected Class<? extends K> keyclass;
-    protected Class<? extends V> valclass;
-
-    @SuppressWarnings("unchecked")
-    public void configure(JobConf job) {
-      keyclass = (Class<? extends K>) job.getClass("test.fakeif.keyclass",
-          NullWritable.class, WritableComparable.class);
-      valclass = (Class<? extends V>) job.getClass("test.fakeif.valclass",
-          NullWritable.class, WritableComparable.class);
-    }
+  public static class IF_ClassLoaderChecker<K, V> 
+      extends MapReduceTestUtil.Fake_IF<K, V> {
 
     public IF_ClassLoaderChecker() {
     }
 
-    public InputSplit[] getSplits(JobConf conf, int splits) {
-      return new InputSplit[] { new FakeSplit() };
-    }
-
-    public RecordReader<K, V> getRecordReader(InputSplit ignored, JobConf job,
-        Reporter reporter) {
-      return new RR_ClassLoaderChecker<K, V>(job);
+    public RecordReader<K, V> createRecordReader(InputSplit ignored, 
+        TaskAttemptContext context) {
+      return new RR_ClassLoaderChecker<K, V>(context.getConfiguration());
     }
   }
 
-  public static class RR_ClassLoaderChecker<K, V> implements RecordReader<K, V> {
-    private Class<? extends K> keyclass;
-    private Class<? extends V> valclass;
+  public static class RR_ClassLoaderChecker<K, V> extends Fake_RR<K, V> {
 
     @SuppressWarnings("unchecked")
-    public RR_ClassLoaderChecker(JobConf job) {
+    public RR_ClassLoaderChecker(Configuration conf) {
       assertTrue("The class loader has not been inherited from "
           + CompositeRecordReader.class.getSimpleName(),
-          job.getClassLoader() instanceof Fake_ClassLoader);
-
-      keyclass = (Class<? extends K>) job.getClass("test.fakeif.keyclass",
-          NullWritable.class, WritableComparable.class);
-      valclass = (Class<? extends V>) job.getClass("test.fakeif.valclass",
-          NullWritable.class, WritableComparable.class);
-    }
-
-    public boolean next(K key, V value) throws IOException {
-      return false;
-    }
-
-    public K createKey() {
-      return ReflectionUtils.newInstance(keyclass, null);
-    }
-
-    public V createValue() {
-      return ReflectionUtils.newInstance(valclass, null);
-    }
-
-    public long getPos() throws IOException {
-      return 0L;
-    }
-
-    public void close() throws IOException {
-    }
+          conf.getClassLoader() instanceof Fake_ClassLoader);
 
-    public float getProgress() throws IOException {
-      return 0.0f;
     }
   }
 }


