crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject git commit: CRUNCH-410: Spark 1.0.0 updates.
Date Sun, 24 Aug 2014 20:17:25 GMT
Repository: crunch
Updated Branches:
  refs/heads/master 2d20e7772 -> 435465870


CRUNCH-410: Spark 1.0.0 updates.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/43546587
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/43546587
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/43546587

Branch: refs/heads/master
Commit: 4354658703c50f9506b30e6f0768b4a473817807
Parents: 2d20e77
Author: Josh Wills <jwills@apache.org>
Authored: Mon Jun 2 13:53:43 2014 -0700
Committer: Josh Wills <jwills@apache.org>
Committed: Sun Aug 24 12:54:14 2014 -0700

----------------------------------------------------------------------
 crunch-spark/pom.xml                            |  5 +
 .../org/apache/crunch/SparkTaskAttemptIT.java   | 75 +++++++++++++++
 .../crunch/fn/SDoubleFlatMapFunction.java       | 40 ++++++++
 .../org/apache/crunch/fn/SDoubleFunction.java   | 36 +++++++
 .../org/apache/crunch/fn/SFlatMapFunction.java  | 40 ++++++++
 .../org/apache/crunch/fn/SFlatMapFunction2.java | 42 +++++++++
 .../java/org/apache/crunch/fn/SFunction.java    | 36 +++++++
 .../java/org/apache/crunch/fn/SFunction2.java   | 38 ++++++++
 .../java/org/apache/crunch/fn/SFunctions.java   | 99 ++++++++++++++++++++
 .../apache/crunch/fn/SPairFlatMapFunction.java  | 43 +++++++++
 .../org/apache/crunch/fn/SPairFunction.java     | 40 ++++++++
 .../java/org/apache/crunch/fn/SparkDoFn.java    | 33 +++++++
 .../java/org/apache/crunch/fn/SparkMapFn.java   | 33 +++++++
 .../apache/crunch/impl/spark/SparkRuntime.java  | 10 +-
 .../crunch/impl/spark/SparkRuntimeContext.java  | 23 ++++-
 .../crunch/impl/spark/collect/DoCollection.java | 12 +--
 .../crunch/impl/spark/collect/DoTable.java      | 15 ++-
 .../impl/spark/collect/PGroupedTableImpl.java   | 12 ++-
 .../impl/spark/collect/ToByteArrayFunction.java |  4 +-
 .../crunch/impl/spark/collect/UnionTable.java   |  2 +-
 .../impl/spark/fn/CombineMapsideFunction.java   |  4 +-
 .../crunch/impl/spark/fn/CrunchPairTuple2.java  | 38 ++++++++
 .../crunch/impl/spark/fn/FlatMapDoFn.java       | 41 --------
 .../crunch/impl/spark/fn/FlatMapIndexFn.java    | 50 ++++++++++
 .../crunch/impl/spark/fn/FlatMapPairDoFn.java   |  4 +-
 .../impl/spark/fn/InputConverterFunction.java   |  2 +-
 .../crunch/impl/spark/fn/MapFunction.java       |  4 +-
 .../crunch/impl/spark/fn/MapOutputFunction.java |  2 +-
 .../impl/spark/fn/OutputConverterFunction.java  |  2 +-
 .../crunch/impl/spark/fn/PairFlatMapDoFn.java   |  4 +-
 .../impl/spark/fn/PairFlatMapPairDoFn.java      | 49 ----------
 .../crunch/impl/spark/fn/PairMapFunction.java   |  4 +-
 .../impl/spark/fn/PairMapIterableFunction.java  |  4 +-
 .../spark/fn/PartitionedMapOutputFunction.java  |  2 +-
 .../impl/spark/fn/ReduceGroupingFunction.java   | 14 +--
 .../impl/spark/fn/ReduceInputFunction.java      |  7 +-
 pom.xml                                         |  6 +-
 37 files changed, 726 insertions(+), 149 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/pom.xml
----------------------------------------------------------------------
diff --git a/crunch-spark/pom.xml b/crunch-spark/pom.xml
index dbe8169..e1a42eb 100644
--- a/crunch-spark/pom.xml
+++ b/crunch-spark/pom.xml
@@ -34,6 +34,11 @@ under the License.
       <artifactId>scala-library</artifactId>
     </dependency>
     <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>14.0.1</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.crunch</groupId>
       <artifactId>crunch-core</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/it/java/org/apache/crunch/SparkTaskAttemptIT.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/it/java/org/apache/crunch/SparkTaskAttemptIT.java b/crunch-spark/src/it/java/org/apache/crunch/SparkTaskAttemptIT.java
new file mode 100644
index 0000000..6ff0b18
--- /dev/null
+++ b/crunch-spark/src/it/java/org/apache/crunch/SparkTaskAttemptIT.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.crunch.impl.spark.SparkPipeline;
+import org.apache.crunch.io.From;
+import org.apache.crunch.io.To;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.types.avro.Avros;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+
+public class SparkTaskAttemptIT {
+  @Rule
+  public TemporaryPath tempDir = new TemporaryPath();
+
+  private SparkPipeline pipeline;
+
+  @Before
+  public void setUp() throws IOException {
+    pipeline = new SparkPipeline("local", "taskattempt");
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    pipeline.done();
+  }
+
+  @Test
+  public void testTaskAttempts() throws Exception {
+    String inputPath = tempDir.copyResourceFileName("set1.txt");
+    String inputPath2 = tempDir.copyResourceFileName("set2.txt");
+
+    PCollection<String> first = pipeline.read(From.textFile(inputPath));
+    PCollection<String> second = pipeline.read(From.textFile(inputPath2));
+
+    Iterable<Pair<Integer, Long>> cnts = first.union(second)
+        .parallelDo(new TaskMapFn(), Avros.ints())
+        .count()
+        .materialize();
+    assertEquals(ImmutableSet.of(Pair.of(0, 4L), Pair.of(1, 3L)), Sets.newHashSet(cnts));
+  }
+
+  private static class TaskMapFn extends MapFn<String, Integer> {
+    @Override
+    public Integer map(String input) {
+      return getContext().getTaskAttemptID().getTaskID().getId();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/fn/SDoubleFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/fn/SDoubleFlatMapFunction.java b/crunch-spark/src/main/java/org/apache/crunch/fn/SDoubleFlatMapFunction.java
new file mode 100644
index 0000000..f3f67cc
--- /dev/null
+++ b/crunch-spark/src/main/java/org/apache/crunch/fn/SDoubleFlatMapFunction.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.fn;
+
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.crunch.Emitter;
+import org.apache.spark.api.java.function.DoubleFlatMapFunction;
+
+/**
+ * A Crunch-compatible abstract base class for Spark's {@link DoubleFlatMapFunction}. Subclasses
+ * of this class may be used against either Crunch {@code PCollections} or Spark {@code RDDs}.
+ */
+public abstract class SDoubleFlatMapFunction<T> extends SparkDoFn<T, Double>
+    implements DoubleFlatMapFunction<T> {
+  @Override
+  public void process(T input, Emitter<Double> emitter) {
+    try {
+      for (Double d : call(input)) {
+        emitter.emit(d);
+      }
+    } catch (Exception e) {
+      throw new CrunchRuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/fn/SDoubleFunction.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/fn/SDoubleFunction.java b/crunch-spark/src/main/java/org/apache/crunch/fn/SDoubleFunction.java
new file mode 100644
index 0000000..e32b4fa
--- /dev/null
+++ b/crunch-spark/src/main/java/org/apache/crunch/fn/SDoubleFunction.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.fn;
+
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.spark.api.java.function.DoubleFunction;
+
+/**
+ * A Crunch-compatible abstract base class for Spark's {@link DoubleFunction}. Subclasses
+ * of this class may be used against either Crunch {@code PCollections} or Spark {@code RDDs}.
+ */
+public abstract class SDoubleFunction<T> extends SparkMapFn<T, Double> implements DoubleFunction<T> {
+  @Override
+  public Double map(T input) {
+    try {
+      return call(input);
+    } catch (Exception e) {
+      throw new CrunchRuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/fn/SFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/fn/SFlatMapFunction.java b/crunch-spark/src/main/java/org/apache/crunch/fn/SFlatMapFunction.java
new file mode 100644
index 0000000..1fecb76
--- /dev/null
+++ b/crunch-spark/src/main/java/org/apache/crunch/fn/SFlatMapFunction.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.fn;
+
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.crunch.Emitter;
+import org.apache.spark.api.java.function.FlatMapFunction;
+
+/**
+ * A Crunch-compatible abstract base class for Spark's {@link FlatMapFunction}. Subclasses
+ * of this class may be used against either Crunch {@code PCollections} or Spark {@code RDDs}.
+ */
+public abstract class SFlatMapFunction<T, R> extends SparkDoFn<T, R>
+    implements FlatMapFunction<T, R> {
+  @Override
+  public void process(T input, Emitter<R> emitter) {
+    try {
+      for (R r : call(input)) {
+        emitter.emit(r);
+      }
+    } catch (Exception e) {
+      throw new CrunchRuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/fn/SFlatMapFunction2.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/fn/SFlatMapFunction2.java b/crunch-spark/src/main/java/org/apache/crunch/fn/SFlatMapFunction2.java
new file mode 100644
index 0000000..0798f63
--- /dev/null
+++ b/crunch-spark/src/main/java/org/apache/crunch/fn/SFlatMapFunction2.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.fn;
+
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.Pair;
+import org.apache.spark.api.java.function.FlatMapFunction2;
+
+/**
+ * A Crunch-compatible abstract base class for Spark's {@link FlatMapFunction2}. Subclasses
+ * of this class may be used against either Crunch {@code PCollections} or Spark {@code RDDs}.
+ */
+public abstract class SFlatMapFunction2<K, V, R> extends DoFn<Pair<K, V>, R>
+    implements FlatMapFunction2<K, V, R> {
+  @Override
+  public void process(Pair<K, V> input, Emitter<R> emitter) {
+    try {
+      for (R r : call(input.first(), input.second())) {
+        emitter.emit(r);
+      }
+    } catch (Exception e) {
+      throw new CrunchRuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/fn/SFunction.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/fn/SFunction.java b/crunch-spark/src/main/java/org/apache/crunch/fn/SFunction.java
new file mode 100644
index 0000000..3cacf52
--- /dev/null
+++ b/crunch-spark/src/main/java/org/apache/crunch/fn/SFunction.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.fn;
+
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.spark.api.java.function.Function;
+
+/**
+ * A Crunch-compatible abstract base class for Spark's {@link Function}. Subclasses
+ * of this class may be used against either Crunch {@code PCollections} or Spark {@code RDDs}.
+ */
+public abstract class SFunction<T, R> extends SparkMapFn<T, R> implements Function<T, R> {
+  @Override
+  public R map(T input) {
+    try {
+      return call(input);
+    } catch (Exception e) {
+      throw new CrunchRuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/fn/SFunction2.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/fn/SFunction2.java b/crunch-spark/src/main/java/org/apache/crunch/fn/SFunction2.java
new file mode 100644
index 0000000..8322026
--- /dev/null
+++ b/crunch-spark/src/main/java/org/apache/crunch/fn/SFunction2.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.fn;
+
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.crunch.Pair;
+import org.apache.spark.api.java.function.Function2;
+
+/**
+ * A Crunch-compatible abstract base class for Spark's {@link Function2}. Subclasses
+ * of this class may be used against either Crunch {@code PCollections} or Spark {@code RDDs}.
+ */
+public abstract class SFunction2<K, V, R> extends SparkMapFn<Pair<K, V>, R>
+    implements Function2<K, V, R> {
+  @Override
+  public R map(Pair<K, V> input) {
+    try {
+      return call(input.first(), input.second());
+    } catch (Exception e) {
+      throw new CrunchRuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/fn/SFunctions.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/fn/SFunctions.java b/crunch-spark/src/main/java/org/apache/crunch/fn/SFunctions.java
new file mode 100644
index 0000000..cc59746
--- /dev/null
+++ b/crunch-spark/src/main/java/org/apache/crunch/fn/SFunctions.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.fn;
+
+import org.apache.spark.api.java.function.DoubleFlatMapFunction;
+import org.apache.spark.api.java.function.DoubleFunction;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.FlatMapFunction2;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
+import scala.Tuple2;
+
+/**
+ * Utility methods for wrapping existing Spark Java API Functions for
+ * Crunch compatibility.
+ */
+public final class SFunctions {
+
+  public static <T, R> SFunction<T, R> wrap(final Function<T, R> f) {
+    return new SFunction<T, R>() {
+      @Override
+      public R call(T t) throws Exception {
+        return f.call(t);
+      }
+    };
+  }
+
+  public static <K, V, R> SFunction2<K, V, R> wrap(final Function2<K, V, R> f) {
+    return new SFunction2<K, V, R>() {
+      @Override
+      public R call(K k, V v) throws Exception {
+        return f.call(k, v);
+      }
+    };
+  }
+
+  public static <T, K, V> SPairFunction<T, K, V> wrap(final PairFunction<T, K, V> f) {
+    return new SPairFunction<T, K, V>() {
+      @Override
+      public Tuple2<K, V> call(T t) throws Exception {
+        return f.call(t);
+      }
+    };
+  }
+
+  public static <T, R> SFlatMapFunction<T, R> wrap(final FlatMapFunction<T, R> f) {
+    return new SFlatMapFunction<T, R>() {
+      @Override
+      public Iterable<R> call(T t) throws Exception {
+        return f.call(t);
+      }
+    };
+  }
+
+  public static <K, V, R> SFlatMapFunction2<K, V, R> wrap(final FlatMapFunction2<K, V, R> f) {
+    return new SFlatMapFunction2<K, V, R>() {
+      @Override
+      public Iterable<R> call(K k, V v) throws Exception {
+        return f.call(k, v);
+      }
+    };
+  }
+
+  public static <T> SDoubleFunction<T> wrap(final DoubleFunction<T> f) {
+    return new SDoubleFunction<T>() {
+      @Override
+      public double call(T t) throws Exception {
+        return f.call(t);
+      }
+    };
+  }
+
+  public static <T> SDoubleFlatMapFunction<T> wrap(final DoubleFlatMapFunction<T> f) {
+    return new SDoubleFlatMapFunction<T>() {
+      @Override
+      public Iterable<Double> call(T t) throws Exception {
+        return f.call(t);
+      }
+    };
+  }
+
+  private SFunctions() {}
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/fn/SPairFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/fn/SPairFlatMapFunction.java b/crunch-spark/src/main/java/org/apache/crunch/fn/SPairFlatMapFunction.java
new file mode 100644
index 0000000..3b8e75a
--- /dev/null
+++ b/crunch-spark/src/main/java/org/apache/crunch/fn/SPairFlatMapFunction.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.fn;
+
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.Pair;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import scala.Tuple2;
+
+/**
+ * A Crunch-compatible abstract base class for Spark's {@link PairFlatMapFunction}. Subclasses
+ * of this class may be used against either Crunch {@code PCollections} or Spark {@code RDDs}.
+ */
+public abstract class SPairFlatMapFunction<T, K, V> extends SparkDoFn<T, Pair<K, V>>
+    implements PairFlatMapFunction<T, K, V> {
+  @Override
+  public void process(T input, Emitter<Pair<K, V>> emitter) {
+    try {
+      for (Tuple2<K, V> kv : call(input)) {
+        emitter.emit(Pair.of(kv._1(), kv._2()));
+      }
+    } catch (Exception e) {
+      throw new CrunchRuntimeException(e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/fn/SPairFunction.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/fn/SPairFunction.java b/crunch-spark/src/main/java/org/apache/crunch/fn/SPairFunction.java
new file mode 100644
index 0000000..9d9317c
--- /dev/null
+++ b/crunch-spark/src/main/java/org/apache/crunch/fn/SPairFunction.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.fn;
+
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.crunch.Pair;
+import org.apache.spark.api.java.function.PairFunction;
+import scala.Tuple2;
+
+/**
+ * A Crunch-compatible abstract base class for Spark's {@link PairFunction}. Subclasses
+ * of this class may be used against either Crunch {@code PCollections} or Spark {@code RDDs}.
+ */
+public abstract class SPairFunction<T, K, V> extends SparkMapFn<T, Pair<K, V>>
+    implements PairFunction<T, K, V> {
+  @Override
+  public Pair<K, V> map(T input) {
+    try {
+      Tuple2<K, V> t = call(input);
+      return t == null ? null : Pair.of(t._1(), t._2());
+    } catch (Exception e) {
+      throw new CrunchRuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/fn/SparkDoFn.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/fn/SparkDoFn.java b/crunch-spark/src/main/java/org/apache/crunch/fn/SparkDoFn.java
new file mode 100644
index 0000000..05adfa7
--- /dev/null
+++ b/crunch-spark/src/main/java/org/apache/crunch/fn/SparkDoFn.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.fn;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+
+abstract class SparkDoFn<T, R> extends DoFn<T, R> {
+  @Override
+  public final void initialize() {
+    // Forced no-op for Spark compatibility
+  }
+
+  @Override
+  public final void cleanup(Emitter<R> emitter) {
+    // Forced no-op for Spark compatibility
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/fn/SparkMapFn.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/fn/SparkMapFn.java b/crunch-spark/src/main/java/org/apache/crunch/fn/SparkMapFn.java
new file mode 100644
index 0000000..1f90b55
--- /dev/null
+++ b/crunch-spark/src/main/java/org/apache/crunch/fn/SparkMapFn.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.fn;
+
+import org.apache.crunch.Emitter;
+import org.apache.crunch.MapFn;
+
+abstract class SparkMapFn<T, R> extends MapFn<T, R> {
+  @Override
+  public final void initialize() {
+    // Forced no-op for Spark compatibility
+  }
+
+  @Override
+  public final void cleanup(Emitter<R> emitter) {
+    // Forced no-op for Spark compatibility
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
index b5bbc8d..687274a 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
@@ -121,7 +121,8 @@ public class SparkRuntime extends AbstractFuture<PipelineResult> implements Pipe
     this.conf = conf;
     this.counters = sparkContext.accumulator(Maps.<String, Map<String, Long>>newHashMap(),
         new CounterAccumulatorParam());
-    this.ctxt = new SparkRuntimeContext(counters, sparkContext.broadcast(WritableUtils.toByteArray(conf)));
+    this.ctxt = new SparkRuntimeContext(sparkContext.appName(), counters,
+        sparkContext.broadcast(WritableUtils.toByteArray(conf)));
     this.outputTargets = Maps.newTreeMap(DEPTH_COMPARATOR);
     this.outputTargets.putAll(outputTargets);
     this.toMaterialize = toMaterialize;
@@ -305,11 +306,11 @@ public class SparkRuntime extends AbstractFuture<PipelineResult> implements Pipe
             if (rdd instanceof JavaRDD) {
               outRDD = ((JavaRDD) rdd)
                   .map(new MapFunction(c.applyPTypeTransforms() ? ptype.getOutputMapFn() : ident, ctxt))
-                  .map(new OutputConverterFunction(c));
+                  .mapToPair(new OutputConverterFunction(c));
             } else {
               outRDD = ((JavaPairRDD) rdd)
                   .map(new PairMapFunction(c.applyPTypeTransforms() ? ptype.getOutputMapFn() : ident, ctxt))
-                  .map(new OutputConverterFunction(c));
+                  .mapToPair(new OutputConverterFunction(c));
             }
             try {
               Job job = new Job(conf);
@@ -368,7 +369,8 @@ public class SparkRuntime extends AbstractFuture<PipelineResult> implements Pipe
 
   private Counters getCounters() {
     Counters c = new Counters();
-    for (Map.Entry<String, Map<String, Long>> e : counters.value().entrySet()) {
+    Map<String, Map<String, Long>> values = counters.value();
+    for (Map.Entry<String, Map<String, Long>> e : values.entrySet()) {
       CounterGroup cg = c.getGroup(e.getKey());
       for (Map.Entry<String, Long> f : e.getValue().entrySet()) {
         cg.findCounter(f.getKey()).setValue(f.getValue());

http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntimeContext.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntimeContext.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntimeContext.java
index ca52c29..44d3573 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntimeContext.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntimeContext.java
@@ -18,6 +18,7 @@
 package org.apache.crunch.impl.spark;
 
 import com.google.common.base.Joiner;
+import com.google.common.base.Objects;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.io.ByteStreams;
@@ -28,8 +29,10 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.mapred.SparkCounter;
 import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.StatusReporter;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 import org.apache.spark.Accumulator;
 import org.apache.spark.SparkFiles;
@@ -44,14 +47,18 @@ import java.util.Map;
 
 public class SparkRuntimeContext implements Serializable {
 
+  private String jobName;
   private Broadcast<byte[]> broadConf;
   private final Accumulator<Map<String, Map<String, Long>>> counters;
   private transient Configuration conf;
   private transient TaskInputOutputContext context;
+  private transient Integer lastTID;
 
   public SparkRuntimeContext(
+      String jobName, // app name from SparkRuntime; initialize(..) builds JobIDs from it per task id
       Accumulator<Map<String, Map<String, Long>>> counters,
       Broadcast<byte[]> broadConf) {
+    this.jobName = jobName;
     this.counters = counters;
     this.broadConf = broadConf;
   }
@@ -61,11 +68,19 @@ public class SparkRuntimeContext implements Serializable {
     this.conf = null;
   }
 
-  public void initialize(DoFn<?, ?> fn) {
-    if (context == null) {
+  public void initialize(DoFn<?, ?> fn, Integer tid) {
+    if (context == null || !Objects.equal(lastTID, tid)) {
+      TaskAttemptID attemptID;
+      if (tid != null) {
+        TaskID taskId = new TaskID(new JobID(jobName, 0), false, tid);
+        attemptID = new TaskAttemptID(taskId, 0);
+        lastTID = tid;
+      } else {
+        attemptID = new TaskAttemptID();
+        lastTID = null;
+      }
       configureLocalFiles();
-      context = TaskInputOutputContextFactory.create(getConfiguration(), new TaskAttemptID(),
-          new SparkReporter(counters));
+      context = TaskInputOutputContextFactory.create(getConfiguration(), attemptID, new SparkReporter(counters));
     }
     fn.setContext(context);
     fn.initialize();

http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/DoCollection.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/DoCollection.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/DoCollection.java
index 72888b8..6db14f2 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/DoCollection.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/DoCollection.java
@@ -23,11 +23,9 @@ import org.apache.crunch.impl.dist.collect.BaseDoCollection;
 import org.apache.crunch.impl.dist.collect.PCollectionImpl;
 import org.apache.crunch.impl.spark.SparkCollection;
 import org.apache.crunch.impl.spark.SparkRuntime;
-import org.apache.crunch.impl.spark.fn.FlatMapDoFn;
-import org.apache.crunch.impl.spark.fn.FlatMapPairDoFn;
+import org.apache.crunch.impl.spark.fn.FlatMapIndexFn;
 import org.apache.crunch.types.PType;
 import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaRDDLike;
 import org.apache.spark.storage.StorageLevel;
 
@@ -56,10 +54,8 @@ public class DoCollection<S> extends BaseDoCollection<S> implements SparkCollect
   private JavaRDDLike<?, ?> getJavaRDDLikeInternal(SparkRuntime runtime) {
     JavaRDDLike<?, ?> parentRDD = ((SparkCollection) getOnlyParent()).getJavaRDDLike(runtime);
     fn.configure(runtime.getConfiguration());
-    if (parentRDD instanceof JavaRDD) {
-      return ((JavaRDD) parentRDD).mapPartitions(new FlatMapDoFn(fn, runtime.getRuntimeContext()));
-    } else {
-      return ((JavaPairRDD) parentRDD).mapPartitions(new FlatMapPairDoFn(fn, runtime.getRuntimeContext()));
-    }
+    boolean tupleInput = parentRDD instanceof JavaPairRDD; // pair RDD elements arrive as Tuple2s
+    FlatMapIndexFn perPartition = new FlatMapIndexFn(fn, tupleInput, runtime.getRuntimeContext());
+    return parentRDD.mapPartitionsWithIndex(perPartition, false);
   }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/DoTable.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/DoTable.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/DoTable.java
index c0e4bb1..f593590 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/DoTable.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/DoTable.java
@@ -25,11 +25,10 @@ import org.apache.crunch.impl.dist.collect.BaseDoTable;
 import org.apache.crunch.impl.dist.collect.PCollectionImpl;
 import org.apache.crunch.impl.spark.SparkCollection;
 import org.apache.crunch.impl.spark.SparkRuntime;
-import org.apache.crunch.impl.spark.fn.PairFlatMapDoFn;
-import org.apache.crunch.impl.spark.fn.PairFlatMapPairDoFn;
+import org.apache.crunch.impl.spark.fn.CrunchPairTuple2;
+import org.apache.crunch.impl.spark.fn.FlatMapIndexFn;
 import org.apache.crunch.types.PTableType;
 import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaRDDLike;
 import org.apache.spark.storage.StorageLevel;
 
@@ -69,10 +68,10 @@ public class DoTable<K, V> extends BaseDoTable<K, V> implements SparkCollection
     }
     JavaRDDLike<?, ?> parentRDD = ((SparkCollection) getOnlyParent()).getJavaRDDLike(runtime);
     fn.configure(runtime.getConfiguration());
-    if (parentRDD instanceof JavaRDD) {
-      return ((JavaRDD) parentRDD).mapPartitions(new PairFlatMapDoFn(fn, runtime.getRuntimeContext()));
-    } else {
-      return ((JavaPairRDD) parentRDD).mapPartitions(new PairFlatMapPairDoFn(fn, runtime.getRuntimeContext()));
-    }
+    return parentRDD
+        .mapPartitionsWithIndex(
+            new FlatMapIndexFn(fn, parentRDD instanceof JavaPairRDD, runtime.getRuntimeContext()),
+            false)
+        .mapPartitionsToPair(new CrunchPairTuple2());
   }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/PGroupedTableImpl.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/PGroupedTableImpl.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/PGroupedTableImpl.java
index 4de50b8..5e6594e 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/PGroupedTableImpl.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/PGroupedTableImpl.java
@@ -83,7 +83,8 @@ public class PGroupedTableImpl<K, V> extends BaseGroupedTable<K, V> implements S
   private JavaRDDLike<?, ?> getJavaRDDLikeInternal(SparkRuntime runtime, CombineFn<K, V> combineFn) {
     JavaPairRDD<K, V> parentRDD = (JavaPairRDD<K, V>) ((SparkCollection)getOnlyParent()).getJavaRDDLike(runtime);
     if (combineFn != null) {
-      parentRDD = parentRDD.mapPartitions(new CombineMapsideFunction<K, V>(combineFn, runtime.getRuntimeContext()));
+      parentRDD = parentRDD.mapPartitionsToPair(
+          new CombineMapsideFunction<K, V>(combineFn, runtime.getRuntimeContext()));
     }
     SerDe keySerde, valueSerde;
     PTableType<K, V> parentType = ptype.getTableType();
@@ -106,13 +107,14 @@ public class PGroupedTableImpl<K, V> extends BaseGroupedTable<K, V> implements S
     if (groupingOptions.getPartitionerClass() != null) {
       groupedRDD = parentRDD
           .map(new PairMapFunction(ptype.getOutputMapFn(), runtime.getRuntimeContext()))
-          .map(new PartitionedMapOutputFunction(keySerde, valueSerde, ptype, groupingOptions.getPartitionerClass(),
+          .mapToPair(
+              new PartitionedMapOutputFunction(keySerde, valueSerde, ptype, groupingOptions.getPartitionerClass(),
               numPartitions, runtime.getRuntimeContext()))
           .groupByKey(new SparkPartitioner(numPartitions));
     } else {
       groupedRDD = parentRDD
           .map(new PairMapFunction(ptype.getOutputMapFn(), runtime.getRuntimeContext()))
-          .map(new MapOutputFunction(keySerde, valueSerde))
+          .mapToPair(new MapOutputFunction(keySerde, valueSerde))
           .groupByKey(numPartitions);
     }
 
@@ -121,12 +123,12 @@ public class PGroupedTableImpl<K, V> extends BaseGroupedTable<K, V> implements S
       groupedRDD = groupedRDD.sortByKey(scmp);
     }
     if (groupingOptions.getGroupingComparatorClass() != null) {
-      groupedRDD = groupedRDD.mapPartitions(
+      groupedRDD = groupedRDD.mapPartitionsToPair(
           new ReduceGroupingFunction(groupingOptions, ptype, runtime.getRuntimeContext()));
     }
 
     return groupedRDD
         .map(new ReduceInputFunction(keySerde, valueSerde))
-        .map(new PairMapIterableFunction(ptype.getInputMapFn(), runtime.getRuntimeContext()));
+        .mapToPair(new PairMapIterableFunction(ptype.getInputMapFn(), runtime.getRuntimeContext()));
   }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/ToByteArrayFunction.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/ToByteArrayFunction.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/ToByteArrayFunction.java
index 4761754..9d2a10c 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/ToByteArrayFunction.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/ToByteArrayFunction.java
@@ -25,9 +25,11 @@ import scala.Tuple2;
 
 import java.util.List;
 
-public class ToByteArrayFunction extends PairFunction<Tuple2<IntByteArray, List<byte[]>>, ByteArray, List<byte[]>> {
+/** Widens each {@code IntByteArray} key to {@code ByteArray}, passing the value list through unchanged. */
+public class ToByteArrayFunction implements PairFunction<Tuple2<IntByteArray, List<byte[]>>, ByteArray, List<byte[]>> {
   @Override
   public Tuple2<ByteArray, List<byte[]>> call(Tuple2<IntByteArray, List<byte[]>> t) throws Exception {
-    return new Tuple2<ByteArray, List<byte[]>>(t._1, t._2);
+    ByteArray key = t._1();
+    return new Tuple2<ByteArray, List<byte[]>>(key, t._2());
   }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/UnionTable.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/UnionTable.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/UnionTable.java
index b2776c5..826e5c4 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/UnionTable.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/UnionTable.java
@@ -59,7 +59,7 @@ public class UnionTable<K, V> extends BaseUnionTable<K, V> implements SparkColle
         rdds[i] = (JavaPairRDD) ((SparkCollection) parents.get(i)).getJavaRDDLike(runtime);
       } else {
         JavaRDD rdd = (JavaRDD) ((SparkCollection) parents.get(i)).getJavaRDDLike(runtime);
-        rdds[i] = rdd.mapPartitions(new PairFlatMapDoFn(IdentityFn.getInstance(), runtime.getRuntimeContext()));
+        rdds[i] = rdd.mapPartitionsToPair(new PairFlatMapDoFn(IdentityFn.getInstance(), runtime.getRuntimeContext()));
       }
     }
     return runtime.getSparkContext().union(rdds);

http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/CombineMapsideFunction.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/CombineMapsideFunction.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/CombineMapsideFunction.java
index 3cc8e05..1bea08d 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/CombineMapsideFunction.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/CombineMapsideFunction.java
@@ -32,7 +32,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-public class CombineMapsideFunction<K, V> extends PairFlatMapFunction<Iterator<Tuple2<K, V>>, K, V> {
+public class CombineMapsideFunction<K, V> implements PairFlatMapFunction<Iterator<Tuple2<K, V>>, K, V> {
 
   private static final int REDUCE_EVERY_N = 50000;
 
@@ -46,7 +46,7 @@ public class CombineMapsideFunction<K, V> extends PairFlatMapFunction<Iterator<T
 
   @Override
   public Iterable<Tuple2<K, V>> call(Iterator<Tuple2<K, V>> iter) throws Exception {
-    ctxt.initialize(combineFn);
+    ctxt.initialize(combineFn, null);
     Map<K, List<V>> cache = Maps.newHashMap();
     int cnt = 0;
     while (iter.hasNext()) {

http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/CrunchPairTuple2.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/CrunchPairTuple2.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/CrunchPairTuple2.java
new file mode 100644
index 0000000..d6c544c
--- /dev/null
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/CrunchPairTuple2.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.spark.fn;
+
+import com.google.common.collect.Iterators;
+import org.apache.crunch.Pair;
+import org.apache.crunch.impl.spark.GuavaUtils;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import scala.Tuple2;
+
+import java.util.Iterator;
+
+/**
+ * Converts a partition of Crunch {@link Pair}s into Scala {@link Tuple2}s so the result can be
+ * consumed by {@code mapPartitionsToPair}.
+ */
+public class CrunchPairTuple2<K, V> implements PairFlatMapFunction<Iterator<Pair<K, V>>, K, V> {
+
+  /**
+   * Returns a view of {@code pairs} that converts each element on access.
+   *
+   * <p>Every iterator handed out by the view reads from {@code pairs} itself, so the partition can
+   * only be traversed once.
+   */
+  @Override
+  public Iterable<Tuple2<K, V>> call(Iterator<Pair<K, V>> pairs) throws Exception {
+    return new TupleView<K, V>(pairs);
+  }
+
+  /** Iterable adapter that maps {@link Pair}s to {@link Tuple2}s via {@code GuavaUtils}. */
+  private static final class TupleView<K, V> implements Iterable<Tuple2<K, V>> {
+    private final Iterator<Pair<K, V>> source;
+
+    TupleView(Iterator<Pair<K, V>> source) {
+      this.source = source;
+    }
+
+    @Override
+    public Iterator<Tuple2<K, V>> iterator() {
+      return Iterators.transform(source, GuavaUtils.<K, V>pair2tupleFunc());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/FlatMapDoFn.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/FlatMapDoFn.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/FlatMapDoFn.java
deleted file mode 100644
index cfb6d42..0000000
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/FlatMapDoFn.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.spark.fn;
-
-import org.apache.crunch.DoFn;
-import org.apache.crunch.impl.spark.SparkRuntimeContext;
-import org.apache.spark.api.java.function.FlatMapFunction;
-
-import java.util.Iterator;
-
-public class FlatMapDoFn<S, T> extends FlatMapFunction<Iterator<S>, T> {
-  private final DoFn<S, T> fn;
-  private final SparkRuntimeContext ctxt;
-
-  public FlatMapDoFn(DoFn<S, T> fn, SparkRuntimeContext ctxt) {
-    this.fn = fn;
-    this.ctxt = ctxt;
-  }
-
-  @Override
-  public Iterable<T> call(Iterator<S> input) throws Exception {
-    ctxt.initialize(fn);
-    return new CrunchIterable<S, T>(fn, input);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/FlatMapIndexFn.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/FlatMapIndexFn.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/FlatMapIndexFn.java
new file mode 100644
index 0000000..4eb5884
--- /dev/null
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/FlatMapIndexFn.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.spark.fn;
+
+import com.google.common.collect.Iterators;
+import org.apache.crunch.DoFn;
+import org.apache.crunch.impl.spark.GuavaUtils;
+import org.apache.crunch.impl.spark.SparkRuntimeContext;
+import org.apache.crunch.util.DoFnIterator;
+import org.apache.spark.api.java.function.Function2;
+
+import java.util.Iterator;
+
+/**
+ * Runs a Crunch {@link DoFn} over a single Spark partition. Instances are handed to
+ * {@code mapPartitionsWithIndex}, and the partition index is forwarded to
+ * {@link SparkRuntimeContext} when the fn is initialized.
+ *
+ * <p>The input iterator is deliberately raw: depending on the parent RDD, its elements are either
+ * the fn's inputs as-is or {@code scala.Tuple2}s that must first become Crunch {@code Pair}s.
+ *
+ * @param <S> the DoFn's input type
+ * @param <T> the DoFn's output type
+ */
+public class FlatMapIndexFn<S, T> implements Function2<Integer, Iterator, Iterator<T>> {
+  private final DoFn<S, T> fn;
+  private final boolean convertInput;
+  private final SparkRuntimeContext ctxt;
+
+  /**
+   * @param fn the DoFn to apply to every partition
+   * @param convertInput {@code true} when partition elements are {@code scala.Tuple2}s that must
+   *     be converted to {@code Pair}s before reaching {@code fn}
+   * @param ctxt runtime context used to initialize {@code fn} for each partition
+   */
+  public FlatMapIndexFn(DoFn<S, T> fn, boolean convertInput, SparkRuntimeContext ctxt) {
+    this.fn = fn;
+    this.convertInput = convertInput;
+    this.ctxt = ctxt;
+  }
+
+  /**
+   * Initializes {@code fn} for this partition, then returns a {@link DoFnIterator} built from
+   * the (possibly converted) partition elements and {@code fn}.
+   *
+   * @param partitionId index of the partition being processed
+   * @param input the partition's elements; raw because their type depends on {@code convertInput}
+   */
+  @Override
+  public Iterator<T> call(Integer partitionId, Iterator input) throws Exception {
+    ctxt.initialize(fn, partitionId);
+    // Pair RDD partitions yield scala.Tuple2s, but the DoFn consumes Crunch Pairs.
+    @SuppressWarnings("unchecked") // element type is the caller's responsibility via convertInput
+    Iterator<S> in = (Iterator<S>) (convertInput
+        ? Iterators.transform(input, GuavaUtils.tuple2PairFunc())
+        : input);
+    return new DoFnIterator<S, T>(in, fn);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/FlatMapPairDoFn.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/FlatMapPairDoFn.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/FlatMapPairDoFn.java
index 30aef83..8ec2834 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/FlatMapPairDoFn.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/FlatMapPairDoFn.java
@@ -27,7 +27,7 @@ import scala.Tuple2;
 
 import java.util.Iterator;
 
-public class FlatMapPairDoFn<K, V, T> extends FlatMapFunction<Iterator<Tuple2<K, V>>, T> {
+public class FlatMapPairDoFn<K, V, T> implements FlatMapFunction<Iterator<Tuple2<K, V>>, T> {
   private final DoFn<Pair<K, V>, T> fn;
   private final SparkRuntimeContext ctxt;
 
@@ -38,7 +38,7 @@ public class FlatMapPairDoFn<K, V, T> extends FlatMapFunction<Iterator<Tuple2<K,
 
   @Override
   public Iterable<T> call(Iterator<Tuple2<K, V>> input) throws Exception {
-    ctxt.initialize(fn);
+    ctxt.initialize(fn, null); // no partition index here; the context falls back to a default TaskAttemptID
     return new CrunchIterable<Pair<K, V>, T>(fn,
         Iterators.transform(input, GuavaUtils.<K, V>tuple2PairFunc()));
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/InputConverterFunction.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/InputConverterFunction.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/InputConverterFunction.java
index 36745c1..d231527 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/InputConverterFunction.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/InputConverterFunction.java
@@ -21,7 +21,7 @@ import org.apache.crunch.types.Converter;
 import org.apache.spark.api.java.function.Function;
 import scala.Tuple2;
 
-public class InputConverterFunction<K, V, S> extends Function<Tuple2<K, V>, S> {
+public class InputConverterFunction<K, V, S> implements Function<Tuple2<K, V>, S> {
   private Converter<K, V, S, ?> converter;
 
   public InputConverterFunction(Converter<K, V, S, ?> converter) {

http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/MapFunction.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/MapFunction.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/MapFunction.java
index b08aaeb..f611b4b 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/MapFunction.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/MapFunction.java
@@ -21,7 +21,7 @@ import org.apache.crunch.MapFn;
 import org.apache.crunch.impl.spark.SparkRuntimeContext;
 import org.apache.spark.api.java.function.Function;
 
-public class MapFunction extends Function<Object, Object> {
+public class MapFunction implements Function<Object, Object> {
   private final MapFn fn;
   private final SparkRuntimeContext ctxt;
   private boolean initialized;
@@ -34,7 +34,7 @@ public class MapFunction extends Function<Object, Object> {
   @Override
   public Object call(Object o) throws Exception {
     if (!initialized) {
-      ctxt.initialize(fn);
+      ctxt.initialize(fn, null);
       initialized = true;
     }
     return fn.map(o);

http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/MapOutputFunction.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/MapOutputFunction.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/MapOutputFunction.java
index 47ef752..b8cd7c6 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/MapOutputFunction.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/MapOutputFunction.java
@@ -23,7 +23,7 @@ import org.apache.crunch.impl.spark.serde.SerDe;
 import org.apache.spark.api.java.function.PairFunction;
 import scala.Tuple2;
 
-public class MapOutputFunction<K, V> extends PairFunction<Pair<K, V>, ByteArray, byte[]> {
+public class MapOutputFunction<K, V> implements PairFunction<Pair<K, V>, ByteArray, byte[]> {
 
   private final SerDe keySerde;
   private final SerDe valueSerde;

http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/OutputConverterFunction.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/OutputConverterFunction.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/OutputConverterFunction.java
index b1184d8..0e611b0 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/OutputConverterFunction.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/OutputConverterFunction.java
@@ -21,7 +21,7 @@ import org.apache.crunch.types.Converter;
 import org.apache.spark.api.java.function.PairFunction;
 import scala.Tuple2;
 
-public class OutputConverterFunction<K, V, S> extends PairFunction<S, K, V> {
+public class OutputConverterFunction<K, V, S> implements PairFunction<S, K, V> {
   private Converter<K, V, S, ?> converter;
 
   public OutputConverterFunction(Converter<K, V, S, ?> converter) {

http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairFlatMapDoFn.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairFlatMapDoFn.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairFlatMapDoFn.java
index b2d93a0..7f289cc 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairFlatMapDoFn.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairFlatMapDoFn.java
@@ -27,7 +27,7 @@ import scala.Tuple2;
 
 import java.util.Iterator;
 
-public class PairFlatMapDoFn<T, K, V> extends PairFlatMapFunction<Iterator<T>, K, V> {
+public class PairFlatMapDoFn<T, K, V> implements PairFlatMapFunction<Iterator<T>, K, V> {
   private final DoFn<T, Pair<K, V>> fn;
   private final SparkRuntimeContext ctxt;
 
@@ -38,7 +38,7 @@ public class PairFlatMapDoFn<T, K, V> extends PairFlatMapFunction<Iterator<T>, K
 
   @Override
   public Iterable<Tuple2<K, V>> call(Iterator<T> input) throws Exception {
-    ctxt.initialize(fn);
+    ctxt.initialize(fn, null);
     return Iterables.transform(
         new CrunchIterable<T, Pair<K, V>>(fn, input),
         GuavaUtils.<K, V>pair2tupleFunc());

http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairFlatMapPairDoFn.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairFlatMapPairDoFn.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairFlatMapPairDoFn.java
deleted file mode 100644
index bc3e701..0000000
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairFlatMapPairDoFn.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.spark.fn;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Iterators;
-import org.apache.crunch.DoFn;
-import org.apache.crunch.Pair;
-import org.apache.crunch.impl.spark.GuavaUtils;
-import org.apache.crunch.impl.spark.SparkRuntimeContext;
-import org.apache.spark.api.java.function.PairFlatMapFunction;
-import scala.Tuple2;
-
-import java.util.Iterator;
-
-public class PairFlatMapPairDoFn<K, V, K2, V2> extends PairFlatMapFunction<Iterator<Tuple2<K, V>>, K2, V2> {
-  private final DoFn<Pair<K, V>, Pair<K2, V2>> fn;
-  private final SparkRuntimeContext ctxt;
-
-  public PairFlatMapPairDoFn(DoFn<Pair<K, V>, Pair<K2, V2>> fn, SparkRuntimeContext ctxt) {
-    this.fn = fn;
-    this.ctxt = ctxt;
-  }
-
-  @Override
-  public Iterable<Tuple2<K2, V2>> call(Iterator<Tuple2<K, V>> input) throws Exception {
-    ctxt.initialize(fn);
-    return Iterables.transform(
-        new CrunchIterable<Pair<K, V>, Pair<K2, V2>>(
-            fn,
-            Iterators.transform(input, GuavaUtils.<K, V>tuple2PairFunc())),
-        GuavaUtils.<K2, V2>pair2tupleFunc());
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairMapFunction.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairMapFunction.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairMapFunction.java
index 673bbab..652452c 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairMapFunction.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairMapFunction.java
@@ -23,7 +23,7 @@ import org.apache.crunch.impl.spark.SparkRuntimeContext;
 import org.apache.spark.api.java.function.Function;
 import scala.Tuple2;
 
-public class PairMapFunction<K, V, S> extends Function<Tuple2<K, V>, S> {
+public class PairMapFunction<K, V, S> implements Function<Tuple2<K, V>, S> {
   private final MapFn<Pair<K, V>, S> fn;
   private final SparkRuntimeContext ctxt;
   private boolean initialized;
@@ -36,7 +36,7 @@ public class PairMapFunction<K, V, S> extends Function<Tuple2<K, V>, S> {
   @Override
   public S call(Tuple2<K, V> kv) throws Exception {
     if (!initialized) {
-      ctxt.initialize(fn);
+      ctxt.initialize(fn, null);
       initialized = true;
     }
     return fn.map(Pair.of(kv._1(), kv._2()));

http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairMapIterableFunction.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairMapIterableFunction.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairMapIterableFunction.java
index 7bfe378..f36070f 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairMapIterableFunction.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairMapIterableFunction.java
@@ -26,7 +26,7 @@ import scala.Tuple2;
 
 import java.util.List;
 
-public class PairMapIterableFunction<K, V, S, T> extends PairFunction<Pair<K, List<V>>, S, Iterable<T>> {
+public class PairMapIterableFunction<K, V, S, T> implements PairFunction<Pair<K, List<V>>, S, Iterable<T>> {
 
   private final MapFn<Pair<K, List<V>>, Pair<S, Iterable<T>>> fn;
   private final SparkRuntimeContext runtimeContext;
@@ -42,7 +42,7 @@ public class PairMapIterableFunction<K, V, S, T> extends PairFunction<Pair<K, Li
   @Override
   public Tuple2<S, Iterable<T>> call(Pair<K, List<V>> input) throws Exception {
     if (!initialized) {
-      runtimeContext.initialize(fn);
+      runtimeContext.initialize(fn, null);
       initialized = true;
     }
     Pair<S, Iterable<T>> out = fn.map(input);

http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PartitionedMapOutputFunction.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PartitionedMapOutputFunction.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PartitionedMapOutputFunction.java
index a10b7f6..e88217d 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PartitionedMapOutputFunction.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PartitionedMapOutputFunction.java
@@ -34,7 +34,7 @@ import scala.Tuple2;
 
 import java.io.IOException;
 
-public class PartitionedMapOutputFunction<K, V> extends PairFunction<Pair<K, V>, IntByteArray, byte[]> {
+public class PartitionedMapOutputFunction<K, V> implements PairFunction<Pair<K, V>, IntByteArray, byte[]> {
 
   private final SerDe<K> keySerde;
   private final SerDe<V> valueSerde;

http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/ReduceGroupingFunction.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/ReduceGroupingFunction.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/ReduceGroupingFunction.java
index 35dd7dd..d3dd69e 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/ReduceGroupingFunction.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/ReduceGroupingFunction.java
@@ -33,7 +33,7 @@ import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
 
-public class ReduceGroupingFunction extends PairFlatMapFunction<Iterator<Tuple2<ByteArray, List<byte[]>>>,
+public class ReduceGroupingFunction implements PairFlatMapFunction<Iterator<Tuple2<ByteArray, List<byte[]>>>,
     ByteArray, List<byte[]>> {
 
   private final GroupingOptions options;
@@ -97,13 +97,13 @@ public class ReduceGroupingFunction extends PairFlatMapFunction<Iterator<Tuple2<
       while (iter.hasNext()) {
         Tuple2<ByteArray, List<byte[]>> t = iter.next();
         if (key == null) {
-          key = t._1;
-          bytes.addAll(t._2);
-        } else if (cmp.compare(key.value, 0, key.value.length, t._1.value, 0, t._1.value.length) == 0) {
-          bytes.addAll(t._2);
+          key = t._1();
+          bytes.addAll(t._2());
+        } else if (cmp.compare(key.value, 0, key.value.length, t._1().value, 0, t._1().value.length) == 0) {
+          bytes.addAll(t._2());
         } else {
-          nextKey = t._1;
-          next = Lists.newArrayList(t._2);
+          nextKey = t._1();
+          next = Lists.newArrayList(t._2());
           break;
         }
       }

http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/ReduceInputFunction.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/ReduceInputFunction.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/ReduceInputFunction.java
index 4ebdfaa..11e3bde 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/ReduceInputFunction.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/ReduceInputFunction.java
@@ -19,6 +19,7 @@
  */
 package org.apache.crunch.impl.spark.fn;
 
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import org.apache.crunch.Pair;
 import org.apache.crunch.impl.spark.ByteArray;
@@ -28,7 +29,7 @@ import scala.Tuple2;
 
 import java.util.List;
 
-public class ReduceInputFunction<K, V> extends Function<Tuple2<ByteArray, List<byte[]>>, Pair<K, List<V>>> {
+public class ReduceInputFunction<K, V> implements Function<Tuple2<ByteArray, Iterable<byte[]>>, Pair<K, Iterable<V>>> {
   private final SerDe<K> keySerDe;
   private final SerDe<V> valueSerDe;
 
@@ -38,7 +39,7 @@ public class ReduceInputFunction<K, V> extends Function<Tuple2<ByteArray, List<b
   }
 
   @Override
-  public Pair<K, List<V>> call(Tuple2<ByteArray, List<byte[]>> kv) throws Exception {
-    return Pair.of(keySerDe.fromBytes(kv._1.value), Lists.transform(kv._2, valueSerDe.fromBytesFunction()));
+  public Pair<K, Iterable<V>> call(Tuple2<ByteArray, Iterable<byte[]>> kv) throws Exception {
+    return Pair.of(keySerDe.fromBytes(kv._1().value), Iterables.transform(kv._2(), valueSerDe.fromBytesFunction()));
   }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/43546587/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d86df7e..8e2f5a0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -98,7 +98,7 @@ under the License.
     <scala.base.version>2.10</scala.base.version>
     <scala.version>2.10.4</scala.version>
     <scalatest.version>1.9.1</scalatest.version>
-    <spark.version>0.9.1</spark.version>
+    <spark.version>1.0.0</spark.version>
     <jsr305.version>1.3.9</jsr305.version>
   </properties>
 
@@ -239,6 +239,10 @@ under the License.
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-core</artifactId>
           </exclusion>
+          <exclusion>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro-ipc</artifactId>
+          </exclusion>
         </exclusions>
       </dependency>
 


Mime
View raw message