ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [07/43] ignite git commit: IGNITE-3060 Discovery: optimize resource usage for client connections
Date Thu, 19 May 2016 09:37:40 GMT
IGNITE-3060 Discovery: optimize resource usage for client connections


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7b0edfbd
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7b0edfbd
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7b0edfbd

Branch: refs/heads/ignite-3163
Commit: 7b0edfbdf1790dbb7a4f935d486bc0af5e5e2a27
Parents: 84b2fdf
Author: sboikov <sboikov@gridgain.com>
Authored: Wed Apr 27 15:52:18 2016 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Wed Apr 27 15:52:18 2016 +0300

----------------------------------------------------------------------
 .../ignite/internal/IgniteComponentType.java    |   4 +-
 .../continuous/CacheContinuousQueryHandler.java |   9 +
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 168 ++++++++++++-------
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |   5 +-
 .../TcpDiscoveryClientHeartbeatMessage.java     |   1 +
 .../tcp/TcpClientDiscoverySpiSelfTest.java      |  44 ++++-
 6 files changed, 160 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/7b0edfbd/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java
index 01872b6..76e495f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java
@@ -285,7 +285,7 @@ public enum IgniteComponentType {
                 return (T)ctor.newInstance(ctx);
             }
         }
-        catch (Exception e) {
+        catch (Throwable e) {
             throw componentException(e);
         }
     }
@@ -309,7 +309,7 @@ public enum IgniteComponentType {
      * @param err Creation error.
      * @return Component creation exception.
      */
-    private IgniteCheckedException componentException(Exception err) {
+    private IgniteCheckedException componentException(Throwable err) {
         return new IgniteCheckedException("Failed to create Ignite component (consider adding " + module +
             " module to classpath) [component=" + this + ", cls=" + clsName + ']', err);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7b0edfbd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index a46a526..d0a3722 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -638,6 +638,15 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         Collection<CacheContinuousQueryEntry> entries) {
         final GridCacheContext cctx = cacheContext(ctx);
 
+        if (cctx == null) {
+            IgniteLogger log = ctx.log(CacheContinuousQueryHandler.class);
+
+            if (log.isDebugEnabled())
+                log.debug("Failed to notify callback, cache is not found: " + cacheId);
+
+            return;
+        }
+
         final Collection<CacheEntryEvent<? extends K, ? extends V>> entries0
= new ArrayList<>(entries.size());
 
         for (CacheContinuousQueryEntry e : entries) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/7b0edfbd/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index b082ba2..ab2f6b6 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -160,7 +160,7 @@ import static org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusChe
 @SuppressWarnings("All")
 class ServerImpl extends TcpDiscoveryImpl {
     /** */
-    private static final int ENSURED_MSG_HIST_SIZE = getInteger(IGNITE_DISCOVERY_HISTORY_SIZE, 1024 * 10);
+    private static final int ENSURED_MSG_HIST_SIZE = getInteger(IGNITE_DISCOVERY_HISTORY_SIZE, 1024);
 
     /** */
     private static final IgniteProductVersion CUSTOM_MSG_ALLOW_JOINING_FOR_VERIFIED_SINCE
=
@@ -1398,6 +1398,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                 ", topSize=" + ring.allNodes().size() +
                 ", leavingNodesSize=" + leavingNodesSize + ", failedNodesSize=" + failedNodesSize +
                 ", msgWorker.queue.size=" + (msgWorker != null ? msgWorker.queueSize() : "N/A") +
+                ", clients=" + ring.clientNodes().size() +
+                ", clientWorkers=" + clientMsgWorkers.size() +
                 ", lastUpdate=" + (locNode != null ? U.format(locNode.lastUpdateTime()) :
"N/A") +
                 ", heapFree=" + runtime.freeMemory() / (1024 * 1024) +
                 "M, heapTotal=" + runtime.maxMemory() / (1024 * 1024) + "M]");
@@ -2101,7 +2103,7 @@ class ServerImpl extends TcpDiscoveryImpl {
     /**
      * Message worker thread for messages processing.
      */
-    private class RingMessageWorker extends MessageWorkerAdapter {
+    private class RingMessageWorker extends MessageWorkerAdapter<TcpDiscoveryAbstractMessage> {
         /** Next node. */
         @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
         private TcpDiscoveryNode next;
@@ -2159,6 +2161,31 @@ class ServerImpl extends TcpDiscoveryImpl {
             initConnectionCheckFrequency();
         }
 
+        /**
+         * Adds message to queue.
+         *
+         * @param msg Message to add.
+         */
+        void addMessage(TcpDiscoveryAbstractMessage msg) {
+            if ((msg instanceof TcpDiscoveryStatusCheckMessage ||
+                msg instanceof TcpDiscoveryJoinRequestMessage ||
+                msg instanceof TcpDiscoveryCustomEventMessage) &&
+                queue.contains(msg)) {
+                if (log.isDebugEnabled())
+                    log.debug("Ignoring duplicate message: " + msg);
+
+                return;
+            }
+
+            if (msg.highPriority())
+                queue.addFirst(msg);
+            else
+                queue.add(msg);
+
+            if (log.isDebugEnabled())
+                log.debug("Message has been added to queue: " + msg);
+        }
+
         /** {@inheritDoc} */
         @Override protected void body() throws InterruptedException {
             try {
@@ -2320,28 +2347,44 @@ class ServerImpl extends TcpDiscoveryImpl {
          */
         private void sendMessageToClients(TcpDiscoveryAbstractMessage msg) {
             if (redirectToClients(msg)) {
-                byte[] marshalledMsg = null;
+                byte[] msgBytes = null;
 
                 for (ClientMessageWorker clientMsgWorker : clientMsgWorkers.values()) {
-                    // Send a clone to client to avoid ConcurrentModificationException
-                    TcpDiscoveryAbstractMessage msgClone;
-
-                    try {
-                        if (marshalledMsg == null)
-                            marshalledMsg = spi.marsh.marshal(msg);
+                    if (msgBytes == null) {
+                        try {
+                            msgBytes = spi.marsh.marshal(msg);
+                        }
+                        catch (IgniteCheckedException e) {
+                            U.error(log, "Failed to marshal message: " + msg, e);
 
-                        msgClone = spi.marsh.unmarshal(marshalledMsg,
-                            U.resolveClassLoader(spi.ignite().configuration()));
+                            break;
+                        }
                     }
-                    catch (IgniteCheckedException e) {
-                        U.error(log, "Failed to marshal message: " + msg, e);
 
-                        msgClone = msg;
-                    }
+                    TcpDiscoveryAbstractMessage msg0 = msg;
+                    byte[] msgBytes0 = msgBytes;
+
+                    if (msg instanceof TcpDiscoveryNodeAddedMessage) {
+                        TcpDiscoveryNodeAddedMessage nodeAddedMsg = (TcpDiscoveryNodeAddedMessage)msg;
+
+                        TcpDiscoveryNode node = nodeAddedMsg.node();
+
+                        if (clientMsgWorker.clientNodeId.equals(node.id())) {
+                            try {
+                                msg0 = spi.marsh.unmarshal(msgBytes,
+                                    U.resolveClassLoader(spi.ignite().configuration()));
 
-                    prepareNodeAddedMessage(msgClone, clientMsgWorker.clientNodeId, null,
null, null);
+                                prepareNodeAddedMessage(msg0, clientMsgWorker.clientNodeId,
null, null, null);
 
-                    clientMsgWorker.addMessage(msgClone);
+                                msgBytes0 = null;
+                            }
+                            catch (IgniteCheckedException e) {
+                                U.error(log, "Failed to create message copy: " + msg, e);
+                            }
+                        }
+                    }
+
+                    clientMsgWorker.addMessage(msg0, msgBytes0);
                 }
             }
         }
@@ -5599,16 +5642,13 @@ class ServerImpl extends TcpDiscoveryImpl {
 
     /**
      */
-    private class ClientMessageWorker extends MessageWorkerAdapter {
+    private class ClientMessageWorker extends MessageWorkerAdapter<T2<TcpDiscoveryAbstractMessage,
byte[]>> {
         /** Node ID. */
         private final UUID clientNodeId;
 
         /** Socket. */
         private final Socket sock;
 
-        /** Output stream. */
-        private final OutputStream out;
-
         /** Current client metrics. */
         private volatile ClusterMetrics metrics;
 
@@ -5627,8 +5667,6 @@ class ServerImpl extends TcpDiscoveryImpl {
 
             this.sock = sock;
             this.clientNodeId = clientNodeId;
-
-            out = new BufferedOutputStream(sock.getOutputStream(), sock.getSendBufferSize());
         }
 
         /**
@@ -5652,11 +5690,43 @@ class ServerImpl extends TcpDiscoveryImpl {
             this.metrics = metrics;
         }
 
+        /**
+         * @param msg Message.
+         */
+        public void addMessage(TcpDiscoveryAbstractMessage msg) {
+            addMessage(msg, null);
+        }
+
+        /**
+         * @param msg Message.
+         * @param msgBytes Optional message bytes.
+         */
+        public void addMessage(TcpDiscoveryAbstractMessage msg, @Nullable byte[] msgBytes) {
+            T2 t = new T2<>(msg, msgBytes);
+
+            if (msg.highPriority())
+                queue.addFirst(t);
+            else
+                queue.add(t);
+
+            if (log.isDebugEnabled())
+                log.debug("Message has been added to client queue: " + msg);
+        }
+
         /** {@inheritDoc} */
-        @Override protected void processMessage(TcpDiscoveryAbstractMessage msg) {
+        @Override protected void processMessage(T2<TcpDiscoveryAbstractMessage, byte[]> msgT) {
+            boolean success = false;
+
+            TcpDiscoveryAbstractMessage msg = msgT.get1();
+
             try {
                 assert msg.verified() : msg;
 
+                byte[] msgBytes = msgT.get2();
+
+                if (msgBytes == null)
+                    msgBytes = spi.marsh.marshal(msg);
+
                 if (msg instanceof TcpDiscoveryClientAckResponse) {
                     if (clientVer == null) {
                         ClusterNode node = spi.getNode(clientNodeId);
@@ -5675,7 +5745,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                             log.debug("Sending message ack to client [sock=" + sock + ",
locNodeId="
                                 + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg="
+ msg + ']');
 
-                        spi.writeToSocket(sock, out, msg, spi.failureDetectionTimeoutEnabled()
?
+                        spi.writeToSocket(sock, msg, msgBytes, spi.failureDetectionTimeoutEnabled()
?
                             spi.failureDetectionTimeout() : spi.getSocketTimeout());
                     }
                 }
@@ -5686,9 +5756,11 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                     assert topologyInitialized(msg) : msg;
 
-                    spi.writeToSocket(sock, out, msg, spi.failureDetectionTimeoutEnabled()
?
+                    spi.writeToSocket(sock, msg, msgBytes, spi.failureDetectionTimeoutEnabled()
?
                         spi.failureDetectionTimeout() : spi.getSocketTimeout());
                 }
+
+                success = true;
             }
             catch (IgniteCheckedException | IOException e) {
                 if (log.isDebugEnabled())
@@ -5697,12 +5769,15 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                 onException("Client connection failed [sock=" + sock + ", locNodeId="
                     + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg +
']', e);
+            }
+            finally {
+                if (!success) {
+                    clientMsgWorkers.remove(clientNodeId, this);
 
-                clientMsgWorkers.remove(clientNodeId, this);
-
-                U.interrupt(this);
+                    U.interrupt(this);
 
-                U.closeQuiet(sock);
+                    U.closeQuiet(sock);
+                }
             }
         }
 
@@ -5792,9 +5867,9 @@ class ServerImpl extends TcpDiscoveryImpl {
     /**
      * Base class for message workers.
      */
-    protected abstract class MessageWorkerAdapter extends IgniteSpiThread {
+    protected abstract class MessageWorkerAdapter<T> extends IgniteSpiThread {
         /** Message queue. */
-        private final BlockingDeque<TcpDiscoveryAbstractMessage> queue = new LinkedBlockingDeque<>();
+        protected final BlockingDeque<T> queue = new LinkedBlockingDeque<>();
 
         /** Backed interrupted flag. */
         private volatile boolean interrupted;
@@ -5820,7 +5895,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 log.debug("Message worker started [locNodeId=" + getConfiguredNodeId() +
']');
 
             while (!isInterrupted()) {
-                TcpDiscoveryAbstractMessage msg = queue.poll(pollingTimeout, TimeUnit.MILLISECONDS);
+                T msg = queue.poll(pollingTimeout, TimeUnit.MILLISECONDS);
 
                 if (msg == null)
                     noMessageLoop();
@@ -5849,34 +5924,9 @@ class ServerImpl extends TcpDiscoveryImpl {
         }
 
         /**
-         * Adds message to queue.
-         *
-         * @param msg Message to add.
-         */
-        void addMessage(TcpDiscoveryAbstractMessage msg) {
-            if ((msg instanceof TcpDiscoveryStatusCheckMessage ||
-                msg instanceof TcpDiscoveryJoinRequestMessage ||
-                msg instanceof TcpDiscoveryCustomEventMessage) &&
-                queue.contains(msg)) {
-                if (log.isDebugEnabled())
-                    log.debug("Ignoring duplicate message: " + msg);
-
-                return;
-            }
-
-            if (msg.highPriority())
-                queue.addFirst(msg);
-            else
-                queue.add(msg);
-
-            if (log.isDebugEnabled())
-                log.debug("Message has been added to queue: " + msg);
-        }
-
-        /**
          * @param msg Message.
          */
-        protected abstract void processMessage(TcpDiscoveryAbstractMessage msg);
+        protected abstract void processMessage(T msg);
 
         /**
          * Called when there is no message to process giving ability to perform other activity.

http://git-wip-us.apache.org/repos/asf/ignite/blob/7b0edfbd/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index abe380c..73b541f 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -1265,7 +1265,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi,
T
 
         sock.connect(resolved, (int)timeoutHelper.nextTimeoutChunk(sockTimeout));
 
-        writeToSocket(sock, U.IGNITE_HEADER, timeoutHelper.nextTimeoutChunk(sockTimeout));
+        writeToSocket(sock, null, U.IGNITE_HEADER, timeoutHelper.nextTimeoutChunk(sockTimeout));
 
         return sock;
     }
@@ -1295,12 +1295,13 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi,
T
      * Writes message to the socket.
      *
      * @param sock Socket.
+     * @param msg Message.
      * @param data Raw data to write.
      * @param timeout Socket write timeout.
      * @throws IOException If IO failed or write timed out.
      */
     @SuppressWarnings("ThrowFromFinallyBlock")
-    private void writeToSocket(Socket sock, byte[] data, long timeout) throws IOException {
+    protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, byte[] data, long timeout) throws IOException {
         assert sock != null;
         assert data != null;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7b0edfbd/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientHeartbeatMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientHeartbeatMessage.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientHeartbeatMessage.java
index 37b578c..3993de0 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientHeartbeatMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientHeartbeatMessage.java
@@ -38,6 +38,7 @@ public class TcpDiscoveryClientHeartbeatMessage extends TcpDiscoveryAbstractMess
      * Constructor.
      *
      * @param creatorNodeId Creator node.
+     * @param metrics Metrics.
      */
     public TcpDiscoveryClientHeartbeatMessage(UUID creatorNodeId, ClusterMetrics metrics)
{
         super(creatorNodeId);

http://git-wip-us.apache.org/repos/asf/ignite/blob/7b0edfbd/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index e01094c..331b581 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -1736,8 +1736,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest
{
         final AtomicBoolean err = new AtomicBoolean(false);
 
         client.events().localListen(new IgnitePredicate<Event>() {
-            @Override
-            public boolean apply(Event evt) {
+            @Override public boolean apply(Event evt) {
                 if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
                     log.info("Disconnected event.");
 
@@ -2158,17 +2157,49 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest
{
         }
 
         /** {@inheritDoc} */
-        @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage
msg,
+        @Override protected void writeToSocket(Socket sock,
+            OutputStream out,
+            TcpDiscoveryAbstractMessage msg,
             long timeout) throws IOException, IgniteCheckedException {
             waitFor(writeLock);
 
+            if (!onMessage(sock, msg))
+                return;
+
+            super.writeToSocket(sock, out, msg, timeout);
+
+            if (afterWrite != null)
+                afterWrite.apply(msg, sock);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg,
byte[] msgBytes,
+            long timeout) throws IOException {
+            waitFor(writeLock);
+
+            if (!onMessage(sock, msg))
+                return;
+
+            super.writeToSocket(sock, msg, msgBytes, timeout);
+
+            if (afterWrite != null)
+                afterWrite.apply(msg, sock);
+        }
+
+        /**
+         * @param sock Socket.
+         * @param msg Message.
+         * @return {@code False} if should not further process message.
+         * @throws IOException If failed.
+         */
+        private boolean onMessage(Socket sock, TcpDiscoveryAbstractMessage msg) throws IOException
{
             boolean fail = false;
 
             if (skipNodeAdded &&
                 (msg instanceof TcpDiscoveryNodeAddedMessage || msg instanceof TcpDiscoveryNodeAddFinishedMessage))
{
                 log.info("Skip message: " + msg);
 
-                return;
+                return false;
             }
 
             if (msg instanceof TcpDiscoveryNodeAddedMessage)
@@ -2184,10 +2215,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest
{
                 sock.close();
             }
 
-            super.writeToSocket(sock, out, msg, timeout);
-
-            if (afterWrite != null)
-                afterWrite.apply(msg, sock);
+            return true;
         }
 
         /** {@inheritDoc} */


Mime
View raw message