Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id BCFB510F69 for ; Tue, 5 May 2015 16:15:20 +0000 (UTC) Received: (qmail 30255 invoked by uid 500); 5 May 2015 16:15:20 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 30224 invoked by uid 500); 5 May 2015 16:15:20 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 30215 invoked by uid 99); 5 May 2015 16:15:20 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 05 May 2015 16:15:20 +0000 X-ASF-Spam-Status: No, hits=-0.0 required=5.0 tests=SPF_PASS X-Spam-Check-By: apache.org Received-SPF: pass (athena.apache.org: local policy) Received: from [54.191.145.13] (HELO mx1-us-west.apache.org) (54.191.145.13) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 05 May 2015 16:15:15 +0000 Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-west.apache.org (ASF Mail Server at mx1-us-west.apache.org) with SMTP id D11502501E for ; Tue, 5 May 2015 16:14:52 +0000 (UTC) Received: (qmail 29614 invoked by uid 99); 5 May 2015 16:14:52 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 05 May 2015 16:14:52 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A09A3E10A1; Tue, 5 May 2015 16:14:52 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sevdokimov@apache.org To: commits@ignite.incubator.apache.org Message-Id: <5e20b0e4eb7b477d9e29701f7f1b9392@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: incubator-ignite git commit: # IGNITE-831 Done. Date: Tue, 5 May 2015 16:14:52 +0000 (UTC) X-Virus-Checked: Checked by ClamAV on apache.org Repository: incubator-ignite Updated Branches: refs/heads/ignite-836_2 ac7597e15 -> e028de86e # IGNITE-831 Done. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/e028de86 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/e028de86 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/e028de86 Branch: refs/heads/ignite-836_2 Commit: e028de86ee375ad7c96a8eeac333487258534cff Parents: ac7597e Author: sevdokimov Authored: Tue May 5 19:14:43 2015 +0300 Committer: sevdokimov Committed: Tue May 5 19:14:43 2015 +0300 ---------------------------------------------------------------------- .../continuous/GridContinuousMessageType.java | 12 - .../continuous/GridContinuousProcessor.java | 544 +++---------------- .../StartRoutineAckDiscoveryMessage.java | 62 +++ .../StartRoutineDiscoveryMessage.java | 89 +++ .../StopRoutineAckDiscoveryMessage.java | 49 ++ .../continuous/StopRoutineDiscoveryMessage.java | 56 ++ .../ignite/spi/discovery/DiscoverySpi.java | 4 +- ...ridCacheContinuousQueryAbstractSelfTest.java | 6 +- .../tcp/TcpClientDiscoverySelfTest.java | 4 +- 9 files changed, 338 insertions(+), 488 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e028de86/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessageType.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessageType.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessageType.java index eb33613..1b79430 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessageType.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessageType.java @@ -23,18 +23,6 @@ import org.jetbrains.annotations.*; * Continuous processor message types. */ enum GridContinuousMessageType { - /** Consume start request. */ - MSG_START_REQ, - - /** Consume start acknowledgement. */ - MSG_START_ACK, - - /** Consume stop request. */ - MSG_STOP_REQ, - - /** Consume stop acknowledgement. */ - MSG_STOP_ACK, - /** Remote event notification. */ MSG_EVT_NOTIFICATION, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e028de86/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index 41f5940..d71609b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -24,6 +24,7 @@ import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.managers.deployment.*; +import org.apache.ignite.internal.managers.discovery.*; import org.apache.ignite.internal.managers.eventstorage.*; import org.apache.ignite.internal.processors.*; import org.apache.ignite.internal.processors.cache.*; @@ -64,21 +65,12 @@ public class GridContinuousProcessor extends GridProcessorAdapter { /** Start futures. */ private final ConcurrentMap startFuts = new ConcurrentHashMap8<>(); - /** Start ack wait lists. */ - private final ConcurrentMap> waitForStartAck = new ConcurrentHashMap8<>(); - /** Stop futures. */ private final ConcurrentMap stopFuts = new ConcurrentHashMap8<>(); - /** Stop ack wait lists. */ - private final ConcurrentMap> waitForStopAck = new ConcurrentHashMap8<>(); - /** Threads started by this processor. */ private final Collection threads = new GridConcurrentHashSet<>(); - /** Pending start requests. */ - private final Map> pending = new HashMap<>(); - /** */ private final ConcurrentMap syncMsgFuts = new ConcurrentHashMap8<>(); @@ -91,18 +83,15 @@ public class GridContinuousProcessor extends GridProcessorAdapter { /** Lock for stop process. */ private final Lock stopLock = new ReentrantLock(); + /** Marshaller. */ + private Marshaller marsh; + /** Delay in milliseconds between retries. */ private long retryDelay = 1000; /** Number of retries using to send messages. */ private int retryCnt = 3; - /** Acknowledgement timeout. */ - private long ackTimeout; - - /** Marshaller. */ - private Marshaller marsh; - /** * @param ctx Kernal context. */ @@ -117,15 +106,6 @@ public class GridContinuousProcessor extends GridProcessorAdapter { retryDelay = ctx.config().getNetworkSendRetryDelay(); retryCnt = ctx.config().getNetworkSendRetryCount(); - ackTimeout = ctx.config().getNetworkTimeout(); - - if (ackTimeout < retryDelay * retryCnt) { - U.warn(log, "Acknowledgement timeout for continuous operations is less than message send " + - "retry delay multiplied by retries count (will increase timeout value) [ackTimeout=" + - ackTimeout + ", retryDelay=" + retryDelay + ", retryCnt=" + retryCnt + ']'); - - ackTimeout = retryDelay * retryCnt; - } marsh = ctx.config().getMarshaller(); @@ -136,111 +116,82 @@ public class GridContinuousProcessor extends GridProcessorAdapter { UUID nodeId = ((DiscoveryEvent)evt).eventNode().id(); - Collection reqs; + assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED; - pendingLock.lock(); + // Unregister handlers created by left node. + for (Map.Entry e : rmtInfos.entrySet()) { + UUID routineId = e.getKey(); + RemoteRoutineInfo info = e.getValue(); - try { - // Remove pending requests to send to joined node - // (if node is left or failed, they are dropped). - reqs = pending.remove(nodeId); - } - finally { - pendingLock.unlock(); + if (info.autoUnsubscribe && nodeId.equals(info.nodeId)) + unregisterRemote(routineId); } - switch (evt.type()) { - case EVT_NODE_JOINED: - if (reqs != null) { - UUID routineId = null; + for (Map.Entry e : syncMsgFuts.entrySet()) { + SyncMessageAckFuture fut = e.getValue(); - // Send pending requests. - try { - for (GridContinuousMessage req : reqs) { - routineId = req.routineId(); + if (fut.nodeId().equals(nodeId)) { + SyncMessageAckFuture fut0 = syncMsgFuts.remove(e.getKey()); - sendWithRetries(nodeId, req, null); - } - } - catch (ClusterTopologyCheckedException ignored) { - if (log.isDebugEnabled()) - log.debug("Failed to send pending start request to node (is node alive?): " + - nodeId); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to send pending start request to node: " + nodeId, e); + if (fut0 != null) { + ClusterTopologyCheckedException err = new ClusterTopologyCheckedException( + "Node left grid while sending message to: " + nodeId); - completeStartFuture(routineId); - } - } - - break; - - case EVT_NODE_LEFT: - case EVT_NODE_FAILED: - // Do not wait for start acknowledgements from left node. - for (Map.Entry> e : waitForStartAck.entrySet()) { - Collection nodeIds = e.getValue(); - - for (Iterator it = nodeIds.iterator(); it.hasNext();) { - if (nodeId.equals(it.next())) { - it.remove(); - - break; - } - } - - if (nodeIds.isEmpty()) - completeStartFuture(e.getKey()); + fut0.onDone(err); } + } + } + } + }, EVT_NODE_LEFT, EVT_NODE_FAILED); - // Do not wait for stop acknowledgements from left node. - for (Map.Entry> e : waitForStopAck.entrySet()) { - Collection nodeIds = e.getValue(); + ctx.discovery().setCustomEventListener(StartRoutineDiscoveryMessage.class, + new CustomEventListener() { + @Override public void onCustomEvent(ClusterNode snd, StartRoutineDiscoveryMessage msg) { + if (!snd.id().equals(ctx.localNodeId())) + processStartRequest(snd.id(), msg); + } + }); - for (Iterator it = nodeIds.iterator(); it.hasNext();) { - if (nodeId.equals(it.next())) { - it.remove(); + ctx.discovery().setCustomEventListener(StartRoutineAckDiscoveryMessage.class, + new CustomEventListener() { + @Override public void onCustomEvent(ClusterNode snd, StartRoutineAckDiscoveryMessage msg) { + StartFuture fut = startFuts.remove(msg.routineId()); - break; - } - } + if (fut != null) { + if (msg.errs().isEmpty()) + fut.onRemoteRegistered(); + else { + IgniteCheckedException firstEx = F.first(msg.errs().values()); - if (nodeIds.isEmpty()) - completeStopFuture(e.getKey()); - } - - // Unregister handlers created by left node. - for (Map.Entry e : rmtInfos.entrySet()) { - UUID routineId = e.getKey(); - RemoteRoutineInfo info = e.getValue(); + fut.onDone(firstEx); - if (info.autoUnsubscribe && nodeId.equals(info.nodeId)) - unregisterRemote(routineId); + stopRoutine(msg.routineId()); } - for (Map.Entry e : syncMsgFuts.entrySet()) { - SyncMessageAckFuture fut = e.getValue(); - - if (fut.nodeId().equals(nodeId)) { - SyncMessageAckFuture fut0 = syncMsgFuts.remove(e.getKey()); + } + } + }); - if (fut0 != null) { - ClusterTopologyCheckedException err = new ClusterTopologyCheckedException( - "Node left grid while sending message to: " + nodeId); + ctx.discovery().setCustomEventListener(StopRoutineDiscoveryMessage.class, + new CustomEventListener() { + @Override public void onCustomEvent(ClusterNode snd, StopRoutineDiscoveryMessage msg) { + if (!snd.id().equals(ctx.localNodeId())) { + UUID routineId = msg.routineId(); - fut0.onDone(err); - } - } - } + unregisterRemote(routineId); + } + } + }); - break; + ctx.discovery().setCustomEventListener(StopRoutineAckDiscoveryMessage.class, + new CustomEventListener() { + @Override public void onCustomEvent(ClusterNode snd, StopRoutineAckDiscoveryMessage msg) { + StopFuture fut = stopFuts.remove(msg.routineId()); - default: - assert false : "Unexpected event received: " + evt.shortDisplay(); + if (fut != null) + fut.onDone(); } - } - }, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED); + }); ctx.io().addMessageListener(TOPIC_CONTINUOUS, new GridMessageListener() { @Override public void onMessage(UUID nodeId, Object obj) { @@ -258,26 +209,6 @@ public class GridContinuousProcessor extends GridProcessorAdapter { } switch (msg.type()) { - case MSG_START_REQ: - processStartRequest(nodeId, msg); - - break; - - case MSG_START_ACK: - processStartAck(nodeId, msg); - - break; - - case MSG_STOP_REQ: - processStopRequest(nodeId, msg); - - break; - - case MSG_STOP_ACK: - processStopAck(nodeId, msg); - - break; - case MSG_EVT_NOTIFICATION: processNotification(nodeId, msg); @@ -323,9 +254,6 @@ public class GridContinuousProcessor extends GridProcessorAdapter { pendingLock.lock(); try { - // Create empty pending set. - pending.put(nodeId, new HashSet()); - DiscoveryData data = new DiscoveryData(ctx.localNodeId()); // Collect listeners information (will be sent to @@ -486,32 +414,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter { }); } - Collection nodes; - Collection nodeIds; - pendingLock.lock(); try { - // Nodes that participate in routine (request will be sent to these nodes directly). - nodes = F.view(ctx.discovery().allNodes(), F.and(prjPred, F.remoteNodes(ctx.localNodeId()))); - - // Stop with exception if projection is empty. - if (nodes.isEmpty() && !locIncluded) { - return new GridFinishedFuture<>( - new ClusterTopologyCheckedException("Failed to register remote continuous listener (projection is empty).")); - } - - // IDs of nodes where request will be sent. - nodeIds = new GridConcurrentHashSet<>(F.viewReadOnly(nodes, F.node2id())); - - // If there are currently joining nodes, add request to their pending lists. - // Node IDs set is updated to make sure that we wait for acknowledgement from - // these nodes. - for (Map.Entry> e : pending.entrySet()) { - if (nodeIds.add(e.getKey())) - e.getValue().add(new GridContinuousMessage(MSG_START_REQ, routineId, null, reqData, false)); - } - // Register routine locally. locInfos.put(routineId, new LocalRoutineInfo(prjPred, hnd, bufSize, interval)); } @@ -521,68 +426,19 @@ public class GridContinuousProcessor extends GridProcessorAdapter { StartFuture fut = new StartFuture(ctx, routineId); - if (!nodeIds.isEmpty()) { - // Wait for acknowledgements. - waitForStartAck.put(routineId, nodeIds); - - startFuts.put(routineId, fut); - - // Register acknowledge timeout (timeout object will be removed when - // future is completed). - fut.addTimeoutObject(new GridTimeoutObjectAdapter(ackTimeout) { - @Override public void onTimeout() { - // Stop waiting for acknowledgements. - Collection ids = waitForStartAck.remove(routineId); - - if (ids != null) { - StartFuture f = startFuts.remove(routineId); + startFuts.put(routineId, fut); - assert f != null; - - // If there are still nodes without acknowledgements, - // Stop routine with exception. Continue and complete - // future otherwise. - if (!ids.isEmpty()) { - f.onDone(new IgniteCheckedException("Failed to get start acknowledgement from nodes (timeout " + - "expired): " + ids + ". Will unregister all continuous listeners.")); - - stopRoutine(routineId); - } - else - f.onRemoteRegistered(); - } - } - }); + try { + ctx.discovery().sendCustomEvent(new StartRoutineDiscoveryMessage(routineId, reqData)); } + catch (IgniteException e) { // Marshaller exception may occurs if user pass unmarshallable filter. + startFuts.remove(routineId); - if (!nodes.isEmpty()) { - // Do not send projection predicate (nodes already filtered). - reqData.projectionPredicate(null); - reqData.projectionPredicateBytes(null); - - // Send start requests. - try { - GridContinuousMessage req = new GridContinuousMessage(MSG_START_REQ, routineId, null, reqData, false); - - sendWithRetries(nodes, req, null); - } - catch (IgniteCheckedException e) { - startFuts.remove(routineId); - waitForStartAck.remove(routineId); - - fut.onDone(e); - - stopRoutine(routineId); + locInfos.remove(routineId); - locIncluded = false; - } - } - else { - // There are no remote nodes, but we didn't throw topology exception. - assert locIncluded; + fut.onDone(e); - // Do not wait anything from remote nodes. - fut.onRemoteRegistered(); + return fut; } // Register local handler if needed. @@ -640,61 +496,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { // Unregister handler locally. unregisterHandler(routineId, routine.hnd, true); - pendingLock.lock(); - - try { - // Remove pending requests for this routine. - for (Collection msgs : pending.values()) { - Iterator it = msgs.iterator(); - - while (it.hasNext()) { - if (it.next().routineId().equals(routineId)) - it.remove(); - } - } - } - finally { - pendingLock.unlock(); - } - - // Nodes where to send stop requests. - Collection nodes = F.view(ctx.discovery().allNodes(), - F.and(routine.prjPred, F.remoteNodes(ctx.localNodeId()))); - - if (!nodes.isEmpty()) { - // Wait for acknowledgements. - waitForStopAck.put(routineId, new GridConcurrentHashSet<>(F.viewReadOnly(nodes, F.node2id()))); - - // Register acknowledge timeout (timeout object will be removed when - // future is completed). - fut.addTimeoutObject(new StopTimeoutObject(ackTimeout, routineId, - new GridContinuousMessage(MSG_STOP_REQ, routineId, null, null, false))); - - // Send stop requests. - try { - for (ClusterNode node : nodes) { - try { - sendWithRetries(node.id(), - new GridContinuousMessage(MSG_STOP_REQ, routineId, null, null, false), - null); - } - catch (ClusterTopologyCheckedException ignored) { - U.warn(log, "Failed to send stop request (node left topology): " + node.id()); - } - } - } - catch (IgniteCheckedException e) { - stopFuts.remove(routineId); - waitForStopAck.remove(routineId); - - fut.onDone(e); - } - } - else { - stopFuts.remove(routineId); - - fut.onDone(); - } + ctx.discovery().sendCustomEvent(new StopRoutineDiscoveryMessage(routineId)); } return fut; @@ -727,7 +529,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { assert info.interval == 0 || !sync; if (sync) { - SyncMessageAckFuture fut = new SyncMessageAckFuture(ctx, nodeId); + SyncMessageAckFuture fut = new SyncMessageAckFuture(nodeId); IgniteUuid futId = IgniteUuid.randomUuid(); @@ -782,12 +584,12 @@ public class GridContinuousProcessor extends GridProcessorAdapter { * @param nodeId Sender ID. * @param req Start request. */ - private void processStartRequest(UUID nodeId, GridContinuousMessage req) { + private void processStartRequest(UUID nodeId, StartRoutineDiscoveryMessage req) { assert nodeId != null; assert req != null; UUID routineId = req.routineId(); - StartRequestData data = req.data(); + StartRequestData data = req.startRequestData(); GridContinuousHandler hnd = data.handler(); @@ -836,100 +638,14 @@ public class GridContinuousProcessor extends GridProcessorAdapter { } } - try { - sendWithRetries(nodeId, new GridContinuousMessage(MSG_START_ACK, routineId, null, err, false), null); - } - catch (ClusterTopologyCheckedException ignored) { - if (log.isDebugEnabled()) - log.debug("Failed to send start acknowledgement to node (is node alive?): " + nodeId); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to send start acknowledgement to node: " + nodeId, e); - } + if (err != null) + req.addError(ctx.localNodeId(), err); if (registered) hnd.onListenerRegistered(routineId, ctx); } /** - * @param nodeId Sender ID. - * @param ack Start acknowledgement. - */ - private void processStartAck(UUID nodeId, GridContinuousMessage ack) { - assert nodeId != null; - assert ack != null; - - UUID routineId = ack.routineId(); - - final IgniteCheckedException err = ack.data(); - - if (err != null) { - if (waitForStartAck.remove(routineId) != null) { - final StartFuture fut = startFuts.remove(routineId); - - if (fut != null) { - fut.onDone(err); - - stopRoutine(routineId); - } - } - } - - Collection nodeIds = waitForStartAck.get(routineId); - - if (nodeIds != null) { - nodeIds.remove(nodeId); - - if (nodeIds.isEmpty()) - completeStartFuture(routineId); - } - } - - /** - * @param nodeId Sender ID. - * @param req Stop request. - */ - private void processStopRequest(UUID nodeId, GridContinuousMessage req) { - assert nodeId != null; - assert req != null; - - UUID routineId = req.routineId(); - - unregisterRemote(routineId); - - try { - sendWithRetries(nodeId, new GridContinuousMessage(MSG_STOP_ACK, routineId, null, null, false), null); - } - catch (ClusterTopologyCheckedException ignored) { - if (log.isDebugEnabled()) - log.debug("Failed to send stop acknowledgement to node (is node alive?): " + nodeId); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to send stop acknowledgement to node: " + nodeId, e); - } - } - - /** - * @param nodeId Sender ID. - * @param ack Stop acknowledgement. - */ - private void processStopAck(UUID nodeId, GridContinuousMessage ack) { - assert nodeId != null; - assert ack != null; - - UUID routineId = ack.routineId(); - - Collection nodeIds = waitForStopAck.get(routineId); - - if (nodeIds != null) { - nodeIds.remove(nodeId); - - if (nodeIds.isEmpty()) - completeStopFuture(routineId); - } - } - - /** * @param msg Message. */ private void processMessageAck(GridContinuousMessage msg) { @@ -972,36 +688,6 @@ public class GridContinuousProcessor extends GridProcessorAdapter { } /** - * @param routineId Consume ID. - */ - private void completeStartFuture(UUID routineId) { - assert routineId != null; - - if (waitForStartAck.remove(routineId) != null) { - StartFuture fut = startFuts.remove(routineId); - - assert fut != null; - - fut.onRemoteRegistered(); - } - } - - /** - * @param routineId Consume ID. - */ - private void completeStopFuture(UUID routineId) { - assert routineId != null; - - if (waitForStopAck.remove(routineId) != null) { - GridFutureAdapter fut = stopFuts.remove(routineId); - - assert fut != null; - - fut.onDone(); - } - } - - /** * @param nodeId Node ID. * @param routineId Consume ID. * @param hnd Handler. @@ -1589,13 +1275,6 @@ public class GridContinuousProcessor extends GridProcessorAdapter { private volatile GridTimeoutObject timeoutObj; /** - * Required by {@link Externalizable}. - */ - public StartFuture() { - // No-op. - } - - /** * @param ctx Kernal context. * @param routineId Consume ID. */ @@ -1706,10 +1385,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter { private UUID nodeId; /** - * @param ctx Kernal context. * @param nodeId Master node ID. */ - SyncMessageAckFuture(GridKernalContext ctx, UUID nodeId) { + SyncMessageAckFuture(UUID nodeId) { this.nodeId = nodeId; } @@ -1725,76 +1403,4 @@ public class GridContinuousProcessor extends GridProcessorAdapter { return S.toString(SyncMessageAckFuture.class, this); } } - - /** - * Timeout object for stop process. - */ - private class StopTimeoutObject extends GridTimeoutObjectAdapter { - /** Timeout. */ - private final long timeout; - - /** Routine ID. */ - private final UUID routineId; - - /** Request. */ - private final GridContinuousMessage req; - - /** - * @param timeout Timeout. - * @param routineId Routine ID. - * @param req Request. - */ - protected StopTimeoutObject(long timeout, UUID routineId, GridContinuousMessage req) { - super(timeout); - - assert routineId != null; - assert req != null; - - this.timeout = timeout; - this.routineId = routineId; - this.req = req; - } - - /** {@inheritDoc} */ - @Override public void onTimeout() { - Collection ids = waitForStopAck.remove(routineId); - - if (ids != null) { - U.warn(log, "Failed to get stop acknowledgement from nodes (timeout expired): " + ids + - ". Will retry."); - - StopFuture f = stopFuts.get(routineId); - - if (f != null) { - if (!ids.isEmpty()) { - waitForStopAck.put(routineId, ids); - - // Resend requests. - for (UUID id : ids) { - try { - sendWithRetries(id, req, null); - } - catch (ClusterTopologyCheckedException ignored) { - if (log.isDebugEnabled()) - log.debug("Failed to resend stop request to node (is node alive?): " + id); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to resend stop request to node: " + id, e); - - ids.remove(id); - - if (ids.isEmpty()) - f.onDone(e); - } - } - - // Reschedule timeout. - ctx.timeout().addTimeoutObject(new StopTimeoutObject(timeout, routineId, req)); - } - else if (stopFuts.remove(routineId) != null) - f.onDone(); - } - } - } - } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e028de86/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java new file mode 100644 index 0000000..4e5bb9c --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.continuous; + +import org.apache.ignite.*; +import org.apache.ignite.internal.managers.discovery.*; + +import java.util.*; + +/** + * + */ +public class StartRoutineAckDiscoveryMessage implements DiscoveryCustomMessage { + /** Routine ID. */ + private final UUID routineId; + + /** */ + private final Map errs; + + /** + * @param routineId Routine id. + * @param errs Errs. + */ + public StartRoutineAckDiscoveryMessage(UUID routineId, Map errs) { + this.routineId = routineId; + this.errs = new HashMap<>(errs); + } + + /** {@inheritDoc} */ + @Override public boolean forwardMinorVersion() { + return false; + } + + /** + * @return Routine ID. + */ + public UUID routineId() { + return routineId; + } + + /** + * @return Errs. + */ + public Map errs() { + return errs; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e028de86/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java new file mode 100644 index 0000000..7fa78b6 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.continuous; + +import org.apache.ignite.*; +import org.apache.ignite.internal.managers.discovery.*; +import org.apache.ignite.spi.*; +import org.apache.ignite.spi.discovery.tcp.messages.*; + +import java.util.*; + +/** + * + */ +public class StartRoutineDiscoveryMessage implements RingEndAwareCustomMessage { + /** Routine ID. */ + private final UUID routineId; + + /** */ + private final StartRequestData startReqData; + + /** */ + private final Map errs = new HashMap<>(); + + /** + * @param routineId Routine id. + * @param startReqData Start request data. + */ + public StartRoutineDiscoveryMessage(UUID routineId, StartRequestData startReqData) { + this.routineId = routineId; + this.startReqData = startReqData; + } + + + + /** {@inheritDoc} */ + @Override public boolean forwardMinorVersion() { + return false; + } + + /** + * @return Start request data. + */ + public StartRequestData startRequestData() { + return startReqData; + } + + /** + * @param nodeId Node id. + * @param e Exception. + */ + public void addError(UUID nodeId, IgniteCheckedException e) { + errs.put(nodeId, e); + } + + /** + * @return Routine ID. + */ + public UUID routineId() { + return routineId; + } + + /** + * @return Errs. + */ + public Map errs() { + return errs; + } + + /** {@inheritDoc} */ + @Override public DiscoveryCustomMessage newMessageOnRingEnd(IgniteSpiContext ctx) { + return new StartRoutineAckDiscoveryMessage(routineId, errs); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e028de86/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java new file mode 100644 index 0000000..755552b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.continuous; + +import org.apache.ignite.internal.managers.discovery.*; + +import java.util.*; + +/** + * + */ +public class StopRoutineAckDiscoveryMessage implements DiscoveryCustomMessage { + /** Routine ID. */ + private final UUID routineId; + + /** + * @param routineId Routine id. + */ + public StopRoutineAckDiscoveryMessage(UUID routineId) { + this.routineId = routineId; + } + + /** {@inheritDoc} */ + @Override public boolean forwardMinorVersion() { + return false; + } + + /** + * @return Routine ID. + */ + public UUID routineId() { + return routineId; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e028de86/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineDiscoveryMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineDiscoveryMessage.java new file mode 100644 index 0000000..9c480a0 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineDiscoveryMessage.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.continuous; + +import org.apache.ignite.internal.managers.discovery.*; +import org.apache.ignite.spi.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +/** + * + */ +public class StopRoutineDiscoveryMessage implements RingEndAwareCustomMessage { + /** Routine ID. */ + private final UUID routineId; + + /** + * @param routineId Routine id. + */ + public StopRoutineDiscoveryMessage(UUID routineId) { + this.routineId = routineId; + } + + /** {@inheritDoc} */ + @Override public boolean forwardMinorVersion() { + return false; + } + + /** + * @return Routine ID. + */ + public UUID routineId() { + return routineId; + } + + /** {@inheritDoc} */ + @Nullable @Override public DiscoveryCustomMessage newMessageOnRingEnd(IgniteSpiContext ctx) { + return new StopRoutineAckDiscoveryMessage(routineId); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e028de86/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java index 247ff67..84a5f41 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java @@ -17,6 +17,7 @@ package org.apache.ignite.spi.discovery; +import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.managers.discovery.*; import org.apache.ignite.lang.*; @@ -142,8 +143,9 @@ public interface DiscoverySpi extends IgniteSpi { /** * Sends custom message across the ring. * @param evt Event. + * @throws IgniteException if failed to marshal evt. */ - public void sendCustomEvent(DiscoveryCustomMessage evt); + public void sendCustomEvent(DiscoveryCustomMessage evt) throws IgniteException; /** * Initiates failure of provided node. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e028de86/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java index 5a78f9f..378d5a3 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java @@ -42,6 +42,7 @@ import javax.cache.*; import javax.cache.configuration.*; import javax.cache.event.*; import javax.cache.integration.*; +import java.io.*; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; @@ -57,7 +58,7 @@ import static org.apache.ignite.internal.processors.cache.query.CacheQueryType.* /** * Continuous queries tests. */ -public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommonAbstractTest { +public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommonAbstractTest implements Serializable { /** IP finder. */ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); @@ -177,10 +178,7 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo assertEquals(String.valueOf(i), 3, ((Map)U.field(proc, "locInfos")).size()); assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "rmtInfos")).size()); assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "startFuts")).size()); - assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "waitForStartAck")).size()); assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "stopFuts")).size()); - assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "waitForStopAck")).size()); - assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "pending")).size()); CacheContinuousQueryManager mgr = grid(i).context().cache().internalCache().context().continuousQueries(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e028de86/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java index 05fb52b..d1b25df 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java @@ -539,7 +539,7 @@ public class TcpClientDiscoverySelfTest extends GridCommonAbstractTest { try { id = msg.remoteListen(null, new MessageListener()); - msgLatch = new CountDownLatch(4); + msgLatch = new CountDownLatch(2); msg.send(null, "Message 1"); @@ -550,7 +550,7 @@ public class TcpClientDiscoverySelfTest extends GridCommonAbstractTest { checkNodes(3, 3); - msgLatch = new CountDownLatch(6); + msgLatch = new CountDownLatch(3); msg.send(null, "Message 2");