ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [11/31] incubator-ignite git commit: #ignite-128: All classes are moved.
Date Tue, 03 Feb 2015 14:18:38 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b5b2f309/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnection.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnection.java
new file mode 100644
index 0000000..19d0bd6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnection.java
@@ -0,0 +1,529 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.impl.connection;
+
+import org.apache.ignite.internal.client.*;
+import org.apache.ignite.internal.client.impl.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.jetbrains.annotations.*;
+
+import javax.net.ssl.*;
+import java.net.*;
+import java.util.*;
+
+/**
+ * Facade for all possible network communications between client and server. Introduced to hide
+ * protocol implementation (TCP, HTTP) from client code.
+ */
+public abstract class GridClientConnection {
+    /** Topology */
+    protected GridClientTopology top;
+
+    /** Client id. */
+    protected final UUID clientId;
+
+    /** Server address this connection connected to */
+    private InetSocketAddress srvAddr;
+
+    /** SSL context to use if ssl is enabled. */
+    private SSLContext sslCtx;
+
+    /** Client credentials. */
+    private Object cred;
+
+    /** Reason why connection was closed. {@code null} means connection is still alive. */
+    protected volatile GridClientConnectionCloseReason closeReason;
+
+    /**
+     * Creates a facade.
+     *
+     * @param clientId Client identifier.
+     * @param srvAddr Server address this connection connected to.
+     * @param sslCtx SSL context to use if SSL is enabled, {@code null} otherwise.
+     * @param top Topology.
+     * @param cred Client credentials.
+     */
+    protected GridClientConnection(UUID clientId, InetSocketAddress srvAddr, SSLContext sslCtx, GridClientTopology top,
+        Object cred) {
+        assert top != null;
+
+        this.clientId = clientId;
+        this.srvAddr = srvAddr;
+        this.top = top;
+        this.sslCtx = sslCtx;
+        this.cred = cred;
+    }
+
+    /**
+     * Closes connection facade.
+     *
+     * @param reason Why this connection should be closed.
+     * @param waitCompletion If {@code true} this method will wait until all pending requests are handled.
+     */
+    abstract void close(GridClientConnectionCloseReason reason, boolean waitCompletion);
+
+    /**
+     * Closes connection facade if no requests are in progress.
+     *
+     * @param idleTimeout Idle timeout.
+     * @return {@code True} if no requests were in progress and client was closed, {@code false} otherwise.
+     */
+    abstract boolean closeIfIdle(long idleTimeout);
+
+    /**
+     * Gets server address this connection connected to.
+     *
+     * @return Server address this connection connected to.
+     */
+    public InetSocketAddress serverAddress() {
+        return srvAddr;
+    }
+
+    /**
+     * Encodes cache flags to bit map.
+     *
+     * @param flagSet Set of flags to be encoded.
+     * @return Bit map.
+     */
+    public static int encodeCacheFlags(Collection<GridClientCacheFlag> flagSet) {
+        int bits = 0;
+
+        if (flagSet.contains(GridClientCacheFlag.SKIP_STORE))
+            bits |= 1;
+
+        if (flagSet.contains(GridClientCacheFlag.SKIP_SWAP))
+            bits |= 1 << 1;
+
+        if (flagSet.contains(GridClientCacheFlag.SYNC_COMMIT))
+            bits |= 1 << 2;
+
+        if (flagSet.contains(GridClientCacheFlag.INVALIDATE))
+            bits |= 1 << 4;
+
+        return bits;
+    }
+
+    /**
+     * Puts key-value pair into cache.
+     *
+     * @param cacheName Cache name.
+     * @param key Key.
+     * @param val Value.
+     * @param flags Cache flags to be enabled.
+     * @param destNodeId Destination node ID.
+     * @return If value was actually put.
+     * @throws GridClientConnectionResetException In case of error.
+     * @throws GridClientClosedException If client was manually closed before request was sent over network.
+     */
+    public <K, V> GridClientFutureAdapter<Boolean> cachePut(String cacheName, K key, V val,
+        Set<GridClientCacheFlag> flags, UUID destNodeId)
+        throws GridClientConnectionResetException, GridClientClosedException {
+        return cachePutAll(cacheName, Collections.singletonMap(key, val), flags, destNodeId);
+    }
+
+    /**
+     * Gets entry from the cache for specified key.
+     *
+     * @param cacheName Cache name.
+     * @param key Key.
+     * @param flags Cache flags to be enabled.
+     * @param destNodeId Destination node ID.
+     * @return Value.
+     * @throws GridClientConnectionResetException In case of error.
+     * @throws GridClientClosedException If client was manually closed before request was sent over network.
+     */
+    public <K, V> GridClientFutureAdapter<V> cacheGet(String cacheName, final K key, Set<GridClientCacheFlag> flags,
+        UUID destNodeId) throws GridClientConnectionResetException, GridClientClosedException {
+        final GridClientFutureAdapter<Map<K, V>> res = cacheGetAll(cacheName, Collections.singleton(key), flags,
+            destNodeId);
+
+        return res.chain(new GridClientFutureCallback<Map<K, V>, V>() {
+            @Override public V onComplete(GridClientFuture<Map<K, V>> fut) throws GridClientException {
+                Map<K, V> map = fut.get();
+
+                return F.firstValue(map);
+            }
+        });
+    }
+
+    /**
+     * Removes entry from the cache for specified key.
+     *
+     * @param cacheName Cache name.
+     * @param key Key.
+     * @param flags Cache flags to be enabled.
+     * @param destNodeId Destination node ID.
+     * @return Whether entry was actually removed.
+     * @throws GridClientConnectionResetException In case of error.
+     * @throws org.apache.ignite.internal.client.GridClientClosedException If client was manually closed before request was sent over network.
+     */
+    public abstract <K> GridClientFutureAdapter<Boolean> cacheRemove(String cacheName, K key,
+        Set<GridClientCacheFlag> flags, UUID destNodeId)
+        throws GridClientConnectionResetException, GridClientClosedException;
+
+    /**
+     * Puts bundle of entries into cache.
+     *
+     * @param cacheName Cache name.
+     * @param entries Entries.
+     * @param flags Cache flags to be enabled.
+     * @param destNodeId Destination node ID.
+     * @return {@code True} if map contained more then one entry or if put succeeded in case of one entry,
+     *      {@code false} otherwise
+     * @throws GridClientConnectionResetException In case of error.
+     * @throws GridClientClosedException If client was manually closed before request was sent over network.
+     */
+    public abstract <K, V> GridClientFutureAdapter<Boolean> cachePutAll(String cacheName, Map<K, V> entries,
+        Set<GridClientCacheFlag> flags, UUID destNodeId)
+        throws GridClientConnectionResetException, GridClientClosedException;
+
+    /**
+     * Gets bundle of entries for specified keys from the cache.
+     *
+     * @param cacheName Cache name.
+     * @param keys Keys.
+     * @param flags Cache flags to be enabled.
+     * @param destNodeId Destination node ID.
+     * @return Entries.
+     * @throws GridClientConnectionResetException In case of error.
+     * @throws GridClientClosedException If client was manually closed before request was sent over network.
+     */
+    public abstract <K, V> GridClientFutureAdapter<Map<K, V>> cacheGetAll(String cacheName, Collection<K> keys,
+        Set<GridClientCacheFlag> flags, UUID destNodeId)
+        throws GridClientConnectionResetException, GridClientClosedException;
+
+    /**
+     * Removes bundle of entries for specified keys from the cache.
+     *
+     * @param cacheName Cache name.
+     * @param keys Keys.
+     * @param flags Cache flags to be enabled.
+     * @param destNodeId Destination node ID.
+     * @return Whether entries were actually removed
+     * @throws GridClientConnectionResetException In case of error.
+     * @throws GridClientClosedException If client was manually closed before request was sent over network.
+     */
+    public abstract <K> GridClientFutureAdapter<Boolean> cacheRemoveAll(String cacheName, Collection<K> keys,
+        Set<GridClientCacheFlag> flags, UUID destNodeId)
+        throws GridClientConnectionResetException, GridClientClosedException;
+
+    /**
+     * Replace key-value pair in cache if already exist.
+     *
+     * @param cacheName Cache name.
+     * @param key Key.
+     * @param val Value.
+     * @param flags Cache flags to be enabled.
+     * @param destNodeId Destination node ID.
+     * @return Whether value was actually replaced.
+     * @throws GridClientConnectionResetException In case of error.
+     * @throws GridClientClosedException If client was manually closed before request was sent over network.
+     */
+    public abstract <K, V> GridClientFutureAdapter<Boolean> cacheReplace(String cacheName, K key, V val,
+        Set<GridClientCacheFlag> flags, UUID destNodeId)
+        throws GridClientConnectionResetException, GridClientClosedException;
+
+    /**
+     * <table>
+     *     <tr><th>New value</th><th>Actual/old value</th><th>Behaviour</th></tr>
+     *     <tr><td>null     </td><td>null   </td><td>Remove entry for key.</td></tr>
+     *     <tr><td>newVal   </td><td>null   </td><td>Put newVal into cache if such key doesn't exist.</td></tr>
+     *     <tr><td>null     </td><td>oldVal </td><td>Remove if actual value oldVal is equals to value in cache.</td></tr>
+     *     <tr><td>newVal   </td><td>oldVal </td><td>Replace if actual value oldVal is equals to value in cache.</td></tr>
+     * </table>
+     *
+     * @param cacheName Cache name.
+     * @param key Key.
+     * @param newVal Value 1.
+     * @param oldVal Value 2.
+     * @param flags Cache flags to be enabled.
+     * @param destNodeId Destination node ID.
+     * @return Whether new value was actually set.
+     * @throws GridClientConnectionResetException In case of error.
+     * @throws GridClientClosedException If client was manually closed before request was sent over network.
+     */
+    public abstract <K, V> GridClientFutureAdapter<Boolean> cacheCompareAndSet(String cacheName, K key, V newVal,
+        V oldVal, Set<GridClientCacheFlag> flags, UUID destNodeId)
+        throws GridClientConnectionResetException, GridClientClosedException;
+
+    /**
+     * Gets cache metrics for the key.
+     *
+     * @param cacheName Cache name.
+     * @param destNodeId Destination node ID.
+     * @return Metrics.
+     * @throws GridClientConnectionResetException In case of error.
+     * @throws GridClientClosedException If client was manually closed before request was sent over network.
+     */
+    public abstract <K> GridClientFutureAdapter<GridClientDataMetrics> cacheMetrics(String cacheName, UUID destNodeId)
+        throws GridClientConnectionResetException, GridClientClosedException;
+
+    /**
+     * Append requested value to already cached one.
+     *
+     * @param cacheName Cache name.
+     * @param key Key.
+     * @param val Value to append to the cached one.
+     * @param flags Cache flags to be enabled.
+     * @param destNodeId Destination node ID.
+     * @return Whether new value was actually set.
+     * @throws GridClientConnectionResetException In case of error.
+     * @throws GridClientClosedException If client was manually closed before request was sent over network.
+     */
+    public abstract <K, V> GridClientFutureAdapter<Boolean> cacheAppend(String cacheName, K key, V val,
+        Set<GridClientCacheFlag> flags, UUID destNodeId)
+        throws GridClientConnectionResetException, GridClientClosedException;
+
+    /**
+     * Prepend requested value to already cached one.
+     *
+     * @param cacheName Cache name.
+     * @param key Key.
+     * @param val Value to prepend to the cached one.
+     * @param flags Cache flags to be enabled.
+     * @param destNodeId Destination node ID.
+     * @return Whether new value was actually set.
+     * @throws GridClientConnectionResetException In case of error.
+     * @throws GridClientClosedException If client was manually closed before request was sent over network.
+     */
+    public abstract <K, V> GridClientFutureAdapter<Boolean> cachePrepend(String cacheName, K key, V val,
+        Set<GridClientCacheFlag> flags, UUID destNodeId)
+        throws GridClientConnectionResetException, GridClientClosedException;
+
+    /**
+     * Execute task in the grid.
+     *
+     * @param taskName Task name.
+     * @param arg Task argument.
+     * @param destNodeId Destination node ID.
+     * @param keepPortables Keep portables flag.
+     * @return Task execution result.
+     * @throws GridClientConnectionResetException In case of error.
+     * @throws GridClientClosedException If client was manually closed before request was sent over network.
+     */
+    public abstract <R> GridClientFutureAdapter<R> execute(String taskName, Object arg, UUID destNodeId,
+        boolean keepPortables) throws GridClientConnectionResetException, GridClientClosedException;
+
+    /**
+     * Gets node by node ID.
+     *
+     * @param id Node ID.
+     * @param inclAttrs Whether to include attributes.
+     * @param inclMetrics Whether to include metrics.
+     * @param destNodeId Destination node ID.
+     * @return Node.
+     * @throws GridClientConnectionResetException In case of error.
+     * @throws GridClientClosedException If client was manually closed before request was sent over network.
+     */
+    public abstract GridClientFuture<GridClientNode> node(UUID id, boolean inclAttrs, boolean inclMetrics,
+        UUID destNodeId) throws GridClientConnectionResetException, GridClientClosedException;
+
+    /**
+     * Gets node by node IP.
+     *
+     * @param ipAddr IP address.
+     * @param inclAttrs Whether to include attributes.
+     * @param includeMetrics Whether to include metrics.
+     * @param destNodeId Destination node ID.
+     * @return Node.
+     * @throws GridClientConnectionResetException In case of error.
+     * @throws GridClientClosedException If client was manually closed before request was sent over network.
+     */
+    public abstract GridClientFuture<GridClientNode> node(String ipAddr, boolean inclAttrs,
+        boolean includeMetrics, UUID destNodeId)
+        throws GridClientConnectionResetException, GridClientClosedException;
+
+    /**
+     * Gets grid topology nodes.
+     *
+     * @param inclAttrs Whether to include attributes.
+     * @param inclMetrics Whether to include metrics.
+     * @param destNodeId Destination node ID.
+     * @return Nodes.
+     * @throws GridClientConnectionResetException In case of error.
+     * @throws GridClientClosedException If client was manually closed before request was sent over network.
+     */
+    public abstract GridClientFuture<List<GridClientNode>> topology(boolean inclAttrs, boolean inclMetrics,
+        @Nullable UUID destNodeId) throws GridClientConnectionResetException, GridClientClosedException;
+
+    /**
+     * Gets log entries.
+     *
+     * @param path Log file path.  If {@code null}, then default path will be used.
+     * @param fromLine Index of start line that should be retrieved.
+     * @param toLine Index of end line that should be retrieved.
+     * @param destNodeId Destination node ID.
+     * @return Log file contents.
+     * @throws GridClientConnectionResetException In case of error.
+     * @throws GridClientClosedException If client was manually closed before request was sent over network.
+     */
+    public abstract GridClientFuture<List<String>> log(@Nullable String path, int fromLine, int toLine, UUID destNodeId)
+        throws GridClientConnectionResetException, GridClientClosedException;
+
+    /**
+     * Forwards a message in raw form to the connected node. This method supposed to be used only inside router.
+     * The exact types of acceptable arguments and return values depend on connection implementation.
+     *
+     * @param body A raw message to send.
+     * @return Future holding server's response.
+     * @throws GridClientException If message forwarding failed.
+     */
+    public abstract GridClientFutureAdapter<?> forwardMessage(Object body) throws GridClientException;
+
+    /**
+     * @return {@code True} if connection is closed.
+     */
+    public boolean isClosed() {
+        return closeReason != null;
+    }
+
+    /**
+     * Gets SSLContext of this client connection.
+     *
+     * @return {@link SSLContext} instance.
+     */
+    protected SSLContext sslContext() {
+        return sslCtx;
+    }
+
+    /**
+     * Returns credentials for this client connection.
+     *
+     * @return Credentials.
+     */
+    protected Object credentials() {
+        return cred;
+    }
+
+    /**
+     * Safely gets long value by given key.
+     *
+     * @param map Map to get value from.
+     * @param key Metrics name.
+     * @return Value or -1, if not found.
+     */
+    protected long safeLong(Map<String, Number> map, String key) {
+        Number val = map.get(key);
+
+        if (val == null)
+            return -1;
+
+        return val.longValue();
+    }
+
+    /**
+     * Safely gets double value by given key.
+     *
+     * @param map Map to get value from.
+     * @param key Metrics name.
+     * @return Value or -1, if not found.
+     */
+    protected double safeDouble(Map<String, Number> map, String key) {
+        Number val = map.get(key);
+
+        if (val == null)
+            return -1;
+
+        return val.doubleValue();
+    }
+
+    /**
+     * Converts metrics map to metrics object.
+     *
+     * @param metricsMap Map to convert.
+     * @return Metrics object.
+     */
+    protected GridClientDataMetrics metricsMapToMetrics(Map<String, Number> metricsMap) {
+        GridClientDataMetricsAdapter metrics = new GridClientDataMetricsAdapter();
+
+        metrics.createTime(safeLong(metricsMap, "createTime"));
+        metrics.readTime(safeLong(metricsMap, "readTime"));
+        metrics.writeTime(safeLong(metricsMap, "writeTime"));
+        metrics.reads((int)safeLong(metricsMap, "reads"));
+        metrics.writes((int)safeLong(metricsMap, "writes"));
+        metrics.hits((int)safeLong(metricsMap, "hits"));
+        metrics.misses((int)safeLong(metricsMap, "misses"));
+
+        return metrics;
+    }
+
+    /**
+     * Check if this connection was closed and throws appropriate exception.
+     * This method should be used for synchronous connection state check.
+     *
+     * @param reason Close reason.
+     * @throws GridConnectionIdleClosedException If connection was closed as idle.
+     * @throws GridClientClosedException If client was closed by by external call.
+     * @throws GridClientConnectionResetException If connection was closed because of failure.
+     */
+    protected void checkClosed(GridClientConnectionCloseReason reason)
+        throws GridConnectionIdleClosedException, GridClientConnectionResetException, GridClientClosedException {
+        if (reason == GridClientConnectionCloseReason.CONN_IDLE)
+            throw new GridConnectionIdleClosedException("Connection was closed by idle thread (will " +
+                "reconnect): " + serverAddress());
+
+        if (reason == GridClientConnectionCloseReason.FAILED)
+            throw new GridClientConnectionResetException("Failed to perform request (connection failed before " +
+                "message is sent): " + serverAddress());
+
+        if (reason == GridClientConnectionCloseReason.CLIENT_CLOSED)
+            throw new GridClientClosedException("Failed to perform request (connection was closed before " +
+                "message is sent): " + serverAddress());
+    }
+
+    /**
+     * Build appropriate exception from the given close reason.
+     * This method should be used as a factory for exception to finish futures asynchronously.
+     *
+     * @param reason Close reason.
+     * @param cause Cause of connection close, or {@code null} in case of regular close.
+     * @return Exception.
+     */
+    protected GridClientException getCloseReasonAsException(GridClientConnectionCloseReason reason,
+        @Nullable Throwable cause) {
+        if (reason == GridClientConnectionCloseReason.CONN_IDLE)
+            return new GridConnectionIdleClosedException("Connection was closed by idle thread: " + serverAddress());
+
+        if (reason == GridClientConnectionCloseReason.FAILED)
+            return new GridClientConnectionResetException("Failed to perform request (connection failed): " +
+                serverAddress(), cause);
+
+        if (reason == GridClientConnectionCloseReason.CLIENT_CLOSED)
+            return new GridClientClosedException("Failed to perform request (client was closed): " + serverAddress());
+
+        return null;
+    }
+
+    /**
+     * @param reason Close reason.
+     * @param cause Cause of connection close, or {@code null} in case of regular close.
+     * @return Description of close reason for logging purpose.
+     */
+    protected String getCloseReasonMessage(GridClientConnectionCloseReason reason, @Nullable Throwable cause) {
+        if (reason == GridClientConnectionCloseReason.CONN_IDLE)
+            return "Connection was closed by idle thread";
+
+        if (reason == GridClientConnectionCloseReason.FAILED)
+            return cause != null ? "Connection failed, cause: " + cause.getMessage() : "Connection failed";
+
+        if (reason == GridClientConnectionCloseReason.CLIENT_CLOSED)
+            return "Client was closed";
+
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b5b2f309/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionCloseReason.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionCloseReason.java b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionCloseReason.java
new file mode 100644
index 0000000..94e5840
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionCloseReason.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.impl.connection;
+
+/**
+ * Set of reasons why connection closed.
+ */
+enum GridClientConnectionCloseReason {
+    /** Connection failed, IO exception or other unexpected result of request execution. */
+    FAILED,
+
+    /** Connection closed as idle. */
+    CONN_IDLE,
+
+    /** Client is closed and connection also shouldn't be used for new requests. */
+    CLIENT_CLOSED
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b5b2f309/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManager.java
new file mode 100644
index 0000000..fed7059
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManager.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.impl.connection;
+
+import org.apache.ignite.internal.client.*;
+import org.jetbrains.annotations.*;
+
+import java.net.*;
+import java.util.*;
+
+/**
+ * Common interface for client connection managers.
+ */
+public interface GridClientConnectionManager {
+    /**
+     * Tries to open initial connection and fetch topology using given server addresses.
+     *
+     * @param srvs Collection<InetSocketAddress> server addresses.
+     * @throws GridClientAuthenticationException If connection failed to authenticate on server.
+     * @throws GridClientException If manager failed to initialise,
+     * @throws InterruptedException If manager was interrupted while waiting for connection.
+     */
+    public void init(Collection<InetSocketAddress> srvs) throws GridClientException, InterruptedException;
+
+    /**
+     * Returns connection to the given node.
+     *
+     * @param node Node to connect with.
+     * @return Connection to use for operations, targeted for the given node.
+     * @throws GridServerUnreachableException If connection can't be established.
+     * @throws InterruptedException If manager was interrupted while waiting for connection
+     * to be established.
+     * @throws org.apache.ignite.internal.client.GridClientClosedException If connection manager has been closed.
+     */
+    public GridClientConnection connection(GridClientNode node)
+        throws GridServerUnreachableException, GridClientClosedException, InterruptedException;
+
+    /**
+     * Callback method, which should be called by clients when they get connectivity errors.
+     * It's main purpose is to allow connection manager to terminate broken connection
+     * early and, try to establish a new one for the consequent
+     * {@link #connection(GridClientNode)} calls.
+     *
+     * @param conn Failed connection.
+     * @param node Connected node.
+     * @param e Error that caused connection termination.
+     */
+    public void terminateConnection(GridClientConnection conn, @Nullable GridClientNode node, Throwable e);
+
+    /**
+     * Stops this instance of connection manager and terminates all connections.
+     * @param waitCompletion If {@code true} this method awaits termination of all connections
+     *      (and receiving responses for all pending requests), otherwise it will return immediately.
+     */
+    public void stop(boolean waitCompletion);
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b5b2f309/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
new file mode 100644
index 0000000..102765e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
@@ -0,0 +1,764 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.impl.connection;
+
+import org.apache.ignite.*;
+import org.apache.ignite.client.*;
+import org.apache.ignite.internal.client.*;
+import org.apache.ignite.internal.client.impl.*;
+import org.apache.ignite.internal.client.util.*;
+import org.apache.ignite.logger.java.*;
+import org.apache.ignite.internal.processors.rest.client.message.*;
+import org.apache.ignite.plugin.security.*;
+import org.apache.ignite.internal.util.direct.*;
+import org.apache.ignite.internal.util.nio.*;
+import org.apache.ignite.internal.util.nio.ssl.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import javax.net.ssl.*;
+import java.io.*;
+import java.net.*;
+import java.nio.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.logging.*;
+
+import static java.util.logging.Level.*;
+import static org.apache.ignite.internal.client.impl.connection.GridClientConnectionCloseReason.*;
+import static org.apache.ignite.internal.GridNodeAttributes.*;
+
+/**
+ * Cached connections manager.
+ */
+abstract class GridClientConnectionManagerAdapter implements GridClientConnectionManager {
+    /** Count of reconnect retries before init considered failed. */
+    private static final int INIT_RETRY_CNT = 3;
+
+    /** Initialization retry interval. */
+    private static final int INIT_RETRY_INTERVAL = 1000;
+
+    /** Class logger. */
+    private final Logger log;
+
+    /** NIO server. */
+    private GridNioServer srv;
+
+    /** Active connections. */
+    private final ConcurrentMap<InetSocketAddress, GridClientConnection> conns = new ConcurrentHashMap<>();
+
+    /** Active connections of nodes. */
+    private final ConcurrentMap<UUID, GridClientConnection> nodeConns = new ConcurrentHashMap<>();
+
+    /** SSL context. */
+    private final SSLContext sslCtx;
+
+    /** Client configuration. */
+    protected final GridClientConfiguration cfg;
+
+    /** Topology. */
+    private final GridClientTopology top;
+
+    /** Client id. */
+    private final UUID clientId;
+
+    /** Router endpoints to use instead of topology info. */
+    private final Collection<InetSocketAddress> routers;
+
+    /** Closed flag. */
+    private volatile boolean closed;
+
+    /** Shared executor service. */
+    private final ExecutorService executor;
+
+    /** Endpoint striped lock. */
+    private final GridClientStripedLock endpointStripedLock = new GridClientStripedLock(16);
+
+    /** Service for ping requests, {@code null} if HTTP protocol is used. */
+    private final ScheduledExecutorService pingExecutor;
+
+    /** Marshaller ID. */
+    private final Byte marshId;
+
+    /** Message writer. */
+    @SuppressWarnings("FieldCanBeLocal")
+    private final GridNioMessageWriter msgWriter = new GridNioMessageWriter() {
+        @Override public boolean write(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg, ByteBuffer buf) {
+            assert msg != null;
+            assert buf != null;
+
+            msg.messageWriter(this, nodeId);
+
+            return msg.writeTo(buf);
+        }
+
+        @Override public int writeFully(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg, OutputStream out,
+            ByteBuffer buf) throws IOException {
+            assert msg != null;
+            assert out != null;
+            assert buf != null;
+            assert buf.hasArray();
+
+            msg.messageWriter(this, nodeId);
+
+            boolean finished = false;
+            int cnt = 0;
+
+            while (!finished) {
+                finished = msg.writeTo(buf);
+
+                out.write(buf.array(), 0, buf.position());
+
+                cnt += buf.position();
+
+                buf.clear();
+            }
+
+            return cnt;
+        }
+    };
+
+    /**
+     * @param clientId Client ID.
+     * @param sslCtx SSL context to enable secured connection or {@code null} to use unsecured one.
+     * @param cfg Client configuration.
+     * @param routers Routers or empty collection to use endpoints from topology info.
+     * @param top Topology.
+     * @param marshId Marshaller ID.
+     * @throws GridClientException In case of error.
+     */
+    @SuppressWarnings("unchecked")
+    protected GridClientConnectionManagerAdapter(UUID clientId,
+        SSLContext sslCtx,
+        GridClientConfiguration cfg,
+        Collection<InetSocketAddress> routers,
+        GridClientTopology top,
+        @Nullable Byte marshId)
+        throws GridClientException {
+        assert clientId != null : "clientId != null";
+        assert cfg != null : "cfg != null";
+        assert routers != null : "routers != null";
+        assert top != null : "top != null";
+
+        this.clientId = clientId;
+        this.sslCtx = sslCtx;
+        this.cfg = cfg;
+        this.routers = new ArrayList<>(routers);
+        this.top = top;
+
+        log = Logger.getLogger(getClass().getName());
+
+        executor = cfg.getExecutorService() != null ? cfg.getExecutorService() :
+            Executors.newCachedThreadPool(new GridClientThreadFactory("exec", true));
+
+        pingExecutor = cfg.getProtocol() == GridClientProtocol.TCP ? Executors.newScheduledThreadPool(
+            Runtime.getRuntime().availableProcessors(), new GridClientThreadFactory("exec", true)) : null;
+
+        this.marshId = marshId;
+
+        if (marshId == null && cfg.getMarshaller() == null)
+            throw new GridClientException("Failed to start client (marshaller is not configured).");
+
+        if (cfg.getProtocol() == GridClientProtocol.TCP) {
+            try {
+                IgniteLogger gridLog = new IgniteJavaLogger(false);
+
+                GridNioFilter[] filters;
+
+                GridNioMessageReader msgReader = new GridNioMessageReader() {
+                    @Override public boolean read(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg,
+                        ByteBuffer buf) {
+                        assert msg != null;
+                        assert buf != null;
+
+                        msg.messageReader(this, nodeId);
+
+                        return msg.readFrom(buf);
+                    }
+
+                    @Nullable @Override public GridTcpMessageFactory messageFactory() {
+                        return null;
+                    }
+                };
+
+                GridNioFilter codecFilter = new GridNioCodecFilter(new NioParser(msgReader), gridLog, true);
+
+                if (sslCtx != null) {
+                    GridNioSslFilter sslFilter = new GridNioSslFilter(sslCtx, gridLog);
+
+                    sslFilter.directMode(true);
+                    sslFilter.clientMode(true);
+
+                    filters = new GridNioFilter[]{codecFilter, sslFilter};
+                }
+                else
+                    filters = new GridNioFilter[]{codecFilter};
+
+                srv = GridNioServer.builder().address(U.getLocalHost())
+                    .port(-1)
+                    .listener(new NioListener(log))
+                    .filters(filters)
+                    .logger(gridLog)
+                    .selectorCount(Runtime.getRuntime().availableProcessors())
+                    .sendQueueLimit(1024)
+                    .byteOrder(ByteOrder.nativeOrder())
+                    .tcpNoDelay(cfg.isTcpNoDelay())
+                    .directBuffer(true)
+                    .directMode(true)
+                    .socketReceiveBufferSize(0)
+                    .socketSendBufferSize(0)
+                    .idleTimeout(Long.MAX_VALUE)
+                    .gridName("gridClient")
+                    .messageWriter(msgWriter)
+                    .daemon(cfg.isDaemon())
+                    .build();
+
+                srv.start();
+            }
+            catch (IOException | IgniteCheckedException e) {
+                throw new GridClientException("Failed to start connection server.", e);
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("BusyWait")
+    @Override public void init(Collection<InetSocketAddress> srvs) throws GridClientException, InterruptedException {
+        init0();
+
+        GridClientException firstEx = null;
+
+        for (int i = 0; i < INIT_RETRY_CNT; i++) {
+            Collection<InetSocketAddress> srvsCp = new ArrayList<>(srvs);
+
+            while (!srvsCp.isEmpty()) {
+                GridClientConnection conn = null;
+
+                try {
+                    conn = connect(null, srvsCp);
+
+                    conn.topology(cfg.isAutoFetchAttributes(), cfg.isAutoFetchMetrics(), null).get();
+
+                    return;
+                }
+                catch (GridServerUnreachableException e) {
+                    // No connection could be opened to any of initial addresses - exit to retry loop.
+                    assert conn == null :
+                        "GridClientConnectionResetException was thrown from GridClientConnection#topology";
+
+                    if (firstEx == null)
+                        firstEx = e;
+
+                    break;
+                }
+                catch (GridClientConnectionResetException e) {
+                    // Connection was established but topology update failed -
+                    // trying other initial addresses if any.
+                    assert conn != null : "GridClientConnectionResetException was thrown from connect()";
+
+                    if (firstEx == null)
+                        firstEx = e;
+
+                    if (!srvsCp.remove(conn.serverAddress()))
+                        // We have misbehaving collection or equals - just exit to avoid infinite loop.
+                        break;
+                }
+            }
+
+            Thread.sleep(INIT_RETRY_INTERVAL);
+        }
+
+        for (GridClientConnection c : conns.values()) {
+            conns.remove(c.serverAddress(), c);
+
+            c.close(FAILED, false);
+        }
+
+        throw firstEx;
+    }
+
+    /**
+     * Additional initialization.
+     *
+     * @throws GridClientException In case of error.
+     */
+    protected abstract void init0() throws GridClientException;
+
+    /**
+     * Gets active communication facade.
+     *
+     * @param node Remote node to which connection should be established.
+     * @throws GridServerUnreachableException If none of the servers can be reached after the exception.
+     * @throws GridClientClosedException If client was closed manually.
+     * @throws InterruptedException If connection was interrupted.
+     */
+    @Override public GridClientConnection connection(GridClientNode node)
+        throws GridClientClosedException, GridServerUnreachableException, InterruptedException {
+        assert node != null;
+
+        // Use router's connections if defined.
+        if (!routers.isEmpty())
+            return connection(null, routers);
+
+        GridClientConnection conn = nodeConns.get(node.nodeId());
+
+        if (conn != null) {
+            // Ignore closed connections.
+            if (conn.closeIfIdle(cfg.getMaxConnectionIdleTime()))
+                closeIdle();
+            else
+                return conn;
+        }
+
+        // Use node's connection, if node is available over rest.
+        Collection<InetSocketAddress> endpoints = node.availableAddresses(cfg.getProtocol(), true);
+
+        List<InetSocketAddress> resolvedEndpoints = new ArrayList<>(endpoints.size());
+
+        for (InetSocketAddress endpoint : endpoints)
+            if (!endpoint.isUnresolved())
+                resolvedEndpoints.add(endpoint);
+
+        if (resolvedEndpoints.isEmpty()) {
+            throw new GridServerUnreachableException("No available endpoints to connect " +
+                "(is rest enabled for this node?): " + node);
+        }
+
+        boolean sameHost = node.attributes().isEmpty() ||
+            F.containsAny(U.allLocalMACs(), node.attribute(ATTR_MACS).toString().split(", "));
+
+        Collection<InetSocketAddress> srvs = new LinkedHashSet<>();
+
+        if (sameHost) {
+            Collections.sort(resolvedEndpoints, U.inetAddressesComparator(true));
+
+            srvs.addAll(resolvedEndpoints);
+        }
+        else {
+            for (InetSocketAddress endpoint : resolvedEndpoints)
+                if (!endpoint.getAddress().isLoopbackAddress())
+                    srvs.add(endpoint);
+        }
+
+        return connection(node.nodeId(), srvs);
+    }
+
+    /**
+     * Returns connection to one of the given addresses.
+     *
+     * @param nodeId {@code UUID} of node for mapping with connection.
+     *      {@code null} if no need of mapping.
+     * @param srvs Collection of addresses to connect to.
+     * @return Connection to use for operations, targeted for the given node.
+     * @throws GridServerUnreachableException If connection can't be established.
+     * @throws GridClientClosedException If connections manager has been closed already.
+     * @throws InterruptedException If connection was interrupted.
+     */
+    public GridClientConnection connection(@Nullable UUID nodeId, Collection<InetSocketAddress> srvs)
+        throws GridServerUnreachableException, GridClientClosedException, InterruptedException {
+        if (srvs == null || srvs.isEmpty())
+            throw new GridServerUnreachableException("Failed to establish connection to the grid" +
+                " (address list is empty).");
+
+        checkClosed();
+
+        // Search for existent connection.
+        for (InetSocketAddress endPoint : srvs) {
+            assert endPoint != null;
+
+            GridClientConnection conn = conns.get(endPoint);
+
+            if (conn == null)
+                continue;
+
+            // Ignore closed connections.
+            if (conn.closeIfIdle(cfg.getMaxConnectionIdleTime())) {
+                closeIdle();
+
+                continue;
+            }
+
+            if (nodeId != null)
+                nodeConns.put(nodeId, conn);
+
+            return conn;
+        }
+
+        return connect(nodeId, srvs);
+    }
+
+    /**
+     * Creates a connected facade and returns it. Called either from constructor or inside
+     * a write lock.
+     *
+     * @param nodeId {@code UUID} of node for mapping with connection.
+     *      {@code null} if no need of mapping.
+     * @param srvs List of server addresses that this method will try to connect to.
+     * @return Established connection.
+     * @throws GridServerUnreachableException If none of the servers can be reached.
+     * @throws InterruptedException If connection was interrupted.
+     */
+    protected GridClientConnection connect(@Nullable UUID nodeId, Collection<InetSocketAddress> srvs)
+        throws GridServerUnreachableException, InterruptedException {
+        if (srvs.isEmpty())
+            throw new GridServerUnreachableException("Failed to establish connection to the grid node (address " +
+                "list is empty).");
+
+        Exception cause = null;
+
+        for (InetSocketAddress srv : srvs) {
+            try {
+                return connect(nodeId, srv);
+            }
+            catch (InterruptedException e) {
+                throw e;
+            }
+            catch (Exception e) {
+                if (cause == null)
+                    cause = e;
+                else if (log.isLoggable(INFO))
+                    log.info("Unable to connect to grid node [srvAddr=" + srv + ", msg=" + e.getMessage() + ']');
+            }
+        }
+
+        assert cause != null;
+
+        throw new GridServerUnreachableException("Failed to connect to any of the servers in list: " + srvs, cause);
+    }
+
+    /**
+     * Create new connection to specified server.
+     *
+     * @param nodeId {@code UUID} of node for mapping with connection.
+     *      {@code null} if no need of mapping.
+     * @param addr Remote socket to connect.
+     * @return Established connection.
+     * @throws IOException If connection failed.
+     * @throws GridClientException If protocol error happened.
+     * @throws InterruptedException If thread was interrupted before connection was established.
+     */
+    protected GridClientConnection connect(@Nullable UUID nodeId, InetSocketAddress addr)
+        throws IOException, GridClientException, InterruptedException {
+        endpointStripedLock.lock(addr);
+
+        try {
+            GridClientConnection old = conns.get(addr);
+
+            if (old != null) {
+                if (old.isClosed()) {
+                    conns.remove(addr, old);
+
+                    if (nodeId != null)
+                        nodeConns.remove(nodeId, old);
+                }
+                else {
+                    if (nodeId != null)
+                        nodeConns.put(nodeId, old);
+
+                    return old;
+                }
+            }
+
+            GridSecurityCredentials cred = null;
+
+            try {
+                if (cfg.getSecurityCredentialsProvider() != null)
+                    cred = cfg.getSecurityCredentialsProvider().credentials();
+            }
+            catch (IgniteCheckedException e) {
+                throw new GridClientException("Failed to obtain client credentials.", e);
+            }
+
+            GridClientConnection conn;
+
+            if (cfg.getProtocol() == GridClientProtocol.TCP) {
+                conn = new GridClientNioTcpConnection(srv, clientId, addr, sslCtx, pingExecutor,
+                    cfg.getConnectTimeout(), cfg.getPingInterval(), cfg.getPingTimeout(),
+                    cfg.isTcpNoDelay(), cfg.getMarshaller(), marshId, top, cred, keepPortablesThreadLocal());
+            }
+            else
+                throw new GridServerUnreachableException("Failed to create client (protocol is not supported): " +
+                    cfg.getProtocol());
+
+            old = conns.putIfAbsent(addr, conn);
+
+            assert old == null;
+
+            if (nodeId != null)
+                nodeConns.put(nodeId, conn);
+
+            return conn;
+        }
+        finally {
+            endpointStripedLock.unlock(addr);
+        }
+    }
+
+    /**
+     * @return Get thread local used to enable keep portables mode.
+     */
+    protected ThreadLocal<Boolean> keepPortablesThreadLocal() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void terminateConnection(GridClientConnection conn, GridClientNode node, Throwable e) {
+        if (log.isLoggable(Level.FINE))
+            log.fine("Connection with remote node was terminated [node=" + node + ", srvAddr=" +
+                conn.serverAddress() + ", errMsg=" + e.getMessage() + ']');
+
+        closeIdle();
+
+        conn.close(FAILED, false);
+    }
+
+    /**
+     * Closes all opened connections.
+     *
+     * @param waitCompletion If {@code true} waits for all pending requests to be proceeded.
+     */
+    @SuppressWarnings("TooBroadScope")
+    @Override public void stop(boolean waitCompletion) {
+        Collection<GridClientConnection> closeConns;
+
+        if (closed)
+            return;
+
+        // Mark manager as closed.
+        closed = true;
+
+        // Remove all connections from cache.
+        closeConns = new ArrayList<>(conns.values());
+
+        conns.clear();
+
+        nodeConns.clear();
+
+        // Close old connection outside the writer lock.
+        for (GridClientConnection conn : closeConns)
+            conn.close(CLIENT_CLOSED, waitCompletion);
+
+        if (pingExecutor != null)
+            GridClientUtils.shutdownNow(GridClientConnectionManager.class, pingExecutor, log);
+
+        GridClientUtils.shutdownNow(GridClientConnectionManager.class, executor, log);
+
+        if (srv != null)
+            srv.stop();
+    }
+
+    /**
+     * Close all connections idling for more then
+     * {@link GridClientConfiguration#getMaxConnectionIdleTime()} milliseconds.
+     */
+    @SuppressWarnings("ForLoopReplaceableByForEach")
+    private void closeIdle() {
+        for (Iterator<Map.Entry<UUID, GridClientConnection>> it = nodeConns.entrySet().iterator(); it.hasNext(); ) {
+            Map.Entry<UUID, GridClientConnection> entry = it.next();
+
+            GridClientConnection conn = entry.getValue();
+
+            if (conn.closeIfIdle(cfg.getMaxConnectionIdleTime())) {
+                conns.remove(conn.serverAddress(), conn);
+
+                nodeConns.remove(entry.getKey(), conn);
+            }
+        }
+
+        for (GridClientConnection conn : conns.values())
+            if (conn.closeIfIdle(cfg.getMaxConnectionIdleTime()))
+                conns.remove(conn.serverAddress(), conn);
+    }
+
+    /**
+     * Checks and throws an exception if this client was closed.
+     *
+     * @throws GridClientClosedException If client was closed.
+     */
+    private void checkClosed() throws GridClientClosedException {
+        if (closed)
+            throw new GridClientClosedException("Client was closed (no public methods of client can be used anymore).");
+    }
+
+    /**
+     */
+    private static class NioListener implements GridNioServerListener {
+        /** */
+        private final Logger log;
+
+        /**
+         * @param log Logger.
+         */
+        private NioListener(Logger log) {
+            this.log = log;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onConnected(GridNioSession ses) {
+            if (log.isLoggable(Level.FINE))
+                log.fine("Session connected: " + ses);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
+            if (log.isLoggable(Level.FINE))
+                log.fine("Session disconnected: " + ses);
+
+            GridClientFutureAdapter<Boolean> handshakeFut =
+                ses.removeMeta(GridClientNioTcpConnection.SES_META_HANDSHAKE);
+
+            if (handshakeFut != null)
+                handshakeFut.onDone(
+                    new GridClientConnectionResetException("Failed to perform handshake (connection failed)."));
+            else {
+                GridClientNioTcpConnection conn = ses.meta(GridClientNioTcpConnection.SES_META_CONN);
+
+                if (conn != null)
+                    conn.close(FAILED, false);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onMessage(GridNioSession ses, Object msg) {
+            GridClientFutureAdapter<Boolean> handshakeFut =
+                ses.removeMeta(GridClientNioTcpConnection.SES_META_HANDSHAKE);
+
+            if (handshakeFut != null) {
+                assert msg instanceof GridClientHandshakeResponse;
+
+                handleHandshakeResponse(handshakeFut, (GridClientHandshakeResponse)msg);
+            }
+            else {
+                GridClientNioTcpConnection conn = ses.meta(GridClientNioTcpConnection.SES_META_CONN);
+
+                assert conn != null;
+
+                if (msg instanceof GridClientMessageWrapper) {
+                    GridClientMessageWrapper req = (GridClientMessageWrapper)msg;
+
+                    if (req.messageSize() != 0) {
+                        assert req.message() != null;
+
+                        conn.handleResponse(req);
+                    }
+                    else
+                        conn.handlePingResponse();
+                }
+                else {
+                    assert msg instanceof GridClientPingPacket : msg;
+
+                    conn.handlePingResponse();
+                }
+            }
+        }
+
+        /**
+         * Handles client handshake response.
+         *
+         * @param handshakeFut Future.
+         * @param msg A handshake response.
+         */
+        private void handleHandshakeResponse(GridClientFutureAdapter<Boolean> handshakeFut,
+            GridClientHandshakeResponse msg) {
+            byte rc = msg.resultCode();
+
+            if (rc != GridClientHandshakeResponse.OK.resultCode()) {
+                handshakeFut.onDone(new GridClientHandshakeException(rc,
+                    "Handshake failed due to internal error (see server log for more details)."));
+            }
+            else
+                handshakeFut.onDone(true);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onSessionWriteTimeout(GridNioSession ses) {
+            if (log.isLoggable(Level.FINE))
+                log.fine("Closing NIO session because of write timeout.");
+
+            ses.close();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onSessionIdleTimeout(GridNioSession ses) {
+            if (log.isLoggable(Level.FINE))
+                log.fine("Closing NIO session because of idle timeout.");
+
+            ses.close();
+        }
+    }
+
+    /**
+     *
+     */
+    private static class NioParser implements GridNioParser {
+        /** Message metadata key. */
+        private static final int MSG_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
+
+        /** Message reader. */
+        private final GridNioMessageReader msgReader;
+
+        /**
+         * @param msgReader Message reader.
+         */
+        NioParser(GridNioMessageReader msgReader) {
+            this.msgReader = msgReader;
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public Object decode(GridNioSession ses, ByteBuffer buf) throws IOException, IgniteCheckedException {
+            GridClientFutureAdapter<?> handshakeFut = ses.meta(GridClientNioTcpConnection.SES_META_HANDSHAKE);
+
+            if (handshakeFut != null) {
+                byte code = buf.get();
+
+                return new GridClientHandshakeResponse(code);
+            }
+
+            GridTcpCommunicationMessageAdapter msg = ses.removeMeta(MSG_META_KEY);
+
+            if (msg == null && buf.hasRemaining()) {
+                byte type = buf.get();
+
+                if (type == GridClientMessageWrapper.REQ_HEADER)
+                    msg = new GridClientMessageWrapper();
+                else
+                    throw new IOException("Invalid message type: " + type);
+            }
+
+            boolean finished = false;
+
+            if (buf.hasRemaining())
+                finished = msgReader.read(null, msg, buf);
+
+            if (finished)
+                return msg;
+            else {
+                ses.addMeta(MSG_META_KEY, msg);
+
+                return null;
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public ByteBuffer encode(GridNioSession ses, Object msg) throws IOException, IgniteCheckedException {
+            // No encoding needed for direct messages.
+            throw new UnsupportedEncodingException();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b5b2f309/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerOsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerOsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerOsImpl.java
new file mode 100644
index 0000000..372b9e8
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerOsImpl.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.impl.connection;
+
+import org.apache.ignite.client.*;
+import org.apache.ignite.internal.client.*;
+
+import javax.net.ssl.*;
+import java.net.*;
+import java.util.*;
+
+/**
+ * Open source version of connection manager.
+ */
+public class GridClientConnectionManagerOsImpl extends GridClientConnectionManagerAdapter {
+    /**
+     * @param clientId Client ID.
+     * @param sslCtx SSL context to enable secured connection or {@code null} to use unsecured one.
+     * @param cfg Client configuration.
+     * @param routers Routers or empty collection to use endpoints from topology info.
+     * @param top Topology.
+     * @throws GridClientException In case of error.
+     */
+    public GridClientConnectionManagerOsImpl(UUID clientId, SSLContext sslCtx, GridClientConfiguration cfg,
+        Collection<InetSocketAddress> routers, GridClientTopology top, Byte marshId) throws GridClientException {
+        super(clientId, sslCtx, cfg, routers, top, marshId);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void init0() throws GridClientException {
+        // No-op.
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b5b2f309/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionResetException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionResetException.java b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionResetException.java
new file mode 100644
index 0000000..ed51547
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionResetException.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.impl.connection;
+
+import org.apache.ignite.internal.client.*;
+
+/**
+ * This exception is thrown when ongoing packet should be sent, but network connection is broken.
+ * In this case client will try to reconnect to any of the servers specified in configuration.
+ */
+public class GridClientConnectionResetException extends GridClientException {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * Creates an exception with given message.
+     *
+     * @param msg Error message.
+     */
+    GridClientConnectionResetException(String msg) {
+        super(msg);
+    }
+
+    /**
+     * Creates an exception with given message and error cause.
+     *
+     * @param msg Error message.
+     * @param cause Wrapped exception.
+     */
+    GridClientConnectionResetException(String msg, Throwable cause) {
+        super(msg, cause);
+    }
+}


Mime
View raw message