spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r...@apache.org
Subject git commit: SPARK-1509: add zipWithIndex zipWithUniqueId methods to java api
Date Tue, 29 Apr 2014 18:30:57 GMT
Repository: spark
Updated Branches:
  refs/heads/master 8db0f7e28 -> 7d1505841


SPARK-1509: add zipWithIndex zipWithUniqueId methods to java api

Author: witgo <witgo@qq.com>

Closes #423 from witgo/zipWithIndex and squashes the following commits:

039ec04 [witgo] Merge branch 'master' of https://github.com/apache/spark into zipWithIndex
24d74c9 [witgo] review commit
763a5e4 [witgo] Merge branch 'master' of https://github.com/apache/spark into zipWithIndex
59747d1 [witgo] review commit
7bf4d06 [witgo] Merge branch 'master' of https://github.com/apache/spark into zipWithIndex
daa8f84 [witgo] review commit
4070613 [witgo] Merge branch 'master' of https://github.com/apache/spark into zipWithIndex
18e6c97 [witgo] java api zipWithIndex test
11e2e7f [witgo] add zipWithIndex zipWithUniqueId methods to java api


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7d150584
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7d150584
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7d150584

Branch: refs/heads/master
Commit: 7d1505841069c6ecc3fa7e4896db535f18e4ce84
Parents: 8db0f7e
Author: witgo <witgo@qq.com>
Authored: Tue Apr 29 11:30:47 2014 -0700
Committer: Reynold Xin <rxin@apache.org>
Committed: Tue Apr 29 11:30:47 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/api/java/JavaRDDLike.scala | 22 +++++++++++++-
 .../java/org/apache/spark/JavaAPISuite.java     | 31 +++++++++++++++-----
 2 files changed, 45 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7d150584/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 574a986..af06d1d 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.api.java
 
 import java.util.{Comparator, List => JList, Iterator => JIterator}
-import java.lang.{Iterable => JIterable}
+import java.lang.{Iterable => JIterable, Long => JLong}
 
 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
@@ -264,6 +264,26 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
       rdd.zipPartitions(other.rdd)(fn)(other.classTag, fakeClassTag[V]))(fakeClassTag[V])
   }
 
+  /**
+   * Zips this RDD with generated unique Long ids. Items in the kth partition will get ids k, n+k,
+   * 2*n+k, ..., where n is the number of partitions. So there may exist gaps, but this method
+   * won't trigger a spark job, which is different from [[org.apache.spark.rdd.RDD#zipWithIndex]].
+   */
+  def zipWithUniqueId(): JavaPairRDD[T, JLong] = {
+    JavaPairRDD.fromRDD(rdd.zipWithUniqueId()).asInstanceOf[JavaPairRDD[T, JLong]]
+  }
+
+  /**
+   * Zips this RDD with its element indices. The ordering is first based on the partition index
+   * and then the ordering of items within each partition. So the first item in the first
+   * partition gets index 0, and the last item in the last partition receives the largest index.
+   * This is similar to Scala's zipWithIndex but it uses Long instead of Int as the index type.
+   * This method needs to trigger a spark job when this RDD contains more than one partitions.
+   */
+  def zipWithIndex(): JavaPairRDD[T, JLong] = {
+    JavaPairRDD.fromRDD(rdd.zipWithIndex()).asInstanceOf[JavaPairRDD[T, JLong]]
+  }
+
   // Actions (launch a job to return a value to the user program)
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/7d150584/core/src/test/java/org/apache/spark/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index 76c6f5a..c3e03ce 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -182,13 +182,30 @@ public class JavaAPISuite implements Serializable {
     Assert.assertEquals(2, foreachCalls);
   }
 
-    @Test
-    public void toLocalIterator() {
-        List<Integer> correct = Arrays.asList(1, 2, 3, 4);
-        JavaRDD<Integer> rdd = sc.parallelize(correct);
-        List<Integer> result = Lists.newArrayList(rdd.toLocalIterator());
-        Assert.assertTrue(correct.equals(result));
-    }
+  @Test
+  public void toLocalIterator() {
+    List<Integer> correct = Arrays.asList(1, 2, 3, 4);
+    JavaRDD<Integer> rdd = sc.parallelize(correct);
+    List<Integer> result = Lists.newArrayList(rdd.toLocalIterator());
+    Assert.assertTrue(correct.equals(result));
+  }
+
+  @Test
+  public void zipWithUniqueId() {
+    List<Integer> dataArray = Arrays.asList(1, 2, 3, 4);
+    JavaPairRDD<Integer, Long> zip = sc.parallelize(dataArray).zipWithUniqueId();
+    JavaRDD<Long> indexes = zip.values();
+    Assert.assertTrue(new HashSet<Long>(indexes.collect()).size() == 4);
+  }
+
+  @Test
+  public void zipWithIndex() {
+    List<Integer> dataArray = Arrays.asList(1, 2, 3, 4);
+    JavaPairRDD<Integer, Long> zip = sc.parallelize(dataArray).zipWithIndex();
+    JavaRDD<Long> indexes = zip.values();
+    List<Long> correctIndexes = Arrays.asList(0L, 1L, 2L, 3L);
+    Assert.assertTrue(indexes.collect().equals(correctIndexes));
+  }
 
   @SuppressWarnings("unchecked")
   @Test


Mime
View raw message