crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject [2/2] git commit: CRUNCH-450: Adding crunch-hive module w/ORC file support. Contributed by Zhong Wang.
Date Tue, 12 Aug 2014 16:26:31 GMT
CRUNCH-450: Adding crunch-hive module w/ORC file support. Contributed by
Zhong Wang.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/363c8243
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/363c8243
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/363c8243

Branch: refs/heads/master
Commit: 363c8243b90e5df0394e8edb547f268b2408f250
Parents: dee0fcf
Author: Josh Wills <jwills@apache.org>
Authored: Mon Aug 11 15:31:14 2014 -0700
Committer: Josh Wills <jwills@apache.org>
Committed: Tue Aug 12 09:25:06 2014 -0700

----------------------------------------------------------------------
 crunch-hive/pom.xml                             |  75 +++++
 .../crunch/io/orc/OrcFileSourceTargetIT.java    | 161 +++++++++++
 .../crunch/io/orc/OrcCrunchInputFormat.java     |  98 +++++++
 .../crunch/io/orc/OrcCrunchOutputFormat.java    |  68 +++++
 .../crunch/io/orc/OrcFileReaderFactory.java     | 120 ++++++++
 .../org/apache/crunch/io/orc/OrcFileSource.java | 102 +++++++
 .../crunch/io/orc/OrcFileSourceTarget.java      |  48 ++++
 .../org/apache/crunch/io/orc/OrcFileTarget.java |  51 ++++
 .../org/apache/crunch/io/orc/OrcFileWriter.java |  78 ++++++
 .../apache/crunch/io/orc/OrcReadableData.java   |  47 ++++
 .../org/apache/crunch/io/orc/OrcWritable.java   | 124 +++++++++
 .../org/apache/crunch/types/orc/OrcUtils.java   | 262 ++++++++++++++++++
 .../java/org/apache/crunch/types/orc/Orcs.java  | 274 +++++++++++++++++++
 .../crunch/types/orc/TupleObjectInspector.java  | 233 ++++++++++++++++
 .../crunch/io/orc/OrcFileReaderFactoryTest.java |  65 +++++
 .../crunch/io/orc/OrcFileReaderWriterTest.java  |  55 ++++
 .../org/apache/crunch/io/orc/OrcFileTest.java   |  48 ++++
 .../apache/crunch/io/orc/OrcWritableTest.java   | 125 +++++++++
 .../crunch/test/orc/pojos/AddressBook.java      | 141 ++++++++++
 .../apache/crunch/test/orc/pojos/Person.java    | 101 +++++++
 .../org/apache/crunch/types/orc/OrcsTest.java   | 141 ++++++++++
 .../types/orc/TupleObjectInspectorTest.java     |  73 +++++
 pom.xml                                         |   8 +
 23 files changed, 2498 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/363c8243/crunch-hive/pom.xml
----------------------------------------------------------------------
diff --git a/crunch-hive/pom.xml b/crunch-hive/pom.xml
new file mode 100644
index 0000000..aef85c3
--- /dev/null
+++ b/crunch-hive/pom.xml
@@ -0,0 +1,75 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  
+  <parent>
+    <groupId>org.apache.crunch</groupId>
+    <artifactId>crunch-parent</artifactId>
+    <version>0.11.0-SNAPSHOT</version>
+  </parent>
+  
+  <artifactId>crunch-hive</artifactId>
+  <name>Apache Crunch Hive</name>
+  
+  <dependencies>
+  
+    <dependency>
+      <groupId>org.apache.crunch</groupId>
+      <artifactId>crunch-core</artifactId>
+    </dependency>
+    
+    <dependency>
+      <groupId>org.apache.crunch</groupId>
+      <artifactId>crunch-test</artifactId>
+      <scope>test</scope>
+    </dependency>
+    
+     <dependency>
+      <groupId>commons-httpclient</groupId>
+      <artifactId>commons-httpclient</artifactId>
+      <scope>test</scope> <!-- only needed for LocalJobRunner -->
+    </dependency>
+    
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-exec</artifactId>
+    </dependency>
+    
+  </dependencies>
+  
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-failsafe-plugin</artifactId>
+      </plugin>
+     </plugins>
+   </build>   
+</project>

http://git-wip-us.apache.org/repos/asf/crunch/blob/363c8243/crunch-hive/src/it/java/org/apache/crunch/io/orc/OrcFileSourceTargetIT.java
----------------------------------------------------------------------
diff --git a/crunch-hive/src/it/java/org/apache/crunch/io/orc/OrcFileSourceTargetIT.java b/crunch-hive/src/it/java/org/apache/crunch/io/orc/OrcFileSourceTargetIT.java
new file mode 100644
index 0000000..8cf467c
--- /dev/null
+++ b/crunch-hive/src/it/java/org/apache/crunch/io/orc/OrcFileSourceTargetIT.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.orc;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.TupleN;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.orc.OrcFileSource;
+import org.apache.crunch.io.orc.OrcFileTarget;
+import org.apache.crunch.io.orc.OrcFileWriter;
+import org.apache.crunch.test.orc.pojos.Person;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.orc.OrcUtils;
+import org.apache.crunch.types.orc.Orcs;
+import org.apache.crunch.types.writable.Writables;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class OrcFileSourceTargetIT extends OrcFileTest implements Serializable {
+  
+  private void generateInputData() throws IOException {
+    String typeStr = "struct<name:string,age:int,numbers:array<string>>";
+    TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(typeStr);
+    OrcStruct s = OrcUtils.createOrcStruct(typeInfo, new Text("Alice"), new IntWritable(23),
+        Arrays.asList(new Text("919-342-5555"), new Text("650-333-2913")));
+    
+    OrcFileWriter<OrcStruct> writer = new OrcFileWriter<OrcStruct>(conf, new Path(tempPath, "input.orc"), Orcs.orcs(typeInfo));
+    writer.write(s);
+    writer.close();
+  }
+  
+  private <T> void testSourceTarget(PType<T> ptype, T expected) {
+    Path inputPath = new Path(tempPath, "input.orc");
+    Path outputPath = new Path(tempPath, "output");
+    
+    Pipeline pipeline = new MRPipeline(OrcFileSourceTargetIT.class, conf);
+    OrcFileSource<T> source = new OrcFileSource<T>(inputPath, ptype);
+    PCollection<T> rows = pipeline.read(source);
+    List<T> result = Lists.newArrayList(rows.materialize());
+    
+    assertEquals(Lists.newArrayList(expected), result);
+    
+    OrcFileTarget target = new OrcFileTarget(outputPath);
+    pipeline.write(rows, target);
+    
+    assertTrue(pipeline.done().succeeded());
+    
+    OrcFileReaderFactory<T> reader = new OrcFileReaderFactory<T>(ptype);
+    List<T> newResult = Lists.newArrayList(reader.read(fs, inputPath));
+    
+    assertEquals(Lists.newArrayList(expected), newResult);
+  }
+  
+  @Test
+  public void testOrcs() throws IOException {
+    generateInputData();
+    
+    String typeStr = "struct<name:string,age:int,numbers:array<string>>";
+    TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(typeStr);
+    OrcStruct expected = OrcUtils.createOrcStruct(typeInfo, new Text("Alice"), new IntWritable(23),
+        Arrays.asList(new Text("919-342-5555"), new Text("650-333-2913")));
+    
+    testSourceTarget(Orcs.orcs(typeInfo), expected);
+  }
+  
+  @Test
+  public void testReflects() throws IOException {
+    generateInputData();
+    Person expected = new Person("Alice", 23, Arrays.asList("919-342-5555", "650-333-2913"));
+    testSourceTarget(Orcs.reflects(Person.class), expected);
+  }
+  
+  @Test
+  public void testTuples() throws IOException {
+    generateInputData();
+    TupleN expected = new TupleN("Alice", 23, Arrays.asList("919-342-5555", "650-333-2913"));
+    testSourceTarget(Orcs.tuples(Writables.strings(), Writables.ints(), Writables.collections(Writables.strings())),
+        expected);
+  }
+  
+  @Test
+  public void testColumnPruning() throws IOException {
+    generateInputData();
+    
+    Pipeline pipeline = new MRPipeline(OrcFileSourceTargetIT.class, conf);
+    int[] readColumns = {0, 1};
+    OrcFileSource<Person> source = new OrcFileSource<Person>(new Path(tempPath, "input.orc"),
+        Orcs.reflects(Person.class), readColumns);
+    PCollection<Person> rows = pipeline.read(source);
+    List<Person> result = Lists.newArrayList(rows.materialize());
+    
+    Person expected = new Person("Alice", 23, null);
+    assertEquals(Lists.newArrayList(expected), result);
+  }
+  
+  @Test
+  public void testGrouping() throws IOException {
+    String typeStr = "struct<name:string,age:int,numbers:array<string>>";
+    TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(typeStr);
+    OrcStruct s1 = OrcUtils.createOrcStruct(typeInfo, new Text("Bob"), new IntWritable(28), null);
+    OrcStruct s2 = OrcUtils.createOrcStruct(typeInfo, new Text("Bob"), new IntWritable(28), null);
+    OrcStruct s3 = OrcUtils.createOrcStruct(typeInfo, new Text("Alice"), new IntWritable(23),
+        Arrays.asList(new Text("444-333-9999")));
+    OrcStruct s4 = OrcUtils.createOrcStruct(typeInfo, new Text("Alice"), new IntWritable(36),
+        Arrays.asList(new Text("919-342-5555"), new Text("650-333-2913")));
+
+    Path inputPath = new Path(tempPath, "input.orc");
+    OrcFileWriter<OrcStruct> writer = new OrcFileWriter<OrcStruct>(conf, inputPath, Orcs.orcs(typeInfo));
+    writer.write(s1);
+    writer.write(s2);
+    writer.write(s3);
+    writer.write(s4);
+    writer.close();
+    
+    Pipeline pipeline = new MRPipeline(OrcFileSourceTargetIT.class, conf);
+    OrcFileSource<Person> source = new OrcFileSource<Person>(inputPath, Orcs.reflects(Person.class));
+    PCollection<Person> rows = pipeline.read(source);
+    PTable<Person, Long> count = rows.count();
+
+    List<Pair<Person, Long>> result = Lists.newArrayList(count.materialize());
+    List<Pair<Person, Long>> expected = Lists.newArrayList(
+        Pair.of(new Person("Alice", 23, Arrays.asList("444-333-9999")), 1L),
+        Pair.of(new Person("Alice", 36, Arrays.asList("919-342-5555", "650-333-2913")), 1L),
+        Pair.of(new Person("Bob", 28, null), 2L));
+    
+    assertEquals(expected, result);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/363c8243/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcCrunchInputFormat.java
----------------------------------------------------------------------
diff --git a/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcCrunchInputFormat.java b/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcCrunchInputFormat.java
new file mode 100644
index 0000000..3ffc88b
--- /dev/null
+++ b/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcCrunchInputFormat.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.orc;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.io.orc.OrcNewInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+public class OrcCrunchInputFormat extends InputFormat<NullWritable, OrcWritable> {
+
+  private OrcNewInputFormat inputFormat = new OrcNewInputFormat();
+
+  @Override
+  public List<InputSplit> getSplits(JobContext context) throws IOException,
+      InterruptedException {
+    return inputFormat.getSplits(context);
+  }
+
+  @Override
+  public RecordReader<NullWritable, OrcWritable> createRecordReader(
+      InputSplit split, TaskAttemptContext context) throws IOException,
+      InterruptedException {
+    RecordReader<NullWritable, OrcStruct> reader = inputFormat.createRecordReader(
+        split, context);
+    return new OrcCrunchRecordReader(reader);
+  }
+
+  static class OrcCrunchRecordReader extends RecordReader<NullWritable, OrcWritable> {
+    
+    private final RecordReader<NullWritable, OrcStruct> reader;
+    private OrcWritable value = new OrcWritable();
+
+    OrcCrunchRecordReader(RecordReader<NullWritable, OrcStruct> reader) {
+      this.reader = reader;
+    }
+
+    @Override
+    public void close() throws IOException {
+      reader.close();
+    }
+
+    @Override
+    public NullWritable getCurrentKey() throws IOException,
+        InterruptedException {
+      return NullWritable.get();
+    }
+
+    @Override
+    public OrcWritable getCurrentValue() throws IOException, InterruptedException {
+      return value;
+    }
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+      return reader.getProgress();
+    }
+
+    @Override
+    public void initialize(InputSplit split, TaskAttemptContext context)
+        throws IOException, InterruptedException {
+
+    }
+
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+      boolean hasNext = reader.nextKeyValue();
+      if (hasNext) {
+        value.set(reader.getCurrentValue());
+      }
+      return hasNext;
+    }
+    
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/363c8243/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcCrunchOutputFormat.java
----------------------------------------------------------------------
diff --git a/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcCrunchOutputFormat.java b/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcCrunchOutputFormat.java
new file mode 100644
index 0000000..2d56503
--- /dev/null
+++ b/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcCrunchOutputFormat.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.orc;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.io.orc.OrcNewOutputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+public class OrcCrunchOutputFormat extends FileOutputFormat<NullWritable, OrcWritable> {
+  
+  private OrcNewOutputFormat outputFormat = new OrcNewOutputFormat();
+
+  @Override
+  public RecordWriter<NullWritable, OrcWritable> getRecordWriter(
+      TaskAttemptContext job) throws IOException, InterruptedException {
+    RecordWriter<NullWritable, Writable> writer = outputFormat.getRecordWriter(job);
+    return new OrcCrunchRecordWriter(writer);
+  }
+  
+  static class OrcCrunchRecordWriter extends RecordWriter<NullWritable, OrcWritable> {
+    
+    private final RecordWriter<NullWritable, Writable> writer;
+    private final OrcSerde orcSerde;
+    
+    OrcCrunchRecordWriter(RecordWriter<NullWritable, Writable> writer) {
+      this.writer = writer;
+      this.orcSerde = new OrcSerde();
+    }
+
+    @Override
+    public void write(NullWritable key, OrcWritable value) throws IOException,
+        InterruptedException {
+      if (value.get() == null) {
+        throw new NullPointerException("Cannot write null records to orc file");
+      }
+      writer.write(key, orcSerde.serialize(value.get(), value.getObjectInspector()));
+    }
+
+    @Override
+    public void close(TaskAttemptContext context) throws IOException,
+        InterruptedException {
+      writer.close(context);
+    }
+    
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/363c8243/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcFileReaderFactory.java
----------------------------------------------------------------------
diff --git a/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcFileReaderFactory.java b/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcFileReaderFactory.java
new file mode 100644
index 0000000..abc0ec8
--- /dev/null
+++ b/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcFileReaderFactory.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.orc;
+
+import java.util.Iterator;
+
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.io.FileReaderFactory;
+import org.apache.crunch.types.PType;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+import com.google.common.collect.UnmodifiableIterator;
+
+public class OrcFileReaderFactory<T> implements FileReaderFactory<T> {
+  
+  private MapFn<Object, T> inputFn;
+  private OrcInputFormat inputFormat = new OrcInputFormat();
+  private int[] readColumns;
+  
+  public OrcFileReaderFactory(PType<T> ptype) {
+    this(ptype, null);
+  }
+  
+  public OrcFileReaderFactory(PType<T> ptype, int[] readColumns) {
+    inputFn = ptype.getInputMapFn();
+    this.readColumns = readColumns;
+  }
+
+  @Override
+  public Iterator<T> read(FileSystem fs, final Path path) {
+    try {
+      if (!fs.isFile(path)) {
+        throw new CrunchRuntimeException("Not a file: " + path);
+      }
+      
+      inputFn.initialize();
+      
+      FileStatus status = fs.getFileStatus(path);
+      FileSplit split = new FileSplit(path, 0, status.getLen(), new String[0]);
+      
+      JobConf conf = new JobConf();
+      if (readColumns != null) {
+        conf.setBoolean(OrcFileSource.HIVE_READ_ALL_COLUMNS, false);
+        conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, OrcFileSource.getColumnIdsStr(readColumns));
+      }
+      final RecordReader<NullWritable, OrcStruct> reader = inputFormat.getRecordReader(split, conf, Reporter.NULL);
+      
+      return new UnmodifiableIterator<T>() {
+        
+        private boolean checked = false;
+        private boolean hasNext;
+        private OrcStruct value;
+        private OrcWritable writable = new OrcWritable();
+
+        @Override
+        public boolean hasNext() {
+          try {
+            if (value == null) {
+              value = reader.createValue();
+            }
+            if (!checked) {
+              hasNext = reader.next(NullWritable.get(), value);
+              checked = true;
+            }
+            return hasNext;
+          } catch (Exception e) {
+            throw new CrunchRuntimeException("Error while reading local file: " + path, e);
+          }
+        }
+
+        @Override
+        public T next() {
+          try {
+            if (value == null) {
+              value = reader.createValue();
+            }
+            if (!checked) {
+              reader.next(NullWritable.get(), value);
+            }
+            checked = false;
+            writable.set(value);
+            return inputFn.map(writable);
+          } catch (Exception e) {
+            throw new CrunchRuntimeException("Error while reading local file: " + path, e);
+          }
+        }
+        
+      };
+    } catch (Exception e) {
+      throw new CrunchRuntimeException("Error while reading local file: " + path, e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/363c8243/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcFileSource.java
----------------------------------------------------------------------
diff --git a/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcFileSource.java b/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcFileSource.java
new file mode 100644
index 0000000..8689b6c
--- /dev/null
+++ b/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcFileSource.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.orc;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.crunch.ReadableData;
+import org.apache.crunch.io.FormatBundle;
+import org.apache.crunch.io.ReadableSource;
+import org.apache.crunch.io.impl.FileSourceImpl;
+import org.apache.crunch.types.PType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+
+public class OrcFileSource<T> extends FileSourceImpl<T> implements ReadableSource<T> {
+  
+  private int[] readColumns;
+    
+  public static final String HIVE_READ_ALL_COLUMNS = "hive.io.file.read.all.columns";
+  
+  private static <S> FormatBundle<OrcCrunchInputFormat> getBundle(int[] readColumns) {
+    FormatBundle<OrcCrunchInputFormat> fb = FormatBundle.forInput(OrcCrunchInputFormat.class);
+    if (readColumns != null) {  // setting configurations for column pruning
+      fb.set(HIVE_READ_ALL_COLUMNS, "false");
+      fb.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, getColumnIdsStr(readColumns));
+    }
+    return fb;
+  }
+  
+  static String getColumnIdsStr(int[] columns) {
+    StringBuilder sb = new StringBuilder();
+    for (int c : columns) {
+      sb.append(c);
+      sb.append(',');
+    }
+    return sb.length() > 0 ? sb.substring(0, sb.length() - 1) : "";
+  }
+  
+  public OrcFileSource(Path path, PType<T> ptype) {
+    this(path, ptype, null);
+  }
+  
+  /**
+   * Constructor for column pruning optimization
+   * 
+   * @param path
+   * @param ptype
+   * @param readColumns columns which will be read
+   */
+  public OrcFileSource(Path path, PType<T> ptype, int[] readColumns) {
+    super(path, ptype, getBundle(readColumns));
+    this.readColumns = readColumns;
+  }
+  
+  public OrcFileSource(List<Path> paths, PType<T> ptype) {
+    this(paths, ptype, null);
+  }
+  
+  /**
+   * Constructor for column pruning optimization
+   * 
+   * @param paths
+   * @param ptype
+   * @param columns columns which will be reserved
+   */
+  public OrcFileSource(List<Path> paths, PType<T> ptype, int[] columns) {
+    super(paths, ptype, getBundle(columns));
+  }
+  
+  @Override
+  public String toString() {
+    return "Orc(" + pathsAsString() + ")";
+  }
+
+  @Override
+  public Iterable<T> read(Configuration conf) throws IOException {
+    return read(conf, new OrcFileReaderFactory<T>(ptype, readColumns));
+  }
+
+  @Override
+  public ReadableData<T> asReadable() {
+    return new OrcReadableData<T>(this.paths, ptype, readColumns);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/363c8243/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcFileSourceTarget.java
----------------------------------------------------------------------
diff --git a/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcFileSourceTarget.java b/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcFileSourceTarget.java
new file mode 100644
index 0000000..0226864
--- /dev/null
+++ b/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcFileSourceTarget.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.orc;
+
+import org.apache.crunch.io.FileNamingScheme;
+import org.apache.crunch.io.PathTarget;
+import org.apache.crunch.io.ReadableSource;
+import org.apache.crunch.io.SequentialFileNamingScheme;
+import org.apache.crunch.io.impl.ReadableSourcePathTargetImpl;
+import org.apache.crunch.types.PType;
+import org.apache.hadoop.fs.Path;
+
+public class OrcFileSourceTarget<T> extends ReadableSourcePathTargetImpl<T>  {
+  
+  public OrcFileSourceTarget(Path path, PType<T> ptype) {
+    this(path, ptype, SequentialFileNamingScheme.getInstance());
+  }
+  
+  public OrcFileSourceTarget(Path path, PType<T> ptype, FileNamingScheme fileNameScheme) {
+    this(new OrcFileSource<T>(path, ptype), new OrcFileTarget(path), fileNameScheme);
+  }
+
+  public OrcFileSourceTarget(ReadableSource<T> source, PathTarget target,
+      FileNamingScheme fileNamingScheme) {
+    super(source, target, fileNamingScheme);
+  }
+  
+  @Override
+  public String toString() {
+    return target.toString();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/363c8243/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcFileTarget.java
----------------------------------------------------------------------
diff --git a/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcFileTarget.java b/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcFileTarget.java
new file mode 100644
index 0000000..0ad0434
--- /dev/null
+++ b/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcFileTarget.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.orc;
+
+import org.apache.crunch.SourceTarget;
+import org.apache.crunch.io.FileNamingScheme;
+import org.apache.crunch.io.SequentialFileNamingScheme;
+import org.apache.crunch.io.impl.FileTargetImpl;
+import org.apache.crunch.types.PType;
+import org.apache.hadoop.fs.Path;
+
+public class OrcFileTarget extends FileTargetImpl {
+  
+  public OrcFileTarget(String path) {
+    this(new Path(path));
+  }
+  
+  public OrcFileTarget(Path path) {
+    this(path, SequentialFileNamingScheme.getInstance());
+  }
+  
+  public OrcFileTarget(Path path, FileNamingScheme fileNamingScheme) {
+    super(path, OrcCrunchOutputFormat.class, fileNamingScheme);
+  }
+  
+  @Override
+  public String toString() {
+    return "Orc(" + path.toString() + ")";
+  }
+  
+  @Override
+  public <T> SourceTarget<T> asSourceTarget(PType<T> ptype) {
+    return new OrcFileSourceTarget<T>(path, ptype);
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/363c8243/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcFileWriter.java
----------------------------------------------------------------------
diff --git a/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcFileWriter.java b/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcFileWriter.java
new file mode 100644
index 0000000..ebfe1fe
--- /dev/null
+++ b/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcFileWriter.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.orc;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.types.PType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * A writer class which is corresponding to OrcFileReaderFactory. Mainly used
+ * for test purpose
+ *
+ * @param <T>
+ */
+public class OrcFileWriter<T> implements Closeable {
+  
+  private RecordWriter<NullWritable, Object> writer;
+  private MapFn<T, Object> mapFn;
+  private final OrcSerde serde;
+  
+  static class NullProgress implements Progressable {
+    @Override
+    public void progress() {
+    }
+  }
+  
+  public OrcFileWriter(Configuration conf, Path path, PType<T> pType) throws IOException {
+    JobConf jobConf = new JobConf(conf);
+    OutputFormat outputFormat = new OrcOutputFormat();
+    writer = outputFormat.getRecordWriter(null, jobConf, path.toString(), new NullProgress());
+    
+    mapFn = pType.getOutputMapFn();
+    mapFn.initialize();
+    
+    serde = new OrcSerde();
+  }
+  
+  public void write(T t) throws IOException {
+    OrcWritable ow = (OrcWritable) mapFn.map(t);
+    if (ow.get() == null) {
+      throw new NullPointerException("Cannot write null records to orc file");
+    }
+    writer.write(NullWritable.get(), serde.serialize(ow.get(), ow.getObjectInspector()));
+  }
+
+  @Override
+  public void close() throws IOException {
+    writer.close(Reporter.NULL);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/363c8243/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcReadableData.java
----------------------------------------------------------------------
diff --git a/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcReadableData.java b/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcReadableData.java
new file mode 100644
index 0000000..6f342bf
--- /dev/null
+++ b/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcReadableData.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.orc;
+
+import java.util.List;
+
+import org.apache.crunch.io.FileReaderFactory;
+import org.apache.crunch.io.impl.ReadableDataImpl;
+import org.apache.crunch.types.PType;
+import org.apache.hadoop.fs.Path;
+
+public class OrcReadableData<T> extends ReadableDataImpl<T> {
+  
+  private final PType<T> ptype;
+  private final int[] readColumns;
+
+  public OrcReadableData(List<Path> paths, PType<T> ptype) {
+    this(paths, ptype, null);
+  }
+  
+  public OrcReadableData(List<Path> paths, PType<T> ptype, int[] readColumns) {
+    super(paths);
+    this.ptype = ptype;
+    this.readColumns = readColumns;
+  }
+
+  @Override
+  public FileReaderFactory<T> getFileReaderFactory() {
+    return new OrcFileReaderFactory<T>(ptype, readColumns);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/363c8243/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcWritable.java
----------------------------------------------------------------------
diff --git a/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcWritable.java b/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcWritable.java
new file mode 100644
index 0000000..883d0f0
--- /dev/null
+++ b/crunch-hive/src/main/java/org/apache/crunch/io/orc/OrcWritable.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.orc;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.crunch.types.orc.OrcUtils;
+import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.WritableComparable;
+
+public class OrcWritable implements WritableComparable<OrcWritable> {
+
+  private OrcStruct orc;
+  private ObjectInspector oi; // object inspector for orc struct
+
+  private BytesWritable blob; // serialized from orc struct
+  private BinarySortableSerDe serde;
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    serialize();
+    blob.write(out);
+  }
+
+  private void serialize() {
+    try {
+      if (blob == null) {
+        // Make a copy since BinarySortableSerDe will reuse the byte buffer.
+        // This is not very efficient for the current implementation. Shall we
+        // implement a no-reuse version of BinarySortableSerDe?
+        byte[] bytes = ((BytesWritable) serde.serialize(orc, oi)).getBytes();
+        byte[] newBytes = new byte[bytes.length];
+        System.arraycopy(bytes, 0, newBytes, 0, bytes.length);
+        blob = new BytesWritable(newBytes);
+      }
+    } catch (SerDeException e) {
+      throw new CrunchRuntimeException("Unable to serialize object: "
+          + orc);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    blob = new BytesWritable();
+    blob.readFields(in);
+    orc = null; // the orc struct is stale
+  }
+
+  @Override
+  public int compareTo(OrcWritable arg0) {
+    serialize();
+    arg0.serialize();
+    return ((Comparable) blob).compareTo((Comparable) arg0.blob);
+  }
+  
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    return compareTo((OrcWritable) obj) == 0;
+  }
+
+  public void setSerde(BinarySortableSerDe serde) {
+    this.serde = serde;
+  }
+
+  public void setObjectInspector(ObjectInspector oi) {
+    this.oi = oi;
+  }
+  
+  public ObjectInspector getObjectInspector() {
+    return oi;
+  }
+
+  public void set(OrcStruct orcStruct) {
+    this.orc = orcStruct;
+    blob = null; // the blob is stale
+  }
+  
+  public OrcStruct get() {
+    if (orc == null && blob != null) {
+      makeOrcStruct();
+    }
+    return orc;
+  }
+  
+  private void makeOrcStruct() {
+    try {
+      Object row = serde.deserialize(blob);
+      StructObjectInspector rowOi = (StructObjectInspector) serde.getObjectInspector();
+      orc = (OrcStruct) OrcUtils.convert(row, rowOi, oi);
+    } catch (SerDeException e) {
+      throw new CrunchRuntimeException("Unable to deserialize blob: " + blob);
+    }
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/363c8243/crunch-hive/src/main/java/org/apache/crunch/types/orc/OrcUtils.java
----------------------------------------------------------------------
diff --git a/crunch-hive/src/main/java/org/apache/crunch/types/orc/OrcUtils.java b/crunch-hive/src/main/java/org/apache/crunch/types/orc/OrcUtils.java
new file mode 100644
index 0000000..8d9c806
--- /dev/null
+++ b/crunch-hive/src/main/java/org/apache/crunch/types/orc/OrcUtils.java
@@ -0,0 +1,262 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.orc;
+
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.hadoop.hive.common.type.HiveChar;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.common.type.HiveVarchar;
+import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.SettableListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.SettableMapObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.ObjectInspectorOptions;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.SettableBinaryObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.SettableBooleanObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.SettableByteObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.SettableDateObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.SettableDoubleObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.SettableFloatObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.SettableHiveCharObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.SettableHiveDecimalObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.SettableHiveVarcharObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.SettableIntObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.SettableLongObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.SettableShortObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.SettableStringObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.SettableTimestampObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+
+public class OrcUtils {
+  
+  /**
+   * Generate TypeInfo for a given java class based on reflection
+   * 
+   * @param typeClass
+   * @return
+   */
+  public static TypeInfo getTypeInfo(Class<?> typeClass) {
+    ObjectInspector oi = ObjectInspectorFactory
+        .getReflectionObjectInspector(typeClass, ObjectInspectorOptions.JAVA);
+    return TypeInfoUtils.getTypeInfoFromObjectInspector(oi);
+  }
+  
+  /**
+   * Create an object of OrcStruct given a type string and a list of objects
+   * 
+   * @param typeStr
+   * @param objs
+   * @return
+   */
+  public static OrcStruct createOrcStruct(TypeInfo typeInfo, Object... objs) {
+    SettableStructObjectInspector oi = (SettableStructObjectInspector) OrcStruct
+        .createObjectInspector(typeInfo);
+    List<StructField> fields = (List<StructField>) oi.getAllStructFieldRefs();
+    OrcStruct result = (OrcStruct) oi.create();
+    result.setNumFields(fields.size());
+    for (int i = 0; i < fields.size(); i++) {
+      oi.setStructFieldData(result, fields.get(i), objs[i]);
+    }
+    return result;
+  }
+  
+  /**
+   * Create a binary serde for OrcStruct serialization/deserialization
+   * 
+   * @param typeInfo
+   * @return
+   */
+  public static BinarySortableSerDe createBinarySerde(TypeInfo typeInfo){
+    BinarySortableSerDe serde = new BinarySortableSerDe();
+    
+    StringBuffer nameSb = new StringBuffer();
+    StringBuffer typeSb = new StringBuffer();
+
+    StructTypeInfo sti = (StructTypeInfo) typeInfo;
+    for (String name : sti.getAllStructFieldNames()) {
+      nameSb.append(name);
+      nameSb.append(',');
+    }
+    for (TypeInfo info : sti.getAllStructFieldTypeInfos()) {
+      typeSb.append(info.toString());
+      typeSb.append(',');
+    }
+
+    Properties tbl = new Properties();
+    String names = nameSb.length() > 0 ? nameSb.substring(0,
+        nameSb.length() - 1) : "";
+    String types = typeSb.length() > 0 ? typeSb.substring(0,
+        typeSb.length() - 1) : "";
+    tbl.setProperty(serdeConstants.LIST_COLUMNS, names);
+    tbl.setProperty(serdeConstants.LIST_COLUMN_TYPES, types);
+    
+    try {
+      serde.initialize(null, tbl);
+    } catch (SerDeException e) {
+      throw new CrunchRuntimeException("Unable to initialize binary serde");
+    }
+    
+    return serde;
+  }
+  
+  /**
+   * Convert an object from / to OrcStruct
+   * 
+   * @param from
+   * @param fromOi
+   * @param toOi
+   * @return
+   */
+  public static Object convert(Object from, ObjectInspector fromOi, ObjectInspector toOi) {
+    if (from == null) {
+      return null;
+    }
+    Object to;
+    switch (fromOi.getCategory()) {
+    case PRIMITIVE:
+      PrimitiveObjectInspector fromPoi = (PrimitiveObjectInspector) fromOi;
+      switch (fromPoi.getPrimitiveCategory()) {
+      case FLOAT:
+        SettableFloatObjectInspector floatOi = (SettableFloatObjectInspector) toOi;
+        return floatOi.create((Float) fromPoi.getPrimitiveJavaObject(from));
+      case DOUBLE:
+        SettableDoubleObjectInspector doubleOi = (SettableDoubleObjectInspector) toOi;
+        return doubleOi.create((Double) fromPoi.getPrimitiveJavaObject(from));
+      case BOOLEAN:
+        SettableBooleanObjectInspector boolOi = (SettableBooleanObjectInspector) toOi;
+        return boolOi.create((Boolean) fromPoi.getPrimitiveJavaObject(from));
+      case INT:
+        SettableIntObjectInspector intOi = (SettableIntObjectInspector) toOi;
+        return intOi.create((Integer) fromPoi.getPrimitiveJavaObject(from));
+      case LONG:
+        SettableLongObjectInspector longOi = (SettableLongObjectInspector) toOi;
+        return longOi.create((Long) fromPoi.getPrimitiveJavaObject(from));
+      case STRING:
+        SettableStringObjectInspector strOi = (SettableStringObjectInspector) toOi;
+        return strOi.create((String) fromPoi.getPrimitiveJavaObject(from));
+      case BYTE:
+        SettableByteObjectInspector byteOi = (SettableByteObjectInspector) toOi;
+        return byteOi.create((Byte) fromPoi.getPrimitiveJavaObject(from));
+      case SHORT:
+        SettableShortObjectInspector shortOi = (SettableShortObjectInspector) toOi;
+        return shortOi.create((Short) fromPoi.getPrimitiveJavaObject(from));
+      case BINARY:
+        SettableBinaryObjectInspector binOi = (SettableBinaryObjectInspector) toOi;
+        return binOi.create((byte[]) fromPoi.getPrimitiveJavaObject(from));
+      case TIMESTAMP:
+        SettableTimestampObjectInspector timeOi = (SettableTimestampObjectInspector) toOi;
+        return timeOi.create((Timestamp) fromPoi.getPrimitiveJavaObject(from));
+      case DATE:
+        SettableDateObjectInspector dateOi = (SettableDateObjectInspector) toOi;
+        return dateOi.create((Date) fromPoi.getPrimitiveJavaObject(from));
+      case DECIMAL:
+        SettableHiveDecimalObjectInspector decimalOi = (SettableHiveDecimalObjectInspector) toOi;
+        return decimalOi.create((HiveDecimal) fromPoi.getPrimitiveJavaObject(from));
+      case CHAR:
+        SettableHiveCharObjectInspector charOi = (SettableHiveCharObjectInspector) toOi;
+        return charOi.create((HiveChar) fromPoi.getPrimitiveJavaObject(from));
+      case VARCHAR:
+        SettableHiveVarcharObjectInspector varcharOi = (SettableHiveVarcharObjectInspector) toOi;
+        return varcharOi.create((HiveVarchar) fromPoi.getPrimitiveJavaObject(from));
+      case VOID:
+        throw new IllegalArgumentException("Void type is not supported yet");
+      default:
+        throw new IllegalArgumentException("Unknown primitive type "
+            + (fromPoi).getPrimitiveCategory());
+      }
+    case STRUCT:
+      StructObjectInspector fromSoi = (StructObjectInspector) fromOi;
+      List<StructField> fromFields = (List<StructField>) fromSoi.getAllStructFieldRefs();
+      List<Object> fromItems = fromSoi.getStructFieldsDataAsList(from);
+      
+      // this is a tuple. use TupleObjectInspector to construct the result
+      if (toOi instanceof TupleObjectInspector) {
+        TupleObjectInspector toToi = (TupleObjectInspector) toOi;
+        List<StructField> toFields = (List<StructField>) toToi.getAllStructFieldRefs();
+        Object[] values = new Object[fromItems.size()];
+        for (int i = 0; i < fromItems.size(); i++) {
+          values[i] = convert(fromItems.get(i),
+            fromFields.get(i).getFieldObjectInspector(),
+            toFields.get(i).getFieldObjectInspector());
+        }
+        return toToi.create(values);
+      }
+      
+      SettableStructObjectInspector toSoi = (SettableStructObjectInspector) toOi;
+      List<StructField> toFields = (List<StructField>) toSoi.getAllStructFieldRefs();
+      to = toSoi.create();
+      for (int i = 0; i < fromItems.size(); i++) {
+        Object converted = convert(fromItems.get(i),
+            fromFields.get(i).getFieldObjectInspector(),
+            toFields.get(i).getFieldObjectInspector());
+        toSoi.setStructFieldData(to, toFields.get(i), converted);
+      }
+      return to;
+    case MAP:
+      MapObjectInspector fromMoi = (MapObjectInspector) fromOi;
+      SettableMapObjectInspector toMoi = (SettableMapObjectInspector) toOi;
+      to = toMoi.create(); // do not reuse
+      for (Map.Entry<?, ?> entry : fromMoi.getMap(from).entrySet()) {
+        Object convertedKey = convert(entry.getKey(),
+            fromMoi.getMapKeyObjectInspector(),
+            toMoi.getMapKeyObjectInspector());
+        Object convertedValue = convert(entry.getValue(),
+            fromMoi.getMapValueObjectInspector(),
+            toMoi.getMapValueObjectInspector());
+        toMoi.put(to, convertedKey, convertedValue);
+      }
+      return to;
+    case LIST:
+      ListObjectInspector fromLoi = (ListObjectInspector) fromOi;
+      List<?> fromList = fromLoi.getList(from);
+      
+      SettableListObjectInspector toLoi = (SettableListObjectInspector) toOi;
+      to = toLoi.create(fromList.size()); // do not reuse
+      for (int i = 0; i < fromList.size(); i++) {
+        Object converted = convert(fromList.get(i),
+            fromLoi.getListElementObjectInspector(),
+            toLoi.getListElementObjectInspector());
+        toLoi.set(to, i, converted);
+      }
+      return to;
+    case UNION:
+      throw new IllegalArgumentException("Union type is not supported yet");
+    default:
+      throw new IllegalArgumentException("Unknown type " + fromOi.getCategory());
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/363c8243/crunch-hive/src/main/java/org/apache/crunch/types/orc/Orcs.java
----------------------------------------------------------------------
diff --git a/crunch-hive/src/main/java/org/apache/crunch/types/orc/Orcs.java b/crunch-hive/src/main/java/org/apache/crunch/types/orc/Orcs.java
new file mode 100644
index 0000000..d3611c2
--- /dev/null
+++ b/crunch-hive/src/main/java/org/apache/crunch/types/orc/Orcs.java
@@ -0,0 +1,274 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.orc;
+
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.Tuple;
+import org.apache.crunch.TupleN;
+import org.apache.crunch.fn.CompositeMapFn;
+import org.apache.crunch.io.orc.OrcWritable;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.TupleFactory;
+import org.apache.crunch.types.writable.WritableType;
+import org.apache.crunch.types.writable.Writables;
+import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
+import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.ObjectInspectorOptions;
+import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+
+/**
+ * Utilities to create PTypes for ORC serialization / deserialization
+ * 
+ */
+public class Orcs {
+  
+  /**
+   * Create a PType to directly use OrcStruct as the deserialized format. This
+   * is the fastest way for serialization/deserializations. However, users
+   * need to use ObjectInspectors to handle the OrcStruct. Currently, void and
+   * union types are not supported.
+   * 
+   * @param typeInfo
+   * @return
+   */
+  public static final PType<OrcStruct> orcs(TypeInfo typeInfo) {
+    return Writables.derived(OrcStruct.class, new OrcInFn(typeInfo), new OrcOutFn(typeInfo),
+        Writables.writables(OrcWritable.class));
+  }
+  
+  /**
+   * Create a PType which uses reflection to serialize/deserialize java POJOs
+   * to/from ORC. There are some restrictions of the POJO: 1) it must have a
+   * default, no-arg constructor; 2) All of its fields must be Hive primitive
+   * types or collection types that have Hive equivalents; 3) Void and Union
+   * are not supported yet.
+   * 
+   * @param clazz
+   * @return
+   */
+  public static final <T> PType<T> reflects(Class<T> clazz) {
+    TypeInfo reflectInfo = createReflectTypeInfo(clazz);
+    return Writables.derived(clazz, new ReflectInFn<T>(clazz),
+        new ReflectOutFn<T>(clazz), orcs(reflectInfo));
+  }
+  
+  private static TypeInfo createReflectTypeInfo(Class<?> clazz) {
+    ObjectInspector reflectOi = ObjectInspectorFactory
+        .getReflectionObjectInspector(clazz, ObjectInspectorOptions.JAVA);
+    return TypeInfoUtils.getTypeInfoFromObjectInspector(reflectOi);
+  }
+  
+  /**
+   * Create a tuple-based PType. Users can use other Crunch PTypes (such as
+   * Writables.ints(), Orcs.reflects(), Writables.pairs(), ...) to construct
+   * the PType. Currently, nulls and unions are not supported.
+   * 
+   * @param ptypes
+   * @return
+   */
+  public static final PType<TupleN> tuples(PType... ptypes) {
+    TypeInfo tupleInfo = createTupleTypeInfo(ptypes);
+    return derived(TupleN.class, new TupleInFn<TupleN>(TupleFactory.TUPLEN, ptypes),
+        new TupleOutFn<TupleN>(ptypes), orcs(tupleInfo), ptypes);
+  }
+  
+  // derived, but override subtypes
+  static <S, T> PType<T> derived(Class<T> clazz, MapFn<S, T> inputFn, MapFn<T, S> outputFn,
+      PType<S> base, PType[] subTypes) {
+    WritableType<S, ?> wt = (WritableType<S, ?>) base;
+    MapFn input = new CompositeMapFn(wt.getInputMapFn(), inputFn);
+    MapFn output = new CompositeMapFn(outputFn, wt.getOutputMapFn());
+    return new WritableType(clazz, wt.getSerializationClass(), input, output, subTypes);
+  }
+  
+  private static TypeInfo createTupleTypeInfo(PType... ptypes) {
+    ObjectInspector tupleOi = new TupleObjectInspector(null, ptypes);
+    return TypeInfoUtils.getTypeInfoFromObjectInspector(tupleOi);
+  }
+  
+  private static class OrcInFn extends MapFn<OrcWritable, OrcStruct> {
+    
+    private TypeInfo typeInfo;
+    
+    private transient ObjectInspector oi;
+    private transient BinarySortableSerDe serde;
+
+    public OrcInFn(TypeInfo typeInfo) {
+      this.typeInfo = typeInfo;
+    }
+    
+    @Override
+    public void initialize() {
+      oi = OrcStruct.createObjectInspector(typeInfo);
+      serde = OrcUtils.createBinarySerde(typeInfo);
+    }
+    
+    @Override
+    public OrcStruct map(OrcWritable input) {
+      input.setObjectInspector(oi);
+      input.setSerde(serde);
+      return input.get();
+    }
+    
+  }
+  
+  private static class OrcOutFn extends MapFn<OrcStruct, OrcWritable> {
+    
+    private TypeInfo typeInfo;
+    
+    private transient ObjectInspector oi;
+    private transient BinarySortableSerDe serde;
+    
+    public OrcOutFn(TypeInfo typeInfo) {
+      this.typeInfo = typeInfo;
+    }
+    
+    @Override
+    public void initialize() {
+      oi = OrcStruct.createObjectInspector(typeInfo);
+      serde = OrcUtils.createBinarySerde(typeInfo);
+    }
+
+    @Override
+    public OrcWritable map(OrcStruct input) {
+      OrcWritable output = new OrcWritable();
+      output.setObjectInspector(oi);
+      output.setSerde(serde);
+      output.set(input);
+      return output;
+    }
+    
+  }
+  
+  private static Map<Class<?>, Field[]> fieldsCache = new HashMap<Class<?>, Field[]>();
+  
+  private static class ReflectInFn<T> extends MapFn<OrcStruct, T> {
+    
+    private Class<T> typeClass;
+    private TypeInfo typeInfo;
+    
+    private transient ObjectInspector reflectOi;
+    private transient ObjectInspector orcOi;
+    
+    @Override
+    public void initialize() {
+      reflectOi = ObjectInspectorFactory
+          .getReflectionObjectInspector(typeClass, ObjectInspectorOptions.JAVA);
+      orcOi = OrcStruct.createObjectInspector(typeInfo);
+    }
+    
+    public ReflectInFn(Class<T> typeClass) {
+      this.typeClass = typeClass;
+      typeInfo = createReflectTypeInfo(typeClass);
+    }
+
+    @Override
+    public T map(OrcStruct input) {
+      return (T) OrcUtils.convert(input, orcOi, reflectOi);
+    }
+    
+  }
+  
+  private static class ReflectOutFn<T> extends MapFn<T, OrcStruct> {
+    
+    private Class<T> typeClass;
+    private TypeInfo typeInfo;
+    
+    private transient ObjectInspector reflectOi;
+    private transient SettableStructObjectInspector orcOi;
+    
+    @Override
+    public void initialize() {
+      reflectOi = ObjectInspectorFactory.getReflectionObjectInspector(typeClass,
+          ObjectInspectorOptions.JAVA);
+      orcOi = (SettableStructObjectInspector) OrcStruct.createObjectInspector(typeInfo);
+    }
+    
+    public ReflectOutFn(Class<T> typeClass) {
+      this.typeClass = typeClass;
+      typeInfo = createReflectTypeInfo(typeClass);
+    }
+
+    @Override
+    public OrcStruct map(T input) {
+      return (OrcStruct) OrcUtils.convert(input, reflectOi, orcOi);
+    }
+    
+  }
+  
+  private static class TupleInFn<T extends Tuple> extends MapFn<OrcStruct, T> {
+    
+    private PType[] ptypes;
+    private TupleFactory<T> tupleFactory;
+    
+    private transient ObjectInspector tupleOi;
+    private transient ObjectInspector orcOi;
+    
+    public TupleInFn(TupleFactory<T> tupleFactory, PType... ptypes) {
+      this.tupleFactory = tupleFactory;
+      this.ptypes = ptypes;
+    }
+    
+    @Override
+    public void initialize() {
+      tupleOi = new TupleObjectInspector<T>(tupleFactory, ptypes);
+      TypeInfo info = TypeInfoUtils.getTypeInfoFromObjectInspector(tupleOi);
+      orcOi = OrcStruct.createObjectInspector(info);
+    }
+
+    @Override
+    public T map(OrcStruct input) {
+      return (T) OrcUtils.convert(input, orcOi, tupleOi);
+    }
+    
+  }
+  
+  private static class TupleOutFn<T extends Tuple> extends MapFn<T, OrcStruct> {
+    
+    private PType[] ptypes;
+    
+    private transient ObjectInspector tupleOi;
+    private transient ObjectInspector orcOi;
+    
+    public TupleOutFn(PType... ptypes) {
+      this.ptypes = ptypes;
+    }
+    
+    @Override
+    public void initialize() {
+      tupleOi = new TupleObjectInspector<T>(null, ptypes);
+      TypeInfo info = TypeInfoUtils.getTypeInfoFromObjectInspector(tupleOi);
+      orcOi = OrcStruct.createObjectInspector(info);
+    }
+
+    @Override
+    public OrcStruct map(T input) {
+      return (OrcStruct) OrcUtils.convert(input, tupleOi, orcOi);
+    }
+    
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/363c8243/crunch-hive/src/main/java/org/apache/crunch/types/orc/TupleObjectInspector.java
----------------------------------------------------------------------
diff --git a/crunch-hive/src/main/java/org/apache/crunch/types/orc/TupleObjectInspector.java b/crunch-hive/src/main/java/org/apache/crunch/types/orc/TupleObjectInspector.java
new file mode 100644
index 0000000..9f5cd94
--- /dev/null
+++ b/crunch-hive/src/main/java/org/apache/crunch/types/orc/TupleObjectInspector.java
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.orc;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.crunch.Tuple;
+import org.apache.crunch.Union;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.TupleFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.ObjectInspectorOptions;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.AbstractPrimitiveJavaObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.SettableBinaryObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.io.BytesWritable;
+
+/**
+ * An object inspector to define the structure of Crunch Tuples
+ *
+ */
+public class TupleObjectInspector<T extends Tuple> extends StructObjectInspector {
+  
+  private TupleFactory<T> tupleFactory;
+  private List<TupleField> fields;
+  
+  public TupleObjectInspector(TupleFactory<T> tupleFactory, PType... ptypes) {
+    this.tupleFactory = tupleFactory;
+    fields = new ArrayList<TupleField>();
+    for (int i = 0; i < ptypes.length; i++) {
+      TupleField field = new TupleField(i, ptypes[i]);
+      fields.add(field);
+    }
+  }
+  
+  static class TupleField implements StructField {
+    
+    private int index;
+    private ObjectInspector oi;
+    
+    public TupleField(int index, PType<?> ptype) {
+      this.index = index;
+      oi = createObjectInspector(ptype);
+    }
+    
+    private ObjectInspector createObjectInspector(PType<?> ptype) {
+      Class typeClass = ptype.getTypeClass();
+      if (typeClass == Union.class || typeClass == Void.class) {
+        throw new IllegalArgumentException(typeClass.getName() + " is not supported yet");
+      }
+      
+      ObjectInspector result;
+      if (typeClass == ByteBuffer.class) {
+        result = new ByteBufferObjectInspector();
+      } else if (typeClass == Collection.class) {
+        ObjectInspector itemOi = createObjectInspector(ptype.getSubTypes().get(0));
+        result = ObjectInspectorFactory.getStandardListObjectInspector(itemOi);
+      } else if (typeClass == Map.class) {
+        ObjectInspector keyOi = ObjectInspectorFactory
+            .getReflectionObjectInspector(String.class, ObjectInspectorOptions.JAVA);
+        ObjectInspector valueOi = createObjectInspector(ptype.getSubTypes().get(0));
+        result = ObjectInspectorFactory.getStandardMapObjectInspector(keyOi, valueOi);
+      } else if (Tuple.class.isAssignableFrom(typeClass)) {
+        result = new TupleObjectInspector(TupleFactory.getTupleFactory(typeClass),
+            ptype.getSubTypes().toArray(new PType[0]));
+      } else {
+        result = ObjectInspectorFactory.getReflectionObjectInspector(typeClass,
+            ObjectInspectorOptions.JAVA);
+      }
+      return result;
+    }
+
+    @Override
+    public String getFieldName() {
+      return "_col" + index;
+    }
+
+    @Override
+    public ObjectInspector getFieldObjectInspector() {
+      return oi;
+    }
+
+    @Override
+    public String getFieldComment() {
+      return null;
+    }
+    
+  }
+
+  @Override
+  public String getTypeName() {
+    StringBuilder buffer = new StringBuilder();
+    buffer.append("struct<");
+    for (int i = 0; i < fields.size(); ++i) {
+        StructField field = fields.get(i);
+        if (i != 0) {
+            buffer.append(",");
+        }
+        buffer.append(field.getFieldName());
+        buffer.append(":");
+        buffer.append(field.getFieldObjectInspector().getTypeName());
+    }
+    buffer.append(">");
+    return buffer.toString();
+  }
+
+  @Override
+  public Category getCategory() {
+    return Category.STRUCT;
+  }
+  
+  public T create(Object... values) {
+    return tupleFactory.makeTuple(values);
+  }
+
+  @Override
+  public List<? extends StructField> getAllStructFieldRefs() {
+    return fields;
+  }
+
+  @Override
+  public StructField getStructFieldRef(String fieldName) {
+    for (StructField field : fields) {
+      if (field.getFieldName().equals(fieldName)) {
+        return field;
+      }
+    }
+    return null;
+  }
+  
+  @Override
+  public Object getStructFieldData(Object data, StructField fieldRef) {
+    TupleField field = (TupleField) fieldRef;
+    return ((T) data).get(field.index);
+  }
+
+  @Override
+  public List<Object> getStructFieldsDataAsList(Object data) {
+    T tuple = (T) data;
+    List<Object> result = new ArrayList<Object>();
+    for (int i = 0; i < tuple.size(); i++) {
+      result.add(tuple.get(i));
+    }
+    return result;
+  }
+  
+  
+  static class ByteBufferObjectInspector extends AbstractPrimitiveJavaObjectInspector implements SettableBinaryObjectInspector {
+
+    ByteBufferObjectInspector() {
+      super(TypeInfoFactory.binaryTypeInfo);
+    }
+    
+    @Override
+    public ByteBuffer copyObject(Object o) {
+      if (o == null) {
+        return null;
+      }
+      byte[] oldBytes = getPrimitiveJavaObject(o);
+      byte[] copiedBytes = new byte[oldBytes.length];
+      System.arraycopy(oldBytes, 0, copiedBytes, 0, oldBytes.length);
+      ByteBuffer duplicate = ByteBuffer.wrap(copiedBytes);
+      return duplicate;
+    }
+    
+    @Override
+    public BytesWritable getPrimitiveWritableObject(Object o) {
+      if (o == null) {
+        return null;
+      }
+      ByteBuffer buf = (ByteBuffer) o;
+      BytesWritable bw = new BytesWritable();
+      bw.set(buf.array(), buf.arrayOffset(), buf.limit());
+      return bw;
+    }
+    
+    @Override
+    public byte[] getPrimitiveJavaObject(Object o) {
+      if (o == null) {
+        return null;
+      }
+      ByteBuffer buf = (ByteBuffer) o;
+      byte[] b = new byte[buf.limit()];
+      System.arraycopy(buf.array(), buf.arrayOffset(), b, 0, b.length);
+      return b;
+    }
+
+    @Override
+    public Object set(Object o, byte[] b) {
+      throw new UnsupportedOperationException("set is not supported");
+    }
+
+    @Override
+    public Object set(Object o, BytesWritable bw) {
+      throw new UnsupportedOperationException("set is not supported");
+    }
+
+    @Override
+    public ByteBuffer create(byte[] bb) {
+      return bb == null ? null : ByteBuffer.wrap(bb);
+    }
+
+    @Override
+    public ByteBuffer create(BytesWritable bw) {
+      return bw == null ? null : ByteBuffer.wrap(bw.getBytes());
+    }
+
+    
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/363c8243/crunch-hive/src/test/java/org/apache/crunch/io/orc/OrcFileReaderFactoryTest.java
----------------------------------------------------------------------
diff --git a/crunch-hive/src/test/java/org/apache/crunch/io/orc/OrcFileReaderFactoryTest.java b/crunch-hive/src/test/java/org/apache/crunch/io/orc/OrcFileReaderFactoryTest.java
new file mode 100644
index 0000000..53dbfaf
--- /dev/null
+++ b/crunch-hive/src/test/java/org/apache/crunch/io/orc/OrcFileReaderFactoryTest.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.orc;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.orc.OrcUtils;
+import org.apache.crunch.types.orc.Orcs;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+public class OrcFileReaderFactoryTest extends OrcFileTest {
+  
+  @Test
+  public void testColumnPruning() throws IOException {
+    Path path = new Path(tempPath, "test.orc");
+    
+    String typeStr = "struct<a:int,b:string,c:float>";
+    TypeInfo info = TypeInfoUtils.getTypeInfoFromTypeString(typeStr);
+    StructObjectInspector soi = (StructObjectInspector) OrcStruct.createObjectInspector(info);
+    PType<OrcStruct> ptype = Orcs.orcs(info);
+    
+    OrcFileWriter<OrcStruct> writer = new OrcFileWriter<OrcStruct>(conf, path, ptype);
+    writer.write(OrcUtils.createOrcStruct(info, new IntWritable(1), new Text("Alice"), new FloatWritable(167.2f)));
+    writer.write(OrcUtils.createOrcStruct(info, new IntWritable(2), new Text("Bob"), new FloatWritable(179.7f)));
+    writer.close();
+    
+    int[] readColumns = {1};
+    OrcFileSource<OrcStruct> source = new OrcFileSource<OrcStruct>(path, ptype, readColumns);
+    for (OrcStruct row : source.read(conf)) {
+      List<Object> list = soi.getStructFieldsDataAsList(row);
+      assertNull(list.get(0));
+      assertNotNull(list.get(1));
+      assertNull(list.get(2));
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/363c8243/crunch-hive/src/test/java/org/apache/crunch/io/orc/OrcFileReaderWriterTest.java
----------------------------------------------------------------------
diff --git a/crunch-hive/src/test/java/org/apache/crunch/io/orc/OrcFileReaderWriterTest.java b/crunch-hive/src/test/java/org/apache/crunch/io/orc/OrcFileReaderWriterTest.java
new file mode 100644
index 0000000..f049608
--- /dev/null
+++ b/crunch-hive/src/test/java/org/apache/crunch/io/orc/OrcFileReaderWriterTest.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.orc;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+
+import org.apache.crunch.io.orc.OrcFileSource;
+import org.apache.crunch.io.orc.OrcFileWriter;
+import org.apache.crunch.test.orc.pojos.Person;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.orc.Orcs;
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+
+public class OrcFileReaderWriterTest extends OrcFileTest {
+  
+  @Test
+  public void testReadWrite() throws IOException {
+    Path path = new Path(tempPath, "test.orc");
+    PType<Person> ptype = Orcs.reflects(Person.class);
+    OrcFileWriter<Person> writer = new OrcFileWriter<Person>(conf, path, ptype);
+    
+    Person p1 = new Person("Alice", 23, Arrays.asList("666-677-9999"));
+    Person p2 = new Person("Bob", 26, null);
+    
+    writer.write(p1);
+    writer.write(p2);
+    writer.close();
+    
+    OrcFileSource<Person> reader = new OrcFileSource<Person>(path, ptype);
+    Iterator<Person> iter = reader.read(conf).iterator();
+    assertEquals(p1, iter.next());
+    assertEquals(p2, iter.next());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/363c8243/crunch-hive/src/test/java/org/apache/crunch/io/orc/OrcFileTest.java
----------------------------------------------------------------------
diff --git a/crunch-hive/src/test/java/org/apache/crunch/io/orc/OrcFileTest.java b/crunch-hive/src/test/java/org/apache/crunch/io/orc/OrcFileTest.java
new file mode 100644
index 0000000..dcc9cd6
--- /dev/null
+++ b/crunch-hive/src/test/java/org/apache/crunch/io/orc/OrcFileTest.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.orc;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.After;
+import org.junit.Before;
+
+import com.google.common.io.Files;
+
+public class OrcFileTest {
+
+  protected transient Configuration conf;
+  protected transient FileSystem fs;
+  protected transient Path tempPath;
+  
+  @Before
+  public void setUp() throws IOException {
+    conf = new Configuration();
+    tempPath = new Path(Files.createTempDir().getAbsolutePath());
+    fs = tempPath.getFileSystem(conf);
+  }
+  
+  @After
+  public void tearDown() throws IOException {
+    fs.delete(tempPath, true);
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/363c8243/crunch-hive/src/test/java/org/apache/crunch/io/orc/OrcWritableTest.java
----------------------------------------------------------------------
diff --git a/crunch-hive/src/test/java/org/apache/crunch/io/orc/OrcWritableTest.java b/crunch-hive/src/test/java/org/apache/crunch/io/orc/OrcWritableTest.java
new file mode 100644
index 0000000..a281890
--- /dev/null
+++ b/crunch-hive/src/test/java/org/apache/crunch/io/orc/OrcWritableTest.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.orc;
+
+import static org.junit.Assert.*;
+
+import java.util.List;
+
+import org.apache.crunch.types.orc.OrcUtils;
+import org.apache.crunch.types.writable.WritableDeepCopier;
+import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
+import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+public class OrcWritableTest {
+  
+  @Test
+  public void testDeepCopy() {
+    String typeStr = "struct<a:int,b:string,c:float>";
+    TypeInfo info = TypeInfoUtils.getTypeInfoFromTypeString(typeStr);
+    StructObjectInspector oi = (StructObjectInspector) OrcStruct.createObjectInspector(info);
+    BinarySortableSerDe serde = OrcUtils.createBinarySerde(info);
+    
+    OrcStruct struct = OrcUtils.createOrcStruct(info,
+        new IntWritable(1), new Text("Alice"), new FloatWritable(165.3f));
+    OrcWritable writable = new OrcWritable();
+    writable.set(struct);
+    assertTrue(struct == writable.get());
+    
+    writable.setObjectInspector(oi);
+    writable.setSerde(serde);
+    
+    WritableDeepCopier<OrcWritable> deepCopier = new WritableDeepCopier<OrcWritable>(OrcWritable.class);
+    OrcWritable copied = deepCopier.deepCopy(writable);
+    assertTrue(writable != copied);
+    assertEquals(writable, copied);
+    
+    copied.setObjectInspector(oi);
+    copied.setSerde(serde);
+    OrcStruct copiedStruct = copied.get();
+    assertTrue(struct != copiedStruct);
+    assertEquals(struct, copiedStruct);
+    
+    List<Object> items = oi.getStructFieldsDataAsList(struct);
+    List<Object> copiedItems = oi.getStructFieldsDataAsList(copiedStruct);
+    for (int i = 0; i < items.size(); i++) {
+      assertTrue(items.get(i) != copiedItems.get(i));
+      assertEquals(items.get(i), copiedItems.get(i));
+    }
+    
+    OrcWritable copied2 = deepCopier.deepCopy(copied);
+    assertTrue(copied2 != copied);
+    assertEquals(copied2, copied);
+    
+    copied2.setObjectInspector(oi);
+    copied2.setSerde(serde);
+    OrcStruct copiedStruct2 = copied2.get();
+    assertTrue(copiedStruct2 != copiedStruct);
+    assertEquals(copiedStruct2, copiedStruct);
+    
+    List<Object> copiedItems2 = oi.getStructFieldsDataAsList(copiedStruct2);
+    for (int i = 0; i < items.size(); i++) {
+      assertTrue(copiedItems2.get(i) != copiedItems.get(i));
+      assertEquals(copiedItems2.get(i), copiedItems.get(i));
+    }
+  }
+  
+  @Test
+  public void testCompareTo() {
+    String typeStr = "struct<a:int,b:string,c:float>";
+    TypeInfo info = TypeInfoUtils.getTypeInfoFromTypeString(typeStr);
+    StructObjectInspector oi = (StructObjectInspector) OrcStruct.createObjectInspector(info);
+    BinarySortableSerDe serde = OrcUtils.createBinarySerde(info);
+    
+    OrcStruct struct1 = OrcUtils.createOrcStruct(info, new IntWritable(1), new Text("AAA"), new FloatWritable(3.2f));
+    OrcStruct struct2 = OrcUtils.createOrcStruct(info, new IntWritable(1), new Text("AAB"), null);
+    OrcStruct struct3 = OrcUtils.createOrcStruct(info, new IntWritable(2), new Text("AAA"), null);
+    OrcStruct struct4 = OrcUtils.createOrcStruct(info, new IntWritable(2), new Text("AAA"), new FloatWritable(3.2f));
+    
+    OrcWritable writable1 = new OrcWritable();
+    writable1.set(struct1);
+    OrcWritable writable2 = new OrcWritable();
+    writable2.set(struct2);
+    OrcWritable writable3 = new OrcWritable();
+    writable3.set(struct3);
+    OrcWritable writable4 = new OrcWritable();
+    writable4.set(struct4);
+    
+    writable1.setObjectInspector(oi);
+    writable2.setObjectInspector(oi);
+    writable3.setObjectInspector(oi);
+    writable4.setObjectInspector(oi);
+    writable1.setSerde(serde);
+    writable2.setSerde(serde);
+    writable3.setSerde(serde);
+    writable4.setSerde(serde);
+    
+    assertTrue(writable1.compareTo(writable2) < 0);
+    assertTrue(writable2.compareTo(writable3) < 0);
+    assertTrue(writable1.compareTo(writable3) < 0);
+    assertTrue(writable3.compareTo(writable4) < 0);
+  }
+
+}


Mime
View raw message