Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 499D618624 for ; Thu, 16 Jul 2015 22:59:46 +0000 (UTC) Received: (qmail 92034 invoked by uid 500); 16 Jul 2015 22:59:46 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 92003 invoked by uid 500); 16 Jul 2015 22:59:46 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 91994 invoked by uid 99); 16 Jul 2015 22:59:46 -0000 Received: from Unknown (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 16 Jul 2015 22:59:46 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id A23AEC0C9B for ; Thu, 16 Jul 2015 22:59:45 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.771 X-Spam-Level: * X-Spam-Status: No, score=1.771 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, T_RP_MATCHES_RCVD=-0.01, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-us-west.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id Ajg9aDQJH9Ih for ; Thu, 16 Jul 2015 22:59:32 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-west.apache.org (ASF Mail Server at mx1-us-west.apache.org) with SMTP id 0C6492C6A1 for ; Thu, 16 Jul 2015 22:59:17 +0000 (UTC) Received: (qmail 88266 invoked by uid 99); 16 Jul 2015 22:58:01 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 16 Jul 2015 22:58:01 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id BEA45E6832; Thu, 16 Jul 2015 22:58:01 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sevdokimov@apache.org To: commits@ignite.incubator.apache.org Date: Thu, 16 Jul 2015 22:58:35 -0000 Message-Id: <0b94055ca45f49ae97bef35d6b64f258@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [35/50] [abbrv] incubator-ignite git commit: # ignite-901 client reconnect support http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index dd04bf4..daa9494 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -153,21 +153,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { ctx.event().addLocalEventListener(new GridLocalEventListener() { @Override public void onEvent(Event evt) { - for (Iterator itr = startFuts.values().iterator(); itr.hasNext(); ) { - StartFuture fut = itr.next(); - - itr.remove(); - - fut.onDone(new IgniteException("Topology segmented")); - } - - for (Iterator itr = stopFuts.values().iterator(); itr.hasNext(); ) { - StopFuture fut = itr.next(); - - itr.remove(); - - fut.onDone(new IgniteException("Topology segmented")); - } + cancelFutures(new IgniteCheckedException("Topology segmented")); } }, EVT_NODE_SEGMENTED); @@ -263,6 +249,27 @@ public class GridContinuousProcessor extends GridProcessorAdapter { } /** + * @param e Error. + */ + private void cancelFutures(IgniteCheckedException e) { + for (Iterator itr = startFuts.values().iterator(); itr.hasNext(); ) { + StartFuture fut = itr.next(); + + itr.remove(); + + fut.onDone(e); + } + + for (Iterator itr = stopFuts.values().iterator(); itr.hasNext(); ) { + StopFuture fut = itr.next(); + + itr.remove(); + + fut.onDone(e); + } + } + + /** * @return {@code true} if lock successful, {@code false} if processor already stopped. */ @SuppressWarnings("LockAcquiredButNotSafelyReleased") @@ -318,27 +325,30 @@ public class GridContinuousProcessor extends GridProcessorAdapter { /** {@inheritDoc} */ @Override @Nullable public Serializable collectDiscoveryData(UUID nodeId) { - if (!nodeId.equals(ctx.localNodeId())) { + if (!nodeId.equals(ctx.localNodeId()) || !locInfos.isEmpty()) { DiscoveryData data = new DiscoveryData(ctx.localNodeId(), clientInfos); - // Collect listeners information (will be sent to - // joining node during discovery process). + // Collect listeners information (will be sent to joining node during discovery process). for (Map.Entry e : locInfos.entrySet()) { UUID routineId = e.getKey(); LocalRoutineInfo info = e.getValue(); - data.addItem(new DiscoveryDataItem(routineId, info.prjPred, - info.hnd, info.bufSize, info.interval)); + data.addItem(new DiscoveryDataItem(routineId, + info.prjPred, + info.hnd, + info.bufSize, + info.interval, + info.autoUnsubscribe)); } return data; } - else - return null; + + return null; } /** {@inheritDoc} */ - @Override public void onDiscoveryDataReceived(UUID nodeId, UUID rmtNodeId, Serializable obj) { + @Override public void onDiscoveryDataReceived(UUID joiningNodeId, UUID rmtNodeId, Serializable obj) { DiscoveryData data = (DiscoveryData)obj; if (!ctx.isDaemon() && data != null) { @@ -377,6 +387,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { * Callback invoked when cache is started. * * @param ctx Cache context. + * @throws IgniteCheckedException If failed. */ public void onCacheStart(GridCacheContext ctx) throws IgniteCheckedException { for (Map.Entry entry : rmtInfos.entrySet()) { @@ -491,7 +502,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { } // Register routine locally. - locInfos.put(routineId, new LocalRoutineInfo(prjPred, hnd, bufSize, interval)); + locInfos.put(routineId, new LocalRoutineInfo(prjPred, hnd, bufSize, interval, autoUnsubscribe)); StartFuture fut = new StartFuture(ctx, routineId); @@ -500,7 +511,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { try { ctx.discovery().sendCustomEvent(new StartRoutineDiscoveryMessage(routineId, reqData)); } - catch (IgniteException e) { // Marshaller exception may occurs if user pass unmarshallable filter. + catch (IgniteCheckedException e) { // Marshaller exception may occurs if user pass unmarshallable filter. startFuts.remove(routineId); locInfos.remove(routineId); @@ -565,7 +576,12 @@ public class GridContinuousProcessor extends GridProcessorAdapter { // Unregister handler locally. unregisterHandler(routineId, routine.hnd, true); - ctx.discovery().sendCustomEvent(new StopRoutineDiscoveryMessage(routineId)); + try { + ctx.discovery().sendCustomEvent(new StopRoutineDiscoveryMessage(routineId)); + } + catch (IgniteCheckedException e) { + fut.onDone(e); + } if (ctx.isStopping()) fut.onDone(); @@ -580,6 +596,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { * @param obj Notification object. * @param orderedTopic Topic for ordered notifications. If {@code null}, non-ordered message will be sent. * @param sync If {@code true} then waits for event acknowledgment. + * @param msg If {@code true} then sent data is message. * @throws IgniteCheckedException In case of error. */ public void addNotification(UUID nodeId, @@ -630,6 +647,18 @@ public class GridContinuousProcessor extends GridProcessorAdapter { } } + /** {@inheritDoc} */ + @Override public void onDisconnected(IgniteFuture reconnectFut) throws IgniteCheckedException { + cancelFutures(new IgniteClientDisconnectedCheckedException(reconnectFut, "Client node disconnected.")); + + for (UUID rmtId : rmtInfos.keySet()) + unregisterRemote(rmtId); + + rmtInfos.clear(); + + clientInfos.clear(); + } + /** * @param nodeId Node ID. * @param routineId Routine ID. @@ -637,6 +666,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { * @param toSnd Notification object to send. * @param orderedTopic Topic for ordered notifications. * If {@code null}, non-ordered message will be sent. + * @param msg If {@code true} then sent data is collection of messages. * @throws IgniteCheckedException In case of error. */ private void sendNotification(UUID nodeId, @@ -703,8 +733,11 @@ public class GridContinuousProcessor extends GridProcessorAdapter { assert old == null; } - clientRouteMap.put(routineId, new LocalRoutineInfo(data.projectionPredicate(), hnd, data.bufferSize(), - data.interval())); + clientRouteMap.put(routineId, new LocalRoutineInfo(data.projectionPredicate(), + hnd, + data.bufferSize(), + data.interval(), + data.autoUnsubscribe())); } boolean registered = false; @@ -1022,14 +1055,22 @@ public class GridContinuousProcessor extends GridProcessorAdapter { /** Time interval. */ private final long interval; + /** Automatic unsubscribe flag. */ + private boolean autoUnsubscribe; + /** * @param prjPred Projection predicate. * @param hnd Continuous routine handler. * @param bufSize Buffer size. * @param interval Interval. + * @param autoUnsubscribe Automatic unsubscribe flag. */ - LocalRoutineInfo(@Nullable IgnitePredicate prjPred, GridContinuousHandler hnd, int bufSize, - long interval) { + LocalRoutineInfo(@Nullable IgnitePredicate prjPred, + GridContinuousHandler hnd, + int bufSize, + long interval, + boolean autoUnsubscribe) + { assert hnd != null; assert bufSize > 0; assert interval >= 0; @@ -1038,6 +1079,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { this.hnd = hnd; this.bufSize = bufSize; this.interval = interval; + this.autoUnsubscribe = autoUnsubscribe; } /** @@ -1046,6 +1088,11 @@ public class GridContinuousProcessor extends GridProcessorAdapter { GridContinuousHandler handler() { return hnd; } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(LocalRoutineInfo.class, this); + } } /** @@ -1053,7 +1100,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { */ private static class RemoteRoutineInfo { /** Master node ID. */ - private final UUID nodeId; + private UUID nodeId; /** Continuous routine handler. */ private final GridContinuousHandler hnd; @@ -1205,6 +1252,11 @@ public class GridContinuousProcessor extends GridProcessorAdapter { return F.t(toSnd, diff < interval ? interval - diff : interval); } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(RemoteRoutineInfo.class, this); + } } /** @@ -1221,6 +1273,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { @GridToStringInclude private Collection items; + /** */ private Map> clientInfos; /** @@ -1232,6 +1285,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { /** * @param nodeId Node ID. + * @param clientInfos Client information. */ DiscoveryData(UUID nodeId, Map> clientInfos) { assert nodeId != null; @@ -1308,9 +1362,15 @@ public class GridContinuousProcessor extends GridProcessorAdapter { * @param hnd Handler. * @param bufSize Buffer size. * @param interval Time interval. + * @param autoUnsubscribe Automatic unsubscribe flag. */ - DiscoveryDataItem(UUID routineId, @Nullable IgnitePredicate prjPred, - GridContinuousHandler hnd, int bufSize, long interval) { + DiscoveryDataItem(UUID routineId, + @Nullable IgnitePredicate prjPred, + GridContinuousHandler hnd, + int bufSize, + long interval, + boolean autoUnsubscribe) + { assert routineId != null; assert hnd != null; assert bufSize > 0; @@ -1321,6 +1381,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { this.hnd = hnd; this.bufSize = bufSize; this.interval = interval; + this.autoUnsubscribe = autoUnsubscribe; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java index 54478f8..4f75e0c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java @@ -27,6 +27,7 @@ import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.internal.util.worker.*; +import org.apache.ignite.lang.*; import org.apache.ignite.marshaller.*; import org.apache.ignite.stream.*; import org.apache.ignite.thread.*; @@ -63,13 +64,15 @@ public class DataStreamProcessor extends GridProcessorAdapter { public DataStreamProcessor(GridKernalContext ctx) { super(ctx); - ctx.io().addMessageListener(TOPIC_DATASTREAM, new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { - assert msg instanceof DataStreamerRequest; + if (!ctx.clientNode()) { + ctx.io().addMessageListener(TOPIC_DATASTREAM, new GridMessageListener() { + @Override public void onMessage(UUID nodeId, Object msg) { + assert msg instanceof DataStreamerRequest; - processRequest(nodeId, (DataStreamerRequest)msg); - } - }); + processRequest(nodeId, (DataStreamerRequest)msg); + } + }); + } marsh = ctx.config().getMarshaller(); } @@ -113,7 +116,8 @@ public class DataStreamProcessor extends GridProcessorAdapter { if (ctx.config().isDaemon()) return; - ctx.io().removeMessageListener(TOPIC_DATASTREAM); + if (!ctx.clientNode()) + ctx.io().removeMessageListener(TOPIC_DATASTREAM); busyLock.block(); @@ -139,6 +143,12 @@ public class DataStreamProcessor extends GridProcessorAdapter { log.debug("Stopped data streamer processor."); } + /** {@inheritDoc} */ + @Override public void onDisconnected(IgniteFuture reconnectFut) throws IgniteCheckedException { + for (DataStreamerImpl ldr : ldrs) + ldr.onDisconnected(reconnectFut); + } + /** * @param cacheName Cache name ({@code null} for default cache). * @return Data loader. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java index 26b0568..605f478 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java @@ -145,6 +145,9 @@ public class DataStreamerImpl implements IgniteDataStreamer, Delayed /** Busy lock. */ private final GridSpinBusyLock busyLock = new GridSpinBusyLock(); + /** */ + private CacheException disconnectErr; + /** Closed flag. */ private final AtomicBoolean closed = new AtomicBoolean(); @@ -245,7 +248,7 @@ public class DataStreamerImpl implements IgniteDataStreamer, Delayed fut = new DataStreamerFuture(this); - publicFut = new IgniteFutureImpl<>(fut); + publicFut = new IgniteCacheFutureImpl<>(fut); } /** @@ -284,8 +287,12 @@ public class DataStreamerImpl implements IgniteDataStreamer, Delayed * Enters busy lock. */ private void enterBusy() { - if (!busyLock.enterBusy()) + if (!busyLock.enterBusy()) { + if (disconnectErr != null) + throw disconnectErr; + throw new IllegalStateException("Data streamer has been closed."); + } } /** @@ -435,7 +442,7 @@ public class DataStreamerImpl implements IgniteDataStreamer, Delayed load0(entries0, resFut, keys, 0); - return new IgniteFutureImpl<>(resFut); + return new IgniteCacheFutureImpl<>(resFut); } catch (IgniteException e) { return new IgniteFinishedFutureImpl<>(e); @@ -487,7 +494,7 @@ public class DataStreamerImpl implements IgniteDataStreamer, Delayed load0(entries, resFut, keys, 0); - return new IgniteFutureImpl<>(resFut); + return new IgniteCacheFutureImpl<>(resFut); } catch (Throwable e) { resFut.onDone(e); @@ -631,6 +638,12 @@ public class DataStreamerImpl implements IgniteDataStreamer, Delayed resFut.onDone(); } } + catch (IgniteClientDisconnectedCheckedException e1) { + if (log.isDebugEnabled()) + log.debug("Future finished with disconnect error [nodeId=" + nodeId + ", err=" + e1 + ']'); + + resFut.onDone(e1); + } catch (IgniteCheckedException e1) { if (log.isDebugEnabled()) log.debug("Future finished with error [nodeId=" + nodeId + ", err=" + e1 + ']'); @@ -757,6 +770,12 @@ public class DataStreamerImpl implements IgniteDataStreamer, Delayed try { fut.get(); } + catch (IgniteClientDisconnectedCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to flush buffer: " + e); + + throw CU.convertToCacheException(e); + } catch (IgniteCheckedException e) { if (log.isDebugEnabled()) log.debug("Failed to flush buffer: " + e); @@ -802,7 +821,7 @@ public class DataStreamerImpl implements IgniteDataStreamer, Delayed doFlush(); } catch (IgniteCheckedException e) { - throw GridCacheUtils.convertToCacheException(e); + throw CU.convertToCacheException(e); } finally { leaveBusy(); @@ -843,7 +862,7 @@ public class DataStreamerImpl implements IgniteDataStreamer, Delayed closeEx(cancel); } catch (IgniteCheckedException e) { - throw GridCacheUtils.convertToCacheException(e); + throw CU.convertToCacheException(e); } } @@ -852,6 +871,15 @@ public class DataStreamerImpl implements IgniteDataStreamer, Delayed * @throws IgniteCheckedException If failed. */ public void closeEx(boolean cancel) throws IgniteCheckedException { + closeEx(cancel, null); + } + + /** + * @param cancel {@code True} to close with cancellation. + * @param err Error. + * @throws IgniteCheckedException If failed. + */ + public void closeEx(boolean cancel, IgniteCheckedException err) throws IgniteCheckedException { if (!closed.compareAndSet(false, true)) return; @@ -868,7 +896,7 @@ public class DataStreamerImpl implements IgniteDataStreamer, Delayed cancelled = true; for (Buffer buf : bufMappings.values()) - buf.cancelAll(); + buf.cancelAll(err); } else doFlush(); @@ -881,13 +909,29 @@ public class DataStreamerImpl implements IgniteDataStreamer, Delayed e = e0; } - fut.onDone(null, e); + fut.onDone(null, e != null ? e : err); if (e != null) throw e; } /** + * @param reconnectFut Reconnect future. + * @throws IgniteCheckedException If failed. + */ + public void onDisconnected(IgniteFuture reconnectFut) throws IgniteCheckedException { + IgniteClientDisconnectedCheckedException err = new IgniteClientDisconnectedCheckedException(reconnectFut, + "Data streamer has been closed, client node disconnected."); + + disconnectErr = (CacheException)CU.convertToCacheException(err); + + for (Buffer buf : bufMappings.values()) + buf.cancelAll(err); + + closeEx(true, err); + } + + /** * @return {@code true} If the loader is closed. */ boolean isClosed() { @@ -1027,7 +1071,11 @@ public class DataStreamerImpl implements IgniteDataStreamer, Delayed submit(entries0, topVer, curFut0); if (cancelled) - curFut0.onDone(new IgniteCheckedException("Data streamer has been cancelled: " + DataStreamerImpl.this)); + curFut0.onDone(new IgniteCheckedException("Data streamer has been cancelled: " + + DataStreamerImpl.this)); + else if (ctx.clientDisconnected()) + curFut0.onDone(new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(), + "Client node disconnected.")); } return curFut0; @@ -1227,11 +1275,18 @@ public class DataStreamerImpl implements IgniteDataStreamer, Delayed log.debug("Sent request to node [nodeId=" + node.id() + ", req=" + req + ']'); } catch (IgniteCheckedException e) { - if (ctx.discovery().alive(node) && ctx.discovery().pingNode(node.id())) - ((GridFutureAdapter)fut).onDone(e); - else - ((GridFutureAdapter)fut).onDone(new ClusterTopologyCheckedException("Failed to send " + - "request (node has left): " + node.id())); + GridFutureAdapter fut0 = ((GridFutureAdapter)fut); + + try { + if (ctx.discovery().alive(node) && ctx.discovery().pingNode(node.id())) + fut0.onDone(e); + else + fut0.onDone(new ClusterTopologyCheckedException("Failed to send request (node has left): " + + node.id())); + } + catch (IgniteClientDisconnectedCheckedException e0) { + fut0.onDone(e0); + } } } } @@ -1304,10 +1359,11 @@ public class DataStreamerImpl implements IgniteDataStreamer, Delayed } /** - * + * @param err Error. */ - void cancelAll() { - IgniteCheckedException err = new IgniteCheckedException("Data streamer has been cancelled: " + DataStreamerImpl.this); + void cancelAll(@Nullable IgniteCheckedException err) { + if (err == null) + err = new IgniteCheckedException("Data streamer has been cancelled: " + DataStreamerImpl.this); for (IgniteInternalFuture f : locFuts) { try { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java index 5c171e8..57b16f0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java @@ -185,6 +185,32 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { } /** + * @param key Key. + * @param obj Object. + */ + void onRemoved(GridCacheInternal key, GridCacheRemovable obj) { + dsMap.remove(key, obj); + } + + /** {@inheritDoc} */ + @Override public void onReconnected(boolean clusterRestarted) throws IgniteCheckedException { + for (Map.Entry e : dsMap.entrySet()) { + GridCacheRemovable obj = e.getValue(); + + if (clusterRestarted) { + obj.onRemoved(); + + dsMap.remove(e.getKey(), obj); + } + else + obj.needCheckNotRemoved(); + } + + for (GridCacheContext cctx : ctx.cache().context().cacheContexts()) + cctx.dataStructures().onReconnected(clusterRestarted); + } + + /** * Gets a sequence from cache or creates one if it's not cached. * * @param name Sequence name. @@ -1001,8 +1027,11 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { dsView.put(key, val); } - latch = new GridCacheCountDownLatchImpl(name, val.get(), val.initialCount(), - val.autoDelete(), key, cntDownLatchView, dsCacheCtx); + latch = new GridCacheCountDownLatchImpl(name, val.initialCount(), + val.autoDelete(), + key, + cntDownLatchView, + dsCacheCtx); dsMap.put(key, latch); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java index 5e9245d..1d6e735 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java @@ -57,6 +57,9 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext /** Removed flag.*/ private volatile boolean rmvd; + /** Check removed flag. */ + private boolean rmvCheck; + /** Atomic long key. */ private GridCacheInternalKey key; @@ -336,7 +339,31 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext */ private void checkRemoved() throws IllegalStateException { if (rmvd) - throw new IllegalStateException("Atomic long was removed from cache: " + name); + throw removedError(); + + if (rmvCheck) { + try { + rmvd = atomicView.get(key) == null; + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + + rmvCheck = false; + + if (rmvd) { + ctx.kernalContext().dataStructures().onRemoved(key, this); + + throw removedError(); + } + } + } + + /** + * @return Error. + */ + private IllegalStateException removedError() { + return new IllegalStateException("Atomic long was removed from cache: " + name); } /** {@inheritDoc} */ @@ -345,8 +372,8 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext } /** {@inheritDoc} */ - @Override public void onInvalid(@Nullable Exception err) { - // No-op. + @Override public void needCheckNotRemoved() { + rmvCheck = true; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java index 0c4e5e6..f740c4e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java @@ -24,7 +24,6 @@ import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; -import org.jetbrains.annotations.*; import java.io.*; import java.util.concurrent.*; @@ -56,6 +55,9 @@ public final class GridCacheAtomicReferenceImpl implements GridCacheAtomicRef /** Status.*/ private volatile boolean rmvd; + /** Check removed flag. */ + private boolean rmvCheck; + /** Atomic reference key. */ private GridCacheInternalKey key; @@ -156,8 +158,8 @@ public final class GridCacheAtomicReferenceImpl implements GridCacheAtomicRef } /** {@inheritDoc} */ - @Override public void onInvalid(@Nullable Exception err) { - // No-op. + @Override public void needCheckNotRemoved() { + rmvCheck = true; } /** {@inheritDoc} */ @@ -293,7 +295,31 @@ public final class GridCacheAtomicReferenceImpl implements GridCacheAtomicRef */ private void checkRemoved() throws IllegalStateException { if (rmvd) - throw new IllegalStateException("Atomic reference was removed from cache: " + name); + throw removedError(); + + if (rmvCheck) { + try { + rmvd = atomicView.get(key) == null; + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + + rmvCheck = false; + + if (rmvd) { + ctx.kernalContext().dataStructures().onRemoved(key, this); + + throw removedError(); + } + } + } + + /** + * @return Error. + */ + private IllegalStateException removedError() { + return new IllegalStateException("Atomic reference was removed from cache: " + name); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java index 2400a7e..31f4f24 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java @@ -61,6 +61,9 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc /** Removed flag. */ private volatile boolean rmvd; + /** Check removed flag. */ + private boolean rmvCheck; + /** Sequence key. */ private GridCacheInternalKey key; @@ -391,7 +394,31 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc */ private void checkRemoved() throws IllegalStateException { if (rmvd) - throw new IllegalStateException("Sequence was removed from cache: " + name); + throw removedError(); + + if (rmvCheck) { + try { + rmvd = seqView.get(key) == null; + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + + rmvCheck = false; + + if (rmvd) { + ctx.kernalContext().dataStructures().onRemoved(key, this); + + throw removedError(); + } + } + } + + /** + * @return Error. + */ + private IllegalStateException removedError() { + return new IllegalStateException("Sequence was removed from cache: " + name); } /** {@inheritDoc} */ @@ -400,8 +427,8 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc } /** {@inheritDoc} */ - @Override public void onInvalid(@Nullable Exception err) { - // No-op. + @Override public void needCheckNotRemoved() { + rmvCheck = true; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java index 76ea7ca..d2dedeb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java @@ -59,6 +59,9 @@ public final class GridCacheAtomicStampedImpl implements GridCacheAtomicSt /** Removed flag.*/ private volatile boolean rmvd; + /** Check removed flag. */ + private boolean rmvCheck; + /** Atomic stamped key. */ private GridCacheInternalKey key; @@ -206,8 +209,8 @@ public final class GridCacheAtomicStampedImpl implements GridCacheAtomicSt } /** {@inheritDoc} */ - @Override public void onInvalid(@Nullable Exception err) { - // No-op. + @Override public void needCheckNotRemoved() { + rmvCheck = true; } /** {@inheritDoc} */ @@ -369,7 +372,31 @@ public final class GridCacheAtomicStampedImpl implements GridCacheAtomicSt */ private void checkRemoved() throws IllegalStateException { if (rmvd) - throw new IllegalStateException("Atomic stamped was removed from cache: " + name); + throw removedError(); + + if (rmvCheck) { + try { + rmvd = atomicView.get(key) == null; + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + + rmvCheck = false; + + if (rmvd) { + ctx.kernalContext().dataStructures().onRemoved(key, this); + + throw removedError(); + } + } + } + + /** + * @return Error. + */ + private IllegalStateException removedError() { + return new IllegalStateException("Atomic stamped was removed from cache: " + name); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java index 85b6cfd..95b970a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java @@ -67,9 +67,6 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc /** Cache context. */ private GridCacheContext ctx; - /** Current count. */ - private int cnt; - /** Initial count. */ private int initCnt; @@ -96,7 +93,6 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc * Constructor. * * @param name Latch name. - * @param cnt Current count. * @param initCnt Initial count. * @param autoDel Auto delete flag. * @param key Latch key. @@ -104,7 +100,6 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc * @param ctx Cache context. */ public GridCacheCountDownLatchImpl(String name, - int cnt, int initCnt, boolean autoDel, GridCacheInternalKey key, @@ -112,14 +107,12 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc GridCacheContext ctx) { assert name != null; - assert cnt >= 0; assert initCnt >= 0; assert key != null; assert latchView != null; assert ctx != null; this.name = name; - this.cnt = cnt; this.initCnt = initCnt; this.autoDel = autoDel; this.key = key; @@ -136,7 +129,12 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc /** {@inheritDoc} */ @Override public int count() { - return cnt; + try { + return CU.outTx(new GetCountCallable(), ctx); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } } /** {@inheritDoc} */ @@ -207,13 +205,11 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc /** {@inheritDoc} */ @Override public boolean onRemoved() { - assert cnt == 0; - return rmvd = true; } /** {@inheritDoc} */ - @Override public void onInvalid(@Nullable Exception err) { + @Override public void needCheckNotRemoved() { // No-op. } @@ -231,8 +227,6 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc @Override public void onUpdate(int cnt) { assert cnt >= 0; - this.cnt = cnt; - while (internalLatch != null && internalLatch.getCount() > cnt) internalLatch.countDown(); } @@ -253,9 +247,7 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc if (log.isDebugEnabled()) log.debug("Failed to find count down latch with given name: " + name); - assert cnt == 0; - - return new CountDownLatch(cnt); + return new CountDownLatch(0); } tx.commit(); @@ -337,6 +329,29 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc /** * */ + private class GetCountCallable implements Callable { + /** {@inheritDoc} */ + @Override public Integer call() throws Exception { + Integer val; + + try (IgniteInternalTx tx = CU.txStartInternal(ctx, latchView, PESSIMISTIC, REPEATABLE_READ)) { + GridCacheCountDownLatchValue latchVal = latchView.get(key); + + if (latchVal == null) + return 0; + + val = latchVal.get(); + + tx.rollback(); + } + + return val; + } + } + + /** + * + */ private class CountDownCallable implements Callable { /** Value to count down on (if 0 then latch is counted down to 0). */ private final int val; @@ -359,9 +374,7 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc if (log.isDebugEnabled()) log.debug("Failed to find count down latch with given name: " + name); - assert cnt == 0; - - return cnt; + return 0; } int retVal; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheRemovable.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheRemovable.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheRemovable.java index 48d8644..dd4f2cc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheRemovable.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheRemovable.java @@ -17,8 +17,6 @@ package org.apache.ignite.internal.processors.datastructures; -import org.jetbrains.annotations.*; - /** * Provides callback for marking object as removed. */ @@ -31,7 +29,7 @@ public interface GridCacheRemovable { public boolean onRemoved(); /** - * @param err Error which cause data structure to become invalid. + * */ - public void onInvalid(@Nullable Exception err); + public void needCheckNotRemoved(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java index f74fe95..6d920fd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java @@ -101,6 +101,19 @@ public class GridCacheSetImpl extends AbstractCollection implements Ignite return rmvd; } + /** + * @return {@code True} if set header found in cache. + * @throws IgniteCheckedException If failed. + */ + @SuppressWarnings("unchecked") + public boolean checkHeader() throws IgniteCheckedException { + IgniteInternalCache cache0 = ctx.cache(); + + GridCacheSetHeader hdr = cache0.get(new GridCacheSetHeaderKey(name)); + + return hdr != null && hdr.id().equals(id); + } + /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public int size() { @@ -476,7 +489,7 @@ public class GridCacheSetImpl extends AbstractCollection implements Ignite /** * @return Set ID. */ - IgniteUuid id() { + public IgniteUuid id() { return id; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java index ba43da7..90c26f0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java @@ -57,6 +57,9 @@ public class GridCacheSetProxy implements IgniteSet, Externalizable { /** Busy lock. */ private GridSpinBusyLock busyLock; + /** Check removed flag. */ + private boolean rmvCheck; + /** * Required by {@link Externalizable}. */ @@ -78,6 +81,13 @@ public class GridCacheSetProxy implements IgniteSet, Externalizable { } /** + * @return Set delegate. + */ + public GridCacheSetImpl delegate() { + return delegate; + } + + /** * Remove callback. */ public void blockOnRemove() { @@ -510,8 +520,43 @@ public class GridCacheSetProxy implements IgniteSet, Externalizable { * Enters busy state. */ private void enterBusy() { + boolean rmvd; + + if (rmvCheck) { + try { + rmvd = !delegate().checkHeader(); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + + rmvCheck = false; + + if (rmvd) { + delegate.removed(true); + + cctx.dataStructures().onRemoved(this); + + throw removedError(); + } + } + if (!busyLock.enterBusy()) - throw new IllegalStateException("Set has been removed from cache: " + delegate); + throw removedError(); + } + + /** + * + */ + public void needCheckNotRemoved() { + rmvCheck = true; + } + + /** + * @return Error. + */ + private IllegalStateException removedError() { + return new IllegalStateException("Set has been removed from cache: " + delegate); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java index 48e9686..350068a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java @@ -1413,7 +1413,7 @@ public class GridJobProcessor extends GridProcessorAdapter { * @return {@code true} if node is dead, {@code false} is node is alive. */ private boolean isDeadNode(UUID uid) { - return ctx.discovery().node(uid) == null || !ctx.discovery().pingNode(uid); + return ctx.discovery().node(uid) == null || !ctx.discovery().pingNodeNoError(uid); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java index d1ee5ad..3a309f6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java @@ -863,7 +863,7 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject { * @return {@code true} if node is dead, {@code false} is node is alive. */ private boolean isDeadNode(UUID uid) { - return ctx.discovery().node(uid) == null || !ctx.discovery().pingNode(uid); + return ctx.discovery().node(uid) == null || !ctx.discovery().pingNodeNoError(uid); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java index 0cbb77a..8639bc8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java @@ -246,4 +246,11 @@ public interface GridQueryIndexing { * @return Backup filter. */ public IndexingQueryFilter backupFilter(List caches, AffinityTopologyVersion topVer, int[] parts); + + /** + * Client disconnected callback. + * + * @param reconnectFut Reconnect future. + */ + public void onDisconnected(IgniteFuture reconnectFut); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index 1ba1fae..f3ad4b2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -234,6 +234,12 @@ public class GridQueryProcessor extends GridProcessorAdapter { idx.stop(); } + /** {@inheritDoc} */ + @Override public void onDisconnected(IgniteFuture reconnectFut) throws IgniteCheckedException { + if (idx != null) + idx.onDisconnected(reconnectFut); + } + /** * @param cctx Cache context. * @throws IgniteCheckedException If failed. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java index bb451c7..78b09e6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java @@ -124,7 +124,8 @@ public class GridServiceProcessor extends GridProcessorAdapter { cache = ctx.cache().utilityCache(); - ctx.event().addLocalEventListener(topLsnr, EVTS_DISCOVERY); + if (!ctx.clientNode()) + ctx.event().addLocalEventListener(topLsnr, EVTS_DISCOVERY); try { if (ctx.deploy().enabled()) @@ -165,7 +166,8 @@ public class GridServiceProcessor extends GridProcessorAdapter { busyLock.block(); - ctx.event().removeLocalEventListener(topLsnr); + if (!ctx.clientNode()) + ctx.event().removeLocalEventListener(topLsnr); if (cfgQryId != null) cache.context().continuousQueries().cancelInternalQuery(cfgQryId); @@ -209,6 +211,27 @@ public class GridServiceProcessor extends GridProcessorAdapter { log.debug("Stopped service processor."); } + /** {@inheritDoc} */ + @Override public void onDisconnected(IgniteFuture reconnectFut) throws IgniteCheckedException { + for (Map.Entry e : depFuts.entrySet()) { + GridServiceDeploymentFuture fut = e.getValue(); + + fut.onDone(new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(), + "Failed to deploy service, client node disconnected.")); + + depFuts.remove(e.getKey(), fut); + } + + for (Map.Entry> e : undepFuts.entrySet()) { + GridFutureAdapter fut = e.getValue(); + + fut.onDone(new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(), + "Failed to undeploy service, client node disconnected.")); + + undepFuts.remove(e.getKey(), fut); + } + } + /** * Validates service configuration. * @@ -328,6 +351,13 @@ public class GridServiceProcessor extends GridProcessorAdapter { return old; } + if (ctx.clientDisconnected()) { + fut.onDone(new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(), + "Failed to deploy service, client node disconnected.")); + + depFuts.remove(cfg.getName(), fut); + } + while (true) { try { GridServiceDeploymentKey key = new GridServiceDeploymentKey(cfg.getName()); @@ -646,10 +676,9 @@ public class GridServiceProcessor extends GridProcessorAdapter { } } else { - Collection nodes = - assigns.nodeFilter() == null ? - ctx.discovery().nodes(topVer) : - F.view(ctx.discovery().nodes(topVer), assigns.nodeFilter()); + Collection nodes = assigns.nodeFilter() == null ? + ctx.discovery().nodes(topVer) : + F.view(ctx.discovery().nodes(topVer), assigns.nodeFilter()); if (!nodes.isEmpty()) { int size = nodes.size(); @@ -1019,7 +1048,7 @@ public class GridServiceProcessor extends GridProcessorAdapter { cache.getAndRemove(key); } catch (IgniteCheckedException ex) { - log.error("Failed to remove assignments for undeployed service: " + name, ex); + U.error(log, "Failed to remove assignments for undeployed service: " + name, ex); } } } @@ -1164,7 +1193,7 @@ public class GridServiceProcessor extends GridProcessorAdapter { } } catch (IgniteCheckedException ex) { - log.error("Failed to clean up zombie assignments for service: " + name, ex); + U.error(log, "Failed to clean up zombie assignments for service: " + name, ex); } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java index 8e13bc4..556beea 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java @@ -68,9 +68,15 @@ class GridServiceProxy implements Serializable { * @param name Service name. * @param svc Service type class. * @param sticky Whether multi-node request should be done. + * @param ctx Context. */ - @SuppressWarnings("unchecked") GridServiceProxy(ClusterGroup prj, String name, Class svc, - boolean sticky, GridKernalContext ctx) { + @SuppressWarnings("unchecked") + GridServiceProxy(ClusterGroup prj, + String name, + Class svc, + boolean sticky, + GridKernalContext ctx) + { this.prj = prj; this.ctx = ctx; hasLocNode = hasLocalNode(prj); @@ -159,6 +165,9 @@ class GridServiceProxy implements Serializable { catch (RuntimeException | Error e) { throw e; } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } catch (Exception e) { throw new IgniteException(e); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java index d59a51d..d3caf5f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java @@ -76,8 +76,7 @@ public class GridTaskProcessor extends GridProcessorAdapter { private final LongAdder8 execTasks = new LongAdder8(); /** */ - private final ThreadLocal> thCtx = - new ThreadLocal<>(); + private final ThreadLocal> thCtx = new ThreadLocal<>(); /** */ private final GridSpinReadWriteLock lock = new GridSpinReadWriteLock(); @@ -119,6 +118,24 @@ public class GridTaskProcessor extends GridProcessorAdapter { } /** {@inheritDoc} */ + @Override public void onDisconnected(IgniteFuture reconnectFut) throws IgniteCheckedException { + IgniteClientDisconnectedCheckedException err = disconnectedError(reconnectFut); + + for (GridTaskWorker worker : tasks.values()) + worker.finishTask(null, err); + } + + /** + * @param reconnectFut Reconnect future. + * @return Client disconnected exception. + */ + private IgniteClientDisconnectedCheckedException disconnectedError(@Nullable IgniteFuture reconnectFut) { + return new IgniteClientDisconnectedCheckedException( + reconnectFut != null ? reconnectFut : ctx.cluster().clientReconnectFuture(), + "Failed to execute task, client node disconnected."); + } + + /** {@inheritDoc} */ @SuppressWarnings("TooBroadScope") @Override public void onKernalStop(boolean cancel) { lock.writeLock(); @@ -552,7 +569,7 @@ public class GridTaskProcessor extends GridProcessorAdapter { // Creates task session with task name and task version. GridTaskSessionImpl ses = ctx.session().createTaskSession( sesId, - ctx.config().getNodeId(), + ctx.localNodeId(), taskName, dep, taskCls == null ? null : taskCls.getName(), @@ -597,25 +614,29 @@ public class GridTaskProcessor extends GridProcessorAdapter { assert taskWorker0 == null : "Session ID is not unique: " + sesId; - if (dep.annotation(taskCls, ComputeTaskMapAsync.class) != null) { - try { - // Start task execution in another thread. - if (sys) - ctx.getSystemExecutorService().execute(taskWorker); - else - ctx.getExecutorService().execute(taskWorker); - } - catch (RejectedExecutionException e) { - tasks.remove(sesId); + if (!ctx.clientDisconnected()) { + if (dep.annotation(taskCls, ComputeTaskMapAsync.class) != null) { + try { + // Start task execution in another thread. + if (sys) + ctx.getSystemExecutorService().execute(taskWorker); + else + ctx.getExecutorService().execute(taskWorker); + } + catch (RejectedExecutionException e) { + tasks.remove(sesId); - release(dep); + release(dep); - handleException(new ComputeExecutionRejectedException("Failed to execute task " + - "due to thread pool execution rejection: " + taskName, e), fut); + handleException(new ComputeExecutionRejectedException("Failed to execute task " + + "due to thread pool execution rejection: " + taskName, e), fut); + } } + else + taskWorker.run(); } else - taskWorker.run(); + taskWorker.finishTask(null, disconnectedError(null)); } } else { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java index eb5fa77..133a31f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java @@ -1070,10 +1070,17 @@ class GridTaskWorker extends GridWorker implements GridTimeoutObject { PUBLIC_POOL); } catch (IgniteCheckedException e) { - if (!isDeadNode(nodeId)) - U.error(log, "Failed to send cancel request to node (will ignore) [nodeId=" + - nodeId + ", taskName=" + ses.getTaskName() + - ", taskSesId=" + ses.getId() + ", jobSesId=" + res.getJobContext().getJobId() + ']', e); + try { + if (!isDeadNode(nodeId)) + U.error(log, "Failed to send cancel request to node (will ignore) [nodeId=" + + nodeId + ", taskName=" + ses.getTaskName() + + ", taskSesId=" + ses.getId() + ", jobSesId=" + res.getJobContext().getJobId() + ']', e); + } + catch (IgniteClientDisconnectedCheckedException e0) { + if (log.isDebugEnabled()) + log.debug("Failed to send cancel request to node, client disconnected [nodeId=" + + nodeId + ", taskName=" + ses.getTaskName() + ']'); + } } } } @@ -1169,24 +1176,39 @@ class GridTaskWorker extends GridWorker implements GridTimeoutObject { } } catch (IgniteCheckedException e) { - boolean deadNode = isDeadNode(res.getNode().id()); + IgniteException fakeErr = null; - // Avoid stack trace if node has left grid. - if (deadNode) - U.warn(log, "Failed to send job request because remote node left grid (if failover is enabled, " + - "will attempt fail-over to another node) [node=" + node + ", taskName=" + ses.getTaskName() + - ", taskSesId=" + ses.getId() + ", jobSesId=" + res.getJobContext().getJobId() + ']'); - else - U.error(log, "Failed to send job request: " + req, e); + try { + boolean deadNode = isDeadNode(res.getNode().id()); + + // Avoid stack trace if node has left grid. + if (deadNode) { + U.warn(log, "Failed to send job request because remote node left grid (if failover is enabled, " + + "will attempt fail-over to another node) [node=" + node + ", taskName=" + ses.getTaskName() + + ", taskSesId=" + ses.getId() + ", jobSesId=" + res.getJobContext().getJobId() + ']'); + + fakeErr = new ClusterTopologyException("Failed to send job due to node failure: " + node, e); + } + else + U.error(log, "Failed to send job request: " + req, e); + + } + catch (IgniteClientDisconnectedCheckedException e0) { + if (log.isDebugEnabled()) + log.debug("Failed to send job request, client disconnected [node=" + node + + ", taskName=" + ses.getTaskName() + ", taskSesId=" + ses.getId() + ", jobSesId=" + + res.getJobContext().getJobId() + ']'); + + fakeErr = U.convertException(e0); + } GridJobExecuteResponse fakeRes = new GridJobExecuteResponse(node.id(), ses.getId(), res.getJobContext().getJobId(), null, null, null, null, null, null, false); - if (deadNode) - fakeRes.setFakeException(new ClusterTopologyException("Failed to send job due to node failure: " + - node, e)); - else - fakeRes.setFakeException(U.convertException(e)); + if (fakeErr == null) + fakeErr = U.convertException(e); + + fakeRes.setFakeException(fakeErr); onResponse(fakeRes); } @@ -1345,8 +1367,9 @@ class GridTaskWorker extends GridWorker implements GridTimeoutObject { * * @param uid UID of node to check. * @return {@code true} if node is dead, {@code false} is node is alive. + * @throws IgniteClientDisconnectedCheckedException if ping failed when client disconnected. */ - private boolean isDeadNode(UUID uid) { + private boolean isDeadNode(UUID uid) throws IgniteClientDisconnectedCheckedException { return ctx.discovery().node(uid) == null || !ctx.discovery().pingNode(uid); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index f457d6c..66eb596 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -626,6 +626,15 @@ public abstract class IgniteUtils { } }); + m.put(IgniteClientDisconnectedCheckedException.class, new C1() { + @Override public IgniteException apply(IgniteCheckedException e) { + return new IgniteClientDisconnectedException( + ((IgniteClientDisconnectedCheckedException)e).reconnectFuture(), + e.getMessage(), + e); + } + }); + return m; } @@ -673,6 +682,25 @@ public abstract class IgniteUtils { * @return Ignite runtime exception. */ public static IgniteException convertException(IgniteCheckedException e) { + IgniteClientDisconnectedException e0 = e.getCause(IgniteClientDisconnectedException.class); + + if (e0 != null) { + assert e0.reconnectFuture() != null : e0; + + throw e0; + } + + IgniteClientDisconnectedCheckedException disconnectedErr = + e instanceof IgniteClientDisconnectedCheckedException ? + (IgniteClientDisconnectedCheckedException)e + : e.getCause(IgniteClientDisconnectedCheckedException.class); + + if (disconnectedErr != null) { + assert disconnectedErr.reconnectFuture() != null : disconnectedErr; + + e = disconnectedErr; + } + C1 converter = exceptionConverters.get(e.getClass()); if (converter != null) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryClientEndpoint.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryClientEndpoint.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryClientEndpoint.java index c935c4a..a4f7804 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryClientEndpoint.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryClientEndpoint.java @@ -59,8 +59,9 @@ public class IpcSharedMemoryClientEndpoint implements IpcEndpoint { * @param outSpace Out space. * @param parent Parent logger. */ - public IpcSharedMemoryClientEndpoint(IpcSharedMemorySpace inSpace, IpcSharedMemorySpace outSpace, - IgniteLogger parent) { + public IpcSharedMemoryClientEndpoint(IpcSharedMemorySpace inSpace, + IpcSharedMemorySpace outSpace, + IgniteLogger parent) { assert inSpace != null; assert outSpace != null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java index 6f544e0..f3bcab0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java @@ -1570,6 +1570,7 @@ public class GridFunc { * @param Type of the collection. * @return Light-weight view on given collection with provided predicate. */ + @SafeVarargs public static Collection view(@Nullable final Collection c, @Nullable final IgnitePredicate... p) { if (isEmpty(c) || isAlwaysFalse(p)) @@ -2706,6 +2707,7 @@ public class GridFunc { * @param Type of the free variable, i.e. the element the predicate is called on. * @return Negated predicate. */ + @SafeVarargs public static IgnitePredicate not(@Nullable final IgnitePredicate... p) { return isAlwaysFalse(p) ? F.alwaysTrue() : isAlwaysTrue(p) ? F.alwaysFalse() : new P1() { @Override public boolean apply(T t) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpi.java index 968d88d..0f6ed5c 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpi.java @@ -17,6 +17,7 @@ package org.apache.ignite.spi; +import org.apache.ignite.lang.*; import org.jetbrains.annotations.*; import java.util.*; @@ -106,4 +107,18 @@ public interface IgniteSpi { * @throws IgniteSpiException Thrown in case of any error during SPI stop. */ public void spiStop() throws IgniteSpiException; + + /** + * Client node disconnected callback. + * + * @param reconnectFut Future that will be completed when client reconnected. + */ + public void onClientDisconnected(IgniteFuture reconnectFut); + + /** + * Client node reconnected callback. + * + * @param clusterRestarted {@code True} if all cluster nodes restarted while client was disconnected. + */ + public void onClientReconnected(boolean clusterRestarted); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java index 5e557bd..07b39bb 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java @@ -27,6 +27,7 @@ import org.apache.ignite.internal.processors.timeout.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; import org.apache.ignite.plugin.extensions.communication.*; import org.apache.ignite.plugin.security.*; import org.apache.ignite.resources.*; @@ -58,9 +59,6 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement /** Ignite instance. */ protected Ignite ignite; - /** Local node id. */ - protected UUID nodeId; - /** Grid instance name. */ protected String gridName; @@ -73,6 +71,9 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement /** Discovery listener. */ private GridLocalEventListener paramsLsnr; + /** Local node. */ + private ClusterNode locNode; + /** * Creates new adapter and initializes it from the current (this) class. * SPI name will be initialized to the simple name of the class @@ -111,7 +112,19 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement /** {@inheritDoc} */ @Override public UUID getLocalNodeId() { - return nodeId; + return ignite.cluster().localNode().id(); + } + + /** + * @return Local node. + */ + protected ClusterNode getLocalNode() { + if (locNode != null) + return locNode; + + locNode = getSpiContext().localNode(); + + return locNode; } /** {@inheritDoc} */ @@ -194,17 +207,27 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement spiCtx = new GridDummySpiContext(locNode, true, spiCtx); } + /** {@inheritDoc} */ + @Override public void onClientDisconnected(IgniteFuture reconnectFut) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void onClientReconnected(boolean clusterRestarted) { + // No-op. + } + /** * Inject ignite instance. + * + * @param ignite Ignite instance. */ @IgniteInstanceResource protected void injectResources(Ignite ignite) { this.ignite = ignite; - if (ignite != null) { - nodeId = ignite.configuration().getNodeId(); + if (ignite != null) gridName = ignite.name(); - } } /**