Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 7573B184F3 for ; Tue, 11 Aug 2015 07:27:30 +0000 (UTC) Received: (qmail 84561 invoked by uid 500); 11 Aug 2015 07:27:30 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 84528 invoked by uid 500); 11 Aug 2015 07:27:30 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 84518 invoked by uid 99); 11 Aug 2015 07:27:30 -0000 Received: from Unknown (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 11 Aug 2015 07:27:30 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id E0771C0DF9 for ; Tue, 11 Aug 2015 07:27:24 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.771 X-Spam-Level: * X-Spam-Status: No, score=1.771 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, T_RP_MATCHES_RCVD=-0.01, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-eu-west.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id n-u1yXzn1_hm for ; Tue, 11 Aug 2015 07:27:23 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-eu-west.apache.org (ASF Mail Server at mx1-eu-west.apache.org) with SMTP id 7FB2F34A3B for ; Tue, 11 Aug 2015 07:27:12 +0000 (UTC) Received: (qmail 82864 invoked by uid 99); 11 Aug 2015 07:27:11 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 11 Aug 2015 07:27:11 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E780CE35DB; Tue, 11 Aug 2015 07:27:10 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vkulichenko@apache.org To: commits@ignite.incubator.apache.org Date: Tue, 11 Aug 2015 07:27:24 -0000 Message-Id: <84c76c6c094d44df85565b523cd9e11e@git.apache.org> In-Reply-To: <01fe7fc564c3447aa57a428396b623f3@git.apache.org> References: <01fe7fc564c3447aa57a428396b623f3@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [15/50] incubator-ignite git commit: Merge branches 'ignite-104' and 'master' of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-104 Merge branches 'ignite-104' and 'master' of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-104 Conflicts: modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5cdd2440 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5cdd2440 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5cdd2440 Branch: refs/heads/ignite-426 Commit: 5cdd2440a6b9eb3c5fe0a7620202caf5cb2db441 Parents: 6c1655f 1c10ade Author: Valentin Kulichenko Authored: Fri Jul 31 13:38:39 2015 -0700 Committer: Valentin Kulichenko Committed: Fri Jul 31 13:38:39 2015 -0700 ---------------------------------------------------------------------- .../JettyRestProcessorAbstractSelfTest.java | 14 +- .../managers/communication/GridIoManager.java | 110 ++++- .../GridDhtPartitionsExchangeFuture.java | 20 +- .../handlers/query/QueryCommandHandler.java | 6 +- .../util/nio/GridCommunicationClient.java | 5 +- .../util/nio/GridNioFinishedFuture.java | 12 + .../ignite/internal/util/nio/GridNioFuture.java | 14 + .../internal/util/nio/GridNioFutureImpl.java | 15 + .../util/nio/GridNioRecoveryDescriptor.java | 13 +- .../ignite/internal/util/nio/GridNioServer.java | 5 + .../util/nio/GridNioSessionMetaKey.java | 5 +- .../util/nio/GridShmemCommunicationClient.java | 7 +- .../util/nio/GridTcpNioCommunicationClient.java | 14 +- .../communication/tcp/TcpCommunicationSpi.java | 84 +++- .../ignite/spi/discovery/tcp/ServerImpl.java | 45 +- .../spi/discovery/tcp/TcpDiscoverySpi.java | 2 +- ...CommunicationRecoveryAckClosureSelfTest.java | 464 +++++++++++++++++++ .../tcp/TcpDiscoveryMultiThreadedTest.java | 8 +- .../IgniteSpiCommunicationSelfTestSuite.java | 1 + .../http/jetty/GridJettyRestHandler.java | 12 +- 20 files changed, 779 insertions(+), 77 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5cdd2440/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index 765ba65,7e17efc..479d116 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@@ -1059,9 -982,9 +1061,10 @@@ public class GridIoManager extends Grid Message msg, byte plc, boolean ordered, + boolean seq, long timeout, - boolean skipOnTimeout + boolean skipOnTimeout, + IgniteInClosure ackClosure ) throws IgniteCheckedException { assert node != null; assert topic != null; @@@ -1079,10 -1002,11 +1082,13 @@@ if (ordered) processOrderedMessage(locNodeId, ioMsg, plc, null); + else if (seq) + processSequentialMessage(locNodeId, ioMsg, plc, null); else processRegularMessage0(ioMsg, locNodeId); + + if (ackClosure != null) + ackClosure.apply(null); } else { if (topicOrd < 0) @@@ -1132,7 -1059,7 +1141,7 @@@ if (node == null) throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId); - send(node, topic, topic.ordinal(), msg, plc, false, false, 0, false); - send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null); ++ send(node, topic, topic.ordinal(), msg, plc, false, false, 0, false, null); } /** @@@ -1144,7 -1071,7 +1153,7 @@@ */ public void send(ClusterNode node, Object topic, Message msg, byte plc) throws IgniteCheckedException { - send(node, topic, -1, msg, plc, false, false, 0, false); - send(node, topic, -1, msg, plc, false, 0, false, null); ++ send(node, topic, -1, msg, plc, false, false, 0, false, null); } /** @@@ -1156,7 -1083,7 +1165,7 @@@ */ public void send(ClusterNode node, GridTopic topic, Message msg, byte plc) throws IgniteCheckedException { - send(node, topic, topic.ordinal(), msg, plc, false, false, 0, false); - send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null); ++ send(node, topic, topic.ordinal(), msg, plc, false, false, 0, false, null); } /** @@@ -1178,7 -1105,7 +1187,7 @@@ ) throws IgniteCheckedException { assert timeout > 0 || skipOnTimeout; - send(node, topic, (byte)-1, msg, plc, true, false, timeout, skipOnTimeout); - send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, null); ++ send(node, topic, (byte)-1, msg, plc, true, false, timeout, skipOnTimeout, null); } /** @@@ -1205,7 -1132,7 +1214,7 @@@ if (node == null) throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId); - send(node, topic, (byte)-1, msg, plc, true, false, timeout, skipOnTimeout); - send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, null); ++ send(node, topic, (byte)-1, msg, plc, true, false, timeout, skipOnTimeout, null); } /** @@@ -1264,47 -1217,30 +1299,71 @@@ } /** + * @param node Destination node. + * @param topic Topic to send the message to. + * @param msg Message to send. + * @param plc Type of processing. + * @param timeout Timeout to keep a message on receiving queue. + * @param skipOnTimeout Whether message can be skipped on timeout. + * @param ackClosure Ack closure. + * @throws IgniteCheckedException Thrown in case of any errors. + */ + public void sendOrderedMessage( + ClusterNode node, + Object topic, + Message msg, + byte plc, + long timeout, + boolean skipOnTimeout, + IgniteInClosure ackClosure + ) throws IgniteCheckedException { + assert timeout > 0 || skipOnTimeout; + + send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackClosure); + } + + /** + * Sends sequential message. + * + * @param nodeId Destination node ID. + * @param topic Topic. + * @param msg Message. + * @param plc Policy. + * @throws IgniteCheckedException In case of error. + */ + public void sendSequentialMessage( + UUID nodeId, + Object topic, + Message msg, + byte plc + ) throws IgniteCheckedException { + ClusterNode node = ctx.discovery().node(nodeId); + + if (node == null) + throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId); + + sendSequentialMessage(node, topic, msg, plc); + } + + /** + * Sends sequential message. + * + * @param node Destination node. + * @param topic Topic. + * @param msg Message. + * @param plc Policy. + * @throws IgniteCheckedException In case of error. + */ + public void sendSequentialMessage( + ClusterNode node, + Object topic, + Message msg, + byte plc + ) throws IgniteCheckedException { - send(node, topic, -1, msg, plc, false, true, 0, false); ++ send(node, topic, -1, msg, plc, false, true, 0, false, null); + } + + /** * Sends a peer deployable user message. * * @param nodes Destination nodes. @@@ -1459,7 -1422,7 +1547,7 @@@ // messages to one node vs. many. if (!nodes.isEmpty()) { for (ClusterNode node : nodes) - send(node, topic, topicOrd, msg, plc, ordered, seq, timeout, skipOnTimeout); - send(node, topic, topicOrd, msg, plc, ordered, timeout, skipOnTimeout, null); ++ send(node, topic, topicOrd, msg, plc, ordered, seq, timeout, skipOnTimeout, null); } else if (log.isDebugEnabled()) log.debug("Failed to send message to empty nodes collection [topic=" + topic + ", msg=" +