Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 0954D200BCE for ; Thu, 17 Nov 2016 11:51:27 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 08687160B26; Thu, 17 Nov 2016 10:51:27 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id EF7C5160B25 for ; Thu, 17 Nov 2016 11:51:25 +0100 (CET) Received: (qmail 77691 invoked by uid 500); 17 Nov 2016 10:51:25 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 77301 invoked by uid 99); 17 Nov 2016 10:51:24 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 17 Nov 2016 10:51:24 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C9E06F1715; Thu, 17 Nov 2016 10:51:24 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vozerov@apache.org To: commits@ignite.apache.org Date: Thu, 17 Nov 2016 10:51:38 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [15/50] [abbrv] ignite git commit: GG-11655 - Fix merge archived-at: Thu, 17 Nov 2016 10:51:27 -0000 GG-11655 - Fix merge Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a62a0136 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a62a0136 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a62a0136 Branch: refs/heads/ignite-2693 Commit: a62a0136d295486d95c6e2ab5bba88270d831753 Parents: 92fff63 Author: dkarachentsev Authored: Wed Nov 2 19:07:45 2016 +0300 Committer: dkarachentsev Committed: Wed Nov 2 19:10:01 2016 +0300 ---------------------------------------------------------------------- .../service/GridServiceProcessor.java | 136 ++++++++++--------- 1 file changed, 74 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/a62a0136/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java index 8489875..6c26363 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java @@ -851,7 +851,7 @@ public class GridServiceProcessor extends GridProcessorAdapter { } } - return new GridServiceProxy<>(prj, name, svcItf, sticky, ctx).proxy(); + return new GridServiceProxy(prj, name, svcItf, sticky, ctx).proxy(); } /** @@ -904,7 +904,7 @@ public class GridServiceProcessor extends GridProcessorAdapter { * @param topVer Topology version. * @throws IgniteCheckedException If failed. */ - private void reassign(GridServiceDeployment dep, long topVer) throws IgniteCheckedException { + private void reassign(GridServiceDeployment dep, AffinityTopologyVersion topVer) throws IgniteCheckedException { ServiceConfiguration cfg = dep.configuration(); Object nodeFilter = cfg.getNodeFilter(); @@ -918,7 +918,7 @@ public class GridServiceProcessor extends GridProcessorAdapter { Object affKey = cfg.getAffinityKey(); while (true) { - GridServiceAssignments assigns = new GridServiceAssignments(cfg, dep.nodeId(), topVer); + GridServiceAssignments assigns = new GridServiceAssignments(cfg, dep.nodeId(), topVer.topologyVersion()); Collection nodes; @@ -948,7 +948,7 @@ public class GridServiceProcessor extends GridProcessorAdapter { Map cnts = new HashMap<>(); if (affKey != null) { - ClusterNode n = ctx.affinity().mapKeyToNode(cacheName, affKey, new AffinityTopologyVersion(topVer)); + ClusterNode n = ctx.affinity().mapKeyToNode(cacheName, affKey, topVer); if (n != null) { int cnt = maxPerNodeCnt == 0 ? totalCnt == 0 ? 1 : totalCnt : maxPerNodeCnt; @@ -1180,7 +1180,7 @@ public class GridServiceProcessor extends GridProcessorAdapter { if (cfg instanceof LazyServiceConfiguration) { byte[] bytes = ((LazyServiceConfiguration)cfg).serviceBytes(); - Service srvc = m.unmarshal(bytes, U.resolveClassLoader(null, ctx.config())); + Service srvc = U.unmarshal(m, bytes, U.resolveClassLoader(null, ctx.config())); ctx.resource().inject(srvc); @@ -1190,10 +1190,9 @@ public class GridServiceProcessor extends GridProcessorAdapter { Service svc = cfg.getService(); try { - byte[] bytes = m.marshal(svc); + byte[] bytes = U.marshal(m, svc); - Service cp = m.unmarshal(bytes, - U.resolveClassLoader(svc.getClass().getClassLoader(), ctx.config())); + Service cp = U.unmarshal(m, bytes, U.resolveClassLoader(svc.getClass().getClassLoader(), ctx.config())); ctx.resource().inject(cp); @@ -1268,8 +1267,8 @@ public class GridServiceProcessor extends GridProcessorAdapter { ClusterNode oldestSrvNode = CU.oldestAliveCacheServerNode(cache.context().shared(), AffinityTopologyVersion.NONE); - if (oldestSrvNode == null) - return F.emptyIterator(); + if (oldestSrvNode == null) + return new GridEmptyIterator<>(); GridCacheQueryManager qryMgr = cache.context().queries(); @@ -1455,7 +1454,7 @@ public class GridServiceProcessor extends GridProcessorAdapter { svcName.set(dep.configuration().getName()); // Ignore other utility cache events. - long topVer = ctx.discovery().topologyVersion(); + AffinityTopologyVersion topVer = ctx.discovery().topologyVersionEx(); ClusterNode oldest = U.oldest(ctx.discovery().nodes(topVer), null); @@ -1506,60 +1505,60 @@ public class GridServiceProcessor extends GridProcessorAdapter { } } - /** - * Deployment callback. - * - * @param dep Service deployment. - * @param topVer Topology version. - */ - private void onDeployment(final GridServiceDeployment dep, final long topVer) { - // Retry forever. - try { - long newTopVer = ctx.discovery().topologyVersion(); - - // If topology version changed, reassignment will happen from topology event. - if (newTopVer == topVer) - reassign(dep, topVer); - } - catch (IgniteCheckedException e) { - if (!(e instanceof ClusterTopologyCheckedException)) - log.error("Failed to do service reassignment (will retry): " + dep.configuration().getName(), e); - - long newTopVer = ctx.discovery().topologyVersion(); - - if (newTopVer != topVer) { - assert newTopVer > topVer; + /** + * Deployment callback. + * + * @param dep Service deployment. + * @param topVer Topology version. + */ + private void onDeployment(final GridServiceDeployment dep, final AffinityTopologyVersion topVer) { + // Retry forever. + try { + AffinityTopologyVersion newTopVer = ctx.discovery().topologyVersionEx(); - // Reassignment will happen from topology event. - return; + // If topology version changed, reassignment will happen from topology event. + if (newTopVer.equals(topVer)) + reassign(dep, topVer); } + catch (IgniteCheckedException e) { + if (!(e instanceof ClusterTopologyCheckedException)) + log.error("Failed to do service reassignment (will retry): " + dep.configuration().getName(), e); - ctx.timeout().addTimeoutObject(new GridTimeoutObject() { - private IgniteUuid id = IgniteUuid.randomUuid(); + AffinityTopologyVersion newTopVer = ctx.discovery().topologyVersionEx(); - private long start = System.currentTimeMillis(); + if (!newTopVer.equals(topVer)) { + assert newTopVer.compareTo(topVer) > 0; - @Override public IgniteUuid timeoutId() { - return id; + // Reassignment will happen from topology event. + return; } - @Override public long endTime() { - return start + RETRY_TIMEOUT; - } + ctx.timeout().addTimeoutObject(new GridTimeoutObject() { + private IgniteUuid id = IgniteUuid.randomUuid(); - @Override public void onTimeout() { - if (!busyLock.enterBusy()) - return; + private long start = System.currentTimeMillis(); - try { - // Try again. - onDeployment(dep, topVer); + @Override public IgniteUuid timeoutId() { + return id; } - finally { - busyLock.leaveBusy(); + + @Override public long endTime() { + return start + RETRY_TIMEOUT; } - } - }); + + @Override public void onTimeout() { + if (!busyLock.enterBusy()) + return; + + try { + // Try again. + onDeployment(dep, topVer); + } + finally { + busyLock.leaveBusy(); + } + } + }); } } @@ -1568,16 +1567,28 @@ public class GridServiceProcessor extends GridProcessorAdapter { */ private class TopologyListener implements GridLocalEventListener { /** {@inheritDoc} */ - @Override public void onEvent(final Event evt) { + @Override public void onEvent(Event evt) { if (!busyLock.enterBusy()) return; try { + final AffinityTopologyVersion topVer; + + if (evt instanceof DiscoveryCustomEvent) { + DiscoveryCustomMessage msg = ((DiscoveryCustomEvent)evt).customMessage(); + + topVer = ((DiscoveryCustomEvent)evt).affinityTopologyVersion(); + + if (msg instanceof CacheAffinityChangeMessage) { + if (!((CacheAffinityChangeMessage)msg).exchangeNeeded()) + return; + } + } + else + topVer = new AffinityTopologyVersion(((DiscoveryEvent)evt).topologyVersion(), 0); + depExe.submit(new BusyRunnable() { @Override public void run0() { - AffinityTopologyVersion topVer = - new AffinityTopologyVersion(((DiscoveryEvent)evt).topologyVersion()); - ClusterNode oldest = CU.oldestAliveCacheServerNode(cache.context().shared(), topVer); if (oldest != null && oldest.isLocal()) { @@ -1612,7 +1623,7 @@ public class GridServiceProcessor extends GridProcessorAdapter { ctx.cache().internalCache(UTILITY_CACHE_NAME).context().affinity(). affinityReadyFuture(topVer).get(); - reassign(dep, topVer.topologyVersion()); + reassign(dep, topVer); } catch (IgniteCheckedException ex) { if (!(e instanceof ClusterTopologyCheckedException)) @@ -1629,7 +1640,7 @@ public class GridServiceProcessor extends GridProcessorAdapter { } if (!retries.isEmpty()) - onReassignmentFailed(topVer.topologyVersion(), retries); + onReassignmentFailed(topVer, retries); } // Clean up zombie assignments. @@ -1666,13 +1677,14 @@ public class GridServiceProcessor extends GridProcessorAdapter { * @param topVer Topology version. * @param retries Retries. */ - private void onReassignmentFailed(final long topVer, final Collection retries) { + private void onReassignmentFailed(final AffinityTopologyVersion topVer, + final Collection retries) { if (!busyLock.enterBusy()) return; try { // If topology changed again, let next event handle it. - if (ctx.discovery().topologyVersion() != topVer) + if (ctx.discovery().topologyVersionEx().equals(topVer)) return; for (Iterator it = retries.iterator(); it.hasNext(); ) {