orc-commits mailing list archives

From omal...@apache.org
Subject [1/2] orc git commit: ORC-52 Add support for org.apache.hadoop.mapreduce InputFormat and OutputFormat. (omalley)
Date Thu, 02 Jun 2016 03:59:04 GMT
Repository: orc
Updated Branches:
  refs/heads/master 545fe3712 -> 3bb5ce532


http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcMapreduceRecordReader.java
----------------------------------------------------------------------
diff --git a/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcMapreduceRecordReader.java b/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcMapreduceRecordReader.java
new file mode 100644
index 0000000..f686e05
--- /dev/null
+++ b/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcMapreduceRecordReader.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.mapreduce;
+
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.mapred.OrcMapredRecordReader;
+import org.apache.orc.mapred.OrcStruct;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * This record reader implements the org.apache.hadoop.mapreduce API.
+ * It lives in the org.apache.orc.mapreduce package, but shares its
+ * implementation with the mapred API record reader via OrcMapredRecordReader.
+ * @param <V> the root type of the file
+ */
+public class OrcMapreduceRecordReader<V extends WritableComparable>
+    extends org.apache.hadoop.mapreduce.RecordReader<NullWritable, V> {
+  private final TypeDescription schema;
+  private final RecordReader batchReader;
+  private final VectorizedRowBatch batch;
+  private int rowInBatch;
+  private final V row;
+
+  public OrcMapreduceRecordReader(Reader fileReader,
+                                  Reader.Options options) throws IOException {
+    this.batchReader = fileReader.rows(options);
+    if (options.getSchema() == null) {
+      schema = fileReader.getSchema();
+    } else {
+      schema = options.getSchema();
+    }
+    this.batch = schema.createRowBatch();
+    rowInBatch = 0;
+    this.row = (V) OrcStruct.createValue(schema);
+  }
+
+  /**
+   * If the current batch has been fully consumed, read the next one.
+   * @return true if we have rows available.
+   * @throws IOException if reading the next batch from the file fails
+   */
+  boolean ensureBatch() throws IOException {
+    if (rowInBatch >= batch.size) {
+      rowInBatch = 0;
+      return batchReader.nextBatch(batch);
+    }
+    return true;
+  }
+
+  @Override
+  public void close() throws IOException {
+    batchReader.close();
+  }
+
+  @Override
+  public void initialize(InputSplit inputSplit,
+                         TaskAttemptContext taskAttemptContext) {
+    // nothing required
+  }
+
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    if (!ensureBatch()) {
+      return false;
+    }
+    if (schema.getCategory() == TypeDescription.Category.STRUCT) {
+      OrcStruct result = (OrcStruct) row;
+      List<TypeDescription> children = schema.getChildren();
+      int numberOfChildren = children.size();
+      for(int i=0; i < numberOfChildren; ++i) {
+        result.setFieldValue(i, OrcMapredRecordReader.nextValue(batch.cols[i], rowInBatch,
+            children.get(i), result.getFieldValue(i)));
+      }
+    } else {
+      OrcMapredRecordReader.nextValue(batch.cols[0], rowInBatch, schema, row);
+    }
+    rowInBatch += 1;
+    return true;
+  }
+
+  @Override
+  public NullWritable getCurrentKey() throws IOException, InterruptedException {
+    return NullWritable.get();
+  }
+
+  @Override
+  public V getCurrentValue() throws IOException, InterruptedException {
+    return row;
+  }
+
+  @Override
+  public float getProgress() throws IOException {
+    return batchReader.getProgress();
+  }
+}
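
In normal use this reader is not constructed directly; the matching org.apache.orc.mapreduce.OrcInputFormat (added elsewhere in this commit) creates one per split, as the tests later in this patch do. A minimal read-loop sketch under that assumption — the file path and the TaskAttemptContext are supplied by the caller and are illustrative only:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.orc.mapred.OrcStruct;
    import org.apache.orc.mapreduce.OrcInputFormat;

    public class OrcReadSketch {
      // Reads every row of one split and prints it; the key is always NullWritable.
      static void dumpRows(Path file, TaskAttemptContext context) throws Exception {
        FileSplit split = new FileSplit(file, 0, Long.MAX_VALUE, new String[0]);
        RecordReader<NullWritable, OrcStruct> reader =
            new OrcInputFormat<OrcStruct>().createRecordReader(split, context);
        while (reader.nextKeyValue()) {
          // getCurrentValue() returns the same object on every call, so copy
          // the row if it must outlive the next call to nextKeyValue().
          System.out.println(reader.getCurrentValue());
        }
        reader.close();
      }
    }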

http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcMapreduceRecordWriter.java
----------------------------------------------------------------------
diff --git a/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcMapreduceRecordWriter.java b/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcMapreduceRecordWriter.java
new file mode 100644
index 0000000..9379584
--- /dev/null
+++ b/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcMapreduceRecordWriter.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.mapreduce;
+
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+import org.apache.orc.mapred.OrcKey;
+import org.apache.orc.mapred.OrcMapredRecordWriter;
+import org.apache.orc.mapred.OrcStruct;
+import org.apache.orc.mapred.OrcValue;
+
+import java.io.IOException;
+
+public class OrcMapreduceRecordWriter<V extends Writable>
+    extends RecordWriter<NullWritable, V> {
+
+  private final Writer writer;
+  private final VectorizedRowBatch batch;
+  private final TypeDescription schema;
+  private final boolean isTopStruct;
+
+  public OrcMapreduceRecordWriter(Writer writer) {
+    this.writer = writer;
+    schema = writer.getSchema();
+    this.batch = schema.createRowBatch();
+    isTopStruct = schema.getCategory() == TypeDescription.Category.STRUCT;
+  }
+
+  @Override
+  public void write(NullWritable nullWritable, V v) throws IOException {
+    // if the batch is full, write it out.
+    if (batch.size == batch.getMaxSize()) {
+      writer.addRowBatch(batch);
+      batch.reset();
+    }
+
+    // add the new row
+    int row = batch.size++;
+    // skip over the OrcKey or OrcValue
+    if (v instanceof OrcKey) {
+      v = (V)((OrcKey) v).key;
+    } else if (v instanceof OrcValue) {
+      v = (V)((OrcValue) v).value;
+    }
+    if (isTopStruct) {
+      for(int f=0; f < schema.getChildren().size(); ++f) {
+        OrcMapredRecordWriter.setColumn(schema.getChildren().get(f),
+            batch.cols[f], row, ((OrcStruct) v).getFieldValue(f));
+      }
+    } else {
+      OrcMapredRecordWriter.setColumn(schema, batch.cols[0], row, v);
+    }
+  }
+
+  @Override
+  public void close(TaskAttemptContext taskAttemptContext) throws IOException {
+    if (batch.size != 0) {
+      writer.addRowBatch(batch);
+    }
+    writer.close();
+  }
+}
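
The writer buffers rows into a VectorizedRowBatch and flushes whenever the batch fills; close() writes out any final partial batch. OrcOutputFormat normally builds the underlying Writer from the job configuration, but the class can be driven by hand. A minimal sketch, assuming a local output path and an illustrative two-column schema:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.orc.OrcFile;
    import org.apache.orc.TypeDescription;
    import org.apache.orc.Writer;
    import org.apache.orc.mapred.OrcStruct;
    import org.apache.orc.mapreduce.OrcMapreduceRecordWriter;

    public class OrcWriteSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        TypeDescription schema =
            TypeDescription.fromString("struct<name:string,age:int>");
        Writer orcWriter = OrcFile.createWriter(new Path("/tmp/people.orc"),
            OrcFile.writerOptions(conf).setSchema(schema));
        OrcMapreduceRecordWriter<OrcStruct> writer =
            new OrcMapreduceRecordWriter<>(orcWriter);
        OrcStruct row = (OrcStruct) OrcStruct.createValue(schema);
        row.setFieldValue(0, new Text("alice"));
        row.setFieldValue(1, new IntWritable(37));
        writer.write(NullWritable.get(), row);
        // close() flushes the partial batch; the context argument is unused here
        writer.close(null);
      }
    }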

http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcOutputFormat.java
----------------------------------------------------------------------
diff --git a/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcOutputFormat.java b/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcOutputFormat.java
new file mode 100644
index 0000000..797998c
--- /dev/null
+++ b/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcOutputFormat.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.mapreduce;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+
+import java.io.IOException;
+
+/**
+ * An ORC output format that satisfies the org.apache.hadoop.mapreduce API.
+ */
+public class OrcOutputFormat<V extends Writable>
+    extends FileOutputFormat<NullWritable, V> {
+  private static final String EXTENSION = ".orc";
+  // This is useful for unit tests or local runs where you don't need the
+  // output committer.
+  public static final String SKIP_TEMP_DIRECTORY =
+      "orc.mapreduce.output.skip-temporary-directory";
+
+  @Override
+  public RecordWriter<NullWritable, V>
+       getRecordWriter(TaskAttemptContext taskAttemptContext
+                       ) throws IOException {
+    Configuration conf = taskAttemptContext.getConfiguration();
+    Path filename = getDefaultWorkFile(taskAttemptContext, EXTENSION);
+    Writer writer = OrcFile.createWriter(filename,
+        org.apache.orc.mapred.OrcOutputFormat.buildOptions(conf));
+    return new OrcMapreduceRecordWriter<V>(writer);
+  }
+
+  @Override
+  public Path getDefaultWorkFile(TaskAttemptContext context,
+                                 String extension) throws IOException {
+    if (context.getConfiguration().getBoolean(SKIP_TEMP_DIRECTORY, false)) {
+      return new Path(getOutputPath(context),
+          getUniqueFile(context, getOutputName(context), extension));
+    } else {
+      return super.getDefaultWorkFile(context, extension);
+    }
+  }
+}
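
Wiring the format into a job follows the usual FileOutputFormat pattern, with the file schema supplied through OrcConf.MAPRED_OUTPUT_SCHEMA as the tests below do. A minimal configuration sketch — the schema string and output path here are illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.orc.OrcConf;
    import org.apache.orc.mapred.OrcStruct;
    import org.apache.orc.mapreduce.OrcOutputFormat;

    public class OrcJobSketch {
      public static Job configure(Configuration conf) throws Exception {
        // Tell the writer what schema to use for the output files.
        OrcConf.MAPRED_OUTPUT_SCHEMA.setString(conf, "struct<i:int,s:string>");
        Job job = Job.getInstance(conf, "write orc");
        job.setOutputFormatClass(OrcOutputFormat.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(OrcStruct.class);
        FileOutputFormat.setOutputPath(job, new Path("/tmp/orc-out"));
        return job;
      }
    }

For unit tests or local runs, setting orc.mapreduce.output.skip-temporary-directory to true makes getDefaultWorkFile write directly under the output path, bypassing the committer's temporary directory.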

http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/test/org/apache/orc/mapred/TestMrUnit.java
----------------------------------------------------------------------
diff --git a/java/mapreduce/src/test/org/apache/orc/mapred/TestMrUnit.java b/java/mapreduce/src/test/org/apache/orc/mapred/TestMrUnit.java
new file mode 100644
index 0000000..cd11603
--- /dev/null
+++ b/java/mapreduce/src/test/org/apache/orc/mapred/TestMrUnit.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.mapred;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.Serialization;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.io.serializer.WritableSerialization;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mrunit.MapReduceDriver;
+import org.apache.orc.OrcConf;
+import org.apache.orc.TypeDescription;
+import org.junit.Test;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Iterator;
+
+public class TestMrUnit {
+  JobConf conf = new JobConf();
+
+  /**
+   * Split the input struct into its two parts.
+   */
+  public static class MyMapper
+      implements Mapper<NullWritable, OrcStruct, OrcKey, OrcValue> {
+    private OrcKey keyWrapper = new OrcKey();
+    private OrcValue valueWrapper = new OrcValue();
+
+    @Override
+    public void map(NullWritable key, OrcStruct value,
+                    OutputCollector<OrcKey, OrcValue> outputCollector,
+                    Reporter reporter) throws IOException {
+      keyWrapper.key = value.getFieldValue(0);
+      valueWrapper.value = value.getFieldValue(1);
+      outputCollector.collect(keyWrapper, valueWrapper);
+    }
+
+    @Override
+    public void close() throws IOException {
+      // PASS
+    }
+
+    @Override
+    public void configure(JobConf jobConf) {
+      // PASS
+    }
+  }
+
+  /**
+   * Glue the key and values back together.
+   */
+  public static class MyReducer
+      implements Reducer<OrcKey, OrcValue, NullWritable, OrcStruct> {
+    private OrcStruct output = new OrcStruct(TypeDescription.fromString
+        ("struct<first:struct<x:int,y:int>,second:struct<z:string>>"));
+    private final NullWritable nada = NullWritable.get();
+
+    @Override
+    public void reduce(OrcKey key, Iterator<OrcValue> iterator,
+                       OutputCollector<NullWritable, OrcStruct> collector,
+                       Reporter reporter) throws IOException {
+      output.setFieldValue(0, key.key);
+      while (iterator.hasNext()) {
+        OrcValue value = iterator.next();
+        output.setFieldValue(1, value.value);
+        collector.collect(nada, output);
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+      // PASS
+    }
+
+    @Override
+    public void configure(JobConf jobConf) {
+      // PASS
+    }
+  }
+
+  /**
+   * This class is intended to support MRUnit's object copying for input and
+   * output objects.
+   *
+   * Real mapreduce contexts should NEVER use this class.
+   *
+   * The type string is serialized before each value.
+   */
+  public static class OrcStructSerialization
+      implements Serialization<OrcStruct> {
+
+    @Override
+    public boolean accept(Class<?> cls) {
+      return OrcStruct.class.isAssignableFrom(cls);
+    }
+
+    @Override
+    public Serializer<OrcStruct> getSerializer(Class<OrcStruct> aClass) {
+      return new Serializer<OrcStruct>() {
+        DataOutputStream dataOut;
+
+        public void open(OutputStream out) {
+          if(out instanceof DataOutputStream) {
+            dataOut = (DataOutputStream)out;
+          } else {
+            dataOut = new DataOutputStream(out);
+          }
+        }
+
+        public void serialize(OrcStruct w) throws IOException {
+          Text.writeString(dataOut, w.getSchema().toString());
+          w.write(dataOut);
+        }
+
+        public void close() throws IOException {
+          dataOut.close();
+        }
+      };
+    }
+
+    @Override
+    public Deserializer<OrcStruct> getDeserializer(Class<OrcStruct> aClass) {
+      return new Deserializer<OrcStruct>() {
+        DataInputStream input;
+
+        @Override
+        public void open(InputStream inputStream) throws IOException {
+          if(inputStream instanceof DataInputStream) {
+            input = (DataInputStream)inputStream;
+          } else {
+            input = new DataInputStream(inputStream);
+          }
+        }
+
+        @Override
+        public OrcStruct deserialize(OrcStruct orcStruct) throws IOException {
+          String typeStr = Text.readString(input);
+          OrcStruct result = new OrcStruct(TypeDescription.fromString(typeStr));
+          result.readFields(input);
+          return result;
+        }
+
+        @Override
+        public void close() throws IOException {
+          // PASS
+        }
+      };
+    }
+  }
+
+  @Test
+  public void testMapred() throws IOException {
+    conf.set("io.serializations",
+        OrcStructSerialization.class.getName() + "," +
+            WritableSerialization.class.getName());
+    OrcConf.MAPRED_SHUFFLE_KEY_SCHEMA.setString(conf, "struct<x:int,y:int>");
+    OrcConf.MAPRED_SHUFFLE_VALUE_SCHEMA.setString(conf, "struct<z:string>");
+    MyMapper mapper = new MyMapper();
+    mapper.configure(conf);
+    MyReducer reducer = new MyReducer();
+    reducer.configure(conf);
+    MapReduceDriver<NullWritable, OrcStruct,
+                    OrcKey, OrcValue,
+                    NullWritable, OrcStruct> driver =
+        new MapReduceDriver<>(mapper, reducer);
+    driver.setConfiguration(conf);
+    NullWritable nada = NullWritable.get();
+    OrcStruct input = (OrcStruct) OrcStruct.createValue(
+        TypeDescription.fromString("struct<one:struct<x:int,y:int>,two:struct<z:string>>"));
+    IntWritable x =
+        (IntWritable) ((OrcStruct) input.getFieldValue(0)).getFieldValue(0);
+    IntWritable y =
+        (IntWritable) ((OrcStruct) input.getFieldValue(0)).getFieldValue(1);
+    Text z = (Text) ((OrcStruct) input.getFieldValue(1)).getFieldValue(0);
+
+    // generate the input stream
+    for(int r=0; r < 20; ++r) {
+      x.set(100 - (r / 4));
+      y.set(r * 2);
+      z.set(Integer.toHexString(r));
+      driver.withInput(nada, input);
+    }
+
+    // generate the expected outputs
+    for(int g=4; g >= 0; --g) {
+      x.set(100 - g);
+      for(int i=0; i < 4; ++i) {
+        int r = g * 4 + i;
+        y.set(r * 2);
+        z.set(Integer.toHexString(r));
+        driver.withOutput(nada, input);
+      }
+    }
+    driver.runTest();
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcOutputFormat.java
----------------------------------------------------------------------
diff --git a/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcOutputFormat.java b/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcOutputFormat.java
new file mode 100644
index 0000000..a915ed3
--- /dev/null
+++ b/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcOutputFormat.java
@@ -0,0 +1,299 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.mapred;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.ShortWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.OutputCommitter;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.TypeDescription;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestOrcOutputFormat {
+
+  Path workDir = new Path(System.getProperty("test.tmp.dir",
+      "target" + File.separator + "test" + File.separator + "tmp"));
+  JobConf conf = new JobConf();
+  FileSystem fs;
+
+  {
+    try {
+      fs = FileSystem.getLocal(conf).getRaw();
+      fs.delete(workDir, true);
+      fs.mkdirs(workDir);
+    } catch (IOException e) {
+      throw new IllegalStateException("bad fs init", e);
+    }
+  }
+
+  static class NullOutputCommitter extends OutputCommitter {
+
+    @Override
+    public void setupJob(JobContext jobContext) {
+      // PASS
+    }
+
+    @Override
+    public void setupTask(TaskAttemptContext taskAttemptContext) {
+      // PASS
+    }
+
+    @Override
+    public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) {
+      return false;
+    }
+
+    @Override
+    public void commitTask(TaskAttemptContext taskAttemptContext) {
+      // PASS
+    }
+
+    @Override
+    public void abortTask(TaskAttemptContext taskAttemptContext) {
+      // PASS
+    }
+  }
+
+  @Test
+  public void testAllTypes() throws Exception {
+    conf.set("mapreduce.task.attempt.id", "attempt_20160101_0001_m_000001_0");
+    conf.setOutputCommitter(NullOutputCommitter.class);
+    final String typeStr = "struct<b1:binary,b2:boolean,b3:tinyint," +
+        "c:char(10),d1:date,d2:decimal(20,5),d3:double,fff:float,int:int," +
+        "l:array<bigint>,map:map<smallint,string>," +
+        "str:struct<u:uniontype<timestamp,varchar(100)>>,ts:timestamp>";
+    OrcConf.MAPRED_OUTPUT_SCHEMA.setString(conf, typeStr);
+    FileOutputFormat.setOutputPath(conf, workDir);
+    TypeDescription type = TypeDescription.fromString(typeStr);
+
+    // build a row object
+    OrcStruct row = (OrcStruct) OrcStruct.createValue(type);
+    ((BytesWritable) row.getFieldValue(0)).set(new byte[]{1,2,3,4}, 0, 4);
+    ((BooleanWritable) row.getFieldValue(1)).set(true);
+    ((ByteWritable) row.getFieldValue(2)).set((byte) 23);
+    ((Text) row.getFieldValue(3)).set("aaabbbcccddd");
+    SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
+    ((DateWritable) row.getFieldValue(4)).set(DateWritable.millisToDays
+        (format.parse("2016-04-01").getTime()));
+    ((HiveDecimalWritable) row.getFieldValue(5)).set(new HiveDecimalWritable("1.23"));
+    ((DoubleWritable) row.getFieldValue(6)).set(1.5);
+    ((FloatWritable) row.getFieldValue(7)).set(4.5f);
+    ((IntWritable) row.getFieldValue(8)).set(31415);
+    OrcList<LongWritable> longList = (OrcList<LongWritable>) row.getFieldValue(9);
+    longList.add(new LongWritable(123));
+    longList.add(new LongWritable(456));
+    OrcMap<ShortWritable,Text> map = (OrcMap<ShortWritable,Text>) row.getFieldValue(10);
+    map.put(new ShortWritable((short) 1000), new Text("aaaa"));
+    map.put(new ShortWritable((short) 123), new Text("bbbb"));
+    OrcStruct struct = (OrcStruct) row.getFieldValue(11);
+    OrcUnion union = (OrcUnion) struct.getFieldValue(0);
+    union.set((byte) 1, new Text("abcde"));
+    ((OrcTimestamp) row.getFieldValue(12)).set("1996-12-11 15:00:00");
+    NullWritable nada = NullWritable.get();
+    RecordWriter<NullWritable, OrcStruct> writer =
+        new OrcOutputFormat<OrcStruct>().getRecordWriter(fs, conf, "all.orc",
+            Reporter.NULL);
+    for(int r=0; r < 10; ++r) {
+      row.setFieldValue(8, new IntWritable(r * 10));
+      writer.write(nada, row);
+    }
+    union.set((byte) 0, new OrcTimestamp("2011-12-25 12:34:56"));
+    for(int r=0; r < 10; ++r) {
+      row.setFieldValue(8, new IntWritable(r * 10 + 100));
+      writer.write(nada, row);
+    }
+    OrcStruct row2 = new OrcStruct(type);
+    writer.write(nada, row2);
+    row.setFieldValue(8, new IntWritable(210));
+    writer.write(nada, row);
+    writer.close(Reporter.NULL);
+
+    FileSplit split = new FileSplit(new Path(workDir, "all.orc"), 0, 100000,
+        new String[0]);
+    RecordReader<NullWritable, OrcStruct> reader =
+        new OrcInputFormat<OrcStruct>().getRecordReader(split, conf,
+            Reporter.NULL);
+    nada = reader.createKey();
+    row = reader.createValue();
+    for(int r=0; r < 22; ++r) {
+      assertEquals(true, reader.next(nada, row));
+      if (r == 20) {
+        for(int c=0; c < 12; ++c) {
+          assertEquals(null, row.getFieldValue(c));
+        }
+      } else {
+        assertEquals(new BytesWritable(new byte[]{1, 2, 3, 4}), row.getFieldValue(0));
+        assertEquals(new BooleanWritable(true), row.getFieldValue(1));
+        assertEquals(new ByteWritable((byte) 23), row.getFieldValue(2));
+        assertEquals(new Text("aaabbbcccd"), row.getFieldValue(3));
+        assertEquals(new DateWritable(DateWritable.millisToDays
+            (format.parse("2016-04-01").getTime())), row.getFieldValue(4));
+        assertEquals(new HiveDecimalWritable("1.23"), row.getFieldValue(5));
+        assertEquals(new DoubleWritable(1.5), row.getFieldValue(6));
+        assertEquals(new FloatWritable(4.5f), row.getFieldValue(7));
+        assertEquals(new IntWritable(r * 10), row.getFieldValue(8));
+        assertEquals(longList, row.getFieldValue(9));
+        assertEquals(map, row.getFieldValue(10));
+        if (r < 10) {
+          union.set((byte) 1, new Text("abcde"));
+        } else {
+          union.set((byte) 0, new OrcTimestamp("2011-12-25 12:34:56"));
+        }
+        assertEquals("row " + r, struct, row.getFieldValue(11));
+        assertEquals("row " + r, new OrcTimestamp("1996-12-11 15:00:00"),
+            row.getFieldValue(12));
+      }
+    }
+    assertEquals(false, reader.next(nada, row));
+  }
+
+  /**
+   * Test the case where the top level isn't a struct, but a long.
+   */
+  @Test
+  public void testLongRoot() throws Exception {
+    conf.set("mapreduce.task.attempt.id", "attempt_20160101_0001_m_000001_0");
+    conf.setOutputCommitter(NullOutputCommitter.class);
+    conf.set(OrcConf.COMPRESS.getAttribute(), "SNAPPY");
+    conf.setInt(OrcConf.ROW_INDEX_STRIDE.getAttribute(), 1000);
+    conf.setInt(OrcConf.BUFFER_SIZE.getAttribute(), 64 * 1024);
+    conf.set(OrcConf.WRITE_FORMAT.getAttribute(), "0.11");
+    final String typeStr = "bigint";
+    OrcConf.MAPRED_OUTPUT_SCHEMA.setString(conf, typeStr);
+    FileOutputFormat.setOutputPath(conf, workDir);
+    TypeDescription type = TypeDescription.fromString(typeStr);
+    LongWritable value = new LongWritable();
+    NullWritable nada = NullWritable.get();
+    RecordWriter<NullWritable, LongWritable> writer =
+        new OrcOutputFormat<LongWritable>().getRecordWriter(fs, conf,
+            "long.orc", Reporter.NULL);
+    for(long lo=0; lo < 2000; ++lo) {
+      value.set(lo);
+      writer.write(nada, value);
+    }
+    writer.close(Reporter.NULL);
+
+    Path path = new Path(workDir, "long.orc");
+    Reader file = OrcFile.createReader(path, OrcFile.readerOptions(conf));
+    assertEquals(CompressionKind.SNAPPY, file.getCompressionKind());
+    assertEquals(2000, file.getNumberOfRows());
+    assertEquals(1000, file.getRowIndexStride());
+    assertEquals(64 * 1024, file.getCompressionSize());
+    assertEquals(OrcFile.Version.V_0_11, file.getFileVersion());
+    FileSplit split = new FileSplit(path, 0, 100000,
+        new String[0]);
+    RecordReader<NullWritable, LongWritable> reader =
+        new OrcInputFormat<LongWritable>().getRecordReader(split, conf,
+            Reporter.NULL);
+    nada = reader.createKey();
+    value = reader.createValue();
+    for(long lo=0; lo < 2000; ++lo) {
+      assertEquals(true, reader.next(nada, value));
+      assertEquals(lo, value.get());
+    }
+    assertEquals(false, reader.next(nada, value));
+  }
+
+  /**
+   * Make sure that the writer ignores the OrcKey
+   * @throws Exception
+   */
+  @Test
+  public void testOrcKey() throws Exception {
+    conf.set("mapreduce.output.fileoutputformat.outputdir", workDir.toString());
+    conf.set("mapreduce.task.attempt.id", "attempt_jt0_0_m_0_0");
+    String TYPE_STRING = "struct<i:int,s:string>";
+    OrcConf.MAPRED_OUTPUT_SCHEMA.setString(conf, TYPE_STRING);
+    conf.setOutputCommitter(NullOutputCommitter.class);
+    TypeDescription schema = TypeDescription.fromString(TYPE_STRING);
+    OrcKey key = new OrcKey(new OrcStruct(schema));
+    RecordWriter<NullWritable, Writable> writer =
+        new OrcOutputFormat<>().getRecordWriter(fs, conf, "key.orc",
+            Reporter.NULL);
+    NullWritable nada = NullWritable.get();
+    for(int r=0; r < 2000; ++r) {
+      ((OrcStruct) key.key).setAllFields(new IntWritable(r),
+          new Text(Integer.toString(r)));
+      writer.write(nada, key);
+    }
+    writer.close(Reporter.NULL);
+    Path path = new Path(workDir, "key.orc");
+    Reader file = OrcFile.createReader(path, OrcFile.readerOptions(conf));
+    assertEquals(2000, file.getNumberOfRows());
+    assertEquals(TYPE_STRING, file.getSchema().toString());
+  }
+
+  /**
+   * Make sure that the writer ignores the OrcValue
+   * @throws Exception
+   */
+  @Test
+  public void testOrcValue() throws Exception {
+    conf.set("mapreduce.output.fileoutputformat.outputdir", workDir.toString());
+    conf.set("mapreduce.task.attempt.id", "attempt_jt0_0_m_0_0");
+    String TYPE_STRING = "struct<i:int>";
+    OrcConf.MAPRED_OUTPUT_SCHEMA.setString(conf, TYPE_STRING);
+    conf.setOutputCommitter(NullOutputCommitter.class);
+    TypeDescription schema = TypeDescription.fromString(TYPE_STRING);
+    OrcValue value = new OrcValue(new OrcStruct(schema));
+    RecordWriter<NullWritable, Writable> writer =
+        new OrcOutputFormat<>().getRecordWriter(fs, conf, "value.orc",
+            Reporter.NULL);
+    NullWritable nada = NullWritable.get();
+    for(int r=0; r < 3000; ++r) {
+      ((OrcStruct) value.value).setAllFields(new IntWritable(r));
+      writer.write(nada, value);
+    }
+    writer.close(Reporter.NULL);
+    Path path = new Path(workDir, "value.orc");
+    Reader file = OrcFile.createReader(path, OrcFile.readerOptions(conf));
+    assertEquals(3000, file.getNumberOfRows());
+    assertEquals(TYPE_STRING, file.getSchema().toString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcStruct.java
----------------------------------------------------------------------
diff --git a/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcStruct.java b/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcStruct.java
index d32ce94..82699ed 100644
--- a/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcStruct.java
+++ b/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcStruct.java
@@ -76,6 +76,11 @@ public class TestOrcStruct {
     assertEquals(new IntWritable(42), struct.getFieldValue("i"));
     assertEquals(new DoubleWritable(1.5), struct.getFieldValue(1));
     assertEquals(new Text("Moria"), struct.getFieldValue("k"));
+    struct.setAllFields(new IntWritable(123), new DoubleWritable(4.5),
+        new Text("ok"));
+    assertEquals("123", struct.getFieldValue(0).toString());
+    assertEquals("4.5", struct.getFieldValue(1).toString());
+    assertEquals("ok", struct.getFieldValue(2).toString());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/test/org/apache/orc/mapred/other/TestOrcOutputFormat.java
----------------------------------------------------------------------
diff --git a/java/mapreduce/src/test/org/apache/orc/mapred/other/TestOrcOutputFormat.java b/java/mapreduce/src/test/org/apache/orc/mapred/other/TestOrcOutputFormat.java
deleted file mode 100644
index ce5523f..0000000
--- a/java/mapreduce/src/test/org/apache/orc/mapred/other/TestOrcOutputFormat.java
+++ /dev/null
@@ -1,249 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.orc.mapred.other;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.serde2.io.DateWritable;
-import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
-import org.apache.hadoop.io.BooleanWritable;
-import org.apache.hadoop.io.ByteWritable;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.ShortWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobContext;
-import org.apache.hadoop.mapred.OutputCommitter;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.RecordWriter;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.TaskAttemptContext;
-import org.apache.orc.CompressionKind;
-import org.apache.orc.OrcConf;
-import org.apache.orc.OrcFile;
-import org.apache.orc.Reader;
-import org.apache.orc.TypeDescription;
-import org.apache.orc.mapred.OrcInputFormat;
-import org.apache.orc.mapred.OrcList;
-import org.apache.orc.mapred.OrcMap;
-import org.apache.orc.mapred.OrcOutputFormat;
-import org.apache.orc.mapred.OrcStruct;
-import org.apache.orc.mapred.OrcTimestamp;
-import org.apache.orc.mapred.OrcUnion;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.text.SimpleDateFormat;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-
-public class TestOrcOutputFormat {
-
-  Path workDir = new Path(System.getProperty("test.tmp.dir",
-      "target" + File.separator + "test" + File.separator + "tmp"));
-  JobConf conf = new JobConf();
-  FileSystem fs;
-
-  {
-    try {
-      fs =  FileSystem.getLocal(conf).getRaw();
-      fs.delete(workDir, true);
-      fs.mkdirs(workDir);
-    } catch (IOException e) {
-      throw new IllegalStateException("bad fs init", e);
-    }
-  }
-
-  static class NullOutputCommitter extends OutputCommitter {
-
-    @Override
-    public void setupJob(JobContext jobContext) {
-      // PASS
-    }
-
-    @Override
-    public void setupTask(TaskAttemptContext taskAttemptContext) {
-
-    }
-
-    @Override
-    public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) {
-      return false;
-    }
-
-    @Override
-    public void commitTask(TaskAttemptContext taskAttemptContext) {
-      // PASS
-    }
-
-    @Override
-    public void abortTask(TaskAttemptContext taskAttemptContext) {
-      // PASS
-    }
-  }
-
-  @Test
-  public void testAllTypes() throws Exception {
-    conf.set("mapreduce.task.attempt.id", "attempt_20160101_0001_m_000001_0");
-    conf.setOutputCommitter(NullOutputCommitter.class);
-    final String typeStr = "struct<b1:binary,b2:boolean,b3:tinyint," +
-        "c:char(10),d1:date,d2:decimal(20,5),d3:double,fff:float,int:int," +
-        "l:array<bigint>,map:map<smallint,string>," +
-        "str:struct<u:uniontype<timestamp,varchar(100)>>,ts:timestamp>";
-    conf.set(OrcConf.SCHEMA.getAttribute(), typeStr);
-    FileOutputFormat.setOutputPath(conf, workDir);
-    TypeDescription type = TypeDescription.fromString(typeStr);
-
-    // build a row object
-    OrcStruct row = (OrcStruct) OrcStruct.createValue(type);
-    ((BytesWritable) row.getFieldValue(0)).set(new byte[]{1,2,3,4}, 0, 4);
-    ((BooleanWritable) row.getFieldValue(1)).set(true);
-    ((ByteWritable) row.getFieldValue(2)).set((byte) 23);
-    ((Text) row.getFieldValue(3)).set("aaabbbcccddd");
-    SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
-    ((DateWritable) row.getFieldValue(4)).set(DateWritable.millisToDays
-        (format.parse("2016-04-01").getTime()));
-    ((HiveDecimalWritable) row.getFieldValue(5)).set(new HiveDecimalWritable("1.23"));
-    ((DoubleWritable) row.getFieldValue(6)).set(1.5);
-    ((FloatWritable) row.getFieldValue(7)).set(4.5f);
-    ((IntWritable) row.getFieldValue(8)).set(31415);
-    OrcList<LongWritable> longList = (OrcList<LongWritable>) row.getFieldValue(9);
-    longList.add(new LongWritable(123));
-    longList.add(new LongWritable(456));
-    OrcMap<ShortWritable,Text> map = (OrcMap<ShortWritable,Text>) row.getFieldValue(10);
-    map.put(new ShortWritable((short) 1000), new Text("aaaa"));
-    map.put(new ShortWritable((short) 123), new Text("bbbb"));
-    OrcStruct struct = (OrcStruct) row.getFieldValue(11);
-    OrcUnion union = (OrcUnion) struct.getFieldValue(0);
-    union.set((byte) 1, new Text("abcde"));
-    ((OrcTimestamp) row.getFieldValue(12)).set("1996-12-11 15:00:00");
-    NullWritable nada = NullWritable.get();
-    RecordWriter<NullWritable, OrcStruct> writer =
-        new OrcOutputFormat<OrcStruct>().getRecordWriter(fs, conf, "all.orc",
-            Reporter.NULL);
-    for(int r=0; r < 10; ++r) {
-      row.setFieldValue(8, new IntWritable(r * 10));
-      writer.write(nada, row);
-    }
-    union.set((byte) 0, new OrcTimestamp("2011-12-25 12:34:56"));
-    for(int r=0; r < 10; ++r) {
-      row.setFieldValue(8, new IntWritable(r * 10 + 100));
-      writer.write(nada, row);
-    }
-    OrcStruct row2 = new OrcStruct(type);
-    writer.write(nada, row2);
-    row.setFieldValue(8, new IntWritable(210));
-    writer.write(nada, row);
-    writer.close(Reporter.NULL);
-
-    FileSplit split = new FileSplit(new Path(workDir, "all.orc"), 0, 100000,
-        new String[0]);
-    RecordReader<NullWritable, OrcStruct> reader =
-        new OrcInputFormat<OrcStruct>().getRecordReader(split, conf,
-            Reporter.NULL);
-    nada = reader.createKey();
-    row = reader.createValue();
-    for(int r=0; r < 22; ++r) {
-      assertEquals(true, reader.next(nada, row));
-      if (r == 20) {
-        for(int c=0; c < 12; ++c) {
-          assertEquals(null, row.getFieldValue(c));
-        }
-      } else {
-        assertEquals(new BytesWritable(new byte[]{1, 2, 3, 4}), row.getFieldValue(0));
-        assertEquals(new BooleanWritable(true), row.getFieldValue(1));
-        assertEquals(new ByteWritable((byte) 23), row.getFieldValue(2));
-        assertEquals(new Text("aaabbbcccd"), row.getFieldValue(3));
-        assertEquals(new DateWritable(DateWritable.millisToDays
-            (format.parse("2016-04-01").getTime())), row.getFieldValue(4));
-        assertEquals(new HiveDecimalWritable("1.23"), row.getFieldValue(5));
-        assertEquals(new DoubleWritable(1.5), row.getFieldValue(6));
-        assertEquals(new FloatWritable(4.5f), row.getFieldValue(7));
-        assertEquals(new IntWritable(r * 10), row.getFieldValue(8));
-        assertEquals(longList, row.getFieldValue(9));
-        assertEquals(map, row.getFieldValue(10));
-        if (r < 10) {
-          union.set((byte) 1, new Text("abcde"));
-        } else {
-          union.set((byte) 0, new OrcTimestamp("2011-12-25 12:34:56"));
-        }
-        assertEquals("row " + r, struct, row.getFieldValue(11));
-        assertEquals("row " + r, new OrcTimestamp("1996-12-11 15:00:00"),
-            row.getFieldValue(12));
-      }
-    }
-    assertEquals(false, reader.next(nada, row));
-  }
-
-  /**
-   * Test the case where the top level isn't a struct, but a long.
-   */
-  @Test
-  public void testLongRoot() throws Exception {
-    conf.set("mapreduce.task.attempt.id", "attempt_20160101_0001_m_000001_0");
-    conf.setOutputCommitter(NullOutputCommitter.class);
-    conf.set(OrcConf.COMPRESS.getAttribute(), "SNAPPY");
-    conf.setInt(OrcConf.ROW_INDEX_STRIDE.getAttribute(), 1000);
-    conf.setInt(OrcConf.BUFFER_SIZE.getAttribute(), 64 * 1024);
-    conf.set(OrcConf.WRITE_FORMAT.getAttribute(), "0.11");
-    final String typeStr = "bigint";
-    conf.set(OrcConf.SCHEMA.getAttribute(), typeStr);
-    FileOutputFormat.setOutputPath(conf, workDir);
-    TypeDescription type = TypeDescription.fromString(typeStr);
-    LongWritable value = new LongWritable();
-    NullWritable nada = NullWritable.get();
-    RecordWriter<NullWritable, LongWritable> writer =
-        new OrcOutputFormat<LongWritable>().getRecordWriter(fs, conf,
-            "long.orc", Reporter.NULL);
-    for(long lo=0; lo < 2000; ++lo) {
-      value.set(lo);
-      writer.write(nada, value);
-    }
-    writer.close(Reporter.NULL);
-
-    Path path = new Path(workDir, "long.orc");
-    Reader file = OrcFile.createReader(path, OrcFile.readerOptions(conf));
-    assertEquals(CompressionKind.SNAPPY, file.getCompressionKind());
-    assertEquals(2000, file.getNumberOfRows());
-    assertEquals(1000, file.getRowIndexStride());
-    assertEquals(64 * 1024, file.getCompressionSize());
-    assertEquals(OrcFile.Version.V_0_11, file.getFileVersion());
-    FileSplit split = new FileSplit(path, 0, 100000,
-        new String[0]);
-    RecordReader<NullWritable, LongWritable> reader =
-        new OrcInputFormat<LongWritable>().getRecordReader(split, conf,
-            Reporter.NULL);
-    nada = reader.createKey();
-    value = reader.createValue();
-    for(long lo=0; lo < 2000; ++lo) {
-      assertEquals(true, reader.next(nada, value));
-      assertEquals(lo, value.get());
-    }
-    assertEquals(false, reader.next(nada, value));
-  }
-}

http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/test/org/apache/orc/mapreduce/TestMapreduceOrcOutputFormat.java
----------------------------------------------------------------------
diff --git a/java/mapreduce/src/test/org/apache/orc/mapreduce/TestMapreduceOrcOutputFormat.java b/java/mapreduce/src/test/org/apache/orc/mapreduce/TestMapreduceOrcOutputFormat.java
new file mode 100644
index 0000000..27543c1
--- /dev/null
+++ b/java/mapreduce/src/test/org/apache/orc/mapreduce/TestMapreduceOrcOutputFormat.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.mapreduce;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.mapred.OrcKey;
+import org.apache.orc.mapred.OrcStruct;
+import org.apache.orc.mapred.OrcValue;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestMapreduceOrcOutputFormat {
+
+  Path workDir = new Path(System.getProperty("test.tmp.dir",
+      "target" + File.separator + "test" + File.separator + "tmp"));
+  JobConf conf = new JobConf();
+  FileSystem fs;
+
+  {
+    try {
+      fs = FileSystem.getLocal(conf).getRaw();
+      fs.delete(workDir, true);
+      fs.mkdirs(workDir);
+    } catch (IOException e) {
+      throw new IllegalStateException("bad fs init", e);
+    }
+  }
+
+  @Test
+  public void testPredicatePushdown() throws Exception {
+    TaskAttemptID id = new TaskAttemptID("jt", 0, TaskType.MAP, 0, 0);
+    TaskAttemptContext attemptContext = new TaskAttemptContextImpl(conf, id);
+    final String typeStr = "struct<i:int,s:string>";
+    OrcConf.MAPRED_OUTPUT_SCHEMA.setString(conf, typeStr);
+    conf.set("mapreduce.output.fileoutputformat.outputdir", workDir.toString());
+    conf.setInt(OrcConf.ROW_INDEX_STRIDE.getAttribute(), 1000);
+    conf.setBoolean(OrcOutputFormat.SKIP_TEMP_DIRECTORY, true);
+    OutputFormat<NullWritable, OrcStruct> outputFormat =
+        new OrcOutputFormat<OrcStruct>();
+    RecordWriter<NullWritable, OrcStruct> writer =
+        outputFormat.getRecordWriter(attemptContext);
+
+    // write 4000 rows with the integer and the binary string
+    TypeDescription type = TypeDescription.fromString(typeStr);
+    OrcStruct row = (OrcStruct) OrcStruct.createValue(type);
+    NullWritable nada = NullWritable.get();
+    for(int r=0; r < 4000; ++r) {
+      row.setFieldValue(0, new IntWritable(r));
+      row.setFieldValue(1, new Text(Integer.toBinaryString(r)));
+      writer.write(nada, row);
+    }
+    writer.close(attemptContext);
+
+    OrcInputFormat.setSearchArgument(conf,
+        SearchArgumentFactory.newBuilder()
+            .between("i", PredicateLeaf.Type.LONG, new Long(1500), new Long(1999))
+            .build(), new String[]{null, "i", "s"});
+    FileSplit split = new FileSplit(new Path(workDir, "part-m-00000.orc"),
+        0, 1000000, new String[0]);
+    RecordReader<NullWritable, OrcStruct> reader =
+        new OrcInputFormat<OrcStruct>().createRecordReader(split,
+            attemptContext);
+    // the sarg and the 1000-row stride should make the reader skip everything except rows 1000 through 1999
+    for(int r=1000; r < 2000; ++r) {
+      assertEquals(true, reader.nextKeyValue());
+      row = reader.getCurrentValue();
+      assertEquals(r, ((IntWritable) row.getFieldValue(0)).get());
+      assertEquals(Integer.toBinaryString(r), row.getFieldValue(1).toString());
+    }
+    assertEquals(false, reader.nextKeyValue());
+  }
+
+  @Test
+  public void testColumnSelection() throws Exception {
+    String typeStr = "struct<i:int,j:int,k:int>";
+    OrcConf.MAPRED_OUTPUT_SCHEMA.setString(conf, typeStr);
+    conf.set("mapreduce.output.fileoutputformat.outputdir", workDir.toString());
+    conf.setInt(OrcConf.ROW_INDEX_STRIDE.getAttribute(), 1000);
+    conf.setBoolean(OrcOutputFormat.SKIP_TEMP_DIRECTORY, true);
+    TaskAttemptID id = new TaskAttemptID("jt", 0, TaskType.MAP, 0, 1);
+    TaskAttemptContext attemptContext = new TaskAttemptContextImpl(conf, id);
+    OutputFormat<NullWritable, OrcStruct> outputFormat =
+        new OrcOutputFormat<OrcStruct>();
+    RecordWriter<NullWritable, OrcStruct> writer =
+        outputFormat.getRecordWriter(attemptContext);
+
+    // write 3000 rows with three integer columns
+    TypeDescription type = TypeDescription.fromString(typeStr);
+    OrcStruct row = (OrcStruct) OrcStruct.createValue(type);
+    NullWritable nada = NullWritable.get();
+    for(int r=0; r < 3000; ++r) {
+      row.setFieldValue(0, new IntWritable(r));
+      row.setFieldValue(1, new IntWritable(r * 2));
+      row.setFieldValue(2, new IntWritable(r * 3));
+      writer.write(nada, row);
+    }
+    writer.close(attemptContext);
+
+    conf.set(OrcConf.INCLUDE_COLUMNS.getAttribute(), "0,2");
+    FileSplit split = new FileSplit(new Path(workDir, "part-m-00000.orc"),
+        0, 1000000, new String[0]);
+    RecordReader<NullWritable, OrcStruct> reader =
+        new OrcInputFormat<OrcStruct>().createRecordReader(split,
+            attemptContext);
+    // column selection should leave fields 0 and 2 populated and field 1 null
+    for(int r=0; r < 3000; ++r) {
+      assertEquals(true, reader.nextKeyValue());
+      row = reader.getCurrentValue();
+      assertEquals(r, ((IntWritable) row.getFieldValue(0)).get());
+      assertEquals(null, row.getFieldValue(1));
+      assertEquals(r * 3, ((IntWritable) row.getFieldValue(2)).get());
+    }
+    assertEquals(false, reader.nextKeyValue());
+  }
+
+
+  /**
+   * Make sure that the writer ignores the OrcKey
+   * @throws Exception
+   */
+  @Test
+  public void testOrcKey() throws Exception {
+    conf.set("mapreduce.output.fileoutputformat.outputdir", workDir.toString());
+    String TYPE_STRING = "struct<i:int,s:string>";
+    OrcConf.MAPRED_OUTPUT_SCHEMA.setString(conf, TYPE_STRING);
+    conf.setBoolean(OrcOutputFormat.SKIP_TEMP_DIRECTORY, true);
+    TaskAttemptID id = new TaskAttemptID("jt", 0, TaskType.MAP, 0, 1);
+    TaskAttemptContext attemptContext = new TaskAttemptContextImpl(conf, id);
+    TypeDescription schema = TypeDescription.fromString(TYPE_STRING);
+    OrcKey key = new OrcKey(new OrcStruct(schema));
+    RecordWriter<NullWritable, Writable> writer =
+        new OrcOutputFormat<>().getRecordWriter(attemptContext);
+    NullWritable nada = NullWritable.get();
+    for(int r=0; r < 2000; ++r) {
+      ((OrcStruct) key.key).setAllFields(new IntWritable(r),
+          new Text(Integer.toString(r)));
+      writer.write(nada, key);
+    }
+    writer.close(attemptContext);
+    Path path = new Path(workDir, "part-m-00000.orc");
+    Reader file = OrcFile.createReader(path, OrcFile.readerOptions(conf));
+    assertEquals(2000, file.getNumberOfRows());
+    assertEquals(TYPE_STRING, file.getSchema().toString());
+  }
+
+  /**
+   * Make sure that the writer ignores the OrcValue
+   * @throws Exception
+   */
+  @Test
+  public void testOrcValue() throws Exception {
+    conf.set("mapreduce.output.fileoutputformat.outputdir", workDir.toString());
+    String TYPE_STRING = "struct<i:int>";
+    OrcConf.MAPRED_OUTPUT_SCHEMA.setString(conf, TYPE_STRING);
+    conf.setBoolean(OrcOutputFormat.SKIP_TEMP_DIRECTORY, true);
+    TaskAttemptID id = new TaskAttemptID("jt", 0, TaskType.MAP, 0, 1);
+    TaskAttemptContext attemptContext = new TaskAttemptContextImpl(conf, id);
+
+    TypeDescription schema = TypeDescription.fromString(TYPE_STRING);
+    OrcValue value = new OrcValue(new OrcStruct(schema));
+    RecordWriter<NullWritable, Writable> writer =
+        new OrcOutputFormat<>().getRecordWriter(attemptContext);
+    NullWritable nada = NullWritable.get();
+    for(int r=0; r < 3000; ++r) {
+      ((OrcStruct) value.value).setAllFields(new IntWritable(r));
+      writer.write(nada, value);
+    }
+    writer.close(attemptContext);
+    Path path = new Path(workDir, "part-m-00000.orc");
+    Reader file = OrcFile.createReader(path, OrcFile.readerOptions(conf));
+    assertEquals(3000, file.getNumberOfRows());
+    assertEquals(TYPE_STRING, file.getSchema().toString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/test/org/apache/orc/mapreduce/TestMrUnit.java
----------------------------------------------------------------------
diff --git a/java/mapreduce/src/test/org/apache/orc/mapreduce/TestMrUnit.java b/java/mapreduce/src/test/org/apache/orc/mapreduce/TestMrUnit.java
new file mode 100644
index 0000000..01208e1
--- /dev/null
+++ b/java/mapreduce/src/test/org/apache/orc/mapreduce/TestMrUnit.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.mapreduce;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.Serialization;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.io.serializer.WritableSerialization;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
+import org.apache.orc.OrcConf;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.mapred.OrcKey;
+import org.apache.orc.mapred.OrcStruct;
+import org.apache.orc.mapred.OrcValue;
+import org.junit.Test;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Iterator;
+
+public class TestMrUnit {
+  JobConf conf = new JobConf();
+
+  /**
+   * Split the input struct into its two parts.
+   */
+  public static class MyMapper
+      extends Mapper<NullWritable, OrcStruct, OrcKey, OrcValue> {
+    private OrcKey keyWrapper = new OrcKey();
+    private OrcValue valueWrapper = new OrcValue();
+
+    @Override
+    protected void map(NullWritable key,
+                       OrcStruct value,
+                       Context context
+                       ) throws IOException, InterruptedException {
+      keyWrapper.key = value.getFieldValue(0);
+      valueWrapper.value = value.getFieldValue(1);
+      context.write(keyWrapper, valueWrapper);
+    }
+  }
+
+  /**
+   * Glue the key and values back together.
+   */
+  public static class MyReducer
+      extends Reducer<OrcKey, OrcValue, NullWritable, OrcStruct> {
+    private OrcStruct output = new OrcStruct(TypeDescription.fromString
+        ("struct<first:struct<x:int,y:int>,second:struct<z:string>>"));
+    private final NullWritable nada = NullWritable.get();
+
+    @Override
+    protected void reduce(OrcKey key,
+                          Iterable<OrcValue> values,
+                          Context context
+                          ) throws IOException, InterruptedException {
+      output.setFieldValue(0, key.key);
+      for(OrcValue value: values) {
+        output.setFieldValue(1, value.value);
+        context.write(nada, output);
+      }
+    }
+  }
+
+  /**
+   * This class is intended to support MRUnit's object copying for input and
+   * output objects.
+   *
+   * Real mapreduce contexts should NEVER use this class.
+   *
+   * The type string is serialized before each value.
+   */
+  public static class OrcStructSerialization
+      implements Serialization<OrcStruct> {
+
+    @Override
+    public boolean accept(Class<?> cls) {
+      return OrcStruct.class.isAssignableFrom(cls);
+    }
+
+    @Override
+    public Serializer<OrcStruct> getSerializer(Class<OrcStruct> aClass) {
+      return new Serializer<OrcStruct>() {
+        DataOutputStream dataOut;
+
+        public void open(OutputStream out) {
+          if(out instanceof DataOutputStream) {
+            dataOut = (DataOutputStream)out;
+          } else {
+            dataOut = new DataOutputStream(out);
+          }
+        }
+
+        public void serialize(OrcStruct w) throws IOException {
+          Text.writeString(dataOut, w.getSchema().toString());
+          w.write(dataOut);
+        }
+
+        public void close() throws IOException {
+          dataOut.close();
+        }
+      };
+    }
+
+    @Override
+    public Deserializer<OrcStruct> getDeserializer(Class<OrcStruct> aClass) {
+      return new Deserializer<OrcStruct>() {
+        DataInputStream input;
+
+        @Override
+        public void open(InputStream inputStream) throws IOException {
+          if(inputStream instanceof DataInputStream) {
+            input = (DataInputStream)inputStream;
+          } else {
+            input = new DataInputStream(inputStream);
+          }
+        }
+
+        @Override
+        public OrcStruct deserialize(OrcStruct orcStruct) throws IOException {
+          String typeStr = Text.readString(input);
+          OrcStruct result = new OrcStruct(TypeDescription.fromString(typeStr));
+          result.readFields(input);
+          return result;
+        }
+
+        @Override
+        public void close() throws IOException {
+          // PASS
+        }
+      };
+    }
+  }
+
+  @Test
+  public void testMapred() throws IOException {
+    conf.set("io.serializations",
+        OrcStructSerialization.class.getName() + "," +
+            WritableSerialization.class.getName());
+    OrcConf.MAPRED_SHUFFLE_KEY_SCHEMA.setString(conf, "struct<x:int,y:int>");
+    OrcConf.MAPRED_SHUFFLE_VALUE_SCHEMA.setString(conf, "struct<z:string>");
+    MyMapper mapper = new MyMapper();
+    MyReducer reducer = new MyReducer();
+    MapReduceDriver<NullWritable, OrcStruct,
+                    OrcKey, OrcValue,
+                    NullWritable, OrcStruct> driver =
+        new MapReduceDriver<>(mapper, reducer);
+    driver.setConfiguration(conf);
+    NullWritable nada = NullWritable.get();
+    OrcStruct input = (OrcStruct) OrcStruct.createValue(
+        TypeDescription.fromString("struct<one:struct<x:int,y:int>,two:struct<z:string>>"));
+    IntWritable x =
+        (IntWritable) ((OrcStruct) input.getFieldValue(0)).getFieldValue(0);
+    IntWritable y =
+        (IntWritable) ((OrcStruct) input.getFieldValue(0)).getFieldValue(1);
+    Text z = (Text) ((OrcStruct) input.getFieldValue(1)).getFieldValue(0);
+
+    // generate the input stream
+    for(int r=0; r < 20; ++r) {
+      x.set(100 - (r / 4));
+      y.set(r * 2);
+      z.set(Integer.toHexString(r));
+      driver.withInput(nada, input);
+    }
+
+    // generate the expected outputs
+    for(int g=4; g >= 0; --g) {
+      x.set(100 - g);
+      for(int i=0; i < 4; ++i) {
+        int r = g * 4 + i;
+        y.set(r * 2);
+        z.set(Integer.toHexString(r));
+        driver.withOutput(nada, input);
+      }
+    }
+    driver.runTest();
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/pom.xml
----------------------------------------------------------------------
diff --git a/java/pom.xml b/java/pom.xml
index 9941dee..2eacd7a 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -47,6 +47,7 @@
     <junit.version>4.11</junit.version>
     <kryo.version>3.0.3</kryo.version>
     <mockito.version>1.9.5</mockito.version>
+    <mrunit.version>1.1.0</mrunit.version>
     <protobuf.version>2.5.0</protobuf.version>
     <slf4j.version>1.7.5</slf4j.version>
     <snappy.version>0.2</snappy.version>

