avro-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r1053749 - in /avro/trunk: ./ lang/java/avro/src/main/java/org/apache/avro/reflect/ lang/java/mapred/src/main/java/org/apache/avro/mapred/ lang/java/mapred/src/test/java/org/apache/avro/mapred/
Date Wed, 29 Dec 2010 21:37:43 GMT
Author: cutting
Date: Wed Dec 29 21:37:42 2010
New Revision: 1053749

URL: http://svn.apache.org/viewvc?rev=1053749&view=rev
Log:
AVRO-669. Java: Make MapReduce work with reflection-based data.

Added:
    avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestReflectJob.java
Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java
    avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectDatumWriter.java
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroJob.java
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroKeyComparator.java
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroOutputFormat.java
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroRecordReader.java
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroSerialization.java
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/Pair.java
    avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/WordCountUtil.java

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1053749&r1=1053748&r2=1053749&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Wed Dec 29 21:37:42 2010
@@ -79,6 +79,9 @@ Avro 1.5.0 (unreleased)
 
     AVRO-714. Fix Forrest to work with Java 6. (Carl Steinbach via cutting)
 
+    AVRO-669. Java: Make MapReduce work with reflection-based data.
+    (cutting)
+
   BUG FIXES
 
     AVRO-675. C: Bytes and fixed setters don't update datum size.

Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java?rev=1053749&r1=1053748&r2=1053749&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java (original)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java Wed Dec 29 21:37:42 2010
@@ -112,6 +112,7 @@ public class ReflectData extends Specifi
   @Override
   protected boolean isBytes(Object datum) {
     if (datum == null) return false;
+    if (super.isBytes(datum)) return true;
     Class c = datum.getClass();
     return c.isArray() && c.getComponentType() == Byte.TYPE;
   }

Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectDatumWriter.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectDatumWriter.java?rev=1053749&r1=1053748&r2=1053749&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectDatumWriter.java (original)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectDatumWriter.java Wed Dec 29 21:37:42 2010
@@ -61,7 +61,6 @@ public class ReflectDatumWriter<T> exten
     if (array instanceof Collection)
       return ((Collection)array).size();
     return Array.getLength(array);
-        
   }
 
   @Override
@@ -88,7 +87,10 @@ public class ReflectDatumWriter<T> exten
 
   @Override
   protected void writeBytes(Object datum, Encoder out) throws IOException {
-    out.writeBytes((byte[])datum);
+    if (datum instanceof byte[])
+      out.writeBytes((byte[])datum);
+    else
+      super.writeBytes(datum, out);
   }
 
   @Override

Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroJob.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroJob.java?rev=1053749&r1=1053748&r2=1053749&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroJob.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroJob.java Wed Dec 29 21:37:42 2010
@@ -46,6 +46,10 @@ public class AvroJob {
   public static final String TEXT_PREFIX = "avro.meta.text.";
   /** The configuration key prefix for a binary output metadata. */
   public static final String BINARY_PREFIX = "avro.meta.binary.";
+  /** The configuration key for reflection-based input representation. */
+  public static final String INPUT_IS_REFLECT = "avro.input.is.reflect";
+  /** The configuration key for reflection-based map output representation. */
+  public static final String MAP_OUTPUT_IS_REFLECT = "avro.map.output.is.reflect";
 
   /** Configure a job's map input schema. */
   public static void setInputSchema(JobConf job, Schema s) {
@@ -102,6 +106,22 @@ public class AvroJob {
     job.setInputFormat(SequenceFileInputFormat.class);
   }
 
+  /** Indicate that all a job's data should use the reflect representation.*/
+  public static void setReflect(JobConf job) {
+    setInputReflect(job);
+    setMapOutputReflect(job);
+  }
+  
+  /** Indicate that a job's input data should use reflect representation.*/
+  public static void setInputReflect(JobConf job) {
+    job.setBoolean(INPUT_IS_REFLECT, true);
+  }
+  
+  /** Indicate that a job's map output data should use reflect representation.*/
+  public static void setMapOutputReflect(JobConf job) {
+    job.setBoolean(MAP_OUTPUT_IS_REFLECT, true);
+  }
+
   /** Return a job's output key schema. */
   public static Schema getOutputSchema(Configuration job) {
     return Schema.parse(job.get(OUTPUT_SCHEMA));

Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroKeyComparator.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroKeyComparator.java?rev=1053749&r1=1053748&r2=1053749&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroKeyComparator.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroKeyComparator.java Wed Dec 29 21:37:42 2010
@@ -24,7 +24,7 @@ import org.apache.hadoop.conf.Configurat
 
 import org.apache.avro.Schema;
 import org.apache.avro.io.BinaryData;
-import org.apache.avro.specific.SpecificData;
+import org.apache.avro.reflect.ReflectData;
 
 /** The {@link RawComparator} used by jobs configured with {@link AvroJob}. */
 public class AvroKeyComparator<T>
@@ -44,7 +44,7 @@ public class AvroKeyComparator<T>
   }
 
   public int compare(AvroWrapper<T> x, AvroWrapper<T> y) {
-    return SpecificData.get().compare(x.datum(), y.datum(), schema);
+    return ReflectData.get().compare(x.datum(), y.datum(), schema);
   }
 
 }

Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroOutputFormat.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroOutputFormat.java?rev=1053749&r1=1053748&r2=1053749&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroOutputFormat.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroOutputFormat.java Wed Dec 29 21:37:42 2010
@@ -32,7 +32,7 @@ import org.apache.hadoop.mapred.RecordWr
 import org.apache.hadoop.util.Progressable;
 
 import org.apache.avro.Schema;
-import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.reflect.ReflectDatumWriter;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.file.CodecFactory;
 
@@ -66,7 +66,7 @@ public class AvroOutputFormat <T>
       : AvroJob.getOutputSchema(job);
 
     final DataFileWriter<T> writer =
-      new DataFileWriter<T>(new SpecificDatumWriter<T>());
+      new DataFileWriter<T>(new ReflectDatumWriter<T>());
 
     if (FileOutputFormat.getCompressOutput(job)) {
       int level = job.getInt(DEFLATE_LEVEL_KEY, DEFAULT_DEFLATE_LEVEL);

Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroRecordReader.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroRecordReader.java?rev=1053749&r1=1053748&r2=1053749&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroRecordReader.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroRecordReader.java Wed Dec 29 21:37:42 2010
@@ -28,6 +28,7 @@ import org.apache.hadoop.mapred.RecordRe
 import org.apache.avro.file.FileReader;
 import org.apache.avro.file.DataFileReader;
 import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.reflect.ReflectDatumReader;
 
 /** An {@link RecordReader} for Avro data files. */
 public class AvroRecordReader<T>
@@ -41,7 +42,9 @@ public class AvroRecordReader<T>
     throws IOException {
     this(DataFileReader.openReader
          (new FsInput(split.getPath(), job),
-          new SpecificDatumReader<T>(AvroJob.getInputSchema(job))),
+          job.getBoolean(AvroJob.INPUT_IS_REFLECT, false)
+          ? new ReflectDatumReader<T>(AvroJob.getInputSchema(job))
+          : new SpecificDatumReader<T>(AvroJob.getInputSchema(job))),
          split);
   }
 

Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroSerialization.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroSerialization.java?rev=1053749&r1=1053748&r2=1053749&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroSerialization.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroSerialization.java Wed Dec 29 21:37:42 2010
@@ -34,7 +34,8 @@ import org.apache.avro.io.DecoderFactory
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.DatumWriter;
 import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.reflect.ReflectDatumWriter;
 
 /** The {@link Serialization} used by jobs configured with {@link AvroJob}. */
 public class AvroSerialization<T> extends Configured 
@@ -54,8 +55,11 @@ public class AvroSerialization<T> extend
     Schema schema = isKey
       ? Pair.getKeySchema(AvroJob.getMapOutputSchema(getConf()))
       : Pair.getValueSchema(AvroJob.getMapOutputSchema(getConf()));
-    return new AvroWrapperDeserializer(new SpecificDatumReader<T>(schema),
-                                       isKey);
+    DatumReader<T> datumReader =
+      getConf().getBoolean(AvroJob.MAP_OUTPUT_IS_REFLECT, false)
+      ? new ReflectDatumReader<T>(schema)
+      : new SpecificDatumReader<T>(schema);
+    return new AvroWrapperDeserializer(datumReader, isKey);
   }
   
   private static final DecoderFactory FACTORY = new DecoderFactory();
@@ -104,7 +108,7 @@ public class AvroSerialization<T> extend
       : (AvroKey.class.isAssignableFrom(c)
          ? Pair.getKeySchema(AvroJob.getMapOutputSchema(getConf()))
          : Pair.getValueSchema(AvroJob.getMapOutputSchema(getConf())));
-    return new AvroWrapperSerializer(new SpecificDatumWriter<T>(schema));
+    return new AvroWrapperSerializer(new ReflectDatumWriter<T>(schema));
   }
 
   private class AvroWrapperSerializer implements Serializer<AvroWrapper<T>> {

Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/Pair.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/Pair.java?rev=1053749&r1=1053748&r2=1053749&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/Pair.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/Pair.java Wed Dec 29 21:37:42 2010
@@ -31,6 +31,7 @@ import org.apache.avro.generic.GenericDa
 import org.apache.avro.generic.GenericContainer;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.avro.specific.SpecificDatumReader.SchemaConstructable;
+import org.apache.avro.reflect.ReflectData;
 
 /** A key/value pair. */
 public class Pair<K,V>
@@ -163,6 +164,46 @@ public class Pair<K,V>
   private static final Schema NULL_SCHEMA = Schema.create(Type.NULL);
 
   @SuppressWarnings("unchecked")
+  public Pair(Object key, Object value) {
+    this((K)key, ReflectData.get().getSchema(key.getClass()), (V)value, ReflectData.get().getSchema(value.getClass()));
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(Object key, GenericContainer value) {
+    this((K)key, ReflectData.get().getSchema(key.getClass()), (V)value, value.getSchema());
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(Object key, CharSequence value) {
+    this((K)key, ReflectData.get().getSchema(key.getClass()), (V)value, STRING_SCHEMA);
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(Object key, ByteBuffer value) {
+    this((K)key, ReflectData.get().getSchema(key.getClass()), (V)value, BYTES_SCHEMA);
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(Object key, Integer value) {
+    this((K)key, ReflectData.get().getSchema(key.getClass()), (V)value, INT_SCHEMA);
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(Object key, Long value) {
+    this((K)key, ReflectData.get().getSchema(key.getClass()), (V)value, LONG_SCHEMA);
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(Object key, Float value) {
+    this((K)key, ReflectData.get().getSchema(key.getClass()), (V)value, FLOAT_SCHEMA);
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(Object key, Double value) {
+    this((K)key, ReflectData.get().getSchema(key.getClass()), (V)value, DOUBLE_SCHEMA);
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(Object key, Void value) {
+    this((K)key, ReflectData.get().getSchema(key.getClass()), (V)value, NULL_SCHEMA);
+  }
+  @SuppressWarnings("unchecked")
+  public Pair(GenericContainer key, Object value) {
+    this((K)key, key.getSchema(), (V)value, ReflectData.get().getSchema(value.getClass()));
+  }
+  @SuppressWarnings("unchecked")
   public Pair(GenericContainer key, GenericContainer value) {
     this((K)key, key.getSchema(), (V)value, value.getSchema());
   }
@@ -195,6 +236,10 @@ public class Pair<K,V>
     this((K)key, key.getSchema(), (V)value, NULL_SCHEMA);
   }
   @SuppressWarnings("unchecked")
+  public Pair(CharSequence key, Object value) {
+    this((K)key, STRING_SCHEMA, (V)value, ReflectData.get().getSchema(value.getClass()));
+  }
+  @SuppressWarnings("unchecked")
   public Pair(CharSequence key, GenericContainer value) {
     this((K)key, STRING_SCHEMA, (V)value, value.getSchema());
   }
@@ -227,6 +272,10 @@ public class Pair<K,V>
     this((K)key, STRING_SCHEMA, (V)value, NULL_SCHEMA);
   }
   @SuppressWarnings("unchecked")
+  public Pair(ByteBuffer key, Object value) {
+    this((K)key, BYTES_SCHEMA, (V)value, ReflectData.get().getSchema(value.getClass()));
+  }
+  @SuppressWarnings("unchecked")
   public Pair(ByteBuffer key, GenericContainer value) {
     this((K)key, BYTES_SCHEMA, (V)value, value.getSchema());
   }
@@ -259,6 +308,10 @@ public class Pair<K,V>
     this((K)key, BYTES_SCHEMA, (V)value, NULL_SCHEMA);
   }
   @SuppressWarnings("unchecked")
+  public Pair(Integer key, Object value) {
+    this((K)key, INT_SCHEMA, (V)value, ReflectData.get().getSchema(value.getClass()));
+  }
+  @SuppressWarnings("unchecked")
   public Pair(Integer key, GenericContainer value) {
     this((K)key, INT_SCHEMA, (V)value, value.getSchema());
   }
@@ -291,6 +344,10 @@ public class Pair<K,V>
     this((K)key, INT_SCHEMA, (V)value, NULL_SCHEMA);
   }
   @SuppressWarnings("unchecked")
+  public Pair(Long key, Object value) {
+    this((K)key, LONG_SCHEMA, (V)value, ReflectData.get().getSchema(value.getClass()));
+  }
+  @SuppressWarnings("unchecked")
   public Pair(Long key, GenericContainer value) {
     this((K)key, LONG_SCHEMA, (V)value, value.getSchema());
   }
@@ -323,6 +380,10 @@ public class Pair<K,V>
     this((K)key, LONG_SCHEMA, (V)value, NULL_SCHEMA);
   }
   @SuppressWarnings("unchecked")
+  public Pair(Float key, Object value) {
+    this((K)key, FLOAT_SCHEMA, (V)value, ReflectData.get().getSchema(value.getClass()));
+  }
+  @SuppressWarnings("unchecked")
   public Pair(Float key, GenericContainer value) {
     this((K)key, FLOAT_SCHEMA, (V)value, value.getSchema());
   }
@@ -355,6 +416,10 @@ public class Pair<K,V>
     this((K)key, FLOAT_SCHEMA, (V)value, NULL_SCHEMA);
   }
   @SuppressWarnings("unchecked")
+  public Pair(Double key, Object value) {
+    this((K)key, DOUBLE_SCHEMA, (V)value, ReflectData.get().getSchema(value.getClass()));
+  }
+  @SuppressWarnings("unchecked")
   public Pair(Double key, GenericContainer value) {
     this((K)key, DOUBLE_SCHEMA, (V)value, value.getSchema());
   }
@@ -387,6 +452,10 @@ public class Pair<K,V>
     this((K)key, DOUBLE_SCHEMA, (V)value, NULL_SCHEMA);
   }
   @SuppressWarnings("unchecked")
+  public Pair(Void key, Object value) {
+    this((K)key, NULL_SCHEMA, (V)value, ReflectData.get().getSchema(value.getClass()));
+  }
+  @SuppressWarnings("unchecked")
   public Pair(Void key, GenericContainer value) {
     this((K)key, NULL_SCHEMA, (V)value, value.getSchema());
   }
@@ -420,6 +489,7 @@ public class Pair<K,V>
   }
 
   // private static final String[][] TABLE = new String[][] {
+  //   {"Object", "ReflectData.get().getSchema({0}.getClass())"},
   //   {"GenericContainer", "{0}.getSchema()"},
   //   {"CharSequence", "STRING_SCHEMA"},
   //   {"ByteBuffer", "BYTES_SCHEMA"},

Added: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestReflectJob.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestReflectJob.java?rev=1053749&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestReflectJob.java (added)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestReflectJob.java Wed Dec 29 21:37:42 2010
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred;
+
+import java.io.IOException;
+import java.io.File;
+import java.io.InputStream;
+import java.io.FileInputStream;
+import java.io.BufferedInputStream;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.Reporter;
+
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.avro.reflect.ReflectDatumReader;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestReflectJob {
+
+  /** The input class. */
+  public static class Text {
+    private String text = "";
+    public Text() {}
+    public Text(String text) { this.text = text; }
+    public String toString() { return text; }
+  }
+
+  /** The intermediate data class. */
+  public static class Count {
+    private long count;
+    public Count() {}
+    public Count(long count) { this.count = count; }
+  }
+
+  /** The output class. */
+  public static class WordCount {
+    private String word;
+    private long count;
+    public WordCount() {}
+    public WordCount(String word, long count) {
+      this.word = word;
+      this.count = count;
+    }
+  }
+
+  public static class MapImpl extends AvroMapper<Text, Pair<Text,Count>> {
+    @Override
+      public void map(Text text, AvroCollector<Pair<Text,Count>> collector,
+                      Reporter reporter) throws IOException {
+      StringTokenizer tokens = new StringTokenizer(text.toString());
+      while (tokens.hasMoreTokens())
+        collector.collect(new Pair<Text,Count>(new Text(tokens.nextToken()),
+                                               new Count(1L)));
+    }
+  }
+  
+  public static class ReduceImpl
+    extends AvroReducer<Text, Count, WordCount> {
+    @Override
+    public void reduce(Text word, Iterable<Count> counts,
+                       AvroCollector<WordCount> collector,
+                       Reporter reporter) throws IOException {
+      long sum = 0;
+      for (Count count : counts)
+        sum += count.count;
+      collector.collect(new WordCount(word.text, sum));
+    }
+  }    
+
+  @Test
+  @SuppressWarnings("deprecation")
+  public void testJob() throws Exception {
+    JobConf job = new JobConf();
+    String dir = System.getProperty("test.dir", ".") + "/mapred-reflect";
+    Path inputPath = new Path(dir + "/in");
+    Path outputPath = new Path(dir + "/out");
+
+    outputPath.getFileSystem(job).delete(outputPath);
+    inputPath.getFileSystem(job).delete(inputPath);
+
+    writeLinesFile(new File(dir+"/in"));
+    
+    job.setJobName("reflect");
+    
+    AvroJob.setInputSchema(job, ReflectData.get().getSchema(Text.class));
+    AvroJob.setMapOutputSchema
+      (job, new Pair(new Text(""), new Count(0L)).getSchema());
+    AvroJob.setOutputSchema(job, ReflectData.get().getSchema(WordCount.class));
+    
+    AvroJob.setMapperClass(job, MapImpl.class);        
+    //AvroJob.setCombinerClass(job, ReduceImpl.class);
+    AvroJob.setReducerClass(job, ReduceImpl.class);
+    
+    FileInputFormat.setInputPaths(job, inputPath);
+    FileOutputFormat.setOutputPath(job, outputPath);
+
+    AvroJob.setReflect(job);                      // use reflection
+
+    JobClient.runJob(job);
+    
+    validateCountsFile(new File(new File(dir, "out"), "part-00000.avro"));
+  }
+
+  private void writeLinesFile(File dir) throws IOException {
+    DatumWriter<Text> writer = new ReflectDatumWriter<Text>();
+    DataFileWriter<Text> out = new DataFileWriter<Text>(writer);
+    File linesFile = new File(dir+"/lines.avro");
+    dir.mkdirs();
+    out.create(ReflectData.get().getSchema(Text.class), linesFile);
+    for (String line : WordCountUtil.LINES)
+      out.append(new Text(line));
+    out.close();
+  }
+  
+  private void validateCountsFile(File file) throws Exception {
+    DatumReader<WordCount> reader = new ReflectDatumReader<WordCount>();
+    InputStream in = new BufferedInputStream(new FileInputStream(file));
+    DataFileStream<WordCount> counts = new DataFileStream<WordCount>(in,reader);
+    int numWords = 0;
+    for (WordCount wc : counts) {
+      assertEquals(wc.word,
+                   WordCountUtil.COUNTS.get(wc.word),
+                   (Long)wc.count);
+      numWords++;
+    }
+    in.close();
+    assertEquals(WordCountUtil.COUNTS.size(), numWords);
+  }
+
+
+}

Modified: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/WordCountUtil.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/WordCountUtil.java?rev=1053749&r1=1053748&r2=1053749&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/WordCountUtil.java (original)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/WordCountUtil.java Wed Dec 29 21:37:42 2010
@@ -54,7 +54,7 @@ class WordCountUtil {
   private static final File COUNTS_FILE
     = new File(new File(DIR, "out"), "part-00000.avro");
 
-  private static final String[] LINES = new String[] {
+  public static final String[] LINES = new String[] {
     "the quick brown fox jumps over the lazy dog",
     "the cow jumps over the moon",
     "the rain in spain falls mainly on the plains"



Mime
View raw message