incubator-crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject [5/33] CRUNCH-8: Moving the code into multiple Maven modules. Contributed by Matthias Friedrich
Date Wed, 11 Jul 2012 05:14:45 GMT
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java
new file mode 100644
index 0000000..6b49735
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.run;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import org.apache.crunch.io.impl.InputBundle;
+import com.google.common.collect.Lists;
+
+public class CrunchInputFormat<K, V> extends InputFormat<K, V> {
+
+  @Override
+  public List<InputSplit> getSplits(JobContext job) throws IOException,
+      InterruptedException {
+    List<InputSplit> splits = Lists.newArrayList();
+    Configuration conf = job.getConfiguration();
+    Map<InputBundle, Map<Integer, List<Path>>> formatNodeMap = CrunchInputs.getFormatNodeMap(job);
+
+    // First, build a map of InputFormats to Paths
+    for (Map.Entry<InputBundle, Map<Integer, List<Path>>> entry : formatNodeMap.entrySet()) {
+      InputBundle inputBundle = entry.getKey();
+      Job jobCopy = new Job(conf);
+      InputFormat<?,?> format = (InputFormat<?,?>) ReflectionUtils.newInstance(
+          inputBundle.getInputFormatClass(), jobCopy.getConfiguration());
+      for (Map.Entry<Integer, List<Path>> nodeEntry : entry.getValue()
+          .entrySet()) {
+        Integer nodeIndex = nodeEntry.getKey();
+        List<Path> paths = nodeEntry.getValue();
+        FileInputFormat.setInputPaths(jobCopy, paths.toArray(new Path[paths.size()]));
+
+        // Get splits for each input path and tag with InputFormat
+        // and Mapper types by wrapping in a TaggedInputSplit.
+        List<InputSplit> pathSplits = format.getSplits(jobCopy);
+        for (InputSplit pathSplit : pathSplits) {
+          splits.add(new CrunchInputSplit(pathSplit, inputBundle.getInputFormatClass(),
+              inputBundle.getExtraConfiguration(), nodeIndex, jobCopy.getConfiguration()));
+        }
+      }
+    }
+    return splits;
+  }
+
+  @Override
+  public RecordReader<K, V> createRecordReader(InputSplit inputSplit,
+      TaskAttemptContext context) throws IOException, InterruptedException {
+    return new CrunchRecordReader<K,V>(inputSplit, context);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java
new file mode 100644
index 0000000..b57ca58
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.run;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * An {@link InputSplit} that wraps the split of a delegate input format and
+ * carries along the delegate format class, extra configuration entries and the
+ * index of the node that should process the split's records.
+ *
+ * <p>The wire format is: node index, number of extra configuration entries,
+ * the entries as key/value UTF strings, the input format class name, the
+ * wrapped split's class name, and finally the wrapped split itself as written
+ * by the serializer obtained from {@link SerializationFactory}.
+ */
+public class CrunchInputSplit extends InputSplit implements Configurable, Writable {
+
+  private InputSplit inputSplit;
+  private Class<? extends InputFormat> inputFormatClass;
+  private Map<String, String> extraConf;
+  private int nodeIndex;
+  private Configuration conf;
+
+  /**
+   * Creates an empty split whose state is expected to be filled in by
+   * {@link #setConf(Configuration)} followed by {@link #readFields(DataInput)}.
+   */
+  public CrunchInputSplit() {
+    // default constructor
+  }
+
+  /**
+   * @param inputSplit the delegate split being wrapped
+   * @param inputFormatClass the input format that produced {@code inputSplit}
+   * @param extraConf extra configuration entries serialized with the split; may
+   *          be {@code null}, which is treated as empty
+   * @param nodeIndex index of the node that consumes this split
+   * @param conf configuration used to look up serializers and classes
+   */
+  public CrunchInputSplit(InputSplit inputSplit,
+      Class<? extends InputFormat> inputFormatClass, Map<String, String> extraConf,
+      int nodeIndex, Configuration conf) {
+    this.inputSplit = inputSplit;
+    this.inputFormatClass = inputFormatClass;
+    this.extraConf = extraConf;
+    this.nodeIndex = nodeIndex;
+    this.conf = conf;
+  }
+
+  /** @return the index of the node that consumes this split */
+  public int getNodeIndex() {
+    return nodeIndex;
+  }
+
+  /** @return the wrapped delegate split */
+  public InputSplit getInputSplit() {
+    return inputSplit;
+  }
+
+  /** @return the input format class that produced the wrapped split */
+  public Class<? extends InputFormat> getInputFormatClass() {
+    return inputFormatClass;
+  }
+
+  @Override
+  public long getLength() throws IOException, InterruptedException {
+    return inputSplit.getLength();
+  }
+
+  @Override
+  public String[] getLocations() throws IOException, InterruptedException {
+    return inputSplit.getLocations();
+  }
+
+  /**
+   * Restores this split from {@code in}, reading the fields in the order
+   * {@link #write(DataOutput)} emits them. Each extra configuration entry is
+   * applied to the current {@link Configuration} and also remembered, so that
+   * the restored split can be written again.
+   *
+   * @param in the stream to read from; it is cast to {@link DataInputStream}
+   *          for the delegate deserializer
+   * @throws IOException if reading from {@code in} fails
+   */
+  @SuppressWarnings("unchecked")
+  public void readFields(DataInput in) throws IOException {
+    nodeIndex = in.readInt();
+    int extraConfSize = in.readInt();
+    // Insertion-ordered so that a later write() emits the entries in the order
+    // they were read.
+    Map<String, String> restored = new java.util.LinkedHashMap<String, String>();
+    for (int i = 0; i < extraConfSize; i++) {
+      String key = in.readUTF();
+      String value = in.readUTF();
+      restored.put(key, value);
+      conf.set(key, value);
+    }
+    extraConf = restored;
+    inputFormatClass = (Class<? extends InputFormat<?,?>>) readClass(in);
+    Class<? extends InputSplit> inputSplitClass = (Class<? extends InputSplit>) readClass(in);
+    inputSplit = (InputSplit) ReflectionUtils
+        .newInstance(inputSplitClass, conf);
+    SerializationFactory factory = new SerializationFactory(conf);
+    Deserializer deserializer = factory.getDeserializer(inputSplitClass);
+    // NOTE(review): the deserializer is never closed, presumably because the
+    // stream belongs to the caller - confirm.
+    deserializer.open((DataInputStream) in);
+    inputSplit = (InputSplit) deserializer.deserialize(inputSplit);
+  }
+
+  /**
+   * Reads a class name written with {@link Text#writeString} and resolves it
+   * through the current configuration.
+   *
+   * @throws RuntimeException if the class cannot be found
+   */
+  private Class<?> readClass(DataInput in) throws IOException {
+    String className = Text.readString(in);
+    try {
+      return conf.getClassByName(className);
+    } catch (ClassNotFoundException e) {
+      throw new RuntimeException("readObject can't find class", e);
+    }
+  }
+
+  /**
+   * Serializes this split to {@code out}. A {@code null} extra configuration is
+   * written as zero entries.
+   *
+   * @param out the stream to write to; it is cast to {@link DataOutputStream}
+   *          for the delegate serializer
+   * @throws IOException if writing to {@code out} fails
+   */
+  @SuppressWarnings("unchecked")
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(nodeIndex);
+    if (extraConf == null) {
+      out.writeInt(0);
+    } else {
+      out.writeInt(extraConf.size());
+      for (Map.Entry<String, String> e : extraConf.entrySet()) {
+        out.writeUTF(e.getKey());
+        out.writeUTF(e.getValue());
+      }
+    }
+    Text.writeString(out, inputFormatClass.getName());
+    Text.writeString(out, inputSplit.getClass().getName());
+    SerializationFactory factory = new SerializationFactory(conf);
+    Serializer serializer = factory.getSerializer(inputSplit.getClass());
+    // NOTE(review): the serializer is never closed, presumably because the
+    // stream belongs to the caller - confirm.
+    serializer.open((DataOutputStream) out);
+    serializer.serialize(inputSplit);
+  }
+
+  public Configuration getConf() {
+    return conf;
+  }
+
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputs.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputs.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputs.java
new file mode 100644
index 0000000..8fa1d56
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputs.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.run;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+
+import org.apache.crunch.io.impl.InputBundle;
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * Helpers for recording several inputs in a job configuration and reading
+ * them back.
+ *
+ * <p>Every input is stored as three {@code ';'}-separated fields (serialized
+ * input bundle, node index, path) and the inputs are concatenated with
+ * {@code ','} under {@link RuntimeParameters#MULTI_INPUTS}. The path is joined
+ * without escaping, so a path containing either separator character will not
+ * be read back intact.
+ */
+public class CrunchInputs {
+
+  private static final char RECORD_SEP = ',';
+  private static final char FIELD_SEP = ';';
+  private static final Joiner JOINER = Joiner.on(FIELD_SEP);
+  private static final Splitter SPLITTER = Splitter.on(FIELD_SEP);
+  private static final Splitter RECORD_SPLITTER = Splitter.on(RECORD_SEP);
+
+  /**
+   * Appends an input to the list of inputs stored in the job's configuration.
+   *
+   * @param job the job whose configuration is updated
+   * @param path the path to read
+   * @param inputBundle describes how {@code path} is read
+   * @param nodeIndex index of the node that consumes the input
+   */
+  public static void addInputPath(Job job, Path path,
+      InputBundle inputBundle, int nodeIndex) {
+    Configuration conf = job.getConfiguration();
+    String inputs = JOINER.join(inputBundle.serialize(), String.valueOf(nodeIndex), path.toString());
+    String existing = conf.get(RuntimeParameters.MULTI_INPUTS);
+    conf.set(RuntimeParameters.MULTI_INPUTS, existing == null ? inputs : existing + RECORD_SEP
+        + inputs);
+  }
+
+  /**
+   * Parses the inputs stored by {@link #addInputPath} and groups their paths
+   * by input bundle and then by node index.
+   *
+   * @param job the job whose configuration is read
+   * @return the registered paths; an empty map when the configuration holds no
+   *         inputs
+   */
+  public static Map<InputBundle, Map<Integer, List<Path>>> getFormatNodeMap(
+      JobContext job) {
+    Map<InputBundle, Map<Integer, List<Path>>> formatNodeMap = Maps.newHashMap();
+    Configuration conf = job.getConfiguration();
+    String serialized = conf.get(RuntimeParameters.MULTI_INPUTS);
+    if (serialized == null || serialized.length() == 0) {
+      // Nothing was registered; splitting a null value would throw.
+      return formatNodeMap;
+    }
+    for (String input : RECORD_SPLITTER.split(serialized)) {
+      List<String> fields = Lists.newArrayList(SPLITTER.split(input));
+      InputBundle inputBundle = InputBundle.fromSerialized(fields.get(0));
+      Map<Integer, List<Path>> nodeMap = formatNodeMap.get(inputBundle);
+      if (nodeMap == null) {
+        nodeMap = Maps.newHashMap();
+        formatNodeMap.put(inputBundle, nodeMap);
+      }
+      Integer nodeIndex = Integer.valueOf(fields.get(1));
+      List<Path> paths = nodeMap.get(nodeIndex);
+      if (paths == null) {
+        paths = Lists.newLinkedList();
+        nodeMap.put(nodeIndex, paths);
+      }
+      paths.add(new Path(fields.get(2)));
+    }
+    return formatNodeMap;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchMapper.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchMapper.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchMapper.java
new file mode 100644
index 0000000..814c8c3
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchMapper.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.run;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.Mapper;
+
+/**
+ * The {@link Mapper} that runs the map-side {@link RTNode} of a job. The
+ * nodes are loaded through a {@link CrunchTaskContext}; when more than one
+ * node is present, the node index carried by the {@link CrunchInputSplit}
+ * selects the node for this task.
+ */
+public class CrunchMapper extends Mapper<Object, Object, Object, Object> {
+
+  private static final Log LOG = LogFactory.getLog(CrunchMapper.class);
+
+  private RTNode node;
+  private CrunchTaskContext ctxt;
+  private boolean debug;
+
+  /**
+   * Loads the map-side nodes and selects the one this task runs.
+   *
+   * @throws CrunchRuntimeException if the nodes cannot be read
+   */
+  @Override
+  protected void setup(Mapper<Object, Object, Object, Object>.Context context) {
+    List<RTNode> nodes;
+    this.ctxt = new CrunchTaskContext(context, NodeContext.MAP);
+    try {
+      nodes = ctxt.getNodes();
+    } catch (IOException e) {
+      LOG.info("Crunch deserialization error", e);
+      throw new CrunchRuntimeException(e);
+    }
+    if (nodes.size() == 1) {
+      this.node = nodes.get(0);
+    } else {
+      CrunchInputSplit split = (CrunchInputSplit) context.getInputSplit();
+      this.node = nodes.get(split.getNodeIndex());
+    }
+    this.debug = ctxt.isDebugRun();
+  }
+
+  /**
+   * Feeds one record to the node. In a debug run, exceptions raised while
+   * processing are logged and suppressed; otherwise they propagate.
+   */
+  @Override
+  protected void map(Object k, Object v,
+      Mapper<Object, Object, Object, Object>.Context context) {
+    if (debug) {
+      try {
+        node.process(k, v);
+      } catch (Exception e) {
+        LOG.error("Mapper exception", e);
+      }
+    } else {
+      node.process(k, v);
+    }
+  }
+
+  /**
+   * Cleans up the node and then the task context. The task context is cleaned
+   * up even when the node's cleanup throws, so that its outputs are still
+   * closed.
+   */
+  @Override
+  protected void cleanup(Mapper<Object, Object, Object, Object>.Context context) {
+    try {
+      node.cleanup();
+    } finally {
+      ctxt.cleanup();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java
new file mode 100644
index 0000000..5967aa9
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.run;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * A {@link RecordReader} that forwards every call to the reader created by
+ * the input format recorded in a {@link CrunchInputSplit}.
+ */
+class CrunchRecordReader<K, V> extends RecordReader<K, V> {
+
+  private final RecordReader<K, V> delegate;
+
+  /**
+   * Instantiates the split's input format and asks it for the reader this
+   * object delegates to.
+   *
+   * @param inputSplit must be a {@link CrunchInputSplit}
+   * @param context supplies the task attempt id for the delegate's context
+   */
+  public CrunchRecordReader(InputSplit inputSplit, final TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    CrunchInputSplit wrapped = (CrunchInputSplit) inputSplit;
+    InputFormat<K, V> format = (InputFormat<K, V>) ReflectionUtils.newInstance(
+        wrapped.getInputFormatClass(), wrapped.getConf());
+    InputSplit rawSplit = wrapped.getInputSplit();
+    this.delegate = format.createRecordReader(rawSplit, delegateContext(wrapped, context));
+  }
+
+  /**
+   * Builds the context handed to the delegate from the split's configuration
+   * and the id of the running task attempt.
+   */
+  private static TaskAttemptContext delegateContext(CrunchInputSplit wrapped,
+      TaskAttemptContext context) {
+    return TaskAttemptContextFactory.create(wrapped.getConf(), context.getTaskAttemptID());
+  }
+
+  /** Initializes the delegate with the wrapped split rather than the wrapper. */
+  @Override
+  public void initialize(InputSplit inputSplit, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    CrunchInputSplit wrapped = (CrunchInputSplit) inputSplit;
+    InputSplit rawSplit = wrapped.getInputSplit();
+    delegate.initialize(rawSplit, delegateContext(wrapped, context));
+  }
+
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    return delegate.nextKeyValue();
+  }
+
+  @Override
+  public K getCurrentKey() throws IOException, InterruptedException {
+    return delegate.getCurrentKey();
+  }
+
+  @Override
+  public V getCurrentValue() throws IOException, InterruptedException {
+    return delegate.getCurrentValue();
+  }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+    return delegate.getProgress();
+  }
+
+  @Override
+  public void close() throws IOException {
+    delegate.close();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchReducer.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchReducer.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchReducer.java
new file mode 100644
index 0000000..aa5fc95
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchReducer.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.run;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.Reducer;
+
+/**
+ * The {@link Reducer} that runs the first {@link RTNode} loaded for the node
+ * context returned by {@link #getNodeContext()}.
+ */
+public class CrunchReducer extends Reducer<Object, Object, Object, Object> {
+
+  private static final Log LOG = LogFactory.getLog(CrunchReducer.class);
+
+  private RTNode node;
+  private CrunchTaskContext ctxt;
+  private boolean debug;
+
+  /**
+   * @return the node context whose nodes this task loads;
+   *         {@link NodeContext#REDUCE} unless overridden
+   */
+  protected NodeContext getNodeContext() {
+    return NodeContext.REDUCE;
+  }
+
+  /**
+   * Loads the nodes for this task's node context and keeps the first one.
+   *
+   * @throws CrunchRuntimeException if the nodes cannot be read
+   */
+  @Override
+  protected void setup(Reducer<Object, Object, Object, Object>.Context context) {
+    this.ctxt = new CrunchTaskContext(context, getNodeContext());
+    try {
+      List<RTNode> nodes = ctxt.getNodes();
+      this.node = nodes.get(0);
+    } catch (IOException e) {
+      LOG.info("Crunch deserialization error", e);
+      throw new CrunchRuntimeException(e);
+    }
+    this.debug = ctxt.isDebugRun();
+  }
+
+  /**
+   * Feeds one key and its values to the node. In a debug run, exceptions
+   * raised while processing are logged and suppressed; otherwise they
+   * propagate.
+   */
+  @Override
+  protected void reduce(Object key, Iterable<Object> values,
+      Reducer<Object, Object, Object, Object>.Context context) {
+    if (debug) {
+      try {
+        node.processIterable(key, values);
+      } catch (Exception e) {
+        LOG.error("Reducer exception", e);
+      }
+    } else {
+      node.processIterable(key, values);
+    }
+  }
+
+  /**
+   * Cleans up the node and then the task context. The task context is cleaned
+   * up even when the node's cleanup throws, so that its outputs are still
+   * closed.
+   */
+  @Override
+  protected void cleanup(Reducer<Object, Object, Object, Object>.Context context) {
+    try {
+      node.cleanup();
+    } finally {
+      ctxt.cleanup();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchRuntimeException.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchRuntimeException.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchRuntimeException.java
new file mode 100644
index 0000000..9eac51c
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchRuntimeException.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.run;
+
+/**
+ * Unchecked exception raised by the Crunch runtime. It carries a flag that
+ * records whether the exception has already been logged, so that code further
+ * up the call chain can avoid logging it a second time.
+ */
+public class CrunchRuntimeException extends RuntimeException {
+
+  /** Set once {@link #markLogged()} has been called; starts out false. */
+  private boolean logged;
+
+  /** Creates an exception that has a message but no cause. */
+  public CrunchRuntimeException(String msg) {
+    super(msg);
+  }
+
+  /** Creates an exception that wraps {@code e} as its cause. */
+  public CrunchRuntimeException(Exception e) {
+    super(e);
+  }
+
+  /** Creates an exception with both a message and a cause. */
+  public CrunchRuntimeException(String msg, Exception e) {
+    super(msg, e);
+  }
+
+  /** Records that this exception has been logged. */
+  public void markLogged() {
+    logged = true;
+  }
+
+  /** @return {@code true} once {@link #markLogged()} has been called */
+  public boolean wasLogged() {
+    return this.logged;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java
new file mode 100644
index 0000000..dd04813
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.run;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.hadoop.mapreduce.lib.output.CrunchMultipleOutputs;
+
+import org.apache.crunch.impl.mr.plan.PlanningParameters;
+import org.apache.crunch.util.DistCache;
+
+/**
+ * Per-task state shared by the {@link RTNode}s of a running map or reduce
+ * task: the Hadoop task context, the {@link NodeContext} the task runs in, and
+ * a lazily created {@link CrunchMultipleOutputs}.
+ */
+public class CrunchTaskContext {
+
+  private final TaskInputOutputContext<Object, Object, Object, Object> taskContext;
+  private final NodeContext nodeContext;
+  private CrunchMultipleOutputs<Object, Object> multipleOutputs;
+
+  /**
+   * @param taskContext the Hadoop context of the running task
+   * @param nodeContext the node context the task runs in
+   */
+  public CrunchTaskContext(
+      TaskInputOutputContext<Object, Object, Object, Object> taskContext,
+      NodeContext nodeContext) {
+    this.taskContext = taskContext;
+    this.nodeContext = nodeContext;
+  }
+
+  /** @return the Hadoop context of the running task */
+  public TaskInputOutputContext<Object, Object, Object, Object> getContext() {
+    return taskContext;
+  }
+
+  /** @return the node context the task runs in */
+  public NodeContext getNodeContext() {
+    return nodeContext;
+  }
+
+  /**
+   * Reads the nodes stored for this node context under the Crunch working
+   * directory and initializes each of them with this context.
+   *
+   * @return the nodes as returned by {@link DistCache#read}; {@code null} is
+   *         passed through unchanged
+   * @throws IOException if the nodes cannot be read
+   */
+  public List<RTNode> getNodes() throws IOException {
+    Configuration conf = taskContext.getConfiguration();
+    Path path = new Path(new Path(conf.get(PlanningParameters.CRUNCH_WORKING_DIRECTORY)), nodeContext.toString());
+    @SuppressWarnings("unchecked")
+    List<RTNode> nodes = (List<RTNode>) DistCache.read(conf, path);
+    if (nodes != null) {
+      for (RTNode node : nodes) {
+        node.initialize(this);
+      }
+    }
+    return nodes;
+  }
+
+  /**
+   * @return the value of {@link RuntimeParameters#DEBUG} in the task
+   *         configuration, {@code false} when it is not set
+   */
+  public boolean isDebugRun() {
+    Configuration conf = taskContext.getConfiguration();
+    return conf.getBoolean(RuntimeParameters.DEBUG, false);
+  }
+
+  /**
+   * Closes the multiple outputs if they were ever requested.
+   *
+   * @throws CrunchRuntimeException if closing fails or is interrupted; in the
+   *           latter case the thread's interrupt status is restored first
+   */
+  public void cleanup() {
+    if (multipleOutputs != null) {
+      try {
+        multipleOutputs.close();
+      } catch (IOException e) {
+        throw new CrunchRuntimeException(e);
+      } catch (InterruptedException e) {
+        // Wrapping the exception would otherwise lose the fact that this
+        // thread was interrupted.
+        Thread.currentThread().interrupt();
+        throw new CrunchRuntimeException(e);
+      }
+    }
+  }
+
+  /**
+   * @return the multiple outputs for this task, created on first use
+   */
+  public CrunchMultipleOutputs<Object, Object> getMultipleOutputs() {
+    if (multipleOutputs == null) {
+      multipleOutputs = new CrunchMultipleOutputs<Object, Object>(taskContext);
+    }
+    return multipleOutputs;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/impl/mr/run/NodeContext.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/NodeContext.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/NodeContext.java
new file mode 100644
index 0000000..648e87c
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/run/NodeContext.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.run;
+
+import org.apache.crunch.impl.mr.plan.DoNode;
+
+/**
+ * Enum that is associated with a serialized {@link DoNode} instance, so we know
+ * how to use it within the context of a particular MR job.
+ */
+public enum NodeContext {
+  MAP, REDUCE, COMBINE;
+
+  /**
+   * Returns the configuration key for this context: {@code "crunch.donode."}
+   * followed by the lower-cased constant name.
+   *
+   * <p>The name is lower-cased with a fixed locale so that the key does not
+   * depend on the JVM's default locale (under a Turkish default locale,
+   * {@code "COMBINE"} would otherwise lower-case to a dotless i).
+   *
+   * @return the configuration key, e.g. {@code "crunch.donode.map"}
+   */
+  public String getConfigurationKey() {
+    return "crunch.donode." + name().toLowerCase(java.util.Locale.ENGLISH);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java
new file mode 100644
index 0000000..4debfc1
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.run;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.impl.mr.emit.IntermediateEmitter;
+import org.apache.crunch.impl.mr.emit.MultipleOutputEmitter;
+import org.apache.crunch.impl.mr.emit.OutputEmitter;
+import org.apache.crunch.types.Converter;
+
+/**
+ * A serializable runtime node: a {@link DoFn} together with the child nodes
+ * or output that receive what the function emits.
+ */
+public class RTNode implements Serializable {
+
+  private static final Log LOG = LogFactory.getLog(RTNode.class);
+
+  private final String nodeName;
+  private DoFn<Object, Object> fn;
+  private final List<RTNode> children;
+  private final Converter inputConverter;
+  private final Converter outputConverter;
+  private final String outputName;
+
+  // Created by initialize(); not part of the serialized form.
+  private transient Emitter<Object> emitter;
+
+  /**
+   * @param fn the function this node applies to its input
+   * @param name the node's name, used in log and error messages
+   * @param children nodes that receive this node's output when it has no
+   *          output converter
+   * @param inputConverter converts raw key/value input into the function's
+   *          input
+   * @param outputConverter when non-null, this node writes job output through
+   *          it instead of feeding children
+   * @param outputName when non-null (and an output converter is set), output
+   *          goes to the named multiple output
+   */
+  public RTNode(DoFn<Object, Object> fn, String name, List<RTNode> children,
+      Converter inputConverter, Converter outputConverter, String outputName) {
+    this.fn = fn;
+    this.nodeName = name;
+    this.children = children;
+    this.inputConverter = inputConverter;
+    this.outputConverter = outputConverter;
+    this.outputName = outputName;
+  }
+
+  /**
+   * Prepares this node and its children for processing and selects the
+   * emitter: a multiple-output or plain output emitter when an output
+   * converter is set, otherwise an intermediate emitter feeding the children.
+   * Calling it again after the emitter exists is a no-op.
+   *
+   * @throws CrunchRuntimeException if the node has neither an output converter
+   *           nor children
+   */
+  public void initialize(CrunchTaskContext ctxt) {
+    if (emitter != null) {
+      // Already initialized
+      return;
+    }
+
+    fn.setContext(ctxt.getContext());
+    for (RTNode child : children) {
+      child.initialize(ctxt);
+    }
+
+    if (outputConverter != null) {
+      if (outputName != null) {
+        this.emitter = new MultipleOutputEmitter(
+            outputConverter, ctxt.getMultipleOutputs(), outputName);
+      } else {
+        this.emitter = new OutputEmitter(
+            outputConverter, ctxt.getContext());
+      }
+    } else if (!children.isEmpty()) {
+      this.emitter = new IntermediateEmitter(children);
+    } else {
+      throw new CrunchRuntimeException("Invalid RTNode config: no emitter for: " + nodeName);
+    }
+  }
+
+  /** @return true when this node has an output converter and no children */
+  public boolean isLeafNode() {
+    return outputConverter != null && children.isEmpty();
+  }
+
+  /**
+   * Applies the function to {@code input}. A {@link CrunchRuntimeException}
+   * that has not been logged yet is logged with the node name and the input,
+   * marked as logged, and rethrown.
+   */
+  public void process(Object input) {
+    try {
+      fn.process(input, emitter);
+    } catch (CrunchRuntimeException e) {
+      if (!e.wasLogged()) {
+        // String.valueOf keeps a null input from raising a NullPointerException
+        // here, which would hide the exception being reported.
+        LOG.info(String.format("Crunch exception in '%s' for input: %s",
+            nodeName, String.valueOf(input)), e);
+        e.markLogged();
+      }
+      throw e;
+    }
+  }
+
+  /** Converts a key/value pair with the input converter and processes it. */
+  public void process(Object key, Object value) {
+    process(inputConverter.convertInput(key, value));
+  }
+
+  /** Converts a key and its values with the input converter and processes them. */
+  public void processIterable(Object key, Iterable values) {
+    process(inputConverter.convertIterableInput(key, values));
+  }
+
+  /**
+   * Lets the function finish, flushes the emitter, and then cleans up the
+   * children in order.
+   */
+  public void cleanup() {
+    fn.cleanup(emitter);
+    emitter.flush();
+    for (RTNode child : children) {
+      child.cleanup();
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "RTNode [nodeName=" + nodeName + ", fn=" + fn + ", children="
+        + children + ", inputConverter=" + inputConverter
+        + ", outputConverter=" + outputConverter + ", outputName=" + outputName
+        + "]";
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
new file mode 100644
index 0000000..5e534eb
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.run;
+
+/**
+ * Names of the parameters used during runtime execution. The class only
+ * holds string constants and cannot be instantiated.
+ */
+public class RuntimeParameters {
+
+  // Constants holder: no instances.
+  private RuntimeParameters() {}
+
+  /** Parameter name {@code crunch.debug}. */
+  public static final String DEBUG = "crunch.debug";
+
+  /** Parameter name {@code crunch.inputs.dir}. */
+  public static final String MULTI_INPUTS = "crunch.inputs.dir";
+
+  /** Parameter name {@code crunch.aggregator.buckets}. */
+  public static final String AGGREGATOR_BUCKETS = "crunch.aggregator.buckets";
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/impl/mr/run/TaskAttemptContextFactory.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/TaskAttemptContextFactory.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/TaskAttemptContextFactory.java
new file mode 100644
index 0000000..2cfa615
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/run/TaskAttemptContextFactory.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.run;
+
+import java.lang.reflect.Constructor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+/**
+ *
+ */
+@SuppressWarnings("unchecked")
+public class TaskAttemptContextFactory {
+
+  private static final Log LOG = LogFactory.getLog(TaskAttemptContextFactory.class);
+
+  private static final TaskAttemptContextFactory INSTANCE = new TaskAttemptContextFactory();
+  
+  public static TaskAttemptContext create(Configuration conf, TaskAttemptID taskAttemptId) {
+    return INSTANCE.createInternal(conf, taskAttemptId);
+  }
+  
+  private Constructor taskAttemptConstructor;
+  
+  private TaskAttemptContextFactory() {
+    Class implClass = TaskAttemptContext.class;
+    if (implClass.isInterface()) {
+      try {
+        implClass = Class.forName("org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl");
+      } catch (ClassNotFoundException e) {
+        LOG.fatal("Could not find TaskAttemptContextImpl class, exiting", e);
+      }
+    }
+    try {
+      this.taskAttemptConstructor = implClass.getConstructor(Configuration.class, TaskAttemptID.class);
+    } catch (Exception e) {
+      LOG.fatal("Could not access TaskAttemptContext constructor, exiting", e);
+    }
+  }
+  
+  private TaskAttemptContext createInternal(Configuration conf, TaskAttemptID taskAttemptId) {
+    try {
+      return (TaskAttemptContext) taskAttemptConstructor.newInstance(conf, taskAttemptId);
+    } catch (Exception e) {
+      LOG.error("Could not construct a TaskAttemptContext instance", e);
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/io/At.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/At.java b/crunch/src/main/java/org/apache/crunch/io/At.java
new file mode 100644
index 0000000..2f5fe8b
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/io/At.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.Scan;
+
+import org.apache.crunch.SourceTarget;
+import org.apache.crunch.io.avro.AvroFileSourceTarget;
+import org.apache.crunch.io.hbase.HBaseSourceTarget;
+import org.apache.crunch.io.seq.SeqFileSourceTarget;
+import org.apache.crunch.io.seq.SeqFileTableSourceTarget;
+import org.apache.crunch.io.text.TextFileSourceTarget;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.crunch.types.writable.Writables;
+
+/**
+ * Static factory methods for creating various {@link SourceTarget} types.
+ * Each {@code String} overload wraps the path name in a {@link Path} and
+ * delegates to the matching {@code Path} overload.
+ */
+public class At {
+
+  /** Returns an Avro file source/target for the path named {@code pathName}. */
+  public static <T> AvroFileSourceTarget<T> avroFile(String pathName, AvroType<T> avroType) {
+    Path path = new Path(pathName);
+    return avroFile(path, avroType);
+  }
+
+  /** Returns an Avro file source/target for {@code path} with the given Avro type. */
+  public static <T> AvroFileSourceTarget<T> avroFile(Path path, AvroType<T> avroType) {
+    return new AvroFileSourceTarget<T>(path, avroType);
+  }
+
+  /** Returns an HBase source/target for {@code table} using a newly created {@link Scan}. */
+  public static HBaseSourceTarget hbaseTable(String table) {
+    Scan scan = new Scan();
+    return hbaseTable(table, scan);
+  }
+
+  /** Returns an HBase source/target for {@code table} using the given scan. */
+  public static HBaseSourceTarget hbaseTable(String table, Scan scan) {
+    return new HBaseSourceTarget(table, scan);
+  }
+
+  /** Returns a sequence file source/target for the path named {@code pathName}. */
+  public static <T> SeqFileSourceTarget<T> sequenceFile(String pathName, PType<T> ptype) {
+    Path path = new Path(pathName);
+    return sequenceFile(path, ptype);
+  }
+
+  /** Returns a sequence file source/target for {@code path} with the given type. */
+  public static <T> SeqFileSourceTarget<T> sequenceFile(Path path, PType<T> ptype) {
+    return new SeqFileSourceTarget<T>(path, ptype);
+  }
+
+  /** Returns a table sequence file source/target for the path named {@code pathName}. */
+  public static <K, V> SeqFileTableSourceTarget<K, V> sequenceFile(String pathName, PType<K> keyType,
+      PType<V> valueType) {
+    Path path = new Path(pathName);
+    return sequenceFile(path, keyType, valueType);
+  }
+
+  /**
+   * Returns a table sequence file source/target for {@code path}. The table
+   * type is obtained from the type family of {@code keyType}.
+   */
+  public static <K, V> SeqFileTableSourceTarget<K, V> sequenceFile(Path path, PType<K> keyType,
+      PType<V> valueType) {
+    return new SeqFileTableSourceTarget<K, V>(path,
+        keyType.getFamily().tableOf(keyType, valueType));
+  }
+
+  /** Returns a text file source/target of strings for the path named {@code pathName}. */
+  public static TextFileSourceTarget<String> textFile(String pathName) {
+    Path path = new Path(pathName);
+    return textFile(path);
+  }
+
+  /** Returns a text file source/target for {@code path} typed with {@code Writables.strings()}. */
+  public static TextFileSourceTarget<String> textFile(Path path) {
+    return textFile(path, Writables.strings());
+  }
+
+  /** Returns a text file source/target for the path named {@code pathName} with the given type. */
+  public static <T> TextFileSourceTarget<T> textFile(String pathName, PType<T> ptype) {
+    Path path = new Path(pathName);
+    return textFile(path, ptype);
+  }
+
+  /** Returns a text file source/target for {@code path} with the given type. */
+  public static <T> TextFileSourceTarget<T> textFile(Path path, PType<T> ptype) {
+    return new TextFileSourceTarget<T>(path, ptype);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/io/CompositePathIterable.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/CompositePathIterable.java b/crunch/src/main/java/org/apache/crunch/io/CompositePathIterable.java
new file mode 100644
index 0000000..0e4014a
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/io/CompositePathIterable.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+
+import com.google.common.collect.UnmodifiableIterator;
+
+/**
+ * An {@link Iterable} that chains together the values read from every entry
+ * listed under a path, reading each entry through a
+ * {@link FileReaderFactory}.
+ *
+ * @param <T> the type of the values produced by the reader factory
+ */
+public class CompositePathIterable<T> implements Iterable<T> {
+
+  private final FileStatus[] stati;
+  private final FileSystem fs;
+  private final FileReaderFactory<T> readerFactory;
+
+  /** Rejects entries whose name starts with an underscore. */
+  private static final PathFilter FILTER = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      return !path.getName().startsWith("_");
+    }
+  };
+
+  /**
+   * Builds an iterable over the values of all accepted entries listed under
+   * {@code path}.
+   *
+   * @param fs the file system to list and read from
+   * @param path the path whose entries are read
+   * @param readerFactory creates the per-entry iterators
+   * @return an empty list when no entry passes the filter, otherwise a
+   *         {@code CompositePathIterable} over the accepted entries
+   * @throws IOException if {@code path} does not exist or its listing is
+   *         unavailable
+   */
+  public static <S> Iterable<S> create(FileSystem fs, Path path, FileReaderFactory<S> readerFactory) throws IOException {
+
+    if (!fs.exists(path)) {
+      throw new IOException("No files found to materialize at: " + path);
+    }
+
+    FileStatus[] stati = null;
+    try {
+      stati = fs.listStatus(path, FILTER);
+    } catch (FileNotFoundException e) {
+      // Handled together with a null listing just below.
+      stati = null;
+    }
+    if (stati == null) {
+      throw new IOException("No files found to materialize at: " + path);
+    }
+
+    if (stati.length == 0) {
+      return Collections.emptyList();
+    } else {
+      return new CompositePathIterable<S>(stati, fs, readerFactory);
+    }
+  }
+
+  private CompositePathIterable(FileStatus[] stati, FileSystem fs, FileReaderFactory<T> readerFactory) {
+    this.stati = stati;
+    this.fs = fs;
+    this.readerFactory = readerFactory;
+  }
+
+  /**
+   * Returns an iterator that walks the entries in listing order, opening the
+   * reader for each entry only when the previous one is exhausted.
+   */
+  @Override
+  public Iterator<T> iterator() {
+
+    return new UnmodifiableIterator<T>() {
+      // create() never builds an instance from an empty listing, so the
+      // first entry is always present.
+      private int index = 0;
+      private Iterator<T> iter = readerFactory.read(fs, stati[index++].getPath());
+
+      @Override
+      public boolean hasNext() {
+        // Skip over exhausted (or empty) entries until one yields a value.
+        while (!iter.hasNext()) {
+          if (index >= stati.length) {
+            return false;
+          }
+          iter = readerFactory.read(fs, stati[index++].getPath());
+        }
+        return true;
+      }
+
+      @Override
+      public T next() {
+        // Advance across entry boundaries here as well, so callers that
+        // invoke next() without a preceding hasNext() still see every value.
+        if (!hasNext()) {
+          throw new java.util.NoSuchElementException();
+        }
+        return iter.next();
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/io/FileReaderFactory.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/FileReaderFactory.java b/crunch/src/main/java/org/apache/crunch/io/FileReaderFactory.java
new file mode 100644
index 0000000..5cccb7b
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/io/FileReaderFactory.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import java.util.Iterator;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Produces iterators over the values of type {@code T} read from a path on a
+ * file system.
+ *
+ * @param <T> the type of the values returned by the iterator
+ */
+public interface FileReaderFactory<T> {
+  /**
+   * @param fs the file system to read from
+   * @param path the path to read
+   * @return an iterator over the values read; presumably the contents of
+   *         the file at {@code path} (confirm against implementations)
+   */
+  Iterator<T> read(FileSystem fs, Path path);
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/io/From.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/From.java b/crunch/src/main/java/org/apache/crunch/io/From.java
new file mode 100644
index 0000000..175c316
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/io/From.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+import org.apache.crunch.Source;
+import org.apache.crunch.TableSource;
+import org.apache.crunch.io.avro.AvroFileSource;
+import org.apache.crunch.io.hbase.HBaseSourceTarget;
+import org.apache.crunch.io.impl.FileTableSourceImpl;
+import org.apache.crunch.io.seq.SeqFileSource;
+import org.apache.crunch.io.seq.SeqFileTableSourceTarget;
+import org.apache.crunch.io.text.TextFileSource;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.crunch.types.writable.Writables;
+
+/**
+ * Static factory methods for creating various {@link Source} types.
+ *
+ */
+public class From {
+
+  public static <K, V> TableSource<K, V> formattedFile(String path,
+      Class<? extends FileInputFormat> formatClass, PType<K> keyType, PType<V> valueType) {
+	return formattedFile(new Path(path), formatClass, keyType, valueType);
+  }
+
+  public static <K, V> TableSource<K, V> formattedFile(Path path,
+      Class<? extends FileInputFormat> formatClass, PType<K> keyType, PType<V> valueType) {
+	PTableType<K, V> tableType = keyType.getFamily().tableOf(keyType, valueType);
+    return new FileTableSourceImpl<K, V>(path, tableType, formatClass);                                             	
+  }
+
+  public static <T> Source<T> avroFile(String pathName, AvroType<T> avroType) {
+	return avroFile(new Path(pathName), avroType);
+  }
+  
+  public static <T> Source<T> avroFile(Path path, AvroType<T> avroType) {
+	return new AvroFileSource<T>(path, avroType);
+  }
+  
+  public static TableSource<ImmutableBytesWritable, Result> hbaseTable(String table) {
+	return hbaseTable(table, new Scan());
+  }
+  
+  public static TableSource<ImmutableBytesWritable, Result> hbaseTable(String table, Scan scan) {
+	return new HBaseSourceTarget(table, scan);
+  }
+  
+  public static <T> Source<T> sequenceFile(String pathName, PType<T> ptype) {
+	return sequenceFile(new Path(pathName), ptype);
+  }
+  
+  public static <T> Source<T> sequenceFile(Path path, PType<T> ptype) {
+	return new SeqFileSource<T>(path, ptype);
+  }
+  
+  public static <K, V> TableSource<K, V> sequenceFile(String pathName, PType<K> keyType,
+      PType<V> valueType) {
+	return sequenceFile(new Path(pathName), keyType, valueType);
+  }
+  
+  public static <K, V> TableSource<K, V> sequenceFile(Path path, PType<K> keyType,
+      PType<V> valueType) {
+	PTypeFamily ptf = keyType.getFamily();
+	return new SeqFileTableSourceTarget<K, V>(path, ptf.tableOf(keyType, valueType));
+  }
+  
+  public static Source<String> textFile(String pathName) {
+	return textFile(new Path(pathName));
+  }
+  
+  public static Source<String> textFile(Path path) {
+	return textFile(path, Writables.strings());
+  }
+  
+  public static <T> Source<T> textFile(String pathName, PType<T> ptype) {
+    return textFile(new Path(pathName), ptype);
+  }
+  
+  public static <T> Source<T> textFile(Path path, PType<T> ptype) {
+    return new TextFileSource<T>(path, ptype);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/io/MapReduceTarget.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/MapReduceTarget.java b/crunch/src/main/java/org/apache/crunch/io/MapReduceTarget.java
new file mode 100644
index 0000000..09df684
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/io/MapReduceTarget.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+
+import org.apache.crunch.Target;
+import org.apache.crunch.types.PType;
+
+/**
+ * A {@link Target} that can configure a MapReduce {@link Job} to write to it.
+ */
+public interface MapReduceTarget extends Target {
+  /**
+   * Configures {@code job} to write to this target.
+   *
+   * @param job the job to configure
+   * @param ptype the type of the data being written
+   * @param outputPath an output path for the job; NOTE(review): how
+   *        implementations use it is not visible here, confirm per implementation
+   * @param name an output name; presumably {@code null} selects the job's
+   *        primary output and a non-null value a named output (this is what
+   *        {@code PathTargetImpl} does), confirm for other implementations
+   */
+  void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name);
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/io/OutputHandler.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/OutputHandler.java b/crunch/src/main/java/org/apache/crunch/io/OutputHandler.java
new file mode 100644
index 0000000..01d7f99
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/io/OutputHandler.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import org.apache.crunch.Target;
+import org.apache.crunch.types.PType;
+
+/**
+ * Callback that is offered a {@link Target} together with the {@link PType}
+ * of the data destined for it.
+ */
+public interface OutputHandler {
+  /**
+   * @param target the target being configured
+   * @param ptype the type of the data written to {@code target}
+   * @return NOTE(review): presumably {@code true} when this handler accepted
+   *         and configured the target; confirm against implementations
+   */
+  boolean configure(Target target, PType<?> ptype);
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/io/PathTarget.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/PathTarget.java b/crunch/src/main/java/org/apache/crunch/io/PathTarget.java
new file mode 100644
index 0000000..884ba43
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/io/PathTarget.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import org.apache.hadoop.fs.Path;
+
+/**
+ * A {@link MapReduceTarget} that is associated with a {@link Path}.
+ */
+public interface PathTarget extends MapReduceTarget {
+  /**
+   * @return the path associated with this target
+   */
+  Path getPath();
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/io/PathTargetImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/PathTargetImpl.java b/crunch/src/main/java/org/apache/crunch/io/PathTargetImpl.java
new file mode 100644
index 0000000..050171f
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/io/PathTargetImpl.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.CrunchMultipleOutputs;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+import org.apache.crunch.types.PType;
+
+/**
+ * Base class for {@link PathTarget} implementations that write to a fixed
+ * {@link Path} using a given output format and key/value classes.
+ */
+public abstract class PathTargetImpl implements PathTarget {
+
+  private final Path path;
+  private final Class<OutputFormat> outputFormatClass;
+  private final Class keyClass;
+  private final Class valueClass;
+
+  /**
+   * Same as the {@link Path}-based constructor, with the path given by name.
+   */
+  public PathTargetImpl(String path, Class<OutputFormat> outputFormatClass,
+      Class keyClass, Class valueClass) {
+    this(new Path(path), outputFormatClass, keyClass, valueClass);
+  }
+
+  /**
+   * @param path the path this target writes to
+   * @param outputFormatClass the output format registered on the job
+   * @param keyClass the output key class registered on the job
+   * @param valueClass the output value class registered on the job
+   */
+  public PathTargetImpl(Path path, Class<OutputFormat> outputFormatClass,
+      Class keyClass, Class valueClass) {
+    this.path = path;
+    this.outputFormatClass = outputFormatClass;
+    this.keyClass = keyClass;
+    this.valueClass = valueClass;
+  }
+
+  /**
+   * Sets the job's output path to this target's own path, then registers the
+   * output format and key/value classes: as a named output through
+   * {@link CrunchMultipleOutputs} when {@code name} is non-null, otherwise
+   * directly on the job.
+   *
+   * <p>NOTE(review): {@code ptype} and {@code outputPath} are not used by
+   * this implementation; confirm that ignoring {@code outputPath} is intended.
+   */
+  @Override
+  public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath,
+      String name) {
+    try {
+      FileOutputFormat.setOutputPath(job, path);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    if (name != null) {
+      CrunchMultipleOutputs.addNamedOutput(job, name, outputFormatClass,
+          keyClass, valueClass);
+      return;
+    }
+    job.setOutputFormatClass(outputFormatClass);
+    job.setOutputKeyClass(keyClass);
+    job.setOutputValueClass(valueClass);
+  }
+
+  /** Returns the path this target writes to. */
+  @Override
+  public Path getPath() {
+    return path;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/io/ReadableSource.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/ReadableSource.java b/crunch/src/main/java/org/apache/crunch/io/ReadableSource.java
new file mode 100644
index 0000000..0ecbec0
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/io/ReadableSource.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.crunch.Source;
+
+/**
+ * A {@link Source} whose contents can also be read directly as an
+ * {@link Iterable}.
+ *
+ * @param <T> the type of the values read
+ */
+public interface ReadableSource<T> extends Source<T> {
+  /**
+   * @param conf the configuration to read with
+   * @return an iterable over the values of this source
+   * @throws IOException if the read fails
+   */
+  Iterable<T> read(Configuration conf) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/io/ReadableSourceTarget.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/ReadableSourceTarget.java b/crunch/src/main/java/org/apache/crunch/io/ReadableSourceTarget.java
new file mode 100644
index 0000000..112508f
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/io/ReadableSourceTarget.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import org.apache.crunch.SourceTarget;
+
+/**
+ * An interface that indicates that a {@link SourceTarget} instance can be
+ * read into the local client. It combines {@link ReadableSource} and
+ * {@link SourceTarget} and declares no methods of its own.
+ *
+ * @param <T> The type of data read.
+ */
+public interface ReadableSourceTarget<T> extends ReadableSource<T>, SourceTarget<T> {
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/io/SourceTargetHelper.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/SourceTargetHelper.java b/crunch/src/main/java/org/apache/crunch/io/SourceTargetHelper.java
new file mode 100644
index 0000000..522bd42
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/io/SourceTargetHelper.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Functions for configuring the inputs/outputs of MapReduce jobs.
+ * 
+ */
+public class SourceTargetHelper {
+
+	private static final Log LOG = LogFactory.getLog(SourceTargetHelper.class);
+
+	public static long getPathSize(Configuration conf, Path path) throws IOException {
+		return getPathSize(FileSystem.get(conf), path);
+	}
+
+	public static long getPathSize(FileSystem fs, Path path) throws IOException {
+
+		if (!fs.exists(path)) {
+			return -1L;
+		}
+
+		FileStatus[] stati = null;
+		try {
+			stati = fs.listStatus(path);
+			if (stati == null) {
+				throw new FileNotFoundException(path + " doesn't exist");
+			}
+		} catch (FileNotFoundException e) {
+			LOG.warn("Returning 0 for getPathSize on non-existant path '" + path + "'");
+			return 0L;
+		}
+
+		if (stati.length == 0) {
+			return 0L;
+		}
+		long size = 0;
+		for (FileStatus status : stati) {
+			size += status.getLen();
+		}
+		return size;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/io/To.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/To.java b/crunch/src/main/java/org/apache/crunch/io/To.java
new file mode 100644
index 0000000..afe3655
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/io/To.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+import org.apache.crunch.Target;
+import org.apache.crunch.io.avro.AvroFileTarget;
+import org.apache.crunch.io.hbase.HBaseTarget;
+import org.apache.crunch.io.impl.FileTargetImpl;
+import org.apache.crunch.io.seq.SeqFileTarget;
+import org.apache.crunch.io.text.TextFileTarget;
+
+/**
+ * Factory methods for the {@link Target} implementations: formatted, Avro,
+ * sequence and text files addressed by path, and HBase tables by name.
+ */
+public class To {
+
+  public static Target formattedFile(String pathName, Class<? extends FileOutputFormat> formatClass) {
+    return new FileTargetImpl(new Path(pathName), formatClass);
+  }
+
+  public static Target formattedFile(Path path, Class<? extends FileOutputFormat> formatClass) {
+    return new FileTargetImpl(path, formatClass);
+  }
+
+  public static Target avroFile(String pathName) {
+    return new AvroFileTarget(new Path(pathName));
+  }
+
+  public static Target avroFile(Path path) {
+    return new AvroFileTarget(path);
+  }
+
+  public static Target hbaseTable(String table) {
+    return new HBaseTarget(table);
+  }
+
+  public static Target sequenceFile(String pathName) {
+    return new SeqFileTarget(new Path(pathName));
+  }
+
+  public static Target sequenceFile(Path path) {
+    return new SeqFileTarget(path);
+  }
+
+  public static Target textFile(String pathName) {
+    return new TextFileTarget(new Path(pathName));
+  }
+
+  public static Target textFile(Path path) {
+    return new TextFileTarget(path);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
new file mode 100644
index 0000000..c92c3ee
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.avro;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.mapred.FsInput;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.io.FileReaderFactory;
+import org.apache.crunch.types.avro.AvroType;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.UnmodifiableIterator;
+
+/** Reads Avro data files as iterators over the records of an {@link AvroType}. */
+public class AvroFileReaderFactory<T> implements FileReaderFactory<T> {
+	private static final Log LOG = LogFactory.getLog(AvroFileReaderFactory.class);
+
+	private final DatumReader<T> recordReader;
+	private final MapFn<T, T> mapFn;
+	private final Configuration conf;
+
+	public AvroFileReaderFactory(AvroType<T> atype, Configuration conf) {
+		this.recordReader = createDatumReader(atype);
+		this.mapFn = (MapFn<T, T>) atype.getInputMapFn();
+		this.conf = conf;
+	}
+
+	/** Selects the specific, generic or (as the fallback) reflect datum reader. */
+	private DatumReader<T> createDatumReader(AvroType<T> avroType) {
+		if (avroType.isSpecific()) {
+			return new SpecificDatumReader<T>(avroType.getSchema());
+		}
+		return avroType.isGeneric() ? new GenericDatumReader<T>(avroType.getSchema())
+				: new ReflectDatumReader<T>(avroType.getSchema());
+	}
+
+	/**
+	 * Iterates the mapped records of {@code path}; an unreadable file yields none.
+	 * NOTE(review): the DataFileReader opened here is never closed.
+	 */
+	@Override
+	public Iterator<T> read(FileSystem fs, final Path path) {
+		mapFn.setConfigurationForTest(conf);
+		mapFn.initialize();
+		final DataFileReader<T> records;
+		try {
+			records = new DataFileReader<T>(new FsInput(path, fs.getConf()), recordReader);
+		} catch (IOException e) {
+			LOG.info("Could not read avro file at path: " + path, e);
+			return Iterators.emptyIterator();
+		}
+		return new UnmodifiableIterator<T>() {
+			@Override
+			public boolean hasNext() {
+				return records.hasNext();
+			}
+			@Override
+			public T next() {
+				return mapFn.map(records.next());
+			}
+		};
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
new file mode 100644
index 0000000..3cbe924
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.avro;
+
+import java.io.IOException;
+
+import org.apache.avro.mapred.AvroJob;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.crunch.io.CompositePathIterable;
+import org.apache.crunch.io.ReadableSource;
+import org.apache.crunch.io.impl.FileSourceImpl;
+import org.apache.crunch.io.impl.InputBundle;
+import org.apache.crunch.types.avro.AvroInputFormat;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.crunch.types.avro.Avros;
+
+/** Source over Avro data at a path; {@code read} iterates it through an {@link AvroFileReaderFactory}. */
+public class AvroFileSource<T> extends FileSourceImpl<T> implements ReadableSource<T> {
+  public AvroFileSource(Path path, AvroType<T> ptype) {
+    super(path, ptype, new InputBundle(AvroInputFormat.class)
+        .set(AvroJob.INPUT_IS_REFLECT, Boolean.toString(!ptype.isSpecific()))
+        .set(AvroJob.INPUT_SCHEMA, ptype.getSchema().toString())
+        .set(Avros.REFLECT_DATA_FACTORY_CLASS, Avros.REFLECT_DATA_FACTORY.getClass().getName()));
+  }
+
+  @Override
+  public String toString() {
+    return "Avro(" + path.toString() + ")";
+  }
+
+  @Override
+  public Iterable<T> read(Configuration conf) throws IOException {
+    FileSystem fs = FileSystem.get(path.toUri(), conf);
+    AvroFileReaderFactory<T> readerFactory = new AvroFileReaderFactory<T>((AvroType<T>) ptype, conf);
+    return CompositePathIterable.create(fs, path, readerFactory);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSourceTarget.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSourceTarget.java b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSourceTarget.java
new file mode 100644
index 0000000..e1ff540
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSourceTarget.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.avro;
+
+import org.apache.hadoop.fs.Path;
+
+import org.apache.crunch.io.impl.ReadableSourcePathTargetImpl;
+import org.apache.crunch.types.avro.AvroType;
+
+/** Pairs an {@link AvroFileSource} with an {@link AvroFileTarget} on the same path. */
+public class AvroFileSourceTarget<T> extends ReadableSourcePathTargetImpl<T> {
+  public AvroFileSourceTarget(Path path, AvroType<T> atype) {
+    super(new AvroFileSource<T>(path, atype), new AvroFileTarget(path));
+  }
+  @Override
+  public String toString() {
+    return target.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java
new file mode 100644
index 0000000..f0340a3
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.avro;
+
+import org.apache.avro.mapred.AvroWrapper;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+
+import org.apache.crunch.SourceTarget;
+import org.apache.crunch.io.OutputHandler;
+import org.apache.crunch.io.impl.FileTargetImpl;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.avro.AvroOutputFormat;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.crunch.types.avro.Avros;
+
+public class AvroFileTarget extends FileTargetImpl {
+  public AvroFileTarget(String path) {
+    this(new Path(path));
+  }
+  
+  public AvroFileTarget(Path path) {
+    super(path, AvroOutputFormat.class);
+  }
+    
+  @Override
+  public String toString() {
+    return "Avro(" + path.toString() + ")";
+  }
+  
+  @Override
+  public boolean accept(OutputHandler handler, PType<?> ptype) {
+    if (!(ptype instanceof AvroType)) {
+      return false;
+    }
+    handler.configure(this, ptype);
+    return true;
+  }
+
+  @Override
+  public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath,
+      String name) {
+    AvroType<?> atype = (AvroType<?>) ptype;
+    Configuration conf = job.getConfiguration();
+    String schemaParam = null;
+    if (name == null) {
+      schemaParam = "avro.output.schema";
+    } else {
+      schemaParam = "avro.output.schema." + name;
+    }
+    String outputSchema = conf.get(schemaParam);
+    if (outputSchema == null) {
+      conf.set(schemaParam, atype.getSchema().toString());
+    } else if (!outputSchema.equals(atype.getSchema().toString())) {
+      throw new IllegalStateException("Avro targets must use the same output schema");
+    }
+    Avros.configureReflectDataFactory(conf);
+    configureForMapReduce(job, AvroWrapper.class, NullWritable.class,
+        outputPath, name);
+  }
+
+  @Override
+  public <T> SourceTarget<T> asSourceTarget(PType<T> ptype) {
+    if (ptype instanceof AvroType) {
+      return new AvroFileSourceTarget<T>(path, (AvroType<T>) ptype);
+    }
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java b/crunch/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
new file mode 100644
index 0000000..0df84e5
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.hbase;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.util.Base64;
+import org.apache.hadoop.mapreduce.Job;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.SourceTarget;
+import org.apache.crunch.TableSource;
+import org.apache.crunch.impl.mr.run.CrunchMapper;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.writable.Writables;
+
+/** HBase table usable as a target and, through a {@link Scan}, as a source of (ImmutableBytesWritable, Result) pairs. */
+public class HBaseSourceTarget extends HBaseTarget implements SourceTarget<Pair<ImmutableBytesWritable, Result>>,
+    TableSource<ImmutableBytesWritable, Result> {
+  private static final PTableType<ImmutableBytesWritable, Result> PTYPE = Writables.tableOf(
+      Writables.writables(ImmutableBytesWritable.class), Writables.writables(Result.class));
+  protected Scan scan;
+
+  public HBaseSourceTarget(String table, Scan scan) {
+    super(table);
+    this.scan = scan;
+  }
+
+  @Override
+  public PType<Pair<ImmutableBytesWritable, Result>> getType() {
+    return PTYPE;
+  }
+
+  @Override
+  public PTableType<ImmutableBytesWritable, Result> getTableType() {
+    return PTYPE;
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (!(other instanceof HBaseSourceTarget)) {
+      return false;
+    }
+    HBaseSourceTarget that = (HBaseSourceTarget) other;
+    // XXX scan does not have an equals method
+    return table.equals(that.table) && scan.equals(that.scan);
+  }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder().append(table).append(scan).toHashCode();
+  }
+
+  @Override
+  public String toString() {
+    return "HBaseTable(" + table + ")";
+  }
+
+  /** Points the job at this table through {@link TableInputFormat}, passing the scan in serialized form. */
+  @Override
+  public void configureSource(Job job, int inputId) throws IOException {
+    job.setInputFormatClass(TableInputFormat.class);
+    job.setMapperClass(CrunchMapper.class);
+    Configuration jobConf = job.getConfiguration();
+    HBaseConfiguration.addHbaseResources(jobConf);
+    jobConf.set(TableInputFormat.INPUT_TABLE, table);
+    jobConf.set(TableInputFormat.SCAN, convertScanToString(scan));
+    TableMapReduceUtil.addDependencyJars(job);
+  }
+
+  /** Serializes the scan with {@code Scan.write} and Base64-encodes the resulting bytes. */
+  static String convertScanToString(Scan scan) throws IOException {
+    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+    scan.write(new DataOutputStream(bytes));
+    return Base64.encodeBytes(bytes.toByteArray());
+  }
+
+  @Override
+  public long getSize(Configuration conf) {
+    // TODO something smarter here.
+    return 1000L * 1000L * 1000L;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java b/crunch/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
new file mode 100644
index 0000000..5e0b1c9
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.hbase;
+
+import java.io.IOException;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
+import org.apache.hadoop.mapreduce.Job;
+
+import org.apache.crunch.SourceTarget;
+import org.apache.crunch.impl.mr.run.CrunchRuntimeException;
+import org.apache.crunch.io.MapReduceTarget;
+import org.apache.crunch.io.OutputHandler;
+import org.apache.crunch.types.PType;
+
+public class HBaseTarget implements MapReduceTarget {
+
+  protected String table;
+  
+  public HBaseTarget(String table) {
+    this.table = table;
+  }
+  
+  @Override
+  public boolean equals(Object other) {
+    if(this == other)
+      return true;
+    if(other == null)
+      return false;
+    if(!other.getClass().equals(getClass()))
+      return false;
+    HBaseTarget o = (HBaseTarget)other;
+    return table.equals(o.table);
+  }
+  
+  @Override
+  public int hashCode() {
+    HashCodeBuilder hcb = new HashCodeBuilder();
+    return hcb.append(table).toHashCode();
+  }
+  
+  @Override
+  public String toString() {
+    return "HBaseTable(" + table + ")";
+  }
+
+  @Override
+  public boolean accept(OutputHandler handler, PType<?> ptype) {
+    if(Put.class.equals(ptype.getTypeClass())) {
+      handler.configure(this, ptype);
+      return true;      
+    }
+    return false;
+  }
+
+  @Override
+  public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) {
+    Configuration conf = job.getConfiguration();
+    HBaseConfiguration.addHbaseResources(conf);
+    job.setOutputFormatClass(TableOutputFormat.class);
+    conf.set(TableOutputFormat.OUTPUT_TABLE, table);
+    try {
+      TableMapReduceUtil.addDependencyJars(job);      
+    } catch (IOException e) {
+      throw new CrunchRuntimeException(e);
+    }
+  }
+
+  @Override
+  public <T> SourceTarget<T> asSourceTarget(PType<T> ptype) {
+    return null;
+  }
+}


Mime
View raw message