avro-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r1129056 - in /avro/branches/branch-1.5: ./ lang/java/mapred/src/main/java/org/apache/avro/mapred/ lang/java/mapred/src/test/java/org/apache/avro/mapred/
Date Mon, 30 May 2011 08:31:28 GMT
Author: cutting
Date: Mon May 30 08:31:28 2011
New Revision: 1129056

URL: http://svn.apache.org/viewvc?rev=1129056&view=rev
Log:
Merge -c 1129053 from trunk to 1.5 branch.  Fixes: AVRO-830.

Added:
    avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroTextOutputFormat.java
      - copied unchanged from r1129053, avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroTextOutputFormat.java
    avro/branches/branch-1.5/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroTextOutputFormat.java
      - copied unchanged from r1129053, avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroTextOutputFormat.java
    avro/branches/branch-1.5/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroTextSort.java
      - copied unchanged from r1129053, avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroTextSort.java
Removed:
    avro/branches/branch-1.5/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroAsTextInputFormat.java
Modified:
    avro/branches/branch-1.5/   (props changed)
    avro/branches/branch-1.5/CHANGES.txt
    avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroOutputFormat.java
    avro/branches/branch-1.5/lang/java/mapred/src/test/java/org/apache/avro/mapred/WordCountUtil.java

Propchange: avro/branches/branch-1.5/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon May 30 08:31:28 2011
@@ -1 +1 @@
-/avro/trunk:1075938,1075993,1078917,1079055,1079060,1079063,1083246,1085921,1086727,1086730,1086866,1087076,1087129,1087136,1087439-1087440,1087463,1087472,1087792,1089128,1089131,1089550,1094812,1095206-1095208,1095493,1095529,1095548,1095550,1096798,1097916,1097927,1097968,1097974,1102332,1102335,1124127,1124971
+/avro/trunk:1075938,1075993,1078917,1079055,1079060,1079063,1083246,1085921,1086727,1086730,1086866,1087076,1087129,1087136,1087439-1087440,1087463,1087472,1087792,1089128,1089131,1089550,1094812,1095206-1095208,1095493,1095529,1095548,1095550,1096798,1097916,1097927,1097968,1097974,1102332,1102335,1124127,1124971,1129053

Modified: avro/branches/branch-1.5/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/CHANGES.txt?rev=1129056&r1=1129055&r2=1129056&view=diff
==============================================================================
--- avro/branches/branch-1.5/CHANGES.txt (original)
+++ avro/branches/branch-1.5/CHANGES.txt Mon May 30 08:31:28 2011
@@ -4,6 +4,10 @@ Avro 1.5.2 (unreleased)
 
   NEW FEATURES
 
+    AVRO-830. Java: Add AvroTextOutputFormat to permit Hadoop
+    streaming jobs to easily write Avro format output with "bytes" as
+    schema.  (Tom White via cutting)
+
   IMPROVEMENTS
 
     AVRO-820. Java: Permit applications to catch exceptions thrown

Modified: avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroOutputFormat.java
URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroOutputFormat.java?rev=1129056&r1=1129055&r2=1129056&view=diff
==============================================================================
--- avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroOutputFormat.java (original)
+++ avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroOutputFormat.java Mon May 30 08:31:28 2011
@@ -19,6 +19,7 @@
 package org.apache.avro.mapred;
 
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
 import java.util.Map;
 import java.net.URLDecoder;
 
@@ -64,21 +65,9 @@ public class AvroOutputFormat <T>
   public static void setSyncInterval(JobConf job, int syncIntervalInBytes) {
     job.setInt(SYNC_INTERVAL_KEY, syncIntervalInBytes);
   }
-
-  @Override
-  public RecordWriter<AvroWrapper<T>, NullWritable>
-    getRecordWriter(FileSystem ignore, JobConf job,
-                    String name, Progressable prog)
-    throws IOException {
-
-    boolean isMapOnly = job.getNumReduceTasks() == 0;
-    Schema schema = isMapOnly
-      ? AvroJob.getMapOutputSchema(job)
-      : AvroJob.getOutputSchema(job);
-
-    final DataFileWriter<T> writer =
-      new DataFileWriter<T>(new ReflectDatumWriter<T>());
-
+  
+  static <T> void configureDataFileWriter(DataFileWriter<T> writer,
+      JobConf job) throws UnsupportedEncodingException {
     if (FileOutputFormat.getCompressOutput(job)) {
       int level = job.getInt(DEFLATE_LEVEL_KEY, DEFAULT_DEFLATE_LEVEL);
       String codecName = job.get(AvroJob.OUTPUT_CODEC, DEFLATE_CODEC);
@@ -87,7 +76,7 @@ public class AvroOutputFormat <T>
         : CodecFactory.fromString(codecName);
       writer.setCodec(factory);
     }
-
+    
     writer.setSyncInterval(job.getInt(SYNC_INTERVAL_KEY, DEFAULT_SYNC_INTERVAL));
 
     // copy metadata from job
@@ -100,6 +89,23 @@ public class AvroOutputFormat <T>
                        URLDecoder.decode(e.getValue(), "ISO-8859-1")
                        .getBytes("ISO-8859-1"));
     }
+  }
+
+  @Override
+  public RecordWriter<AvroWrapper<T>, NullWritable>
+    getRecordWriter(FileSystem ignore, JobConf job,
+                    String name, Progressable prog)
+    throws IOException {
+
+    boolean isMapOnly = job.getNumReduceTasks() == 0;
+    Schema schema = isMapOnly
+      ? AvroJob.getMapOutputSchema(job)
+      : AvroJob.getOutputSchema(job);
+
+    final DataFileWriter<T> writer =
+      new DataFileWriter<T>(new ReflectDatumWriter<T>());
+    
+    configureDataFileWriter(writer, job);
 
     Path path = FileOutputFormat.getTaskOutputPath(job, name+EXT);
     writer.create(schema, path.getFileSystem(job).create(path));

Modified: avro/branches/branch-1.5/lang/java/mapred/src/test/java/org/apache/avro/mapred/WordCountUtil.java
URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/java/mapred/src/test/java/org/apache/avro/mapred/WordCountUtil.java?rev=1129056&r1=1129055&r2=1129056&view=diff
==============================================================================
--- avro/branches/branch-1.5/lang/java/mapred/src/test/java/org/apache/avro/mapred/WordCountUtil.java (original)
+++ avro/branches/branch-1.5/lang/java/mapred/src/test/java/org/apache/avro/mapred/WordCountUtil.java Mon May 30 08:31:28 2011
@@ -44,6 +44,7 @@ import org.apache.avro.Schema;
 import org.apache.avro.util.Utf8;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.DatumWriter;
+import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.file.DataFileWriter;
@@ -60,7 +61,7 @@ class WordCountUtil {
   private static final File COUNTS_FILE
     = new File(new File(DIR, "out"), "part-00000.avro");
   private static final File SORTED_FILE
-    = new File(new File(DIR, "out"), "part-00000");
+    = new File(new File(DIR, "out"), "part-00000.avro");
 
   public static final String[] LINES = new String[] {
     "the quick brown fox jumps over the lazy dog",
@@ -131,18 +132,25 @@ class WordCountUtil {
   }
   
   public static void validateSortedFile() throws Exception {
-    BufferedReader reader = new BufferedReader(new FileReader(SORTED_FILE));
+    DatumReader<ByteBuffer> reader = new GenericDatumReader<ByteBuffer>();
+    InputStream in = new BufferedInputStream(
+        new FileInputStream(SORTED_FILE));
+    DataFileStream<ByteBuffer> lines =
+        new DataFileStream<ByteBuffer>(in,reader);
     List<String> sortedLines = new ArrayList<String>();
     for (String line : LINES) {
       sortedLines.add(line);
     }
     Collections.sort(sortedLines);
     for (String expectedLine : sortedLines) {
-      assertEquals(expectedLine, reader.readLine().trim());
+      ByteBuffer buf = lines.next();
+      byte[] b = new byte[buf.remaining()];
+      buf.get(b);
+      assertEquals(expectedLine, new String(b, "UTF-8").trim());
     }
-    assertNull(reader.readLine());
+    assertFalse(lines.hasNext());
   }
-
+  
   // metadata tests
   private static final String STRING_KEY = "string-key";
   private static final String LONG_KEY = "long-key";



Mime
View raw message