incubator-crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject [3/28] Rename com.cloudera.crunch -> org.apache.crunch in the Java core
Date Sat, 07 Jul 2012 21:49:06 GMT
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/impl/mr/collect/UnionCollection.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/impl/mr/collect/UnionCollection.java b/src/main/java/com/cloudera/crunch/impl/mr/collect/UnionCollection.java
deleted file mode 100644
index 12981ce..0000000
--- a/src/main/java/com/cloudera/crunch/impl/mr/collect/UnionCollection.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.impl.mr.collect;
-
-import java.util.List;
-
-import com.cloudera.crunch.impl.mr.MRPipeline;
-import com.cloudera.crunch.impl.mr.plan.DoNode;
-import com.cloudera.crunch.types.PType;
-import com.google.common.collect.ImmutableList;
-
-public class UnionCollection<S> extends PCollectionImpl<S> {
-
-  private List<PCollectionImpl<S>> parents;
-  private long size = 0;
-  
-  private static <S> String flatName(List<PCollectionImpl<S>> collections) {
-    StringBuilder sb = new StringBuilder("union(");
-    for (int i = 0; i < collections.size(); i++) {
-      if (i != 0) {
-        sb.append(',');
-      }
-      sb.append(collections.get(i).getName());
-    }
-    return sb.append(')').toString();
-  }
-  
-  UnionCollection(List<PCollectionImpl<S>> collections) {
-    super(flatName(collections));
-    this.parents = ImmutableList.copyOf(collections);
-    this.pipeline = (MRPipeline) parents.get(0).getPipeline();
-    for (PCollectionImpl<S> parent : parents) {
-      if (this.pipeline != parent.getPipeline()) {
-        throw new IllegalStateException(
-            "Cannot union PCollections from different Pipeline instances");
-      }
-      size += parent.getSize();
-    }
-  }
-
-  @Override
-  protected long getSizeInternal() {
-    return size;
-  }
-  
-  @Override
-  protected void acceptInternal(PCollectionImpl.Visitor visitor) {
-    visitor.visitUnionCollection(this);
-  }
-
-  @Override
-  public PType<S> getPType() {
-    return parents.get(0).getPType();
-  }
-
-  @Override
-  public List<PCollectionImpl<?>> getParents() {
-    return ImmutableList.<PCollectionImpl<?>> copyOf(parents);
-  }
-
-  @Override
-  public DoNode createDoNode() {
-    throw new UnsupportedOperationException(
-        "Unioned collection does not support DoNodes");
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/impl/mr/collect/UnionTable.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/impl/mr/collect/UnionTable.java b/src/main/java/com/cloudera/crunch/impl/mr/collect/UnionTable.java
deleted file mode 100644
index 4de8920..0000000
--- a/src/main/java/com/cloudera/crunch/impl/mr/collect/UnionTable.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.impl.mr.collect;
-
-import java.util.List;
-
-import com.cloudera.crunch.Pair;
-import com.cloudera.crunch.impl.mr.MRPipeline;
-import com.cloudera.crunch.impl.mr.plan.DoNode;
-import com.cloudera.crunch.types.PTableType;
-import com.cloudera.crunch.types.PType;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-
-public class UnionTable<K, V> extends PTableBase<K, V> {
-
-  private PTableType<K, V> ptype;
-  private List<PCollectionImpl<Pair<K, V>>> parents;
-  private long size;
-  
-  private static <K, V> String flatName(List<PTableBase<K, V>> tables) {
-    StringBuilder sb = new StringBuilder("union(");
-    for (int i = 0; i < tables.size(); i++) {
-      if (i != 0) {
-        sb.append(',');
-      }
-      sb.append(tables.get(i).getName());
-    }
-    return sb.append(')').toString();
-  }
-  
-  public UnionTable(List<PTableBase<K, V>> tables) {
-    super(flatName(tables));
-    this.ptype = tables.get(0).getPTableType();
-    this.pipeline = (MRPipeline) tables.get(0).getPipeline();
-    this.parents = Lists.newArrayList();
-    for (PTableBase<K, V> parent : tables) {
-      if (pipeline != parent.getPipeline()) {
-        throw new IllegalStateException(
-            "Cannot union PTables from different Pipeline instances");
-      }
-      this.parents.add(parent);
-      size += parent.getSize();
-    }
-  }
-
-  @Override
-  protected long getSizeInternal() {
-    return size;
-  }
-  
-  @Override
-  public PTableType<K, V> getPTableType() {
-    return ptype;
-  }
-
-  @Override
-  public PType<Pair<K, V>> getPType() {
-    return ptype;
-  }
-
-  @Override
-  public List<PCollectionImpl<?>> getParents() {
-    return ImmutableList.<PCollectionImpl<?>> copyOf(parents);
-  }
-
-  @Override
-  protected void acceptInternal(PCollectionImpl.Visitor visitor) {
-    visitor.visitUnionCollection(new UnionCollection<Pair<K, V>>(
-        parents));
-  }
-
-  @Override
-  public DoNode createDoNode() {
-    throw new UnsupportedOperationException(
-        "Unioned table does not support do nodes");
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/impl/mr/emit/IntermediateEmitter.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/impl/mr/emit/IntermediateEmitter.java b/src/main/java/com/cloudera/crunch/impl/mr/emit/IntermediateEmitter.java
deleted file mode 100644
index abfb074..0000000
--- a/src/main/java/com/cloudera/crunch/impl/mr/emit/IntermediateEmitter.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.impl.mr.emit;
-
-import java.util.List;
-
-import com.cloudera.crunch.DoFn;
-import com.cloudera.crunch.Emitter;
-import com.cloudera.crunch.impl.mr.run.RTNode;
-import com.google.common.collect.ImmutableList;
-
-/**
- * An {@link Emitter} implementation that links the output of one {@link DoFn}
- * to the input of another {@code DoFn}: each emitted value is passed, in list
- * order, to {@link RTNode#process} of every child node.
- */
-public class IntermediateEmitter implements Emitter<Object> {
-
-  private final List<RTNode> children;
-
-  /**
-   * @param children the runtime nodes that receive every emitted value; an
-   *     immutable snapshot is taken so later changes to the caller's list have
-   *     no effect
-   */
-  public IntermediateEmitter(List<RTNode> children) {
-    this.children = ImmutableList.copyOf(children);
-  }
-
-  @Override
-  public void emit(Object emitted) {
-    for (int i = 0; i < children.size(); i++) {
-      children.get(i).process(emitted);
-    }
-  }
-
-  @Override
-  public void flush() {
-    // Nothing is buffered here, so there is nothing to flush.
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/impl/mr/emit/MultipleOutputEmitter.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/impl/mr/emit/MultipleOutputEmitter.java b/src/main/java/com/cloudera/crunch/impl/mr/emit/MultipleOutputEmitter.java
deleted file mode 100644
index ea13112..0000000
--- a/src/main/java/com/cloudera/crunch/impl/mr/emit/MultipleOutputEmitter.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.impl.mr.emit;
-
-import java.io.IOException;
-
-import org.apache.hadoop.mapreduce.lib.output.CrunchMultipleOutputs;
-
-import com.cloudera.crunch.Emitter;
-import com.cloudera.crunch.impl.mr.run.CrunchRuntimeException;
-import com.cloudera.crunch.types.Converter;
-
-/**
- * An {@link Emitter} that turns each value into a key/value pair with a
- * {@link Converter} and writes it to one named output of a
- * {@link CrunchMultipleOutputs}.
- */
-public class MultipleOutputEmitter<T, K, V> implements Emitter<T> {
-
-  private final Converter converter;
-  private final CrunchMultipleOutputs<K, V> outputs;
-  private final String outputName;
-
-  /**
-   * @param converter splits each emitted value into an output key and value
-   * @param outputs the multiple-outputs writer to write to
-   * @param outputName the named output that receives every emitted pair
-   */
-  public MultipleOutputEmitter(Converter converter,
-      CrunchMultipleOutputs<K, V> outputs, String outputName) {
-    this.outputName = outputName;
-    this.outputs = outputs;
-    this.converter = converter;
-  }
-
-  /**
-   * Converts and writes one value; write failures are rethrown unchecked as
-   * {@link CrunchRuntimeException}.
-   */
-  @Override
-  public void emit(T emitted) {
-    try {
-      Object key = converter.outputKey(emitted);
-      Object value = converter.outputValue(emitted);
-      outputs.write(outputName, key, value);
-    } catch (IOException e) {
-      throw new CrunchRuntimeException(e);
-    } catch (InterruptedException e) {
-      throw new CrunchRuntimeException(e);
-    }
-  }
-
-  @Override
-  public void flush() {
-    // Writes go straight to the outputs; nothing is buffered here.
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/impl/mr/emit/OutputEmitter.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/impl/mr/emit/OutputEmitter.java b/src/main/java/com/cloudera/crunch/impl/mr/emit/OutputEmitter.java
deleted file mode 100644
index b129262..0000000
--- a/src/main/java/com/cloudera/crunch/impl/mr/emit/OutputEmitter.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.impl.mr.emit;
-
-import java.io.IOException;
-
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-
-import com.cloudera.crunch.Emitter;
-import com.cloudera.crunch.impl.mr.run.CrunchRuntimeException;
-import com.cloudera.crunch.types.Converter;
-
-/**
- * An {@link Emitter} that turns each value into a key/value pair with a
- * {@link Converter} and writes it to the task's
- * {@link TaskInputOutputContext}.
- */
-public class OutputEmitter<T, K, V> implements Emitter<T> {
-
-  private final Converter<K, V, Object, Object> converter;
-  private final TaskInputOutputContext<?, ?, K, V> context;
-
-  /**
-   * @param converter splits each emitted value into an output key and value
-   * @param context the task context the pairs are written to
-   */
-  public OutputEmitter(Converter<K, V, Object, Object> converter,
-      TaskInputOutputContext<?, ?, K, V> context) {
-    this.context = context;
-    this.converter = converter;
-  }
-
-  /**
-   * Converts and writes one value; write failures are rethrown unchecked as
-   * {@link CrunchRuntimeException}.
-   */
-  @Override
-  public void emit(T emitted) {
-    try {
-      // Arguments are evaluated left to right: key first, then value.
-      context.write(converter.outputKey(emitted), converter.outputValue(emitted));
-    } catch (IOException e) {
-      throw new CrunchRuntimeException(e);
-    } catch (InterruptedException e) {
-      throw new CrunchRuntimeException(e);
-    }
-  }
-
-  @Override
-  public void flush() {
-    // Writes go straight to the context; nothing is buffered here.
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/impl/mr/exec/CrunchJob.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/impl/mr/exec/CrunchJob.java b/src/main/java/com/cloudera/crunch/impl/mr/exec/CrunchJob.java
deleted file mode 100644
index e8eeb7d..0000000
--- a/src/main/java/com/cloudera/crunch/impl/mr/exec/CrunchJob.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.impl.mr.exec;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob;
-import org.apache.hadoop.util.StringUtils;
-
-import com.cloudera.crunch.impl.mr.plan.MSCROutputHandler;
-import com.cloudera.crunch.impl.mr.plan.PlanningParameters;
-import com.google.common.collect.Lists;
-
-/**
- * A {@link CrunchControlledJob} for one stage of a Crunch pipeline. When the
- * underlying job succeeds, the files of each multi-output are moved from the
- * job's working directory into the destination directories reported by the
- * {@link MSCROutputHandler}.
- */
-public class CrunchJob extends CrunchControlledJob {
-
-  private static final Log LOG = LogFactory.getLog(CrunchJob.class);
-
-  private final Path workingPath;
-  private final List<Path> multiPaths;
-  private final boolean mapOnlyJob;
-
-  public CrunchJob(Job job, Path workingPath, MSCROutputHandler handler) throws IOException {
-    super(job, Lists.<CrunchControlledJob>newArrayList());
-    this.workingPath = workingPath;
-    this.multiPaths = handler.getMultiPaths();
-    this.mapOnlyJob = handler.isMapOnlyJob();
-  }
-
-  /**
-   * Moves the output files of every multi-output from the working directory
-   * into its destination directory, creating the directory if it is missing.
-   *
-   * @throws IOException if a filesystem call fails, including a rename that
-   *     reports failure by returning {@code false}
-   */
-  private synchronized void handleMultiPaths() throws IOException {
-    if (multiPaths.isEmpty()) {
-      return;
-    }
-    FileSystem fs = FileSystem.get(job.getConfiguration());
-    for (int i = 0; i < multiPaths.size(); i++) {
-      // Files of multi-output i carry this prefix in the working directory.
-      Path src = new Path(workingPath,
-          PlanningParameters.MULTI_OUTPUT_PREFIX + i + "-*");
-      Path[] srcs = FileUtil.stat2Paths(fs.globStatus(src), src);
-      Path dst = multiPaths.get(i);
-      if (!fs.exists(dst)) {
-        fs.mkdirs(dst);
-      }
-      int minPartIndex = getMinPartIndex(dst, fs);
-      for (Path s : srcs) {
-        Path target = getDestFile(s, dst, minPartIndex++);
-        // FileSystem.rename reports many failures by returning false instead
-        // of throwing; ignoring that would silently drop job output while the
-        // stage is still marked SUCCESS.
-        if (!fs.rename(s, target)) {
-          throw new IOException("Could not rename " + s + " to " + target);
-        }
-      }
-    }
-  }
-
-  /**
-   * Builds the destination path {@code part-<m|r>-<index>} inside {@code dir},
-   * keeping the Avro file extension when the source file name has it.
-   */
-  private Path getDestFile(Path src, Path dir, int index) {
-    String form = "part-%s-%05d";
-    if (src.getName().endsWith(org.apache.avro.mapred.AvroOutputFormat.EXT)) {
-      form = form + org.apache.avro.mapred.AvroOutputFormat.EXT;
-    }
-    return new Path(dir, String.format(form, mapOnlyJob ? "m" : "r", index));
-  }
-
-  /**
-   * Quick and dirty way to ensure unique naming in the directory: the number of
-   * existing entries is used as the first free part index.
-   * NOTE(review): this can collide if existing part files are not numbered
-   * contiguously from 0; such a collision now surfaces as a rename failure.
-   */
-  private int getMinPartIndex(Path path, FileSystem fs) throws IOException {
-    return fs.listStatus(path).length;
-  }
-
-  /**
-   * Polls the underlying job. On success, the multi-outputs are moved before
-   * the stage is marked SUCCESS. Any IOException (including a failed move)
-   * marks the stage FAILED and makes a best-effort attempt to kill the job.
-   */
-  @Override
-  protected void checkRunningState() throws IOException, InterruptedException {
-    try {
-      if (job.isComplete()) {
-        if (job.isSuccessful()) {
-          handleMultiPaths();
-          this.state = State.SUCCESS;
-        } else {
-          this.state = State.FAILED;
-          this.message = "Job failed!";
-        }
-      }
-    } catch (IOException ioe) {
-      this.state = State.FAILED;
-      this.message = StringUtils.stringifyException(ioe);
-      try {
-        if (job != null) {
-          job.killJob();
-        }
-      } catch (IOException e) {
-        // Best effort: the stage is already FAILED, but record why the kill
-        // did not go through instead of discarding the exception.
-        LOG.warn("Failed to kill job \"" + getJobName() + "\"", e);
-      }
-    }
-  }
-
-  /** Submits the job and logs either its tracking URL or the startup error. */
-  @Override
-  protected synchronized void submit() {
-    super.submit();
-    if (this.state == State.RUNNING) {
-      LOG.info("Running job \"" + getJobName() + "\"");
-      LOG.info("Job status available at: " + job.getTrackingURL());
-    } else {
-      LOG.error("Error occurred starting job \"" + getJobName() + "\":");
-      LOG.error(getMessage());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/impl/mr/exec/MRExecutor.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/impl/mr/exec/MRExecutor.java b/src/main/java/com/cloudera/crunch/impl/mr/exec/MRExecutor.java
deleted file mode 100644
index f56093b..0000000
--- a/src/main/java/com/cloudera/crunch/impl/mr/exec/MRExecutor.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.impl.mr.exec;
-
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob;
-import org.apache.hadoop.mapreduce.lib.jobcontrol.CrunchJobControl;
-
-import com.cloudera.crunch.PipelineResult;
-import com.google.common.collect.Lists;
-
-/**
- * Runs the {@link CrunchJob}s of a pipeline on a {@link CrunchJobControl}
- * thread, blocks until all of them have finished, reports failures on stderr,
- * and collects the counters of the successful jobs into a
- * {@link PipelineResult}.
- */
-public class MRExecutor {
-
-  private static final Log LOG = LogFactory.getLog(MRExecutor.class);
-
-  private final CrunchJobControl control;
-
-  public MRExecutor(Class<?> jarClass) {
-    this.control = new CrunchJobControl(jarClass.toString());
-  }
-
-  public void addJob(CrunchJob job) {
-    this.control.addJob(job);
-  }
-
-  /**
-   * Starts job control and waits, polling once per second, until every job
-   * has finished. If the calling thread is interrupted, job control is still
-   * stopped, the thread's interrupt status is restored, and a result is built
-   * from whatever jobs had succeeded by then.
-   *
-   * @return the stage results (name and counters) of the successful jobs
-   */
-  public PipelineResult execute() {
-    Thread controlThread = new Thread(control);
-    controlThread.start();
-    try {
-      while (!control.allFinished()) {
-        Thread.sleep(1000);
-      }
-    } catch (InterruptedException e) {
-      LOG.warn("Interrupted while waiting for jobs to finish; stopping job control", e);
-      // Preserve the interrupt so callers can observe it.
-      Thread.currentThread().interrupt();
-    } finally {
-      // Always stop the control thread, including after an interrupt.
-      control.stop();
-    }
-    List<CrunchControlledJob> failures = control.getFailedJobList();
-    if (!failures.isEmpty()) {
-      System.err.println(failures.size() + " job failure(s) occurred:");
-      for (CrunchControlledJob job : failures) {
-        System.err.println(job.getJobName() + "(" + job.getJobID() + "): " + job.getMessage());
-      }
-    }
-    List<PipelineResult.StageResult> stages = Lists.newArrayList();
-    for (CrunchControlledJob job : control.getSuccessfulJobList()) {
-      try {
-        stages.add(new PipelineResult.StageResult(job.getJobName(), job.getJob().getCounters()));
-      } catch (Exception e) {
-        // A missing counter set should not hide the results of other stages.
-        LOG.error("Exception thrown fetching job counters for stage: " + job.getJobName(), e);
-      }
-    }
-    return new PipelineResult(stages);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/impl/mr/plan/DoNode.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/impl/mr/plan/DoNode.java b/src/main/java/com/cloudera/crunch/impl/mr/plan/DoNode.java
deleted file mode 100644
index e146a13..0000000
--- a/src/main/java/com/cloudera/crunch/impl/mr/plan/DoNode.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.impl.mr.plan;
-
-import java.util.List;
-
-import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.apache.hadoop.conf.Configuration;
-
-import com.cloudera.crunch.DoFn;
-import com.cloudera.crunch.Source;
-import com.cloudera.crunch.impl.mr.run.NodeContext;
-import com.cloudera.crunch.impl.mr.run.RTNode;
-import com.cloudera.crunch.types.Converter;
-import com.cloudera.crunch.types.PGroupedTableType;
-import com.cloudera.crunch.types.PType;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-
-/**
- * A node in the Crunch MapReduce execution plan. It wraps a {@link DoFn}, a
- * {@link PType}, the child nodes that consume its output and, for input
- * nodes, the {@link Source} it reads. Output and grouping nodes also carry an
- * output {@link Converter}. {@link #toRTNode} converts the tree rooted at a
- * node into a tree of runtime {@link RTNode}s.
- */
-public class DoNode {
-
-  // Shared immutable empty child list for grouping/output nodes. ImmutableList
-  // rejects add(), so addChild() cannot be used on those nodes.
-  private static final List<DoNode> NO_CHILDREN = ImmutableList.of();
-
-  private final DoFn fn;
-  private final String name;
-  private final PType<?> ptype;
-  private final List<DoNode> children;
-  // Set by createGroupingNode/createOutputNode and null for fn/input nodes;
-  // non-null is what makes isOutputNode() return true.
-  private final Converter outputConverter;
-  // Set only by createInputNode; non-null makes isInputNode() return true.
-  private final Source<?> source;
-  // May only be assigned when outputConverter is non-null (see setOutputName).
-  private String outputName;
-
-  private DoNode(DoFn fn, String name, PType<?> ptype, List<DoNode> children,
-      Converter outputConverter, Source<?> source) {
-    this.fn = fn;
-    this.name = name;
-    this.ptype = ptype;
-    this.children = children;
-    this.outputConverter = outputConverter;
-    this.source = source;
-  }
-
-  /** Returns a fresh mutable child list for nodes that may gain children later. */
-  private static List<DoNode> allowsChildren() {
-    return Lists.newArrayList();
-  }
-
-  /**
-   * Creates a childless node that uses the grouped type's output map fn and
-   * grouping converter.
-   */
-  public static <K, V> DoNode createGroupingNode(String name,
-      PGroupedTableType<K, V> ptype) {
-    DoFn<?,?> fn = ptype.getOutputMapFn();
-    return new DoNode(fn, name, ptype, NO_CHILDREN,
-        ptype.getGroupingConverter(), null);
-  }
-  
-  /**
-   * Creates a childless output node that uses the type's output map fn and
-   * converter.
-   */
-  public static <S> DoNode createOutputNode(String name, PType<S> ptype) {
-    Converter outputConverter = ptype.getConverter();
-    DoFn<?,?> fn = ptype.getOutputMapFn();
-    return new DoNode(fn, name, ptype, NO_CHILDREN,
-        outputConverter, null);
-  }
-
-  /** Creates an intermediate node wrapping {@code function}; it may gain children. */
-  public static DoNode createFnNode(String name, DoFn<?, ?> function,
-      PType<?> ptype) {
-    return new DoNode(function, name, ptype, allowsChildren(), null, null);
-  }
-
-  /**
-   * Creates an input node for {@code source}. The node is named after
-   * {@code source.toString()} and uses the source type's input map fn.
-   */
-  public static <S> DoNode createInputNode(Source<S> source) {
-    PType<?> ptype = source.getType();
-    DoFn<?,?> fn = ptype.getInputMapFn();
-    return new DoNode(fn, source.toString(), ptype, allowsChildren(), null,
-        source);
-  }
-
-  public boolean isInputNode() {
-    return source != null;
-  }
-  
-  public boolean isOutputNode() {
-    return outputConverter != null;
-  }
-  
-  public String getName() {
-    return name;
-  }
-  
-  /** Returns the live child list, not a copy. */
-  public List<DoNode> getChildren() {
-    return children;
-  }
-  
-  public Source<?> getSource() {
-    return source;
-  }
-
-  public PType<?> getPType() {
-    return ptype;
-  }
-
-  /**
-   * Adds {@code node} as a child unless an equal node is already present.
-   * Throws UnsupportedOperationException on grouping/output nodes, whose child
-   * list is immutable.
-   *
-   * @return this node, for chaining
-   */
-  public DoNode addChild(DoNode node) {
-    if (!children.contains(node)) {
-      this.children.add(node);
-    }
-    return this;
-  }
-
-  /**
-   * @throws IllegalStateException if this node has no output converter
-   */
-  public void setOutputName(String outputName) {
-    if (outputConverter == null) {
-      throw new IllegalStateException(
-          "Cannot set output name w/o output converter: " + outputName);
-    }
-    this.outputName = outputName;
-  }
-
-  /**
-   * Converts this node and, recursively, all of its children into runtime
-   * {@link RTNode}s. This node's fn is configured with {@code conf} before
-   * the children are converted. Children are always converted with
-   * {@code inputNode == false}. A node that is a child of several parents is
-   * converted (and its fn configured) once per parent.
-   *
-   * @param inputNode true to attach an input converter to this node (the
-   *     recursion passes false for every child)
-   * @param conf configuration handed to the fn's configure method
-   * @param nodeContext MAP selects {@code ptype.getConverter()}. Any other
-   *     context casts ptype to {@link PGroupedTableType} and uses its grouping
-   *     converter. NOTE(review): a non-grouped ptype in that case would throw
-   *     ClassCastException.
-   * @return the runtime node for this subtree
-   */
-  public RTNode toRTNode(boolean inputNode, Configuration conf, NodeContext nodeContext) {
-    List<RTNode> childRTNodes = Lists.newArrayList();
-    fn.configure(conf);
-    for (DoNode child : children) {
-      childRTNodes.add(child.toRTNode(false, conf, nodeContext));
-    }
-
-    Converter inputConverter = null;
-    if (inputNode) {
-      if (nodeContext == NodeContext.MAP) {
-        inputConverter = ptype.getConverter();
-      } else {
-        inputConverter = ((PGroupedTableType<?,?>) ptype).getGroupingConverter();
-      }
-    }          
-    return new RTNode(fn, name, childRTNodes, inputConverter, outputConverter,
-        outputName);
-  }
-  
-  /**
-   * Two nodes are equal when their names and fns are equal and they hold the
-   * very same source and output converter instances (reference identity).
-   * ptype, children and outputName do not take part.
-   */
-  @Override
-  public boolean equals(Object other) {
-    if (other == null || !(other instanceof DoNode)) {
-      return false;
-    }
-    if (this == other) {
-      return true;
-    }
-    DoNode o = (DoNode) other;
-    return (name.equals(o.name) && fn.equals(o.fn) && source == o.source &&
-        outputConverter == o.outputConverter);
-  }
-  
-  /**
-   * Hashes the same four fields equals() compares. This is consistent with
-   * equals(), because equal nodes hold identical source/converter instances.
-   */
-  @Override
-  public int hashCode() {
-    HashCodeBuilder hcb = new HashCodeBuilder();
-    return hcb.append(name).append(fn).append(source)
-        .append(outputConverter).toHashCode();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/impl/mr/plan/JobNameBuilder.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/impl/mr/plan/JobNameBuilder.java b/src/main/java/com/cloudera/crunch/impl/mr/plan/JobNameBuilder.java
deleted file mode 100644
index 1a1ce38..0000000
--- a/src/main/java/com/cloudera/crunch/impl/mr/plan/JobNameBuilder.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.impl.mr.plan;
-
-import java.util.List;
-
-import com.google.common.base.Joiner;
-import com.google.common.collect.Lists;
-
-/**
- * Visitor that walks the {@code DoNode} tree of a job and builds a string
- * naming the pipeline stages that belong to the job. A linear chain of nodes
- * is joined with "+". Where the tree branches, each non-empty branch is
- * bracketed and the branches are joined with "/" inside an outer bracket.
- */
-public class JobNameBuilder {
-
-  private static final Joiner JOINER = Joiner.on("+");
-  private static final Joiner CHILD_JOINER = Joiner.on("/");
-
-  private String pipelineName;
-  List<String> rootStack = Lists.newArrayList();
-
-  public JobNameBuilder(final String pipelineName) {
-    this.pipelineName = pipelineName;
-  }
-
-  /** Adds {@code node} and its descendants to the top-level name. */
-  public void visit(DoNode node) {
-    visit(node, rootStack);
-  }
-
-  /** Adds {@code nodes} and their descendants to the top-level name. */
-  public void visit(List<DoNode> nodes) {
-    visit(nodes, rootStack);
-  }
-
-  /** Appends the names contributed by {@code nodes} to {@code stack}. */
-  private void visit(List<DoNode> nodes, List<String> stack) {
-    if (nodes.size() == 1) {
-      // A single child simply continues the current chain.
-      visit(nodes.get(0), stack);
-      return;
-    }
-    List<String> branches = Lists.newArrayList();
-    for (DoNode branch : nodes) {
-      List<String> branchNames = Lists.newArrayList();
-      visit(branch, branchNames);
-      if (!branchNames.isEmpty()) {
-        branches.add("[" + JOINER.join(branchNames) + "]");
-      }
-    }
-    if (!branches.isEmpty()) {
-      stack.add("[" + CHILD_JOINER.join(branches) + "]");
-    }
-  }
-
-  /** Appends {@code node}'s name, if non-empty, followed by its descendants. */
-  private void visit(DoNode node, List<String> stack) {
-    String name = node.getName();
-    if (name.length() > 0) {
-      stack.add(name);
-    }
-    visit(node.getChildren(), stack);
-  }
-
-  /** Returns {@code "<pipelineName>: <stages joined by +>"}. */
-  public String build() {
-    String stages = JOINER.join(rootStack);
-    return String.format("%s: %s", pipelineName, stages);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/impl/mr/plan/JobPrototype.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/impl/mr/plan/JobPrototype.java b/src/main/java/com/cloudera/crunch/impl/mr/plan/JobPrototype.java
deleted file mode 100644
index 98fa6ef..0000000
--- a/src/main/java/com/cloudera/crunch/impl/mr/plan/JobPrototype.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.impl.mr.plan;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-
-import com.cloudera.crunch.Pipeline;
-import com.cloudera.crunch.Target;
-import com.cloudera.crunch.impl.mr.collect.DoTableImpl;
-import com.cloudera.crunch.impl.mr.collect.PCollectionImpl;
-import com.cloudera.crunch.impl.mr.collect.PGroupedTableImpl;
-import com.cloudera.crunch.impl.mr.exec.CrunchJob;
-import com.cloudera.crunch.impl.mr.run.CrunchCombiner;
-import com.cloudera.crunch.impl.mr.run.CrunchInputFormat;
-import com.cloudera.crunch.impl.mr.run.CrunchMapper;
-import com.cloudera.crunch.impl.mr.run.CrunchReducer;
-import com.cloudera.crunch.impl.mr.run.NodeContext;
-import com.cloudera.crunch.impl.mr.run.RTNode;
-import com.cloudera.crunch.util.DistCache;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-/**
- * Plan for a single MapReduce job, assembled by the planner before any Hadoop
- * {@link Job} exists. A prototype is either map-reduce (built around a
- * {@link PGroupedTableImpl}, with its reduce-side outputs supplied later via
- * {@link #addReducePaths}) or map-only. {@link #getCrunchJob} turns it into a
- * {@link CrunchJob} on first use and caches the result.
- */
-public class JobPrototype {
-
-  /**
-   * Creates a prototype for a job that shuffles on {@code group}.
-   *
-   * @param group the grouping that defines the shuffle
-   * @param inputs map-side node paths; each has {@code group} as its tail
-   * @param workingPath working directory of the job (outputs and serialized
-   *     node plans are written beneath it)
-   */
-  public static JobPrototype createMapReduceJob(PGroupedTableImpl<?,?> group,
-      Set<NodePath> inputs, Path workingPath) {
-    return new JobPrototype(inputs, group, workingPath);
-  }
-
-  /**
-   * Creates a prototype for a job with no reduce phase.
-   *
-   * @param mapNodePaths each output target mapped to the node paths that write it
-   * @param workingPath working directory of the job
-   */
-  public static JobPrototype createMapOnlyJob(
-      HashMultimap<Target, NodePath> mapNodePaths, Path workingPath) {
-    return new JobPrototype(mapNodePaths, workingPath);
-  }
-
-  // Map-side paths ending at the grouping; null for map-only jobs.
-  private final Set<NodePath> mapNodePaths;
-  // Grouping that defines the shuffle; null for map-only jobs.
-  private final PGroupedTableImpl<?,?> group;
-  // Prototypes whose jobs are registered as dependencies of this job's CrunchJob.
-  private final Set<JobPrototype> dependencies = Sets.newHashSet();
-  // One DoNode per collection, shared by every path walked in walkPath.
-  private final Map<PCollectionImpl<?>, DoNode> nodes = Maps.newHashMap();
-  private final Path workingPath;
-  
-  // Output targets and the paths that write them. Set in the constructor for
-  // map-only jobs, and by addReducePaths for map-reduce jobs.
-  private HashMultimap<Target, NodePath> targetsToNodePaths;
-  // DoTableImpl with a combine fn, as tracked by walkPath; when non-null at the
-  // combiner check in build(), a combiner is configured from it.
-  private DoTableImpl<?,?> combineFnTable;
-
-  // Cached result of build(), created by the first getCrunchJob call.
-  private CrunchJob job;
-
-  // Map-reduce prototype; reduce-side outputs are added later via addReducePaths.
-  private JobPrototype(Set<NodePath> inputs, PGroupedTableImpl<?,?> group,
-      Path workingPath) {
-    this.mapNodePaths = ImmutableSet.copyOf(inputs);
-    this.group = group;
-    this.workingPath = workingPath;
-    this.targetsToNodePaths = null;
-  }
-
-  // Map-only prototype: no grouping and no map-side input paths.
-  private JobPrototype(HashMultimap<Target, NodePath> outputPaths, Path workingPath) {
-    this.group = null;
-    this.mapNodePaths = null;
-    this.workingPath = workingPath;
-    this.targetsToNodePaths = outputPaths;
-  }
-
-  /**
-   * Sets the reduce-side outputs of a map-reduce prototype, replacing any set earlier.
-   *
-   * @param outputPaths each output target mapped to the reduce-side paths that write it
-   * @throws IllegalStateException if this prototype has no grouping (map-only)
-   */
-  public void addReducePaths(HashMultimap<Target, NodePath> outputPaths) {
-    if (group == null) {
-      throw new IllegalStateException(
-          "Cannot add a reduce phase to a map-only job");
-    }
-    this.targetsToNodePaths = outputPaths;
-  }
-
-  /**
-   * Records a prototype whose job will be registered, via addDependingJob, as a
-   * dependency of this prototype's job.
-   */
-  public void addDependency(JobPrototype dependency) {
-    this.dependencies.add(dependency);
-  }
-
-  /**
-   * Returns the CrunchJob for this prototype. The job is built on the first call
-   * and cached. On that first call, the jobs of all dependency prototypes are
-   * obtained (recursively, through this same method) and added with
-   * addDependingJob.
-   */
-  public CrunchJob getCrunchJob(Class<?> jarClass, Configuration conf, Pipeline pipeline) throws IOException {
-    if (job == null) {
-      job = build(jarClass, conf, pipeline);
-      for (JobPrototype proto : dependencies) {
-        job.addDependingJob(proto.getCrunchJob(jarClass, conf, pipeline));
-      }
-    }
-    return job;
-  }
-
-  /**
-   * Creates and configures the Hadoop job. It wires up the output targets and
-   * serializes the map node trees (plus the reduce and optional combine trees
-   * when grouped) under the working directory. It also configures the input
-   * sources and the shuffle, and sets the job name.
-   */
-  private CrunchJob build(Class<?> jarClass, Configuration conf, Pipeline pipeline) throws IOException {
-    Job job = new Job(conf);
-    // Use the Job's own Configuration from here on, so that the settings below
-    // are applied to the job.
-    conf = job.getConfiguration();
-    conf.set(PlanningParameters.CRUNCH_WORKING_DIRECTORY, workingPath.toString());
-    job.setJarByClass(jarClass);
-    
-    // Output side: one output DoNode per target. Each path that writes the
-    // target is then walked from its tail back to its head, attaching parent
-    // nodes. outputNodes collects the roots returned by walkPath.
-    Set<DoNode> outputNodes = Sets.newHashSet();
-    Set<Target> targets = targetsToNodePaths.keySet();
-    Path outputPath = new Path(workingPath, "output");
-    MSCROutputHandler outputHandler = new MSCROutputHandler(job, outputPath,
-        group == null);
-    for (Target target : targets) {
-      DoNode node = null;
-      for (NodePath nodePath : targetsToNodePaths.get(target)) {
-        if (node == null) {
-          // Create and configure the target's output node once, using the
-          // PType of the first path's tail.
-          PCollectionImpl<?> collect = nodePath.tail();
-          node = DoNode.createOutputNode(target.toString(), collect.getPType());
-          outputHandler.configureNode(node, target);
-        }
-        outputNodes.add(walkPath(nodePath.descendingIterator(), node));
-      }
-    }
-
-    job.setMapperClass(CrunchMapper.class);
-    List<DoNode> inputNodes;
-    // Only used when building the job name.
-    DoNode reduceNode = null;
-    if (group != null) {
-      // Grouped job: the roots built above are the reduce-phase trees.
-      job.setReducerClass(CrunchReducer.class);
-      List<DoNode> reduceNodes = Lists.newArrayList(outputNodes);
-      serialize(reduceNodes, conf, workingPath, NodeContext.REDUCE);
-      // NOTE(review): outputNodes is a HashSet; if it ever held more than one
-      // root, which one names the job would be arbitrary.
-      reduceNode = reduceNodes.get(0);
-
-      // combineFnTable is whatever walkPath left behind after the reduce-side
-      // walks above.
-      // NOTE(review): with several reduce paths this depends on which path was
-      // walked last; confirm every branch under the grouping uses the combine fn.
-      if (combineFnTable != null) {
-        job.setCombinerClass(CrunchCombiner.class);
-        // Combine plan: group.createDoNode() -> combine fn node -> grouping node.
-        DoNode combinerInputNode = group.createDoNode();
-        DoNode combineNode = combineFnTable.createDoNode();
-        combineNode.addChild(group.getGroupingNode());
-        combinerInputNode.addChild(combineNode);
-        serialize(ImmutableList.of(combinerInputNode), conf, workingPath,
-            NodeContext.COMBINE);
-      }
-
-      group.configureShuffle(job);
-
-      // Every map-side tree ends at the grouping node.
-      DoNode mapOutputNode = group.getGroupingNode();      
-      Set<DoNode> mapNodes = Sets.newHashSet();
-      for (NodePath nodePath : mapNodePaths) {
-        // Advance these one step, since we've already configured
-        // the grouping node, and the PGroupedTableImpl is the tail
-        // of the NodePath.
-        Iterator<PCollectionImpl<?>> iter = nodePath.descendingIterator();
-        iter.next();
-        mapNodes.add(walkPath(iter, mapOutputNode));
-      }
-      inputNodes = Lists.newArrayList(mapNodes);
-    } else { // No grouping
-      // Map-only job: the roots built above are the map-phase trees.
-      job.setNumReduceTasks(0);
-      inputNodes = Lists.newArrayList(outputNodes);
-    }
-    serialize(inputNodes, conf, workingPath, NodeContext.MAP);
-
-    // A single input source is configured with index -1 (presumably meaning
-    // "not part of a multi-input job" - confirm in Source.configureSource).
-    // Several sources are configured with their position and are read
-    // through CrunchInputFormat.
-    if (inputNodes.size() == 1) {
-      DoNode inputNode = inputNodes.get(0);
-      inputNode.getSource().configureSource(job, -1);
-    } else {
-      for (int i = 0; i < inputNodes.size(); i++) {
-        DoNode inputNode = inputNodes.get(i);
-        inputNode.getSource().configureSource(job, i);
-      }
-      job.setInputFormatClass(CrunchInputFormat.class);
-    }
-    job.setJobName(createJobName(pipeline.getName(), inputNodes, reduceNode));
-    
-    return new CrunchJob(job, outputPath, outputHandler);
-  }
-
-  /**
-   * Converts {@code nodes} to RTNodes for {@code context} and writes them with
-   * DistCache to {@code workingPath/<context>}.
-   */
-  private void serialize(List<DoNode> nodes, Configuration conf, Path workingPath,
-      NodeContext context) throws IOException {
-    List<RTNode> rtNodes = Lists.newArrayList();
-    for (DoNode node : nodes) {
-      rtNodes.add(node.toRTNode(true, conf, context));
-    }
-    Path path = new Path(workingPath, context.toString());
-    DistCache.write(conf, path, rtNodes);
-  }
-
-  /**
-   * Builds the job name from the map-side trees and, if present, the reduce tree.
-   */
-  private String createJobName(String pipelineName, List<DoNode> mapNodes, DoNode reduceNode) {
-    JobNameBuilder builder = new JobNameBuilder(pipelineName);
-    builder.visit(mapNodes);
-    if (reduceNode != null) {
-      builder.visit(reduceNode);
-    }
-    return builder.build();
-  }
-  
-  /**
-   * Walks {@code iter} (both callers pass a path's descendingIterator, so tail
-   * toward head). Each collection's DoNode becomes the parent of the node
-   * walked before it, starting with {@code working}. DoNodes are shared through
-   * {@code nodes}, so paths through the same collection merge. Returns the node
-   * of the last collection walked, or {@code working} if {@code iter} is empty.
-   *
-   * Also updates {@code combineFnTable}:
-   * - it is cleared whenever it is non-null and the collection walked is not a
-   *   PGroupedTableImpl;
-   * - otherwise it is set when the collection is a DoTableImpl with a combine fn.
-   * It therefore survives only while it is followed directly by a
-   * PGroupedTableImpl or ends the walk.
-   */
-  private DoNode walkPath(Iterator<PCollectionImpl<?>> iter, DoNode working) {
-    while (iter.hasNext()) {
-      PCollectionImpl<?> collect = iter.next();
-      if (combineFnTable != null &&
-          !(collect instanceof PGroupedTableImpl)) {
-        combineFnTable = null;
-      } else if (collect instanceof DoTableImpl &&
-          ((DoTableImpl<?,?>) collect).hasCombineFn()) {
-        combineFnTable = (DoTableImpl<?,?>) collect;
-      }
-      if (!nodes.containsKey(collect)) {
-        nodes.put(collect, collect.createDoNode());
-      }
-      DoNode parent = nodes.get(collect);
-      parent.addChild(working);
-      working = parent;
-    }
-    return working;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/impl/mr/plan/MSCROutputHandler.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/impl/mr/plan/MSCROutputHandler.java b/src/main/java/com/cloudera/crunch/impl/mr/plan/MSCROutputHandler.java
deleted file mode 100644
index 6a0f9ac..0000000
--- a/src/main/java/com/cloudera/crunch/impl/mr/plan/MSCROutputHandler.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.impl.mr.plan;
-
-import java.util.List;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-
-import com.cloudera.crunch.Target;
-import com.cloudera.crunch.io.MapReduceTarget;
-import com.cloudera.crunch.io.OutputHandler;
-import com.cloudera.crunch.io.PathTarget;
-import com.cloudera.crunch.types.PType;
-import com.google.common.collect.Lists;
-
-/**
- * OutputHandler that configures the output targets of one MSCR job. Path-based
- * MapReduce targets are given named outputs, and their destination paths are
- * remembered in order.
- */
-public class MSCROutputHandler implements OutputHandler {
-
-  private final Job job;
-  private final Path path;
-  private final boolean mapOnlyJob;
-
-  /** Destination paths of path-based targets; index i belongs to named output i. */
-  private final List<Path> multiPaths = Lists.newArrayList();
-  /** Output node currently being configured; set by configureNode. */
-  private DoNode workingNode;
-
-  /**
-   * @param job the Hadoop job whose outputs are configured
-   * @param outputPath directory passed to path-based targets when configuring them
-   * @param mapOnlyJob whether the job has no reduce phase
-   */
-  public MSCROutputHandler(Job job, Path outputPath, boolean mapOnlyJob) {
-    this.job = job;
-    this.path = outputPath;
-    this.mapOnlyJob = mapOnlyJob;
-  }
-
-  /**
-   * Makes {@code node} the current output node, then hands this handler to
-   * {@code target.accept} together with the node's PType.
-   */
-  public void configureNode(DoNode node, Target target) {
-    workingNode = node;
-    target.accept(this, node.getPType());
-  }
-
-  /**
-   * Configures {@code target} on the job.
-   *
-   * A target that is both a MapReduceTarget and a PathTarget receives the next
-   * named output (MULTI_OUTPUT_PREFIX followed by a running index). That name is
-   * set on the current output node, and the target's path is recorded. Any
-   * other MapReduceTarget is configured with a null path and a null name.
-   *
-   * @return true if the target was configured, false if it is not a MapReduceTarget
-   */
-  public boolean configure(Target target, PType<?> ptype) {
-    if (!(target instanceof MapReduceTarget)) {
-      return false;
-    }
-    MapReduceTarget mrTarget = (MapReduceTarget) target;
-    if (target instanceof PathTarget) {
-      String outputName = PlanningParameters.MULTI_OUTPUT_PREFIX + multiPaths.size();
-      multiPaths.add(((PathTarget) target).getPath());
-      workingNode.setOutputName(outputName);
-      mrTarget.configureForMapReduce(job, ptype, path, outputName);
-    } else {
-      mrTarget.configureForMapReduce(job, ptype, null, null);
-    }
-    return true;
-  }
-
-  /** Returns whether the job has no reduce phase. */
-  public boolean isMapOnlyJob() {
-    return mapOnlyJob;
-  }
-
-  /** Returns the recorded destination paths, in named-output order (live list). */
-  public List<Path> getMultiPaths() {
-    return multiPaths;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/impl/mr/plan/MSCRPlanner.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/impl/mr/plan/MSCRPlanner.java b/src/main/java/com/cloudera/crunch/impl/mr/plan/MSCRPlanner.java
deleted file mode 100644
index 35d8fec..0000000
--- a/src/main/java/com/cloudera/crunch/impl/mr/plan/MSCRPlanner.java
+++ /dev/null
@@ -1,373 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.impl.mr.plan;
-
-import java.io.IOException;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-
-import org.apache.hadoop.conf.Configuration;
-
-import com.cloudera.crunch.Source;
-import com.cloudera.crunch.SourceTarget;
-import com.cloudera.crunch.Target;
-import com.cloudera.crunch.impl.mr.MRPipeline;
-import com.cloudera.crunch.impl.mr.collect.DoCollectionImpl;
-import com.cloudera.crunch.impl.mr.collect.DoTableImpl;
-import com.cloudera.crunch.impl.mr.collect.InputCollection;
-import com.cloudera.crunch.impl.mr.collect.PCollectionImpl;
-import com.cloudera.crunch.impl.mr.collect.PGroupedTableImpl;
-import com.cloudera.crunch.impl.mr.collect.UnionCollection;
-import com.cloudera.crunch.impl.mr.exec.MRExecutor;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-/**
- * Turns the collections that feed a pipeline's outputs into a set of MapReduce
- * jobs.
- *
- * The graph is first broken into NodePaths, each of which starts at an input
- * or a GBK (PGroupedTableImpl) and ends at an output collection or a GBK.
- * GBKs that do not depend on another GBK are planned as map-reduce
- * JobPrototypes. Where a GBK depends on another GBK, the shared part of the
- * path is materialized to a SourceTarget and read back. Outputs that remain
- * unassigned become map-only jobs.
- */
-public class MSCRPlanner {
-
-  // Used to ensure that we always build pipelines starting from the deepest outputs, which
-  // helps ensure that we handle intermediate outputs correctly.
-  private static final Comparator<PCollectionImpl<?>> DEPTH_COMPARATOR = new Comparator<PCollectionImpl<?>>() {
-    @Override
-    public int compare(PCollectionImpl<?> left, PCollectionImpl<?> right) {
-      int cmp = right.getDepth() - left.getDepth();   
-      if (cmp == 0){
-          // Ensure we don't throw away two output collections at the same depth.
-          // Using the collection name would be nicer here, but names aren't necessarily unique
-          // NOTE(review): two distinct collections at the same depth with equal
-          // hash codes still compare as 0, and the TreeMap keeps only one of them.
-          cmp = new Integer(right.hashCode()).compareTo(left.hashCode());
-      }
-      return cmp;
-    }
-  };
-  
-  private final MRPipeline pipeline;
-  // Output collection -> its targets, ordered deepest first. Because this is a
-  // TreeMap, get/containsKey/put locate keys with DEPTH_COMPARATOR, not equals().
-  private final Map<PCollectionImpl<?>, Set<Target>> outputs;
-
-  /**
-   * @param pipeline pipeline used to create temp paths, intermediate outputs and reads
-   * @param outputs output collections and their targets; copied, so later
-   *     additions made during planning do not affect the caller's map
-   */
-  public MSCRPlanner(MRPipeline pipeline,
-      Map<PCollectionImpl<?>, Set<Target>> outputs) {
-    this.pipeline = pipeline;
-    this.outputs = new TreeMap<PCollectionImpl<?>, Set<Target>>(DEPTH_COMPARATOR);
-    this.outputs.putAll(outputs);
-  }
-
-  /**
-   * Plans the jobs needed to write every output given to the constructor.
-   *
-   * @param jarClass class passed on to each job for locating its jar
-   * @param conf base configuration handed to every job
-   * @return an executor holding one CrunchJob per distinct JobPrototype
-   */
-  public MRExecutor plan(Class<?> jarClass, Configuration conf)
-      throws IOException {
-    // Constructs all of the node paths, which either start w/an input
-    // or a GBK and terminate in an output collection of any type.
-    NodeVisitor visitor = new NodeVisitor();
-    for (PCollectionImpl<?> output : outputs.keySet()) {
-      visitor.visitOutput(output);
-    }
-
-    // Pull out the node paths.
-    Map<PCollectionImpl<?>, Set<NodePath>> nodePaths = visitor.getNodePaths();
-
-    // Keeps track of the dependencies from collections -> jobs and then
-    // between different jobs.
-    Map<PCollectionImpl<?>, JobPrototype> assignments = Maps.newHashMap();
-    Map<PCollectionImpl<?>, Set<JobPrototype>> jobDependencies =
-        new HashMap<PCollectionImpl<?>, Set<JobPrototype>>();
-
-    // Find the set of GBKs that DO NOT depend on any other GBK.
-    // Each round plans those GBKs and removes them from nodePaths. Splitting in
-    // handleGroupingDependencies rewrites GBK-to-GBK paths to start at a read of
-    // the materialized split point, which can make further GBKs eligible in the
-    // next round.
-    Set<PGroupedTableImpl<?,?>> workingGroupings = null;
-    while (!(workingGroupings = getWorkingGroupings(nodePaths)).isEmpty()) {
-
-      for (PGroupedTableImpl<?,?> grouping : workingGroupings) {
-        Set<NodePath> mapInputPaths = nodePaths.get(grouping);
-        JobPrototype proto = JobPrototype.createMapReduceJob(grouping,
-            mapInputPaths, pipeline.createTempPath());
-        assignments.put(grouping, proto);
-        if (jobDependencies.containsKey(grouping)) {
-          for (JobPrototype dependency : jobDependencies.get(grouping)) {
-            proto.addDependency(dependency);
-          }
-        }
-      }
-
-      Map<PGroupedTableImpl<?,?>, Set<NodePath>> dependencyPaths = getDependencyPaths(
-          workingGroupings, nodePaths);
-      for (Map.Entry<PGroupedTableImpl<?,?>, Set<NodePath>> entry : dependencyPaths.entrySet()) {
-        PGroupedTableImpl<?,?> grouping = entry.getKey();
-        Set<NodePath> currentNodePaths = entry.getValue();
-
-        JobPrototype proto = assignments.get(grouping);
-        // Paths from this grouping into another GBK; that GBK's job will depend on this one.
-        Set<NodePath> gbkPaths = Sets.newHashSet();
-        for (NodePath nodePath : currentNodePaths) {
-          PCollectionImpl<?> tail = nodePath.tail();
-          if (tail instanceof PGroupedTableImpl) {
-            gbkPaths.add(nodePath);
-            if (!jobDependencies.containsKey(tail)) {
-              jobDependencies.put(tail, Sets.<JobPrototype>newHashSet());
-            }
-            jobDependencies.get(tail).add(proto);
-          }
-        }
-
-        if (!gbkPaths.isEmpty()) {
-          handleGroupingDependencies(gbkPaths, currentNodePaths);
-        }
-
-        // At this point, all of the dependencies for the working groups will be
-        // file outputs, and so we can add them all to the JobPrototype-- we now have
-        // a complete job.
-        HashMultimap<Target, NodePath> reduceOutputs = HashMultimap.create();
-        for (NodePath nodePath : currentNodePaths) {
-          assignments.put(nodePath.tail(), proto);
-          for (Target target : outputs.get(nodePath.tail())) {
-            reduceOutputs.put(target, nodePath);
-          }
-        }
-        proto.addReducePaths(reduceOutputs);
-
-        // We've processed this GBK-- remove it from the set of nodePaths we
-        // need to process in the next step.
-        nodePaths.remove(grouping);
-      }
-    }
-
-    // Process any map-only jobs that are remaining.
-    // NOTE(review): a collection already assigned to a reduce job above is skipped
-    // here, even if some of its paths start at an input rather than a GBK. Confirm
-    // such paths are written somewhere.
-    if (!nodePaths.isEmpty()) {
-      for (Map.Entry<PCollectionImpl<?>, Set<NodePath>> entry : nodePaths
-          .entrySet()) {
-        PCollectionImpl<?> collect = entry.getKey();
-        if (!assignments.containsKey(collect)) {
-          HashMultimap<Target, NodePath> mapOutputs = HashMultimap.create();
-          for (NodePath nodePath : entry.getValue()) {
-            for (Target target : outputs.get(nodePath.tail())) {
-              mapOutputs.put(target, nodePath);
-            }
-          }
-          JobPrototype proto = JobPrototype.createMapOnlyJob(mapOutputs,
-              pipeline.createTempPath());
-          
-          if (jobDependencies.containsKey(collect)) {
-            for (JobPrototype dependency : jobDependencies.get(collect)) {
-              proto.addDependency(dependency);
-            }
-          }
-          assignments.put(collect, proto);
-        }
-      }
-    }
-
-    // Several collections can share one prototype; add each job only once.
-    MRExecutor exec = new MRExecutor(jarClass);
-    for (JobPrototype proto : Sets.newHashSet(assignments.values())) {
-      exec.addJob(proto.getCrunchJob(jarClass, conf, pipeline));
-    }
-    return exec;
-  }
-
-  /**
-   * For each working grouping, collects the node paths of other collections whose
-   * head is that grouping. Every working grouping gets an entry, possibly empty.
-   */
-  private Map<PGroupedTableImpl<?,?>, Set<NodePath>> getDependencyPaths(
-      Set<PGroupedTableImpl<?,?>> workingGroupings,
-      Map<PCollectionImpl<?>, Set<NodePath>> nodePaths) {
-    Map<PGroupedTableImpl<?,?>, Set<NodePath>> dependencyPaths = Maps.newHashMap();
-    for (PGroupedTableImpl<?,?> grouping : workingGroupings) {
-      dependencyPaths.put(grouping, Sets.<NodePath> newHashSet());
-    }
-
-    // Find the targets that depend on one of the elements of the current
-    // working group.
-    for (PCollectionImpl<?> target : nodePaths.keySet()) {
-      if (!workingGroupings.contains(target)) {
-        for (NodePath nodePath : nodePaths.get(target)) {
-          if (workingGroupings.contains(nodePath.head())) {
-            dependencyPaths.get(nodePath.head()).add(nodePath);
-          }
-        }
-      }
-    }
-    return dependencyPaths;
-  }
-
-  /**
-   * Finds how far the paths in {@code currentNodePaths} share a common prefix.
-   * Position 0 (the shared grouping) is skipped. Positions 1, 2, ... are then
-   * examined in lockstep. The method returns the last position before the first
-   * one where some path has ended, reached a PGroupedTableImpl, or holds a
-   * different collection (compared by reference) than the other paths.
-   */
-  private int getSplitIndex(Set<NodePath> currentNodePaths) {
-    List<Iterator<PCollectionImpl<?>>> iters = Lists.newArrayList();
-    for (NodePath nodePath : currentNodePaths) {
-      Iterator<PCollectionImpl<?>> iter = nodePath.iterator();
-      iter.next(); // prime this past the initial PGroupedTableImpl
-      iters.add(iter);
-    }
-
-    // Find the lowest point w/the lowest cost to be the split point for
-    // all of the dependent paths.
-    boolean end = false;
-    int splitIndex = -1;
-    while (!end) {
-      splitIndex++;
-      PCollectionImpl<?> current = null;
-      for (Iterator<PCollectionImpl<?>> iter : iters) {
-        if (iter.hasNext()) {
-          PCollectionImpl<?> next = iter.next();
-          if (next instanceof PGroupedTableImpl) {
-            end = true;
-            break;
-          } else if (current == null) {
-            current = next;
-          } else if (current != next) {
-            end = true;
-            break;
-          }
-        } else {
-          end = true;
-          break;
-        }
-      }
-    }
-    // TODO: Add costing calcs here.
-    return splitIndex;
-  }
-
-  /**
-   * Breaks the GBK-bound paths in {@code gbkPaths} at the common split point.
-   *
-   * The collection at that point (splitTarget) is materialized to a
-   * SourceTarget. Preference goes to the first of its existing targets, in
-   * iteration order, that either is a SourceTarget or converts into one via
-   * asSourceTarget; if a target converts, the converted target replaces it.
-   * Failing both, an intermediate output is created. Each GBK-bound path is then
-   * split: its prefix up to the split point replaces it in
-   * {@code currentNodePaths}, and the path itself is rewritten in place to start
-   * at a read of that SourceTarget. Non-GBK paths are kept unchanged.
-   *
-   * NOTE(review): NodePath.splitAt mutates the NodePath objects, and hence their
-   * hashCode. Those objects may also be elements of HashSets held in the
-   * nodePaths map of plan(); confirm nothing relies on lookups in those sets
-   * afterwards.
-   */
-  private void handleGroupingDependencies(Set<NodePath> gbkPaths,
-      Set<NodePath> currentNodePaths) throws IOException {
-    int splitIndex = getSplitIndex(currentNodePaths);
-    PCollectionImpl<?> splitTarget = currentNodePaths.iterator().next()
-        .get(splitIndex);
-    if (!outputs.containsKey(splitTarget)) {
-      outputs.put(splitTarget, Sets.<Target>newHashSet());
-    }
-    
-    SourceTarget srcTarget = null;
-    Target targetToReplace = null;
-    for (Target t : outputs.get(splitTarget)) {
-      if (t instanceof SourceTarget) {
-        srcTarget = (SourceTarget<?>) t;
-        break;
-      } else {
-        srcTarget = t.asSourceTarget(splitTarget.getPType());
-        if (srcTarget != null) {
-          targetToReplace = t;
-          break;
-        }
-      }
-    }
-    if (targetToReplace != null) {
-      outputs.get(splitTarget).remove(targetToReplace);
-    } else if (srcTarget == null) {
-      srcTarget = pipeline.createIntermediateOutput(splitTarget.getPType());
-    }
-    outputs.get(splitTarget).add(srcTarget);
-    splitTarget.materializeAt(srcTarget);
-
-    PCollectionImpl<?> inputNode = (PCollectionImpl<?>) pipeline.read(srcTarget);
-    Set<NodePath> nextNodePaths = Sets.newHashSet();
-    for (NodePath nodePath : currentNodePaths) {
-      if (gbkPaths.contains(nodePath)) {
-    	nextNodePaths.add(nodePath.splitAt(splitIndex, inputNode));
-      } else {
-    	nextNodePaths.add(nodePath);
-      }
-    }
-    currentNodePaths.clear();
-    currentNodePaths.addAll(nextNodePaths);
-  }
-
-  /**
-   * Returns the GBKs among the keys of {@code nodePaths} for which no incoming
-   * path starts at another PGroupedTableImpl.
-   */
-  private Set<PGroupedTableImpl<?,?>> getWorkingGroupings(
-      Map<PCollectionImpl<?>, Set<NodePath>> nodePaths) {
-    Set<PGroupedTableImpl<?,?>> gbks = Sets.newHashSet();
-    for (PCollectionImpl<?> target : nodePaths.keySet()) {
-      if (target instanceof PGroupedTableImpl) {
-        boolean hasGBKDependency = false;
-        for (NodePath nodePath : nodePaths.get(target)) {
-          if (nodePath.head() instanceof PGroupedTableImpl) {
-            hasGBKDependency = true;
-            break;
-          }
-        }
-        if (!hasGBKDependency) {
-          gbks.add((PGroupedTableImpl<?,?>) target);
-        }
-      }
-    }
-    return gbks;
-  }
-
-  /**
-   * Walks the graph upward from each output and records NodePaths keyed by the
-   * collection they end at (an output, or a GBK encountered on the way). A path
-   * stops at:
-   * - an InputCollection;
-   * - a PGroupedTableImpl, which then starts paths of its own;
-   * - any collection that is already a key of the path map.
-   */
-  private static class NodeVisitor implements PCollectionImpl.Visitor {
-
-    private final Map<PCollectionImpl<?>, Set<NodePath>> nodePaths;
-    // Sources of the InputCollections seen; recorded but not read in this class.
-    private final Map<PCollectionImpl<?>, Source<?>> inputs;
-    // Collection whose incoming paths are currently being collected.
-    private PCollectionImpl<?> workingNode;
-    // Path under construction, built backwards from workingNode.
-    private NodePath workingPath;
-
-    public NodeVisitor() {
-      this.nodePaths = new HashMap<PCollectionImpl<?>, Set<NodePath>>();
-      this.inputs = new HashMap<PCollectionImpl<?>, Source<?>>();
-    }
-
-    public Map<PCollectionImpl<?>, Set<NodePath>> getNodePaths() {
-      return nodePaths;
-    }
-
-    // Starts a fresh (empty) path set for output and walks up from it.
-    public void visitOutput(PCollectionImpl<?> output) {
-      nodePaths.put(output, Sets.<NodePath> newHashSet());
-      workingNode = output;
-      workingPath = new NodePath();
-      output.accept(this);
-    }
-
-    @Override
-    public void visitInputCollection(InputCollection<?> collection) {
-      workingPath.close(collection);
-      inputs.put(collection, collection.getSource());
-      nodePaths.get(workingNode).add(workingPath);
-    }
-
-    // Each parent gets its own copy of the current path; the union collection
-    // itself is not added to the path.
-    @Override
-    public void visitUnionCollection(UnionCollection<?> collection) {
-      PCollectionImpl<?> baseNode = workingNode;
-      NodePath basePath = workingPath;
-      for (PCollectionImpl<?> parent : collection.getParents()) {
-        workingPath = new NodePath(basePath);
-        workingNode = baseNode;
-        processParent(parent);
-      }
-    }
-
-    @Override
-    public void visitDoFnCollection(DoCollectionImpl<?> collection) {
-      workingPath.push(collection);
-      processParent(collection.getOnlyParent());
-    }
-
-    @Override
-    public void visitDoTable(DoTableImpl<?, ?> collection) {
-      workingPath.push(collection);
-      processParent(collection.getOnlyParent());
-    }
-
-    // Ends the current path at the GBK, then starts collecting the GBK's own
-    // incoming paths.
-    @Override
-    public void visitGroupedTable(PGroupedTableImpl<?, ?> collection) {
-      workingPath.close(collection);
-      nodePaths.get(workingNode).add(workingPath);
-      workingNode = collection;
-      nodePaths.put(workingNode, Sets.<NodePath> newHashSet());
-      workingPath = new NodePath(collection);
-      processParent(collection.getOnlyParent());
-    }
-
-    // Continues upward unless parent already has its own path set, in which
-    // case the current path ends there.
-    private void processParent(PCollectionImpl<?> parent) {
-      if (!nodePaths.containsKey(parent)) {
-        parent.accept(this);
-      } else {
-        workingPath.close(parent);
-        nodePaths.get(workingNode).add(workingPath);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/impl/mr/plan/NodePath.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/impl/mr/plan/NodePath.java b/src/main/java/com/cloudera/crunch/impl/mr/plan/NodePath.java
deleted file mode 100644
index 9a3cecb..0000000
--- a/src/main/java/com/cloudera/crunch/impl/mr/plan/NodePath.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.impl.mr.plan;
-
-import java.util.Iterator;
-import java.util.LinkedList;
-
-import com.cloudera.crunch.impl.mr.collect.PCollectionImpl;
-import com.google.common.collect.Lists;
-
-class NodePath implements Iterable<PCollectionImpl<?>> {
-  private LinkedList<PCollectionImpl<?>> path;
-
-  public NodePath() {
-    this.path = Lists.newLinkedList();
-  }
-
-  public NodePath(PCollectionImpl<?> tail) {
-    this.path = Lists.newLinkedList();
-    this.path.add(tail);
-  }
-
-  public NodePath(NodePath other) {
-    this.path = Lists.newLinkedList(other.path);
-  }
-
-  public void push(PCollectionImpl<?> stage) {
-    this.path.push((PCollectionImpl<?>) stage);
-  }
-
-  public void close(PCollectionImpl<?> head) {
-    this.path.push(head);
-  }
-
-  public Iterator<PCollectionImpl<?>> iterator() {
-    return path.iterator();
-  }
-
-  public Iterator<PCollectionImpl<?>> descendingIterator() {
-    return path.descendingIterator();
-  }
-
-  public PCollectionImpl<?> get(int index) {
-    return path.get(index);
-  }
-
-  public PCollectionImpl<?> head() {
-    return path.peekFirst();
-  }
-
-  public PCollectionImpl<?> tail() {
-    return path.peekLast();
-  }
-
-  @Override
-  public boolean equals(Object other) {
-    if (other == null || !(other instanceof NodePath)) {
-      return false;
-    }
-    NodePath nodePath = (NodePath) other;
-    return path.equals(nodePath.path);
-  }
-  
-  @Override
-  public int hashCode() {
-    return 17 + 37 * path.hashCode();
-  }
-  
-  @Override
-  public String toString() {
-    StringBuilder sb = new StringBuilder();
-    for (PCollectionImpl<?> collect : path) {
-      sb.append(collect.getName() + "|");
-    }
-    sb.deleteCharAt(sb.length() - 1);
-    return sb.toString();
-  }
-  
-  public NodePath splitAt(int splitIndex, PCollectionImpl<?> newHead) {
-    NodePath top = new NodePath();
-    for (int i = 0; i <= splitIndex; i++) {
-      top.path.add(path.get(i));
-    }
-    LinkedList<PCollectionImpl<?>> nextPath = Lists.newLinkedList();
-    nextPath.add(newHead);
-    nextPath.addAll(path.subList(splitIndex + 1, path.size()));
-    path = nextPath;
-    return top;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/impl/mr/plan/PlanningParameters.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/impl/mr/plan/PlanningParameters.java b/src/main/java/com/cloudera/crunch/impl/mr/plan/PlanningParameters.java
deleted file mode 100644
index fb3951e..0000000
--- a/src/main/java/com/cloudera/crunch/impl/mr/plan/PlanningParameters.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.impl.mr.plan;
-
-public class PlanningParameters {
-
-  public static final String MULTI_OUTPUT_PREFIX = "out";
-  
-  public static final String CRUNCH_WORKING_DIRECTORY = "crunch.work.dir";
-  
-  private PlanningParameters() {}
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/impl/mr/run/CrunchCombiner.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/impl/mr/run/CrunchCombiner.java b/src/main/java/com/cloudera/crunch/impl/mr/run/CrunchCombiner.java
deleted file mode 100644
index 1261188..0000000
--- a/src/main/java/com/cloudera/crunch/impl/mr/run/CrunchCombiner.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.impl.mr.run;
-
-/**
- * Combiner for Crunch jobs. It behaves exactly like {@link CrunchReducer}, except
- * that it reports {@link NodeContext#COMBINE} as its node context.
- */
-public class CrunchCombiner extends CrunchReducer {
-
-  /**
-   * Returns {@link NodeContext#COMBINE}. Presumably CrunchReducer uses this to
-   * pick which serialized node plan to load (confirm in CrunchReducer).
-   */
-  @Override
-  protected NodeContext getNodeContext() {
-    return NodeContext.COMBINE;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/impl/mr/run/CrunchInputFormat.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/impl/mr/run/CrunchInputFormat.java b/src/main/java/com/cloudera/crunch/impl/mr/run/CrunchInputFormat.java
deleted file mode 100644
index 2289462..0000000
--- a/src/main/java/com/cloudera/crunch/impl/mr/run/CrunchInputFormat.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.crunch.impl.mr.run;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import com.cloudera.crunch.io.impl.InputBundle;
-import com.google.common.collect.Lists;
-
-public class CrunchInputFormat<K, V> extends InputFormat<K, V> {
-
-  @Override
-  public List<InputSplit> getSplits(JobContext job) throws IOException,
-      InterruptedException {
-    List<InputSplit> splits = Lists.newArrayList();
-    Configuration conf = job.getConfiguration();
-    Map<InputBundle, Map<Integer, List<Path>>> formatNodeMap = CrunchInputs.getFormatNodeMap(job);
-
-    // First, build a map of InputFormats to Paths
-    for (Map.Entry<InputBundle, Map<Integer, List<Path>>> entry : formatNodeMap.entrySet()) {
-      InputBundle inputBundle = entry.getKey();
-      Job jobCopy = new Job(conf);
-      InputFormat<?,?> format = (InputFormat<?,?>) ReflectionUtils.newInstance(
-          inputBundle.getInputFormatClass(), jobCopy.getConfiguration());
-      for (Map.Entry<Integer, List<Path>> nodeEntry : entry.getValue()
-          .entrySet()) {
-        Integer nodeIndex = nodeEntry.getKey();
-        List<Path> paths = nodeEntry.getValue();
-        FileInputFormat.setInputPaths(jobCopy, paths.toArray(new Path[paths.size()]));
-
-        // Get splits for each input path and tag with InputFormat
-        // and Mapper types by wrapping in a TaggedInputSplit.
-        List<InputSplit> pathSplits = format.getSplits(jobCopy);
-        for (InputSplit pathSplit : pathSplits) {
-          splits.add(new CrunchInputSplit(pathSplit, inputBundle.getInputFormatClass(),
-              inputBundle.getExtraConfiguration(), nodeIndex, jobCopy.getConfiguration()));
-        }
-      }
-    }
-    return splits;
-  }
-
-  @Override
-  public RecordReader<K, V> createRecordReader(InputSplit inputSplit,
-      TaskAttemptContext context) throws IOException, InterruptedException {
-    return new CrunchRecordReader<K,V>(inputSplit, context);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/impl/mr/run/CrunchInputSplit.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/impl/mr/run/CrunchInputSplit.java b/src/main/java/com/cloudera/crunch/impl/mr/run/CrunchInputSplit.java
deleted file mode 100644
index 5e1da12..0000000
--- a/src/main/java/com/cloudera/crunch/impl/mr/run/CrunchInputSplit.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.crunch.impl.mr.run;
-
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.serializer.Deserializer;
-import org.apache.hadoop.io.serializer.SerializationFactory;
-import org.apache.hadoop.io.serializer.Serializer;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.util.ReflectionUtils;
-
-public class CrunchInputSplit extends InputSplit implements Configurable, Writable {
-
-  private InputSplit inputSplit;
-  private Class<? extends InputFormat> inputFormatClass;
-  private Map<String, String> extraConf;
-  private int nodeIndex;
-  private Configuration conf;
-
-  public CrunchInputSplit() {
-    // default constructor
-  }
-
-  public CrunchInputSplit(InputSplit inputSplit,
-      Class<? extends InputFormat> inputFormatClass, Map<String, String> extraConf,
-      int nodeIndex, Configuration conf) {
-    this.inputSplit = inputSplit;
-    this.inputFormatClass = inputFormatClass;
-    this.extraConf = extraConf;
-    this.nodeIndex = nodeIndex;
-    this.conf = conf;
-  }
-
-  public int getNodeIndex() {
-    return nodeIndex;
-  }
-
-  public InputSplit getInputSplit() {
-    return inputSplit;
-  }
-
-  public Class<? extends InputFormat> getInputFormatClass() {
-    return inputFormatClass;
-  }
-
-  @Override
-  public long getLength() throws IOException, InterruptedException {
-    return inputSplit.getLength();
-  }
-
-  @Override
-  public String[] getLocations() throws IOException, InterruptedException {
-    return inputSplit.getLocations();
-  }
-
-  public void readFields(DataInput in) throws IOException {
-    nodeIndex = in.readInt();
-    int extraConfSize = in.readInt();
-    if (extraConfSize > 0) {
-      for (int i = 0; i < extraConfSize; i++) {
-        conf.set(in.readUTF(), in.readUTF());
-      }
-    }
-    inputFormatClass = (Class<? extends InputFormat<?,?>>) readClass(in);
-    Class<? extends InputSplit> inputSplitClass = (Class<? extends InputSplit>) readClass(in);
-    inputSplit = (InputSplit) ReflectionUtils
-        .newInstance(inputSplitClass, conf);
-    SerializationFactory factory = new SerializationFactory(conf);
-    Deserializer deserializer = factory.getDeserializer(inputSplitClass);
-    deserializer.open((DataInputStream) in);
-    inputSplit = (InputSplit) deserializer.deserialize(inputSplit);
-  }
-
-  private Class<?> readClass(DataInput in) throws IOException {
-    String className = Text.readString(in);
-    try {
-      return conf.getClassByName(className);
-    } catch (ClassNotFoundException e) {
-      throw new RuntimeException("readObject can't find class", e);
-    }
-  }
-
-  public void write(DataOutput out) throws IOException {
-    out.writeInt(nodeIndex);
-    out.writeInt(extraConf.size());
-    for (Map.Entry<String, String> e : extraConf.entrySet()) {
-      out.writeUTF(e.getKey());
-      out.writeUTF(e.getValue());
-    }
-    Text.writeString(out, inputFormatClass.getName());
-    Text.writeString(out, inputSplit.getClass().getName());
-    SerializationFactory factory = new SerializationFactory(conf);
-    Serializer serializer = factory.getSerializer(inputSplit.getClass());
-    serializer.open((DataOutputStream) out);
-    serializer.serialize(inputSplit);
-  }
-
-  public Configuration getConf() {
-    return conf;
-  }
-
-  public void setConf(Configuration conf) {
-    this.conf = conf;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/impl/mr/run/CrunchInputs.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/impl/mr/run/CrunchInputs.java b/src/main/java/com/cloudera/crunch/impl/mr/run/CrunchInputs.java
deleted file mode 100644
index 2af8e8d..0000000
--- a/src/main/java/com/cloudera/crunch/impl/mr/run/CrunchInputs.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.crunch.impl.mr.run;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-
-import com.cloudera.crunch.io.impl.InputBundle;
-import com.google.common.base.Joiner;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-public class CrunchInputs {
-
-  private static final char RECORD_SEP = ',';
-  private static final char FIELD_SEP = ';';
-  private static final Joiner JOINER = Joiner.on(FIELD_SEP);
-  private static final Splitter SPLITTER = Splitter.on(FIELD_SEP);
-  
-  public static void addInputPath(Job job, Path path,
-      InputBundle inputBundle, int nodeIndex) {
-    Configuration conf = job.getConfiguration();
-    String inputs = JOINER.join(inputBundle.serialize(), String.valueOf(nodeIndex), path.toString());
-    String existing = conf.get(RuntimeParameters.MULTI_INPUTS);
-    conf.set(RuntimeParameters.MULTI_INPUTS, existing == null ? inputs : existing + RECORD_SEP
-        + inputs);
-  }
-
-  public static Map<InputBundle, Map<Integer, List<Path>>> getFormatNodeMap(
-      JobContext job) {
-    Map<InputBundle, Map<Integer, List<Path>>> formatNodeMap = Maps.newHashMap();
-    Configuration conf = job.getConfiguration();
-    for (String input : Splitter.on(RECORD_SEP).split(conf.get(RuntimeParameters.MULTI_INPUTS))) {
-      List<String> fields = Lists.newArrayList(SPLITTER.split(input));
-      InputBundle inputBundle = InputBundle.fromSerialized(fields.get(0));
-      if (!formatNodeMap.containsKey(inputBundle)) {
-        formatNodeMap.put(inputBundle, Maps.<Integer, List<Path>> newHashMap());
-      }
-      Integer nodeIndex = Integer.valueOf(fields.get(1));
-      if (!formatNodeMap.get(inputBundle).containsKey(nodeIndex)) {
-        formatNodeMap.get(inputBundle).put(nodeIndex,
-            Lists.<Path> newLinkedList());
-      }
-      formatNodeMap.get(inputBundle).get(nodeIndex)
-          .add(new Path(fields.get(2)));
-    }
-    return formatNodeMap;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/impl/mr/run/CrunchMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/impl/mr/run/CrunchMapper.java b/src/main/java/com/cloudera/crunch/impl/mr/run/CrunchMapper.java
deleted file mode 100644
index 26a4d21..0000000
--- a/src/main/java/com/cloudera/crunch/impl/mr/run/CrunchMapper.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.impl.mr.run;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.mapreduce.Mapper;
-
-public class CrunchMapper extends Mapper<Object, Object, Object, Object> {
-
-  private static final Log LOG = LogFactory.getLog(CrunchMapper.class);
-  
-  private RTNode node;
-  private CrunchTaskContext ctxt;
-  private boolean debug;
-  
-  @Override
-  protected void setup(Mapper<Object, Object, Object, Object>.Context context) {
-    List<RTNode> nodes;
-    this.ctxt = new CrunchTaskContext(context, NodeContext.MAP);
-    try {
-      nodes = ctxt.getNodes();
-    } catch (IOException e) {
-      LOG.info("Crunch deserialization error", e);
-      throw new CrunchRuntimeException(e);
-    }
-    if (nodes.size() == 1) {
-      this.node = nodes.get(0);
-    } else {
-      CrunchInputSplit split = (CrunchInputSplit) context.getInputSplit();
-      this.node = nodes.get(split.getNodeIndex());
-    }
-    this.debug = ctxt.isDebugRun();
-  }
-
-  @Override
-  protected void map(Object k, Object v,
-      Mapper<Object, Object, Object, Object>.Context context) {
-    if (debug) {
-      try {
-        node.process(k, v);
-      } catch (Exception e) {
-        LOG.error("Mapper exception", e);
-      }
-    } else {
-      node.process(k, v);
-    }
-  }
-
-  @Override
-  protected void cleanup(Mapper<Object, Object, Object, Object>.Context context) {
-    node.cleanup();
-    ctxt.cleanup();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/impl/mr/run/CrunchRecordReader.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/impl/mr/run/CrunchRecordReader.java b/src/main/java/com/cloudera/crunch/impl/mr/run/CrunchRecordReader.java
deleted file mode 100644
index 4ce5545..0000000
--- a/src/main/java/com/cloudera/crunch/impl/mr/run/CrunchRecordReader.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.crunch.impl.mr.run;
-
-import java.io.IOException;
-
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.util.ReflectionUtils;
-
-class CrunchRecordReader<K, V> extends RecordReader<K, V> {
-
-  private final RecordReader<K, V> delegate;
-
-  public CrunchRecordReader(InputSplit inputSplit, final TaskAttemptContext context)
-      throws IOException, InterruptedException {
-    CrunchInputSplit crunchSplit = (CrunchInputSplit) inputSplit;
-    InputFormat<K, V> inputFormat = (InputFormat<K, V>) ReflectionUtils
-        .newInstance(crunchSplit.getInputFormatClass(), crunchSplit.getConf());
-    this.delegate = inputFormat.createRecordReader(
-        crunchSplit.getInputSplit(), TaskAttemptContextFactory.create(
-            crunchSplit.getConf(), context.getTaskAttemptID()));
-  }
-
-  @Override
-  public void close() throws IOException {
-    delegate.close();
-  }
-
-  @Override
-  public K getCurrentKey() throws IOException, InterruptedException {
-    return delegate.getCurrentKey();
-  }
-
-  @Override
-  public V getCurrentValue() throws IOException, InterruptedException {
-    return delegate.getCurrentValue();
-  }
-
-  @Override
-  public float getProgress() throws IOException, InterruptedException {
-    return delegate.getProgress();
-  }
-
-  @Override
-  public void initialize(InputSplit inputSplit, TaskAttemptContext context)
-      throws IOException, InterruptedException {
-    CrunchInputSplit crunchSplit = (CrunchInputSplit) inputSplit;
-    InputSplit delegateSplit = crunchSplit.getInputSplit();
-    delegate.initialize(delegateSplit, TaskAttemptContextFactory.create(
-        crunchSplit.getConf(), context.getTaskAttemptID()));
-  }
-
-  @Override
-  public boolean nextKeyValue() throws IOException, InterruptedException {
-    return delegate.nextKeyValue();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/impl/mr/run/CrunchReducer.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/impl/mr/run/CrunchReducer.java b/src/main/java/com/cloudera/crunch/impl/mr/run/CrunchReducer.java
deleted file mode 100644
index 15a759d..0000000
--- a/src/main/java/com/cloudera/crunch/impl/mr/run/CrunchReducer.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.impl.mr.run;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.mapreduce.Reducer;
-
-public class CrunchReducer extends Reducer<Object, Object, Object, Object> {
-
-  private static final Log LOG = LogFactory.getLog(CrunchReducer.class);
-  
-  private RTNode node;
-  private CrunchTaskContext ctxt;
-  private boolean debug;
-  
-  protected NodeContext getNodeContext() {
-    return NodeContext.REDUCE;
-  }
-  
-  @Override
-  protected void setup(Reducer<Object, Object, Object, Object>.Context context) {
-    this.ctxt = new CrunchTaskContext(context, getNodeContext());
-    try {
-      List<RTNode> nodes = ctxt.getNodes();
-      this.node = nodes.get(0);
-    } catch (IOException e) {
-      LOG.info("Crunch deserialization error", e);
-      throw new CrunchRuntimeException(e);
-    }
-    this.debug = ctxt.isDebugRun();
-  }
-
-  @Override
-  protected void reduce(Object key, Iterable<Object> values,
-      Reducer<Object, Object, Object, Object>.Context context) {
-    if (debug) {
-      try {
-        node.processIterable(key, values);
-      } catch (Exception e) {
-        LOG.error("Reducer exception", e);
-      }
-    } else {
-      node.processIterable(key, values);
-    }
-  }
-
-  @Override
-  protected void cleanup(Reducer<Object, Object, Object, Object>.Context context) {
-    node.cleanup();
-    ctxt.cleanup();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/impl/mr/run/CrunchRuntimeException.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/impl/mr/run/CrunchRuntimeException.java b/src/main/java/com/cloudera/crunch/impl/mr/run/CrunchRuntimeException.java
deleted file mode 100644
index 68ef054..0000000
--- a/src/main/java/com/cloudera/crunch/impl/mr/run/CrunchRuntimeException.java
+++ /dev/null
@@ -1,26 +0,0 @@
-package com.cloudera.crunch.impl.mr.run;
-
/**
 * Unchecked exception used by the Crunch MapReduce runtime to signal or wrap failures.
 * <p>
 * Each instance carries a boolean flag. The flag starts out {@code false}, is set
 * by {@link #markLogged()} and is read by {@link #wasLogged()}. It is presumably
 * meant to stop the same failure from being logged twice; confirm against callers.
 */
public class CrunchRuntimeException extends RuntimeException {

  /** Whether {@link #markLogged()} has been called on this instance. */
  private boolean logged;

  /** @param msg the detail message */
  public CrunchRuntimeException(String msg) {
    super(msg);
  }

  /** @param e the underlying cause */
  public CrunchRuntimeException(Exception e) {
    super(e);
  }

  /**
   * @param msg the detail message
   * @param e the underlying cause
   */
  public CrunchRuntimeException(String msg, Exception e) {
    super(msg, e);
  }

  /** @return {@code true} once {@link #markLogged()} has been called */
  public boolean wasLogged() {
    return this.logged;
  }

  /** Records that this exception has been logged. */
  public void markLogged() {
    logged = true;
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/impl/mr/run/CrunchTaskContext.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/impl/mr/run/CrunchTaskContext.java b/src/main/java/com/cloudera/crunch/impl/mr/run/CrunchTaskContext.java
deleted file mode 100644
index 0924268..0000000
--- a/src/main/java/com/cloudera/crunch/impl/mr/run/CrunchTaskContext.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.impl.mr.run;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-import org.apache.hadoop.mapreduce.lib.output.CrunchMultipleOutputs;
-
-import com.cloudera.crunch.impl.mr.plan.PlanningParameters;
-import com.cloudera.crunch.util.DistCache;
-
-public class CrunchTaskContext {
-
-  private final TaskInputOutputContext<Object, Object, Object, Object> taskContext;
-  private final NodeContext nodeContext;
-  private CrunchMultipleOutputs<Object, Object> multipleOutputs;
-
-  public CrunchTaskContext(
-      TaskInputOutputContext<Object, Object, Object, Object> taskContext,
-      NodeContext nodeContext) {
-    this.taskContext = taskContext;
-    this.nodeContext = nodeContext;
-  }
-
-  public TaskInputOutputContext<Object, Object, Object, Object> getContext() {
-    return taskContext;
-  }
-
-  public NodeContext getNodeContext() {
-    return nodeContext;
-  }
-
-  public List<RTNode> getNodes() throws IOException {
-    Configuration conf = taskContext.getConfiguration();
-    Path path = new Path(new Path(conf.get(PlanningParameters.CRUNCH_WORKING_DIRECTORY)), nodeContext.toString());
-    @SuppressWarnings("unchecked")
-    List<RTNode> nodes = (List<RTNode>) DistCache.read(conf, path);
-    if (nodes != null) {
-      for (RTNode node : nodes) {
-        node.initialize(this);
-      }
-    }
-    return nodes;
-  }
-  
-  public boolean isDebugRun() {
-    Configuration conf = taskContext.getConfiguration();
-    return conf.getBoolean(RuntimeParameters.DEBUG, false);
-  }
-  
-  public void cleanup() {
-    if (multipleOutputs != null) {
-      try {
-        multipleOutputs.close();
-      } catch (IOException e) {
-        throw new CrunchRuntimeException(e);
-      } catch (InterruptedException e) {
-        throw new CrunchRuntimeException(e);
-      }
-    }
-  }
-
-  public CrunchMultipleOutputs<Object, Object> getMultipleOutputs() {
-    if (multipleOutputs == null) {
-      multipleOutputs = new CrunchMultipleOutputs<Object, Object>(taskContext);
-    }
-    return multipleOutputs;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/com/cloudera/crunch/impl/mr/run/NodeContext.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/impl/mr/run/NodeContext.java b/src/main/java/com/cloudera/crunch/impl/mr/run/NodeContext.java
deleted file mode 100644
index b1f8d98..0000000
--- a/src/main/java/com/cloudera/crunch/impl/mr/run/NodeContext.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.impl.mr.run;
-
-import com.cloudera.crunch.impl.mr.plan.DoNode;
-
/**
 * Enum that is associated with a serialized {@link DoNode} instance, so we know
 * how to use it within the context of a particular MR job.
 */
public enum NodeContext {
  MAP, REDUCE, COMBINE;

  /**
   * Returns {@code "crunch.donode."} followed by the lower-cased constant name,
   * for example {@code crunch.donode.combine}.
   * <p>
   * Lower-casing uses {@link java.util.Locale#ROOT}, so the key does not depend on
   * the JVM's default locale. Under a Turkish default locale, {@code "COMBINE"}
   * would otherwise lower-case to {@code "combıne"}, with a dotless i.
   */
  public String getConfigurationKey() {
    return "crunch.donode." + name().toLowerCase(java.util.Locale.ROOT);
  }
}


Mime
View raw message