ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [3/7] ignite git commit: IGNITE-2941: Add getOrStart method to ignition IGNITE-2942: Use getOrStart in IgniteContext instead of current try-catch structure This closes #631 Reviewed by Denis Magda, Alexey Goncharuk.
Date Tue, 19 Apr 2016 13:08:59 GMT
IGNITE-2941: Add getOrStart method to ignition
IGNITE-2942: Use getOrStart in IgniteContext instead of current try-catch structure
This closes #631
Reviewed by Denis Magda, Alexey Goncharuk.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a9d375e5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a9d375e5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a9d375e5

Branch: refs/heads/ignite-2926
Commit: a9d375e595e504ce22c684b91d1a0da228c4a606
Parents: 3779fe4
Author: Alexei Scherbakov <alexey.scherbakoff@gmail.com>
Authored: Tue Apr 19 07:11:54 2016 +0300
Committer: Denis Magda <dmagda@gridgain.com>
Committed: Tue Apr 19 07:11:54 2016 +0300

----------------------------------------------------------------------
 .../main/java/org/apache/ignite/Ignition.java   |  19 +-
 .../org/apache/ignite/internal/IgnitionEx.java  |  60 +++-
 .../ignite/internal/GridGetOrStartSelfTest.java | 129 +++++++
 .../ignite/testsuites/IgniteBasicTestSuite.java |  16 +-
 .../org/apache/ignite/spark/IgniteContext.scala |  73 ++--
 .../apache/ignite/spark/JavaIgniteContext.scala |  14 +-
 .../spark/JavaEmbeddedIgniteRDDSelfTest.java    | 343 +++++++++++++++++++
 .../ignite/spark/JavaIgniteRDDSelfTest.java     | 302 ----------------
 .../spark/JavaStandaloneIgniteRDDSelfTest.java  | 302 ++++++++++++++++
 .../ignite/testsuites/IgniteRDDTestSuite.java   |  40 +++
 10 files changed, 935 insertions(+), 363 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a9d375e5/modules/core/src/main/java/org/apache/ignite/Ignition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/Ignition.java b/modules/core/src/main/java/org/apache/ignite/Ignition.java
index 99ee1d9..b4c01f9 100644
--- a/modules/core/src/main/java/org/apache/ignite/Ignition.java
+++ b/modules/core/src/main/java/org/apache/ignite/Ignition.java
@@ -309,7 +309,7 @@ public class Ignition {
     }
 
     /**
-     * Starts grid with given configuration. Note that this method is no-op if grid with the name
+     * Starts grid with given configuration. Note that this method will throw an exception if grid with the name
      * provided in given configuration is already started.
      *
      * @param cfg Grid configuration. This cannot be {@code null}.
@@ -401,6 +401,23 @@ public class Ignition {
         }
     }
 
+
+    /**
+     * Gets or starts new grid instance if it hasn't been started yet.
+     *
+     * @param cfg Grid configuration. This cannot be {@code null}.
+     * @return Grid instance.
+     * @throws IgniteException If grid could not be started.
+     */
+    public static Ignite getOrStart(IgniteConfiguration cfg) throws IgniteException {
+        try {
+            return IgnitionEx.start(cfg, false);
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
     /**
      * Loads Spring bean by its name from given Spring XML configuration file. If bean
      * with such name doesn't exist, exception is thrown.

http://git-wip-us.apache.org/repos/asf/ignite/blob/a9d375e5/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index 533b6d8..9a83826 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -499,7 +499,7 @@ public class IgnitionEx {
 
         U.warn(null, "Default Spring XML file not found (is IGNITE_HOME set?): " + DFLT_CFG);
 
-        return start0(new GridStartContext(new IgniteConfiguration(), null, springCtx)).grid();
+        return start0(new GridStartContext(new IgniteConfiguration(), null, springCtx), true).grid();
     }
 
     /**
@@ -512,11 +512,25 @@ public class IgnitionEx {
      *      also if named grid has already been started.
      */
     public static Ignite start(IgniteConfiguration cfg) throws IgniteCheckedException {
-        return start(cfg, null);
+        return start(cfg, null, true);
     }
 
     /**
-     * Starts grid with given configuration. Note that this method is no-op if grid with the name
+     * Starts a grid with given configuration. If the grid is already started and failIfStarted set to TRUE
+     * an exception will be thrown.
+     *
+     * @param cfg Grid configuration. This cannot be {@code null}.
+     * failIfStarted Throw or not an exception if grid is already started.
+     * @return Started grid.
+     * @throws IgniteCheckedException If grid could not be started. This exception will be thrown
+     *      also if named grid has already been started.
+     */
+    public static Ignite start(IgniteConfiguration cfg, boolean failIfStarted) throws IgniteCheckedException {
+        return start(cfg, null, failIfStarted);
+    }
+
+    /**
+     * Starts grid with given configuration. Note that this method will throw and exception if grid with the name
      * provided in given configuration is already started.
      *
      * @param cfg Grid configuration. This cannot be {@code null}.
@@ -531,7 +545,27 @@ public class IgnitionEx {
     public static Ignite start(IgniteConfiguration cfg, @Nullable GridSpringResourceContext springCtx) throws IgniteCheckedException {
         A.notNull(cfg, "cfg");
 
-        return start0(new GridStartContext(cfg, null, springCtx)).grid();
+        return start0(new GridStartContext(cfg, null, springCtx), true).grid();
+    }
+
+    /**
+     * Starts grid with given configuration. If the grid is already started and failIfStarted set to TRUE
+     * an exception will be thrown.
+     *
+     * @param cfg Grid configuration. This cannot be {@code null}.
+     * @param springCtx Optional Spring application context, possibly {@code null}.
+     *      Spring bean definitions for bean injection are taken from this context.
+     *      If provided, this context can be injected into grid tasks and grid jobs using
+     *      {@link SpringApplicationContextResource @SpringApplicationContextResource} annotation.
+     * @param failIfStarted Throw or not an exception if grid is already started.
+     * @return Started grid.
+     * @throws IgniteCheckedException If grid could not be started. This exception will be thrown
+     *      also if named grid has already been started.
+     */
+    public static Ignite start(IgniteConfiguration cfg, @Nullable GridSpringResourceContext springCtx, boolean failIfStarted) throws IgniteCheckedException {
+        A.notNull(cfg, "cfg");
+
+        return start0(new GridStartContext(cfg, null, springCtx), failIfStarted).grid();
     }
 
     /**
@@ -927,7 +961,7 @@ public class IgnitionEx {
 
                 // Use either user defined context or our one.
                 IgniteNamedInstance grid = start0(
-                    new GridStartContext(cfg, springCfgUrl, springCtx == null ? cfgMap.get2() : springCtx));
+                    new GridStartContext(cfg, springCfgUrl, springCtx == null ? cfgMap.get2() : springCtx), true);
 
                 // Add it if it was not stopped during startup.
                 if (grid != null)
@@ -958,10 +992,11 @@ public class IgnitionEx {
      * Starts grid with given configuration.
      *
      * @param startCtx Start context.
+     * @param failIfStarted Throw or not an exception if grid is already started.
      * @return Started grid.
      * @throws IgniteCheckedException If grid could not be started.
      */
-    private static IgniteNamedInstance start0(GridStartContext startCtx) throws IgniteCheckedException {
+    private static IgniteNamedInstance start0(GridStartContext startCtx, boolean failIfStarted ) throws IgniteCheckedException {
         assert startCtx != null;
 
         String name = startCtx.config().getGridName();
@@ -984,12 +1019,15 @@ public class IgnitionEx {
             }
         }
 
-        if (old != null) {
-            if (name == null)
-                throw new IgniteCheckedException("Default Ignite instance has already been started.");
+        if (old != null)
+            if (failIfStarted) {
+                if (name == null)
+                    throw new IgniteCheckedException("Default Ignite instance has already been started.");
+                else
+                    throw new IgniteCheckedException("Ignite instance with this name has already been started: " + name);
+            }
             else
-                throw new IgniteCheckedException("Ignite instance with this name has already been started: " + name);
-        }
+                return old;
 
         if (startCtx.config().getWarmupClosure() != null)
             startCtx.config().getWarmupClosure().apply(startCtx.config());

http://git-wip-us.apache.org/repos/asf/ignite/blob/a9d375e5/modules/core/src/test/java/org/apache/ignite/internal/GridGetOrStartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridGetOrStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridGetOrStartSelfTest.java
new file mode 100644
index 0000000..9b3985e
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridGetOrStartSelfTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.*;
+
+/**
+ * The GridGetOrStartSelfTest tests get or start semantics.
+ */
+
+@GridCommonTest(group = "Kernal Self")
+public class GridGetOrStartSelfTest extends GridCommonAbstractTest {
+    /** Concurrency. */
+    public static final int CONCURRENCY = 10;
+
+    /**
+     * Default constructor.
+     */
+    public GridGetOrStartSelfTest() {
+        super(false);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * Tests default grid
+     */
+    public void testDefaultGridGetOrStart() throws Exception {
+        IgniteConfiguration cfg = getConfiguration(null);
+
+        try (Ignite ignite = Ignition.getOrStart(cfg)) {
+            try {
+                Ignition.start(cfg);
+
+                fail("Expected exception after grid started");
+            }
+            catch (IgniteException ignored) {
+            }
+
+            Ignite ignite2 = Ignition.getOrStart(cfg);
+
+            assertEquals("Must return same instance", ignite, ignite2);
+        }
+
+        assertTrue(G.allGrids().isEmpty());
+    }
+
+    /**
+     * Tests named grid
+     */
+    public void testNamedGridGetOrStart() throws Exception {
+        IgniteConfiguration cfg = getConfiguration("test");
+        try (Ignite ignite = Ignition.getOrStart(cfg)) {
+            try {
+                Ignition.start(cfg);
+
+                fail("Expected exception after grid started");
+            }
+            catch (IgniteException ignored) {
+                // No-op.
+            }
+
+            Ignite ignite2 = Ignition.getOrStart(cfg);
+
+            assertEquals("Must return same instance", ignite, ignite2);
+        }
+
+        assertTrue(G.allGrids().isEmpty());
+    }
+
+    /**
+     * Tests concurrent grid initialization
+     */
+    public void testConcurrentGridGetOrStartCon() throws Exception {
+        final IgniteConfiguration cfg = getConfiguration(null);
+
+        final AtomicReference<Ignite> ref = new AtomicReference<>();
+
+        try {
+            GridTestUtils.runMultiThreaded(new Runnable() {
+                @Override public void run() {
+                    // must return same instance in each thread
+
+                    try {
+                        Ignite ignite = Ignition.getOrStart(cfg);
+
+                        boolean set = ref.compareAndSet(null, ignite);
+
+                        if (!set)
+                            assertEquals(ref.get(), ignite);
+                    }
+                    catch (IgniteException e) {
+                        throw new RuntimeException("Ignite error", e);
+                    }
+                }
+            }, CONCURRENCY, "GridCreatorThread");
+        }
+        catch (Exception ignored) {
+            fail("Exception is not expected");
+        }
+
+        G.stopAll(true);
+
+        assertTrue(G.allGrids().isEmpty());
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a9d375e5/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
index 9e2324c..bb4b0f0 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
@@ -20,19 +20,7 @@ package org.apache.ignite.testsuites;
 import java.util.Set;
 import junit.framework.TestSuite;
 import org.apache.ignite.GridSuppressedExceptionSelfTest;
-import org.apache.ignite.internal.ClusterGroupHostsSelfTest;
-import org.apache.ignite.internal.ClusterGroupSelfTest;
-import org.apache.ignite.internal.GridFailFastNodeFailureDetectionSelfTest;
-import org.apache.ignite.internal.GridLifecycleAwareSelfTest;
-import org.apache.ignite.internal.GridLifecycleBeanSelfTest;
-import org.apache.ignite.internal.GridNodeMetricsLogSelfTest;
-import org.apache.ignite.internal.GridProjectionForCachesSelfTest;
-import org.apache.ignite.internal.GridReduceSelfTest;
-import org.apache.ignite.internal.GridReleaseTypeSelfTest;
-import org.apache.ignite.internal.GridSelfTest;
-import org.apache.ignite.internal.GridStartStopSelfTest;
-import org.apache.ignite.internal.GridStopWithCancelSelfTest;
-import org.apache.ignite.internal.IgniteSlowClientDetectionSelfTest;
+import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.processors.affinity.GridAffinityProcessorRendezvousSelfTest;
 import org.apache.ignite.internal.processors.cache.GridProjectionForCachesOnDaemonNodeSelfTest;
 import org.apache.ignite.internal.processors.cache.IgniteDaemonNodeMarshallerCacheTest;
@@ -129,6 +117,8 @@ public class IgniteBasicTestSuite extends TestSuite {
         suite.addTestSuite(VariationsIteratorTest.class);
         suite.addTestSuite(ConfigVariationsTestSuiteBuilderTest.class);
 
+        GridTestUtils.addTestIfNeeded(suite, GridGetOrStartSelfTest.class, ignoredTests);
+
         return suite;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a9d375e5/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
index 57fe84f..182605c 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
@@ -17,13 +17,12 @@
 
 package org.apache.ignite.spark
 
-
-import org.apache.ignite.internal.IgnitionEx
-import org.apache.ignite.internal.util.IgniteUtils
 import org.apache.ignite._
 import org.apache.ignite.configuration.{CacheConfiguration, IgniteConfiguration}
-import org.apache.spark.{Logging, SparkContext}
+import org.apache.ignite.internal.IgnitionEx
+import org.apache.ignite.internal.util.IgniteUtils
 import org.apache.spark.sql.SQLContext
+import org.apache.spark.{Logging, SparkContext}
 
 /**
  * Ignite context.
@@ -36,15 +35,13 @@ import org.apache.spark.sql.SQLContext
 class IgniteContext[K, V](
     @transient val sparkContext: SparkContext,
     cfgF: () ⇒ IgniteConfiguration,
-    client: Boolean = true
-) extends Serializable with Logging {
-    @transient private val driver = true
-
+    standalone: Boolean = true
+    ) extends Serializable with Logging {
     private val cfgClo = new Once(cfgF)
 
     private val igniteHome = IgniteUtils.getIgniteHome
 
-    if (!client) {
+    if (!standalone) {
         // Get required number of executors with default equals to number of available executors.
         val workers = sparkContext.getConf.getInt("spark.executor.instances",
             sparkContext.getExecutorStorageStatus.length)
@@ -55,7 +52,7 @@ class IgniteContext[K, V](
         logInfo("Will start Ignite nodes on " + workers + " workers")
 
         // Start ignite server node on each worker in server mode.
-        sparkContext.parallelize(1 to workers, workers).foreach(it ⇒ ignite())
+        sparkContext.parallelize(1 to workers, workers).foreachPartition(it ⇒ ignite())
     }
 
     // Make sure to start Ignite on context creation.
@@ -71,7 +68,7 @@ class IgniteContext[K, V](
         sc: SparkContext,
         springUrl: String,
         client: Boolean
-    ) {
+        ) {
         this(sc, () ⇒ IgnitionEx.loadConfiguration(springUrl).get1(), client)
     }
 
@@ -84,7 +81,7 @@ class IgniteContext[K, V](
     def this(
         sc: SparkContext,
         springUrl: String
-    ) {
+        ) {
         this(sc, () ⇒ IgnitionEx.loadConfiguration(springUrl).get1())
     }
 
@@ -124,10 +121,8 @@ class IgniteContext[K, V](
     }
 
     /**
-     * Gets an Ignite instance supporting this context. Ignite instance will be started
-     * if it has not been started yet.
-     *
-     * @return Ignite instance.
+     * Get or start Ignite instance it it's not started yet.
+     * @return
      */
     def ignite(): Ignite = {
         val home = IgniteUtils.getIgniteHome
@@ -142,24 +137,17 @@ class IgniteContext[K, V](
 
         val igniteCfg = cfgClo()
 
+        // check if called from driver
+        if (sparkContext != null) igniteCfg.setClientMode(true)
+
         try {
-            Ignition.ignite(igniteCfg.getGridName)
+            Ignition.getOrStart(igniteCfg)
         }
         catch {
-            case e: IgniteIllegalStateException ⇒
-                try {
-                    igniteCfg.setClientMode(client || driver)
-
-                    Ignition.start(igniteCfg)
-                }
-                catch {
-                    case e: IgniteException ⇒ {
-                        logError("Failed to start Ignite client. Will try to use an existing instance with name: "
-                            + igniteCfg.getGridName, e)
-
-                        Ignition.ignite(igniteCfg.getGridName)
-                    }
-                }
+            case e: IgniteException ⇒
+                logError("Failed to start Ignite.", e)
+
+                throw e
         }
     }
 
@@ -167,7 +155,25 @@ class IgniteContext[K, V](
      * Stops supporting ignite instance. If ignite instance has been already stopped, this operation will be
      * a no-op.
      */
-    def close() = {
+    def close(shutdownIgniteOnWorkers: Boolean = false) = {
+        // additional check if called from driver
+        if (sparkContext != null && shutdownIgniteOnWorkers) {
+            // Get required number of executors with default equals to number of available executors.
+            val workers = sparkContext.getConf.getInt("spark.executor.instances",
+                sparkContext.getExecutorStorageStatus.length)
+
+            if (workers > 0) {
+                logInfo("Will stop Ignite nodes on " + workers + " workers")
+
+                // Start ignite server node on each worker in server mode.
+                sparkContext.parallelize(1 to workers, workers).foreachPartition(it ⇒ doClose())
+            }
+        }
+
+        doClose()
+    }
+
+    private def doClose() = {
         val igniteCfg = cfgClo()
 
         Ignition.stop(igniteCfg.getGridName, false)
@@ -184,8 +190,11 @@ private class Once(clo: () ⇒ IgniteConfiguration) extends Serializable {
 
     def apply(): IgniteConfiguration = {
         if (res == null) {
+
             this.synchronized {
+
                 if (res == null)
+
                     res = clo()
             }
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a9d375e5/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala
index e2d57bf..44b1cd9 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteContext.scala
@@ -34,10 +34,16 @@ import scala.reflect.ClassTag
  * @tparam V Value type.
  */
 class JavaIgniteContext[K, V](
-    @scala.transient val sc: JavaSparkContext,
-    val cfgF: IgniteOutClosure[IgniteConfiguration]) extends Serializable {
+    @transient val sc: JavaSparkContext,
+    val cfgF: IgniteOutClosure[IgniteConfiguration],
+    standalone: Boolean = true
+    ) extends Serializable {
 
-    @transient val ic: IgniteContext[K, V] = new IgniteContext[K, V](sc.sc, () => cfgF.apply())
+    @transient val ic: IgniteContext[K, V] = new IgniteContext[K, V](sc.sc, () => cfgF.apply(), standalone)
+
+    def this(sc: JavaSparkContext, cfgF: IgniteOutClosure[IgniteConfiguration]) {
+        this(sc, cfgF, true)
+    }
 
     def this(sc: JavaSparkContext, springUrl: String) {
         this(sc, new IgniteOutClosure[IgniteConfiguration] {
@@ -53,7 +59,7 @@ class JavaIgniteContext[K, V](
 
     def ignite(): Ignite = ic.ignite()
 
-    def close() = ic.close()
+    def close(shutdownIgniteOnWorkers:Boolean = false) = ic.close(shutdownIgniteOnWorkers)
 
     private[spark] def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a9d375e5/modules/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java b/modules/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java
new file mode 100644
index 0000000..5ceaca7
--- /dev/null
+++ b/modules/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgniteOutClosure;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.Row;
+import scala.Tuple2;
+
+/**
+ * Tests for {@link JavaIgniteRDD} (embedded mode).
+ */
+public class JavaEmbeddedIgniteRDDSelfTest extends GridCommonAbstractTest {
+    /** For grid names generation */
+    private static AtomicInteger cntr = new AtomicInteger(1);
+
+    /** Grid names. */
+    private static ThreadLocal<Integer> gridNames = new ThreadLocal<Integer>() {
+        @Override protected Integer initialValue() {
+            return cntr.getAndIncrement();
+        }
+    };
+
+    /** Grid count. */
+    private static final int GRID_CNT = 3;
+
+    /** Keys count. */
+    private static final int KEYS_CNT = 10000;
+
+    /** Cache name. */
+    private static final String PARTITIONED_CACHE_NAME = "partitioned";
+
+    /** Sum function. */
+    private static final Function2<Integer, Integer, Integer> SUM_F = new Function2<Integer, Integer, Integer>() {
+        public Integer call(Integer x, Integer y) {
+            return x + y;
+        }
+    };
+
+    /** To pair function. */
+    private static final PairFunction<Integer, String, String> TO_PAIR_F = new PairFunction<Integer, String, String>() {
+        /** {@inheritDoc} */
+        @Override public Tuple2<String, String> call(Integer i) {
+            return new Tuple2<>(String.valueOf(i), "val" + i);
+        }
+    };
+
+    /** (String, Integer); pair to Integer value function. */
+    private static final Function<Tuple2<String, Integer>, Integer> STR_INT_PAIR_TO_INT_F = new PairToValueFunction<>();
+
+    /** (String, Entity) pair to Entity value function. */
+    private static final Function<Tuple2<String, Entity>, Entity> STR_ENTITY_PAIR_TO_ENTITY_F =
+        new PairToValueFunction<>();
+
+    /** Integer to entity function. */
+    private static final PairFunction<Integer, String, Entity> INT_TO_ENTITY_F =
+        new PairFunction<Integer, String, Entity>() {
+            @Override public Tuple2<String, Entity> call(Integer i) throws Exception {
+                return new Tuple2<>(String.valueOf(i), new Entity(i, "name" + i, i * 100));
+            }
+        };
+
+    /**
+     * Default constructor.
+     */
+    public JavaEmbeddedIgniteRDDSelfTest() {
+        super(false);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * Creates default spark context
+     */
+    private JavaSparkContext createContext() {
+        SparkConf conf = new SparkConf();
+
+        conf.set("spark.executor.instances", String.valueOf(GRID_CNT));
+
+        return new JavaSparkContext("local[" + GRID_CNT + "]", "test", conf);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStoreDataToIgnite() throws Exception {
+        JavaSparkContext sc = createContext();
+
+        JavaIgniteContext<String, String> ic = null;
+
+        try {
+            ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider(), false);
+
+            ic.fromCache(PARTITIONED_CACHE_NAME)
+                .savePairs(sc.parallelize(F.range(0, KEYS_CNT), GRID_CNT).mapToPair(TO_PAIR_F));
+
+            Ignite ignite = ic.ignite();
+
+            IgniteCache<String, String> cache = ignite.cache(PARTITIONED_CACHE_NAME);
+
+            for (int i = 0; i < KEYS_CNT; i++) {
+                String val = cache.get(String.valueOf(i));
+
+                assertNotNull("Value was not put to cache for key: " + i, val);
+                assertEquals("Invalid value stored for key: " + i, "val" + i, val);
+            }
+        }
+        finally {
+            if (ic != null)
+                ic.close(true);
+
+            sc.stop();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReadDataFromIgnite() throws Exception {
+        JavaSparkContext sc = createContext();
+
+        JavaIgniteContext<String, Integer> ic = null;
+
+        try {
+            ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider(), false);
+
+            Ignite ignite = ic.ignite();
+
+            IgniteCache<String, Integer> cache = ignite.cache(PARTITIONED_CACHE_NAME);
+
+            for (int i = 0; i < KEYS_CNT; i++)
+                cache.put(String.valueOf(i), i);
+
+            JavaRDD<Integer> values = ic.fromCache(PARTITIONED_CACHE_NAME).map(STR_INT_PAIR_TO_INT_F);
+
+            int sum = values.fold(0, SUM_F);
+
+            int expSum = (KEYS_CNT * KEYS_CNT + KEYS_CNT) / 2 - KEYS_CNT;
+
+            assertEquals(expSum, sum);
+        }
+        finally {
+            if (ic != null)
+                ic.close(true);
+
+            sc.stop();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testQueryObjectsFromIgnite() throws Exception {
+        fail("IGNITE-3009");
+
+        JavaSparkContext sc = createContext();
+
+        JavaIgniteContext<String, Entity> ic = null;
+
+        try {
+            ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider(), false);
+
+            JavaIgniteRDD<String, Entity> cache = ic.fromCache(PARTITIONED_CACHE_NAME);
+
+            int cnt = 1001;
+
+            List<Integer> numbers = F.range(0, cnt);
+
+            cache.savePairs(sc.parallelize(numbers, GRID_CNT).mapToPair(INT_TO_ENTITY_F));
+
+            List<Entity> res = cache.objectSql("Entity", "name = ? and salary = ?", "name50", 5000)
+                .map(STR_ENTITY_PAIR_TO_ENTITY_F).collect();
+
+            assertEquals("Invalid result length", 1, res.size());
+            assertEquals("Invalid result", 50, res.get(0).id());
+            assertEquals("Invalid result", "name50", res.get(0).name());
+            assertEquals("Invalid result", 5000, res.get(0).salary());
+
+//            Ignite ignite = ic.ignite();
+//            IgniteCache<Object, Object> underCache = ignite.cache(PARTITIONED_CACHE_NAME);
+//            assertEquals("Invalid total count", cnt, underCache.size());
+
+            assertEquals("Invalid count", 500, cache.objectSql("Entity", "id > 500").count());
+        }
+        finally {
+            if (ic != null)
+                ic.close(true);
+
+            sc.stop();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testQueryFieldsFromIgnite() throws Exception {
+        JavaSparkContext sc = createContext();
+
+        JavaIgniteContext<String, Entity> ic = null;
+
+        try {
+            ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider(), false);
+
+            JavaIgniteRDD<String, Entity> cache = ic.fromCache(PARTITIONED_CACHE_NAME);
+
+            cache.savePairs(sc.parallelize(F.range(0, 1001), GRID_CNT).mapToPair(INT_TO_ENTITY_F));
+
+            DataFrame df =
+                cache.sql("select id, name, salary from Entity where name = ? and salary = ?", "name50", 5000);
+
+            df.printSchema();
+
+            Row[] res = df.collect();
+
+            assertEquals("Invalid result length", 1, res.length);
+            assertEquals("Invalid result", 50, res[0].get(0));
+            assertEquals("Invalid result", "name50", res[0].get(1));
+            assertEquals("Invalid result", 5000, res[0].get(2));
+
+            Column exp = new Column("NAME").equalTo("name50").and(new Column("SALARY").equalTo(5000));
+
+            DataFrame df0 = cache.sql("select id, name, salary from Entity").where(exp);
+
+            df.printSchema();
+
+            Row[] res0 = df0.collect();
+
+            assertEquals("Invalid result length", 1, res0.length);
+            assertEquals("Invalid result", 50, res0[0].get(0));
+            assertEquals("Invalid result", "name50", res0[0].get(1));
+            assertEquals("Invalid result", 5000, res0[0].get(2));
+
+            assertEquals("Invalid count", 500, cache.sql("select id from Entity where id > 500").count());
+        }
+        finally {
+            if (ic != null)
+                ic.close(true);
+
+            sc.stop();
+        }
+    }
+
+    /** Finder. */
+    private static TcpDiscoveryVmIpFinder FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /**
+     * @param gridName Grid name.
+     * @param client Client.
+     */
+    private static IgniteConfiguration getConfiguration(String gridName, boolean client) throws Exception {
+        IgniteConfiguration cfg = new IgniteConfiguration();
+
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+        discoSpi.setIpFinder(FINDER);
+
+        cfg.setDiscoverySpi(discoSpi);
+
+        cfg.setCacheConfiguration(cacheConfiguration());
+
+        cfg.setClientMode(client);
+
+        cfg.setGridName(gridName);
+
+        return cfg;
+    }
+
+    /**
+     * Creates cache configuration.
+     */
+    private static CacheConfiguration<Object, Object> cacheConfiguration() {
+        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+        ccfg.setBackups(1);
+
+        ccfg.setName(PARTITIONED_CACHE_NAME);
+
+        ccfg.setIndexedTypes(String.class, Entity.class);
+
+        return ccfg;
+    }
+
+    /**
+     * Ignite configiration provider.
+     */
+    static class IgniteConfigProvider implements IgniteOutClosure<IgniteConfiguration> {
+        /** {@inheritDoc} */
+        @Override public IgniteConfiguration apply() {
+            try {
+                return getConfiguration("worker-" + gridNames.get(), false);
+            }
+            catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    /**
+     * @param <K>
+     * @param <V>
+     */
+    static class PairToValueFunction<K, V> implements Function<Tuple2<K, V>, V> {
+        /** {@inheritDoc} */
+        @Override public V call(Tuple2<K, V> t) throws Exception {
+            return t._2();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/a9d375e5/modules/spark/src/test/java/org/apache/ignite/spark/JavaIgniteRDDSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/java/org/apache/ignite/spark/JavaIgniteRDDSelfTest.java b/modules/spark/src/test/java/org/apache/ignite/spark/JavaIgniteRDDSelfTest.java
deleted file mode 100644
index becd90a..0000000
--- a/modules/spark/src/test/java/org/apache/ignite/spark/JavaIgniteRDDSelfTest.java
+++ /dev/null
@@ -1,302 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spark;
-
-import java.util.List;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCache;
-import org.apache.ignite.Ignition;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.lang.IgniteOutClosure;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.sql.Column;
-import org.apache.spark.sql.DataFrame;
-import org.apache.spark.sql.Row;
-import scala.Tuple2;
-
-/**
- * Tests for {@link JavaIgniteRDD}.
- */
-public class JavaIgniteRDDSelfTest extends GridCommonAbstractTest {
-    /** Grid count. */
-    private static final int GRID_CNT = 3;
-
-    /** Keys count. */
-    private static final int KEYS_CNT = 10000;
-
-    /** Cache name. */
-    private static final String PARTITIONED_CACHE_NAME = "partitioned";
-
-    /** Ip finder. */
-    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
-
-    /** Sum function. */
-    private static final Function2<Integer, Integer, Integer> SUM_F = new Function2<Integer, Integer, Integer>() {
-        public Integer call(Integer x, Integer y) {
-            return x + y;
-        }
-    };
-
-    /** To pair function. */
-    private static final PairFunction<Integer, String, String> TO_PAIR_F = new PairFunction<Integer, String, String>() {
-        /** {@inheritDoc} */
-        @Override public Tuple2<String, String> call(Integer i) {
-            return new Tuple2<>(String.valueOf(i), "val" + i);
-        }
-    };
-
-    /** (String, Integer); pair to Integer value function. */
-    private static final Function<Tuple2<String, Integer>, Integer> STR_INT_PAIR_TO_INT_F = new PairToValueFunction<>();
-
-    /** (String, Entity) pair to Entity value function. */
-    private static final Function<Tuple2<String, Entity>, Entity> STR_ENTITY_PAIR_TO_ENTITY_F =
-        new PairToValueFunction<>();
-
-    /** Integer to entity function. */
-    private static final PairFunction<Integer, String, Entity> INT_TO_ENTITY_F =
-        new PairFunction<Integer, String, Entity>() {
-            @Override public Tuple2<String, Entity> call(Integer i) throws Exception {
-                return new Tuple2<>(String.valueOf(i), new Entity(i, "name" + i, i * 100));
-            }
-        };
-
-    /** {@inheritDoc} */
-    @Override protected void beforeTest() throws Exception {
-        Ignition.ignite("grid-0").cache(PARTITIONED_CACHE_NAME).removeAll();
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTest() throws Exception {
-        Ignition.stop("client", false);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void beforeTestsStarted() throws Exception {
-        for (int i = 0; i < GRID_CNT; i++)
-            Ignition.start(getConfiguration("grid-" + i, false));
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTestsStopped() throws Exception {
-        for (int i = 0; i < GRID_CNT; i++)
-            Ignition.stop("grid-" + i, false);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testStoreDataToIgnite() throws Exception {
-        JavaSparkContext sc = new JavaSparkContext("local[*]", "test");
-
-        try {
-            JavaIgniteContext<String, String> ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider());
-
-            ic.fromCache(PARTITIONED_CACHE_NAME)
-                .savePairs(sc.parallelize(F.range(0, KEYS_CNT), 2).mapToPair(TO_PAIR_F));
-
-            Ignite ignite = Ignition.ignite("grid-0");
-
-            IgniteCache<String, String> cache = ignite.cache(PARTITIONED_CACHE_NAME);
-
-            for (int i = 0; i < KEYS_CNT; i++) {
-                String val = cache.get(String.valueOf(i));
-
-                assertNotNull("Value was not put to cache for key: " + i, val);
-                assertEquals("Invalid value stored for key: " + i, "val" + i, val);
-            }
-        }
-        finally {
-            sc.stop();
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testReadDataFromIgnite() throws Exception {
-        JavaSparkContext sc = new JavaSparkContext("local[*]", "test");
-
-        try {
-            JavaIgniteContext<String, Integer> ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider());
-
-            Ignite ignite = Ignition.ignite("grid-0");
-
-            IgniteCache<String, Integer> cache = ignite.cache(PARTITIONED_CACHE_NAME);
-
-            for (int i = 0; i < KEYS_CNT; i++)
-                cache.put(String.valueOf(i), i);
-
-            JavaRDD<Integer> values = ic.fromCache(PARTITIONED_CACHE_NAME).map(STR_INT_PAIR_TO_INT_F);
-
-            int sum = values.fold(0, SUM_F);
-
-            int expSum = (KEYS_CNT * KEYS_CNT + KEYS_CNT) / 2 - KEYS_CNT;
-
-            assertEquals(expSum, sum);
-        }
-        finally {
-            sc.stop();
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testQueryObjectsFromIgnite() throws Exception {
-        JavaSparkContext sc = new JavaSparkContext("local[*]", "test");
-
-        try {
-            JavaIgniteContext<String, Entity> ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider());
-
-            JavaIgniteRDD<String, Entity> cache = ic.fromCache(PARTITIONED_CACHE_NAME);
-
-            cache.savePairs(sc.parallelize(F.range(0, 1001), 2).mapToPair(INT_TO_ENTITY_F));
-
-            List<Entity> res = cache.objectSql("Entity", "name = ? and salary = ?", "name50", 5000)
-                .map(STR_ENTITY_PAIR_TO_ENTITY_F).collect();
-
-            assertEquals("Invalid result length", 1, res.size());
-            assertEquals("Invalid result", 50, res.get(0).id());
-            assertEquals("Invalid result", "name50", res.get(0).name());
-            assertEquals("Invalid result", 5000, res.get(0).salary());
-            assertEquals("Invalid count", 500, cache.objectSql("Entity", "id > 500").count());
-        }
-        finally {
-            sc.stop();
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testQueryFieldsFromIgnite() throws Exception {
-        JavaSparkContext sc = new JavaSparkContext("local[*]", "test");
-
-        try {
-            JavaIgniteContext<String, Entity> ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider());
-
-            JavaIgniteRDD<String, Entity> cache = ic.fromCache(PARTITIONED_CACHE_NAME);
-
-            cache.savePairs(sc.parallelize(F.range(0, 1001), 2).mapToPair(INT_TO_ENTITY_F));
-
-            DataFrame df =
-                cache.sql("select id, name, salary from Entity where name = ? and salary = ?", "name50", 5000);
-
-            df.printSchema();
-
-            Row[] res = df.collect();
-
-            assertEquals("Invalid result length", 1, res.length);
-            assertEquals("Invalid result", 50, res[0].get(0));
-            assertEquals("Invalid result", "name50", res[0].get(1));
-            assertEquals("Invalid result", 5000, res[0].get(2));
-
-            Column exp = new Column("NAME").equalTo("name50").and(new Column("SALARY").equalTo(5000));
-
-            DataFrame df0 = cache.sql("select id, name, salary from Entity").where(exp);
-
-            df.printSchema();
-
-            Row[] res0 = df0.collect();
-
-            assertEquals("Invalid result length", 1, res0.length);
-            assertEquals("Invalid result", 50, res0[0].get(0));
-            assertEquals("Invalid result", "name50", res0[0].get(1));
-            assertEquals("Invalid result", 5000, res0[0].get(2));
-
-            assertEquals("Invalid count", 500, cache.sql("select id from Entity where id > 500").count());
-        }
-        finally {
-            sc.stop();
-        }
-
-    }
-
-    /**
-     * @param gridName Grid name.
-     * @param client Client.
-     */
-    private static IgniteConfiguration getConfiguration(String gridName, boolean client) throws Exception {
-        IgniteConfiguration cfg = new IgniteConfiguration();
-
-        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
-
-        discoSpi.setIpFinder(IP_FINDER);
-
-        cfg.setDiscoverySpi(discoSpi);
-
-        cfg.setCacheConfiguration(cacheConfiguration());
-
-        cfg.setClientMode(client);
-
-        cfg.setGridName(gridName);
-
-        return cfg;
-    }
-
-    /**
-     * Creates cache configuration.
-     */
-    private static CacheConfiguration<Object, Object> cacheConfiguration() {
-        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
-
-        ccfg.setBackups(1);
-
-        ccfg.setName(PARTITIONED_CACHE_NAME);
-
-        ccfg.setIndexedTypes(String.class, Entity.class);
-
-        return ccfg;
-    }
-
-    /**
-     * Ignite configiration provider.
-     */
-    static class IgniteConfigProvider implements IgniteOutClosure<IgniteConfiguration> {
-        /** {@inheritDoc} */
-        @Override public IgniteConfiguration apply() {
-            try {
-                return getConfiguration("client", true);
-            }
-            catch (Exception e) {
-                throw new RuntimeException(e);
-            }
-        }
-    }
-
-    /**
-     * @param <K>
-     * @param <V>
-     */
-    static class PairToValueFunction<K, V> implements Function<Tuple2<K, V>, V> {
-        /** {@inheritDoc} */
-        @Override public V call(Tuple2<K, V> t) throws Exception {
-            return t._2();
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/a9d375e5/modules/spark/src/test/java/org/apache/ignite/spark/JavaStandaloneIgniteRDDSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/java/org/apache/ignite/spark/JavaStandaloneIgniteRDDSelfTest.java b/modules/spark/src/test/java/org/apache/ignite/spark/JavaStandaloneIgniteRDDSelfTest.java
new file mode 100644
index 0000000..faa8fda
--- /dev/null
+++ b/modules/spark/src/test/java/org/apache/ignite/spark/JavaStandaloneIgniteRDDSelfTest.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark;
+
+import java.util.List;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgniteOutClosure;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.Row;
+import scala.Tuple2;
+
+/**
+ * Tests for {@link JavaIgniteRDD} (standalone mode).
+ */
+public class JavaStandaloneIgniteRDDSelfTest extends GridCommonAbstractTest {
+    /** Grid count. */
+    private static final int GRID_CNT = 3;
+
+    /** Keys count. */
+    private static final int KEYS_CNT = 10000;
+
+    /** Cache name. */
+    private static final String PARTITIONED_CACHE_NAME = "partitioned";
+
+    /** Ip finder. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Sum function. */
+    private static final Function2<Integer, Integer, Integer> SUM_F = new Function2<Integer, Integer, Integer>() {
+        public Integer call(Integer x, Integer y) {
+            return x + y;
+        }
+    };
+
+    /** To pair function. */
+    private static final PairFunction<Integer, String, String> TO_PAIR_F = new PairFunction<Integer, String, String>() {
+        /** {@inheritDoc} */
+        @Override public Tuple2<String, String> call(Integer i) {
+            return new Tuple2<>(String.valueOf(i), "val" + i);
+        }
+    };
+
+    /** (String, Integer); pair to Integer value function. */
+    private static final Function<Tuple2<String, Integer>, Integer> STR_INT_PAIR_TO_INT_F = new PairToValueFunction<>();
+
+    /** (String, Entity) pair to Entity value function. */
+    private static final Function<Tuple2<String, Entity>, Entity> STR_ENTITY_PAIR_TO_ENTITY_F =
+        new PairToValueFunction<>();
+
+    /** Integer to entity function. */
+    private static final PairFunction<Integer, String, Entity> INT_TO_ENTITY_F =
+        new PairFunction<Integer, String, Entity>() {
+            @Override public Tuple2<String, Entity> call(Integer i) throws Exception {
+                return new Tuple2<>(String.valueOf(i), new Entity(i, "name" + i, i * 100));
+            }
+        };
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        Ignition.ignite("grid-0").cache(PARTITIONED_CACHE_NAME).removeAll();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        Ignition.stop("client", false);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        for (int i = 0; i < GRID_CNT; i++)
+            Ignition.start(getConfiguration("grid-" + i, false));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        for (int i = 0; i < GRID_CNT; i++)
+            Ignition.stop("grid-" + i, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStoreDataToIgnite() throws Exception {
+        JavaSparkContext sc = new JavaSparkContext("local[*]", "test");
+
+        try {
+            JavaIgniteContext<String, String> ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider());
+
+            ic.fromCache(PARTITIONED_CACHE_NAME)
+                .savePairs(sc.parallelize(F.range(0, KEYS_CNT), 2).mapToPair(TO_PAIR_F));
+
+            Ignite ignite = Ignition.ignite("grid-0");
+
+            IgniteCache<String, String> cache = ignite.cache(PARTITIONED_CACHE_NAME);
+
+            for (int i = 0; i < KEYS_CNT; i++) {
+                String val = cache.get(String.valueOf(i));
+
+                assertNotNull("Value was not put to cache for key: " + i, val);
+                assertEquals("Invalid value stored for key: " + i, "val" + i, val);
+            }
+        }
+        finally {
+            sc.stop();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReadDataFromIgnite() throws Exception {
+        JavaSparkContext sc = new JavaSparkContext("local[*]", "test");
+
+        try {
+            JavaIgniteContext<String, Integer> ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider());
+
+            Ignite ignite = Ignition.ignite("grid-0");
+
+            IgniteCache<String, Integer> cache = ignite.cache(PARTITIONED_CACHE_NAME);
+
+            for (int i = 0; i < KEYS_CNT; i++)
+                cache.put(String.valueOf(i), i);
+
+            JavaRDD<Integer> values = ic.fromCache(PARTITIONED_CACHE_NAME).map(STR_INT_PAIR_TO_INT_F);
+
+            int sum = values.fold(0, SUM_F);
+
+            int expSum = (KEYS_CNT * KEYS_CNT + KEYS_CNT) / 2 - KEYS_CNT;
+
+            assertEquals(expSum, sum);
+        }
+        finally {
+            sc.stop();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testQueryObjectsFromIgnite() throws Exception {
+        JavaSparkContext sc = new JavaSparkContext("local[*]", "test");
+
+        try {
+            JavaIgniteContext<String, Entity> ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider());
+
+            JavaIgniteRDD<String, Entity> cache = ic.fromCache(PARTITIONED_CACHE_NAME);
+
+            cache.savePairs(sc.parallelize(F.range(0, 1001), 2).mapToPair(INT_TO_ENTITY_F));
+
+            List<Entity> res = cache.objectSql("Entity", "name = ? and salary = ?", "name50", 5000)
+                .map(STR_ENTITY_PAIR_TO_ENTITY_F).collect();
+
+            assertEquals("Invalid result length", 1, res.size());
+            assertEquals("Invalid result", 50, res.get(0).id());
+            assertEquals("Invalid result", "name50", res.get(0).name());
+            assertEquals("Invalid result", 5000, res.get(0).salary());
+            assertEquals("Invalid count", 500, cache.objectSql("Entity", "id > 500").count());
+        }
+        finally {
+            sc.stop();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testQueryFieldsFromIgnite() throws Exception {
+        JavaSparkContext sc = new JavaSparkContext("local[*]", "test");
+
+        try {
+            JavaIgniteContext<String, Entity> ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider());
+
+            JavaIgniteRDD<String, Entity> cache = ic.fromCache(PARTITIONED_CACHE_NAME);
+
+            cache.savePairs(sc.parallelize(F.range(0, 1001), 2).mapToPair(INT_TO_ENTITY_F));
+
+            DataFrame df =
+                cache.sql("select id, name, salary from Entity where name = ? and salary = ?", "name50", 5000);
+
+            df.printSchema();
+
+            Row[] res = df.collect();
+
+            assertEquals("Invalid result length", 1, res.length);
+            assertEquals("Invalid result", 50, res[0].get(0));
+            assertEquals("Invalid result", "name50", res[0].get(1));
+            assertEquals("Invalid result", 5000, res[0].get(2));
+
+            Column exp = new Column("NAME").equalTo("name50").and(new Column("SALARY").equalTo(5000));
+
+            DataFrame df0 = cache.sql("select id, name, salary from Entity").where(exp);
+
+            df.printSchema();
+
+            Row[] res0 = df0.collect();
+
+            assertEquals("Invalid result length", 1, res0.length);
+            assertEquals("Invalid result", 50, res0[0].get(0));
+            assertEquals("Invalid result", "name50", res0[0].get(1));
+            assertEquals("Invalid result", 5000, res0[0].get(2));
+
+            assertEquals("Invalid count", 500, cache.sql("select id from Entity where id > 500").count());
+        }
+        finally {
+            sc.stop();
+        }
+
+    }
+
+    /**
+     * @param gridName Grid name.
+     * @param client Client.
+     */
+    private static IgniteConfiguration getConfiguration(String gridName, boolean client) throws Exception {
+        IgniteConfiguration cfg = new IgniteConfiguration();
+
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+        discoSpi.setIpFinder(IP_FINDER);
+
+        cfg.setDiscoverySpi(discoSpi);
+
+        cfg.setCacheConfiguration(cacheConfiguration());
+
+        cfg.setClientMode(client);
+
+        cfg.setGridName(gridName);
+
+        return cfg;
+    }
+
+    /**
+     * Creates cache configuration.
+     */
+    private static CacheConfiguration<Object, Object> cacheConfiguration() {
+        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+        ccfg.setBackups(1);
+
+        ccfg.setName(PARTITIONED_CACHE_NAME);
+
+        ccfg.setIndexedTypes(String.class, Entity.class);
+
+        return ccfg;
+    }
+
+    /**
+     * Ignite configiration provider.
+     */
+    static class IgniteConfigProvider implements IgniteOutClosure<IgniteConfiguration> {
+        /** {@inheritDoc} */
+        @Override public IgniteConfiguration apply() {
+            try {
+                return getConfiguration("client", true);
+            }
+            catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    /**
+     * @param <K>
+     * @param <V>
+     */
+    static class PairToValueFunction<K, V> implements Function<Tuple2<K, V>, V> {
+        /** {@inheritDoc} */
+        @Override public V call(Tuple2<K, V> t) throws Exception {
+            return t._2();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/a9d375e5/modules/spark/src/test/java/org/apache/ignite/testsuites/IgniteRDDTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/java/org/apache/ignite/testsuites/IgniteRDDTestSuite.java b/modules/spark/src/test/java/org/apache/ignite/testsuites/IgniteRDDTestSuite.java
new file mode 100644
index 0000000..a4177f0
--- /dev/null
+++ b/modules/spark/src/test/java/org/apache/ignite/testsuites/IgniteRDDTestSuite.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.testsuites;
+
+import junit.framework.TestSuite;
+import org.apache.ignite.spark.JavaEmbeddedIgniteRDDSelfTest;
+import org.apache.ignite.spark.JavaStandaloneIgniteRDDSelfTest;
+
+/**
+ * Test suit for Ignite RDD
+ */
+public class IgniteRDDTestSuite extends TestSuite {
+    /**
+     * @return Java Ignite RDD test suit.
+     * @throws Exception If failed.
+     */
+    public static TestSuite suite() throws Exception {
+        TestSuite suite = new TestSuite("Java Ignite RDD tests (standalone and embedded modes");
+
+        suite.addTest(new TestSuite(JavaEmbeddedIgniteRDDSelfTest.class));
+        suite.addTest(new TestSuite(JavaStandaloneIgniteRDDSelfTest.class));
+
+        return suite;
+    }
+}
\ No newline at end of file


Mime
View raw message