kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject [1/3] kudu git commit: KUDU-1824. KuduRDD.collect fails because of NoSerializableException
Date Fri, 10 Mar 2017 05:04:11 GMT
Repository: kudu
Updated Branches:
  refs/heads/master fc50b98aa -> 542ba4ed7


KUDU-1824. KuduRDD.collect fails because of NoSerializableException

The internal KuduRow class has been removed, and instead we copy into a
serializable Spark row format.

This also fixes a few style issues.

Change-Id: I42618188003d2eef66088f3101803d1750e4134b
Reviewed-on: http://gerrit.cloudera.org:8080/5636
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <todd@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/6db54007
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/6db54007
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/6db54007

Branch: refs/heads/master
Commit: 6db540070a9326190c30996851207dfa0fb8066d
Parents: fc50b98
Author: Dan Burkert <danburkert@apache.org>
Authored: Fri Jan 6 16:26:03 2017 -0800
Committer: Todd Lipcon <todd@apache.org>
Committed: Fri Mar 10 00:51:33 2017 +0000

----------------------------------------------------------------------
 .../org/apache/kudu/spark/kudu/KuduRDD.scala    | 34 +++++++-------------
 .../apache/kudu/spark/kudu/KuduRDDTest.scala    | 32 ++++++++++++++++++
 2 files changed, 44 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/6db54007/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
index c771337..8a5d859 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
@@ -18,13 +18,12 @@ package org.apache.kudu.spark.kudu
 
 import scala.collection.JavaConverters._
 
+import org.apache.kudu.client._
+import org.apache.kudu.{Type, client}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.Row
 import org.apache.spark.{Partition, SparkContext, TaskContext}
 
-import org.apache.kudu.client._
-import org.apache.kudu.{Type, client}
-
 /**
   * A Resilient Distributed Dataset backed by a Kudu table.
   *
@@ -59,7 +58,7 @@ class KuduRDD private[kudu] (val kuduContext: KuduContext,
     val client: KuduClient = kuduContext.syncClient
     val partition: KuduPartition = part.asInstanceOf[KuduPartition]
     val scanner = KuduScanToken.deserializeIntoScanner(partition.scanToken, client)
-    new RowResultIteratorScala(scanner)
+    new RowIterator(scanner)
   }
 
   override def getPreferredLocations(partition: Partition): Seq[String] = {
@@ -70,15 +69,15 @@ class KuduRDD private[kudu] (val kuduContext: KuduContext,
 /**
   * A Spark SQL [[Partition]] which wraps a [[KuduScanToken]].
   */
-private[spark] class KuduPartition(val index: Int,
-                                   val scanToken: Array[Byte],
-                                   val locations: Array[String]) extends Partition {}
+private class KuduPartition(val index: Int,
+                            val scanToken: Array[Byte],
+                            val locations: Array[String]) extends Partition {}
 
 /**
   * A Spark SQL [[Row]] iterator which wraps a [[KuduScanner]].
   * @param scanner the wrapped scanner
   */
-private[spark] class RowResultIteratorScala(private val scanner: KuduScanner) extends Iterator[Row] {
+private class RowIterator(private val scanner: KuduScanner) extends Iterator[Row] {
 
   private var currentIterator: RowResultIterator = null
 
@@ -90,17 +89,7 @@ private[spark] class RowResultIteratorScala(private val scanner: KuduScanner) ex
     currentIterator.hasNext
   }
 
-  override def next(): Row = new KuduRow(currentIterator.next())
-}
-
-/**
-  * A Spark SQL [[Row]] which wraps a Kudu [[RowResult]].
-  * @param rowResult the wrapped row result
-  */
-private[spark] class KuduRow(private val rowResult: RowResult) extends Row {
-  override def length: Int = rowResult.getColumnProjection.getColumnCount
-
-  override def get(i: Int): Any = {
+  private def get(rowResult: RowResult, i: Int): Any = {
     if (rowResult.isNull(i)) null
     else rowResult.getColumnType(i) match {
       case Type.BOOL => rowResult.getBoolean(i)
@@ -116,7 +105,8 @@ private[spark] class KuduRow(private val rowResult: RowResult) extends Row {
     }
   }
 
-  override def copy(): Row = Row.fromSeq(Range(0, length).map(get))
-
-  override def toString(): String = rowResult.toString
+  override def next(): Row = {
+    val rowResult = currentIterator.next()
+    Row.fromSeq(Range(0, rowResult.getColumnProjection.getColumnCount).map(get(rowResult, _)))
+  }
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/6db54007/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduRDDTest.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduRDDTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduRDDTest.scala
new file mode 100644
index 0000000..d609e41
--- /dev/null
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduRDDTest.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kudu.spark.kudu
+
+import org.junit.runner.RunWith
+import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.junit.JUnitRunner
+
+@RunWith(classOf[JUnitRunner])
+class KuduRDDTest extends FunSuite with TestContext with BeforeAndAfter {
+
+  test("collect rows") {
+    insertRows(100)
+    val rdd = kuduContext.kuduRDD(sc, tableName, List("key"))
+    assert(rdd.collect.length == 100)
+  }
+}


Mime
View raw message