Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm
Reply-To: dev@ignite.incubator.apache.org
Delivered-To: mailing list commits@ignite.incubator.apache.org
From: vozerov@apache.org
To: commits@ignite.incubator.apache.org
Date: Fri, 12 Jun 2015 11:16:58 -0000
X-Mailer: ASF-Git Admin Mailer
Subject: [03/24] incubator-ignite git commit: ignite-sprint-6: merge from ignite-sprint-5

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/spark/licenses/apache-2.0.txt
----------------------------------------------------------------------
diff --git a/modules/spark/licenses/apache-2.0.txt b/modules/spark/licenses/apache-2.0.txt
new file mode 100644
index 0000000..d645695
--- /dev/null
+++ b/modules/spark/licenses/apache-2.0.txt
@@ -0,0 +1,202 @@
+
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner.
+      For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted.
+      If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License.
+          You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8.
+      Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!) The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/spark/licenses/scala-bsd-license.txt
----------------------------------------------------------------------
diff --git a/modules/spark/licenses/scala-bsd-license.txt b/modules/spark/licenses/scala-bsd-license.txt
new file mode 100644
index 0000000..b2be111
--- /dev/null
+++ b/modules/spark/licenses/scala-bsd-license.txt
@@ -0,0 +1,18 @@
+Copyright (c) 2002-2014 EPFL
+Copyright (c) 2011-2014 Typesafe, Inc.
+
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
+
+Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
+Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or
+other materials provided with the distribution. Neither the name of the EPFL nor the names of its contributors may be used to endorse or promote products
+derived from this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS “AS IS” AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS
+BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE
+GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
+DAMAGE.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/spark/pom.xml
----------------------------------------------------------------------
diff --git a/modules/spark/pom.xml b/modules/spark/pom.xml
new file mode 100644
index 0000000..8900a10
--- /dev/null
+++ b/modules/spark/pom.xml
@@ -0,0 +1,114 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.ignite</groupId>
+        <artifactId>ignite-parent</artifactId>
+        <version>1</version>
+        <relativePath>../../parent</relativePath>
+    </parent>
+
+    <artifactId>ignite-spark</artifactId>
+    <version>1.1.1-SNAPSHOT</version>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-core</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <version>2.11.2</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_2.11</artifactId>
+            <version>1.3.1</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_2.11</artifactId>
+            <version>1.3.1</version>
+        </dependency>
+
+        <!-- Test dependencies -->
+        <dependency>
+            <groupId>org.scalatest</groupId>
+            <artifactId>scalatest_2.11</artifactId>
+            <version>2.2.2</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.scala-lang</groupId>
+                    <artifactId>scala-library</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-indexing</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-beans</artifactId>
+            <version>${spring.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-context</artifactId>
+            <version>${spring.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>net.alchim31.maven</groupId>
+                <artifactId>scala-maven-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
new file mode 100644
index 0000000..e52555a
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark
+
+import org.apache.ignite.internal.IgnitionEx
+import org.apache.ignite.{Ignition, Ignite}
+import org.apache.ignite.configuration.{CacheConfiguration, IgniteConfiguration}
+import org.apache.spark.{Logging, SparkContext}
+import org.apache.spark.sql.SQLContext
+
+/**
+ * Ignite context.
+ *
+ * @param sparkContext Spark context.
+ * @param cfgF Configuration factory.
+ * @tparam K Key type.
+ * @tparam V Value type.
+ */
+class IgniteContext[K, V](
+    @scala.transient val sparkContext: SparkContext,
+    cfgF: () ⇒ IgniteConfiguration,
+    client: Boolean = true
+) extends Serializable with Logging {
+    @scala.transient private val driver = true
+
+    if (!client) {
+        val workers = sparkContext.getExecutorStorageStatus.length - 1
+
+        if (workers <= 0)
+            throw new IllegalStateException("No Spark executors found to start Ignite nodes.")
+
+        logInfo("Will start Ignite nodes on " + workers + " workers")
+
+        // Start ignite server node on each worker in server mode.
+        sparkContext.parallelize(1 to workers, workers).foreach(it ⇒ ignite())
+    }
+
+    def this(
+        sc: SparkContext,
+        springUrl: String
+    ) {
+        this(sc, () ⇒ IgnitionEx.loadConfiguration(springUrl).get1())
+    }
+
+    val sqlContext = new SQLContext(sparkContext)
+
+    /**
+     * Creates an `IgniteRDD` instance from the given cache name. If the cache does not exist, it will be
+     * automatically started from template on the first invoked RDD action.
+     *
+     * @param cacheName Cache name.
+     * @return `IgniteRDD` instance.
+     */
+    def fromCache(cacheName: String): IgniteRDD[K, V] = {
+        new IgniteRDD[K, V](this, cacheName, null)
+    }
+
+    /**
+     * Creates an `IgniteRDD` instance from the given cache configuration. If the cache does not exist, it will be
+     * automatically started using the configuration provided on the first invoked RDD action.
+     *
+     * @param cacheCfg Cache configuration to use.
+     * @return `IgniteRDD` instance.
+     */
+    def fromCache(cacheCfg: CacheConfiguration[K, V]) = {
+        new IgniteRDD[K, V](this, cacheCfg.getName, cacheCfg)
+    }
+
+    /**
+     * Gets an Ignite instance supporting this context. Ignite instance will be started
+     * if it has not been started yet.
+     *
+     * @return Ignite instance.
+ */
+    def ignite(): Ignite = {
+        val igniteCfg = cfgF()
+
+        try {
+            Ignition.ignite(igniteCfg.getGridName)
+        }
+        catch {
+            case e: Exception ⇒
+                try {
+                    igniteCfg.setClientMode(client || driver)
+
+                    Ignition.start(igniteCfg)
+                }
+                catch {
+                    case e: Exception ⇒ Ignition.ignite(igniteCfg.getGridName)
+                }
+        }
+    }
+
+    /**
+     * Stops supporting ignite instance. If ignite instance has been already stopped, this operation will be
+     * a no-op.
+     */
+    def close() = {
+        val igniteCfg = cfgF()
+
+        Ignition.stop(igniteCfg.getGridName, false)
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
new file mode 100644
index 0000000..2146acb
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.spark
+
+import javax.cache.Cache
+
+import org.apache.ignite.cache.query._
+import org.apache.ignite.cluster.ClusterNode
+import org.apache.ignite.configuration.CacheConfiguration
+import org.apache.ignite.internal.processors.cache.query.QueryCursorEx
+import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata
+import org.apache.ignite.lang.IgniteUuid
+import org.apache.ignite.spark.impl._
+import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.types._
+import org.apache.spark.sql._
+import org.apache.spark._
+
+import scala.collection.JavaConversions._
+
+/**
+ * Ignite RDD. Represents Ignite cache as Spark RDD abstraction.
+ *
+ * @param ic Ignite context to use.
+ * @param cacheName Cache name.
+ * @param cacheCfg Cache configuration.
+ * @tparam K Key type.
+ * @tparam V Value type.
+ */
+class IgniteRDD[K, V] (
+    val ic: IgniteContext[K, V],
+    val cacheName: String,
+    val cacheCfg: CacheConfiguration[K, V]
+) extends IgniteAbstractRDD[(K, V), K, V] (ic, cacheName, cacheCfg) {
+    /**
+     * Computes iterator based on given partition.
+     *
+     * @param part Partition to use.
+     * @param context Task context.
+     * @return Partition iterator.
+     */
+    override def compute(part: Partition, context: TaskContext): Iterator[(K, V)] = {
+        val cache = ensureCache()
+
+        val qry: ScanQuery[K, V] = new ScanQuery[K, V](part.index)
+
+        val partNodes = ic.ignite().affinity(cache.getName).mapPartitionToPrimaryAndBackups(part.index)
+
+        val it: java.util.Iterator[Cache.Entry[K, V]] = cache.query(qry).iterator()
+
+        new IgniteQueryIterator[Cache.Entry[K, V], (K, V)](it, entry ⇒ {
+            (entry.getKey, entry.getValue)
+        })
+    }
+
+    /**
+     * Gets partitions for the given cache RDD.
+     *
+     * @return Partitions.
+ */
+    override protected[spark] def getPartitions: Array[Partition] = {
+        ensureCache()
+
+        val parts = ic.ignite().affinity(cacheName).partitions()
+
+        (0 until parts).map(new IgnitePartition(_)).toArray
+    }
+
+    /**
+     * Gets preferred locations for the given partition.
+     *
+     * @param split Split partition.
+     * @return
+     */
+    override protected[spark] def getPreferredLocations(split: Partition): Seq[String] = {
+        ensureCache()
+
+        ic.ignite().affinity(cacheName).mapPartitionToPrimaryAndBackups(split.index)
+            .map(_.asInstanceOf[TcpDiscoveryNode].socketAddresses()).flatten.map(_.getHostName).toList
+    }
+
+    /**
+     * Runs an object SQL on corresponding Ignite cache.
+     *
+     * @param typeName Type name to run SQL against.
+     * @param sql SQL query to run.
+     * @param args Optional SQL query arguments.
+     * @return RDD with query results.
+     */
+    def objectSql(typeName: String, sql: String, args: Any*): RDD[(K, V)] = {
+        val qry: SqlQuery[K, V] = new SqlQuery[K, V](typeName, sql)
+
+        qry.setArgs(args.map(_.asInstanceOf[Object]):_*)
+
+        new IgniteSqlRDD[(K, V), Cache.Entry[K, V], K, V](ic, cacheName, cacheCfg, qry, entry ⇒ (entry.getKey, entry.getValue))
+    }
+
+    /**
+     * Runs an SQL fields query.
+     *
+     * @param sql SQL statement to run.
+     * @param args Optional SQL query arguments.
+     * @return `DataFrame` instance with the query results.
+     */
+    def sql(sql: String, args: Any*): DataFrame = {
+        val qry = new SqlFieldsQuery(sql)
+
+        qry.setArgs(args.map(_.asInstanceOf[Object]):_*)
+
+        val schema = buildSchema(ensureCache().query(qry).asInstanceOf[QueryCursorEx[java.util.List[_]]].fieldsMeta())
+
+        val rowRdd = new IgniteSqlRDD[Row, java.util.List[_], K, V](ic, cacheName, cacheCfg, qry, list ⇒ Row.fromSeq(list))
+
+        ic.sqlContext.createDataFrame(rowRdd, schema)
+    }
+
+    /**
+     * Saves values from given RDD into Ignite. A unique key will be generated for each value of the given RDD.
+     *
+     * @param rdd RDD instance to save values from.
+ */
+    def saveValues(rdd: RDD[V]) = {
+        rdd.foreachPartition(it ⇒ {
+            val ig = ic.ignite()
+
+            ensureCache()
+
+            val locNode = ig.cluster().localNode()
+
+            val node: Option[ClusterNode] = ig.cluster().forHost(locNode).nodes().find(!_.eq(locNode))
+
+            val streamer = ig.dataStreamer[Object, V](cacheName)
+
+            try {
+                it.foreach(value ⇒ {
+                    val key = affinityKeyFunc(value, node.orNull)
+
+                    streamer.addData(key, value)
+                })
+            }
+            finally {
+                streamer.close()
+            }
+        })
+    }
+
+    /**
+     * Saves values from the given key-value RDD into Ignite.
+     *
+     * @param rdd RDD instance to save values from.
+     * @param overwrite Boolean flag indicating whether the call on this method should overwrite existing
+     *      values in Ignite cache.
+     */
+    def savePairs(rdd: RDD[(K, V)], overwrite: Boolean = false) = {
+        rdd.foreachPartition(it ⇒ {
+            val ig = ic.ignite()
+
+            // Make sure to deploy the cache
+            ensureCache()
+
+            val streamer = ig.dataStreamer[K, V](cacheName)
+
+            try {
+                streamer.allowOverwrite(overwrite)
+
+                it.foreach(tup ⇒ {
+                    streamer.addData(tup._1, tup._2)
+                })
+            }
+            finally {
+                streamer.close()
+            }
+        })
+    }
+
+    /**
+     * Removes all values from the underlying Ignite cache.
+     */
+    def clear(): Unit = {
+        ensureCache().removeAll()
+    }
+
+    /**
+     * Builds spark schema from query metadata.
+     *
+     * @param fieldsMeta Fields metadata.
+     * @return Spark schema.
+     */
+    private def buildSchema(fieldsMeta: java.util.List[GridQueryFieldMetadata]): StructType = {
+        new StructType(fieldsMeta.map(i ⇒ new StructField(i.fieldName(), dataType(i.fieldTypeName()), nullable = true))
+            .toArray)
+    }
+
+    /**
+     * Gets Spark data type based on type name.
+     *
+     * @param typeName Type name.
+     * @return Spark data type.
+ */
+    private def dataType(typeName: String): DataType = typeName match {
+        case "java.lang.Boolean" ⇒ BooleanType
+        case "java.lang.Byte" ⇒ ByteType
+        case "java.lang.Short" ⇒ ShortType
+        case "java.lang.Integer" ⇒ IntegerType
+        case "java.lang.Long" ⇒ LongType
+        case "java.lang.Float" ⇒ FloatType
+        case "java.lang.Double" ⇒ DoubleType
+        case "java.lang.String" ⇒ StringType
+        case "java.util.Date" ⇒ DateType
+        case "java.sql.Timestamp" ⇒ TimestampType
+        case "[B" ⇒ BinaryType
+
+        case _ ⇒ StructType(new Array[StructField](0))
+    }
+
+    /**
+     * Generates affinity key for given cluster node.
+     *
+     * @param value Value to generate key for.
+     * @param node Node to generate key for.
+     * @return Affinity key.
+     */
+    private def affinityKeyFunc(value: V, node: ClusterNode): IgniteUuid = {
+        val aff = ic.ignite().affinity[IgniteUuid](cacheName)
+
+        Stream.from(1, 1000).map(_ ⇒ IgniteUuid.randomUuid()).find(node == null || aff.mapKeyToNode(_).eq(node))
+            .getOrElse(IgniteUuid.randomUuid())
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala
new file mode 100644
index 0000000..e2d57bf
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark
+
+import org.apache.ignite.Ignite
+import org.apache.ignite.configuration.{CacheConfiguration, IgniteConfiguration}
+import org.apache.ignite.internal.IgnitionEx
+import org.apache.ignite.lang.IgniteOutClosure
+import org.apache.spark.api.java.JavaSparkContext
+
+import scala.reflect.ClassTag
+
+/**
+ * Java-friendly Ignite context wrapper.
+ *
+ * @param sc Java Spark context.
+ * @param cfgF Configuration factory.
+ * @tparam K Key type.
+ * @tparam V Value type.
+ */
+class JavaIgniteContext[K, V](
+    @scala.transient val sc: JavaSparkContext,
+    val cfgF: IgniteOutClosure[IgniteConfiguration]) extends Serializable {
+
+    @transient val ic: IgniteContext[K, V] = new IgniteContext[K, V](sc.sc, () => cfgF.apply())
+
+    def this(sc: JavaSparkContext, springUrl: String) {
+        this(sc, new IgniteOutClosure[IgniteConfiguration] {
+            override def apply() = IgnitionEx.loadConfiguration(springUrl).get1()
+        })
+    }
+
+    def fromCache(cacheName: String): JavaIgniteRDD[K, V] =
+        JavaIgniteRDD.fromIgniteRDD(new IgniteRDD[K, V](ic, cacheName, null))
+
+    def fromCache(cacheCfg: CacheConfiguration[K, V]) =
+        JavaIgniteRDD.fromIgniteRDD(new IgniteRDD[K, V](ic, cacheCfg.getName, cacheCfg))
+
+    def ignite(): Ignite = ic.ignite()
+
+    def close() = ic.close()
+
+    private[spark] def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]
+
+    implicit val ktag: ClassTag[K] = fakeClassTag
+
+    implicit val vtag: ClassTag[V] = fakeClassTag
+}
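For readers skimming this patch, here is a minimal driver-side sketch of how the `IgniteContext`/`IgniteRDD` API introduced above is meant to be used. The cache name `"partitioned"`, the local master URL, and the app name are illustrative assumptions, not part of this commit, and the sketch requires a Spark/Ignite classpath to actually run:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.ignite.configuration.IgniteConfiguration
import org.apache.ignite.spark.IgniteContext

object IgniteSparkSketch extends App {
    val sc = new SparkContext(
        new SparkConf().setAppName("ignite-sketch").setMaster("local[2]")) // hypothetical settings

    // client = true (the default): the driver joins the grid as a client node;
    // with client = false, server nodes are started on the executors instead.
    val ic = new IgniteContext[Int, String](sc, () ⇒ new IgniteConfiguration())

    val cacheRdd = ic.fromCache("partitioned") // hypothetical cache name

    // Write pairs through the Ignite data streamer, then read back with plain RDD ops.
    cacheRdd.savePairs(sc.parallelize(1 to 100).map(i ⇒ (i, "value-" + i)))
    val count = cacheRdd.count()

    ic.close()
    sc.stop()
}
```

Note the division of labor the sketch relies on: `IgniteContext` is `Serializable` so it can be captured by closures shipped to executors, while the `SparkContext` field is marked `@scala.transient` so it stays on the driver.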
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala
new file mode 100644
index 0000000..2e8702e
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark
+
+import java.util
+
+import org.apache.spark.api.java.{JavaPairRDD, JavaRDD}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.{Partition, TaskContext}
+
+import scala.annotation.varargs
+import scala.collection.JavaConversions._
+import scala.language.implicitConversions
+import scala.reflect.ClassTag
+
+/**
+ * Java-friendly Ignite RDD wrapper. Represents Ignite cache as Java Spark RDD abstraction.
+ *
+ * @param rdd Ignite RDD instance.
+ * @tparam K Key type.
+ * @tparam V Value type.
+ */ +class JavaIgniteRDD[K, V](override val rdd: IgniteRDD[K, V]) + extends JavaPairRDD[K, V](rdd)(JavaIgniteRDD.fakeClassTag, JavaIgniteRDD.fakeClassTag) { + + override def wrapRDD(rdd: RDD[(K, V)]): JavaPairRDD[K, V] = JavaPairRDD.fromRDD(rdd) + + override val classTag: ClassTag[(K, V)] = JavaIgniteRDD.fakeClassTag + + /** + * Computes iterator based on given partition. + * + * @param part Partition to use. + * @param context Task context. + * @return Partition iterator. + */ + def compute(part: Partition, context: TaskContext): Iterator[(K, V)] = { + rdd.compute(part, context) + } + + /** + * Gets partitions for the given cache RDD. + * + * @return Partitions. + */ + protected def getPartitions: java.util.List[Partition] = { + new util.ArrayList[Partition](rdd.getPartitions.toSeq) + } + + /** + * Gets preferred locations for the given partition. + * + * @param split Split partition. + * @return + */ + protected def getPreferredLocations(split: Partition): Seq[String] = { + rdd.getPreferredLocations(split) + } + + @varargs def objectSql(typeName: String, sql: String, args: Any*): JavaPairRDD[K, V] = + JavaPairRDD.fromRDD(rdd.objectSql(typeName, sql, args:_*)) + + @varargs def sql(sql: String, args: Any*): DataFrame = rdd.sql(sql, args:_*) + + def saveValues(jrdd: JavaRDD[V]) = rdd.saveValues(JavaRDD.toRDD(jrdd)) + + def savePairs(jrdd: JavaPairRDD[K, V]) = { + val rrdd: RDD[(K, V)] = JavaPairRDD.toRDD(jrdd) + + rdd.savePairs(rrdd) + } + + def clear(): Unit = rdd.clear() +} + +object JavaIgniteRDD { + implicit def fromIgniteRDD[K: ClassTag, V: ClassTag](rdd: IgniteRDD[K, V]): JavaIgniteRDD[K, V] = + new JavaIgniteRDD[K, V](rdd) + + implicit def toIgniteRDD[K, V](rdd: JavaIgniteRDD[K, V]): IgniteRDD[K, V] = rdd.rdd + + def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]] +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteAbstractRDD.scala 
---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteAbstractRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteAbstractRDD.scala new file mode 100644 index 0000000..25b3b56 --- /dev/null +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteAbstractRDD.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.spark.impl + +import org.apache.ignite.IgniteCache +import org.apache.ignite.configuration.CacheConfiguration +import org.apache.ignite.spark.IgniteContext +import org.apache.spark.rdd.RDD + +import scala.reflect.ClassTag + +abstract class IgniteAbstractRDD[R:ClassTag, K, V] ( + ic: IgniteContext[K, V], + cacheName: String, + cacheCfg: CacheConfiguration[K, V] +) extends RDD[R] (ic.sparkContext, deps = Nil) { + protected def ensureCache(): IgniteCache[K, V] = { + // Make sure to deploy the cache + if (cacheCfg != null) + ic.ignite().getOrCreateCache(cacheCfg) + else + ic.ignite().getOrCreateCache(cacheName) + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgnitePartition.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgnitePartition.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgnitePartition.scala new file mode 100644 index 0000000..2def636 --- /dev/null +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgnitePartition.scala @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spark.impl + +import org.apache.spark.Partition + +class IgnitePartition(idx: Int) extends Partition { + override def index: Int = idx +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteQueryIterator.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteQueryIterator.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteQueryIterator.scala new file mode 100644 index 0000000..4165fd3 --- /dev/null +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteQueryIterator.scala @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.spark.impl + +class IgniteQueryIterator[T, R] ( + cur: java.util.Iterator[T], + conv: (T) ⇒ R +) extends Iterator[R] { + override def hasNext: Boolean = cur.hasNext + + override def next(): R = conv(cur.next()) +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala new file mode 100644 index 0000000..762a6ed --- /dev/null +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.spark.impl + +import org.apache.ignite.cache.query.Query +import org.apache.ignite.configuration.CacheConfiguration +import org.apache.ignite.spark.IgniteContext +import org.apache.spark.{TaskContext, Partition} + +import scala.reflect.ClassTag + +class IgniteSqlRDD[R: ClassTag, T, K, V]( + ic: IgniteContext[K, V], + cacheName: String, + cacheCfg: CacheConfiguration[K, V], + qry: Query[T], + conv: (T) ⇒ R +) extends IgniteAbstractRDD[R, K, V](ic, cacheName, cacheCfg) { + override def compute(split: Partition, context: TaskContext): Iterator[R] = { + new IgniteQueryIterator[T, R](ensureCache().query(qry).iterator(), conv) + } + + override protected def getPartitions: Array[Partition] = { + Array(new IgnitePartition(0)) + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/spark/src/main/scala/org/apache/ignite/spark/impl/JavaIgniteAbstractRDD.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/JavaIgniteAbstractRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/JavaIgniteAbstractRDD.scala new file mode 100644 index 0000000..13bd3e8 --- /dev/null +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/JavaIgniteAbstractRDD.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spark.impl + +import org.apache.ignite.IgniteCache +import org.apache.ignite.spark.IgniteRDD +import org.apache.spark.api.java.{JavaPairRDD, JavaRDDLike} + +abstract class JavaIgniteAbstractRDD[K, V](val rdd: IgniteRDD[K, V]) + extends JavaRDDLike[(K, V), JavaPairRDD[K, V]] { + + protected def ensureCache(): IgniteCache[K, V] = { + // Make sure to deploy the cache + if (rdd.cacheCfg != null) + rdd.ic.ignite().getOrCreateCache(rdd.cacheCfg) + else + rdd.ic.ignite().getOrCreateCache(rdd.cacheName) + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/spark/src/test/java/org/apache/ignite/spark/JavaIgniteRDDSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/spark/src/test/java/org/apache/ignite/spark/JavaIgniteRDDSelfTest.java b/modules/spark/src/test/java/org/apache/ignite/spark/JavaIgniteRDDSelfTest.java new file mode 100644 index 0000000..e14abfc --- /dev/null +++ b/modules/spark/src/test/java/org/apache/ignite/spark/JavaIgniteRDDSelfTest.java @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.apache.spark.api.java.*;
+import org.apache.spark.api.java.function.*;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.sql.*;
+
+import scala.*;
+
+import java.util.*;
+
+/**
+ * Tests for {@link JavaIgniteRDD}.
+ */
+public class JavaIgniteRDDSelfTest extends GridCommonAbstractTest {
+    /** Grid count. */
+    private static final int GRID_CNT = 3;
+
+    /** Keys count. */
+    private static final int KEYS_CNT = 10000;
+
+    /** Cache name. */
+    private static final String PARTITIONED_CACHE_NAME = "partitioned";
+
+    /** Ip finder. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Sum function. */
+    private static final Function2<Integer, Integer, Integer> SUM_F = new Function2<Integer, Integer, Integer>() {
+        public Integer call(Integer x, Integer y) {
+            return x + y;
+        }
+    };
+
+    /** To pair function. */
+    private static final PairFunction<Integer, String, String> TO_PAIR_F = new PairFunction<Integer, String, String>() {
+        /** {@inheritDoc} */
+        @Override public Tuple2<String, String> call(Integer i) {
+            return new Tuple2<>(String.valueOf(i), "val" + i);
+        }
+    };
+
+    /** (String, Integer) pair to Integer value function.
*/
+    private static final Function<Tuple2<String, Integer>, Integer> STR_INT_PAIR_TO_INT_F = new PairToValueFunction<>();
+
+    /** (String, Entity) pair to Entity value function. */
+    private static final Function<Tuple2<String, Entity>, Entity> STR_ENTITY_PAIR_TO_ENTITY_F =
+        new PairToValueFunction<>();
+
+    /** Integer to entity function. */
+    private static final PairFunction<Integer, String, Entity> INT_TO_ENTITY_F =
+        new PairFunction<Integer, String, Entity>() {
+            @Override public Tuple2<String, Entity> call(Integer i) throws Exception {
+                return new Tuple2<>(String.valueOf(i), new Entity(i, "name" + i, i * 100));
+            }
+        };
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        Ignition.ignite("grid-0").cache(PARTITIONED_CACHE_NAME).removeAll();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        Ignition.stop("client", false);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        for (int i = 0; i < GRID_CNT; i++)
+            Ignition.start(getConfiguration("grid-" + i, false));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        for (int i = 0; i < GRID_CNT; i++)
+            Ignition.stop("grid-" + i, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStoreDataToIgnite() throws Exception {
+        JavaSparkContext sc = new JavaSparkContext("local[*]", "test");
+
+        try {
+            JavaIgniteContext<String, String> ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider());
+
+            ic.fromCache(PARTITIONED_CACHE_NAME)
+                .savePairs(sc.parallelize(F.range(0, KEYS_CNT), 2).mapToPair(TO_PAIR_F));
+
+            Ignite ignite = Ignition.ignite("grid-0");
+
+            IgniteCache<String, String> cache = ignite.cache(PARTITIONED_CACHE_NAME);
+
+            for (int i = 0; i < KEYS_CNT; i++) {
+                String val = cache.get(String.valueOf(i));
+
+                assertNotNull("Value was not put to cache for key: " + i, val);
+                assertEquals("Invalid value stored for key: " + i, "val" + i, val);
+            }
+        }
+        finally {
+            sc.stop();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReadDataFromIgnite() throws Exception {
+        JavaSparkContext sc = new JavaSparkContext("local[*]", "test");
+
+        try {
+            JavaIgniteContext<String, Integer> ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider());
+
+            Ignite ignite = Ignition.ignite("grid-0");
+
+            IgniteCache<String, Integer> cache = ignite.cache(PARTITIONED_CACHE_NAME);
+
+            for (int i = 0; i < KEYS_CNT; i++)
+                cache.put(String.valueOf(i), i);
+
+            JavaRDD<Integer> values = ic.fromCache(PARTITIONED_CACHE_NAME).map(STR_INT_PAIR_TO_INT_F);
+
+            int sum = values.fold(0, SUM_F);
+
+            int expSum = (KEYS_CNT * KEYS_CNT + KEYS_CNT) / 2 - KEYS_CNT;
+
+            assertEquals(expSum, sum);
+        }
+        finally {
+            sc.stop();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testQueryObjectsFromIgnite() throws Exception {
+        JavaSparkContext sc = new JavaSparkContext("local[*]", "test");
+
+        try {
+            JavaIgniteContext<String, Entity> ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider());
+
+            JavaIgniteRDD<String, Entity> cache = ic.fromCache(PARTITIONED_CACHE_NAME);
+
+            cache.savePairs(sc.parallelize(F.range(0, 1001), 2).mapToPair(INT_TO_ENTITY_F));
+
+            List<Entity> res = cache.objectSql("Entity", "name = ? and salary = ?", "name50", 5000)
+                .map(STR_ENTITY_PAIR_TO_ENTITY_F).collect();
+
+            assertEquals("Invalid result length", 1, res.size());
+            assertEquals("Invalid result", 50, res.get(0).id());
+            assertEquals("Invalid result", "name50", res.get(0).name());
+            assertEquals("Invalid result", 5000, res.get(0).salary());
+            assertEquals("Invalid count", 500, cache.objectSql("Entity", "id > 500").count());
+        }
+        finally {
+            sc.stop();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testQueryFieldsFromIgnite() throws Exception {
+        JavaSparkContext sc = new JavaSparkContext("local[*]", "test");
+
+        try {
+            JavaIgniteContext<String, Entity> ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider());
+
+            JavaIgniteRDD<String, Entity> cache = ic.fromCache(PARTITIONED_CACHE_NAME);
+
+            cache.savePairs(sc.parallelize(F.range(0, 1001), 2).mapToPair(INT_TO_ENTITY_F));
+
+            DataFrame df =
+                cache.sql("select id, name, salary from Entity where name = ? and salary = ?", "name50", 5000);
+
+            df.printSchema();
+
+            Row[] res = df.collect();
+
+            assertEquals("Invalid result length", 1, res.length);
+            assertEquals("Invalid result", 50, res[0].get(0));
+            assertEquals("Invalid result", "name50", res[0].get(1));
+            assertEquals("Invalid result", 5000, res[0].get(2));
+
+            Column exp = new Column("NAME").equalTo("name50").and(new Column("SALARY").equalTo(5000));
+
+            DataFrame df0 = cache.sql("select id, name, salary from Entity").where(exp);
+
+            df0.printSchema();
+
+            Row[] res0 = df0.collect();
+
+            assertEquals("Invalid result length", 1, res0.length);
+            assertEquals("Invalid result", 50, res0[0].get(0));
+            assertEquals("Invalid result", "name50", res0[0].get(1));
+            assertEquals("Invalid result", 5000, res0[0].get(2));
+
+            assertEquals("Invalid count", 500, cache.sql("select id from Entity where id > 500").count());
+        }
+        finally {
+            sc.stop();
+        }
+    }
+
+    /**
+     * @param gridName Grid name.
+     * @param client Client mode flag.
+     * @return Ignite configuration.
+     */
+    private static IgniteConfiguration getConfiguration(String gridName, boolean client) throws Exception {
+        IgniteConfiguration cfg = new IgniteConfiguration();
+
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+        discoSpi.setIpFinder(IP_FINDER);
+
+        cfg.setDiscoverySpi(discoSpi);
+
+        cfg.setCacheConfiguration(cacheConfiguration());
+
+        cfg.setClientMode(client);
+
+        cfg.setGridName(gridName);
+
+        return cfg;
+    }
+
+    /**
+     * Creates cache configuration.
+     */
+    private static CacheConfiguration<Object, Object> cacheConfiguration() {
+        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+        ccfg.setBackups(1);
+
+        ccfg.setName(PARTITIONED_CACHE_NAME);
+
+        ccfg.setIndexedTypes(String.class, Entity.class);
+
+        return ccfg;
+    }
+
+    /**
+     * Ignite configuration provider.
+     */
+    static class IgniteConfigProvider implements IgniteOutClosure<IgniteConfiguration> {
+        /** {@inheritDoc} */
+        @Override public IgniteConfiguration apply() {
+            try {
+                return getConfiguration("client", true);
+            }
+            catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    /**
+     * @param <K> Key type.
+     * @param <V> Value type.
+     */
+    static class PairToValueFunction<K, V> implements Function<Tuple2<K, V>, V> {
+        /** {@inheritDoc} */
+        @Override public V call(Tuple2<K, V> t) throws Exception {
+            return t._2();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/spark/src/test/scala/org/apache/ignite/spark/Entity.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/Entity.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/Entity.scala
new file mode 100644
index 0000000..00beac6
--- /dev/null
+++ b/modules/spark/src/test/scala/org/apache/ignite/spark/Entity.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spark + +import org.apache.ignite.spark.IgniteRddSpec.ScalarCacheQuerySqlField + +class Entity ( + @ScalarCacheQuerySqlField(index = true) val id: Int, + @ScalarCacheQuerySqlField(index = true) val name: String, + @ScalarCacheQuerySqlField(index = true) val salary: Int +) extends Serializable { + +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRddSpec.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRddSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRddSpec.scala new file mode 100644 index 0000000..26ce693 --- /dev/null +++ b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteRddSpec.scala @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.spark + +import org.apache.ignite.Ignition +import org.apache.ignite.cache.query.annotations.{QueryTextField, QuerySqlField} +import org.apache.ignite.configuration.{CacheConfiguration, IgniteConfiguration} +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder +import org.apache.spark.SparkContext +import org.junit.runner.RunWith +import org.scalatest._ +import org.scalatest.junit.JUnitRunner + +import IgniteRddSpec._ + +import scala.annotation.meta.field + +@RunWith(classOf[JUnitRunner]) +class IgniteRddSpec extends FunSpec with Matchers with BeforeAndAfterAll with BeforeAndAfterEach { + describe("IgniteRDD") { + it("should successfully store data to ignite") { + val sc = new SparkContext("local[*]", "test") + + try { + val ic = new IgniteContext[String, String](sc, + () ⇒ configuration("client", client = true)) + + // Save pairs ("0", "val0"), ("1", "val1"), ... to Ignite cache. + ic.fromCache(PARTITIONED_CACHE_NAME).savePairs(sc.parallelize(0 to 10000, 2).map(i ⇒ (String.valueOf(i), "val" + i))) + + // Check cache contents. 
+ val ignite = Ignition.ignite("grid-0") + + for (i ← 0 to 10000) { + val res = ignite.cache[String, String](PARTITIONED_CACHE_NAME).get(String.valueOf(i)) + + assert(res != null, "Value was not put to cache for key: " + i) + assert("val" + i == res, "Invalid value stored for key: " + i) + } + } + finally { + sc.stop() + } + } + + it("should successfully read data from ignite") { + val sc = new SparkContext("local[*]", "test") + + try { + val cache = Ignition.ignite("grid-0").cache[String, Int](PARTITIONED_CACHE_NAME) + + val num = 10000 + + for (i ← 0 to num) { + cache.put(String.valueOf(i), i) + } + + val ic = new IgniteContext[String, Int](sc, + () ⇒ configuration("client", client = true)) + + val res = ic.fromCache(PARTITIONED_CACHE_NAME).map(_._2).sum() + + assert(res == (0 to num).sum) + } + finally { + sc.stop() + } + } + + it("should successfully query objects from ignite") { + val sc = new SparkContext("local[*]", "test") + + try { + val ic = new IgniteContext[String, Entity](sc, + () ⇒ configuration("client", client = true)) + + val cache: IgniteRDD[String, Entity] = ic.fromCache(PARTITIONED_CACHE_NAME) + + cache.savePairs(sc.parallelize(0 to 1000, 2).map(i ⇒ (String.valueOf(i), new Entity(i, "name" + i, i * 100)))) + + val res: Array[Entity] = cache.objectSql("Entity", "name = ? 
and salary = ?", "name50", 5000).map(_._2).collect() + + assert(res.length == 1, "Invalid result length") + assert(50 == res(0).id, "Invalid result") + assert("name50" == res(0).name, "Invalid result") + assert(5000 == res(0).salary) + + assert(500 == cache.objectSql("Entity", "id > 500").count(), "Invalid count") + } + finally { + sc.stop() + } + } + + it("should successfully query fields from ignite") { + val sc = new SparkContext("local[*]", "test") + + try { + val ic = new IgniteContext[String, Entity](sc, + () ⇒ configuration("client", client = true)) + + val cache: IgniteRDD[String, Entity] = ic.fromCache(PARTITIONED_CACHE_NAME) + + import ic.sqlContext.implicits._ + + cache.savePairs(sc.parallelize(0 to 1000, 2).map(i ⇒ (String.valueOf(i), new Entity(i, "name" + i, i * 100)))) + + val df = cache.sql("select id, name, salary from Entity where name = ? and salary = ?", "name50", 5000) + + df.printSchema() + + val res = df.collect() + + assert(res.length == 1, "Invalid result length") + assert(50 == res(0)(0), "Invalid result") + assert("name50" == res(0)(1), "Invalid result") + assert(5000 == res(0)(2), "Invalid result") + + val df0 = cache.sql("select id, name, salary from Entity").where('NAME === "name50" and 'SALARY === 5000) + + val res0 = df0.collect() + + assert(res0.length == 1, "Invalid result length") + assert(50 == res0(0)(0), "Invalid result") + assert("name50" == res0(0)(1), "Invalid result") + assert(5000 == res0(0)(2), "Invalid result") + + assert(500 == cache.sql("select id from Entity where id > 500").count(), "Invalid count") + } + finally { + sc.stop() + } + } + } + + override protected def beforeEach() = { + Ignition.ignite("grid-0").cache(PARTITIONED_CACHE_NAME).removeAll() + } + + override protected def afterEach() = { + Ignition.stop("client", false) + } + + override protected def beforeAll() = { + for (i ← 0 to 3) { + Ignition.start(configuration("grid-" + i, client = false)) + } + } + + override protected def afterAll() = { + for (i ← 0 
to 3) {
+            Ignition.stop("grid-" + i, false)
+        }
+    }
+}
+
+/**
+ * Constants and utility methods.
+ */
+object IgniteRddSpec {
+    /** IP finder for the test. */
+    val IP_FINDER = new TcpDiscoveryVmIpFinder(true)
+
+    /** Partitioned cache name. */
+    val PARTITIONED_CACHE_NAME = "partitioned"
+
+    /** Type alias for `QuerySqlField`. */
+    type ScalarCacheQuerySqlField = QuerySqlField @field
+
+    /** Type alias for `QueryTextField`. */
+    type ScalarCacheQueryTextField = QueryTextField @field
+
+    /**
+     * Gets ignite configuration.
+     *
+     * @param gridName Grid name.
+     * @param client Client mode flag.
+     * @return Ignite configuration.
+     */
+    def configuration(gridName: String, client: Boolean): IgniteConfiguration = {
+        val cfg = new IgniteConfiguration
+
+        val discoSpi = new TcpDiscoverySpi
+
+        discoSpi.setIpFinder(IgniteRddSpec.IP_FINDER)
+
+        cfg.setDiscoverySpi(discoSpi)
+
+        cfg.setCacheConfiguration(cacheConfiguration(gridName))
+
+        cfg.setClientMode(client)
+
+        cfg.setGridName(gridName)
+
+        cfg
+    }
+
+    /**
+     * Gets cache configuration for the given grid name.
+     *
+     * @param gridName Grid name.
+     * @return Cache configuration.
+     */
+    def cacheConfiguration(gridName: String): CacheConfiguration[Object, Object] = {
+        val ccfg = new CacheConfiguration[Object, Object]()
+
+        ccfg.setBackups(1)
+
+        ccfg.setName(PARTITIONED_CACHE_NAME)
+
+        ccfg.setIndexedTypes(classOf[String], classOf[Entity])
+
+        ccfg
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/visor-console-2.10/README.txt
----------------------------------------------------------------------
diff --git a/modules/visor-console-2.10/README.txt b/modules/visor-console-2.10/README.txt
new file mode 100644
index 0000000..1a018b9
--- /dev/null
+++ b/modules/visor-console-2.10/README.txt
@@ -0,0 +1,4 @@
+Apache Ignite Visor Console Module
+----------------------------------
+
+Apache Ignite Visor Console module to be built with Scala 2.10.
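The read tests above (testReadDataFromIgnite and its Scala counterpart) compare a folded sum of cache values against a closed form: for keys 0..KEYS_CNT-1 stored with values equal to the keys, the expected total is (KEYS_CNT * KEYS_CNT + KEYS_CNT) / 2 - KEYS_CNT, i.e. n(n - 1)/2. A standalone sanity check of that arithmetic (the class name is hypothetical, not part of the commit):

```java
public class SumFormulaCheck {
    public static void main(String[] args) {
        int n = 10000; // KEYS_CNT in the Java test

        // Keys 0..n-1 carry values equal to the keys,
        // so the true sum is 0 + 1 + ... + (n - 1) = n(n - 1)/2.
        long sum = 0;
        for (int i = 0; i < n; i++)
            sum += i;

        // The test writes the same quantity as (n^2 + n)/2 - n,
        // which simplifies to n(n - 1)/2.
        long expected = ((long)n * n + n) / 2 - n;

        System.out.println(sum + " == " + expected); // 49995000 == 49995000
    }
}
```

Note the Java test computes this in `int`, which is safe here only because 10000 * 10000 stays below Integer.MAX_VALUE.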
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/modules/visor-console-2.10/pom.xml ---------------------------------------------------------------------- diff --git a/modules/visor-console-2.10/pom.xml b/modules/visor-console-2.10/pom.xml new file mode 100644 index 0000000..f0df657 --- /dev/null +++ b/modules/visor-console-2.10/pom.xml @@ -0,0 +1,174 @@ + + + + + + + 4.0.0 + + + org.apache.ignite + ignite-parent + 1 + ../../parent + + + ignite-visor-console_2.10 + 1.1.1-SNAPSHOT + + + + org.apache.ignite + ignite-core + ${project.version} + + + + org.apache.ignite + ignite-ssh + ${project.version} + + + + org.apache.ignite + ignite-spring + ${project.version} + + + + org.springframework + spring-core + ${spring.version} + + + + org.springframework + spring-beans + ${spring.version} + + + + org.springframework + spring-context + ${spring.version} + + + + org.springframework + spring-expression + ${spring.version} + + + + org.scala-lang + scala-library + 2.10.4 + + + + org.scala-lang + jline + 2.10.4 + + + + + + org.apache.ignite + ignite-indexing + ${project.version} + test + + + + org.scalatest + scalatest_2.10 + 2.2.2 + test + + + org.scala-lang + scala-library + + + + + + + + + + ../visor-console/src/main/scala + + **/*.scala + + + + + + + ../visor-console/src/test/scala + + **/*.scala + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5d007e3/parent/pom.xml ---------------------------------------------------------------------- diff --git a/parent/pom.xml b/parent/pom.xml index a514e35..f5b73df 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -321,6 +321,10 @@ Mesos Framework org.apache.ignite.mesos* + + Spark Integration + org.apache.ignite.spark.examples.java +
scala - true - [1.7,) + !scala-2.10 modules/scalar + modules/spark modules/visor-console modules/visor-plugins + scala-2.10 + + + scala-2.10 + + + + modules/scalar-2.10 + modules/spark-2.10 + modules/visor-console-2.10 + modules/visor-plugins + + + + + lgpl modules/hibernate
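Stepping back to the sources earlier in this commit: IgniteQueryIterator adapts the java.util.Iterator returned by an Ignite query cursor, applying a conversion function to each element lazily, and IgniteSqlRDD builds its compute() result on top of it. The same adapter pattern in plain Java; `ConvertingIterator` is an illustrative name, not Ignite API:

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Function;

// Wrap an underlying iterator and convert each element on demand,
// mirroring IgniteQueryIterator's (cursor, conv) pair.
public class ConvertingIterator<T, R> implements Iterator<R> {
    private final Iterator<T> cur;
    private final Function<T, R> conv;

    public ConvertingIterator(Iterator<T> cur, Function<T, R> conv) {
        this.cur = cur;
        this.conv = conv;
    }

    @Override public boolean hasNext() { return cur.hasNext(); }

    // Conversion happens per element, so nothing is materialized up front.
    @Override public R next() { return conv.apply(cur.next()); }

    public static void main(String[] args) {
        Iterator<Integer> src = Arrays.asList(1, 2, 3).iterator();
        Iterator<String> it = new ConvertingIterator<>(src, i -> "val" + i);

        while (it.hasNext())
            System.out.println(it.next()); // val1, val2, val3
    }
}
```

Keeping the adapter lazy matters for the RDD case: a Spark task can start emitting converted rows while the Ignite cursor is still streaming results from the cluster.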