crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject [2/9] Crunch on Spark
Date Wed, 11 Dec 2013 20:47:48 GMT
http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/it/resources/urls.txt
----------------------------------------------------------------------
diff --git a/crunch-spark/src/it/resources/urls.txt b/crunch-spark/src/it/resources/urls.txt
new file mode 100644
index 0000000..827e711
--- /dev/null
+++ b/crunch-spark/src/it/resources/urls.txt
@@ -0,0 +1,11 @@
+www.A.com	www.B.com
+www.A.com	www.C.com
+www.A.com	www.D.com
+www.A.com	www.E.com
+www.B.com	www.D.com
+www.B.com	www.E.com
+www.C.com	www.D.com
+www.D.com	www.B.com
+www.E.com	www.A.com
+www.F.com	www.B.com
+www.F.com	www.C.com

http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/main/java/org/apache/crunch/impl/spark/ByteArray.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/ByteArray.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/ByteArray.java
new file mode 100644
index 0000000..c86835c
--- /dev/null
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/ByteArray.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.spark;
+
+import com.google.common.primitives.UnsignedBytes;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
/**
 * A serializable, comparable wrapper around a {@code byte[]} so that raw
 * serialized records can be used as keys in Spark shuffles. Comparison is
 * lexicographic over the bytes treated as unsigned values, matching the
 * ordering of Guava's {@code UnsignedBytes.lexicographicalComparator()}
 * without requiring the Guava dependency in this hot, serialized type.
 */
public class ByteArray implements Serializable, Comparable<ByteArray> {

  /** The wrapped bytes; exposed directly to avoid copy overhead on the hot path. */
  public final byte[] value;

  public ByteArray(byte[] value) {
    this.value = value;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (o == null || getClass() != o.getClass()) return false;
    return Arrays.equals(value, ((ByteArray) o).value);
  }

  @Override
  public int hashCode() {
    return value != null ? Arrays.hashCode(value) : 0;
  }

  /**
   * Compares the wrapped arrays lexicographically, treating each byte as an
   * unsigned value in [0, 255]. A shared prefix is broken by length.
   * Note: like the original Guava-based version, this throws NPE if either
   * wrapped array is null, even though equals/hashCode tolerate null.
   */
  @Override
  public int compareTo(ByteArray other) {
    byte[] a = value;
    byte[] b = other.value;
    int limit = Math.min(a.length, b.length);
    for (int i = 0; i < limit; i++) {
      int cmp = (a[i] & 0xFF) - (b[i] & 0xFF);
      if (cmp != 0) {
        return cmp;
      }
    }
    return a.length - b.length;
  }
}

http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/main/java/org/apache/crunch/impl/spark/CounterAccumulatorParam.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/CounterAccumulatorParam.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/CounterAccumulatorParam.java
new file mode 100644
index 0000000..e1cb5c7
--- /dev/null
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/CounterAccumulatorParam.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.spark;
+
+import com.google.common.collect.Maps;
+import org.apache.spark.AccumulatorParam;
+
+import java.util.Map;
+
+public class CounterAccumulatorParam implements AccumulatorParam<Map<String, Long>> {
+  @Override
+  public Map<String, Long> addAccumulator(Map<String, Long> current, Map<String, Long> added) {
+    for (Map.Entry<String, Long> e : added.entrySet()) {
+      Long cnt = current.get(e.getKey());
+      cnt = (cnt == null) ? e.getValue() : cnt + e.getValue();
+      current.put(e.getKey(), cnt);
+    }
+    return current;
+  }
+
+  @Override
+  public Map<String, Long> addInPlace(Map<String, Long> first, Map<String, Long> second) {
+    return addAccumulator(first, second);
+  }
+
+  @Override
+  public Map<String, Long> zero(Map<String, Long> counts) {
+    return Maps.newHashMap();
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/main/java/org/apache/crunch/impl/spark/GuavaUtils.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/GuavaUtils.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/GuavaUtils.java
new file mode 100644
index 0000000..400ae7b
--- /dev/null
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/GuavaUtils.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.spark;
+
+import com.google.common.base.Function;
+import org.apache.crunch.Pair;
+import scala.Tuple2;
+
+import javax.annotation.Nullable;
+
+public class GuavaUtils {
+  public static <K, V> Function<Tuple2<K, V>, Pair<K, V>> tuple2PairFunc() {
+    return new Function<Tuple2<K, V>, Pair<K, V>>() {
+      @Override
+      public Pair<K, V> apply(@Nullable Tuple2<K, V> kv) {
+        return kv == null ? null : Pair.of(kv._1, kv._2);
+      }
+    };
+  }
+
+  public static <K, V> Function<Pair<K, V>, Tuple2<K, V>> pair2tupleFunc() {
+    return new Function<Pair<K, V>, Tuple2<K, V>>() {
+      @Override
+      public Tuple2<K, V> apply(@Nullable Pair<K, V> kv) {
+        return kv == null ? null : new Tuple2<K, V>(kv.first(), kv.second());
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/main/java/org/apache/crunch/impl/spark/IntByteArray.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/IntByteArray.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/IntByteArray.java
new file mode 100644
index 0000000..9af70ed
--- /dev/null
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/IntByteArray.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.spark;
+
+import java.io.Serializable;
+
+public class IntByteArray extends ByteArray implements Serializable {
+  public final int partition;
+
+  public IntByteArray(int partition, byte[] bytes) {
+    super(bytes);
+    this.partition = partition;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o == null || getClass() != o.getClass()) return false;
+    if (!super.equals(o)) return false;
+    IntByteArray that = (IntByteArray) o;
+    if (partition != that.partition) return false;
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    return 31 * super.hashCode() + partition;
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkCollection.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkCollection.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkCollection.java
new file mode 100644
index 0000000..6bd668d
--- /dev/null
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkCollection.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.spark;
+
+import org.apache.spark.api.java.JavaRDDLike;
+
+public interface SparkCollection {
+  JavaRDDLike<?, ?> getJavaRDDLike(SparkRuntime runtime);
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkComparator.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkComparator.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkComparator.java
new file mode 100644
index 0000000..c87a8db
--- /dev/null
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkComparator.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.spark;
+
+import org.apache.avro.mapred.AvroKeyComparator;
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.crunch.GroupingOptions;
+import org.apache.crunch.types.PGroupedTableType;
+import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.writable.WritableType;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Comparator;
+
+public class SparkComparator implements Comparator<ByteArray>, Serializable {
+
+  private final Class<? extends RawComparator> cmpClass;
+  private final GroupingOptions options;
+  private final PGroupedTableType ptype;
+  private final SparkRuntimeContext ctxt;
+  private transient RawComparator<?> cmp;
+
+  public SparkComparator(GroupingOptions options,
+                         PGroupedTableType ptype,
+                         SparkRuntimeContext ctxt) {
+    if (options.getSortComparatorClass() != null) {
+      this.cmpClass = options.getSortComparatorClass();
+    } else if (AvroTypeFamily.getInstance().equals(ptype.getFamily())) {
+      this.cmpClass = AvroKeyComparator.class;
+    } else {
+      this.cmpClass = null;
+    }
+    this.options = options;
+    this.ptype = ptype;
+    this.ctxt = ctxt;
+  }
+
+  @Override
+  public int compare(ByteArray s1, ByteArray s2) {
+    byte[] b1 = s1.value;
+    byte[] b2 = s2.value;
+    return rawComparator().compare(b1, 0, b1.length, b2, 0, b2.length);
+  }
+
+  private RawComparator<?> rawComparator() {
+    if (cmp == null) {
+      try {
+        ptype.initialize(ctxt.getConfiguration());
+        Job job = new Job(ctxt.getConfiguration());
+        ptype.configureShuffle(job, options);
+        if (cmpClass != null) {
+          cmp = ReflectionUtils.newInstance(cmpClass, job.getConfiguration());
+        } else {
+          cmp = WritableComparator.get(((WritableType) ptype.getTableType().getKeyType()).getSerializationClass());
+          if (cmp == null) {
+            cmp = new ByteWritable.Comparator();
+          }
+        }
+      } catch (IOException e) {
+        throw new CrunchRuntimeException("Error configuring sort comparator", e);
+      }
+    }
+    return cmp;
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPartitioner.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPartitioner.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPartitioner.java
new file mode 100644
index 0000000..ac0e4e1
--- /dev/null
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPartitioner.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.spark;
+
+public class SparkPartitioner extends org.apache.spark.Partitioner {
+
+  private final int numPartitions;
+
+  public SparkPartitioner(int numPartitions) {
+    this.numPartitions = numPartitions;
+  }
+
+  @Override
+  public int numPartitions() {
+    return numPartitions;
+  }
+
+  @Override
+  public int getPartition(Object key) {
+    return ((IntByteArray) key).partition;
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java
new file mode 100644
index 0000000..674f0c8
--- /dev/null
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.spark;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import org.apache.crunch.CachingOptions;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PipelineExecution;
+import org.apache.crunch.PipelineResult;
+import org.apache.crunch.impl.dist.DistributedPipeline;
+import org.apache.crunch.impl.dist.collect.PCollectionImpl;
+import org.apache.crunch.impl.spark.collect.SparkCollectFactory;
+import org.apache.crunch.io.ReadableSource;
+import org.apache.crunch.materialize.MaterializableIterable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.storage.StorageLevel;
+
+import java.util.Map;
+
+public class SparkPipeline extends DistributedPipeline {
+
+  private final String sparkConnect;
+  private JavaSparkContext sparkContext;
+  private final Map<PCollection<?>, StorageLevel> cachedCollections = Maps.newHashMap();
+
+  public SparkPipeline(String sparkConnect, String appName) {
+    super(appName, new Configuration(), new SparkCollectFactory());
+    this.sparkConnect = Preconditions.checkNotNull(sparkConnect);
+  }
+
+  public SparkPipeline(JavaSparkContext sparkContext, String appName) {
+    super(appName, new Configuration(), new SparkCollectFactory());
+    this.sparkContext = Preconditions.checkNotNull(sparkContext);
+    this.sparkConnect = sparkContext.getSparkHome().orNull();
+  }
+
+  @Override
+  public <T> Iterable<T> materialize(PCollection<T> pcollection) {
+    ReadableSource<T> readableSrc = getMaterializeSourceTarget(pcollection);
+    MaterializableIterable<T> c = new MaterializableIterable<T>(this, readableSrc);
+    if (!outputTargetsToMaterialize.containsKey(pcollection)) {
+      outputTargetsToMaterialize.put((PCollectionImpl) pcollection, c);
+    }
+    return c;
+  }
+
+  @Override
+  public <T> void cache(PCollection<T> pcollection, CachingOptions options) {
+    cachedCollections.put(pcollection, toStorageLevel(options));
+  }
+
+  private StorageLevel toStorageLevel(CachingOptions options) {
+    return StorageLevel.apply(
+        options.useDisk(),
+        options.useMemory(),
+        options.deserialized(),
+        options.replicas());
+  }
+
+  @Override
+  public PipelineResult run() {
+    try {
+      PipelineExecution exec = runAsync();
+      exec.waitUntilDone();
+      return exec.getResult();
+    } catch (Exception e) {
+      // TODO: How to handle this without changing signature?
+      // LOG.error("Exception running pipeline", e);
+      return PipelineResult.EMPTY;
+    }
+  }
+
+  @Override
+  public PipelineExecution runAsync() {
+    Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize = Maps.newHashMap();
+    for (PCollectionImpl<?> c : outputTargets.keySet()) {
+      if (outputTargetsToMaterialize.containsKey(c)) {
+        toMaterialize.put(c, outputTargetsToMaterialize.get(c));
+        outputTargetsToMaterialize.remove(c);
+      }
+    }
+    if (sparkContext == null) {
+      this.sparkContext = new JavaSparkContext(sparkConnect, getName());
+    }
+    SparkRuntime runtime = new SparkRuntime(this, sparkContext, getConfiguration(), outputTargets, toMaterialize,
+        cachedCollections);
+    runtime.execute();
+    outputTargets.clear();
+    return runtime;
+  }
+
+  @Override
+  public PipelineResult done() {
+    PipelineResult res = super.done();
+    if (sparkContext != null) {
+      sparkContext.stop();
+      sparkContext = null;
+    }
+    return res;
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
new file mode 100644
index 0000000..99268cc
--- /dev/null
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
@@ -0,0 +1,326 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.spark;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.AbstractFuture;
+import org.apache.crunch.CombineFn;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PipelineExecution;
+import org.apache.crunch.PipelineResult;
+import org.apache.crunch.SourceTarget;
+import org.apache.crunch.Target;
+import org.apache.crunch.impl.dist.collect.PCollectionImpl;
+import org.apache.crunch.impl.spark.fn.MapFunction;
+import org.apache.crunch.impl.spark.fn.OutputConverterFunction;
+import org.apache.crunch.impl.spark.fn.PairMapFunction;
+import org.apache.crunch.io.MapReduceTarget;
+import org.apache.crunch.io.PathTarget;
+import org.apache.crunch.materialize.MaterializableIterable;
+import org.apache.crunch.types.Converter;
+import org.apache.crunch.types.PType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaRDDLike;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.storage.StorageLevel;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class SparkRuntime extends AbstractFuture<PipelineResult> implements PipelineExecution {
+
+  private SparkPipeline pipeline;
+  private JavaSparkContext sparkContext;
+  private Configuration conf;
+  private CombineFn combineFn;
+  private SparkRuntimeContext ctxt;
+  private Map<PCollectionImpl<?>, Set<Target>> outputTargets;
+  private Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize;
+  private Map<PCollection<?>, StorageLevel> toCache;
+  private final CountDownLatch doneSignal = new CountDownLatch(1);
+  private AtomicReference<Status> status = new AtomicReference<Status>(Status.READY);
+  private PipelineResult result;
+  private boolean started;
+  private Thread monitorThread;
+
+  // Note that this is the oppposite of the MR sort
+  static final Comparator<PCollectionImpl<?>> DEPTH_COMPARATOR = new Comparator<PCollectionImpl<?>>() {
+    @Override
+    public int compare(PCollectionImpl<?> left, PCollectionImpl<?> right) {
+      int cmp = left.getDepth() - right.getDepth();
+      if (cmp == 0) {
+        // Ensure we don't throw away two output collections at the same depth.
+        // Using the collection name would be nicer here, but names aren't
+        // necessarily unique.
+        cmp = new Integer(left.hashCode()).compareTo(right.hashCode());
+      }
+      return cmp;
+    }
+  };
+
+  public SparkRuntime(SparkPipeline pipeline,
+                      JavaSparkContext sparkContext,
+                      Configuration conf,
+                      Map<PCollectionImpl<?>, Set<Target>> outputTargets,
+                      Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize,
+                      Map<PCollection<?>, StorageLevel> toCache) {
+    this.pipeline = pipeline;
+    this.sparkContext = sparkContext;
+    this.conf = conf;
+    this.ctxt = new SparkRuntimeContext(
+        sparkContext.broadcast(conf),
+        sparkContext.accumulator(Maps.<String, Long>newHashMap(), new CounterAccumulatorParam()));
+    this.outputTargets = Maps.newTreeMap(DEPTH_COMPARATOR);
+    this.outputTargets.putAll(outputTargets);
+    this.toMaterialize = toMaterialize;
+    this.toCache = toCache;
+    this.status.set(Status.READY);
+    this.monitorThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        monitorLoop();
+      }
+    });
+  }
+
+  public void setCombineFn(CombineFn combineFn) {
+    this.combineFn = combineFn;
+  }
+
+  public CombineFn getCombineFn() {
+    CombineFn ret = combineFn;
+    this.combineFn = null;
+    return ret;
+  }
+
+  private void distributeFiles() {
+    try {
+      URI[] uris = DistributedCache.getCacheFiles(conf);
+      if (uris != null) {
+        URI[] outURIs = new URI[uris.length];
+        for (int i = 0; i < uris.length; i++) {
+          Path path = new Path(uris[i]);
+          FileSystem fs = path.getFileSystem(conf);
+          if (fs.isFile(path)) {
+            outURIs[i] = uris[i];
+          } else {
+            Path mergePath = new Path(path.getParent(), "sparkreadable-" + path.getName());
+            FileUtil.copyMerge(fs, path, fs, mergePath, false, conf, "");
+            outURIs[i] = mergePath.toUri();
+          }
+          sparkContext.addFile(outURIs[i].toString());
+        }
+        DistributedCache.setCacheFiles(outURIs, conf);
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("Error retrieving cache files", e);
+    }
+  }
+
+  public synchronized SparkRuntime execute() {
+    if (!started) {
+      monitorThread.start();
+      started = true;
+    }
+    return this;
+  }
+
+  public JavaSparkContext getSparkContext() {
+    return sparkContext;
+  }
+
+  public SparkRuntimeContext getRuntimeContext() {
+    return ctxt;
+  }
+
+  public Configuration getConfiguration() {
+    return conf;
+  }
+
+  public boolean isValid(JavaRDDLike<?, ?> rdd) {
+    return (rdd != null); //TODO: support multi-contexts
+  }
+
+  public StorageLevel getStorageLevel(PCollection<?> pcollection) {
+    return toCache.get(pcollection);
+  }
+
+  @Override
+  public String getPlanDotFile() {
+    return "";
+  }
+
+  @Override
+  public void waitFor(long timeout, TimeUnit timeUnit) throws InterruptedException {
+    doneSignal.await(timeout, timeUnit);
+  }
+
+  @Override
+  public void waitUntilDone() throws InterruptedException {
+    doneSignal.await();
+  }
+
+  private void monitorLoop() {
+    status.set(Status.RUNNING);
+    Map<PCollectionImpl<?>, Set<SourceTarget<?>>> targetDeps = Maps.<PCollectionImpl<?>, PCollectionImpl<?>, Set<SourceTarget<?>>>newTreeMap(DEPTH_COMPARATOR);
+    for (PCollectionImpl<?> pcollect : outputTargets.keySet()) {
+      targetDeps.put(pcollect, pcollect.getTargetDependencies());
+    }
+
+    while (!targetDeps.isEmpty() && doneSignal.getCount() > 0) {
+      Set<Target> allTargets = Sets.newHashSet();
+      for (PCollectionImpl<?> pcollect : targetDeps.keySet()) {
+        allTargets.addAll(outputTargets.get(pcollect));
+      }
+
+      Map<PCollectionImpl<?>, JavaRDDLike<?, ?>> pcolToRdd = Maps.newTreeMap(DEPTH_COMPARATOR);
+      for (PCollectionImpl<?> pcollect : targetDeps.keySet()) {
+        if (Sets.intersection(allTargets, targetDeps.get(pcollect)).isEmpty()) {
+          JavaRDDLike<?, ?> rdd = ((SparkCollection) pcollect).getJavaRDDLike(this);
+          pcolToRdd.put(pcollect, rdd);
+        }
+      }
+      distributeFiles();
+      for (Map.Entry<PCollectionImpl<?>, JavaRDDLike<?, ?>> e : pcolToRdd.entrySet()) {
+        JavaRDDLike<?, ?> rdd = e.getValue();
+        PType<?> ptype = e.getKey().getPType();
+        Set<Target> targets = outputTargets.get(e.getKey());
+        if (targets.size() > 1) {
+          rdd.rdd().cache();
+        }
+        for (Target t : targets) {
+          Configuration conf = new Configuration(getConfiguration());
+          if (t instanceof MapReduceTarget) { //TODO: check this earlier
+            Converter c = t.getConverter(ptype);
+            JavaPairRDD<?, ?> outRDD;
+            if (rdd instanceof JavaRDD) {
+              outRDD = ((JavaRDD) rdd)
+                  .map(new MapFunction(ptype.getOutputMapFn(), ctxt))
+                  .map(new OutputConverterFunction(c));
+            } else {
+              outRDD = ((JavaPairRDD) rdd)
+                  .map(new PairMapFunction(ptype.getOutputMapFn(), ctxt))
+                  .map(new OutputConverterFunction(c));
+            }
+
+            try {
+              Job job = new Job(conf);
+              if (t instanceof PathTarget) {
+                PathTarget pt = (PathTarget) t;
+                pt.configureForMapReduce(job, ptype, pt.getPath(), null);
+                Path tmpPath = pipeline.createTempPath();
+                outRDD.saveAsNewAPIHadoopFile(
+                    tmpPath.toString(),
+                    c.getKeyClass(),
+                    c.getValueClass(),
+                    job.getOutputFormatClass(),
+                    job.getConfiguration());
+                pt.handleOutputs(job.getConfiguration(), tmpPath, -1);
+              } else if (t instanceof MapReduceTarget) {
+                MapReduceTarget mrt = (MapReduceTarget) t;
+                mrt.configureForMapReduce(job, ptype, new Path("/tmp"), null);
+                outRDD.saveAsHadoopDataset(new JobConf(job.getConfiguration()));
+              } else {
+                throw new IllegalArgumentException("Spark execution cannot handle non-MapReduceTarget: " + t);
+              }
+            } catch (Exception et) {
+              et.printStackTrace();
+              status.set(Status.FAILED);
+              set(PipelineResult.EMPTY);
+              doneSignal.countDown();
+            }
+          }
+        }
+      }
+      for (PCollectionImpl<?> output : pcolToRdd.keySet()) {
+        if (toMaterialize.containsKey(output)) {
+          MaterializableIterable mi = toMaterialize.get(output);
+          if (mi.isSourceTarget()) {
+            output.materializeAt((SourceTarget) mi.getSource());
+          }
+        }
+        targetDeps.remove(output);
+      }
+    }
+    if (status.get() != Status.FAILED || status.get() != Status.KILLED) {
+      status.set(Status.SUCCEEDED);
+      result = new PipelineResult(ImmutableList.of(new PipelineResult.StageResult("Spark", null)),
+          Status.SUCCEEDED);
+      set(result);
+    } else {
+      set(PipelineResult.EMPTY);
+    }
+    doneSignal.countDown();
+  }
+
+  @Override
+  public PipelineResult get() throws InterruptedException, ExecutionException {
+    if (getStatus() == Status.READY) {
+      execute();
+    }
+    return super.get();
+  }
+
+  @Override
+  public PipelineResult get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException,
+      ExecutionException {
+    if (getStatus() == Status.READY) {
+      execute();
+    }
+    return super.get(timeout, unit);
+  }
+
+  @Override
+  public Status getStatus() {
+    return status.get();
+  }
+
+  @Override
+  public PipelineResult getResult() {
+    return result;
+  }
+
+  @Override
+  public void kill() throws InterruptedException {
+    if (started) {
+      sparkContext.stop();
+      set(PipelineResult.EMPTY);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntimeContext.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntimeContext.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntimeContext.java
new file mode 100644
index 0000000..92336a3
--- /dev/null
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntimeContext.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.spark;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import javassist.util.proxy.MethodFilter;
+import javassist.util.proxy.MethodHandler;
+import javassist.util.proxy.ProxyFactory;
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.crunch.DoFn;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.StatusReporter;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.spark.Accumulator;
+import org.apache.spark.SparkFiles;
+import org.apache.spark.broadcast.Broadcast;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.reflect.Method;
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Serializable holder for the per-job state that Crunch {@code DoFn}s need when
+ * they execute inside Spark tasks: the Hadoop {@code Configuration} (shipped as
+ * a Spark broadcast variable) and a counter accumulator. Javassist proxies are
+ * used to present Hadoop's {@code TaskInputOutputContext} and {@code Counter}
+ * APIs on top of these Spark primitives.
+ */
+public class SparkRuntimeContext implements Serializable {
+
+  private Broadcast<Configuration> broadConf;
+  private Accumulator<Map<String, Long>> counters;
+  // Transient: rebuilt lazily on each executor after deserialization (see initialize).
+  private transient TaskInputOutputContext context;
+
+  public SparkRuntimeContext(
+      Broadcast<Configuration> broadConf,
+      Accumulator<Map<String, Long>> counters) {
+    this.broadConf = broadConf;
+    this.counters = counters;
+  }
+
+  /**
+   * Attaches a (proxied) task context to the given DoFn and runs its
+   * initialize hook. The proxy context is created once per deserialized
+   * instance and reused for subsequent fns.
+   */
+  public void initialize(DoFn<?, ?> fn) {
+    if (context == null) {
+      configureLocalFiles();
+      context = getTaskIOContext(broadConf, counters);
+    }
+    fn.setContext(context);
+    fn.initialize();
+  }
+
+  // Rewrites the DistributedCache file entries in the job Configuration so
+  // they point at the local copies Spark shipped to this executor (SparkFiles),
+  // letting DoFns that read cache files work unmodified under Spark.
+  private void configureLocalFiles() {
+    try {
+      URI[] uris = DistributedCache.getCacheFiles(getConfiguration());
+      if (uris != null) {
+        List<String> allFiles = Lists.newArrayList();
+        for (URI uri : uris) {
+          File f = new File(uri.getPath());
+          allFiles.add(SparkFiles.get(f.getName()));
+        }
+        DistributedCache.setLocalFiles(getConfiguration(), Joiner.on(',').join(allFiles));
+      }
+    } catch (IOException e) {
+      throw new CrunchRuntimeException(e);
+    }
+  }
+
+  /** Returns the broadcast job Configuration. */
+  public Configuration getConfiguration() {
+    return broadConf.value();
+  }
+
+  /**
+   * Builds a javassist proxy implementing {@code TaskInputOutputContext},
+   * handling only the methods Crunch DoFns actually call (getConfiguration,
+   * getCounter, progress, getTaskAttemptID); any other call fails fast.
+   */
+  public static TaskInputOutputContext getTaskIOContext(
+      final Broadcast<Configuration> conf,
+      final Accumulator<Map<String, Long>> counters) {
+    ProxyFactory factory = new ProxyFactory();
+    Class<TaskInputOutputContext> superType = TaskInputOutputContext.class;
+    Class[] types = new Class[0];
+    Object[] args = new Object[0];
+    final TaskAttemptID taskAttemptId = new TaskAttemptID();
+    // TaskInputOutputContext is an interface in some Hadoop versions and an
+    // abstract class in others; handle both shapes here.
+    if (superType.isInterface()) {
+      factory.setInterfaces(new Class[] { superType });
+    } else {
+      types = new Class[] { Configuration.class, TaskAttemptID.class, RecordWriter.class, OutputCommitter.class,
+          StatusReporter.class };
+      args = new Object[] { conf.value(), taskAttemptId, null, null, null };
+      factory.setSuperclass(superType);
+    }
+
+    final Set<String> handledMethods = ImmutableSet.of("getConfiguration", "getCounter",
+        "progress", "getTaskAttemptID");
+    factory.setFilter(new MethodFilter() {
+      @Override
+      public boolean isHandled(Method m) {
+        return handledMethods.contains(m.getName());
+      }
+    });
+    MethodHandler handler = new MethodHandler() {
+      @Override
+      public Object invoke(Object arg0, Method m, Method arg2, Object[] args) throws Throwable {
+        String name = m.getName();
+        if ("getConfiguration".equals(name)) {
+          return conf.value();
+        } else if ("progress".equals(name)) {
+          // no-op: Spark tracks task liveness itself, no heartbeat needed
+          return null;
+        } else if ("getTaskAttemptID".equals(name)) {
+          return taskAttemptId;
+        } else if ("getCounter".equals(name)){ // by Enum (1 arg) or group/name (2 args)
+          if (args.length == 1) {
+            return getCounter(counters, args[0].getClass().getName(), ((Enum) args[0]).name());
+          } else {
+            return getCounter(counters, (String) args[0], (String) args[1]);
+          }
+        } else {
+          throw new IllegalStateException("Unhandled method " + name);
+        }
+      }
+    };
+
+    try {
+      Object newInstance = factory.create(types, args, handler);
+      return (TaskInputOutputContext<?, ?, ?, ?>) newInstance;
+    } catch (Exception e) {
+      // NOTE(review): printStackTrace before rethrowing is redundant — the
+      // cause is preserved in the RuntimeException; consider removing.
+      e.printStackTrace();
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Builds a proxy {@code Counter} whose increments are folded into the shared
+   * Spark accumulator under the key "group:counterName". Reads and setValue are
+   * unsupported during Spark execution and throw.
+   */
+  private static Counter getCounter(final Accumulator<Map<String, Long>> accum, final String group,
+                                    final String counterName) {
+    ProxyFactory factory = new ProxyFactory();
+    Class<Counter> superType = Counter.class;
+    Class[] types = new Class[0];
+    Object[] args = new Object[0];
+    // Counter, like TaskInputOutputContext, flips between interface and class
+    // across Hadoop versions.
+    if (superType.isInterface()) {
+      factory.setInterfaces(new Class[] { superType });
+    } else {
+      types = new Class[] { String.class, String.class };
+      args = new Object[] { group, counterName };
+      factory.setSuperclass(superType);
+    }
+
+    final Set<String> handledMethods = ImmutableSet.of("getDisplayName", "getName",
+        "getValue", "increment", "setValue", "setDisplayName");
+    factory.setFilter(new MethodFilter() {
+      @Override
+      public boolean isHandled(Method m) {
+        return handledMethods.contains(m.getName());
+      }
+    });
+    MethodHandler handler = new MethodHandler() {
+      @Override
+      public Object invoke(Object arg0, Method m, Method arg2, Object[] args) throws Throwable {
+        String name = m.getName();
+        if ("increment".equals(name)) {
+          accum.add(ImmutableMap.of(group + ":" + counterName, (Long) args[0]));
+          return null;
+        } else if ("getDisplayName".equals(name)) {
+          return counterName;
+        } else if ("getName".equals(name)) {
+          return counterName;
+        } else if ("setDisplayName".equals(name)) {
+          // No-op: display names are cosmetic and not tracked in the accumulator
+          return null;
+        } else if ("setValue".equals(name)) {
+          throw new UnsupportedOperationException("Cannot set counter values in Spark, only increment them");
+        } else if ("getValue".equals(name)) {
+          throw new UnsupportedOperationException("Cannot read counters during Spark execution");
+        } else {
+          throw new IllegalStateException("Unhandled method " + name);
+        }
+      }
+    };
+    try {
+      Object newInstance = factory.create(types, args, handler);
+      return (Counter) newInstance;
+    } catch (Exception e) {
+      // NOTE(review): same redundant printStackTrace-then-rethrow as above.
+      e.printStackTrace();
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/DoCollection.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/DoCollection.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/DoCollection.java
new file mode 100644
index 0000000..72888b8
--- /dev/null
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/DoCollection.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.spark.collect;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.ParallelDoOptions;
+import org.apache.crunch.impl.dist.collect.BaseDoCollection;
+import org.apache.crunch.impl.dist.collect.PCollectionImpl;
+import org.apache.crunch.impl.spark.SparkCollection;
+import org.apache.crunch.impl.spark.SparkRuntime;
+import org.apache.crunch.impl.spark.fn.FlatMapDoFn;
+import org.apache.crunch.impl.spark.fn.FlatMapPairDoFn;
+import org.apache.crunch.types.PType;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaRDDLike;
+import org.apache.spark.storage.StorageLevel;
+
+/**
+ * Spark-backed implementation of a parallelDo-derived PCollection: applies the
+ * user's DoFn to the parent RDD via a flat-map over partitions.
+ */
+public class DoCollection<S> extends BaseDoCollection<S> implements SparkCollection {
+
+  // Cached RDD for this collection; rebuilt when the runtime invalidates it.
+  private JavaRDDLike<?, ?> rdd;
+
+  <T> DoCollection(String name, PCollectionImpl<T> parent, DoFn<T, S> fn, PType<S> ntype,
+                   ParallelDoOptions options) {
+    super(name, parent, fn, ntype, options);
+  }
+
+
+  /** Returns the (possibly cached) RDD for this collection, materializing it on first use. */
+  public JavaRDDLike<?, ?> getJavaRDDLike(SparkRuntime runtime) {
+    if (!runtime.isValid(rdd)) {
+      rdd = getJavaRDDLikeInternal(runtime);
+      rdd.rdd().setName(getName());
+      // Apply any user-requested caching (PCollection.cache) to the new RDD.
+      StorageLevel sl = runtime.getStorageLevel(this);
+      if (sl != null) {
+        rdd.rdd().persist(sl);
+      }
+    }
+    return rdd;
+  }
+
+  private JavaRDDLike<?, ?> getJavaRDDLikeInternal(SparkRuntime runtime) {
+    JavaRDDLike<?, ?> parentRDD = ((SparkCollection) getOnlyParent()).getJavaRDDLike(runtime);
+    fn.configure(runtime.getConfiguration());
+    // Choose the flat-map wrapper that matches the parent RDD's shape
+    // (plain values vs. key-value pairs).
+    if (parentRDD instanceof JavaRDD) {
+      return ((JavaRDD) parentRDD).mapPartitions(new FlatMapDoFn(fn, runtime.getRuntimeContext()));
+    } else {
+      return ((JavaPairRDD) parentRDD).mapPartitions(new FlatMapPairDoFn(fn, runtime.getRuntimeContext()));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/DoTable.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/DoTable.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/DoTable.java
new file mode 100644
index 0000000..c0e4bb1
--- /dev/null
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/DoTable.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.spark.collect;
+
+import org.apache.crunch.CombineFn;
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Pair;
+import org.apache.crunch.ParallelDoOptions;
+import org.apache.crunch.impl.dist.collect.BaseDoTable;
+import org.apache.crunch.impl.dist.collect.PCollectionImpl;
+import org.apache.crunch.impl.spark.SparkCollection;
+import org.apache.crunch.impl.spark.SparkRuntime;
+import org.apache.crunch.impl.spark.fn.PairFlatMapDoFn;
+import org.apache.crunch.impl.spark.fn.PairFlatMapPairDoFn;
+import org.apache.crunch.types.PTableType;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaRDDLike;
+import org.apache.spark.storage.StorageLevel;
+
+/**
+ * Spark-backed implementation of a parallelDo-derived PTable: applies the
+ * user's DoFn to the parent RDD, producing key-value pairs. When the fn is a
+ * CombineFn applied directly after a groupByKey, it is also registered with
+ * the runtime so the grouping stage can combine map-side.
+ */
+public class DoTable<K, V> extends BaseDoTable<K, V> implements SparkCollection {
+
+  // Cached RDD for this table; rebuilt when the runtime invalidates it.
+  private JavaRDDLike<?, ?> rdd;
+
+  <S> DoTable(String name, PCollectionImpl<S> parent, DoFn<S, Pair<K, V>> fn, PTableType<K, V> ntype,
+              ParallelDoOptions options) {
+    super(name, parent, fn, ntype, options);
+  }
+
+  <S> DoTable(
+      String name,
+      PCollectionImpl<S> parent,
+      CombineFn<K, V> combineFn,
+      DoFn<S, Pair<K, V>> fn,
+      PTableType<K, V> ntype) {
+    super(name, parent, combineFn, fn, ntype, ParallelDoOptions.builder().build());
+  }
+
+  /** Returns the (possibly cached) RDD for this table, materializing it on first use. */
+  public JavaRDDLike<?, ?> getJavaRDDLike(SparkRuntime runtime) {
+    if (!runtime.isValid(rdd)) {
+      rdd = getJavaRDDLikeInternal(runtime);
+      rdd.rdd().setName(getName());
+      StorageLevel sl = runtime.getStorageLevel(this);
+      if (sl != null) {
+        rdd.rdd().persist(sl);
+      }
+    }
+    return rdd;
+  }
+
+  private JavaRDDLike<?, ?> getJavaRDDLikeInternal(SparkRuntime runtime) {
+    // NOTE(review): combineFn may be declared with a wider type in BaseDoTable;
+    // the instanceof check filters for a true CombineFn feeding a grouped
+    // parent so the upcoming shuffle can apply it map-side — confirm intent.
+    if (combineFn instanceof CombineFn && getOnlyParent() instanceof PGroupedTableImpl) {
+      runtime.setCombineFn((CombineFn) combineFn);
+    }
+    JavaRDDLike<?, ?> parentRDD = ((SparkCollection) getOnlyParent()).getJavaRDDLike(runtime);
+    fn.configure(runtime.getConfiguration());
+    // Choose the pair-producing wrapper that matches the parent RDD's shape.
+    if (parentRDD instanceof JavaRDD) {
+      return ((JavaRDD) parentRDD).mapPartitions(new PairFlatMapDoFn(fn, runtime.getRuntimeContext()));
+    } else {
+      return ((JavaPairRDD) parentRDD).mapPartitions(new PairFlatMapPairDoFn(fn, runtime.getRuntimeContext()));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputCollection.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputCollection.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputCollection.java
new file mode 100644
index 0000000..0d1d5e0
--- /dev/null
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputCollection.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.spark.collect;
+
+import org.apache.crunch.Source;
+import org.apache.crunch.SourceTarget;
+import org.apache.crunch.impl.dist.DistributedPipeline;
+import org.apache.crunch.impl.dist.collect.BaseInputCollection;
+import org.apache.crunch.impl.mr.run.CrunchInputFormat;
+import org.apache.crunch.impl.spark.SparkCollection;
+import org.apache.crunch.impl.spark.SparkRuntime;
+import org.apache.crunch.impl.spark.fn.InputConverterFunction;
+import org.apache.crunch.impl.spark.fn.MapFunction;
+import org.apache.crunch.io.impl.FileSourceImpl;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDDLike;
+
+import java.io.IOException;
+
+/**
+ * Spark-backed source PCollection: reads records via Crunch's InputFormat as a
+ * Hadoop RDD, then converts them to the collection's PType.
+ */
+public class InputCollection<S> extends BaseInputCollection<S> implements SparkCollection {
+
+  InputCollection(Source<S> source, DistributedPipeline pipeline) {
+    super(source, pipeline);
+  }
+
+  public JavaRDDLike<?, ?> getJavaRDDLike(SparkRuntime runtime) {
+    try {
+      Job job = new Job(runtime.getConfiguration());
+      // The source overwrites the input path below; this placeholder only
+      // keeps FileInputFormat from rejecting an unconfigured job.
+      FileInputFormat.addInputPaths(job, "/tmp"); //placeholder
+      source.configureSource(job, -1);
+      JavaPairRDD<?, ?> input = runtime.getSparkContext().newAPIHadoopRDD(
+          job.getConfiguration(),
+          CrunchInputFormat.class,
+          source.getConverter().getKeyClass(),
+          source.getConverter().getValueClass());
+      input.rdd().setName(source.toString());
+      // Two-stage conversion: raw Writable/Avro pair -> converter output ->
+      // user-visible PType value.
+      return input
+          .map(new InputConverterFunction(source.getConverter()))
+          .map(new MapFunction(source.getType().getInputMapFn(), runtime.getRuntimeContext()));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputTable.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputTable.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputTable.java
new file mode 100644
index 0000000..7e8471c
--- /dev/null
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputTable.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.spark.collect;
+
+import org.apache.crunch.TableSource;
+import org.apache.crunch.impl.dist.DistributedPipeline;
+import org.apache.crunch.impl.dist.collect.BaseInputTable;
+import org.apache.crunch.impl.mr.run.CrunchInputFormat;
+import org.apache.crunch.impl.spark.SparkCollection;
+import org.apache.crunch.impl.spark.SparkRuntime;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.spark.api.java.JavaRDDLike;
+
+import java.io.IOException;
+
+/**
+ * Spark-backed source PTable: reads key-value records via Crunch's InputFormat
+ * as a Hadoop RDD. Unlike InputCollection, no PType conversion is applied here;
+ * the raw converter key/value classes are exposed directly.
+ */
+public class InputTable<K, V> extends BaseInputTable<K, V> implements SparkCollection {
+
+  public InputTable(TableSource<K, V> source, DistributedPipeline pipeline) {
+    super(source, pipeline);
+  }
+
+  @Override
+  public JavaRDDLike<?, ?> getJavaRDDLike(SparkRuntime runtime) {
+    try {
+      // A throwaway Job is used only as a Configuration holder for the source.
+      Job job = new Job(runtime.getConfiguration());
+      source.configureSource(job, -1); // TODO: a custom input format for crunch-spark
+      return runtime.getSparkContext().newAPIHadoopRDD(
+          job.getConfiguration(),
+          CrunchInputFormat.class,
+          source.getConverter().getKeyClass(),
+          source.getConverter().getValueClass());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/PGroupedTableImpl.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/PGroupedTableImpl.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/PGroupedTableImpl.java
new file mode 100644
index 0000000..95811cf
--- /dev/null
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/PGroupedTableImpl.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.spark.collect;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.crunch.CombineFn;
+import org.apache.crunch.GroupingOptions;
+import org.apache.crunch.impl.dist.collect.BaseGroupedTable;
+import org.apache.crunch.impl.dist.collect.PTableBase;
+import org.apache.crunch.impl.spark.ByteArray;
+import org.apache.crunch.impl.spark.SparkCollection;
+import org.apache.crunch.impl.spark.SparkComparator;
+import org.apache.crunch.impl.spark.SparkPartitioner;
+import org.apache.crunch.impl.spark.SparkRuntime;
+import org.apache.crunch.impl.spark.fn.CombineMapsideFunction;
+import org.apache.crunch.impl.spark.fn.MapOutputFunction;
+import org.apache.crunch.impl.spark.fn.PairMapFunction;
+import org.apache.crunch.impl.spark.fn.PairMapIterableFunction;
+import org.apache.crunch.impl.spark.fn.PartitionedMapOutputFunction;
+import org.apache.crunch.impl.spark.fn.ReduceGroupingFunction;
+import org.apache.crunch.impl.spark.fn.ReduceInputFunction;
+import org.apache.crunch.impl.spark.serde.AvroSerDe;
+import org.apache.crunch.impl.spark.serde.SerDe;
+import org.apache.crunch.impl.spark.serde.WritableSerDe;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.crunch.types.writable.WritableType;
+import org.apache.crunch.util.PartitionUtils;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDDLike;
+import org.apache.spark.storage.StorageLevel;
+
+import java.util.List;
+
+/**
+ * Spark-backed groupByKey: serializes keys and values to byte arrays (Avro or
+ * Writable serdes chosen from the table's PType), shuffles with groupByKey,
+ * optionally applies map-side combining, secondary sorting, and grouping
+ * comparators, then deserializes back to user-visible pairs.
+ */
+public class PGroupedTableImpl<K, V> extends BaseGroupedTable<K, V> implements SparkCollection {
+
+  private static final Log LOG = LogFactory.getLog(PGroupedTableImpl.class);
+
+  // Cached RDD for this grouping; rebuilt when the runtime invalidates it.
+  private JavaRDDLike<?, ?> rdd;
+
+  PGroupedTableImpl(PTableBase<K, V> parent, GroupingOptions groupingOptions) {
+    super(parent, groupingOptions);
+  }
+
+  /** Returns the (possibly cached) grouped RDD, materializing it on first use. */
+  public JavaRDDLike<?, ?> getJavaRDDLike(SparkRuntime runtime) {
+    if (!runtime.isValid(rdd)) {
+      // The combine fn (if any) was registered by the downstream DoTable.
+      rdd = getJavaRDDLikeInternal(runtime, runtime.getCombineFn());
+      rdd.rdd().setName(getName());
+      StorageLevel sl = runtime.getStorageLevel(this);
+      if (sl != null) {
+        rdd.rdd().persist(sl);
+      }
+    }
+    return rdd;
+  }
+
+  private JavaRDDLike<?, ?> getJavaRDDLikeInternal(SparkRuntime runtime, CombineFn<K, V> combineFn) {
+    JavaPairRDD<K, V> parentRDD = (JavaPairRDD<K, V>) ((SparkCollection)getOnlyParent()).getJavaRDDLike(runtime);
+    // Map-side combine before the shuffle, mirroring MapReduce combiner behavior.
+    if (combineFn != null) {
+      parentRDD = parentRDD.mapPartitions(new CombineMapsideFunction<K, V>(combineFn, runtime.getRuntimeContext()));
+    }
+    // Pick byte-level serdes matching the table's type family (Avro vs. Writable).
+    SerDe keySerde, valueSerde;
+    PTableType<K, V> parentType = ptype.getTableType();
+    if (parentType instanceof AvroType) {
+      keySerde = new AvroSerDe((AvroType) parentType.getKeyType());
+      valueSerde = new AvroSerDe((AvroType) parentType.getValueType());
+    } else {
+      keySerde = new WritableSerDe(((WritableType) parentType.getKeyType()).getSerializationClass());
+      valueSerde = new WritableSerDe(((WritableType) parentType.getValueType()).getSerializationClass());
+    }
+
+    // Explicit reducer count wins; otherwise fall back to Crunch's size-based
+    // recommendation, clamped to at least 1 partition.
+    int numPartitions = (groupingOptions.getNumReducers() > 0) ? groupingOptions.getNumReducers() :
+        PartitionUtils.getRecommendedPartitions(this, getPipeline().getConfiguration());
+    if (numPartitions <= 0) {
+      LOG.warn("Attempted to set a non-positive number of partitions");
+      numPartitions = 1;
+    }
+
+    JavaPairRDD<ByteArray, List<byte[]>> groupedRDD;
+    if (groupingOptions.getPartitionerClass() != null) {
+      // Honor the user's Hadoop Partitioner by routing through a partition-aware
+      // map-output function plus a matching Spark partitioner.
+      groupedRDD = parentRDD
+          .map(new PairMapFunction(ptype.getOutputMapFn(), runtime.getRuntimeContext()))
+          .map(new PartitionedMapOutputFunction(keySerde, valueSerde, ptype, groupingOptions.getPartitionerClass(),
+              numPartitions, runtime.getRuntimeContext()))
+          .groupByKey(new SparkPartitioner(numPartitions));
+    } else {
+      groupedRDD = parentRDD
+          .map(new PairMapFunction(ptype.getOutputMapFn(), runtime.getRuntimeContext()))
+          .map(new MapOutputFunction(keySerde, valueSerde))
+          .groupByKey(numPartitions);
+    }
+
+    // Emulate MapReduce's sorted shuffle output when the user requires it.
+    if (groupingOptions.requireSortedKeys() || groupingOptions.getSortComparatorClass() != null) {
+      SparkComparator scmp = new SparkComparator(groupingOptions, ptype, runtime.getRuntimeContext());
+      groupedRDD = groupedRDD.sortByKey(scmp);
+    }
+    if (groupingOptions.getGroupingComparatorClass() != null) {
+      groupedRDD = groupedRDD.mapPartitions(
+          new ReduceGroupingFunction(groupingOptions, ptype, runtime.getRuntimeContext()));
+    }
+
+    // Deserialize the byte-level groups back into user-visible key/values.
+    return groupedRDD
+        .map(new ReduceInputFunction(keySerde, valueSerde))
+        .map(new PairMapIterableFunction(ptype.getInputMapFn(), runtime.getRuntimeContext()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/SparkCollectFactory.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/SparkCollectFactory.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/SparkCollectFactory.java
new file mode 100644
index 0000000..389d91c
--- /dev/null
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/SparkCollectFactory.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.spark.collect;
+
+import org.apache.crunch.CombineFn;
+import org.apache.crunch.DoFn;
+import org.apache.crunch.GroupingOptions;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.ParallelDoOptions;
+import org.apache.crunch.Source;
+import org.apache.crunch.TableSource;
+import org.apache.crunch.impl.dist.DistributedPipeline;
+import org.apache.crunch.impl.dist.collect.BaseDoCollection;
+import org.apache.crunch.impl.dist.collect.BaseDoTable;
+import org.apache.crunch.impl.dist.collect.BaseGroupedTable;
+import org.apache.crunch.impl.dist.collect.BaseInputCollection;
+import org.apache.crunch.impl.dist.collect.BaseInputTable;
+import org.apache.crunch.impl.dist.collect.BaseUnionCollection;
+import org.apache.crunch.impl.dist.collect.PCollectionFactory;
+import org.apache.crunch.impl.dist.collect.PCollectionImpl;
+import org.apache.crunch.impl.dist.collect.PTableBase;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+
+import java.util.List;
+
+/**
+ * PCollectionFactory that wires the Crunch planner to the Spark-backed
+ * collection implementations in this package. Pure construction; no logic.
+ */
+public class SparkCollectFactory implements PCollectionFactory {
+
+  @Override
+  public <S> BaseInputCollection<S> createInputCollection(Source<S> source, DistributedPipeline pipeline) {
+    return new InputCollection<S>(source, pipeline);
+  }
+
+  @Override
+  public <K, V> BaseInputTable<K, V> createInputTable(TableSource<K, V> source, DistributedPipeline pipeline) {
+    return new InputTable<K, V>(source, pipeline);
+  }
+
+  @Override
+  public <S> BaseUnionCollection<S> createUnionCollection(List<? extends PCollectionImpl<S>> internal) {
+    return new UnionCollection<S>(internal);
+  }
+
+  @Override
+  public <S, T> BaseDoCollection<T> createDoCollection(
+      String name,
+      PCollectionImpl<S> parent,
+      DoFn<S, T> fn,
+      PType<T> type,
+      ParallelDoOptions options) {
+    return new DoCollection<T>(name,  parent, fn, type, options);
+  }
+
+  @Override
+  public <S, K, V> BaseDoTable<K, V> createDoTable(
+      String name,
+      PCollectionImpl<S> parent,
+      DoFn<S, Pair<K, V>> fn,
+      PTableType<K, V> type,
+      ParallelDoOptions options) {
+    return new DoTable<K, V>(name, parent, fn, type, options);
+  }
+
+  @Override
+  public <S, K, V> BaseDoTable<K, V> createDoTable(
+      String name,
+      PCollectionImpl<S> parent,
+      CombineFn<K, V> combineFn,
+      DoFn<S, Pair<K, V>> fn,
+      PTableType<K, V> type) {
+    return new DoTable<K, V>(name, parent, combineFn, fn, type);
+  }
+
+  @Override
+  public <K, V> BaseGroupedTable<K, V> createGroupedTable(PTableBase<K, V> parent, GroupingOptions groupingOptions) {
+    return new PGroupedTableImpl<K, V>(parent, groupingOptions);
+  }
+
+  @Override
+  public <K, V> PTable<K, V> createUnionTable(List<PTableBase<K, V>> internal) {
+    return new UnionTable<K, V>(internal);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/ToByteArrayFunction.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/ToByteArrayFunction.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/ToByteArrayFunction.java
new file mode 100644
index 0000000..4761754
--- /dev/null
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/ToByteArrayFunction.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.spark.collect;
+
+import org.apache.crunch.impl.spark.ByteArray;
+import org.apache.crunch.impl.spark.IntByteArray;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.PairFunction;
+import scala.Tuple2;
+
+import java.util.List;
+
+public class ToByteArrayFunction extends PairFunction<Tuple2<IntByteArray, List<byte[]>>, ByteArray, List<byte[]>> {
+  @Override
+  public Tuple2<ByteArray, List<byte[]>> call(Tuple2<IntByteArray, List<byte[]>> t) throws Exception {
+    return new Tuple2<ByteArray, List<byte[]>>(t._1, t._2);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/UnionCollection.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/UnionCollection.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/UnionCollection.java
new file mode 100644
index 0000000..4e8b25a
--- /dev/null
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/UnionCollection.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.spark.collect;
+
+import org.apache.crunch.impl.dist.collect.BaseUnionCollection;
+import org.apache.crunch.impl.dist.collect.PCollectionImpl;
+import org.apache.crunch.impl.spark.SparkCollection;
+import org.apache.crunch.impl.spark.SparkRuntime;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaRDDLike;
+import org.apache.spark.storage.StorageLevel;
+
+import java.util.List;
+
+public class UnionCollection<S> extends BaseUnionCollection<S> implements SparkCollection {
+
+  private JavaRDDLike<?, ?> rdd;
+
+  UnionCollection(List<? extends PCollectionImpl<S>> collections) {
+    super(collections);
+  }
+
+  public JavaRDDLike<?, ?> getJavaRDDLike(SparkRuntime runtime) {
+    if (!runtime.isValid(rdd)) {
+      rdd = getJavaRDDLikeInternal(runtime);
+      rdd.rdd().setName(getName());
+      StorageLevel sl = runtime.getStorageLevel(this);
+      if (sl != null) {
+        rdd.rdd().persist(sl);
+      }
+    }
+    return rdd;
+  }
+
+  private JavaRDDLike<?, ?> getJavaRDDLikeInternal(SparkRuntime runtime) {
+    List<PCollectionImpl<?>> parents = getParents();
+    JavaRDD[] rdds = new JavaRDD[parents.size()];
+    for (int i = 0; i < rdds.length; i++) {
+      rdds[i] = (JavaRDD) ((SparkCollection) parents.get(i)).getJavaRDDLike(runtime);
+    }
+    return runtime.getSparkContext().union(rdds);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/UnionTable.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/UnionTable.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/UnionTable.java
new file mode 100644
index 0000000..867a95d
--- /dev/null
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/UnionTable.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.spark.collect;
+
+import org.apache.crunch.impl.dist.collect.BaseUnionTable;
+import org.apache.crunch.impl.dist.collect.PCollectionImpl;
+import org.apache.crunch.impl.dist.collect.PTableBase;
+import org.apache.crunch.impl.spark.SparkCollection;
+import org.apache.crunch.impl.spark.SparkRuntime;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDDLike;
+import org.apache.spark.storage.StorageLevel;
+
+import java.util.List;
+
+public class UnionTable<K, V> extends BaseUnionTable<K, V> implements SparkCollection {
+
+  private JavaRDDLike<?, ?> rdd;
+
+  UnionTable(List<PTableBase<K, V>> tables) {
+    super(tables);
+  }
+
+  public JavaRDDLike<?, ?> getJavaRDDLike(SparkRuntime runtime) {
+    if (!runtime.isValid(rdd)) {
+      rdd = getJavaRDDLikeInternal(runtime);
+      rdd.rdd().setName(getName());
+      StorageLevel sl = runtime.getStorageLevel(this);
+      if (sl != null) {
+        rdd.rdd().persist(sl);
+      }
+    }
+    return rdd;
+  }
+
+  private JavaRDDLike<?, ?> getJavaRDDLikeInternal(SparkRuntime runtime) {
+    List<PCollectionImpl<?>> parents = getParents();
+    JavaPairRDD[] rdds = new JavaPairRDD[parents.size()];
+    for (int i = 0; i < rdds.length; i++) {
+      rdds[i] = (JavaPairRDD) ((SparkCollection) parents.get(i)).getJavaRDDLike(runtime);
+    }
+    return runtime.getSparkContext().union(rdds);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/CombineMapsideFunction.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/CombineMapsideFunction.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/CombineMapsideFunction.java
new file mode 100644
index 0000000..3600f16
--- /dev/null
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/CombineMapsideFunction.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.spark.fn;
+
+import com.google.common.base.Function;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import org.apache.crunch.CombineFn;
+import org.apache.crunch.Pair;
+import org.apache.crunch.impl.mem.emit.InMemoryEmitter;
+import org.apache.crunch.impl.spark.SparkRuntimeContext;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import scala.Tuple2;
+
+import javax.annotation.Nullable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class CombineMapsideFunction<K, V> extends PairFlatMapFunction<Iterator<Tuple2<K, V>>, K, V> {
+
+  private static final int REDUCE_EVERY_N = 50000;
+
+  private final CombineFn<K,V> combineFn;
+  private final SparkRuntimeContext ctxt;
+
+  public CombineMapsideFunction(CombineFn<K, V> combineFn, SparkRuntimeContext ctxt) {
+    this.combineFn = combineFn;
+    this.ctxt = ctxt;
+  }
+
+  @Override
+  public Iterable<Tuple2<K, V>> call(Iterator<Tuple2<K, V>> iter) throws Exception {
+    ctxt.initialize(combineFn);
+    Multimap<K, V> cache = HashMultimap.create();
+    int cnt = 0;
+    while (iter.hasNext()) {
+      Tuple2<K, V> t = iter.next();
+      cache.put(t._1, t._2);
+      cnt++;
+      if (cnt % REDUCE_EVERY_N == 0) {
+        cache = reduce(cache);
+      }
+    }
+
+    return Iterables.transform(reduce(cache).entries(), new Function<Map.Entry<K, V>, Tuple2<K, V>>() {
+      @Override
+      public Tuple2<K, V> apply(Map.Entry<K, V> input) {
+        return new Tuple2<K, V>(input.getKey(), input.getValue());
+      }
+    });
+  }
+
+  private Multimap<K, V> reduce(Multimap<K, V> cache) {
+    Set<K> keys = cache.keySet();
+    Multimap<K, V> res = HashMultimap.create(keys.size(), keys.size());
+    for (K key : keys) {
+      for (Pair<K, V> p : reduce(key, cache.get(key))) {
+        res.put(p.first(), p.second());
+      }
+    }
+    return res;
+  }
+
+  private List<Pair<K, V>> reduce(K key, Iterable<V> values) {
+    InMemoryEmitter<Pair<K, V>> emitter = new InMemoryEmitter<Pair<K, V>>();
+    combineFn.process(Pair.of(key, values), emitter);
+    combineFn.cleanup(emitter);
+    return emitter.getOutput();
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/CrunchIterable.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/CrunchIterable.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/CrunchIterable.java
new file mode 100644
index 0000000..f5ccf87
--- /dev/null
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/CrunchIterable.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.spark.fn;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.util.DoFnIterator;
+
+import java.util.Iterator;
+
/**
 * Adapts a Crunch {@link DoFn} and an input iterator into an {@code Iterable}
 * whose iterator lazily applies the function via {@link DoFnIterator}.
 *
 * NOTE(review): {@code input} is a one-shot {@code Iterator}, so this Iterable
 * can only be traversed once; a second call to {@link #iterator()} would wrap
 * an already-exhausted source. Presumably each Spark caller iterates a
 * partition exactly once — confirm before reusing instances.
 */
public class CrunchIterable<S, T> implements Iterable<T> {
  // The function applied to each element as the iterator is consumed.
  private final DoFn<S, T> fn;
  // The underlying (single-use) source of input elements.
  private final Iterator<S> input;

  public CrunchIterable(DoFn<S, T> fn, Iterator<S> input) {
    this.fn = fn;
    this.input = input;
  }

  @Override
  public Iterator<T> iterator() {
    return new DoFnIterator<S, T>(input, fn);
  }
}

http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/FlatMapDoFn.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/FlatMapDoFn.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/FlatMapDoFn.java
new file mode 100644
index 0000000..cfb6d42
--- /dev/null
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/FlatMapDoFn.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.spark.fn;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.impl.spark.SparkRuntimeContext;
+import org.apache.spark.api.java.function.FlatMapFunction;
+
+import java.util.Iterator;
+
/**
 * Spark {@link FlatMapFunction} adapter that initializes a Crunch
 * {@link DoFn} with the runtime context and applies it lazily to a
 * partition's elements via {@link CrunchIterable}.
 */
public class FlatMapDoFn<S, T> extends FlatMapFunction<Iterator<S>, T> {
  // The Crunch function applied to each input element.
  private final DoFn<S, T> fn;
  // Used to initialize the DoFn before any elements are processed.
  private final SparkRuntimeContext ctxt;

  public FlatMapDoFn(DoFn<S, T> fn, SparkRuntimeContext ctxt) {
    this.fn = fn;
    this.ctxt = ctxt;
  }

  @Override
  public Iterable<T> call(Iterator<S> input) throws Exception {
    ctxt.initialize(fn);
    return new CrunchIterable<S, T>(fn, input);
  }

}

http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/FlatMapPairDoFn.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/FlatMapPairDoFn.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/FlatMapPairDoFn.java
new file mode 100644
index 0000000..30aef83
--- /dev/null
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/FlatMapPairDoFn.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.spark.fn;
+
+import com.google.common.collect.Iterators;
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Pair;
+import org.apache.crunch.impl.spark.GuavaUtils;
+import org.apache.crunch.impl.spark.SparkRuntimeContext;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import scala.Tuple2;
+
+import java.util.Iterator;
+
+public class FlatMapPairDoFn<K, V, T> extends FlatMapFunction<Iterator<Tuple2<K, V>>, T> {
+  private final DoFn<Pair<K, V>, T> fn;
+  private final SparkRuntimeContext ctxt;
+
+  public FlatMapPairDoFn(DoFn<Pair<K, V>, T> fn, SparkRuntimeContext ctxt) {
+    this.fn = fn;
+    this.ctxt = ctxt;
+  }
+
+  @Override
+  public Iterable<T> call(Iterator<Tuple2<K, V>> input) throws Exception {
+    ctxt.initialize(fn);
+    return new CrunchIterable<Pair<K, V>, T>(fn,
+        Iterators.transform(input, GuavaUtils.<K, V>tuple2PairFunc()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/InputConverterFunction.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/InputConverterFunction.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/InputConverterFunction.java
new file mode 100644
index 0000000..52869a4
--- /dev/null
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/InputConverterFunction.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.spark.fn;
+
+import org.apache.crunch.types.Converter;
+import org.apache.spark.api.java.function.Function;
+import scala.Tuple2;
+
+public class InputConverterFunction<K, V, S> extends Function<Tuple2<K, V>, S> {
+  private Converter<K, V, S, ?> converter;
+
+  public InputConverterFunction(Converter<K, V, S, ?> converter) {
+    this.converter = converter;
+  }
+
+  @Override
+  public S call(Tuple2<K, V> kv) throws Exception {
+    return converter.convertInput(kv._1, kv._2);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/MapFunction.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/MapFunction.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/MapFunction.java
new file mode 100644
index 0000000..b08aaeb
--- /dev/null
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/MapFunction.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.spark.fn;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.impl.spark.SparkRuntimeContext;
+import org.apache.spark.api.java.function.Function;
+
/**
 * Spark {@link Function} adapter around a Crunch {@link MapFn}: lazily
 * initializes the MapFn on the first call, then delegates each element to
 * {@code MapFn.map}.
 */
public class MapFunction extends Function<Object, Object> {
  // Raw MapFn on purpose: this single wrapper is declared over plain Objects
  // so it can serve elements of any type.
  private final MapFn fn;
  // Used to initialize the MapFn before the first element is mapped.
  private final SparkRuntimeContext ctxt;
  // Flipped on the first call() so initialization happens exactly once per
  // instance. NOTE(review): presumably resets to false in each deserialized
  // copy on the executors, re-triggering initialization there — confirm.
  private boolean initialized;

  public MapFunction(MapFn fn, SparkRuntimeContext ctxt) {
    this.fn = fn;
    this.ctxt = ctxt;
  }

  @Override
  public Object call(Object o) throws Exception {
    if (!initialized) {
      ctxt.initialize(fn);
      initialized = true;
    }
    return fn.map(o);
  }
}

http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/MapOutputFunction.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/MapOutputFunction.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/MapOutputFunction.java
new file mode 100644
index 0000000..47ef752
--- /dev/null
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/MapOutputFunction.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.spark.fn;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.impl.spark.ByteArray;
+import org.apache.crunch.impl.spark.serde.SerDe;
+import org.apache.spark.api.java.function.PairFunction;
+import scala.Tuple2;
+
+public class MapOutputFunction<K, V> extends PairFunction<Pair<K, V>, ByteArray, byte[]> {
+
+  private final SerDe keySerde;
+  private final SerDe valueSerde;
+
+  public MapOutputFunction(SerDe keySerde, SerDe valueSerde) {
+    this.keySerde = keySerde;
+    this.valueSerde = valueSerde;
+  }
+
+  @Override
+  public Tuple2<ByteArray, byte[]> call(Pair<K, V> p) throws Exception {
+    return new Tuple2<ByteArray, byte[]>(
+        new ByteArray(keySerde.toBytes(p.first())),
+        valueSerde.toBytes(p.second()));
+  }
+}


Mime
View raw message