hadoop-mapreduce-commits mailing list archives

From: sha...@apache.org
Subject: svn commit: r800232 - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/lib/input/ src/java/org/apache/hadoop/mapreduce/lib/output/ src/test/ src/test/mapred/org/apache/hadoop/mapreduce/lib/input/ sr...
Date: Mon, 03 Aug 2009 07:34:21 GMT
Author: sharad
Date: Mon Aug  3 07:34:20 2009
New Revision: 800232

URL: http://svn.apache.org/viewvc?rev=800232&view=rev
Log:
MAPREDUCE-656. Change org.apache.hadoop.mapred.SequenceFile* classes to use new mapreduce api. Contributed by Amareshwari Sriramadasu.

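For jobs migrating off the now-deprecated org.apache.hadoop.mapred classes, here is a minimal driver sketch (not part of this commit; the class name and paths are illustrative) that wires in the new-API SequenceFileAsTextInputFormat:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileAsTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MigrationSketch {
  public static void main(String[] args) throws Exception {
    Job job = new Job(new Configuration());
    job.setJarByClass(MigrationSketch.class);
    // Old API: conf.setInputFormat(org.apache.hadoop.mapred
    //   .SequenceFileAsTextInputFormat.class) on a JobConf.
    // New API: the lib.input equivalent added by this commit.
    job.setInputFormatClass(SequenceFileAsTextInputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.setInputPaths(job, new Path("in"));    // placeholder path
    FileOutputFormat.setOutputPath(job, new Path("out"));  // placeholder path
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
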
Added:
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/SequenceFileAsBinaryInputFormat.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/SequenceFileAsTextInputFormat.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/SequenceFileAsTextRecordReader.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/SequenceFileInputFilter.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/SequenceFileAsBinaryOutputFormat.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileAsBinaryInputFormat.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileAsTextInputFormat.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileInputFilter.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/output/
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/output/TestMRSequenceFileAsBinaryOutputFormat.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsBinaryInputFormat.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsBinaryOutputFormat.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextInputFormat.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextRecordReader.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFilter.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/SequenceFileOutputFormat.java
    hadoop/mapreduce/trunk/src/test/findbugsExcludeFile.xml

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=800232&r1=800231&r2=800232&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Mon Aug  3 07:34:20 2009
@@ -165,6 +165,9 @@
     MAPREDUCE-797. Adds combiner support to MRUnit MapReduceDriver.
     (Aaron Kimball via johan)    
 
+    MAPREDUCE-656. Change org.apache.hadoop.mapred.SequenceFile* classes
+    to use new mapreduce api. (Amareshwari Sriramadasu via sharad)
+
   BUG FIXES
     MAPREDUCE-703. Sqoop requires dependency on hsqldb in ivy.
     (Aaron Kimball via matei)

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsBinaryInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsBinaryInputFormat.java?rev=800232&r1=800231&r2=800232&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsBinaryInputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsBinaryInputFormat.java Mon Aug  3 07:34:20 2009
@@ -34,7 +34,11 @@
 /**
  * InputFormat reading keys, values from SequenceFiles in binary (raw)
  * format.
+ * @deprecated Use 
+ * {@link org.apache.hadoop.mapreduce.lib.input.SequenceFileAsBinaryInputFormat}
+ * instead
  */
+@Deprecated
 public class SequenceFileAsBinaryInputFormat
     extends SequenceFileInputFormat<BytesWritable,BytesWritable> {
 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsBinaryOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsBinaryOutputFormat.java?rev=800232&r1=800231&r2=800232&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsBinaryOutputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsBinaryOutputFormat.java Mon Aug  3 07:34:20 2009
@@ -18,7 +18,6 @@
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
-import java.io.DataOutputStream;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -28,51 +27,28 @@
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.io.SequenceFile.ValueBytes;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Progressable;
 
 /** 
  * An {@link OutputFormat} that writes keys, values to 
  * {@link SequenceFile}s in binary(raw) format
+ * 
+ * @deprecated Use 
+ * {@link org.apache.hadoop.mapreduce.lib.output.SequenceFileAsBinaryOutputFormat}
+ * instead
  */
+@Deprecated
 public class SequenceFileAsBinaryOutputFormat 
  extends SequenceFileOutputFormat <BytesWritable,BytesWritable> {
 
   /** 
    * Inner class used for appendRaw
    */
-  static protected class WritableValueBytes implements ValueBytes {
-    private BytesWritable value;
-
-    public WritableValueBytes() {
-      this.value = null;
-    }
-    public WritableValueBytes(BytesWritable value) {
-      this.value = value;
-    }
-
-    public void reset(BytesWritable value) {
-      this.value = value;
-    }
-
-    public void writeUncompressedBytes(DataOutputStream outStream)
-      throws IOException {
-      outStream.write(value.getBytes(), 0, value.getLength());
-    }
-
-    public void writeCompressedBytes(DataOutputStream outStream)
-      throws IllegalArgumentException, IOException {
-      throw
-        new UnsupportedOperationException("WritableValueBytes doesn't support " 
-                                          + "RECORD compression"); 
-    }
-    public int getSize(){
-      return value.getLength();
-    }
+  static protected class WritableValueBytes extends org.apache.hadoop.mapreduce
+      .lib.output.SequenceFileAsBinaryOutputFormat.WritableValueBytes {
   }
 
   /**

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextInputFormat.java?rev=800232&r1=800231&r2=800232&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextInputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextInputFormat.java Mon Aug  3 07:34:20 2009
@@ -23,9 +23,16 @@
 import org.apache.hadoop.io.Text;
 
 /**
- * This class is similar to SequenceFileInputFormat, except it generates SequenceFileAsTextRecordReader 
- * which converts the input keys and values to their String forms by calling toString() method. 
+ * This class is similar to SequenceFileInputFormat, 
+ * except it generates SequenceFileAsTextRecordReader 
+ * which converts the input keys and values to their 
+ * String forms by calling toString() method.
+ * 
+ * @deprecated Use 
+ * {@link org.apache.hadoop.mapreduce.lib.input.SequenceFileAsTextInputFormat}
+ * instead
  */
+@Deprecated
 public class SequenceFileAsTextInputFormat
   extends SequenceFileInputFormat<Text, Text> {
 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextRecordReader.java?rev=800232&r1=800231&r2=800232&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextRecordReader.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextRecordReader.java Mon Aug  3 07:34:20 2009
@@ -29,7 +29,11 @@
  * This class converts the input keys and values to their String forms by calling toString()
  * method. This class to SequenceFileAsTextInputFormat class is as LineRecordReader
  * class to TextInputFormat class.
+ * @deprecated Use 
+ * {@link org.apache.hadoop.mapreduce.lib.input.SequenceFileAsTextRecordReader}
+ * instead
  */
+@Deprecated
 public class SequenceFileAsTextRecordReader
   implements RecordReader<Text, Text> {
   

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFilter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFilter.java?rev=800232&r1=800231&r2=800232&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFilter.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFilter.java Mon Aug  3 07:34:20 2009
@@ -19,33 +19,25 @@
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.security.DigestException;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.regex.Pattern;
 import java.util.regex.PatternSyntaxException;
 
-import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.util.ReflectionUtils;
 
 /**
  * A class that allows a map/red job to work on a sample of sequence files.
  * The sample is decided by the filter class set by the job.
- * 
+ * @deprecated Use 
+ * {@link org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFilter}
+ * instead
  */
-
+@Deprecated
 public class SequenceFileInputFilter<K, V>
   extends SequenceFileInputFormat<K, V> {
   
-  final private static String FILTER_CLASS = "sequencefile.filter.class";
-  final private static String FILTER_FREQUENCY
-    = "sequencefile.filter.frequency";
-  final private static String FILTER_REGEX = "sequencefile.filter.regex";
-    
+  final private static String FILTER_CLASS = org.apache.hadoop.mapreduce.lib.
+      input.SequenceFileInputFilter.FILTER_CLASS;
+
   public SequenceFileInputFilter() {
   }
     
@@ -78,54 +70,38 @@
   /**
    * filter interface
    */
-  public interface Filter extends Configurable {
-    /** filter function
-     * Decide if a record should be filtered or not
-     * @param key record key
-     * @return true if a record is accepted; return false otherwise
-     */
-    public abstract boolean accept(Object key);
+  public interface Filter extends 
+      org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFilter.Filter {
   }
     
   /**
    * base class for Filters
    */
-  public static abstract class FilterBase implements Filter {
-    Configuration conf;
-        
-    public Configuration getConf() {
-      return conf;
-    }
+  public static abstract class FilterBase extends org.apache.hadoop.mapreduce.
+      lib.input.SequenceFileInputFilter.FilterBase
+      implements Filter {
   }
     
   /** Records filter by matching key to regex
    */
   public static class RegexFilter extends FilterBase {
-    private Pattern p;
-    /** Define the filtering regex and stores it in conf
-     * @param conf where the regex is set
-     * @param regex regex used as a filter
-     */
+    org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFilter.
+      RegexFilter rf;
     public static void setPattern(Configuration conf, String regex)
-      throws PatternSyntaxException {
-      try {
-        Pattern.compile(regex);
-      } catch (PatternSyntaxException e) {
-        throw new IllegalArgumentException("Invalid pattern: "+regex);
-      }
-      conf.set(FILTER_REGEX, regex);
+        throws PatternSyntaxException {
+      org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFilter.
+        RegexFilter.setPattern(conf, regex);
     }
         
-    public RegexFilter() { }
+    public RegexFilter() { 
+      rf = new org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFilter.
+             RegexFilter();
+    }
         
     /** configure the Filter by checking the configuration
      */
     public void setConf(Configuration conf) {
-      String regex = conf.get(FILTER_REGEX);
-      if (regex==null)
-        throw new RuntimeException(FILTER_REGEX + "not set");
-      this.p = Pattern.compile(regex);
-      this.conf = conf;
+      rf.setConf(conf);
     }
 
 
@@ -134,7 +110,7 @@
      * @see org.apache.hadoop.mapred.SequenceFileInputFilter.Filter#accept(Object)
      */
     public boolean accept(Object key) {
-      return p.matcher(key.toString()).matches();
+      return rf.accept(key);
     }
   }
 
@@ -144,33 +120,28 @@
    * For example, if the frequency is 10, one out of 10 records is returned.
    */
   public static class PercentFilter extends FilterBase {
-    private int frequency;
-    private int count;
-
+    org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFilter.
+	      PercentFilter pf;
     /** set the frequency and stores it in conf
      * @param conf configuration
      * @param frequency filtering frequencey
      */
-    public static void setFrequency(Configuration conf, int frequency){
-      if (frequency<=0)
-        throw new IllegalArgumentException(
-                                           "Negative " + FILTER_FREQUENCY + ": "+frequency);
-      conf.setInt(FILTER_FREQUENCY, frequency);
+    public static void setFrequency(Configuration conf, int frequency) {
+       org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFilter.
+	      PercentFilter.setFrequency(conf, frequency);
+    }
+	        
+    public PercentFilter() { 
+      pf = new org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFilter.
+        PercentFilter();
     }
-        
-    public PercentFilter() { }
-        
+	        
     /** configure the filter by checking the configuration
      * 
      * @param conf configuration
      */
     public void setConf(Configuration conf) {
-      this.frequency = conf.getInt("sequencefile.filter.frequency", 10);
-      if (this.frequency <=0) {
-        throw new RuntimeException(
-                                   "Negative "+FILTER_FREQUENCY+": "+this.frequency);
-      }
-      this.conf = conf;
+      pf.setConf(conf);
     }
 
     /** Filtering method
@@ -178,13 +149,7 @@
      * @see org.apache.hadoop.mapred.SequenceFileInputFilter.Filter#accept(Object)
      */
     public boolean accept(Object key) {
-      boolean accepted = false;
-      if (count == 0)
-        accepted = true;
-      if (++count == frequency) {
-        count = 0;
-      }
-      return accepted;
+      return pf.accept(key);
     }
   }
 
@@ -193,45 +158,30 @@
    * MD5(key) % f == 0.
    */
   public static class MD5Filter extends FilterBase {
-    private int frequency;
-    private static final MessageDigest DIGESTER;
-    public static final int MD5_LEN = 16;
-    private byte [] digest = new byte[MD5_LEN];
-        
-    static {
-      try {
-        DIGESTER = MessageDigest.getInstance("MD5");
-      } catch (NoSuchAlgorithmException e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-
+    public static final int MD5_LEN = org.apache.hadoop.mapreduce.lib.
+      input.SequenceFileInputFilter.MD5Filter.MD5_LEN;
+    org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFilter.MD5Filter mf;
     /** set the filtering frequency in configuration
      * 
      * @param conf configuration
      * @param frequency filtering frequency
      */
-    public static void setFrequency(Configuration conf, int frequency){
-      if (frequency<=0)
-        throw new IllegalArgumentException(
-                                           "Negative " + FILTER_FREQUENCY + ": "+frequency);
-      conf.setInt(FILTER_FREQUENCY, frequency);
+    public static void setFrequency(Configuration conf, int frequency) {
+      org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFilter.MD5Filter.
+        setFrequency(conf, frequency);
     }
         
-    public MD5Filter() { }
+    public MD5Filter() { 
+      mf = new org.apache.hadoop.mapreduce.lib.input.
+        SequenceFileInputFilter.MD5Filter();
+    }
         
     /** configure the filter according to configuration
      * 
      * @param conf configuration
      */
     public void setConf(Configuration conf) {
-      this.frequency = conf.getInt(FILTER_FREQUENCY, 10);
-      if (this.frequency <=0) {
-        throw new RuntimeException(
-                                   "Negative "+FILTER_FREQUENCY+": "+this.frequency);
-      }
-      this.conf = conf;
+      mf.setConf(conf);
     }
 
     /** Filtering method
@@ -239,41 +189,7 @@
      * @see org.apache.hadoop.mapred.SequenceFileInputFilter.Filter#accept(Object)
      */
     public boolean accept(Object key) {
-      try {
-        long hashcode;
-        if (key instanceof Text) {
-          hashcode = MD5Hashcode((Text)key);
-        } else if (key instanceof BytesWritable) {
-          hashcode = MD5Hashcode((BytesWritable)key);
-        } else {
-          ByteBuffer bb;
-          bb = Text.encode(key.toString());
-          hashcode = MD5Hashcode(bb.array(), 0, bb.limit());
-        }
-        if (hashcode/frequency*frequency==hashcode)
-          return true;
-      } catch(Exception e) {
-        LOG.warn(e);
-        throw new RuntimeException(e);
-      }
-      return false;
-    }
-        
-    private long MD5Hashcode(Text key) throws DigestException {
-      return MD5Hashcode(key.getBytes(), 0, key.getLength());
-    }
-        
-    private long MD5Hashcode(BytesWritable key) throws DigestException {
-      return MD5Hashcode(key.getBytes(), 0, key.getLength());
-    }
-    synchronized private long MD5Hashcode(byte[] bytes, 
-                                          int start, int length) throws DigestException {
-      DIGESTER.update(bytes, 0, length);
-      DIGESTER.digest(digest, 0, MD5_LEN);
-      long hashcode=0;
-      for (int i = 0; i < 8; i++)
-        hashcode |= ((digest[i] & 0xffL) << (8*(7-i)));
-      return hashcode;
+      return mf.accept(key);
     }
   }
     

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/SequenceFileAsBinaryInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/SequenceFileAsBinaryInputFormat.java?rev=800232&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/SequenceFileAsBinaryInputFormat.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/SequenceFileAsBinaryInputFormat.java Mon Aug  3 07:34:20 2009
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * InputFormat reading keys, values from SequenceFiles in binary (raw)
+ * format.
+ */
+public class SequenceFileAsBinaryInputFormat
+    extends SequenceFileInputFormat<BytesWritable,BytesWritable> {
+
+  public SequenceFileAsBinaryInputFormat() {
+    super();
+  }
+
+  public RecordReader<BytesWritable,BytesWritable> createRecordReader(
+      InputSplit split, TaskAttemptContext context)
+      throws IOException {
+    return new SequenceFileAsBinaryRecordReader();
+  }
+
+  /**
+   * Read records from a SequenceFile as binary (raw) bytes.
+   */
+  public static class SequenceFileAsBinaryRecordReader
+      extends RecordReader<BytesWritable,BytesWritable> {
+    private SequenceFile.Reader in;
+    private long start;
+    private long end;
+    private boolean done = false;
+    private DataOutputBuffer buffer = new DataOutputBuffer();
+    private SequenceFile.ValueBytes vbytes;
+    private BytesWritable key = null;
+    private BytesWritable value = null;
+
+    public void initialize(InputSplit split, TaskAttemptContext context) 
+        throws IOException, InterruptedException {
+      Path path = ((FileSplit)split).getPath();
+      Configuration conf = context.getConfiguration();
+      FileSystem fs = path.getFileSystem(conf);
+      this.in = new SequenceFile.Reader(fs, path, conf);
+      this.end = ((FileSplit)split).getStart() + split.getLength();
+      if (((FileSplit)split).getStart() > in.getPosition()) {
+        in.sync(((FileSplit)split).getStart());    // sync to start
+      }
+      this.start = in.getPosition();
+      vbytes = in.createValueBytes();
+      done = start >= end;
+    }
+    
+    @Override
+    public BytesWritable getCurrentKey() 
+        throws IOException, InterruptedException {
+      return key;
+    }
+    
+    @Override
+    public BytesWritable getCurrentValue() 
+        throws IOException, InterruptedException {
+      return value;
+    }
+
+    /**
+     * Retrieve the name of the key class for this SequenceFile.
+     * @see org.apache.hadoop.io.SequenceFile.Reader#getKeyClassName
+     */
+    public String getKeyClassName() {
+      return in.getKeyClassName();
+    }
+
+    /**
+     * Retrieve the name of the value class for this SequenceFile.
+     * @see org.apache.hadoop.io.SequenceFile.Reader#getValueClassName
+     */
+    public String getValueClassName() {
+      return in.getValueClassName();
+    }
+
+    /**
+     * Read raw bytes from a SequenceFile.
+     */
+    public synchronized boolean nextKeyValue()
+        throws IOException, InterruptedException {
+      if (done) {
+        return false;
+      }
+      long pos = in.getPosition();
+      boolean eof = -1 == in.nextRawKey(buffer);
+      if (!eof) {
+        if (key == null) {
+          key = new BytesWritable();
+        }
+        if (value == null) {
+          value = new BytesWritable();
+        }
+        key.set(buffer.getData(), 0, buffer.getLength());
+        buffer.reset();
+        in.nextRawValue(vbytes);
+        vbytes.writeUncompressedBytes(buffer);
+        value.set(buffer.getData(), 0, buffer.getLength());
+        buffer.reset();
+      }
+      return !(done = (eof || (pos >= end && in.syncSeen())));
+    }
+
+    public void close() throws IOException {
+      in.close();
+    }
+
+    /**
+     * Return the progress within the input split
+     * @return 0.0 to 1.0 of the input byte range
+     */
+    public float getProgress() throws IOException, InterruptedException {
+      if (end == start) {
+        return 0.0f;
+      } else {
+        return Math.min(1.0f, (float)((in.getPosition() - start) /
+                                      (double)(end - start)));
+      }
+    }
+  }
+}

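The reader above hands back the serialized key and value bytes untouched. A hedged sketch of a mapper consuming such records (the output types and size-accounting logic are illustrative assumptions, not from the commit):

import java.io.IOException;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Mapper;

public class RawBytesMapper
    extends Mapper<BytesWritable, BytesWritable, BytesWritable, LongWritable> {
  private final LongWritable size = new LongWritable();

  @Override
  protected void map(BytesWritable key, BytesWritable value, Context context)
      throws IOException, InterruptedException {
    // Keys and values arrive as raw serialized bytes, so the mapper can
    // account for record sizes without ever deserializing them.
    size.set(value.getLength());
    context.write(key, size);
  }
}
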
Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/SequenceFileAsTextInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/SequenceFileAsTextInputFormat.java?rev=800232&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/SequenceFileAsTextInputFormat.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/SequenceFileAsTextInputFormat.java Mon Aug  3 07:34:20 2009
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * This class is similar to SequenceFileInputFormat, except it generates
+ * SequenceFileAsTextRecordReader which converts the input keys and values
+ * to their String forms by calling toString() method. 
+ */
+public class SequenceFileAsTextInputFormat
+  extends SequenceFileInputFormat<Text, Text> {
+
+  public SequenceFileAsTextInputFormat() {
+    super();
+  }
+
+  public RecordReader<Text, Text> createRecordReader(InputSplit split,
+      TaskAttemptContext context) throws IOException {
+    context.setStatus(split.toString());
+    return new SequenceFileAsTextRecordReader();
+  }
+}

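With this format a mapper always sees Text/Text pairs, whatever Writable types the file actually stores, because the record reader (below) stringifies both sides. A small illustrative sketch, with assumed output types:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class StringifiedMapper extends Mapper<Text, Text, Text, IntWritable> {
  private final IntWritable valueLength = new IntWritable();

  @Override
  protected void map(Text key, Text value, Context context)
      throws IOException, InterruptedException {
    // e.g. an IntWritable key written as 42 arrives here as the Text "42".
    valueLength.set(value.getLength());
    context.write(key, valueLength);
  }
}
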
Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/SequenceFileAsTextRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/SequenceFileAsTextRecordReader.java?rev=800232&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/SequenceFileAsTextRecordReader.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/SequenceFileAsTextRecordReader.java Mon Aug  3 07:34:20 2009
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * This class converts the input keys and values to their String forms by
+ * calling toString() method. This class to SequenceFileAsTextInputFormat
+ * class is as LineRecordReader class to TextInputFormat class.
+ */
+public class SequenceFileAsTextRecordReader
+  extends RecordReader<Text, Text> {
+  
+  private final SequenceFileRecordReader<WritableComparable<?>, Writable>
+    sequenceFileRecordReader;
+
+  private Text key;
+  private Text value;
+
+  public SequenceFileAsTextRecordReader()
+    throws IOException {
+    sequenceFileRecordReader =
+      new SequenceFileRecordReader<WritableComparable<?>, Writable>();
+  }
+
+  public void initialize(InputSplit split, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    sequenceFileRecordReader.initialize(split, context);
+  }
+
+  @Override
+  public Text getCurrentKey() 
+      throws IOException, InterruptedException {
+    return key;
+  }
+  
+  @Override
+  public Text getCurrentValue() 
+      throws IOException, InterruptedException {
+    return value;
+  }
+  
+  /** Read key/value pair in a line. */
+  public synchronized boolean nextKeyValue() 
+      throws IOException, InterruptedException {
+    if (!sequenceFileRecordReader.nextKeyValue()) {
+      return false;
+    }
+    if (key == null) {
+      key = new Text(); 
+    }
+    if (value == null) {
+      value = new Text(); 
+    }
+    key.set(sequenceFileRecordReader.getCurrentKey().toString());
+    value.set(sequenceFileRecordReader.getCurrentValue().toString());
+    return true;
+  }
+  
+  public float getProgress() throws IOException,  InterruptedException {
+    return sequenceFileRecordReader.getProgress();
+  }
+  
+  public synchronized void close() throws IOException {
+    sequenceFileRecordReader.close();
+  }
+}

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/SequenceFileInputFilter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/SequenceFileInputFilter.java?rev=800232&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/SequenceFileInputFilter.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/SequenceFileInputFilter.java Mon Aug  3 07:34:20 2009
@@ -0,0 +1,319 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.DigestException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * A class that allows a map/red job to work on a sample of sequence files.
+ * The sample is decided by the filter class set by the job.
+ */
+public class SequenceFileInputFilter<K, V>
+    extends SequenceFileInputFormat<K, V> {
+  public static final Log LOG = LogFactory.getLog(FileInputFormat.class);
+  
+  final public static String FILTER_CLASS = "sequencefile.filter.class";
+  final private static String FILTER_FREQUENCY
+    = "sequencefile.filter.frequency";
+  final private static String FILTER_REGEX = "sequencefile.filter.regex";
+    
+  public SequenceFileInputFilter() {
+  }
+    
+  /** Create a record reader for the given split
+   * @param split file split
+   * @param context the task-attempt context
+   * @return RecordReader
+   */
+  public RecordReader<K, V> createRecordReader(InputSplit split,
+      TaskAttemptContext context) throws IOException {
+    context.setStatus(split.toString());
+    return new FilterRecordReader<K, V>(context.getConfiguration());
+  }
+
+
+  /** set the filter class
+   * 
+   * @param job The job
+   * @param filterClass filter class
+   */
+  public static void setFilterClass(Job job, Class<?> filterClass) {
+    job.getConfiguration().set(FILTER_CLASS, filterClass.getName());
+  }
+
+         
+  /**
+   * filter interface
+   */
+  public interface Filter extends Configurable {
+    /** filter function
+     * Decide if a record should be filtered or not
+     * @param key record key
+     * @return true if a record is accepted; return false otherwise
+     */
+    public abstract boolean accept(Object key);
+  }
+    
+  /**
+   * base class for Filters
+   */
+  public static abstract class FilterBase implements Filter {
+    Configuration conf;
+        
+    public Configuration getConf() {
+      return conf;
+    }
+  }
+    
+  /** Records filter by matching key to regex
+   */
+  public static class RegexFilter extends FilterBase {
+    private Pattern p;
+    /** Define the filtering regex and stores it in conf
+     * @param conf where the regex is set
+     * @param regex regex used as a filter
+     */
+    public static void setPattern(Configuration conf, String regex)
+        throws PatternSyntaxException {
+      try {
+        Pattern.compile(regex);
+      } catch (PatternSyntaxException e) {
+        throw new IllegalArgumentException("Invalid pattern: "+regex);
+      }
+      conf.set(FILTER_REGEX, regex);
+    }
+        
+    public RegexFilter() { }
+        
+    /** configure the Filter by checking the configuration
+     */
+    public void setConf(Configuration conf) {
+      String regex = conf.get(FILTER_REGEX);
+      if (regex == null)
+        throw new RuntimeException(FILTER_REGEX + "not set");
+      this.p = Pattern.compile(regex);
+      this.conf = conf;
+    }
+
+
+    /** Filtering method
+     * If key matches the regex, return true; otherwise return false
+     * @see Filter#accept(Object)
+     */
+    public boolean accept(Object key) {
+      return p.matcher(key.toString()).matches();
+    }
+  }
+
+  /** This class returns a percentage of records
+   * The percentage is determined by a filtering frequency <i>f</i> using
+   * the criteria record# % f == 0.
+   * For example, if the frequency is 10, one out of 10 records is returned.
+   */
+  public static class PercentFilter extends FilterBase {
+    private int frequency;
+    private int count;
+
+    /** set the frequency and stores it in conf
+     * @param conf configuration
+     * @param frequency filtering frequencey
+     */
+    public static void setFrequency(Configuration conf, int frequency) {
+      if (frequency <= 0)
+        throw new IllegalArgumentException(
+          "Negative " + FILTER_FREQUENCY + ": " + frequency);
+      conf.setInt(FILTER_FREQUENCY, frequency);
+    }
+        
+    public PercentFilter() { }
+        
+    /** configure the filter by checking the configuration
+     * 
+     * @param conf configuration
+     */
+    public void setConf(Configuration conf) {
+      this.frequency = conf.getInt("sequencefile.filter.frequency", 10);
+      if (this.frequency <= 0) {
+        throw new RuntimeException(
+          "Negative "+FILTER_FREQUENCY + ": " + this.frequency);
+      }
+      this.conf = conf;
+    }
+
+    /** Filtering method
+     * If record# % frequency==0, return true; otherwise return false
+     * @see Filter#accept(Object)
+     */
+    public boolean accept(Object key) {
+      boolean accepted = false;
+      if (count == 0)
+        accepted = true;
+      if (++count == frequency) {
+        count = 0;
+      }
+      return accepted;
+    }
+  }
+
+  /** This class returns a set of records by examing the MD5 digest of its
+   * key against a filtering frequency <i>f</i>. The filtering criteria is
+   * MD5(key) % f == 0.
+   */
+  public static class MD5Filter extends FilterBase {
+    private int frequency;
+    private static final MessageDigest DIGESTER;
+    public static final int MD5_LEN = 16;
+    private byte [] digest = new byte[MD5_LEN];
+        
+    static {
+      try {
+        DIGESTER = MessageDigest.getInstance("MD5");
+      } catch (NoSuchAlgorithmException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+
+    /** set the filtering frequency in configuration
+     * 
+     * @param conf configuration
+     * @param frequency filtering frequency
+     */
+    public static void setFrequency(Configuration conf, int frequency) {
+      if (frequency <= 0)
+        throw new IllegalArgumentException(
+          "Negative " + FILTER_FREQUENCY + ": " + frequency);
+      conf.setInt(FILTER_FREQUENCY, frequency);
+    }
+        
+    public MD5Filter() { }
+        
+    /** configure the filter according to configuration
+     * 
+     * @param conf configuration
+     */
+    public void setConf(Configuration conf) {
+      this.frequency = conf.getInt(FILTER_FREQUENCY, 10);
+      if (this.frequency <= 0) {
+        throw new RuntimeException(
+          "Negative " + FILTER_FREQUENCY + ": " + this.frequency);
+      }
+      this.conf = conf;
+    }
+
+    /** Filtering method
+     * If MD5(key) % frequency==0, return true; otherwise return false
+     * @see Filter#accept(Object)
+     */
+    public boolean accept(Object key) {
+      try {
+        long hashcode;
+        if (key instanceof Text) {
+          hashcode = MD5Hashcode((Text)key);
+        } else if (key instanceof BytesWritable) {
+          hashcode = MD5Hashcode((BytesWritable)key);
+        } else {
+          ByteBuffer bb;
+          bb = Text.encode(key.toString());
+          hashcode = MD5Hashcode(bb.array(), 0, bb.limit());
+        }
+        if (hashcode / frequency * frequency == hashcode)
+          return true;
+      } catch(Exception e) {
+        LOG.warn(e);
+        throw new RuntimeException(e);
+      }
+      return false;
+    }
+        
+    private long MD5Hashcode(Text key) throws DigestException {
+      return MD5Hashcode(key.getBytes(), 0, key.getLength());
+    }
+        
+    private long MD5Hashcode(BytesWritable key) throws DigestException {
+      return MD5Hashcode(key.getBytes(), 0, key.getLength());
+    }
+    
+    synchronized private long MD5Hashcode(byte[] bytes, 
+        int start, int length) throws DigestException {
+      DIGESTER.update(bytes, 0, length);
+      DIGESTER.digest(digest, 0, MD5_LEN);
+      long hashcode=0;
+      for (int i = 0; i < 8; i++)
+        hashcode |= ((digest[i] & 0xffL) << (8 * (7 - i)));
+      return hashcode;
+    }
+  }
+    
+  private static class FilterRecordReader<K, V>
+      extends SequenceFileRecordReader<K, V> {
+    
+    private Filter filter;
+    private K key;
+    private V value;
+        
+    public FilterRecordReader(Configuration conf)
+        throws IOException {
+      super();
+      // instantiate filter
+      filter = (Filter)ReflectionUtils.newInstance(
+        conf.getClass(FILTER_CLASS, PercentFilter.class), conf);
+    }
+    
+    public synchronized boolean nextKeyValue() 
+        throws IOException, InterruptedException {
+      while (super.nextKeyValue()) {
+        key = super.getCurrentKey();
+        if (filter.accept(key)) {
+          value = super.getCurrentValue();
+          return true;
+        }
+      }
+      return false;
+    }
+    
+    @Override
+    public K getCurrentKey() {
+      return key;
+    }
+    
+    @Override
+    public V getCurrentValue() {
+      return value;
+    }
+  }
+}

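A minimal sketch (not part of this commit; the input path is a placeholder) of wiring the new filter into a job so that roughly one record in ten is read:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFilter;

public class FilterSketch {
  public static void main(String[] args) throws Exception {
    Job job = new Job(new Configuration());
    job.setInputFormatClass(SequenceFileInputFilter.class);
    FileInputFormat.setInputPaths(job, new Path("in"));  // placeholder path
    // PercentFilter accepts records where record# % frequency == 0.
    SequenceFileInputFilter.setFilterClass(
        job, SequenceFileInputFilter.PercentFilter.class);
    SequenceFileInputFilter.PercentFilter.setFrequency(
        job.getConfiguration(), 10);
    // ... mapper, output types and output path would be set as usual ...
  }
}
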
Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/SequenceFileAsBinaryOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/SequenceFileAsBinaryOutputFormat.java?rev=800232&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/SequenceFileAsBinaryOutputFormat.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/SequenceFileAsBinaryOutputFormat.java Mon Aug  3 07:34:20 2009
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.output;
+
+import java.io.IOException;
+import java.io.DataOutputStream;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.SequenceFile.ValueBytes;
+import org.apache.hadoop.mapred.InvalidJobConfException;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/** 
+ * An {@link org.apache.hadoop.mapreduce.OutputFormat} that writes keys, 
+ * values to {@link SequenceFile}s in binary(raw) format
+ */
+public class SequenceFileAsBinaryOutputFormat 
+    extends SequenceFileOutputFormat <BytesWritable,BytesWritable> {
+
+  /** 
+   * Inner class used for appendRaw
+   */
+  static public class WritableValueBytes implements ValueBytes {
+    private BytesWritable value;
+
+    public WritableValueBytes() {
+      this.value = null;
+    }
+    
+    public WritableValueBytes(BytesWritable value) {
+      this.value = value;
+    }
+
+    public void reset(BytesWritable value) {
+      this.value = value;
+    }
+
+    public void writeUncompressedBytes(DataOutputStream outStream)
+        throws IOException {
+      outStream.write(value.getBytes(), 0, value.getLength());
+    }
+
+    public void writeCompressedBytes(DataOutputStream outStream)
+        throws IllegalArgumentException, IOException {
+      throw new UnsupportedOperationException(
+        "WritableValueBytes doesn't support RECORD compression"); 
+    }
+    
+    public int getSize(){
+      return value.getLength();
+    }
+  }
+
+  /**
+   * Set the key class for the {@link SequenceFile}
+   * <p>This allows the user to specify the key class to be different 
+   * from the actual class ({@link BytesWritable}) used for writing </p>
+   * 
+   * @param job the {@link Job} to modify
+   * @param theClass the SequenceFile output key class.
+   */
+  static public void setSequenceFileOutputKeyClass(Job job, 
+      Class<?> theClass) {
+    job.getConfiguration().setClass("mapred.seqbinary.output.key.class",
+      theClass, Object.class);
+  }
+
+  /**
+   * Set the value class for the {@link SequenceFile}
+   * <p>This allows the user to specify the value class to be different 
+   * from the actual class ({@link BytesWritable}) used for writing </p>
+   * 
+   * @param job the {@link Job} to modify
+   * @param theClass the SequenceFile output key class.
+   */
+  static public void setSequenceFileOutputValueClass(Job job, 
+      Class<?> theClass) {
+    job.getConfiguration().setClass("mapred.seqbinary.output.value.class", 
+                  theClass, Object.class);
+  }
+
+  /**
+   * Get the key class for the {@link SequenceFile}
+   * 
+   * @return the key class of the {@link SequenceFile}
+   */
+  static public Class<? extends WritableComparable> 
+      getSequenceFileOutputKeyClass(JobContext job) { 
+    return job.getConfiguration().getClass("mapred.seqbinary.output.key.class",
+      job.getOutputKeyClass().asSubclass(WritableComparable.class), 
+      WritableComparable.class);
+  }
+
+  /**
+   * Get the value class for the {@link SequenceFile}
+   * 
+   * @return the value class of the {@link SequenceFile}
+   */
+  static public Class<? extends Writable> getSequenceFileOutputValueClass(
+      JobContext job) { 
+    return job.getConfiguration().getClass(
+      "mapred.seqbinary.output.value.class", 
+      job.getOutputValueClass().asSubclass(Writable.class), Writable.class);
+  }
+  
+  @Override 
+  public RecordWriter<BytesWritable, BytesWritable> getRecordWriter(
+      TaskAttemptContext context) throws IOException {
+    final SequenceFile.Writer out = getSequenceWriter(context,
+      getSequenceFileOutputKeyClass(context),
+      getSequenceFileOutputValueClass(context)); 
+
+    return new RecordWriter<BytesWritable, BytesWritable>() {
+      private WritableValueBytes wvaluebytes = new WritableValueBytes();
+
+      public void write(BytesWritable bkey, BytesWritable bvalue)
+        throws IOException {
+        wvaluebytes.reset(bvalue);
+        out.appendRaw(bkey.getBytes(), 0, bkey.getLength(), wvaluebytes);
+        wvaluebytes.reset(null);
+      }
+
+      public void close(TaskAttemptContext context) throws IOException { 
+        out.close();
+      }
+    };
+  }
+
+  @Override 
+  public void checkOutputSpecs(JobContext job) throws IOException {
+    super.checkOutputSpecs(job);
+    if (getCompressOutput(job) && 
+        getOutputCompressionType(job) == CompressionType.RECORD ) {
+      throw new InvalidJobConfException("SequenceFileAsBinaryOutputFormat "
+        + "doesn't support Record Compression" );
+    }
+  }
+}

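A hedged driver sketch for the new output format: the job emits BytesWritable pairs while the SequenceFile records Text/IntWritable as its declared key/value classes (both class choices and the path are illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileAsBinaryOutputFormat;

public class BinaryOutputSketch {
  public static void main(String[] args) throws Exception {
    Job job = new Job(new Configuration());
    job.setOutputFormatClass(SequenceFileAsBinaryOutputFormat.class);
    // The declared classes may differ from the BytesWritable actually
    // written; note checkOutputSpecs above rejects RECORD compression.
    SequenceFileAsBinaryOutputFormat.setSequenceFileOutputKeyClass(
        job, Text.class);
    SequenceFileAsBinaryOutputFormat.setSequenceFileOutputValueClass(
        job, IntWritable.class);
    FileOutputFormat.setOutputPath(job, new Path("out"));  // placeholder path
  }
}
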
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/SequenceFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/SequenceFileOutputFormat.java?rev=800232&r1=800231&r2=800232&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/SequenceFileOutputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/SequenceFileOutputFormat.java Mon Aug  3 07:34:20 2009
@@ -38,17 +38,16 @@
 /** An {@link OutputFormat} that writes {@link SequenceFile}s. */
 public class SequenceFileOutputFormat <K,V> extends FileOutputFormat<K, V> {
 
-  public RecordWriter<K, V> 
-         getRecordWriter(TaskAttemptContext context
-                         ) throws IOException, InterruptedException {
+  protected SequenceFile.Writer getSequenceWriter(TaskAttemptContext context,
+      Class<?> keyClass, Class<?> valueClass) 
+      throws IOException {
     Configuration conf = context.getConfiguration();
-    
+	    
     CompressionCodec codec = null;
     CompressionType compressionType = CompressionType.NONE;
     if (getCompressOutput(context)) {
       // find the kind of compression to do
       compressionType = getOutputCompressionType(context);
-
       // find the right codec
       Class<?> codecClass = getOutputCompressorClass(context, 
                                                      DefaultCodec.class);
@@ -58,13 +57,19 @@
     // get the path of the temporary output file 
     Path file = getDefaultWorkFile(context, "");
     FileSystem fs = file.getFileSystem(conf);
-    final SequenceFile.Writer out = 
-      SequenceFile.createWriter(fs, conf, file,
-                                context.getOutputKeyClass(),
-                                context.getOutputValueClass(),
-                                compressionType,
-                                codec,
-                                context);
+    return SequenceFile.createWriter(fs, conf, file,
+             keyClass,
+             valueClass,
+             compressionType,
+             codec,
+             context);
+  }
+  
+  public RecordWriter<K, V> 
+         getRecordWriter(TaskAttemptContext context
+                         ) throws IOException, InterruptedException {
+    final SequenceFile.Writer out = getSequenceWriter(context,
+      context.getOutputKeyClass(), context.getOutputValueClass());
 
     return new RecordWriter<K, V>() {
 

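The point of factoring out the protected getSequenceWriter hook is that subclasses can reuse the compression and codec handling while substituting their own declared classes, exactly as SequenceFileAsBinaryOutputFormat does above. A sketch of a hypothetical subclass (not in the commit) using the hook:

import java.io.IOException;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class TextKeyedOutputFormat<K, V>
    extends SequenceFileOutputFormat<K, V> {
  @Override
  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
      throws IOException, InterruptedException {
    // Reuse the base class's compression handling, forcing Text as the
    // declared key class while keeping the job's configured value class.
    final SequenceFile.Writer out = getSequenceWriter(
        context, Text.class, context.getOutputValueClass());
    return new RecordWriter<K, V>() {
      public void write(K key, V value) throws IOException {
        out.append(key, value);
      }
      public void close(TaskAttemptContext c) throws IOException {
        out.close();
      }
    };
  }
}
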
Modified: hadoop/mapreduce/trunk/src/test/findbugsExcludeFile.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/findbugsExcludeFile.xml?rev=800232&r1=800231&r2=800232&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/findbugsExcludeFile.xml (original)
+++ hadoop/mapreduce/trunk/src/test/findbugsExcludeFile.xml Mon Aug  3 07:34:20 2009
@@ -52,6 +52,10 @@
        <Class name="~org.apache.hadoop.mapred.lib.aggregate.*" />
        <Bug pattern="NM_SAME_SIMPLE_NAME_AS_INTERFACE" />
      </Match>
+     <Match>
+       <Class name="org.apache.hadoop.mapred.SequenceFileInputFilter$Filter" />
+       <Bug pattern="NM_SAME_SIMPLE_NAME_AS_INTERFACE" />
+     </Match>
      <!--
        Ignore warnings for usage of System.exit. This is
        required and have been well thought out

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileAsBinaryInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileAsBinaryInputFormat.java?rev=800232&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileAsBinaryInputFormat.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileAsBinaryInputFormat.java Mon Aug  3 07:34:20 2009
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MapContext;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import junit.framework.TestCase;
+
+public class TestMRSequenceFileAsBinaryInputFormat extends TestCase {
+  private static final int RECORDS = 10000;
+
+  public void testBinary() throws IOException, InterruptedException {
+    Job job = new Job();
+    FileSystem fs = FileSystem.getLocal(job.getConfiguration());
+    Path dir = new Path(System.getProperty("test.build.data",".") + "/mapred");
+    Path file = new Path(dir, "testbinary.seq");
+    Random r = new Random();
+    long seed = r.nextLong();
+    r.setSeed(seed);
+
+    fs.delete(dir, true);
+    FileInputFormat.setInputPaths(job, dir);
+
+    Text tkey = new Text();
+    Text tval = new Text();
+
+    SequenceFile.Writer writer = new SequenceFile.Writer(fs,
+      job.getConfiguration(), file, Text.class, Text.class);
+    try {
+      for (int i = 0; i < RECORDS; ++i) {
+        tkey.set(Integer.toString(r.nextInt(), 36));
+        tval.set(Long.toString(r.nextLong(), 36));
+        writer.append(tkey, tval);
+      }
+    } finally {
+      writer.close();
+    }
+    TaskAttemptContext context = MapReduceTestUtil.
+      createDummyMapTaskAttemptContext(job.getConfiguration());
+    InputFormat<BytesWritable,BytesWritable> bformat =
+      new SequenceFileAsBinaryInputFormat();
+
+    int count = 0;
+    r.setSeed(seed);
+    BytesWritable bkey = new BytesWritable();
+    BytesWritable bval = new BytesWritable();
+    Text cmpkey = new Text();
+    Text cmpval = new Text();
+    DataInputBuffer buf = new DataInputBuffer();
+    FileInputFormat.setInputPaths(job, file);
+    for (InputSplit split : bformat.getSplits(job)) {
+      RecordReader<BytesWritable, BytesWritable> reader =
+            bformat.createRecordReader(split, context);
+      MapContext<BytesWritable, BytesWritable, BytesWritable, BytesWritable> 
+        mcontext = new MapContext<BytesWritable, BytesWritable,
+          BytesWritable, BytesWritable>(job.getConfiguration(), 
+          context.getTaskAttemptID(), reader, null, null, 
+          MapReduceTestUtil.createDummyReporter(), 
+          split);
+      reader.initialize(split, mcontext);
+      try {
+        while (reader.nextKeyValue()) {
+          bkey = reader.getCurrentKey();
+          bval = reader.getCurrentValue();
+          tkey.set(Integer.toString(r.nextInt(), 36));
+          tval.set(Long.toString(r.nextLong(), 36));
+          buf.reset(bkey.getBytes(), bkey.getLength());
+          cmpkey.readFields(buf);
+          buf.reset(bval.getBytes(), bval.getLength());
+          cmpval.readFields(buf);
+          assertTrue(
+            "Keys don't match: " + "*" + cmpkey.toString() + ":" +
+            tkey.toString() + "*",
+            cmpkey.toString().equals(tkey.toString()));
+          assertTrue(
+            "Vals don't match: " + "*" + cmpval.toString() + ":" +
+            tval.toString() + "*",
+            cmpval.toString().equals(tval.toString()));
+          ++count;
+        }
+      } finally {
+        reader.close();
+      }
+    }
+    assertEquals("Some records not found", RECORDS, count);
+  }
+
+}

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileAsTextInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileAsTextInputFormat.java?rev=800232&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileAsTextInputFormat.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileAsTextInputFormat.java Mon Aug  3 07:34:20 2009
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.util.*;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MapContext;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.conf.*;
+
+public class TestMRSequenceFileAsTextInputFormat extends TestCase {
+  private static int MAX_LENGTH = 10000;
+  private static Configuration conf = new Configuration();
+
+  public void testFormat() throws Exception {
+    Job job = new Job(conf);
+    FileSystem fs = FileSystem.getLocal(conf);
+    Path dir = new Path(System.getProperty("test.build.data",".") + "/mapred");
+    Path file = new Path(dir, "test.seq");
+    
+    int seed = new Random().nextInt();
+    Random random = new Random(seed);
+
+    fs.delete(dir, true);
+
+    FileInputFormat.setInputPaths(job, dir);
+
+    // for a variety of lengths
+    for (int length = 0; length < MAX_LENGTH;
+         length += random.nextInt(MAX_LENGTH / 10) + 1) {
+
+      // create a file with length entries
+      SequenceFile.Writer writer =
+        SequenceFile.createWriter(fs, conf, file,
+          IntWritable.class, LongWritable.class);
+      try {
+        for (int i = 0; i < length; i++) {
+          IntWritable key = new IntWritable(i);
+          LongWritable value = new LongWritable(10 * i);
+          writer.append(key, value);
+        }
+      } finally {
+        writer.close();
+      }
+
+      TaskAttemptContext context = MapReduceTestUtil.
+        createDummyMapTaskAttemptContext(job.getConfiguration());
+      // try splitting the file in a variety of sizes
+      InputFormat<Text, Text> format =
+        new SequenceFileAsTextInputFormat();
+      
+      for (int i = 0; i < 3; i++) {
+        // check each split
+        BitSet bits = new BitSet(length);
+        int numSplits =
+          random.nextInt(MAX_LENGTH / (SequenceFile.SYNC_INTERVAL / 20)) + 1;
+        FileInputFormat.setMaxInputSplitSize(job, 
+          fs.getFileStatus(file).getLen() / numSplits);
+        for (InputSplit split : format.getSplits(job)) {
+          RecordReader<Text, Text> reader =
+            format.createRecordReader(split, context);
+          MapContext<Text, Text, Text, Text> mcontext = 
+            new MapContext<Text, Text, Text, Text>(job.getConfiguration(), 
+            context.getTaskAttemptID(), reader, null, null, 
+            MapReduceTestUtil.createDummyReporter(), 
+            split);
+          reader.initialize(split, mcontext);
+          Class<?> readerClass = reader.getClass();
+          assertEquals("reader class is SequenceFileAsTextRecordReader.",
+            SequenceFileAsTextRecordReader.class, readerClass);        
+          Text key;
+          try {
+            int count = 0;
+            while (reader.nextKeyValue()) {
+              key = reader.getCurrentKey();
+              int keyInt = Integer.parseInt(key.toString());
+              assertFalse("Key in multiple partitions.", bits.get(keyInt));
+              bits.set(keyInt);
+              count++;
+            }
+          } finally {
+            reader.close();
+          }
+        }
+        assertEquals("Some keys in no partition.", length, bits.cardinality());
+      }
+
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    new TestMRSequenceFileAsTextInputFormat().testFormat();
+  }
+}

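A minimal driver sketch for SequenceFileAsTextInputFormat, assuming a
map-only identity job; the job name and argument paths are examples only.
Keys and values reach the mapper as the toString() form of the stored
Writables, which is what the test above checks split by split.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileAsTextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class SeqAsTextDriver {
      public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "seqfile-as-text");
        job.setJarByClass(SeqAsTextDriver.class);
        // Map input types are Text/Text regardless of what the file stores.
        job.setInputFormatClass(SequenceFileAsTextInputFormat.class);
        job.setMapperClass(Mapper.class);   // identity pass-through
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }
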
Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileInputFilter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileInputFilter.java?rev=800232&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileInputFilter.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileInputFilter.java Mon Aug  3 07:34:20 2009
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.*;
+import java.util.*;
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.*;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MapContext;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.conf.*;
+
+public class TestMRSequenceFileInputFilter extends TestCase {
+  private static final Log LOG = 
+    LogFactory.getLog(TestMRSequenceFileInputFilter.class.getName());
+
+  private static final int MAX_LENGTH = 15000;
+  private static final Configuration conf = new Configuration();
+  private static final Job job;
+  private static final FileSystem fs;
+  private static final Path inDir = 
+    new Path(System.getProperty("test.build.data",".") + "/mapred");
+  private static final Path inFile = new Path(inDir, "test.seq");
+  private static final Random random = new Random(1);
+  
+  static {
+    try {
+      job = new Job(conf);
+      FileInputFormat.setInputPaths(job, inDir);
+      fs = FileSystem.getLocal(conf);
+    } catch (IOException e) {
+      e.printStackTrace();
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static void createSequenceFile(int numRecords) throws Exception {
+    // create a file with length entries
+    SequenceFile.Writer writer =
+      SequenceFile.createWriter(fs, conf, inFile,
+                                Text.class, BytesWritable.class);
+    try {
+      for (int i = 1; i <= numRecords; i++) {
+        Text key = new Text(Integer.toString(i));
+        byte[] data = new byte[random.nextInt(10)];
+        random.nextBytes(data);
+        BytesWritable value = new BytesWritable(data);
+        writer.append(key, value);
+      }
+    } finally {
+      writer.close();
+    }
+  }
+
+
+  private int countRecords(int numSplits) 
+      throws IOException, InterruptedException {
+    InputFormat<Text, BytesWritable> format =
+      new SequenceFileInputFilter<Text, BytesWritable>();
+    if (numSplits == 0) {
+      numSplits =
+        random.nextInt(MAX_LENGTH / (SequenceFile.SYNC_INTERVAL / 20)) + 1;
+    }
+    FileInputFormat.setMaxInputSplitSize(job, 
+      fs.getFileStatus(inFile).getLen() / numSplits);
+    TaskAttemptContext context = MapReduceTestUtil.
+      createDummyMapTaskAttemptContext(job.getConfiguration());
+    // check each split
+    int count = 0;
+    for (InputSplit split : format.getSplits(job)) {
+      RecordReader<Text, BytesWritable> reader =
+        format.createRecordReader(split, context);
+      MapContext<Text, BytesWritable, Text, BytesWritable> mcontext = 
+        new MapContext<Text, BytesWritable, Text, BytesWritable>(
+        job.getConfiguration(), 
+        context.getTaskAttemptID(), reader, null, null, 
+        MapReduceTestUtil.createDummyReporter(), split);
+      reader.initialize(split, mcontext);
+      try {
+        while (reader.nextKeyValue()) {
+          LOG.info("Accept record " + reader.getCurrentKey().toString());
+          count++;
+        }
+      } finally {
+        reader.close();
+      }
+    }
+    return count;
+  }
+  
+  public void testRegexFilter() throws Exception {
+    // set the filter class
+    LOG.info("Testing Regex Filter with patter: \\A10*");
+    SequenceFileInputFilter.setFilterClass(job, 
+      SequenceFileInputFilter.RegexFilter.class);
+    SequenceFileInputFilter.RegexFilter.setPattern(
+      job.getConfiguration(), "\\A10*");
+    
+    // clean input dir
+    fs.delete(inDir, true);
+  
+    // for a variety of lengths
+    for (int length = 1; length < MAX_LENGTH;
+         length += random.nextInt(MAX_LENGTH / 10) + 1) {
+      LOG.info("******Number of records: " + length);
+      createSequenceFile(length);
+      int count = countRecords(0);
+      assertEquals(length == 0 ? 0 : (int) Math.log10(length) + 1, count);
+    }
+    
+    // clean up
+    fs.delete(inDir, true);
+  }
+
+  public void testPercentFilter() throws Exception {
+    LOG.info("Testing Percent Filter with frequency: 1000");
+    // set the filter class
+    SequenceFileInputFilter.setFilterClass(job, 
+      SequenceFileInputFilter.PercentFilter.class);
+    SequenceFileInputFilter.PercentFilter.setFrequency(
+      job.getConfiguration(), 1000);
+      
+    // clean input dir
+    fs.delete(inDir, true);
+    
+    // for a variety of lengths
+    for (int length = 0; length < MAX_LENGTH;
+         length += random.nextInt(MAX_LENGTH / 10) + 1) {
+      LOG.info("******Number of records: "+length);
+      createSequenceFile(length);
+      int count = countRecords(1);
+      LOG.info("Accepted " + count + " records");
+      int expectedCount = length / 1000;
+      if (expectedCount * 1000 != length)
+        expectedCount++;
+      assertEquals(expectedCount, count);
+    }
+      
+    // clean up
+    fs.delete(inDir, true);
+  }
+  
+  public void testMD5Filter() throws Exception {
+    // set the filter class
+    LOG.info("Testing MD5 Filter with frequency: 1000");
+    SequenceFileInputFilter.setFilterClass(job, 
+      SequenceFileInputFilter.MD5Filter.class);
+    SequenceFileInputFilter.MD5Filter.setFrequency(
+      job.getConfiguration(), 1000);
+      
+    // clean input dir
+    fs.delete(inDir, true);
+    
+    // for a variety of lengths
+    for (int length = 0; length < MAX_LENGTH;
+         length += random.nextInt(MAX_LENGTH / 10) + 1) {
+      LOG.info("******Number of records: " + length);
+      createSequenceFile(length);
+      LOG.info("Accepted " + countRecords(0) + " records");
+    }
+    // clean up
+    fs.delete(inDir, true);
+  }
+
+  public static void main(String[] args) throws Exception {
+    TestMRSequenceFileInputFilter filter = new TestMRSequenceFileInputFilter();
+    filter.testRegexFilter();
+  }
+}

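A configuration sketch built only from the calls the tests above exercise;
the pattern, frequency, and class name are illustrative. The filter wraps
SequenceFileInputFormat, so a job selects it as its input format and then
picks one of the bundled filters.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFilter;

    public class FilteredScanSetup {
      public static Job configure(Configuration conf, Path input)
          throws Exception {
        Job job = new Job(conf, "filtered-scan");
        job.setInputFormatClass(SequenceFileInputFilter.class);
        // Keep only records whose key matches the regex...
        SequenceFileInputFilter.setFilterClass(job,
            SequenceFileInputFilter.RegexFilter.class);
        SequenceFileInputFilter.RegexFilter.setPattern(
            job.getConfiguration(), "\\A10*");
        // ...or sample roughly one record in a thousand instead:
        // SequenceFileInputFilter.setFilterClass(job,
        //     SequenceFileInputFilter.PercentFilter.class);
        // SequenceFileInputFilter.PercentFilter.setFrequency(
        //     job.getConfiguration(), 1000);
        FileInputFormat.setInputPaths(job, input);
        return job;
      }
    }
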
Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/output/TestMRSequenceFileAsBinaryOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/output/TestMRSequenceFileAsBinaryOutputFormat.java?rev=800232&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/output/TestMRSequenceFileAsBinaryOutputFormat.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/output/TestMRSequenceFileAsBinaryOutputFormat.java Mon Aug  3 07:34:20 2009
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.mapred.InvalidJobConfException;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MapContext;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+
+import junit.framework.TestCase;
+import org.apache.commons.logging.*;
+
+public class TestMRSequenceFileAsBinaryOutputFormat extends TestCase {
+  private static final Log LOG =
+    LogFactory.getLog(TestMRSequenceFileAsBinaryOutputFormat.class.getName());
+
+  private static final int RECORDS = 10000;
+  
+  public void testBinary() throws IOException, InterruptedException {
+    Configuration conf = new Configuration();
+    Job job = new Job(conf);
+    
+    Path outdir = new Path(System.getProperty("test.build.data", "/tmp"),
+                    "outseq");
+    Random r = new Random();
+    long seed = r.nextLong();
+    r.setSeed(seed);
+
+    FileOutputFormat.setOutputPath(job, outdir);
+
+    SequenceFileAsBinaryOutputFormat.setSequenceFileOutputKeyClass(job, 
+                                          IntWritable.class );
+    SequenceFileAsBinaryOutputFormat.setSequenceFileOutputValueClass(job, 
+                                          DoubleWritable.class ); 
+
+    SequenceFileAsBinaryOutputFormat.setCompressOutput(job, true);
+    SequenceFileAsBinaryOutputFormat.setOutputCompressionType(job, 
+                                                       CompressionType.BLOCK);
+
+    BytesWritable bkey = new BytesWritable();
+    BytesWritable bval = new BytesWritable();
+
+    TaskAttemptContext context = 
+      MapReduceTestUtil.createDummyMapTaskAttemptContext(job.getConfiguration());
+    OutputFormat<BytesWritable, BytesWritable> outputFormat = 
+      new SequenceFileAsBinaryOutputFormat();
+    OutputCommitter committer = outputFormat.getOutputCommitter(context);
+    committer.setupJob(job);
+    RecordWriter<BytesWritable, BytesWritable> writer = outputFormat.
+      getRecordWriter(context);
+
+    IntWritable iwritable = new IntWritable();
+    DoubleWritable dwritable = new DoubleWritable();
+    DataOutputBuffer outbuf = new DataOutputBuffer();
+    LOG.info("Creating data by SequenceFileAsBinaryOutputFormat");
+    try {
+      for (int i = 0; i < RECORDS; ++i) {
+        iwritable = new IntWritable(r.nextInt());
+        iwritable.write(outbuf);
+        bkey.set(outbuf.getData(), 0, outbuf.getLength());
+        outbuf.reset();
+        dwritable = new DoubleWritable(r.nextDouble());
+        dwritable.write(outbuf);
+        bval.set(outbuf.getData(), 0, outbuf.getLength());
+        outbuf.reset();
+        writer.write(bkey, bval);
+      }
+    } finally {
+      writer.close(context);
+    }
+    committer.commitTask(context);
+    committer.cleanupJob(job);
+
+    InputFormat<IntWritable, DoubleWritable> iformat =
+      new SequenceFileInputFormat<IntWritable, DoubleWritable>();
+    int count = 0;
+    r.setSeed(seed);
+    SequenceFileInputFormat.setInputPaths(job, outdir);
+    LOG.info("Reading data by SequenceFileInputFormat");
+    for (InputSplit split : iformat.getSplits(job)) {
+      RecordReader<IntWritable, DoubleWritable> reader =
+        iformat.createRecordReader(split, context);
+      MapContext<IntWritable, DoubleWritable, BytesWritable, BytesWritable> 
+        mcontext = new MapContext<IntWritable, DoubleWritable,
+          BytesWritable, BytesWritable>(job.getConfiguration(), 
+          context.getTaskAttemptID(), reader, null, null, 
+          MapReduceTestUtil.createDummyReporter(), 
+          split);
+      reader.initialize(split, mcontext);
+      try {
+        int sourceInt;
+        double sourceDouble;
+        while (reader.nextKeyValue()) {
+          sourceInt = r.nextInt();
+          sourceDouble = r.nextDouble();
+          iwritable = reader.getCurrentKey();
+          dwritable = reader.getCurrentValue();
+          assertEquals(
+              "Keys don't match: " + "*" + iwritable.get() + ":" + 
+                                           sourceInt + "*",
+              sourceInt, iwritable.get());
+          assertTrue(
+              "Vals don't match: " + "*" + dwritable.get() + ":" +
+                                           sourceDouble + "*",
+              Double.compare(dwritable.get(), sourceDouble) == 0 );
+          ++count;
+        }
+      } finally {
+        reader.close();
+      }
+    }
+    assertEquals("Some records not found", RECORDS, count);
+  }
+
+  public void testSequenceOutputClassDefaultsToMapRedOutputClass() 
+         throws IOException {
+    Job job = new Job();
+    // Set arbitrary output classes to test getSequenceFileOutput{Key,Value}Class
+    job.setOutputKeyClass(FloatWritable.class);
+    job.setOutputValueClass(BooleanWritable.class);
+
+    assertEquals("SequenceFileOutputKeyClass should default to ouputKeyClass", 
+      FloatWritable.class,
+      SequenceFileAsBinaryOutputFormat.getSequenceFileOutputKeyClass(job));
+    assertEquals("SequenceFileOutputValueClass should default to " 
+      + "ouputValueClass", 
+      BooleanWritable.class,
+      SequenceFileAsBinaryOutputFormat.getSequenceFileOutputValueClass(job));
+
+    SequenceFileAsBinaryOutputFormat.setSequenceFileOutputKeyClass(job, 
+      IntWritable.class );
+    SequenceFileAsBinaryOutputFormat.setSequenceFileOutputValueClass(job, 
+      DoubleWritable.class ); 
+
+    assertEquals("SequenceFileOutputKeyClass not updated", 
+      IntWritable.class,
+      SequenceFileAsBinaryOutputFormat.getSequenceFileOutputKeyClass(job));
+    assertEquals("SequenceFileOutputValueClass not updated", 
+      DoubleWritable.class,
+      SequenceFileAsBinaryOutputFormat.getSequenceFileOutputValueClass(job));
+  }
+
+  public void testCheckOutputSpecsForbidRecordCompression()
+      throws IOException {
+    Job job = new Job();
+    FileSystem fs = FileSystem.getLocal(job.getConfiguration());
+    Path outputdir = new Path(System.getProperty("test.build.data", "/tmp") 
+                              + "/output");
+    fs.delete(outputdir, true);
+
+    // Without an output path, FileOutputFormat.checkOutputSpecs will throw
+    // an InvalidJobConfException
+    FileOutputFormat.setOutputPath(job, outputdir);
+
+    // SequenceFileAsBinaryOutputFormat doesn't support record compression
+    // It should throw an exception when checked by checkOutputSpecs
+    SequenceFileAsBinaryOutputFormat.setCompressOutput(job, true);
+
+    SequenceFileAsBinaryOutputFormat.setOutputCompressionType(job, 
+      CompressionType.BLOCK);
+    try {
+      new SequenceFileAsBinaryOutputFormat().checkOutputSpecs(job);
+    } catch (Exception e) {
+      fail("Block compression should be allowed for " 
+        + "SequenceFileAsBinaryOutputFormat:Caught " + e.getClass().getName());
+    }
+
+    SequenceFileAsBinaryOutputFormat.setOutputCompressionType(job, 
+      CompressionType.RECORD);
+    try {
+      new SequenceFileAsBinaryOutputFormat().checkOutputSpecs(job);
+      fail("Record compression should not be allowed for " 
+        + "SequenceFileAsBinaryOutputFormat");
+    } catch (InvalidJobConfException ie) {
+      // expected
+    } catch (Exception e) {
+      fail("Expected " + InvalidJobConfException.class.getName() 
+        + "but caught " + e.getClass().getName() );
+    }
+  }
+}

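Finally, a write-side sketch for SequenceFileAsBinaryOutputFormat: a mapper
that serializes each Writable through a DataOutputBuffer before emitting it
as raw bytes, mirroring the loop in testBinary() above. The class name and
the choice of IntWritable/DoubleWritable payloads are illustrative; a real
driver would also call setSequenceFileOutputKeyClass/ValueClass as the test
does.

    import java.io.IOException;

    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class BinaryEncodingMapper
        extends Mapper<LongWritable, Text, BytesWritable, BytesWritable> {
      private final DataOutputBuffer outbuf = new DataOutputBuffer();
      private final BytesWritable bkey = new BytesWritable();
      private final BytesWritable bval = new BytesWritable();

      @Override
      protected void map(LongWritable offset, Text line, Context context)
          throws IOException, InterruptedException {
        // Serialize the key Writable; the output format copies these
        // bytes into the SequenceFile verbatim.
        new IntWritable(line.getLength()).write(outbuf);
        bkey.set(outbuf.getData(), 0, outbuf.getLength());
        outbuf.reset();
        new DoubleWritable(offset.get()).write(outbuf);
        bval.set(outbuf.getData(), 0, outbuf.getLength());
        outbuf.reset();
        context.write(bkey, bval);
      }
    }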

