Return-Path: X-Original-To: apmail-spark-commits-archive@minotaur.apache.org Delivered-To: apmail-spark-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D8C1D185EB for ; Tue, 3 Nov 2015 23:16:21 +0000 (UTC) Received: (qmail 66182 invoked by uid 500); 3 Nov 2015 23:16:21 -0000 Delivered-To: apmail-spark-commits-archive@spark.apache.org Received: (qmail 66147 invoked by uid 500); 3 Nov 2015 23:16:21 -0000 Mailing-List: contact commits-help@spark.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list commits@spark.apache.org Received: (qmail 66138 invoked by uid 99); 3 Nov 2015 23:16:21 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 03 Nov 2015 23:16:21 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 5F0CDE03E8; Tue, 3 Nov 2015 23:16:21 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: marmbrus@apache.org To: commits@spark.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: spark git commit: [SPARK-11477] [SQL] support create Dataset from RDD Date: Tue, 3 Nov 2015 23:16:21 +0000 (UTC) Repository: spark Updated Branches: refs/heads/master 1d04dc95c -> f6fcb4874 [SPARK-11477] [SQL] support create Dataset from RDD Author: Wenchen Fan Closes #9434 from cloud-fan/rdd2ds and squashes the following commits: 0892d72 [Wenchen Fan] support create Dataset from RDD Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f6fcb487 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f6fcb487 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f6fcb487 Branch: refs/heads/master Commit: f6fcb4874ce20a1daa91b7434cf9c0254a89e979 Parents: 1d04dc9 Author: Wenchen Fan Authored: Wed Nov 4 00:15:50 2015 +0100 Committer: Michael Armbrust Committed: Wed Nov 4 00:15:50 2015 +0100 ---------------------------------------------------------------------- .../src/main/scala/org/apache/spark/sql/SQLContext.scala | 9 +++++++++ .../src/main/scala/org/apache/spark/sql/SQLImplicits.scala | 4 ++++ .../src/test/scala/org/apache/spark/sql/DatasetSuite.scala | 7 +++++++ 3 files changed, 20 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/f6fcb487/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 2cb9443..5ad3871 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -499,6 +499,15 @@ class SQLContext private[sql]( new Dataset[T](this, plan) } + def createDataset[T : Encoder](data: RDD[T]): Dataset[T] = { + val enc = encoderFor[T] + val attributes = enc.schema.toAttributes + val encoded = data.map(d => enc.toRow(d)) + val plan = LogicalRDD(attributes, encoded)(self) + + new Dataset[T](this, plan) + } + /** * Creates a DataFrame from an RDD[Row]. User can specify whether the input rows should be * converted to Catalyst rows. http://git-wip-us.apache.org/repos/asf/spark/blob/f6fcb487/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala index f460a86..f2904e2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala @@ -48,6 +48,10 @@ abstract class SQLImplicits { implicit def newBooleanEncoder: Encoder[Boolean] = ExpressionEncoder[Boolean](flat = true) implicit def newStringEncoder: Encoder[String] = ExpressionEncoder[String](flat = true) + implicit def rddToDatasetHolder[T : Encoder](rdd: RDD[T]): DatasetHolder[T] = { + DatasetHolder(_sqlContext.createDataset(rdd)) + } + implicit def localSeqToDatasetHolder[T : Encoder](s: Seq[T]): DatasetHolder[T] = { DatasetHolder(_sqlContext.createDataset(s)) } http://git-wip-us.apache.org/repos/asf/spark/blob/f6fcb487/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 5973fa7..3e9b621 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -34,6 +34,13 @@ class DatasetSuite extends QueryTest with SharedSQLContext { data: _*) } + test("toDS with RDD") { + val ds = sparkContext.makeRDD(Seq("a", "b", "c"), 3).toDS() + checkAnswer( + ds.mapPartitions(_ => Iterator(1)), + 1, 1, 1) + } + test("as tuple") { val data = Seq(("a", 1), ("b", 2)).toDF("a", "b") checkAnswer( --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org For additional commands, e-mail: commits-help@spark.apache.org