Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id C620C200AE1 for ; Mon, 6 Jun 2016 09:36:36 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id C4FEB160A0E; Mon, 6 Jun 2016 07:36:36 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id BE6B7160A56 for ; Mon, 6 Jun 2016 09:36:35 +0200 (CEST) Received: (qmail 21782 invoked by uid 500); 6 Jun 2016 07:36:34 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 21698 invoked by uid 99); 6 Jun 2016 07:36:34 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 06 Jun 2016 07:36:34 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B4BB9DFE59; Mon, 6 Jun 2016 07:36:34 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit From: dmagda@apache.org To: commits@ignite.apache.org Date: Mon, 06 Jun 2016 07:36:35 -0000 Message-Id: <6071ef01f2ab4dacaff29474da56a5c8@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [02/13] ignite git commit: IGNITE-3232 - Inline transformers for IgniteRDD.savePairs and IgniteRDD.saveValues archived-at: Mon, 06 Jun 2016 07:36:37 -0000 IGNITE-3232 - Inline transformers for IgniteRDD.savePairs and IgniteRDD.saveValues Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9df1b905 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9df1b905 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9df1b905 Branch: refs/heads/master Commit: 9df1b905cd86384c1d191785d70a8e5c8e741e48 Parents: 91862c7 Author: Valentin Kulichenko Authored: Thu Jun 2 16:03:12 2016 +0300 Committer: Valentin Kulichenko Committed: Fri Jun 3 20:12:11 2016 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/spark/IgniteRDD.scala | 75 +++++++++++++++++++ .../org/apache/ignite/spark/JavaIgniteRDD.scala | 9 +++ .../org/apache/ignite/spark/IgniteRDDSpec.scala | 77 +++++++++++++++++++- 3 files changed, 160 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/9df1b905/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala index 0d8e730..036dfe6 100644 --- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala @@ -180,6 +180,39 @@ class IgniteRDD[K, V] ( } /** + * Saves values from given RDD into Ignite. A unique key will be generated for each value of the given RDD. + * + * @param rdd RDD instance to save values from. + * @param f Transformation function. + */ + def saveValues[T](rdd: RDD[T], f: (T, IgniteContext[K, V]) ⇒ V) = { + rdd.foreachPartition(it ⇒ { + val ig = ic.ignite() + + ensureCache() + + val locNode = ig.cluster().localNode() + + val node: Option[ClusterNode] = ig.cluster().forHost(locNode).nodes().find(!_.eq(locNode)) + + val streamer = ig.dataStreamer[Object, V](cacheName) + + try { + it.foreach(t ⇒ { + val value = f(t, ic) + + val key = affinityKeyFunc(value, node.orNull) + + streamer.addData(key, value) + }) + } + finally { + streamer.close() + } + }) + } + + /** * Saves values from the given key-value RDD into Ignite. * * @param rdd RDD instance to save values from. @@ -209,6 +242,48 @@ class IgniteRDD[K, V] ( } /** + * Saves values from the given RDD into Ignite. + * + * @param rdd RDD instance to save values from. + * @param f Transformation function. + * @param overwrite Boolean flag indicating whether the call on this method should overwrite existing + * values in Ignite cache. + */ + def savePairs[T](rdd: RDD[T], f: (T, IgniteContext[K, V]) ⇒ (K, V), overwrite: Boolean) = { + rdd.foreachPartition(it ⇒ { + val ig = ic.ignite() + + // Make sure to deploy the cache + ensureCache() + + val streamer = ig.dataStreamer[K, V](cacheName) + + try { + streamer.allowOverwrite(overwrite) + + it.foreach(t ⇒ { + val tup = f(t, ic) + + streamer.addData(tup._1, tup._2) + }) + } + finally { + streamer.close() + } + }) + } + + /** + * Saves values from the given RDD into Ignite. + * + * @param rdd RDD instance to save values from. + * @param f Transformation function. + */ + def savePairs[T](rdd: RDD[T], f: (T, IgniteContext[K, V]) ⇒ (K, V)): Unit = { + savePairs(rdd, f, overwrite = false) + } + + /** * Removes all values from the underlying Ignite cache. */ def clear(): Unit = { http://git-wip-us.apache.org/repos/asf/ignite/blob/9df1b905/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala index 2e8702e..cac0e15 100644 --- a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala @@ -80,12 +80,21 @@ class JavaIgniteRDD[K, V](override val rdd: IgniteRDD[K, V]) def saveValues(jrdd: JavaRDD[V]) = rdd.saveValues(JavaRDD.toRDD(jrdd)) + def saveValues[T](jrdd: JavaRDD[T], f: (T, IgniteContext[K, V]) ⇒ V) = rdd.saveValues(JavaRDD.toRDD(jrdd), f) + def savePairs(jrdd: JavaPairRDD[K, V]) = { val rrdd: RDD[(K, V)] = JavaPairRDD.toRDD(jrdd) rdd.savePairs(rrdd) } + def savePairs[T](jrdd: JavaRDD[T], f: (T, IgniteContext[K, V]) ⇒ (K, V), overwrite: Boolean = false) = { + rdd.savePairs(JavaRDD.toRDD(jrdd), f, overwrite) + } + + def savePairs[T](jrdd: JavaRDD[T], f: (T, IgniteContext[K, V]) ⇒ (K, V)): Unit = + savePairs(jrdd, f, overwrite = false) + def clear(): Unit = rdd.clear() } http://git-wip-us.apache.org/repos/asf/ignite/blob/9df1b905/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRDDSpec.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRDDSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRDDSpec.scala index 8e36275..15a51ae 100644 --- a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRDDSpec.scala +++ b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRDDSpec.scala @@ -26,6 +26,7 @@ import org.apache.spark.SparkContext import org.junit.runner.RunWith import org.scalatest._ import org.scalatest.junit.JUnitRunner +import scala.collection.JavaConversions._ import IgniteRDDSpec._ @@ -34,7 +35,7 @@ import scala.annotation.meta.field @RunWith(classOf[JUnitRunner]) class IgniteRDDSpec extends FunSpec with Matchers with BeforeAndAfterAll with BeforeAndAfterEach { describe("IgniteRDD") { - it("should successfully store data to ignite") { + it("should successfully store data to ignite using savePairs") { val sc = new SparkContext("local[*]", "test") try { @@ -59,6 +60,80 @@ class IgniteRDDSpec extends FunSpec with Matchers with BeforeAndAfterAll with Be } } + it("should successfully store data to ignite using savePairs with inline transformation") { + val sc = new SparkContext("local[*]", "test") + + try { + val ic = new IgniteContext[String, String](sc, + () ⇒ configuration("client", client = true)) + + // Save pairs ("0", "val0"), ("1", "val1"), ... to Ignite cache. + ic.fromCache(PARTITIONED_CACHE_NAME).savePairs( + sc.parallelize(0 to 10000, 2), (i: Int, ic) ⇒ (String.valueOf(i), "val" + i)) + + // Check cache contents. + val ignite = Ignition.ignite("grid-0") + + for (i ← 0 to 10000) { + val res = ignite.cache[String, String](PARTITIONED_CACHE_NAME).get(String.valueOf(i)) + + assert(res != null, "Value was not put to cache for key: " + i) + assert("val" + i == res, "Invalid value stored for key: " + i) + } + } + finally { + sc.stop() + } + } + + it("should successfully store data to ignite using saveValues") { + val sc = new SparkContext("local[*]", "test") + + try { + val ic = new IgniteContext[String, String](sc, + () ⇒ configuration("client", client = true)) + + // Save pairs ("0", "val0"), ("1", "val1"), ... to Ignite cache. + ic.fromCache(PARTITIONED_CACHE_NAME).saveValues( + sc.parallelize(0 to 10000, 2).map(i ⇒ "val" + i)) + + // Check cache contents. + val ignite = Ignition.ignite("grid-0") + + val values = ignite.cache[String, String](PARTITIONED_CACHE_NAME).toList.map(e ⇒ e.getValue) + + for (i ← 0 to 10000) + assert(values.contains("val" + i), "Value not found for index: " + i) + } + finally { + sc.stop() + } + } + + it("should successfully store data to ignite using saveValues with inline transformation") { + val sc = new SparkContext("local[*]", "test") + + try { + val ic = new IgniteContext[String, String](sc, + () ⇒ configuration("client", client = true)) + + // Save pairs ("0", "val0"), ("1", "val1"), ... to Ignite cache. + ic.fromCache(PARTITIONED_CACHE_NAME).saveValues( + sc.parallelize(0 to 10000, 2), (i: Int, ic) ⇒ "val" + i) + + // Check cache contents. + val ignite = Ignition.ignite("grid-0") + + val values = ignite.cache[String, String](PARTITIONED_CACHE_NAME).toList.map(e ⇒ e.getValue) + + for (i ← 0 to 10000) + assert(values.contains("val" + i), "Value not found for index: " + i) + } + finally { + sc.stop() + } + } + it("should successfully read data from ignite") { val sc = new SparkContext("local[*]", "test")