Subject: svn commit: r1095495 - in /avro/branches/branch-1.5: ./ doc/src/content/xdocs/ lang/java/avro/src/main/java/org/apache/avro/file/ lang/java/avro/src/main/java/org/apache/avro/generic/ lang/java/ipc/src/test/java/org/apache/avro/ lang/java/mapred/src/ma...
Date: Wed, 20 Apr 2011 20:00:54 -0000
To: commits@avro.apache.org
From: cutting@apache.org

Author: cutting
Date: Wed Apr 20 20:00:53 2011
New Revision: 1095495

URL: http://svn.apache.org/viewvc?rev=1095495&view=rev
Log:
Merge changes 1094812, 1095206, 1095207, 1095208 and 1095493 from trunk to 1.5 branch.  Fixes: AVRO-802, AVRO-799, AVRO-798, and AVRO-763.

Added:
    avro/branches/branch-1.5/lang/java/tools/src/test/java/org/apache/avro/tool/TestTextFileTools.java
      - copied unchanged from r1095207, avro/trunk/lang/java/tools/src/test/java/org/apache/avro/tool/TestTextFileTools.java
Modified:
    avro/branches/branch-1.5/   (props changed)
    avro/branches/branch-1.5/CHANGES.txt
    avro/branches/branch-1.5/doc/src/content/xdocs/spec.xml
    avro/branches/branch-1.5/lang/java/avro/src/main/java/org/apache/avro/file/SnappyCodec.java
    avro/branches/branch-1.5/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java
    avro/branches/branch-1.5/lang/java/ipc/src/test/java/org/apache/avro/TestSchema.java
    avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroMapper.java
    avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroReducer.java
    avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopMapper.java
    avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopReducerBase.java
    avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/package.html
    avro/branches/branch-1.5/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestSequenceFileReader.java
    avro/branches/branch-1.5/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestWeather.java
    avro/branches/branch-1.5/lang/java/tools/src/main/java/org/apache/avro/tool/FromTextTool.java
    avro/branches/branch-1.5/lang/java/tools/src/main/java/org/apache/avro/tool/ToTextTool.java

Propchange: avro/branches/branch-1.5/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Apr 20 20:00:53 2011
@@ -1 +1 @@
-/avro/trunk:1075938,1075993,1078917,1079055,1079060,1079063,1083246,1085921,1086727,1086730,1086866,1087076,1087129,1087136,1087439-1087440,1087463,1087472,1087792,1089128,1089131,1089550
+/avro/trunk:1075938,1075993,1078917,1079055,1079060,1079063,1083246,1085921,1086727,1086730,1086866,1087076,1087129,1087136,1087439-1087440,1087463,1087472,1087792,1089128,1089131,1089550,1094812,1095206-1095208,1095493

Modified: avro/branches/branch-1.5/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/CHANGES.txt?rev=1095495&r1=1095494&r2=1095495&view=diff
==============================================================================
--- avro/branches/branch-1.5/CHANGES.txt (original)
+++ avro/branches/branch-1.5/CHANGES.txt Wed Apr 20 20:00:53 2011
@@ -30,7 +30,20 @@ Avro 1.5.1 (unreleased)
 
   AVRO-794. Makefile.am is no longer required in C++. (thiru)
 
-  AVRO-795. C++ Datafile reader makes it hard to build adaptive clients. (thiru)
+  AVRO-795. C++ Datafile reader makes it hard to build adaptive
+  clients. (thiru)
+
+  AVRO-802. Java: Add documentation for non-Avro input, map-only
+  jobs. (cutting)
+
+  AVRO-799. Java: Add support for --codec parameter to the
+  'fromtext' command.  Also made some performance improvements, bug
+  fixes and added tests for this command. (cutting)
+
+  AVRO-798. Add checksum to Snappy compressed blocks. (cutting)
+
+  AVRO-763. Java MapReduce API: add support for configure() and
+  close() methods to mappers and reducers. (Marshall Pierce via cutting)
 
  BUG FIXES
 
@@ -39,6 +52,9 @@ Avro 1.5.1 (unreleased)
   AVRO-780. Java: Fix a NullPointerException with reflect data when
   a union contains an array and null. (cutting)
 
+  AVRO-790. Java: GenericDatumReader can fail when reusing objects with unions
+  containing 'bytes' fields. (scottcarey)
+
 Avro 1.5.0 (10 March 2011)
 
  INCOMPATIBLE CHANGES

Modified: avro/branches/branch-1.5/doc/src/content/xdocs/spec.xml
URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/doc/src/content/xdocs/spec.xml?rev=1095495&r1=1095494&r2=1095495&view=diff
==============================================================================
--- avro/branches/branch-1.5/doc/src/content/xdocs/spec.xml (original)
+++ avro/branches/branch-1.5/doc/src/content/xdocs/spec.xml Wed Apr 20 20:00:53 2011
@@ -701,7 +701,8 @@
 snappy
 
         The "snappy" codec uses Google's Snappy
-        compression library.
+        compression library.  Each compressed block is followed
+        by its 4-byte, big-endian CRC32 checksum.
 
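The spec.xml change above documents the framing that the SnappyCodec change in this commit implements: each compressed block carries a trailing 4-byte, big-endian CRC32 of the *uncompressed* data, verified on read. Below is a minimal, JDK-only sketch of that framing pattern. It is not the committed code: `Deflater`/`Inflater` stand in for the Snappy library so the example runs without the xerial dependency, and the class name is illustrative.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.zip.CRC32;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

/** Sketch of the AVRO-798 block framing: compressed payload followed by a
 *  4-byte, big-endian CRC32 of the uncompressed data.  Deflater stands in
 *  for Snappy so this runs with only the JDK. */
public class ChecksummedBlock {

  public static byte[] compress(byte[] data) {
    Deflater deflater = new Deflater();
    deflater.setInput(data);
    deflater.finish();
    byte[] buf = new byte[data.length + 64];   // ample headroom for small inputs
    int size = deflater.deflate(buf);
    deflater.end();

    CRC32 crc = new CRC32();
    crc.update(data, 0, data.length);          // checksum the raw, uncompressed bytes

    ByteBuffer out = ByteBuffer.allocate(size + 4);
    out.put(buf, 0, size);
    out.putInt((int) crc.getValue());          // ByteBuffer writes big-endian by default
    return out.array();
  }

  public static byte[] decompress(byte[] block, int uncompressedLength) throws Exception {
    Inflater inflater = new Inflater();
    inflater.setInput(block, 0, block.length - 4);   // payload excludes trailing CRC
    byte[] out = new byte[uncompressedLength];
    int size = inflater.inflate(out);
    inflater.end();

    CRC32 crc = new CRC32();
    crc.update(out, 0, size);
    int stored = ByteBuffer.wrap(block, block.length - 4, 4).getInt();
    if (stored != (int) crc.getValue())
      throw new IOException("Checksum failure");
    return out;
  }
}
```

Note the design point this mirrors: the checksum covers the uncompressed data, so a corrupt block is rejected before bad bytes reach the datum reader.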
Modified: avro/branches/branch-1.5/lang/java/avro/src/main/java/org/apache/avro/file/SnappyCodec.java URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/java/avro/src/main/java/org/apache/avro/file/SnappyCodec.java?rev=1095495&r1=1095494&r2=1095495&view=diff ============================================================================== --- avro/branches/branch-1.5/lang/java/avro/src/main/java/org/apache/avro/file/SnappyCodec.java (original) +++ avro/branches/branch-1.5/lang/java/avro/src/main/java/org/apache/avro/file/SnappyCodec.java Wed Apr 20 20:00:53 2011 @@ -19,19 +19,19 @@ package org.apache.avro.file; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.zip.CRC32; import org.xerial.snappy.Snappy; import org.xerial.snappy.SnappyException; /** * Implements Snappy compression and decompression. */ class SnappyCodec extends Codec { - - private static final SnappyCodec INSTANCE = new SnappyCodec(); + private CRC32 crc32 = new CRC32(); static class Option extends CodecFactory { @Override protected Codec createInstance() { - return INSTANCE; + return new SnappyCodec(); } } @@ -43,10 +43,15 @@ class SnappyCodec extends Codec { ByteBuffer compress(ByteBuffer in) throws IOException { try { ByteBuffer out = - ByteBuffer.allocate(Snappy.maxCompressedLength(in.remaining())); + ByteBuffer.allocate(Snappy.maxCompressedLength(in.remaining())+4); int size = Snappy.compress(in.array(), in.position(), in.remaining(), out.array(), 0); - out.limit(size); + crc32.reset(); + crc32.update(in.array(), in.position(), in.remaining()); + out.putInt(size, (int)crc32.getValue()); + + out.limit(size+4); + return out; } catch (SnappyException e) { throw new IOException(e); @@ -57,10 +62,16 @@ class SnappyCodec extends Codec { ByteBuffer decompress(ByteBuffer in) throws IOException { try { ByteBuffer out = ByteBuffer.allocate - (Snappy.uncompressedLength(in.array(), in.position(), in.remaining())); - int size = Snappy.uncompress(in.array(), in.position(), 
in.remaining(), + (Snappy.uncompressedLength(in.array(),in.position(),in.remaining()-4)); + int size = Snappy.uncompress(in.array(),in.position(),in.remaining()-4, out.array(), 0); out.limit(size); + + crc32.reset(); + crc32.update(out.array(), 0, size); + if (in.getInt(in.limit()-4) != (int)crc32.getValue()) + throw new IOException("Checksum failure"); + return out; } catch (SnappyException e) { throw new IOException(e); Modified: avro/branches/branch-1.5/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java?rev=1095495&r1=1095494&r2=1095495&view=diff ============================================================================== --- avro/branches/branch-1.5/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java (original) +++ avro/branches/branch-1.5/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java Wed Apr 20 20:00:53 2011 @@ -330,7 +330,7 @@ public class GenericDatumReader imple * byte array representation. By default, this calls {@link * Decoder#readBytes(ByteBuffer)}.*/ protected Object readBytes(Object old, Decoder in) throws IOException { - return in.readBytes((ByteBuffer)old); + return in.readBytes(old instanceof ByteBuffer ? (ByteBuffer) old : null); } /** Called to read integers. 
Subclasses may override to use a different Modified: avro/branches/branch-1.5/lang/java/ipc/src/test/java/org/apache/avro/TestSchema.java URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/java/ipc/src/test/java/org/apache/avro/TestSchema.java?rev=1095495&r1=1095494&r2=1095495&view=diff ============================================================================== --- avro/branches/branch-1.5/lang/java/ipc/src/test/java/org/apache/avro/TestSchema.java (original) +++ avro/branches/branch-1.5/lang/java/ipc/src/test/java/org/apache/avro/TestSchema.java Wed Apr 20 20:00:53 2011 @@ -45,7 +45,6 @@ import org.apache.avro.io.DatumWriter; import org.apache.avro.io.Decoder; import org.apache.avro.io.Encoder; import org.apache.avro.io.EncoderFactory; -import org.apache.avro.io.JsonDecoder; import org.apache.avro.compiler.specific.TestSpecificCompiler; import org.apache.avro.util.Utf8; @@ -79,7 +78,7 @@ public class TestSchema { + " \"name\": \"inner_union\" }\n" + " ]\n" + "}\n"; private static final int COUNT = - Integer.parseInt(System.getProperty("test.count", "10")); + Integer.parseInt(System.getProperty("test.count", "30")); @Test public void testNull() throws Exception { @@ -163,6 +162,18 @@ public class TestSchema { check("{\"type\":\"map\", \"values\":\"long\"}", "{\"a\":1}", map); checkParseError("{\"type\":\"map\"}"); // values required } + + @Test + public void testUnionMap() throws Exception { + String unionMapSchema = "{\"name\":\"foo\", \"type\":\"record\"," + + " \"fields\":[ {\"name\":\"mymap\", \"type\":" + + " [{\"type\":\"map\", \"values\":" + + " [\"int\",\"long\",\"float\",\"string\"]}," + + " \"null\"]" + + " }]" + + " }"; + check(unionMapSchema, true); + } @Test public void testRecord() throws Exception { @@ -558,6 +569,7 @@ public class TestSchema { throws Exception { Schema schema = Schema.parse(jsonSchema); checkProp(schema); + Object reuse = null; for (Object datum : new RandomData(schema, COUNT)) { if (induce) { @@ -570,7 +582,10 @@ 
public class TestSchema { checkBinary(schema, datum, new GenericDatumWriter(), - new GenericDatumReader()); + new GenericDatumReader(), null); + reuse = checkBinary(schema, datum, + new GenericDatumWriter(), + new GenericDatumReader(), reuse); checkDirectBinary(schema, datum, new GenericDatumWriter(), new GenericDatumReader()); @@ -603,6 +618,14 @@ public class TestSchema { DatumWriter writer, DatumReader reader) throws IOException { + checkBinary(schema, datum, writer, reader, null); + } + + public static Object checkBinary(Schema schema, Object datum, + DatumWriter writer, + DatumReader reader, + Object reuse) + throws IOException { ByteArrayOutputStream out = new ByteArrayOutputStream(); writer.setSchema(schema); Encoder encoder = EncoderFactory.get().binaryEncoder(out, null); @@ -613,10 +636,11 @@ public class TestSchema { reader.setSchema(schema); Object decoded = - reader.read(null, DecoderFactory.get().binaryDecoder( + reader.read(reuse, DecoderFactory.get().binaryDecoder( data, null)); assertEquals("Decoded data does not match.", datum, decoded); + return decoded; } public static void checkDirectBinary(Schema schema, Object datum, Modified: avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroMapper.java URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroMapper.java?rev=1095495&r1=1095494&r2=1095495&view=diff ============================================================================== --- avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroMapper.java (original) +++ avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroMapper.java Wed Apr 20 20:00:53 2011 @@ -18,17 +18,20 @@ package org.apache.avro.mapred; +import java.io.Closeable; import java.io.IOException; -import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.mapred.JobConf; +import 
org.apache.hadoop.mapred.JobConfigurable; +import org.apache.hadoop.mapred.Reporter; /** A mapper for Avro data. * *

Applications subclass this class and pass their subclass to {@link - * AvroJob#setMapperClass}, overriding {@link #map}. + * AvroJob#setMapperClass(JobConf, Class)}, overriding {@link #map(Object, AvroCollector, Reporter)}. */ -public class AvroMapper extends Configured { +public class AvroMapper extends Configured implements JobConfigurable, Closeable { /** Called with each map input datum. By default, collects inputs. */ @SuppressWarnings("unchecked") @@ -38,4 +41,15 @@ public class AvroMapper extends } + /** Subclasses can override this as desired. */ + @Override + public void close() throws IOException { + // no op + } + + /** Subclasses can override this as desired. */ + @Override + public void configure(JobConf jobConf) { + // no op + } } Modified: avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroReducer.java URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroReducer.java?rev=1095495&r1=1095494&r2=1095495&view=diff ============================================================================== --- avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroReducer.java (original) +++ avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroReducer.java Wed Apr 20 20:00:53 2011 @@ -18,19 +18,22 @@ package org.apache.avro.mapred; +import java.io.Closeable; import java.io.IOException; -import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobConfigurable; +import org.apache.hadoop.mapred.Reporter; /** A reducer for Avro data. * *

Applications should subclass this class and pass their subclass to {@link - * AvroJob#setReducerClass} and perhaps {@link AvroJob#setCombinerClass}. - * Subclasses override {@link #reduce}. + * AvroJob#setReducerClass(JobConf, Class)} and perhaps {@link AvroJob#setCombinerClass(JobConf, Class)}. + * Subclasses override {@link #reduce(Object, Iterable, AvroCollector, Reporter)}. */ -public class AvroReducer extends Configured { +public class AvroReducer extends Configured implements JobConfigurable, Closeable { private Pair outputPair; @@ -48,4 +51,15 @@ public class AvroReducer extend } } + /** Subclasses can override this as desired. */ + @Override + public void close() throws IOException { + // no op + } + + /** Subclasses can override this as desired. */ + @Override + public void configure(JobConf jobConf) { + // no op + } } Modified: avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopMapper.java URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopMapper.java?rev=1095495&r1=1095494&r2=1095495&view=diff ============================================================================== --- avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopMapper.java (original) +++ avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopMapper.java Wed Apr 20 20:00:53 2011 @@ -21,11 +21,11 @@ package org.apache.avro.mapred; import java.io.IOException; import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapReduceBase; +import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.mapred.Mapper; -import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.util.ReflectionUtils; /** Bridge between a {@link org.apache.hadoop.mapred.Mapper} and an {@link @@ -45,6 
+45,7 @@ class HadoopMapper ext (conf.getClass(AvroJob.MAPPER, AvroMapper.class, AvroMapper.class), conf); this.isMapOnly = conf.getNumReduceTasks() == 0; + this.mapper.configure(conf); } @SuppressWarnings("unchecked") @@ -80,4 +81,9 @@ class HadoopMapper ext mapper.map(wrapper.datum(), out, reporter); } + @Override + public void close() throws IOException { + this.mapper.close(); + } + } Modified: avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopReducerBase.java URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopReducerBase.java?rev=1095495&r1=1095494&r2=1095495&view=diff ============================================================================== --- avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopReducerBase.java (original) +++ avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopReducerBase.java Wed Apr 20 20:00:53 2011 @@ -39,6 +39,7 @@ abstract class HadoopReducerBase, Iterator { @@ -60,4 +61,8 @@ abstract class HadoopReducerBase

Modified: avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/package.html

+<p>For jobs whose input is non-Avro data file and which use a
+non-Avro {@link org.apache.hadoop.mapred.Mapper} and no reducer,
+i.e., a map-only job:
+<ul>
+  <li>Set your input file format with {@link
+  org.apache.hadoop.mapred.JobConf#setInputFormat}.</li>
+  <li>Implement {@link org.apache.hadoop.mapred.Mapper} and specify
+  your job's mapper with {@link
+  org.apache.hadoop.mapred.JobConf#setMapperClass}.  The output key
+  and value type should be {@link org.apache.avro.mapred.AvroWrapper} and
+  {@link org.apache.hadoop.io.NullWritable}.</li>
+  <li>Call {@link
+  org.apache.hadoop.mapred.JobConf#setNumReduceTasks(int)} with zero.</li>
+  <li>Call {@link org.apache.avro.mapred.AvroJob#setOutputSchema} with your
+  job's output schema.</li>
+</ul>
 
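The steps documented above can be sketched as a JobConf setup. This is an illustrative fragment only, not runnable on its own: it assumes Hadoop and Avro 1.5 on the classpath, and `MyLineMapper` is a hypothetical user class implementing `org.apache.hadoop.mapred.Mapper` whose output key and value types are `AvroWrapper` and `NullWritable`.

```java
// Sketch of the non-Avro-input, map-only job described above.
JobConf job = new JobConf();

job.setInputFormat(TextInputFormat.class);            // non-Avro input format
FileInputFormat.setInputPaths(job, new Path("in"));

job.setMapperClass(MyLineMapper.class);               // hypothetical: emits AvroWrapper, NullWritable

job.setNumReduceTasks(0);                             // map-only
AvroJob.setOutputSchema(job, Schema.create(Schema.Type.STRING));
FileOutputFormat.setOutputPath(job, new Path("out"));

JobClient.runJob(job);
```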
+ Modified: avro/branches/branch-1.5/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestSequenceFileReader.java URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestSequenceFileReader.java?rev=1095495&r1=1095494&r2=1095495&view=diff ============================================================================== --- avro/branches/branch-1.5/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestSequenceFileReader.java (original) +++ avro/branches/branch-1.5/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestSequenceFileReader.java Wed Apr 20 20:00:53 2011 @@ -30,6 +30,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.FileInputFormat; @@ -55,7 +56,7 @@ public class TestSequenceFileReader { private static final int COUNT = Integer.parseInt(System.getProperty("test.count", "10")); private static final File DIR - = new File(System.getProperty("test.dir", "/tmp")); + = new File(System.getProperty("test.dir", ".")); private static final File FILE = new File(DIR, "test.seq"); private static final Schema SCHEMA @@ -162,6 +163,45 @@ public class TestSequenceFileReader { new SpecificDatumReader>())); } + private static class NonAvroOnlyMapper + extends MapReduceBase + implements Mapper>,NullWritable> { + + public void map(LongWritable key, Text value, + OutputCollector>,NullWritable> out, + Reporter reporter) throws IOException { + out.collect(new AvroWrapper>(new Pair(key.get(), new Utf8(value.toString()))), + NullWritable.get()); + } + } + + @Test + public void testNonAvroMapOnly() throws Exception { + JobConf job = new JobConf(); + Path output = new Path(System.getProperty("test.dir",".")+"/seq-out"); + + 
output.getFileSystem(job).delete(output); + + + // configure input for non-Avro sequence file + job.setInputFormat(SequenceFileInputFormat.class); + FileInputFormat.setInputPaths(job, FILE.toURI().toString()); + + // use a hadoop mapper that emits Avro output + job.setMapperClass(NonAvroOnlyMapper.class); + + // configure output for avro + job.setNumReduceTasks(0); // map-only + FileOutputFormat.setOutputPath(job, output); + AvroJob.setOutputSchema(job, SCHEMA); + + JobClient.runJob(job); + + checkFile(new DataFileReader> + (new File(output.toString()+"/part-00000.avro"), + new SpecificDatumReader>())); + } + private static class NonAvroReducer extends MapReduceBase implements Reducer,AvroValue,LongWritable,Text> { Modified: avro/branches/branch-1.5/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestWeather.java URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestWeather.java?rev=1095495&r1=1095494&r2=1095495&view=diff ============================================================================== --- avro/branches/branch-1.5/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestWeather.java (original) +++ avro/branches/branch-1.5/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestWeather.java Wed Apr 20 20:00:53 2011 @@ -20,6 +20,7 @@ package org.apache.avro.mapred; import java.io.IOException; import java.io.File; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobClient; @@ -34,15 +35,31 @@ import org.apache.avro.io.DatumReader; import org.apache.avro.specific.SpecificDatumReader; import org.apache.avro.file.DataFileReader; import static org.apache.avro.file.DataFileConstants.SNAPPY_CODEC; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import org.junit.After; import org.junit.Test; -import static org.junit.Assert.*; import test.Weather; /** Tests mapred API with a 
specific record. */ public class TestWeather { + private static final AtomicInteger mapCloseCalls = new AtomicInteger(); + private static final AtomicInteger mapConfigureCalls = new AtomicInteger(); + private static final AtomicInteger reducerCloseCalls = new AtomicInteger(); + private static final AtomicInteger reducerConfigureCalls = new AtomicInteger(); + + + @After + public void tearDown() { + mapCloseCalls.set(0); + mapConfigureCalls.set(0); + reducerCloseCalls.set(0); + reducerConfigureCalls.set(0); + } + /** Uses default mapper with no reduces for a map-only identity job. */ @Test @SuppressWarnings("deprecation") @@ -64,7 +81,7 @@ public class TestWeather { FileOutputFormat.setCompressOutput(job, true); job.setNumReduceTasks(0); // map-only - + JobClient.runJob(job); // check output is correct @@ -88,8 +105,18 @@ public class TestWeather { Reporter reporter) throws IOException { collector.collect(new Pair(w, (Void)null)); } + + @Override + public void close() throws IOException { + mapCloseCalls.incrementAndGet(); + } + + @Override + public void configure(JobConf jobConf) { + mapConfigureCalls.incrementAndGet(); + } } - + // output keys only, since values are empty public static class SortReducer extends AvroReducer { @@ -99,7 +126,17 @@ public class TestWeather { Reporter reporter) throws IOException { collector.collect(w); } - } + + @Override + public void close() throws IOException { + reducerCloseCalls.incrementAndGet(); + } + + @Override + public void configure(JobConf jobConf) { + reducerConfigureCalls.incrementAndGet(); + } + } @Test @SuppressWarnings("deprecation") @@ -140,6 +177,15 @@ public class TestWeather { check.close(); sorted.close(); + + // check that AvroMapper and AvroReducer get close() and configure() called + assertEquals(1, mapCloseCalls.get()); + assertEquals(1, reducerCloseCalls.get()); + // gets called twice for some reason, so loosen this check + assertTrue(mapConfigureCalls.get() >= 1); + assertTrue(reducerConfigureCalls.get() >= 
1); + + } Modified: avro/branches/branch-1.5/lang/java/tools/src/main/java/org/apache/avro/tool/FromTextTool.java URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/java/tools/src/main/java/org/apache/avro/tool/FromTextTool.java?rev=1095495&r1=1095494&r2=1095495&view=diff ============================================================================== --- avro/branches/branch-1.5/lang/java/tools/src/main/java/org/apache/avro/tool/FromTextTool.java (original) +++ avro/branches/branch-1.5/lang/java/tools/src/main/java/org/apache/avro/tool/FromTextTool.java Wed Apr 20 20:00:53 2011 @@ -19,9 +19,7 @@ package org.apache.avro.tool; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; -import java.io.BufferedReader; import java.io.InputStream; -import java.io.InputStreamReader; import java.io.PrintStream; import java.nio.ByteBuffer; import java.util.List; @@ -34,6 +32,7 @@ import org.apache.avro.Schema; import org.apache.avro.file.CodecFactory; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericDatumWriter; +import static org.apache.avro.file.DataFileConstants.DEFLATE_CODEC; /** Reads a text file into an Avro data file. 
* @@ -61,36 +60,68 @@ public class FromTextTool implements Too OptionSpec level = p.accepts("level", "compression level") .withOptionalArg().ofType(Integer.class); + OptionSpec codec = p.accepts("codec", "compression codec") + .withOptionalArg().ofType(String.class); + OptionSet opts = p.parse(args.toArray(new String[0])); - if (opts.nonOptionArguments().size() != 2) { + List nargs = opts.nonOptionArguments(); + if (nargs.size() != 2) { err.println("Expected 2 args: from_file to_file (local filenames," + " Hadoop URI's, or '-' for stdin/stdout"); p.printHelpOn(err); return 1; } - BufferedInputStream inStream = Util.fileOrStdin(args.get(0), stdin); - BufferedOutputStream outStream = Util.fileOrStdout(args.get(1), out); - int compressionLevel = 1; // Default compression level if (opts.hasArgument(level)) { compressionLevel = level.value(opts); } - BufferedReader reader = new BufferedReader(new InputStreamReader(inStream)); + String codecName = opts.hasArgument(codec) + ? codec.value(opts) + : DEFLATE_CODEC; + CodecFactory codecFactory = codecName.equals(DEFLATE_CODEC) + ? 
CodecFactory.deflateCodec(compressionLevel) + : CodecFactory.fromString(codecName); + + BufferedInputStream inStream = Util.fileOrStdin(nargs.get(0), stdin); + BufferedOutputStream outStream = Util.fileOrStdout(nargs.get(1), out); + DataFileWriter writer = new DataFileWriter(new GenericDatumWriter()); - writer.setCodec(CodecFactory.deflateCodec(compressionLevel)); + writer.setCodec(codecFactory); writer.create(Schema.parse(TEXT_FILE_SCHEMA), outStream); - String line; - while((line = reader.readLine()) != null) { - ByteBuffer buff = ByteBuffer.wrap(line.getBytes()); - writer.append(buff); + ByteBuffer line = ByteBuffer.allocate(128); + boolean returnSeen = false; + byte[] buf = new byte[8192]; + for (int end = inStream.read(buf); end != -1; end = inStream.read(buf)) { + for (int i = 0; i < end; i++) { + int b = buf[i] & 0xFF; + if (b == '\n') { // newline + System.out.println("Writing line = "+line.position()); + line.flip(); + writer.append(line); + line.clear(); + returnSeen = false; + } else if (b == '\r') { // return + line.flip(); + writer.append(line); + line.clear(); + returnSeen = true; + } else { + if (line.position() == line.limit()) { // reallocate longer line + ByteBuffer tempLine = ByteBuffer.allocate(line.limit()*2); + line.flip(); + tempLine.put(line); + line = tempLine; + } + line.put((byte)b); + returnSeen = false; + } + } } - - writer.flush(); writer.close(); inStream.close(); return 0; Modified: avro/branches/branch-1.5/lang/java/tools/src/main/java/org/apache/avro/tool/ToTextTool.java URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/java/tools/src/main/java/org/apache/avro/tool/ToTextTool.java?rev=1095495&r1=1095494&r2=1095495&view=diff ============================================================================== --- avro/branches/branch-1.5/lang/java/tools/src/main/java/org/apache/avro/tool/ToTextTool.java (original) +++ avro/branches/branch-1.5/lang/java/tools/src/main/java/org/apache/avro/tool/ToTextTool.java Wed Apr 20 
20:00:53 2011 @@ -35,7 +35,7 @@ import org.apache.avro.generic.GenericDa public class ToTextTool implements Tool { private static final String TEXT_FILE_SCHEMA = "\"bytes\""; - private static final byte[] LINE_SEPERATOR = + private static final byte[] LINE_SEPARATOR = System.getProperty("line.separator").getBytes(); @Override @@ -45,7 +45,7 @@ public class ToTextTool implements Tool @Override public String getShortDescription() { - return "Converts and avro file to a text file."; + return "Converts an Avro data file to a text file."; } @Override @@ -77,7 +77,7 @@ public class ToTextTool implements Tool while (fileReader.hasNext()) { ByteBuffer outBuff = (ByteBuffer) fileReader.next(); outStream.write(outBuff.array()); - outStream.write(LINE_SEPERATOR); + outStream.write(LINE_SEPARATOR); } outStream.close();
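The reworked fromtext loop in this commit splits records on raw bytes instead of routing input through a BufferedReader, so no charset decoding touches the data on its way into the Avro file. Below is a simplified, JDK-only sketch of that splitting logic. It is not the committed code: names are illustrative, records are collected as Strings for brevity (the tool keeps ByteBuffers), and this version collapses "\r\n" into a single record boundary.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified sketch of byte-level line splitting as in the reworked
 *  'fromtext' tool: a record ends at '\n', '\r', or "\r\n". */
public class LineSplitter {
  public static List<String> split(byte[] buf) {
    List<String> lines = new ArrayList<>();
    StringBuilder line = new StringBuilder();
    boolean returnSeen = false;          // true if the previous byte was '\r'
    for (byte raw : buf) {
      int b = raw & 0xFF;
      if (b == '\n') {                   // newline
        if (!returnSeen) {               // for "\r\n" the '\r' already emitted the record
          lines.add(line.toString());
          line.setLength(0);
        }
        returnSeen = false;
      } else if (b == '\r') {            // return
        lines.add(line.toString());
        line.setLength(0);
        returnSeen = true;
      } else {
        line.append((char) b);
        returnSeen = false;
      }
    }
    if (line.length() > 0)               // trailing record without a terminator
      lines.add(line.toString());
    return lines;
  }
}
```

Working on bytes also avoids the reallocation cost of decoding and re-encoding each line, which is one source of the performance improvements noted for AVRO-799.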