ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From akuznet...@apache.org
Subject [33/50] [abbrv] incubator-ignite git commit: Merge branch 'sprint-1' into ignite-128
Date Wed, 11 Feb 2015 00:25:00 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a1b1b7a/modules/core/src/main/java/org/apache/ignite/internal/client/GridClientNode.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/client/GridClientNode.java
index 82112da,0000000..ac2fe86
mode 100644,000000..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/GridClientNode.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/GridClientNode.java
@@@ -1,152 -1,0 +1,128 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.ignite.internal.client;
 +
 +import org.jetbrains.annotations.*;
 +
 +import java.net.*;
 +import java.util.*;
 +
 +/**
 + * Descriptor of remote grid node. Use {@link GridClientCompute#nodes()} to
 + * get a full view over remote grid nodes.
 + */
 +public interface GridClientNode {
 +    /**
 +     * Gets ID of a remote node.
 +     *
 +     * @return Node ID.
 +     */
 +    public UUID nodeId();
 +
 +    /**
 +     * Gets consistent globally unique node ID. Unlike {@link #nodeId()} method,
 +     * this method returns consistent node ID which survives node restarts.
 +     *
 +     * @return Consistent globally unique node ID.
 +     */
 +    public Object consistentId();
 +
 +    /**
 +     * Gets list of REST TCP server addresses of remote node.
 +     *
 +     * @return REST TCP server addresses.
 +     */
 +    public List<String> tcpAddresses();
 +
 +    /**
 +     * Gets list of REST TCP server host names of remote node.
 +     *
 +     * @return REST TCP server host names.
 +     */
 +    public List<String> tcpHostNames();
 +
 +    /**
-      * Gets list of REST HTTP server addresses of remote node.
-      *
-      * @return REST HTTP server addresses.
-      */
-     @Deprecated
-     public List<String> jettyAddresses();
- 
-     /**
-      * Gets list of REST HTTP server host names of remote node.
-      *
-      * @return REST HTTP server host names.
-      */
-     @Deprecated
-     public List<String> jettyHostNames();
- 
-     /**
 +     * Gets client TCP port of remote node.
 +     *
 +     * @return Remote tcp port.
 +     */
 +    public int tcpPort();
 +
 +    /**
-      * Gets client HTTP port of remote node.
-      *
-      * @return Remote http port.
-      */
-     @Deprecated
-     public int httpPort();
- 
-     /**
 +     * Gets all attributes of remote node. Note that all system and
 +     * environment properties are automatically includes in node
 +     * attributes. User can also attach custom attributes and then
 +     * use them to further filter remote nodes into virtual subgrids
 +     * for task execution.
 +     *
 +     * @return All node attributes.
 +     */
 +    public Map<String, Object> attributes();
 +
 +    /**
 +     * Gets specific attribute of remote node.
 +     *
 +     * @param name Attribute name.
 +     * @return Attribute value.
 +     * @see #attributes()
 +     */
 +    @Nullable public <T> T attribute(String name);
 +
 +    /**
 +     * Gets various dynamic metrics of remote node.
 +     *
 +     * @return Metrics of remote node.
 +     */
 +    public GridClientNodeMetrics metrics();
 +
 +    /**
 +     * Gets all configured caches and their types on remote node.
 +     *
 +     * @return Map in which key is a configured cache name on the node,
 +     *      value is mode of configured cache.
 +     */
 +    public Map<String, GridClientCacheMode> caches();
 +
 +    /**
 +     * Gets node replica count for consistent hash ring (valid only for
 +     * {@code PARTITIONED} caches).
 +     *
 +     * @return Node replica count for consistent hash ring.
 +     */
 +    public int replicaCount();
 +
 +    /**
 +     * Gets collection of addresses on which REST binary protocol is bound.
 +     *
 +     * @param proto Protocol for which addresses are obtained.
 +     * @param filterResolved Whether to filter resolved addresses ( {@link InetSocketAddress#isUnresolved()}
 +     * returns {@code False} ) or not.
 +     * @return List of addresses.
 +     */
 +    public Collection<InetSocketAddress> availableAddresses(GridClientProtocol proto, boolean filterResolved);
 +
 +    /**
 +     * Indicates whether client can establish direct connection with this node.
 +     * So it is guaranteed that that any request will take only one network
 +     * 'hop' before it will be processed by a Grid node.
 +     *
 +     * @return {@code true} if node can be directly connected,
 +     *  {@code false} if request may be passed through a router.
 +     */
 +    public boolean connectable();
 +}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a1b1b7a/modules/core/src/main/java/org/apache/ignite/internal/client/GridClientProtocol.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/client/GridClientProtocol.java
index cab20d8,0000000..f64d14c
mode 100644,000000..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/GridClientProtocol.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/GridClientProtocol.java
@@@ -1,30 -1,0 +1,26 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.ignite.internal.client;
 +
 +/**
 + * Protocol that will be used when client connections are created.
 + */
 +public enum GridClientProtocol {
-     /** Communication via HTTP protocol. */
-     @Deprecated
-     HTTP,
- 
 +    /** Communication via tcp binary protocol. */
 +    TCP
 +}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a1b1b7a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/GridClientComputeImpl.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/client/impl/GridClientComputeImpl.java
index e86aa42,0000000..e6e041e
mode 100644,000000..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/GridClientComputeImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/GridClientComputeImpl.java
@@@ -1,263 -1,0 +1,263 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.ignite.internal.client.impl;
 +
 +import org.apache.ignite.internal.client.*;
 +import org.apache.ignite.internal.client.balancer.*;
 +import org.apache.ignite.internal.client.impl.connection.*;
 +import org.apache.ignite.internal.util.typedef.internal.*;
 +import org.jetbrains.annotations.*;
 +
 +import java.util.*;
 +
 +import static org.apache.ignite.internal.client.util.GridClientUtils.*;
- import static org.apache.ignite.internal.GridNodeAttributes.*;
++import static org.apache.ignite.internal.IgniteNodeAttributes.*;
 +
 +/**
 + * Compute projection implementation.
 + */
 +class GridClientComputeImpl extends GridClientAbstractProjection<GridClientComputeImpl> implements GridClientCompute {
 +    /** */
 +    private static final ThreadLocal<Boolean> KEEP_PORTABLES = new ThreadLocal<Boolean>() {
 +        @Override protected Boolean initialValue() {
 +            return false;
 +        }
 +    };
 +
 +    /** */
 +    private static final GridClientPredicate<GridClientNode> DAEMON = new GridClientPredicate<GridClientNode>() {
 +        @Override public boolean apply(GridClientNode e) {
 +            return "true".equals(e.<String>attribute(ATTR_DAEMON));
 +        }
 +    };
 +
 +    /** */
 +    private static final GridClientPredicate<GridClientNode> NOT_DAEMON = new GridClientPredicate<GridClientNode>() {
 +        @Override public boolean apply(GridClientNode e) {
 +            return !"true".equals(e.<String>attribute(ATTR_DAEMON));
 +        }
 +    };
 +
 +    /** Projection factory. */
 +    @SuppressWarnings("TypeMayBeWeakened")
 +    private final GridClientComputeFactory prjFactory = new GridClientComputeFactory();
 +
 +    /**
 +     * Creates a new compute projection.
 +     *
 +     * @param client Started client.
 +     * @param nodes Nodes to be included in this projection. If {@code null},
 +     *      then nodes from the current topology snapshot will be used.
 +     * @param nodeFilter Node filter to be used for this projection. If {@code null},
 +     *      then no filter would be applied to the node list.
 +     * @param balancer Balancer to be used in this projection. If {@code null},
 +     *      then no balancer will be used.
 +     */
 +    GridClientComputeImpl(GridClientImpl client, Collection<GridClientNode> nodes,
 +        GridClientPredicate<? super GridClientNode> nodeFilter, GridClientLoadBalancer balancer) {
 +        super(client, nodes, nodeFilter, balancer);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public GridClientCompute projection(GridClientNode node) throws GridClientException {
 +        A.notNull(node, "node");
 +
 +        return createProjection(Collections.singletonList(node), null, null, prjFactory);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public GridClientCompute projection(GridClientPredicate<? super GridClientNode> filter)
 +        throws GridClientException {
 +        return createProjection(null, filter, null, prjFactory);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public GridClientCompute projection(Collection<GridClientNode> nodes) throws GridClientException {
 +        return createProjection(nodes, null, null, prjFactory);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public GridClientCompute projection(GridClientPredicate<? super GridClientNode> filter,
 +        GridClientLoadBalancer balancer) throws GridClientException {
 +        return createProjection(null, filter, balancer, prjFactory);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public GridClientCompute projection(Collection<GridClientNode> nodes, GridClientLoadBalancer balancer)
 +        throws GridClientException {
 +        return createProjection(nodes, null, balancer, prjFactory);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public GridClientLoadBalancer balancer() {
 +        return balancer;
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public <R> R execute(String taskName, Object taskArg) throws GridClientException {
 +        return this.<R>executeAsync(taskName, taskArg).get();
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public <R> GridClientFuture<R> executeAsync(final String taskName, final Object taskArg) {
 +        A.notNull(taskName, "taskName");
 +
 +        final boolean keepPortables = KEEP_PORTABLES.get();
 +
 +        KEEP_PORTABLES.set(false);
 +
 +        return withReconnectHandling(new ClientProjectionClosure<R>() {
 +            @Override public GridClientFuture<R> apply(GridClientConnection conn, UUID destNodeId)
 +                throws GridClientConnectionResetException, GridClientClosedException {
 +                return conn.execute(taskName, taskArg, destNodeId, keepPortables);
 +            }
 +        });
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public <R> R affinityExecute(String taskName, String cacheName, Object affKey, Object taskArg)
 +        throws GridClientException {
 +        return this.<R>affinityExecuteAsync(taskName, cacheName, affKey, taskArg).get();
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public <R> GridClientFuture<R> affinityExecuteAsync(final String taskName, String cacheName,
 +        Object affKey, final Object taskArg) {
 +        A.notNull(taskName, "taskName");
 +
 +        final boolean keepPortables = KEEP_PORTABLES.get();
 +
 +        KEEP_PORTABLES.set(false);
 +
 +        return withReconnectHandling(new ClientProjectionClosure<R>() {
 +            @Override public GridClientFuture<R> apply(GridClientConnection conn, UUID destNodeId)
 +                throws GridClientConnectionResetException, GridClientClosedException {
 +                return conn.execute(taskName, taskArg, destNodeId, keepPortables);
 +            }
 +        }, cacheName, affKey);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public GridClientNode node(UUID id) throws GridClientException {
 +        A.notNull(id, "id");
 +
 +        return client.topology().node(id);
 +    }
 +
 +    /**
 +     * Gets most recently refreshed topology. If this compute instance is a projection,
 +     * then only nodes that satisfy projection criteria will be returned.
 +     *
 +     * @return Most recently refreshed topology.
 +     */
 +    @Override public Collection<GridClientNode> nodes() throws GridClientException {
 +        return applyFilter(projectionNodes(), NOT_DAEMON);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public Collection<GridClientNode> nodes(Collection<UUID> ids) throws GridClientException  {
 +        A.notNull(ids, "ids");
 +
 +        return client.topology().nodes(ids);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public Collection<GridClientNode> nodes(GridClientPredicate<GridClientNode> filter)
 +        throws GridClientException {
 +        A.notNull(filter, "filter");
 +
 +        return applyFilter(projectionNodes(), new GridClientAndPredicate<>(filter, NOT_DAEMON));
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public Collection<GridClientNode> daemonNodes() throws GridClientException {
 +        return applyFilter(projectionNodes(), DAEMON);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public GridClientNode refreshNode(UUID id, boolean includeAttrs, boolean includeMetrics)
 +        throws GridClientException {
 +        return refreshNodeAsync(id, includeAttrs, includeMetrics).get();
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public GridClientFuture<GridClientNode> refreshNodeAsync(final UUID id, final boolean includeAttrs,
 +        final boolean includeMetrics) {
 +        A.notNull(id, "id");
 +
 +        return withReconnectHandling(new ClientProjectionClosure<GridClientNode>() {
 +            @Override public GridClientFuture<GridClientNode> apply(GridClientConnection conn, UUID destNodeId)
 +                throws GridClientConnectionResetException, GridClientClosedException {
 +                return conn.node(id, includeAttrs, includeMetrics, destNodeId);
 +            }
 +        });
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Nullable @Override public GridClientNode refreshNode(String ip, boolean includeAttrs, boolean inclMetrics)
 +        throws GridClientException {
 +        return refreshNodeAsync(ip, includeAttrs, inclMetrics).get();
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public GridClientFuture<GridClientNode> refreshNodeAsync(final String ip, final boolean inclAttrs,
 +        final boolean includeMetrics) {
 +        A.notNull(ip, "ip");
 +
 +        return withReconnectHandling(new ClientProjectionClosure<GridClientNode>() {
 +            @Override public GridClientFuture<GridClientNode> apply(GridClientConnection conn, UUID destNodeId)
 +                throws GridClientConnectionResetException, GridClientClosedException {
 +                return conn.node(ip, inclAttrs, includeMetrics, destNodeId);
 +            }
 +        });
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public List<GridClientNode> refreshTopology(boolean includeAttrs, boolean includeMetrics)
 +        throws GridClientException {
 +        return refreshTopologyAsync(includeAttrs, includeMetrics).get();
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public GridClientFuture<List<GridClientNode>> refreshTopologyAsync(final boolean inclAttrs,
 +        final boolean includeMetrics) {
 +        return withReconnectHandling(new ClientProjectionClosure<List<GridClientNode>>() {
 +            @Override public GridClientFuture<List<GridClientNode>> apply(GridClientConnection conn, UUID destNodeId)
 +                throws GridClientConnectionResetException,
 +                GridClientClosedException {
 +                return conn.topology(inclAttrs, includeMetrics, destNodeId);
 +            }
 +        });
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public GridClientCompute withKeepPortables() {
 +        KEEP_PORTABLES.set(true);
 +
 +        return this;
 +    }
 +
 +    /** {@inheritDoc} */
 +    private class GridClientComputeFactory implements ProjectionFactory<GridClientComputeImpl> {
 +        /** {@inheritDoc} */
 +        @Override public GridClientComputeImpl create(Collection<GridClientNode> nodes,
 +            GridClientPredicate<? super GridClientNode> filter, GridClientLoadBalancer balancer) {
 +            return new GridClientComputeImpl(client, nodes, filter, balancer);
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a1b1b7a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/GridClientImpl.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/client/impl/GridClientImpl.java
index fb843dc,0000000..9b9f22a
mode 100644,000000..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/GridClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/GridClientImpl.java
@@@ -1,524 -1,0 +1,527 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.ignite.internal.client.impl;
 +
 +import org.apache.ignite.*;
 +import org.apache.ignite.internal.client.*;
 +import org.apache.ignite.internal.client.balancer.*;
 +import org.apache.ignite.internal.client.impl.connection.*;
 +import org.apache.ignite.internal.client.ssl.*;
 +import org.apache.ignite.internal.util.typedef.*;
 +import org.apache.ignite.internal.util.typedef.internal.*;
 +import org.jetbrains.annotations.*;
 +
 +import javax.net.ssl.*;
 +import java.lang.reflect.*;
 +import java.net.*;
 +import java.util.*;
 +import java.util.concurrent.*;
 +import java.util.concurrent.atomic.*;
 +import java.util.logging.*;
 +
- import static org.apache.ignite.internal.GridNodeAttributes.*;
++import static org.apache.ignite.internal.IgniteNodeAttributes.*;
 +
 +/**
 + * Client implementation.
 + */
 +public class GridClientImpl implements GridClient {
 +    /** Enterprise connection manager class name. */
 +    private static final String ENT_CONN_MGR_CLS =
 +        "org.apache.ignite.internal.client.impl.connection.GridClientConnectionManagerEntImpl";
 +
 +    /** Null mask object. */
 +    private static final Object NULL_MASK = new Object();
 +
 +    /** Logger. */
 +    private static final Logger log = Logger.getLogger(GridClientImpl.class.getName());
 +
 +    /** */
 +    static {
 +        boolean isLog4jUsed = U.gridClassLoader().getResource("org/apache/log4j/Appender.class") != null;
 +
 +        try {
 +            if (isLog4jUsed)
 +                U.addLog4jNoOpLogger();
 +            else
 +                U.addJavaNoOpLogger();
 +        }
 +        catch (IgniteCheckedException ignored) {
 +            // Our log4j warning suppression failed, leave it as is.
 +        }
 +    }
 +
 +    /** Client ID. */
 +    private final UUID id;
 +
 +    /** Client configuration. */
 +    protected final GridClientConfiguration cfg;
 +
 +    /** SSL context if ssl enabled. */
 +    private SSLContext sslCtx;
 +
 +    /** Main compute projection. */
 +    private final GridClientComputeImpl compute;
 +
 +    /** Data projections. */
 +    private ConcurrentMap<Object, GridClientDataImpl> dataMap = new ConcurrentHashMap<>();
 +
 +    /** Topology. */
 +    protected GridClientTopology top;
 +
 +    /** Topology updater thread. */
 +    private final Thread topUpdateThread;
 +
 +    /** Closed flag. */
 +    private AtomicBoolean closed = new AtomicBoolean();
 +
 +    /** Connection manager. */
 +    protected GridClientConnectionManager connMgr;
 +
 +    /** Routers. */
 +    private final Collection<InetSocketAddress> routers;
 +
 +    /** Servers. */
 +    private final Collection<InetSocketAddress> srvs;
 +
 +    /**
 +     * Creates a new client based on a given configuration.
 +     *
 +     * @param id Client identifier.
 +     * @param cfg0 Client configuration.
-      * @throws org.apache.ignite.internal.client.GridClientException If client configuration is incorrect.
++     * @param routerClient Router client flag.
++     * @throws GridClientException If client configuration is incorrect.
 +     * @throws GridServerUnreachableException If none of the servers specified in configuration can
 +     *      be reached.
 +     */
 +    @SuppressWarnings("CallToThreadStartDuringObjectConstruction")
-     public GridClientImpl(UUID id, GridClientConfiguration cfg0) throws GridClientException {
++    public GridClientImpl(UUID id, GridClientConfiguration cfg0, boolean routerClient) throws GridClientException {
 +        this.id = id;
 +
 +        cfg = new GridClientConfiguration(cfg0);
 +
 +        boolean success = false;
 +
 +        try {
 +            top = new GridClientTopology(cfg);
 +
 +            for (GridClientDataConfiguration dataCfg : cfg.getDataConfigurations()) {
 +                GridClientDataAffinity aff = dataCfg.getAffinity();
 +
 +                if (aff instanceof GridClientTopologyListener)
 +                    addTopologyListener((GridClientTopologyListener)aff);
 +            }
 +
 +            if (cfg.getBalancer() instanceof GridClientTopologyListener)
 +                top.addTopologyListener((GridClientTopologyListener)cfg.getBalancer());
 +
 +            GridSslContextFactory factory = cfg.getSslContextFactory();
 +
 +            if (factory != null) {
 +                try {
 +                    sslCtx = factory.createSslContext();
 +                }
 +                catch (SSLException e) {
 +                    throw new GridClientException("Failed to create client (unable to create SSL context, " +
 +                        "check ssl context factory configuration): " + e.getMessage(), e);
 +                }
 +            }
 +
 +            if (cfg.isAutoFetchMetrics() && !cfg.isEnableMetricsCache())
 +                log.warning("Auto-fetch for metrics is enabled without enabling caching for them.");
 +
 +            if (cfg.isAutoFetchAttributes() && !cfg.isEnableAttributesCache())
 +                log.warning(
 +                    "Auto-fetch for node attributes is enabled without enabling caching for them.");
 +
 +            srvs = parseAddresses(cfg.getServers());
 +            routers = parseAddresses(cfg.getRouters());
 +
 +            if (srvs.isEmpty() && routers.isEmpty())
 +                throw new GridClientException("Servers addresses and routers addresses cannot both be empty " +
 +                    "for client (please fix configuration and restart): " + this);
 +
 +            if (!srvs.isEmpty() && !routers.isEmpty())
 +                throw new GridClientException("Servers addresses and routers addresses cannot both be provided " +
 +                    "for client (please fix configuration and restart): " + this);
 +
-             connMgr = createConnectionManager(id, sslCtx, cfg, routers, top, null);
++            connMgr = createConnectionManager(id, sslCtx, cfg, routers, top, null, routerClient);
 +
 +            try {
 +                // Init connection manager, it should cause topology update.
 +                tryInitTopology();
 +            }
 +            catch (GridClientException e) {
 +                top.fail(e);
 +
 +                log.warning("Failed to initialize topology on client start. Will retry in background.");
 +            }
 +            catch (InterruptedException e) {
 +                Thread.currentThread().interrupt();
 +
 +                throw new GridClientException("Client startup was interrupted.", e);
 +            }
 +
 +            topUpdateThread = new TopologyUpdaterThread();
 +
 +            topUpdateThread.setDaemon(true);
 +
 +            topUpdateThread.start();
 +
 +            compute = new GridClientComputeImpl(this, null, null, cfg.getBalancer());
 +
 +            if (log.isLoggable(Level.INFO))
 +                log.info("Client started [id=" + id + ", protocol=" + cfg.getProtocol() + ']');
 +
 +            success = true;
 +        }
 +        finally {
 +            if (!success)
 +                stop(false);
 +        }
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public UUID id() {
 +        return id;
 +    }
 +
 +    /**
 +     * Closes client.
 +     * @param waitCompletion If {@code true} will wait for all pending requests to be proceeded.
 +     */
 +    public void stop(boolean waitCompletion) {
 +        if (closed.compareAndSet(false, true)) {
 +            // Shutdown the topology refresh thread.
 +            if (topUpdateThread != null)
 +                topUpdateThread.interrupt();
 +
 +            // Shutdown listener notification.
 +            if (top != null)
 +                top.shutdown();
 +
 +            if (connMgr != null)
 +                connMgr.stop(waitCompletion);
 +
 +            for (GridClientDataConfiguration dataCfg : cfg.getDataConfigurations()) {
 +                GridClientDataAffinity aff = dataCfg.getAffinity();
 +
 +                if (aff instanceof GridClientTopologyListener)
 +                    removeTopologyListener((GridClientTopologyListener)aff);
 +            }
 +
 +            if (log.isLoggable(Level.INFO))
 +                log.info("Client stopped [id=" + id + ", waitCompletion=" + waitCompletion + ']');
 +        }
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public GridClientData data() throws GridClientException {
 +        return data(null);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public GridClientData data(@Nullable final String cacheName) throws GridClientException {
 +        checkClosed();
 +
 +        Object key = maskNull(cacheName);
 +
 +        GridClientDataImpl data = dataMap.get(key);
 +
 +        if (data == null) {
 +            GridClientDataConfiguration dataCfg = cfg.getDataConfiguration(cacheName);
 +
 +            if (dataCfg == null && cacheName != null)
 +                throw new GridClientException("Data configuration for given cache name was not provided: " +
 +                    cacheName);
 +
 +            GridClientLoadBalancer balancer = dataCfg != null ? dataCfg.getPinnedBalancer() :
 +                new GridClientRandomBalancer();
 +
 +            GridClientPredicate<GridClientNode> cacheNodes = new GridClientPredicate<GridClientNode>() {
 +                @Override public boolean apply(GridClientNode e) {
 +                    return e.caches().containsKey(cacheName);
 +                }
 +
 +                @Override public String toString() {
 +                    return "GridClientHasCacheFilter [cacheName=" + cacheName + "]";
 +                }
 +            };
 +
 +            data = new GridClientDataImpl(
 +                cacheName, this, null, cacheNodes, balancer, null, cfg.isEnableMetricsCache());
 +
 +            GridClientDataImpl old = dataMap.putIfAbsent(key, data);
 +
 +            if (old != null)
 +                data = old;
 +        }
 +
 +        return data;
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public GridClientCompute compute() {
 +        return compute;
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public void addTopologyListener(GridClientTopologyListener lsnr) {
 +        top.addTopologyListener(lsnr);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public void removeTopologyListener(GridClientTopologyListener lsnr) {
 +        top.removeTopologyListener(lsnr);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public Collection<GridClientTopologyListener> topologyListeners() {
 +        return top.topologyListeners();
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public boolean connected() {
 +        return !top.failed();
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public void close() {
 +        GridClientFactory.stop(id);
 +    }
 +
 +    /**
 +     * Gets topology instance.
 +     *
 +     * @return Topology instance.
 +     */
 +    public GridClientTopology topology() {
 +        return top;
 +    }
 +
 +    /**
 +     * @return Connection manager.
 +     */
 +    public GridClientConnectionManager connectionManager() {
 +        return connMgr;
 +    }
 +
 +    /**
 +     * Gets data affinity for a given cache name.
 +     *
 +     * @param cacheName Name of cache for which affinity is obtained. Data configuration with this name
 +     *      must be configured at client startup.
 +     * @return Data affinity object.
 +     * @throws IllegalArgumentException If client data with given name was not configured.
 +     */
 +    GridClientDataAffinity affinity(String cacheName) {
 +        GridClientDataConfiguration dataCfg = cfg.getDataConfiguration(cacheName);
 +
 +        return dataCfg == null ? null : dataCfg.getAffinity();
 +    }
 +
 +    /**
 +     * Checks and throws an exception if this client was closed.
 +     *
 +     * @throws GridClientClosedException If client was closed.
 +     */
 +    private void checkClosed() throws GridClientClosedException {
 +        if (closed.get())
 +            throw new GridClientClosedException("Client was closed (no public methods of client can be used anymore).");
 +    }
 +
 +    /**
 +     * Masks null cache name with unique object.
 +     *
 +     * @param cacheName Name to be masked.
 +     * @return Original name or some unique object if name is null.
 +     */
 +    private Object maskNull(String cacheName) {
 +        return cacheName == null ? NULL_MASK : cacheName;
 +    }
 +
 +    /**
 +     * Maps Collection of strings to collection of {@code InetSocketAddress}es.
 +     *
 +     * @param cfgAddrs Collection fo string representations of addresses.
 +     * @return Collection of {@code InetSocketAddress}es
 +     * @throws GridClientException In case of error.
 +     */
 +    private static Collection<InetSocketAddress> parseAddresses(Collection<String> cfgAddrs)
 +        throws GridClientException {
 +        Collection<InetSocketAddress> addrs = new ArrayList<>(cfgAddrs.size());
 +
 +        for (String srvStr : cfgAddrs) {
 +            try {
 +                String[] split = srvStr.split(":");
 +
 +                InetSocketAddress addr = new InetSocketAddress(split[0], Integer.parseInt(split[1]));
 +
 +                addrs.add(addr);
 +            }
 +            catch (RuntimeException e) {
 +                throw new GridClientException("Failed to create client (invalid server address specified): " +
 +                    srvStr, e);
 +            }
 +        }
 +
 +        return Collections.unmodifiableCollection(addrs);
 +    }
 +
 +    /**
 +     * @return New connection manager based on current client settings.
 +     * @throws GridClientException If failed to start connection server.
 +     */
-     public GridClientConnectionManager newConnectionManager(@Nullable Byte marshId) throws GridClientException {
-         return createConnectionManager(id, sslCtx, cfg, routers, top, marshId);
++    public GridClientConnectionManager newConnectionManager(@Nullable Byte marshId, boolean routerClient)
++        throws GridClientException {
++        return createConnectionManager(id, sslCtx, cfg, routers, top, marshId, routerClient);
 +    }
 +
 +    /**
 +     * @param clientId Client ID.
 +     * @param sslCtx SSL context to enable secured connection or {@code null} to use unsecured one.
 +     * @param cfg Client configuration.
 +     * @param routers Routers or empty collection to use endpoints from topology info.
 +     * @param top Topology.
 +     * @throws GridClientException In case of error.
 +     */
 +    private GridClientConnectionManager createConnectionManager(UUID clientId, SSLContext sslCtx,
 +        GridClientConfiguration cfg, Collection<InetSocketAddress> routers, GridClientTopology top,
-         @Nullable Byte marshId)
++        @Nullable Byte marshId, boolean routerClient)
 +        throws GridClientException {
 +        GridClientConnectionManager mgr;
 +
 +        try {
 +            Class<?> cls = Class.forName(ENT_CONN_MGR_CLS);
 +
 +            Constructor<?> cons = cls.getConstructor(UUID.class, SSLContext.class, GridClientConfiguration.class,
 +                Collection.class, GridClientTopology.class, Byte.class);
 +
-             mgr = (GridClientConnectionManager)cons.newInstance(clientId, sslCtx, cfg, routers, top, marshId);
++            mgr = (GridClientConnectionManager)cons.newInstance(clientId, sslCtx, cfg, routers, top, marshId,
++                routerClient);
 +        }
 +        catch (ClassNotFoundException ignored) {
-             mgr = new GridClientConnectionManagerOsImpl(clientId, sslCtx, cfg, routers, top, marshId);
++            mgr = new GridClientConnectionManagerOsImpl(clientId, sslCtx, cfg, routers, top, marshId, routerClient);
 +        }
 +        catch (NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException e) {
 +            throw new GridClientException("Failed to create client connection manager.", e);
 +        }
 +
 +        return mgr;
 +    }
 +
 +    /**
 +     * Tries to init client topology using configured set of servers or routers.
 +     *
 +     * @throws GridClientException If initialisation failed.
 +     * @throws InterruptedException If initialisation was interrupted.
 +     */
 +    private void tryInitTopology() throws GridClientException, InterruptedException {
 +        boolean hasSrvs = routers.isEmpty();
 +
 +        final Collection<InetSocketAddress> connSrvs = (hasSrvs) ? new LinkedHashSet<>(srvs) : routers;
 +
 +        if (hasSrvs) {
 +            // Add REST endpoints for all nodes from previous topology snapshot.
 +            try {
 +                for (GridClientNodeImpl node : top.nodes()) {
 +                    Collection<InetSocketAddress> endpoints = node.availableAddresses(cfg.getProtocol(), true);
 +
 +                    List<InetSocketAddress> resolvedEndpoints = new ArrayList<>(endpoints.size());
 +
 +                    for (InetSocketAddress endpoint : endpoints)
 +                        if (!endpoint.isUnresolved())
 +                            resolvedEndpoints.add(endpoint);
 +
 +                    boolean sameHost = node.attributes().isEmpty() ||
 +                        F.containsAny(U.allLocalMACs(), node.attribute(ATTR_MACS).toString().split(", "));
 +
 +                    if (sameHost) {
 +                        Collections.sort(resolvedEndpoints, U.inetAddressesComparator(true));
 +
 +                        connSrvs.addAll(resolvedEndpoints);
 +                    }
 +                    else {
 +                        for (InetSocketAddress endpoint : resolvedEndpoints)
 +                            if (!endpoint.getAddress().isLoopbackAddress())
 +                                connSrvs.add(endpoint);
 +                    }
 +                }
 +            }
 +            catch (GridClientDisconnectedException ignored) {
 +                // Ignore if latest topology update failed.
 +            }
 +        }
 +
 +        connMgr.init(connSrvs);
 +
 +        Map<String, GridClientCacheMode> overallCaches = new HashMap<>();
 +
 +        for (GridClientNodeImpl node : top.nodes())
 +            overallCaches.putAll(node.caches());
 +
 +        for (Map.Entry<String, GridClientCacheMode> entry : overallCaches.entrySet()) {
 +            GridClientDataAffinity affinity = affinity(entry.getKey());
 +
 +            if (affinity instanceof GridClientPartitionAffinity && entry.getValue() !=
 +                GridClientCacheMode.PARTITIONED)
 +                log.warning(GridClientPartitionAffinity.class.getSimpleName() + " is used for a cache configured " +
 +                    "for non-partitioned mode [cacheName=" + entry.getKey() + ", cacheMode=" + entry.getValue() + ']');
 +        }
 +    }
 +
 +    /**
 +     * Thread that updates topology according to refresh interval specified in configuration.
 +     */
 +    @SuppressWarnings("BusyWait")
 +    private class TopologyUpdaterThread extends Thread {
 +        /**
 +         * Creates topology refresh thread.
 +         */
 +        private TopologyUpdaterThread() {
 +            super(id + "-topology-update");
 +        }
 +
 +        /** {@inheritDoc} */
 +        @Override public void run() {
 +            try {
 +                while (!isInterrupted()) {
 +                    Thread.sleep(cfg.getTopologyRefreshFrequency());
 +
 +                    try {
 +                        tryInitTopology();
 +                    }
 +                    catch (GridClientException e) {
 +                        top.fail(e);
 +
 +                        if (log.isLoggable(Level.FINE))
 +                            log.fine("Failed to update topology: " + e.getMessage());
 +                    }
 +                }
 +            }
 +            catch (InterruptedException ignored) {
 +                // Client is shutting down.
 +                Thread.currentThread().interrupt();
 +            }
 +        }
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public String toString() {
 +        return "GridClientImpl [id=" + id + ", closed=" + closed + ']';
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a1b1b7a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/GridClientNodeImpl.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/client/impl/GridClientNodeImpl.java
index c6d4e1c,0000000..6692550
mode 100644,000000..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/GridClientNodeImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/GridClientNodeImpl.java
@@@ -1,411 -1,0 +1,396 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.ignite.internal.client.impl;
 +
 +import org.apache.ignite.internal.client.*;
 +import org.apache.ignite.internal.util.typedef.internal.*;
 +import org.jetbrains.annotations.*;
 +
 +import java.net.*;
 +import java.util.*;
 +import java.util.concurrent.atomic.*;
 +
 +/**
 + * Client node implementation.
 + */
 +public class GridClientNodeImpl implements GridClientNode {
 +    /** Node id. */
 +    private UUID nodeId;
 +
 +    /** Consistent ID. */
 +    private Object consistentId;
 +
 +    /** REST TCP server addresses. */
 +    private List<String> tcpAddrs = Collections.emptyList();
 +
 +    /** REST TCP server host names. */
 +    private List<String> tcpHostNames = Collections.emptyList();
 +
 +    /** Port for TCP rest binary protocol. */
 +    private int tcpPort;
 +
 +    /** Node attributes. */
 +    private Map<String, Object> attrs = Collections.emptyMap();
 +
 +    /** Node metrics. */
 +    private GridClientNodeMetrics metrics;
 +
 +    /** Node caches. */
 +    private Map<String, GridClientCacheMode> caches = Collections.emptyMap();
 +
 +    /** Replica count for partitioned cache. */
 +    private int replicaCnt;
 +
 +    /** Connectable property. */
 +    private boolean connectable;
 +
 +    /** Cache for REST TCP socket addresses. */
 +    private final AtomicReference<Collection<InetSocketAddress>> tcpSockAddrs = new AtomicReference<>();
 +
 +    /**
 +     * Default constructor (private).
 +     */
 +    private GridClientNodeImpl() {
 +        // No-op.
 +    }
 +
 +    /**
 +     * Creates and returns a builder for a new instance
 +     * of this class.
 +     *
 +     * @return Builder for new instance.
 +     */
 +    public static Builder builder() {
 +        return new Builder(new GridClientNodeImpl());
 +    }
 +
 +    /**
 +     * Creates and returns a builder for a new instance
 +     * of this class, copying data from an input instance.
 +     *
 +     * @param from Instance to copy data from.
 +     * @param skipAttrs Whether to skip attributes.
 +     * @param skipMetrics Whether to skip metrics.
 +     * @return Builder for new instance.
 +     */
 +    public static Builder builder(GridClientNode from, boolean skipAttrs, boolean skipMetrics) {
 +        Builder b = new Builder(new GridClientNodeImpl())
 +            .nodeId(from.nodeId())
 +            .consistentId(from.consistentId())
 +            .tcpAddresses(from.tcpAddresses())
 +            .tcpPort(from.tcpPort())
 +            .caches(from.caches())
 +            .replicaCount(from.replicaCount())
 +            .connectable(from.connectable());
 +
 +        if (!skipAttrs)
 +            b.attributes(from.attributes());
 +
 +        if (!skipMetrics)
 +            b.metrics(from.metrics());
 +
 +        return b;
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public UUID nodeId() {
 +        return nodeId;
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public Object consistentId() {
 +        return consistentId;
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public List<String> tcpAddresses() {
 +        return tcpAddrs;
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public List<String> tcpHostNames() {
 +        return tcpHostNames;
 +    }
 +
 +    /** {@inheritDoc} */
-     @Override public List<String> jettyAddresses() {
-         throw new UnsupportedOperationException();
-     }
- 
-     /** {@inheritDoc} */
-     @Override public List<String> jettyHostNames() {
-         throw new UnsupportedOperationException();
-     }
- 
-     /** {@inheritDoc} */
 +    @Override public int tcpPort() {
 +        return tcpPort;
 +    }
 +
 +    /** {@inheritDoc} */
-     @Override public int httpPort() {
-         throw new UnsupportedOperationException();
-     }
- 
-     /** {@inheritDoc} */
 +    @Override public Map<String, Object> attributes() {
 +        return Collections.unmodifiableMap(attrs);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @SuppressWarnings("unchecked")
 +    @Nullable @Override public <T> T attribute(String name) {
 +        return (T)attrs.get(name);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public GridClientNodeMetrics metrics() {
 +        return metrics;
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public Map<String, GridClientCacheMode> caches() {
 +        return caches;
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public int replicaCount() {
 +        return replicaCnt;
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public Collection<InetSocketAddress> availableAddresses(GridClientProtocol proto,
 +        boolean filterResolved) {
 +        Collection<String> addrs;
 +        Collection<String> hostNames;
 +        AtomicReference<Collection<InetSocketAddress>> addrsCache;
 +        final int port;
 +
 +        if (proto == GridClientProtocol.TCP) {
 +            addrsCache = tcpSockAddrs;
 +            addrs = tcpAddrs;
 +            hostNames = tcpHostNames;
 +            port = tcpPort;
 +        }
 +        else
 +            throw new AssertionError("Unknown protocol: " + proto);
 +
 +        Collection<InetSocketAddress> addrs0 = addrsCache.get();
 +
 +        if (addrs0 != null)
 +            return filterIfNecessary(addrs0, filterResolved);
 +
 +        addrs0 = U.toSocketAddresses(addrs, hostNames, port);
 +
 +        if (!addrsCache.compareAndSet(null, addrs0))
 +            return filterIfNecessary(addrsCache.get(), filterResolved);
 +
 +        return filterIfNecessary(addrs0, filterResolved);
 +    }
 +
 +    /**
 +     * Filters sockets with resolved addresses.
 +     *
 +     * @param addrs Addresses to filter.
 +     * @param filter Flag indicating whether filter should be applied or not.
 +     * @return Collection copy without unresolved addresses if flag is set and collection itself otherwise.
 +     */
 +    private Collection<InetSocketAddress> filterIfNecessary(Collection<InetSocketAddress> addrs, boolean filter) {
 +        if (!filter)
 +            return addrs;
 +
 +        List<InetSocketAddress> res = new ArrayList<>(addrs.size());
 +
 +        for (InetSocketAddress addr : addrs)
 +            if (!addr.isUnresolved())
 +                res.add(addr);
 +
 +        return res;
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public boolean connectable() {
 +        return connectable;
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public boolean equals(Object o) {
 +        if (this == o) return true;
 +
 +        if (!(o instanceof GridClientNodeImpl)) return false;
 +
 +        GridClientNodeImpl that = (GridClientNodeImpl)o;
 +
 +        return nodeId.equals(that.nodeId);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public int hashCode() {
 +        return nodeId.hashCode();
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public String toString() {
 +        return "GridClientNodeImpl [nodeId=" + nodeId +
 +            ", consistentId=" + consistentId +
 +            ", tcpAddrs=" + tcpAddrs +
 +            ", tcpHostNames=" + tcpHostNames +
 +            ", binaryPort=" + tcpPort +
 +            ']';
 +    }
 +
 +    /**
 +     * Builder for instances of this class.
 +     */
 +    @SuppressWarnings("PublicInnerClass")
 +    public static final class Builder {
 +        /** */
 +        private GridClientNodeImpl impl;
 +
 +        /** */
 +        private boolean built;
 +
 +        /**
 +         * @param impl Implementation reference to build.
 +         */
 +        private Builder(GridClientNodeImpl impl) {
 +            this.impl = impl;
 +        }
 +
 +        /**
 +         * Finishes instance construction and returns a
 +         * newly-built instance.
 +         *
 +         * @return A newly-built instance.
 +         */
 +        public GridClientNodeImpl build() {
 +            if (built)
 +                throw new AssertionError("Instance already built.");
 +
 +            built = true;
 +
 +            return impl;
 +        }
 +
 +        /**
 +         * Sets node ID.
 +         *
 +         * @param nodeId Node ID.
 +         * @return This for chaining.
 +         */
 +        public Builder nodeId(UUID nodeId) {
 +            impl.nodeId = nodeId;
 +
 +            return this;
 +        }
 +
 +        /**
 +         * Sets node consistent ID.
 +         *
 +         * @param consistentId New consistent ID.
 +         * @return This for chaining.
 +         */
 +        public Builder consistentId(Object consistentId) {
 +            impl.consistentId = consistentId;
 +
 +            return this;
 +        }
 +
 +        /**
 +         * Sets list of REST TCP server addresses.
 +         *
 +         * @param tcpAddrs List of address strings.
 +         * @return This for chaining.
 +         */
 +        public Builder tcpAddresses(Collection<String> tcpAddrs) {
 +            impl.tcpAddrs = U.sealList(tcpAddrs);
 +
 +            return this;
 +        }
 +
 +        /**
 +         * Sets list of REST TCP server host names.
 +         *
 +         * @param tcpHostNames List of host names.
 +         * @return This for chaining.
 +         */
 +        public Builder tcpHostNames(Collection<String> tcpHostNames) {
 +            impl.tcpHostNames = U.sealList(tcpHostNames);
 +
 +            return this;
 +        }
 +
 +        /**
 +         * Sets remote TCP port value.
 +         *
 +         * @param tcpPort Sets remote port value.
 +         * @return This for chaining.
 +         */
 +        public Builder tcpPort(int tcpPort) {
 +            impl.tcpPort = tcpPort;
 +
 +            return this;
 +        }
 +
 +        /**
 +         * Sets node attributes.
 +         *
 +         * @param attrs Node attributes.
 +         * @return This for chaining.
 +         */
 +        public Builder attributes(Map<String, Object> attrs) {
 +            impl.attrs = U.sealMap(attrs);
 +
 +            return this;
 +        }
 +
 +        /**
 +         * Sets node metrics.
 +         *
 +         * @param metrics Metrics.
 +         * @return This for chaining.
 +         */
 +        public Builder metrics(GridClientNodeMetrics metrics) {
 +            impl.metrics = metrics;
 +
 +            return this;
 +        }
 +
 +        /**
 +         * Sets caches available on remote node.
 +         *
 +         * @param caches Cache map.
 +         * @return This for chaining.
 +         */
 +        public Builder caches(Map<String, GridClientCacheMode> caches) {
 +            impl.caches = U.sealMap(caches);
 +
 +            return this;
 +        }
 +
 +
 +        /**
 +         * Sets replica count for node on consistent hash ring.
 +         *
 +         * @param replicaCnt Replica count.
 +         * @return This for chaining.
 +         */
 +        public Builder replicaCount(int replicaCnt) {
 +            impl.replicaCnt = replicaCnt;
 +
 +            return this;
 +        }
 +
 +        /**
 +         * Sets connectable property.
 +         *
 +         * @param connectable Connectable value.
 +         * @return This for chaining.
 +         */
 +        public Builder connectable(boolean connectable) {
 +            impl.connectable = connectable;
 +
 +            return this;
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a1b1b7a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/GridClientThreadFactory.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/client/impl/GridClientThreadFactory.java
index 0d8c2f7,0000000..7fa6b4c
mode 100644,000000..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/GridClientThreadFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/GridClientThreadFactory.java
@@@ -1,62 -1,0 +1,62 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.ignite.internal.client.impl;
 +
 +import org.jetbrains.annotations.*;
 +
 +import java.util.concurrent.*;
 +import java.util.concurrent.atomic.*;
 +
 +/**
 + * Delegating thread factory which forces all spawned thread to be daemons.
 + */
 +public class GridClientThreadFactory implements ThreadFactory {
 +    /** Pool number. */
 +    private static final AtomicInteger poolCtr = new AtomicInteger(1);
 +
 +    /** Thread number. */
 +    private final AtomicInteger threadCtr = new AtomicInteger(1);
 +
 +    /** Prefix. */
 +    private final String prefix;
 +
 +    /** Daemon flag. */
 +    private final boolean daemon;
 +
 +    /**
 +     * Constructor.
 +     *
 +     * @param name Name prefix.
 +     * @param daemon Daemon flag.
 +     */
 +    public GridClientThreadFactory(String name, boolean daemon) {
 +        this.daemon = daemon;
 +
-         prefix = "gridgain-client-" + name + "-" + poolCtr.getAndIncrement() + "-";
++        prefix = "ignite-client-" + name + "-" + poolCtr.getAndIncrement() + "-";
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public Thread newThread(@NotNull Runnable r) {
 +        Thread thread = new Thread(r, prefix + threadCtr.incrementAndGet());
 +
 +        if (daemon)
 +            thread.setDaemon(true);
 +
 +        return thread;
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a1b1b7a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
index e9a3267,0000000..eca5de0
mode 100644,000000..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
@@@ -1,763 -1,0 +1,644 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.ignite.internal.client.impl.connection;
 +
 +import org.apache.ignite.*;
 +import org.apache.ignite.internal.client.*;
 +import org.apache.ignite.internal.client.impl.*;
 +import org.apache.ignite.internal.client.util.*;
 +import org.apache.ignite.internal.processors.rest.client.message.*;
- import org.apache.ignite.internal.util.direct.*;
++import org.apache.ignite.internal.processors.rest.protocols.tcp.*;
 +import org.apache.ignite.internal.util.nio.*;
 +import org.apache.ignite.internal.util.nio.ssl.*;
 +import org.apache.ignite.internal.util.typedef.*;
 +import org.apache.ignite.internal.util.typedef.internal.*;
 +import org.apache.ignite.logger.java.*;
 +import org.apache.ignite.plugin.security.*;
 +import org.jetbrains.annotations.*;
 +
 +import javax.net.ssl.*;
 +import java.io.*;
 +import java.net.*;
 +import java.nio.*;
 +import java.util.*;
 +import java.util.concurrent.*;
 +import java.util.logging.*;
 +
 +import static java.util.logging.Level.*;
- import static org.apache.ignite.internal.GridNodeAttributes.*;
 +import static org.apache.ignite.internal.client.impl.connection.GridClientConnectionCloseReason.*;
++import static org.apache.ignite.internal.IgniteNodeAttributes.*;
 +
 +/**
 + * Cached connections manager.
 + */
 +abstract class GridClientConnectionManagerAdapter implements GridClientConnectionManager {
 +    /** Count of reconnect retries before init considered failed. */
 +    private static final int INIT_RETRY_CNT = 3;
 +
 +    /** Initialization retry interval. */
 +    private static final int INIT_RETRY_INTERVAL = 1000;
 +
 +    /** Class logger. */
 +    private final Logger log;
 +
 +    /** NIO server. */
 +    private GridNioServer srv;
 +
 +    /** Active connections. */
 +    private final ConcurrentMap<InetSocketAddress, GridClientConnection> conns = new ConcurrentHashMap<>();
 +
 +    /** Active connections of nodes. */
 +    private final ConcurrentMap<UUID, GridClientConnection> nodeConns = new ConcurrentHashMap<>();
 +
 +    /** SSL context. */
 +    private final SSLContext sslCtx;
 +
 +    /** Client configuration. */
 +    protected final GridClientConfiguration cfg;
 +
 +    /** Topology. */
 +    private final GridClientTopology top;
 +
 +    /** Client id. */
 +    private final UUID clientId;
 +
 +    /** Router endpoints to use instead of topology info. */
 +    private final Collection<InetSocketAddress> routers;
 +
 +    /** Closed flag. */
 +    private volatile boolean closed;
 +
 +    /** Shared executor service. */
 +    private final ExecutorService executor;
 +
 +    /** Endpoint striped lock. */
 +    private final GridClientStripedLock endpointStripedLock = new GridClientStripedLock(16);
 +
 +    /** Service for ping requests, {@code null} if HTTP protocol is used. */
 +    private final ScheduledExecutorService pingExecutor;
 +
 +    /** Marshaller ID. */
 +    private final Byte marshId;
 +
-     /** Message writer. */
-     @SuppressWarnings("FieldCanBeLocal")
-     private final GridNioMessageWriter msgWriter = new GridNioMessageWriter() {
-         @Override public boolean write(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg, ByteBuffer buf) {
-             assert msg != null;
-             assert buf != null;
- 
-             msg.messageWriter(this, nodeId);
- 
-             return msg.writeTo(buf);
-         }
- 
-         @Override public int writeFully(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg, OutputStream out,
-             ByteBuffer buf) throws IOException {
-             assert msg != null;
-             assert out != null;
-             assert buf != null;
-             assert buf.hasArray();
- 
-             msg.messageWriter(this, nodeId);
- 
-             boolean finished = false;
-             int cnt = 0;
- 
-             while (!finished) {
-                 finished = msg.writeTo(buf);
- 
-                 out.write(buf.array(), 0, buf.position());
- 
-                 cnt += buf.position();
- 
-                 buf.clear();
-             }
- 
-             return cnt;
-         }
-     };
- 
 +    /**
 +     * @param clientId Client ID.
 +     * @param sslCtx SSL context to enable secured connection or {@code null} to use unsecured one.
 +     * @param cfg Client configuration.
 +     * @param routers Routers or empty collection to use endpoints from topology info.
 +     * @param top Topology.
 +     * @param marshId Marshaller ID.
 +     * @throws GridClientException In case of error.
 +     */
 +    @SuppressWarnings("unchecked")
 +    protected GridClientConnectionManagerAdapter(UUID clientId,
 +        SSLContext sslCtx,
 +        GridClientConfiguration cfg,
 +        Collection<InetSocketAddress> routers,
 +        GridClientTopology top,
-         @Nullable Byte marshId)
++        @Nullable Byte marshId,
++        boolean routerClient)
 +        throws GridClientException {
 +        assert clientId != null : "clientId != null";
 +        assert cfg != null : "cfg != null";
 +        assert routers != null : "routers != null";
 +        assert top != null : "top != null";
 +
 +        this.clientId = clientId;
 +        this.sslCtx = sslCtx;
 +        this.cfg = cfg;
 +        this.routers = new ArrayList<>(routers);
 +        this.top = top;
 +
 +        log = Logger.getLogger(getClass().getName());
 +
 +        executor = cfg.getExecutorService() != null ? cfg.getExecutorService() :
 +            Executors.newCachedThreadPool(new GridClientThreadFactory("exec", true));
 +
 +        pingExecutor = cfg.getProtocol() == GridClientProtocol.TCP ? Executors.newScheduledThreadPool(
 +            Runtime.getRuntime().availableProcessors(), new GridClientThreadFactory("exec", true)) : null;
 +
 +        this.marshId = marshId;
 +
 +        if (marshId == null && cfg.getMarshaller() == null)
 +            throw new GridClientException("Failed to start client (marshaller is not configured).");
 +
 +        if (cfg.getProtocol() == GridClientProtocol.TCP) {
 +            try {
-                 IgniteLogger gridLog = new IgniteJavaLogger(false);
++                IgniteLogger gridLog = new JavaLogger(false);
 +
 +                GridNioFilter[] filters;
 +
-                 GridNioMessageReader msgReader = new GridNioMessageReader() {
-                     @Override public boolean read(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg,
-                         ByteBuffer buf) {
-                         assert msg != null;
-                         assert buf != null;
- 
-                         msg.messageReader(this, nodeId);
- 
-                         return msg.readFrom(buf);
-                     }
- 
-                     @Nullable @Override public GridTcpMessageFactory messageFactory() {
-                         return null;
-                     }
-                 };
- 
-                 GridNioFilter codecFilter = new GridNioCodecFilter(new NioParser(msgReader), gridLog, true);
++                GridNioFilter codecFilter = new GridNioCodecFilter(new GridTcpRestParser(routerClient), gridLog, false);
 +
 +                if (sslCtx != null) {
 +                    GridNioSslFilter sslFilter = new GridNioSslFilter(sslCtx, gridLog);
 +
-                     sslFilter.directMode(true);
++                    sslFilter.directMode(false);
 +                    sslFilter.clientMode(true);
 +
 +                    filters = new GridNioFilter[]{codecFilter, sslFilter};
 +                }
 +                else
 +                    filters = new GridNioFilter[]{codecFilter};
 +
 +                srv = GridNioServer.builder().address(U.getLocalHost())
 +                    .port(-1)
 +                    .listener(new NioListener(log))
 +                    .filters(filters)
 +                    .logger(gridLog)
 +                    .selectorCount(Runtime.getRuntime().availableProcessors())
 +                    .sendQueueLimit(1024)
 +                    .byteOrder(ByteOrder.nativeOrder())
 +                    .tcpNoDelay(cfg.isTcpNoDelay())
 +                    .directBuffer(true)
-                     .directMode(true)
++                    .directMode(false)
 +                    .socketReceiveBufferSize(0)
 +                    .socketSendBufferSize(0)
 +                    .idleTimeout(Long.MAX_VALUE)
-                     .gridName("gridClient")
-                     .messageWriter(msgWriter)
++                    .gridName(routerClient ? "routerClient" : "gridClient")
 +                    .daemon(cfg.isDaemon())
 +                    .build();
 +
 +                srv.start();
 +            }
 +            catch (IOException | IgniteCheckedException e) {
 +                throw new GridClientException("Failed to start connection server.", e);
 +            }
 +        }
 +    }
 +
 +    /** {@inheritDoc} */
 +    @SuppressWarnings("BusyWait")
 +    @Override public void init(Collection<InetSocketAddress> srvs) throws GridClientException, InterruptedException {
 +        init0();
 +
 +        GridClientException firstEx = null;
 +
 +        for (int i = 0; i < INIT_RETRY_CNT; i++) {
 +            Collection<InetSocketAddress> srvsCp = new ArrayList<>(srvs);
 +
 +            while (!srvsCp.isEmpty()) {
 +                GridClientConnection conn = null;
 +
 +                try {
 +                    conn = connect(null, srvsCp);
 +
 +                    conn.topology(cfg.isAutoFetchAttributes(), cfg.isAutoFetchMetrics(), null).get();
 +
 +                    return;
 +                }
 +                catch (GridServerUnreachableException e) {
 +                    // No connection could be opened to any of initial addresses - exit to retry loop.
 +                    assert conn == null :
 +                        "GridClientConnectionResetException was thrown from GridClientConnection#topology";
 +
 +                    if (firstEx == null)
 +                        firstEx = e;
 +
 +                    break;
 +                }
 +                catch (GridClientConnectionResetException e) {
 +                    // Connection was established but topology update failed -
 +                    // trying other initial addresses if any.
 +                    assert conn != null : "GridClientConnectionResetException was thrown from connect()";
 +
 +                    if (firstEx == null)
 +                        firstEx = e;
 +
 +                    if (!srvsCp.remove(conn.serverAddress()))
 +                        // We have misbehaving collection or equals - just exit to avoid infinite loop.
 +                        break;
 +                }
 +            }
 +
 +            Thread.sleep(INIT_RETRY_INTERVAL);
 +        }
 +
 +        for (GridClientConnection c : conns.values()) {
 +            conns.remove(c.serverAddress(), c);
 +
 +            c.close(FAILED, false);
 +        }
 +
 +        throw firstEx;
 +    }
 +
 +    /**
 +     * Additional initialization.
 +     *
 +     * @throws GridClientException In case of error.
 +     */
 +    protected abstract void init0() throws GridClientException;
 +
 +    /**
 +     * Gets active communication facade.
 +     *
 +     * @param node Remote node to which connection should be established.
 +     * @throws GridServerUnreachableException If none of the servers can be reached after the exception.
 +     * @throws GridClientClosedException If client was closed manually.
 +     * @throws InterruptedException If connection was interrupted.
 +     */
 +    @Override public GridClientConnection connection(GridClientNode node)
 +        throws GridClientClosedException, GridServerUnreachableException, InterruptedException {
 +        assert node != null;
 +
 +        // Use router's connections if defined.
 +        if (!routers.isEmpty())
 +            return connection(null, routers);
 +
 +        GridClientConnection conn = nodeConns.get(node.nodeId());
 +
 +        if (conn != null) {
 +            // Ignore closed connections.
 +            if (conn.closeIfIdle(cfg.getMaxConnectionIdleTime()))
 +                closeIdle();
 +            else
 +                return conn;
 +        }
 +
 +        // Use node's connection, if node is available over rest.
 +        Collection<InetSocketAddress> endpoints = node.availableAddresses(cfg.getProtocol(), true);
 +
 +        List<InetSocketAddress> resolvedEndpoints = new ArrayList<>(endpoints.size());
 +
 +        for (InetSocketAddress endpoint : endpoints)
 +            if (!endpoint.isUnresolved())
 +                resolvedEndpoints.add(endpoint);
 +
 +        if (resolvedEndpoints.isEmpty()) {
 +            throw new GridServerUnreachableException("No available endpoints to connect " +
 +                "(is rest enabled for this node?): " + node);
 +        }
 +
 +        boolean sameHost = node.attributes().isEmpty() ||
 +            F.containsAny(U.allLocalMACs(), node.attribute(ATTR_MACS).toString().split(", "));
 +
 +        Collection<InetSocketAddress> srvs = new LinkedHashSet<>();
 +
 +        if (sameHost) {
 +            Collections.sort(resolvedEndpoints, U.inetAddressesComparator(true));
 +
 +            srvs.addAll(resolvedEndpoints);
 +        }
 +        else {
 +            for (InetSocketAddress endpoint : resolvedEndpoints)
 +                if (!endpoint.getAddress().isLoopbackAddress())
 +                    srvs.add(endpoint);
 +        }
 +
 +        return connection(node.nodeId(), srvs);
 +    }
 +
 +    /**
 +     * Returns connection to one of the given addresses.
 +     *
 +     * @param nodeId {@code UUID} of node for mapping with connection.
 +     *      {@code null} if no need of mapping.
 +     * @param srvs Collection of addresses to connect to.
 +     * @return Connection to use for operations, targeted for the given node.
 +     * @throws GridServerUnreachableException If connection can't be established.
 +     * @throws GridClientClosedException If connections manager has been closed already.
 +     * @throws InterruptedException If connection was interrupted.
 +     */
 +    public GridClientConnection connection(@Nullable UUID nodeId, Collection<InetSocketAddress> srvs)
 +        throws GridServerUnreachableException, GridClientClosedException, InterruptedException {
 +        if (srvs == null || srvs.isEmpty())
 +            throw new GridServerUnreachableException("Failed to establish connection to the grid" +
 +                " (address list is empty).");
 +
 +        checkClosed();
 +
 +        // Search for existent connection.
 +        for (InetSocketAddress endPoint : srvs) {
 +            assert endPoint != null;
 +
 +            GridClientConnection conn = conns.get(endPoint);
 +
 +            if (conn == null)
 +                continue;
 +
 +            // Ignore closed connections.
 +            if (conn.closeIfIdle(cfg.getMaxConnectionIdleTime())) {
 +                closeIdle();
 +
 +                continue;
 +            }
 +
 +            if (nodeId != null)
 +                nodeConns.put(nodeId, conn);
 +
 +            return conn;
 +        }
 +
 +        return connect(nodeId, srvs);
 +    }
 +
 +    /**
 +     * Creates a connected facade and returns it. Called either from constructor or inside
 +     * a write lock.
 +     *
 +     * @param nodeId {@code UUID} of node for mapping with connection.
 +     *      {@code null} if no need of mapping.
 +     * @param srvs List of server addresses that this method will try to connect to.
 +     * @return Established connection.
 +     * @throws GridServerUnreachableException If none of the servers can be reached.
 +     * @throws InterruptedException If connection was interrupted.
 +     */
 +    protected GridClientConnection connect(@Nullable UUID nodeId, Collection<InetSocketAddress> srvs)
 +        throws GridServerUnreachableException, InterruptedException {
 +        if (srvs.isEmpty())
 +            throw new GridServerUnreachableException("Failed to establish connection to the grid node (address " +
 +                "list is empty).");
 +
 +        Exception cause = null;
 +
 +        for (InetSocketAddress srv : srvs) {
 +            try {
 +                return connect(nodeId, srv);
 +            }
 +            catch (InterruptedException e) {
 +                throw e;
 +            }
 +            catch (Exception e) {
 +                if (cause == null)
 +                    cause = e;
 +                else if (log.isLoggable(INFO))
 +                    log.info("Unable to connect to grid node [srvAddr=" + srv + ", msg=" + e.getMessage() + ']');
 +            }
 +        }
 +
 +        assert cause != null;
 +
 +        throw new GridServerUnreachableException("Failed to connect to any of the servers in list: " + srvs, cause);
 +    }
 +
 +    /**
 +     * Create new connection to specified server.
 +     *
 +     * @param nodeId {@code UUID} of node for mapping with connection.
 +     *      {@code null} if no need of mapping.
 +     * @param addr Remote socket to connect.
 +     * @return Established connection.
 +     * @throws IOException If connection failed.
 +     * @throws GridClientException If protocol error happened.
 +     * @throws InterruptedException If thread was interrupted before connection was established.
 +     */
 +    protected GridClientConnection connect(@Nullable UUID nodeId, InetSocketAddress addr)
 +        throws IOException, GridClientException, InterruptedException {
 +        endpointStripedLock.lock(addr);
 +
 +        try {
 +            GridClientConnection old = conns.get(addr);
 +
 +            if (old != null) {
 +                if (old.isClosed()) {
 +                    conns.remove(addr, old);
 +
 +                    if (nodeId != null)
 +                        nodeConns.remove(nodeId, old);
 +                }
 +                else {
 +                    if (nodeId != null)
 +                        nodeConns.put(nodeId, old);
 +
 +                    return old;
 +                }
 +            }
 +
 +            GridSecurityCredentials cred = null;
 +
 +            try {
 +                if (cfg.getSecurityCredentialsProvider() != null)
 +                    cred = cfg.getSecurityCredentialsProvider().credentials();
 +            }
 +            catch (IgniteCheckedException e) {
 +                throw new GridClientException("Failed to obtain client credentials.", e);
 +            }
 +
 +            GridClientConnection conn;
 +
 +            if (cfg.getProtocol() == GridClientProtocol.TCP) {
 +                conn = new GridClientNioTcpConnection(srv, clientId, addr, sslCtx, pingExecutor,
 +                    cfg.getConnectTimeout(), cfg.getPingInterval(), cfg.getPingTimeout(),
 +                    cfg.isTcpNoDelay(), cfg.getMarshaller(), marshId, top, cred, keepPortablesThreadLocal());
 +            }
 +            else
 +                throw new GridServerUnreachableException("Failed to create client (protocol is not supported): " +
 +                    cfg.getProtocol());
 +
 +            old = conns.putIfAbsent(addr, conn);
 +
 +            assert old == null;
 +
 +            if (nodeId != null)
 +                nodeConns.put(nodeId, conn);
 +
 +            return conn;
 +        }
 +        finally {
 +            endpointStripedLock.unlock(addr);
 +        }
 +    }
 +
 +    /**
 +     * @return Get thread local used to enable keep portables mode.
 +     */
 +    protected ThreadLocal<Boolean> keepPortablesThreadLocal() {
 +        return null;
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public void terminateConnection(GridClientConnection conn, GridClientNode node, Throwable e) {
 +        if (log.isLoggable(Level.FINE))
 +            log.fine("Connection with remote node was terminated [node=" + node + ", srvAddr=" +
 +                conn.serverAddress() + ", errMsg=" + e.getMessage() + ']');
 +
 +        closeIdle();
 +
 +        conn.close(FAILED, false);
 +    }
 +
 +    /**
 +     * Closes all opened connections.
 +     *
 +     * @param waitCompletion If {@code true} waits for all pending requests to be proceeded.
 +     */
 +    @SuppressWarnings("TooBroadScope")
 +    @Override public void stop(boolean waitCompletion) {
 +        Collection<GridClientConnection> closeConns;
 +
 +        if (closed)
 +            return;
 +
 +        // Mark manager as closed.
 +        closed = true;
 +
 +        // Remove all connections from cache.
 +        closeConns = new ArrayList<>(conns.values());
 +
 +        conns.clear();
 +
 +        nodeConns.clear();
 +
 +        // Close old connection outside the writer lock.
 +        for (GridClientConnection conn : closeConns)
 +            conn.close(CLIENT_CLOSED, waitCompletion);
 +
 +        if (pingExecutor != null)
 +            GridClientUtils.shutdownNow(GridClientConnectionManager.class, pingExecutor, log);
 +
 +        GridClientUtils.shutdownNow(GridClientConnectionManager.class, executor, log);
 +
 +        if (srv != null)
 +            srv.stop();
 +    }
 +
 +    /**
 +     * Close all connections idling for more then
 +     * {@link GridClientConfiguration#getMaxConnectionIdleTime()} milliseconds.
 +     */
 +    @SuppressWarnings("ForLoopReplaceableByForEach")
 +    private void closeIdle() {
 +        for (Iterator<Map.Entry<UUID, GridClientConnection>> it = nodeConns.entrySet().iterator(); it.hasNext(); ) {
 +            Map.Entry<UUID, GridClientConnection> entry = it.next();
 +
 +            GridClientConnection conn = entry.getValue();
 +
 +            if (conn.closeIfIdle(cfg.getMaxConnectionIdleTime())) {
 +                conns.remove(conn.serverAddress(), conn);
 +
 +                nodeConns.remove(entry.getKey(), conn);
 +            }
 +        }
 +
 +        for (GridClientConnection conn : conns.values())
 +            if (conn.closeIfIdle(cfg.getMaxConnectionIdleTime()))
 +                conns.remove(conn.serverAddress(), conn);
 +    }
 +
 +    /**
 +     * Checks and throws an exception if this client was closed.
 +     *
 +     * @throws GridClientClosedException If client was closed.
 +     */
 +    private void checkClosed() throws GridClientClosedException {
 +        if (closed)
 +            throw new GridClientClosedException("Client was closed (no public methods of client can be used anymore).");
 +    }
 +
 +    /**
 +     */
 +    private static class NioListener implements GridNioServerListener {
 +        /** */
 +        private final Logger log;
 +
 +        /**
 +         * @param log Logger.
 +         */
 +        private NioListener(Logger log) {
 +            this.log = log;
 +        }
 +
 +        /** {@inheritDoc} */
 +        @Override public void onConnected(GridNioSession ses) {
 +            if (log.isLoggable(Level.FINE))
 +                log.fine("Session connected: " + ses);
 +        }
 +
 +        /** {@inheritDoc} */
 +        @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
 +            if (log.isLoggable(Level.FINE))
 +                log.fine("Session disconnected: " + ses);
 +
 +            GridClientFutureAdapter<Boolean> handshakeFut =
 +                ses.removeMeta(GridClientNioTcpConnection.SES_META_HANDSHAKE);
 +
 +            if (handshakeFut != null)
 +                handshakeFut.onDone(
 +                    new GridClientConnectionResetException("Failed to perform handshake (connection failed)."));
 +            else {
 +                GridClientNioTcpConnection conn = ses.meta(GridClientNioTcpConnection.SES_META_CONN);
 +
 +                if (conn != null)
 +                    conn.close(FAILED, false);
 +            }
 +        }
 +
 +        /** {@inheritDoc} */
 +        @Override public void onMessage(GridNioSession ses, Object msg) {
 +            GridClientFutureAdapter<Boolean> handshakeFut =
 +                ses.removeMeta(GridClientNioTcpConnection.SES_META_HANDSHAKE);
 +
 +            if (handshakeFut != null) {
 +                assert msg instanceof GridClientHandshakeResponse;
 +
 +                handleHandshakeResponse(handshakeFut, (GridClientHandshakeResponse)msg);
 +            }
 +            else {
 +                GridClientNioTcpConnection conn = ses.meta(GridClientNioTcpConnection.SES_META_CONN);
 +
 +                assert conn != null;
 +
-                 if (msg instanceof GridClientMessageWrapper) {
-                     GridClientMessageWrapper req = (GridClientMessageWrapper)msg;
- 
-                     if (req.messageSize() != 0) {
-                         assert req.message() != null;
- 
-                         conn.handleResponse(req);
-                     }
-                     else
-                         conn.handlePingResponse();
-                 }
-                 else {
-                     assert msg instanceof GridClientPingPacket : msg;
- 
++                if (msg instanceof GridClientPingPacket)
 +                    conn.handlePingResponse();
++                else {
++                    try {
++                        conn.handleResponse((GridClientMessage)msg);
++                    }
++                    catch (IOException e) {
++                        log.log(Level.SEVERE, "Failed to parse response.", e);
++                    }
 +                }
 +            }
 +        }
 +
 +        /**
 +         * Handles client handshake response.
 +         *
 +         * @param handshakeFut Future.
 +         * @param msg A handshake response.
 +         */
 +        private void handleHandshakeResponse(GridClientFutureAdapter<Boolean> handshakeFut,
 +            GridClientHandshakeResponse msg) {
 +            byte rc = msg.resultCode();
 +
 +            if (rc != GridClientHandshakeResponse.OK.resultCode()) {
 +                handshakeFut.onDone(new GridClientHandshakeException(rc,
 +                    "Handshake failed due to internal error (see server log for more details)."));
 +            }
 +            else
 +                handshakeFut.onDone(true);
 +        }
 +
 +        /** {@inheritDoc} */
 +        @Override public void onSessionWriteTimeout(GridNioSession ses) {
 +            if (log.isLoggable(Level.FINE))
 +                log.fine("Closing NIO session because of write timeout.");
 +
 +            ses.close();
 +        }
 +
 +        /** {@inheritDoc} */
 +        @Override public void onSessionIdleTimeout(GridNioSession ses) {
 +            if (log.isLoggable(Level.FINE))
 +                log.fine("Closing NIO session because of idle timeout.");
 +
 +            ses.close();
 +        }
 +    }
- 
-     /**
-      *
-      */
-     private static class NioParser implements GridNioParser {
-         /** Message metadata key. */
-         private static final int MSG_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
- 
-         /** Message reader. */
-         private final GridNioMessageReader msgReader;
- 
-         /**
-          * @param msgReader Message reader.
-          */
-         NioParser(GridNioMessageReader msgReader) {
-             this.msgReader = msgReader;
-         }
- 
-         /** {@inheritDoc} */
-         @Nullable @Override public Object decode(GridNioSession ses, ByteBuffer buf) throws IOException, IgniteCheckedException {
-             GridClientFutureAdapter<?> handshakeFut = ses.meta(GridClientNioTcpConnection.SES_META_HANDSHAKE);
- 
-             if (handshakeFut != null) {
-                 byte code = buf.get();
- 
-                 return new GridClientHandshakeResponse(code);
-             }
- 
-             GridTcpCommunicationMessageAdapter msg = ses.removeMeta(MSG_META_KEY);
- 
-             if (msg == null && buf.hasRemaining()) {
-                 byte type = buf.get();
- 
-                 if (type == GridClientMessageWrapper.REQ_HEADER)
-                     msg = new GridClientMessageWrapper();
-                 else
-                     throw new IOException("Invalid message type: " + type);
-             }
- 
-             boolean finished = false;
- 
-             if (buf.hasRemaining())
-                 finished = msgReader.read(null, msg, buf);
- 
-             if (finished)
-                 return msg;
-             else {
-                 ses.addMeta(MSG_META_KEY, msg);
- 
-                 return null;
-             }
-         }
- 
-         /** {@inheritDoc} */
-         @Override public ByteBuffer encode(GridNioSession ses, Object msg) throws IOException, IgniteCheckedException {
-             // No encoding needed for direct messages.
-             throw new UnsupportedEncodingException();
-         }
-     }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a1b1b7a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerOsImpl.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerOsImpl.java
index d16b235,0000000..6ffd50d
mode 100644,000000..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerOsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerOsImpl.java
@@@ -1,47 -1,0 +1,48 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.ignite.internal.client.impl.connection;
 +
 +import org.apache.ignite.internal.client.*;
 +
 +import javax.net.ssl.*;
 +import java.net.*;
 +import java.util.*;
 +
 +/**
 + * Open source version of connection manager.
 + */
 +public class GridClientConnectionManagerOsImpl extends GridClientConnectionManagerAdapter {
 +    /**
 +     * @param clientId Client ID.
 +     * @param sslCtx SSL context to enable secured connection or {@code null} to use unsecured one.
 +     * @param cfg Client configuration.
 +     * @param routers Routers or empty collection to use endpoints from topology info.
 +     * @param top Topology.
 +     * @throws GridClientException In case of error.
 +     */
 +    public GridClientConnectionManagerOsImpl(UUID clientId, SSLContext sslCtx, GridClientConfiguration cfg,
-         Collection<InetSocketAddress> routers, GridClientTopology top, Byte marshId) throws GridClientException {
-         super(clientId, sslCtx, cfg, routers, top, marshId);
++        Collection<InetSocketAddress> routers, GridClientTopology top, Byte marshId, boolean routerClient)
++        throws GridClientException {
++        super(clientId, sslCtx, cfg, routers, top, marshId, routerClient);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override protected void init0() throws GridClientException {
 +        // No-op.
 +    }
 +}


Mime
View raw message