ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject incubator-ignite git commit: ignite-948 Add Java API for Ignite RDD
Date Wed, 03 Jun 2015 17:05:14 GMT
Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-948 29c6d28a5 -> 4d307635d (forced update)


ignite-948 Add Java API for Ignite RDD


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/4d307635
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/4d307635
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/4d307635

Branch: refs/heads/ignite-948
Commit: 4d307635d9230ccec01d207ca87d71a860329166
Parents: 2aa1ace
Author: agura <agura@gridgain.com>
Authored: Tue Jun 2 01:09:17 2015 +0300
Committer: agura <agura@gridgain.com>
Committed: Wed Jun 3 20:04:46 2015 +0300

----------------------------------------------------------------------
 modules/spark/pom.xml                           |  14 +
 .../spark/examples/java/ColocationTest.java     |  94 ++++++
 .../examples/java/IgniteProcessExample.java     |  82 +++++
 .../spark/examples/java/IgniteStoreExample.java |  72 +++++
 .../spark/examples/java/package-info.java       |  21 ++
 .../org/apache/ignite/spark/IgniteRDD.scala     |  10 +-
 .../apache/ignite/spark/JavaIgniteContext.scala |  55 ++++
 .../org/apache/ignite/spark/JavaIgniteRDD.scala |  94 ++++++
 .../spark/impl/JavaIgniteAbstractRDD.scala      |  34 +++
 .../ignite/spark/JavaIgniteRDDSelfTest.java     | 298 +++++++++++++++++++
 parent/pom.xml                                  |   4 +
 11 files changed, 773 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d307635/modules/spark/pom.xml
----------------------------------------------------------------------
diff --git a/modules/spark/pom.xml b/modules/spark/pom.xml
index a4a25f5..a36ee87 100644
--- a/modules/spark/pom.xml
+++ b/modules/spark/pom.xml
@@ -87,6 +87,20 @@
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-beans</artifactId>
+            <version>${spring.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-context</artifactId>
+            <version>${spring.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d307635/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/ColocationTest.java
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/ColocationTest.java
b/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/ColocationTest.java
new file mode 100644
index 0000000..839bfc7
--- /dev/null
+++ b/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/ColocationTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.examples.java;
+
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spark.*;
+import org.apache.ignite.spark.examples.*;
+import org.apache.spark.*;
+import org.apache.spark.api.java.*;
+import org.apache.spark.api.java.function.*;
+
+import scala.Tuple2;
+
+import java.util.*;
+
+/**
+ * Colocation test example.
+ */
+public class ColocationTest {
+    /**
+     * @param args Args.
+     */
+    public static void main(String[] args) {
+        SparkConf conf = new SparkConf();
+
+        conf.setAppName("Colocation test");
+
+        JavaSparkContext sc = new JavaSparkContext("local[*]", "Colocation test", conf);
+
+        JavaIgniteContext<Integer, Integer> ignite = new JavaIgniteContext<>(sc,
new IgniteConfigProvider());
+
+        JavaIgniteRDD<Integer, Integer> cache = ignite.fromCache("partitioned");
+
+        List<Integer> seq = new ArrayList<>();
+
+        long sum = 0;
+
+        for (int i = 0; i < 100000; i++) {
+            seq.add(i);
+
+            sum += i;
+        }
+
+        IgniteClosure<Integer, Tuple2<Integer, Integer>> f = new IgniteClosure<Integer,
Tuple2<Integer, Integer>>() {
+            @Override public Tuple2<Integer, Integer> apply(Integer i) {
+                return new Tuple2<>(i, i);
+            }
+        };
+
+        JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(F.transformList(seq,
f), 48);
+
+        cache.savePairs(rdd);
+
+        // Execute parallel sum.
+        System.out.println("Local sum: " + sum);
+
+        System.out.println("Distributed sum: " + cache.map(new Function<Tuple2<Integer,Integer>,
Integer>() {
+            @Override public Integer call(Tuple2<Integer, Integer> t) throws Exception
{
+                return t._2();
+            }
+        }).fold(0, new Function2<Integer, Integer, Integer>() {
+            public Integer call(Integer x, Integer y) {
+                return x + y;
+            }
+        }));
+    }
+
+    /**
+     * Ignite configiration provider.
+     */
+    static class IgniteConfigProvider implements IgniteOutClosure<IgniteConfiguration>
{
+        /** {@inheritDoc} */
+        @Override public IgniteConfiguration apply() {
+            return ExampleConfiguration.configuration();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d307635/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/IgniteProcessExample.java
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/IgniteProcessExample.java
b/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/IgniteProcessExample.java
new file mode 100644
index 0000000..e69ca5f
--- /dev/null
+++ b/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/IgniteProcessExample.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.examples.java;
+
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spark.*;
+import org.apache.ignite.spark.examples.*;
+import org.apache.spark.*;
+import org.apache.spark.api.java.*;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.sql.*;
+
+import scala.*;
+
+import java.lang.Boolean;
+import java.util.*;
+
+/**
+ * Ignite process example.
+ */
+public class IgniteProcessExample {
+    /**
+     * @param args Args.
+     */
+    public static void main(String[] args) {
+        SparkConf conf = new SparkConf();
+
+        conf.setAppName("Ignite processing example");
+
+        JavaSparkContext sc = new JavaSparkContext("local[*]", "Ignite processing example",
conf);
+
+        JavaIgniteContext<Object, String> ignite = new JavaIgniteContext<>(sc,
new IgniteOutClosure<IgniteConfiguration>() {
+            @Override public IgniteConfiguration apply() {
+                return ExampleConfiguration.configuration();
+            }
+        });
+
+        // Search for lines containing "Ignite".
+        JavaIgniteRDD<Object, String> scanRdd = ignite.fromCache("partitioned");
+
+        JavaRDD<String> processedRdd = scanRdd.filter(new Function<Tuple2<Object,
String>, Boolean>() {
+            @Override public Boolean call(Tuple2<Object, String> t) throws Exception
{
+                System.out.println("Analyzing line: " + t._2());
+
+                t._2().contains("Ignite");
+
+                return true;
+            }
+        }).map(new Function<Tuple2<Object, String>, String>() {
+            @Override public String call(Tuple2<Object, String> t) throws Exception
{
+                return t._2();
+            }
+        });
+
+        // Create a new cache for results.
+        JavaIgniteRDD<Object, String> results = ignite.fromCache("results");
+
+        results.saveValues(processedRdd);
+
+        // SQL query
+        ignite.fromCache("indexed").objectSql("Person", "age > ? and organizationId =
?", 20, 12).collect();
+
+        // SQL fields query
+        DataFrame df = ignite.fromCache("indexed").sql("select name, age from Person where
age > ?", 20);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d307635/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/IgniteStoreExample.java
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/IgniteStoreExample.java
b/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/IgniteStoreExample.java
new file mode 100644
index 0000000..c1299ef
--- /dev/null
+++ b/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/IgniteStoreExample.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.examples.java;
+
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spark.*;
+import org.apache.ignite.spark.examples.*;
+import org.apache.spark.*;
+import org.apache.spark.api.java.*;
+import org.apache.spark.api.java.function.*;
+import org.apache.spark.api.java.function.Function;
+
+import scala.*;
+
+import java.lang.*;
+import java.lang.Boolean;
+
+/**
+ * Ignite store example.
+ */
+public class IgniteStoreExample {
+    /**
+     * @param args Args.
+     */
+    public static void main(String[] args) {
+        SparkConf conf = new SparkConf();
+
+        conf.setAppName("Ignite processing example");
+
+        JavaSparkContext sc = new JavaSparkContext("local[*]", "Ignite processing example",
conf);
+
+        JavaIgniteContext<String, String> ignite = new JavaIgniteContext<>(sc,
new IgniteOutClosure<IgniteConfiguration>() {
+            @Override public IgniteConfiguration apply() {
+                return ExampleConfiguration.configuration();
+            }
+        });
+
+
+        JavaRDD<String> lines = sc.textFile(args[0]).filter(new Function<String,
Boolean>() {
+            @Override public Boolean call(String s) throws Exception {
+                System.out.println("Read line: " + s);
+
+                return s.contains("Ignite");
+            }
+        });
+
+        ignite.fromCache("partitioned").saveValues(lines);
+
+        ignite.fromCache("partitioned").savePairs(lines.mapToPair(new PairFunction<String,
String, String>() {
+            @Override public Tuple2<String, String> call(String s) throws Exception
{
+                return new Tuple2<>(s, s);
+            }
+        }));
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d307635/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/package-info.java
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/package-info.java
b/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/package-info.java
new file mode 100644
index 0000000..e3243bf
--- /dev/null
+++ b/modules/spark/src/main/java/org/apache/ignite/spark/examples/java/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Demonstrates usage of Ignite and Spark from Java.
+ */
+package org.apache.ignite.spark.examples.java;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d307635/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
index 0b8e845..742d7ee 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
@@ -43,9 +43,9 @@ import scala.collection.JavaConversions._
  * @tparam V Value type.
  */
 class IgniteRDD[K, V] (
-    ic: IgniteContext[K, V],
-    cacheName: String,
-    cacheCfg: CacheConfiguration[K, V]
+    val ic: IgniteContext[K, V],
+    val cacheName: String,
+    val cacheCfg: CacheConfiguration[K, V]
 ) extends IgniteAbstractRDD[(K, V), K, V] (ic, cacheName, cacheCfg) {
     /**
      * Computes iterator based on given partition.
@@ -73,7 +73,7 @@ class IgniteRDD[K, V] (
      *
      * @return Partitions.
      */
-    override protected def getPartitions: Array[Partition] = {
+    override protected[spark] def getPartitions: Array[Partition] = {
         ensureCache()
 
         val parts = ic.ignite().affinity(cacheName).partitions()
@@ -87,7 +87,7 @@ class IgniteRDD[K, V] (
      * @param split Split partition.
      * @return
      */
-    override protected def getPreferredLocations(split: Partition): Seq[String] = {
+    override protected[spark] def getPreferredLocations(split: Partition): Seq[String] =
{
         ensureCache()
 
         ic.ignite().affinity(cacheName).mapPartitionToPrimaryAndBackups(split.index)

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d307635/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala
b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala
new file mode 100644
index 0000000..6c52e65
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark
+
+import org.apache.ignite.Ignite
+import org.apache.ignite.configuration.{CacheConfiguration, IgniteConfiguration}
+import org.apache.ignite.internal.IgnitionEx
+import org.apache.ignite.lang.IgniteOutClosure
+import org.apache.spark.api.java.JavaSparkContext
+
+import scala.reflect.ClassTag
+
+class JavaIgniteContext[K, V](
+    @scala.transient val sc: JavaSparkContext,
+    val cfgF: IgniteOutClosure[IgniteConfiguration]) extends Serializable {
+
+    @transient val ic: IgniteContext[K, V] = new IgniteContext[K, V](sc.sc, () => cfgF.apply())
+
+    def this(sc: JavaSparkContext, springUrl: String) {
+        this(sc, new IgniteOutClosure[IgniteConfiguration] {
+            override def apply() = IgnitionEx.loadConfiguration(springUrl).get1()
+        })
+    }
+
+    def fromCache(cacheName: String): JavaIgniteRDD[K, V] =
+        JavaIgniteRDD.fromIgniteRDD(new IgniteRDD[K, V](ic, cacheName, null))
+
+    def fromCache(cacheCfg: CacheConfiguration[K, V]) =
+        JavaIgniteRDD.fromIgniteRDD(new IgniteRDD[K, V](ic, cacheCfg.getName, cacheCfg))
+
+    def ignite(): Ignite = ic.ignite()
+
+    def close() = ic.close()
+
+    private[spark] def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]
+
+    implicit val ktag: ClassTag[K] = fakeClassTag
+
+    implicit val vtag: ClassTag[V] = fakeClassTag
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d307635/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala
new file mode 100644
index 0000000..0b42bb1
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark
+
+import java.util
+
+import org.apache.spark.api.java.{JavaPairRDD, JavaRDD}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.{Partition, TaskContext}
+
+import scala.annotation.varargs
+import scala.collection.JavaConversions._
+import scala.language.implicitConversions
+import scala.reflect.ClassTag
+
+
+class JavaIgniteRDD[K, V](override val rdd: IgniteRDD[K, V])
+    extends JavaPairRDD[K, V](rdd)(JavaIgniteRDD.fakeClassTag, JavaIgniteRDD.fakeClassTag)
{
+    //with JavaRDDLike[(K, V), JavaPairRDD[K, V]] {
+
+    override def wrapRDD(rdd: RDD[(K, V)]): JavaPairRDD[K, V] = JavaPairRDD.fromRDD(rdd)
+
+    override val classTag: ClassTag[(K, V)] = JavaIgniteRDD.fakeClassTag
+
+    /**
+     * Computes iterator based on given partition.
+     *
+     * @param part Partition to use.
+     * @param context Task context.
+     * @return Partition iterator.
+     */
+    def compute(part: Partition, context: TaskContext): Iterator[(K, V)] = {
+        rdd.compute(part, context)
+    }
+
+    /**
+     * Gets partitions for the given cache RDD.
+     *
+     * @return Partitions.
+     */
+    protected def getPartitions: java.util.List[Partition] = {
+        new util.ArrayList[Partition](rdd.getPartitions.toSeq)
+    }
+
+    /**
+     * Gets preferred locations for the given partition.
+     *
+     * @param split Split partition.
+     * @return
+     */
+    protected def getPreferredLocations(split: Partition): Seq[String] = {
+        rdd.getPreferredLocations(split)
+    }
+
+    @varargs def objectSql(typeName: String, sql: String, args: Any*): JavaPairRDD[K, V]
=
+        JavaPairRDD.fromRDD(rdd.objectSql(typeName, sql, args:_*))
+
+    @varargs def sql(sql: String, args: Any*): DataFrame = rdd.sql(sql, args:_*)
+
+    def saveValues(jrdd: JavaRDD[V]) = rdd.saveValues(JavaRDD.toRDD(jrdd))
+
+    def savePairs(jrdd: JavaPairRDD[K, V]) = {
+        val rrdd: RDD[(K, V)] = JavaPairRDD.toRDD(jrdd)
+
+        rdd.savePairs(rrdd)
+    }
+
+    def clear(): Unit = rdd.clear()
+}
+
+object JavaIgniteRDD {
+    implicit def fromIgniteRDD[K: ClassTag, V: ClassTag](rdd: IgniteRDD[K, V]): JavaIgniteRDD[K,
V] =
+        new JavaIgniteRDD[K, V](rdd)
+
+    implicit def toIgniteRDD[K, V](rdd: JavaIgniteRDD[K, V]): IgniteRDD[K, V] = rdd.rdd
+
+    def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d307635/modules/spark/src/main/scala/org/apache/ignite/spark/impl/JavaIgniteAbstractRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/JavaIgniteAbstractRDD.scala
b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/JavaIgniteAbstractRDD.scala
new file mode 100644
index 0000000..13bd3e8
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/JavaIgniteAbstractRDD.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.impl
+
+import org.apache.ignite.IgniteCache
+import org.apache.ignite.spark.IgniteRDD
+import org.apache.spark.api.java.{JavaPairRDD, JavaRDDLike}
+
+abstract class JavaIgniteAbstractRDD[K, V](val rdd: IgniteRDD[K, V])
+    extends JavaRDDLike[(K, V), JavaPairRDD[K, V]] {
+
+    protected def ensureCache(): IgniteCache[K, V] = {
+        // Make sure to deploy the cache
+        if (rdd.cacheCfg != null)
+            rdd.ic.ignite().getOrCreateCache(rdd.cacheCfg)
+        else
+            rdd.ic.ignite().getOrCreateCache(rdd.cacheName)
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d307635/modules/spark/src/test/java/org/apache/ignite/spark/JavaIgniteRDDSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/java/org/apache/ignite/spark/JavaIgniteRDDSelfTest.java
b/modules/spark/src/test/java/org/apache/ignite/spark/JavaIgniteRDDSelfTest.java
new file mode 100644
index 0000000..e14abfc
--- /dev/null
+++ b/modules/spark/src/test/java/org/apache/ignite/spark/JavaIgniteRDDSelfTest.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.apache.spark.api.java.*;
+import org.apache.spark.api.java.function.*;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.sql.*;
+
+import scala.*;
+
+import java.util.*;
+
+/**
+ * Tests for {@link JavaIgniteRDD}.
+ */
+public class JavaIgniteRDDSelfTest extends GridCommonAbstractTest {
+    /** Grid count. */
+    private static final int GRID_CNT = 3;
+
+    /** Keys count. */
+    private static final int KEYS_CNT = 10000;
+
+    /** Cache name. */
+    private static final String PARTITIONED_CACHE_NAME = "partitioned";
+
+    /** Ip finder. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Sum function. */
+    private static final Function2<Integer, Integer, Integer> SUM_F = new Function2<Integer,
Integer, Integer>() {
+        public Integer call(Integer x, Integer y) {
+            return x + y;
+        }
+    };
+
+    /** To pair function. */
+    private static final PairFunction<Integer, String, String> TO_PAIR_F = new PairFunction<Integer,
String, String>() {
+        /** {@inheritDoc} */
+        @Override public Tuple2<String, String> call(Integer i) {
+            return new Tuple2<>(String.valueOf(i), "val" + i);
+        }
+    };
+
+    /** (String, Integer); pair to Integer value function. */
+    private static final Function<Tuple2<String, Integer>, Integer> STR_INT_PAIR_TO_INT_F
= new PairToValueFunction<>();
+
+    /** (String, Entity) pair to Entity value function. */
+    private static final Function<Tuple2<String, Entity>, Entity> STR_ENTITY_PAIR_TO_ENTITY_F
=
+        new PairToValueFunction<>();
+
+    /** Integer to entity function. */
+    private static final PairFunction<Integer, String, Entity> INT_TO_ENTITY_F =
+        new PairFunction<Integer, String, Entity>() {
+            @Override public Tuple2<String, Entity> call(Integer i) throws Exception
{
+                return new Tuple2<>(String.valueOf(i), new Entity(i, "name" + i, i
* 100));
+            }
+        };
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        Ignition.ignite("grid-0").cache(PARTITIONED_CACHE_NAME).removeAll();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        Ignition.stop("client", false);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        for (int i = 0; i < GRID_CNT; i++)
+            Ignition.start(getConfiguration("grid-" + i, false));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        for (int i = 0; i < GRID_CNT; i++)
+            Ignition.stop("grid-" + i, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStoreDataToIgnite() throws Exception {
+        JavaSparkContext sc = new JavaSparkContext("local[*]", "test");
+
+        try {
+            JavaIgniteContext<String, String> ic = new JavaIgniteContext<>(sc,
new IgniteConfigProvider());
+
+            ic.fromCache(PARTITIONED_CACHE_NAME)
+                .savePairs(sc.parallelize(F.range(0, KEYS_CNT), 2).mapToPair(TO_PAIR_F));
+
+            Ignite ignite = Ignition.ignite("grid-0");
+
+            IgniteCache<String, String> cache = ignite.cache(PARTITIONED_CACHE_NAME);
+
+            for (int i = 0; i < KEYS_CNT; i++) {
+                String val = cache.get(String.valueOf(i));
+
+                assertNotNull("Value was not put to cache for key: " + i, val);
+                assertEquals("Invalid value stored for key: " + i, "val" + i, val);
+            }
+        }
+        finally {
+            sc.stop();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReadDataFromIgnite() throws Exception {
+        JavaSparkContext sc = new JavaSparkContext("local[*]", "test");
+
+        try {
+            JavaIgniteContext<String, Integer> ic = new JavaIgniteContext<>(sc,
new IgniteConfigProvider());
+
+            Ignite ignite = Ignition.ignite("grid-0");
+
+            IgniteCache<String, Integer> cache = ignite.cache(PARTITIONED_CACHE_NAME);
+
+            for (int i = 0; i < KEYS_CNT; i++)
+                cache.put(String.valueOf(i), i);
+
+            JavaRDD<Integer> values = ic.fromCache(PARTITIONED_CACHE_NAME).map(STR_INT_PAIR_TO_INT_F);
+
+            int sum = values.fold(0, SUM_F);
+
+            int expSum = (KEYS_CNT * KEYS_CNT + KEYS_CNT) / 2 - KEYS_CNT;
+
+            assertEquals(expSum, sum);
+        }
+        finally {
+            sc.stop();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testQueryObjectsFromIgnite() throws Exception {
+        JavaSparkContext sc = new JavaSparkContext("local[*]", "test");
+
+        try {
+            JavaIgniteContext<String, Entity> ic = new JavaIgniteContext<>(sc,
new IgniteConfigProvider());
+
+            JavaIgniteRDD<String, Entity> cache = ic.fromCache(PARTITIONED_CACHE_NAME);
+
+            cache.savePairs(sc.parallelize(F.range(0, 1001), 2).mapToPair(INT_TO_ENTITY_F));
+
+            List<Entity> res = cache.objectSql("Entity", "name = ? and salary = ?",
"name50", 5000)
+                .map(STR_ENTITY_PAIR_TO_ENTITY_F).collect();
+
+            assertEquals("Invalid result length", 1, res.size());
+            assertEquals("Invalid result", 50, res.get(0).id());
+            assertEquals("Invalid result", "name50", res.get(0).name());
+            assertEquals("Invalid result", 5000, res.get(0).salary());
+            assertEquals("Invalid count", 500, cache.objectSql("Entity", "id > 500").count());
+        }
+        finally {
+            sc.stop();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testQueryFieldsFromIgnite() throws Exception {
+        JavaSparkContext sc = new JavaSparkContext("local[*]", "test");
+
+        try {
+            JavaIgniteContext<String, Entity> ic = new JavaIgniteContext<>(sc,
new IgniteConfigProvider());
+
+            JavaIgniteRDD<String, Entity> cache = ic.fromCache(PARTITIONED_CACHE_NAME);
+
+            cache.savePairs(sc.parallelize(F.range(0, 1001), 2).mapToPair(INT_TO_ENTITY_F));
+
+            DataFrame df =
+                cache.sql("select id, name, salary from Entity where name = ? and salary
= ?", "name50", 5000);
+
+            df.printSchema();
+
+            Row[] res = df.collect();
+
+            assertEquals("Invalid result length", 1, res.length);
+            assertEquals("Invalid result", 50, res[0].get(0));
+            assertEquals("Invalid result", "name50", res[0].get(1));
+            assertEquals("Invalid result", 5000, res[0].get(2));
+
+            Column exp = new Column("NAME").equalTo("name50").and(new Column("SALARY").equalTo(5000));
+
+            DataFrame df0 = cache.sql("select id, name, salary from Entity").where(exp);
+
+            df.printSchema();
+
+            Row[] res0 = df0.collect();
+
+            assertEquals("Invalid result length", 1, res0.length);
+            assertEquals("Invalid result", 50, res0[0].get(0));
+            assertEquals("Invalid result", "name50", res0[0].get(1));
+            assertEquals("Invalid result", 5000, res0[0].get(2));
+
+            assertEquals("Invalid count", 500, cache.sql("select id from Entity where id
> 500").count());
+        }
+        finally {
+            sc.stop();
+        }
+
+    }
+
+    /**
+     * @param gridName Grid name.
+     * @param client Client.
+     */
+    private static IgniteConfiguration getConfiguration(String gridName, boolean client)
throws Exception {
+        IgniteConfiguration cfg = new IgniteConfiguration();
+
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+        discoSpi.setIpFinder(IP_FINDER);
+
+        cfg.setDiscoverySpi(discoSpi);
+
+        cfg.setCacheConfiguration(cacheConfiguration());
+
+        cfg.setClientMode(client);
+
+        cfg.setGridName(gridName);
+
+        return cfg;
+    }
+
+    /**
+     * Creates cache configuration.
+     */
+    private static CacheConfiguration<Object, Object> cacheConfiguration() {
+        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+        ccfg.setBackups(1);
+
+        ccfg.setName(PARTITIONED_CACHE_NAME);
+
+        ccfg.setIndexedTypes(String.class, Entity.class);
+
+        return ccfg;
+    }
+
+    /**
+     * Ignite configiration provider.
+     */
+    static class IgniteConfigProvider implements IgniteOutClosure<IgniteConfiguration>
{
+        /** {@inheritDoc} */
+        @Override public IgniteConfiguration apply() {
+            try {
+                return getConfiguration("client", true);
+            }
+            catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    /**
+     * @param <K>
+     * @param <V>
+     */
+    static class PairToValueFunction<K, V> implements Function<Tuple2<K, V>,
V> {
+        /** {@inheritDoc} */
+        @Override public V call(Tuple2<K, V> t) throws Exception {
+            return t._2();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d307635/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index a514e35..f5b73df 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -321,6 +321,10 @@
                                 <title>Mesos Framework</title>
                                 <packages>org.apache.ignite.mesos*</packages>
                             </group>
+                            <group>
+                                <title>Spark Integration</title>
+                                <packages>org.apache.ignite.spark.examples.java</packages>
+                            </group>
                         </groups>
                         <header>
                             <![CDATA[


Mime
View raw message