hadoop-mapreduce-commits mailing list archives

From: ste...@apache.org
Subject: svn commit: r808036 [3/4] - in /hadoop/mapreduce/branches/MAPREDUCE-233: ./ .eclipse.templates/ conf/ ivy/ lib/ src/c++/ src/contrib/ src/contrib/capacity-scheduler/ src/contrib/data_join/ src/contrib/dynamic-scheduler/ src/contrib/eclipse-plugin/ src/...
Date: Wed, 26 Aug 2009 15:01:34 GMT
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/ComposableInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/ComposableInputFormat.java?rev=808036&r1=808035&r2=808036&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/ComposableInputFormat.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/ComposableInputFormat.java Wed Aug 26 15:01:29 2009
@@ -30,7 +30,11 @@
 /**
  * Refinement of InputFormat requiring implementors to provide
  * ComposableRecordReader instead of RecordReader.
+ * 
+ * @deprecated Use 
+ * {@link org.apache.hadoop.mapreduce.lib.join.ComposableInputFormat} instead
  */
+@Deprecated
 public interface ComposableInputFormat<K extends WritableComparable,
                                        V extends Writable>
     extends InputFormat<K,V> {
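
Every join hunk in this message follows the same pattern: a class in org.apache.hadoop.mapred.join gains @Deprecated and a Javadoc pointer to its replacement in org.apache.hadoop.mapreduce.lib.join. For reference, a minimal migration sketch against the new package (hypothetical user code, not part of this commit; the "mapreduce.join.expr" key and compose() signature are assumed from the new API):

  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
  import org.apache.hadoop.mapreduce.lib.join.CompositeInputFormat;

  public class JoinMigrationSketch {
    public static void configure(Job job) {
      // Use the new CompositeInputFormat in place of the deprecated
      // org.apache.hadoop.mapred.join.CompositeInputFormat.
      job.setInputFormatClass(CompositeInputFormat.class);
      job.getConfiguration().set("mapreduce.join.expr",
          CompositeInputFormat.compose("inner", SequenceFileInputFormat.class,
              new Path("/data/a"), new Path("/data/b"))); // hypothetical paths
    }
  }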

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/ComposableRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/ComposableRecordReader.java?rev=808036&r1=808035&r2=808036&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/ComposableRecordReader.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/ComposableRecordReader.java Wed Aug 26 15:01:29 2009
@@ -26,7 +26,10 @@
 
 /**
  * Additional operations required of a RecordReader to participate in a join.
+ * @deprecated Use 
+ * {@link org.apache.hadoop.mapreduce.lib.join.ComposableRecordReader} instead
  */
+@Deprecated
 public interface ComposableRecordReader<K extends WritableComparable,
                                  V extends Writable>
     extends RecordReader<K,V>, Comparable<ComposableRecordReader<K,?>> {

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/CompositeInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/CompositeInputFormat.java?rev=808036&r1=808035&r2=808036&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/CompositeInputFormat.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/CompositeInputFormat.java Wed Aug 26 15:01:29 2009
@@ -44,7 +44,10 @@
  * in the join.
  * @see JoinRecordReader
  * @see MultiFilterRecordReader
+ * @deprecated Use 
+ * {@link org.apache.hadoop.mapreduce.lib.join.CompositeInputFormat} instead
  */
+@Deprecated
 public class CompositeInputFormat<K extends WritableComparable>
       implements ComposableInputFormat<K,TupleWritable> {
 

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/CompositeInputSplit.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/CompositeInputSplit.java?rev=808036&r1=808035&r2=808036&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/CompositeInputSplit.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/CompositeInputSplit.java Wed Aug 26 15:01:29 2009
@@ -31,7 +31,11 @@
 /**
  * This InputSplit contains a set of child InputSplits. Any InputSplit inserted
  * into this collection must have a public default constructor.
+ * 
+ * @deprecated Use 
+ * {@link org.apache.hadoop.mapreduce.lib.join.CompositeInputSplit} instead
  */
+@Deprecated
 public class CompositeInputSplit implements InputSplit {
 
   private int fill = 0;

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/CompositeRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/CompositeRecordReader.java?rev=808036&r1=808035&r2=808036&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/CompositeRecordReader.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/CompositeRecordReader.java Wed Aug 26 15:01:29 2009
@@ -34,7 +34,11 @@
 /**
  * A RecordReader that can effect joins of RecordReaders sharing a common key
  * type and partitioning.
+ * 
+ * @deprecated Use 
+ * {@link org.apache.hadoop.mapreduce.lib.join.CompositeRecordReader} instead
  */
+@Deprecated
 public abstract class CompositeRecordReader<
     K extends WritableComparable, // key type
     V extends Writable,           // accepts RecordReader<K,V> as children

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/InnerJoinRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/InnerJoinRecordReader.java?rev=808036&r1=808035&r2=808036&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/InnerJoinRecordReader.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/InnerJoinRecordReader.java Wed Aug 26 15:01:29 2009
@@ -26,7 +26,11 @@
 
 /**
  * Full inner join.
+ * 
+ * @deprecated Use 
+ * {@link org.apache.hadoop.mapreduce.lib.join.InnerJoinRecordReader} instead.
  */
+@Deprecated
 public class InnerJoinRecordReader<K extends WritableComparable>
     extends JoinRecordReader<K> {
 

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/JoinRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/JoinRecordReader.java?rev=808036&r1=808035&r2=808036&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/JoinRecordReader.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/JoinRecordReader.java Wed Aug 26 15:01:29 2009
@@ -29,7 +29,10 @@
 
 /**
  * Base class for Composite joins returning Tuples of arbitrary Writables.
+ * @deprecated Use 
+ * {@link org.apache.hadoop.mapreduce.lib.join.JoinRecordReader} instead
  */
+@Deprecated
 public abstract class JoinRecordReader<K extends WritableComparable>
     extends CompositeRecordReader<K,Writable,TupleWritable>
     implements ComposableRecordReader<K,TupleWritable> {

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/MultiFilterRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/MultiFilterRecordReader.java?rev=808036&r1=808035&r2=808036&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/MultiFilterRecordReader.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/MultiFilterRecordReader.java Wed Aug 26 15:01:29 2009
@@ -32,7 +32,10 @@
 /**
  * Base class for Composite join returning values derived from multiple
  * sources, but generally not tuples.
+ * @deprecated Use 
+ * {@link org.apache.hadoop.mapreduce.lib.join.MultiFilterRecordReader} instead
  */
+@Deprecated
 public abstract class MultiFilterRecordReader<K extends WritableComparable,
                                               V extends Writable>
     extends CompositeRecordReader<K,V,V>

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/OuterJoinRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/OuterJoinRecordReader.java?rev=808036&r1=808035&r2=808036&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/OuterJoinRecordReader.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/OuterJoinRecordReader.java Wed Aug 26 15:01:29 2009
@@ -26,7 +26,11 @@
 
 /**
  * Full outer join.
+ * 
+ * @deprecated Use 
+ * {@link org.apache.hadoop.mapreduce.lib.join.OuterJoinRecordReader} instead
  */
+@Deprecated
 public class OuterJoinRecordReader<K extends WritableComparable>
     extends JoinRecordReader<K> {
 

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/OverrideRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/OverrideRecordReader.java?rev=808036&r1=808035&r2=808036&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/OverrideRecordReader.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/OverrideRecordReader.java Wed Aug 26 15:01:29 2009
@@ -32,7 +32,10 @@
  * For example, <tt>override(S1,S2,S3)</tt> will prefer values
  * from S3 over S2, and values from S2 over S1 for all keys
  * emitted from all sources.
+ * @deprecated Use 
+ * {@link org.apache.hadoop.mapreduce.lib.join.OverrideRecordReader} instead
  */
+@Deprecated
 public class OverrideRecordReader<K extends WritableComparable,
                                   V extends Writable>
     extends MultiFilterRecordReader<K,V> {

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/Parser.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/Parser.java?rev=808036&r1=808035&r2=808036&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/Parser.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/Parser.java Wed Aug 26 15:01:29 2009
@@ -59,7 +59,9 @@
  * straightforward. One need only override the relevant method(s) (usually only
  * {@link CompositeRecordReader#combine}) and include a property to map its
  * value to an identifier in the parser.
+ * @deprecated Use {@link org.apache.hadoop.mapreduce.lib.join.Parser} instead
  */
+@Deprecated
 public class Parser {
   public enum TType { CIF, IDENT, COMMA, LPAREN, RPAREN, QUOT, NUM, }
 

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/ResetableIterator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/ResetableIterator.java?rev=808036&r1=808035&r2=808036&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/ResetableIterator.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/ResetableIterator.java Wed Aug 26 15:01:29 2009
@@ -17,77 +17,22 @@
  */
 package org.apache.hadoop.mapred.join;
 
-import java.io.IOException;
-
 import org.apache.hadoop.io.Writable;
 
 /**
  * This defines an interface to a stateful Iterator that can replay elements
  * added to it directly.
  * Note that this does not extend {@link java.util.Iterator}.
+ * 
+ * @deprecated Use 
+ * {@link org.apache.hadoop.mapreduce.lib.join.ResetableIterator} instead
  */
-public interface ResetableIterator<T extends Writable> {
+@Deprecated
+public interface ResetableIterator<T extends Writable> 
+    extends org.apache.hadoop.mapreduce.lib.join.ResetableIterator<T> {
 
   public static class EMPTY<U extends Writable>
-    implements ResetableIterator<U> {
-    public boolean hasNext() { return false; }
-    public void reset() { }
-    public void close() throws IOException { }
-    public void clear() { }
-    public boolean next(U val) throws IOException {
-      return false;
-    }
-    public boolean replay(U val) throws IOException {
-      return false;
-    }
-    public void add(U item) throws IOException {
-      throw new UnsupportedOperationException();
-    }
+      extends org.apache.hadoop.mapreduce.lib.join.ResetableIterator.EMPTY<U>
+      implements ResetableIterator<U> {
   }
-
-  /**
-   * True if a call to next may return a value. This is permitted false
-   * positives, but not false negatives.
-   */
-  public boolean hasNext();
-
-  /**
-   * Assign next value to actual.
-   * It is required that elements added to a ResetableIterator be returned in
-   * the same order after a call to {@link #reset} (FIFO).
-   *
-   * Note that a call to this may fail for nested joins (i.e. more elements
-   * available, but none satisfying the constraints of the join)
-   */
-  public boolean next(T val) throws IOException;
-
-  /**
-   * Assign last value returned to actual.
-   */
-  public boolean replay(T val) throws IOException;
-
-  /**
-   * Set iterator to return to the start of its range. Must be called after
-   * calling {@link #add} to avoid a ConcurrentModificationException.
-   */
-  public void reset();
-
-  /**
-   * Add an element to the collection of elements to iterate over.
-   */
-  public void add(T item) throws IOException;
-
-  /**
-   * Close datasources and release resources. Calling methods on the iterator
-   * after calling close has undefined behavior.
-   */
-  // XXX is this necessary?
-  public void close() throws IOException;
-
-  /**
-   * Close datasources, but do not release internal resources. Calling this
-   * method should permit the object to be reused with a different datasource.
-   */
-  public void clear();
-
 }

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/StreamBackedIterator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/StreamBackedIterator.java?rev=808036&r1=808035&r2=808036&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/StreamBackedIterator.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/StreamBackedIterator.java Wed Aug 26 15:01:29 2009
@@ -17,83 +17,17 @@
  */
 package org.apache.hadoop.mapred.join;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
 import org.apache.hadoop.io.Writable;
 
 /**
  * This class provides an implementation of ResetableIterator. This
  * implementation uses a byte array to store elements added to it.
+ * 
+ * @deprecated Use 
+ * {@link org.apache.hadoop.mapreduce.lib.join.StreamBackedIterator} instead
  */
+@Deprecated
 public class StreamBackedIterator<X extends Writable>
+    extends org.apache.hadoop.mapreduce.lib.join.StreamBackedIterator<X>
     implements ResetableIterator<X> {
-
-  private static class ReplayableByteInputStream extends ByteArrayInputStream {
-    public ReplayableByteInputStream(byte[] arr) {
-      super(arr);
-    }
-    public void resetStream() {
-      mark = 0;
-      reset();
-    }
-  }
-
-  private ByteArrayOutputStream outbuf = new ByteArrayOutputStream();
-  private DataOutputStream outfbuf = new DataOutputStream(outbuf);
-  private ReplayableByteInputStream inbuf;
-  private DataInputStream infbuf;
-
-  public StreamBackedIterator() { }
-
-  public boolean hasNext() {
-    return infbuf != null && inbuf.available() > 0;
-  }
-
-  public boolean next(X val) throws IOException {
-    if (hasNext()) {
-      inbuf.mark(0);
-      val.readFields(infbuf);
-      return true;
-    }
-    return false;
-  }
-
-  public boolean replay(X val) throws IOException {
-    inbuf.reset();
-    if (0 == inbuf.available())
-      return false;
-    val.readFields(infbuf);
-    return true;
-  }
-
-  public void reset() {
-    if (null != outfbuf) {
-      inbuf = new ReplayableByteInputStream(outbuf.toByteArray());
-      infbuf =  new DataInputStream(inbuf);
-      outfbuf = null;
-    }
-    inbuf.resetStream();
-  }
-
-  public void add(X item) throws IOException {
-    item.write(outfbuf);
-  }
-
-  public void close() throws IOException {
-    if (null != infbuf)
-      infbuf.close();
-    if (null != outfbuf)
-      outfbuf.close();
-  }
-
-  public void clear() {
-    if (null != inbuf)
-      inbuf.resetStream();
-    outbuf.reset();
-    outfbuf = new DataOutputStream(outbuf);
-  }
 }
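
The bodies of ResetableIterator and StreamBackedIterator move to org.apache.hadoop.mapreduce.lib.join; the classes above become thin deprecated subtypes. The contract documented in the removed Javadoc is unchanged: elements added are replayed in FIFO order, and reset() must follow add(). A small sketch of that contract against the new implementation (hypothetical usage, not from this commit):

  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.lib.join.StreamBackedIterator;

  public class IteratorContractSketch {
    public static void main(String[] args) throws Exception {
      StreamBackedIterator<Text> it = new StreamBackedIterator<Text>();
      it.add(new Text("a"));
      it.add(new Text("b"));
      it.reset();              // required after add(), per the contract
      Text val = new Text();
      while (it.next(val)) {   // FIFO replay: "a", then "b"
        System.out.println(val);
      }
      it.replay(val);          // re-reads the last value returned
      it.close();
    }
  }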

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/TupleWritable.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/TupleWritable.java?rev=808036&r1=808035&r2=808036&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/TupleWritable.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/TupleWritable.java Wed Aug 26 15:01:29 2009
@@ -18,16 +18,7 @@
 
 package org.apache.hadoop.mapred.join;
 
-import java.io.DataOutput;
-import java.io.DataInput;
-import java.io.IOException;
-import java.util.BitSet;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
 
 /**
  * Writable type storing multiple {@link org.apache.hadoop.io.Writable}s.
@@ -40,17 +31,19 @@
  * incompatible with, but contrary to the general case.
  *
  * @see org.apache.hadoop.io.Writable
+ * 
+ * @deprecated Use 
+ * {@link org.apache.hadoop.mapreduce.lib.join.TupleWritable} instead
  */
-public class TupleWritable implements Writable, Iterable<Writable> {
-
-  private BitSet written;
-  private Writable[] values;
+@Deprecated
+public class TupleWritable 
+    extends org.apache.hadoop.mapreduce.lib.join.TupleWritable {
 
   /**
    * Create an empty tuple with no allocated storage for writables.
    */
   public TupleWritable() {
-    written = new BitSet(0);
+    super();
   }
 
   /**
@@ -58,148 +51,7 @@
    * &quot;written&quot; values.
    */
   public TupleWritable(Writable[] vals) {
-    written = new BitSet(vals.length);
-    values = vals;
-  }
-
-  /**
-   * Return true if tuple has an element at the position provided.
-   */
-  public boolean has(int i) {
-    return written.get(i);
-  }
-
-  /**
-   * Get ith Writable from Tuple.
-   */
-  public Writable get(int i) {
-    return values[i];
-  }
-
-  /**
-   * The number of children in this Tuple.
-   */
-  public int size() {
-    return values.length;
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  public boolean equals(Object other) {
-    if (other instanceof TupleWritable) {
-      TupleWritable that = (TupleWritable)other;
-      if (!this.written.equals(that.written)) {
-        return false;
-      }
-      for (int i = 0; i < values.length; ++i) {
-        if (!has(i)) continue;
-        if (!values[i].equals(that.get(i))) {
-          return false;
-        }
-      }
-      return true;
-    }
-    return false;
-  }
-
-  public int hashCode() {
-    assert false : "hashCode not designed";
-    return written.hashCode();
-  }
-
-  /**
-   * Return an iterator over the elements in this tuple.
-   * Note that this doesn't flatten the tuple; one may receive tuples
-   * from this iterator.
-   */
-  public Iterator<Writable> iterator() {
-    final TupleWritable t = this;
-    return new Iterator<Writable>() {
-      int bitIndex = written.nextSetBit(0);
-      public boolean hasNext() {
-        return bitIndex >= 0;
-      }
-      public Writable next() {
-        int returnIndex = bitIndex;
-        if (returnIndex < 0)
-          throw new NoSuchElementException();
-        bitIndex = written.nextSetBit(bitIndex+1);
-        return t.get(returnIndex);
-      }
-      public void remove() {
-        if (!written.get(bitIndex)) {
-          throw new IllegalStateException("Attempt to remove non-existent val");
-        }
-        written.clear(bitIndex);
-      }
-    };
-  }
-
-  /**
-   * Convert Tuple to String as in the following.
-   * <tt>[<child1>,<child2>,...,<childn>]</tt>
-   */
-  public String toString() {
-    StringBuffer buf = new StringBuffer("[");
-    for (int i = 0; i < values.length; ++i) {
-      buf.append(has(i) ? values[i].toString() : "");
-      buf.append(",");
-    }
-    if (values.length != 0)
-      buf.setCharAt(buf.length() - 1, ']');
-    else
-      buf.append(']');
-    return buf.toString();
-  }
-
-  // Writable
-
-  /** Writes each Writable to <code>out</code>.
-   * TupleWritable format:
-   * {@code
-   *  <count><type1><type2>...<typen><obj1><obj2>...<objn>
-   * }
-   */
-  public void write(DataOutput out) throws IOException {
-    WritableUtils.writeVInt(out, values.length);
-    writeBitSet(out, values.length, written);
-    for (int i = 0; i < values.length; ++i) {
-      Text.writeString(out, values[i].getClass().getName());
-    }
-    for (int i = 0; i < values.length; ++i) {
-      if (has(i)) {
-        values[i].write(out);
-      }
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @SuppressWarnings("unchecked") // No static typeinfo on Tuples
-  public void readFields(DataInput in) throws IOException {
-    int card = WritableUtils.readVInt(in);
-    values = new Writable[card];
-    readBitSet(in, card, written);
-    Class<? extends Writable>[] cls = new Class[card];
-    try {
-      for (int i = 0; i < card; ++i) {
-        cls[i] = Class.forName(Text.readString(in)).asSubclass(Writable.class);
-      }
-      for (int i = 0; i < card; ++i) {
-          values[i] = cls[i].newInstance();
-        if (has(i)) {
-          values[i].readFields(in);
-        }
-      }
-    } catch (ClassNotFoundException e) {
-      throw (IOException)new IOException("Failed tuple init").initCause(e);
-    } catch (IllegalAccessException e) {
-      throw (IOException)new IOException("Failed tuple init").initCause(e);
-    } catch (InstantiationException e) {
-      throw (IOException)new IOException("Failed tuple init").initCause(e);
-    }
+    super(vals);
   }
 
   /**
@@ -225,64 +77,5 @@
     written.clear();
   }
 
-  /**
-   * Writes the bit set to the stream. The first 64 bit-positions of the bit set
-   * are written as a VLong for backwards-compatibility with older versions of
-   * TupleWritable. All bit-positions >= 64 are encoded as a byte for every 8
-   * bit-positions.
-   */
-  private static final void writeBitSet(DataOutput stream, int nbits, BitSet bitSet)
-      throws IOException {
-    long bits = 0L;
-        
-    int bitSetIndex = bitSet.nextSetBit(0);
-    for (;bitSetIndex >= 0 && bitSetIndex < Long.SIZE;
-            bitSetIndex=bitSet.nextSetBit(bitSetIndex+1)) {
-      bits |= 1L << bitSetIndex;
-    }
-    WritableUtils.writeVLong(stream,bits);
-    
-    if (nbits > Long.SIZE) {
-      bits = 0L;
-      for (int lastWordWritten = 0; bitSetIndex >= 0 && bitSetIndex < nbits; 
-              bitSetIndex = bitSet.nextSetBit(bitSetIndex+1)) {
-        int bitsIndex = bitSetIndex % Byte.SIZE;
-        int word = (bitSetIndex-Long.SIZE) / Byte.SIZE;
-        if (word > lastWordWritten) {
-          stream.writeByte((byte)bits);
-          bits = 0L;
-          for (lastWordWritten++;lastWordWritten<word;lastWordWritten++) {
-            stream.writeByte((byte)bits);
-          }
-        }
-        bits |= 1L << bitsIndex;
-      }
-      stream.writeByte((byte)bits);
-    }
-  }
 
-  /**
-   * Reads a bitset from the stream that has been written with
-   * {@link #writeBitSet(DataOutput, int, BitSet)}.
-   */
-  private static final void readBitSet(DataInput stream, int nbits, 
-      BitSet bitSet) throws IOException {
-    bitSet.clear();
-    long initialBits = WritableUtils.readVLong(stream);
-    long last = 0L;
-    while (0L != initialBits) {
-      last = Long.lowestOneBit(initialBits);
-      initialBits ^= last;
-      bitSet.set(Long.numberOfTrailingZeros(last));
-    }
-    
-    for (int offset=Long.SIZE; offset < nbits; offset+=Byte.SIZE) {
-      byte bits = stream.readByte();
-      while (0 != bits) {
-        last = Long.lowestOneBit(bits);
-        bits ^= last;
-        bitSet.set(Long.numberOfTrailingZeros(last) + offset);
-      }
-    }
-  }
 }
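
TupleWritable likewise becomes a deprecated subclass of org.apache.hadoop.mapreduce.lib.join.TupleWritable, inheriting the wire format described above (<count><type1>...<typen><obj1>...<objn>, with the bitset of written positions encoded as a VLong plus trailing bytes). Consuming join output is unchanged; a hedged sketch of a consumer against the new types (hypothetical user code; the Text key type is an assumption):

  import java.io.IOException;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Mapper;
  import org.apache.hadoop.mapreduce.lib.join.TupleWritable;

  public class JoinConsumerSketch
      extends Mapper<Text, TupleWritable, Text, Text> {
    @Override
    protected void map(Text key, TupleWritable value, Context context)
        throws IOException, InterruptedException {
      // One tuple position per source in the join expression; has(i)
      // guards positions left empty by outer joins.
      for (int i = 0; i < value.size(); ++i) {
        if (value.has(i)) {
          context.write(key, new Text(value.get(i).toString()));
        }
      }
    }
  }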

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/WrappedRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/WrappedRecordReader.java?rev=808036&r1=808035&r2=808036&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/WrappedRecordReader.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/WrappedRecordReader.java Wed Aug 26 15:01:29 2009
@@ -31,7 +31,10 @@
  * This class keeps track of the &quot;head&quot; key-value pair for the
  * provided RecordReader and keeps a store of values matching a key when
  * this source is participating in a join.
+ * @deprecated Use 
+ * {@link org.apache.hadoop.mapreduce.lib.join.WrappedRecordReader} instead
  */
+@Deprecated
 public class WrappedRecordReader<K extends WritableComparable,
                           U extends Writable>
     implements ComposableRecordReader<K,U> {

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/pipes/Submitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/pipes/Submitter.java?rev=808036&r1=808035&r2=808036&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/pipes/Submitter.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/pipes/Submitter.java Wed Aug 26 15:01:29 2009
@@ -25,14 +25,18 @@
 import java.net.URLClassLoader;
 import java.security.AccessController;
 import java.security.PrivilegedAction;
+import java.util.Iterator;
 import java.util.StringTokenizer;
 
-import org.apache.commons.cli2.CommandLine;
-import org.apache.commons.cli2.OptionException;
-import org.apache.commons.cli2.builder.ArgumentBuilder;
-import org.apache.commons.cli2.builder.DefaultOptionBuilder;
-import org.apache.commons.cli2.builder.GroupBuilder;
-import org.apache.commons.cli2.commandline.Parser;
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.OptionGroup;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.Parser;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -323,28 +327,22 @@
    * A command line parser for the CLI-based Pipes job submitter.
    */
   static class CommandLineParser {
-    private DefaultOptionBuilder option = 
-      new DefaultOptionBuilder("-","-", false);
-    private ArgumentBuilder arg = new ArgumentBuilder();
-    private GroupBuilder optionList = new GroupBuilder();
+    private Options options = new Options();
     
     void addOption(String longName, boolean required, String description, 
                    String paramName) {
-      arg.withName(paramName).withMinimum(1).withMaximum(1);
-      optionList.withOption(option.withLongName(longName).
-                                   withArgument(arg.create()).
-                                   withDescription(description).
-                                   withRequired(required).create());
+      Option option = OptionBuilder.withArgName(paramName).hasArgs(1).withDescription(description).isRequired(required).create(longName);
+      options.addOption(option);
     }
     
     void addArgument(String name, boolean required, String description) {
-      arg.withName(name).withMinimum(1).withMaximum(1);
-      optionList.withOption(arg.create());
+      Option option = OptionBuilder.withArgName(name).hasArgs(1).withDescription(description).isRequired(required).create();
+      options.addOption(option);
+
     }
 
     Parser createParser() {
-      Parser result = new Parser();
-      result.setGroup(optionList.create());
+      Parser result = new BasicParser();
       return result;
     }
     
@@ -362,7 +360,7 @@
       System.out.println("  [-writer <class>] // Java RecordWriter");
       System.out.println("  [-program <executable>] // executable URI");
       System.out.println("  [-reduces <num>] // number of reduces");
-      System.out.println("  [-lazyOutput] // createOutputLazily");
+      System.out.println("  [-lazyOutput <true/false>] // createOutputLazily");
       System.out.println();
       GenericOptionsParser.printGenericCommandUsage(System.out);
     }
@@ -373,7 +371,7 @@
                                           JobConf conf, 
                                           Class<InterfaceType> cls
                                          ) throws ClassNotFoundException {
-    return conf.getClassByName((String) cl.getValue(key)).asSubclass(cls);
+    return conf.getClassByName((String) cl.getOptionValue(key)).asSubclass(cls);
   }
 
   @Override
@@ -406,64 +404,64 @@
     try {
       
       GenericOptionsParser genericParser = new GenericOptionsParser(getConf(), args);
-      CommandLine results = parser.parse(genericParser.getRemainingArgs());
+      CommandLine results = parser.parse(cli.options, genericParser.getRemainingArgs());
       
       JobConf job = new JobConf(getConf());
       
-      if (results.hasOption("-input")) {
+      if (results.hasOption("input")) {
         FileInputFormat.setInputPaths(job, 
-                          (String) results.getValue("-input"));
+                          (String) results.getOptionValue("input"));
       }
-      if (results.hasOption("-output")) {
+      if (results.hasOption("output")) {
         FileOutputFormat.setOutputPath(job, 
-          new Path((String) results.getValue("-output")));
+          new Path((String) results.getOptionValue("output")));
       }
-      if (results.hasOption("-jar")) {
-        job.setJar((String) results.getValue("-jar"));
+      if (results.hasOption("jar")) {
+        job.setJar((String) results.getOptionValue("jar"));
       }
-      if (results.hasOption("-inputformat")) {
+      if (results.hasOption("inputformat")) {
         setIsJavaRecordReader(job, true);
-        job.setInputFormat(getClass(results, "-inputformat", job,
+        job.setInputFormat(getClass(results, "inputformat", job,
                                      InputFormat.class));
       }
-      if (results.hasOption("-javareader")) {
+      if (results.hasOption("javareader")) {
         setIsJavaRecordReader(job, true);
       }
-      if (results.hasOption("-map")) {
+      if (results.hasOption("map")) {
         setIsJavaMapper(job, true);
-        job.setMapperClass(getClass(results, "-map", job, Mapper.class));
+        job.setMapperClass(getClass(results, "map", job, Mapper.class));
       }
-      if (results.hasOption("-partitioner")) {
-        job.setPartitionerClass(getClass(results, "-partitioner", job,
+      if (results.hasOption("partitioner")) {
+        job.setPartitionerClass(getClass(results, "partitioner", job,
                                           Partitioner.class));
       }
-      if (results.hasOption("-reduce")) {
+      if (results.hasOption("reduce")) {
         setIsJavaReducer(job, true);
-        job.setReducerClass(getClass(results, "-reduce", job, Reducer.class));
+        job.setReducerClass(getClass(results, "reduce", job, Reducer.class));
       }
-      if (results.hasOption("-reduces")) {
+      if (results.hasOption("reduces")) {
         job.setNumReduceTasks(Integer.parseInt((String) 
-                                                results.getValue("-reduces")));
+                                           results.getOptionValue("reduces")));
       }
-      if (results.hasOption("-writer")) {
+      if (results.hasOption("writer")) {
         setIsJavaRecordWriter(job, true);
-        job.setOutputFormat(getClass(results, "-writer", job, 
+        job.setOutputFormat(getClass(results, "writer", job, 
                                       OutputFormat.class));
       }
       
-      if (results.hasOption("-lazyOutput")) {
-        if (Boolean.parseBoolean((String)results.getValue("-lazyOutput"))) {
+      if (results.hasOption("lazyOutput")) {
+        if (Boolean.parseBoolean((String)results.getOptionValue("lazyOutput"))) {
           LazyOutputFormat.setOutputFormatClass(job,
               job.getOutputFormat().getClass());
         }
       }
       
-      if (results.hasOption("-program")) {
-        setExecutable(job, (String) results.getValue("-program"));
+      if (results.hasOption("program")) {
+        setExecutable(job, (String) results.getOptionValue("program"));
       }
-      if (results.hasOption("-jobconf")) {
+      if (results.hasOption("jobconf")) {
         LOG.warn("-jobconf option is deprecated, please use -D instead.");
-        String options = (String)results.getValue("-jobconf");
+        String options = (String)results.getOptionValue("jobconf");
         StringTokenizer tokenizer = new StringTokenizer(options, ",");
         while (tokenizer.hasMoreTokens()) {
           String keyVal = tokenizer.nextToken().trim();
@@ -491,7 +489,8 @@
       
       runJob(job);
       return 0;
-    } catch (OptionException oe) {
+    } catch (ParseException pe) {
+      LOG.info("Error : " + pe);
       cli.printUsage();
       return 1;
     }
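
The Submitter changes swap the retired commons-cli2 builders for commons-cli 1.x: a single Options registry, OptionBuilder-created Options, a BasicParser, and hasOption()/getOptionValue() lookups without the leading dash. The shape of the new code, reduced to a self-contained sketch (hypothetical option names):

  import org.apache.commons.cli.BasicParser;
  import org.apache.commons.cli.CommandLine;
  import org.apache.commons.cli.OptionBuilder;
  import org.apache.commons.cli.Options;
  import org.apache.commons.cli.ParseException;

  public class CliSketch {
    public static void main(String[] args) throws ParseException {
      Options options = new Options();
      options.addOption(OptionBuilder.withArgName("path").hasArgs(1)
          .withDescription("input path").isRequired(false).create("input"));

      CommandLine results = new BasicParser().parse(options, args);
      if (results.hasOption("input")) {
        System.out.println("input = " + results.getOptionValue("input"));
      }
    }
  }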

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/Counters.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/Counters.java?rev=808036&r1=808035&r2=808036&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/Counters.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/Counters.java Wed Aug 26 15:01:29 2009
@@ -37,11 +37,7 @@
   }
 
   public Counter findCounter(String groupName, String counterName) {
-    CounterGroup grp = groups.get(groupName);
-    if (grp == null) {
-      grp = new CounterGroup(groupName);
-      groups.put(groupName, grp);
-    }
+    CounterGroup grp = getGroup(groupName);
     return grp.findCounter(counterName);
   }
 
@@ -78,7 +74,12 @@
    * with the specified name.
    */
   public synchronized CounterGroup getGroup(String groupName) {
-    return groups.get(groupName);
+    CounterGroup grp = groups.get(groupName);
+    if (grp == null) {
+      grp = new CounterGroup(groupName);
+      groups.put(groupName, grp);
+    }
+    return grp;
   }
 
   /**
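
The Counters change inverts the null handling: getGroup() now creates a missing group on demand, and findCounter() simply delegates to it, so neither returns null for an unknown group name. A hedged usage sketch (assuming the 0.21-era org.apache.hadoop.mapreduce.Counters API):

  import org.apache.hadoop.mapreduce.Counters;

  public class CountersSketch {
    public static void main(String[] args) {
      Counters counters = new Counters();
      counters.findCounter("my-group", "records").increment(1); // group auto-created
      long n = counters.getGroup("my-group").findCounter("records").getValue();
      System.out.println(n); // 1
    }
  }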

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java?rev=808036&r1=808035&r2=808036&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java Wed Aug 26 15:01:29 2009
@@ -18,12 +18,12 @@
 
 package org.apache.hadoop.mapreduce.filecache;
 
-import org.apache.commons.logging.*;
 import java.io.*;
 import java.util.*;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.util.*;
 import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FileSystem;
 
 import java.net.URI;
 
@@ -35,7 +35,7 @@
  * </p>
  * 
  * <p>Applications specify the files, via urls (hdfs:// or http://) to be cached 
- * via the org.apache.hadoop.mapred.JobConf. The
+ * via the {@link org.apache.hadoop.mapred.JobConf}. The
  * <code>DistributedCache</code> assumes that the files specified via urls are
  * already present on the {@link FileSystem} at the path specified by the url
  * and are accessible by every machine in the cluster.</p>
@@ -82,8 +82,8 @@
  *     DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz", job);
  *     DistributedCache.addCacheArchive(new URI("/myapp/mytargz.tar.gz", job);
  *     
- *     3. Use the cached files in the org.apache.hadoop.mapred.Mapper
- *     or org.apache.hadoop.mapred.Reducer:
+ *     3. Use the cached files in the {@link org.apache.hadoop.mapred.Mapper}
+ *     or {@link org.apache.hadoop.mapred.Reducer}:
  *     
  *     public static class MapClass extends MapReduceBase  
  *     implements Mapper&lt;K, V, K, V&gt; {
@@ -108,20 +108,24 @@
  *     }
  *     
  * </pre></blockquote></p>
- * 
+ *
+ * It is also very common to use the DistributedCache by using
+ * {@link org.apache.hadoop.util.GenericOptionsParser}.
+ *
+ * This class includes methods that should be used by users
+ * (specifically those mentioned in the example above, as well
+ * as {@link DistributedCache#addArchiveToClassPath(Path, Configuration)}),
+ * as well as methods intended for use by the MapReduce framework
+ * (e.g., {@link org.apache.hadoop.mapred.JobClient}).  For implementation
+ * details, see {@link TrackerDistributedCacheManager} and 
+ * {@link TaskDistributedCacheManager}.
+ *
+ * @see TrackerDistributedCacheManager
+ * @see TaskDistributedCacheManager
+ * @see org.apache.hadoop.mapred.JobConf
+ * @see org.apache.hadoop.mapred.JobClient
  */
 public class DistributedCache {
-  // cacheID to cacheStatus mapping
-  private static TreeMap<String, CacheStatus> cachedArchives = new TreeMap<String, CacheStatus>();
-  
-  private static TreeMap<Path, Long> baseDirSize = new TreeMap<Path, Long>();
-  
-  // default total cache size
-  private static final long DEFAULT_CACHE_SIZE = 10737418240L;
-
-  private static final Log LOG =
-    LogFactory.getLog(DistributedCache.class);
-  
   /**
    * Get the locally cached file or archive; it could either be 
    * previously cached (and valid) or copy it from the {@link FileSystem} now.
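
The DistributedCache rewrite moves the localization machinery into TrackerDistributedCacheManager and keeps this class as a facade, with the new Javadoc above separating user-facing methods from framework internals. The user-facing half, reduced to a hedged sketch (hypothetical paths; not part of this commit):

  import java.net.URI;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.mapreduce.filecache.DistributedCache;

  public class CacheSketch {
    // At job submission: register files to distribute.
    public static void setup(Configuration conf) throws Exception {
      DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"), conf);
      DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), conf);
      DistributedCache.createSymlink(conf); // symlink caches into the task cwd
    }

    // In a task: read the localized copies.
    public static Path[] inTask(Configuration conf) throws Exception {
      return DistributedCache.getLocalCacheFiles(conf);
    }
  }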
@@ -144,15 +148,18 @@
    * @return the path to directory where the archives are unjarred in case of archives,
    * the path to the file where the file is copied locally 
    * @throws IOException
+   * @deprecated Internal to MapReduce framework.  Use DistributedCacheManager
+   * instead.
    */
   public static Path getLocalCache(URI cache, Configuration conf, 
                                    Path baseDir, FileStatus fileStatus,
                                    boolean isArchive, long confFileStamp,
                                    Path currentWorkDir) 
-  throws IOException {
+      throws IOException {
     return getLocalCache(cache, conf, baseDir, fileStatus, isArchive, 
         confFileStamp, currentWorkDir, true);
   }
+
   /**
    * Get the locally cached file or archive; it could either be 
    * previously cached (and valid) or copy it from the {@link FileSystem} now.
@@ -178,48 +185,19 @@
    * @return the path to directory where the archives are unjarred in case of archives,
    * the path to the file where the file is copied locally 
    * @throws IOException
+   * @deprecated Internal to MapReduce framework.  Use DistributedCacheManager
+   * instead.
    */
   public static Path getLocalCache(URI cache, Configuration conf, 
       Path baseDir, FileStatus fileStatus,
       boolean isArchive, long confFileStamp,
-      Path currentWorkDir, boolean honorSymLinkConf) 
-  throws IOException {
-    String cacheId = makeRelative(cache, conf);
-    CacheStatus lcacheStatus;
-    Path localizedPath;
-    synchronized (cachedArchives) {
-      lcacheStatus = cachedArchives.get(cacheId);
-      if (lcacheStatus == null) {
-        // was never localized
-        lcacheStatus = new CacheStatus(baseDir, new Path(baseDir, new Path(cacheId)));
-        cachedArchives.put(cacheId, lcacheStatus);
-      }
+      Path currentWorkDir, boolean honorSymLinkConf) throws IOException {
 
-      synchronized (lcacheStatus) {
-        localizedPath = localizeCache(conf, cache, confFileStamp, lcacheStatus, 
-            fileStatus, isArchive, currentWorkDir, honorSymLinkConf);
-        lcacheStatus.refcount++;
-      }
-    }
-
-    // try deleting stuff if you can
-    long size = 0;
-    synchronized (baseDirSize) {
-      Long get = baseDirSize.get(baseDir);
-      if ( get != null ) {
-    	size = get.longValue();
-      }
-    }
-    // setting the cache size to a default of 10GB
-    long allowedSize = conf.getLong("local.cache.size", DEFAULT_CACHE_SIZE);
-    if (allowedSize < size) {
-      // try some cache deletions
-      deleteCache(conf);
-    }
-    return localizedPath;
+    return new TrackerDistributedCacheManager(conf).getLocalCache(cache, conf,
+        baseDir, fileStatus, isArchive, confFileStamp, currentWorkDir,
+        honorSymLinkConf);
   }
 
-  
   /**
    * Get the locally cached file or archive; it could either be 
    * previously cached (and valid) or copy it from the {@link FileSystem} now.
@@ -241,17 +219,18 @@
    * @return the path to directory where the archives are unjarred in case of archives,
    * the path to the file where the file is copied locally 
    * @throws IOException
-
+   * @deprecated Internal to MapReduce framework.  Use DistributedCacheManager
+   * instead.
    */
   public static Path getLocalCache(URI cache, Configuration conf, 
                                    Path baseDir, boolean isArchive,
                                    long confFileStamp, Path currentWorkDir) 
-  throws IOException {
+      throws IOException {
     return getLocalCache(cache, conf, 
                          baseDir, null, isArchive,
                          confFileStamp, currentWorkDir);
   }
-  
+
   /**
    * This is the opposite of getlocalcache. When you are done with
    * using the cache, you need to release the cache
@@ -259,46 +238,14 @@
    * @param conf configuration which contains the filesystem the cache 
    * is contained in.
    * @throws IOException
+   * @deprecated Internal to MapReduce framework.  Use DistributedCacheManager
+   * instead.
    */
   public static void releaseCache(URI cache, Configuration conf)
-    throws IOException {
-    String cacheId = makeRelative(cache, conf);
-    synchronized (cachedArchives) {
-      CacheStatus lcacheStatus = cachedArchives.get(cacheId);
-      if (lcacheStatus == null)
-        return;
-      synchronized (lcacheStatus) {
-        lcacheStatus.refcount--;
-      }
-    }
+      throws IOException {
+    new TrackerDistributedCacheManager(conf).releaseCache(cache, conf);
   }
   
-  // To delete the caches which have a refcount of zero
-  
-  private static void deleteCache(Configuration conf) throws IOException {
-    // try deleting cache Status with refcount of zero
-    synchronized (cachedArchives) {
-      for (Iterator it = cachedArchives.keySet().iterator(); it.hasNext();) {
-        String cacheId = (String) it.next();
-        CacheStatus lcacheStatus = cachedArchives.get(cacheId);
-        synchronized (lcacheStatus) {
-          if (lcacheStatus.refcount == 0) {
-            // delete this cache entry
-            FileSystem.getLocal(conf).delete(lcacheStatus.localLoadPath, true);
-            synchronized (baseDirSize) {
-              Long dirSize = baseDirSize.get(lcacheStatus.baseDir);
-              if ( dirSize != null ) {
-            	dirSize -= lcacheStatus.size;
-            	baseDirSize.put(lcacheStatus.baseDir, dirSize);
-              }
-            }
-            it.remove();
-          }
-        }
-      }
-    }
-  }
-
   /*
    * Returns the relative path of the dir this cache will be localized in
    * relative path that this cache will be localized in. For
@@ -306,189 +253,17 @@
    * hostname/absolute path -- if it is just /absolute_path -- then the
    * relative path is hostname of DFS this mapred cluster is running
    * on/absolute_path
+   * @deprecated Internal to MapReduce framework.  Use DistributedCacheManager
+   * instead.
    */
   public static String makeRelative(URI cache, Configuration conf)
-    throws IOException {
-    String host = cache.getHost();
-    if (host == null) {
-      host = cache.getScheme();
-    }
-    if (host == null) {
-      URI defaultUri = FileSystem.get(conf).getUri();
-      host = defaultUri.getHost();
-      if (host == null) {
-        host = defaultUri.getScheme();
-      }
-    }
-    String path = host + cache.getPath();
-    path = path.replace(":/","/");                // remove windows device colon
-    return path;
-  }
-
-  private static Path cacheFilePath(Path p) {
-    return new Path(p, p.getName());
-  }
-
-  // the method which actually copies the caches locally and unjars/unzips them
-  // and does chmod for the files
-  private static Path localizeCache(Configuration conf, 
-                                    URI cache, long confFileStamp,
-                                    CacheStatus cacheStatus,
-                                    FileStatus fileStatus,
-                                    boolean isArchive, 
-                                    Path currentWorkDir,boolean honorSymLinkConf) 
-  throws IOException {
-    boolean doSymlink = honorSymLinkConf && getSymlink(conf);
-    if(cache.getFragment() == null) {
-    	doSymlink = false;
-    }
-    FileSystem fs = FileSystem.get(cache, conf);
-    String link = currentWorkDir.toString() + Path.SEPARATOR + cache.getFragment();
-    File flink = new File(link);
-    if (ifExistsAndFresh(conf, fs, cache, confFileStamp,
-                           cacheStatus, fileStatus)) {
-      if (isArchive) {
-        if (doSymlink){
-          if (!flink.exists())
-            FileUtil.symLink(cacheStatus.localLoadPath.toString(), 
-                             link);
-        }
-        return cacheStatus.localLoadPath;
-      }
-      else {
-        if (doSymlink){
-          if (!flink.exists())
-            FileUtil.symLink(cacheFilePath(cacheStatus.localLoadPath).toString(), 
-                             link);
-        }
-        return cacheFilePath(cacheStatus.localLoadPath);
-      }
-    } else {
-      // remove the old archive
-      // if the old archive cannot be removed since it is being used by another
-      // job
-      // return null
-      if (cacheStatus.refcount > 1 && (cacheStatus.currentStatus == true))
-        throw new IOException("Cache " + cacheStatus.localLoadPath.toString()
-                              + " is in use and cannot be refreshed");
-      
-      FileSystem localFs = FileSystem.getLocal(conf);
-      localFs.delete(cacheStatus.localLoadPath, true);
-      synchronized (baseDirSize) {
-    	Long dirSize = baseDirSize.get(cacheStatus.baseDir);
-    	if ( dirSize != null ) {
-    	  dirSize -= cacheStatus.size;
-    	  baseDirSize.put(cacheStatus.baseDir, dirSize);
-    	}
-      }
-      Path parchive = new Path(cacheStatus.localLoadPath,
-                               new Path(cacheStatus.localLoadPath.getName()));
-      
-      if (!localFs.mkdirs(cacheStatus.localLoadPath)) {
-        throw new IOException("Mkdirs failed to create directory " + 
-                              cacheStatus.localLoadPath.toString());
-      }
-
-      String cacheId = cache.getPath();
-      fs.copyToLocalFile(new Path(cacheId), parchive);
-      if (isArchive) {
-        String tmpArchive = parchive.toString().toLowerCase();
-        File srcFile = new File(parchive.toString());
-        File destDir = new File(parchive.getParent().toString());
-        if (tmpArchive.endsWith(".jar")) {
-          RunJar.unJar(srcFile, destDir);
-        } else if (tmpArchive.endsWith(".zip")) {
-          FileUtil.unZip(srcFile, destDir);
-        } else if (isTarFile(tmpArchive)) {
-          FileUtil.unTar(srcFile, destDir);
-        }
-        // else will not do anyhting
-        // and copy the file into the dir as it is
-      }
-      
-      long cacheSize = FileUtil.getDU(new File(parchive.getParent().toString()));
-      cacheStatus.size = cacheSize;
-      synchronized (baseDirSize) {
-      	Long dirSize = baseDirSize.get(cacheStatus.baseDir);
-      	if( dirSize == null ) {
-      	  dirSize = Long.valueOf(cacheSize);
-      	} else {
-      	  dirSize += cacheSize;
-      	}
-      	baseDirSize.put(cacheStatus.baseDir, dirSize);
-      }
-      
-      // do chmod here 
-      try {
-        //Setting recursive permission to grant everyone read and execute
-        FileUtil.chmod(cacheStatus.baseDir.toString(), "ugo+rx",true);
-      } catch(InterruptedException e) {
-    	LOG.warn("Exception in chmod" + e.toString());
-      }
-
-      // update cacheStatus to reflect the newly cached file
-      cacheStatus.currentStatus = true;
-      cacheStatus.mtime = getTimestamp(conf, cache);
-    }
-    
-    if (isArchive){
-      if (doSymlink){
-        if (!flink.exists())
-          FileUtil.symLink(cacheStatus.localLoadPath.toString(), 
-                           link);
-      }
-      return cacheStatus.localLoadPath;
-    }
-    else {
-      if (doSymlink){
-        if (!flink.exists())
-          FileUtil.symLink(cacheFilePath(cacheStatus.localLoadPath).toString(), 
-                           link);
-      }
-      return cacheFilePath(cacheStatus.localLoadPath);
-    }
-  }
-
-  private static boolean isTarFile(String filename) {
-    return (filename.endsWith(".tgz") || filename.endsWith(".tar.gz") ||
-           filename.endsWith(".tar"));
-  }
-  
-  // Checks if the cache has already been localized and is fresh
-  private static boolean ifExistsAndFresh(Configuration conf, FileSystem fs, 
-                                          URI cache, long confFileStamp, 
-                                          CacheStatus lcacheStatus,
-                                          FileStatus fileStatus) 
-  throws IOException {
-    // check for existence of the cache
-    if (lcacheStatus.currentStatus == false) {
-      return false;
-    } else {
-      long dfsFileStamp;
-      if (fileStatus != null) {
-        dfsFileStamp = fileStatus.getModificationTime();
-      } else {
-        dfsFileStamp = getTimestamp(conf, cache);
-      }
-
-      // ensure that the file on hdfs hasn't been modified since the job started 
-      if (dfsFileStamp != confFileStamp) {
-        LOG.fatal("File: " + cache + " has changed on HDFS since job started");
-        throw new IOException("File: " + cache + 
-                              " has changed on HDFS since job started");
-      }
-      
-      if (dfsFileStamp != lcacheStatus.mtime) {
-        // needs refreshing
-        return false;
-      }
-    }
-    
-    return true;
+      throws IOException {
+    return new TrackerDistributedCacheManager(conf).makeRelative(cache, conf);
   }
 
   /**
    * Returns mtime of a given cache file on hdfs.
+   * 
    * @param conf configuration
    * @param cache cache file 
    * @return mtime of a given cache file on hdfs
@@ -501,32 +276,24 @@
 
     return fileSystem.getFileStatus(filePath).getModificationTime();
   }
-  
+
   /**
    * This method create symlinks for all files in a given dir in another directory
    * @param conf the configuration
    * @param jobCacheDir the target directory for creating symlinks
    * @param workDir the directory in which the symlinks are created
    * @throws IOException
+   * @deprecated Internal to MapReduce framework.  Use DistributedCacheManager
+   * instead.
    */
   public static void createAllSymlink(Configuration conf, File jobCacheDir, File workDir)
-    throws IOException{
-    if ((jobCacheDir == null || !jobCacheDir.isDirectory()) ||
-           workDir == null || (!workDir.isDirectory())) {
-      return;
-    }
-    boolean createSymlink = getSymlink(conf);
-    if (createSymlink){
-      File[] list = jobCacheDir.listFiles();
-      for (int i=0; i < list.length; i++){
-        FileUtil.symLink(list[i].getAbsolutePath(),
-                         new File(workDir, list[i].getName()).toString());
-      }
-    }  
+      throws IOException{
+    TrackerDistributedCacheManager.createAllSymlink(conf, jobCacheDir, workDir);
   }
   
   /**
-   * Set the configuration with the given set of archives
+   * Set the configuration with the given set of archives.  Intended
+   * to be used by user code.
    * @param archives The list of archives that need to be localized
    * @param conf Configuration which will be changed
    */
@@ -536,7 +303,8 @@
   }
 
   /**
-   * Set the configuration with the given set of files
+   * Set the configuration with the given set of files.  Intended to be
+   * used by user code.
    * @param files The list of files that need to be localized
    * @param conf Configuration which will be changed
    */
@@ -546,7 +314,8 @@
   }
 
   /**
-   * Get cache archives set in the Configuration
+   * Get cache archives set in the Configuration.  Used by
+   * internal DistributedCache and MapReduce code.
    * @param conf The configuration which contains the archives
    * @return A URI array of the caches set in the Configuration
    * @throws IOException
@@ -556,18 +325,19 @@
   }
 
   /**
-   * Get cache files set in the Configuration
+   * Get cache files set in the Configuration.  Used by internal
+   * DistributedCache and MapReduce code.
    * @param conf The configuration which contains the files
    * @return A URI array of the files set in the Configuration
    * @throws IOException
    */
-
   public static URI[] getCacheFiles(Configuration conf) throws IOException {
     return StringUtils.stringToURI(conf.getStrings("mapred.cache.files"));
   }
 
   /**
-   * Return the path array of the localized caches
+   * Return the path array of the localized caches.  Intended to be used
+   * by user code.
    * @param conf Configuration that contains the localized archives
    * @return A path array of localized caches
    * @throws IOException
@@ -579,7 +349,8 @@
   }
 
   /**
-   * Return the path array of the localized files
+   * Return the path array of the localized files.  Intended to be used
+   * by user code.
    * @param conf Configuration that contains the localized files
    * @return A path array of localized files
    * @throws IOException
@@ -590,7 +361,8 @@
   }
 
   /**
-   * Get the timestamps of the archives
+   * Get the timestamps of the archives.  Used by internal
+   * DistributedCache and MapReduce code.
    * @param conf The configuration which stored the timestamps
    * @return a string array of timestamps 
    * @throws IOException
@@ -601,7 +373,8 @@
 
 
   /**
-   * Get the timestamps of the files
+   * Get the timestamps of the files.  Used by internal
+   * DistributedCache and MapReduce code.
    * @param conf The configuration which stored the timestamps
    * @return a string array of timestamps 
    * @throws IOException
@@ -611,7 +384,8 @@
   }
 
   /**
-   * This is to check the timestamp of the archives to be localized
+   * Sets the timestamps of the archives to be localized, so they can be
+   * checked at localization time.  Used by internal MapReduce code.
   * @param conf Configuration which stores the timestamps
   * @param timestamps comma separated list of timestamps of archives.
   * The order should be the same as the order in which the archives are added.
@@ -621,7 +395,8 @@
   }
 
   /**
-   * This is to check the timestamp of the files to be localized
+   * Sets the timestamps of the files to be localized, so they can be
+   * checked at localization time.  Used by internal MapReduce code.
   * @param conf Configuration which stores the timestamps
   * @param timestamps comma separated list of timestamps of files.
   * The order should be the same as the order in which the files are added.
@@ -631,7 +406,8 @@
   }
   
   /**
-   * Set the conf to contain the location for localized archives 
+   * Set the conf to contain the location for localized archives.  Used
+   * by internal DistributedCache code.
    * @param conf The conf to modify to contain the localized caches
    * @param str a comma separated list of local archives
    */
@@ -640,7 +416,8 @@
   }
 
   /**
-   * Set the conf to contain the location for localized files 
+   * Set the conf to contain the location for localized files.  Used
+   * by internal DistributedCache code.
    * @param conf The conf to modify to contain the localized caches
    * @param str a comma separated list of local files
    */
@@ -649,7 +426,8 @@
   }
 
   /**
-   * Add a archives to be localized to the conf
+   * Add an archive to be localized to the conf.  Intended to
+   * be used by user code.
    * @param uri The uri of the cache to be localized
    * @param conf Configuration to add the cache to
    */
@@ -660,7 +438,8 @@
   }
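Unlike the set* calls above, the add* calls append to whatever is already configured, so they compose across libraries. A sketch covering this method and the addCacheFile variant documented next (URIs illustrative):

    import java.net.URI;
    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.mapred.JobConf;

    public class CacheAppendSketch {
      public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf();
        // Appends to mapred.cache.archives / mapred.cache.files instead
        // of replacing them, unlike setCacheArchives/setCacheFiles.
        DistributedCache.addCacheArchive(new URI("hdfs:///cache/dict.zip"), conf);
        DistributedCache.addCacheFile(new URI("hdfs:///cache/stop.txt"), conf);
      }
    }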
   
   /**
-   * Add a file to be localized to the conf
+   * Add a file to be localized to the conf.  Intended
+   * to be used by user code.
    * @param uri The uri of the cache to be localized
    * @param conf Configuration to add the cache to
    */
@@ -672,7 +451,7 @@
 
   /**
   * Add a file path to the current set of classpath entries. It adds the file
-   * to cache as well.
+   * to cache as well.  Intended to be used by user code.
    * 
    * @param file Path of the file to be added
    * @param conf Configuration that contains the classpath setting
@@ -689,7 +468,8 @@
   }
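A sketch of the classpath helpers (the jar path and class name are illustrative); addArchiveToClassPath, described further down, behaves the same way for archives:

    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.JobConf;

    public class ClasspathSketch {
      public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf();
        // Registers the jar both as a cache file and as a classpath
        // entry, so task JVMs can load classes out of it.
        DistributedCache.addFileToClassPath(new Path("/libs/extra.jar"), conf);
        Path[] cp = DistributedCache.getFileClassPaths(conf);
        System.out.println("classpath entries: " + (cp == null ? 0 : cp.length));
      }
    }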
 
   /**
-   * Get the file entries in classpath as an array of Path
+   * Get the file entries in classpath as an array of Path.
+   * Used by internal DistributedCache code.
    * 
    * @param conf Configuration that contains the classpath setting
    */
@@ -708,7 +488,7 @@
 
   /**
    * Add an archive path to the current set of classpath entries. It adds the
-   * archive to cache as well.
+   * archive to cache as well.  Intended to be used by user code.
    * 
    * @param archive Path of the archive to be added
    * @param conf Configuration that contains the classpath setting
@@ -725,7 +505,8 @@
   }
 
   /**
-   * Get the archive entries in classpath as an array of Path
+   * Get the archive entries in classpath as an array of Path.
+   * Used by internal DistributedCache code.
    * 
    * @param conf Configuration that contains the classpath setting
    */
@@ -744,7 +525,8 @@
 
   /**
    * This method allows you to create symlinks in the current working directory
-   * of the task to all the cache files/archives
+   * of the task to all the cache files/archives.
+   * Intended to be used by user code.
    * @param conf the jobconf 
    */
   public static void createSymlink(Configuration conf){
@@ -754,6 +536,7 @@
   /**
   * This method checks to see if symlinks are to be created for the
   * localized cache files in the current working directory.
+   * Used by internal DistributedCache code.
   * @param conf the jobconf
   * @return true if symlinks are to be created, false otherwise
    */
@@ -769,7 +552,7 @@
    * This method checks if there is a conflict in the fragment names 
    * of the uris. Also makes sure that each uri has a fragment. It 
    * is only to be called if you want to create symlinks for 
-   * the various archives and files.
+   * the various archives and files.  May be used by user code.
   * @param uriFiles The URI array of the cache files
   * @param uriArchives The URI array of the cache archives
    */
@@ -811,52 +594,14 @@
     return true;
   }
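Tying the symlink pieces together, a hedged end-to-end sketch: user code validates the fragments with checkURIs, registers the cache entries, and turns symlinks on (all names illustrative):

    import java.net.URI;
    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.mapred.JobConf;

    public class SymlinkSetupSketch {
      public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf();
        URI[] files = { new URI("hdfs:///cache/lookup.txt#lookup") };
        URI[] archives = { new URI("hdfs:///cache/dict.zip#dict") };
        // Every URI needs a unique fragment for symlinking to work.
        if (!DistributedCache.checkURIs(files, archives)) {
          throw new IllegalArgumentException("missing or clashing fragments");
        }
        DistributedCache.setCacheFiles(files, conf);
        DistributedCache.setCacheArchives(archives, conf);
        DistributedCache.createSymlink(conf);
        // Tasks can now open "lookup" and "dict" in the working directory.
      }
    }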
 
-  private static class CacheStatus {
-    // false, not loaded yet, true is loaded
-    boolean currentStatus;
-
-    // the local load path of this cache
-    Path localLoadPath;
-    
-    //the base dir where the cache lies
-    Path baseDir;
-    
-    //the size of this cache
-    long size;
-
-    // number of instances using this cache
-    int refcount;
-
-    // the cache-file modification time
-    long mtime;
-
-    public CacheStatus(Path baseDir, Path localLoadPath) {
-      super();
-      this.currentStatus = false;
-      this.localLoadPath = localLoadPath;
-      this.refcount = 0;
-      this.mtime = -1;
-      this.baseDir = baseDir;
-      this.size = 0;
-    }
-  }
-
   /**
    * Clear the entire contents of the cache and delete the backing files. This
    * should only be used when the server is reinitializing, because the users
    * are going to lose their files.
+   * @deprecated Internal to MapReduce framework.  Use
+   * TrackerDistributedCacheManager instead.
    */
   public static void purgeCache(Configuration conf) throws IOException {
-    synchronized (cachedArchives) {
-      FileSystem localFs = FileSystem.getLocal(conf);
-      for (Map.Entry<String,CacheStatus> f: cachedArchives.entrySet()) {
-        try {
-          localFs.delete(f.getValue().localLoadPath, true);
-        } catch (IOException ie) {
-          LOG.debug("Error cleaning up cache", ie);
-        }
-      }
-      cachedArchives.clear();
-    }
+    new TrackerDistributedCacheManager(conf).purgeCache();
   }
 }
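purgeCache is now a thin shim over the new manager as well. A sketch of the server-side call, hedged because nothing beyond the one-line delegation above is visible in this patch:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.filecache.DistributedCache;

    public class PurgeSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Only safe while the server is reinitializing: this deletes
        // every localized file backing the cache.
        DistributedCache.purgeCache(conf);
      }
    }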

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/commit-tests
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/commit-tests?rev=808036&r1=808035&r2=808036&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/commit-tests (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/commit-tests Wed Aug 26 15:01:29 2009
@@ -36,4 +36,6 @@
 **/TestTextOutputFormat.java
 **/TestTrackerBlacklistAcrossJobs.java
 **/TestTaskTrackerBlacklisting.java
+**/TestTaskTrackerLocalization.java
+**/TestTrackerDistributedCacheManager.java
 

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/findbugsExcludeFile.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/findbugsExcludeFile.xml?rev=808036&r1=808035&r2=808036&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/findbugsExcludeFile.xml (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/findbugsExcludeFile.xml Wed Aug 26 15:01:29 2009
@@ -53,6 +53,10 @@
        <Bug pattern="NM_SAME_SIMPLE_NAME_AS_INTERFACE" />
      </Match>
      <Match>
+       <Class name="~org.apache.hadoop.mapred.join.*" />
+       <Bug pattern="NM_SAME_SIMPLE_NAME_AS_INTERFACE" />
+     </Match>
+     <Match>
        <Class name="org.apache.hadoop.mapred.SequenceFileInputFilter$Filter" />
        <Bug pattern="NM_SAME_SIMPLE_NAME_AS_INTERFACE" />
      </Match>
@@ -80,6 +84,11 @@
        <Bug pattern="DM_EXIT" />
      </Match>
      <Match>
+       <Class name="org.apache.hadoop.mapred.JobTracker" />
+       <Field name="clock" />
+       <Bug pattern="ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD" />
+     </Match>
+     <Match>
        <Class name="org.apache.hadoop.mapred.Task$TaskReporter" />
        <Method name="run" />
        <Bug pattern="DM_EXIT" />

Propchange: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Aug 26 15:01:29 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/mapred/src/test/mapred:713112
 /hadoop/core/trunk/src/test/mapred:776175-785643
-/hadoop/mapreduce/trunk/src/test/mapred:804974-805826
+/hadoop/mapreduce/trunk/src/test/mapred:804974-807678

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred-site.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred-site.xml?rev=808036&r1=808035&r2=808036&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred-site.xml (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred-site.xml Wed Aug 26 15:01:29 2009
@@ -14,5 +14,9 @@
   <value>hosts.exclude</value>
   <description></description>
 </property>
-
+<property>
+  <name>mapred.job.tracker.retire.jobs</name>
+  <value>false</value>
+  <description></description>
+</property>
 </configuration>

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/BigMapOutput.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/BigMapOutput.java?rev=808036&r1=808035&r2=808036&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/BigMapOutput.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/BigMapOutput.java Wed Aug 26 15:01:29 2009
@@ -55,7 +55,7 @@
     // Check if the input path exists and is non-empty
     if (fs.exists(dir)) {
       FileStatus[] list = fs.listStatus(dir);
-      if (list != null && list.length > 0) {
+      if (list.length > 0) {
         throw new IOException("Input path: " + dir + " already exists... ");
       }
     }

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java?rev=808036&r1=808035&r2=808036&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java Wed Aug 26 15:01:29 2009
@@ -22,6 +22,8 @@
 import java.util.Collection;
 import java.util.Iterator;
 
+import javax.security.auth.login.LoginException;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
@@ -51,7 +53,7 @@
      * This job tracker starts itself in its constructor
      */
     FakeJobTracker(JobConf conf, Clock clock, String[] tts) throws IOException, 
-    InterruptedException {
+    InterruptedException, LoginException {
       super(conf, clock);
       this.trackers = tts;
       //initialize max{Map/Reduce} task capacities to twice the clustersize

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java?rev=808036&r1=808035&r2=808036&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java Wed Aug 26 15:01:29 2009
@@ -19,7 +19,9 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.Iterator;
 import java.util.List;
 
@@ -67,7 +69,7 @@
     private volatile boolean isActive = true;
     
     JobConf jc = null;
-    Clock clock = null;
+    Clock clock = JobTracker.DEFAULT_CLOCK;
         
     public JobTrackerRunner(JobConf conf) {
       jc = conf;
@@ -108,11 +110,9 @@
         jc.set("mapred.local.dir",f.getAbsolutePath());
         jc.setClass("topology.node.switch.mapping.impl", 
             StaticMapping.class, DNSToSwitchMapping.class);
-        if (clock == null) {
-          tracker = JobTracker.startTracker(jc);
-        } else {
-          tracker = JobTracker.startTracker(jc, clock);
-        }
+        String id = 
+          new SimpleDateFormat("yyyyMMddHHmmssSSS").format(new Date());
+        tracker = JobTracker.startTracker(jc, clock, id);
         tracker.offerService();
       } catch (Throwable e) {
         LOG.error("Job tracker crashed", e);
@@ -328,6 +328,13 @@
     if(conf == null) {
       conf = new JobConf();
     }
+    return configureJobConf(conf, namenode, jobTrackerPort, jobTrackerInfoPort, 
+                            ugi);
+  }
+  
+  static JobConf configureJobConf(JobConf conf, String namenode, 
+                                  int jobTrackerPort, int jobTrackerInfoPort, 
+                                  UnixUserGroupInformation ugi) {
     JobConf result = new JobConf(conf);
     FileSystem.setDefaultUri(result, namenode);
     result.set("mapred.job.tracker", "localhost:"+jobTrackerPort);
@@ -573,11 +580,20 @@
    * Start the jobtracker.
    */
   public void startJobTracker() {
+    startJobTracker(true);
+  }
+
+  public void startJobTracker(boolean wait) {
     //  Create the JobTracker
     jobTracker = new JobTrackerRunner(conf, clock);
     jobTrackerThread = new Thread(jobTracker);
         
     jobTrackerThread.start();
+
+    if (!wait) {
+      return;
+    }
+
     while (jobTracker.isActive() && !jobTracker.isUp()) {
       try {                                     // let daemons get started
         Thread.sleep(1000);

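The new startJobTracker(boolean) overload lets a test launch the tracker without blocking on startup. A sketch of how a test might use it (the helper method and polling are illustrative; isActive()/isUp() are assumed to be the JobTrackerRunner accessors used in the loop above):

    import org.apache.hadoop.mapred.MiniMRCluster;

    public class NonBlockingStartSketch {
      static void restartTracker(MiniMRCluster mr) throws InterruptedException {
        // false: return immediately instead of waiting until the tracker
        // is up, which the no-arg startJobTracker() still does.
        mr.startJobTracker(false);
        while (mr.getJobTrackerRunner().isActive()
            && !mr.getJobTrackerRunner().isUp()) {
          Thread.sleep(1000);  // poll under the test's own control
        }
      }
    }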
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestBadRecords.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestBadRecords.java?rev=808036&r1=808035&r2=808036&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestBadRecords.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestBadRecords.java Wed Aug 26 15:01:29 2009
@@ -137,6 +137,7 @@
     
     //validate skipped records
     Path skipDir = SkipBadRecords.getSkipOutputPath(conf);
+    assertNotNull(skipDir);
     Path[] skips = FileUtil.stat2Paths(getFileSystem().listStatus(skipDir));
     List<String> mapSkipped = new ArrayList<String>();
     List<String> redSkipped = new ArrayList<String>();

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestIsolationRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestIsolationRunner.java?rev=808036&r1=808035&r2=808036&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestIsolationRunner.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestIsolationRunner.java Wed Aug 26 15:01:29 2009
@@ -19,6 +19,7 @@
 
 import java.io.DataOutputStream;
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.UUID;
 
@@ -89,13 +90,17 @@
   private static int countSideEffectFiles(JobConf conf, final String prefix)
       throws IOException {
     FileSystem localFs = FileSystem.getLocal(conf);
-    FileStatus[] files = localFs.listStatus(
-        new Path(conf.get(SIDE_EFFECT_DIR_PROPERTY)), new PathFilter() {
-      @Override public boolean accept(Path path) {
-        return path.getName().startsWith(prefix + "-");
-      }
-    });
-    return files.length;
+    try {
+      FileStatus[] files = localFs.listStatus(
+          new Path(conf.get(SIDE_EFFECT_DIR_PROPERTY)), new PathFilter() {
+        @Override public boolean accept(Path path) {
+          return path.getName().startsWith(prefix + "-");
+        }
+      });
+      return files.length;
+    } catch (FileNotFoundException fnfe) {
+      return 0;
+    }
   }
 
   private Path getAttemptJobXml(JobConf conf, JobID jobId, TaskType taskType)

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobRetire.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobRetire.java?rev=808036&r1=808035&r2=808036&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobRetire.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobRetire.java Wed Aug 26 15:01:29 2009
@@ -22,296 +22,70 @@
 import java.io.IOException;
 
 import junit.framework.TestCase;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.UtilsForTests.FakeClock;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
 
 /**
- * Test if the job expiry works fine. 
+ * Test that job retirement works correctly. 
  */
 public class TestJobRetire extends TestCase {
   static final Path testDir = 
     new Path(System.getProperty("test.build.data","/tmp"), 
              "job-expiry-testing");
-  private static final Log LOG = LogFactory.getLog(TestJobRetire.class);
-
-  private void testJobConfFile(JobID id, boolean exists) {
-    // get the job conf filename
-    String name = JobHistory.JobInfo.getLocalJobFilePath(id);
-    File file = new File(name);
- 
-    assertEquals("JobConf file check failed", exists, file.exists());
-  }
-
-  /** Test if the job after completion waits for atleast 
-   *  mapred.jobtracker.retirejob.interval.min amount of time.
-   */
-  public void testMinIntervalBeforeRetire() throws Exception {
-    MiniMRCluster mr = null;
-    int min = 10000;
-    int max = 5000;
-    try {
-      FakeClock clock = new FakeClock();
-      JobConf conf = new JobConf();
-
-      conf.setLong("mapred.jobtracker.retirejob.check", 1000); // 1 sec
-      conf.setInt("mapred.jobtracker.retirejob.interval.min", min); //10 secs
-      conf.setInt("mapred.jobtracker.retirejob.interval", max); // 5 secs
-      conf.setBoolean("mapred.job.tracker.persist.jobstatus.active", false);
-      conf.setInt("mapred.job.tracker.retiredjobs.cache.size", 0);
-      mr = new MiniMRCluster(0, 0, 1, "file:///", 1, null, null, null, conf, 0, 
-                             clock);
-      JobConf jobConf = mr.createJobConf();
-      JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
-
-      Path inDir = new Path(testDir, "input");
-      Path outDir = new Path(testDir, "output");
-      RunningJob rj = UtilsForTests.runJob(jobConf, inDir, outDir, 0, 0);
-      rj.waitForCompletion();
-      JobID id = rj.getID();
-      JobClient jc = new JobClient(jobConf);
-
-      // check if the job is there in the memory for min time
-      assertTrue(rj.isSuccessful());
-
-      // snapshot expiry thread count
-      int snapshot = jobtracker.retireJobs.runCount;
-      clock.advance(max + 1); // adv to expiry max time
-
-      // wait for the thread to run
-      while (jobtracker.retireJobs.runCount == snapshot) {
-        // wait for the thread to run
-        UtilsForTests.waitFor(1000);
-      }
-
-      assertNotNull(jc.getJob(id));
-
-      //check that the job is not retired
-      assertFalse(jc.getJob(id).isRetired());
-      
-      // snapshot expiry thread count
-      snapshot = jobtracker.retireJobs.runCount;
-      clock.advance(min - max); // adv to expiry min time
-
-      while (jobtracker.retireJobs.runCount == snapshot) {
-        // wait for the thread to run
-        UtilsForTests.waitFor(1000);
-      }
-
-      // check if the job is missing
-      assertNull(jc.getJob(id));
-      assertNull(jobtracker.getJob(id));
-
-      testJobConfFile(id, false);
-    } finally {
-      if (mr != null) { mr.shutdown();}
-    }
-  }
 
-  /** Test if the job after completion get expired after
-   *  mapred.jobtracker.retirejob.interval amount after the time.
-   */
   public void testJobRetire() throws Exception {
     MiniMRCluster mr = null;
-    int min = 10000;
-    int max = 20000;
     try {
-      FakeClock clock = new FakeClock();
       JobConf conf = new JobConf();
 
-      conf.setLong("mapred.jobtracker.retirejob.check", 1000); // 1 sec
-      conf.setInt("mapred.jobtracker.retirejob.interval.min", min); // 10 secs
-      conf.setInt("mapred.jobtracker.retirejob.interval", max); // 20 secs
-      conf.setBoolean("mapred.job.tracker.persist.jobstatus.active", false);
-      conf.setInt("mapred.job.tracker.retiredjobs.cache.size", 0);
-      mr = new MiniMRCluster(0, 0, 1, "file:///", 1, null, null, null, conf, 0, 
-                             clock);
+      conf.setBoolean("mapred.job.tracker.retire.jobs", true);
+      conf.setLong("mapred.job.tracker.retiredjobs.cache.size", 1);
+      mr = new MiniMRCluster(0, 0, 1, "file:///", 1, null, null, null, conf, 0);
       JobConf jobConf = mr.createJobConf();
       JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
       
       Path inDir = new Path(testDir, "input1");
       Path outDir = new Path(testDir, "output1");
-      RunningJob rj = UtilsForTests.runJob(jobConf, inDir, outDir, 0, 0);
-      rj.waitForCompletion();
-      JobID id = rj.getID();
-      JobClient jc = new JobClient(jobConf);
-
-      // check if the job is there in the memory for min time
-      assertTrue(rj.isSuccessful());
-
-      // snapshot expiry thread count
-      int snapshot = jobtracker.retireJobs.runCount;
-      clock.advance(max + 1); // adv to expiry max time
-
-      while (jobtracker.retireJobs.runCount == snapshot) {
-        // wait for the thread to run
-        UtilsForTests.waitFor(1000);
-      }
- 
-      // check if the job is missing
-      assertNull(jc.getJob(id));
-      assertNull(jobtracker.getJob(id));
 
-      testJobConfFile(id, false);
-    } finally {
-      if (mr != null) { mr.shutdown();}
-    }
-  }
+      JobID id1 = validateJobRetire(jobConf, inDir, outDir, jobtracker);
 
-  /** Test if the job after gets expired after
-   *  mapred.jobtracker.completeuserjobs.maximum jobs.
-   */
-  public void testMaxJobRetire() throws Exception {
-    MiniMRCluster mr = null;
-    int min = 10000;
-    int max = 20000;
-    try {
-      FakeClock clock = new FakeClock();
-      JobConf conf = new JobConf();
-      
-      conf.setLong("mapred.jobtracker.retirejob.check", 1000); // 1 sec
-      conf.setInt("mapred.jobtracker.retirejob.interval.min", min); // 10 secs
-      conf.setInt("mapred.jobtracker.retirejob.interval", max); // 20 secs
-      conf.setBoolean("mapred.job.tracker.persist.jobstatus.active", false);
-      conf.setInt("mapred.jobtracker.completeuserjobs.maximum", 1);
-      conf.setInt("mapred.job.tracker.retiredjobs.cache.size", 0);
-      mr = new MiniMRCluster(0, 0, 1, "file:///", 1, null, null, null, conf, 0, 
-                             clock);
-      JobConf jobConf = mr.createJobConf();
-      JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
-      
-      Path inDir = new Path(testDir, "input2.1");
-      Path outDir = new Path(testDir, "output2.1");
-      RunningJob rj = UtilsForTests.runJob(jobConf, inDir, outDir, 0, 0);
-      rj.waitForCompletion();
-      JobID id = rj.getID();
-      JobClient jc = new JobClient(jobConf);
-
-      // check if the job is successful
-      assertTrue(rj.isSuccessful());
-
-      clock.advance(min + 1); // adv to expiry min time
-
-      inDir = new Path(testDir, "input2.2");
-      outDir = new Path(testDir, "output2.2");
-      RunningJob rj2 = UtilsForTests.runJob(jobConf, inDir, outDir, 0, 0);
-      rj2.waitForCompletion();
-      JobID id2 = rj2.getID();
-
-      // check if the job#1 is missing
-      assertNull(jc.getJob(id));
-      assertNull("Job still not missing from jobtracker", jobtracker.getJob(id));
-      
-      // check if the job#2 exists
-      assertNotNull(jc.getJob(id2));
-      assertNotNull("Job " + id2 + " missing at the jobtracker before expiry", 
-                    jobtracker.getJob(id2));
+      outDir = new Path(testDir, "output2");
+      JobID id2 = validateJobRetire(jobConf, inDir, outDir, jobtracker);
 
-      testJobConfFile(id, false);
-      testJobConfFile(id2, true);
+      assertNull("Job not removed from cache", jobtracker.getJobStatus(id1));
+
+      assertEquals("Total jobs in cache not correct", 
+          1, jobtracker.getAllJobs().length);
     } finally {
-      if (mr != null) {mr.shutdown();}
+      if (mr != null) { mr.shutdown();}
     }
   }
 
-  /** Test if the job after gets expired but basic info is cached with jobtracker
-   */
-  public void testRetiredJobCache() throws Exception {
-    MiniMRCluster mr = null;
-    int min = 10000;
-    int max = 20000;
-    try {
-      FakeClock clock = new FakeClock();
-      JobConf conf = new JobConf();
+  private JobID validateJobRetire(JobConf jobConf, Path inDir, Path outDir, 
+      JobTracker jobtracker) throws IOException {
 
-      conf.setLong("mapred.jobtracker.retirejob.check", 1000); // 1 sec
-      conf.setInt("mapred.jobtracker.retirejob.interval.min", min); // 10 secs
-      conf.setInt("mapred.jobtracker.retirejob.interval", max); // 20 secs
-      conf.setBoolean("mapred.job.tracker.persist.jobstatus.active", false);
-      conf.setInt("mapred.jobtracker.completeuserjobs.maximum", 1);
-      conf.setInt("mapred.job.tracker.retiredjobs.cache.size", 1);
-      mr = new MiniMRCluster(0, 0, 1, "file:///", 1, null, null, null, conf, 0,
-                             clock);
-      JobConf jobConf = mr.createJobConf();
-      JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
-
-      Path inDir = new Path(testDir, "input3.1");
-      Path outDir = new Path(testDir, "output3.1");
-      RunningJob rj = UtilsForTests.runJob(jobConf, inDir, outDir, 0, 0);
-      rj.waitForCompletion();
-      JobID id = rj.getID();
-      JobClient jc = new JobClient(jobConf);
-
-      // check if the job is successful
-      assertTrue(rj.isSuccessful());
-      JobStatus status1 = jobtracker.getJobStatus(id);
-
-      clock.advance(min + 1); // adv to expiry min time
-
-      inDir = new Path(testDir, "input3.2");
-      outDir = new Path(testDir, "output3.2");
-      RunningJob rj2 = UtilsForTests.runJob(jobConf, inDir, outDir, 0, 0);
-      rj2.waitForCompletion();
-      JobID id2 = rj2.getID();
-      JobStatus status2 = jobtracker.getJobStatus(id2);
-
-      // check if the job#1 is missing in jt but cached status
-      assertNotNull("Job status missing from status cache", jc.getJob(id));
-      // check the status at jobtracker
-      assertEquals("Status mismatch for job " + id, status1.toString(), 
-                   jobtracker.getJobStatus(id).toString());
-      testRetiredCachedJobStatus(status1, rj);
-      assertNull("Job still not missing from jobtracker", jobtracker.getJob(id));
-
-      // check if the job#2 exists
-      assertNotNull(jc.getJob(id2));
-      // check the status .. 
-      
-      assertNotNull("Job " + id2 + " missing at the jobtracker before expiry",
-                    jobtracker.getJob(id2));
-
-      testJobConfFile(id, false);
-      testJobConfFile(id2, true);
-
-      clock.advance(min + 1); // adv to expiry min time
-
-      inDir = new Path(testDir, "input3.3");
-      outDir = new Path(testDir, "output3.3");
-      RunningJob rj3 = UtilsForTests.runJob(jobConf, inDir, outDir, 0, 0);
-      rj3.waitForCompletion();
-      JobID id3 = rj3.getID();
-
-      // check if the job#1 is missing in all the caches
-      assertNull("Job status still in status cache", jc.getJob(id));
-      // check if the job#2 is missing in jt but cached status
-      assertNotNull(jc.getJob(id2));
-      assertEquals("Status mismatch for job " + id2, status2.toString(), 
-                   jobtracker.getJobStatus(id2).toString());
-      testRetiredCachedJobStatus(status2, rj2);
-      assertNull("Job " + id2 + " missing at the jobtracker before expiry",
-                 jobtracker.getJob(id2));
-      // check if the job#3 exists
-      assertNotNull(jc.getJob(id3));
-      assertNotNull("Job " + id3 + " missing at the jobtracker before expiry",
-                    jobtracker.getJob(id3));
-    } finally {
-      if (mr != null) {mr.shutdown();}
+    RunningJob rj = UtilsForTests.runJob(jobConf, inDir, outDir, 0, 0);
+    rj.waitForCompletion();
+    assertTrue(rj.isSuccessful());
+    JobID id = rj.getID();
+
+    JobInProgress job = jobtracker.getJob(id);
+    //wait for job to get retired
+    for (int i = 0; i < 10 && job != null; i++) {
+      UtilsForTests.waitFor(1000);
+      job = jobtracker.getJob(id);
     }
+    assertNull("Job did not retire", job);
+    assertTrue("History url not set", rj.getHistoryUrl() != null &&
+               rj.getHistoryUrl().length() > 0);
+    assertNotNull("Job is not in cache", jobtracker.getJobStatus(id));
+    
+    // get the job conf filename
+    String name = JobHistory.JobInfo.getLocalJobFilePath(id);
+    File file = new File(name);
+ 
+    assertFalse("JobConf file not deleted", file.exists());
+    return id;
   }
 
-  private static void testRetiredCachedJobStatus(JobStatus status, 
-                                                 RunningJob rj) 
-  throws IOException {
-    assertEquals(status.getJobID(), rj.getID());
-    assertEquals(status.mapProgress(), rj.mapProgress());
-    assertEquals(status.reduceProgress(), rj.reduceProgress());
-    assertEquals(status.setupProgress(), rj.setupProgress());
-    assertEquals(status.cleanupProgress(), rj.cleanupProgress());
-    assertEquals(status.getRunState(), rj.getJobState());
-    assertEquals(status.getJobName(), rj.getJobName());
-    assertEquals(status.getTrackingUrl(), rj.getTrackingURL());
-    assertEquals(status.isRetired(), true);
-  }
 }

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMRServerPorts.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMRServerPorts.java?rev=808036&r1=808035&r2=808036&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMRServerPorts.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMRServerPorts.java Wed Aug 26 15:01:29 2009
@@ -18,6 +18,9 @@
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
+
+import javax.security.auth.login.LoginException;
+
 import junit.framework.TestCase;
 import org.apache.hadoop.hdfs.TestHDFSServerPorts;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -57,7 +60,7 @@
    * Check whether the JobTracker can be started.
    */
   private JobTracker startJobTracker(JobConf conf, JTRunner runner) 
-  throws IOException {
+  throws IOException, LoginException {
     conf.set("mapred.job.tracker", "localhost:0");
     conf.set("mapred.job.tracker.http.address", "0.0.0.0:0");
     JobTracker jt = null;
@@ -87,7 +90,7 @@
    * Check whether the JobTracker can be started.
    */
   private boolean canStartJobTracker(JobConf conf) 
-  throws IOException, InterruptedException {
+  throws IOException, InterruptedException, LoginException {
     JobTracker jt = null;
     try {
       jt = JobTracker.startTracker(conf);

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java?rev=808036&r1=808035&r2=808036&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java Wed Aug 26 15:01:29 2009
@@ -173,7 +173,7 @@
   public static void runPI(MiniMRCluster mr, JobConf jobconf) 
       throws IOException, InterruptedException, ClassNotFoundException {
     LOG.info("runPI");
-    double estimate = org.apache.hadoop.examples.PiEstimator.estimate(
+    double estimate = org.apache.hadoop.examples.QuasiMonteCarlo.estimatePi(
         NUM_MAPS, NUM_SAMPLES, jobconf).doubleValue();
     double error = Math.abs(Math.PI - estimate);
     assertTrue("Error in PI estimation "+error+" exceeds 0.01", (error < 0.01));


