avro-commits mailing list archives

From cutt...@apache.org
Subject svn commit: r1095495 - in /avro/branches/branch-1.5: ./ doc/src/content/xdocs/ lang/java/avro/src/main/java/org/apache/avro/file/ lang/java/avro/src/main/java/org/apache/avro/generic/ lang/java/ipc/src/test/java/org/apache/avro/ lang/java/mapred/src/ma...
Date Wed, 20 Apr 2011 20:00:54 GMT
Author: cutting
Date: Wed Apr 20 20:00:53 2011
New Revision: 1095495

URL: http://svn.apache.org/viewvc?rev=1095495&view=rev
Log:
Merge changes 1094812, 1095206, 1095207, 1095208 and 1095493 from trunk to 1.5 branch.  Fixes:
AVRO-802, AVRO-799, AVRO-798, and AVRO-763.

Added:
    avro/branches/branch-1.5/lang/java/tools/src/test/java/org/apache/avro/tool/TestTextFileTools.java
      - copied unchanged from r1095207, avro/trunk/lang/java/tools/src/test/java/org/apache/avro/tool/TestTextFileTools.java
Modified:
    avro/branches/branch-1.5/   (props changed)
    avro/branches/branch-1.5/CHANGES.txt
    avro/branches/branch-1.5/doc/src/content/xdocs/spec.xml
    avro/branches/branch-1.5/lang/java/avro/src/main/java/org/apache/avro/file/SnappyCodec.java
    avro/branches/branch-1.5/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java
    avro/branches/branch-1.5/lang/java/ipc/src/test/java/org/apache/avro/TestSchema.java
    avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroMapper.java
    avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroReducer.java
    avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopMapper.java
    avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopReducerBase.java
    avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/package.html
    avro/branches/branch-1.5/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestSequenceFileReader.java
    avro/branches/branch-1.5/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestWeather.java
    avro/branches/branch-1.5/lang/java/tools/src/main/java/org/apache/avro/tool/FromTextTool.java
    avro/branches/branch-1.5/lang/java/tools/src/main/java/org/apache/avro/tool/ToTextTool.java

Propchange: avro/branches/branch-1.5/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Apr 20 20:00:53 2011
@@ -1 +1 @@
-/avro/trunk:1075938,1075993,1078917,1079055,1079060,1079063,1083246,1085921,1086727,1086730,1086866,1087076,1087129,1087136,1087439-1087440,1087463,1087472,1087792,1089128,1089131,1089550
+/avro/trunk:1075938,1075993,1078917,1079055,1079060,1079063,1083246,1085921,1086727,1086730,1086866,1087076,1087129,1087136,1087439-1087440,1087463,1087472,1087792,1089128,1089131,1089550,1094812,1095206-1095208,1095493

Modified: avro/branches/branch-1.5/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/CHANGES.txt?rev=1095495&r1=1095494&r2=1095495&view=diff
==============================================================================
--- avro/branches/branch-1.5/CHANGES.txt (original)
+++ avro/branches/branch-1.5/CHANGES.txt Wed Apr 20 20:00:53 2011
@@ -30,7 +30,20 @@ Avro 1.5.1 (unreleased)
 
     AVRO-794. Makefile.am is no longer required in C++. (thiru)
 
-    AVRO-795. C++ Datafile reader makes it hard to build adaptive clients. (thiru)
+    AVRO-795. C++ Datafile reader makes it hard to build adaptive
+    clients. (thiru)
+
+    AVRO-802. Java: Add documentation for non-Avro input, map-only
+    jobs. (cutting)
+
+    AVRO-799. Java: Add support for --codec parameter to the
+    'fromtext' command.  Also made some performance improvements, bug
+    fixes and added tests for this command. (cutting)
+
+    AVRO-798. Add checksum to Snappy compressed blocks. (cutting)
+
+    AVRO-763. Java MapReduce API: add support for configure() and
+    close() methods to mappers and reducers. (Marshall Pierce via cutting)
 
   BUG FIXES
 
@@ -39,6 +52,9 @@ Avro 1.5.1 (unreleased)
     AVRO-780. Java: Fix a NullPointerException with reflect data when
     a union contains an array and null. (cutting)
 
+    AVRO-790. Java: GenericDatumReader can fail when reusing objects with unions
+    containing 'bytes' fields. (scottcarey)
+
 Avro 1.5.0 (10 March 2011)
 
   INCOMPATIBLE CHANGES

Modified: avro/branches/branch-1.5/doc/src/content/xdocs/spec.xml
URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/doc/src/content/xdocs/spec.xml?rev=1095495&r1=1095494&r2=1095495&view=diff
==============================================================================
--- avro/branches/branch-1.5/doc/src/content/xdocs/spec.xml (original)
+++ avro/branches/branch-1.5/doc/src/content/xdocs/spec.xml Wed Apr 20 20:00:53 2011
@@ -701,7 +701,8 @@
           <title>snappy</title>
           <p>The "snappy" codec uses
             Google's <a href="http://code.google.com/p/snappy/">Snappy</a>
-            compression library.</p>
+            compression library.  Each compressed block is followed
+            by its 4-byte, big-endian CRC32 checksum.</p>
         </section>
       </section>
     </section>

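For reference, a minimal sketch (illustrative, not part of this commit) of the block framing the spec text above describes: the Snappy-compressed payload is followed by the 4-byte, big-endian CRC32 of the uncompressed data, matching the SnappyCodec change below. The compression step itself is elided and the class name is made up.

import java.nio.ByteBuffer;
import java.util.zip.CRC32;

class SnappyBlockFraming {
  static ByteBuffer frame(byte[] uncompressed, byte[] compressed, int compressedSize) {
    CRC32 crc32 = new CRC32();
    crc32.update(uncompressed, 0, uncompressed.length);         // checksum of the uncompressed bytes
    ByteBuffer block = ByteBuffer.allocate(compressedSize + 4); // ByteBuffer defaults to big-endian
    block.put(compressed, 0, compressedSize);
    block.putInt((int) crc32.getValue());                       // 4-byte checksum trailer
    block.flip();
    return block;
  }
}
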
Modified: avro/branches/branch-1.5/lang/java/avro/src/main/java/org/apache/avro/file/SnappyCodec.java
URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/java/avro/src/main/java/org/apache/avro/file/SnappyCodec.java?rev=1095495&r1=1095494&r2=1095495&view=diff
==============================================================================
--- avro/branches/branch-1.5/lang/java/avro/src/main/java/org/apache/avro/file/SnappyCodec.java (original)
+++ avro/branches/branch-1.5/lang/java/avro/src/main/java/org/apache/avro/file/SnappyCodec.java Wed Apr 20 20:00:53 2011
@@ -19,19 +19,19 @@ package org.apache.avro.file;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.zip.CRC32;
 
 import org.xerial.snappy.Snappy;
 import org.xerial.snappy.SnappyException;
 
 /** * Implements Snappy compression and decompression. */
 class SnappyCodec extends Codec {
-
-  private static final SnappyCodec INSTANCE = new SnappyCodec();
+  private CRC32 crc32 = new CRC32();
 
   static class Option extends CodecFactory {
     @Override
     protected Codec createInstance() {
-      return INSTANCE;
+      return new SnappyCodec();
     }
   }
 
@@ -43,10 +43,15 @@ class SnappyCodec extends Codec {
   ByteBuffer compress(ByteBuffer in) throws IOException {
     try { 
       ByteBuffer out =
-        ByteBuffer.allocate(Snappy.maxCompressedLength(in.remaining()));
+        ByteBuffer.allocate(Snappy.maxCompressedLength(in.remaining())+4);
       int size = Snappy.compress(in.array(), in.position(), in.remaining(),
                                  out.array(), 0);
-      out.limit(size);
+      crc32.reset();
+      crc32.update(in.array(), in.position(), in.remaining());
+      out.putInt(size, (int)crc32.getValue());
+
+      out.limit(size+4);
+
       return out;
     } catch (SnappyException e) {
       throw new IOException(e);
@@ -57,10 +62,16 @@ class SnappyCodec extends Codec {
   ByteBuffer decompress(ByteBuffer in) throws IOException {
     try { 
       ByteBuffer out = ByteBuffer.allocate
-        (Snappy.uncompressedLength(in.array(), in.position(), in.remaining()));
-      int size = Snappy.uncompress(in.array(), in.position(), in.remaining(),
+        (Snappy.uncompressedLength(in.array(),in.position(),in.remaining()-4));
+      int size = Snappy.uncompress(in.array(),in.position(),in.remaining()-4,
                                    out.array(), 0);
       out.limit(size);
+
+      crc32.reset();
+      crc32.update(out.array(), 0, size);
+      if (in.getInt(in.limit()-4) != (int)crc32.getValue())
+        throw new IOException("Checksum failure");
+
       return out;
     } catch (SnappyException e) {
       throw new IOException(e);

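The checksum is written and verified entirely inside the codec, so data-file code is unchanged; a corrupted block now surfaces as an IOException ("Checksum failure") on read. A hedged round-trip sketch (file name and data are placeholders):

import java.io.File;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.util.Utf8;

class SnappyRoundTrip {
  public static void main(String[] args) throws IOException {
    Schema schema = Schema.parse("\"string\"");
    File file = new File("strings.avro");                       // placeholder path

    DataFileWriter<Utf8> writer =
      new DataFileWriter<Utf8>(new GenericDatumWriter<Utf8>(schema));
    writer.setCodec(CodecFactory.fromString("snappy"));         // checksummed snappy blocks
    writer.create(schema, file);
    writer.append(new Utf8("hello"));
    writer.append(new Utf8("world"));
    writer.close();

    DataFileReader<Utf8> reader =
      new DataFileReader<Utf8>(file, new GenericDatumReader<Utf8>(schema));
    for (Utf8 s : reader)                                       // CRC32 verified per block
      System.out.println(s);
    reader.close();
  }
}
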
Modified: avro/branches/branch-1.5/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java
URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java?rev=1095495&r1=1095494&r2=1095495&view=diff
==============================================================================
--- avro/branches/branch-1.5/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java (original)
+++ avro/branches/branch-1.5/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java Wed Apr 20 20:00:53 2011
@@ -330,7 +330,7 @@ public class GenericDatumReader<D> imple
    * byte array representation.  By default, this calls {@link
    * Decoder#readBytes(ByteBuffer)}.*/
   protected Object readBytes(Object old, Decoder in) throws IOException {
-    return in.readBytes((ByteBuffer)old);
+    return in.readBytes(old instanceof ByteBuffer ? (ByteBuffer) old : null);
   }
 
   /** Called to read integers.  Subclasses may override to use a different

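An illustrative sketch (not part of this commit) of the AVRO-790 scenario this fixes: with a ["string","bytes"] union, the datum handed back as the reuse object may be a Utf8 rather than a ByteBuffer, so readBytes() must not cast it unconditionally.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.util.Utf8;

class ReuseWithUnionBytes {
  public static void main(String[] args) throws IOException {
    Schema union = Schema.parse("[\"string\",\"bytes\"]");

    ByteArrayOutputStream out = new ByteArrayOutputStream();
    Encoder enc = EncoderFactory.get().binaryEncoder(out, null);
    GenericDatumWriter<Object> writer = new GenericDatumWriter<Object>(union);
    writer.write(new Utf8("hello"), enc);                       // string branch first
    writer.write(ByteBuffer.wrap(new byte[] {1, 2, 3}), enc);   // then bytes branch
    enc.flush();

    GenericDatumReader<Object> reader = new GenericDatumReader<Object>(union);
    Decoder dec = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
    Object reuse = reader.read(null, dec);    // decodes to a Utf8
    reuse = reader.read(reuse, dec);          // reuse is not a ByteBuffer; fixed readBytes tolerates it
    System.out.println(reuse);
  }
}
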
Modified: avro/branches/branch-1.5/lang/java/ipc/src/test/java/org/apache/avro/TestSchema.java
URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/java/ipc/src/test/java/org/apache/avro/TestSchema.java?rev=1095495&r1=1095494&r2=1095495&view=diff
==============================================================================
--- avro/branches/branch-1.5/lang/java/ipc/src/test/java/org/apache/avro/TestSchema.java (original)
+++ avro/branches/branch-1.5/lang/java/ipc/src/test/java/org/apache/avro/TestSchema.java Wed Apr 20 20:00:53 2011
@@ -45,7 +45,6 @@ import org.apache.avro.io.DatumWriter;
 import org.apache.avro.io.Decoder;
 import org.apache.avro.io.Encoder;
 import org.apache.avro.io.EncoderFactory;
-import org.apache.avro.io.JsonDecoder;
 import org.apache.avro.compiler.specific.TestSpecificCompiler;
 import org.apache.avro.util.Utf8;
 
@@ -79,7 +78,7 @@ public class TestSchema {
       + "      \"name\": \"inner_union\" }\n" + "  ]\n" + "}\n";
 
   private static final int COUNT =
-    Integer.parseInt(System.getProperty("test.count", "10"));
+    Integer.parseInt(System.getProperty("test.count", "30"));
 
   @Test
   public void testNull() throws Exception {
@@ -163,6 +162,18 @@ public class TestSchema {
     check("{\"type\":\"map\", \"values\":\"long\"}", "{\"a\":1}", map);
     checkParseError("{\"type\":\"map\"}");        // values required
   }
+  
+  @Test
+  public void testUnionMap() throws Exception {
+    String unionMapSchema = "{\"name\":\"foo\", \"type\":\"record\"," +
+    		" \"fields\":[ {\"name\":\"mymap\", \"type\":" +
+    		"   [{\"type\":\"map\", \"values\":" +
+    		"      [\"int\",\"long\",\"float\",\"string\"]}," +
+    		"    \"null\"]" +
+    		"   }]" +
+    		" }";
+    check(unionMapSchema, true);
+  }
 
   @Test
   public void testRecord() throws Exception {
@@ -558,6 +569,7 @@ public class TestSchema {
     throws Exception {
     Schema schema = Schema.parse(jsonSchema);
     checkProp(schema);
+    Object reuse = null;
     for (Object datum : new RandomData(schema, COUNT)) {
 
       if (induce) {
@@ -570,7 +582,10 @@ public class TestSchema {
 
       checkBinary(schema, datum,
                   new GenericDatumWriter<Object>(),
-                  new GenericDatumReader<Object>());
+                  new GenericDatumReader<Object>(), null);
+      reuse = checkBinary(schema, datum,
+          new GenericDatumWriter<Object>(),
+          new GenericDatumReader<Object>(), reuse);
       checkDirectBinary(schema, datum,
                   new GenericDatumWriter<Object>(),
                   new GenericDatumReader<Object>());
@@ -603,6 +618,14 @@ public class TestSchema {
                                  DatumWriter<Object> writer,
                                  DatumReader<Object> reader)
     throws IOException {
+    checkBinary(schema, datum, writer, reader, null);
+  }
+  
+  public static Object checkBinary(Schema schema, Object datum,
+                                 DatumWriter<Object> writer,
+                                 DatumReader<Object> reader,
+                                 Object reuse)
+    throws IOException {
     ByteArrayOutputStream out = new ByteArrayOutputStream();
     writer.setSchema(schema);
     Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
@@ -613,10 +636,11 @@ public class TestSchema {
     reader.setSchema(schema);
         
     Object decoded =
-      reader.read(null, DecoderFactory.get().binaryDecoder(
+      reader.read(reuse, DecoderFactory.get().binaryDecoder(
           data, null));
       
     assertEquals("Decoded data does not match.", datum, decoded);
+    return decoded;
   }
 
   public static void checkDirectBinary(Schema schema, Object datum,

Modified: avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroMapper.java
URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroMapper.java?rev=1095495&r1=1095494&r2=1095495&view=diff
==============================================================================
--- avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroMapper.java (original)
+++ avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroMapper.java Wed Apr 20 20:00:53 2011
@@ -18,17 +18,20 @@
 
 package org.apache.avro.mapred;
 
+import java.io.Closeable;
 import java.io.IOException;
 
-import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.Reporter;
 
 /** A mapper for Avro data.
  *
  * <p>Applications subclass this class and pass their subclass to {@link
- * AvroJob#setMapperClass}, overriding {@link #map}.
+ * AvroJob#setMapperClass(JobConf, Class)}, overriding {@link #map(Object, AvroCollector, Reporter)}.
  */
-public class AvroMapper<IN,OUT> extends Configured {
+public class AvroMapper<IN, OUT> extends Configured implements JobConfigurable, Closeable {
 
   /** Called with each map input datum.  By default, collects inputs. */
   @SuppressWarnings("unchecked")
@@ -38,4 +41,15 @@ public class AvroMapper<IN,OUT> extends 
   }
 
 
+  /** Subclasses can override this as desired. */
+  @Override
+  public void close() throws IOException {
+    // no op
+  }
+
+  /** Subclasses can override this as desired. */
+  @Override
+  public void configure(JobConf jobConf) {
+    // no op
+  }
 }

Modified: avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroReducer.java
URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroReducer.java?rev=1095495&r1=1095494&r2=1095495&view=diff
==============================================================================
--- avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroReducer.java (original)
+++ avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroReducer.java Wed Apr 20 20:00:53 2011
@@ -18,19 +18,22 @@
 
 package org.apache.avro.mapred;
 
+import java.io.Closeable;
 import java.io.IOException;
 
-import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.Reporter;
 
 /** A reducer for Avro data.
  *
  * <p>Applications should subclass this class and pass their subclass to {@link
- * AvroJob#setReducerClass} and perhaps {@link AvroJob#setCombinerClass}.
- * Subclasses override {@link #reduce}.
+ * AvroJob#setReducerClass(JobConf, Class)} and perhaps {@link AvroJob#setCombinerClass(JobConf, Class)}.
+ * Subclasses override {@link #reduce(Object, Iterable, AvroCollector, Reporter)}.
  */
 
-public class AvroReducer<K,V,OUT> extends Configured {
-public class AvroReducer<K,V,OUT> extends Configured {
+public class AvroReducer<K,V,OUT> extends Configured implements JobConfigurable, Closeable {
 
   private Pair<K,V> outputPair;
 
@@ -48,4 +51,15 @@ public class AvroReducer<K,V,OUT> extend
     }
   }
 
+  /** Subclasses can override this as desired. */
+  @Override
+  public void close() throws IOException {
+    // no op
+  }
+
+  /** Subclasses can override this as desired. */
+  @Override
+  public void configure(JobConf jobConf) {
+    // no op
+  }
 }

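A hedged sketch of the new AVRO-763 hooks in use (not part of this commit): a mapper subclass that reads a setting in configure() and releases resources in close(). The configuration key is made up for illustration.

import java.io.IOException;
import org.apache.avro.mapred.AvroCollector;
import org.apache.avro.mapred.AvroMapper;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;

public class UpperCaseMapper extends AvroMapper<CharSequence, CharSequence> {
  private boolean upperCase;

  @Override
  public void configure(JobConf conf) {
    upperCase = conf.getBoolean("example.uppercase", true);     // hypothetical key
  }

  @Override
  public void map(CharSequence datum, AvroCollector<CharSequence> collector,
                  Reporter reporter) throws IOException {
    collector.collect(upperCase ? datum.toString().toUpperCase() : datum.toString());
  }

  @Override
  public void close() throws IOException {
    // flush or release any per-task resources here
  }
}
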
Modified: avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopMapper.java
URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopMapper.java?rev=1095495&r1=1095494&r2=1095495&view=diff
==============================================================================
--- avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopMapper.java (original)
+++ avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopMapper.java Wed Apr 20 20:00:53 2011
@@ -21,11 +21,11 @@ package org.apache.avro.mapred;
 import java.io.IOException;
 
 import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.util.ReflectionUtils;
 
 /** Bridge between a {@link org.apache.hadoop.mapred.Mapper} and an {@link
@@ -45,6 +45,7 @@ class HadoopMapper<IN,OUT,K,V,KO,VO> ext
       (conf.getClass(AvroJob.MAPPER, AvroMapper.class, AvroMapper.class),
        conf);
     this.isMapOnly = conf.getNumReduceTasks() == 0;
+    this.mapper.configure(conf);
   }
 
   @SuppressWarnings("unchecked")
@@ -80,4 +81,9 @@ class HadoopMapper<IN,OUT,K,V,KO,VO> ext
     mapper.map(wrapper.datum(), out, reporter);
   }
 
+  @Override
+  public void close() throws IOException {
+    this.mapper.close();
+  }
+
 }

Modified: avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopReducerBase.java
URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopReducerBase.java?rev=1095495&r1=1095494&r2=1095495&view=diff
==============================================================================
--- avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopReducerBase.java (original)
+++ avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopReducerBase.java Wed Apr 20 20:00:53 2011
@@ -39,6 +39,7 @@ abstract class HadoopReducerBase<K,V,OUT
   @Override
   public void configure(JobConf conf) {
     this.reducer = getReducer(conf);
+    this.reducer.configure(conf);
   }
 
   class ReduceIterable implements Iterable<V>, Iterator<V> {
@@ -60,4 +61,8 @@ abstract class HadoopReducerBase<K,V,OUT
     reducer.reduce(key.datum(), reduceIterable, collector, reporter);
   }
 
+  @Override
+  public void close() throws IOException {
+    this.reducer.close();
+  }
 }

Modified: avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/package.html
URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/package.html?rev=1095495&r1=1095494&r2=1095495&view=diff
==============================================================================
--- avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/package.html (original)
+++ avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/package.html Wed Apr 20 20:00:53 2011
@@ -96,5 +96,23 @@ Avro data, with map and reduce functions
  </ul>
 </p>
 
+<p>For jobs whose input is non-Avro data file and which use a
+  non-Avro {@link org.apache.hadoop.mapred.Mapper} and no reducer,
+  i.e., a <i>map-only</i> job:
+ <ul>
+   <li>Set your input file format with {@link
+   org.apache.hadoop.mapred.JobConf#setInputFormat}.</li>
+   <li>Implement {@link org.apache.hadoop.mapred.Mapper} and specify
+   your job's mapper with {@link
+   org.apache.hadoop.mapred.JobConf#setMapperClass}.  The output key
+   and value type should be {@link org.apache.avro.mapred.AvroWrapper} and
+   {@link org.apache.hadoop.io.NullWritable}.</li>
+   <li>Call {@link
+   org.apache.hadoop.mapred.JobConf#setNumReduceTasks(int)} with zero.
+   <li>Call {@link org.apache.avro.mapred.AvroJob#setOutputSchema} with your
+   job's output schema.</li>
+ </ul>
+</p>
+
 </body>
 </html>

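A self-contained, hedged sketch of the recipe documented above (not part of this commit); the paths and output schema are placeholders, and testNonAvroMapOnly below exercises the same path against a sequence file.

import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.mapred.AvroJob;
import org.apache.avro.mapred.AvroWrapper;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;

public class MapOnlyExample {
  public static class LineMapper extends MapReduceBase
      implements Mapper<LongWritable, Text, AvroWrapper<Utf8>, NullWritable> {
    public void map(LongWritable key, Text value,
                    OutputCollector<AvroWrapper<Utf8>, NullWritable> out,
                    Reporter reporter) throws IOException {
      out.collect(new AvroWrapper<Utf8>(new Utf8(value.toString())), NullWritable.get());
    }
  }

  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf();
    job.setInputFormat(TextInputFormat.class);                  // non-Avro input format
    FileInputFormat.setInputPaths(job, new Path("in"));         // placeholder path
    job.setMapperClass(LineMapper.class);                       // plain Hadoop mapper emitting Avro
    job.setNumReduceTasks(0);                                   // map-only
    AvroJob.setOutputSchema(job, Schema.parse("\"string\""));   // job's output schema
    FileOutputFormat.setOutputPath(job, new Path("out"));       // placeholder path
    JobClient.runJob(job);
  }
}
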
Modified: avro/branches/branch-1.5/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestSequenceFileReader.java
URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestSequenceFileReader.java?rev=1095495&r1=1095494&r2=1095495&view=diff
==============================================================================
--- avro/branches/branch-1.5/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestSequenceFileReader.java (original)
+++ avro/branches/branch-1.5/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestSequenceFileReader.java Wed Apr 20 20:00:53 2011
@@ -30,6 +30,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.FileInputFormat;
@@ -55,7 +56,7 @@ public class TestSequenceFileReader {
   private static final int COUNT =
     Integer.parseInt(System.getProperty("test.count", "10"));
   private static final File DIR
-    = new File(System.getProperty("test.dir", "/tmp"));
+    = new File(System.getProperty("test.dir", "."));
   private static final File FILE = new File(DIR, "test.seq");
 
   private static final Schema SCHEMA
@@ -162,6 +163,45 @@ public class TestSequenceFileReader {
                new SpecificDatumReader<Pair<Long,CharSequence>>()));
   }
 
+  private static class NonAvroOnlyMapper
+    extends MapReduceBase
+    implements Mapper<LongWritable,Text,AvroWrapper<Pair<Long,Utf8>>,NullWritable> {
+    
+    public void map(LongWritable key, Text value, 
+                    OutputCollector<AvroWrapper<Pair<Long,Utf8>>,NullWritable> out, 
+                    Reporter reporter) throws IOException {
+      out.collect(new AvroWrapper<Pair<Long,Utf8>>(new Pair<Long,Utf8>(key.get(), new Utf8(value.toString()))),
+                  NullWritable.get());
+    }
+  }
+
+  @Test
+  public void testNonAvroMapOnly() throws Exception {
+    JobConf job = new JobConf();
+    Path output = new Path(System.getProperty("test.dir",".")+"/seq-out");
+
+    output.getFileSystem(job).delete(output);
+    
+
+    // configure input for non-Avro sequence file
+    job.setInputFormat(SequenceFileInputFormat.class);
+    FileInputFormat.setInputPaths(job, FILE.toURI().toString());
+
+    // use a hadoop mapper that emits Avro output
+    job.setMapperClass(NonAvroOnlyMapper.class);
+
+    // configure output for avro
+    job.setNumReduceTasks(0);                     // map-only
+    FileOutputFormat.setOutputPath(job, output);
+    AvroJob.setOutputSchema(job, SCHEMA);
+
+    JobClient.runJob(job);
+
+    checkFile(new DataFileReader<Pair<Long,CharSequence>>
+              (new File(output.toString()+"/part-00000.avro"),
+               new SpecificDatumReader<Pair<Long,CharSequence>>()));
+  }
+
   private static class NonAvroReducer
     extends MapReduceBase
     implements Reducer<AvroKey<Long>,AvroValue<Utf8>,LongWritable,Text> {

Modified: avro/branches/branch-1.5/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestWeather.java
URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestWeather.java?rev=1095495&r1=1095494&r2=1095495&view=diff
==============================================================================
--- avro/branches/branch-1.5/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestWeather.java (original)
+++ avro/branches/branch-1.5/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestWeather.java Wed Apr 20 20:00:53 2011
@@ -20,6 +20,7 @@ package org.apache.avro.mapred;
 
 import java.io.IOException;
 import java.io.File;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobClient;
@@ -34,15 +35,31 @@ import org.apache.avro.io.DatumReader;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.file.DataFileReader;
 import static org.apache.avro.file.DataFileConstants.SNAPPY_CODEC;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
+import org.junit.After;
 import org.junit.Test;
-import static org.junit.Assert.*;
 
 import test.Weather;
 
 /** Tests mapred API with a specific record. */
 public class TestWeather {
 
+  private static final AtomicInteger mapCloseCalls = new AtomicInteger();
+  private static final AtomicInteger mapConfigureCalls = new AtomicInteger();
+  private static final AtomicInteger reducerCloseCalls = new AtomicInteger();
+  private static final AtomicInteger reducerConfigureCalls = new AtomicInteger();
+
+
+  @After
+  public void tearDown() {
+    mapCloseCalls.set(0);
+    mapConfigureCalls.set(0);
+    reducerCloseCalls.set(0);
+    reducerConfigureCalls.set(0);
+  }
+
   /** Uses default mapper with no reduces for a map-only identity job. */
   @Test
   @SuppressWarnings("deprecation")
@@ -64,7 +81,7 @@ public class TestWeather {
     FileOutputFormat.setCompressOutput(job, true);
     
     job.setNumReduceTasks(0);                     // map-only
-    
+
     JobClient.runJob(job);
 
     // check output is correct
@@ -88,8 +105,18 @@ public class TestWeather {
                       Reporter reporter) throws IOException {
       collector.collect(new Pair<Weather,Void>(w, (Void)null));
     }
+
+    @Override
+    public void close() throws IOException {
+      mapCloseCalls.incrementAndGet();
+    }
+
+    @Override
+    public void configure(JobConf jobConf) {
+      mapConfigureCalls.incrementAndGet();
+    }
   }
-  
+
   // output keys only, since values are empty
   public static class SortReducer
     extends AvroReducer<Weather, Void, Weather> {
@@ -99,7 +126,17 @@ public class TestWeather {
                        Reporter reporter) throws IOException {
       collector.collect(w);
     }
-  }    
+
+    @Override
+    public void close() throws IOException {
+      reducerCloseCalls.incrementAndGet();
+    }
+
+    @Override
+    public void configure(JobConf jobConf) {
+      reducerConfigureCalls.incrementAndGet();
+    }
+  }
 
   @Test
   @SuppressWarnings("deprecation")
@@ -140,6 +177,15 @@ public class TestWeather {
 
     check.close();
     sorted.close();
+
+    // check that AvroMapper and AvroReducer get close() and configure() called
+    assertEquals(1, mapCloseCalls.get());
+    assertEquals(1, reducerCloseCalls.get());
+    // gets called twice for some reason, so loosen this check
+    assertTrue(mapConfigureCalls.get() >= 1);
+    assertTrue(reducerConfigureCalls.get() >= 1);
+
+
   }
 
 

Modified: avro/branches/branch-1.5/lang/java/tools/src/main/java/org/apache/avro/tool/FromTextTool.java
URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/java/tools/src/main/java/org/apache/avro/tool/FromTextTool.java?rev=1095495&r1=1095494&r2=1095495&view=diff
==============================================================================
--- avro/branches/branch-1.5/lang/java/tools/src/main/java/org/apache/avro/tool/FromTextTool.java (original)
+++ avro/branches/branch-1.5/lang/java/tools/src/main/java/org/apache/avro/tool/FromTextTool.java Wed Apr 20 20:00:53 2011
@@ -19,9 +19,7 @@ package org.apache.avro.tool;
 
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
-import java.io.BufferedReader;
 import java.io.InputStream;
-import java.io.InputStreamReader;
 import java.io.PrintStream;
 import java.nio.ByteBuffer;
 import java.util.List;
@@ -34,6 +32,7 @@ import org.apache.avro.Schema;
 import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericDatumWriter;
+import static org.apache.avro.file.DataFileConstants.DEFLATE_CODEC;
 
 /** Reads a text file into an Avro data file.
  * 
@@ -61,36 +60,68 @@ public class FromTextTool implements Too
     OptionSpec<Integer> level = p.accepts("level", "compression level")
     .withOptionalArg().ofType(Integer.class);
 
+    OptionSpec<String> codec = p.accepts("codec", "compression codec")
+    .withOptionalArg().ofType(String.class);
+
     OptionSet opts = p.parse(args.toArray(new String[0]));
 
-    if (opts.nonOptionArguments().size() != 2) {
+    List<String> nargs = opts.nonOptionArguments();
+    if (nargs.size() != 2) {
       err.println("Expected 2 args: from_file to_file (local filenames," +
           " Hadoop URI's, or '-' for stdin/stdout");
       p.printHelpOn(err);
       return 1;
     }
  
-    BufferedInputStream inStream = Util.fileOrStdin(args.get(0), stdin);
-    BufferedOutputStream outStream = Util.fileOrStdout(args.get(1), out);
-    
     int compressionLevel = 1; // Default compression level
     if (opts.hasArgument(level)) {
       compressionLevel = level.value(opts);
     }
   
-    BufferedReader reader = new BufferedReader(new InputStreamReader(inStream));
+    String codecName = opts.hasArgument(codec)
+      ? codec.value(opts)
+      : DEFLATE_CODEC;
+    CodecFactory codecFactory = codecName.equals(DEFLATE_CODEC)
+      ? CodecFactory.deflateCodec(compressionLevel)
+      : CodecFactory.fromString(codecName);
+
+    BufferedInputStream inStream = Util.fileOrStdin(nargs.get(0), stdin);
+    BufferedOutputStream outStream = Util.fileOrStdout(nargs.get(1), out);
+    
     DataFileWriter<ByteBuffer> writer =
         new DataFileWriter<ByteBuffer>(new GenericDatumWriter<ByteBuffer>());
-    writer.setCodec(CodecFactory.deflateCodec(compressionLevel));
+    writer.setCodec(codecFactory);
     writer.create(Schema.parse(TEXT_FILE_SCHEMA), outStream);
 
-    String line;
-    while((line = reader.readLine()) != null) {
-      ByteBuffer buff = ByteBuffer.wrap(line.getBytes());
-      writer.append(buff);
+    ByteBuffer line = ByteBuffer.allocate(128);
+    boolean returnSeen = false;
+    byte[] buf = new byte[8192];
+    for (int end = inStream.read(buf); end != -1; end = inStream.read(buf)) {
+      for (int i = 0; i < end; i++) {
+        int b = buf[i] & 0xFF;
+        if (b == '\n') {                          // newline
+          System.out.println("Writing line = "+line.position());
+          line.flip();
+          writer.append(line);
+          line.clear();
+          returnSeen = false;
+        } else if (b == '\r') {                   // return
+          line.flip();
+          writer.append(line);
+          line.clear();
+          returnSeen = true;
+        } else {
+          if (line.position() == line.limit()) {    // reallocate longer line
+            ByteBuffer tempLine = ByteBuffer.allocate(line.limit()*2);
+            line.flip();
+            tempLine.put(line);
+            line = tempLine;
+          }
+          line.put((byte)b);
+          returnSeen = false;
+        }
+      }
     }
-    
-    writer.flush();
     writer.close();
     inStream.close();
     return 0;

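For context, an illustrative sketch (not part of this commit) of what the reworked 'fromtext' tool produces with --codec: a data file with schema "bytes", one datum per input line, compressed with the requested codec. The input lines and output path here are placeholders.

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;

class FromTextSketch {
  public static void main(String[] args) throws IOException {
    Schema schema = Schema.parse("\"bytes\"");
    DataFileWriter<ByteBuffer> writer =
      new DataFileWriter<ByteBuffer>(new GenericDatumWriter<ByteBuffer>());
    writer.setCodec(CodecFactory.fromString("snappy"));         // as with --codec snappy
    writer.create(schema, new File("lines.avro"));              // placeholder path
    for (String line : new String[] {"first line", "second line"})
      writer.append(ByteBuffer.wrap(line.getBytes()));          // one "bytes" datum per line
    writer.close();
  }
}
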
Modified: avro/branches/branch-1.5/lang/java/tools/src/main/java/org/apache/avro/tool/ToTextTool.java
URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/java/tools/src/main/java/org/apache/avro/tool/ToTextTool.java?rev=1095495&r1=1095494&r2=1095495&view=diff
==============================================================================
--- avro/branches/branch-1.5/lang/java/tools/src/main/java/org/apache/avro/tool/ToTextTool.java (original)
+++ avro/branches/branch-1.5/lang/java/tools/src/main/java/org/apache/avro/tool/ToTextTool.java Wed Apr 20 20:00:53 2011
@@ -35,7 +35,7 @@ import org.apache.avro.generic.GenericDa
 public class ToTextTool implements Tool {
   private static final String TEXT_FILE_SCHEMA = 
         "\"bytes\"";
-  private static final byte[] LINE_SEPERATOR = 
+  private static final byte[] LINE_SEPARATOR = 
         System.getProperty("line.separator").getBytes();
     
   @Override
@@ -45,7 +45,7 @@ public class ToTextTool implements Tool 
 
   @Override
   public String getShortDescription() {
-    return "Converts and avro file to a text file.";
+    return "Converts an Avro data file to a text file.";
   }
 
   @Override
@@ -77,7 +77,7 @@ public class ToTextTool implements Tool 
     while (fileReader.hasNext()) {
       ByteBuffer outBuff = (ByteBuffer) fileReader.next();
       outStream.write(outBuff.array());
-      outStream.write(LINE_SEPERATOR);
+      outStream.write(LINE_SEPARATOR);
     }
     
     outStream.close();


