avro-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r1095493 - in /avro/trunk: ./ lang/java/mapred/src/main/java/org/apache/avro/mapred/ lang/java/mapred/src/test/java/org/apache/avro/mapred/
Date Wed, 20 Apr 2011 19:55:38 GMT
Author: cutting
Date: Wed Apr 20 19:55:37 2011
New Revision: 1095493

URL: http://svn.apache.org/viewvc?rev=1095493&view=rev
Log:
AVRO-763. Java MapReduce API: add support for configure() and close() methods to mappers and
reducers.  Contributed by Marshall Pierce.

Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroMapper.java
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroReducer.java
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopMapper.java
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopReducerBase.java
    avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestWeather.java

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1095493&r1=1095492&r2=1095493&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Wed Apr 20 19:55:37 2011
@@ -52,6 +52,9 @@ Avro 1.5.1 (unreleased)
 
     AVRO-798. Add checksum to Snappy compressed blocks. (cutting)
 
+    AVRO-763. Java MapReduce API: add support for configure() and
+    close() methods to mappers and reducers. (Marshall Pierce via cutting)
+
   BUG FIXES
 
     AVRO-786. Java: Fix equals() to work on objects containing maps. (cutting)

Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroMapper.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroMapper.java?rev=1095493&r1=1095492&r2=1095493&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroMapper.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroMapper.java Wed Apr 20 19:55:37 2011
@@ -18,17 +18,20 @@
 
 package org.apache.avro.mapred;
 
+import java.io.Closeable;
 import java.io.IOException;
 
-import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.Reporter;
 
 /** A mapper for Avro data.
  *
  * <p>Applications subclass this class and pass their subclass to {@link
- * AvroJob#setMapperClass}, overriding {@link #map}.
+ * AvroJob#setMapperClass(JobConf, Class)}, overriding {@link #map(Object, AvroCollector,
Reporter)}.
  */
-public class AvroMapper<IN,OUT> extends Configured {
+public class AvroMapper<IN, OUT> extends Configured implements JobConfigurable, Closeable
{
 
   /** Called with each map input datum.  By default, collects inputs. */
   @SuppressWarnings("unchecked")
@@ -38,4 +41,15 @@ public class AvroMapper<IN,OUT> extends 
   }
 
 
+  /** Subclasses can override this as desired. */
+  @Override
+  public void close() throws IOException {
+    // no op
+  }
+
+  /** Subclasses can override this as desired. */
+  @Override
+  public void configure(JobConf jobConf) {
+    // no op
+  }
 }

Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroReducer.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroReducer.java?rev=1095493&r1=1095492&r2=1095493&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroReducer.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroReducer.java Wed Apr 20 19:55:37 2011
@@ -18,19 +18,22 @@
 
 package org.apache.avro.mapred;
 
+import java.io.Closeable;
 import java.io.IOException;
 
-import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.Reporter;
 
 /** A reducer for Avro data.
  *
  * <p>Applications should subclass this class and pass their subclass to {@link
- * AvroJob#setReducerClass} and perhaps {@link AvroJob#setCombinerClass}.
- * Subclasses override {@link #reduce}.
+ * AvroJob#setReducerClass(JobConf, Class)} and perhaps {@link AvroJob#setCombinerClass(JobConf,
Class)}.
+ * Subclasses override {@link #reduce(Object, Iterable, AvroCollector, Reporter)}.
  */
 
-public class AvroReducer<K,V,OUT> extends Configured {
+public class AvroReducer<K,V,OUT> extends Configured implements JobConfigurable, Closeable
{
 
   private Pair<K,V> outputPair;
 
@@ -48,4 +51,15 @@ public class AvroReducer<K,V,OUT> extend
     }
   }
 
+  /** Subclasses can override this as desired. */
+  @Override
+  public void close() throws IOException {
+    // no op
+  }
+
+  /** Subclasses can override this as desired. */
+  @Override
+  public void configure(JobConf jobConf) {
+    // no op
+  }
 }

Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopMapper.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopMapper.java?rev=1095493&r1=1095492&r2=1095493&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopMapper.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopMapper.java Wed Apr 20 19:55:37 2011
@@ -21,11 +21,11 @@ package org.apache.avro.mapred;
 import java.io.IOException;
 
 import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.util.ReflectionUtils;
 
 /** Bridge between a {@link org.apache.hadoop.mapred.Mapper} and an {@link
@@ -45,6 +45,7 @@ class HadoopMapper<IN,OUT,K,V,KO,VO> ext
       (conf.getClass(AvroJob.MAPPER, AvroMapper.class, AvroMapper.class),
        conf);
     this.isMapOnly = conf.getNumReduceTasks() == 0;
+    this.mapper.configure(conf);
   }
 
   @SuppressWarnings("unchecked")
@@ -80,4 +81,9 @@ class HadoopMapper<IN,OUT,K,V,KO,VO> ext
     mapper.map(wrapper.datum(), out, reporter);
   }
 
+  @Override
+  public void close() throws IOException {
+    this.mapper.close();
+  }
+
 }

Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopReducerBase.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopReducerBase.java?rev=1095493&r1=1095492&r2=1095493&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopReducerBase.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopReducerBase.java Wed Apr 20 19:55:37 2011
@@ -39,6 +39,7 @@ abstract class HadoopReducerBase<K,V,OUT
   @Override
   public void configure(JobConf conf) {
     this.reducer = getReducer(conf);
+    this.reducer.configure(conf);
   }
 
   class ReduceIterable implements Iterable<V>, Iterator<V> {
@@ -60,4 +61,8 @@ abstract class HadoopReducerBase<K,V,OUT
     reducer.reduce(key.datum(), reduceIterable, collector, reporter);
   }
 
+  @Override
+  public void close() throws IOException {
+    this.reducer.close();
+  }
 }

Modified: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestWeather.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestWeather.java?rev=1095493&r1=1095492&r2=1095493&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestWeather.java (original)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestWeather.java Wed Apr 20 19:55:37 2011
@@ -20,6 +20,7 @@ package org.apache.avro.mapred;
 
 import java.io.IOException;
 import java.io.File;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobClient;
@@ -34,15 +35,31 @@ import org.apache.avro.io.DatumReader;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.file.DataFileReader;
 import static org.apache.avro.file.DataFileConstants.SNAPPY_CODEC;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
+import org.junit.After;
 import org.junit.Test;
-import static org.junit.Assert.*;
 
 import test.Weather;
 
 /** Tests mapred API with a specific record. */
 public class TestWeather {
 
+  private static final AtomicInteger mapCloseCalls = new AtomicInteger();
+  private static final AtomicInteger mapConfigureCalls = new AtomicInteger();
+  private static final AtomicInteger reducerCloseCalls = new AtomicInteger();
+  private static final AtomicInteger reducerConfigureCalls = new AtomicInteger();
+
+
+  @After
+  public void tearDown() {
+    mapCloseCalls.set(0);
+    mapConfigureCalls.set(0);
+    reducerCloseCalls.set(0);
+    reducerConfigureCalls.set(0);
+  }
+
   /** Uses default mapper with no reduces for a map-only identity job. */
   @Test
   @SuppressWarnings("deprecation")
@@ -64,7 +81,7 @@ public class TestWeather {
     FileOutputFormat.setCompressOutput(job, true);
     
     job.setNumReduceTasks(0);                     // map-only
-    
+
     JobClient.runJob(job);
 
     // check output is correct
@@ -88,8 +105,18 @@ public class TestWeather {
                       Reporter reporter) throws IOException {
       collector.collect(new Pair<Weather,Void>(w, (Void)null));
     }
+
+    @Override
+    public void close() throws IOException {
+      mapCloseCalls.incrementAndGet();
+    }
+
+    @Override
+    public void configure(JobConf jobConf) {
+      mapConfigureCalls.incrementAndGet();
+    }
   }
-  
+
   // output keys only, since values are empty
   public static class SortReducer
     extends AvroReducer<Weather, Void, Weather> {
@@ -99,7 +126,17 @@ public class TestWeather {
                        Reporter reporter) throws IOException {
       collector.collect(w);
     }
-  }    
+
+    @Override
+    public void close() throws IOException {
+      reducerCloseCalls.incrementAndGet();
+    }
+
+    @Override
+    public void configure(JobConf jobConf) {
+      reducerConfigureCalls.incrementAndGet();
+    }
+  }
 
   @Test
   @SuppressWarnings("deprecation")
@@ -140,6 +177,15 @@ public class TestWeather {
 
     check.close();
     sorted.close();
+
+    // check that AvroMapper and AvroReducer get close() and configure() called
+    assertEquals(1, mapCloseCalls.get());
+    assertEquals(1, reducerCloseCalls.get());
+    // gets called twice for some reason, so loosen this check
+    assertTrue(mapConfigureCalls.get() >= 1);
+    assertTrue(reducerConfigureCalls.get() >= 1);
+
+
   }
 
 



Mime
View raw message