ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject [14/50] [abbrv] incubator-ignite git commit: Merge branch 'sprint-1' into ignite-128
Date Wed, 11 Feb 2015 03:02:40 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a1b1b7a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java
index 1882663,0000000..d84bca5
mode 100644,000000..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java
@@@ -1,1147 -1,0 +1,1063 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.ignite.internal.client.impl.connection;
 +
 +import org.apache.ignite.*;
 +import org.apache.ignite.internal.client.*;
 +import org.apache.ignite.internal.client.impl.*;
 +import org.apache.ignite.internal.client.marshaller.*;
 +import org.apache.ignite.internal.client.marshaller.jdk.*;
 +import org.apache.ignite.internal.client.marshaller.optimized.*;
 +import org.apache.ignite.internal.processors.rest.client.message.*;
 +import org.apache.ignite.internal.util.nio.*;
 +import org.apache.ignite.internal.util.nio.ssl.*;
 +import org.apache.ignite.internal.util.typedef.*;
 +import org.apache.ignite.internal.util.typedef.internal.*;
 +import org.jetbrains.annotations.*;
 +
 +import javax.net.ssl.*;
 +import java.io.*;
 +import java.net.*;
- import java.nio.*;
 +import java.nio.channels.*;
 +import java.util.*;
 +import java.util.concurrent.*;
 +import java.util.concurrent.atomic.*;
 +import java.util.logging.*;
 +
 +import static org.apache.ignite.internal.client.GridClientCacheFlag.*;
 +import static org.apache.ignite.internal.client.impl.connection.GridClientConnectionCloseReason.*;
 +import static org.apache.ignite.internal.processors.rest.client.message.GridClientCacheRequest.GridCacheOperation.*;
++import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.*;
 +
 +/**
 + * This class performs request to grid over tcp protocol. Serialization is performed with marshaller
 + * provided.
 + */
 +public class GridClientNioTcpConnection extends GridClientConnection {
 +    /** */
 +    static final int SES_META_HANDSHAKE = GridNioSessionMetaKey.nextUniqueKey();
 +
 +    /** */
 +    static final int SES_META_CONN = GridNioSessionMetaKey.nextUniqueKey();
 +
 +    /** Logger */
 +    private static final Logger log = Logger.getLogger(GridClientNioTcpConnection.class.getName());
 +
 +    /** Ping interval. */
 +    private final long pingInterval;
 +
 +    /** Ping timeout. */
 +    private final long pingTimeout;
 +
 +    /** Requests that are waiting for response. */
 +    private ConcurrentMap<Long, TcpClientFuture> pendingReqs = new ConcurrentHashMap<>();
 +
 +    /** Node by node id requests. Map for reducing server load. */
 +    private ConcurrentMap<UUID, TcpClientFuture> refreshNodeReqs = new ConcurrentHashMap<>();
 +
 +    /** Latch indicating pending are empty and connection could be terminated. */
 +    private final CountDownLatch closedLatch = new CountDownLatch(1);
 +
 +    /** Request ID counter. */
 +    private AtomicLong reqIdCntr = new AtomicLong(1);
 +
 +    /** Timestamp of last sent message. */
 +    private volatile long lastMsgSndTime;
 +
 +    /** Timestamp of last received message. */
 +    private volatile long lastMsgRcvTime;
 +
 +    /**
 +     * Ping receive time.
 +     * {@code 0} until first ping send and {@link Long#MAX_VALUE} while response isn't received.
 +     */
 +    private volatile long lastPingRcvTime;
 +
 +    /** Ping send time. */
 +    private volatile long lastPingSndTime;
 +
 +    /** Connection create timestamp. */
 +    private long createTs;
 +
 +    /** Session token. */
 +    private volatile byte[] sesTok;
 +
 +    /** Timer to run ping checks. */
 +    private ScheduledFuture<?> pingTask;
 +
 +    /** NIO session. */
 +    private GridNioSession ses;
 +
 +    /** Marshaller. */
 +    private final GridClientMarshaller marsh;
 +
 +    /** */
 +    private final ThreadLocal<Boolean> keepPortablesMode;
 +
 +    /**
 +     * Creates a client facade, tries to connect to remote server, in case of success starts reader thread.
 +     *
 +     * @param srv NIO server.
 +     * @param clientId Client identifier.
 +     * @param srvAddr Server to connect to.
 +     * @param sslCtx SSL context to use if SSL is enabled, {@code null} otherwise.
 +     * @param pingExecutor Executor service for sending ping requests.
 +     * @param connectTimeout Connect timeout.
 +     * @param pingInterval Ping interval.
 +     * @param pingTimeout Ping timeout.
 +     * @param tcpNoDelay TCP_NODELAY flag for outgoing socket connection.
 +     * @param marsh Marshaller to use in communication.
 +     * @param top Topology instance.
 +     * @param cred Client credentials.      @throws IOException If connection could not be established.
 +     * @throws IOException If IO error occurs.
 +     * @throws GridClientException If handshake error occurs.
 +     */
 +    @SuppressWarnings("unchecked")
 +    GridClientNioTcpConnection(GridNioServer srv,
 +        UUID clientId,
 +        InetSocketAddress srvAddr,
 +        SSLContext sslCtx,
 +        ScheduledExecutorService pingExecutor,
 +        int connectTimeout,
 +        long pingInterval,
 +        long pingTimeout,
 +        boolean tcpNoDelay,
 +        GridClientMarshaller marsh,
 +        Byte marshId,
 +        GridClientTopology top,
 +        Object cred,
 +        ThreadLocal<Boolean> keepPortablesMode
 +    ) throws IOException, GridClientException {
 +        super(clientId, srvAddr, sslCtx, top, cred);
 +
 +        assert marsh != null || marshId != null;
 +
 +        this.marsh = marsh;
 +        this.pingInterval = pingInterval;
 +        this.pingTimeout = pingTimeout;
 +        this.keepPortablesMode = keepPortablesMode;
 +
 +        SocketChannel ch = null;
 +        Socket sock = null;
 +        boolean cleanup = true;
 +
 +        try {
 +            ch = SocketChannel.open();
 +            sock = ch.socket();
 +
 +            sock.setTcpNoDelay(tcpNoDelay);
 +            sock.setKeepAlive(true);
 +
 +            sock.connect(srvAddr, connectTimeout);
 +
 +            GridClientFuture<?> handshakeFut = new GridClientFutureAdapter<>();
 +
 +            Map<Integer, Object> meta = new HashMap<>();
 +
 +            meta.put(SES_META_HANDSHAKE, handshakeFut);
 +
 +            GridNioFuture<?> sslHandshakeFut = null;
 +
 +            if (sslCtx != null) {
 +                sslHandshakeFut = new GridNioFutureImpl<>();
 +
 +                meta.put(GridNioSslFilter.HANDSHAKE_FUT_META_KEY, sslHandshakeFut);
 +            }
 +
 +            ses = (GridNioSession)srv.createSession(ch, meta).get();
 +
 +            if (sslHandshakeFut != null)
 +                sslHandshakeFut.get();
 +
 +            GridClientHandshakeRequest req = new GridClientHandshakeRequest();
 +
 +            if (marshId != null)
 +                req.marshallerId(marshId);
 +            // marsh != null.
 +            else if (marsh instanceof GridClientOptimizedMarshaller)
 +                req.marshallerId(GridClientOptimizedMarshaller.ID);
 +            else if (marsh instanceof GridClientJdkMarshaller)
 +                req.marshallerId(GridClientJdkMarshaller.ID);
 +
-             GridClientHandshakeRequestWrapper wrapper = new GridClientHandshakeRequestWrapper(req);
++            ses.addMeta(MARSHALLER.ordinal(), marsh);
 +
-             ses.send(wrapper);
++            ses.send(req);
 +
 +            handshakeFut.get();
 +
 +            ses.addMeta(SES_META_CONN, this);
 +
 +            if (log.isLoggable(Level.INFO))
 +                log.info("Client TCP connection established: " + serverAddress());
 +
 +            pingTask = pingExecutor.scheduleAtFixedRate(new Runnable() {
 +                @Override public void run() {
 +                    try {
 +                        makeRequest(GridClientPingPacket.PING_MESSAGE, (TcpClientFuture)null, false);
 +                    }
 +                    catch (Exception e) {
 +                        log.warning("Failed to send ping message: " + e);
 +                    }
 +                }
 +            }, 500, 500, TimeUnit.MILLISECONDS);
 +
 +            createTs = System.currentTimeMillis();
 +
 +            cleanup = false;
 +        }
 +        catch (IgniteCheckedException e) {
 +            throw new GridClientException(e);
 +        }
 +        finally {
 +            if (cleanup) {
 +                if (ses != null)
 +                    srv.close(ses);
 +
 +                if (sock!= null)
 +                    sock.close();
 +
 +                if (ch != null)
 +                    ch.close();
 +            }
 +        }
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override void close(GridClientConnectionCloseReason reason, boolean waitCompletion) {
 +        close(reason, waitCompletion, null);
 +    }
 +
 +    /**
 +     * Closes connection facade.
 +     *
 +     * @param reason Why this connection should be closed.
 +     * @param waitCompletion If {@code true} this method will wait for all pending requests to be completed.
 +     * @param cause The cause of connection close, or {@code null} if it is an ordinal close.
 +     */
 +    @SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
 +    private void close(GridClientConnectionCloseReason reason, boolean waitCompletion, @Nullable Throwable cause) {
 +        synchronized (this) {
 +            if (closeReason != null)
 +                return;
 +
 +            closeReason = reason;
 +        }
 +
 +        try {
 +            // Wait for all pending requests to be processed.
 +            if (waitCompletion && !pendingReqs.isEmpty() && ses.closeTime() == 0)
 +                closedLatch.await();
 +        }
 +        catch (InterruptedException ignored) {
 +            log.warning("Interrupted while waiting for all requests to be processed (all pending " +
 +                "requests will be failed): " + serverAddress());
 +
 +            Thread.currentThread().interrupt();
 +        }
 +
 +        if (pingTask != null)
 +            pingTask.cancel(false);
 +
 +        if (ses != null)
 +            ses.close(); // Async close.
 +
 +        for (Iterator<TcpClientFuture> it = pendingReqs.values().iterator(); it.hasNext(); ) {
 +            GridClientFutureAdapter fut = it.next();
 +
 +            fut.onDone(getCloseReasonAsException(closeReason, cause));
 +
 +            it.remove();
 +        }
 +
 +        if (log.isLoggable(Level.INFO))
 +            log.info("Client TCP connection closed: " + serverAddress());
 +    }
 +
 +    /**
 +     * Closes client only if there are no pending requests in map.
 +     *
 +     * @return {@code True} if client was closed.
 +     */
 +    @Override boolean closeIfIdle(long idleTimeout) {
 +        if (closeReason != null)
 +            return true;
 +
 +        // Timestamp of the last sent or received message.
 +        long lastMsgTime = Math.max(Math.max(lastMsgSndTime, lastMsgRcvTime), createTs);
 +
 +        if (lastMsgTime + idleTimeout < System.currentTimeMillis() && pendingReqs.isEmpty()) {
 +            // In case of new request came between empty check and setting closing flag
 +            // await for finishing all requests.
 +            close(CONN_IDLE, true);
 +
 +            return true;
 +        }
 +
 +        return false;
 +    }
 +
 +    /**
 +     * Makes request to server via tcp protocol and returns a future that will be completed when
 +     * response is received.
 +     *
 +     * @param msg Message to request,
 +     * @param destId Destination node identifier.
 +     * @return Response object.
 +     * @throws GridClientConnectionResetException If request failed.
 +     * @throws GridClientClosedException If client was closed.
 +     */
 +    private <R> GridClientFutureAdapter<R> makeRequest(GridClientMessage msg, UUID destId)
 +        throws GridClientConnectionResetException, GridClientClosedException {
 +        return makeRequest(msg, destId, false);
 +    }
 +
 +    /**
 +     * Makes request to server via tcp protocol and returns a future that will be completed when
 +     * response is received.
 +     *
 +     * @param msg Message to request,
 +     * @param destId Destination node identifier.
 +     * @param keepPortables Keep portables flag.
 +     * @return Response object.
 +     * @throws GridClientConnectionResetException If request failed.
 +     * @throws GridClientClosedException If client was closed.
 +     */
 +    private <R> GridClientFutureAdapter<R> makeRequest(GridClientMessage msg, UUID destId, boolean keepPortables)
 +        throws GridClientConnectionResetException, GridClientClosedException {
 +        assert msg != null;
 +
 +        TcpClientFuture<R> res = new TcpClientFuture<>(false, keepPortables);
 +
 +        msg.destinationId(destId);
 +
 +        return makeRequest(msg, res);
 +    }
 +
 +    /**
 +     * Makes request to server via tcp protocol and returns a future that will be completed when response is received.
 +     *
 +     * @param msg Message to request,
 +     * @param fut Future that will handle response.
 +     * @return Response object.
 +     * @throws GridClientConnectionResetException If request failed.
 +     * @throws GridClientClosedException If client was closed.
 +     */
 +    private <R> GridClientFutureAdapter<R> makeRequest(GridClientMessage msg, TcpClientFuture<R> fut)
 +        throws GridClientConnectionResetException, GridClientClosedException {
 +        return makeRequest(msg, fut, false);
 +    }
 +
 +    /**
 +     * Makes request to server via tcp protocol and returns a future that will be completed when response is received.
 +     *
 +     * @param msg Message to request,
 +     * @param fut Future that will handle response.
 +     * @param routeMode If {@code true} then this method should overwrite session token by the cached one,
 +     *     otherwise keep original value.
 +     * @return Response object.
 +     * @throws GridClientConnectionResetException If request failed.
 +     * @throws GridClientClosedException If client closed.
 +     */
 +    private <R> GridClientFutureAdapter<R> makeRequest(GridClientMessage msg, final TcpClientFuture<R> fut,
 +        boolean routeMode) throws GridClientConnectionResetException, GridClientClosedException {
 +        assert msg != null;
 +
 +        if (msg instanceof GridClientPingPacket) {
 +            long now = U.currentTimeMillis();
 +
 +            if (Math.min(now, lastPingRcvTime) - lastPingSndTime >= pingTimeout)
 +                close(FAILED, false,
 +                    new IOException("Did not receive any packets within ping response interval (connection is " +
 +                        "considered to be half-opened) [lastPingReceiveTime=" + lastPingRcvTime +
 +                        ", lastPingSendTime=" + lastPingSndTime + ", now=" + now + ", timeout=" + pingTimeout +
 +                        ", addr=" + serverAddress() + ']')
 +                );
 +            // Do not pass ping requests if ping interval didn't pass yet
 +            // or we've already waiting for ping response.
 +            else if (now - lastPingSndTime > pingInterval && lastPingRcvTime != Long.MAX_VALUE) {
 +                lastPingRcvTime = Long.MAX_VALUE;
 +
-                 ses.send(new GridClientPingPacketWrapper());
++                ses.send(GridClientPingPacket.PING_MESSAGE);
 +
 +                lastPingSndTime = now;
 +            }
 +        }
 +        else {
 +            long reqId = reqIdCntr.getAndIncrement();
 +
 +            msg.requestId(reqId);
 +
 +            if (!routeMode) {
 +                msg.clientId(clientId);
 +                msg.sessionToken(sesTok);
 +            }
 +
 +            fut.pendingMessage(msg);
 +
 +            checkClosed(closeReason);
 +
 +            GridClientFutureAdapter old = pendingReqs.putIfAbsent(reqId, fut);
 +
 +            assert old == null;
 +
-             GridClientMessageWrapper wrapper;
- 
-             try {
-                 wrapper = messageWrapper(msg);
-             }
-             catch (IOException e) {
-                 log.log(Level.SEVERE, "Failed to marshal message: " + msg, e);
- 
-                 removePending(reqId);
- 
-                 fut.onDone(e);
- 
-                 return fut;
-             }
- 
-             GridNioFuture<?> sndFut = ses.send(wrapper);
++            GridNioFuture<?> sndFut = ses.send(msg);
 +
 +            lastMsgSndTime = U.currentTimeMillis();
 +
 +            if (routeMode) {
 +                sndFut.listenAsync(new CI1<GridNioFuture<?>>() {
 +                    @Override public void apply(GridNioFuture<?> sndFut) {
 +                        try {
 +                            sndFut.get();
 +                        }
 +                        catch (Exception e) {
 +                            close(FAILED, false, e);
 +
 +                            fut.onDone(getCloseReasonAsException(FAILED, e));
 +                        }
 +                    }
 +                });
 +            }
 +            else {
 +                try {
 +                    sndFut.get();
 +                }
 +                catch (Exception e) {
 +                    throw new GridClientConnectionResetException("Failed to send message over connection " +
 +                        "(will try to reconnect): " + serverAddress(), e);
 +                }
 +            }
 +        }
 +
 +        return fut;
 +    }
 +
 +    /**
 +     * Handles ping response.
 +     */
 +    void handlePingResponse() {
 +        lastPingRcvTime = U.currentTimeMillis();
 +    }
 +
 +    /**
 +     * Handles incoming response message. If this connection is closed this method would signal empty event
 +     * if there is no more pending requests.
 +     *
-      * @param req Incoming response data.
++     * @param res Incoming response data.
 +     */
 +    @SuppressWarnings({"unchecked", "TooBroadScope"})
-     void handleResponse(GridClientMessageWrapper req) {
++    void handleResponse(GridClientMessage res) throws IOException {
 +        lastMsgRcvTime = U.currentTimeMillis();
 +
-         TcpClientFuture fut = pendingReqs.get(req.requestId());
++        TcpClientFuture fut = pendingReqs.get(res.requestId());
 +
 +        if (fut == null) {
 +            log.warning("Response for an unknown request is received, ignoring. " +
-                 "[req=" + req + ", ses=" + ses + ']');
++                "[res=" + res + ", ses=" + ses + ']');
 +
 +            return;
 +        }
 +
 +        if (fut.forward()) {
-             GridRouterResponse msg = new GridRouterResponse(
-                 req.messageArray(),
-                 req.requestId(),
-                 clientId,
-                 req.destinationId());
- 
-             removePending(msg.requestId());
++            removePending(res.requestId());
 +
-             fut.onDone(msg);
++            fut.onDone(res);
 +        }
 +        else {
-             GridClientMessage msg;
- 
-             if (keepPortablesMode != null)
-                 keepPortablesMode.set(fut.keepPortables());
- 
-             try {
-                 msg = marsh.unmarshal(req.messageArray());
-             }
-             catch (IOException e) {
-                 fut.onDone(new GridClientException("Failed to unmarshal message.", e));
++            GridClientMessage res0 = res;
 +
-                 return;
-             }
++            if (res instanceof GridRouterResponse) {
++                res0 = marsh.unmarshal(((GridRouterResponse)res).body());
 +
-             finally {
-                 if (keepPortablesMode != null)
-                     keepPortablesMode.set(true);
++                res0.requestId(res.requestId());
++                res0.clientId(res.clientId());
++                res0.destinationId(res.destinationId());
 +            }
-             msg.requestId(req.requestId());
-             msg.clientId(req.clientId());
-             msg.destinationId(req.destinationId());
 +
-             if (msg instanceof GridClientResponse)
-                 handleClientResponse(fut, (GridClientResponse)msg);
++            if (res0 instanceof GridClientResponse)
++                handleClientResponse(fut, (GridClientResponse)res0);
 +            else
-                 log.warning("Unsupported response type received: " + msg);
++                log.warning("Unsupported response type received: " + res0);
 +        }
 +    }
 +
 +    /**
 +     * Handler responses addressed to this client.
 +     *
 +     * @param fut Response future.
 +     * @param resp Response.
 +     */
 +    @SuppressWarnings("unchecked")
 +    private void handleClientResponse(TcpClientFuture fut, GridClientResponse resp) {
 +        if (resp.sessionToken() != null)
 +            sesTok = resp.sessionToken();
 +
 +        GridClientMessage src = fut.pendingMessage();
 +
 +        switch (fut.retryState()) {
 +            case TcpClientFuture.STATE_INITIAL: {
 +                if (resp.successStatus() == GridClientResponse.STATUS_AUTH_FAILURE) {
 +                    if (credentials() == null) {
 +                        fut.onDone(new GridClientAuthenticationException("Authentication failed on server " +
 +                            "(client has no credentials) [clientId=" + clientId +
 +                            ", srvAddr=" + serverAddress() + ", errMsg=" + resp.errorMessage() +']'));
 +
 +                        return;
 +                    }
 +
 +                    fut.retryState(TcpClientFuture.STATE_AUTH_RETRY);
 +
 +                    GridClientAuthenticationRequest req = buildAuthRequest();
 +
 +                    req.requestId(resp.requestId());
 +
-                     GridClientMessageWrapper wrapper;
- 
-                     try {
-                         wrapper = messageWrapper(req);
-                     }
-                     catch (IOException e) {
-                         log.log(Level.SEVERE, "Failed to marshal message: " + req, e);
- 
-                         removePending(resp.requestId());
- 
-                         fut.onDone(e);
- 
-                         return;
-                     }
- 
-                     ses.send(wrapper);
++                    ses.send(req);
 +
 +                    return;
 +                }
 +
 +                break;
 +            }
 +
 +            case TcpClientFuture.STATE_AUTH_RETRY: {
 +                if (resp.successStatus() == GridClientResponse.STATUS_SUCCESS) {
 +                    fut.retryState(TcpClientFuture.STATE_REQUEST_RETRY);
 +
 +                    src.sessionToken(sesTok);
 +
-                     GridClientMessageWrapper wrapper;
- 
-                     try {
-                         wrapper = messageWrapper(src);
-                     }
-                     catch (IOException e) {
-                         log.log(Level.SEVERE, "Failed to marshal message: " + src, e);
- 
-                         removePending(resp.requestId());
- 
-                         fut.onDone(e);
- 
-                         return;
-                     }
- 
-                     ses.send(wrapper);
++                    ses.send(src);
 +
 +                    return;
 +                }
 +
 +                break;
 +            }
 +        }
 +
 +        removePending(resp.requestId());
 +
 +        if (resp.successStatus() == GridClientResponse.STATUS_AUTH_FAILURE)
 +            fut.onDone(new GridClientAuthenticationException("Client authentication failed [clientId=" + clientId +
 +                ", srvAddr=" + serverAddress() + ", errMsg=" + resp.errorMessage() +']'));
 +        else if (resp.errorMessage() != null)
 +            fut.onDone(new GridClientException(resp.errorMessage()));
 +        else
 +            fut.onDone(resp.result());
 +    }
 +
 +    /**
-      * @param msg Client message.
-      * @return Message wrapper for direct marshalling.
-      * @throws IOException If failed to marshal message.
-      */
-     private GridClientMessageWrapper messageWrapper(GridClientMessage msg) throws IOException {
-         GridClientMessageWrapper wrapper = new GridClientMessageWrapper();
- 
-         wrapper.requestId(msg.requestId());
-         wrapper.clientId(clientId);
-         wrapper.destinationId(msg.destinationId());
- 
-         ByteBuffer data = (msg instanceof GridRouterRequest) ? ByteBuffer.wrap(((GridRouterRequest)msg).body()) :
-             marsh.marshal(msg, 0);
- 
-         wrapper.message(data);
-         wrapper.messageSize(data.remaining() + 40);
- 
-         return wrapper;
-     }
- 
-     /**
 +     * Removes pending request and signals to {@link #closedLatch} if necessary.
 +     *
 +     * @param reqId Request Id.
 +     */
 +    private void removePending(long reqId) {
 +        pendingReqs.remove(reqId);
 +
 +        if (pendingReqs.isEmpty() && closeReason != null)
 +            closedLatch.countDown();
 +    }
 +
 +    /**
 +     * Builds authentication request message with credentials taken from credentials object.
 +     *
 +     * @return AuthenticationRequest message.
 +     */
 +    private GridClientAuthenticationRequest buildAuthRequest() {
 +        GridClientAuthenticationRequest req = new GridClientAuthenticationRequest();
 +
 +        req.clientId(clientId);
 +
 +        req.credentials(credentials());
 +
 +        return req;
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public <K, V> GridClientFutureAdapter<Boolean> cachePutAll(String cacheName, Map<K, V> entries,
 +        Set<GridClientCacheFlag> flags, UUID destNodeId)
 +        throws GridClientConnectionResetException, GridClientClosedException {
 +        assert entries != null;
 +
 +        GridClientCacheRequest req = new GridClientCacheRequest(PUT_ALL);
 +
 +        req.cacheName(cacheName);
 +        req.values((Map<Object, Object>)entries);
 +        req.cacheFlagsOn(encodeCacheFlags(flags));
 +
 +        return makeRequest(req, destNodeId);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public <K, V> GridClientFutureAdapter<Map<K, V>> cacheGetAll(String cacheName, Collection<K> keys,
 +        Set<GridClientCacheFlag> flags, UUID destNodeId)
 +        throws GridClientConnectionResetException, GridClientClosedException {
 +        assert keys != null;
 +
 +        GridClientCacheRequest req = new GridClientCacheRequest(GET_ALL);
 +
 +        req.cacheName(cacheName);
 +        req.keys((Iterable<Object>)keys);
 +        req.cacheFlagsOn(encodeCacheFlags(flags));
 +
 +        return makeRequest(req, destNodeId, flags.contains(KEEP_PORTABLES));
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public <K> GridClientFutureAdapter<Boolean> cacheRemove(String cacheName, K key,
 +        Set<GridClientCacheFlag> flags, UUID destNodeId)
 +        throws GridClientConnectionResetException, GridClientClosedException {
 +        GridClientCacheRequest req = new GridClientCacheRequest(RMV);
 +
 +        req.cacheName(cacheName);
 +        req.key(key);
 +        req.cacheFlagsOn(encodeCacheFlags(flags));
 +
 +        return makeRequest(req, destNodeId);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public <K> GridClientFutureAdapter<Boolean> cacheRemoveAll(String cacheName, Collection<K> keys,
 +        Set<GridClientCacheFlag> flags, UUID destNodeId)
 +        throws GridClientConnectionResetException, GridClientClosedException {
 +        assert keys != null;
 +
 +        GridClientCacheRequest req = new GridClientCacheRequest(RMV_ALL);
 +
 +        req.cacheName(cacheName);
 +        req.keys((Iterable<Object>)keys);
 +        req.cacheFlagsOn(encodeCacheFlags(flags));
 +
 +        return makeRequest(req, destNodeId);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public <K, V> GridClientFutureAdapter<Boolean> cacheReplace(String cacheName, K key, V val,
 +        Set<GridClientCacheFlag> flags, UUID destNodeId)
 +        throws GridClientConnectionResetException, GridClientClosedException {
 +        assert key != null;
 +        assert val != null;
 +
 +        GridClientCacheRequest replace = new GridClientCacheRequest(REPLACE);
 +
 +        replace.cacheName(cacheName);
 +        replace.key(key);
 +        replace.value(val);
 +        replace.cacheFlagsOn(encodeCacheFlags(flags));
 +
 +        return makeRequest(replace, destNodeId);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public <K, V> GridClientFutureAdapter<Boolean> cacheCompareAndSet(String cacheName, K key, V newVal,
 +        V oldVal, Set<GridClientCacheFlag> flags, UUID destNodeId)
 +        throws GridClientConnectionResetException, GridClientClosedException {
 +        assert key != null;
 +
 +        GridClientCacheRequest msg = new GridClientCacheRequest(CAS);
 +
 +        msg.cacheName(cacheName);
 +        msg.key(key);
 +        msg.value(newVal);
 +        msg.value2(oldVal);
 +        msg.cacheFlagsOn(encodeCacheFlags(flags));
 +
 +        return makeRequest(msg, destNodeId);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @SuppressWarnings("unchecked")
 +    @Override public <K> GridClientFutureAdapter<GridClientDataMetrics> cacheMetrics(String cacheName, UUID destNodeId)
 +        throws GridClientConnectionResetException, GridClientClosedException {
 +        GridClientCacheRequest metrics = new GridClientCacheRequest(METRICS);
 +
 +        metrics.cacheName(cacheName);
 +        metrics.destinationId(destNodeId);
 +
 +        TcpClientFuture fut = new TcpClientFuture() {
 +            @Override public void onDone(Object res) {
 +                super.onDone(metricsMapToMetrics((Map<String, Number>)res));
 +            }
 +        };
 +
 +        return makeRequest(metrics, fut);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public <K, V> GridClientFutureAdapter<Boolean> cacheAppend(String cacheName, K key, V val,
 +        Set<GridClientCacheFlag> flags, UUID destNodeId)
 +        throws GridClientConnectionResetException, GridClientClosedException {
 +        assert key != null;
 +        assert val != null;
 +
 +        GridClientCacheRequest append = new GridClientCacheRequest(APPEND);
 +
 +        append.cacheName(cacheName);
 +        append.key(key);
 +        append.value(val);
 +        append.cacheFlagsOn(encodeCacheFlags(flags));
 +
 +        return makeRequest(append, destNodeId);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public <K, V> GridClientFutureAdapter<Boolean> cachePrepend(String cacheName, K key, V val,
 +        Set<GridClientCacheFlag> flags, UUID destNodeId)
 +        throws GridClientConnectionResetException, GridClientClosedException {
 +        assert key != null;
 +        assert val != null;
 +
 +        GridClientCacheRequest prepend = new GridClientCacheRequest(PREPEND);
 +
 +        prepend.cacheName(cacheName);
 +        prepend.key(key);
 +        prepend.value(val);
 +        prepend.cacheFlagsOn(encodeCacheFlags(flags));
 +
 +        return makeRequest(prepend, destNodeId);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public <R> GridClientFutureAdapter<R> execute(String taskName, Object arg, UUID destNodeId,
 +        final boolean keepPortables) throws GridClientConnectionResetException, GridClientClosedException {
 +        GridClientTaskRequest msg = new GridClientTaskRequest();
 +
 +        msg.taskName(taskName);
 +        msg.argument(arg);
 +        msg.keepPortables(keepPortables);
 +
 +        return this.<GridClientTaskResultBean>makeRequest(msg, destNodeId).chain(
 +            new GridClientFutureCallback<GridClientTaskResultBean, R>() {
 +                @Override public R onComplete(GridClientFuture<GridClientTaskResultBean> fut)
 +                    throws GridClientException {
 +                    return fut.get().getResult();
 +                }
 +            });
 +    }
 +
 +    /** {@inheritDoc} */
 +    @SuppressWarnings("unchecked")
 +    @Override public GridClientFuture<GridClientNode> node(final UUID id, boolean inclAttrs, boolean inclMetrics,
 +        UUID destNodeId) throws GridClientConnectionResetException, GridClientClosedException {
 +        assert id != null;
 +
 +        TcpClientFuture fut = refreshNodeReqs.get(id);
 +
 +        // Return request that is in progress.
 +        if (fut != null)
 +            return fut;
 +
 +        GridClientTopologyRequest msg = new GridClientTopologyRequest();
 +
 +        fut = new TcpClientFuture() {
 +            @Override public void onDone(Object res) {
 +                //Clean up the node id requests map.
 +                refreshNodeReqs.remove(id);
 +
 +                GridClientNodeImpl node = nodeBeanToNode((GridClientNodeBean)res);
 +
 +                if (node != null)
 +                    top.updateNode(node);
 +
 +                super.onDone(node);
 +            }
 +        };
 +
 +        GridClientFutureAdapter old = refreshNodeReqs.putIfAbsent(id, fut);
 +
 +        // If concurrent thread put request, do not send the message.
 +        if (old != null)
 +            return old;
 +
 +        msg.nodeId(id);
 +        msg.includeAttributes(inclAttrs);
 +        msg.includeMetrics(inclMetrics);
 +        msg.destinationId(destNodeId);
 +
 +        return makeRequest(msg, fut);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @SuppressWarnings("unchecked")
 +    @Override public GridClientFuture<GridClientNode> node(String ipAddr, boolean inclAttrs, boolean includeMetrics,
 +        UUID destNodeId) throws GridClientConnectionResetException, GridClientClosedException {
 +        GridClientTopologyRequest msg = new GridClientTopologyRequest();
 +
 +        TcpClientFuture fut = new TcpClientFuture() {
 +            @Override public void onDone(Object res) {
 +                GridClientNodeImpl node = nodeBeanToNode((GridClientNodeBean)res);
 +
 +                if (node != null)
 +                    super.onDone(top.updateNode(node));
 +                else
 +                    super.onDone(node);
 +            }
 +        };
 +
 +        msg.nodeIp(ipAddr);
 +        msg.includeAttributes(inclAttrs);
 +        msg.includeMetrics(includeMetrics);
 +        msg.destinationId(destNodeId);
 +
 +        return makeRequest(msg, fut);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @SuppressWarnings("unchecked")
 +    @Override public GridClientFuture<List<GridClientNode>> topology(boolean inclAttrs, boolean inclMetrics,
 +        UUID destNodeId) throws GridClientConnectionResetException, GridClientClosedException {
 +        GridClientTopologyRequest msg = new GridClientTopologyRequest();
 +
 +        TcpClientFuture fut = new TcpClientFuture() {
 +            @Override public void onDone(Object res) {
 +                Collection<GridClientNodeBean> beans = (Collection<GridClientNodeBean>)res;
 +
 +                Collection<GridClientNodeImpl> nodes = new ArrayList<>(beans.size());
 +
 +                for (GridClientNodeBean bean : beans)
 +                    nodes.add(nodeBeanToNode(bean));
 +
 +                super.onDone(top.updateTopology(nodes));
 +            }
 +        };
 +
 +        msg.includeAttributes(inclAttrs);
 +        msg.includeMetrics(inclMetrics);
 +        msg.destinationId(destNodeId);
 +
 +        return makeRequest(msg, fut);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public GridClientFutureAdapter<GridRouterRequest> forwardMessage(Object msg)
 +        throws GridClientException {
 +        assert msg instanceof GridRouterRequest;
 +
 +        TcpClientFuture<GridRouterRequest> res = new TcpClientFuture<>(true, false);
 +
 +        makeRequest((GridClientMessage)msg, res, true);
 +
 +        return res;
 +    }
 +
 +    /**
 +     * Creates client node instance from message.
 +     *
 +     * @param nodeBean Node bean message.
 +     * @return Created node.
 +     */
 +    @Nullable private GridClientNodeImpl nodeBeanToNode(@Nullable GridClientNodeBean nodeBean) {
 +        if (nodeBean == null)
 +            return null;
 +
 +        GridClientNodeImpl.Builder nodeBuilder = GridClientNodeImpl.builder()
 +            .nodeId(nodeBean.getNodeId())
 +            .consistentId(nodeBean.getConsistentId())
 +            .tcpAddresses(nodeBean.getTcpAddresses())
 +            .tcpPort(nodeBean.getTcpPort())
 +            .replicaCount(nodeBean.getReplicaCount());
 +
 +        Map<String, GridClientCacheMode> caches = new HashMap<>();
 +
 +        if (nodeBean.getCaches() != null) {
 +            for (Map.Entry<String, String> e : nodeBean.getCaches().entrySet()) {
 +                try {
 +                    caches.put(e.getKey(), GridClientCacheMode.valueOf(e.getValue()));
 +                }
 +                catch (IllegalArgumentException ignored) {
 +                    log.warning("Invalid cache mode received from remote node (will ignore) [srv=" + serverAddress() +
 +                        ", cacheName=" + e.getKey() + ", cacheMode=" + e.getValue() + ']');
 +                }
 +            }
 +        }
 +
 +        if (nodeBean.getDefaultCacheMode() != null) {
 +            try {
 +                caches.put(null, GridClientCacheMode.valueOf(nodeBean.getDefaultCacheMode()));
 +            }
 +            catch (IllegalArgumentException ignored) {
 +                log.warning("Invalid cache mode received for default cache from remote node (will ignore) [srv="
 +                    + serverAddress() + ", cacheMode=" + nodeBean.getDefaultCacheMode() + ']');
 +            }
 +        }
 +
 +        if (!caches.isEmpty())
 +            nodeBuilder.caches(caches);
 +
 +        if (nodeBean.getAttributes() != null)
 +            nodeBuilder.attributes(nodeBean.getAttributes());
 +
 +        GridClientNodeMetricsBean metricsBean = nodeBean.getMetrics();
 +
 +        if (metricsBean != null) {
 +            GridClientNodeMetricsAdapter metrics = new GridClientNodeMetricsAdapter();
 +
 +            metrics.setStartTime(metricsBean.getStartTime());
 +            metrics.setAverageActiveJobs(metricsBean.getAverageActiveJobs());
 +            metrics.setAverageCancelledJobs(metricsBean.getAverageCancelledJobs());
 +            metrics.setAverageCpuLoad(metricsBean.getAverageCpuLoad());
 +            metrics.setAverageJobExecuteTime(metricsBean.getAverageJobExecuteTime());
 +            metrics.setAverageJobWaitTime(metricsBean.getAverageJobWaitTime());
 +            metrics.setAverageRejectedJobs(metricsBean.getAverageRejectedJobs());
 +            metrics.setAverageWaitingJobs(metricsBean.getAverageWaitingJobs());
 +            metrics.setCurrentActiveJobs(metricsBean.getCurrentActiveJobs());
 +            metrics.setCurrentCancelledJobs(metricsBean.getCurrentCancelledJobs());
 +            metrics.setCurrentCpuLoad(metricsBean.getCurrentCpuLoad());
 +            metrics.setCurrentGcCpuLoad(metricsBean.getCurrentGcCpuLoad());
 +            metrics.setCurrentDaemonThreadCount(metricsBean.getCurrentDaemonThreadCount());
 +            metrics.setCurrentIdleTime(metricsBean.getCurrentIdleTime());
 +            metrics.setCurrentJobExecuteTime(metricsBean.getCurrentJobExecuteTime());
 +            metrics.setCurrentJobWaitTime(metricsBean.getCurrentJobWaitTime());
 +            metrics.setCurrentRejectedJobs(metricsBean.getCurrentRejectedJobs());
 +            metrics.setCurrentThreadCount(metricsBean.getCurrentThreadCount());
 +            metrics.setCurrentWaitingJobs(metricsBean.getCurrentWaitingJobs());
 +            metrics.setFileSystemFreeSpace(metricsBean.getFileSystemFreeSpace());
 +            metrics.setFileSystemTotalSpace(metricsBean.getFileSystemTotalSpace());
 +            metrics.setFileSystemUsableSpace(metricsBean.getFileSystemUsableSpace());
 +            metrics.setHeapMemoryCommitted(metricsBean.getHeapMemoryCommitted());
 +            metrics.setHeapMemoryInitialized(metricsBean.getHeapMemoryInitialized());
 +            metrics.setHeapMemoryMaximum(metricsBean.getHeapMemoryMaximum());
 +            metrics.setHeapMemoryUsed(metricsBean.getHeapMemoryUsed());
 +            metrics.setLastDataVersion(metricsBean.getLastDataVersion());
 +            metrics.setLastUpdateTime(metricsBean.getLastUpdateTime());
 +            metrics.setMaximumActiveJobs(metricsBean.getMaximumActiveJobs());
 +            metrics.setMaximumCancelledJobs(metricsBean.getMaximumCancelledJobs());
 +            metrics.setMaximumJobExecuteTime(metricsBean.getMaximumJobExecuteTime());
 +            metrics.setMaximumJobWaitTime(metricsBean.getMaximumJobWaitTime());
 +            metrics.setMaximumRejectedJobs(metricsBean.getMaximumRejectedJobs());
 +            metrics.setMaximumThreadCount(metricsBean.getMaximumThreadCount());
 +            metrics.setMaximumWaitingJobs(metricsBean.getMaximumWaitingJobs());
 +            metrics.setNodeStartTime(metricsBean.getNodeStartTime());
 +            metrics.setNonHeapMemoryCommitted(metricsBean.getNonHeapMemoryCommitted());
 +            metrics.setNonHeapMemoryInitialized(metricsBean.getNonHeapMemoryInitialized());
 +            metrics.setNonHeapMemoryMaximum(metricsBean.getNonHeapMemoryMaximum());
 +            metrics.setNonHeapMemoryUsed(metricsBean.getNonHeapMemoryUsed());
 +            metrics.setStartTime(metricsBean.getStartTime());
 +            metrics.setTotalCancelledJobs(metricsBean.getTotalCancelledJobs());
 +            metrics.setTotalCpus(metricsBean.getTotalCpus());
 +            metrics.setTotalExecutedJobs(metricsBean.getTotalExecutedJobs());
 +            metrics.setTotalIdleTime(metricsBean.getTotalIdleTime());
 +            metrics.setTotalRejectedJobs(metricsBean.getTotalRejectedJobs());
 +            metrics.setTotalStartedThreadCount(metricsBean.getTotalStartedThreadCount());
 +            metrics.setTotalExecutedTasks(metricsBean.getTotalExecutedTasks());
 +            metrics.setSentMessagesCount(metricsBean.getSentMessagesCount());
 +            metrics.setSentBytesCount(metricsBean.getSentBytesCount());
 +            metrics.setReceivedMessagesCount(metricsBean.getReceivedMessagesCount());
 +            metrics.setReceivedBytesCount(metricsBean.getReceivedBytesCount());
 +            metrics.setUpTime(metricsBean.getUpTime());
 +
 +            nodeBuilder.metrics(metrics);
 +        }
 +
 +        return nodeBuilder.build();
 +    }
 +
 +    /**
 +     * Future extension that holds client tcp message and auth retry flag.
 +     */
 +    private static class TcpClientFuture<R> extends GridClientFutureAdapter<R> {
 +        /** */
 +        private static final long serialVersionUID = 0L;
 +
 +        /** Initial request. */
 +        private static final int STATE_INITIAL = 0;
 +
 +        /** Authentication retry. */
 +        private static final int STATE_AUTH_RETRY = 1;
 +
 +        /** Request retry after auth retry. */
 +        private static final int STATE_REQUEST_RETRY = 2;
 +
 +        /** Flag indicating if connected message is a forwarded. */
 +        private final boolean forward;
 +
 +        /** Keep portables flag. */
 +        private final boolean keepPortables;
 +
 +        /** Pending message for this future. */
 +        private GridClientMessage pendingMsg;
 +
 +        /** Flag indicating whether authentication retry was attempted for this request. */
 +        @SuppressWarnings("RedundantFieldInitialization")
 +        private int authRetry = STATE_INITIAL;
 +
 +        /**
 +         * Creates new future with {@code forward} flag set to {@code false}.
 +         */
 +        private TcpClientFuture() {
 +            forward = false;
 +            keepPortables = false;
 +        }
 +
 +        /**
 +         * Creates new future with the given {@code forward} flag value.
 +         *
 +         * @param forward Flag value.
 +         */
 +        private TcpClientFuture(boolean forward, boolean keepPortables) {
 +            this.forward = forward;
 +            this.keepPortables = keepPortables;
 +        }
 +
 +        /**
 +         * @return Originating request message.
 +         */
 +        public GridClientMessage pendingMessage() {
 +            return pendingMsg;
 +        }
 +
 +        /**
 +         * @param pendingMsg Originating request message.
 +         */
 +        public void pendingMessage(GridClientMessage pendingMsg) {
 +            this.pendingMsg = pendingMsg;
 +        }
 +
 +        /**
 +         * @return Whether or not authentication retry attempted.
 +         */
 +        public int retryState() {
 +            return authRetry;
 +        }
 +
 +        /**
 +         * @param authRetry Whether or not authentication retry attempted.
 +         */
 +        public void retryState(int authRetry) {
 +            this.authRetry = authRetry;
 +        }
 +
 +        /**
 +         * @return {@code true} if this future created for forwarded message, {@code false} otherwise.
 +         */
 +        public boolean forward() {
 +            return forward;
 +        }
 +
 +        /**
 +         * @return Keep portables flag.
 +         */
 +        public boolean keepPortables() {
 +            return keepPortables;
 +        }
 +
 +        /** {@inheritDoc} */
 +        @Override public String toString() {
 +            return "TcpClientFuture [state=" + authRetry + ", forward=" + forward + ", message=" + pendingMsg + "]";
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a1b1b7a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientTopology.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientTopology.java
index 7df1121,0000000..a15cb65
mode 100644,000000..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientTopology.java
@@@ -1,450 -1,0 +1,450 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.ignite.internal.client.impl.connection;
 +
 +import org.apache.ignite.internal.client.*;
 +import org.apache.ignite.internal.client.impl.*;
 +import org.apache.ignite.internal.client.util.*;
 +import org.apache.ignite.internal.util.typedef.*;
 +import org.apache.ignite.internal.util.typedef.internal.*;
 +
 +import java.net.*;
 +import java.util.*;
 +import java.util.concurrent.*;
 +import java.util.concurrent.locks.*;
 +import java.util.logging.*;
 +
- import static org.apache.ignite.internal.GridNodeAttributes.*;
++import static org.apache.ignite.internal.IgniteNodeAttributes.*;
 +
 +/**
 + * Client topology cache.
 + */
 +public class GridClientTopology {
 +    /** Logger. */
 +    private static final Logger log = Logger.getLogger(GridClientTopology.class.getName());
 +
 +    /** Topology cache */
 +    private Map<UUID, GridClientNodeImpl> nodes = Collections.emptyMap();
 +
 +    /** Cached last error prevented topology from update. */
 +    private GridClientException lastError;
 +
 +    /**
 +     * Set of router addresses to infer direct connectivity
 +     * when client is working in router connection mode.
 +     * {@code null} when client is working in direct connection node.
 +     */
 +    private final Set<String> routerAddrs;
 +
 +    /** Protocol. */
 +    private final GridClientProtocol prot;
 +
 +    /** Flag indicating whether metrics should be cached. */
 +    private final boolean metricsCache;
 +
 +    /** Flag indicating whether metrics should be cached. */
 +    private final boolean attrCache;
 +
 +    /** Lock for topology changing. */
 +    private final ReadWriteLock lock = new ReentrantReadWriteLock();
 +
 +    /** Topology listeners. */
 +    private final Collection<GridClientTopologyListener> topLsnrs = new ConcurrentLinkedQueue<>();
 +
 +    /** Executor for listener notification. */
 +    private final ExecutorService exec =
 +        Executors.newSingleThreadExecutor(new GridClientThreadFactory("top-lsnr", true));
 +
 +    /**
 +     * Creates topology instance.
 +     *
 +     * @param cfg Client configuration.
 +     */
 +    public GridClientTopology(GridClientConfiguration cfg) {
 +        metricsCache = cfg.isEnableMetricsCache();
 +        attrCache = cfg.isEnableAttributesCache();
 +        prot = cfg.getProtocol();
 +        routerAddrs = (!cfg.getRouters().isEmpty() && cfg.getServers().isEmpty()) ?
 +            new HashSet<>(cfg.getRouters()) : null;
 +    }
 +
 +    /**
 +     * Adds topology listener.
 +     *
 +     * @param lsnr Topology listener.
 +     */
 +    public void addTopologyListener(GridClientTopologyListener lsnr) {
 +        topLsnrs.add(lsnr);
 +    }
 +
 +    /**
 +     * Removes topology listener.
 +     *
 +     * @param lsnr Topology listener.
 +     */
 +    public void removeTopologyListener(GridClientTopologyListener lsnr) {
 +        topLsnrs.remove(lsnr);
 +    }
 +
 +    /**
 +     * Returns all added topology listeners.
 +     *
 +     * @return Unmodifiable view of topology listeners.
 +     */
 +    public Collection<GridClientTopologyListener> topologyListeners() {
 +        return Collections.unmodifiableCollection(topLsnrs);
 +    }
 +
 +    /**
 +     * Updates topology if cache enabled. If cache is disabled, returns original node.
 +     *
 +     * @param node Converted rest server response.
 +     * @return Node in topology.
 +     */
 +    public GridClientNode updateNode(GridClientNodeImpl node) {
 +        lock.writeLock().lock();
 +
 +        try {
 +            boolean newNode = !nodes.containsKey(node.nodeId());
 +
 +            GridClientNodeImpl preparedNode = prepareNode(node);
 +
 +            // We update the whole topology if node was not in topology or we cache metrics.
 +            if (newNode || metricsCache || attrCache) {
 +                Map<UUID, GridClientNodeImpl> updatedTop = new HashMap<>(nodes);
 +
 +                updatedTop.put(node.nodeId(), preparedNode);
 +
 +                // Change the reference to new topology.
 +                // So everyone who captured old version will see a consistent snapshot.
 +                nodes = updatedTop;
 +                lastError = null;
 +            }
 +
 +            if (newNode)
 +                notifyEvents(Collections.singletonList(new TopologyEvent(true, preparedNode)));
 +
 +            return preparedNode;
 +        }
 +        finally {
 +            lock.writeLock().unlock();
 +        }
 +    }
 +
 +    /**
 +     * Updates (if cache is enabled) the whole topology. If cache is disabled, original collection is returned.
 +     *
 +     * @param nodeList Converted rest server response.
 +     * @return Topology nodes.
 +     */
 +    public Collection<? extends GridClientNode> updateTopology(Collection<GridClientNodeImpl> nodeList) {
 +        Collection<TopologyEvent> evts = new LinkedList<>();
 +
 +        lock.writeLock().lock();
 +
 +        try {
 +            Map<UUID, GridClientNodeImpl> updated = new HashMap<>();
 +
 +            Collection<GridClientNodeImpl> preparedNodes = F.transform(nodeList,
 +                new C1<GridClientNodeImpl, GridClientNodeImpl>() {
 +                    @Override public GridClientNodeImpl apply(GridClientNodeImpl e) {
 +                        return prepareNode(e);
 +                    }
 +                });
 +
 +            for (GridClientNodeImpl node : preparedNodes) {
 +                updated.put(node.nodeId(), node);
 +
 +                // Generate add events.
 +                if (!nodes.containsKey(node.nodeId()))
 +                    evts.add(new TopologyEvent(true, node));
 +            }
 +
 +            for (Map.Entry<UUID, GridClientNodeImpl> e : nodes.entrySet()) {
 +                if (!updated.containsKey(e.getKey()))
 +                    evts.add(new TopologyEvent(false, e.getValue()));
 +            }
 +
 +            nodes = updated;
 +            lastError = null;
 +
 +            if (!evts.isEmpty())
 +                notifyEvents(evts);
 +
 +            return preparedNodes;
 +        }
 +        finally {
 +            lock.writeLock().unlock();
 +        }
 +    }
 +
 +    /**
 +     * Marks topology as failed. After this method called all accessors will throw exception
 +     * until a next successful update.
 +     *
 +     * @param cause Exception caused the failure.
 +     */
 +    public void fail(GridClientException cause) {
 +        lock.writeLock().lock();
 +
 +        try {
 +            lastError = cause;
 +
 +            for (GridClientNode n : nodes.values())
 +                notifyEvents(Collections.singletonList(new TopologyEvent(false, n)));
 +
 +            nodes = Collections.emptyMap();
 +        }
 +        finally {
 +            lock.writeLock().unlock();
 +        }
 +    }
 +
 +    /**
 +     * Updates topology when node that is expected to be in topology fails.
 +     *
 +     * @param nodeId Node id for which node failed to be obtained.
 +     */
 +    public void nodeFailed(UUID nodeId) {
 +        lock.writeLock().lock();
 +
 +        try {
 +            boolean nodeDeleted = nodes.containsKey(nodeId);
 +
 +            GridClientNode deleted = null;
 +
 +            // We update the whole topology if node was not in topology or we cache metrics.
 +            if (nodeDeleted) {
 +                Map<UUID, GridClientNodeImpl> updatedTop = new HashMap<>(nodes);
 +
 +                deleted = updatedTop.remove(nodeId);
 +
 +                // Change the reference to new topology.
 +                // So everyone who captured old version will see a consistent snapshot.
 +                nodes = updatedTop;
 +            }
 +
 +            if (nodeDeleted)
 +                notifyEvents(Collections.singletonList(new TopologyEvent(false, deleted)));
 +        }
 +        finally {
 +            lock.writeLock().unlock();
 +        }
 +    }
 +
 +    /**
 +     * Gets node from last saved topology snapshot by it's id.
 +     *
 +     * @param id Node id.
 +     * @return Node or {@code null} if node was not found.
 +     * @throws GridClientException If topology is failed and no nodes available.
 +     */
 +    public GridClientNode node(UUID id) throws GridClientException {
 +        assert id != null;
 +
 +        lock.readLock().lock();
 +
 +        try {
 +            if (lastError != null)
 +                throw new GridClientDisconnectedException(
 +                    "Topology is failed [protocol=" + prot + ", routers=" + routerAddrs + ']', lastError);
 +            else
 +                return nodes.get(id);
 +        }
 +        finally {
 +            lock.readLock().unlock();
 +        }
 +    }
 +
 +    /**
 +     * Gets a collection of nodes from last saved topology snapshot by their ids.
 +     *
 +     * @param ids Collection of ids for which nodes should be retrieved..
 +     * @return Collection of nodes that are in topology.
 +     * @throws GridClientException If topology is failed and no nodes available.
 +     */
 +    public Collection<GridClientNode> nodes(Iterable<UUID> ids) throws GridClientException {
 +        assert ids != null;
 +
 +        Collection<GridClientNode> res = new LinkedList<>();
 +
 +        lock.readLock().lock();
 +
 +        try {
 +            if (lastError != null)
 +                throw new GridClientDisconnectedException(
 +                    "Latest topology update failed.", lastError);
 +
 +            for (UUID id : ids) {
 +                GridClientNodeImpl node = nodes.get(id);
 +
 +                if (node != null)
 +                    res.add(node);
 +            }
 +
 +            return res;
 +        }
 +        finally {
 +            lock.readLock().unlock();
 +        }
 +    }
 +
 +    /**
 +     * Gets full topology snapshot.
 +     *
 +     * @return Collection of nodes that were in last captured topology snapshot.
 +     * @throws GridClientException If topology is failed and no nodes available.
 +     */
 +    public Collection<GridClientNodeImpl> nodes() throws GridClientException {
 +        lock.readLock().lock();
 +
 +        try {
 +            if (lastError != null)
 +                throw new GridClientDisconnectedException(
 +                    "Latest topology update failed.", lastError);
 +
 +            return Collections.unmodifiableCollection(nodes.values());
 +        }
 +        finally {
 +            lock.readLock().unlock();
 +        }
 +    }
 +
 +    /**
 +     * @return Whether topology is failed.
 +     */
 +    public boolean failed() {
 +        lock.readLock().lock();
 +
 +        try {
 +            return lastError != null;
 +        }
 +        finally {
 +            lock.readLock().unlock();
 +        }
 +    }
 +
 +    /**
 +     * Shutdowns executor service that performs listener notification.
 +     */
 +    public void shutdown() {
 +        GridClientUtils.shutdownNow(GridClientTopology.class, exec, log);
 +    }
 +
 +    /**
 +     * Updates node properties according to current topology settings.
 +     * Particularly attributes and metrics caching policies.
 +     *
 +     * @param node Node to be processed.
 +     * @return The same node if cache is enabled or node contains no attributes and metrics,
 +     *      otherwise will return new node without attributes and metrics.
 +     */
 +    private GridClientNodeImpl prepareNode(final GridClientNodeImpl node) {
 +        final boolean noAttrsAndMetrics =
 +            (metricsCache && attrCache) || (node.attributes().isEmpty() && node.metrics() == null);
 +
 +        // Try to bypass object copying.
 +        if (noAttrsAndMetrics && routerAddrs == null && node.connectable())
 +            return node;
 +
 +        // Return a new node instance based on the original one.
 +        GridClientNodeImpl.Builder nodeBuilder = GridClientNodeImpl.builder(node, !attrCache, !metricsCache);
 +
 +        for (InetSocketAddress addr : node.availableAddresses(prot, true)) {
 +            boolean router = routerAddrs == null ||
 +                routerAddrs.contains(addr.getHostName() + ":" + addr.getPort()) ||
 +                routerAddrs.contains(addr.getAddress().getHostAddress() + ":" + addr.getPort());
 +
 +            boolean reachable = noAttrsAndMetrics || !addr.getAddress().isLoopbackAddress() ||
 +                F.containsAny(U.allLocalMACs(), node.attribute(ATTR_MACS).toString().split(", "));
 +
 +            if (router && reachable) {
 +                nodeBuilder.connectable(true);
 +
 +                break;
 +            }
 +        }
 +
 +        return nodeBuilder.build();
 +    }
 +
 +    /**
 +     * Runs listener notification is separate thread.
 +     *
 +     * @param evts Event list.
 +     */
 +    private void notifyEvents(final Iterable<TopologyEvent> evts) {
 +        try {
 +            exec.execute(new Runnable() {
 +                @Override public void run() {
 +                    for (TopologyEvent evt : evts) {
 +                        if (evt.added()) {
 +                            for (GridClientTopologyListener lsnr : topLsnrs)
 +                                lsnr.onNodeAdded(evt.node());
 +                        }
 +                        else {
 +                            for (GridClientTopologyListener lsnr : topLsnrs)
 +                                lsnr.onNodeRemoved(evt.node());
 +                        }
 +                    }
 +                }
 +            });
 +        }
 +        catch (RejectedExecutionException e) {
 +            log.warning("Unable to notify event listeners on topology change since client is shutting down: " +
 +                e.getMessage());
 +        }
 +    }
 +
 +    /**
 +     * Event for node adding and removal.
 +     */
 +    private static class TopologyEvent {
 +        /** Added or removed flag */
 +        private boolean added;
 +
 +        /** Node that triggered event. */
 +        private GridClientNode node;
 +
 +        /**
 +         * Creates a new event.
 +         *
 +         * @param added If {@code true}, indicates that node was added to topology.
 +         *      If {@code false}, indicates that node was removed.
 +         * @param node Added or removed node.
 +         */
 +        private TopologyEvent(boolean added, GridClientNode node) {
 +            this.added = added;
 +            this.node = node;
 +        }
 +
 +        /**
 +         * @return Flag indicating whether node was added or removed.
 +         */
 +        private boolean added() {
 +            return added;
 +        }
 +
 +        /**
 +         * @return Node that triggered event.
 +         */
 +        private GridClientNode node() {
 +            return node;
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a1b1b7a/modules/core/src/main/java/org/apache/ignite/internal/client/marshaller/optimized/GridClientOptimizedMarshaller.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/client/marshaller/optimized/GridClientOptimizedMarshaller.java
index f9695fa,0000000..c929575
mode 100644,000000..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/marshaller/optimized/GridClientOptimizedMarshaller.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/marshaller/optimized/GridClientOptimizedMarshaller.java
@@@ -1,103 -1,0 +1,103 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.ignite.internal.client.marshaller.optimized;
 +
 +import org.apache.ignite.*;
 +import org.apache.ignite.internal.client.marshaller.*;
 +import org.apache.ignite.marshaller.optimized.*;
 +import org.apache.ignite.internal.processors.rest.client.message.*;
 +
 +import java.io.*;
 +import java.nio.*;
 +import java.util.*;
 +
 +/**
-  * Wrapper, that adapts {@link org.apache.ignite.marshaller.optimized.IgniteOptimizedMarshaller} to
-  * {@link org.apache.ignite.internal.client.marshaller.GridClientMarshaller} interface.
++ * Wrapper, that adapts {@link org.apache.ignite.marshaller.optimized.OptimizedMarshaller} to
++ * {@link GridClientMarshaller} interface.
 + */
 +public class GridClientOptimizedMarshaller implements GridClientMarshaller {
 +    /** ID. */
 +    public static final byte ID = 1;
 +
 +    /** Optimized marshaller. */
-     private final IgniteOptimizedMarshaller opMarsh;
++    private final OptimizedMarshaller opMarsh;
 +
 +    /**
 +     * Default constructor.
 +     */
 +    public GridClientOptimizedMarshaller() {
-         opMarsh = new IgniteOptimizedMarshaller();
++        opMarsh = new OptimizedMarshaller();
 +    }
 +
 +    /**
 +     * Constructs optimized marshaller with specific parameters.
 +     *
 +     * @param requireSer Flag to enforce {@link Serializable} interface or not. If {@code true},
 +     *      then objects will be required to implement {@link Serializable} in order to be
 +     *      marshalled, if {@code false}, then such requirement will be relaxed.
 +     * @param clsNames User preregistered class names.
 +     * @param clsNamesPath Path to a file with user preregistered class names.
 +     * @param poolSize Object streams pool size.
 +     * @throws IOException If an I/O error occurs while writing stream header.
 +     * @throws IgniteException If this marshaller is not supported on the current JVM.
-      * @see org.apache.ignite.marshaller.optimized.IgniteOptimizedMarshaller
++     * @see org.apache.ignite.marshaller.optimized.OptimizedMarshaller
 +     */
 +    public GridClientOptimizedMarshaller(boolean requireSer, List<String> clsNames, String clsNamesPath, int poolSize)
 +        throws IOException {
 +        try {
-             opMarsh = new IgniteOptimizedMarshaller(requireSer, clsNames, clsNamesPath, poolSize);
++            opMarsh = new OptimizedMarshaller(requireSer, clsNames, clsNamesPath, poolSize);
 +        }
 +        catch (IgniteCheckedException e) {
 +            throw new IOException(e);
 +        }
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public ByteBuffer marshal(Object obj, int off) throws IOException {
 +        try {
 +            if (!(obj instanceof GridClientMessage))
 +                throw new IOException("Message serialization of given type is not supported: " +
 +                    obj.getClass().getName());
 +
 +            byte[] bytes = opMarsh.marshal(obj);
 +
 +            ByteBuffer buf = ByteBuffer.allocate(off + bytes.length);
 +
 +            buf.position(off);
 +
 +            buf.put(bytes);
 +
 +            buf.flip();
 +
 +            return buf;
 +        }
 +        catch (IgniteCheckedException e) {
 +            throw new IOException(e);
 +        }
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public <T> T unmarshal(byte[] bytes) throws IOException {
 +        try {
 +            return opMarsh.unmarshal(bytes, null);
 +        }
 +        catch (IgniteCheckedException e) {
 +            throw new IOException(e);
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a1b1b7a/modules/core/src/main/java/org/apache/ignite/internal/client/router/GridTcpRouterMBean.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/client/router/GridTcpRouterMBean.java
index f5dc44c,0000000..479fd8c
mode 100644,000000..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/router/GridTcpRouterMBean.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/router/GridTcpRouterMBean.java
@@@ -1,88 -1,0 +1,88 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.ignite.internal.client.router;
 +
 +import org.apache.ignite.mxbean.*;
 +
 +import java.util.*;
 +
 +/**
 + * MBean interface for TCP router.
 + */
- @IgniteMXBeanDescription("MBean for TCP router.")
++@MXBeanDescription("MBean for TCP router.")
 +public interface GridTcpRouterMBean {
 +    /**
 +     * Gets host for TCP binary protocol server.
 +     *
 +     * @return TCP host.
 +     */
-     @IgniteMXBeanDescription("Host for TCP binary protocol server.")
++    @MXBeanDescription("Host for TCP binary protocol server.")
 +    public String getHost();
 +
 +    /**
 +     * Gets port for TCP binary protocol server.
 +     *
 +     * @return TCP port.
 +     */
-     @IgniteMXBeanDescription("Port for TCP binary protocol server.")
++    @MXBeanDescription("Port for TCP binary protocol server.")
 +    public int getPort();
 +
 +    /**
 +     * Gets a flag indicating whether or not remote clients will be required to have a valid SSL certificate which
 +     * validity will be verified with trust manager.
 +     *
 +     * @return Whether or not client authentication is required.
 +     */
-     @IgniteMXBeanDescription("Flag indicating whether or not SSL is enabled for incoming connections.")
++    @MXBeanDescription("Flag indicating whether or not SSL is enabled for incoming connections.")
 +    public boolean isSslEnabled();
 +
 +    /**
 +     * Gets a flag indicating whether or not remote clients will be required to have a valid SSL certificate which
 +     * validity will be verified with trust manager.
 +     *
 +     * @return Whether or not client authentication is required.
 +     */
-     @IgniteMXBeanDescription("Flag indicating whether or not remote clients are required to have a valid SSL certificate.")
++    @MXBeanDescription("Flag indicating whether or not remote clients are required to have a valid SSL certificate.")
 +    public boolean isSslClientAuth();
 +
 +    /**
 +     * Gets list of server addresses where router's embedded client should connect.
 +     *
 +     * @return List of server addresses.
 +     */
-     @IgniteMXBeanDescription("Gets list of server addresses where router's embedded client should connect.")
++    @MXBeanDescription("Gets list of server addresses where router's embedded client should connect.")
 +    public Collection<String> getServers();
 +
 +    /**
 +     * Returns number of messages received by this router.
 +     * Note that this parameter has approximate value.
 +     *
 +     * @return Number of messages received by this router.
 +     */
-     @IgniteMXBeanDescription("Number of messages received by this router.")
++    @MXBeanDescription("Number of messages received by this router.")
 +    public long getReceivedCount();
 +
 +    /**
 +     * Returns number of responses returned by this router.
 +     * Note that this parameter has approximate value.
 +     *
 +     * @return Number of responses returned by this router.
 +     */
-     @IgniteMXBeanDescription("Number of responses returned by this router.")
++    @MXBeanDescription("Number of responses returned by this router.")
 +    public long getSendCount();
 +}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a1b1b7a/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridRouterClientImpl.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridRouterClientImpl.java
index 08090c9,0000000..31c84a9
mode 100644,000000..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridRouterClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridRouterClientImpl.java
@@@ -1,200 -1,0 +1,200 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.ignite.internal.client.router.impl;
 +
 +import org.apache.ignite.internal.client.*;
 +import org.apache.ignite.internal.client.impl.*;
 +import org.apache.ignite.internal.client.impl.connection.*;
 +import org.apache.ignite.internal.client.router.*;
 +import org.jdk8.backport.*;
 +import org.jetbrains.annotations.*;
 +
 +import java.util.*;
 +import java.util.concurrent.*;
 +
 +import static org.apache.ignite.internal.client.util.GridClientUtils.*;
 +
 +/**
 + * A {@link GridClient} router implementation.
 + */
 +class GridRouterClientImpl implements GridClient {
 +    /** Decorated client implementation. */
 +    private final GridClientImpl clientImpl;
 +
 +    /** Client configuration. */
 +    private final GridClientConfiguration cliCfg;
 +
 +    /** TCP connection managers. */
 +    private final ConcurrentMap<Byte, GridClientConnectionManager> connMgrMap = new ConcurrentHashMap8<>();
 +
 +    /**
 +     * Creates a new TCP client based on the given configuration.
 +     *
 +     * @param id Client identifier.
 +     * @param routerCfg Router configuration.
 +     * @throws GridClientException If client configuration is incorrect.
 +     * @throws GridServerUnreachableException If none of the servers
 +     *     specified in configuration can be reached.
 +     */
 +    GridRouterClientImpl(UUID id, GridTcpRouterConfiguration routerCfg) throws GridClientException {
 +        GridClientConfiguration cliCfg = new GridClientConfiguration();
 +
 +        cliCfg.setServers(routerCfg.getServers());
 +        cliCfg.setSslContextFactory(routerCfg.getSslContextFactory());
 +        cliCfg.setSecurityCredentialsProvider(routerCfg.getSecurityCredentialsProvider());
 +
 +        this.cliCfg = cliCfg;
 +
-         clientImpl = new GridClientImpl(id, cliCfg);
++        clientImpl = new GridClientImpl(id, cliCfg, true);
 +
 +        if (cliCfg.getProtocol() != GridClientProtocol.TCP)
 +            throw new AssertionError("Unknown protocol: " + cliCfg.getProtocol());
 +    }
 +
 +    /**
 +     * Send a raw packet "as is" directly to the given node.
 +     * The exact types of acceptable arguments and return values depends on underlying connections.
 +     *
 +     * @param msg Raw message to send.
 +     * @param destId Id of node to send message to. If {@code null} than node will be chosen
 +     *     from the topology randomly.
 +     * @return Future, representing forwarded message.
 +     * @throws GridServerUnreachableException If destination node can't be reached.
 +     * @throws GridClientClosedException If client is closed.
 +     * @throws GridClientException If any other client-related error occurs.
 +     * @throws InterruptedException If router was interrupted while trying.
 +     *     to establish connection with destination node.
 +     */
 +    GridClientFutureAdapter<?> forwardMessage(Object msg, @Nullable UUID destId, byte marshId)
 +        throws GridClientException, InterruptedException {
 +        GridClientTopology top = clientImpl.topology();
 +
 +        GridClientNode dest = destId != null ?
 +            top.node(destId) : cliCfg.getBalancer().balancedNode(
 +                applyFilter(top.nodes(), new GridClientPredicate<GridClientNodeImpl>() {
 +                    @Override public boolean apply(GridClientNodeImpl e) {
 +                        return restAvailable(e, cliCfg.getProtocol());
 +                    }
 +                }));
 +
 +        if (dest == null)
 +            throw new GridServerUnreachableException("Failed to resolve node for specified destination ID: " + destId);
 +
 +        GridClientConnectionManager connMgr = connectionManager(marshId);
 +
 +        GridClientConnection conn = null;
 +
 +        // No reconnection handling there. Let client to do it if needed.
 +        GridClientException cause;
 +
 +        try {
 +            conn = connMgr.connection(dest);
 +
 +            return conn.forwardMessage(msg);
 +        }
 +        catch (GridClientConnectionResetException e) {
 +            if (destId != null)
 +                connMgr.terminateConnection(conn, top.node(destId), e);
 +            else
 +                connMgr.terminateConnection(conn, null, e);
 +
 +            cause = e;
 +        }
 +        catch (GridClientException e) {
 +            cause = e;
 +        }
 +
 +        GridClientFutureAdapter<Object> fail = new GridClientFutureAdapter<>();
 +
 +        fail.onDone(cause);
 +
 +        return fail;
 +    }
 +
 +    /**
 +     * @param marshId Marshaller ID.
 +     * @return Connection manager.
 +     * @throws GridClientException In case of error.
 +     */
 +    private GridClientConnectionManager connectionManager(byte marshId) throws GridClientException {
 +        GridClientConnectionManager mgr = connMgrMap.get(marshId);
 +
 +        if (mgr == null) {
 +            GridClientConnectionManager old = connMgrMap.putIfAbsent(marshId, mgr =
-                 clientImpl.newConnectionManager(marshId));
++                clientImpl.newConnectionManager(marshId, true));
 +
 +            if (old != null)
 +                mgr = old;
 +        }
 +
 +        return mgr;
 +    }
 +
 +    /**
 +     * Closes client.
 +     * @param wait If {@code true} will wait for all pending requests to be proceeded.
 +     */
 +    public void stop(boolean wait) {
 +        clientImpl.stop(wait);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public UUID id() {
 +        return clientImpl.id();
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public GridClientData data() throws GridClientException {
 +        return clientImpl.data();
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public GridClientData data(String cacheName) throws GridClientException {
 +        return clientImpl.data(cacheName);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public GridClientCompute compute() {
 +        return clientImpl.compute();
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public void addTopologyListener(GridClientTopologyListener lsnr) {
 +        clientImpl.addTopologyListener(lsnr);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public void removeTopologyListener(GridClientTopologyListener lsnr) {
 +        clientImpl.removeTopologyListener(lsnr);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public Collection<GridClientTopologyListener> topologyListeners() {
 +        return clientImpl.topologyListeners();
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public boolean connected() {
 +        return clientImpl.connected();
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public void close() {
 +        clientImpl.close();
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0a1b1b7a/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridRouterCommandLineStartup.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridRouterCommandLineStartup.java
index ccd0cc9,0000000..946896c
mode 100644,000000..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridRouterCommandLineStartup.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridRouterCommandLineStartup.java
@@@ -1,169 -1,0 +1,169 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.ignite.internal.client.router.impl;
 +
 +import org.apache.ignite.*;
 +import org.apache.ignite.internal.client.router.*;
 +import org.apache.ignite.internal.processors.spring.*;
 +import org.apache.ignite.internal.util.typedef.*;
 +import org.apache.ignite.internal.util.typedef.internal.*;
 +import org.apache.ignite.lang.*;
 +import org.apache.ignite.lifecycle.*;
 +
 +import java.net.*;
 +import java.util.*;
 +import java.util.logging.*;
 +
 +import static org.apache.ignite.internal.GridProductImpl.*;
 +import static org.apache.ignite.internal.IgniteComponentType.*;
 +
 +/**
 + * Loader class for router.
 + */
 +public class GridRouterCommandLineStartup {
 +    /** Logger. */
 +    @SuppressWarnings("FieldCanBeLocal")
 +    private IgniteLogger log;
 +
 +    /** TCP router. */
 +    private LifecycleAware tcpRouter;
 +
 +    /**
 +     * Search given context for required configuration and starts router.
 +     *
 +     * @param beans Beans loaded from spring configuration file.
 +     */
 +    public void start(Map<Class<?>, Object> beans) {
 +        log = (IgniteLogger)beans.get(IgniteLogger.class);
 +
 +        if (log == null) {
 +            U.error(log, "Failed to find logger definition in application context. Stopping the router.");
 +
 +            return;
 +        }
 +
 +        GridTcpRouterConfiguration tcpCfg = (GridTcpRouterConfiguration)beans.get(GridTcpRouterConfiguration.class);
 +
 +        if (tcpCfg == null)
 +            U.warn(log, "TCP router startup skipped (configuration not found).");
 +        else {
 +            tcpRouter = new GridTcpRouterImpl(tcpCfg);
 +
 +            try {
 +                tcpRouter.start();
 +            }
 +            catch (Exception e) {
 +                U.error(log, "Failed to start TCP router on port " + tcpCfg.getPort() + ": " + e.getMessage(), e);
 +
 +                tcpRouter = null;
 +            }
 +        }
 +    }
 +
 +    /**
 +     * Stops router.
 +     */
 +    public void stop() {
 +        if (tcpRouter != null) {
 +            try {
 +                tcpRouter.stop();
 +            }
 +            catch (Exception e) {
 +                U.error(log, "Error while stopping the router.", e);
 +            }
 +        }
 +    }
 +
 +    /**
 +     * Wrapper method to run router from command-line.
 +     *
 +     * @param args Command-line arguments.
 +     * @throws IgniteCheckedException If failed.
 +     */
 +    public static void main(String[] args) throws IgniteCheckedException {
 +        X.println(
 +            "   __________  ________________ ",
 +            "  /  _/ ___/ |/ /  _/_  __/ __/ ",
 +            " _/ // (_ /    // /  / / / _/   ",
 +            "/___/\\___/_/|_/___/ /_/ /___/  ",
 +            " ",
 +            "Ignite Router Command Line Loader",
 +            "ver. " + ACK_VER,
 +            COPYRIGHT,
 +            " "
 +        );
 +
 +        IgniteSpringProcessor spring = SPRING.create(false);
 +
 +        if (args.length < 1) {
 +            X.error("Missing XML configuration path.");
 +
 +            System.exit(1);
 +        }
 +
 +        String cfgPath = args[0];
 +
-         URL cfgUrl = U.resolveGridGainUrl(cfgPath);
++        URL cfgUrl = U.resolveIgniteUrl(cfgPath);
 +
 +        if (cfgUrl == null) {
 +            X.error("Spring XML file not found (is IGNITE_HOME set?): " + cfgPath);
 +
 +            System.exit(1);
 +        }
 +
 +        boolean isLog4jUsed = U.gridClassLoader().getResource("org/apache/log4j/Appender.class") != null;
 +
 +        IgniteBiTuple<Object, Object> t = null;
 +        Collection<Handler> savedHnds = null;
 +
 +        if (isLog4jUsed) {
 +            try {
 +                t = U.addLog4jNoOpLogger();
 +            }
 +            catch (Exception e) {
 +                isLog4jUsed = false;
 +            }
 +        }
 +
 +        if (!isLog4jUsed)
 +            savedHnds = U.addJavaNoOpLogger();
 +
 +        Map<Class<?>, Object> beans;
 +
 +        try {
 +            beans = spring.loadBeans(cfgUrl, IgniteLogger.class, GridTcpRouterConfiguration.class);
 +        }
 +        finally {
 +            if (isLog4jUsed && t != null)
 +                U.removeLog4jNoOpLogger(t);
 +
 +            if (!isLog4jUsed)
 +                U.removeJavaNoOpLogger(savedHnds);
 +        }
 +
 +        final GridRouterCommandLineStartup routerStartup = new GridRouterCommandLineStartup();
 +
 +        routerStartup.start(beans);
 +
 +        Runtime.getRuntime().addShutdownHook(new Thread() {
 +            @Override public void run() {
 +                routerStartup.stop();
 +            }
 +        });
 +    }
 +}


Mime
View raw message