Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id B685C200C21 for ; Mon, 20 Feb 2017 13:27:06 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id B560E160B62; Mon, 20 Feb 2017 12:27:06 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id B8E85160B7D for ; Mon, 20 Feb 2017 13:27:04 +0100 (CET) Received: (qmail 31190 invoked by uid 500); 20 Feb 2017 12:27:03 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 31005 invoked by uid 99); 20 Feb 2017 12:27:03 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 20 Feb 2017 12:27:03 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 9B779E04DD; Mon, 20 Feb 2017 12:27:03 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Date: Mon, 20 Feb 2017 12:27:11 -0000 Message-Id: <4e1b21ab23fe41eb8910aebdad733623@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [09/32] ignite git commit: IGNITE-4526: Add Spark Shared RDD examples Reviewed by Denis Magda archived-at: Mon, 20 Feb 2017 12:27:06 -0000 IGNITE-4526: Add Spark Shared RDD examples Reviewed by Denis Magda Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b461cb47 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b461cb47 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b461cb47 Branch: refs/heads/ignite-4705 Commit: b461cb47882861356ede58775bd9e253dcf26202 Parents: 79e1e53 Author: Manish Mishra Authored: Tue Feb 14 16:54:11 2017 -0800 Committer: Denis Magda Committed: Tue Feb 14 16:54:11 2017 -0800 ---------------------------------------------------------------------- examples/config/spark/example-shared-rdd.xml | 83 ++++++++++++++ examples/pom.xml | 27 ++++- .../examples/java8/spark/SharedRDDExample.java | 110 +++++++++++++++++++ .../examples/spark/ScalarSharedRDDExample.scala | 89 +++++++++++++++ .../examples/SharedRDDExampleSelfTest.java | 36 ++++++ .../IgniteExamplesJ8SelfTestSuite.java | 3 + .../tests/examples/ScalarExamplesSelfTest.scala | 6 + .../apache/ignite/spark/JavaIgniteContext.scala | 6 + 8 files changed, 359 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/b461cb47/examples/config/spark/example-shared-rdd.xml ---------------------------------------------------------------------- diff --git a/examples/config/spark/example-shared-rdd.xml b/examples/config/spark/example-shared-rdd.xml new file mode 100644 index 0000000..83de6a3 --- /dev/null +++ b/examples/config/spark/example-shared-rdd.xml @@ -0,0 +1,83 @@ + + + + + + + + + + + + + + + + + + + java.lang.Integer + java.lang.Integer + + + + + + + + + + + + + + + + + + + + + 127.0.0.1:47500..47509 + + + + + + + + http://git-wip-us.apache.org/repos/asf/ignite/blob/b461cb47/examples/pom.xml ---------------------------------------------------------------------- diff --git a/examples/pom.xml b/examples/pom.xml index 3a6a026..1c4ad25 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -17,7 +17,8 @@ limitations under the License. --> - + 4.0.0 @@ -138,6 +139,18 @@ + + + org.apache.ignite + ignite-spark + ${project.version} + + + + org.jboss.netty + netty + 3.2.9.Final + @@ -172,6 +185,18 @@ + + + org.apache.ignite + ignite-spark_2.10 + ${project.version} + + + + org.jboss.netty + netty + 3.2.9.Final + http://git-wip-us.apache.org/repos/asf/ignite/blob/b461cb47/examples/src/main/java8/org/apache/ignite/examples/java8/spark/SharedRDDExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/spark/SharedRDDExample.java b/examples/src/main/java8/org/apache/ignite/examples/java8/spark/SharedRDDExample.java new file mode 100644 index 0000000..392180d --- /dev/null +++ b/examples/src/main/java8/org/apache/ignite/examples/java8/spark/SharedRDDExample.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.java8.spark; + +import org.apache.ignite.spark.JavaIgniteContext; +import org.apache.ignite.spark.JavaIgniteRDD; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.sql.DataFrame; +import scala.Tuple2; + +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * This example demonstrates how to create an JavaIgnitedRDD and share it with multiple spark workers. The goal of this + * particular example is to provide the simplest code example of this logic. + *

+ * This example will start Ignite in the embedded mode and will start an JavaIgniteContext on each Spark worker node. + *

+ * The example can work in the standalone mode as well that can be enabled by setting JavaIgniteContext's + * {@code standalone} property to {@code true} and running an Ignite node separately with + * `examples/config/spark/example-shared-rdd.xml` config. + */ +public class SharedRDDExample { + /** + * Executes the example. + * @param args Command line arguments, none required. + */ + public static void main(String args[]) { + // Spark Configuration. + SparkConf sparkConf = new SparkConf() + .setAppName("JavaIgniteRDDExample") + .setMaster("local") + .set("spark.executor.instances", "2"); + + // Spark context. + JavaSparkContext sparkContext = new JavaSparkContext(sparkConf); + + // Adjust the logger to exclude the logs of no interest. + Logger.getRootLogger().setLevel(Level.ERROR); + Logger.getLogger("org.apache.ignite").setLevel(Level.INFO); + + // Creates Ignite context with specific configuration and runs Ignite in the embedded mode. + JavaIgniteContext igniteContext = new JavaIgniteContext( + sparkContext,"examples/config/spark/example-shared-rdd.xml", false); + + // Create a Java Ignite RDD of Type (Int,Int) Integer Pair. + JavaIgniteRDD sharedRDD = igniteContext.fromCache("sharedRDD"); + + // Define data to be stored in the Ignite RDD (cache). + List data = IntStream.range(0, 20).boxed().collect(Collectors.toList()); + + // Preparing a Java RDD. + JavaRDD javaRDD = sparkContext.parallelize(data); + + // Fill the Ignite RDD in with Int pairs. Here Pairs are represented as Scala Tuple2. + sharedRDD.savePairs(javaRDD.mapToPair(new PairFunction() { + @Override public Tuple2 call(Integer val) throws Exception { + return new Tuple2(val, val); + } + })); + + System.out.println(">>> Iterating over Ignite Shared RDD..."); + + // Iterate over the Ignite RDD. + sharedRDD.foreach((x) -> System.out.println("(" + x._1 + "," + x._2 + ")")); + + System.out.println(">>> Transforming values stored in Ignite Shared RDD..."); + + // Filter out even values as a transformed RDD. + JavaPairRDD transformedValues = + sharedRDD.filter((Tuple2 pair) -> pair._2() % 2 == 0); + + // Print out the transformed values. + transformedValues.foreach((x) -> System.out.println("(" + x._1 + "," + x._2 + ")")); + + System.out.println(">>> Executing SQL query over Ignite Shared RDD..."); + + // Execute SQL query over the Ignite RDD. + DataFrame df = sharedRDD.sql("select _val from Integer where _key < 9"); + + // Show the result of the execution. + df.show(); + + // Close IgniteContext on all the workers. + igniteContext.close(true); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b461cb47/examples/src/main/scala/org/apache/ignite/scalar/examples/spark/ScalarSharedRDDExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/spark/ScalarSharedRDDExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/spark/ScalarSharedRDDExample.scala new file mode 100644 index 0000000..18662e8 --- /dev/null +++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/spark/ScalarSharedRDDExample.scala @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.scalar.examples.spark + +import org.apache.ignite.spark.{IgniteContext, IgniteRDD} +import org.apache.log4j.{Level, Logger} +import org.apache.spark.{SparkConf, SparkContext} + +/** + * This example demonstrates how to create an IgnitedRDD and share it with multiple spark workers. + * The goal of this particular example is to provide the simplest code example of this logic. + *

+ * This example will start Ignite in the embedded mode and will start an IgniteContext on each Spark worker node. + *

+ * The example can work in the standalone mode as well that can be enabled by setting IgniteContext's {@code isClient} + * property to {@code true} and running an Ignite node separately with `examples/config/spark/ + * example-shared-rdd.xml` config. + *

+ */ +object ScalarSharedRDDExample extends App { + // Spark Configuration. + private val conf = new SparkConf() + .setAppName("IgniteRDDExample") + .setMaster("local") + .set("spark.executor.instances", "2") + + // Spark context. + val sparkContext = new SparkContext(conf) + + // Adjust the logger to exclude the logs of no interest. + Logger.getRootLogger.setLevel(Level.ERROR) + Logger.getLogger("org.apache.ignite").setLevel(Level.INFO) + + // Defines spring cache Configuration path. + private val CONFIG = "examples/config/spark/example-shared-rdd.xml" + + // Creates Ignite context with above configuration. + val igniteContext = new IgniteContext(sparkContext, CONFIG, false) + + // Creates an Ignite Shared RDD of Type (Int,Int) Integer Pair. + val sharedRDD: IgniteRDD[Int, Int] = igniteContext.fromCache[Int, Int]("sharedRDD") + + // Fill the Ignite Shared RDD in with Int pairs. + sharedRDD.savePairs(sparkContext.parallelize(1 to 100000, 10).map(i => (i, i))) + + // Transforming Pairs to contain their Squared value. + sharedRDD.mapValues(x => (x * x)) + + // Retrieve sharedRDD back from the Cache. + val transformedValues: IgniteRDD[Int, Int] = igniteContext.fromCache("sharedRDD") + + // Perform some transformations on IgniteRDD and print. + val squareAndRootPair = transformedValues.map { case (x, y) => (x, Math.sqrt(y.toDouble)) } + + println(">>> Transforming values stored in Ignite Shared RDD...") + + // Filter out pairs which square roots are less than 100 and + // take the first five elements from the transformed IgniteRDD and print them. + squareAndRootPair.filter(_._2 < 100.0).take(5).foreach(println) + + println(">>> Executing SQL query over Ignite Shared RDD...") + + // Execute a SQL query over the Ignite Shared RDD. + val df = transformedValues.sql("select _val from Integer where _val < 100 and _val > 9 ") + + // Show ten rows from the result set. + df.show(10) + + // Close IgniteContext on all workers. + igniteContext.close(true) + + // Stop SparkContext. + sparkContext.stop() +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b461cb47/examples/src/test/java8/org/apache/ignite/java8/examples/SharedRDDExampleSelfTest.java ---------------------------------------------------------------------- diff --git a/examples/src/test/java8/org/apache/ignite/java8/examples/SharedRDDExampleSelfTest.java b/examples/src/test/java8/org/apache/ignite/java8/examples/SharedRDDExampleSelfTest.java new file mode 100644 index 0000000..0fafb4d --- /dev/null +++ b/examples/src/test/java8/org/apache/ignite/java8/examples/SharedRDDExampleSelfTest.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.java8.examples; + +import org.apache.ignite.examples.java8.spark.SharedRDDExample; +import org.junit.Test; + +/** + * SharedRDD examples self test. + */ +public class SharedRDDExampleSelfTest { + static final String[] EMPTY_ARGS = new String[0]; + /** + * @throws Exception If failed. + */ + @Test + public void testSharedRDDExample() throws Exception { + SharedRDDExample.main(EMPTY_ARGS); + } + +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b461cb47/examples/src/test/java8/org/apache/ignite/java8/testsuites/IgniteExamplesJ8SelfTestSuite.java ---------------------------------------------------------------------- diff --git a/examples/src/test/java8/org/apache/ignite/java8/testsuites/IgniteExamplesJ8SelfTestSuite.java b/examples/src/test/java8/org/apache/ignite/java8/testsuites/IgniteExamplesJ8SelfTestSuite.java index 949324c..c32339f 100644 --- a/examples/src/test/java8/org/apache/ignite/java8/testsuites/IgniteExamplesJ8SelfTestSuite.java +++ b/examples/src/test/java8/org/apache/ignite/java8/testsuites/IgniteExamplesJ8SelfTestSuite.java @@ -26,6 +26,7 @@ import org.apache.ignite.java8.examples.EventsExamplesMultiNodeSelfTest; import org.apache.ignite.java8.examples.EventsExamplesSelfTest; import org.apache.ignite.java8.examples.IndexingBridgeMethodTest; import org.apache.ignite.java8.examples.MessagingExamplesSelfTest; +import org.apache.ignite.java8.examples.SharedRDDExampleSelfTest; import org.apache.ignite.testframework.GridTestUtils; import static org.apache.ignite.IgniteSystemProperties.IGNITE_OVERRIDE_MCAST_GRP; @@ -49,6 +50,8 @@ public class IgniteExamplesJ8SelfTestSuite extends TestSuite { suite.addTest(new TestSuite(IndexingBridgeMethodTest.class)); suite.addTest(new TestSuite(CacheExamplesSelfTest.class)); suite.addTest(new TestSuite(BasicExamplesSelfTest.class)); + suite.addTest(new TestSuite(SharedRDDExampleSelfTest.class)); + // suite.addTest(new TestSuite(ContinuationExamplesSelfTest.class)); // suite.addTest(new TestSuite(ContinuousMapperExamplesSelfTest.class)); // suite.addTest(new TestSuite(DeploymentExamplesSelfTest.class)); http://git-wip-us.apache.org/repos/asf/ignite/blob/b461cb47/examples/src/test/scala/org/apache/ignite/scalar/tests/examples/ScalarExamplesSelfTest.scala ---------------------------------------------------------------------- diff --git a/examples/src/test/scala/org/apache/ignite/scalar/tests/examples/ScalarExamplesSelfTest.scala b/examples/src/test/scala/org/apache/ignite/scalar/tests/examples/ScalarExamplesSelfTest.scala index 94c41ad..28e509e 100644 --- a/examples/src/test/scala/org/apache/ignite/scalar/tests/examples/ScalarExamplesSelfTest.scala +++ b/examples/src/test/scala/org/apache/ignite/scalar/tests/examples/ScalarExamplesSelfTest.scala @@ -18,6 +18,7 @@ package org.apache.ignite.scalar.tests.examples import org.apache.ignite.scalar.examples._ +import org.apache.ignite.scalar.examples.spark._ import org.apache.ignite.scalar.scalar import org.apache.ignite.testframework.junits.common.GridAbstractExamplesTest import org.scalatest.junit.JUnitSuiteLike @@ -95,4 +96,9 @@ class ScalarExamplesSelfTest extends GridAbstractExamplesTest with JUnitSuiteLik def testScalarSnowflakeSchemaExample() { ScalarSnowflakeSchemaExample.main(EMPTY_ARGS) } + + /** */ + def testScalarSharedRDDExample() { + ScalarSharedRDDExample.main(EMPTY_ARGS) + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/b461cb47/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala index 689a22d..d8a521b 100644 --- a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala @@ -51,6 +51,12 @@ class JavaIgniteContext[K, V]( }) } + def this(sc: JavaSparkContext, springUrl: String, standalone: Boolean) { + this(sc, new IgniteOutClosure[IgniteConfiguration] { + override def apply() = IgnitionEx.loadConfiguration(springUrl).get1() + }, standalone) + } + def fromCache(cacheName: String): JavaIgniteRDD[K, V] = JavaIgniteRDD.fromIgniteRDD(new IgniteRDD[K, V](ic, cacheName, null, false))