Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id EE3FC200C62 for ; Wed, 26 Apr 2017 14:45:11 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id ECA60160B95; Wed, 26 Apr 2017 12:45:11 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id C5210160BA8 for ; Wed, 26 Apr 2017 14:45:09 +0200 (CEST) Received: (qmail 96767 invoked by uid 500); 26 Apr 2017 12:45:09 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 96711 invoked by uid 99); 26 Apr 2017 12:45:09 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 26 Apr 2017 12:45:09 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C13D9DF989; Wed, 26 Apr 2017 12:45:08 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Date: Wed, 26 Apr 2017 12:45:10 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [3/3] ignite git commit: ignite-4799 TcpDiscoverySpi: removed missedHeartbeats properties, heartbeatFrequency (instead use IgniteConfiguration.metricsUpdateFrequency). Added IgniteConfiguration.clientFailureDetectionTimeout. archived-at: Wed, 26 Apr 2017 12:45:12 -0000 ignite-4799 TcpDiscoverySpi: removed missedHeartbeats properties, heartbeatFrequency (instead use IgniteConfiguration.metricsUpdateFrequency). 
Added IgniteConfiguration.clientFailureDetectionTimeout. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6998785a Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6998785a Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6998785a Branch: refs/heads/ignite-2.0 Commit: 6998785a8861387b7ad83527a381dc5b772cf76f Parents: c829aac Author: Alexander Belyak Authored: Wed Apr 26 15:43:42 2017 +0300 Committer: sboikov Committed: Wed Apr 26 15:43:42 2017 +0300 ---------------------------------------------------------------------- .../apache/ignite/gridify/AbstractAopTest.java | 4 +- .../gridify/ExternalNonSpringAopSelfTest.java | 6 +- .../clients/src/test/resources/spring-cache.xml | 1 - .../apache/ignite/cluster/ClusterMetrics.java | 4 +- .../org/apache/ignite/cluster/ClusterNode.java | 11 +- .../configuration/IgniteConfiguration.java | 52 ++- .../org/apache/ignite/events/EventType.java | 2 +- .../processors/job/GridJobProcessor.java | 18 +- .../utils/PlatformConfigurationUtils.java | 32 +- .../org/apache/ignite/spi/IgniteSpiAdapter.java | 32 ++ .../spi/IgniteSpiOperationTimeoutHelper.java | 8 +- .../jobstealing/JobStealingCollisionSpi.java | 2 +- .../communication/tcp/TcpCommunicationSpi.java | 6 +- .../ignite/spi/discovery/tcp/ClientImpl.java | 31 +- .../ignite/spi/discovery/tcp/ServerImpl.java | 142 ++++---- .../spi/discovery/tcp/TcpDiscoverySpi.java | 143 ++------ .../spi/discovery/tcp/TcpDiscoverySpiMBean.java | 26 +- .../tcp/internal/TcpDiscoveryNode.java | 33 +- .../TcpDiscoveryClientHeartbeatMessage.java | 72 ---- .../TcpDiscoveryClientMetricsUpdateMessage.java | 72 ++++ .../messages/TcpDiscoveryHeartbeatMessage.java | 338 ------------------- .../TcpDiscoveryMetricsUpdateMessage.java | 338 +++++++++++++++++++ .../adaptive/AdaptiveLoadBalancingSpi.java | 12 +- .../resources/META-INF/classnames.properties | 8 +- .../core/src/test/config/load/dsi-load-base.xml | 3 +- 
.../src/test/config/load/merge-sort-base.xml | 7 +- .../config/streamer/spring-streamer-base.xml | 5 +- .../java/org/apache/ignite/GridTestJob.java | 19 ++ .../java/org/apache/ignite/GridTestTask.java | 18 +- .../internal/ClusterNodeMetricsSelfTest.java | 10 +- .../ignite/internal/GridAffinityMappedTest.java | 5 +- .../internal/GridAffinityP2PSelfTest.java | 3 +- .../ignite/internal/GridAffinitySelfTest.java | 3 +- .../GridCancelledJobsMetricsSelfTest.java | 4 +- ...ridFailFastNodeFailureDetectionSelfTest.java | 4 +- .../GridJobCollisionCancelSelfTest.java | 2 +- .../GridDiscoveryManagerAliveCacheSelfTest.java | 4 +- .../GridCacheAbstractFailoverSelfTest.java | 4 +- .../cache/GridCacheAbstractSelfTest.java | 4 +- .../cache/GridCacheMvccManagerSelfTest.java | 3 +- .../cache/IgniteCacheAbstractTest.java | 10 +- .../binary/BinaryMetadataUpdatesFlowTest.java | 4 +- .../CacheLateAffinityAssignmentTest.java | 3 +- .../GridCacheNodeFailureAbstractTest.java | 3 +- .../distributed/IgniteCache150ClientsTest.java | 2 +- .../IgniteCacheNearRestartRollbackSelfTest.java | 2 +- ...dCacheColocatedTxSingleThreadedSelfTest.java | 2 +- .../dht/GridCacheDhtPreloadDelayedSelfTest.java | 2 +- .../GridCacheDhtPreloadMessageCountTest.java | 2 +- .../atomic/IgniteCacheAtomicProtocolTest.java | 2 +- .../near/GridCacheNearMultiGetSelfTest.java | 2 +- .../near/GridCacheNearMultiNodeSelfTest.java | 2 +- ...achePartitionedTxSingleThreadedSelfTest.java | 2 +- .../cache/query/IndexingSpiQuerySelfTest.java | 4 +- .../service/GridServiceClientNodeTest.java | 7 +- ...ridSingleSplitsNewNodesAbstractLoadTest.java | 11 +- ...idSingleSplitsNewNodesMulticastLoadTest.java | 9 +- .../p2p/GridP2PSameClassLoaderSelfTest.java | 2 +- .../discovery/AbstractDiscoverySelfTest.java | 19 +- ...lientDiscoverySpiFailureTimeoutSelfTest.java | 245 +++++++++++++- .../tcp/TcpClientDiscoverySpiSelfTest.java | 79 ++++- .../spi/discovery/tcp/TcpDiscoverySelfTest.java | 18 +- .../tcp/TcpDiscoverySpiConfigSelfTest.java | 4 +- 
.../TcpDiscoverySpiFailureTimeoutSelfTest.java | 51 +-- .../testframework/junits/GridAbstractTest.java | 10 +- .../webapp/META-INF/ignite-webapp-config.xml | 1 - .../Cache/CacheMetricsTest.cs | 3 +- .../IgniteConfigurationSerializerTest.cs | 1 - .../IgniteConfigurationTest.cs | 1 - .../Discovery/Tcp/TcpDiscoverySpi.cs | 15 - .../IgniteConfigurationSection.xsd | 5 - .../Datagrid/MultiTieredCacheExample.cs | 2 +- .../ignite/p2p/GridP2PDisabledSelfTest.java | 4 +- .../webapp2/META-INF/ignite-webapp-config.xml | 1 - 74 files changed, 1135 insertions(+), 886 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/6998785a/modules/aop/src/test/java/org/apache/ignite/gridify/AbstractAopTest.java ---------------------------------------------------------------------- diff --git a/modules/aop/src/test/java/org/apache/ignite/gridify/AbstractAopTest.java b/modules/aop/src/test/java/org/apache/ignite/gridify/AbstractAopTest.java index 2008eff..33f2cdd 100644 --- a/modules/aop/src/test/java/org/apache/ignite/gridify/AbstractAopTest.java +++ b/modules/aop/src/test/java/org/apache/ignite/gridify/AbstractAopTest.java @@ -54,9 +54,9 @@ public abstract class AbstractAopTest extends GridCommonAbstractTest { cfg.setDeploymentSpi(new LocalDeploymentSpi()); - ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setHeartbeatFrequency(500); ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); + cfg.setMetricsUpdateFrequency(500); cfg.setDeploymentMode(depMode); return cfg; @@ -738,4 +738,4 @@ public abstract class AbstractAopTest extends GridCommonAbstractTest { return true; } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/6998785a/modules/aop/src/test/java/org/test/gridify/ExternalNonSpringAopSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/aop/src/test/java/org/test/gridify/ExternalNonSpringAopSelfTest.java 
b/modules/aop/src/test/java/org/test/gridify/ExternalNonSpringAopSelfTest.java index b53501b..44fa48d 100644 --- a/modules/aop/src/test/java/org/test/gridify/ExternalNonSpringAopSelfTest.java +++ b/modules/aop/src/test/java/org/test/gridify/ExternalNonSpringAopSelfTest.java @@ -524,9 +524,7 @@ public class ExternalNonSpringAopSelfTest extends GridCommonAbstractTest { IgniteConfiguration cfg = super.getConfiguration(); cfg.setDeploymentSpi(new LocalDeploymentSpi()); - ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setHeartbeatFrequency(500); - - cfg.setDeploymentMode(depMode); + cfg.setMetricsUpdateFrequency(500); cfg.setDeploymentMode(depMode); @@ -539,4 +537,4 @@ public class ExternalNonSpringAopSelfTest extends GridCommonAbstractTest { @Override public String getTestIgniteInstanceName() { return "ExternalAopTarget"; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/6998785a/modules/clients/src/test/resources/spring-cache.xml ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/resources/spring-cache.xml b/modules/clients/src/test/resources/spring-cache.xml index 4dbae6e..8cbc688 100644 --- a/modules/clients/src/test/resources/spring-cache.xml +++ b/modules/clients/src/test/resources/spring-cache.xml @@ -148,7 +148,6 @@ - --> http://git-wip-us.apache.org/repos/asf/ignite/blob/6998785a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterMetrics.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterMetrics.java b/modules/core/src/main/java/org/apache/ignite/cluster/ClusterMetrics.java index 50c09be..7dd4707 100644 --- a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterMetrics.java +++ b/modules/core/src/main/java/org/apache/ignite/cluster/ClusterMetrics.java @@ -29,8 +29,8 @@ import org.apache.ignite.configuration.IgniteConfiguration; *

* Node metrics for any node can be accessed via {@link ClusterNode#metrics()} * method. Keep in mind that there will be a certain network delay (usually - * equal to heartbeat delay) for the accuracy of node metrics. However, when accessing - * metrics on local node {@link IgniteCluster#localNode() Grid.localNode().getMetrics()} + * equal to metrics update delay) for the accuracy of node metrics. However, when accessing + * metrics on local node {@link IgniteCluster#localNode() IgniteCluster.localNode().getMetrics()} * the metrics are always accurate and up to date. *

* Local node metrics are registered as {@code MBean} and can be accessed from http://git-wip-us.apache.org/repos/asf/ignite/blob/6998785a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java b/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java index bfc395d..e122ff6 100644 --- a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java +++ b/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java @@ -87,9 +87,8 @@ import org.jetbrains.annotations.Nullable; *

Cluster Node Metrics

* Cluster node metrics (see {@link #metrics()}) are updated frequently for all nodes * and can be used to get dynamic information about a node. The frequency of update - * is often directly related to the heartbeat exchange between nodes. So if, for example, - * default {@link org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi} is used, - * the metrics data will be updated every {@code 2} seconds by default. + * is controlled by {@link org.apache.ignite.configuration.IgniteConfiguration#getMetricsUpdateFrequency()} parameter. + * The metrics data will be updated every {@code 2} seconds by default. *

* Grid node metrics provide information that can frequently change, * such as Heap and Non-Heap memory utilization, CPU load, number of active and waiting @@ -145,9 +144,9 @@ public interface ClusterNode { * method and use it during {@link org.apache.ignite.compute.ComputeTask#map(List, Object)} or during collision * resolution. *

- * Node metrics are updated with some delay which is directly related to heartbeat - * frequency. For example, when used with default - * {@link org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi} the update will happen every {@code 2} seconds. + * Node metrics are updated with some delay which is controlled by + * {@link org.apache.ignite.configuration.IgniteConfiguration#getMetricsUpdateFrequency()} parameter. + * By default the update will happen every {@code 2} seconds. * * @return Runtime metrics snapshot for this node. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/6998785a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java index 17927b9..9f68399 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java @@ -200,6 +200,10 @@ public class IgniteConfiguration { @SuppressWarnings("UnnecessaryBoxing") public static final Long DFLT_FAILURE_DETECTION_TIMEOUT = new Long(10_000); + /** Default failure detection timeout for client nodes in millis. */ + @SuppressWarnings("UnnecessaryBoxing") + public static final Long DFLT_CLIENT_FAILURE_DETECTION_TIMEOUT = new Long(30_000); + /** Optional local Ignite instance name. */ private String igniteInstanceName; @@ -386,6 +390,9 @@ public class IgniteConfiguration { /** Failure detection timeout. */ private Long failureDetectionTimeout = DFLT_FAILURE_DETECTION_TIMEOUT; + /** Failure detection timeout for client nodes. */ + private Long clientFailureDetectionTimeout = DFLT_CLIENT_FAILURE_DETECTION_TIMEOUT; + /** Property names to include into node attributes. 
*/ private String[] includeProps; @@ -491,6 +498,7 @@ public class IgniteConfiguration { cacheSanityCheckEnabled = cfg.isCacheSanityCheckEnabled(); callbackPoolSize = cfg.getAsyncCallbackPoolSize(); classLdr = cfg.getClassLoader(); + clientFailureDetectionTimeout = cfg.getClientFailureDetectionTimeout(); clientMode = cfg.isClientMode(); connectorCfg = cfg.getConnectorConfiguration(); consistentId = cfg.getConsistentId(); @@ -1288,20 +1296,13 @@ public class IgniteConfiguration { } /** - * Gets job metrics update frequency in milliseconds. + * Gets Ignite metrics update frequency in milliseconds. *

* Updating metrics too frequently may have negative performance impact. *

- * The following values are accepted: - *

    - *
  • {@code -1} job metrics are never updated.
  • - *
  • {@code 0} job metrics are updated on each job start and finish.
  • - *
  • Positive value defines the actual update frequency. If not provided, then default value - * {@link #DFLT_METRICS_UPDATE_FREQ} is used.
  • - *
* If not provided, then default value {@link #DFLT_METRICS_UPDATE_FREQ} is used. * - * @return Job metrics update frequency in milliseconds. + * @return Metrics update frequency in milliseconds. * @see #DFLT_METRICS_UPDATE_FREQ */ public long getMetricsUpdateFrequency() { @@ -1309,15 +1310,13 @@ public class IgniteConfiguration { } /** - * Sets job metrics update frequency in milliseconds. + * Sets Ignite metrics update frequency in milliseconds. *

- * If set to {@code -1} job metrics are never updated. - * If set to {@code 0} job metrics are updated on each job start and finish. * Positive value defines the actual update frequency. * If not provided, then default value * {@link #DFLT_METRICS_UPDATE_FREQ} is used. * - * @param metricsUpdateFreq Job metrics update frequency in milliseconds. + * @param metricsUpdateFreq Metrics update frequency in milliseconds. * @return {@code this} for chaining. */ public IgniteConfiguration setMetricsUpdateFrequency(long metricsUpdateFreq) { @@ -1835,6 +1834,33 @@ public class IgniteConfiguration { } /** + * Returns failure detection timeout for client nodes used by {@link TcpDiscoverySpi} and {@link TcpCommunicationSpi}. + *

+ * Default is {@link #DFLT_CLIENT_FAILURE_DETECTION_TIMEOUT}. + * + * @see #setClientFailureDetectionTimeout(long) + * @return Failure detection timeout for client nodes in milliseconds. + */ + public Long getClientFailureDetectionTimeout() { + return clientFailureDetectionTimeout; + } + + /** + * Sets failure detection timeout to use in {@link TcpDiscoverySpi} and {@link TcpCommunicationSpi}. + *

+ * Failure detection timeout is used to determine how long the communication or discovery SPIs should wait before + * considering a remote connection failed. + * + * @param clientFailureDetectionTimeout Failure detection timeout in milliseconds. + * @return {@code this} for chaining. + */ + public IgniteConfiguration setClientFailureDetectionTimeout(long clientFailureDetectionTimeout) { + this.clientFailureDetectionTimeout = clientFailureDetectionTimeout; + + return this; + } + + /** * Returns failure detection timeout used by {@link TcpDiscoverySpi} and {@link TcpCommunicationSpi}. *

* Default is {@link #DFLT_FAILURE_DETECTION_TIMEOUT}. http://git-wip-us.apache.org/repos/asf/ignite/blob/6998785a/modules/core/src/main/java/org/apache/ignite/events/EventType.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/events/EventType.java b/modules/core/src/main/java/org/apache/ignite/events/EventType.java index e506371..1960692 100644 --- a/modules/core/src/main/java/org/apache/ignite/events/EventType.java +++ b/modules/core/src/main/java/org/apache/ignite/events/EventType.java @@ -136,7 +136,7 @@ public interface EventType { * Built-in event type: node metrics updated. *
* Generated when node's metrics are updated. In most cases this callback - * is invoked with every heartbeat received from a node (including local node). + * is invoked with every metrics update received from a node (including local node). *

* NOTE: all types in range from 1 to 1000 are reserved for * internal Ignite events and should not be used by user-defined events. http://git-wip-us.apache.org/repos/asf/ignite/blob/6998785a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java index 369ca22..e0bc4d2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java @@ -855,8 +855,7 @@ public class GridJobProcessor extends GridProcessorAdapter { } }); - if (metricsUpdateFreq > -1L) - updateJobMetrics(); + updateJobMetrics(); } finally { handlingCollision.set(Boolean.FALSE); @@ -867,24 +866,21 @@ public class GridJobProcessor extends GridProcessorAdapter { * */ private void updateJobMetrics() { - assert metricsUpdateFreq > -1L; + assert metricsUpdateFreq > 0L; - if (metricsUpdateFreq == 0L) + long now = U.currentTimeMillis(); + long lastUpdate = metricsLastUpdateTstamp.get(); + + if (now - lastUpdate > metricsUpdateFreq && metricsLastUpdateTstamp.compareAndSet(lastUpdate, now)) updateJobMetrics0(); - else { - long now = U.currentTimeMillis(); - long lastUpdate = metricsLastUpdateTstamp.get(); - if (now - lastUpdate > metricsUpdateFreq && metricsLastUpdateTstamp.compareAndSet(lastUpdate, now)) - updateJobMetrics0(); - } } /** * */ private void updateJobMetrics0() { - assert metricsUpdateFreq > -1L; + assert metricsUpdateFreq > 0L; GridJobMetricsSnapshot m = new GridJobMetricsSnapshot(); http://git-wip-us.apache.org/repos/asf/ignite/blob/6998785a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java 
---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java index 4186eb9..eb3e716 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java @@ -531,7 +531,8 @@ public class PlatformConfigurationUtils { if (in.readBoolean()) cfg.setClientMode(in.readBoolean()); int[] eventTypes = in.readIntArray(); - if (eventTypes != null) cfg.setIncludeEventTypes(eventTypes); + if (eventTypes != null) + cfg.setIncludeEventTypes(eventTypes); if (in.readBoolean()) cfg.setMetricsExpireTime(in.readLong()); if (in.readBoolean()) @@ -556,8 +557,10 @@ public class PlatformConfigurationUtils { cfg.setDaemon(in.readBoolean()); if (in.readBoolean()) cfg.setLateAffinityAssignment(in.readBoolean()); - if (in.readBoolean()) + if (in.readBoolean()) { + cfg.setClientFailureDetectionTimeout(in.readLong()); cfg.setFailureDetectionTimeout(in.readLong()); + } readCacheConfigurations(in, cfg); readDiscoveryConfiguration(in, cfg); @@ -752,12 +755,9 @@ public class PlatformConfigurationUtils { disco.setReconnectCount(in.readInt()); disco.setLocalPort(in.readInt()); disco.setLocalPortRange(in.readInt()); - disco.setMaxMissedHeartbeats(in.readInt()); - disco.setMaxMissedClientHeartbeats(in.readInt()); disco.setStatisticsPrintFrequency(in.readLong()); disco.setIpFinderCleanFrequency(in.readLong()); disco.setThreadPriority(in.readInt()); - disco.setHeartbeatFrequency(in.readLong()); disco.setTopHistorySize(in.readInt()); cfg.setDiscoverySpi(disco); @@ -960,7 +960,8 @@ public class PlatformConfigurationUtils { w.writeLong(cfg.getMetricsUpdateFrequency()); w.writeBoolean(true); 
w.writeInt(cfg.getNetworkSendRetryCount()); - w.writeBoolean(true);w.writeLong(cfg.getNetworkSendRetryDelay()); + w.writeBoolean(true); + w.writeLong(cfg.getNetworkSendRetryDelay()); w.writeBoolean(true); w.writeLong(cfg.getNetworkTimeout()); w.writeString(cfg.getWorkDirectory()); @@ -970,6 +971,7 @@ public class PlatformConfigurationUtils { w.writeBoolean(true); w.writeBoolean(cfg.isLateAffinityAssignment()); w.writeBoolean(true); + w.writeLong(cfg.getClientFailureDetectionTimeout()); w.writeLong(cfg.getFailureDetectionTimeout()); CacheConfiguration[] cacheCfg = cfg.getCacheConfiguration(); @@ -1063,17 +1065,17 @@ public class PlatformConfigurationUtils { else w.writeBoolean(false); - EventStorageSpi eventStorageSpi = cfg.getEventStorageSpi(); + EventStorageSpi evtStorageSpi = cfg.getEventStorageSpi(); - if (eventStorageSpi == null) { + if (evtStorageSpi == null) w.writeByte((byte) 0); - } else if (eventStorageSpi instanceof NoopEventStorageSpi) { + else if (evtStorageSpi instanceof NoopEventStorageSpi) w.writeByte((byte) 1); - } else if (eventStorageSpi instanceof MemoryEventStorageSpi) { + else if (evtStorageSpi instanceof MemoryEventStorageSpi) { w.writeByte((byte) 2); - w.writeLong(((MemoryEventStorageSpi)eventStorageSpi).getExpireCount()); - w.writeLong(((MemoryEventStorageSpi)eventStorageSpi).getExpireAgeMs()); + w.writeLong(((MemoryEventStorageSpi)evtStorageSpi).getExpireCount()); + w.writeLong(((MemoryEventStorageSpi)evtStorageSpi).getExpireAgeMs()); } writeMemoryConfiguration(w, cfg.getMemoryConfiguration()); @@ -1135,9 +1137,8 @@ public class PlatformConfigurationUtils { w.writeInt(ttl); } } - else { + else w.writeBoolean(false); - } w.writeLong(tcp.getSocketTimeout()); w.writeLong(tcp.getAckTimeout()); @@ -1151,12 +1152,9 @@ public class PlatformConfigurationUtils { w.writeInt(tcp.getReconnectCount()); w.writeInt(tcp.getLocalPort()); w.writeInt(tcp.getLocalPortRange()); - w.writeInt(tcp.getMaxMissedHeartbeats()); - 
w.writeInt(tcp.getMaxMissedClientHeartbeats()); w.writeLong(tcp.getStatisticsPrintFrequency()); w.writeLong(tcp.getIpFinderCleanFrequency()); w.writeInt(tcp.getThreadPriority()); - w.writeLong(tcp.getHeartbeatFrequency()); w.writeInt((int)tcp.getTopHistorySize()); } http://git-wip-us.apache.org/repos/asf/ignite/blob/6998785a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java index ec56f4f..81f5c28 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java @@ -95,6 +95,12 @@ public abstract class IgniteSpiAdapter implements IgniteSpi { private boolean failureDetectionTimeoutEnabled = true; /** + * Failure detection timeout for client nodes. Initialized with the value of + * {@link IgniteConfiguration#getClientFailureDetectionTimeout()}. + */ + private long clientFailureDetectionTimeout; + + /** * Failure detection timeout. Initialized with the value of * {@link IgniteConfiguration#getFailureDetectionTimeout()}. */ @@ -648,12 +654,29 @@ public abstract class IgniteSpiAdapter implements IgniteSpi { // Because U.currentTimeInMillis() is updated once in 10 milliseconds. 
log.warning("Failure detection timeout is too low, it may lead to unpredictable behaviour " + "[failureDetectionTimeout=" + failureDetectionTimeout + ']'); + else if (failureDetectionTimeout <= ignite.configuration().getMetricsUpdateFrequency()) + log.warning("'IgniteConfiguration.failureDetectionTimeout' should be greater then " + + "'IgniteConfiguration.metricsUpdateFrequency' to prevent unnecessary status checking."); } // Intentionally compare references using '!=' below else if (ignite.configuration().getFailureDetectionTimeout() != IgniteConfiguration.DFLT_FAILURE_DETECTION_TIMEOUT) log.warning("Failure detection timeout will be ignored (one of SPI parameters has been set explicitly)"); + clientFailureDetectionTimeout = ignite.configuration().getClientFailureDetectionTimeout(); + + if (clientFailureDetectionTimeout <= 0) + throw new IgniteSpiException("Invalid client failure detection timeout value: " + + clientFailureDetectionTimeout); + else if (clientFailureDetectionTimeout <= 10) + // Because U.currentTimeInMillis() is updated once in 10 milliseconds. + log.warning("Client failure detection timeout is too low, it may lead to unpredictable behaviour " + + "[clientFailureDetectionTimeout=" + clientFailureDetectionTimeout + ']'); + + if (clientFailureDetectionTimeout < ignite.configuration().getMetricsUpdateFrequency()) + throw new IgniteSpiException("Inconsistent configuration " + + "('IgniteConfiguration.clientFailureDetectionTimeout' must be greater or equal to " + + "'IgniteConfiguration.metricsUpdateFrequency')."); } /** @@ -675,6 +698,15 @@ public abstract class IgniteSpiAdapter implements IgniteSpi { } /** + * Returns client failure detection timeout set to use for network related operations. + * + * @return client failure detection timeout in milliseconds or {@code 0} if the timeout is disabled. 
+ */ + public long clientFailureDetectionTimeout() { + return clientFailureDetectionTimeout; + } + + /** * Returns failure detection timeout set to use for network related operations. * * @return failure detection timeout in milliseconds or {@code 0} if the timeout is disabled. http://git-wip-us.apache.org/repos/asf/ignite/blob/6998785a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java index e17b0dd..c685ea9 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java @@ -45,10 +45,12 @@ public class IgniteSpiOperationTimeoutHelper { * Constructor. * * @param adapter SPI adapter. + * @param srvOp {@code True} if communicates with server node. */ - public IgniteSpiOperationTimeoutHelper(IgniteSpiAdapter adapter) { + public IgniteSpiOperationTimeoutHelper(IgniteSpiAdapter adapter, boolean srvOp) { failureDetectionTimeoutEnabled = adapter.failureDetectionTimeoutEnabled(); - failureDetectionTimeout = adapter.failureDetectionTimeout(); + failureDetectionTimeout = srvOp ? 
adapter.failureDetectionTimeout() : + adapter.clientFailureDetectionTimeout(); } /** @@ -99,4 +101,4 @@ public class IgniteSpiOperationTimeoutHelper { return e instanceof IgniteSpiOperationTimeoutException || e instanceof SocketTimeoutException || X.hasCause(e, IgniteSpiOperationTimeoutException.class, SocketException.class); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/6998785a/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpi.java index 8a02225..6f2c099 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/collision/jobstealing/JobStealingCollisionSpi.java @@ -88,7 +88,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; * {@link org.apache.ignite.spi.failover.jobstealing.JobStealingFailoverSpi JobStealingFailoverSpi}. * Also note that job metrics update should be enabled in order for this SPI * to work properly (i.e. {@link org.apache.ignite.configuration.IgniteConfiguration#getMetricsUpdateFrequency() IgniteConfiguration#getMetricsUpdateFrequency()} - * should be set to {@code 0} or greater value). + * should be set to positive value). * The responsibility of Job Stealing Failover SPI is to properly route stolen * jobs to the nodes that initially requested (stole) these jobs. 
The * SPI maintains a counter of how many times a jobs was stolen and http://git-wip-us.apache.org/repos/asf/ignite/blob/6998785a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 1fedf83..be897d6 100755 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -2727,7 +2727,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati long connTimeout0 = connTimeout; - IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(this); + IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(this, + !node.isClient()); while (true) { GridCommunicationClient client; @@ -2918,7 +2919,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati int attempt = 1; - IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(this); + IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(this, + !node.isClient()); while (!conn) { // Reconnection on handshake timeout. 
try { http://git-wip-us.apache.org/repos/asf/ignite/blob/6998785a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index 34ee414..b5b4c77 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -81,7 +81,7 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAuthFailedMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCheckFailedMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientAckResponse; -import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientHeartbeatMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientMetricsUpdateMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingRequest; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingResponse; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientReconnectMessage; @@ -89,7 +89,7 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessa import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryDuplicateIdMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeRequest; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeResponse; -import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHeartbeatMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMetricsUpdateMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryJoinRequestMessage; import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage; @@ -272,9 +272,9 @@ class ClientImpl extends TcpDiscoveryImpl { } timer.schedule( - new HeartbeatSender(), - spi.hbFreq, - spi.hbFreq); + new MetricsSender(), + spi.metricsUpdateFreq, + spi.metricsUpdateFreq); spi.printStartInfo(); } @@ -597,7 +597,7 @@ class ClientImpl extends TcpDiscoveryImpl { UUID locNodeId = getLocalNodeId(); - IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi); + IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi, true); while (true) { boolean openSock = false; @@ -861,13 +861,14 @@ class ClientImpl extends TcpDiscoveryImpl { } /** - * Heartbeat sender. + * Metrics sender. */ - private class HeartbeatSender extends TimerTask { + private class MetricsSender extends TimerTask { /** {@inheritDoc} */ @Override public void run() { if (!spi.getSpiContext().isStopping() && sockWriter.isOnline()) { - TcpDiscoveryClientHeartbeatMessage msg = new TcpDiscoveryClientHeartbeatMessage(getLocalNodeId(), + TcpDiscoveryClientMetricsUpdateMessage msg = new TcpDiscoveryClientMetricsUpdateMessage( + getLocalNodeId(), spi.metricsProvider.metrics()); msg.client(true); @@ -1829,8 +1830,8 @@ class ClientImpl extends TcpDiscoveryImpl { processNodeLeftMessage((TcpDiscoveryNodeLeftMessage)msg); else if (msg instanceof TcpDiscoveryNodeFailedMessage) processNodeFailedMessage((TcpDiscoveryNodeFailedMessage)msg); - else if (msg instanceof TcpDiscoveryHeartbeatMessage) - processHeartbeatMessage((TcpDiscoveryHeartbeatMessage)msg); + else if (msg instanceof TcpDiscoveryMetricsUpdateMessage) + processMetricsUpdateMessage((TcpDiscoveryMetricsUpdateMessage)msg); else if (msg instanceof TcpDiscoveryClientReconnectMessage) processClientReconnectMessage((TcpDiscoveryClientReconnectMessage)msg); else if (msg instanceof 
TcpDiscoveryCustomEventMessage) @@ -2152,7 +2153,7 @@ class ClientImpl extends TcpDiscoveryImpl { /** * @param msg Message. */ - private void processHeartbeatMessage(TcpDiscoveryHeartbeatMessage msg) { + private void processMetricsUpdateMessage(TcpDiscoveryMetricsUpdateMessage msg) { if (spi.getSpiContext().isStopping()) return; @@ -2160,16 +2161,16 @@ class ClientImpl extends TcpDiscoveryImpl { assert msg.senderNodeId() != null; if (log.isDebugEnabled()) - log.debug("Received heartbeat response: " + msg); + log.debug("Received metrics response: " + msg); } else { long tstamp = U.currentTimeMillis(); if (msg.hasMetrics()) { - for (Map.Entry e : msg.metrics().entrySet()) { + for (Map.Entry e : msg.metrics().entrySet()) { UUID nodeId = e.getKey(); - TcpDiscoveryHeartbeatMessage.MetricsSet metricsSet = e.getValue(); + TcpDiscoveryMetricsUpdateMessage.MetricsSet metricsSet = e.getValue(); Map cacheMetrics = msg.hasCacheMetrics(nodeId) ? msg.cacheMetrics().get(nodeId) : Collections.emptyMap(); http://git-wip-us.apache.org/repos/asf/ignite/blob/6998785a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 47c13e1..6a10ec2 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -106,7 +106,7 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAuthFailedMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCheckFailedMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientAckResponse; -import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientHeartbeatMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientMetricsUpdateMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingRequest; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingResponse; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientReconnectMessage; @@ -116,7 +116,7 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryDiscardMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryDuplicateIdMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeRequest; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHandshakeResponse; -import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryHeartbeatMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMetricsUpdateMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryJoinRequestMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryLoopbackProblemMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage; @@ -612,7 +612,8 @@ class ServerImpl extends TcpDiscoveryImpl { UUID locNodeId = getLocalNodeId(); - IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi); + IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi, + clientNodeId == null); if (F.contains(spi.locNodeAddrs, addr)) { if (clientNodeId == null) @@ -991,7 +992,9 @@ class ServerImpl extends TcpDiscoveryImpl { for (InetSocketAddress addr : addrs) { try { - Integer res = sendMessageDirectly(joinReq, addr); + IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi, true); + + Integer res = sendMessageDirectly(joinReq, addr, timeoutHelper); assert res != null; @@ -1104,10 +1107,12 @@ class ServerImpl extends TcpDiscoveryImpl { 
* * @param msg Message to send. * @param addr Address to send message to. + * @param timeoutHelper Operation timeout helper. * @return Response read from the recipient or {@code null} if no response is supposed. * @throws IgniteSpiException If an error occurs. */ - @Nullable private Integer sendMessageDirectly(TcpDiscoveryAbstractMessage msg, InetSocketAddress addr) + @Nullable private Integer sendMessageDirectly(TcpDiscoveryAbstractMessage msg, InetSocketAddress addr, + IgniteSpiOperationTimeoutHelper timeoutHelper) throws IgniteSpiException { assert msg != null; assert addr != null; @@ -1124,8 +1129,6 @@ class ServerImpl extends TcpDiscoveryImpl { UUID locNodeId = getLocalNodeId(); - IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi); - int reconCnt = 0; while (true){ @@ -1731,7 +1734,7 @@ class ServerImpl extends TcpDiscoveryImpl { * @return {@code True} if recordable in debug mode. */ private boolean recordable(TcpDiscoveryAbstractMessage msg) { - return !(msg instanceof TcpDiscoveryHeartbeatMessage) && + return !(msg instanceof TcpDiscoveryMetricsUpdateMessage) && !(msg instanceof TcpDiscoveryStatusCheckMessage) && !(msg instanceof TcpDiscoveryDiscardMessage) && !(msg instanceof TcpDiscoveryConnectionCheckMessage); @@ -1759,7 +1762,7 @@ class ServerImpl extends TcpDiscoveryImpl { * @param msg Message. * @param nodeId Node ID. */ - private static void removeMetrics(TcpDiscoveryHeartbeatMessage msg, UUID nodeId) { + private static void removeMetrics(TcpDiscoveryMetricsUpdateMessage msg, UUID nodeId) { msg.removeMetrics(nodeId); msg.removeCacheMetrics(nodeId); } @@ -2384,11 +2387,11 @@ class ServerImpl extends TcpDiscoveryImpl { /** Last time status message has been sent. */ private long lastTimeStatusMsgSent; - /** Incoming heartbeats check frequency. */ - private long hbCheckFreq = (long)spi.maxMissedHbs * spi.hbFreq + 50; + /** Incoming metrics check frequency. 
*/ + private long metricsCheckFreq = 3 * spi.metricsUpdateFreq + 50; - /** Last time heartbeat message has been sent. */ - private long lastTimeHbMsgSent; + /** Last time metrics update message has been sent. */ + private long lastTimeMetricsUpdateMsgSent; /** Time when the last status message has been sent. */ private long lastTimeConnCheckMsgSent; @@ -2483,7 +2486,7 @@ class ServerImpl extends TcpDiscoveryImpl { if (spi.failureDetectionTimeoutEnabled()) connCheckThreshold = spi.failureDetectionTimeout(); else - connCheckThreshold = Math.min(spi.getSocketTimeout(), spi.getHeartbeatFrequency()); + connCheckThreshold = Math.min(spi.getSocketTimeout(), spi.metricsUpdateFreq); for (int i = 3; i > 0; i--) { connCheckFreq = connCheckThreshold / i; @@ -2502,7 +2505,7 @@ class ServerImpl extends TcpDiscoveryImpl { * @param msg Message to process. */ @Override protected void processMessage(TcpDiscoveryAbstractMessage msg) { - sendHeartbeatMessage(); + sendMetricsUpdateMessage(); DebugLogger log = messageLogger(msg); @@ -2555,8 +2558,8 @@ class ServerImpl extends TcpDiscoveryImpl { else if (msg instanceof TcpDiscoveryNodeFailedMessage) processNodeFailedMessage((TcpDiscoveryNodeFailedMessage)msg); - else if (msg instanceof TcpDiscoveryHeartbeatMessage) - processHeartbeatMessage((TcpDiscoveryHeartbeatMessage)msg); + else if (msg instanceof TcpDiscoveryMetricsUpdateMessage) + processMetricsUpdateMessage((TcpDiscoveryMetricsUpdateMessage)msg); else if (msg instanceof TcpDiscoveryStatusCheckMessage) processStatusCheckMessage((TcpDiscoveryStatusCheckMessage)msg); @@ -2594,9 +2597,9 @@ class ServerImpl extends TcpDiscoveryImpl { checkConnection(); - sendHeartbeatMessage(); + sendMetricsUpdateMessage(); - checkHeartbeatsReceiving(); + checkMetricsReceiving(); checkPendingCustomMessages(); @@ -2750,7 +2753,7 @@ class ServerImpl extends TcpDiscoveryImpl { while (true) { if (sock == null) { if (timeoutHelper == null) - timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi); + 
timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi, true); nextNodeExists = false; @@ -2918,7 +2921,7 @@ class ServerImpl extends TcpDiscoveryImpl { pendingMsgs.discardId, pendingMsgs.customDiscardId); if (timeoutHelper == null) - timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi); + timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi, true); try { spi.writeToSocket(sock, out, pendingMsg, timeoutHelper.nextTimeoutChunk( @@ -2958,7 +2961,7 @@ class ServerImpl extends TcpDiscoveryImpl { long tstamp = U.currentTimeMillis(); if (timeoutHelper == null) - timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi); + timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi, true); if (!failedNodes.isEmpty()) { for (TcpDiscoveryNode failedNode : failedNodes) { @@ -3817,7 +3820,9 @@ class ServerImpl extends TcpDiscoveryImpl { for (InetSocketAddress addr : spi.getNodeAddresses(node, U.sameMacs(locNode, node))) { try { - sendMessageDirectly(msg, addr); + IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi, true); + + sendMessageDirectly(msg, addr, timeoutHelper); node.lastSuccessfulAddress(addr); @@ -3853,7 +3858,7 @@ class ServerImpl extends TcpDiscoveryImpl { if (node != null) { node.clientRouterNodeId(msg.routerNodeId()); - node.aliveCheck(spi.maxMissedClientHbs); + node.clientAliveTime(spi.clientFailureDetectionTimeout()); } if (isLocalNodeCoordinator()) { @@ -4083,7 +4088,7 @@ class ServerImpl extends TcpDiscoveryImpl { } if (msg.client()) - node.aliveCheck(spi.maxMissedClientHbs); + node.clientAliveTime(spi.clientFailureDetectionTimeout()); boolean topChanged = ring.add(node); @@ -4830,7 +4835,7 @@ class ServerImpl extends TcpDiscoveryImpl { } if (locNodeId.equals(msg.creatorNodeId()) && msg.senderNodeId() == null && - U.currentTimeMillis() - locNode.lastUpdateTime() < spi.hbFreq) { + U.currentTimeMillis() - locNode.lastUpdateTime() < spi.metricsUpdateFreq) { if (log.isDebugEnabled()) log.debug("Status check message 
discarded (local node receives updates)."); @@ -4873,11 +4878,11 @@ class ServerImpl extends TcpDiscoveryImpl { } /** - * Processes regular heartbeat message. + * Processes regular metrics update message. * - * @param msg Heartbeat message. + * @param msg Metrics update message. */ - private void processHeartbeatMessage(TcpDiscoveryHeartbeatMessage msg) { + private void processMetricsUpdateMessage(TcpDiscoveryMetricsUpdateMessage msg) { assert msg != null; assert !msg.client(); @@ -4886,7 +4891,7 @@ class ServerImpl extends TcpDiscoveryImpl { if (ring.node(msg.creatorNodeId()) == null) { if (log.isDebugEnabled()) - log.debug("Discarding heartbeat message issued by unknown node [msg=" + msg + + log.debug("Discarding metrics update message issued by unknown node [msg=" + msg + ", ring=" + ring + ']'); return; @@ -4894,14 +4899,14 @@ class ServerImpl extends TcpDiscoveryImpl { if (isLocalNodeCoordinator() && !locNodeId.equals(msg.creatorNodeId())) { if (log.isDebugEnabled()) - log.debug("Discarding heartbeat message issued by non-coordinator node: " + msg); + log.debug("Discarding metrics update message issued by non-coordinator node: " + msg); return; } if (!isLocalNodeCoordinator() && locNodeId.equals(msg.creatorNodeId())) { if (log.isDebugEnabled()) - log.debug("Discarding heartbeat message issued by local node (node is no more coordinator): " + + log.debug("Discarding metrics update message issued by local node (node is no more coordinator): " + msg); return; @@ -4909,7 +4914,7 @@ class ServerImpl extends TcpDiscoveryImpl { if (locNodeId.equals(msg.creatorNodeId()) && !hasMetrics(msg, locNodeId) && msg.senderNodeId() != null) { if (log.isTraceEnabled()) - log.trace("Discarding heartbeat message that has made two passes: " + msg); + log.trace("Discarding metrics update message that has made two passes: " + msg); return; } @@ -4918,10 +4923,10 @@ class ServerImpl extends TcpDiscoveryImpl { if (spiStateCopy() == CONNECTED) { if (msg.hasMetrics()) { - for (Map.Entry e 
: msg.metrics().entrySet()) { + for (Map.Entry e : msg.metrics().entrySet()) { UUID nodeId = e.getKey(); - TcpDiscoveryHeartbeatMessage.MetricsSet metricsSet = e.getValue(); + TcpDiscoveryMetricsUpdateMessage.MetricsSet metricsSet = e.getValue(); Map cacheMetrics = msg.hasCacheMetrics(nodeId) ? msg.cacheMetrics().get(nodeId) : Collections.emptyMap(); @@ -4960,11 +4965,11 @@ class ServerImpl extends TcpDiscoveryImpl { for (TcpDiscoveryNode clientNode : ring.clientNodes()) { if (clientNode.visible()) { if (clientNodeIds.contains(clientNode.id())) - clientNode.aliveCheck(spi.maxMissedClientHbs); + clientNode.clientAliveTime(spi.clientFailureDetectionTimeout()); else { - int aliveCheck = clientNode.decrementAliveCheck(); + boolean aliveCheck = clientNode.isClientAlive(); - if (aliveCheck <= 0 && isLocalNodeCoordinator()) { + if (!aliveCheck && isLocalNodeCoordinator()) { boolean failedNode; synchronized (mux) { @@ -4972,6 +4977,12 @@ class ServerImpl extends TcpDiscoveryImpl { } if (!failedNode) { + U.warn(log, "Failing client node due to not receiving metrics updates " + + "from client node within " + + "'IgniteConfiguration.clientFailureDetectionTimeout' " + + "(consider increasing configuration property) " + + "[timeout=" + spi.clientFailureDetectionTimeout() + ", node=" + clientNode + ']'); + TcpDiscoveryNodeFailedMessage nodeFailedMsg = new TcpDiscoveryNodeFailedMessage( locNodeId, clientNode.id(), clientNode.internalOrder()); @@ -5027,7 +5038,7 @@ class ServerImpl extends TcpDiscoveryImpl { /** * @param msg Message. */ - private boolean hasMetrics(TcpDiscoveryHeartbeatMessage msg, UUID nodeId) { + private boolean hasMetrics(TcpDiscoveryMetricsUpdateMessage msg, UUID nodeId) { return msg.hasMetrics(nodeId) || msg.hasCacheMetrics(nodeId); } @@ -5338,34 +5349,34 @@ class ServerImpl extends TcpDiscoveryImpl { } /** - * Sends heartbeat message if needed. + * Sends metrics update message if needed. 
*/ - private void sendHeartbeatMessage() { - long elapsed = (lastTimeHbMsgSent + spi.hbFreq) - U.currentTimeMillis(); + private void sendMetricsUpdateMessage() { + long elapsed = (lastTimeMetricsUpdateMsgSent + spi.metricsUpdateFreq) - U.currentTimeMillis(); if (elapsed > 0 || !isLocalNodeCoordinator()) return; - TcpDiscoveryHeartbeatMessage msg = new TcpDiscoveryHeartbeatMessage(getConfiguredNodeId()); + TcpDiscoveryMetricsUpdateMessage msg = new TcpDiscoveryMetricsUpdateMessage(getConfiguredNodeId()); msg.verify(getLocalNodeId()); msgWorker.addMessage(msg); - lastTimeHbMsgSent = U.currentTimeMillis(); + lastTimeMetricsUpdateMsgSent = U.currentTimeMillis(); } /** - * Check the last time a heartbeat message received. If the time is bigger than {@code hbCheckTimeout} than - * {@link TcpDiscoveryStatusCheckMessage} is sent across the ring. + * Checks the last time a metrics update message received. If the time is bigger than {@code metricsCheckFreq} + * than {@link TcpDiscoveryStatusCheckMessage} is sent across the ring. 
*/ - private void checkHeartbeatsReceiving() { + private void checkMetricsReceiving() { if (lastTimeStatusMsgSent < locNode.lastUpdateTime()) lastTimeStatusMsgSent = locNode.lastUpdateTime(); long updateTime = Math.max(lastTimeStatusMsgSent, lastRingMsgTime); - long elapsed = (updateTime + hbCheckFreq) - U.currentTimeMillis(); + long elapsed = (updateTime + metricsCheckFreq) - U.currentTimeMillis(); if (elapsed > 0) return; @@ -5548,6 +5559,8 @@ class ServerImpl extends TcpDiscoveryImpl { ClientMessageWorker clientMsgWrk = null; + boolean srvSock; + try { InputStream in; @@ -5618,7 +5631,7 @@ class ServerImpl extends TcpDiscoveryImpl { TcpDiscoveryPingResponse res = new TcpDiscoveryPingResponse(locNodeId); IgniteSpiOperationTimeoutHelper timeoutHelper = - new IgniteSpiOperationTimeoutHelper(spi); + new IgniteSpiOperationTimeoutHelper(spi, true); if (req.clientNodeId() != null) { ClientMessageWorker clientWorker = clientMsgWorkers.get(req.clientNodeId()); @@ -5638,6 +5651,8 @@ class ServerImpl extends TcpDiscoveryImpl { // Handshake. TcpDiscoveryHandshakeRequest req = (TcpDiscoveryHandshakeRequest)msg; + srvSock = !req.client(); + UUID nodeId = req.creatorNodeId(); this.nodeId = nodeId; @@ -5648,8 +5663,7 @@ class ServerImpl extends TcpDiscoveryImpl { if (req.client()) res.clientAck(true); - spi.writeToSocket(sock, res, spi.failureDetectionTimeoutEnabled() ? - spi.failureDetectionTimeout() : spi.getSocketTimeout()); + spi.writeToSocket(sock, res, spi.getEffectiveSocketTimeout(srvSock)); // It can happen if a remote node is stopped and it has a loopback address in the list of addresses, // the local node sends a handshake request message on the loopback address, so we get here. @@ -5764,8 +5778,7 @@ class ServerImpl extends TcpDiscoveryImpl { return; } - long sockTimeout = spi.failureDetectionTimeoutEnabled() ? 
spi.failureDetectionTimeout() : - spi.getSocketTimeout(); + long sockTimeout = spi.getEffectiveSocketTimeout(srvSock); while (!isInterrupted()) { try { @@ -5950,10 +5963,10 @@ class ServerImpl extends TcpDiscoveryImpl { continue; } - TcpDiscoveryClientHeartbeatMessage heartbeatMsg = null; + TcpDiscoveryClientMetricsUpdateMessage metricsUpdateMsg = null; - if (msg instanceof TcpDiscoveryClientHeartbeatMessage) - heartbeatMsg = (TcpDiscoveryClientHeartbeatMessage)msg; + if (msg instanceof TcpDiscoveryClientMetricsUpdateMessage) + metricsUpdateMsg = (TcpDiscoveryClientMetricsUpdateMessage)msg; else msgWorker.addMessage(msg); @@ -5968,8 +5981,8 @@ class ServerImpl extends TcpDiscoveryImpl { else spi.writeToSocket(msg, sock, RES_OK, sockTimeout); - if (heartbeatMsg != null) - processClientHeartbeatMessage(heartbeatMsg); + if (metricsUpdateMsg != null) + processClientMetricsUpdateMessage(metricsUpdateMsg); } catch (IgniteCheckedException e) { if (log.isDebugEnabled()) @@ -6037,11 +6050,11 @@ class ServerImpl extends TcpDiscoveryImpl { } /** - * Processes client heartbeat message. + * Processes client metrics update message. * - * @param msg Heartbeat message. + * @param msg Client metrics update message. 
*/ - private void processClientHeartbeatMessage(TcpDiscoveryClientHeartbeatMessage msg) { + private void processClientMetricsUpdateMessage(TcpDiscoveryClientMetricsUpdateMessage msg) { assert msg.client(); ClientMessageWorker wrk = clientMsgWorkers.get(msg.creatorNodeId()); @@ -6049,7 +6062,7 @@ class ServerImpl extends TcpDiscoveryImpl { if (wrk != null) wrk.metrics(msg.metrics()); else if (log.isDebugEnabled()) - log.debug("Received heartbeat message from unknown client node: " + msg); + log.debug("Received client metrics update message from unknown client node: " + msg); } /** @@ -6286,7 +6299,7 @@ class ServerImpl extends TcpDiscoveryImpl { + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']'); spi.writeToSocket(sock, msg, msgBytes, spi.failureDetectionTimeoutEnabled() ? - spi.failureDetectionTimeout() : spi.getSocketTimeout()); + spi.clientFailureDetectionTimeout() : spi.getSocketTimeout()); } } else { @@ -6296,8 +6309,7 @@ class ServerImpl extends TcpDiscoveryImpl { assert topologyInitialized(msg) : msg; - spi.writeToSocket(sock, msg, msgBytes, spi.failureDetectionTimeoutEnabled() ? 
- spi.failureDetectionTimeout() : spi.getSocketTimeout()); + spi.writeToSocket(sock, msg, msgBytes, spi.getEffectiveSocketTimeout(false)); } boolean clientFailed = msg instanceof TcpDiscoveryNodeFailedMessage && http://git-wip-us.apache.org/repos/asf/ignite/blob/6998785a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 25804c7..46d6f06 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -138,7 +138,6 @@ import static org.apache.ignite.IgniteSystemProperties.getBoolean; * configuration parameters may be used. As an example, for stable low-latency networks the following more aggressive * settings are recommended (which allows failure detection time ~200ms): *

<ul>
- * <li>Heartbeat frequency (see {@link #setHeartbeatFrequency(long)}) - 100ms</li>
 * <li>Socket timeout (see {@link #setSocketTimeout(long)}) - 200ms</li>
 * <li>Message acknowledgement timeout (see {@link #setAckTimeout(long)}) - 50ms</li>
 * </ul>
@@ -166,8 +165,6 @@ import static org.apache.ignite.IgniteSystemProperties.getBoolean; *
<li>Local port to bind to (see {@link #setLocalPort(int)})</li>
 * <li>Local port range to try binding to if previous ports are in use
 * (see {@link #setLocalPortRange(int)})</li>
- * <li>Heartbeat frequency (see {@link #setHeartbeatFrequency(long)})</li>
- * <li>Max missed heartbeats (see {@link #setMaxMissedHeartbeats(int)})</li>
 * <li>Number of times node tries to (re)establish connection to another node
 * (see {@link #setReconnectCount(int)})</li>
 * <li>Network timeout (see {@link #setNetworkTimeout(long)})</li>
  • @@ -241,8 +238,11 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi { /** Default value for thread priority (value is 10). */ public static final int DFLT_THREAD_PRI = 10; - /** Default heartbeat messages issuing frequency (value is 2000ms). */ - public static final long DFLT_HEARTBEAT_FREQ = 2000; + /** + * Default metrics update messages issuing frequency + * (value is {@link IgniteConfiguration#DFLT_METRICS_UPDATE_FREQ}). + */ + public static final long DFLT_METRICS_UPDATE_FREQ = IgniteConfiguration.DFLT_METRICS_UPDATE_FREQ; /** Default size of topology snapshots history. */ public static final int DFLT_TOP_HISTORY_SIZE = 1000; @@ -262,12 +262,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi { /** Default reconnect attempts count (value is 10). */ public static final int DFLT_RECONNECT_CNT = 10; - /** Default max heartbeats count node can miss without initiating status check (value is 1). */ - public static final int DFLT_MAX_MISSED_HEARTBEATS = 1; - - /** Default max heartbeats count node can miss without failing client node (value is 5). */ - public static final int DFLT_MAX_MISSED_CLIENT_HEARTBEATS = 5; - /** Default IP finder clean frequency in milliseconds (value is 60,000ms). */ public static final long DFLT_IP_FINDER_CLEAN_FREQ = 60 * 1000; @@ -302,8 +296,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi { /** Thread priority for all threads started by SPI. */ protected int threadPri = DFLT_THREAD_PRI; - /** Heartbeat messages issuing frequency. */ - protected long hbFreq = DFLT_HEARTBEAT_FREQ; + /** Metrics update messages issuing frequency. */ + protected long metricsUpdateFreq = DFLT_METRICS_UPDATE_FREQ; /** Size of topology snapshots history. */ protected int topHistSize = DFLT_TOP_HISTORY_SIZE; @@ -361,12 +355,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi { /** Maximum message acknowledgement timeout. 
*/ private long maxAckTimeout = DFLT_MAX_ACK_TIMEOUT; - /** Max heartbeats count node can miss without initiating status check. */ - protected int maxMissedHbs = DFLT_MAX_MISSED_HEARTBEATS; - - /** Max heartbeats count node can miss without failing client node. */ - protected int maxMissedClientHbs = DFLT_MAX_MISSED_CLIENT_HEARTBEATS; - /** IP finder clean frequency. */ @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"}) protected long ipFinderCleanFreq = DFLT_IP_FINDER_CLEAN_FREQ; @@ -731,56 +719,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi { } /** - * Gets max heartbeats count node can miss without initiating status check. - * - * @return Max missed heartbeats. - */ - public int getMaxMissedHeartbeats() { - return maxMissedHbs; - } - - /** - * Sets max heartbeats count node can miss without initiating status check. - *

<p>
- * If not provided, default value is {@link #DFLT_MAX_MISSED_HEARTBEATS}.
- * <p>
    - * Affected server nodes only. - * - * @param maxMissedHbs Max missed heartbeats. - * @return {@code this} for chaining. - */ - @IgniteSpiConfiguration(optional = true) - public TcpDiscoverySpi setMaxMissedHeartbeats(int maxMissedHbs) { - this.maxMissedHbs = maxMissedHbs; - - return this; - } - - /** - * Gets max heartbeats count node can miss without failing client node. - * - * @return Max missed client heartbeats. - */ - public int getMaxMissedClientHeartbeats() { - return maxMissedClientHbs; - } - - /** - * Sets max heartbeats count node can miss without failing client node. - *

<p>
- * If not provided, default value is {@link #DFLT_MAX_MISSED_CLIENT_HEARTBEATS}. - * - * @param maxMissedClientHbs Max missed client heartbeats. - * @return {@code this} for chaining. - */ - @IgniteSpiConfiguration(optional = true) - public TcpDiscoverySpi setMaxMissedClientHeartbeats(int maxMissedClientHbs) { - this.maxMissedClientHbs = maxMissedClientHbs; - - return this; - } - - /** * Gets statistics print frequency. * * @return Statistics print frequency in milliseconds. @@ -966,22 +904,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi { } /** - * Sets delay between issuing of heartbeat messages. SPI sends heartbeat messages - * in configurable time interval to other nodes to notify them about its state.
- * <p>
    - * If not provided, default value is {@link #DFLT_HEARTBEAT_FREQ}. - * - * @param hbFreq Heartbeat frequency in milliseconds. - * @return {@code this} for chaining. - */ - @IgniteSpiConfiguration(optional = true) - public TcpDiscoverySpi setHeartbeatFrequency(long hbFreq) { - this.hbFreq = hbFreq; - - return this; - } - - /** * @return Size of topology snapshots history. */ public long getTopHistorySize() { @@ -1180,6 +1102,20 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi { } /** + * Gets effective or resulting socket timeout with considering failure detection timeout + * + * @param srvrOperation {@code True} if socket connect to server node, + * {@code False} if socket connect to client node. + * @return Resulting socket timeout. + */ + public long getEffectiveSocketTimeout(boolean srvrOperation) { + if (failureDetectionTimeoutEnabled()) + return srvrOperation ? failureDetectionTimeout() : clientFailureDetectionTimeout(); + else + return sockTimeout; + } + + /** * Gets message acknowledgement timeout. * * @return Message acknowledgement timeout. @@ -1207,19 +1143,11 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi { } /** - * Gets delay between heartbeat messages sent by coordinator. - * - * @return Time period in milliseconds. - */ - public long getHeartbeatFrequency() { - return hbFreq; - } - - /** * Gets {@link org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder} (string representation). * * @return IPFinder (string representation). 
- */public String getIpFinderFormatted() { + */ + public String getIpFinderFormatted() { return ipFinder.toString(); } @@ -1939,6 +1867,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi { impl = new ServerImpl(this); } + metricsUpdateFreq = ignite.configuration().getMetricsUpdateFrequency(); + if (!failureDetectionTimeoutEnabled()) { assertParameter(sockTimeout > 0, "sockTimeout > 0"); assertParameter(ackTimeout > 0, "ackTimeout > 0"); @@ -1948,14 +1878,13 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi { assertParameter(netTimeout > 0, "networkTimeout > 0"); assertParameter(ipFinder != null, "ipFinder != null"); - assertParameter(hbFreq > 0, "heartbeatFreq > 0"); + assertParameter(metricsUpdateFreq > 0, "metricsUpdateFreq > 0" + + " (inited from igniteConfiguration.metricsUpdateFrequency)"); assertParameter(ipFinderCleanFreq > 0, "ipFinderCleanFreq > 0"); assertParameter(locPort > 1023, "localPort > 1023"); assertParameter(locPortRange >= 0, "localPortRange >= 0"); assertParameter(locPort + locPortRange <= 0xffff, "locPort + locPortRange <= 0xffff"); - assertParameter(maxMissedHbs > 0, "maxMissedHeartbeats > 0"); - assertParameter(maxMissedClientHbs > 0, "maxMissedClientHeartbeats > 0"); assertParameter(threadPri > 0, "threadPri > 0"); assertParameter(statsPrintFreq >= 0, "statsPrintFreq >= 0"); @@ -2000,8 +1929,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi { log.debug(configInfo("ipFinder", ipFinder)); log.debug(configInfo("ipFinderCleanFreq", ipFinderCleanFreq)); - log.debug(configInfo("heartbeatFreq", hbFreq)); - log.debug(configInfo("maxMissedHeartbeats", maxMissedHbs)); + log.debug(configInfo("metricsUpdateFreq", metricsUpdateFreq)); log.debug(configInfo("statsPrintFreq", statsPrintFreq)); } @@ -2336,21 +2264,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi { } /** {@inheritDoc} */ - @Override public long 
getHeartbeatFrequency() { - return TcpDiscoverySpi.this.getHeartbeatFrequency(); - } - - /** {@inheritDoc} */ - @Override public int getMaxMissedHeartbeats() { - return TcpDiscoverySpi.this.getMaxMissedHeartbeats(); - } - - /** {@inheritDoc} */ - @Override public int getMaxMissedClientHeartbeats() { - return TcpDiscoverySpi.this.getMaxMissedClientHeartbeats(); - } - - /** {@inheritDoc} */ @Override public long getStatisticsPrintFrequency() { return TcpDiscoverySpi.this.getStatisticsPrintFrequency(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/6998785a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiMBean.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiMBean.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiMBean.java index 1427929..a05ecde 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiMBean.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiMBean.java @@ -28,14 +28,6 @@ import org.jetbrains.annotations.Nullable; */ public interface TcpDiscoverySpiMBean extends IgniteSpiManagementMBean { /** - * Gets delay between heartbeat messages sent by coordinator. - * - * @return Time period in milliseconds. - */ - @MXBeanDescription("Heartbeat frequency.") - public long getHeartbeatFrequency(); - - /** * Gets current SPI state. * * @return Current SPI state. @@ -84,22 +76,6 @@ public interface TcpDiscoverySpiMBean extends IgniteSpiManagementMBean { public int getLocalPortRange(); /** - * Gets max heartbeats count node can miss without initiating status check. - * - * @return Max missed heartbeats. - */ - @MXBeanDescription("Max missed heartbeats.") - public int getMaxMissedHeartbeats(); - - /** - * Gets max heartbeats count node can miss without failing client node. 
- * - * @return Max missed client heartbeats. - */ - @MXBeanDescription("Max missed client heartbeats.") - public int getMaxMissedClientHeartbeats(); - - /** * Gets thread priority. All threads within SPI will be started with it. * * @return Thread priority. @@ -281,4 +257,4 @@ public interface TcpDiscoverySpiMBean extends IgniteSpiManagementMBean { */ @MXBeanDescription("Client mode.") public boolean isClientMode() throws IllegalStateException; -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/6998785a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java index d778854..6882821 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java @@ -102,7 +102,7 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste /** Node order in the topology (internal). */ private volatile long intOrder; - /** The most recent time when heartbeat message was received from the node. */ + /** The most recent time when metrics update message was received from the node. */ @GridToStringExclude private volatile long lastUpdateTime = U.currentTimeMillis(); @@ -123,9 +123,9 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste /** Version. */ private IgniteProductVersion ver; - /** Alive check (used by clients). */ + /** Alive check time (used by clients). */ @GridToStringExclude - private transient int aliveCheck; + private transient long aliveCheckTime; /** Client router node ID. 
*/ @GridToStringExclude @@ -291,9 +291,8 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste * Gets collections of cache metrics for this node. Note that node cache metrics are constantly updated * and provide up to date information about caches. *

    - * Cache metrics are updated with some delay which is directly related to heartbeat - * frequency. For example, when used with default - * {@link TcpDiscoverySpi} the update will happen every {@code 2} seconds. + * Cache metrics are updated with some delay which is directly related to metrics update + * frequency. For example, by default the update will happen every {@code 2} seconds. * * @return Runtime metrics snapshots for this node. */ @@ -414,7 +413,7 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste /** * Gets node last update time. * - * @return Time of the last heartbeat. + * @return Time of the last metrics update. */ public long lastUpdateTime() { return lastUpdateTime; @@ -473,23 +472,25 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste } /** - * Decrements alive check value and returns new one. + * Test alive check time value. * - * @return Alive check value. + * @return {@code True} if client alive, {@code False} otherwise. */ - public int decrementAliveCheck() { - assert isClient(); + public boolean isClientAlive() { + assert isClient() : this; - return --aliveCheck; + return (aliveCheckTime - U.currentTimeMillis()) >= 0; } /** - * @param aliveCheck Alive check value. + * Set client alive time. + * + * @param aliveTime Alive time interval. 
*/ - public void aliveCheck(int aliveCheck) { - assert isClient(); + public void clientAliveTime(long aliveTime) { + assert isClient() : this; - this.aliveCheck = aliveCheck; + this.aliveCheckTime = U.currentTimeMillis() + aliveTime; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/6998785a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientHeartbeatMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientHeartbeatMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientHeartbeatMessage.java deleted file mode 100644 index ade5468..0000000 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientHeartbeatMessage.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.spi.discovery.tcp.messages; - -import java.util.UUID; -import org.apache.ignite.cluster.ClusterMetrics; -import org.apache.ignite.internal.ClusterMetricsSnapshot; -import org.apache.ignite.internal.util.typedef.internal.S; - -/** - * Heartbeat message. - *

    - * Client sends his heartbeats in this message. - */ -public class TcpDiscoveryClientHeartbeatMessage extends TcpDiscoveryAbstractMessage { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private final byte[] metrics; - - /** - * Constructor. - * - * @param creatorNodeId Creator node. - * @param metrics Metrics. - */ - public TcpDiscoveryClientHeartbeatMessage(UUID creatorNodeId, ClusterMetrics metrics) { - super(creatorNodeId); - - this.metrics = ClusterMetricsSnapshot.serialize(metrics); - } - - /** - * Gets metrics map. - * - * @return Metrics map. - */ - public ClusterMetrics metrics() { - return ClusterMetricsSnapshot.deserialize(metrics, 0); - } - - /** {@inheritDoc} */ - @Override public boolean highPriority() { - return true; - } - - /** {@inheritDoc} */ - @Override public boolean traceLogLevel() { - return true; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(TcpDiscoveryClientHeartbeatMessage.class, this, "super", super.toString()); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/6998785a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java new file mode 100644 index 0000000..b56cd01 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientMetricsUpdateMessage.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp.messages; + +import java.util.UUID; +import org.apache.ignite.cluster.ClusterMetrics; +import org.apache.ignite.internal.ClusterMetricsSnapshot; +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * Metrics update message. + *

    + * Client sends his metrics in this message. + */ +public class TcpDiscoveryClientMetricsUpdateMessage extends TcpDiscoveryAbstractMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final byte[] metrics; + + /** + * Constructor. + * + * @param creatorNodeId Creator node. + * @param metrics Metrics. + */ + public TcpDiscoveryClientMetricsUpdateMessage(UUID creatorNodeId, ClusterMetrics metrics) { + super(creatorNodeId); + + this.metrics = ClusterMetricsSnapshot.serialize(metrics); + } + + /** + * Gets metrics map. + * + * @return Metrics map. + */ + public ClusterMetrics metrics() { + return ClusterMetricsSnapshot.deserialize(metrics, 0); + } + + /** {@inheritDoc} */ + @Override public boolean highPriority() { + return true; + } + + /** {@inheritDoc} */ + @Override public boolean traceLogLevel() { + return true; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TcpDiscoveryClientMetricsUpdateMessage.class, this, "super", super.toString()); + } +} \ No newline at end of file