avro-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r955064 - in /avro/trunk/lang/java/src: java/org/apache/avro/mapred/AvroUtf8InputFormat.java test/java/org/apache/avro/mapred/TestWordCountGeneric.java test/java/org/apache/avro/mapred/WordCountUtil.java
Date Tue, 15 Jun 2010 21:50:29 GMT
Author: cutting
Date: Tue Jun 15 21:50:28 2010
New Revision: 955064

URL: http://svn.apache.org/viewvc?rev=955064&view=rev
Log:
AVRO-577. Java: add MapReduce InputFormat for plain-text files.  Contributed by Tom White.

Added:
    avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroUtf8InputFormat.java
Modified:
    avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWordCountGeneric.java
    avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/WordCountUtil.java

Added: avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroUtf8InputFormat.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroUtf8InputFormat.java?rev=955064&view=auto
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroUtf8InputFormat.java (added)
+++ avro/trunk/lang/java/src/java/org/apache/avro/mapred/AvroUtf8InputFormat.java Tue Jun 15 21:50:28 2010
@@ -0,0 +1,98 @@
+package org.apache.avro.mapred;
+import java.io.IOException;
+
+import org.apache.avro.util.Utf8;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.LineRecordReader;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * An {@link org.apache.hadoop.mapred.InputFormat} for plain-text files.
+ * Each line is a {@link Utf8} key; values are {@link NullWritable}.
+ */
+public class AvroUtf8InputFormat
+  extends FileInputFormat<AvroWrapper<Utf8>, NullWritable>
+  implements JobConfigurable {
+
+  static class Utf8LineRecordReader implements
+    RecordReader<AvroWrapper<Utf8>, NullWritable> { // adapts a LineRecordReader to AvroWrapper<Utf8> keys
+
+    private LineRecordReader lineRecordReader; // delegate that performs the actual reading
+    
+    private LongWritable currentKeyHolder = new LongWritable(); // filled by the delegate on each next(); not otherwise read in this class
+    private Text currentValueHolder = new Text(); // reused holder for the delegate's value, i.e. the current line
+    
+    public Utf8LineRecordReader(Configuration job, 
+        FileSplit split) throws IOException {
+      this.lineRecordReader = new LineRecordReader(job, split); // all I/O is delegated to this reader
+    }
+    
+    public void close() throws IOException {
+      lineRecordReader.close(); // straight delegation
+    }
+
+    public long getPos() throws IOException {
+      return lineRecordReader.getPos(); // straight delegation
+    }
+
+    public float getProgress() throws IOException {
+      return lineRecordReader.getProgress(); // straight delegation
+    }
+
+    public boolean next(AvroWrapper<Utf8> key, NullWritable value)
+      throws IOException {
+      boolean success = lineRecordReader.next(currentKeyHolder,
+          currentValueHolder); // delegate fills both holders; false is treated as end of input below
+      if (success) {
+        key.datum(new Utf8(currentValueHolder.getBytes()) // NOTE(review): the Utf8 presumably shares the Text's backing array, which is reused on the next call -- confirm consumers do not retain the datum
+            .setLength(currentValueHolder.getLength())); // restrict the Utf8 to the length the Text reports
+      } else {
+        key.datum(null); // nothing was read: clear the datum
+      }
+      return success;
+    }
+
+    @Override
+    public AvroWrapper<Utf8> createKey() {
+      return new AvroWrapper<Utf8>(null); // empty wrapper; next() sets the datum
+    }
+
+    @Override
+    public NullWritable createValue() {
+      return NullWritable.get(); // values carry no data
+    }
+
+  }
+
+  private CompressionCodecFactory compressionCodecs = null; // assigned in configure(); null until then
+
+  public void configure(JobConf conf) {
+    compressionCodecs = new CompressionCodecFactory(conf); // codec lookup used by isSplitable()
+  }
+
+  protected boolean isSplitable(FileSystem fs, Path file) {
+    return compressionCodecs.getCodec(file) == null; // splittable only when no codec matches the file; NOTE(review): throws NullPointerException if configure() has not been called
+  }
+
+  @Override
+  public RecordReader<AvroWrapper<Utf8>, NullWritable>
+    getRecordReader(InputSplit split, JobConf job, Reporter reporter)
+    throws IOException {
+
+    reporter.setStatus(split.toString()); // publish the split description as the task status
+    return new Utf8LineRecordReader(job, (FileSplit) split); // unchecked downcast: a non-FileSplit would fail with ClassCastException
+  }
+
+}

Modified: avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWordCountGeneric.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWordCountGeneric.java?rev=955064&r1=955063&r2=955064&view=diff
==============================================================================
--- avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWordCountGeneric.java (original)
+++ avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/TestWordCountGeneric.java Tue Jun 15 21:50:28 2010
@@ -103,5 +103,35 @@ public class TestWordCountGeneric {
       outputPath.getFileSystem(job).delete(outputPath);
     }
   }
+  
+  @Test
+  @SuppressWarnings("deprecation")
+  public void testAvroUtf8InputFormat() throws Exception { // runs a job whose input is read through AvroUtf8InputFormat, then validates the output
+    String dir = System.getProperty("test.dir", ".") + "/mapred";
+    Path outputPath = new Path(dir + "/out");
+    JobConf job = new JobConf();
+    try {
+      WordCountUtil.writeLinesTextFile(); // wipes the test directory and writes the plain-text input file
+  
+      job.setJobName("wordcount");
+   
+      job.setInputFormat(AvroUtf8InputFormat.class); // the input format under test
+      AvroJob.setOutputGeneric(job, WordCount.SCHEMA$);
+  
+      job.setMapperClass(MapImpl.class);        
+      job.setCombinerClass(ReduceImpl.class); // the same class serves as combiner and reducer
+      job.setReducerClass(ReduceImpl.class);
+  
+      FileInputFormat.setInputPaths(job, new Path(dir + "/in")); // the "in" directory that writeLinesTextFile() writes into
+      FileOutputFormat.setOutputPath(job, outputPath);
+      FileOutputFormat.setCompressOutput(job, true);
+  
+      JobClient.runJob(job);
+  
+      WordCountUtil.validateCountsFile();
+    } finally {
+      outputPath.getFileSystem(job).delete(outputPath); // always remove the output directory, pass or fail; presumably the deprecated call the annotation suppresses -- confirm
+    }
+  }
 
 }

Modified: avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/WordCountUtil.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/WordCountUtil.java?rev=955064&r1=955063&r2=955064&view=diff
==============================================================================
--- avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/WordCountUtil.java (original)
+++ avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/WordCountUtil.java Tue Jun 15 21:50:28 2010
@@ -25,6 +25,7 @@ import java.io.File;
 import java.io.InputStream;
 import java.io.FileInputStream;
 import java.io.BufferedInputStream;
+import java.io.PrintStream;
 import java.util.StringTokenizer;
 import java.util.Map;
 import java.util.TreeMap;
@@ -46,6 +47,8 @@ class WordCountUtil {
     = new File(System.getProperty("test.dir", ".") + "/mapred");
   private static final File LINES_FILE
     = new File(new File(DIR, "in"), "lines.avro");
+  private static final File LINES_TEXT_FILE
+    = new File(new File(DIR, "in"), "lines.txt");
   private static final File COUNTS_FILE
     = new File(new File(DIR, "out"), "part-00000.avro");
 
@@ -79,6 +82,15 @@ class WordCountUtil {
       out.append(new Utf8(line));
     out.close();
   }
+  
+  public static void writeLinesTextFile() throws IOException { // writes every entry of LINES, one per line, to LINES_TEXT_FILE
+    FileUtil.fullyDelete(DIR); // start from a clean test directory
+    LINES_FILE.getParentFile().mkdirs(); // LINES_FILE and LINES_TEXT_FILE share the same parent directory ("in")
+    PrintStream out = new PrintStream(LINES_TEXT_FILE); // NOTE(review): uses the platform default charset -- confirm LINES is ASCII-only
+    for (String line : LINES)
+      out.println(line);
+    out.close(); // not reached if an exception is thrown above
+  }
 
   public static void validateCountsFile() throws IOException {
     DatumReader<WordCount> reader = new SpecificDatumReader<WordCount>();



Mime
View raw message