avro-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r1325903 [3/4] - in /avro/trunk: ./ lang/java/ lang/java/mapred/ lang/java/mapred/src/main/java/org/apache/avro/hadoop/ lang/java/mapred/src/main/java/org/apache/avro/hadoop/file/ lang/java/mapred/src/main/java/org/apache/avro/hadoop/io/ la...
Date Fri, 13 Apr 2012 19:03:14 GMT
Added: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/hadoop/file/TestSortedKeyValueFile.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/hadoop/file/TestSortedKeyValueFile.java?rev=1325903&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/hadoop/file/TestSortedKeyValueFile.java (added)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/hadoop/file/TestSortedKeyValueFile.java Fri Apr 13 19:03:12 2012
@@ -0,0 +1,197 @@
+/**
+ * Licensed to Odiago, Inc. under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership.  Odiago, Inc.
+ * licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.hadoop.file;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.hadoop.io.AvroKeyValue;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.file.FileReader;
+import org.apache.avro.file.DataFileReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestSortedKeyValueFile {
+  private static final Logger LOG = LoggerFactory.getLogger(TestSortedKeyValueFile.class);
+
+  // Must be public for the JUnit @Rule to take effect.
+  @Rule
+  public TemporaryFolder mTempDir = new TemporaryFolder();
+
+  /**
+   * Builds writer options for a string-to-string SortedKeyValueFile named
+   * "myfile" in the temporary directory, indexing every other record.
+   *
+   * @param conf The Hadoop configuration to write with.
+   * @return The configured writer options.
+   */
+  private SortedKeyValueFile.Writer.Options newWriterOptions(Configuration conf) {
+    return new SortedKeyValueFile.Writer.Options()
+        .withKeySchema(Schema.create(Schema.Type.STRING))
+        .withValueSchema(Schema.create(Schema.Type.STRING))
+        .withConfiguration(conf)
+        .withPath(new Path(mTempDir.getRoot().getPath(), "myfile"))
+        .withIndexInterval(2);  // Index every other record.
+  }
+
+  /**
+   * Writes the four sorted records shared by testWriter and testReader,
+   * closing the writer even if an append fails.
+   *
+   * @param options The writer options to open the file with.
+   */
+  private void writeSortedRecords(SortedKeyValueFile.Writer.Options options) throws IOException {
+    SortedKeyValueFile.Writer<CharSequence, CharSequence> writer
+        = new SortedKeyValueFile.Writer<CharSequence, CharSequence>(options);
+    try {
+      writer.append("apple", "Apple");  // Will be indexed.
+      writer.append("banana", "Banana");
+      writer.append("carrot", "Carrot");  // Will be indexed.
+      writer.append("durian", "Durian");
+    } finally {
+      writer.close();
+    }
+  }
+
+  /** Appending keys out of sorted order must be rejected. */
+  @Test(expected=IllegalArgumentException.class)
+  public void testWriteOutOfSortedOrder() throws IOException {
+    LOG.debug("Writing some records to a SortedKeyValueFile...");
+
+    SortedKeyValueFile.Writer.Options options = newWriterOptions(new Configuration());
+
+    SortedKeyValueFile.Writer<CharSequence, CharSequence> writer
+        = new SortedKeyValueFile.Writer<CharSequence, CharSequence>(options);
+
+    try {
+      writer.append("banana", "Banana");
+      writer.append("apple", "Apple");  // Ruh, roh!
+    } finally {
+      writer.close();
+    }
+  }
+
+  /** Verifies the directory, index file and data file produced by the writer. */
+  @Test
+  public void testWriter() throws IOException {
+    LOG.debug("Writing some records to a SortedKeyValueFile...");
+
+    SortedKeyValueFile.Writer.Options options = newWriterOptions(new Configuration());
+    writeSortedRecords(options);
+
+    LOG.debug("Checking the generated directory...");
+    File directory = new File(mTempDir.getRoot().getPath(), "myfile");
+    assertTrue(directory.exists());
+
+    LOG.debug("Checking the generated index file...");
+    // The index maps each indexed key to a long position in the data file.
+    File indexFile = new File(directory, SortedKeyValueFile.INDEX_FILENAME);
+    DatumReader<GenericRecord> indexReader = new GenericDatumReader<GenericRecord>(
+        AvroKeyValue.getSchema(options.getKeySchema(), Schema.create(Schema.Type.LONG)));
+    FileReader<GenericRecord> indexFileReader = DataFileReader.openReader(indexFile, indexReader);
+
+    List<AvroKeyValue<CharSequence, Long>> indexRecords
+        = new ArrayList<AvroKeyValue<CharSequence, Long>>();
+    try {
+      for (GenericRecord indexRecord : indexFileReader) {
+        indexRecords.add(new AvroKeyValue<CharSequence, Long>(indexRecord));
+      }
+    } finally {
+      indexFileReader.close();
+    }
+
+    // With an index interval of 2, records 0 ("apple") and 2 ("carrot") are indexed.
+    assertEquals(2, indexRecords.size());
+    assertEquals("apple", indexRecords.get(0).getKey().toString());
+    LOG.debug("apple's position in the file: " + indexRecords.get(0).getValue());
+    assertEquals("carrot", indexRecords.get(1).getKey().toString());
+    LOG.debug("carrot's position in the file: " + indexRecords.get(1).getValue());
+
+    LOG.debug("Checking the generated data file...");
+    File dataFile = new File(directory, SortedKeyValueFile.DATA_FILENAME);
+    DatumReader<GenericRecord> dataReader = new GenericDatumReader<GenericRecord>(
+        AvroKeyValue.getSchema(options.getKeySchema(), options.getValueSchema()));
+    DataFileReader<GenericRecord> dataFileReader
+        = new DataFileReader<GenericRecord>(dataFile, dataReader);
+
+    try {
+      // Seek to each indexed position and confirm the expected record is there.
+      dataFileReader.seek(indexRecords.get(0).getValue());
+      assertTrue(dataFileReader.hasNext());
+      AvroKeyValue<CharSequence, CharSequence> appleRecord
+          = new AvroKeyValue<CharSequence, CharSequence>(dataFileReader.next());
+      assertEquals("apple", appleRecord.getKey().toString());
+      assertEquals("Apple", appleRecord.getValue().toString());
+
+      dataFileReader.seek(indexRecords.get(1).getValue());
+      assertTrue(dataFileReader.hasNext());
+      AvroKeyValue<CharSequence, CharSequence> carrotRecord
+          = new AvroKeyValue<CharSequence, CharSequence>(dataFileReader.next());
+      assertEquals("carrot", carrotRecord.getKey().toString());
+      assertEquals("Carrot", carrotRecord.getValue().toString());
+
+      // "durian" follows "carrot" sequentially even though it is not indexed.
+      assertTrue(dataFileReader.hasNext());
+      AvroKeyValue<CharSequence, CharSequence> durianRecord
+          = new AvroKeyValue<CharSequence, CharSequence>(dataFileReader.next());
+      assertEquals("durian", durianRecord.getKey().toString());
+      assertEquals("Durian", durianRecord.getValue().toString());
+    } finally {
+      dataFileReader.close();
+    }
+  }
+
+  /** Verifies lookups through the Reader, including misses before, between and after keys. */
+  @Test
+  public void testReader() throws IOException {
+    Configuration conf = new Configuration();
+    writeSortedRecords(newWriterOptions(conf));
+
+    LOG.debug("Reading the file back using a reader...");
+    SortedKeyValueFile.Reader.Options readerOptions = new SortedKeyValueFile.Reader.Options()
+        .withKeySchema(Schema.create(Schema.Type.STRING))
+        .withValueSchema(Schema.create(Schema.Type.STRING))
+        .withConfiguration(conf)
+        .withPath(new Path(mTempDir.getRoot().getPath(), "myfile"));
+
+    SortedKeyValueFile.Reader<CharSequence, CharSequence> reader
+        = new SortedKeyValueFile.Reader<CharSequence, CharSequence>(readerOptions);
+
+    try {
+      assertEquals("Carrot", reader.get("carrot").toString());
+      assertEquals("Banana", reader.get("banana").toString());
+      assertNull(reader.get("a-vegetable"));  // Sorts before the first key.
+      assertNull(reader.get("beet"));         // Falls between existing keys.
+      assertNull(reader.get("zzz"));          // Sorts after the last key.
+    } finally {
+      reader.close();
+    }
+  }
+}

Propchange: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/hadoop/file/TestSortedKeyValueFile.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/hadoop/io/TestAvroDatumConverterFactory.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/hadoop/io/TestAvroDatumConverterFactory.java?rev=1325903&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/hadoop/io/TestAvroDatumConverterFactory.java (added)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/hadoop/io/TestAvroDatumConverterFactory.java Fri Apr 13 19:03:12 2012
@@ -0,0 +1,132 @@
+/**
+ * Licensed to Odiago, Inc. under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership.  Odiago, Inc.
+ * licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.hadoop.io;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.AvroValue;
+import org.apache.avro.mapreduce.AvroJob;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestAvroDatumConverterFactory {
+  private Job mJob;
+  private AvroDatumConverterFactory mFactory;
+
+  @Before
+  public void setup() throws IOException {
+    mJob = new Job();
+    mFactory = new AvroDatumConverterFactory(mJob.getConfiguration());
+  }
+
+  @Test
+  public void testConvertAvroKey() throws IOException {
+    AvroJob.setOutputKeySchema(mJob, Schema.create(Schema.Type.STRING));
+
+    AvroKey<CharSequence> avroKey = new AvroKey<CharSequence>("foo");
+    @SuppressWarnings("unchecked")
+    AvroDatumConverter<AvroKey<CharSequence>, ?> converter = mFactory.create(
+        (Class<AvroKey<CharSequence>>) avroKey.getClass());
+    assertEquals("foo", converter.convert(avroKey).toString());
+  }
+
+  @Test
+  public void testConvertAvroValue() throws IOException {
+    AvroJob.setOutputValueSchema(mJob, Schema.create(Schema.Type.INT));
+
+    AvroValue<Integer> avroValue = new AvroValue<Integer>(42);
+    @SuppressWarnings("unchecked")
+    AvroDatumConverter<AvroValue<Integer>, Integer> converter = mFactory.create(
+        (Class<AvroValue<Integer>>) avroValue.getClass());
+    assertEquals(42, converter.convert(avroValue).intValue());
+  }
+
+  @Test
+  public void testConvertBooleanWritable() {
+    AvroDatumConverter<BooleanWritable, Boolean> converter
+        = mFactory.create(BooleanWritable.class);
+    assertEquals(true, converter.convert(new BooleanWritable(true)).booleanValue());
+  }
+
+  @Test
+  public void testConvertBytesWritable() {
+    AvroDatumConverter<BytesWritable, ByteBuffer> converter = mFactory.create(BytesWritable.class);
+    ByteBuffer bytes = converter.convert(new BytesWritable(new byte[] { 1, 2, 3 }));
+    assertEquals(1, bytes.get(0));
+    assertEquals(2, bytes.get(1));
+    assertEquals(3, bytes.get(2));
+  }
+
+  @Test
+  public void testConvertByteWritable() {
+    AvroDatumConverter<ByteWritable, GenericFixed> converter = mFactory.create(ByteWritable.class);
+    assertEquals(42, converter.convert(new ByteWritable((byte) 42)).bytes()[0]);
+  }
+
+  @Test
+  public void testConvertDoubleWritable() {
+    AvroDatumConverter<DoubleWritable, Double> converter = mFactory.create(DoubleWritable.class);
+    assertEquals(2.0, converter.convert(new DoubleWritable(2.0)).doubleValue(), 0.00001);
+  }
+
+  @Test
+  public void testConvertFloatWritable() {
+    AvroDatumConverter<FloatWritable, Float> converter = mFactory.create(FloatWritable.class);
+    assertEquals(2.2f, converter.convert(new FloatWritable(2.2f)).floatValue(), 0.00001);
+  }
+
+  @Test
+  public void testConvertIntWritable() {
+    AvroDatumConverter<IntWritable, Integer> converter = mFactory.create(IntWritable.class);
+    assertEquals(2, converter.convert(new IntWritable(2)).intValue());
+  }
+
+  @Test
+  public void testConvertLongWritable() {
+    AvroDatumConverter<LongWritable, Long> converter = mFactory.create(LongWritable.class);
+    assertEquals(123L, converter.convert(new LongWritable(123L)).longValue());
+  }
+
+  @Test
+  public void testConvertNullWritable() {
+    AvroDatumConverter<NullWritable, Object> converter = mFactory.create(NullWritable.class);
+    assertNull(converter.convert(NullWritable.get()));
+  }
+
+  @Test
+  public void testConvertText() {
+    AvroDatumConverter<Text, CharSequence> converter = mFactory.create(Text.class);
+    assertEquals("foo", converter.convert(new Text("foo")).toString());
+  }
+}

Propchange: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/hadoop/io/TestAvroDatumConverterFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/hadoop/io/TestAvroKeyDeserializer.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/hadoop/io/TestAvroKeyDeserializer.java?rev=1325903&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/hadoop/io/TestAvroKeyDeserializer.java (added)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/hadoop/io/TestAvroKeyDeserializer.java Fri Apr 13 19:03:12 2012
@@ -0,0 +1,68 @@
+/**
+ * Licensed to Odiago, Inc. under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership.  Odiago, Inc.
+ * licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.hadoop.io;
+
+import static org.junit.Assert.*;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.mapred.AvroWrapper;
+import org.junit.Test;
+
+public class TestAvroKeyDeserializer {
+  /** Round-trips two string records through an AvroKeyDeserializer. */
+  @Test
+  public void testDeserialize() throws IOException {
+    // Create a deserializer with matching writer and reader schemas.
+    Schema writerSchema = Schema.create(Schema.Type.STRING);
+    Schema readerSchema = Schema.create(Schema.Type.STRING);
+    AvroKeyDeserializer<CharSequence> deserializer
+        = new AvroKeyDeserializer<CharSequence>(writerSchema, readerSchema);
+
+    // Both schemas must be exposed unchanged.
+    assertEquals(writerSchema, deserializer.getWriterSchema());
+    assertEquals(readerSchema, deserializer.getReaderSchema());
+
+    // Binary-encode two records for the deserializer to consume.
+    ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+    Encoder encoder = EncoderFactory.get().binaryEncoder(bytesOut, null);
+    DatumWriter<CharSequence> datumWriter = new GenericDatumWriter<CharSequence>(writerSchema);
+    for (String datum : new String[] { "record1", "record2" }) {
+      datumWriter.write(datum, encoder);
+    }
+    encoder.flush();
+
+    // Decode them back, reusing the wrapper between calls.
+    deserializer.open(new ByteArrayInputStream(bytesOut.toByteArray()));
+    AvroWrapper<CharSequence> reused = null;
+
+    reused = deserializer.deserialize(reused);
+    assertEquals("record1", reused.datum().toString());
+
+    reused = deserializer.deserialize(reused);
+    assertEquals("record2", reused.datum().toString());
+
+    deserializer.close();
+  }
+}

Propchange: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/hadoop/io/TestAvroKeyDeserializer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/hadoop/io/TestAvroSequenceFile.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/hadoop/io/TestAvroSequenceFile.java?rev=1325903&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/hadoop/io/TestAvroSequenceFile.java (added)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/hadoop/io/TestAvroSequenceFile.java Fri Apr 13 19:03:12 2012
@@ -0,0 +1,209 @@
+/**
+ * Licensed to Odiago, Inc. under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership.  Odiago, Inc.
+ * licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.hadoop.io;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.AvroValue;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.avro.hadoop.io.AvroSequenceFile;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestAvroSequenceFile {
+  // Disable checkstyle for this variable.  It must be public to work with JUnit @Rule.
+  // CHECKSTYLE:OFF
+  @Rule
+  public TemporaryFolder mTempDir = new TemporaryFolder();
+  // CHECKSTYLE:ON
+
+  /** Tests that reading and writing avro data works. */
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testReadAvro() throws IOException {
+    Path sequenceFilePath = new Path(new File(mTempDir.getRoot(), "output.seq").getPath());
+
+    writeSequenceFile(sequenceFilePath, AvroKey.class, AvroValue.class,
+        Schema.create(Schema.Type.STRING), Schema.create(Schema.Type.INT),
+        new AvroKey<CharSequence>("one"), new AvroValue<Integer>(1),
+        new AvroKey<CharSequence>("two"), new AvroValue<Integer>(2));
+
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(conf);
+    AvroSequenceFile.Reader.Options options = new AvroSequenceFile.Reader.Options()
+        .withFileSystem(fs)
+        .withInputPath(sequenceFilePath)
+        .withKeySchema(Schema.create(Schema.Type.STRING))
+        .withValueSchema(Schema.create(Schema.Type.INT))
+        .withConfiguration(conf);
+    SequenceFile.Reader reader = new AvroSequenceFile.Reader(options);
+
+    AvroKey<CharSequence> key = new AvroKey<CharSequence>();
+    AvroValue<Integer> value = new AvroValue<Integer>();
+
+    // Read the first record.
+    key = (AvroKey<CharSequence>) reader.next(key);
+    assertNotNull(key);
+    assertEquals("one", key.datum().toString());
+    value = (AvroValue<Integer>) reader.getCurrentValue(value);
+    assertNotNull(value);
+    assertEquals(1, value.datum().intValue());
+
+    // Read the second record.
+    key = (AvroKey<CharSequence>) reader.next(key);
+    assertNotNull(key);
+    assertEquals("two", key.datum().toString());
+    value = (AvroValue<Integer>) reader.getCurrentValue(value);
+    assertNotNull(value);
+    assertEquals(2, value.datum().intValue());
+
+    assertNull("Should be no more records.", reader.next(key));
+  }
+
+  /** Tests that reading and writing avro records without a reader schema works. */
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testReadAvroWithoutReaderSchemas() throws IOException {
+    Path sequenceFilePath = new Path(new File(mTempDir.getRoot(), "output.seq").getPath());
+
+    writeSequenceFile(sequenceFilePath, AvroKey.class, AvroValue.class,
+        Schema.create(Schema.Type.STRING), Schema.create(Schema.Type.INT),
+        new AvroKey<CharSequence>("one"), new AvroValue<Integer>(1),
+        new AvroKey<CharSequence>("two"), new AvroValue<Integer>(2));
+
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(conf);
+    AvroSequenceFile.Reader.Options options = new AvroSequenceFile.Reader.Options()
+        .withFileSystem(fs)
+        .withInputPath(sequenceFilePath)
+        .withConfiguration(conf);
+    SequenceFile.Reader reader = new AvroSequenceFile.Reader(options);
+
+    AvroKey<CharSequence> key = new AvroKey<CharSequence>();
+    AvroValue<Integer> value = new AvroValue<Integer>();
+
+    // Read the first record.
+    key = (AvroKey<CharSequence>) reader.next(key);
+    assertNotNull(key);
+    assertEquals("one", key.datum().toString());
+    value = (AvroValue<Integer>) reader.getCurrentValue(value);
+    assertNotNull(value);
+    assertEquals(1, value.datum().intValue());
+
+    // Read the second record.
+    key = (AvroKey<CharSequence>) reader.next(key);
+    assertNotNull(key);
+    assertEquals("two", key.datum().toString());
+    value = (AvroValue<Integer>) reader.getCurrentValue(value);
+    assertNotNull(value);
+    assertEquals(2, value.datum().intValue());
+
+    assertNull("Should be no more records.", reader.next(key));
+  }
+
+  /** Tests that reading and writing ordinary Writables still works. */
+  @Test
+  public void testReadWritables() throws IOException {
+    Path sequenceFilePath = new Path(new File(mTempDir.getRoot(), "output.seq").getPath());
+
+    writeSequenceFile(sequenceFilePath, Text.class, IntWritable.class, null, null,
+        new Text("one"), new IntWritable(1),
+        new Text("two"), new IntWritable(2));
+
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(conf);
+    AvroSequenceFile.Reader.Options options = new AvroSequenceFile.Reader.Options()
+        .withFileSystem(fs)
+        .withInputPath(sequenceFilePath)
+        .withConfiguration(conf);
+    SequenceFile.Reader reader = new AvroSequenceFile.Reader(options);
+
+    Text key = new Text();
+    IntWritable value = new IntWritable();
+
+    // Read the first record.
+    assertTrue(reader.next(key));
+    assertEquals("one", key.toString());
+    reader.getCurrentValue(value);
+    assertNotNull(value);
+    assertEquals(1, value.get());
+
+    // Read the second record.
+    assertTrue(reader.next(key));
+    assertEquals("two", key.toString());
+    reader.getCurrentValue(value);
+    assertNotNull(value);
+    assertEquals(2, value.get());
+
+    assertFalse("Should be no more records.", reader.next(key));
+  }
+
+  /**
+   * Writes a sequence file of records.
+   *
+   * @param file The target file path.
+   * @param keySchema The schema of the key if using Avro, else null.
+   * @param valueSchema The schema of the value if using Avro, else null.
+   * @param records <i>key1</i>, <i>value1</i>, <i>key2</i>, <i>value2</i>, ...
+   */
+  private void writeSequenceFile(Path file, Class<?> keyClass, Class<?> valueClass,
+      Schema keySchema, Schema valueSchema, Object... records) throws IOException {
+    // Make sure the key/value records have an even size.
+    if (0 != records.length % 2) {
+      throw new IllegalArgumentException("Expected a value for each key record.");
+    }
+
+    // Open a AvroSequenceFile writer.
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(conf);
+    AvroSequenceFile.Writer.Options options = new AvroSequenceFile.Writer.Options()
+        .withFileSystem(fs)
+        .withConfiguration(conf)
+        .withOutputPath(file);
+    if (null != keySchema) {
+      options.withKeySchema(keySchema);
+    } else {
+      options.withKeyClass(keyClass);
+    }
+    if (null != valueSchema) {
+      options.withValueSchema(valueSchema);
+    } else {
+      options.withValueClass(valueClass);
+    }
+    SequenceFile.Writer writer = new AvroSequenceFile.Writer(options);
+
+    // Write some records.
+    for (int i = 0; i < records.length; i += 2) {
+      writer.append(records[i], records[i + 1]);
+    }
+
+    // Close the file.
+    writer.close();
+  }
+}

Propchange: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/hadoop/io/TestAvroSequenceFile.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/hadoop/io/TestAvroSerialization.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/hadoop/io/TestAvroSerialization.java?rev=1325903&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/hadoop/io/TestAvroSerialization.java (added)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/hadoop/io/TestAvroSerialization.java Fri Apr 13 19:03:12 2012
@@ -0,0 +1,121 @@
+/**
+ * Licensed to Odiago, Inc. under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership.  Odiago, Inc.
+ * licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.hadoop.io;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.AvroValue;
+import org.apache.avro.mapred.AvroWrapper;
+import org.apache.avro.mapreduce.AvroJob;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.junit.Test;
+
+public class TestAvroSerialization {
+  /** Only AvroKey and AvroValue classes are accepted by the serialization. */
+  @Test
+  public void testAccept() {
+    AvroSerialization<CharSequence> serialization = new AvroSerialization<CharSequence>();
+
+    assertTrue(serialization.accept(AvroKey.class));
+    assertTrue(serialization.accept(AvroValue.class));
+    assertFalse(serialization.accept(AvroWrapper.class));
+    assertFalse(serialization.accept(String.class));
+  }
+
+  /** Builds an AvroSerialization instance configured from the given job. */
+  private AvroSerialization newSerialization(Job job) {
+    return ReflectionUtils.newInstance(AvroSerialization.class, job.getConfiguration());
+  }
+
+  /** A key serializer must carry the map-output key schema as its writer schema. */
+  @Test
+  public void testGetSerializerForKey() throws IOException {
+    Schema schema = Schema.create(Schema.Type.STRING);
+    Job job = new Job();
+    AvroJob.setMapOutputKeySchema(job, schema);
+
+    AvroSerialization serialization = newSerialization(job);
+    @SuppressWarnings("unchecked")
+    Serializer<AvroWrapper> serializer = serialization.getSerializer(AvroKey.class);
+
+    assertTrue(serializer instanceof AvroSerializer);
+    assertEquals(schema, ((AvroSerializer) serializer).getWriterSchema());
+  }
+
+  /** A value serializer must carry the map-output value schema as its writer schema. */
+  @Test
+  public void testGetSerializerForValue() throws IOException {
+    Schema schema = Schema.create(Schema.Type.STRING);
+    Job job = new Job();
+    AvroJob.setMapOutputValueSchema(job, schema);
+
+    AvroSerialization serialization = newSerialization(job);
+    @SuppressWarnings("unchecked")
+    Serializer<AvroWrapper> serializer = serialization.getSerializer(AvroValue.class);
+
+    assertTrue(serializer instanceof AvroSerializer);
+    assertEquals(schema, ((AvroSerializer) serializer).getWriterSchema());
+  }
+
+  /** A key deserializer must carry the map-output key schema as its reader schema. */
+  @Test
+  public void testGetDeserializerForKey() throws IOException {
+    Schema schema = Schema.create(Schema.Type.STRING);
+    Job job = new Job();
+    AvroJob.setMapOutputKeySchema(job, schema);
+
+    AvroSerialization serialization = newSerialization(job);
+    @SuppressWarnings("unchecked")
+    Deserializer<AvroWrapper> deserializer = serialization.getDeserializer(AvroKey.class);
+
+    assertTrue(deserializer instanceof AvroKeyDeserializer);
+    assertEquals(schema, ((AvroKeyDeserializer) deserializer).getReaderSchema());
+  }
+
+  /** A value deserializer must carry the map-output value schema as its reader schema. */
+  @Test
+  public void testGetDeserializerForValue() throws IOException {
+    Schema schema = Schema.create(Schema.Type.STRING);
+    Job job = new Job();
+    AvroJob.setMapOutputValueSchema(job, schema);
+
+    AvroSerialization serialization = newSerialization(job);
+    @SuppressWarnings("unchecked")
+    Deserializer<AvroWrapper> deserializer = serialization.getDeserializer(AvroValue.class);
+
+    assertTrue(deserializer instanceof AvroValueDeserializer);
+    assertEquals(schema, ((AvroValueDeserializer) deserializer).getReaderSchema());
+  }
+}

Propchange: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/hadoop/io/TestAvroSerialization.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/hadoop/io/TestAvroSerializer.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/hadoop/io/TestAvroSerializer.java?rev=1325903&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/hadoop/io/TestAvroSerializer.java (added)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/hadoop/io/TestAvroSerializer.java Fri Apr 13 19:03:12 2012
@@ -0,0 +1,66 @@
+/**
+ * Licensed to Odiago, Inc. under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership.  Odiago, Inc.
+ * licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.hadoop.io;
+
+import static org.junit.Assert.*;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.mapred.AvroKey;
+import org.junit.Test;
+
+/** Unit tests for {@link AvroSerializer}. */
+public class TestAvroSerializer {
+  @Test
+  public void testSerialize() throws IOException {
+    // Build a serializer for plain string records and check its writer schema.
+    Schema stringSchema = Schema.create(Schema.Type.STRING);
+    AvroSerializer<CharSequence> serializer = new AvroSerializer<CharSequence>(stringSchema);
+    assertEquals(stringSchema, serializer.getWriterSchema());
+
+    // Serialize two records, 'record1' and 'record2', into an in-memory buffer.
+    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+    serializer.open(buffer);
+    serializer.serialize(new AvroKey<CharSequence>("record1"));
+    serializer.serialize(new AvroKey<CharSequence>("record2"));
+    serializer.close();
+
+    // Decode the buffer and confirm both records round-trip in order.
+    ByteArrayInputStream rawBytes = new ByteArrayInputStream(buffer.toByteArray());
+    DatumReader<CharSequence> reader
+        = new GenericDatumReader<CharSequence>(Schema.create(Schema.Type.STRING));
+    Decoder decoder = DecoderFactory.get().binaryDecoder(rawBytes, null);
+
+    CharSequence decoded = reader.read(null, decoder);
+    assertEquals("record1", decoded.toString());
+
+    decoded = reader.read(decoded, decoder);
+    assertEquals("record2", decoded.toString());
+
+    rawBytes.close();
+  }
+}

Propchange: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/hadoop/io/TestAvroSerializer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/hadoop/io/TestAvroValueDeserializer.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/hadoop/io/TestAvroValueDeserializer.java?rev=1325903&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/hadoop/io/TestAvroValueDeserializer.java (added)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/hadoop/io/TestAvroValueDeserializer.java Fri Apr 13 19:03:12 2012
@@ -0,0 +1,68 @@
+/**
+ * Licensed to Odiago, Inc. under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership.  Odiago, Inc.
+ * licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.hadoop.io;
+
+import static org.junit.Assert.*;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.mapred.AvroWrapper;
+import org.junit.Test;
+
+/** Unit tests for {@link AvroValueDeserializer}. */
+public class TestAvroValueDeserializer {
+  @Test
+  public void testDeserialize() throws IOException {
+    // Build a deserializer where both the writer and reader schemas are strings,
+    // and check that both schemas are reported back correctly.
+    Schema writerSchema = Schema.create(Schema.Type.STRING);
+    Schema readerSchema = Schema.create(Schema.Type.STRING);
+    AvroValueDeserializer<CharSequence> deserializer
+        = new AvroValueDeserializer<CharSequence>(writerSchema, readerSchema);
+    assertEquals(writerSchema, deserializer.getWriterSchema());
+    assertEquals(readerSchema, deserializer.getReaderSchema());
+
+    // Encode two string records, 'record1' and 'record2', into a memory buffer.
+    ByteArrayOutputStream encoded = new ByteArrayOutputStream();
+    Encoder encoder = EncoderFactory.get().binaryEncoder(encoded, null);
+    DatumWriter<CharSequence> writer = new GenericDatumWriter<CharSequence>(writerSchema);
+    writer.write("record1", encoder);
+    writer.write("record2", encoder);
+    encoder.flush();
+
+    // Run the encoded bytes through the deserializer and verify both records.
+    deserializer.open(new ByteArrayInputStream(encoded.toByteArray()));
+
+    AvroWrapper<CharSequence> wrapper = deserializer.deserialize(null);
+    assertEquals("record1", wrapper.datum().toString());
+
+    wrapper = deserializer.deserialize(wrapper);
+    assertEquals("record2", wrapper.datum().toString());
+
+    deserializer.close();
+  }
+}

Propchange: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/hadoop/io/TestAvroValueDeserializer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/hadoop/util/TestAvroCharSequenceComparator.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/hadoop/util/TestAvroCharSequenceComparator.java?rev=1325903&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/hadoop/util/TestAvroCharSequenceComparator.java (added)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/hadoop/util/TestAvroCharSequenceComparator.java Fri Apr 13 19:03:12 2012
@@ -0,0 +1,89 @@
+/**
+ * Licensed to Odiago, Inc. under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership.  Odiago, Inc.
+ * licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.hadoop.util;
+
+import static org.junit.Assert.*;
+import static org.hamcrest.number.OrderingComparisons.*;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.avro.util.Utf8;
+
+/** Unit tests for {@link AvroCharSequenceComparator} across CharSequence implementations. */
+public class TestAvroCharSequenceComparator {
+  private AvroCharSequenceComparator<CharSequence> mComparator;
+
+  @Before
+  public void setup() {
+    mComparator = new AvroCharSequenceComparator<CharSequence>();
+  }
+
+  /** Asserts that two plain strings order as smaller < larger, in both argument orders. */
+  private void assertStringOrder(String smaller, String larger) {
+    assertThat(mComparator.compare(smaller, larger), lessThan(0));
+    assertThat(mComparator.compare(larger, smaller), greaterThan(0));
+  }
+
+  /** Asserts that two Utf8 values order as smaller < larger, in both argument orders. */
+  private void assertUtf8Order(String smaller, String larger) {
+    assertThat(mComparator.compare(new Utf8(smaller), new Utf8(larger)), lessThan(0));
+    assertThat(mComparator.compare(new Utf8(larger), new Utf8(smaller)), greaterThan(0));
+  }
+
+  /** Asserts ordering when the left operand is a Utf8 and the right is a String. */
+  private void assertMixedOrder(String smaller, String larger) {
+    assertThat(mComparator.compare(new Utf8(smaller), larger), lessThan(0));
+    assertThat(mComparator.compare(new Utf8(larger), smaller), greaterThan(0));
+  }
+
+  @Test
+  public void testCompareString() {
+    assertEquals(0, mComparator.compare("", ""));
+    assertEquals(0, mComparator.compare("a", "a"));
+    assertEquals(0, mComparator.compare("ab", "ab"));
+
+    assertStringOrder("", "a");
+    assertStringOrder("a", "b");
+    assertStringOrder("a", "aa");
+    assertStringOrder("abc", "abcdef");
+  }
+
+  @Test
+  public void testCompareUtf8() {
+    assertEquals(0, mComparator.compare(new Utf8(""), new Utf8("")));
+    assertEquals(0, mComparator.compare(new Utf8("a"), new Utf8("a")));
+    assertEquals(0, mComparator.compare(new Utf8("ab"), new Utf8("ab")));
+
+    assertUtf8Order("", "a");
+    assertUtf8Order("a", "b");
+    assertUtf8Order("a", "aa");
+    assertUtf8Order("abc", "abcdef");
+  }
+
+  @Test
+  public void testCompareUtf8ToString() {
+    assertEquals(0, mComparator.compare(new Utf8(""), ""));
+    assertEquals(0, mComparator.compare(new Utf8("a"), "a"));
+    assertEquals(0, mComparator.compare(new Utf8("ab"), "ab"));
+
+    assertMixedOrder("", "a");
+    assertMixedOrder("a", "b");
+    assertMixedOrder("a", "aa");
+    assertMixedOrder("abc", "abcdef");
+  }
+}

Propchange: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/hadoop/util/TestAvroCharSequenceComparator.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/AvroFiles.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/AvroFiles.java?rev=1325903&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/AvroFiles.java (added)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/AvroFiles.java Fri Apr 13 19:03:12 2012
@@ -0,0 +1,55 @@
+/**
+ * Licensed to Odiago, Inc. under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership.  Odiago, Inc.
+ * licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.mapreduce;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.DatumWriter;
+
+/**
+ * A utility class for working with Avro container files within tests.
+ */
+/**
+ * A utility class for working with Avro container files within tests.
+ */
+public final class AvroFiles {
+  /** Non-instantiable utility class. */
+  private AvroFiles() {}
+
+  /**
+   * Creates an avro container file.
+   *
+   * @param file The file to create.
+   * @param schema The schema for the records the file should contain.
+   * @param records The records to put in the file.
+   * @param <T> The (java) type of the avro records.
+   * @return The created file.
+   * @throws IOException If the file cannot be created or a record cannot be written.
+   */
+  public static <T> File createFile(File file, Schema schema, T... records)
+      throws IOException {
+    DatumWriter<T> datumWriter = new GenericDatumWriter<T>(schema);
+    DataFileWriter<T> fileWriter = new DataFileWriter<T>(datumWriter);
+    fileWriter.create(schema, file);
+    try {
+      for (T record : records) {
+        fileWriter.append(record);
+      }
+    } finally {
+      // Close the writer even if an append fails, so tests do not leak
+      // an open file handle on the temporary file.
+      fileWriter.close();
+    }
+
+    return file;
+  }
+}

Propchange: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/AvroFiles.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyInputFormat.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyInputFormat.java?rev=1325903&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyInputFormat.java (added)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyInputFormat.java Fri Apr 13 19:03:12 2012
@@ -0,0 +1,64 @@
+/**
+ * Licensed to Odiago, Inc. under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership.  Odiago, Inc.
+ * licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.mapreduce;
+
+import static org.junit.Assert.*;
+import static org.easymock.EasyMock.*;
+
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.junit.Test;
+
+/** Unit tests for {@link AvroKeyInputFormat}. */
+public class TestAvroKeyInputFormat {
+  /**
+   * Verifies that a non-null record reader can be created, and the key/value types are
+   * as expected.
+   */
+  @Test
+  public void testCreateRecordReader() throws IOException, InterruptedException {
+    // Set up the job configuration.
+    Job job = new Job();
+    AvroJob.setInputKeySchema(job, Schema.create(Schema.Type.STRING));
+    Configuration conf = job.getConfiguration();
+
+    FileSplit inputSplit = createMock(FileSplit.class);
+    TaskAttemptContext context = createMock(TaskAttemptContext.class);
+    expect(context.getConfiguration()).andReturn(conf).anyTimes();
+
+    replay(inputSplit);
+    replay(context);
+
+    AvroKeyInputFormat inputFormat = new AvroKeyInputFormat();
+    @SuppressWarnings("unchecked")
+    RecordReader<AvroKey<Object>, NullWritable> recordReader = inputFormat.createRecordReader(
+        inputSplit, context);
+    // Assert on the record reader that was created -- the original assertion
+    // checked 'inputFormat', which was just constructed with 'new' and can
+    // never be null, so it did not verify anything.
+    assertNotNull(recordReader);
+    recordReader.close();
+
+    verify(inputSplit);
+    verify(context);
+  }
+}

Propchange: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyInputFormat.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyOutputFormat.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyOutputFormat.java?rev=1325903&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyOutputFormat.java (added)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyOutputFormat.java Fri Apr 13 19:03:12 2012
@@ -0,0 +1,109 @@
+/**
+ * Licensed to Odiago, Inc. under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership.  Odiago, Inc.
+ * licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.mapreduce;
+
+import static org.easymock.EasyMock.*;
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.AvroOutputFormat;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Unit tests for {@link AvroKeyOutputFormat}, verifying that record writers are
+ * created with the compression codec implied by the job configuration.
+ */
+public class TestAvroKeyOutputFormat {
+  // A fresh temporary directory per test, used as the job's output directory.
+  @Rule
+  public TemporaryFolder mTempDir = new TemporaryFolder();
+
+  /** Verifies that the null (no compression) codec is used by default. */
+  @Test
+  public void testWithNullCodec() throws IOException {
+    Configuration conf = new Configuration();
+    testGetRecordWriter(conf, CodecFactory.nullCodec());
+  }
+
+  /** Verifies that a deflate codec at level 3 is used when compression is enabled. */
+  @Test
+  public void testWithDeflateCodec() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setBoolean("mapred.output.compress", true);
+    conf.setInt(org.apache.avro.mapred.AvroOutputFormat.DEFLATE_LEVEL_KEY, 3);
+    testGetRecordWriter(conf, CodecFactory.deflateCodec(3));
+  }
+
+  /**
+   * Tests that the record writer is constructed and returned correctly from the output format.
+   *
+   * @param conf The job configuration to build the mock task context from.
+   * @param expectedCodec The codec the output format is expected to request.
+   */
+  private void testGetRecordWriter(Configuration conf, CodecFactory expectedCodec)
+      throws IOException {
+    // Configure a mock task attempt context.
+    Job job = new Job(conf);
+    job.getConfiguration().set("mapred.output.dir", mTempDir.getRoot().getPath());
+    Schema writerSchema = Schema.create(Schema.Type.INT);
+    AvroJob.setOutputKeySchema(job, writerSchema);
+    TaskAttemptContext context = createMock(TaskAttemptContext.class);
+    expect(context.getConfiguration())
+        .andReturn(job.getConfiguration()).anyTimes();
+    expect(context.getTaskAttemptID())
+        .andReturn(new TaskAttemptID("id", 1, true, 1, 1))
+        .anyTimes();
+
+    // Create a mock record writer.
+    @SuppressWarnings("unchecked")
+    RecordWriter<AvroKey<Integer>, NullWritable> expectedRecordWriter
+        = createMock(RecordWriter.class);
+    AvroKeyOutputFormat.RecordWriterFactory recordWriterFactory
+        = createMock(AvroKeyOutputFormat.RecordWriterFactory.class);
+
+    // Expect the record writer factory to be called with appropriate parameters.
+    Capture<CodecFactory> capturedCodecFactory = new Capture<CodecFactory>();
+    expect(recordWriterFactory.create(eq(writerSchema),
+        capture(capturedCodecFactory),  // Capture for comparison later.
+        anyObject(OutputStream.class))).andReturn(expectedRecordWriter);
+
+    replay(context);
+    replay(expectedRecordWriter);
+    replay(recordWriterFactory);
+
+    AvroKeyOutputFormat<Integer> outputFormat
+        = new AvroKeyOutputFormat<Integer>(recordWriterFactory);
+    RecordWriter<AvroKey<Integer>, NullWritable> recordWriter
+        = outputFormat.getRecordWriter(context);
+    // Make sure the expected codec was used.  CodecFactory has no equals(), so
+    // the comparison goes through toString().
+    assertTrue(capturedCodecFactory.hasCaptured());
+    assertEquals(expectedCodec.toString(), capturedCodecFactory.getValue().toString());
+
+    verify(context);
+    verify(expectedRecordWriter);
+    verify(recordWriterFactory);
+
+    // The writer handed back must be exactly the instance our factory produced.
+    assertNotNull(recordWriter);
+    assertTrue(expectedRecordWriter == recordWriter);
+  }
+}

Propchange: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyOutputFormat.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyRecordReader.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyRecordReader.java?rev=1325903&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyRecordReader.java (added)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyRecordReader.java Fri Apr 13 19:03:12 2012
@@ -0,0 +1,133 @@
+/**
+ * Licensed to Odiago, Inc. under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership.  Odiago, Inc.
+ * licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.mapreduce;
+
+import static org.easymock.EasyMock.*;
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.SeekableFileInput;
+import org.apache.avro.file.SeekableInput;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/** Unit tests for {@link AvroKeyRecordReader}. */
+public class TestAvroKeyRecordReader {
+  /** A temporary directory for test data. */
+  @Rule
+  public TemporaryFolder mTempDir = new TemporaryFolder();
+
+  /**
+   * Verifies that avro records can be read and progress is reported correctly.
+   */
+  @Test
+  public void testReadRecords() throws IOException, InterruptedException {
+    // Create the test avro file input with two records:
+    //   1. "first"
+    //   2. "second"
+    final SeekableInput avroFileInput = new SeekableFileInput(
+        AvroFiles.createFile(new File(mTempDir.getRoot(), "myStringfile.avro"),
+            Schema.create(Schema.Type.STRING), "first", "second"));
+
+    // Create the record reader.  The anonymous subclass overrides
+    // createSeekableInput() so the reader uses the test file created above
+    // instead of opening the (fake) path supplied by the input split.
+    Schema readerSchema = Schema.create(Schema.Type.STRING);
+    RecordReader<AvroKey<CharSequence>, NullWritable> recordReader
+        = new AvroKeyRecordReader<CharSequence>(readerSchema) {
+      @Override
+      protected SeekableInput createSeekableInput(Configuration conf, Path path)
+          throws IOException {
+        return avroFileInput;
+      }
+    };
+
+    // Set up the job configuration.
+    Configuration conf = new Configuration();
+
+    // Create a mock input split for this record reader.  The path is never
+    // actually opened, thanks to the createSeekableInput() override above.
+    FileSplit inputSplit = createMock(FileSplit.class);
+    expect(inputSplit.getPath()).andReturn(new Path("/path/to/an/avro/file")).anyTimes();
+    expect(inputSplit.getStart()).andReturn(0L).anyTimes();
+    expect(inputSplit.getLength()).andReturn(avroFileInput.length()).anyTimes();
+
+    // Create a mock task attempt context for this record reader.
+    TaskAttemptContext context = createMock(TaskAttemptContext.class);
+    expect(context.getConfiguration()).andReturn(conf).anyTimes();
+
+    // Initialize the record reader.
+    replay(inputSplit);
+    replay(context);
+    recordReader.initialize(inputSplit, context);
+
+    assertEquals("Progress should be zero before any records are read",
+        0.0f, recordReader.getProgress(), 0.0f);
+
+    // Some variables to hold the records.
+    AvroKey<CharSequence> key;
+    NullWritable value;
+
+    // Read the first record.
+    assertTrue("Expected at least one record", recordReader.nextKeyValue());
+    key = recordReader.getCurrentKey();
+    value = recordReader.getCurrentValue();
+
+    assertNotNull("First record had null key", key);
+    assertNotNull("First record had null value", value);
+
+    CharSequence firstString = key.datum();
+    assertEquals("first", firstString.toString());
+
+    // The reader is expected to reuse its key/value container objects.
+    assertTrue("getCurrentKey() returned different keys for the same record",
+        key == recordReader.getCurrentKey());
+    assertTrue("getCurrentValue() returned different values for the same record",
+        value == recordReader.getCurrentValue());
+
+    // Read the second record.
+    assertTrue("Expected to read a second record", recordReader.nextKeyValue());
+    key = recordReader.getCurrentKey();
+    value = recordReader.getCurrentValue();
+
+    assertNotNull("Second record had null key", key);
+    assertNotNull("Second record had null value", value);
+
+    CharSequence secondString = key.datum();
+    assertEquals("second", secondString.toString());
+
+    assertEquals("Progress should be complete (2 out of 2 records processed)",
+        1.0f, recordReader.getProgress(), 0.0f);
+
+    // There should be no more records.
+    assertFalse("Expected only 2 records", recordReader.nextKeyValue());
+
+    // Close the record reader.
+    recordReader.close();
+
+    // Verify the expected calls on the mocks.
+    verify(inputSplit);
+    verify(context);
+  }
+}

Propchange: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyRecordReader.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyRecordWriter.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyRecordWriter.java?rev=1325903&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyRecordWriter.java (added)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyRecordWriter.java Fri Apr 13 19:03:12 2012
@@ -0,0 +1,72 @@
+/**
+ * Licensed to Odiago, Inc. under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership.  Odiago, Inc.
+ * licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.mapreduce;
+
+import static org.easymock.EasyMock.*;
+import static org.junit.Assert.*;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.easymock.EasyMock;
+import org.junit.Test;
+
+/** Unit tests for {@link AvroKeyRecordWriter}. */
+public class TestAvroKeyRecordWriter {
+  @Test
+  public void testWrite() throws IOException {
+    Schema intSchema = Schema.create(Schema.Type.INT);
+    CodecFactory codec = CodecFactory.nullCodec();
+    TaskAttemptContext context = createMock(TaskAttemptContext.class);
+    replay(context);
+
+    // Write an avro container file with two records, 1 and 2, into memory.
+    ByteArrayOutputStream fileBytes = new ByteArrayOutputStream();
+    AvroKeyRecordWriter<Integer> writer = new AvroKeyRecordWriter<Integer>(
+        intSchema, codec, fileBytes);
+    writer.write(new AvroKey<Integer>(1), NullWritable.get());
+    writer.write(new AvroKey<Integer>(2), NullWritable.get());
+    writer.close(context);
+
+    verify(context);
+
+    // Read the container file back and confirm both records come out in order.
+    InputStream input = new ByteArrayInputStream(fileBytes.toByteArray());
+    DatumReader<Integer> datumReader
+        = new SpecificDatumReader<Integer>(Schema.create(Schema.Type.INT));
+    DataFileStream<Integer> fileReader = new DataFileStream<Integer>(input, datumReader);
+
+    assertTrue(fileReader.hasNext());
+    assertEquals(1, fileReader.next().intValue());
+    assertTrue(fileReader.hasNext());
+    assertEquals(2, fileReader.next().intValue());
+    assertFalse(fileReader.hasNext());  // Exactly two records.
+
+    fileReader.close();
+  }
+}

Propchange: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyRecordWriter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyValueRecordReader.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyValueRecordReader.java?rev=1325903&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyValueRecordReader.java (added)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyValueRecordReader.java Fri Apr 13 19:03:12 2012
@@ -0,0 +1,148 @@
+/**
+ * Licensed to Odiago, Inc. under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership.  Odiago, Inc.
+ * licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.mapreduce;
+
+import static org.easymock.EasyMock.*;
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.SeekableFileInput;
+import org.apache.avro.file.SeekableInput;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.hadoop.io.AvroKeyValue;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.AvroValue;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestAvroKeyValueRecordReader {
+  /** A temporary directory for test data. */
+  @Rule
+  public TemporaryFolder mTempDir = new TemporaryFolder();
+
+  /**
+   * Verifies that avro records can be read and progress is reported correctly.
+   */
+  @Test
+  public void testReadRecords() throws IOException, InterruptedException {
+    // Create the test avro file input with two records:
+    //   1. <"first", 1>
+    //   2. <"second", 2>
+    Schema keyValueSchema = AvroKeyValue.getSchema(
+        Schema.create(Schema.Type.STRING), Schema.create(Schema.Type.INT));
+
+    AvroKeyValue<CharSequence, Integer> firstInputRecord
+        = new AvroKeyValue<CharSequence, Integer>(new GenericData.Record(keyValueSchema));
+    firstInputRecord.setKey("first");
+    firstInputRecord.setValue(1);
+
+    AvroKeyValue<CharSequence, Integer> secondInputRecord
+        = new AvroKeyValue<CharSequence, Integer>(new GenericData.Record(keyValueSchema));
+    secondInputRecord.setKey("second");
+    secondInputRecord.setValue(2);
+
+    final SeekableInput avroFileInput = new SeekableFileInput(
+        AvroFiles.createFile(new File(mTempDir.getRoot(), "myInputFile.avro"), keyValueSchema,
+            firstInputRecord.get(), secondInputRecord.get()));
+
+    // Create the record reader over the avro input file.
+    RecordReader<AvroKey<CharSequence>, AvroValue<Integer>> recordReader
+        = new AvroKeyValueRecordReader<CharSequence, Integer>(
+            Schema.create(Schema.Type.STRING), Schema.create(Schema.Type.INT)) {
+      @Override
+      protected SeekableInput createSeekableInput(Configuration conf, Path path)
+          throws IOException {
+        return avroFileInput;
+      }
+    };
+
+    // Set up the job configuration.
+    Configuration conf = new Configuration();
+
+    // Create a mock input split for this record reader.
+    FileSplit inputSplit = createMock(FileSplit.class);
+    expect(inputSplit.getPath()).andReturn(new Path("/path/to/an/avro/file")).anyTimes();
+    expect(inputSplit.getStart()).andReturn(0L).anyTimes();
+    expect(inputSplit.getLength()).andReturn(avroFileInput.length()).anyTimes();
+
+    // Create a mock task attempt context for this record reader.
+    TaskAttemptContext context = createMock(TaskAttemptContext.class);
+    expect(context.getConfiguration()).andReturn(conf).anyTimes();
+
+    // Initialize the record reader.
+    replay(inputSplit);
+    replay(context);
+    recordReader.initialize(inputSplit, context);
+
+    assertEquals("Progress should be zero before any records are read",
+        0.0f, recordReader.getProgress(), 0.0f);
+
+    // Some variables to hold the records.
+    AvroKey<CharSequence> key;
+    AvroValue<Integer> value;
+
+    // Read the first record.
+    assertTrue("Expected at least one record", recordReader.nextKeyValue());
+    key = recordReader.getCurrentKey();
+    value = recordReader.getCurrentValue();
+
+    assertNotNull("First record had null key", key);
+    assertNotNull("First record had null value", value);
+
+    assertEquals("first", key.datum().toString());
+    assertEquals(1, value.datum().intValue());
+
+    assertTrue("getCurrentKey() returned different keys for the same record",
+        key == recordReader.getCurrentKey());
+    assertTrue("getCurrentValue() returned different values for the same record",
+        value == recordReader.getCurrentValue());
+
+    // Read the second record.
+    assertTrue("Expected to read a second record", recordReader.nextKeyValue());
+    key = recordReader.getCurrentKey();
+    value = recordReader.getCurrentValue();
+
+    assertNotNull("Second record had null key", key);
+    assertNotNull("Second record had null value", value);
+
+    assertEquals("second", key.datum().toString());
+    assertEquals(2, value.datum().intValue());
+
+    assertEquals("Progress should be complete (2 out of 2 records processed)",
+        1.0f, recordReader.getProgress(), 0.0f);
+
+    // There should be no more records.
+    assertFalse("Expected only 2 records", recordReader.nextKeyValue());
+
+    // Close the record reader.
+    recordReader.close();
+
+    // Verify the expected calls on the mocks.
+    verify(inputSplit);
+    verify(context);
+  }
+}

Propchange: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyValueRecordReader.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyValueRecordWriter.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyValueRecordWriter.java?rev=1325903&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyValueRecordWriter.java (added)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyValueRecordWriter.java Fri Apr 13 19:03:12 2012
@@ -0,0 +1,103 @@
+/**
+ * Licensed to Odiago, Inc. under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership.  Odiago, Inc.
+ * licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.mapreduce;
+
+import static org.easymock.EasyMock.*;
+import static org.junit.Assert.*;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.hadoop.io.AvroDatumConverter;
+import org.apache.avro.hadoop.io.AvroDatumConverterFactory;
+import org.apache.avro.hadoop.io.AvroKeyValue;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.mapred.AvroValue;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.junit.Test;
+
+public class TestAvroKeyValueRecordWriter {
+  @Test
+  public void testWriteRecords() throws IOException {
+    Job job = new Job();
+    AvroJob.setOutputValueSchema(job, TextStats.SCHEMA$);
+    TaskAttemptContext context = createMock(TaskAttemptContext.class);
+
+    replay(context);
+
+    AvroDatumConverterFactory factory = new AvroDatumConverterFactory(job.getConfiguration());
+    AvroDatumConverter<Text, ?> keyConverter = factory.create(Text.class);
+    AvroValue<TextStats> avroValue = new AvroValue<TextStats>(null);
+    @SuppressWarnings("unchecked")
+    AvroDatumConverter<AvroValue<TextStats>, ?> valueConverter
+        = factory.create((Class<AvroValue<TextStats>>) avroValue.getClass());
+    CodecFactory compressionCodec = CodecFactory.nullCodec();
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+
+    // Use a writer to generate an Avro container file in memory.
+    // Write two records: <'apple', TextStats('apple')> and <'banana', TextStats('banana')>.
+    AvroKeyValueRecordWriter<Text, AvroValue<TextStats>> writer
+        = new AvroKeyValueRecordWriter<Text, AvroValue<TextStats>>(keyConverter, valueConverter,
+            compressionCodec, outputStream);
+    TextStats appleStats = new TextStats();
+    appleStats.name = "apple";
+    writer.write(new Text("apple"), new AvroValue<TextStats>(appleStats));
+    TextStats bananaStats = new TextStats();
+    bananaStats.name = "banana";
+    writer.write(new Text("banana"), new AvroValue<TextStats>(bananaStats));
+    writer.close(context);
+
+    verify(context);
+
+    ByteArrayInputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray());
+    Schema readerSchema = AvroKeyValue.getSchema(
+        Schema.create(Schema.Type.STRING), TextStats.SCHEMA$);
+    DatumReader<GenericRecord> datumReader
+        = new SpecificDatumReader<GenericRecord>(readerSchema);
+    DataFileStream<GenericRecord> avroFileReader
+        = new DataFileStream<GenericRecord>(inputStream, datumReader);
+
+    // Verify that the first record was written.
+    assertTrue(avroFileReader.hasNext());
+    AvroKeyValue<CharSequence, TextStats> firstRecord
+        = new AvroKeyValue<CharSequence, TextStats>(avroFileReader.next());
+    assertNotNull(firstRecord.get());
+    assertEquals("apple", firstRecord.getKey().toString());
+    assertEquals("apple", firstRecord.getValue().name.toString());
+
+    // Verify that the second record was written.
+    assertTrue(avroFileReader.hasNext());
+    AvroKeyValue<CharSequence, TextStats> secondRecord
+        = new AvroKeyValue<CharSequence, TextStats>(avroFileReader.next());
+    assertNotNull(secondRecord.get());
+    assertEquals("banana", secondRecord.getKey().toString());
+    assertEquals("banana", secondRecord.getValue().name.toString());
+
+    // That's all, folks.
+    assertFalse(avroFileReader.hasNext());
+    avroFileReader.close();
+  }
+}

Propchange: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyValueRecordWriter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestKeyValueInput.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestKeyValueInput.java?rev=1325903&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestKeyValueInput.java (added)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestKeyValueInput.java Fri Apr 13 19:03:12 2012
@@ -0,0 +1,190 @@
+/**
+ * Licensed to Odiago, Inc. under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership.  Odiago, Inc.
+ * licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.avro.mapreduce;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.hadoop.io.AvroKeyValue;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.AvroValue;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Tests that Avro container files of generic records with two fields 'key' and 'value'
+ * can be read by the AvroKeyValueInputFormat.
+ */
+public class TestKeyValueInput {
+  @Rule
+  public TemporaryFolder mTempDir = new TemporaryFolder();
+
+  /**
+   * Creates an Avro file of <docid, text> pairs to use for test input:
+   *
+   * +-----+-----------------------+
+   * | KEY | VALUE                 |
+   * +-----+-----------------------+
+   * | 1   | "apple banana carrot" |
+   * | 2   | "apple banana"        |
+   * | 3   | "apple"               |
+   * +-----+-----------------------+
+   *
+   * @return The avro file.
+   */
+  private File createInputFile() throws IOException {
+    Schema keyValueSchema = AvroKeyValue.getSchema(
+        Schema.create(Schema.Type.INT), Schema.create(Schema.Type.STRING));
+
+    AvroKeyValue<Integer, CharSequence> record1
+        = new AvroKeyValue<Integer, CharSequence>(new GenericData.Record(keyValueSchema));
+    record1.setKey(1);
+    record1.setValue("apple banana carrot");
+
+    AvroKeyValue<Integer, CharSequence> record2
+        = new AvroKeyValue<Integer, CharSequence>(new GenericData.Record(keyValueSchema));
+    record2.setKey(2);
+    record2.setValue("apple banana");
+
+    AvroKeyValue<Integer, CharSequence> record3
+        = new AvroKeyValue<Integer, CharSequence>(new GenericData.Record(keyValueSchema));
+    record3.setKey(3);
+    record3.setValue("apple");
+
+    return AvroFiles.createFile(new File(mTempDir.getRoot(), "inputKeyValues.avro"),
+        keyValueSchema, record1.get(), record2.get(), record3.get());
+  }
+
+  /** A mapper for indexing documents. */
+  public static class IndexMapper
+      extends Mapper<AvroKey<Integer>, AvroValue<CharSequence>, Text, IntWritable> {
+    @Override
+    protected void map(AvroKey<Integer> docid, AvroValue<CharSequence> body, Context context)
+        throws IOException, InterruptedException {
+      for (String token : body.datum().toString().split(" ")) {
+        context.write(new Text(token), new IntWritable(docid.datum()));
+      }
+    }
+  }
+
+  /** A reducer for aggregating token to docid mapping into a hitlist. */
+  public static class IndexReducer
+      extends Reducer<Text, IntWritable, Text, AvroValue<List<Integer>>> {
+    @Override
+    protected void reduce(Text token, Iterable<IntWritable> docids, Context context)
+        throws IOException, InterruptedException {
+      List<Integer> hitlist = new ArrayList<Integer>();
+      for (IntWritable docid : docids) {
+        hitlist.add(docid.get());
+      }
+      context.write(token, new AvroValue<List<Integer>>(hitlist));
+    }
+  }
+
+  @Test
+  public void testKeyValueInput()
+      throws ClassNotFoundException, IOException, InterruptedException {
+    // Create a test input file.
+    File inputFile = createInputFile();
+
+    // Configure the job input.
+    Job job = new Job();
+    FileInputFormat.setInputPaths(job, new Path(inputFile.getAbsolutePath()));
+    job.setInputFormatClass(AvroKeyValueInputFormat.class);
+    AvroJob.setInputKeySchema(job, Schema.create(Schema.Type.INT));
+    AvroJob.setInputValueSchema(job, Schema.create(Schema.Type.STRING));
+
+    // Configure a mapper.
+    job.setMapperClass(IndexMapper.class);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(IntWritable.class);
+
+    // Configure a reducer.
+    job.setReducerClass(IndexReducer.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(AvroValue.class);
+    AvroJob.setOutputValueSchema(job, Schema.createArray(Schema.create(Schema.Type.INT)));
+
+    // Configure the output format.
+    job.setOutputFormatClass(AvroKeyValueOutputFormat.class);
+    Path outputPath = new Path(mTempDir.getRoot().getPath(), "out-index");
+    FileOutputFormat.setOutputPath(job, outputPath);
+
+    // Run the job.
+    assertTrue(job.waitForCompletion(true));
+
+    // Verify that the output Avro container file has the expected data.
+    File avroFile = new File(outputPath.toString(), "part-r-00000.avro");
+    DatumReader<GenericRecord> datumReader = new SpecificDatumReader<GenericRecord>(
+        AvroKeyValue.getSchema(Schema.create(Schema.Type.STRING),
+            Schema.createArray(Schema.create(Schema.Type.INT))));
+    DataFileReader<GenericRecord> avroFileReader
+        = new DataFileReader<GenericRecord>(avroFile, datumReader);
+    assertTrue(avroFileReader.hasNext());
+
+    AvroKeyValue<CharSequence, List<Integer>> appleRecord
+        = new AvroKeyValue<CharSequence, List<Integer>>(avroFileReader.next());
+    assertNotNull(appleRecord.get());
+    assertEquals("apple", appleRecord.getKey().toString());
+    List<Integer> appleDocs = appleRecord.getValue();
+    assertEquals(3, appleDocs.size());
+    assertTrue(appleDocs.contains(1));
+    assertTrue(appleDocs.contains(2));
+    assertTrue(appleDocs.contains(3));
+
+    assertTrue(avroFileReader.hasNext());
+    AvroKeyValue<CharSequence, List<Integer>> bananaRecord
+        = new AvroKeyValue<CharSequence, List<Integer>>(avroFileReader.next());
+    assertNotNull(bananaRecord.get());
+    assertEquals("banana", bananaRecord.getKey().toString());
+    List<Integer> bananaDocs = bananaRecord.getValue();
+    assertEquals(2, bananaDocs.size());
+    assertTrue(bananaDocs.contains(1));
+    assertTrue(bananaDocs.contains(2));
+
+    assertTrue(avroFileReader.hasNext());
+    AvroKeyValue<CharSequence, List<Integer>> carrotRecord
+        = new AvroKeyValue<CharSequence, List<Integer>>(avroFileReader.next());
+    assertEquals("carrot", carrotRecord.getKey().toString());
+    List<Integer> carrotDocs = carrotRecord.getValue();
+    assertEquals(1, carrotDocs.size());
+    assertTrue(carrotDocs.contains(1));
+
+    assertFalse(avroFileReader.hasNext());
+    avroFileReader.close();
+  }
+}

Propchange: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestKeyValueInput.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message