ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [17/50] incubator-ignite git commit: IGNITE-389 - IPC checked and API improvements.
Date Thu, 11 Jun 2015 07:20:41 GMT
IGNITE-389 - IPC checked and API improvements.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/6b51f99e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/6b51f99e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/6b51f99e

Branch: refs/heads/ignite-998
Commit: 6b51f99e72eb11af25403f8ec50087c03b1f1fb7
Parents: 1d8643c
Author: Alexey Goncharuk <agoncharuk@gridgain.com>
Authored: Thu Jun 4 19:19:36 2015 -0700
Committer: Alexey Goncharuk <agoncharuk@gridgain.com>
Committed: Thu Jun 4 19:19:36 2015 -0700

----------------------------------------------------------------------
 .../ignite/internal/util/IgniteUtils.java       |   4 +-
 .../shmem/IpcSharedMemoryClientEndpoint.java    |   2 +-
 .../ipc/shmem/IpcSharedMemoryNativeLoader.java  | 150 +++++++++++++++++--
 .../shmem/IpcSharedMemoryServerEndpoint.java    |   2 +-
 .../IpcSharedMemoryCrashDetectionSelfTest.java  |   2 +-
 .../ipc/shmem/IpcSharedMemorySpaceSelfTest.java |   2 +-
 .../ipc/shmem/IpcSharedMemoryUtilsSelfTest.java |   2 +-
 .../LoadWithCorruptedLibFileTestRunner.java     |   2 +-
 .../IpcSharedMemoryBenchmarkReader.java         |   2 +-
 .../IpcSharedMemoryBenchmarkWriter.java         |   2 +-
 .../hadoop/HadoopAbstractSelfTest.java          |   1 +
 .../org/apache/ignite/spark/IgniteContext.scala |  19 ++-
 .../org/apache/ignite/spark/IgniteRDD.scala     |   8 +-
 13 files changed, 171 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b51f99e/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 0932212..9016b10 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -9025,11 +9025,11 @@ public abstract class IgniteUtils {
                 hasShmem = false;
             else {
                 try {
-                    IpcSharedMemoryNativeLoader.load();
+                    IpcSharedMemoryNativeLoader.load(null);
 
                     hasShmem = true;
                 }
-                catch (IgniteCheckedException e) {
+                catch (IgniteCheckedException ignore) {
                     hasShmem = false;
                 }
             }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b51f99e/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryClientEndpoint.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryClientEndpoint.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryClientEndpoint.java
index 27a234f..c935c4a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryClientEndpoint.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryClientEndpoint.java
@@ -112,7 +112,7 @@ public class IpcSharedMemoryClientEndpoint implements IpcEndpoint {
         boolean clear = true;
 
         try {
-            IpcSharedMemoryNativeLoader.load();
+            IpcSharedMemoryNativeLoader.load(log);
 
             sock.connect(new InetSocketAddress("127.0.0.1", port), timeout);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b51f99e/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryNativeLoader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryNativeLoader.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryNativeLoader.java
index dc00ca6..8c345f8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryNativeLoader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryNativeLoader.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.util.ipc.shmem;
 
 import org.apache.ignite.*;
+import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 
 import java.io.*;
@@ -25,6 +26,8 @@ import java.net.*;
 import java.nio.channels.*;
 import java.security.*;
 import java.util.*;
+import java.util.jar.*;
+import java.util.zip.*;
 
 import static org.apache.ignite.internal.IgniteVersionUtils.*;
 
@@ -36,6 +39,9 @@ public class IpcSharedMemoryNativeLoader {
     /** Library name base. */
     private static final String LIB_NAME_BASE = "igniteshmem";
 
+    /** Library jar name base. */
+    private static final String JAR_NAME_BASE = "shmem";
+
     /** Library name. */
     static final String LIB_NAME = LIB_NAME_BASE + "-" + VER_STR;
 
@@ -84,9 +90,10 @@ public class IpcSharedMemoryNativeLoader {
     }
 
     /**
+     * @param log Logger, if available. If null, warnings will be printed out to console.
      * @throws IgniteCheckedException If failed.
      */
-    public static void load() throws IgniteCheckedException {
+    public static void load(IgniteLogger log) throws IgniteCheckedException {
         if (loaded)
             return;
 
@@ -94,7 +101,7 @@ public class IpcSharedMemoryNativeLoader {
             if (loaded)
                 return;
 
-            doLoad();
+            doLoad(log);
 
             loaded = true;
         }
@@ -103,7 +110,7 @@ public class IpcSharedMemoryNativeLoader {
     /**
      * @throws IgniteCheckedException If failed.
      */
-    private static void doLoad() throws IgniteCheckedException {
+    private static void doLoad(IgniteLogger log) throws IgniteCheckedException {
         assert Thread.holdsLock(IpcSharedMemoryNativeLoader.class);
 
         Collection<Throwable> errs = new ArrayList<>();
@@ -124,7 +131,7 @@ public class IpcSharedMemoryNativeLoader {
 
         // Obtain lock on file to prevent concurrent extracts.
         try (RandomAccessFile randomAccessFile = new RandomAccessFile(lockFile, "rws");
-             FileLock ignored = randomAccessFile.getChannel().lock()) {
+            FileLock ignored = randomAccessFile.getChannel().lock()) {
             if (extractAndLoad(errs, tmpDir, platformSpecificResourcePath()))
                 return;
 
@@ -134,6 +141,30 @@ public class IpcSharedMemoryNativeLoader {
             if (extractAndLoad(errs, tmpDir, resourcePath()))
                 return;
 
+            try {
+                U.quietAndWarn(log, "Failed to load 'igniteshmem' library from classpath.
Will try to load it from IGNITE_HOME.");
+
+                String igniteHome = X.resolveIgniteHome();
+
+                File shmemJar = findShmemJar(errs, igniteHome);
+
+                if (shmemJar != null) {
+                    try (JarFile jar = new JarFile(shmemJar, false, JarFile.OPEN_READ)) {
+                        if (extractAndLoad(errs, jar, tmpDir, platformSpecificResourcePath()))
+                            return;
+
+                        if (extractAndLoad(errs, jar, tmpDir, osSpecificResourcePath()))
+                            return;
+
+                        if (extractAndLoad(errs, jar, tmpDir, resourcePath()))
+                            return;
+                    }
+                }
+            }
+            catch (IgniteCheckedException ignore) {
+
+            }
+
             // Failed to find the library.
             assert !errs.isEmpty();
 
@@ -145,6 +176,32 @@ public class IpcSharedMemoryNativeLoader {
     }
 
     /**
+     * Tries to find shmem jar in IGNITE_HOME/libs folder.
+     *
+     * @param errs Collection of errors to add readable exception to.
+     * @param igniteHome Resolver IGNITE_HOME variable.
+     * @return File, if found.
+     */
+    private static File findShmemJar(Collection<Throwable> errs, String igniteHome)
{
+        File libs = new File(igniteHome, "libs");
+
+        if (!libs.exists() || libs.isFile()) {
+            errs.add(new IllegalStateException("Failed to find libs folder in resolved IGNITE_HOME:
" + igniteHome));
+
+            return null;
+        }
+
+        for (File lib : libs.listFiles()) {
+            if (lib.getName().endsWith(".jar") && lib.getName().contains(JAR_NAME_BASE))
+                return lib;
+        }
+
+        errs.add(new IllegalStateException("Failed to find shmem jar in resolved IGNITE_HOME:
" + igniteHome));
+
+        return null;
+    }
+
+    /**
      * Gets temporary directory unique for each OS user.
      * The directory guaranteed to exist, though may not be empty.
      */
@@ -220,6 +277,24 @@ public class IpcSharedMemoryNativeLoader {
 
     /**
      * @param errs Errors collection.
+     * @param rsrcPath Path.
+     * @return {@code True} if library was found and loaded.
+     */
+    private static boolean extractAndLoad(Collection<Throwable> errs, JarFile jar,
File tmpDir, String rsrcPath) {
+        ZipEntry rsrc = jar.getEntry(rsrcPath);
+
+        if (rsrc != null)
+            return extract(errs, rsrc, jar, new File(tmpDir, mapLibraryName(LIB_NAME)));
+        else {
+            errs.add(new IllegalStateException("Failed to find resource within specified
jar file " +
+                "[rsrc=" + rsrcPath + ", jar=" + jar.getName() + ']'));
+
+            return false;
+        }
+    }
+
+    /**
+     * @param errs Errors collection.
      * @param src Source.
      * @param target Target.
      * @return {@code True} if resource was found and loaded.
@@ -230,7 +305,7 @@ public class IpcSharedMemoryNativeLoader {
         InputStream is = null;
 
         try {
-            if (!target.exists() || !haveEqualMD5(target, src)) {
+            if (!target.exists() || !haveEqualMD5(target, src.openStream())) {
                 is = src.openStream();
 
                 if (is != null) {
@@ -265,20 +340,69 @@ public class IpcSharedMemoryNativeLoader {
     }
 
     /**
-     * @param target Target.
+     * @param errs Errors collection.
      * @param src Source.
+     * @param target Target.
+     * @return {@code True} if resource was found and loaded.
+     */
+    @SuppressWarnings("ResultOfMethodCallIgnored")
+    private static boolean extract(Collection<Throwable> errs, ZipEntry src, JarFile
jar, File target) {
+        FileOutputStream os = null;
+        InputStream is = null;
+
+        try {
+            if (!target.exists() || !haveEqualMD5(target, jar.getInputStream(src))) {
+                is = jar.getInputStream(src);
+
+                if (is != null) {
+                    os = new FileOutputStream(target);
+
+                    int read;
+
+                    byte[] buf = new byte[4096];
+
+                    while ((read = is.read(buf)) != -1)
+                        os.write(buf, 0, read);
+                }
+            }
+
+            // chmod 775.
+            if (!U.isWindows())
+                Runtime.getRuntime().exec(new String[] {"chmod", "775", target.getCanonicalPath()}).waitFor();
+
+            System.load(target.getPath());
+
+            return true;
+        }
+        catch (IOException | UnsatisfiedLinkError | InterruptedException | NoSuchAlgorithmException
e) {
+            errs.add(e);
+        }
+        finally {
+            U.closeQuiet(os);
+            U.closeQuiet(is);
+        }
+
+        return false;
+    }
+
+    /**
+     * @param target Target.
+     * @param srcIS Source input stream.
      * @return {@code True} if target md5-sum equal to source md5-sum.
      * @throws NoSuchAlgorithmException If md5 algorithm was not found.
      * @throws IOException If an I/O exception occurs.
      */
-    private static boolean haveEqualMD5(File target, URL src) throws NoSuchAlgorithmException,
IOException {
-        try (InputStream targetIS = new FileInputStream(target);
-             InputStream srcIS = src.openStream()) {
-
-            String targetMD5 = U.calculateMD5(targetIS);
-            String srcMD5 = U.calculateMD5(srcIS);
+    private static boolean haveEqualMD5(File target, InputStream srcIS) throws NoSuchAlgorithmException,
IOException {
+        try {
+            try (InputStream targetIS = new FileInputStream(target)) {
+                String targetMD5 = U.calculateMD5(targetIS);
+                String srcMD5 = U.calculateMD5(srcIS);
 
-            return targetMD5.equals(srcMD5);
+                return targetMD5.equals(srcMD5);
+            }
+        }
+        finally {
+            srcIS.close();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b51f99e/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java
index 5185856..102c5b1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryServerEndpoint.java
@@ -146,7 +146,7 @@ public class IpcSharedMemoryServerEndpoint implements IpcServerEndpoint
{
 
     /** {@inheritDoc} */
     @Override public void start() throws IgniteCheckedException {
-        IpcSharedMemoryNativeLoader.load();
+        IpcSharedMemoryNativeLoader.load(log);
 
         pid = IpcSharedMemoryUtils.pid();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b51f99e/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryCrashDetectionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryCrashDetectionSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryCrashDetectionSelfTest.java
index 2ddf6f3..c6f590e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryCrashDetectionSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryCrashDetectionSelfTest.java
@@ -42,7 +42,7 @@ public class IpcSharedMemoryCrashDetectionSelfTest extends GridCommonAbstractTes
     @Override protected void beforeTestsStarted() throws Exception {
         super.beforeTestsStarted();
 
-        IpcSharedMemoryNativeLoader.load();
+        IpcSharedMemoryNativeLoader.load(log());
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b51f99e/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemorySpaceSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemorySpaceSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemorySpaceSelfTest.java
index 7dc0870..4afb64b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemorySpaceSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemorySpaceSelfTest.java
@@ -51,7 +51,7 @@ public class IpcSharedMemorySpaceSelfTest extends GridCommonAbstractTest
{
     @Override protected void beforeTestsStarted() throws Exception {
         super.beforeTestsStarted();
 
-        IpcSharedMemoryNativeLoader.load();
+        IpcSharedMemoryNativeLoader.load(log());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b51f99e/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryUtilsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryUtilsSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryUtilsSelfTest.java
index 4c5413c..176429e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryUtilsSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryUtilsSelfTest.java
@@ -31,7 +31,7 @@ public class IpcSharedMemoryUtilsSelfTest extends GridCommonAbstractTest
{
     @Override protected void beforeTestsStarted() throws Exception {
         super.beforeTestsStarted();
 
-        IpcSharedMemoryNativeLoader.load();
+        IpcSharedMemoryNativeLoader.load(log());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b51f99e/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/LoadWithCorruptedLibFileTestRunner.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/LoadWithCorruptedLibFileTestRunner.java
b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/LoadWithCorruptedLibFileTestRunner.java
index 8ff827b..8fee239 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/LoadWithCorruptedLibFileTestRunner.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/LoadWithCorruptedLibFileTestRunner.java
@@ -37,7 +37,7 @@ public class LoadWithCorruptedLibFileTestRunner {
 
         createCorruptedLibFile();
 
-        IpcSharedMemoryNativeLoader.load();
+        IpcSharedMemoryNativeLoader.load(null);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b51f99e/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkReader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkReader.java
b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkReader.java
index 28495af..89eeda1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkReader.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkReader.java
@@ -43,7 +43,7 @@ public class IpcSharedMemoryBenchmarkReader implements IpcSharedMemoryBenchmarkP
      * @throws IgniteCheckedException If failed.
      */
     public static void main(String[] args) throws IgniteCheckedException {
-        IpcSharedMemoryNativeLoader.load();
+        IpcSharedMemoryNativeLoader.load(null);
 
         int nThreads = (args.length > 0 ? Integer.parseInt(args[0]) : 1);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b51f99e/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkWriter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkWriter.java
b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkWriter.java
index 2ade145..e8a8402 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkWriter.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/shmem/benchmark/IpcSharedMemoryBenchmarkWriter.java
@@ -42,7 +42,7 @@ public class IpcSharedMemoryBenchmarkWriter implements IpcSharedMemoryBenchmarkP
      * @throws IgniteCheckedException If failed.
      */
     public static void main(String[] args) throws IgniteCheckedException {
-        IpcSharedMemoryNativeLoader.load();
+        IpcSharedMemoryNativeLoader.load(null);
 
         int nThreads = args.length > 0 ? Integer.parseInt(args[0]) : 1;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b51f99e/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java
b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java
index 517a587..a3c9bde 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java
@@ -22,6 +22,7 @@ import org.apache.ignite.configuration.*;
 import org.apache.ignite.igfs.*;
 import org.apache.ignite.hadoop.fs.v2.IgniteHadoopFileSystem;
 import org.apache.ignite.internal.processors.hadoop.fs.*;
+import org.apache.ignite.spi.communication.tcp.*;
 import org.apache.ignite.spi.discovery.tcp.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b51f99e/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
index 5cdbad0..2cfebd6 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteContext.scala
@@ -34,8 +34,23 @@ import org.apache.spark.sql.SQLContext
  */
 class IgniteContext[K, V](
     @scala.transient val sparkContext: SparkContext,
-    cfgF: () ⇒ IgniteConfiguration
+    cfgF: () ⇒ IgniteConfiguration,
+    client: Boolean = true
 ) extends Serializable {
+    @scala.transient private val driver = true
+
+    if (!client) {
+        val workers = sparkContext.getExecutorStorageStatus.length - 1
+
+        if (workers <= 0)
+            throw new IllegalStateException("No Spark executors found to start Ignite nodes.")
+
+        println("Will start Ignite nodes on " + workers + " workers")
+
+        // Start ignite server node on each worker in server mode.
+        sparkContext.parallelize(1 to workers, workers).foreach(it ⇒ ignite())
+    }
+
     def this(
         sc: SparkContext,
         springUrl: String
@@ -62,7 +77,7 @@ class IgniteContext[K, V](
         catch {
             case e: Exception ⇒
                 try {
-                    igniteCfg.setClientMode(true)
+                    igniteCfg.setClientMode(client || driver)
 
                     Ignition.start(igniteCfg)
                 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b51f99e/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
index 0b8e845..0d1a3be 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
@@ -114,7 +114,7 @@ class IgniteRDD[K, V] (
         ic.sqlContext.createDataFrame(rowRdd, schema)
     }
 
-    def saveValues(rdd: RDD[V]) = {
+    def saveValues(rdd: RDD[V], overwrite: Boolean = false) = {
         rdd.foreachPartition(it ⇒ {
             val ig = ic.ignite()
 
@@ -127,6 +127,8 @@ class IgniteRDD[K, V] (
             val streamer = ig.dataStreamer[Object, V](cacheName)
 
             try {
+                streamer.allowOverwrite(overwrite)
+
                 it.foreach(value ⇒ {
                     val key = affinityKeyFunc(value, node.orNull)
 
@@ -139,7 +141,7 @@ class IgniteRDD[K, V] (
         })
     }
 
-    def savePairs(rdd: RDD[(K, V)]) = {
+    def savePairs(rdd: RDD[(K, V)], overwrite: Boolean = false) = {
         rdd.foreachPartition(it ⇒ {
             val ig = ic.ignite()
 
@@ -149,6 +151,8 @@ class IgniteRDD[K, V] (
             val streamer = ig.dataStreamer[K, V](cacheName)
 
             try {
+                streamer.allowOverwrite(overwrite)
+
                 it.foreach(tup ⇒ {
                     streamer.addData(tup._1, tup._2)
                 })


Mime
View raw message