avro-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r1356503 - in /avro/trunk: CHANGES.txt lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroMultipleOutputs.java lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroMultipleOutputs.java
Date Mon, 02 Jul 2012 22:12:45 GMT
Author: cutting
Date: Mon Jul  2 22:12:43 2012
New Revision: 1356503

URL: http://svn.apache.org/viewvc?rev=1356503&view=rev
Log:
AVRO-1120. Let AvroMultipleOutput jobs use multiple schemas with map-only jobs.  Contributed
by Ashish Nagavaram.

Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroMultipleOutputs.java
    avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroMultipleOutputs.java

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1356503&r1=1356502&r2=1356503&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Mon Jul  2 22:12:43 2012
@@ -21,6 +21,9 @@ Avro 1.7.1 (unreleased)
 
   IMPROVEMENTS
 
+    AVRO-1120. Let AvroMultipleOutput jobs use multiple schemas with
+    map-only jobs. (Ashish Nagavaram via cutting)
+
   BUG FIXES
 
     AVRO-1114. Java: Update license headers for new mapreduce code.  (cutting)

Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroMultipleOutputs.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroMultipleOutputs.java?rev=1356503&r1=1356502&r2=1356503&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroMultipleOutputs.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroMultipleOutputs.java Mon Jul  2 22:12:43 2012
@@ -70,7 +70,7 @@ import org.apache.hadoop.io.NullWritable
  * Usage pattern for job submission:
  * <pre>
  *
- * Job job = new Job();
+ * JobConf job = new JobConf();
  *
  * FileInputFormat.setInputPath(job, inDir);
  * FileOutputFormat.setOutputPath(job, outDir);
@@ -526,15 +526,19 @@ public class AvroMultipleOutputs {
    public static final String CONFIG_NAMED_OUTPUT = "mo.config.namedOutput";
 
    @SuppressWarnings({"unchecked"})
-   public RecordWriter<Object, Object> getRecordWriter(FileSystem fs,JobConf job, String
baseFileName, Progressable arg3) throws IOException
-   {
+   public RecordWriter<Object, Object> getRecordWriter(FileSystem fs,JobConf job, String
baseFileName, Progressable arg3) throws IOException {
    String nameOutput = job.get(CONFIG_NAMED_OUTPUT, null);
    String fileName = getUniqueName(job, baseFileName);
    Schema schema = schemaList.get(nameOutput+"_SCHEMA");
    JobConf outputConf = new JobConf(job);
    outputConf.setOutputFormat(getNamedOutputFormatClass(job, nameOutput));
-   if(schema!=null)
-    AvroJob.setOutputSchema(outputConf,schema);
+   boolean isMapOnly = job.getNumReduceTasks() == 0;
+   if (schema != null) {
+     if (isMapOnly)
+       AvroJob.setMapOutputSchema(outputConf, schema);
+     else
+       AvroJob.setOutputSchema(outputConf, schema);
+   }
    OutputFormat outputFormat = outputConf.getOutputFormat();
    return outputFormat.getRecordWriter(fs, outputConf, fileName, arg3);
    }   

Modified: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroMultipleOutputs.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroMultipleOutputs.java?rev=1356503&r1=1356502&r2=1356503&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroMultipleOutputs.java (original)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroMultipleOutputs.java Mon Jul  2 22:12:43 2012
@@ -45,19 +45,31 @@ import org.junit.Test;
 
 public class TestAvroMultipleOutputs {
 
-      private static final String UTF8 = "UTF-8";
+  private static final String UTF8 = "UTF-8";
 
   public static class MapImpl extends AvroMapper<Utf8, Pair<Utf8, Long>> {
- 
-  
+    private AvroMultipleOutputs amos;
+
+    public void configure(JobConf Job) {
+      this.amos = new AvroMultipleOutputs(Job);
+    }
 
     @Override
       public void map(Utf8 text, AvroCollector<Pair<Utf8,Long>> collector,
                       Reporter reporter) throws IOException {
       StringTokenizer tokens = new StringTokenizer(text.toString());
-      while (tokens.hasMoreTokens())
-        collector.collect(new Pair<Utf8,Long>(new Utf8(tokens.nextToken()),1L));
+      while (tokens.hasMoreTokens()) {
+        String tok = tokens.nextToken();
+        collector.collect(new Pair<Utf8,Long>(new Utf8(tok),1L));
+        amos.getCollector("myavro2",reporter)
+          .collect(new Pair<Utf8,Long>(new Utf8(tok),1L).toString());
+      }
+        
+    }
+    public void close() throws IOException {
+      amos.close();
     }
+
   }
   
   public static class ReduceImpl
@@ -91,6 +103,8 @@ public class TestAvroMultipleOutputs {
     testJob();
     testProjection();
     testProjection1();
+    testJob_noreducer();
+    testProjection_noreducer();
   }
   
   @SuppressWarnings("deprecation")
@@ -118,7 +132,7 @@ public class TestAvroMultipleOutputs {
     FileOutputFormat.setCompressOutput(job, false);
     AvroMultipleOutputs.addNamedOutput(job,"myavro",AvroOutputFormat.class, new Pair<Utf8,Long>(new
Utf8(""), 0L).getSchema());
     AvroMultipleOutputs.addNamedOutput(job,"myavro1",AvroOutputFormat.class, Schema.create(Schema.Type.STRING));
-    
+    AvroMultipleOutputs.addNamedOutput(job,"myavro2",AvroOutputFormat.class, Schema.create(Schema.Type.STRING));
  
     WordCountUtil.setMeta(job);
 
 
@@ -201,4 +215,49 @@ public class TestAvroMultipleOutputs {
     }
     Assert.assertEquals(sumOfCounts, actualSumOfCounts);
   }
+
+  @SuppressWarnings("deprecation")
+  public void testJob_noreducer() throws Exception {
+    JobConf job = new JobConf();
+    job.setNumReduceTasks(0);
+//    private static final String UTF8 = "UTF-8";
+    String dir = System.getProperty("test.dir", ".") + "/mapred";
+    Path outputPath = new Path(dir + "/out");
+
+    outputPath.getFileSystem(job).delete(outputPath);
+    WordCountUtil.writeLinesFile();
+
+    job.setJobName("AvroMultipleOutputs_noreducer");
+
+    AvroJob.setInputSchema(job, Schema.create(Schema.Type.STRING));
+    AvroJob.setOutputSchema(job,
+                            new Pair<Utf8,Long>(new Utf8(""), 0L).getSchema());
+
+    AvroJob.setMapperClass(job, MapImpl.class);
+
+    FileInputFormat.setInputPaths(job, new Path(dir + "/in"));
+    FileOutputFormat.setOutputPath(job, outputPath);
+    FileOutputFormat.setCompressOutput(job, false);
+    AvroMultipleOutputs.addNamedOutput(job,"myavro2",AvroOutputFormat.class, Schema.create(Schema.Type.STRING));
+    JobClient.runJob(job);
+  }
+  
+  public void testProjection_noreducer() throws Exception {
+    JobConf job = new JobConf();
+    long onel = 1;
+    Schema readerSchema = Schema.create(Schema.Type.STRING);
+    AvroJob.setInputSchema(job, readerSchema);
+    String dir= System.getProperty("test.dir", ".") + "/mapred";
+    Path inputPath = new Path(dir + "/out" + "/myavro2-m-00000.avro");
+    FileStatus fileStatus = FileSystem.get(job).getFileStatus(inputPath);
+    FileSplit fileSplit = new FileSplit(inputPath, 0, fileStatus.getLen(), job);
+    AvroRecordReader<Utf8> recordReader_new = new AvroRecordReader<Utf8>(job,
fileSplit);
+    AvroWrapper<Utf8> inputPair_new = new AvroWrapper<Utf8>(null);
+    NullWritable ignore = NullWritable.get();
+    long testl=0;
+     while(recordReader_new.next(inputPair_new, ignore)) {
+       testl=Long.parseLong(inputPair_new.datum().toString().split(":")[2].replace("}","").trim());
+       Assert.assertEquals(onel,testl);
+    }
+  }
 }



Mime
View raw message