Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id CEEC717453 for ; Fri, 17 Jul 2015 06:28:47 +0000 (UTC) Received: (qmail 25018 invoked by uid 500); 17 Jul 2015 06:28:44 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 24986 invoked by uid 500); 17 Jul 2015 06:28:44 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 24976 invoked by uid 99); 17 Jul 2015 06:28:44 -0000 Received: from Unknown (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 17 Jul 2015 06:28:44 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id 1AF8BC0098 for ; Fri, 17 Jul 2015 06:28:44 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 0.771 X-Spam-Level: X-Spam-Status: No, score=0.771 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, T_RP_MATCHES_RCVD=-0.01, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-us-east.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id ZKyNCbdgZ0wy for ; Fri, 17 Jul 2015 06:28:29 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-east.apache.org (ASF Mail Server at mx1-us-east.apache.org) with SMTP id 78F8B4E221 for ; Fri, 17 Jul 2015 06:28:14 +0000 (UTC) Received: (qmail 22466 invoked by uid 99); 17 Jul 2015 06:28:13 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 17 Jul 2015 06:28:13 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D3397E6830; Fri, 17 Jul 2015 06:28:13 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.incubator.apache.org Date: Fri, 17 Jul 2015 06:28:53 -0000 Message-Id: <7162355b23504a709496651a4a341729@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [41/50] [abbrv] incubator-ignite git commit: # ignite-901 client reconnect support # ignite-901 client reconnect support Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/57ac2b3b Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/57ac2b3b Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/57ac2b3b Branch: refs/heads/ignite-752 Commit: 57ac2b3bf437c037904624d411fd89b28b22c944 Parents: aef4063 Author: sboikov Authored: Thu Jul 16 13:06:04 2015 +0300 Committer: sboikov Committed: Thu Jul 16 13:06:05 2015 +0300 ---------------------------------------------------------------------- .../IgniteClientDisconnectedException.java | 61 + .../java/org/apache/ignite/IgniteCluster.java | 5 + .../apache/ignite/internal/GridComponent.java | 18 + .../ignite/internal/GridJobSiblingImpl.java | 2 +- .../ignite/internal/GridKernalContext.java | 5 + .../ignite/internal/GridKernalContextImpl.java | 31 +- .../ignite/internal/GridKernalGateway.java | 46 +- .../ignite/internal/GridKernalGatewayImpl.java | 85 +- .../apache/ignite/internal/GridKernalState.java | 3 + .../ignite/internal/GridPluginComponent.java | 11 + ...gniteClientDisconnectedCheckedException.java | 49 + .../apache/ignite/internal/IgniteKernal.java | 222 +++- .../cluster/IgniteClusterAsyncImpl.java | 5 + .../internal/cluster/IgniteClusterImpl.java | 18 + .../internal/managers/GridManagerAdapter.java | 19 +- .../deployment/GridDeploymentCommunication.java | 2 +- .../deployment/GridDeploymentManager.java | 95 +- .../discovery/GridDiscoveryManager.java | 163 ++- .../processors/GridProcessorAdapter.java | 11 + .../affinity/GridAffinityAssignmentCache.java | 26 +- .../cache/CacheOsConflictResolutionManager.java | 6 + .../cache/DynamicCacheChangeBatch.java | 17 + .../processors/cache/GridCacheAdapter.java | 25 +- .../cache/GridCacheAffinityManager.java | 21 +- .../cache/GridCacheConcurrentMap.java | 15 +- .../processors/cache/GridCacheGateway.java | 116 +- .../processors/cache/GridCacheIoManager.java | 8 + .../processors/cache/GridCacheManager.java | 6 + .../cache/GridCacheManagerAdapter.java | 6 + .../processors/cache/GridCacheMvccManager.java | 41 +- .../GridCachePartitionExchangeManager.java | 81 +- .../processors/cache/GridCachePreloader.java | 5 + .../cache/GridCachePreloaderAdapter.java | 5 + .../processors/cache/GridCacheProcessor.java | 311 ++++- .../cache/GridCacheSharedContext.java | 113 +- .../cache/GridCacheSharedManager.java | 11 +- .../cache/GridCacheSharedManagerAdapter.java | 20 +- .../processors/cache/GridCacheUtils.java | 11 + .../processors/cache/IgniteCacheFutureImpl.java | 5 + .../processors/cache/IgniteCacheProxy.java | 2 +- .../CacheDataStructuresManager.java | 35 + .../distributed/GridCacheTxFinishSync.java | 46 + .../distributed/dht/GridDhtCacheAdapter.java | 14 +- .../dht/GridDhtPartitionTopologyImpl.java | 24 + .../dht/GridPartitionedGetFuture.java | 13 +- .../dht/preloader/GridDhtPreloader.java | 16 +- .../distributed/near/GridNearCacheAdapter.java | 8 + .../distributed/near/GridNearGetFuture.java | 13 +- .../cache/dr/GridOsCacheDrManager.java | 7 +- .../query/GridCacheDistributedQueryManager.java | 22 + .../cache/query/GridCacheQueryAdapter.java | 11 +- .../query/GridCacheQueryFutureAdapter.java | 2 +- .../continuous/CacheContinuousQueryHandler.java | 5 + .../transactions/IgniteTransactionsImpl.java | 59 +- .../cache/transactions/IgniteTxManager.java | 19 +- .../transactions/TransactionProxyImpl.java | 2 +- .../cache/version/GridCacheVersionManager.java | 9 +- .../clock/GridClockSyncProcessor.java | 6 +- .../processors/cluster/ClusterProcessor.java | 11 + .../continuous/GridContinuousHandler.java | 9 +- .../continuous/GridContinuousProcessor.java | 127 +- .../datastreamer/DataStreamProcessor.java | 24 +- .../datastreamer/DataStreamerImpl.java | 90 +- .../datastructures/DataStructuresProcessor.java | 33 +- .../datastructures/GridCacheAtomicLongImpl.java | 33 +- .../GridCacheAtomicReferenceImpl.java | 34 +- .../GridCacheAtomicSequenceImpl.java | 33 +- .../GridCacheAtomicStampedImpl.java | 33 +- .../GridCacheCountDownLatchImpl.java | 51 +- .../datastructures/GridCacheRemovable.java | 6 +- .../datastructures/GridCacheSetImpl.java | 15 +- .../datastructures/GridCacheSetProxy.java | 47 +- .../processors/job/GridJobProcessor.java | 2 +- .../internal/processors/job/GridJobWorker.java | 2 +- .../processors/query/GridQueryIndexing.java | 7 + .../processors/query/GridQueryProcessor.java | 6 + .../service/GridServiceProcessor.java | 45 +- .../processors/service/GridServiceProxy.java | 13 +- .../processors/task/GridTaskProcessor.java | 55 +- .../processors/task/GridTaskWorker.java | 59 +- .../ignite/internal/util/IgniteUtils.java | 28 + .../shmem/IpcSharedMemoryClientEndpoint.java | 5 +- .../ignite/internal/util/lang/GridFunc.java | 2 + .../java/org/apache/ignite/spi/IgniteSpi.java | 15 + .../org/apache/ignite/spi/IgniteSpiAdapter.java | 37 +- .../communication/tcp/TcpCommunicationSpi.java | 354 ++++-- .../spi/discovery/DiscoverySpiDataExchange.java | 3 +- .../ignite/spi/discovery/tcp/ClientImpl.java | 408 ++++-- .../ignite/spi/discovery/tcp/ServerImpl.java | 134 +- .../spi/discovery/tcp/TcpDiscoveryImpl.java | 9 +- .../spi/discovery/tcp/TcpDiscoverySpi.java | 33 +- .../tcp/internal/TcpDiscoveryNode.java | 19 + .../messages/TcpDiscoveryAbstractMessage.java | 3 + .../messages/TcpDiscoveryClientAckResponse.java | 64 + .../messages/TcpDiscoveryHandshakeResponse.java | 14 + .../spi/swapspace/file/FileSwapSpaceSpi.java | 2 +- .../internal/GridUpdateNotifierSelfTest.java | 15 +- .../IgniteClientReconnectAbstractTest.java | 363 ++++++ .../IgniteClientReconnectApiExceptionTest.java | 846 ++++++++++++ .../IgniteClientReconnectAtomicsTest.java | 672 ++++++++++ .../IgniteClientReconnectCacheTest.java | 1202 ++++++++++++++++++ .../IgniteClientReconnectCollectionsTest.java | 443 +++++++ .../IgniteClientReconnectComputeTest.java | 192 +++ ...eClientReconnectContinuousProcessorTest.java | 372 ++++++ ...IgniteClientReconnectDiscoveryStateTest.java | 123 ++ ...niteClientReconnectFailoverAbstractTest.java | 231 ++++ .../IgniteClientReconnectFailoverTest.java | 212 +++ .../IgniteClientReconnectServicesTest.java | 260 ++++ .../internal/IgniteClientReconnectStopTest.java | 106 ++ .../IgniteClientReconnectStreamerTest.java | 233 ++++ .../IgniteSlowClientDetectionSelfTest.java | 1 + .../GridDeploymentManagerStopSelfTest.java | 7 + .../IgniteCacheAbstractStopBusySelfTest.java | 2 +- .../cache/IgniteCacheDynamicStopSelfTest.java | 6 +- .../IgniteTxExceptionAbstractSelfTest.java | 1 + .../IgniteCacheSystemTransactionsSelfTest.java | 2 +- .../GridCacheReplicatedInvalidateSelfTest.java | 3 +- .../loadtests/hashmap/GridCacheTestContext.java | 4 +- .../tcp/TcpClientDiscoverySpiSelfTest.java | 562 +++++++- .../multijvm/IgniteClusterProcessProxy.java | 5 + .../IgniteClientReconnectTestSuite.java | 48 + .../processors/query/h2/IgniteH2Indexing.java | 5 + .../query/h2/twostep/GridMergeIndex.java | 45 +- .../h2/twostep/GridReduceQueryExecutor.java | 70 +- ...ClientReconnectCacheQueriesFailoverTest.java | 225 ++++ .../cache/IgniteClientReconnectQueriesTest.java | 427 +++++++ ...dCacheAbstractReduceFieldsQuerySelfTest.java | 4 + .../IgniteCacheWithIndexingTestSuite.java | 1 + 128 files changed, 9751 insertions(+), 815 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/IgniteClientDisconnectedException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteClientDisconnectedException.java b/modules/core/src/main/java/org/apache/ignite/IgniteClientDisconnectedException.java new file mode 100644 index 0000000..2089db0 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/IgniteClientDisconnectedException.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite; + +import org.apache.ignite.lang.*; +import org.jetbrains.annotations.*; + +/** + * Exception thrown from Ignite API when client node disconnected from cluster. + */ +public class IgniteClientDisconnectedException extends IgniteException { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final IgniteFuture reconnectFut; + + /** + * @param reconnectFut Reconnect future. + * @param msg Error message. + */ + public IgniteClientDisconnectedException(IgniteFuture reconnectFut, String msg) { + this(reconnectFut, msg, null); + } + + /** + * @param reconnectFut Reconnect future. + * @param msg Error message. + * @param cause Optional nested exception (can be {@code null}). + */ + public IgniteClientDisconnectedException( + IgniteFuture reconnectFut, + String msg, + @Nullable Throwable cause) { + super(msg, cause); + + this.reconnectFut = reconnectFut; + } + + /** + * @return Future that will be completed when client reconnected. + */ + public IgniteFuture reconnectFuture() { + return reconnectFut; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/IgniteCluster.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCluster.java b/modules/core/src/main/java/org/apache/ignite/IgniteCluster.java index 72be3fb..d3ce0e7 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteCluster.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteCluster.java @@ -328,6 +328,11 @@ public interface IgniteCluster extends ClusterGroup, IgniteAsyncSupport { */ public void resetMetrics(); + /** + * @return Future that will be completed when client reconnected. + */ + @Nullable public IgniteFuture clientReconnectFuture(); + /** {@inheritDoc} */ @Override public IgniteCluster withAsync(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java index fb227cd..65e0644 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal; import org.apache.ignite.*; import org.apache.ignite.cluster.*; +import org.apache.ignite.lang.*; import org.apache.ignite.spi.*; import org.jetbrains.annotations.*; @@ -87,6 +88,7 @@ public interface GridComponent { /** * Receives discovery data object from remote nodes (called * on new node during discovery process). + * * @param joiningNodeId Joining node ID. * @param rmtNodeId Remote node ID for which data is provided. * @param data Discovery data object or {@code null} if nothing was @@ -116,4 +118,20 @@ public interface GridComponent { * @return Unique component type for discovery data exchange. */ @Nullable public DiscoveryDataExchangeType discoveryDataType(); + + /** + * Client disconnected callback. + * + * @param reconnectFut Reconnect future. + * @throws IgniteCheckedException If failed. + */ + public void onDisconnected(IgniteFuture reconnectFut) throws IgniteCheckedException; + + /** + * Client reconnected callback. + * + * @param clusterRestarted Cluster restarted flag. + * @throws IgniteCheckedException If failed. + */ + public void onReconnected(boolean clusterRestarted) throws IgniteCheckedException; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingImpl.java index 62adf52..b4e0f01 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingImpl.java @@ -167,7 +167,7 @@ public class GridJobSiblingImpl implements ComputeJobSibling, Externalizable { } catch (IgniteCheckedException e) { // Avoid stack trace for left nodes. - if (ctx.discovery().node(node.id()) != null && ctx.discovery().pingNode(node.id())) + if (ctx.discovery().node(node.id()) != null && ctx.discovery().pingNodeNoError(node.id())) U.error(ctx.log(GridJobSiblingImpl.class), "Failed to send cancel request to node " + "[nodeId=" + node.id() + ", ses=" + ses + ']', e); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java index d6542f3..f4da333 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java @@ -557,4 +557,9 @@ public interface GridKernalContext extends Iterable { * @return {@code True} if local node is client node (has flag {@link IgniteConfiguration#isClientMode()} set). */ public boolean clientNode(); + + /** + * @return {@code True} if local node in disconnected state. + */ + public boolean clientDisconnected(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java index 8abb135..fd8b50c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal; import org.apache.ignite.*; +import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.managers.checkpoint.*; import org.apache.ignite.internal.managers.collision.*; @@ -303,6 +304,12 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable /** Marshaller context. */ private MarshallerContextImpl marshCtx; + /** */ + private ClusterNode locNode; + + /** */ + private volatile boolean disconnected; + /** * No-arg constructor is required by externalization. */ @@ -325,6 +332,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable * @param mgmtExecSvc Management executor service. * @param igfsExecSvc IGFS executor service. * @param restExecSvc REST executor service. + * @param plugins Plugin providers. * @throws IgniteCheckedException In case of error. */ @SuppressWarnings("TypeMayBeWeakened") @@ -503,7 +511,13 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable /** {@inheritDoc} */ @Override public UUID localNodeId() { - return cfg.getNodeId(); + if (locNode != null) + return locNode.id(); + + if (discoMgr != null) + locNode = discoMgr.localNode(); + + return locNode != null ? locNode.id() : config().getNodeId(); } /** {@inheritDoc} */ @@ -903,6 +917,21 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable } /** {@inheritDoc} */ + @Override public boolean clientDisconnected() { + if (locNode == null) + locNode = discoMgr != null ? discoMgr.localNode() : null; + + return locNode != null ? (locNode.isClient() && disconnected) : false; + } + + /** + * @param disconnected Disconnected flag. + */ + void disconnected(boolean disconnected) { + this.disconnected = disconnected; + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridKernalContextImpl.class, this); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGateway.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGateway.java index 0156136..1d50aa2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGateway.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGateway.java @@ -17,7 +17,9 @@ package org.apache.ignite.internal; +import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.tostring.*; +import org.jetbrains.annotations.*; /** * This interface guards access to implementations of public methods that access kernal @@ -39,22 +41,6 @@ import org.apache.ignite.internal.util.tostring.*; @GridToStringExclude public interface GridKernalGateway { /** - * Performs light-weight check on the kernal state at the moment of this call. - *

- * This method should only be used when the kernal state should be checked just once - * at the beginning of the method and the fact that kernal state can change in the middle - * of such method's execution should not matter. - *

- * For example, when a method returns a constant value its implementation doesn't depend - * on the kernal being valid throughout its execution. In such case it is enough to check - * the kernal's state just once at the beginning of this method to provide consistent behavior - * of the API without incurring overhead of lock-based guard methods. - * - * @throws IllegalStateException Thrown in case when no kernal calls are allowed. - */ - public void lightCheck() throws IllegalStateException; - - /** * Should be called on entering every kernal related call * originated directly or indirectly via public API. *

@@ -113,31 +99,29 @@ public interface GridKernalGateway { public void writeUnlock(); /** - * Adds stop listener. Note that the identity set will be used to store listeners for - * performance reasons. Futures can register a listener to be notified when they need to - * be internally interrupted. + * Gets user stack trace through the first call of grid public API. * - * @param lsnr Listener to add. + * @return User stack trace. */ - public void addStopListener(Runnable lsnr); + public String userStackTrace(); /** - * Removes previously added stop listener. - * - * @param lsnr Listener to remove. + * @param timeout Timeout. + * @return {@code True} if write lock has been acquired. + * @throws InterruptedException If interrupted. */ - public void removeStopListener(Runnable lsnr); + public boolean tryWriteLock(long timeout) throws InterruptedException; /** - * Gets user stack trace through the first call of grid public API. + * Disconnected callback. + * + * @return Reconnect future. */ - public String userStackTrace(); + @Nullable public GridFutureAdapter onDisconnected(); /** - * @param timeout Timeout. - * @return {@code True} if write lock has been acquired. - * @throws InterruptedException If interrupted. + * Reconnected callback. */ - public boolean tryWriteLock(long timeout) throws InterruptedException; + public void onReconnected(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java index 35bbbed..f6a9e51 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java @@ -17,13 +17,15 @@ package org.apache.ignite.internal; +import org.apache.ignite.*; import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.internal.*; import java.io.*; -import java.util.*; import java.util.concurrent.*; +import java.util.concurrent.atomic.*; /** * @@ -39,10 +41,10 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable { /** */ @GridToStringExclude - private final Collection lsnrs = new GridSetWrapper<>(new IdentityHashMap()); + private IgniteFutureImpl reconnectFut; /** */ - private volatile GridKernalState state = GridKernalState.STOPPED; + private final AtomicReference state = new AtomicReference<>(GridKernalState.STOPPED); /** */ @GridToStringExclude @@ -63,12 +65,6 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable { } /** {@inheritDoc} */ - @Override public void lightCheck() throws IllegalStateException { - if (state != GridKernalState.STARTED) - throw illegalState(); - } - - /** {@inheritDoc} */ @SuppressWarnings({"LockAcquiredButNotSafelyReleased", "BusyWait"}) @Override public void readLock() throws IllegalStateException { if (stackTrace == null) @@ -76,10 +72,18 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable { rwLock.readLock(); + GridKernalState state = this.state.get(); + if (state != GridKernalState.STARTED) { // Unlock just acquired lock. rwLock.readUnlock(); + if (state == GridKernalState.DISCONNECTED) { + assert reconnectFut != null; + + throw new IgniteClientDisconnectedException(reconnectFut, "Client node disconnected: " + gridName); + } + throw illegalState(); } } @@ -90,6 +94,9 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable { stackTrace = stackTrace(); rwLock.readLock(); + + if (state.get() == GridKernalState.DISCONNECTED) + throw new IgniteClientDisconnectedException(reconnectFut, "Client node disconnected: " + gridName); } /** {@inheritDoc} */ @@ -137,6 +144,27 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable { return false; } + /** {@inheritDoc} */ + @Override public GridFutureAdapter onDisconnected() { + GridFutureAdapter fut = new GridFutureAdapter<>(); + + reconnectFut = new IgniteFutureImpl<>(fut); + + if (!state.compareAndSet(GridKernalState.STARTED, GridKernalState.DISCONNECTED)) { + ((GridFutureAdapter)reconnectFut.internalFuture()).onDone(new IgniteCheckedException("Node stopped.")); + + return null; + } + + return fut; + } + + /** {@inheritDoc} */ + @Override public void onReconnected() { + if (state.compareAndSet(GridKernalState.DISCONNECTED, GridKernalState.STARTED)) + ((GridFutureAdapter)reconnectFut.internalFuture()).onDone(); + } + /** * Retrieves user stack trace. * @@ -171,46 +199,15 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable { assert state != null; // NOTE: this method should always be called within write lock. - this.state = state; + this.state.set(state); - if (state == GridKernalState.STOPPING) { - Runnable[] runs; - - synchronized (lsnrs) { - lsnrs.toArray(runs = new Runnable[lsnrs.size()]); - } - - // In the same thread. - for (Runnable r : runs) - r.run(); - } + if (reconnectFut != null) + ((GridFutureAdapter)reconnectFut.internalFuture()).onDone(new IgniteCheckedException("Node stopped.")); } /** {@inheritDoc} */ @Override public GridKernalState getState() { - return state; - } - - /** {@inheritDoc} */ - @Override public void addStopListener(Runnable lsnr) { - assert lsnr != null; - - if (state == GridKernalState.STARTING || state == GridKernalState.STARTED) - synchronized (lsnrs) { - lsnrs.add(lsnr); - } - else - // Call right away in the same thread. - lsnr.run(); - } - - /** {@inheritDoc} */ - @Override public void removeStopListener(Runnable lsnr) { - assert lsnr != null; - - synchronized (lsnrs) { - lsnrs.remove(lsnr); - } + return state.get(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalState.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalState.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalState.java index fbb8f45..7d63578 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalState.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalState.java @@ -32,6 +32,9 @@ public enum GridKernalState { /** Kernal is stopping. */ STOPPING, + /** Kernal is disconnected. */ + DISCONNECTED, + /** Kernal is stopped. *

* This is also the initial state of the kernal. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java b/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java index b438bc1..55a84c6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal; import org.apache.ignite.*; import org.apache.ignite.cluster.*; +import org.apache.ignite.lang.*; import org.apache.ignite.plugin.*; import org.apache.ignite.spi.*; import org.jetbrains.annotations.*; @@ -64,6 +65,16 @@ public class GridPluginComponent implements GridComponent { } /** {@inheritDoc} */ + @Override public void onDisconnected(IgniteFuture reconnectFut) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void onReconnected(boolean clusterRestarted) { + // No-op. + } + + /** {@inheritDoc} */ @Override public void onKernalStop(boolean cancel) { plugin.onIgniteStop(cancel); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/IgniteClientDisconnectedCheckedException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteClientDisconnectedCheckedException.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteClientDisconnectedCheckedException.java new file mode 100644 index 0000000..e58530d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteClientDisconnectedCheckedException.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.lang.*; + +/** + * + */ +public class IgniteClientDisconnectedCheckedException extends IgniteCheckedException { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private IgniteFuture reconnectFut; + + /** + * @param reconnectFut Reconnect future. + * @param msg Message. + */ + public IgniteClientDisconnectedCheckedException(IgniteFuture reconnectFut, String msg) { + super(msg); + + this.reconnectFut = reconnectFut; + } + + /** + * @return Reconnect future. + */ + public IgniteFuture reconnectFuture() { + return reconnectFut; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 024dc7b..0d4ce32 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -60,6 +60,7 @@ import org.apache.ignite.internal.processors.session.*; import org.apache.ignite.internal.processors.task.*; import org.apache.ignite.internal.processors.timeout.*; import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.*; @@ -902,82 +903,87 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { @Override public void run() { if (log.isInfoEnabled()) { - ClusterMetrics m = cluster().localNode().metrics(); + try { + ClusterMetrics m = cluster().localNode().metrics(); - double cpuLoadPct = m.getCurrentCpuLoad() * 100; - double avgCpuLoadPct = m.getAverageCpuLoad() * 100; - double gcPct = m.getCurrentGcCpuLoad() * 100; + double cpuLoadPct = m.getCurrentCpuLoad() * 100; + double avgCpuLoadPct = m.getAverageCpuLoad() * 100; + double gcPct = m.getCurrentGcCpuLoad() * 100; - long heapUsed = m.getHeapMemoryUsed(); - long heapMax = m.getHeapMemoryMaximum(); + long heapUsed = m.getHeapMemoryUsed(); + long heapMax = m.getHeapMemoryMaximum(); - long heapUsedInMBytes = heapUsed / 1024 / 1024; - long heapCommInMBytes = m.getHeapMemoryCommitted() / 1024 / 1024; + long heapUsedInMBytes = heapUsed / 1024 / 1024; + long heapCommInMBytes = m.getHeapMemoryCommitted() / 1024 / 1024; - double freeHeapPct = heapMax > 0 ? ((double)((heapMax - heapUsed) * 100)) / heapMax : -1; + double freeHeapPct = heapMax > 0 ? ((double)((heapMax - heapUsed) * 100)) / heapMax : -1; - int hosts = 0; - int nodes = 0; - int cpus = 0; + int hosts = 0; + int nodes = 0; + int cpus = 0; - try { - ClusterMetrics metrics = cluster().metrics(); + try { + ClusterMetrics metrics = cluster().metrics(); - Collection nodes0 = cluster().nodes(); + Collection nodes0 = cluster().nodes(); - hosts = U.neighborhood(nodes0).size(); - nodes = metrics.getTotalNodes(); - cpus = metrics.getTotalCpus(); - } - catch (IgniteException ignore) { - // No-op. - } + hosts = U.neighborhood(nodes0).size(); + nodes = metrics.getTotalNodes(); + cpus = metrics.getTotalCpus(); + } + catch (IgniteException ignore) { + // No-op. + } - int pubPoolActiveThreads = 0; - int pubPoolIdleThreads = 0; - int pubPoolQSize = 0; + int pubPoolActiveThreads = 0; + int pubPoolIdleThreads = 0; + int pubPoolQSize = 0; - if (execSvc instanceof ThreadPoolExecutor) { - ThreadPoolExecutor exec = (ThreadPoolExecutor)execSvc; + if (execSvc instanceof ThreadPoolExecutor) { + ThreadPoolExecutor exec = (ThreadPoolExecutor)execSvc; - int poolSize = exec.getPoolSize(); + int poolSize = exec.getPoolSize(); - pubPoolActiveThreads = Math.min(poolSize, exec.getActiveCount()); - pubPoolIdleThreads = poolSize - pubPoolActiveThreads; - pubPoolQSize = exec.getQueue().size(); - } + pubPoolActiveThreads = Math.min(poolSize, exec.getActiveCount()); + pubPoolIdleThreads = poolSize - pubPoolActiveThreads; + pubPoolQSize = exec.getQueue().size(); + } - int sysPoolActiveThreads = 0; - int sysPoolIdleThreads = 0; - int sysPoolQSize = 0; + int sysPoolActiveThreads = 0; + int sysPoolIdleThreads = 0; + int sysPoolQSize = 0; - if (sysExecSvc instanceof ThreadPoolExecutor) { - ThreadPoolExecutor exec = (ThreadPoolExecutor)sysExecSvc; + if (sysExecSvc instanceof ThreadPoolExecutor) { + ThreadPoolExecutor exec = (ThreadPoolExecutor)sysExecSvc; - int poolSize = exec.getPoolSize(); + int poolSize = exec.getPoolSize(); - sysPoolActiveThreads = Math.min(poolSize, exec.getActiveCount()); - sysPoolIdleThreads = poolSize - sysPoolActiveThreads; - sysPoolQSize = exec.getQueue().size(); - } + sysPoolActiveThreads = Math.min(poolSize, exec.getActiveCount()); + sysPoolIdleThreads = poolSize - sysPoolActiveThreads; + sysPoolQSize = exec.getQueue().size(); + } - String id = U.id8(localNode().id()); + String id = U.id8(localNode().id()); - String msg = NL + - "Metrics for local node (to disable set 'metricsLogFrequency' to 0)" + NL + - " ^-- Node [id=" + id + ", name=" + name() + "]" + NL + - " ^-- H/N/C [hosts=" + hosts + ", nodes=" + nodes + ", CPUs=" + cpus + "]" + NL + - " ^-- CPU [cur=" + dblFmt.format(cpuLoadPct) + "%, avg=" + + String msg = NL + + "Metrics for local node (to disable set 'metricsLogFrequency' to 0)" + NL + + " ^-- Node [id=" + id + ", name=" + name() + "]" + NL + + " ^-- H/N/C [hosts=" + hosts + ", nodes=" + nodes + ", CPUs=" + cpus + "]" + NL + + " ^-- CPU [cur=" + dblFmt.format(cpuLoadPct) + "%, avg=" + dblFmt.format(avgCpuLoadPct) + "%, GC=" + dblFmt.format(gcPct) + "%]" + NL + - " ^-- Heap [used=" + dblFmt.format(heapUsedInMBytes) + "MB, free=" + + " ^-- Heap [used=" + dblFmt.format(heapUsedInMBytes) + "MB, free=" + dblFmt.format(freeHeapPct) + "%, comm=" + dblFmt.format(heapCommInMBytes) + "MB]" + NL + - " ^-- Public thread pool [active=" + pubPoolActiveThreads + ", idle=" + + " ^-- Public thread pool [active=" + pubPoolActiveThreads + ", idle=" + pubPoolIdleThreads + ", qSize=" + pubPoolQSize + "]" + NL + - " ^-- System thread pool [active=" + sysPoolActiveThreads + ", idle=" + + " ^-- System thread pool [active=" + sysPoolActiveThreads + ", idle=" + sysPoolIdleThreads + ", qSize=" + sysPoolQSize + "]" + NL + - " ^-- Outbound messages queue [size=" + m.getOutboundMessagesQueueSize() + "]"; + " ^-- Outbound messages queue [size=" + m.getOutboundMessagesQueueSize() + "]"; - log.info(msg); + log.info(msg); + } + catch (IgniteClientDisconnectedException ignore) { + // No-op. + } } } }, metricsLogFreq, metricsLogFreq); @@ -1676,7 +1682,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { GridKernalState state = gw.getState(); - if (state == STARTED) + if (state == STARTED || state == DISCONNECTED) firstStop = true; else if (state == STARTING) U.warn(log, "Attempt to stop starting grid. This operation " + @@ -1753,7 +1759,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { if (cache != null) cache.blockGateways(); - assert gw.getState() == STARTED || gw.getState() == STARTING; + assert gw.getState() == STARTED || gw.getState() == STARTING || gw.getState() == DISCONNECTED; // No more kernal calls from this point on. gw.setState(STOPPING); @@ -2186,6 +2192,9 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { return false; } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } finally { unguard(); } @@ -2801,6 +2810,109 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { } /** + * + */ + public void onDisconnected() { + Throwable err = null; + + GridFutureAdapter reconnectFut = ctx.gateway().onDisconnected(); + + if (reconnectFut == null) { + assert ctx.gateway().getState() != STARTED : ctx.gateway().getState(); + + return; + } + + IgniteFuture userFut = new IgniteFutureImpl<>(reconnectFut); + + ctx.cluster().get().clientReconnectFuture(userFut); + + ctx.disconnected(true); + + List comps = ctx.components(); + + for (ListIterator it = comps.listIterator(comps.size()); it.hasPrevious();) { + GridComponent comp = it.previous(); + + try { + if (!skipDaemon(comp)) + comp.onDisconnected(userFut); + } + catch (IgniteCheckedException e) { + err = e; + } + catch (Throwable e) { + err = e; + + if (e instanceof Error) + throw e; + } + } + + for (GridCacheContext cctx : ctx.cache().context().cacheContexts()) { + cctx.gate().writeLock(); + + cctx.gate().writeUnlock(); + } + + ctx.gateway().writeLock(); + + ctx.gateway().writeUnlock(); + + if (err != null) { + reconnectFut.onDone(err); + + U.error(log, "Failed to reconnect, will stop node", err); + + close(); + } + } + + /** + * @param clusterRestarted {@code True} if all cluster nodes restarted while client was disconnected. + */ + public void onReconnected(final boolean clusterRestarted) { + Throwable err = null; + + try { + ctx.disconnected(false); + + for (GridComponent comp : ctx.components()) + comp.onReconnected(clusterRestarted); + + ctx.cache().context().exchange().reconnectExchangeFuture().listen(new CI1>() { + @Override public void apply(IgniteInternalFuture fut) { + try { + fut.get(); + + ctx.gateway().onReconnected(); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to reconnect, will stop node", e); + + close(); + } + } + }); + } + catch (IgniteCheckedException e) { + err = e; + } + catch (Throwable e) { + err = e; + + if (e instanceof Error) + throw e; + } + + if (err != null) { + U.error(log, "Failed to reconnect, will stop node", err); + + close(); + } + } + + /** * Creates optional component. * * @param cls Component interface. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java index 26c704c..51cf523 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java @@ -287,6 +287,11 @@ public class IgniteClusterAsyncImpl extends AsyncSupportAdapter } /** {@inheritDoc} */ + @Nullable @Override public IgniteFuture clientReconnectFuture() { + return cluster.clientReconnectFuture(); + } + + /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { cluster = (IgniteClusterImpl)in.readObject(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java index 3c937b0..0287ca7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java @@ -52,6 +52,9 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus @GridToStringExclude private ConcurrentMap nodeLoc; + /** Client reconnect future. */ + private IgniteFuture reconnecFut; + /** * Required by {@link Externalizable}. */ @@ -120,6 +123,9 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus try { return ctx.discovery().pingNode(nodeId); } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } finally { unguard(); } @@ -501,6 +507,18 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus nodeLoc.clear(); } + /** + * @param reconnecFut Reconnect future. + */ + public void clientReconnectFuture(IgniteFuture reconnecFut) { + this.reconnecFut = reconnecFut; + } + + /** {@inheritDoc} */ + @Nullable @Override public IgniteFuture clientReconnectFuture() { + return reconnecFut; + } + /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { ctx = (GridKernalContext)in.readObject(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java index 40a5ea5..298ff24 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java @@ -166,6 +166,18 @@ public abstract class GridManagerAdapter implements GridMan // No-op. } + /** {@inheritDoc} */ + @Override public void onDisconnected(IgniteFuture reconnectFut) throws IgniteCheckedException { + for (T t : spis) + t.onClientDisconnected(reconnectFut); + } + + /** {@inheritDoc} */ + @Override public void onReconnected(boolean clusterRestarted) throws IgniteCheckedException { + for (T t : spis) + t.onClientReconnected(clusterRestarted); + } + /** * Starts wrapped SPI. * @@ -318,7 +330,12 @@ public abstract class GridManagerAdapter implements GridMan @Override public boolean pingNode(UUID nodeId) { A.notNull(nodeId, "nodeId"); - return ctx.discovery().pingNode(nodeId); + try { + return ctx.discovery().pingNode(nodeId); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } } @Override public void send(ClusterNode node, Serializable msg, String topic) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java index 443b221..3b886a5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java @@ -293,7 +293,7 @@ class GridDeploymentCommunication { log.debug("Sent peer class loading response [node=" + node.id() + ", res=" + res + ']'); } catch (IgniteCheckedException e) { - if (ctx.discovery().pingNode(nodeId)) + if (ctx.discovery().pingNodeNoError(nodeId)) U.error(log, "Failed to send peer class loading response to node: " + nodeId, e); else if (log.isDebugEnabled()) log.debug("Failed to send peer class loading response to node " + http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java index 75fe98f..75fb41e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java @@ -94,13 +94,7 @@ public class GridDeploymentManager extends GridManagerAdapter { comm.start(); - locStore = new GridDeploymentLocalStore(getSpi(), ctx, comm); - ldrStore = new GridDeploymentPerLoaderStore(getSpi(), ctx, comm); - verStore = new GridDeploymentPerVersionStore(getSpi(), ctx, comm); - - locStore.start(); - ldrStore.start(); - verStore.start(); + startStores(); if (log.isDebugEnabled()) { log.debug("Local deployment: " + locDep); @@ -110,17 +104,24 @@ public class GridDeploymentManager extends GridManagerAdapter { } /** {@inheritDoc} */ - @Override public void stop(boolean cancel) throws IgniteCheckedException { - GridProtocolHandler.deregisterDeploymentManager(); + @Override public void onDisconnected(IgniteFuture reconnectFut) throws IgniteCheckedException { + storesOnKernalStop(); - if (verStore != null) - verStore.stop(); + storesStop(); - if (ldrStore != null) - ldrStore.stop(); + startStores(); + } - if (locStore != null) - locStore.stop(); + /** {@inheritDoc} */ + @Override public void onReconnected(boolean clusterRestarted) throws IgniteCheckedException { + storesOnKernalStart(); + } + + /** {@inheritDoc} */ + @Override public void stop(boolean cancel) throws IgniteCheckedException { + GridProtocolHandler.deregisterDeploymentManager(); + + storesStop(); if (comm != null) comm.stop(); @@ -135,21 +136,12 @@ public class GridDeploymentManager extends GridManagerAdapter { /** {@inheritDoc} */ @Override public void onKernalStart0() throws IgniteCheckedException { - locStore.onKernalStart(); - ldrStore.onKernalStart(); - verStore.onKernalStart(); + storesOnKernalStart(); } /** {@inheritDoc} */ @Override public void onKernalStop0(boolean cancel) { - if (verStore != null) - verStore.onKernalStop(); - - if (ldrStore != null) - ldrStore.onKernalStop(); - - if (locStore != null) - locStore.onKernalStop(); + storesOnKernalStop(); } /** {@inheritDoc} */ @@ -547,6 +539,57 @@ public class GridDeploymentManager extends GridManagerAdapter { return ldr instanceof GridDeploymentClassLoader; } + + /** + * @throws IgniteCheckedException If failed. + */ + private void startStores() throws IgniteCheckedException { + locStore = new GridDeploymentLocalStore(getSpi(), ctx, comm); + ldrStore = new GridDeploymentPerLoaderStore(getSpi(), ctx, comm); + verStore = new GridDeploymentPerVersionStore(getSpi(), ctx, comm); + + locStore.start(); + ldrStore.start(); + verStore.start(); + } + + /** + * @throws IgniteCheckedException If failed. + */ + private void storesOnKernalStart() throws IgniteCheckedException { + locStore.onKernalStart(); + ldrStore.onKernalStart(); + verStore.onKernalStart(); + } + + /** + * + */ + private void storesOnKernalStop() { + if (verStore != null) + verStore.onKernalStop(); + + if (ldrStore != null) + ldrStore.onKernalStop(); + + if (locStore != null) + locStore.onKernalStop(); + } + + /** + * + */ + private void storesStop() { + if (verStore != null) + verStore.stop(); + + if (ldrStore != null) + ldrStore.stop(); + + if (locStore != null) + locStore.stop(); + } + /** * */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index b35628c..068d374 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -188,14 +188,14 @@ public class GridDiscoveryManager extends GridManagerAdapter { /** Received custom messages history. */ private final ArrayDeque rcvdCustomMsgs = new ArrayDeque<>(); + /** */ + private final CountDownLatch startLatch = new CountDownLatch(1); + /** @param ctx Context. */ public GridDiscoveryManager(GridKernalContext ctx) { super(ctx, ctx.config().getDiscoverySpi()); } - /** */ - private final CountDownLatch startLatch = new CountDownLatch(1); - /** * @return Memory usage of non-heap memory. */ @@ -337,7 +337,7 @@ public class GridDiscoveryManager extends GridManagerAdapter { isLocDaemon = ctx.isDaemon(); - hasRslvrs = !F.isEmpty(ctx.config().getSegmentationResolvers()); + hasRslvrs = !ctx.config().isClientMode() && !F.isEmpty(ctx.config().getSegmentationResolvers()); segChkFreq = ctx.config().getSegmentCheckFrequency(); @@ -380,14 +380,24 @@ public class GridDiscoveryManager extends GridManagerAdapter { } spi.setListener(new DiscoverySpiListener() { + private long gridStartTime; + @Override public void onDiscovery( - int type, - long topVer, - ClusterNode node, - Collection topSnapshot, - Map> snapshots, + final int type, + final long topVer, + final ClusterNode node, + final Collection topSnapshot, + final Map> snapshots, @Nullable DiscoverySpiCustomMessage spiCustomMsg ) { + if (type == EVT_NODE_JOINED && node.isLocal() && ctx.clientDisconnected()) { + discoCacheHist.clear(); + + topHist.clear(); + + topSnap.set(new Snapshot(AffinityTopologyVersion.ZERO, null)); + } + DiscoveryCustomMessage customMsg = spiCustomMsg == null ? null : ((CustomMessageWrapper)spiCustomMsg).delegate(); @@ -415,7 +425,9 @@ public class GridDiscoveryManager extends GridManagerAdapter { verChanged = false; } else { - if (type != EVT_NODE_SEGMENTED) { + if (type != EVT_NODE_SEGMENTED && + type != EVT_CLIENT_NODE_DISCONNECTED && + type != EVT_CLIENT_NODE_RECONNECTED) { minorTopVer = 0; verChanged = true; @@ -424,7 +436,7 @@ public class GridDiscoveryManager extends GridManagerAdapter { verChanged = false; } - AffinityTopologyVersion nextTopVer = new AffinityTopologyVersion(topVer, minorTopVer); + final AffinityTopologyVersion nextTopVer = new AffinityTopologyVersion(topVer, minorTopVer); if (type == EVT_NODE_FAILED || type == EVT_NODE_LEFT) { for (DiscoCache c : discoCacheHist.values()) @@ -467,11 +479,12 @@ public class GridDiscoveryManager extends GridManagerAdapter { // If this is a local join event, just save it and do not notify listeners. if (type == EVT_NODE_JOINED && node.id().equals(locNode.id())) { + if (gridStartTime == 0) + gridStartTime = getSpi().getGridStartTime(); + updateTopologyVersionIfGreater(new AffinityTopologyVersion(locNode.order()), new DiscoCache(localNode(), getSpi().getRemoteNodes())); - assert startLatch.getCount() == 1; - startLatch.countDown(); DiscoveryEvent discoEvt = new DiscoveryEvent(); @@ -491,6 +504,46 @@ public class GridDiscoveryManager extends GridManagerAdapter { return; } + else if (type == EVT_CLIENT_NODE_DISCONNECTED) { + /* + * Notify all components from discovery thread to avoid concurrent + * reconnect while disconnect handling is in progress. + */ + + assert locNode.isClient() : locNode; + assert node.isClient() : node; + + ((IgniteKernal)ctx.grid()).onDisconnected(); + + locJoinEvt = new GridFutureAdapter<>(); + + registeredCaches.clear(); + } + else if (type == EVT_CLIENT_NODE_RECONNECTED) { + assert locNode.isClient() : locNode; + assert node.isClient() : node; + + boolean clusterRestarted = gridStartTime != getSpi().getGridStartTime(); + + gridStartTime = getSpi().getGridStartTime(); + + ((IgniteKernal)ctx.grid()).onReconnected(clusterRestarted); + + ctx.cluster().clientReconnectFuture().listen(new CI1>() { + @Override public void apply(IgniteFuture fut) { + try { + fut.get(); + + discoWrk.addEvent(type, nextTopVer, node, topSnapshot, null); + } + catch (IgniteException ignore) { + // No-op. + } + } + }); + + return; + } discoWrk.addEvent(type, nextTopVer, node, topSnapshot, customMsg); } @@ -967,7 +1020,7 @@ public class GridDiscoveryManager extends GridManagerAdapter { Collection rmtNodes = discoCache.remoteNodes(); - Collection serverNodes = F.view(discoCache.allNodes(), F.not(clientFilter)); + Collection srvNodes = F.view(discoCache.allNodes(), F.not(clientFilter)); Collection clientNodes = F.view(discoCache.allNodes(), clientFilter); @@ -987,7 +1040,7 @@ public class GridDiscoveryManager extends GridManagerAdapter { double heap = U.heapSize(allNodes, 2); if (log.isQuiet()) - U.quiet(false, topologySnapshotMessage(serverNodes.size(), clientNodes.size(), totalCpus, heap)); + U.quiet(false, topologySnapshotMessage(srvNodes.size(), clientNodes.size(), totalCpus, heap)); if (log.isDebugEnabled()) { String dbg = ""; @@ -997,7 +1050,7 @@ public class GridDiscoveryManager extends GridManagerAdapter { ">>> " + PREFIX + "." + U.nl() + ">>> +----------------+" + U.nl() + ">>> Grid name: " + (ctx.gridName() == null ? "default" : ctx.gridName()) + U.nl() + - ">>> Number of server nodes: " + serverNodes.size() + U.nl() + + ">>> Number of server nodes: " + srvNodes.size() + U.nl() + ">>> Number of client nodes: " + clientNodes.size() + U.nl() + (discoOrdered ? ">>> Topology version: " + topVer + U.nl() : "") + ">>> Topology hash: 0x" + Long.toHexString(hash).toUpperCase() + U.nl(); @@ -1031,20 +1084,20 @@ public class GridDiscoveryManager extends GridManagerAdapter { log.debug(dbg); } else if (log.isInfoEnabled()) - log.info(topologySnapshotMessage(serverNodes.size(), clientNodes.size(), totalCpus, heap)); + log.info(topologySnapshotMessage(srvNodes.size(), clientNodes.size(), totalCpus, heap)); } /** - * @param serverNodesNum Server nodes number. + * @param srvNodesNum Server nodes number. * @param clientNodesNum Client nodes number. * @param totalCpus Total cpu number. * @param heap Heap size. * @return Topology snapshot message. */ - private String topologySnapshotMessage(int serverNodesNum, int clientNodesNum, int totalCpus, double heap) { + private String topologySnapshotMessage(int srvNodesNum, int clientNodesNum, int totalCpus, double heap) { return PREFIX + " [" + (discoOrdered ? "ver=" + topSnap.get().topVer.topologyVersion() + ", " : "") + - "server nodes=" + serverNodesNum + + "server nodes=" + srvNodesNum + ", client nodes=" + clientNodesNum + ", CPUs=" + totalCpus + ", heap=" + heap + "GB" + @@ -1134,8 +1187,9 @@ public class GridDiscoveryManager extends GridManagerAdapter { /** * @param nodeId ID of the node. * @return {@code True} if ping succeeded. + * @throws IgniteClientDisconnectedCheckedException If ping failed. */ - public boolean pingNode(UUID nodeId) { + public boolean pingNode(UUID nodeId) throws IgniteClientDisconnectedCheckedException { assert nodeId != null; if (!busyLock.enterBusy()) @@ -1144,6 +1198,36 @@ public class GridDiscoveryManager extends GridManagerAdapter { try { return getSpi().pingNode(nodeId); } + catch (IgniteException e) { + if (e.hasCause(IgniteClientDisconnectedCheckedException.class)) { + IgniteFuture reconnectFut = ctx.cluster().clientReconnectFuture(); + + throw new IgniteClientDisconnectedCheckedException(reconnectFut, e.getMessage()); + } + + throw e; + } + finally { + busyLock.leaveBusy(); + } + } + + /** + * @param nodeId ID of the node. + * @return {@code True} if ping succeeded. + */ + public boolean pingNodeNoError(UUID nodeId) { + assert nodeId != null; + + if (!busyLock.enterBusy()) + return false; + + try { + return getSpi().pingNode(nodeId); + } + catch (IgniteException e) { + return false; + } finally { busyLock.leaveBusy(); } @@ -1519,9 +1603,20 @@ public class GridDiscoveryManager extends GridManagerAdapter { /** * @param msg Custom message. + * @throws IgniteCheckedException If failed. */ - public void sendCustomEvent(DiscoveryCustomMessage msg) { - getSpi().sendCustomEvent(new CustomMessageWrapper(msg)); + public void sendCustomEvent(DiscoveryCustomMessage msg) throws IgniteCheckedException { + try { + getSpi().sendCustomEvent(new CustomMessageWrapper(msg)); + } + catch (IgniteClientDisconnectedException e) { + IgniteFuture reconnectFut = ctx.cluster().clientReconnectFuture(); + + throw new IgniteClientDisconnectedCheckedException(reconnectFut, e.getMessage()); + } + catch (IgniteException e) { + throw new IgniteCheckedException(e); + } } /** @@ -1743,6 +1838,12 @@ public class GridDiscoveryManager extends GridManagerAdapter { else if (type == EVT_NODE_SEGMENTED) evt.message("Node segmented: " + node); + else if (type == EVT_CLIENT_NODE_DISCONNECTED) + evt.message("Client node disconnected: " + node); + + else if (type == EVT_CLIENT_NODE_RECONNECTED) + evt.message("Client node reconnected: " + node); + else assert false; @@ -1755,6 +1856,7 @@ public class GridDiscoveryManager extends GridManagerAdapter { * @param topVer Topology version. * @param node Node. * @param topSnapshot Topology snapshot. + * @param data Custom message. */ void addEvent( int type, @@ -1864,6 +1966,21 @@ public class GridDiscoveryManager extends GridManagerAdapter { break; } + case EVT_CLIENT_NODE_DISCONNECTED: { + // No-op. + + break; + } + + case EVT_CLIENT_NODE_RECONNECTED: { + if (log.isInfoEnabled()) + log.info("Client node reconnected to topology: " + node); + + ackTopology(topVer.topologyVersion(), true); + + break; + } + case EVT_NODE_FAILED: { // Check only if resolvers were configured. if (hasRslvrs) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java index a84c48a..8baf95c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java @@ -22,6 +22,7 @@ import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; import org.apache.ignite.spi.*; import org.jetbrains.annotations.*; @@ -62,6 +63,16 @@ public abstract class GridProcessorAdapter implements GridProcessor { } /** {@inheritDoc} */ + @Override public void onDisconnected(IgniteFuture reconnectFut) throws IgniteCheckedException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void onReconnected(boolean clusterRestarted) throws IgniteCheckedException { + // No-op. + } + + /** {@inheritDoc} */ @Nullable @Override public DiscoveryDataExchangeType discoveryDataType() { return null; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java index 6989385..d40128c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java @@ -69,7 +69,7 @@ public class GridAffinityAssignmentCache { private IgniteLogger log; /** Node stop flag. */ - private volatile boolean stopping; + private volatile IgniteCheckedException stopErr; /** * Constructs affinity cached calculations. @@ -130,18 +130,28 @@ public class GridAffinityAssignmentCache { /** * Kernal stop callback. + * + * @param err Error. */ - public void onKernalStop() { - stopping = true; - - IgniteCheckedException err = - new IgniteCheckedException("Failed to wait for topology update, cache (or node) is stopping."); + public void onKernalStop(IgniteCheckedException err) { + stopErr = err; for (AffinityReadyFuture fut : readyFuts.values()) fut.onDone(err); } /** + * + */ + public void onReconnected() { + affCache.clear(); + + head.set(new GridAffinityAssignment(AffinityTopologyVersion.NONE)); + + stopErr = null; + } + + /** * Calculates affinity cache for given topology version. * * @param topVer Topology version to calculate affinity cache for. @@ -312,8 +322,8 @@ public class GridAffinityAssignmentCache { fut.onDone(topVer); } - else if (stopping) - fut.onDone(new IgniteCheckedException("Failed to wait for topology update, cache (or node) is stopping.")); + else if (stopErr != null) + fut.onDone(stopErr); return fut; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOsConflictResolutionManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOsConflictResolutionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOsConflictResolutionManager.java index 29e50b6..9e765d9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOsConflictResolutionManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOsConflictResolutionManager.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; import org.apache.ignite.internal.processors.cache.version.*; +import org.apache.ignite.lang.*; /** * OS conflict resolver manager. @@ -55,4 +56,9 @@ public class CacheOsConflictResolutionManager implements CacheConflictReso @Override public void printMemoryStats() { // No-op. } + + /** {@inheritDoc} */ + @Override public void onDisconnected(IgniteFuture reconnectFut) { + // No-op. + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java index dfc39c1..1e8184d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java @@ -43,6 +43,9 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage { /** Custom message ID. */ private IgniteUuid id = IgniteUuid.randomUuid(); + /** */ + private boolean clientReconnect; + /** * @param reqs Requests. */ @@ -93,6 +96,20 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage { return false; } + /** + * @param clientReconnect {@code True} if this is discovery data sent on client reconnect. + */ + public void clientReconnect(boolean clientReconnect) { + this.clientReconnect = clientReconnect; + } + + /** + * @return {@code True} if this is discovery data sent on client reconnect. + */ + public boolean clientReconnect() { + return clientReconnect; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(DynamicCacheChangeBatch.class, this); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index e138520..d2a730a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -212,7 +212,7 @@ public abstract class GridCacheAdapter implements IgniteInternalCache ctx, int startSize) { - this(ctx, new GridCacheConcurrentMap(ctx, startSize, 0.75F)); + this(ctx, new GridCacheConcurrentMap(ctx, startSize, 0.75F, null)); } /** @@ -2868,7 +2868,7 @@ public abstract class GridCacheAdapter implements IgniteInternalCache implements IgniteInternalCache implements IgniteInternalCache implements IgniteInternalCache implements IgniteInternalCache, Cache.Entry>() { private IgniteCacheExpiryPolicy expiryPlc = - ctx.cache().expiryPolicy(opCtx != null ? opCtx.expiry() : null); + ctx.cache().expiryPolicy(opCtx != null ? opCtx.expiry() : null); @Override public Cache.Entry apply(Cache.Entry lazyEntry) { CacheOperationContext prev = ctx.gate().enter(opCtx); @@ -4443,6 +4443,13 @@ public abstract class GridCacheAdapter implements IgniteInternalCache { private final GridCacheContext ctx; /** Stopped flag for dynamic caches. */ - private volatile boolean stopped; + private final AtomicReference state = new AtomicReference<>(State.STARTED); + + /** */ + private IgniteFuture reconnectFut; /** */ private GridSpinReadWriteLock rwLock = new GridSpinReadWriteLock(); @@ -56,11 +63,36 @@ public class GridCacheGateway { rwLock.readLock(); - if (stopped) { - rwLock.readUnlock(); + checkState(true, true); + } - throw new IllegalStateException("Dynamic cache has been stopped: " + ctx.name()); + /** + * @param lock {@code True} if lock is held. + * @param stopErr {@code True} if throw exception if stopped. + * @return {@code True} if cache is in started state. + */ + private boolean checkState(boolean lock, boolean stopErr) { + State state = this.state.get(); + + if (state != State.STARTED) { + if (lock) + rwLock.readUnlock(); + + if (state == State.STOPPED) { + if (stopErr) + throw new IllegalStateException("Cache has been stopped: " + ctx.name()); + else + return false; + } + else { + assert reconnectFut != null; + + throw new CacheException( + new IgniteClientDisconnectedException(reconnectFut, "Client node disconnected: " + ctx.gridName())); + } } + + return true; } /** @@ -71,17 +103,11 @@ public class GridCacheGateway { public boolean enterIfNotStopped() { onEnter(); - // Must unlock in case of unexpected errors to avoid - // deadlocks during kernal stop. + // Must unlock in case of unexpected errors to avoid deadlocks during kernal stop. rwLock.readLock(); - if (stopped) { - rwLock.readUnlock(); - - return false; - } + return checkState(true, false); - return true; } /** @@ -92,7 +118,7 @@ public class GridCacheGateway { public boolean enterIfNotStoppedNoLock() { onEnter(); - return !stopped; + return checkState(false, false); } /** @@ -144,11 +170,7 @@ public class GridCacheGateway { rwLock.readLock(); - if (stopped) { - rwLock.readUnlock(); - - throw new IllegalStateException("Cache has been stopped: " + ctx.name()); - } + checkState(true, true); // Must unlock in case of unexpected errors to avoid // deadlocks during kernal stop. @@ -169,8 +191,7 @@ public class GridCacheGateway { @Nullable public CacheOperationContext enterNoLock(@Nullable CacheOperationContext opCtx) { onEnter(); - if (stopped) - throw new IllegalStateException("Cache has been stopped: " + ctx.name()); + checkState(false, false); return setOperationContextPerCall(opCtx); } @@ -229,8 +250,42 @@ public class GridCacheGateway { /** * */ - public void block() { - stopped = true; + public void stopped() { + state.set(State.STOPPED); + } + + /** + * @param reconnectFut Reconnect future. + */ + public void onDisconnected(IgniteFuture reconnectFut) { + assert reconnectFut != null; + + this.reconnectFut = reconnectFut; + + state.compareAndSet(State.STARTED, State.DISCONNECTED); + } + + /** + * + */ + public void writeLock(){ + rwLock.writeLock(); + } + + /** + * + */ + public void writeUnlock() { + rwLock.writeUnlock(); + } + + /** + * @param stopped Cache stopped flag. + */ + public void reconnected(boolean stopped) { + State newState = stopped ? State.STOPPED : State.STARTED; + + state.compareAndSet(State.DISCONNECTED, newState); } /** @@ -256,11 +311,24 @@ public class GridCacheGateway { Thread.currentThread().interrupt(); try { - // No-op. - stopped = true; + state.set(State.STOPPED); } finally { rwLock.writeUnlock(); } } + + /** + * + */ + private enum State { + /** */ + STARTED, + + /** */ + DISCONNECTED, + + /** */ + STOPPED + } }