crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tomwh...@apache.org
Subject crunch git commit: CRUNCH-618: Run on Spark 2. Contributed by Gergő Pásztor.
Date Thu, 13 Apr 2017 15:32:11 GMT
Repository: crunch
Updated Branches:
  refs/heads/master ce9aaa3a5 -> 047d8fd36


CRUNCH-618: Run on Spark 2. Contributed by Gergő Pásztor.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/047d8fd3
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/047d8fd3
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/047d8fd3

Branch: refs/heads/master
Commit: 047d8fd36773608a3d2cf6445881173e7d26377c
Parents: ce9aaa3
Author: Tom White <tomwhite@apache.org>
Authored: Thu Apr 13 16:10:23 2017 +0100
Committer: Tom White <tomwhite@apache.org>
Committed: Thu Apr 13 16:10:23 2017 +0100

----------------------------------------------------------------------
 crunch-kafka/pom.xml                            |  2 +-
 crunch-scrunch/pom.xml                          |  2 +-
 .../interpreter/InterpreterJarTest.scala        | 23 +++++++++-------
 .../org/apache/crunch/scrunch/PTypeFamily.scala |  4 +--
 .../scrunch/interpreter/InterpreterRunner.scala | 27 ++++++++++---------
 .../org/apache/crunch/fn/IterableIterator.java  | 28 ++++++++++++++++++++
 .../crunch/fn/SDoubleFlatMapFunction.java       |  2 +-
 .../org/apache/crunch/fn/SFlatMapFunction.java  |  2 +-
 .../org/apache/crunch/fn/SFlatMapFunction2.java |  2 +-
 .../java/org/apache/crunch/fn/SFunctions.java   |  8 +++---
 .../apache/crunch/fn/SPairFlatMapFunction.java  |  2 +-
 .../impl/spark/fn/CombineMapsideFunction.java   |  4 +--
 .../crunch/impl/spark/fn/CrunchPairTuple2.java  |  9 ++-----
 .../crunch/impl/spark/fn/FlatMapPairDoFn.java   |  4 +--
 .../crunch/impl/spark/fn/PairFlatMapDoFn.java   |  4 +--
 .../impl/spark/fn/ReduceGroupingFunction.java   |  9 ++-----
 pom.xml                                         | 10 +++----
 17 files changed, 83 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/047d8fd3/crunch-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/crunch-kafka/pom.xml b/crunch-kafka/pom.xml
index 961b106..32429f5 100644
--- a/crunch-kafka/pom.xml
+++ b/crunch-kafka/pom.xml
@@ -40,7 +40,7 @@ under the License.
     </dependency>
     <dependency>
       <groupId>org.apache.kafka</groupId>
-      <artifactId>kafka_2.10</artifactId>
+      <artifactId>kafka_2.11</artifactId>
     </dependency>
     <dependency>
       <groupId>org.scala-lang</groupId>

http://git-wip-us.apache.org/repos/asf/crunch/blob/047d8fd3/crunch-scrunch/pom.xml
----------------------------------------------------------------------
diff --git a/crunch-scrunch/pom.xml b/crunch-scrunch/pom.xml
index 51925fb..9376059 100644
--- a/crunch-scrunch/pom.xml
+++ b/crunch-scrunch/pom.xml
@@ -38,7 +38,7 @@ under the License.
       <artifactId>scala-compiler</artifactId>
     </dependency>
     <dependency>
-      <groupId>org.scala-lang</groupId>
+      <groupId>jline</groupId>
       <artifactId>jline</artifactId>
     </dependency>
     <dependency>

http://git-wip-us.apache.org/repos/asf/crunch/blob/047d8fd3/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/interpreter/InterpreterJarTest.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/interpreter/InterpreterJarTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/interpreter/InterpreterJarTest.scala
index 5ebc303..bc9bd0f 100644
--- a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/interpreter/InterpreterJarTest.scala
+++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/interpreter/InterpreterJarTest.scala
@@ -22,8 +22,6 @@ import java.io.FileOutputStream
 import java.util.jar.JarFile
 import java.util.jar.JarOutputStream
 
-import scala.tools.nsc.io.VirtualDirectory
-
 import com.google.common.io.Files
 import org.junit.Assert.assertNotNull
 import org.junit.Test
@@ -31,30 +29,35 @@ import org.apache.crunch.test.CrunchTestSupport
 import org.scalatest.junit.JUnitSuite
 import org.apache.crunch.scrunch.CrunchSuite
 
+import scala.tools.nsc.interpreter.{ReplDir, ReplOutput}
+import scala.tools.nsc.settings.MutableSettings
+
 /**
- * Tests creating jars from a {@link scala.tools.nsc.io.VirtualDirectory}.
+ * Tests creating jars from a {@link scala.tools.nsc.interpreter.ReplDir}.
  */
 class InterpreterJarTest extends CrunchSuite {
 
   /**
-   * Tests transforming a virtual directory into a temporary jar file.
+   * Tests transforming an output directory into a temporary jar file.
    */
-  @Test def virtualDirToJar: Unit = {
-    // Create a virtual directory and populate with some mock content.
-    val root = new VirtualDirectory("testDir", None)
+  @Test def outputDirToJar: Unit = {
+    // Create an output directory and populate with some mock content.
+    val settings = new MutableSettings(e => println("ERROR: "+e))
+    val dirSetting = settings.StringSetting("-Yrepl-outdir", "path", "Test path", "")
+    val root: ReplDir = new ReplOutput(dirSetting).dir
     // Add some subdirectories to the root.
     (1 to 10).foreach { i =>
-      val subdir = root.subdirectoryNamed("subdir" + i).asInstanceOf[VirtualDirectory]
+      val subdir = root.subdirectoryNamed("subdir" + i)
       // Add some classfiles to each sub directory.
       (1 to 10).foreach { j =>
         subdir.fileNamed("MyClass" + j + ".class")
       }
     }
 
-    // Now generate a jar file from the virtual directory.
+    // Now generate a jar file from the output directory.
     val tempJar = new File(tempDir.getRootFile(), "replJar.jar")
     val jarStream = new JarOutputStream(new FileOutputStream(tempJar))
-    InterpreterRunner.addVirtualDirectoryToJar(root, "top/pack/name/", jarStream)
+    InterpreterRunner.addOutputDirectoryToJar(root, "top/pack/name/", jarStream)
     jarStream.close()
 
     // Verify the contents of the jar.

http://git-wip-us.apache.org/repos/asf/crunch/blob/047d8fd3/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala
index 47cf637..a140acd 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala
@@ -264,8 +264,8 @@ trait PTypeFamily extends GeneratedTuplePTypeFamily {
   }
 
   private def products[T <: Product](tpe: Type, mirror: Mirror): PType[T] = {
-    val ctor = tpe.member(nme.CONSTRUCTOR).asMethod
-    val args = ctor.paramss.head.map(x => (x.name.toString,
+    val ctor = tpe.member(termNames.CONSTRUCTOR).asMethod
+    val args = ctor.paramLists.head.map(x => (x.name.toString,
       typeToPType(x.typeSignature, mirror)))
     val out = (x: Product) => TupleN.of(x.productIterator.toArray.asInstanceOf[Array[Object]] : _*)
     val rtc = mirror.runtimeClass(tpe)

http://git-wip-us.apache.org/repos/asf/crunch/blob/047d8fd3/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/interpreter/InterpreterRunner.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/interpreter/InterpreterRunner.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/interpreter/InterpreterRunner.scala
index 0d84381..9416f1f 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/interpreter/InterpreterRunner.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/interpreter/InterpreterRunner.scala
@@ -29,11 +29,9 @@ import scala.tools.nsc.ObjectRunner
 import scala.tools.nsc.Properties
 import scala.tools.nsc.ScriptRunner
 import scala.tools.nsc.interpreter.ILoop
-import scala.tools.nsc.io.Jar
-import scala.tools.nsc.io.VirtualDirectory
+import scala.tools.nsc.io.{AbstractFile, Jar}
 import com.google.common.io.Files
 import org.apache.hadoop.conf.Configuration
-
 import org.apache.crunch.util.DistCache
 import org.apache.commons.io.IOUtils
 
@@ -126,7 +124,7 @@ object InterpreterRunner extends MainGenericRunner {
       ScriptRunner.runCommand(settings, combinedCode, thingToRun +: command.arguments)
     }
     else runTarget() match {
-      case Left(ex) => errorFn(ex.toString())
+      case Left(ex) => errorFn(ex.getMessage(), Some(ex))
       case Right(b) => b
     }
   }
@@ -145,11 +143,11 @@ object InterpreterRunner extends MainGenericRunner {
   def createReplCodeJar(): File = {
     var jarStream: JarOutputStream = null
     try {
-      val virtualDirectory = repl.intp.virtualDirectory.asInstanceOf[VirtualDirectory]
+      val outputDirectory = repl.replOutput.dir
       val tempDir = Files.createTempDir()
       val tempJar = new File(tempDir, "replJar.jar")
       jarStream = new JarOutputStream(new FileOutputStream(tempJar))
-      addVirtualDirectoryToJar(virtualDirectory, "", jarStream)
+      addOutputDirectoryToJar(outputDirectory, "", jarStream)
       return tempJar
     } finally {
       IOUtils.closeQuietly(jarStream)
@@ -157,14 +155,14 @@ object InterpreterRunner extends MainGenericRunner {
   }
 
   /**
-   * Add the contents of the specified virtual directory to a jar. This method will recursively
+   * Add the contents of the specified output directory to a jar. This method will recursively
    * descend into subdirectories to add their contents.
    *
-   * @param dir The virtual directory whose contents should be added.
-   * @param entryPath The entry path for classes found in the virtual directory.
+   * @param dir The output directory whose contents should be added.
+   * @param entryPath The entry path for classes found in the output directory.
    * @param jarStream An output stream for writing the jar file.
    */
-  def addVirtualDirectoryToJar(dir: VirtualDirectory, entryPath: String, jarStream:
+  def addOutputDirectoryToJar(dir: AbstractFile, entryPath: String, jarStream:
       JarOutputStream): Unit = {
     dir.foreach { file =>
       if (file.isDirectory) {
@@ -173,8 +171,7 @@ object InterpreterRunner extends MainGenericRunner {
         val entry: JarEntry = new JarEntry(dirPath)
         jarStream.putNextEntry(entry)
         jarStream.closeEntry()
-        addVirtualDirectoryToJar(file.asInstanceOf[VirtualDirectory],
-            dirPath, jarStream)
+        addOutputDirectoryToJar(file, dirPath, jarStream)
       } else if (file.hasExtension("class")) {
         // Add class files as an entry in the jar file and write the class to the jar.
         val entry: JarEntry = new JarEntry(entryPath + file.name)
@@ -197,7 +194,11 @@ object InterpreterRunner extends MainGenericRunner {
       // Generate a jar of REPL code and add to the distributed cache.
       val replJarFile = createReplCodeJar()
       DistCache.addJarToDistributedCache(configuration, replJarFile)
-      // Get the paths to jars added with the :cp command.
+      /**
+        * Get the paths to jars added with the :cp command.
+        * The next line will cause a Deprecation Warning, because of the 'repl.addedClasspath', but
+        * we can safely ignore it as we are not using it to modify the classpath.
+        */
       val addedJarPaths = repl.addedClasspath.split(':')
       addedJarPaths.foreach {
         path => if (path.endsWith(".jar")) DistCache.addJarToDistributedCache(configuration, path)

http://git-wip-us.apache.org/repos/asf/crunch/blob/047d8fd3/crunch-spark/src/main/java/org/apache/crunch/fn/IterableIterator.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/fn/IterableIterator.java b/crunch-spark/src/main/java/org/apache/crunch/fn/IterableIterator.java
new file mode 100644
index 0000000..3c06c13
--- /dev/null
+++ b/crunch-spark/src/main/java/org/apache/crunch/fn/IterableIterator.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.fn;
+
+import java.util.Iterator;
+
+class IterableIterator<T> implements Iterable<T> {
+  private final Iterator<T> itr;
+  IterableIterator(Iterator<T> itr) {
+    this.itr = itr;
+  }
+  public Iterator<T> iterator() { return itr;}
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/047d8fd3/crunch-spark/src/main/java/org/apache/crunch/fn/SDoubleFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/fn/SDoubleFlatMapFunction.java b/crunch-spark/src/main/java/org/apache/crunch/fn/SDoubleFlatMapFunction.java
index f3f67cc..01b0b4c 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/fn/SDoubleFlatMapFunction.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/fn/SDoubleFlatMapFunction.java
@@ -30,7 +30,7 @@ public abstract class SDoubleFlatMapFunction<T> extends SparkDoFn<T, Double>
   @Override
   public void process(T input, Emitter<Double> emitter) {
     try {
-      for (Double d : call(input)) {
+      for (Double d : new IterableIterator<Double>(call(input))) {
         emitter.emit(d);
       }
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/crunch/blob/047d8fd3/crunch-spark/src/main/java/org/apache/crunch/fn/SFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/fn/SFlatMapFunction.java b/crunch-spark/src/main/java/org/apache/crunch/fn/SFlatMapFunction.java
index 1fecb76..9ee4d9f 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/fn/SFlatMapFunction.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/fn/SFlatMapFunction.java
@@ -30,7 +30,7 @@ public abstract class SFlatMapFunction<T, R> extends SparkDoFn<T, R>
   @Override
   public void process(T input, Emitter<R> emitter) {
     try {
-      for (R r : call(input)) {
+      for (R r : new IterableIterator<R>(call(input))) {
         emitter.emit(r);
       }
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/crunch/blob/047d8fd3/crunch-spark/src/main/java/org/apache/crunch/fn/SFlatMapFunction2.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/fn/SFlatMapFunction2.java b/crunch-spark/src/main/java/org/apache/crunch/fn/SFlatMapFunction2.java
index 0798f63..d7c6514 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/fn/SFlatMapFunction2.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/fn/SFlatMapFunction2.java
@@ -32,7 +32,7 @@ public abstract class SFlatMapFunction2<K, V, R> extends DoFn<Pair<K, V>, R>
   @Override
   public void process(Pair<K, V> input, Emitter<R> emitter) {
     try {
-      for (R r : call(input.first(), input.second())) {
+      for (R r : new IterableIterator<R>(call(input.first(), input.second()))) {
         emitter.emit(r);
       }
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/crunch/blob/047d8fd3/crunch-spark/src/main/java/org/apache/crunch/fn/SFunctions.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/fn/SFunctions.java b/crunch-spark/src/main/java/org/apache/crunch/fn/SFunctions.java
index cc59746..0ba7a37 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/fn/SFunctions.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/fn/SFunctions.java
@@ -17,6 +17,8 @@
  */
 package org.apache.crunch.fn;
 
+import java.util.Iterator;
+
 import org.apache.spark.api.java.function.DoubleFlatMapFunction;
 import org.apache.spark.api.java.function.DoubleFunction;
 import org.apache.spark.api.java.function.FlatMapFunction;
@@ -62,7 +64,7 @@ public final class SFunctions {
   public static <T, R> SFlatMapFunction<T, R> wrap(final FlatMapFunction<T, R> f) {
     return new SFlatMapFunction<T, R>() {
       @Override
-      public Iterable<R> call(T t) throws Exception {
+      public Iterator<R> call(T t) throws Exception {
         return f.call(t);
       }
     };
@@ -71,7 +73,7 @@ public final class SFunctions {
   public static <K, V, R> SFlatMapFunction2<K, V, R> wrap(final FlatMapFunction2<K, V, R> f) {
     return new SFlatMapFunction2<K, V, R>() {
       @Override
-      public Iterable<R> call(K k, V v) throws Exception {
+      public Iterator<R> call(K k, V v) throws Exception {
         return f.call(k, v);
       }
     };
@@ -89,7 +91,7 @@ public final class SFunctions {
   public static <T> SDoubleFlatMapFunction<T> wrap(final DoubleFlatMapFunction<T> f) {
     return new SDoubleFlatMapFunction<T>() {
       @Override
-      public Iterable<Double> call(T t) throws Exception {
+      public Iterator<Double> call(T t) throws Exception {
         return f.call(t);
       }
     };

http://git-wip-us.apache.org/repos/asf/crunch/blob/047d8fd3/crunch-spark/src/main/java/org/apache/crunch/fn/SPairFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/fn/SPairFlatMapFunction.java b/crunch-spark/src/main/java/org/apache/crunch/fn/SPairFlatMapFunction.java
index 3b8e75a..2becd48 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/fn/SPairFlatMapFunction.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/fn/SPairFlatMapFunction.java
@@ -32,7 +32,7 @@ public abstract class SPairFlatMapFunction<T, K, V> extends SparkDoFn<T, Pair<K,
   @Override
   public void process(T input, Emitter<Pair<K, V>> emitter) {
     try {
-      for (Tuple2<K, V> kv : call(input)) {
+      for (Tuple2<K, V> kv : new IterableIterator<Tuple2<K, V>>(call(input))) {
         emitter.emit(Pair.of(kv._1(), kv._2()));
       }
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/crunch/blob/047d8fd3/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/CombineMapsideFunction.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/CombineMapsideFunction.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/CombineMapsideFunction.java
index 1bea08d..231de77 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/CombineMapsideFunction.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/CombineMapsideFunction.java
@@ -45,7 +45,7 @@ public class CombineMapsideFunction<K, V> implements PairFlatMapFunction<Iterato
   }
 
   @Override
-  public Iterable<Tuple2<K, V>> call(Iterator<Tuple2<K, V>> iter) throws Exception {
+  public Iterator<Tuple2<K, V>> call(Iterator<Tuple2<K, V>> iter) throws Exception {
     ctxt.initialize(combineFn, null);
     Map<K, List<V>> cache = Maps.newHashMap();
     int cnt = 0;
@@ -63,7 +63,7 @@ public class CombineMapsideFunction<K, V> implements PairFlatMapFunction<Iterato
       }
     }
 
-    return new Flattener<K, V>(cache);
+    return new Flattener<K, V>(cache).iterator();
   }
 
   private Map<K, List<V>> reduce(Map<K, List<V>> cache) {

http://git-wip-us.apache.org/repos/asf/crunch/blob/047d8fd3/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/CrunchPairTuple2.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/CrunchPairTuple2.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/CrunchPairTuple2.java
index d6c544c..ca3011f 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/CrunchPairTuple2.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/CrunchPairTuple2.java
@@ -27,12 +27,7 @@ import java.util.Iterator;
 
 public class CrunchPairTuple2<K, V> implements PairFlatMapFunction<Iterator<Pair<K, V>>, K, V> {
   @Override
-  public Iterable<Tuple2<K, V>> call(final Iterator<Pair<K, V>> iterator) throws Exception {
-    return new Iterable<Tuple2<K, V>>() {
-      @Override
-      public Iterator<Tuple2<K, V>> iterator() {
-        return Iterators.transform(iterator, GuavaUtils.<K, V>pair2tupleFunc());
-      }
-    };
+  public Iterator<Tuple2<K, V>> call(final Iterator<Pair<K, V>> iterator) throws Exception {
+    return Iterators.transform(iterator, GuavaUtils.<K, V>pair2tupleFunc());
   }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/047d8fd3/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/FlatMapPairDoFn.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/FlatMapPairDoFn.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/FlatMapPairDoFn.java
index 8ec2834..aca59f3 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/FlatMapPairDoFn.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/FlatMapPairDoFn.java
@@ -37,9 +37,9 @@ public class FlatMapPairDoFn<K, V, T> implements FlatMapFunction<Iterator<Tuple2
   }
 
   @Override
-  public Iterable<T> call(Iterator<Tuple2<K, V>> input) throws Exception {
+  public Iterator<T> call(Iterator<Tuple2<K, V>> input) throws Exception {
     ctxt.initialize(fn, null);
     return new CrunchIterable<Pair<K, V>, T>(fn,
-        Iterators.transform(input, GuavaUtils.<K, V>tuple2PairFunc()));
+        Iterators.transform(input, GuavaUtils.<K, V>tuple2PairFunc())).iterator();
   }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/047d8fd3/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairFlatMapDoFn.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairFlatMapDoFn.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairFlatMapDoFn.java
index 7f289cc..c012e96 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairFlatMapDoFn.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairFlatMapDoFn.java
@@ -37,10 +37,10 @@ public class PairFlatMapDoFn<T, K, V> implements PairFlatMapFunction<Iterator<T>
   }
 
   @Override
-  public Iterable<Tuple2<K, V>> call(Iterator<T> input) throws Exception {
+  public Iterator<Tuple2<K, V>> call(Iterator<T> input) throws Exception {
     ctxt.initialize(fn, null);
     return Iterables.transform(
         new CrunchIterable<T, Pair<K, V>>(fn, input),
-        GuavaUtils.<K, V>pair2tupleFunc());
+        GuavaUtils.<K, V>pair2tupleFunc()).iterator();
   }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/047d8fd3/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/ReduceGroupingFunction.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/ReduceGroupingFunction.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/ReduceGroupingFunction.java
index d3dd69e..eb14dfe 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/ReduceGroupingFunction.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/ReduceGroupingFunction.java
@@ -50,14 +50,9 @@ public class ReduceGroupingFunction implements PairFlatMapFunction<Iterator<Tupl
   }
 
   @Override
-  public Iterable<Tuple2<ByteArray, List<byte[]>>> call(
+  public Iterator<Tuple2<ByteArray, List<byte[]>>> call(
       final Iterator<Tuple2<ByteArray, List<byte[]>>> iter) throws Exception {
-    return new Iterable<Tuple2<ByteArray, List<byte[]>>>() {
-      @Override
-      public Iterator<Tuple2<ByteArray, List<byte[]>>> iterator() {
-        return new GroupingIterator(iter, rawComparator());
-      }
-    };
+    return new GroupingIterator(iter, rawComparator());
   }
 
   private RawComparator<?> rawComparator() {

http://git-wip-us.apache.org/repos/asf/crunch/blob/047d8fd3/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 7a570c0..5c45568 100644
--- a/pom.xml
+++ b/pom.xml
@@ -105,11 +105,11 @@ under the License.
     <avro.classifier>hadoop2</avro.classifier>
 
     <kafka.version>0.10.0.1</kafka.version>
-    <scala.base.version>2.10</scala.base.version>
-    <scala.version>2.10.4</scala.version>
+    <scala.base.version>2.11</scala.base.version>
+    <scala.version>2.11.8</scala.version>
     <scalatest.version>2.2.4</scalatest.version>
-    <spark.version>1.3.1</spark.version>
-    <jline.version>2.10.4</jline.version>
+    <spark.version>2.0.0</spark.version>
+    <jline.version>2.12.1</jline.version>
     <jsr305.version>1.3.9</jsr305.version>
   </properties>
 
@@ -456,7 +456,7 @@ under the License.
       </dependency>
 
       <dependency>
-        <groupId>org.scala-lang</groupId>
+        <groupId>jline</groupId>
         <artifactId>jline</artifactId>
         <version>${jline.version}</version>
       </dependency>


Mime
View raw message