Author: sharad
Date: Mon Aug 3 07:34:20 2009
New Revision: 800232
URL: http://svn.apache.org/viewvc?rev=800232&view=rev
Log:
MAPREDUCE-656. Change org.apache.hadoop.mapred.SequenceFile* classes to use new mapreduce api. Contributed by Amareshwari Sriramadasu.
Added:
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/SequenceFileAsBinaryInputFormat.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/SequenceFileAsTextInputFormat.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/SequenceFileAsTextRecordReader.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/SequenceFileInputFilter.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/SequenceFileAsBinaryOutputFormat.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileAsBinaryInputFormat.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileAsTextInputFormat.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileInputFilter.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/output/
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/output/TestMRSequenceFileAsBinaryOutputFormat.java
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsBinaryInputFormat.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsBinaryOutputFormat.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextInputFormat.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextRecordReader.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFilter.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/SequenceFileOutputFormat.java
hadoop/mapreduce/trunk/src/test/findbugsExcludeFile.xml
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=800232&r1=800231&r2=800232&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Mon Aug 3 07:34:20 2009
@@ -165,6 +165,9 @@
MAPREDUCE-797. Adds combiner support to MRUnit MapReduceDriver.
(Aaron Kimball via johan)
+ MAPREDUCE-656. Change org.apache.hadoop.mapred.SequenceFile* classes
+ to use new mapreduce api. (Amareshwari Sriramadasu via sharad)
+
BUG FIXES
MAPREDUCE-703. Sqoop requires dependency on hsqldb in ivy.
(Aaron Kimball via matei)
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsBinaryInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsBinaryInputFormat.java?rev=800232&r1=800231&r2=800232&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsBinaryInputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsBinaryInputFormat.java Mon Aug 3 07:34:20 2009
@@ -34,7 +34,11 @@
/**
* InputFormat reading keys, values from SequenceFiles in binary (raw)
* format.
+ * @deprecated Use
+ * {@link org.apache.hadoop.mapreduce.lib.input.SequenceFileAsBinaryInputFormat}
+ * instead
*/
+@Deprecated
public class SequenceFileAsBinaryInputFormat
extends SequenceFileInputFormat<BytesWritable,BytesWritable> {
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsBinaryOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsBinaryOutputFormat.java?rev=800232&r1=800231&r2=800232&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsBinaryOutputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsBinaryOutputFormat.java Mon Aug 3 07:34:20 2009
@@ -18,7 +18,6 @@
package org.apache.hadoop.mapred;
import java.io.IOException;
-import java.io.DataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -28,51 +27,28 @@
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.io.SequenceFile.ValueBytes;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Progressable;
/**
* An {@link OutputFormat} that writes keys, values to
* {@link SequenceFile}s in binary(raw) format
+ *
+ * @deprecated Use
+ * {@link org.apache.hadoop.mapreduce.lib.output.SequenceFileAsBinaryOutputFormat}
+ * instead
*/
+@Deprecated
public class SequenceFileAsBinaryOutputFormat
extends SequenceFileOutputFormat <BytesWritable,BytesWritable> {
/**
* Inner class used for appendRaw
*/
- static protected class WritableValueBytes implements ValueBytes {
- private BytesWritable value;
-
- public WritableValueBytes() {
- this.value = null;
- }
- public WritableValueBytes(BytesWritable value) {
- this.value = value;
- }
-
- public void reset(BytesWritable value) {
- this.value = value;
- }
-
- public void writeUncompressedBytes(DataOutputStream outStream)
- throws IOException {
- outStream.write(value.getBytes(), 0, value.getLength());
- }
-
- public void writeCompressedBytes(DataOutputStream outStream)
- throws IllegalArgumentException, IOException {
- throw
- new UnsupportedOperationException("WritableValueBytes doesn't support "
- + "RECORD compression");
- }
- public int getSize(){
- return value.getLength();
- }
+ static protected class WritableValueBytes extends org.apache.hadoop.mapreduce
+ .lib.output.SequenceFileAsBinaryOutputFormat.WritableValueBytes {
}
/**
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextInputFormat.java?rev=800232&r1=800231&r2=800232&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextInputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextInputFormat.java Mon Aug 3 07:34:20 2009
@@ -23,9 +23,16 @@
import org.apache.hadoop.io.Text;
/**
- * This class is similar to SequenceFileInputFormat, except it generates SequenceFileAsTextRecordReader
- * which converts the input keys and values to their String forms by calling toString() method.
+ * This class is similar to SequenceFileInputFormat,
+ * except it generates SequenceFileAsTextRecordReader
+ * which converts the input keys and values to their
+ * String forms by calling toString() method.
+ *
+ * @deprecated Use
+ * {@link org.apache.hadoop.mapreduce.lib.input.SequenceFileAsTextInputFormat}
+ * instead
*/
+@Deprecated
public class SequenceFileAsTextInputFormat
extends SequenceFileInputFormat<Text, Text> {
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextRecordReader.java?rev=800232&r1=800231&r2=800232&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextRecordReader.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextRecordReader.java Mon Aug 3 07:34:20 2009
@@ -29,7 +29,11 @@
* This class converts the input keys and values to their String forms by calling toString()
* method. This class to SequenceFileAsTextInputFormat class is as LineRecordReader
* class to TextInputFormat class.
+ * @deprecated Use
+ * {@link org.apache.hadoop.mapreduce.lib.input.SequenceFileAsTextRecordReader}
+ * instead
*/
+@Deprecated
public class SequenceFileAsTextRecordReader
implements RecordReader<Text, Text> {
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFilter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFilter.java?rev=800232&r1=800231&r2=800232&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFilter.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFilter.java Mon Aug 3 07:34:20 2009
@@ -19,33 +19,25 @@
package org.apache.hadoop.mapred;
import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.security.DigestException;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
-import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.ReflectionUtils;
/**
* A class that allows a map/red job to work on a sample of sequence files.
* The sample is decided by the filter class set by the job.
- *
+ * @deprecated Use
+ * {@link org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFilter}
+ * instead
*/
-
+@Deprecated
public class SequenceFileInputFilter<K, V>
extends SequenceFileInputFormat<K, V> {
- final private static String FILTER_CLASS = "sequencefile.filter.class";
- final private static String FILTER_FREQUENCY
- = "sequencefile.filter.frequency";
- final private static String FILTER_REGEX = "sequencefile.filter.regex";
-
+ final private static String FILTER_CLASS = org.apache.hadoop.mapreduce.lib.
+ input.SequenceFileInputFilter.FILTER_CLASS;
+
public SequenceFileInputFilter() {
}
@@ -78,54 +70,38 @@
/**
* filter interface
*/
- public interface Filter extends Configurable {
- /** filter function
- * Decide if a record should be filtered or not
- * @param key record key
- * @return true if a record is accepted; return false otherwise
- */
- public abstract boolean accept(Object key);
+ public interface Filter extends
+ org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFilter.Filter {
}
/**
* base class for Filters
*/
- public static abstract class FilterBase implements Filter {
- Configuration conf;
-
- public Configuration getConf() {
- return conf;
- }
+ public static abstract class FilterBase extends org.apache.hadoop.mapreduce.
+ lib.input.SequenceFileInputFilter.FilterBase
+ implements Filter {
}
/** Records filter by matching key to regex
*/
public static class RegexFilter extends FilterBase {
- private Pattern p;
- /** Define the filtering regex and stores it in conf
- * @param conf where the regex is set
- * @param regex regex used as a filter
- */
+ org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFilter.
+ RegexFilter rf;
public static void setPattern(Configuration conf, String regex)
- throws PatternSyntaxException {
- try {
- Pattern.compile(regex);
- } catch (PatternSyntaxException e) {
- throw new IllegalArgumentException("Invalid pattern: "+regex);
- }
- conf.set(FILTER_REGEX, regex);
+ throws PatternSyntaxException {
+ org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFilter.
+ RegexFilter.setPattern(conf, regex);
}
- public RegexFilter() { }
+ public RegexFilter() {
+ rf = new org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFilter.
+ RegexFilter();
+ }
/** configure the Filter by checking the configuration
*/
public void setConf(Configuration conf) {
- String regex = conf.get(FILTER_REGEX);
- if (regex==null)
- throw new RuntimeException(FILTER_REGEX + "not set");
- this.p = Pattern.compile(regex);
- this.conf = conf;
+ rf.setConf(conf);
}
@@ -134,7 +110,7 @@
* @see org.apache.hadoop.mapred.SequenceFileInputFilter.Filter#accept(Object)
*/
public boolean accept(Object key) {
- return p.matcher(key.toString()).matches();
+ return rf.accept(key);
}
}
@@ -144,33 +120,28 @@
* For example, if the frequency is 10, one out of 10 records is returned.
*/
public static class PercentFilter extends FilterBase {
- private int frequency;
- private int count;
-
+ org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFilter.
+ PercentFilter pf;
/** set the frequency and stores it in conf
* @param conf configuration
* @param frequency filtering frequencey
*/
- public static void setFrequency(Configuration conf, int frequency){
- if (frequency<=0)
- throw new IllegalArgumentException(
- "Negative " + FILTER_FREQUENCY + ": "+frequency);
- conf.setInt(FILTER_FREQUENCY, frequency);
+ public static void setFrequency(Configuration conf, int frequency) {
+ org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFilter.
+ PercentFilter.setFrequency(conf, frequency);
+ }
+
+ public PercentFilter() {
+ pf = new org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFilter.
+ PercentFilter();
}
-
- public PercentFilter() { }
-
+
/** configure the filter by checking the configuration
*
* @param conf configuration
*/
public void setConf(Configuration conf) {
- this.frequency = conf.getInt("sequencefile.filter.frequency", 10);
- if (this.frequency <=0) {
- throw new RuntimeException(
- "Negative "+FILTER_FREQUENCY+": "+this.frequency);
- }
- this.conf = conf;
+ pf.setConf(conf);
}
/** Filtering method
@@ -178,13 +149,7 @@
* @see org.apache.hadoop.mapred.SequenceFileInputFilter.Filter#accept(Object)
*/
public boolean accept(Object key) {
- boolean accepted = false;
- if (count == 0)
- accepted = true;
- if (++count == frequency) {
- count = 0;
- }
- return accepted;
+ return pf.accept(key);
}
}
@@ -193,45 +158,30 @@
* MD5(key) % f == 0.
*/
public static class MD5Filter extends FilterBase {
- private int frequency;
- private static final MessageDigest DIGESTER;
- public static final int MD5_LEN = 16;
- private byte [] digest = new byte[MD5_LEN];
-
- static {
- try {
- DIGESTER = MessageDigest.getInstance("MD5");
- } catch (NoSuchAlgorithmException e) {
- throw new RuntimeException(e);
- }
- }
-
-
+ public static final int MD5_LEN = org.apache.hadoop.mapreduce.lib.
+ input.SequenceFileInputFilter.MD5Filter.MD5_LEN;
+ org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFilter.MD5Filter mf;
/** set the filtering frequency in configuration
*
* @param conf configuration
* @param frequency filtering frequency
*/
- public static void setFrequency(Configuration conf, int frequency){
- if (frequency<=0)
- throw new IllegalArgumentException(
- "Negative " + FILTER_FREQUENCY + ": "+frequency);
- conf.setInt(FILTER_FREQUENCY, frequency);
+ public static void setFrequency(Configuration conf, int frequency) {
+ org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFilter.MD5Filter.
+ setFrequency(conf, frequency);
}
- public MD5Filter() { }
+ public MD5Filter() {
+ mf = new org.apache.hadoop.mapreduce.lib.input.
+ SequenceFileInputFilter.MD5Filter();
+ }
/** configure the filter according to configuration
*
* @param conf configuration
*/
public void setConf(Configuration conf) {
- this.frequency = conf.getInt(FILTER_FREQUENCY, 10);
- if (this.frequency <=0) {
- throw new RuntimeException(
- "Negative "+FILTER_FREQUENCY+": "+this.frequency);
- }
- this.conf = conf;
+ mf.setConf(conf);
}
/** Filtering method
@@ -239,41 +189,7 @@
* @see org.apache.hadoop.mapred.SequenceFileInputFilter.Filter#accept(Object)
*/
public boolean accept(Object key) {
- try {
- long hashcode;
- if (key instanceof Text) {
- hashcode = MD5Hashcode((Text)key);
- } else if (key instanceof BytesWritable) {
- hashcode = MD5Hashcode((BytesWritable)key);
- } else {
- ByteBuffer bb;
- bb = Text.encode(key.toString());
- hashcode = MD5Hashcode(bb.array(), 0, bb.limit());
- }
- if (hashcode/frequency*frequency==hashcode)
- return true;
- } catch(Exception e) {
- LOG.warn(e);
- throw new RuntimeException(e);
- }
- return false;
- }
-
- private long MD5Hashcode(Text key) throws DigestException {
- return MD5Hashcode(key.getBytes(), 0, key.getLength());
- }
-
- private long MD5Hashcode(BytesWritable key) throws DigestException {
- return MD5Hashcode(key.getBytes(), 0, key.getLength());
- }
- synchronized private long MD5Hashcode(byte[] bytes,
- int start, int length) throws DigestException {
- DIGESTER.update(bytes, 0, length);
- DIGESTER.digest(digest, 0, MD5_LEN);
- long hashcode=0;
- for (int i = 0; i < 8; i++)
- hashcode |= ((digest[i] & 0xffL) << (8*(7-i)));
- return hashcode;
+ return mf.accept(key);
}
}
Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/SequenceFileAsBinaryInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/SequenceFileAsBinaryInputFormat.java?rev=800232&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/SequenceFileAsBinaryInputFormat.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/SequenceFileAsBinaryInputFormat.java Mon Aug 3 07:34:20 2009
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * InputFormat reading keys, values from SequenceFiles in binary (raw)
+ * format.
+ */
+public class SequenceFileAsBinaryInputFormat
+    extends SequenceFileInputFormat<BytesWritable,BytesWritable> {
+
+  public SequenceFileAsBinaryInputFormat() {
+    super();
+  }
+
+  /**
+   * Create a reader that returns each record's key and value as the raw
+   * bytes stored in the SequenceFile. The reader opens no file until
+   * {@link RecordReader#initialize} is called on it.
+   */
+  @Override
+  public RecordReader<BytesWritable,BytesWritable> createRecordReader(
+      InputSplit split, TaskAttemptContext context)
+      throws IOException {
+    return new SequenceFileAsBinaryRecordReader();
+  }
+
+  /**
+   * Read records from a SequenceFile as binary (raw) bytes.
+   */
+  public static class SequenceFileAsBinaryRecordReader
+      extends RecordReader<BytesWritable,BytesWritable> {
+    private SequenceFile.Reader in;
+    private long start;
+    private long end;
+    private boolean done = false;
+    private DataOutputBuffer buffer = new DataOutputBuffer();
+    private SequenceFile.ValueBytes vbytes;
+    private BytesWritable key = null;
+    private BytesWritable value = null;
+
+    /**
+     * Open the SequenceFile backing the split. When the split starts past
+     * the reader's current position, the reader is synced to the split
+     * start before the first record is read.
+     */
+    @Override
+    public void initialize(InputSplit split, TaskAttemptContext context)
+        throws IOException, InterruptedException {
+      FileSplit fileSplit = (FileSplit) split;
+      Path path = fileSplit.getPath();
+      Configuration conf = context.getConfiguration();
+      FileSystem fs = path.getFileSystem(conf);
+      this.in = new SequenceFile.Reader(fs, path, conf);
+      this.end = fileSplit.getStart() + fileSplit.getLength();
+      if (fileSplit.getStart() > in.getPosition()) {
+        in.sync(fileSplit.getStart());                  // sync to start
+      }
+      this.start = in.getPosition();
+      vbytes = in.createValueBytes();
+      done = start >= end;
+    }
+
+    /**
+     * @return the key most recently read by {@link #nextKeyValue()}, or
+     *         null if no record has been read yet
+     */
+    @Override
+    public BytesWritable getCurrentKey()
+        throws IOException, InterruptedException {
+      return key;
+    }
+
+    /**
+     * @return the value most recently read by {@link #nextKeyValue()}, or
+     *         null if no record has been read yet
+     */
+    @Override
+    public BytesWritable getCurrentValue()
+        throws IOException, InterruptedException {
+      return value;
+    }
+
+    /**
+     * Retrieve the name of the key class for this SequenceFile.
+     * @see org.apache.hadoop.io.SequenceFile.Reader#getKeyClassName
+     */
+    public String getKeyClassName() {
+      return in.getKeyClassName();
+    }
+
+    /**
+     * Retrieve the name of the value class for this SequenceFile.
+     * @see org.apache.hadoop.io.SequenceFile.Reader#getValueClassName
+     */
+    public String getValueClassName() {
+      return in.getValueClassName();
+    }
+
+    /**
+     * Read raw bytes from a SequenceFile.
+     *
+     * @return false at end of file, or when the record just read started at
+     *         or past the split end and {@code in.syncSeen()} returned true;
+     *         true otherwise
+     */
+    @Override
+    public synchronized boolean nextKeyValue()
+        throws IOException, InterruptedException {
+      if (done) {
+        return false;
+      }
+      long pos = in.getPosition();
+      boolean eof = -1 == in.nextRawKey(buffer);
+      if (!eof) {
+        if (key == null) {
+          key = new BytesWritable();
+        }
+        if (value == null) {
+          value = new BytesWritable();
+        }
+        key.set(buffer.getData(), 0, buffer.getLength());
+        buffer.reset();
+        in.nextRawValue(vbytes);
+        vbytes.writeUncompressedBytes(buffer);
+        value.set(buffer.getData(), 0, buffer.getLength());
+        buffer.reset();
+      }
+      return !(done = (eof || (pos >= end && in.syncSeen())));
+    }
+
+    /**
+     * Close the underlying SequenceFile reader. Does nothing if
+     * {@link #initialize} never opened one, so callers may close a reader
+     * unconditionally (e.g. in a finally block).
+     */
+    @Override
+    public void close() throws IOException {
+      if (in != null) {
+        in.close();
+      }
+    }
+
+    /**
+     * Return the progress within the input split
+     * @return 0.0 to 1.0 of the input byte range
+     */
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+      if (end == start) {
+        return 0.0f;
+      } else {
+        return Math.min(1.0f, (float)((in.getPosition() - start) /
+                                      (double)(end - start)));
+      }
+    }
+  }
+}
Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/SequenceFileAsTextInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/SequenceFileAsTextInputFormat.java?rev=800232&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/SequenceFileAsTextInputFormat.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/SequenceFileAsTextInputFormat.java Mon Aug 3 07:34:20 2009
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * This class is similar to SequenceFileInputFormat, except it generates
+ * SequenceFileAsTextRecordReader which converts the input keys and values
+ * to their String forms by calling toString() method.
+ */
+public class SequenceFileAsTextInputFormat
+    extends SequenceFileInputFormat<Text, Text> {
+
+  public SequenceFileAsTextInputFormat() {
+    super();
+  }
+
+  /**
+   * Create a {@link SequenceFileAsTextRecordReader} for the split, first
+   * reporting the split's string form as the task status.
+   */
+  @Override
+  public RecordReader<Text, Text> createRecordReader(InputSplit split,
+      TaskAttemptContext context) throws IOException {
+    context.setStatus(split.toString());
+    return new SequenceFileAsTextRecordReader();
+  }
+}
Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/SequenceFileAsTextRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/SequenceFileAsTextRecordReader.java?rev=800232&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/SequenceFileAsTextRecordReader.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/SequenceFileAsTextRecordReader.java Mon Aug 3 07:34:20 2009
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * This class converts the input keys and values to their String forms by
+ * calling their toString() methods. It plays the same role for
+ * SequenceFileAsTextInputFormat that LineRecordReader plays for
+ * TextInputFormat.
+ */
+public class SequenceFileAsTextRecordReader
+    extends RecordReader<Text, Text> {
+
+  private final SequenceFileRecordReader<WritableComparable<?>, Writable>
+    sequenceFileRecordReader;
+
+  private Text key;
+  private Text value;
+
+  public SequenceFileAsTextRecordReader()
+    throws IOException {
+    sequenceFileRecordReader =
+      new SequenceFileRecordReader<WritableComparable<?>, Writable>();
+  }
+
+  /** Initialize the wrapped SequenceFileRecordReader with the split. */
+  @Override
+  public void initialize(InputSplit split, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    sequenceFileRecordReader.initialize(split, context);
+  }
+
+  /**
+   * @return the key stored by the last {@link #nextKeyValue()} call that
+   *         returned true, or null if no record has been read yet
+   */
+  @Override
+  public Text getCurrentKey()
+      throws IOException, InterruptedException {
+    return key;
+  }
+
+  /**
+   * @return the value stored by the last {@link #nextKeyValue()} call that
+   *         returned true, or null if no record has been read yet
+   */
+  @Override
+  public Text getCurrentValue()
+      throws IOException, InterruptedException {
+    return value;
+  }
+
+  /**
+   * Read the next record from the wrapped reader and store the toString()
+   * forms of its key and value, reusing the same Text instances across
+   * calls.
+   *
+   * @return false once the wrapped reader has no more records
+   */
+  @Override
+  public synchronized boolean nextKeyValue()
+      throws IOException, InterruptedException {
+    if (!sequenceFileRecordReader.nextKeyValue()) {
+      return false;
+    }
+    if (key == null) {
+      key = new Text();
+    }
+    if (value == null) {
+      value = new Text();
+    }
+    key.set(sequenceFileRecordReader.getCurrentKey().toString());
+    value.set(sequenceFileRecordReader.getCurrentValue().toString());
+    return true;
+  }
+
+  /** @return the progress reported by the wrapped reader */
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+    return sequenceFileRecordReader.getProgress();
+  }
+
+  /** Close the wrapped SequenceFileRecordReader. */
+  @Override
+  public synchronized void close() throws IOException {
+    sequenceFileRecordReader.close();
+  }
+}
Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/SequenceFileInputFilter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/SequenceFileInputFilter.java?rev=800232&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/SequenceFileInputFilter.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/SequenceFileInputFilter.java Mon Aug 3 07:34:20 2009
@@ -0,0 +1,319 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.DigestException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * A class that allows a map/red job to work on a sample of sequence files.
+ * The sample is decided by the filter class set by the job.
+ */
+public class SequenceFileInputFilter<K, V>
+ extends SequenceFileInputFormat<K, V> {
+ public static final Log LOG = LogFactory.getLog(FileInputFormat.class);
+
+ final public static String FILTER_CLASS = "sequencefile.filter.class";
+ final private static String FILTER_FREQUENCY
+ = "sequencefile.filter.frequency";
+ final private static String FILTER_REGEX = "sequencefile.filter.regex";
+
+ public SequenceFileInputFilter() {
+ }
+
+ /** Create a record reader for the given split
+ * @param split file split
+ * @param context the task-attempt context
+ * @return RecordReader
+ */
+ public RecordReader<K, V> createRecordReader(InputSplit split,
+ TaskAttemptContext context) throws IOException {
+ context.setStatus(split.toString());
+ return new FilterRecordReader<K, V>(context.getConfiguration());
+ }
+
+
+ /** set the filter class
+ *
+ * @param job The job
+ * @param filterClass filter class
+ */
+ public static void setFilterClass(Job job, Class<?> filterClass) {
+ job.getConfiguration().set(FILTER_CLASS, filterClass.getName());
+ }
+
+
+ /**
+ * filter interface
+ */
+ public interface Filter extends Configurable {
+ /** filter function
+ * Decide if a record should be filtered or not
+ * @param key record key
+ * @return true if a record is accepted; return false otherwise
+ */
+ public abstract boolean accept(Object key);
+ }
+
+ /**
+ * base class for Filters
+ */
+ public static abstract class FilterBase implements Filter {
+ Configuration conf;
+
+ public Configuration getConf() {
+ return conf;
+ }
+ }
+
+ /** Records filter by matching key to regex
+ */
+ public static class RegexFilter extends FilterBase {
+ private Pattern p;
+ /** Define the filtering regex and stores it in conf
+ * @param conf where the regex is set
+ * @param regex regex used as a filter
+ */
+ public static void setPattern(Configuration conf, String regex)
+ throws PatternSyntaxException {
+ try {
+ Pattern.compile(regex);
+ } catch (PatternSyntaxException e) {
+ throw new IllegalArgumentException("Invalid pattern: "+regex);
+ }
+ conf.set(FILTER_REGEX, regex);
+ }
+
+ public RegexFilter() { }
+
+ /** configure the Filter by checking the configuration
+ */
+ public void setConf(Configuration conf) {
+ String regex = conf.get(FILTER_REGEX);
+ if (regex == null)
+ throw new RuntimeException(FILTER_REGEX + "not set");
+ this.p = Pattern.compile(regex);
+ this.conf = conf;
+ }
+
+
+ /** Filtering method
+ * If key matches the regex, return true; otherwise return false
+ * @see Filter#accept(Object)
+ */
+ public boolean accept(Object key) {
+ return p.matcher(key.toString()).matches();
+ }
+ }
+
+ /** This class returns a percentage of records
+ * The percentage is determined by a filtering frequency <i>f</i> using
+ * the criteria record# % f == 0.
+ * For example, if the frequency is 10, one out of 10 records is returned.
+ */
+ public static class PercentFilter extends FilterBase {
+ private int frequency;
+ private int count;
+
+ /** set the frequency and stores it in conf
+ * @param conf configuration
+ * @param frequency filtering frequencey
+ */
+ public static void setFrequency(Configuration conf, int frequency) {
+ if (frequency <= 0)
+ throw new IllegalArgumentException(
+ "Negative " + FILTER_FREQUENCY + ": " + frequency);
+ conf.setInt(FILTER_FREQUENCY, frequency);
+ }
+
+ public PercentFilter() { }
+
+ /** configure the filter by checking the configuration
+ *
+ * @param conf configuration
+ */
+ public void setConf(Configuration conf) {
+ this.frequency = conf.getInt("sequencefile.filter.frequency", 10);
+ if (this.frequency <= 0) {
+ throw new RuntimeException(
+ "Negative "+FILTER_FREQUENCY + ": " + this.frequency);
+ }
+ this.conf = conf;
+ }
+
+ /** Filtering method
+ * If record# % frequency==0, return true; otherwise return false
+ * @see Filter#accept(Object)
+ */
+ public boolean accept(Object key) {
+ boolean accepted = false;
+ if (count == 0)
+ accepted = true;
+ if (++count == frequency) {
+ count = 0;
+ }
+ return accepted;
+ }
+ }
+
+ /** This class returns a set of records by examing the MD5 digest of its
+ * key against a filtering frequency <i>f</i>. The filtering criteria is
+ * MD5(key) % f == 0.
+ */
+ public static class MD5Filter extends FilterBase {
+ private int frequency;
+ private static final MessageDigest DIGESTER;
+ public static final int MD5_LEN = 16;
+ private byte [] digest = new byte[MD5_LEN];
+
+ static {
+ try {
+ DIGESTER = MessageDigest.getInstance("MD5");
+ } catch (NoSuchAlgorithmException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+
+ /** set the filtering frequency in configuration
+ *
+ * @param conf configuration
+ * @param frequency filtering frequency
+ */
+ public static void setFrequency(Configuration conf, int frequency) {
+ if (frequency <= 0)
+ throw new IllegalArgumentException(
+ "Negative " + FILTER_FREQUENCY + ": " + frequency);
+ conf.setInt(FILTER_FREQUENCY, frequency);
+ }
+
+ public MD5Filter() { }
+
+ /** configure the filter according to configuration
+ *
+ * @param conf configuration
+ */
+ public void setConf(Configuration conf) {
+ this.frequency = conf.getInt(FILTER_FREQUENCY, 10);
+ if (this.frequency <= 0) {
+ throw new RuntimeException(
+ "Negative " + FILTER_FREQUENCY + ": " + this.frequency);
+ }
+ this.conf = conf;
+ }
+
+ /** Filtering method
+ * If MD5(key) % frequency==0, return true; otherwise return false
+ * @see Filter#accept(Object)
+ */
+ public boolean accept(Object key) {
+ try {
+ long hashcode;
+ if (key instanceof Text) {
+ hashcode = MD5Hashcode((Text)key);
+ } else if (key instanceof BytesWritable) {
+ hashcode = MD5Hashcode((BytesWritable)key);
+ } else {
+ ByteBuffer bb;
+ bb = Text.encode(key.toString());
+ hashcode = MD5Hashcode(bb.array(), 0, bb.limit());
+ }
+ if (hashcode / frequency * frequency == hashcode)
+ return true;
+ } catch(Exception e) {
+ LOG.warn(e);
+ throw new RuntimeException(e);
+ }
+ return false;
+ }
+
+ private long MD5Hashcode(Text key) throws DigestException {
+ return MD5Hashcode(key.getBytes(), 0, key.getLength());
+ }
+
+ private long MD5Hashcode(BytesWritable key) throws DigestException {
+ return MD5Hashcode(key.getBytes(), 0, key.getLength());
+ }
+
+ synchronized private long MD5Hashcode(byte[] bytes,
+ int start, int length) throws DigestException {
+ DIGESTER.update(bytes, 0, length);
+ DIGESTER.digest(digest, 0, MD5_LEN);
+ long hashcode=0;
+ for (int i = 0; i < 8; i++)
+ hashcode |= ((digest[i] & 0xffL) << (8 * (7 - i)));
+ return hashcode;
+ }
+ }
+
+ private static class FilterRecordReader<K, V>
+ extends SequenceFileRecordReader<K, V> {
+
+ private Filter filter;
+ private K key;
+ private V value;
+
+ public FilterRecordReader(Configuration conf)
+ throws IOException {
+ super();
+ // instantiate filter
+ filter = (Filter)ReflectionUtils.newInstance(
+ conf.getClass(FILTER_CLASS, PercentFilter.class), conf);
+ }
+
+ public synchronized boolean nextKeyValue()
+ throws IOException, InterruptedException {
+ while (super.nextKeyValue()) {
+ key = super.getCurrentKey();
+ if (filter.accept(key)) {
+ value = super.getCurrentValue();
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public K getCurrentKey() {
+ return key;
+ }
+
+ @Override
+ public V getCurrentValue() {
+ return value;
+ }
+ }
+}
Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/SequenceFileAsBinaryOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/SequenceFileAsBinaryOutputFormat.java?rev=800232&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/SequenceFileAsBinaryOutputFormat.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/SequenceFileAsBinaryOutputFormat.java Mon Aug 3 07:34:20 2009
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.output;
+
+import java.io.IOException;
+import java.io.DataOutputStream;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.SequenceFile.ValueBytes;
+import org.apache.hadoop.mapred.InvalidJobConfException;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * An {@link org.apache.hadoop.mapreduce.OutputFormat} that writes keys,
+ * values to {@link SequenceFile}s in binary(raw) format.
+ */
+public class SequenceFileAsBinaryOutputFormat
+    extends SequenceFileOutputFormat <BytesWritable,BytesWritable> {
+
+  /**
+   * Inner class used for appendRaw: exposes the valid bytes of a
+   * {@link BytesWritable} as {@link ValueBytes}.
+   */
+  static public class WritableValueBytes implements ValueBytes {
+    private BytesWritable value;
+
+    public WritableValueBytes() {
+      this.value = null;
+    }
+
+    public WritableValueBytes(BytesWritable value) {
+      this.value = value;
+    }
+
+    /** Point this wrapper at another value (null releases the reference). */
+    public void reset(BytesWritable value) {
+      this.value = value;
+    }
+
+    /** Writes the wrapped value's valid bytes verbatim to the stream. */
+    public void writeUncompressedBytes(DataOutputStream outStream)
+        throws IOException {
+      outStream.write(value.getBytes(), 0, value.getLength());
+    }
+
+    /**
+     * RECORD compression is not supported by this wrapper.
+     * @throws UnsupportedOperationException always
+     */
+    public void writeCompressedBytes(DataOutputStream outStream)
+        throws IllegalArgumentException, IOException {
+      throw new UnsupportedOperationException(
+        "WritableValueBytes doesn't support RECORD compression");
+    }
+
+    /** @return the number of valid bytes in the wrapped value */
+    public int getSize(){
+      return value.getLength();
+    }
+  }
+
+  /**
+   * Set the key class for the {@link SequenceFile}
+   * <p>This allows the user to specify the key class to be different
+   * from the actual class ({@link BytesWritable}) used for writing </p>
+   *
+   * @param job the {@link Job} to modify
+   * @param theClass the SequenceFile output key class.
+   */
+  static public void setSequenceFileOutputKeyClass(Job job,
+      Class<?> theClass) {
+    job.getConfiguration().setClass("mapred.seqbinary.output.key.class",
+      theClass, Object.class);
+  }
+
+  /**
+   * Set the value class for the {@link SequenceFile}
+   * <p>This allows the user to specify the value class to be different
+   * from the actual class ({@link BytesWritable}) used for writing </p>
+   *
+   * @param job the {@link Job} to modify
+   * @param theClass the SequenceFile output value class.
+   */
+  static public void setSequenceFileOutputValueClass(Job job,
+      Class<?> theClass) {
+    job.getConfiguration().setClass("mapred.seqbinary.output.value.class",
+      theClass, Object.class);
+  }
+
+  /**
+   * Get the key class for the {@link SequenceFile}; falls back to the
+   * job's output key class when none was set explicitly.
+   *
+   * @param job the job context to read the configuration from
+   * @return the key class of the {@link SequenceFile}
+   */
+  static public Class<? extends WritableComparable>
+      getSequenceFileOutputKeyClass(JobContext job) {
+    return job.getConfiguration().getClass("mapred.seqbinary.output.key.class",
+      job.getOutputKeyClass().asSubclass(WritableComparable.class),
+      WritableComparable.class);
+  }
+
+  /**
+   * Get the value class for the {@link SequenceFile}; falls back to the
+   * job's output value class when none was set explicitly.
+   *
+   * @param job the job context to read the configuration from
+   * @return the value class of the {@link SequenceFile}
+   */
+  static public Class<? extends Writable> getSequenceFileOutputValueClass(
+      JobContext job) {
+    return job.getConfiguration().getClass(
+      "mapred.seqbinary.output.value.class",
+      job.getOutputValueClass().asSubclass(Writable.class), Writable.class);
+  }
+
+  /**
+   * Returns a writer that appends each key/value's raw bytes to a sequence
+   * file declared with the configured key/value classes.
+   */
+  @Override
+  public RecordWriter<BytesWritable, BytesWritable> getRecordWriter(
+      TaskAttemptContext context) throws IOException {
+    final SequenceFile.Writer out = getSequenceWriter(context,
+      getSequenceFileOutputKeyClass(context),
+      getSequenceFileOutputValueClass(context));
+
+    return new RecordWriter<BytesWritable, BytesWritable>() {
+      private WritableValueBytes wvaluebytes = new WritableValueBytes();
+
+      @Override
+      public void write(BytesWritable bkey, BytesWritable bvalue)
+          throws IOException {
+        wvaluebytes.reset(bvalue);
+        try {
+          out.appendRaw(bkey.getBytes(), 0, bkey.getLength(), wvaluebytes);
+        } finally {
+          // release the caller's buffer even if the append fails
+          wvaluebytes.reset(null);
+        }
+      }
+
+      @Override
+      public void close(TaskAttemptContext context) throws IOException {
+        out.close();
+      }
+    };
+  }
+
+  /**
+   * Performs the parent checks and additionally rejects RECORD compression,
+   * which raw appends cannot produce.
+   * @throws InvalidJobConfException if RECORD compression is configured
+   */
+  @Override
+  public void checkOutputSpecs(JobContext job) throws IOException {
+    super.checkOutputSpecs(job);
+    if (getCompressOutput(job) &&
+        getOutputCompressionType(job) == CompressionType.RECORD ) {
+      throw new InvalidJobConfException("SequenceFileAsBinaryOutputFormat "
+        + "doesn't support Record Compression" );
+    }
+  }
+}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/SequenceFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/SequenceFileOutputFormat.java?rev=800232&r1=800231&r2=800232&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/SequenceFileOutputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/SequenceFileOutputFormat.java Mon Aug 3 07:34:20 2009
@@ -38,17 +38,16 @@
/** An {@link OutputFormat} that writes {@link SequenceFile}s. */
public class SequenceFileOutputFormat <K,V> extends FileOutputFormat<K, V> {
- public RecordWriter<K, V>
- getRecordWriter(TaskAttemptContext context
- ) throws IOException, InterruptedException {
+ protected SequenceFile.Writer getSequenceWriter(TaskAttemptContext context,
+ Class<?> keyClass, Class<?> valueClass)
+ throws IOException {
Configuration conf = context.getConfiguration();
-
+
CompressionCodec codec = null;
CompressionType compressionType = CompressionType.NONE;
if (getCompressOutput(context)) {
// find the kind of compression to do
compressionType = getOutputCompressionType(context);
-
// find the right codec
Class<?> codecClass = getOutputCompressorClass(context,
DefaultCodec.class);
@@ -58,13 +57,19 @@
// get the path of the temporary output file
Path file = getDefaultWorkFile(context, "");
FileSystem fs = file.getFileSystem(conf);
- final SequenceFile.Writer out =
- SequenceFile.createWriter(fs, conf, file,
- context.getOutputKeyClass(),
- context.getOutputValueClass(),
- compressionType,
- codec,
- context);
+ return SequenceFile.createWriter(fs, conf, file,
+ keyClass,
+ valueClass,
+ compressionType,
+ codec,
+ context);
+ }
+
+ public RecordWriter<K, V>
+ getRecordWriter(TaskAttemptContext context
+ ) throws IOException, InterruptedException {
+ final SequenceFile.Writer out = getSequenceWriter(context,
+ context.getOutputKeyClass(), context.getOutputValueClass());
return new RecordWriter<K, V>() {
Modified: hadoop/mapreduce/trunk/src/test/findbugsExcludeFile.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/findbugsExcludeFile.xml?rev=800232&r1=800231&r2=800232&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/findbugsExcludeFile.xml (original)
+++ hadoop/mapreduce/trunk/src/test/findbugsExcludeFile.xml Mon Aug 3 07:34:20 2009
@@ -52,6 +52,10 @@
<Class name="~org.apache.hadoop.mapred.lib.aggregate.*" />
<Bug pattern="NM_SAME_SIMPLE_NAME_AS_INTERFACE" />
</Match>
+ <Match>
+ <Class name="org.apache.hadoop.mapred.SequenceFileInputFilter$Filter" />
+ <Bug pattern="NM_SAME_SIMPLE_NAME_AS_INTERFACE" />
+ </Match>
<!--
Ignore warnings for usage of System.exit. This is
required and have been well thought out
Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileAsBinaryInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileAsBinaryInputFormat.java?rev=800232&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileAsBinaryInputFormat.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileAsBinaryInputFormat.java Mon Aug 3 07:34:20 2009
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MapContext;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import junit.framework.TestCase;
+
+public class TestMRSequenceFileAsBinaryInputFormat extends TestCase {
+  private static final int RECORDS = 10000;
+
+  /**
+   * Writes RECORDS random Text/Text pairs, reads them back through
+   * SequenceFileAsBinaryInputFormat and checks that each raw key and value
+   * deserializes to the Text that was written. The random seed is part of
+   * every assertion message so a failure can be reproduced.
+   */
+  public void testBinary() throws IOException, InterruptedException {
+    Job job = new Job();
+    FileSystem fs = FileSystem.getLocal(job.getConfiguration());
+    Path dir = new Path(System.getProperty("test.build.data",".") + "/mapred");
+    Path file = new Path(dir, "testbinary.seq");
+    Random r = new Random();
+    long seed = r.nextLong();
+    r.setSeed(seed);
+
+    fs.delete(dir, true);
+    FileInputFormat.setInputPaths(job, dir);
+
+    Text tkey = new Text();
+    Text tval = new Text();
+
+    SequenceFile.Writer writer = new SequenceFile.Writer(fs,
+      job.getConfiguration(), file, Text.class, Text.class);
+    try {
+      for (int i = 0; i < RECORDS; ++i) {
+        tkey.set(Integer.toString(r.nextInt(), 36));
+        tval.set(Long.toString(r.nextLong(), 36));
+        writer.append(tkey, tval);
+      }
+    } finally {
+      writer.close();
+    }
+    TaskAttemptContext context = MapReduceTestUtil.
+      createDummyMapTaskAttemptContext(job.getConfiguration());
+    InputFormat<BytesWritable,BytesWritable> bformat =
+      new SequenceFileAsBinaryInputFormat();
+
+    int count = 0;
+    // replay the same random sequence to regenerate the expected records
+    r.setSeed(seed);
+    Text cmpkey = new Text();
+    Text cmpval = new Text();
+    DataInputBuffer buf = new DataInputBuffer();
+    FileInputFormat.setInputPaths(job, file);
+    for (InputSplit split : bformat.getSplits(job)) {
+      RecordReader<BytesWritable, BytesWritable> reader =
+        bformat.createRecordReader(split, context);
+      MapContext<BytesWritable, BytesWritable, BytesWritable, BytesWritable>
+        mcontext = new MapContext<BytesWritable, BytesWritable,
+          BytesWritable, BytesWritable>(job.getConfiguration(),
+          context.getTaskAttemptID(), reader, null, null,
+          MapReduceTestUtil.createDummyReporter(),
+          split);
+      reader.initialize(split, mcontext);
+      try {
+        while (reader.nextKeyValue()) {
+          BytesWritable bkey = reader.getCurrentKey();
+          BytesWritable bval = reader.getCurrentValue();
+          tkey.set(Integer.toString(r.nextInt(), 36));
+          tval.set(Long.toString(r.nextLong(), 36));
+          // the raw bytes are the serialized Text written above
+          buf.reset(bkey.getBytes(), bkey.getLength());
+          cmpkey.readFields(buf);
+          buf.reset(bval.getBytes(), bval.getLength());
+          cmpval.readFields(buf);
+          assertEquals("Keys don't match (seed " + seed + ")",
+            tkey.toString(), cmpkey.toString());
+          assertEquals("Vals don't match (seed " + seed + ")",
+            tval.toString(), cmpval.toString());
+          ++count;
+        }
+      } finally {
+        reader.close();
+      }
+    }
+    assertEquals("Some records not found", RECORDS, count);
+  }
+
+}
Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileAsTextInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileAsTextInputFormat.java?rev=800232&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileAsTextInputFormat.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileAsTextInputFormat.java Mon Aug 3 07:34:20 2009
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.util.*;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MapContext;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.conf.*;
+
+public class TestMRSequenceFileAsTextInputFormat extends TestCase {
+  private static final int MAX_LENGTH = 10000;
+  private static final Configuration conf = new Configuration();
+
+  /**
+   * For files of varying length and varying split sizes, checks that
+   * SequenceFileAsTextInputFormat returns every key exactly once and that
+   * each value is the text form of the LongWritable written for that key.
+   */
+  public void testFormat() throws Exception {
+    Job job = new Job(conf);
+    FileSystem fs = FileSystem.getLocal(conf);
+    Path dir = new Path(System.getProperty("test.build.data",".") + "/mapred");
+    Path file = new Path(dir, "test.seq");
+
+    int seed = new Random().nextInt();
+    Random random = new Random(seed);
+
+    fs.delete(dir, true);
+
+    FileInputFormat.setInputPaths(job, dir);
+
+    // for a variety of lengths
+    for (int length = 0; length < MAX_LENGTH;
+         length += random.nextInt(MAX_LENGTH / 10) + 1) {
+
+      // create a file with length entries: key i -> value 10 * i
+      SequenceFile.Writer writer =
+        SequenceFile.createWriter(fs, conf, file,
+                                  IntWritable.class, LongWritable.class);
+      try {
+        for (int i = 0; i < length; i++) {
+          IntWritable key = new IntWritable(i);
+          LongWritable value = new LongWritable(10 * i);
+          writer.append(key, value);
+        }
+      } finally {
+        writer.close();
+      }
+
+      TaskAttemptContext context = MapReduceTestUtil.
+        createDummyMapTaskAttemptContext(job.getConfiguration());
+      // try splitting the file in a variety of sizes
+      InputFormat<Text, Text> format =
+        new SequenceFileAsTextInputFormat();
+
+      for (int i = 0; i < 3; i++) {
+        // check each split
+        BitSet bits = new BitSet(length);
+        int numSplits =
+          random.nextInt(MAX_LENGTH / (SequenceFile.SYNC_INTERVAL / 20)) + 1;
+        FileInputFormat.setMaxInputSplitSize(job,
+          fs.getFileStatus(file).getLen() / numSplits);
+        for (InputSplit split : format.getSplits(job)) {
+          RecordReader<Text, Text> reader =
+            format.createRecordReader(split, context);
+          MapContext<Text, Text, Text, Text> mcontext =
+            new MapContext<Text, Text, Text, Text>(job.getConfiguration(),
+            context.getTaskAttemptID(), reader, null, null,
+            MapReduceTestUtil.createDummyReporter(),
+            split);
+          reader.initialize(split, mcontext);
+          Class<?> readerClass = reader.getClass();
+          assertEquals("reader class is SequenceFileAsTextRecordReader.",
+            SequenceFileAsTextRecordReader.class, readerClass);
+          try {
+            while (reader.nextKeyValue()) {
+              int keyInt = Integer.parseInt(reader.getCurrentKey().toString());
+              assertFalse("Key in multiple partitions.", bits.get(keyInt));
+              bits.set(keyInt);
+              assertEquals("Wrong value for key " + keyInt + " (seed " + seed
+                + ")", 10L * keyInt,
+                Long.parseLong(reader.getCurrentValue().toString()));
+            }
+          } finally {
+            reader.close();
+          }
+        }
+        assertEquals("Some keys in no partition.", length, bits.cardinality());
+      }
+
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    new TestMRSequenceFileAsTextInputFormat().testFormat();
+  }
+}
Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileInputFilter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileInputFilter.java?rev=800232&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileInputFilter.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMRSequenceFileInputFilter.java Mon Aug 3 07:34:20 2009
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.*;
+import java.util.*;
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.*;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MapContext;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.conf.*;
+
+public class TestMRSequenceFileInputFilter extends TestCase {
+  private static final Log LOG =
+    LogFactory.getLog(TestMRSequenceFileInputFilter.class.getName());
+
+  private static final int MAX_LENGTH = 15000;
+  private static final Configuration conf = new Configuration();
+  private static final Job job;
+  private static final FileSystem fs;
+  private static final Path inDir =
+    new Path(System.getProperty("test.build.data",".") + "/mapred");
+  private static final Path inFile = new Path(inDir, "test.seq");
+  private static final Random random = new Random(1);
+
+  static {
+    try {
+      job = new Job(conf);
+      FileInputFormat.setInputPaths(job, inDir);
+      fs = FileSystem.getLocal(conf);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Writes records with keys "1".."numRecords" and random short values. */
+  private static void createSequenceFile(int numRecords) throws Exception {
+    SequenceFile.Writer writer =
+      SequenceFile.createWriter(fs, conf, inFile,
+                                Text.class, BytesWritable.class);
+    try {
+      for (int i = 1; i <= numRecords; i++) {
+        Text key = new Text(Integer.toString(i));
+        byte[] data = new byte[random.nextInt(10)];
+        random.nextBytes(data);
+        BytesWritable value = new BytesWritable(data);
+        writer.append(key, value);
+      }
+    } finally {
+      writer.close();
+    }
+  }
+
+
+  /**
+   * Reads the input through SequenceFileInputFilter and counts the records
+   * it accepts.
+   * @param numSplits number of splits to aim for; 0 picks a random number
+   */
+  private int countRecords(int numSplits)
+      throws IOException, InterruptedException {
+    InputFormat<Text, BytesWritable> format =
+      new SequenceFileInputFilter<Text, BytesWritable>();
+    if (numSplits == 0) {
+      numSplits =
+        random.nextInt(MAX_LENGTH / (SequenceFile.SYNC_INTERVAL / 20)) + 1;
+    }
+    FileInputFormat.setMaxInputSplitSize(job,
+      fs.getFileStatus(inFile).getLen() / numSplits);
+    TaskAttemptContext context = MapReduceTestUtil.
+      createDummyMapTaskAttemptContext(job.getConfiguration());
+    // check each split
+    int count = 0;
+    for (InputSplit split : format.getSplits(job)) {
+      RecordReader<Text, BytesWritable> reader =
+        format.createRecordReader(split, context);
+      MapContext<Text, BytesWritable, Text, BytesWritable> mcontext =
+        new MapContext<Text, BytesWritable, Text, BytesWritable>(
+        job.getConfiguration(),
+        context.getTaskAttemptID(), reader, null, null,
+        MapReduceTestUtil.createDummyReporter(), split);
+      reader.initialize(split, mcontext);
+      try {
+        while (reader.nextKeyValue()) {
+          LOG.info("Accept record " + reader.getCurrentKey().toString());
+          count++;
+        }
+      } finally {
+        reader.close();
+      }
+    }
+    return count;
+  }
+
+  public void testRegexFilter() throws Exception {
+    // set the filter class
+    LOG.info("Testing Regex Filter with pattern: \\A10*");
+    SequenceFileInputFilter.setFilterClass(job,
+      SequenceFileInputFilter.RegexFilter.class);
+    SequenceFileInputFilter.RegexFilter.setPattern(
+      job.getConfiguration(), "\\A10*");
+
+    // clean input dir
+    fs.delete(inDir, true);
+
+    // for a variety of lengths (always >= 1)
+    for (int length = 1; length < MAX_LENGTH;
+         length += random.nextInt(MAX_LENGTH / 10) + 1) {
+      LOG.info("******Number of records: " + length);
+      createSequenceFile(length);
+      int count = countRecords(0);
+      // keys are 1..length; the matching ones are 1, 10, 100, ... <= length
+      assertEquals((int) Math.log10(length) + 1, count);
+    }
+
+    // clean up
+    fs.delete(inDir, true);
+  }
+
+  public void testPercentFilter() throws Exception {
+    LOG.info("Testing Percent Filter with frequency: 1000");
+    // set the filter class
+    SequenceFileInputFilter.setFilterClass(job,
+      SequenceFileInputFilter.PercentFilter.class);
+    SequenceFileInputFilter.PercentFilter.setFrequency(
+      job.getConfiguration(), 1000);
+
+    // clean input dir
+    fs.delete(inDir, true);
+
+    // for a variety of lengths
+    for (int length = 0; length < MAX_LENGTH;
+         length += random.nextInt(MAX_LENGTH / 10) + 1) {
+      LOG.info("******Number of records: " + length);
+      createSequenceFile(length);
+      // one split, so a single filter counts every record in order
+      int count = countRecords(1);
+      LOG.info("Accepted " + count + " records");
+      int expectedCount = length / 1000;
+      if (expectedCount * 1000 != length) {
+        expectedCount++;
+      }
+      assertEquals(expectedCount, count);
+    }
+
+    // clean up
+    fs.delete(inDir, true);
+  }
+
+  public void testMD5Filter() throws Exception {
+    // set the filter class
+    LOG.info("Testing MD5 Filter with frequency: 1000");
+    SequenceFileInputFilter.setFilterClass(job,
+      SequenceFileInputFilter.MD5Filter.class);
+    SequenceFileInputFilter.MD5Filter.setFrequency(
+      job.getConfiguration(), 1000);
+
+    // clean input dir
+    fs.delete(inDir, true);
+
+    // for a variety of lengths
+    for (int length = 0; length < MAX_LENGTH;
+         length += random.nextInt(MAX_LENGTH / 10) + 1) {
+      LOG.info("******Number of records: " + length);
+      createSequenceFile(length);
+      int singleSplitCount = countRecords(1);
+      int multiSplitCount = countRecords(0);
+      LOG.info("Accepted " + multiSplitCount + " records");
+      assertTrue("Accepted more records than written",
+        singleSplitCount <= length);
+      // MD5 sampling depends only on each key, not on how the file is split
+      assertEquals(singleSplitCount, multiSplitCount);
+    }
+    // clean up
+    fs.delete(inDir, true);
+  }
+
+  public static void main(String[] args) throws Exception {
+    TestMRSequenceFileInputFilter filter = new TestMRSequenceFileInputFilter();
+    filter.testRegexFilter();
+    filter.testPercentFilter();
+    filter.testMD5Filter();
+  }
+}
Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/output/TestMRSequenceFileAsBinaryOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/output/TestMRSequenceFileAsBinaryOutputFormat.java?rev=800232&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/output/TestMRSequenceFileAsBinaryOutputFormat.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/output/TestMRSequenceFileAsBinaryOutputFormat.java Mon Aug 3 07:34:20 2009
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.mapred.InvalidJobConfException;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MapContext;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+
+import junit.framework.TestCase;
+import org.apache.commons.logging.*;
+
+ /**
+  * Tests for {@link SequenceFileAsBinaryOutputFormat}: a round trip of raw
+  * bytes written through it and read back with
+  * {@link SequenceFileInputFormat}, the defaulting of the sequence-file
+  * key/value classes to the job's output key/value classes, and the
+  * rejection of RECORD compression by checkOutputSpecs.
+  */
+ public class TestMRSequenceFileAsBinaryOutputFormat extends TestCase {
+   private static final Log LOG =
+     LogFactory.getLog(TestMRSequenceFileAsBinaryOutputFormat.class.getName());
+
+   /** Number of key/value pairs written and read back by testBinary. */
+   private static final int RECORDS = 10000;
+
+   /**
+    * Serializes RECORDS random (IntWritable, DoubleWritable) pairs into
+    * BytesWritable buffers and writes them with BLOCK compression through
+    * SequenceFileAsBinaryOutputFormat. It then reads the output back as
+    * typed records and compares each one against the same seeded random
+    * sequence.
+    */
+   public void testBinary() throws IOException, InterruptedException {
+     Configuration conf = new Configuration();
+     Job job = new Job(conf);
+
+     Path outdir = new Path(System.getProperty("test.build.data", "/tmp"),
+         "outseq");
+     // Start from an empty output directory. Otherwise files left behind by
+     // an earlier run would be read back and counted below.
+     outdir.getFileSystem(job.getConfiguration()).delete(outdir, true);
+
+     Random r = new Random();
+     long seed = r.nextLong();
+     r.setSeed(seed);
+     // Log the seed so that a failing run can be reproduced.
+     LOG.info("seed = " + seed);
+
+     FileOutputFormat.setOutputPath(job, outdir);
+
+     // Declare the key/value classes stored in the sequence file; the
+     // writer itself is handed raw BytesWritable.
+     SequenceFileAsBinaryOutputFormat.setSequenceFileOutputKeyClass(job,
+         IntWritable.class);
+     SequenceFileAsBinaryOutputFormat.setSequenceFileOutputValueClass(job,
+         DoubleWritable.class);
+
+     SequenceFileAsBinaryOutputFormat.setCompressOutput(job, true);
+     SequenceFileAsBinaryOutputFormat.setOutputCompressionType(job,
+         CompressionType.BLOCK);
+
+     BytesWritable bkey = new BytesWritable();
+     BytesWritable bval = new BytesWritable();
+
+     TaskAttemptContext context =
+       MapReduceTestUtil.createDummyMapTaskAttemptContext(job.getConfiguration());
+     OutputFormat<BytesWritable, BytesWritable> outputFormat =
+       new SequenceFileAsBinaryOutputFormat();
+     OutputCommitter committer = outputFormat.getOutputCommitter(context);
+     committer.setupJob(job);
+     RecordWriter<BytesWritable, BytesWritable> writer =
+       outputFormat.getRecordWriter(context);
+
+     DataOutputBuffer outbuf = new DataOutputBuffer();
+     LOG.info("Creating data by SequenceFileAsBinaryOutputFormat");
+     try {
+       for (int i = 0; i < RECORDS; ++i) {
+         // Serialize each Writable and pass only its raw bytes to the writer.
+         new IntWritable(r.nextInt()).write(outbuf);
+         bkey.set(outbuf.getData(), 0, outbuf.getLength());
+         outbuf.reset();
+         new DoubleWritable(r.nextDouble()).write(outbuf);
+         bval.set(outbuf.getData(), 0, outbuf.getLength());
+         outbuf.reset();
+         writer.write(bkey, bval);
+       }
+     } finally {
+       writer.close(context);
+     }
+     committer.commitTask(context);
+     committer.cleanupJob(job);
+
+     InputFormat<IntWritable, DoubleWritable> iformat =
+       new SequenceFileInputFormat<IntWritable, DoubleWritable>();
+     int count = 0;
+     // Replay the random sequence to regenerate the expected records.
+     r.setSeed(seed);
+     SequenceFileInputFormat.setInputPaths(job, outdir);
+     LOG.info("Reading data by SequenceFileInputFormat");
+     for (InputSplit split : iformat.getSplits(job)) {
+       RecordReader<IntWritable, DoubleWritable> reader =
+         iformat.createRecordReader(split, context);
+       MapContext<IntWritable, DoubleWritable, BytesWritable, BytesWritable>
+         mcontext = new MapContext<IntWritable, DoubleWritable,
+           BytesWritable, BytesWritable>(job.getConfiguration(),
+           context.getTaskAttemptID(), reader, null, null,
+           MapReduceTestUtil.createDummyReporter(),
+           split);
+       reader.initialize(split, mcontext);
+       try {
+         while (reader.nextKeyValue()) {
+           int sourceInt = r.nextInt();
+           double sourceDouble = r.nextDouble();
+           IntWritable iwritable = reader.getCurrentKey();
+           DoubleWritable dwritable = reader.getCurrentValue();
+           assertEquals(
+               "Keys don't match: " + "*" + iwritable.get() + ":" +
+               sourceInt + "*",
+               sourceInt, iwritable.get());
+           assertTrue(
+               "Vals don't match: " + "*" + dwritable.get() + ":" +
+               sourceDouble + "*",
+               Double.compare(dwritable.get(), sourceDouble) == 0);
+           ++count;
+         }
+       } finally {
+         reader.close();
+       }
+     }
+     assertEquals("Some records not found", RECORDS, count);
+   }
+
+   /**
+    * Checks that the sequence-file key/value classes default to the job's
+    * output key/value classes and that explicit settings override them.
+    */
+   public void testSequenceOutputClassDefaultsToMapRedOutputClass()
+       throws IOException {
+     Job job = new Job();
+     // Use arbitrary classes, distinct from the ones set explicitly below,
+     // so that the defaulted values can be told apart from the overrides.
+     job.setOutputKeyClass(FloatWritable.class);
+     job.setOutputValueClass(BooleanWritable.class);
+
+     assertEquals("SequenceFileOutputKeyClass should default to outputKeyClass",
+         FloatWritable.class,
+         SequenceFileAsBinaryOutputFormat.getSequenceFileOutputKeyClass(job));
+     assertEquals("SequenceFileOutputValueClass should default to "
+         + "outputValueClass",
+         BooleanWritable.class,
+         SequenceFileAsBinaryOutputFormat.getSequenceFileOutputValueClass(job));
+
+     SequenceFileAsBinaryOutputFormat.setSequenceFileOutputKeyClass(job,
+         IntWritable.class);
+     SequenceFileAsBinaryOutputFormat.setSequenceFileOutputValueClass(job,
+         DoubleWritable.class);
+
+     assertEquals("SequenceFileOutputKeyClass not updated",
+         IntWritable.class,
+         SequenceFileAsBinaryOutputFormat.getSequenceFileOutputKeyClass(job));
+     assertEquals("SequenceFileOutputValueClass not updated",
+         DoubleWritable.class,
+         SequenceFileAsBinaryOutputFormat.getSequenceFileOutputValueClass(job));
+   }
+
+   /**
+    * Checks that checkOutputSpecs accepts BLOCK compression but rejects
+    * RECORD compression with an InvalidJobConfException.
+    */
+   public void testcheckOutputSpecsForbidRecordCompression()
+       throws IOException {
+     Job job = new Job();
+     FileSystem fs = FileSystem.getLocal(job.getConfiguration());
+     Path outputdir = new Path(System.getProperty("test.build.data", "/tmp")
+         + "/output");
+     fs.delete(outputdir, true);
+
+     // Without an output path, FileOutputFormat.checkOutputSpecs throws
+     // InvalidJobConfException, so set one to isolate the compression check.
+     FileOutputFormat.setOutputPath(job, outputdir);
+
+     // SequenceFileAsBinaryOutputFormat doesn't support record compression;
+     // checkOutputSpecs should reject it.
+     SequenceFileAsBinaryOutputFormat.setCompressOutput(job, true);
+
+     SequenceFileAsBinaryOutputFormat.setOutputCompressionType(job,
+         CompressionType.BLOCK);
+     try {
+       new SequenceFileAsBinaryOutputFormat().checkOutputSpecs(job);
+     } catch (Exception e) {
+       fail("Block compression should be allowed for "
+           + "SequenceFileAsBinaryOutputFormat:Caught " + e.getClass().getName());
+     }
+
+     SequenceFileAsBinaryOutputFormat.setOutputCompressionType(job,
+         CompressionType.RECORD);
+     try {
+       new SequenceFileAsBinaryOutputFormat().checkOutputSpecs(job);
+       fail("Record compression should not be allowed for "
+           + "SequenceFileAsBinaryOutputFormat");
+     } catch (InvalidJobConfException ie) {
+       // expected
+     } catch (Exception e) {
+       fail("Expected " + InvalidJobConfException.class.getName()
+           + " but caught " + e.getClass().getName());
+     }
+   }
+}
|