Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 9AD9D185E8 for ; Mon, 30 Nov 2015 09:25:55 +0000 (UTC) Received: (qmail 8468 invoked by uid 500); 30 Nov 2015 09:25:55 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 8389 invoked by uid 500); 30 Nov 2015 09:25:55 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 7871 invoked by uid 99); 30 Nov 2015 09:25:55 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 30 Nov 2015 09:25:55 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 3B313DFC8B; Mon, 30 Nov 2015 09:25:55 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Date: Mon, 30 Nov 2015 09:26:39 -0000 Message-Id: <24167924bb59407b8333728b1ef45cdd@git.apache.org> In-Reply-To: <6c7165248491445d8c065305807d0887@git.apache.org> References: <6c7165248491445d8c065305807d0887@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [46/49] ignite git commit: Fixed communication subsystem stop notification. Fixed communication subsystem stop notification. 
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/59f37266 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/59f37266 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/59f37266 Branch: refs/heads/ignite-1537 Commit: 59f3726696ec47b52f68103aa8250d9f2015b49b Parents: 3a6a463 Author: Yakov Zhdanov Authored: Sat Nov 28 18:46:51 2015 +0300 Committer: Yakov Zhdanov Committed: Sat Nov 28 18:46:51 2015 +0300 ---------------------------------------------------------------------- .../managers/communication/GridIoManager.java | 88 +------------------- .../distributed/dht/GridDhtTxPrepareFuture.java | 4 +- .../cache/transactions/IgniteTxHandler.java | 4 +- .../communication/tcp/TcpCommunicationSpi.java | 17 ++-- .../ignite/testframework/GridTestUtils.java | 61 +++++++++++++- 5 files changed, 76 insertions(+), 98 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/59f37266/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index ea82d7f..a8557cc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -46,8 +46,6 @@ import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.GridTopic; import org.apache.ignite.internal.IgniteComponentType; import org.apache.ignite.internal.IgniteDeploymentCheckedException; -import org.apache.ignite.internal.IgniteKernal; -import org.apache.ignite.internal.IgnitionEx; import 
org.apache.ignite.internal.direct.DirectMessageReader; import org.apache.ignite.internal.direct.DirectMessageWriter; import org.apache.ignite.internal.managers.GridManagerAdapter; @@ -103,9 +101,6 @@ import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q_OPTIM * Grid communication manager. */ public class GridIoManager extends GridManagerAdapter> { - /** */ - public static volatile boolean TURBO_DEBUG_MODE; - /** Empty array of message factories. */ public static final MessageFactory[] EMPTY = {}; @@ -775,7 +770,7 @@ public class GridIoManager extends GridManagerAdapter - * This method eliminates network between nodes started in single JVM - * when {@link #TURBO_DEBUG_MODE} is set to {@code true}. - *

- * How to use it: - *

- * 1. Replace {@link #send(ClusterNode, Object, int, Message, byte, boolean, long, boolean, IgniteInClosure)}
- *    with this method.
- * 2. Start all grids for your test, then set {@link #TURBO_DEBUG_MODE} to {@code true}.
- * 3. Perform test operations on the topology. No network will be there.
- * 4. DO NOT turn on turbo debug before all grids started. This will cause deadlocks.
- * - * @param node Destination node. - * @param topic Topic to send the message to. - * @param topicOrd GridTopic enumeration ordinal. - * @param msg Message to send. - * @param plc Type of processing. - * @param ordered Ordered flag. - * @param timeout Timeout. - * @param skipOnTimeout Whether message can be skipped on timeout. - * @throws IgniteCheckedException Thrown in case of any errors. - */ - private void sendTurboDebug( - ClusterNode node, - Object topic, - int topicOrd, - Message msg, - byte plc, - boolean ordered, - long timeout, - boolean skipOnTimeout - ) throws IgniteCheckedException { - assert node != null; - assert topic != null; - assert msg != null; - - GridIoMessage ioMsg = new GridIoMessage(plc, topic, topicOrd, msg, ordered, timeout, skipOnTimeout); - - IgniteKernal rmt; - - if (locNodeId.equals(node.id())) { - assert plc != P2P_POOL; - - CommunicationListener commLsnr = this.commLsnr; - - if (commLsnr == null) - throw new IgniteCheckedException("Trying to send message when grid is not fully started."); - - if (ordered) - processOrderedMessage(locNodeId, ioMsg, plc, null); - else - processRegularMessage0(ioMsg, locNodeId); - } - else if (TURBO_DEBUG_MODE && (rmt = IgnitionEx.gridxx(locNodeId)) != null) { - if (ioMsg.isOrdered()) - rmt.context().io().processOrderedMessage(locNodeId, ioMsg, ioMsg.policy(), null); - else - rmt.context().io().processRegularMessage0(ioMsg, locNodeId); - } - else { - if (topicOrd < 0) - ioMsg.topicBytes(marsh.marshal(topic)); - - try { - getSpi().sendMessage(node, ioMsg); - } - catch (IgniteSpiException e) { - throw new IgniteCheckedException("Failed to send message (node may have left the grid or " + - "TCP connection cannot be established due to firewall issues) " + - "[node=" + node + ", topic=" + topic + - ", msg=" + msg + ", policy=" + plc + ']', e); - } - } - } - - /** * @param nodeId Id of destination node. * @param topic Topic to send the message to. * @param msg Message to send. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/59f37266/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 34addfa..9f1f8a1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -81,8 +81,6 @@ import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_OBJECT_LOADED; -import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL; -import static org.apache.ignite.internal.managers.communication.GridIoPolicy.UTILITY_CACHE_POOL; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.CREATE; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.NOOP; @@ -1208,7 +1206,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture 0) // Safety. ctxInitLatch.countDown(); @@ -1976,7 +1981,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter GridCommunicationClient client = clients.get(nodeId); if (client == null) { - if (isNodeStopping()) + if (stopping) throw new IgniteSpiException("Node is stopping."); // Do not allow concurrent connects. 
@@ -2311,8 +2316,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter U.closeQuiet(ch); - throw new ClusterTopologyCheckedException("Failed to send message, " + - "node left cluster: " + node); + throw new ClusterTopologyCheckedException("Failed to send message " + + "(node left topology): " + node); } long rcvCnt = -1; @@ -2784,18 +2789,18 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * @return Node ID message. */ private NodeIdMessage nodeIdMessage() { - ClusterNode localNode = getLocalNode(); + ClusterNode locNode = getLocalNode(); UUID id; - if (localNode == null) { + if (locNode == null) { U.warn(log, "Local node is not started or fully initialized [isStopping=" + getSpiContext().isStopping() + ']'); id = new UUID(0, 0); } else - id = localNode.id(); + id = locNode.id(); return new NodeIdMessage(id); } http://git-wip-us.apache.org/repos/asf/ignite/blob/59f37266/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java index d1c3d9f..7116227 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java @@ -42,10 +42,15 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Queue; import java.util.Set; import java.util.Timer; import java.util.TimerTask; +import java.util.UUID; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import javax.cache.CacheException; import javax.cache.configuration.Factory; @@ -81,6 +86,7 @@ import 
org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.lang.GridAbsClosure; import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.internal.util.lang.IgnitePair; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.T2; @@ -88,6 +94,7 @@ import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.swapspace.inmemory.GridTestSwapSpaceSpi; import org.apache.ignite.ssl.SslContextFactory; import org.apache.ignite.testframework.config.GridTestProperties; @@ -147,6 +154,9 @@ public final class GridTestUtils { /** */ private static final GridBusyLock busyLock = new GridBusyLock(); + /** */ + public static final ConcurrentMap, IgnitePair>> msgMap = new ConcurrentHashMap<>(); + /** * Ensure singleton. */ @@ -155,6 +165,55 @@ public final class GridTestUtils { } /** + * @param from From node ID. + * @param to To node ID. + * @param msg Message. + * @param sent Sent or received. + */ + public static void addMessage(UUID from, UUID to, Message msg, boolean sent) { + IgnitePair key = F.pair(from, to); + + IgnitePair> val = msgMap.get(key); + + if (val == null) { + IgnitePair> old = msgMap.putIfAbsent(key, + val = F.>pair(new ConcurrentLinkedQueue(), new ConcurrentLinkedQueue())); + + if (old != null) + val = old; + } + + (sent ? val.get1() : val.get2()).add(msg); + } + + /** + * Dumps all messages tracked with {@link #addMessage(UUID, UUID, Message, boolean)} to std out. 
+ */ + public static void dumpMessages() { + for (Map.Entry, IgnitePair>> entry : msgMap.entrySet()) { + U.debug("\n" + entry.getKey().get1() + " [sent to] " + entry.getKey().get2()); + + for (Message message : entry.getValue().get1()) + U.debug("\t" + message); + + U.debug(entry.getKey().get2() + " [received from] " + entry.getKey().get1()); + + for (Message message : entry.getValue().get2()) + U.debug("\t" + message); + } + } + +// static { +// new Thread(new Runnable() { +// @Override public void run() { +// JOptionPane.showMessageDialog(null, "Close this to dump messages."); +// +// dumpMessages(); +// } +// }).start(); +// } + + /** * Checks whether callable throws expected exception or not. * * @param log Logger (optional). @@ -1728,4 +1787,4 @@ public final class GridTestUtils { /** Evict to offheap with eviction policy + evict from offheap to swap when max offheap memory limit is reached. */ OFFHEAP_EVICT_SWAP, } -} \ No newline at end of file +}