ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [3/6] incubator-ignite git commit: # IGNITE-465: WIP.
Date Thu, 12 Mar 2015 14:29:52 GMT
# IGNITE-465: WIP.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/16863f2c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/16863f2c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/16863f2c

Branch: refs/heads/ignite-465
Commit: 16863f2c09e71867ae64ab0216a65e34014874ec
Parents: 91e60c5
Author: vozerov-gridgain <vozerov@gridgain.com>
Authored: Thu Mar 12 16:53:52 2015 +0300
Committer: vozerov-gridgain <vozerov@gridgain.com>
Committed: Thu Mar 12 16:53:52 2015 +0300

----------------------------------------------------------------------
 .../configuration/FileSystemConfiguration.java  |  55 ++----
 .../igfs/IgfsIpcEndpointConfiguration.java      | 190 +++++++++++++++++--
 .../internal/processors/igfs/IgfsServer.java    |  49 ++++-
 .../processors/igfs/IgfsServerManager.java      |  35 ++--
 .../internal/util/ipc/IpcEndpointFactory.java   |   4 +-
 .../util/ipc/IpcServerEndpointDeserializer.java |  66 -------
 .../util/ipc/IpcServerEndpointFactory.java      |  72 +++++++
 .../IpcServerEndpointDeserializerSelfTest.java  | 160 ----------------
 8 files changed, 331 insertions(+), 300 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/16863f2c/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java
b/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java
index f679fc0..5793df1 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java
@@ -34,9 +34,6 @@ public class FileSystemConfiguration {
     /** Default file system user name. */
     public static final String DFLT_USER_NAME = System.getProperty("user.name", "anonymous");
 
-    /** Default IPC port. */
-    public static final int DFLT_IPC_PORT = 10500;
-
     /** Default fragmentizer throttling block length. */
     public static final long DFLT_FRAGMENTIZER_THROTTLING_BLOCK_LENGTH = 16 * 1024 * 1024;
 
@@ -109,8 +106,8 @@ public class FileSystemConfiguration {
     /** Per node parallel operations. */
     private int perNodeParallelBatchCnt = DFLT_PER_NODE_PARALLEL_BATCH_CNT;
 
-    /** IPC endpoint properties to publish IGFS over. */
-    private Map<String, String> ipcEndpointCfg;
+    /** IPC endpoint configuration. */
+    private IgfsIpcEndpointConfiguration ipcEndpointCfg;
 
     /** IPC endpoint enabled flag. */
     private boolean ipcEndpointEnabled = DFLT_IPC_ENDPOINT_ENABLED;
@@ -401,52 +398,35 @@ public class FileSystemConfiguration {
     }
 
     /**
-     * Gets map of IPC endpoint configuration properties. There are 2 different
-     * types of endpoint supported: {@code shared-memory}, and {@code TCP}.
-     * <p>
-     * The following configuration properties are supported for {@code shared-memory}
-     * endpoint:
-     * <ul>
-     *     <li>{@code type} - value is {@code shmem} to specify {@code shared-memory}
approach.</li>
-     *     <li>{@code port} - endpoint port.</li>
-     *     <li>{@code size} - memory size allocated for single endpoint communication.</li>
-     *     <li>
-     *         {@code tokenDirectoryPath} - path, either absolute or relative to {@code IGNITE_HOME}
to
-     *         store shared memory tokens.
-     *     </li>
-     * </ul>
-     * <p>
-     * The following configuration properties are supported for {@code TCP} approach:
-     * <ul>
-     *     <li>{@code type} - value is {@code tcp} to specify {@code TCP} approach.</li>
-     *     <li>{@code port} - endpoint bind port.</li>
-     *     <li>
-     *         {@code host} - endpoint bind host. If omitted '127.0.0.1' will be used.
-     *     </li>
-     * </ul>
+     * Gets IPC endpoint configuration.
      * <p>
-     * Note that {@code shared-memory} approach is not supported on Windows environments.
-     * In case IGFS is failed to bind to particular port, further attempts will be performed
every 3 seconds.
+     * Endpoint is needed for communication between IGFS and {@code IgniteHadoopFileSystem}
shipped with <b>Ignite
+     * Hadoop Accelerator</b>.
      *
-     * @return Map of IPC endpoint configuration properties. In case the value is not set,
defaults will be used. Default
-     * type for Windows is "tcp", for all other platforms - "shmem". Default port is {@link
#DFLT_IPC_PORT}.
+     * @return IPC endpoint configuration.
      */
-    @Nullable public Map<String,String> getIpcEndpointConfiguration() {
+    @Nullable public IgfsIpcEndpointConfiguration getIpcEndpointConfiguration() {
         return ipcEndpointCfg;
     }
 
     /**
-     * Sets IPC endpoint configuration to publish IGFS over.
+     * Sets IPC endpoint configuration.
+     * <p>
+     * Endpoint is needed for communication between IGFS and {@code IgniteHadoopFileSystem}
shipped with <b>Ignite
+     * Hadoop Accelerator</b>.
      *
-     * @param ipcEndpointCfg Map of IPC endpoint config properties.
+     * @param ipcEndpointCfg IPC endpoint configuration.
      */
-    public void setIpcEndpointConfiguration(@Nullable Map<String,String> ipcEndpointCfg)
{
+    public void setIpcEndpointConfiguration(@Nullable IgfsIpcEndpointConfiguration ipcEndpointCfg)
{
         this.ipcEndpointCfg = ipcEndpointCfg;
     }
 
     /**
      * Get IPC endpoint enabled flag. In case it is set to {@code true} endpoint will be
created and bound to specific
      * port. Otherwise endpoint will not be created. Default value is {@link #DFLT_IPC_ENDPOINT_ENABLED}.
+     * <p>
+     * Endpoint is needed for communication between IGFS and {@code IgniteHadoopFileSystem}
shipped with <b>Ignite
+     * Hadoop Accelerator</b>.
      *
      * @return {@code True} in case endpoint is enabled.
      */
@@ -456,6 +436,9 @@ public class FileSystemConfiguration {
 
     /**
      * Set IPC endpoint enabled flag. See {@link #isIpcEndpointEnabled()}.
+     * <p>
+     * Endpoint is needed for communication between IGFS and {@code IgniteHadoopFileSystem}
shipped with <b>Ignite
+     * Hadoop Accelerator</b>.
      *
      * @param ipcEndpointEnabled IPC endpoint enabled flag.
      */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/16863f2c/modules/core/src/main/java/org/apache/ignite/igfs/IgfsIpcEndpointConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/igfs/IgfsIpcEndpointConfiguration.java
b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsIpcEndpointConfiguration.java
index 9fa6e09..6aaf739 100644
--- a/modules/core/src/main/java/org/apache/ignite/igfs/IgfsIpcEndpointConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsIpcEndpointConfiguration.java
@@ -17,63 +17,225 @@
 
 package org.apache.ignite.igfs;
 
+import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 
 import static org.apache.ignite.igfs.IgfsIpcEndpointType.*;
 
 /**
- *
+ * IGFS IPC endpoint configuration.
  */
 public class IgfsIpcEndpointConfiguration {
-    /** */
+    /** Default endpoint type; TCP for Windows, SHMEM otherwise. */
     public static IgfsIpcEndpointType DFLT_TYP = U.isWindows() ? TCP : SHMEM;
 
-    /** */
+    /** Default host. */
     public static String DFLT_HOST = "127.0.0.1";
 
-    /** */
+    /** Default port. */
     public static int DFLT_PORT = 10500;
 
     /** Default shared memory space in bytes. */
-    public static final int DFLT_SPACE_SIZE = 256 * 1024;
+    public static final int DFLT_MEM_SIZE = 256 * 1024;
 
     /**
      * Default token directory. Note that this path is relative to {@code IGNITE_HOME/work}
folder
      * if {@code IGNITE_HOME} system or environment variable specified, otherwise it is relative
to
      * {@code work} folder under system {@code java.io.tmpdir} folder.
      *
-     * @see org.apache.ignite.configuration.IgniteConfiguration#getWorkDirectory()
+     * @see IgniteConfiguration#getWorkDirectory()
      */
     public static final String DFLT_TOKEN_DIR_PATH = "ipc/shmem";
 
-    /** */
+    /** Endpoint type. */
     private IgfsIpcEndpointType typ = DFLT_TYP;
 
-    /** */
+    /** Host. */
     private String host = DFLT_HOST;
 
-    /** */
+    /** Port. */
     private int port = DFLT_PORT;
 
-    /** */
-    private int spaceSize = DFLT_SPACE_SIZE;
+    /** Shared memory size in bytes. */
+    private int memSize = DFLT_MEM_SIZE;
 
-    /** */
+    /** Token directory path. */
     private String tokenDirPath = DFLT_TOKEN_DIR_PATH;
 
     /**
+     * Default constructor.
+     */
+    public IgfsIpcEndpointConfiguration() {
+        // No-op.
+    }
+
+    /**
+     * Copying constructor.
      *
-     * @return
+     * @param cfg Configuration to copy.
+     */
+    public IgfsIpcEndpointConfiguration(IgfsIpcEndpointConfiguration cfg) {
+        typ = cfg.getType();
+        host = cfg.getHost();
+        port = cfg.getPort();
+        memSize = cfg.getMemorySize();
+        tokenDirPath = cfg.getTokenDirectoryPath();
+    }
+
+    /**
+     * Gets endpoint type. There are two endpoint types: {@code SHMEM} working over shared
memory, and {@code TCP}
+     * working over sockets.
+     * <p>
+     * Shared memory is the recommended approach for Linux-based systems. For Windows, TCP is
the only available option.
+     * <p>
+     * Defaults to {@link #DFLT_TYP}.
+     *
+     * @return Endpoint type.
      */
     public IgfsIpcEndpointType getType() {
         return typ;
     }
 
     /**
+     * Sets endpoint type. There are two endpoint types: {@link IgfsIpcEndpointType#SHMEM}
working over shared memory,
+     * and {@link IgfsIpcEndpointType#TCP} working over sockets.
+     * <p>
+     * Shared memory is the recommended approach for Linux-based systems. For Windows, TCP is
the only available option.
+     * <p>
+     * Defaults to {@link #DFLT_TYP}.
      *
-     * @param typ
+     * @param typ Endpoint type.
      */
     public void setType(IgfsIpcEndpointType typ) {
         this.typ = typ;
     }
+
+    /**
+     * Gets the host endpoint is bound to.
+     * <p>
+     * For {@link IgfsIpcEndpointType#TCP} endpoint this is the network interface server
socket is bound to.
+     * <p>
+     * For {@link IgfsIpcEndpointType#SHMEM} endpoint socket connection is needed only to
perform an initial handshake.
+     * All further communication is performed over shared memory. Therefore, for {@code SHMEM}
this value is ignored
+     * and the socket will always be bound to {@link #DFLT_HOST}.
+     * <p>
+     * Defaults to {@link #DFLT_HOST}.
+     *
+     * @return Host.
+     */
+    public String getHost() {
+        return host;
+    }
+
+    /**
+     * Sets the host endpoint is bound to.
+     * <p>
+     * For {@link IgfsIpcEndpointType#TCP} endpoint this is the network interface server
socket is bound to.
+     * <p>
+     * For {@link IgfsIpcEndpointType#SHMEM} endpoint socket connection is needed only to
perform an initial handshake.
+     * All further communication is performed over shared memory. Therefore, for {@code SHMEM}
this value is ignored
+     * and the socket will always be bound to {@link #DFLT_HOST}.
+     * <p>
+     * Defaults to {@link #DFLT_HOST}.
+     *
+     * @param host Host.
+     */
+    public void setHost(String host) {
+        this.host = host;
+    }
+
+    /**
+     * Gets the port endpoint is bound to.
+     * <p>
+     * For {@link IgfsIpcEndpointType#TCP} endpoint this is the port server socket is bound
to.
+     * <p>
+     * For {@link IgfsIpcEndpointType#SHMEM} endpoint socket connection is needed only to
perform an initial handshake.
+     * All further communication is performed over shared memory.
+     * <p>
+     * Defaults to {@link #DFLT_PORT}.
+     *
+     * @return Port.
+     */
+    public int getPort() {
+        return port;
+    }
+
+    /**
+     * Sets the port endpoint is bound to.
+     * <p>
+     * For {@link IgfsIpcEndpointType#TCP} endpoint this is the port server socket is bound
to.
+     * <p>
+     * For {@link IgfsIpcEndpointType#SHMEM} endpoint socket connection is needed only to
perform an initial handshake.
+     * All further communication is performed over shared memory.
+     * <p>
+     * Defaults to {@link #DFLT_PORT}.
+     *
+     * @param port Port.
+     */
+    public void setPort(int port) {
+        this.port = port;
+    }
+
+    /**
+     * Gets shared memory size in bytes allocated for endpoint communication.
+     * <p>
+     * Ignored for {@link IgfsIpcEndpointType#TCP} endpoint.
+     * <p>
+     * Defaults to {@link #DFLT_MEM_SIZE}.
+     *
+     * @return Shared memory size.
+     */
+    public int getMemorySize() {
+        return memSize;
+    }
+
+    /**
+     * Sets shared memory size in bytes allocated for endpoint communication.
+     * <p>
+     * Ignored for {@link IgfsIpcEndpointType#TCP} endpoint.
+     * <p>
+     * Defaults to {@link #DFLT_MEM_SIZE}.
+     *
+     * @param memSize Shared memory size.
+     */
+    public void setMemorySize(int memSize) {
+        this.memSize = memSize;
+    }
+
+    /**
+     * Gets directory where shared memory tokens are stored.
+     * <p>
+     * Note that this path is relative to {@code IGNITE_HOME/work} folder if {@code IGNITE_HOME}
system or environment
+     * variable specified, otherwise it is relative to {@code work} folder under system {@code
java.io.tmpdir} folder.
+     * <p>
+     * Ignored for {@link IgfsIpcEndpointType#TCP} endpoint.
+     * <p>
+     * Defaults to {@link #DFLT_TOKEN_DIR_PATH}.
+     *
+     * @return Directory where shared memory tokens are stored.
+     */
+    public String getTokenDirectoryPath() {
+        return tokenDirPath;
+    }
+
+    /**
+     * Sets directory where shared memory tokens are stored.
+     * <p>
+     * Note that this path is relative to {@code IGNITE_HOME/work} folder if {@code IGNITE_HOME}
system or environment
+     * variable specified, otherwise it is relative to {@code work} folder under system {@code
java.io.tmpdir} folder.
+     * <p>
+     * Ignored for {@link IgfsIpcEndpointType#TCP} endpoint.
+     * <p>
+     * Defaults to {@link #DFLT_TOKEN_DIR_PATH}.
+     *
+     * @param tokenDirPath Directory where shared memory tokens are stored.
+     */
+    public void setTokenDirectoryPath(String tokenDirPath) {
+        this.tokenDirPath = tokenDirPath;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgfsIpcEndpointConfiguration.class, this);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/16863f2c/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java
index 1146812..0cd1a62 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.igfs;
 
 import org.apache.ignite.*;
+import org.apache.ignite.igfs.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.igfs.common.*;
 import org.apache.ignite.internal.util.ipc.*;
@@ -31,7 +32,6 @@ import org.jdk8.backport.*;
 import org.jetbrains.annotations.*;
 
 import java.io.*;
-import java.util.*;
 
 import static org.apache.ignite.spi.IgnitePortProtocol.*;
 
@@ -49,7 +49,7 @@ public class IgfsServer {
     private final IgfsMarshaller marsh;
 
     /** Endpoint configuration. */
-    private final Map<String,String> endpointCfg;
+    private final IgfsIpcEndpointConfiguration endpointCfg;
 
     /** Server endpoint. */
     private IpcServerEndpoint srvEndpoint;
@@ -72,7 +72,7 @@ public class IgfsServer {
      * @param endpointCfg Endpoint configuration to start.
      * @param mgmt Management flag - if true, server is intended to be started for Visor.
      */
-    public IgfsServer(IgfsContext igfsCtx, Map<String, String> endpointCfg, boolean
mgmt) {
+    public IgfsServer(IgfsContext igfsCtx, IgfsIpcEndpointConfiguration endpointCfg, boolean
mgmt) {
         assert igfsCtx != null;
         assert endpointCfg != null;
 
@@ -91,7 +91,7 @@ public class IgfsServer {
      * @throws IgniteCheckedException If failed.
      */
     public void start() throws IgniteCheckedException {
-        srvEndpoint = IpcServerEndpointDeserializer.deserialize(endpointCfg);
+        srvEndpoint = createEndpoint(endpointCfg, mgmt);
 
         if (U.isWindows() && srvEndpoint instanceof IpcSharedMemoryServerEndpoint)
             throw new IgniteCheckedException(IpcSharedMemoryServerEndpoint.class.getSimpleName()
+
@@ -135,6 +135,47 @@ public class IgfsServer {
     }
 
     /**
+     * Create server IPC endpoint.
+     *
+     * @param endpointCfg Endpoint configuration.
+     * @param mgmt Management flag.
+     * @return Server endpoint.
+     * @throws IgniteCheckedException If failed.
+     */
+    private static IpcServerEndpoint createEndpoint(IgfsIpcEndpointConfiguration endpointCfg,
boolean mgmt)
+        throws IgniteCheckedException {
+        A.notNull(endpointCfg, "endpointCfg");
+
+        IgfsIpcEndpointType typ = endpointCfg.getType();
+
+        if (typ == null)
+            throw new IgniteCheckedException("Failed to create server endpoint (type is not
specified)");
+
+        switch (typ) {
+            case SHMEM: {
+                IpcSharedMemoryServerEndpoint endpoint = new IpcSharedMemoryServerEndpoint();
+
+                endpoint.setPort(endpointCfg.getPort());
+                endpoint.setSize(endpointCfg.getMemorySize());
+                endpoint.setTokenDirectoryPath(endpointCfg.getTokenDirectoryPath());
+
+                return endpoint;
+            }
+            case TCP: {
+                IpcServerTcpEndpoint endpoint = new IpcServerTcpEndpoint();
+
+                endpoint.setHost(endpointCfg.getHost());
+                endpoint.setPort(endpointCfg.getPort());
+                endpoint.setManagement(mgmt);
+
+                return endpoint;
+            }
+            default:
+                throw new IgniteCheckedException("Failed to create server endpoint (type
is unknown): " + typ);
+        }
+    }
+
+    /**
      * Callback that is invoked when kernal is ready.
      */
     public void onKernalStart() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/16863f2c/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServerManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServerManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServerManager.java
index 643eeff..2cd51f8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServerManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServerManager.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.igfs;
 
 import org.apache.ignite.*;
 import org.apache.ignite.configuration.*;
+import org.apache.ignite.igfs.*;
 import org.apache.ignite.internal.util.ipc.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
@@ -30,6 +31,7 @@ import java.util.*;
 import java.util.concurrent.*;
 
 import static org.apache.ignite.configuration.FileSystemConfiguration.*;
+import static org.apache.ignite.igfs.IgfsIpcEndpointType.*;
 
 /**
  * IGFS server manager.
@@ -50,26 +52,23 @@ public class IgfsServerManager extends IgfsManager {
     /** {@inheritDoc} */
     @Override protected void start0() throws IgniteCheckedException {
         FileSystemConfiguration igfsCfg = igfsCtx.configuration();
-        Map<String,String> cfg = igfsCfg.getIpcEndpointConfiguration();
 
-        if (F.isEmpty(cfg)) {
-            // Set default configuration.
-            cfg = new HashMap<>();
+        if (igfsCfg.isIpcEndpointEnabled()) {
+            IgfsIpcEndpointConfiguration ipcCfg = igfsCfg.getIpcEndpointConfiguration();
 
-            cfg.put("type", U.isWindows() ? "tcp" : "shmem");
-            cfg.put("port", String.valueOf(DFLT_IPC_PORT));
-        }
+            if (ipcCfg == null)
+                ipcCfg = new IgfsIpcEndpointConfiguration();
 
-        if (igfsCfg.isIpcEndpointEnabled())
-            bind(cfg, /*management*/false);
+            bind(ipcCfg, /*management*/false);
+        }
 
         if (igfsCfg.getManagementPort() >= 0) {
-            cfg = new HashMap<>();
+            IgfsIpcEndpointConfiguration mgmtIpcCfg = new IgfsIpcEndpointConfiguration();
 
-            cfg.put("type", "tcp");
-            cfg.put("port", String.valueOf(igfsCfg.getManagementPort()));
+            mgmtIpcCfg.setType(TCP);
+            mgmtIpcCfg.setPort(igfsCfg.getManagementPort());
 
-            bind(cfg, /*management*/true);
+            bind(mgmtIpcCfg, /*management*/true);
         }
 
         if (bindWorker != null)
@@ -84,7 +83,7 @@ public class IgfsServerManager extends IgfsManager {
      * @param mgmt {@code True} if endpoint is management.
      * @throws IgniteCheckedException If failed.
      */
-    private void bind(final Map<String,String> endpointCfg, final boolean mgmt) throws
IgniteCheckedException {
+    private void bind(final IgfsIpcEndpointConfiguration endpointCfg, final boolean mgmt)
throws IgniteCheckedException {
         if (srvrs == null)
             srvrs = new ConcurrentLinkedQueue<>();
 
@@ -155,7 +154,7 @@ public class IgfsServerManager extends IgfsManager {
     @SuppressWarnings("BusyWait")
     private class BindWorker extends GridWorker {
         /** Configurations to bind. */
-        private Collection<IgniteBiTuple<Map<String, String>, Boolean>>
bindCfgs = new LinkedList<>();
+        private Collection<IgniteBiTuple<IgfsIpcEndpointConfiguration, Boolean>>
bindCfgs = new LinkedList<>();
 
         /**
          * Constructor.
@@ -170,7 +169,7 @@ public class IgfsServerManager extends IgfsManager {
          * @param cfg Configuration.
          * @param mgmt Management flag.
          */
-        public void addConfiguration(Map<String, String> cfg, boolean mgmt) {
+        public void addConfiguration(IgfsIpcEndpointConfiguration cfg, boolean mgmt) {
             bindCfgs.add(F.t(cfg, mgmt));
         }
 
@@ -181,10 +180,10 @@ public class IgfsServerManager extends IgfsManager {
             while (!isCancelled()) {
                 Thread.sleep(REBIND_INTERVAL);
 
-                Iterator<IgniteBiTuple<Map<String, String>, Boolean>> it
= bindCfgs.iterator();
+                Iterator<IgniteBiTuple<IgfsIpcEndpointConfiguration, Boolean>>
it = bindCfgs.iterator();
 
                 while (it.hasNext()) {
-                    IgniteBiTuple<Map<String, String>, Boolean> cfg = it.next();
+                    IgniteBiTuple<IgfsIpcEndpointConfiguration, Boolean> cfg = it.next();
 
                     IgfsServer ipcSrv = new IgfsServer(igfsCtx, cfg.get1(), cfg.get2());
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/16863f2c/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcEndpointFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcEndpointFactory.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcEndpointFactory.java
index 06710fb..4debeac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcEndpointFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcEndpointFactory.java
@@ -54,8 +54,8 @@ public class IpcEndpointFactory {
             port = -1;
 
         return "shmem".equalsIgnoreCase(split[0]) ?
-            connectSharedMemoryEndpoint(port > 0 ? port : IpcSharedMemoryServerEndpoint.DFLT_IPC_PORT,
log) :
-            connectTcpEndpoint(split[0], port > 0 ? port : IpcServerTcpEndpoint.DFLT_IPC_PORT);
+            connectSharedMemoryEndpoint(port > 0 ? port : IpcSharedMemoryServerEndpoint.DFLT_PORT,
log) :
+            connectTcpEndpoint(split[0], port > 0 ? port : IpcServerTcpEndpoint.DFLT_PORT);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/16863f2c/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcServerEndpointDeserializer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcServerEndpointDeserializer.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcServerEndpointDeserializer.java
deleted file mode 100644
index 07bc28b..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcServerEndpointDeserializer.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.util.ipc;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.util.ipc.loopback.*;
-import org.apache.ignite.internal.util.ipc.shmem.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.util.*;
-
-/**
- * Grid IpcServerEndpoint configuration deserializer.
- */
-public class IpcServerEndpointDeserializer {
-    /**
-     * Deserializes IPC server endpoint config into concrete
-     * instance of {@link IpcServerEndpoint}.
-     *
-     * @param endpointCfg Map with properties of the IPC server endpoint config.
-     * @return Deserialized instance of {@link IpcServerEndpoint}.
-     * @throws IgniteCheckedException If any problem with configuration properties setting
has happened.
-     */
-    public static IpcServerEndpoint deserialize(Map<String,String> endpointCfg) throws
IgniteCheckedException {
-        A.notNull(endpointCfg, "endpointCfg");
-
-        String endpointType = endpointCfg.get("type");
-
-        if (endpointType == null)
-            throw new IgniteCheckedException("Failed to create server endpoint (type is not
specified)");
-
-        switch (endpointType) {
-            case "shmem": {
-                IpcSharedMemoryServerEndpoint endpoint = new IpcSharedMemoryServerEndpoint();
-
-                endpoint.setupConfiguration(endpointCfg);
-
-                return endpoint;
-            }
-            case "tcp": {
-                IpcServerTcpEndpoint endpoint = new IpcServerTcpEndpoint();
-
-                endpoint.setupConfiguration(endpointCfg);
-
-                return endpoint;
-            }
-            default:
-                throw new IgniteCheckedException("Failed to create server endpoint (type
is unknown): " + endpointType);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/16863f2c/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcServerEndpointFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcServerEndpointFactory.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcServerEndpointFactory.java
new file mode 100644
index 0000000..13ebcd6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcServerEndpointFactory.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.ipc;
+
+import org.apache.ignite.*;
+import org.apache.ignite.igfs.*;
+import org.apache.ignite.internal.util.ipc.loopback.*;
+import org.apache.ignite.internal.util.ipc.shmem.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.util.*;
+
+/**
+ * Factory creating {@link IpcServerEndpoint} instances from IGFS IPC endpoint configuration.
+ */
+public class IpcServerEndpointFactory {
+    /**
+     * Create IPC server endpoint from configuration.
+     *
+     * @param endpointCfg IPC endpoint configuration.
+     * @param mgmt Management flag.
+     * @return Created instance of {@link IpcServerEndpoint}.
+     * @throws IgniteCheckedException If any problem with configuration properties setting
has happened.
+     */
+    public static IpcServerEndpoint create(IgfsIpcEndpointConfiguration endpointCfg, boolean
mgmt)
+        throws IgniteCheckedException {
+        A.notNull(endpointCfg, "endpointCfg");
+
+        IgfsIpcEndpointType typ = endpointCfg.getType();
+
+        if (typ == null)
+            throw new IgniteCheckedException("Failed to create server endpoint (type is not
specified)");
+
+        switch (typ) {
+            case SHMEM: {
+                IpcSharedMemoryServerEndpoint endpoint = new IpcSharedMemoryServerEndpoint();
+
+                endpoint.setPort(endpointCfg.getPort());
+                endpoint.setSize(endpointCfg.getMemorySize());
+                endpoint.setTokenDirectoryPath(endpointCfg.getTokenDirectoryPath());
+
+                return endpoint;
+            }
+            case TCP: {
+                IpcServerTcpEndpoint endpoint = new IpcServerTcpEndpoint();
+
+                endpoint.setHost(endpointCfg.getHost());
+                endpoint.setPort(endpointCfg.getPort());
+                endpoint.setManagement(mgmt);
+
+                return endpoint;
+            }
+            default:
+                throw new IgniteCheckedException("Failed to create server endpoint (type
is unknown): " + typ);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/16863f2c/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/IpcServerEndpointDeserializerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/IpcServerEndpointDeserializerSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/IpcServerEndpointDeserializerSelfTest.java
deleted file mode 100644
index f5aa591..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/ipc/IpcServerEndpointDeserializerSelfTest.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.util.ipc;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.igfs.*;
-import org.apache.ignite.internal.util.ipc.loopback.*;
-import org.apache.ignite.internal.util.ipc.shmem.*;
-import org.apache.ignite.testframework.*;
-
-import java.util.*;
-import java.util.concurrent.*;
-
-/**
- * Tests for {@code IpcServerEndpointDeserializer}.
- */
-public class IpcServerEndpointDeserializerSelfTest extends IgfsCommonAbstractTest {
-    /** */
-    private Map<String,String> shmemSrvEndpoint;
-
-    /** */
-    private Map<String,String> tcpSrvEndpoint;
-
-    /**
-     * Initialize test stuff.
-     */
-    @Override protected void beforeTest() throws Exception {
-        shmemSrvEndpoint = new HashMap<>();
-        shmemSrvEndpoint.put("port", "888");
-        shmemSrvEndpoint.put("size", "111");
-        shmemSrvEndpoint.put("tokenDirectoryPath", "test-my-path-baby");
-
-        tcpSrvEndpoint = new HashMap<>();
-        tcpSrvEndpoint.put("port", "999");
-    }
-
-    /**
-     * @throws Exception In case of any exception.
-     */
-    public void testDeserializeIfCfgIsNull() throws Exception {
-        GridTestUtils.assertThrows(log, new Callable<Object>() {
-            @SuppressWarnings("NullableProblems")
-            @Override public Object call() throws Exception {
-                return IpcServerEndpointDeserializer.deserialize(null);
-            }
-        }, NullPointerException.class, "Ouch! Argument cannot be null: endpointCfg");
-    }
-
-    /**
-     * @throws Exception In case of any exception.
-     */
-    public void testDeserializeIfShmemAndNoTypeInfoInJson() throws Exception {
-        GridTestUtils.assertThrows(log, new Callable<Object>() {
-            @Override public Object call() throws Exception {
-                return IpcServerEndpointDeserializer.deserialize(shmemSrvEndpoint);
-            }
-        }, IgniteCheckedException.class, "Failed to create server endpoint (type is not specified)");
-    }
-
-    /**
-     * @throws Exception In case of any exception.
-     */
-    public void testDeserializeIfShmemAndNoUnknownTypeInfoInJson() throws Exception {
-        GridTestUtils.assertThrows(log, new Callable<Object>() {
-            @Override public Object call() throws Exception {
-                Map<String, String> endPnt = new HashMap<>();
-
-                endPnt.putAll(shmemSrvEndpoint);
-                endPnt.put("type", "unknownEndpointType");
-
-                return IpcServerEndpointDeserializer.deserialize(endPnt);
-            }
-        }, IgniteCheckedException.class, "Failed to create server endpoint (type is unknown): unknownEndpointType");
-    }
-
-    /**
-     * @throws Exception In case of any exception.
-     */
-    public void testDeserializeIfLoopbackAndJsonIsLightlyBroken() throws Exception {
-        GridTestUtils.assertThrows(log, new Callable<Object>() {
-            @Override public Object call() throws Exception {
-                return IpcServerEndpointDeserializer.deserialize(tcpSrvEndpoint);
-            }
-        }, IgniteCheckedException.class, null);
-    }
-
-    /**
-     * @throws Exception In case of any exception.
-     */
-    public void testDeserializeIfShmemAndJsonIsOk() throws Exception {
-        Map<String, String> endPnt = new HashMap<>();
-
-        endPnt.putAll(shmemSrvEndpoint);
-        endPnt.put("type", "shmem");
-
-        IpcServerEndpoint deserialized = IpcServerEndpointDeserializer.deserialize(endPnt);
-
-        assertTrue(deserialized instanceof IpcSharedMemoryServerEndpoint);
-
-        IpcSharedMemoryServerEndpoint deserializedShmemEndpoint = (IpcSharedMemoryServerEndpoint)deserialized;
-
-        assertEquals(shmemSrvEndpoint.get("port"), String.valueOf(deserializedShmemEndpoint.getPort()));
-        assertEquals(shmemSrvEndpoint.get("size"), String.valueOf(deserializedShmemEndpoint.getSize()));
-        assertEquals(shmemSrvEndpoint.get("tokenDirectoryPath"), deserializedShmemEndpoint.getTokenDirectoryPath());
-    }
-
-    /**
-     * @throws Exception In case of any exception.
-     */
-    public void testDeserializeIfShmemAndJsonIsOkAndDefaultValuesAreSetToFields() throws Exception {
-        IpcSharedMemoryServerEndpoint defShmemSrvEndpoint = new IpcSharedMemoryServerEndpoint();
-        defShmemSrvEndpoint.setPort(8);
-
-        Map<String, String> endPnt = new HashMap<>();
-
-        endPnt.put("type", "shmem");
-        endPnt.put("port", String.valueOf(defShmemSrvEndpoint.getPort()));
-
-        IpcServerEndpoint deserialized = IpcServerEndpointDeserializer.deserialize(endPnt);
-
-        assertTrue(deserialized instanceof IpcSharedMemoryServerEndpoint);
-
-        IpcSharedMemoryServerEndpoint deserializedShmemEndpoint = (IpcSharedMemoryServerEndpoint)deserialized;
-
-        assertEquals(defShmemSrvEndpoint.getPort(), deserializedShmemEndpoint.getPort());
-        assertEquals(defShmemSrvEndpoint.getSize(), deserializedShmemEndpoint.getSize());
-        assertEquals(defShmemSrvEndpoint.getTokenDirectoryPath(), deserializedShmemEndpoint.getTokenDirectoryPath());
-    }
-
-    /**
-     * @throws Exception In case of any exception.
-     */
-    public void testDeserializeIfLoopbackAndJsonIsOk() throws Exception {
-        Map<String, String> endPnt = new HashMap<>();
-
-        endPnt.putAll(tcpSrvEndpoint);
-        endPnt.put("type", "tcp");
-
-        IpcServerEndpoint deserialized = IpcServerEndpointDeserializer.deserialize(endPnt);
-
-        assertTrue(deserialized instanceof IpcServerTcpEndpoint);
-
-        assertEquals(tcpSrvEndpoint.get("port"), String.valueOf(deserialized.getPort()));
-    }
-}


Mime
View raw message