Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1398617CB6 for ; Fri, 19 Jun 2015 11:38:16 +0000 (UTC) Received: (qmail 40793 invoked by uid 500); 19 Jun 2015 11:38:16 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 40764 invoked by uid 500); 19 Jun 2015 11:38:16 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 40755 invoked by uid 99); 19 Jun 2015 11:38:15 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 19 Jun 2015 11:38:15 +0000 X-ASF-Spam-Status: No, hits=-2000.4 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Fri, 19 Jun 2015 11:36:03 +0000 Received: (qmail 37133 invoked by uid 99); 19 Jun 2015 11:37:47 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 19 Jun 2015 11:37:47 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id DAD6AE027D; Fri, 19 Jun 2015 11:37:46 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.incubator.apache.org Message-Id: <2ee2cd9df58d4580af1c847343b0245e@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: incubator-ignite git commit: # ignite-sprint-7 do not call ping from exchange worker Date: Fri, 19 Jun 2015 11:37:46 +0000 (UTC) X-Virus-Checked: Checked by ClamAV on apache.org Repository: incubator-ignite Updated Branches: refs/heads/ignite-sprint-7 d00638970 -> 8fa9d3dd9 # ignite-sprint-7 do not call ping from exchange worker Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/8fa9d3dd Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/8fa9d3dd Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/8fa9d3dd Branch: refs/heads/ignite-sprint-7 Commit: 8fa9d3dd9214e85748a6b7b900fa2433079ce246 Parents: d006389 Author: sboikov Authored: Fri Jun 19 14:37:37 2015 +0300 Committer: sboikov Committed: Fri Jun 19 14:37:37 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheIoManager.java | 64 ++++++++++++------ .../GridCachePartitionExchangeManager.java | 70 +++++++++----------- .../GridCacheAbstractFailoverSelfTest.java | 6 +- 3 files changed, 79 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8fa9d3dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index eef9fde..74a4512 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -323,7 +323,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { * Processes failed messages. * @param nodeId niode id. * @param msg message. - * @throws IgniteCheckedException + * @throws IgniteCheckedException If failed. */ private void processFailedMessage(UUID nodeId, GridCacheMessage msg) throws IgniteCheckedException { GridCacheContext ctx = cctx.cacheContext(msg.cacheId()); @@ -511,6 +511,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { * @param cacheMsg Cache message to get start future. * @return Preloader start future. */ + @SuppressWarnings("unchecked") private IgniteInternalFuture startFuture(GridCacheMessage cacheMsg) { int cacheId = cacheMsg.cacheId(); @@ -574,6 +575,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { * * @param node Node to send the message to. * @param msg Message to send. + * @param plc IO policy. * @throws IgniteCheckedException If sending failed. * @throws ClusterTopologyCheckedException If receiver left. */ @@ -734,6 +736,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { * * @param nodeId ID of node to send the message to. * @param msg Message to send. + * @param plc IO policy. * @throws IgniteCheckedException If sending failed. */ public void send(UUID nodeId, GridCacheMessage msg, GridIoPolicy plc) throws IgniteCheckedException { @@ -795,8 +798,41 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { } /** + * Sends message without retries and node ping in case of error. + * + * @param node Node to send message to. + * @param msg Message. + * @param plc IO policy. + * @throws IgniteCheckedException If send failed. + */ + public void sendNoRetry(ClusterNode node, + GridCacheMessage msg, + GridIoPolicy plc) + throws IgniteCheckedException + { + assert node != null; + assert msg != null; + + onSend(msg, null); + + try { + cctx.gridIO().send(node, TOPIC_CACHE, msg, plc); + + if (log.isDebugEnabled()) + log.debug("Sent cache message [msg=" + msg + ", node=" + U.toShortString(node) + ']'); + } + catch (IgniteCheckedException e) { + if (!cctx.discovery().alive(node.id())) + throw new ClusterTopologyCheckedException("Node left grid while sending message to: " + node.id(), e); + else + throw e; + } + } + + /** * Adds message handler. * + * @param cacheId Cache ID. * @param type Type of message. * @param c Handler. */ @@ -846,29 +882,15 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { idxClsHandlers.remove(cacheId); - for (Iterator iterator = clsHandlers.keySet().iterator(); iterator.hasNext(); ) { - ListenerKey key = iterator.next(); + for (Iterator iter = clsHandlers.keySet().iterator(); iter.hasNext(); ) { + ListenerKey key = iter.next(); if (key.cacheId == cacheId) - iterator.remove(); + iter.remove(); } } /** - * @param lsnr Listener to add. - */ - public void addDisconnectListener(GridDisconnectListener lsnr) { - cctx.kernalContext().io().addDisconnectListener(lsnr); - } - - /** - * @param lsnr Listener to remove. - */ - public void removeDisconnectListener(GridDisconnectListener lsnr) { - cctx.kernalContext().io().removeDisconnectListener(lsnr); - } - - /** * @param msgCls Message class to check. * @return Message index. */ @@ -1029,11 +1051,11 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { /** {@inheritDoc} */ @Override public int hashCode() { - int result = cacheId; + int res = cacheId; - result = 31 * result + msgCls.hashCode(); + res = 31 * res + msgCls.hashCode(); - return result; + return res; } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8fa9d3dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 3df45cb..81019e9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -567,27 +567,21 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana Collection rmts = null; - try { - // If this is the oldest node. - if (oldest.id().equals(cctx.localNodeId())) { - rmts = CU.remoteNodes(cctx, AffinityTopologyVersion.NONE); - - if (log.isDebugEnabled()) - log.debug("Refreshing partitions from oldest node: " + cctx.localNodeId()); + // If this is the oldest node. + if (oldest.id().equals(cctx.localNodeId())) { + rmts = CU.remoteNodes(cctx, AffinityTopologyVersion.NONE); - sendAllPartitions(rmts); - } - else { - if (log.isDebugEnabled()) - log.debug("Refreshing local partitions from non-oldest node: " + - cctx.localNodeId()); + if (log.isDebugEnabled()) + log.debug("Refreshing partitions from oldest node: " + cctx.localNodeId()); - sendLocalPartitions(oldest, null); - } + sendAllPartitions(rmts); } - catch (IgniteCheckedException e) { - U.error(log, "Failed to refresh partition map [oldest=" + oldest.id() + ", rmts=" + U.nodeIds(rmts) + - ", loc=" + cctx.localNodeId() + ']', e); + else { + if (log.isDebugEnabled()) + log.debug("Refreshing local partitions from non-oldest node: " + + cctx.localNodeId()); + + sendLocalPartitions(oldest, null); } } @@ -616,10 +610,8 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana /** * @param nodes Nodes. * @return {@code True} if message was sent, {@code false} if node left grid. - * @throws IgniteCheckedException If failed. */ - private boolean sendAllPartitions(Collection nodes) - throws IgniteCheckedException { + private boolean sendAllPartitions(Collection nodes) { GridDhtPartitionsFullMessage m = new GridDhtPartitionsFullMessage(null, null, AffinityTopologyVersion.NONE); for (GridCacheContext cacheCtx : cctx.cacheContexts()) { @@ -634,7 +626,19 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana if (log.isDebugEnabled()) log.debug("Sending all partitions [nodeIds=" + U.nodeIds(nodes) + ", msg=" + m + ']'); - cctx.io().safeSend(nodes, m, SYSTEM_POOL, null); + for (ClusterNode node : nodes) { + try { + cctx.io().sendNoRetry(node, m, SYSTEM_POOL); + } + catch (ClusterTopologyCheckedException ignore) { + if (log.isDebugEnabled()) + log.debug("Failed to send partition update to node because it left grid (will ignore) [node=" + + node.id() + ", msg=" + m + ']'); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send partitions full message [node=" + node + ']', e); + } + } return true; } @@ -642,11 +646,8 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana /** * @param node Node. * @param id ID. - * @return {@code True} if message was sent, {@code false} if node left grid. - * @throws IgniteCheckedException If failed. */ - private boolean sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) - throws IgniteCheckedException { + private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) { GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id, cctx.kernalContext().clientNode(), cctx.versions().last()); @@ -669,16 +670,15 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana log.debug("Sending local partitions [nodeId=" + node.id() + ", msg=" + m + ']'); try { - cctx.io().send(node, m, SYSTEM_POOL); - - return true; + cctx.io().sendNoRetry(node, m, SYSTEM_POOL); } catch (ClusterTopologyCheckedException ignore) { if (log.isDebugEnabled()) log.debug("Failed to send partition update to node because it left grid (will ignore) [node=" + node.id() + ", msg=" + m + ']'); - - return false; + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send local partition map to node [node=" + node + ", exchId=" + id + ']', e); } } @@ -902,13 +902,7 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana return; try { - try { - sendLocalPartitions(node, msg.exchangeId()); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to send local partition map to node [nodeId=" + node.id() + ", exchId=" + - msg.exchangeId() + ']', e); - } + sendLocalPartitions(node, msg.exchangeId()); } finally { leaveBusy(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8fa9d3dd/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java index b6cd88e..3c74674 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFailoverSelfTest.java @@ -74,9 +74,11 @@ public abstract class GridCacheAbstractFailoverSelfTest extends GridCacheAbstrac TcpDiscoverySpi discoSpi = (TcpDiscoverySpi)cfg.getDiscoverySpi(); - discoSpi.setSocketTimeout(10_000); - discoSpi.setAckTimeout(10_000); + discoSpi.setSocketTimeout(30_000); + discoSpi.setAckTimeout(30_000); discoSpi.setNetworkTimeout(60_000); + discoSpi.setHeartbeatFrequency(30_000); + discoSpi.setReconnectCount(2); return cfg; }