Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id DC9751807B for ; Mon, 8 Jun 2015 10:40:24 +0000 (UTC) Received: (qmail 76128 invoked by uid 500); 8 Jun 2015 10:40:24 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 76094 invoked by uid 500); 8 Jun 2015 10:40:24 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 76085 invoked by uid 99); 8 Jun 2015 10:40:24 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 08 Jun 2015 10:40:24 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Mon, 08 Jun 2015 10:38:01 +0000 Received: (qmail 74764 invoked by uid 99); 8 Jun 2015 10:39:47 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 08 Jun 2015 10:39:47 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 24579DFF82; Mon, 8 Jun 2015 10:39:47 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.incubator.apache.org Date: Mon, 08 Jun 2015 10:39:51 -0000 Message-Id: <043fce6b74f44a1c8593652ca0bdedd6@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [05/21] incubator-ignite git commit: # ignite-883 X-Virus-Checked: Checked by ClamAV on apache.org # ignite-883 Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/873e01bb Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/873e01bb Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/873e01bb Branch: refs/heads/ignite-sprint-5 Commit: 873e01bbb61ed5782635fa81e1c9834f261f50fe Parents: 7a2e898 Author: sboikov Authored: Tue Jun 2 12:29:00 2015 +0300 Committer: sboikov Committed: Wed Jun 3 16:52:47 2015 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/internal/IgnitionEx.java | 38 +- .../internal/managers/GridManagerAdapter.java | 9 + .../checkpoint/GridCheckpointManager.java | 52 ++- .../processors/cache/GridCacheAdapter.java | 4 + .../processors/cache/GridCacheTtlManager.java | 9 +- .../cache/transactions/IgniteTxManager.java | 3 - .../datastructures/DataStructuresProcessor.java | 66 +-- .../timeout/GridSpiTimeoutObject.java | 59 +++ .../timeout/GridTimeoutProcessor.java | 24 +- .../util/nio/GridCommunicationClient.java | 26 +- .../org/apache/ignite/spi/IgniteSpiAdapter.java | 23 +- .../org/apache/ignite/spi/IgniteSpiContext.java | 10 + .../ignite/spi/IgniteSpiTimeoutObject.java | 40 ++ .../spi/checkpoint/noop/NoopCheckpointSpi.java | 3 +- .../communication/tcp/TcpCommunicationSpi.java | 409 ++++++------------- .../ignite/spi/discovery/tcp/ClientImpl.java | 3 - .../ignite/spi/discovery/tcp/ServerImpl.java | 1 - .../spi/discovery/tcp/TcpDiscoverySpi.java | 156 +------ .../IgniteCountDownLatchAbstractSelfTest.java | 102 +++++ .../IgniteCacheClientNearCacheExpiryTest.java | 103 +++++ .../IgniteCacheExpiryPolicyTestSuite.java | 2 + .../testframework/GridSpiTestContext.java | 10 + 22 files changed, 642 insertions(+), 510 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java index ed33365..5cbe377 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java @@ -1447,6 +1447,17 @@ public class IgnitionEx { ensureMultiInstanceSupport(myCfg.getSwapSpaceSpi()); } + execSvc = new IgniteThreadPoolExecutor( + "pub-" + cfg.getGridName(), + cfg.getPublicThreadPoolSize(), + cfg.getPublicThreadPoolSize(), + DFLT_PUBLIC_KEEP_ALIVE_TIME, + new LinkedBlockingQueue(DFLT_PUBLIC_THREADPOOL_QUEUE_CAP)); + + if (!myCfg.isClientMode()) + // Pre-start all threads as they are guaranteed to be needed. + ((ThreadPoolExecutor)execSvc).prestartAllCoreThreads(); + // Note that since we use 'LinkedBlockingQueue', number of // maximum threads has no effect. sysExecSvc = new IgniteThreadPoolExecutor( @@ -1456,30 +1467,8 @@ public class IgnitionEx { DFLT_SYSTEM_KEEP_ALIVE_TIME, new LinkedBlockingQueue(DFLT_SYSTEM_THREADPOOL_QUEUE_CAP)); - boolean isClientMode = Boolean.TRUE.equals(myCfg.isClientMode()); - - if (isClientMode) { - execSvc = new IgniteThreadPoolExecutor( - "pub-" + cfg.getGridName(), - 0, - cfg.getPublicThreadPoolSize(), - 2000, - new LinkedBlockingQueue(DFLT_PUBLIC_THREADPOOL_QUEUE_CAP)); - } - else { - execSvc = new IgniteThreadPoolExecutor( - "pub-" + cfg.getGridName(), - cfg.getPublicThreadPoolSize(), - cfg.getPublicThreadPoolSize(), - DFLT_PUBLIC_KEEP_ALIVE_TIME, - new LinkedBlockingQueue(DFLT_PUBLIC_THREADPOOL_QUEUE_CAP)); - - // Pre-start all threads as they are guaranteed to be needed. - ((ThreadPoolExecutor) execSvc).prestartAllCoreThreads(); - - // Pre-start all threads as they are guaranteed to be needed. - ((ThreadPoolExecutor) sysExecSvc).prestartAllCoreThreads(); - } + // Pre-start all threads as they are guaranteed to be needed. + ((ThreadPoolExecutor)sysExecSvc).prestartAllCoreThreads(); // Note that since we use 'LinkedBlockingQueue', number of // maximum threads has no effect. @@ -2007,7 +1996,6 @@ public class IgnitionEx { ccfg.setWriteSynchronizationMode(FULL_SYNC); ccfg.setCacheMode(cfg.getCacheMode()); ccfg.setNodeFilter(CacheConfiguration.ALL_NODES); - ccfg.setNearConfiguration(new NearCacheConfiguration()); if (cfg.getCacheMode() == PARTITIONED) ccfg.setBackups(cfg.getBackups()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java index 1eb7143..bea4256 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java @@ -23,6 +23,7 @@ import org.apache.ignite.events.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.managers.eventstorage.*; +import org.apache.ignite.internal.processors.timeout.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -483,6 +484,14 @@ public abstract class GridManagerAdapter implements GridMan return ctx.discovery().tryFailNode(nodeId); } + @Override public void addTimeoutObject(IgniteSpiTimeoutObject obj) { + ctx.timeout().addTimeoutObject(new GridSpiTimeoutObject(obj)); + } + + @Override public void removeTimeoutObject(IgniteSpiTimeoutObject obj) { + ctx.timeout().removeTimeoutObject(new GridSpiTimeoutObject(obj)); + } + /** * @param e Exception to handle. * @return GridSpiException Converted exception. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java index 2e80b6f..ce2a36c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java @@ -56,11 +56,10 @@ public class GridCheckpointManager extends GridManagerAdapter { private final GridMessageListener lsnr = new CheckpointRequestListener(); /** */ - private final ConcurrentMap keyMap = new ConcurrentHashMap8<>(); + private final ConcurrentMap keyMap; /** */ - private final Collection closedSess = new GridBoundedConcurrentLinkedHashSet<>( - MAX_CLOSED_SESS, MAX_CLOSED_SESS, 0.75f, 256, PER_SEGMENT_Q); + private final Collection closedSess; /** Grid marshaller. */ private final Marshaller marsh; @@ -72,6 +71,21 @@ public class GridCheckpointManager extends GridManagerAdapter { super(ctx, ctx.config().getCheckpointSpi()); marsh = ctx.config().getMarshaller(); + + if (enabled()) { + keyMap = new ConcurrentHashMap8<>(); + + closedSess = new GridBoundedConcurrentLinkedHashSet<>(MAX_CLOSED_SESS, + MAX_CLOSED_SESS, + 0.75f, + 256, + PER_SEGMENT_Q); + } + else { + keyMap = null; + + closedSess = null; + } } /** {@inheritDoc} */ @@ -112,7 +126,7 @@ public class GridCheckpointManager extends GridManagerAdapter { * @return Session IDs. */ public Collection sessionIds() { - return new ArrayList<>(keyMap.keySet()); + return enabled() ? new ArrayList<>(keyMap.keySet()) : Collections.emptyList(); } /** @@ -125,8 +139,17 @@ public class GridCheckpointManager extends GridManagerAdapter { * @return {@code true} if checkpoint has been actually saved, {@code false} otherwise. * @throws IgniteCheckedException Thrown in case of any errors. */ - public boolean storeCheckpoint(GridTaskSessionInternal ses, String key, Object state, ComputeTaskSessionScope scope, - long timeout, boolean override) throws IgniteCheckedException { + public boolean storeCheckpoint(GridTaskSessionInternal ses, + String key, + Object state, + ComputeTaskSessionScope scope, + long timeout, + boolean override) + throws IgniteCheckedException + { + if (!enabled()) + return false; + assert ses != null; assert key != null; @@ -239,6 +262,9 @@ public class GridCheckpointManager extends GridManagerAdapter { * @return Whether or not checkpoint was removed. */ public boolean removeCheckpoint(String key) { + if (!enabled()) + return false; + assert key != null; boolean rmv = false; @@ -256,6 +282,9 @@ public class GridCheckpointManager extends GridManagerAdapter { * @return Whether or not checkpoint was removed. */ public boolean removeCheckpoint(GridTaskSessionInternal ses, String key) { + if (!enabled()) + return false; + assert ses != null; assert key != null; @@ -283,6 +312,9 @@ public class GridCheckpointManager extends GridManagerAdapter { * @throws IgniteCheckedException Thrown in case of any errors. */ @Nullable public Serializable loadCheckpoint(GridTaskSessionInternal ses, String key) throws IgniteCheckedException { + if (!enabled()) + return null; + assert ses != null; assert key != null; @@ -309,6 +341,9 @@ public class GridCheckpointManager extends GridManagerAdapter { * @param cleanup Whether cleanup or not. */ public void onSessionEnd(GridTaskSessionInternal ses, boolean cleanup) { + if (!enabled()) + return; + closedSess.add(ses.getId()); // If on task node. @@ -358,7 +393,7 @@ public class GridCheckpointManager extends GridManagerAdapter { @Override public void printMemoryStats() { X.println(">>>"); X.println(">>> Checkpoint manager memory stats [grid=" + ctx.gridName() + ']'); - X.println(">>> keyMap: " + keyMap.size()); + X.println(">>> keyMap: " + (keyMap != null ? keyMap.size() : 0)); } /** @@ -407,6 +442,9 @@ public class GridCheckpointManager extends GridManagerAdapter { if (log.isDebugEnabled()) log.debug("Received checkpoint request: " + req); + if (!enabled()) + return; + IgniteUuid sesId = req.getSessionId(); if (closedSess.contains(sesId)) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 9d98ce7..4216895 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -395,6 +395,10 @@ public abstract class GridCacheAdapter implements IgniteInternalCache withExpiryPolicy(ExpiryPolicy plc) { + assert !CU.isUtilityCache(ctx.name()); + assert !CU.isAtomicsCache(ctx.name()); + assert !CU.isMarshallerCache(ctx.name()); + CacheOperationContext opCtx = new CacheOperationContext(false, null, false, plc); return new GridCacheProxyImpl<>(ctx, this, opCtx); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java index 5f9049a..9bd6321 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java @@ -43,7 +43,14 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter { /** {@inheritDoc} */ @Override protected void start0() throws IgniteCheckedException { - if (cctx.kernalContext().isDaemon() || !cctx.config().isEagerTtl()) + boolean cleanupDisabled = cctx.kernalContext().isDaemon() || + !cctx.config().isEagerTtl() || + CU.isAtomicsCache(cctx.name()) || + CU.isMarshallerCache(cctx.name()) || + CU.isUtilityCache(cctx.name()) || + (cctx.kernalContext().clientNode() && cctx.config().getNearConfiguration() == null); + + if (cleanupDisabled) return; cleanupWorker = new CleanupWorker(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index 4666cca..b6c77f6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -1221,9 +1221,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { collectPendingVersions(dhtTxLoc); } - // 3.1 Call dataStructures manager. - cctx.kernalContext().dataStructures().onTxCommitted(tx); - // 4. Unlock write resources. unlockMultiple(tx, tx.writeEntries()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java index 72911af..27f6a29 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java @@ -32,6 +32,7 @@ import org.apache.ignite.lang.*; import org.jetbrains.annotations.*; import org.jsr166.*; +import javax.cache.event.*; import javax.cache.processor.*; import java.io.*; import java.util.*; @@ -40,7 +41,6 @@ import java.util.concurrent.*; import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.*; import static org.apache.ignite.cache.CacheRebalanceMode.*; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; -import static org.apache.ignite.internal.processors.cache.GridCacheOperation.*; import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.*; import static org.apache.ignite.transactions.TransactionConcurrency.*; import static org.apache.ignite.transactions.TransactionIsolation.*; @@ -99,6 +99,9 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { /** */ private IgniteInternalCache> utilityDataCache; + /** */ + private UUID qryId; + /** * @param ctx Context. */ @@ -112,7 +115,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public void onKernalStart() { + @Override public void onKernalStart() throws IgniteCheckedException { if (ctx.config().isDaemon()) return; @@ -139,10 +142,23 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { seqView = atomicsCache; - dsCacheCtx = ctx.cache().internalCache(CU.ATOMICS_CACHE_NAME).context(); + dsCacheCtx = atomicsCache.context(); + + qryId = dsCacheCtx.continuousQueries().executeInternalQuery(new DataStructuresEntryListener(), + null, + dsCacheCtx.isReplicated() && dsCacheCtx.affinityNode(), + false); } } + /** {@inheritDoc} */ + @Override public void onKernalStop(boolean cancel) { + super.onKernalStop(cancel); + + if (qryId != null) + dsCacheCtx.continuousQueries().cancelInternalQuery(qryId); + } + /** * Gets a sequence from cache or creates one if it's not cached. * @@ -906,8 +922,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { dsCacheCtx.gate().enter(); try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { - GridCacheCountDownLatchValue val = cast(dsView.get(key), - GridCacheCountDownLatchValue.class); + GridCacheCountDownLatchValue val = cast(dsView.get(key), GridCacheCountDownLatchValue.class); // Check that count down hasn't been created in other thread yet. GridCacheCountDownLatchEx latch = cast(dsMap.get(key), GridCacheCountDownLatchEx.class); @@ -1034,28 +1049,19 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { } /** - * Transaction committed callback for transaction manager. * - * @param tx Committed transaction. */ - public void onTxCommitted(IgniteInternalTx tx) { - if (dsCacheCtx == null) - return; - - if (!dsCacheCtx.isDht() && tx.internal() && (!dsCacheCtx.isColocated() || dsCacheCtx.isReplicated())) { - Collection entries = tx.writeEntries(); - - if (log.isDebugEnabled()) - log.debug("Committed entries: " + entries); - - for (IgniteTxEntry entry : entries) { - // Check updated or created GridCacheInternalKey keys. - if ((entry.op() == CREATE || entry.op() == UPDATE) && entry.key().internal()) { - GridCacheInternal key = entry.key().value(entry.context().cacheObjectContext(), false); - - Object val0 = CU.value(entry.value(), entry.context(), false); + private class DataStructuresEntryListener implements CacheEntryUpdatedListener { + /** {@inheritDoc} */ + @Override public void onUpdated(Iterable> evts) + throws CacheEntryListenerException { + for (CacheEntryEvent evt : evts) { + if (evt.getEventType() == EventType.CREATED || evt.getEventType() == EventType.UPDATED) { + GridCacheInternal val0 = evt.getValue(); if (val0 instanceof GridCacheCountDownLatchValue) { + GridCacheInternalKey key = evt.getKey(); + // Notify latch on changes. GridCacheRemovable latch = dsMap.get(key); @@ -1067,8 +1073,6 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { latch0.onUpdate(val.get()); if (val.get() == 0 && val.autoDelete()) { - entry.cached().markObsolete(dsCacheCtx.versions().next()); - dsMap.remove(key); latch.onRemoved(); @@ -1080,11 +1084,12 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { ", actual=" + latch.getClass() + ", value=" + latch + ']'); } } + } + else { + assert evt.getEventType() == EventType.REMOVED : evt; - // Check deleted GridCacheInternal keys. - if (entry.op() == DELETE && entry.key().internal()) { - GridCacheInternal key = entry.key().value(entry.context().cacheObjectContext(), false); + GridCacheInternal key = evt.getKey(); // Entry's val is null if entry deleted. GridCacheRemovable obj = dsMap.remove(key); @@ -1094,6 +1099,11 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { } } } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(DataStructuresEntryListener.class, this); + } } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridSpiTimeoutObject.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridSpiTimeoutObject.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridSpiTimeoutObject.java new file mode 100644 index 0000000..82267a2 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridSpiTimeoutObject.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.timeout; + +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.*; + +/** + * Wrapper for {@link IgniteSpiTimeoutObject}. + */ +public class GridSpiTimeoutObject implements GridTimeoutObject { + /** */ + @GridToStringInclude + private final IgniteSpiTimeoutObject obj; + + /** + * @param obj SPI object. + */ + public GridSpiTimeoutObject(IgniteSpiTimeoutObject obj) { + this.obj = obj; + } + + /** {@inheritDoc} */ + @Override public void onTimeout() { + obj.onTimeout(); + } + + /** {@inheritDoc} */ + @Override public IgniteUuid timeoutId() { + return obj.id(); + } + + /** {@inheritDoc} */ + @Override public long endTime() { + return obj.endTime(); + } + + /** {@inheritDoc} */ + @Override public final String toString() { + return S.toString(GridSpiTimeoutObject.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java index e9b7717..e4f370c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java @@ -21,6 +21,7 @@ import org.apache.ignite.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.*; import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.internal.util.worker.*; @@ -111,8 +112,8 @@ public class GridTimeoutProcessor extends GridProcessorAdapter { * @return Cancelable to cancel task. */ public CancelableTask schedule(Runnable task, long delay, long period) { - assert delay >= 0; - assert period > 0 || period == -1; + assert delay >= 0 : delay; + assert period > 0 || period == -1 : period; CancelableTask obj = new CancelableTask(task, U.currentTimeMillis() + delay, period); @@ -203,7 +204,7 @@ public class GridTimeoutProcessor extends GridProcessorAdapter { */ public class CancelableTask implements GridTimeoutObject, Closeable { /** */ - private final IgniteUuid id = new IgniteUuid(); + private final IgniteUuid id = IgniteUuid.randomUuid(); /** */ private long endTime; @@ -215,12 +216,13 @@ public class GridTimeoutProcessor extends GridProcessorAdapter { private volatile boolean cancel; /** */ + @GridToStringInclude private final Runnable task; /** + * @param task Task to execute. * @param firstTime First time. * @param period Period. - * @param task Task to execute. */ CancelableTask(Runnable task, long firstTime, long period) { this.task = task; @@ -243,19 +245,10 @@ public class GridTimeoutProcessor extends GridProcessorAdapter { if (cancel) return; - long startTime = U.currentTimeMillis(); - try { task.run(); } finally { - long executionTime = U.currentTimeMillis() - startTime; - - if (executionTime > 10) { - U.warn(log, "Timer task take a lot of time, tasks submitted to GridTimeoutProcessor must work " + - "quickly [executionTime=" + executionTime + ']'); - } - if (!cancel && period > 0) { endTime = U.currentTimeMillis() + period; @@ -273,5 +266,10 @@ public class GridTimeoutProcessor extends GridProcessorAdapter { removeTimeoutObject(this); } } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CancelableTask.class, this); + } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java index 31396fb..2f7fd88 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java @@ -38,58 +38,58 @@ public interface GridCommunicationClient { * @param handshakeC Handshake. * @throws IgniteCheckedException If handshake failed. */ - void doHandshake(IgniteInClosure2X handshakeC) throws IgniteCheckedException; + public void doHandshake(IgniteInClosure2X handshakeC) throws IgniteCheckedException; /** * @return {@code True} if client has been closed by this call, * {@code false} if failed to close client (due to concurrent reservation or concurrent close). */ - boolean close(); + public boolean close(); /** * Forces client close. */ - void forceClose(); + public void forceClose(); /** * @return {@code True} if client is closed; */ - boolean closed(); + public boolean closed(); /** * @return {@code True} if client was reserved, {@code false} otherwise. */ - boolean reserve(); + public boolean reserve(); /** * Releases this client by decreasing reservations. */ - void release(); + public void release(); /** * @return {@code True} if client was reserved. */ - boolean reserved(); + public boolean reserved(); /** * Gets idle time of this client. * * @return Idle time of this client. */ - long getIdleTime(); + public long getIdleTime(); /** * @param data Data to send. * @throws IgniteCheckedException If failed. */ - void sendMessage(ByteBuffer data) throws IgniteCheckedException; + public void sendMessage(ByteBuffer data) throws IgniteCheckedException; /** * @param data Data to send. * @param len Length. * @throws IgniteCheckedException If failed. */ - void sendMessage(byte[] data, int len) throws IgniteCheckedException; + public void sendMessage(byte[] data, int len) throws IgniteCheckedException; /** * @param nodeId Node ID (provided only if versions of local and remote nodes are different). @@ -97,16 +97,16 @@ public interface GridCommunicationClient { * @throws IgniteCheckedException If failed. * @return {@code True} if should try to resend message. */ - boolean sendMessage(@Nullable UUID nodeId, Message msg) throws IgniteCheckedException; + public boolean sendMessage(@Nullable UUID nodeId, Message msg) throws IgniteCheckedException; /** * @param timeout Timeout. * @throws IOException If failed. */ - void flushIfNeeded(long timeout) throws IOException; + public void flushIfNeeded(long timeout) throws IOException; /** * @return {@code True} if send is asynchronous. */ - boolean async(); + public boolean async(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java index d3c0587..c9c633f 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java @@ -23,6 +23,7 @@ import org.apache.ignite.events.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.managers.eventstorage.*; +import org.apache.ignite.internal.processors.timeout.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -542,10 +543,20 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement return U.spiAttribute(this, attrName); } + /** {@inheritDoc} */ + protected void addTimeoutObject(IgniteSpiTimeoutObject obj) { + spiCtx.addTimeoutObject(obj); + } + + /** {@inheritDoc} */ + protected void removeTimeoutObject(IgniteSpiTimeoutObject obj) { + spiCtx.removeTimeoutObject(obj); + } + /** * Temporarily SPI context. */ - private static class GridDummySpiContext implements IgniteSpiContext { + private class GridDummySpiContext implements IgniteSpiContext { /** */ private final ClusterNode locNode; @@ -716,5 +727,15 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement @Override public boolean tryFailNode(UUID nodeId) { return false; } + + /** {@inheritDoc} */ + @Override public void addTimeoutObject(IgniteSpiTimeoutObject obj) { + ((IgniteKernal)ignite).context().timeout().addTimeoutObject(new GridSpiTimeoutObject(obj)); + } + + /** {@inheritDoc} */ + @Override public void removeTimeoutObject(IgniteSpiTimeoutObject obj) { + ((IgniteKernal)ignite).context().timeout().removeTimeoutObject(new GridSpiTimeoutObject(obj)); + } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java index 55f46e5..f83326c 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java @@ -310,4 +310,14 @@ public interface IgniteSpiContext { * @return If node was failed. */ public boolean tryFailNode(UUID nodeId); + + /** + * @param c Timeout object. + */ + public void addTimeoutObject(IgniteSpiTimeoutObject c); + + /** + * @param c Timeout object. + */ + public void removeTimeoutObject(IgniteSpiTimeoutObject c); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiTimeoutObject.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiTimeoutObject.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiTimeoutObject.java new file mode 100644 index 0000000..f7fbd27f --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiTimeoutObject.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi; + +import org.apache.ignite.lang.*; + +/** + * + */ +public interface IgniteSpiTimeoutObject { + /** + * @return Unique object ID. + */ + public IgniteUuid id(); + + /** + * @return End time. + */ + public long endTime(); + + /** + * Timeout callback. + */ + public void onTimeout(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/noop/NoopCheckpointSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/noop/NoopCheckpointSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/noop/NoopCheckpointSpi.java index 460cff3..832d872 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/noop/NoopCheckpointSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/noop/NoopCheckpointSpi.java @@ -51,8 +51,7 @@ public class NoopCheckpointSpi extends IgniteSpiAdapter implements CheckpointSpi } /** {@inheritDoc} */ - @Override - public boolean saveCheckpoint(String key, byte[] state, long timeout, boolean overwrite) throws IgniteSpiException { + @Override public boolean saveCheckpoint(String key, byte[] state, long timeout, boolean overwrite) { return false; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 19e54c8..b324ab2 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -267,7 +267,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter log.debug("Session was closed but there are unacknowledged messages, " + "will try to reconnect [rmtNode=" + recoveryData.node().id() + ']'); - recoveryWorker.addReconnectRequest(recoveryData); + commWorker.addReconnectRequest(recoveryData); } } else @@ -647,17 +647,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** Socket write timeout. */ private long sockWriteTimeout = DFLT_SOCK_WRITE_TIMEOUT; - /** Idle client worker. */ - private IdleClientWorker idleClientWorker; - /** Flush client worker. */ private ClientFlushWorker clientFlushWorker; - /** Socket timeout worker. */ - private SocketTimeoutWorker sockTimeoutWorker; - - /** Recovery worker. */ - private RecoveryWorker recoveryWorker; + /** Recovery and idle clients handler. */ + private CommunicationWorker commWorker; /** Clients. */ private final ConcurrentMap clients = GridConcurrentFactory.newMap(); @@ -1274,13 +1268,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter nioSrvr.start(); - idleClientWorker = new IdleClientWorker(); - - idleClientWorker.start(); - - recoveryWorker = new RecoveryWorker(); + commWorker = new CommunicationWorker(); - recoveryWorker.start(); + commWorker.start(); if (connBufSize > 0) { clientFlushWorker = new ClientFlushWorker(); @@ -1288,10 +1278,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter clientFlushWorker.start(); } - sockTimeoutWorker = new SocketTimeoutWorker(); - - sockTimeoutWorker.start(); - // Ack start. if (log.isDebugEnabled()) log.debug(startInfo()); @@ -1445,15 +1431,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (nioSrvr != null) nioSrvr.stop(); - U.interrupt(idleClientWorker); U.interrupt(clientFlushWorker); - U.interrupt(sockTimeoutWorker); - U.interrupt(recoveryWorker); + U.interrupt(commWorker); - U.join(idleClientWorker, log); U.join(clientFlushWorker, log); - U.join(sockTimeoutWorker, log); - U.join(recoveryWorker, log); + U.join(commWorker, log); // Force closing on stop (safety). for (GridCommunicationClient client : clients.values()) @@ -1461,7 +1443,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter // Clear resources. nioSrvr = null; - idleClientWorker = null; + commWorker = null; boundTcpPort = -1; @@ -1899,7 +1881,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter ) throws IgniteCheckedException { HandshakeTimeoutObject obj = new HandshakeTimeoutObject<>(client, U.currentTimeMillis() + timeout); - sockTimeoutWorker.addTimeoutObject(obj); + addTimeoutObject(obj); long rcvCnt = 0; @@ -2005,7 +1987,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter boolean cancelled = obj.cancel(); if (cancelled) - sockTimeoutWorker.removeTimeoutObject(obj); + removeTimeoutObject(obj); // Ignoring whatever happened after timeout - reporting only timeout event. if (!cancelled) @@ -2041,15 +2023,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (nioSrvr != null) nioSrvr.stop(); - U.interrupt(idleClientWorker); U.interrupt(clientFlushWorker); - U.interrupt(sockTimeoutWorker); - U.interrupt(recoveryWorker); + U.interrupt(commWorker); - U.join(idleClientWorker, log); U.join(clientFlushWorker, log); - U.join(sockTimeoutWorker, log); - U.join(recoveryWorker, log); + U.join(commWorker, log); for (GridCommunicationClient client : clients.values()) client.forceClose(); @@ -2156,119 +2134,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** * */ - private class IdleClientWorker extends IgniteSpiThread { - /** - * - */ - IdleClientWorker() { - super(gridName, "nio-idle-client-collector", log); - } - - /** {@inheritDoc} */ - @SuppressWarnings({"BusyWait"}) - @Override protected void body() throws InterruptedException { - while (!isInterrupted()) { - cleanupRecovery(); - - for (Map.Entry e : clients.entrySet()) { - UUID nodeId = e.getKey(); - - GridCommunicationClient client = e.getValue(); - - ClusterNode node = getSpiContext().node(nodeId); - - if (node == null) { - if (log.isDebugEnabled()) - log.debug("Forcing close of non-existent node connection: " + nodeId); - - client.forceClose(); - - clients.remove(nodeId, client); - - continue; - } - - GridNioRecoveryDescriptor recovery = null; - - if (client instanceof GridTcpNioCommunicationClient) { - recovery = recoveryDescs.get(new ClientKey(node.id(), node.order())); - - if (recovery != null && recovery.lastAcknowledged() != recovery.received()) { - RecoveryLastReceivedMessage msg = new RecoveryLastReceivedMessage(recovery.received()); - - if (log.isDebugEnabled()) - log.debug("Send recovery acknowledgement on timeout [rmtNode=" + nodeId + - ", rcvCnt=" + msg.received() + ']'); - - nioSrvr.sendSystem(((GridTcpNioCommunicationClient)client).session(), msg); - - recovery.lastAcknowledged(msg.received()); - - continue; - } - } - - long idleTime = client.getIdleTime(); - - if (idleTime >= idleConnTimeout) { - if (recovery != null && - recovery.nodeAlive(getSpiContext().node(nodeId)) && - !recovery.messagesFutures().isEmpty()) { - if (log.isDebugEnabled()) - log.debug("Node connection is idle, but there are unacknowledged messages, " + - "will wait: " + nodeId); - - continue; - } - - if (log.isDebugEnabled()) - log.debug("Closing idle node connection: " + nodeId); - - if (client.close() || client.closed()) - clients.remove(nodeId, client); - } - } - - Thread.sleep(idleConnTimeout); - } - } - - /** - * - */ - private void cleanupRecovery() { - Set left = null; - - for (Map.Entry e : recoveryDescs.entrySet()) { - if (left != null && left.contains(e.getKey())) - continue; - - GridNioRecoveryDescriptor recoverySnd = e.getValue(); - - if (!recoverySnd.nodeAlive(getSpiContext().node(recoverySnd.node().id()))) { - if (left == null) - left = new HashSet<>(); - - left.add(e.getKey()); - } - } - - if (left != null) { - assert !left.isEmpty(); - - for (ClientKey id : left) { - GridNioRecoveryDescriptor recoverySnd = recoveryDescs.remove(id); - - if (recoverySnd != null) - recoverySnd.onNodeLeft(); - } - } - } - } - - /** - * - */ private class ClientFlushWorker extends IgniteSpiThread { /** * @@ -2319,157 +2184,164 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } /** - * Handles sockets timeouts. + * */ - private class SocketTimeoutWorker extends IgniteSpiThread { - /** Time-based sorted set for timeout objects. */ - private final GridConcurrentSkipListSet timeoutObjs = - new GridConcurrentSkipListSet<>(new Comparator() { - @Override public int compare(HandshakeTimeoutObject o1, HandshakeTimeoutObject o2) { - long time1 = o1.endTime(); - long time2 = o2.endTime(); - - long id1 = o1.id(); - long id2 = o2.id(); - - return time1 < time2 ? -1 : time1 > time2 ? 1 : - id1 < id2 ? -1 : id1 > id2 ? 1 : 0; - } - }); - - /** Mutex. */ - private final Object mux0 = new Object(); + private class CommunicationWorker extends IgniteSpiThread { + /** */ + private final BlockingQueue q = new LinkedBlockingQueue<>(); /** * */ - SocketTimeoutWorker() { - super(gridName, "tcp-comm-sock-timeout-worker", log); + private CommunicationWorker() { + super(gridName, "tcp-comm-worker", log); } - /** - * @param timeoutObj Timeout object to add. - */ - @SuppressWarnings({"NakedNotify"}) - public void addTimeoutObject(HandshakeTimeoutObject timeoutObj) { - assert timeoutObj != null && timeoutObj.endTime() > 0 && timeoutObj.endTime() != Long.MAX_VALUE; + /** {@inheritDoc} */ + @Override protected void body() throws InterruptedException { + if (log.isDebugEnabled()) + log.debug("Tcp communication worker has been started."); - timeoutObjs.add(timeoutObj); + while (!isInterrupted()) { + GridNioRecoveryDescriptor recoveryDesc = q.poll(idleConnTimeout, TimeUnit.MILLISECONDS); - if (timeoutObjs.firstx() == timeoutObj) { - synchronized (mux0) { - mux0.notifyAll(); - } + if (recoveryDesc != null) + processRecovery(recoveryDesc); + else + processIdle(); } } /** - * @param timeoutObj Timeout object to remove. + * */ - public void removeTimeoutObject(HandshakeTimeoutObject timeoutObj) { - assert timeoutObj != null; + private void processIdle() { + cleanupRecovery(); - timeoutObjs.remove(timeoutObj); - } + for (Map.Entry e : clients.entrySet()) { + UUID nodeId = e.getKey(); - /** {@inheritDoc} */ - @Override protected void body() throws InterruptedException { - if (log.isDebugEnabled()) - log.debug("Socket timeout worker has been started."); + GridCommunicationClient client = e.getValue(); - while (!isInterrupted()) { - long now = U.currentTimeMillis(); + ClusterNode node = getSpiContext().node(nodeId); - for (Iterator iter = timeoutObjs.iterator(); iter.hasNext(); ) { - HandshakeTimeoutObject timeoutObj = iter.next(); + if (node == null) { + if (log.isDebugEnabled()) + log.debug("Forcing close of non-existent node connection: " + nodeId); + + client.forceClose(); + + clients.remove(nodeId, client); + + continue; + } - if (timeoutObj.endTime() <= now) { - iter.remove(); + GridNioRecoveryDescriptor recovery = null; - timeoutObj.onTimeout(); + if (client instanceof GridTcpNioCommunicationClient) { + recovery = recoveryDescs.get(new ClientKey(node.id(), node.order())); + + if (recovery != null && recovery.lastAcknowledged() != recovery.received()) { + RecoveryLastReceivedMessage msg = new RecoveryLastReceivedMessage(recovery.received()); + + if (log.isDebugEnabled()) + log.debug("Send recovery acknowledgement on timeout [rmtNode=" + nodeId + + ", rcvCnt=" + msg.received() + ']'); + + nioSrvr.sendSystem(((GridTcpNioCommunicationClient)client).session(), msg); + + recovery.lastAcknowledged(msg.received()); + + continue; } - else - break; } - synchronized (mux0) { - while (true) { - // Access of the first element must be inside of - // synchronization block, so we don't miss out - // on thread notification events sent from - // 'addTimeoutObject(..)' method. - HandshakeTimeoutObject first = timeoutObjs.firstx(); + long idleTime = client.getIdleTime(); - if (first != null) { - long waitTime = first.endTime() - U.currentTimeMillis(); + if (idleTime >= idleConnTimeout) { + if (recovery != null && + recovery.nodeAlive(getSpiContext().node(nodeId)) && + !recovery.messagesFutures().isEmpty()) { + if (log.isDebugEnabled()) + log.debug("Node connection is idle, but there are unacknowledged messages, " + + "will wait: " + nodeId); - if (waitTime > 0) - mux0.wait(waitTime); - else - break; - } - else - mux0.wait(5000); + continue; } + + if (log.isDebugEnabled()) + log.debug("Closing idle node connection: " + nodeId); + + if (client.close() || client.closed()) + clients.remove(nodeId, client); } } } - } - - /** - * - */ - private class RecoveryWorker extends IgniteSpiThread { - /** */ - private final BlockingQueue q = new LinkedBlockingQueue<>(); /** * */ - private RecoveryWorker() { - super(gridName, "tcp-comm-recovery-worker", log); - } - - /** {@inheritDoc} */ - @Override protected void body() throws InterruptedException { - if (log.isDebugEnabled()) - log.debug("Recovery worker has been started."); + private void cleanupRecovery() { + Set left = null; - while (!isInterrupted()) { - GridNioRecoveryDescriptor recoveryDesc = q.take(); + for (Map.Entry e : recoveryDescs.entrySet()) { + if (left != null && left.contains(e.getKey())) + continue; - assert recoveryDesc != null; + GridNioRecoveryDescriptor recoverySnd = e.getValue(); - ClusterNode node = recoveryDesc.node(); + if (!recoverySnd.nodeAlive(getSpiContext().node(recoverySnd.node().id()))) { + if (left == null) + left = new HashSet<>(); - if (clients.containsKey(node.id()) || !recoveryDesc.nodeAlive(getSpiContext().node(node.id()))) - continue; + left.add(e.getKey()); + } + } - try { - if (log.isDebugEnabled()) - log.debug("Recovery reconnect [rmtNode=" + recoveryDesc.node().id() + ']'); + if (left != null) { + assert !left.isEmpty(); - GridCommunicationClient client = reserveClient(node); + for (ClientKey id : left) { + GridNioRecoveryDescriptor recoverySnd = recoveryDescs.remove(id); - client.release(); + if (recoverySnd != null) + recoverySnd.onNodeLeft(); } - catch (IgniteCheckedException | IgniteException e) { - if (recoveryDesc.nodeAlive(getSpiContext().node(node.id()))) { - if (log.isDebugEnabled()) - log.debug("Recovery reconnect failed, will retry " + - "[rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']'); + } + } - addReconnectRequest(recoveryDesc); - } - else { - if (log.isDebugEnabled()) - log.debug("Recovery reconnect failed, " + - "node left [rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']'); + /** + * @param recoveryDesc Recovery descriptor. + */ + private void processRecovery(GridNioRecoveryDescriptor recoveryDesc) { + ClusterNode node = recoveryDesc.node(); - onException("Recovery reconnect failed, node left [rmtNode=" + recoveryDesc.node().id() + "]", - e); - } + if (clients.containsKey(node.id()) || !recoveryDesc.nodeAlive(getSpiContext().node(node.id()))) + return; + try { + if (log.isDebugEnabled()) + log.debug("Recovery reconnect [rmtNode=" + recoveryDesc.node().id() + ']'); + + GridCommunicationClient client = reserveClient(node); + + client.release(); + } + catch (IgniteCheckedException | IgniteException e) { + if (recoveryDesc.nodeAlive(getSpiContext().node(node.id()))) { + if (log.isDebugEnabled()) + log.debug("Recovery reconnect failed, will retry " + + "[rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']'); + + addReconnectRequest(recoveryDesc); + } + else { + if (log.isDebugEnabled()) + log.debug("Recovery reconnect failed, " + + "node left [rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']'); + + onException("Recovery reconnect failed, node left [rmtNode=" + recoveryDesc.node().id() + "]", + e); } } } @@ -2497,12 +2369,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** * */ - private static class HandshakeTimeoutObject { - /** */ - private static final AtomicLong idGen = new AtomicLong(); - + private static class HandshakeTimeoutObject implements IgniteSpiTimeoutObject { /** */ - private final long id = idGen.incrementAndGet(); + private final IgniteUuid id = IgniteUuid.randomUuid(); /** */ private final T obj; @@ -2533,34 +2402,24 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter return done.compareAndSet(false, true); } - /** - * @return {@code True} if object has not yet been canceled. - */ - boolean onTimeout() { + /** {@inheritDoc} */ + @Override public void onTimeout() { if (done.compareAndSet(false, true)) { // Close socket - timeout occurred. if (obj instanceof GridCommunicationClient) ((GridCommunicationClient)obj).forceClose(); else U.closeQuiet((AbstractInterruptibleChannel)obj); - - return true; } - - return false; } - /** - * @return End time. - */ - long endTime() { + /** {@inheritDoc} */ + @Override public long endTime() { return endTime; } - /** - * @return ID. - */ - long id() { + /** {@inheritDoc} */ + @Override public IgniteUuid id() { return id; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index e672d64..5b66019 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -116,7 +116,6 @@ class ClientImpl extends TcpDiscoveryImpl { b.append(" Message worker: ").append(threadStatus(msgWorker)).append(U.nl()); b.append(" Socket reader: ").append(threadStatus(sockReader)).append(U.nl()); b.append(" Socket writer: ").append(threadStatus(sockWriter)).append(U.nl()); - b.append(" Socket timeout worker: ").append(threadStatus(spi.sockTimeoutWorker)).append(U.nl()); b.append(U.nl()); @@ -524,11 +523,9 @@ class ClientImpl extends TcpDiscoveryImpl { U.interrupt(sockWriter); U.interrupt(msgWorker); - U.interrupt(spi.sockTimeoutWorker); U.join(sockWriter, log); U.join(msgWorker, log); - U.join(spi.sockTimeoutWorker, log); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 57c13d6..485f57d 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -1412,7 +1412,6 @@ class ServerImpl extends TcpDiscoveryImpl { b.append(" Message worker: ").append(threadStatus(msgWorker)).append(U.nl()); b.append(" Check status sender: ").append(threadStatus(chkStatusSnd)).append(U.nl()); b.append(" HB sender: ").append(threadStatus(hbsSnd)).append(U.nl()); - b.append(" Socket timeout worker: ").append(threadStatus(spi.sockTimeoutWorker)).append(U.nl()); b.append(" IP finder cleaner: ").append(threadStatus(ipFinderCleaner)).append(U.nl()); b.append(" Stats printer: ").append(threadStatus(statsPrinter)).append(U.nl()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 56fb63f..8365716 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -255,9 +255,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T /** Internal and external addresses of local node. */ protected Collection locNodeAddrs; - /** Socket timeout worker. */ - protected SocketTimeoutWorker sockTimeoutWorker; - /** Start time of the very first grid node. */ protected volatile long gridStartTime; @@ -1118,7 +1115,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout); - sockTimeoutWorker.addTimeoutObject(obj); + addTimeoutObject(obj); IOException err = null; @@ -1136,7 +1133,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T boolean cancelled = obj.cancel(); if (cancelled) - sockTimeoutWorker.removeTimeoutObject(obj); + removeTimeoutObject(obj); // Throw original exception. if (err != null) @@ -1180,7 +1177,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout); - sockTimeoutWorker.addTimeoutObject(obj); + addTimeoutObject(obj); IOException err = null; @@ -1198,7 +1195,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T boolean cancelled = obj.cancel(); if (cancelled) - sockTimeoutWorker.removeTimeoutObject(obj); + removeTimeoutObject(obj); // Throw original exception. if (err != null) @@ -1222,7 +1219,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout); - sockTimeoutWorker.addTimeoutObject(obj); + addTimeoutObject(obj); OutputStream out = sock.getOutputStream(); @@ -1240,7 +1237,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T boolean cancelled = obj.cancel(); if (cancelled) - sockTimeoutWorker.removeTimeoutObject(obj); + removeTimeoutObject(obj); // Throw original exception. if (err != null) @@ -1599,9 +1596,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T mcastIpFinder.setLocalAddress(locAddr); } - sockTimeoutWorker = new SocketTimeoutWorker(); - sockTimeoutWorker.start(); - impl.spiStart(gridName); } @@ -1611,9 +1605,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T // Safety. ctxInitLatch.countDown(); - U.interrupt(sockTimeoutWorker); - U.join(sockTimeoutWorker, log); - if (ipFinder != null) { try { ipFinder.close(); @@ -1754,117 +1745,11 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T } /** - * Handles sockets timeouts. - */ - protected class SocketTimeoutWorker extends IgniteSpiThread { - /** Time-based sorted set for timeout objects. */ - private final GridConcurrentSkipListSet timeoutObjs = - new GridConcurrentSkipListSet<>(new Comparator() { - @Override public int compare(SocketTimeoutObject o1, SocketTimeoutObject o2) { - int res = Long.compare(o1.endTime(), o2.endTime()); - - if (res != 0) - return res; - - return Long.compare(o1.id(), o2.id()); - } - }); - - /** Mutex. */ - private final Object mux0 = new Object(); - - /** - * - */ - SocketTimeoutWorker() { - super(gridName, "tcp-disco-sock-timeout-worker", log); - - setPriority(threadPri); - } - - /** - * @param timeoutObj Timeout object to add. - */ - @SuppressWarnings({"NakedNotify"}) - public void addTimeoutObject(SocketTimeoutObject timeoutObj) { - assert timeoutObj != null && timeoutObj.endTime() > 0 && timeoutObj.endTime() != Long.MAX_VALUE; - - timeoutObjs.add(timeoutObj); - - if (timeoutObjs.firstx() == timeoutObj) { - synchronized (mux0) { - mux0.notifyAll(); - } - } - } - - /** - * @param timeoutObj Timeout object to remove. - */ - public void removeTimeoutObject(SocketTimeoutObject timeoutObj) { - assert timeoutObj != null; - - timeoutObjs.remove(timeoutObj); - } - - /** {@inheritDoc} */ - @Override protected void body() throws InterruptedException { - if (log.isDebugEnabled()) - log.debug("Socket timeout worker has been started."); - - while (!isInterrupted()) { - long now = U.currentTimeMillis(); - - for (Iterator iter = timeoutObjs.iterator(); iter.hasNext(); ) { - SocketTimeoutObject timeoutObj = iter.next(); - - if (timeoutObj.endTime() <= now) { - iter.remove(); - - if (timeoutObj.onTimeout()) { - LT.warn(log, null, "Socket write has timed out (consider increasing " + - "'sockTimeout' configuration property) [sockTimeout=" + sockTimeout + ']'); - - stats.onSocketTimeout(); - } - } - else - break; - } - - synchronized (mux0) { - while (true) { - // Access of the first element must be inside of - // synchronization block, so we don't miss out - // on thread notification events sent from - // 'addTimeoutObject(..)' method. - SocketTimeoutObject first = timeoutObjs.firstx(); - - if (first != null) { - long waitTime = first.endTime() - U.currentTimeMillis(); - - if (waitTime > 0) - mux0.wait(waitTime); - else - break; - } - else - mux0.wait(5000); - } - } - } - } - } - - /** * Socket timeout object. */ - private static class SocketTimeoutObject { + private class SocketTimeoutObject implements IgniteSpiTimeoutObject { /** */ - private static final AtomicLong idGen = new AtomicLong(); - - /** */ - private final long id = idGen.incrementAndGet(); + private final IgniteUuid id = IgniteUuid.randomUuid(); /** */ private final Socket sock; @@ -1894,31 +1779,26 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T return done.compareAndSet(false, true); } - /** - * @return {@code True} if object has not yet been canceled. - */ - boolean onTimeout() { + /** {@inheritDoc} */ + @Override public void onTimeout() { if (done.compareAndSet(false, true)) { // Close socket - timeout occurred. U.closeQuiet(sock); - return true; - } + LT.warn(log, null, "Socket write has timed out (consider increasing " + + "'sockTimeout' configuration property) [sockTimeout=" + sockTimeout + ']'); - return false; + stats.onSocketTimeout(); + } } - /** - * @return End time. - */ - long endTime() { + /** {@inheritDoc} */ + @Override public long endTime() { return endTime; } - /** - * @return ID. - */ - long id() { + /** {@inheritDoc} */ + @Override public IgniteUuid id() { return id; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteCountDownLatchAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteCountDownLatchAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteCountDownLatchAbstractSelfTest.java index 0f2a898..80e6123 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteCountDownLatchAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteCountDownLatchAbstractSelfTest.java @@ -28,6 +28,7 @@ import org.jetbrains.annotations.*; import java.io.*; import java.util.*; import java.util.concurrent.*; +import java.util.concurrent.atomic.*; import static java.util.concurrent.TimeUnit.*; @@ -258,6 +259,107 @@ public abstract class IgniteCountDownLatchAbstractSelfTest extends IgniteAtomics checkRemovedLatch(latch); } + /** + * @throws Exception If failed. + */ + public void testLatchMultinode1() throws Exception { + if (gridCount() == 1) + return; + + IgniteCountDownLatch latch = grid(0).countDownLatch("l1", 10, + true, + true); + + List> futs = new ArrayList<>(); + + final AtomicBoolean countedDown = new AtomicBoolean(); + + for (int i = 0; i < gridCount(); i++) { + final Ignite ignite = grid(i); + + futs.add(GridTestUtils.runAsync(new Callable() { + @Override public Void call() throws Exception { + IgniteCountDownLatch latch = ignite.countDownLatch("l1", 10, + true, + false); + + assertNotNull(latch); + + boolean wait = latch.await(30_000); + + assertTrue(countedDown.get()); + + assertEquals(0, latch.count()); + + assertTrue(wait); + + return null; + } + })); + } + + for (int i = 0; i < 10; i++) { + if (i == 9) + countedDown.set(true); + + latch.countDown(); + } + + for (IgniteInternalFuture fut : futs) + fut.get(30_000); + } + + /** + * @throws Exception If failed. + */ + public void testLatchMultinode2() throws Exception { + if (gridCount() == 1) + return; + + IgniteCountDownLatch latch = grid(0).countDownLatch("l2", gridCount() * 3, + true, + true); + + assertNotNull(latch); + + List> futs = new ArrayList<>(); + + final AtomicInteger cnt = new AtomicInteger(); + + for (int i = 0; i < gridCount(); i++) { + final Ignite ignite = grid(i); + + futs.add(GridTestUtils.runAsync(new Callable() { + @Override public Void call() throws Exception { + IgniteCountDownLatch latch = ignite.countDownLatch("l2", 10, + true, + false); + + assertNotNull(latch); + + for (int i = 0; i < 3; i++) { + cnt.incrementAndGet(); + + latch.countDown(); + } + + boolean wait = latch.await(30_000); + + assertEquals(gridCount() * 3, cnt.get()); + + assertEquals(0, latch.count()); + + assertTrue(wait); + + return null; + } + })); + } + + for (IgniteInternalFuture fut : futs) + fut.get(30_000); + } + /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { // No-op. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheClientNearCacheExpiryTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheClientNearCacheExpiryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheClientNearCacheExpiryTest.java new file mode 100644 index 0000000..602ac18 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheClientNearCacheExpiryTest.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.expiry; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import javax.cache.expiry.*; + +import java.util.concurrent.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheMode.*; + +/** + * + */ +public class IgniteCacheClientNearCacheExpiryTest extends IgniteCacheAbstractTest { + /** */ + private static final int NODES = 3; + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return NODES; + } + + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return PARTITIONED; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return TRANSACTIONAL; + } + + /** {@inheritDoc} */ + @Override protected NearCacheConfiguration nearConfiguration() { + return new NearCacheConfiguration(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + if (gridName.equals(getTestGridName(NODES - 1))) + cfg.setClientMode(true); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testExpirationOnClient() throws Exception { + Ignite ignite = grid(NODES - 1); + + assertTrue(ignite.configuration().isClientMode()); + + IgniteCache cache = ignite.cache(null); + + assertTrue(((IgniteCacheProxy)cache).context().isNear()); + + for (int i = 0 ; i < 100; i++) + cache.put(i, i); + + CreatedExpiryPolicy plc = new CreatedExpiryPolicy(new Duration(TimeUnit.MILLISECONDS, 500)); + + IgniteCache cacheWithExpiry = cache.withExpiryPolicy(plc); + + for (int i = 100 ; i < 200; i++) { + cacheWithExpiry.put(i, i); + + assertEquals(i, cacheWithExpiry.localPeek(i)); + } + + U.sleep(1000); + + for (int i = 0 ; i < 100; i++) + assertEquals(i, cacheWithExpiry.localPeek(i)); + + for (int i = 100 ; i < 200; i++) + assertNull(cache.localPeek(i)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java index c006f69..c78ec5c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java @@ -50,6 +50,8 @@ public class IgniteCacheExpiryPolicyTestSuite extends TestSuite { suite.addTestSuite(IgniteCacheTtlCleanupSelfTest.class); + suite.addTestSuite(IgniteCacheClientNearCacheExpiryTest.class); + return suite; } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java index 5867fb8..21f9424 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java @@ -501,6 +501,16 @@ public class GridSpiTestContext implements IgniteSpiContext { return false; } + /** {@inheritDoc} */ + @Override public void addTimeoutObject(IgniteSpiTimeoutObject obj) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void removeTimeoutObject(IgniteSpiTimeoutObject obj) { + // No-op. + } + /** * @param cacheName Cache name. * @return Map representing cache.