ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject [2/2] incubator-ignite git commit: #IGNITE-389 - Changing API
Date Sat, 23 May 2015 01:49:44 GMT
#IGNITE-389 - Changing API


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/7439b5b7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/7439b5b7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/7439b5b7

Branch: refs/heads/ignite-389
Commit: 7439b5b7ddb5f6f771c927a2a6f5bdb8b3405392
Parents: 520cd03
Author: Alexey Goncharuk <agoncharuk@gridgain.com>
Authored: Fri May 22 18:49:33 2015 -0700
Committer: Alexey Goncharuk <agoncharuk@gridgain.com>
Committed: Fri May 22 18:49:33 2015 -0700

----------------------------------------------------------------------
 .../configuration/CacheConfiguration.java       |   1 -
 .../processors/cache/GridCacheProcessor.java    |  30 ++++--
 .../cache/IgniteDynamicCacheStartSelfTest.java  |  22 ++--
 .../apache/ignite/spark/IgniteAbstractRDD.scala |  38 +++++++
 .../org/apache/ignite/spark/IgniteContext.scala |  97 ++---------------
 .../org/apache/ignite/spark/IgniteRDD.scala     | 106 ++++++++++++++++---
 .../spark/examples/IgniteProcessExample.scala   |  13 ++-
 .../spark/examples/IgniteStoreExample.scala     |   4 +-
 .../ignite/spark/impl/IgniteQueryIterator.scala |  17 ++-
 .../apache/ignite/spark/impl/IgniteSqlRDD.scala |  43 ++++++++
 .../spark/util/SerializablePredicate2.scala     |  32 ------
 11 files changed, 232 insertions(+), 171 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7439b5b7/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index df6b2ee..692f1e3 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -774,7 +774,6 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K,
V> {
      *
      * @param loadPrevVal Load previous value flag.
      * @return {@code this} for chaining.
-     * @return {@code this} for chaining.
      */
     public CacheConfiguration setLoadPreviousValue(boolean loadPrevVal) {
         this.loadPrevVal = loadPrevVal;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7439b5b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 0e1a9c2..3c4c7d4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -1931,7 +1931,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         req.cacheType(cacheType);
 
-        return F.first(initiateCacheChanges(F.asList(req)));
+        return F.first(initiateCacheChanges(F.asList(req), failIfExists));
     }
 
     /**
@@ -1941,14 +1941,16 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     public IgniteInternalFuture<?> dynamicStopCache(String cacheName) {
         DynamicCacheChangeRequest t = new DynamicCacheChangeRequest(cacheName, ctx.localNodeId(),
true);
 
-        return F.first(initiateCacheChanges(F.asList(t)));
+        return F.first(initiateCacheChanges(F.asList(t), false));
     }
 
     /**
      * @param reqs Requests.
      * @return Collection of futures.
      */
-    public Collection<DynamicCacheStartFuture> initiateCacheChanges(Collection<DynamicCacheChangeRequest>
reqs) {
+    @SuppressWarnings("TypeMayBeWeakened")
+    private Collection<DynamicCacheStartFuture> initiateCacheChanges(Collection<DynamicCacheChangeRequest>
reqs,
+        boolean failIfExists) {
         Collection<DynamicCacheStartFuture> res = new ArrayList<>(reqs.size());
 
         Collection<DynamicCacheChangeRequest> sndReqs = new ArrayList<>(reqs.size());
@@ -1981,9 +1983,23 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                     maskNull(req.cacheName()), fut);
 
                 if (old != null) {
-                    if (req.start() && !req.clientStartOnly()) {
-                        fut.onDone(new CacheExistsException("Failed to start cache " +
-                            "(a cache with the same name is already being started or stopped):
" + req.cacheName()));
+                    if (req.start()) {
+                        if (!req.clientStartOnly()) {
+                            if (failIfExists)
+                                fut.onDone(new CacheExistsException("Failed to start cache
" +
+                                    "(a cache with the same name is already being started
or stopped): " +
+                                    req.cacheName()));
+                            else {
+                                fut = old;
+
+                                continue;
+                            }
+                        }
+                        else {
+                            fut = old;
+
+                            continue;
+                        }
                     }
                     else {
                         fut = old;
@@ -2617,7 +2633,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         req.clientStartOnly(true);
 
-        F.first(initiateCacheChanges(F.asList(req))).get();
+        F.first(initiateCacheChanges(F.asList(req), false)).get();
 
         IgniteCache cache = jCacheProxies.get(masked);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7439b5b7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
index adece63..698ee03 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
@@ -793,23 +793,19 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest
{
      * @throws Exception If failed.
      */
     public void testGetOrCreateMultiNodeTemplate() throws Exception {
-        for (int i = 0; i < 100; i++) {
-            info(">>> Iteration " + i);
+        final AtomicInteger idx = new AtomicInteger();
 
-            final AtomicInteger idx = new AtomicInteger();
-
-            GridTestUtils.runMultiThreaded(new Callable<Object>() {
-                @Override public Object call() throws Exception {
-                    int idx0 = idx.getAndIncrement();
+        GridTestUtils.runMultiThreaded(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                int idx0 = idx.getAndIncrement();
 
-                    ignite(idx0 % nodeCount()).getOrCreateCache(DYNAMIC_CACHE_NAME);
+                ignite(idx0 % nodeCount()).getOrCreateCache(DYNAMIC_CACHE_NAME);
 
-                    return null;
-                }
-            }, nodeCount() * 4, "runner");
+                return null;
+            }
+        }, nodeCount() * 4, "runner");
 
-            ignite(0).destroyCache(DYNAMIC_CACHE_NAME);
-        }
+        ignite(0).destroyCache(DYNAMIC_CACHE_NAME);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7439b5b7/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteAbstractRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteAbstractRDD.scala
b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteAbstractRDD.scala
new file mode 100644
index 0000000..63232be
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteAbstractRDD.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark
+
+import org.apache.ignite.IgniteCache
+import org.apache.ignite.configuration.CacheConfiguration
+import org.apache.spark.rdd.RDD
+
+import scala.reflect.ClassTag
+
+abstract class IgniteAbstractRDD[R:ClassTag, K, V] (
+    ic: IgniteContext[K, V],
+    cacheName: String,
+    cacheCfg: CacheConfiguration[K, V]
+) extends RDD[R] (ic.sparkContext, deps = Nil) {
+    protected def ensureCache(): IgniteCache[K, V] = {
+        // Make sure to deploy the cache
+        if (cacheCfg != null)
+            ic.ignite().getOrCreateCache(cacheCfg)
+        else
+            ic.ignite().getOrCreateCache(cacheName)
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7439b5b7/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
index 56d2a05..9d9f9a7 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
@@ -17,98 +17,29 @@
 
 package org.apache.ignite.spark
 
-import javax.cache.Cache
 
-import org.apache.ignite.cache.query.{Query, ScanQuery}
-import org.apache.ignite.cluster.ClusterNode
 import org.apache.ignite.internal.IgnitionEx
-import org.apache.ignite.lang.IgniteUuid
-import org.apache.ignite.spark.util.SerializablePredicate2
-import org.apache.ignite.{Ignition, IgniteCache, Ignite}
+import org.apache.ignite.{Ignition, Ignite}
 import org.apache.ignite.configuration.{CacheConfiguration, IgniteConfiguration}
 import org.apache.spark.SparkContext
-import org.apache.spark.rdd.RDD
-
-import scala.collection.JavaConversions._
-import scala.reflect.ClassTag
 
 class IgniteContext[K, V](
-    @scala.transient sc: SparkContext,
-    cfgF: () => IgniteConfiguration,
-    val cacheName: String,
-    cacheCfg: CacheConfiguration[K, V]
+    @scala.transient val sparkContext: SparkContext,
+    cfgF: () => IgniteConfiguration
 ) extends Serializable {
-    type ScanRDD[K1, V1] = IgniteRDD[Cache.Entry[K1, V1], K1, V1]
-
-    def this(
-        sc: SparkContext,
-        springUrl: String,
-        cacheName: String
-    ) {
-        this(sc, () => IgnitionEx.loadConfiguration(springUrl).get1(), cacheName, null)
-    }
-
-    def this(
-        sc: SparkContext,
-        cfgF: () => IgniteConfiguration,
-        cacheName: String
-    ) {
-        this(sc, cfgF, cacheName, null)
-    }
-
-    def this(
-        sc: SparkContext,
-        cfgF: () => IgniteConfiguration,
-        cacheCfg: CacheConfiguration[K, V]
-    ) {
-        this(sc, cfgF, cacheCfg.getName, cacheCfg)
-    }
-
     def this(
         sc: SparkContext,
-        springUrl: String,
-        cacheCfg: CacheConfiguration[K, V]
+        springUrl: String
     ) {
-        this(sc, () => IgnitionEx.loadConfiguration(springUrl).get1(), cacheCfg.getName,
cacheCfg)
+        this(sc, () => IgnitionEx.loadConfiguration(springUrl).get1())
     }
 
-    def sparkContext() = sc
-
-    def scan(p: (K, V) => Boolean = (_, _) => true): ScanRDD[K, V] = {
-        new ScanRDD(this, new ScanQuery[K, V](new SerializablePredicate2[K, V](p)))
+    def fromCache(cacheName: String): IgniteRDD[K, V] = {
+        new IgniteRDD[K, V](this, cacheName, null)
     }
 
-    def scan[R:ClassTag](qry: Query[R]): IgniteRDD[R, K, V] = {
-        new IgniteRDD[R, K, V](this, qry)
-    }
-
-    def saveToIgnite[T](rdd: RDD[V], keyFunc: (IgniteContext[K, V], V, ClusterNode) =>
T = affinityKeyFunc(_: IgniteContext[K, V], _:V, _: ClusterNode)) = {
-        rdd.foreachPartition(it => {
-            println("Using scala version: " + scala.util.Properties.versionString)
-            // Make sure to deploy the cache
-            igniteCache()
-
-            val ig = ignite()
-
-            val locNode = ig.cluster().localNode()
-
-            val node: Option[ClusterNode] = ig.cluster().forHost(locNode).nodes().find(!_.eq(locNode))
-
-            val streamer = ignite().dataStreamer[T, V](cacheName)
-
-            try {
-                it.foreach(value => {
-                    val key: T = keyFunc(this, value, node.orNull)
-
-                    println("Saving: " + key + ", " + value)
-
-                    streamer.addData(key, value)
-                })
-            }
-            finally {
-                streamer.close()
-            }
-        })
+    def fromCache(cacheCfg: CacheConfiguration[K, V]) = {
+        new IgniteRDD[K, V](this, cacheCfg.getName, cacheCfg)
     }
 
     def ignite(): Ignite = {
@@ -130,14 +61,4 @@ class IgniteContext[K, V](
         }
     }
 
-    private def igniteCache(): IgniteCache[K, V] = {
-        if (cacheCfg == null)
-            ignite().getOrCreateCache(cacheName)
-        else
-            ignite().getOrCreateCache(cacheCfg)
-    }
-
-    private def affinityKeyFunc(ic: IgniteContext[K, V], value: V, node: ClusterNode): Object
= {
-        IgniteUuid.randomUuid()
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7439b5b7/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
index 4018c53..ce51e9c 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
@@ -14,32 +14,114 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.ignite.spark
 
-import org.apache.ignite.cache.query.Query
-import org.apache.ignite.spark.impl.{IgniteQueryIterator, IgnitePartition}
-import org.apache.spark.{TaskContext, Partition}
+import javax.cache.Cache
+
+import org.apache.ignite.cache.query.{SqlQuery, ScanQuery}
+import org.apache.ignite.cluster.ClusterNode
+import org.apache.ignite.configuration.CacheConfiguration
+import org.apache.ignite.lang.IgniteUuid
+import org.apache.ignite.spark.impl.{IgniteSqlRDD, IgnitePartition, IgniteQueryIterator}
 import org.apache.spark.rdd.RDD
+import org.apache.spark.{TaskContext, Partition}
 
 import scala.collection.JavaConversions._
-import scala.reflect.ClassTag
 
-class IgniteRDD[R:ClassTag, K, V](
+class IgniteRDD[K, V] (
     ic: IgniteContext[K, V],
-    qry: Query[R]
-) extends RDD[R] (ic.sparkContext(), deps = Nil) {
-    override def compute(part: Partition, context: TaskContext): Iterator[R] = {
-        new IgniteQueryIterator[R, K, V](ic, part, qry)
+    cacheName: String,
+    cacheCfg: CacheConfiguration[K, V]
+) extends IgniteAbstractRDD[(K, V), K, V] (ic, cacheName, cacheCfg) {
+
+    override def compute(part: Partition, context: TaskContext): Iterator[(K, V)] = {
+        val cache = ensureCache()
+
+        val it: java.util.Iterator[Cache.Entry[K, V]] = cache.query(new ScanQuery[K, V]()).iterator()
+
+        new IgniteQueryIterator[Cache.Entry[K, V], (K, V)](it, entry => {
+            (entry.getKey, entry.getValue)
+        })
     }
 
     override protected def getPartitions: Array[Partition] = {
-        val parts = ic.ignite().affinity(ic.cacheName).partitions()
+        ensureCache()
+
+        val parts = ic.ignite().affinity(cacheName).partitions()
 
         (0 until parts).map(new IgnitePartition(_)).toArray
     }
 
     override protected def getPreferredLocations(split: Partition): Seq[String] = {
-        ic.ignite().affinity(ic.cacheName).mapPartitionToPrimaryAndBackups(split.index).map(_.addresses()).flatten.toList
+        ensureCache()
+
+        ic.ignite().affinity(cacheName).mapPartitionToPrimaryAndBackups(split.index).map(_.addresses()).flatten.toList
+    }
+
+    def query(typeName: String, sql: String, args: Any*): RDD[(K, V)] = {
+        val qry: SqlQuery[K, V] = new SqlQuery[K, V](typeName, sql)
+
+        qry.setArgs(args)
+
+        new IgniteSqlRDD[(K, V), Cache.Entry[K, V], K, V](ic, cacheName, cacheCfg, qry, entry
=> (entry.getKey, entry.getValue))
+    }
+
+    def saveValues(rdd: RDD[V]) = {
+        rdd.foreachPartition(it => {
+            println("Using scala version: " + scala.util.Properties.versionString)
+            val ig = ic.ignite()
+
+            ensureCache()
+
+            val locNode = ig.cluster().localNode()
+
+            val node: Option[ClusterNode] = ig.cluster().forHost(locNode).nodes().find(!_.eq(locNode))
+
+            val streamer = ig.dataStreamer[Object, V](cacheName)
+
+            try {
+                it.foreach(value => {
+                    val key = affinityKeyFunc(value, node.orNull)
+
+                    println("Saving: " + key + ", " + value)
+
+                    streamer.addData(key, value)
+                })
+            }
+            finally {
+                streamer.close()
+            }
+        })
+    }
+
+    def save(rdd: RDD[(K, V)]) = {
+        rdd.foreachPartition(it => {
+            println("Using scala version: " + scala.util.Properties.versionString)
+            val ig = ic.ignite()
+
+            // Make sure to deploy the cache
+            ensureCache()
+
+            val locNode = ig.cluster().localNode()
+
+            val node: Option[ClusterNode] = ig.cluster().forHost(locNode).nodes().find(!_.eq(locNode))
+
+            val streamer = ig.dataStreamer[K, V](cacheName)
+
+            try {
+                it.foreach(tup => {
+                    println("Saving: " + tup._1 + ", " + tup._2)
+
+                    streamer.addData(tup._1, tup._2)
+                })
+            }
+            finally {
+                streamer.close()
+            }
+        })
+    }
+
+    private def affinityKeyFunc(value: V, node: ClusterNode): Object = {
+        IgniteUuid.randomUuid()
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7439b5b7/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteProcessExample.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteProcessExample.scala
b/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteProcessExample.scala
index 3932a26..cdb41d2 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteProcessExample.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteProcessExample.scala
@@ -25,20 +25,23 @@ object IgniteProcessExample {
         val conf = new SparkConf().setAppName("Ignite processing example")
         val sc = new SparkContext(conf)
 
-        val partitioned = new IgniteContext[Object, String](sc, ExampleConfiguration.configuration
_, "partitioned")
+        val ignite = new IgniteContext[Object, String](sc, ExampleConfiguration.configuration
_)
 
         // Search for lines containing "Ignite".
-        val scanRdd = partitioned.scan((k, v) => v.contains("Ignite"))
+        val scanRdd = ignite.fromCache("partitioned")
 
         val processedRdd = scanRdd.filter(line => {
             println("Analyzing line: " + line)
+            line._2.contains("Ignite")
 
             true
-        }).map(_.getValue)
+        }).map(_._2)
 
         // Create a new cache for results.
-        val results = new IgniteContext[Object, String](sc, ExampleConfiguration.configuration
_, "results")
+        val results = ignite.fromCache("results")
 
-        results.saveToIgnite(processedRdd)
+        results.saveValues(processedRdd)
+
+        ignite.fromCache("indexed").query("Person", "age > ?", 20).collect()
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7439b5b7/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteStoreExample.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteStoreExample.scala
b/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteStoreExample.scala
index a7823f4..c74804e 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteStoreExample.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/examples/IgniteStoreExample.scala
@@ -27,7 +27,7 @@ object IgniteStoreExample {
         val conf = new SparkConf().setAppName("Ignite store example")
         val sc = new SparkContext(conf)
 
-        val ignite = new IgniteContext[String, String](sc, ExampleConfiguration.configuration
_, "partitioned")
+        val ignite = new IgniteContext[String, String](sc, ExampleConfiguration.configuration
_)
 
         val lines: RDD[String] = sc.textFile(args(0)).filter(line => {
             println("Read line: " + line)
@@ -35,6 +35,6 @@ object IgniteStoreExample {
             true
         })
 
-        ignite.saveToIgnite(lines)
+        ignite.fromCache("partitioned").saveValues(lines)
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7439b5b7/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteQueryIterator.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteQueryIterator.scala
b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteQueryIterator.scala
index 07b24a9..b24ba50 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteQueryIterator.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteQueryIterator.scala
@@ -17,16 +17,11 @@
 
 package org.apache.ignite.spark.impl
 
-import org.apache.ignite.cache.query.Query
-import org.apache.ignite.spark.IgniteContext
-import org.apache.spark.Partition
+class IgniteQueryIterator[T, R] (
+    cur: java.util.Iterator[T],
+    conv: (T) => R
+) extends Iterator[R] {
+    override def hasNext: Boolean = cur.hasNext
 
-class IgniteQueryIterator[R, K, V] (
-    ic: IgniteContext[K, V],
-    part: Partition,
-    qry: Query[R]
-    ) extends Iterator[R] {
-    override def hasNext: Boolean = ???
-
-    override def next(): R = ???
+    override def next(): R = conv(cur.next())
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7439b5b7/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala
b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala
new file mode 100644
index 0000000..e347c85
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSqlRDD.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.impl
+
+import org.apache.ignite.cache.query.Query
+import org.apache.ignite.configuration.CacheConfiguration
+import org.apache.ignite.spark.{IgniteAbstractRDD, IgniteContext}
+import org.apache.spark.{TaskContext, Partition}
+
+import scala.reflect.ClassTag
+
+class IgniteSqlRDD[R: ClassTag, T, K, V](
+    ic: IgniteContext[K, V],
+    cacheName: String,
+    cacheCfg: CacheConfiguration[K, V],
+    qry: Query[T],
+    conv: (T) => R
+) extends IgniteAbstractRDD[R, K, V](ic, cacheName, cacheCfg) {
+    override def compute(split: Partition, context: TaskContext): Iterator[R] = {
+        val it: java.util.Iterator[T] = ensureCache().query(qry).iterator()
+
+        new IgniteQueryIterator[T, R](it, conv)
+    }
+
+    override protected def getPartitions: Array[Partition] = {
+        Array(new IgnitePartition(0))
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7439b5b7/modules/spark/src/main/scala/org/apache/ignite/spark/util/SerializablePredicate2.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/util/SerializablePredicate2.scala
b/modules/spark/src/main/scala/org/apache/ignite/spark/util/SerializablePredicate2.scala
deleted file mode 100644
index 484d0df..0000000
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/util/SerializablePredicate2.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spark.util
-
-import org.apache.ignite.lang.IgniteBiPredicate
-
-/**
- * Peer deploy aware adapter for Java's `GridPredicate2`.
- */
-class SerializablePredicate2[T1, T2](private val p: (T1, T2) => Boolean) extends IgniteBiPredicate[T1,
T2] {
-    assert(p != null)
-
-    /**
-     * Delegates to passed in function.
-     */
-    def apply(e1: T1, e2: T2) = p(e1, e2)
-}
\ No newline at end of file


Mime
View raw message