Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E0DC518519 for ; Thu, 22 Oct 2015 09:12:28 +0000 (UTC) Received: (qmail 45385 invoked by uid 500); 22 Oct 2015 09:12:28 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 45308 invoked by uid 500); 22 Oct 2015 09:12:28 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 43369 invoked by uid 99); 22 Oct 2015 09:12:25 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 22 Oct 2015 09:12:25 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E613DE13D7; Thu, 22 Oct 2015 09:12:24 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vozerov@apache.org To: commits@ignite.apache.org Date: Thu, 22 Oct 2015 09:12:52 -0000 Message-Id: <82f95710a0864eada412a6741c79608c@git.apache.org> In-Reply-To: <44ce4830038b44da8e2e97768621fcf9@git.apache.org> References: <44ce4830038b44da8e2e97768621fcf9@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [29/51] [abbrv] ignite git commit: IGNITE-1622 - Fixed cache.clear() with near cache IGNITE-1622 - Fixed cache.clear() with near cache Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a228c246 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a228c246 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a228c246 Branch: refs/heads/ignite-1282 Commit: a228c246ae58894d0939887218252c2bde882fce Parents: c9eb539 Author: Valentin Kulichenko Authored: Thu Oct 15 16:02:38 2015 -0700 Committer: Valentin Kulichenko Committed: Thu Oct 15 16:02:38 2015 -0700 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 357 +++++++++++-------- .../cache/GridCacheClearAllRunnable.java | 18 +- .../cache/GridCacheConcurrentMap.java | 4 +- .../processors/cache/GridCacheProxyImpl.java | 14 +- .../processors/cache/IgniteCacheProxy.java | 2 +- .../processors/cache/IgniteInternalCache.java | 19 +- .../distributed/dht/GridDhtCacheAdapter.java | 6 +- .../distributed/near/GridNearCacheAdapter.java | 21 +- .../near/GridNearCacheClearAllRunnable.java | 9 +- .../cache/GridCacheClearSelfTest.java | 308 ++++++++++++++++ .../dht/GridCacheDhtEntrySelfTest.java | 2 +- .../IgniteCacheFullApiSelfTestSuite.java | 8 +- 12 files changed, 587 insertions(+), 181 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/a228c246/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index ae987b7..417b396 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -129,6 +129,7 @@ import org.apache.ignite.lang.IgniteCallable; import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteOutClosure; +import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.mxbean.CacheMetricsMXBean; import org.apache.ignite.plugin.security.SecurityPermission; @@ -149,7 +150,6 @@ import static org.apache.ignite.internal.processors.dr.GridDrType.DR_LOAD; import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE; import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_NO_FAILOVER; import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SUBGRID; -import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_TIMEOUT; import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED; @@ -1057,44 +1057,52 @@ public abstract class GridCacheAdapter implements IgniteInternalCache> splitClearLocally() { - assert CLEAR_ALL_SPLIT_THRESHOLD > 0; + public List> splitClearLocally(boolean srv, boolean near, boolean readers) { + if ((isNear() && near) || (!isNear() && srv)) { + int keySize = size(); - int keySize = size(); + int cnt = Math.min(keySize / CLEAR_ALL_SPLIT_THRESHOLD + (keySize % CLEAR_ALL_SPLIT_THRESHOLD != 0 ? 1 : 0), + Runtime.getRuntime().availableProcessors()); - int cnt = Math.min(keySize / CLEAR_ALL_SPLIT_THRESHOLD + (keySize % CLEAR_ALL_SPLIT_THRESHOLD != 0 ? 1 : 0), - Runtime.getRuntime().availableProcessors()); + if (cnt == 0) + cnt = 1; // Still perform cleanup since there could be entries in swap. - if (cnt == 0) - cnt = 1; // Still perform cleanup since there could be entries in swap. + GridCacheVersion obsoleteVer = ctx.versions().next(); - GridCacheVersion obsoleteVer = ctx.versions().next(); - - List> res = new ArrayList<>(cnt); + List> res = new ArrayList<>(cnt); - for (int i = 0; i < cnt; i++) - res.add(new GridCacheClearAllRunnable<>(this, obsoleteVer, i, cnt)); + for (int i = 0; i < cnt; i++) + res.add(new GridCacheClearAllRunnable<>(this, obsoleteVer, i, cnt, readers)); - return res; + return res; + } + else + return null; } /** {@inheritDoc} */ @Override public boolean clearLocally(K key) { - return clearLocally0(key); + return clearLocally0(key, false); } /** {@inheritDoc} */ - @Override public void clearLocallyAll(Set keys) { - clearLocally0(keys); + @Override public void clearLocallyAll(Set keys, boolean srv, boolean near, boolean readers) { + if (keys != null && ((isNear() && near) || (!isNear() && srv))) { + for (K key : keys) + clearLocally0(key, readers); + } } /** {@inheritDoc} */ - @Override public void clearLocally() { + @Override public void clearLocally(boolean srv, boolean near, boolean readers) { ctx.checkSecurity(SecurityPermission.CACHE_REMOVE); - List> jobs = splitClearLocally(); + List> jobs = splitClearLocally(srv, near, readers); if (!F.isEmpty(jobs)) { ExecutorService execSvc = null; @@ -1128,135 +1136,102 @@ public abstract class GridCacheAdapter implements IgniteInternalCache keys, boolean readers) { - if (F.isEmpty(keys)) - return; - - GridCacheVersion obsoleteVer = ctx.versions().next(); - - for (KeyCacheObject key : keys) { - GridCacheEntryEx e = peekEx(key); - - try { - if (e != null) - e.clear(obsoleteVer, readers, null); - } - catch (IgniteCheckedException ex) { - U.error(log, "Failed to clearLocally entry (will continue to clearLocally other entries): " + e, - ex); - } - } - } - - /** - * Clears entry from cache. - * - * @param obsoleteVer Obsolete version to set. - * @param key Key to clearLocally. - * @param filter Optional filter. - * @return {@code True} if cleared. - */ - private boolean clearLocally(GridCacheVersion obsoleteVer, K key, @Nullable CacheEntryPredicate[] filter) { - try { - KeyCacheObject cacheKey = ctx.toCacheKeyObject(key); - - GridCacheEntryEx entry = ctx.isSwapOrOffheapEnabled() ? entryEx(cacheKey) : peekEx(cacheKey); - - if (entry != null) - return entry.clear(obsoleteVer, false, filter); - } - catch (GridDhtInvalidPartitionException ignored) { - return false; - } - catch (IgniteCheckedException ex) { - U.error(log, "Failed to clearLocally entry for key: " + key, ex); - } - - return false; - } - /** {@inheritDoc} */ @Override public void clear() throws IgniteCheckedException { - // Clear local cache synchronously. - clearLocally(); - - clearRemotes(0, null); + clear((Set)null); } /** {@inheritDoc} */ @Override public void clear(K key) throws IgniteCheckedException { - // Clear local cache synchronously. - clearLocally(key); - - clearRemotes(0, Collections.singleton(key)); + clear(Collections.singleton(key)); } /** {@inheritDoc} */ @Override public void clearAll(Set keys) throws IgniteCheckedException { - // Clear local cache synchronously. - clearLocallyAll(keys); + clear(keys); + } - clearRemotes(0, keys); + /** {@inheritDoc} */ + @Override public IgniteInternalFuture clearAsync() { + return clearAsync((Set)null); } /** {@inheritDoc} */ @Override public IgniteInternalFuture clearAsync(K key) { - return clearKeysAsync(Collections.singleton(key)); + return clearAsync(Collections.singleton(key)); } /** {@inheritDoc} */ - @Override public IgniteInternalFuture clearAsync(Set keys) { - return clearKeysAsync(keys); + @Override public IgniteInternalFuture clearAllAsync(Set keys) { + return clearAsync(keys); } /** - * @param timeout Timeout for clearLocally all task in milliseconds (0 for never). - * Set it to larger value for large caches. - * @param keys Keys to clear or {@code null} if all cache should be cleared. - * @throws IgniteCheckedException In case of cache could not be cleared on any of the nodes. + * @param keys Keys to clear. + * @throws IgniteCheckedException In case of error. */ - private void clearRemotes(long timeout, @Nullable final Set keys) throws IgniteCheckedException { - // Send job to remote nodes only. - Collection nodes = - ctx.grid().cluster().forCacheNodes(name(), true, true, false).forRemotes().nodes(); - - if (!nodes.isEmpty()) { - ctx.kernalContext().task().setThreadContext(TC_TIMEOUT, timeout); - - ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes); - - ctx.kernalContext().task().execute( - new ClearTask(ctx.name(), ctx.affinity().affinityTopologyVersion(), keys), null).get(); - } + private void clear(@Nullable Set keys) throws IgniteCheckedException { + executeClearTask(keys, false).get(); + executeClearTask(keys, true).get(); } - /** {@inheritDoc} */ - @Override public IgniteInternalFuture clearAsync() { - return clearKeysAsync(null); + /** + * @param keys Keys to clear or {@code null} if all cache should be cleared. + * @return Future. + */ + private IgniteInternalFuture clearAsync(@Nullable final Set keys) { + return executeClearTask(keys, false).chain(new CX1, Object>() { + @Override public Object applyx(IgniteInternalFuture fut) throws IgniteCheckedException { + executeClearTask(keys, true).get(); + + return null; + } + }); } /** - * @param keys Keys to clear or {@code null} if all cache should be cleared. + * @param keys Keys to clear. + * @param near Near cache flag. * @return Future. */ - private IgniteInternalFuture clearKeysAsync(final Set keys) { - Collection nodes = ctx.grid().cluster().forCacheNodes(name(), true, true, false).nodes(); + private IgniteInternalFuture executeClearTask(@Nullable Set keys, boolean near) { + Collection srvNodes = ctx.grid().cluster().forCacheNodes(name(), !near, near, false).nodes(); - if (!nodes.isEmpty()) { - ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes); + if (!srvNodes.isEmpty()) { + ctx.kernalContext().task().setThreadContext(TC_SUBGRID, srvNodes); return ctx.kernalContext().task().execute( - new ClearTask(ctx.name(), ctx.affinity().affinityTopologyVersion(), keys), null); + new ClearTask(ctx.name(), ctx.affinity().affinityTopologyVersion(), keys, near), null); } else return new GridFinishedFuture<>(); } /** + * @param keys Keys. + * @param readers Readers flag. + */ + public void clearLocally(Collection keys, boolean readers) { + if (F.isEmpty(keys)) + return; + + GridCacheVersion obsoleteVer = ctx.versions().next(); + + for (KeyCacheObject key : keys) { + GridCacheEntryEx e = peekEx(key); + + try { + if (e != null) + e.clear(obsoleteVer, readers, null); + } + catch (IgniteCheckedException ex) { + U.error(log, "Failed to clearLocally entry (will continue to clearLocally other entries): " + e, + ex); + } + } + } + + /** * @param entry Removes entry from cache if currently mapped value is the same as passed. */ public void removeEntry(GridCacheEntryEx entry) { @@ -4427,39 +4402,33 @@ public abstract class GridCacheAdapter implements IgniteInternalCache keys, - @Nullable CacheEntryPredicate... filter) { + private boolean clearLocally0(K key, boolean readers) { ctx.checkSecurity(SecurityPermission.CACHE_REMOVE); - if (F.isEmpty(keys)) - return; - if (keyCheck) - validateCacheKeys(keys); + validateCacheKey(key); GridCacheVersion obsoleteVer = ctx.versions().next(); - for (K k : keys) - clearLocally(obsoleteVer, k, filter); - } - - /** - * @param key Key. - * @param filter Filters to evaluate. - * @return {@code True} if cleared. - */ - public boolean clearLocally0(K key, @Nullable CacheEntryPredicate... filter) { - A.notNull(key, "key"); + try { + KeyCacheObject cacheKey = ctx.toCacheKeyObject(key); - if (keyCheck) - validateCacheKey(key); + GridCacheEntryEx entry = ctx.isSwapOrOffheapEnabled() ? entryEx(cacheKey) : peekEx(cacheKey); - ctx.checkSecurity(SecurityPermission.CACHE_REMOVE); + if (entry != null) + return entry.clear(obsoleteVer, readers, null); + } + catch (GridDhtInvalidPartitionException ignored) { + // No-op. + } + catch (IgniteCheckedException ex) { + U.error(log, "Failed to clearLocally entry for key: " + key, ex); + } - return clearLocally(ctx.versions().next(), key, filter); + return false; } /** {@inheritDoc} */ @@ -5178,10 +5147,24 @@ public abstract class GridCacheAdapter implements IgniteInternalCache implements IgniteInternalCache extends GlobalClearKeySetJob { + /** */ + private static final long serialVersionUID = 0L; + + /** + * @param cacheName Cache name. + * @param topVer Affinity topology version. + * @param keys Keys to clear. + */ + private GlobalClearKeySetNearJob(String cacheName, AffinityTopologyVersion topVer, Set keys) { + super(cacheName, topVer, keys); + } + + /** + * @return Whether to clear server cache. + */ + protected boolean clearServerCache() { + return false; + } + + /** + * @return Whether to clear near cache. + */ + protected boolean clearNearCache() { + return true; + } } /** @@ -5972,6 +6032,9 @@ public abstract class GridCacheAdapter implements IgniteInternalCache implements IgniteInternalCache keys; + /** Near cache flag. */ + private final boolean near; + /** * @param cacheName Cache name. * @param topVer Affinity topology version. * @param keys Keys to clear. + * @param near Near cache flag. */ - public ClearTask(String cacheName, AffinityTopologyVersion topVer, Set keys) { + public ClearTask(String cacheName, AffinityTopologyVersion topVer, Set keys, boolean near) { this.cacheName = cacheName; this.topVer = topVer; this.keys = keys; + this.near = near; } /** {@inheritDoc} */ @Nullable @Override public Map map(List subgrid, @Nullable Object arg) throws IgniteException { - Map jobs = new HashMap(); + Map jobs = new HashMap<>(); for (ClusterNode node : subgrid) { - jobs.put(keys == null ? new GlobalClearAllJob(cacheName, topVer) : - new GlobalClearKeySetJob(cacheName, topVer, keys), - node); + ComputeJob job; + + if (near && node.version().compareTo(NEAR_JOB_SINCE) >= 0) { + job = keys == null ? new GlobalClearAllNearJob(cacheName, topVer) : + new GlobalClearKeySetNearJob<>(cacheName, topVer, keys); + } + else { + job = keys == null ? new GlobalClearAllJob(cacheName, topVer) : + new GlobalClearKeySetJob<>(cacheName, topVer, keys); + } + + jobs.put(job, node); } return jobs; http://git-wip-us.apache.org/repos/asf/ignite/blob/a228c246/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheClearAllRunnable.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheClearAllRunnable.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheClearAllRunnable.java index feafc58..77c5a55 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheClearAllRunnable.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheClearAllRunnable.java @@ -28,7 +28,7 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; /** - * Base runnable for {@link GridCacheAdapter#clearLocally()} routine. + * Base runnable for {@link IgniteInternalCache#clearLocally(boolean, boolean, boolean)} routine. */ public class GridCacheClearAllRunnable implements Runnable { /** Cache to be cleared. */ @@ -43,6 +43,9 @@ public class GridCacheClearAllRunnable implements Runnable { /** Mods count across all spawned clearLocally runnables. */ protected final int totalCnt; + /** Whether to clear readers. */ + protected final boolean readers; + /** Cache context. */ protected final GridCacheContext ctx; @@ -57,7 +60,8 @@ public class GridCacheClearAllRunnable implements Runnable { * @param id Mod for the given runnable. * @param totalCnt Mods count across all spawned clearLocally runnables. */ - public GridCacheClearAllRunnable(GridCacheAdapter cache, GridCacheVersion obsoleteVer, int id, int totalCnt) { + public GridCacheClearAllRunnable(GridCacheAdapter cache, GridCacheVersion obsoleteVer, + int id, int totalCnt, boolean readers) { assert cache != null; assert obsoleteVer != null; assert id >= 0; @@ -68,6 +72,7 @@ public class GridCacheClearAllRunnable implements Runnable { this.obsoleteVer = obsoleteVer; this.id = id; this.totalCnt = totalCnt; + this.readers = readers; ctx = cache.context(); log = ctx.logger(getClass()); @@ -138,7 +143,7 @@ public class GridCacheClearAllRunnable implements Runnable { */ protected void clearEntry(GridCacheEntryEx e) { try { - e.clear(obsoleteVer, false, CU.empty0()); + e.clear(obsoleteVer, readers, CU.empty0()); } catch (IgniteCheckedException ex) { U.error(log, "Failed to clearLocally entry from cache (will continue to clearLocally other entries): " + e, ex); @@ -172,6 +177,13 @@ public class GridCacheClearAllRunnable implements Runnable { return totalCnt; } + /** + * @return Whether to clean readers. + */ + public boolean readers() { + return readers; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridCacheClearAllRunnable.class, this); http://git-wip-us.apache.org/repos/asf/ignite/blob/a228c246/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java index a1fc585..1be7c07 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java @@ -1968,7 +1968,7 @@ public class GridCacheConcurrentMap { /** {@inheritDoc} */ @Override public void clear() { - ctx.cache().clearLocally0(new KeySet(map, filter, false)); + ctx.cache().clearLocallyAll(new KeySet(map, filter, false), true, true, false); } /** {@inheritDoc} */ @@ -2413,4 +2413,4 @@ public class GridCacheConcurrentMap { set = (Set0)in.readObject(); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a228c246/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java index 4d26bd8..cd779f2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java @@ -937,11 +937,11 @@ public class GridCacheProxyImpl implements IgniteInternalCache, Exte } /** {@inheritDoc} */ - @Override public void clearLocally() { + @Override public void clearLocally(boolean srv, boolean near, boolean readers) { CacheOperationContext prev = gate.enter(opCtx); try { - delegate.clearLocally(); + delegate.clearLocally(srv, near, readers); } finally { gate.leave(prev); @@ -985,11 +985,11 @@ public class GridCacheProxyImpl implements IgniteInternalCache, Exte } /** {@inheritDoc} */ - @Override public IgniteInternalFuture clearAsync(Set keys) { + @Override public IgniteInternalFuture clearAllAsync(Set keys) { CacheOperationContext prev = gate.enter(opCtx); try { - return delegate.clearAsync(keys); + return delegate.clearAllAsync(keys); } finally { gate.leave(prev); @@ -1009,11 +1009,11 @@ public class GridCacheProxyImpl implements IgniteInternalCache, Exte } /** {@inheritDoc} */ - @Override public void clearLocallyAll(Set keys) { + @Override public void clearLocallyAll(Set keys, boolean srv, boolean near, boolean readers) { CacheOperationContext prev = gate.enter(opCtx); try { - delegate.clearLocallyAll(keys); + delegate.clearLocallyAll(keys, srv, near, readers); } finally { gate.leave(prev); @@ -1536,4 +1536,4 @@ public class GridCacheProxyImpl implements IgniteInternalCache, Exte @Override public String toString() { return S.toString(GridCacheProxyImpl.class, this); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a228c246/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index ae96f23..c563e59 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -1311,7 +1311,7 @@ public class IgniteCacheProxy extends AsyncSupportAdapter extends Iterable> { * Note that this operation is local as it merely clears * entries from local cache. It does not remove entries from * remote caches or from underlying persistent storage. + * + * @param srv Whether to clear server cache. + * @param near Whether to clear near cache. + * @param readers Whether to clear readers. */ - public void clearLocally(); + public void clearLocally(boolean srv, boolean near, boolean readers); /** * Clears an entry from this cache and swap storage only if the entry @@ -958,8 +962,11 @@ public interface IgniteInternalCache extends Iterable> { * remote caches or from underlying persistent storage. * * @param keys Keys to clearLocally. + * @param srv Whether to clear server cache. + * @param near Whether to clear near cache. + * @param readers Whether to clear readers. */ - public void clearLocallyAll(Set keys); + public void clearLocallyAll(Set keys, boolean srv, boolean near, boolean readers); /** * Clears key on all nodes that store it's data. That is, caches are cleared on remote @@ -976,7 +983,7 @@ public interface IgniteInternalCache extends Iterable> { /** * Clears keys on all nodes that store it's data. That is, caches are cleared on remote - * nodes and local node, as opposed to {@link IgniteInternalCache#clearLocallyAll(Set)} method which only + * nodes and local node, as opposed to {@link IgniteInternalCache#clearLocallyAll(Set, boolean, boolean, boolean)} method which only * clears local node's cache. *

* Ignite will make the best attempt to clear caches on all nodes. If some caches @@ -989,7 +996,7 @@ public interface IgniteInternalCache extends Iterable> { /** * Clears cache on all nodes that store it's data. That is, caches are cleared on remote - * nodes and local node, as opposed to {@link IgniteInternalCache#clearLocally()} method which only + * nodes and local node, as opposed to {@link IgniteInternalCache#clearLocally(boolean, boolean, boolean)} method which only * clears local node's cache. *

* Ignite will make the best attempt to clear caches on all nodes. If some caches @@ -1015,7 +1022,7 @@ public interface IgniteInternalCache extends Iterable> { * @param keys Keys to clear. * @return Clear future. */ - public IgniteInternalFuture clearAsync(Set keys); + public IgniteInternalFuture clearAllAsync(Set keys); /** * Removes given key mapping from cache. If cache previously contained value for the given key, @@ -1802,4 +1809,4 @@ public interface IgniteInternalCache extends Iterable> { * @return Future to be completed whenever loading completes. */ public IgniteInternalFuture localLoadCacheAsync(@Nullable IgniteBiPredicate p, @Nullable Object... args); -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a228c246/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index 3ce9ee9..333bce2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -972,8 +972,8 @@ public abstract class GridDhtCacheAdapter extends GridDistributedCacheAdap } /** {@inheritDoc} */ - @Override public List> splitClearLocally() { - return ctx.affinityNode() ? super.splitClearLocally() : + @Override public List> splitClearLocally(boolean srv, boolean near, boolean readers) { + return ctx.affinityNode() ? super.splitClearLocally(srv, near, readers) : Collections.>emptyList(); } @@ -1184,4 +1184,4 @@ public abstract class GridDhtCacheAdapter extends GridDistributedCacheAdap return topVer; } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a228c246/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java index 01c3e2b..fe519a7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java @@ -99,7 +99,7 @@ public abstract class GridNearCacheAdapter extends GridDistributedCacheAda /** {@inheritDoc} */ @Override public GridCacheMapEntry create( GridCacheContext ctx, - AffinityTopologyVersion topVer, + AffinityTopologyVersion topVer, KeyCacheObject key, int hash, CacheObject val, @@ -450,16 +450,15 @@ public abstract class GridNearCacheAdapter extends GridDistributedCacheAda } /** {@inheritDoc} */ - @Override public boolean clearLocally0(K key, @Nullable CacheEntryPredicate[] filter) { - return super.clearLocally0(key, filter) | dht().clearLocally0(key, filter); + @Override public boolean clearLocally(K key) { + return super.clearLocally(key) | dht().clearLocally(key); } /** {@inheritDoc} */ - @Override public void clearLocally0(Collection keys, - @Nullable CacheEntryPredicate[] filter) { - super.clearLocally0(keys, filter); + @Override public void clearLocallyAll(Set keys, boolean srv, boolean near, boolean readers) { + super.clearLocallyAll(keys, srv, near, readers); - dht().clearLocally0(keys, filter); + dht().clearLocallyAll(keys, srv, near, readers); } /** {@inheritDoc} */ @@ -532,13 +531,13 @@ public abstract class GridNearCacheAdapter extends GridDistributedCacheAda } /** {@inheritDoc} */ - @Override public List> splitClearLocally() { + @Override public List> splitClearLocally(boolean srv, boolean near, boolean readers) { assert configuration().getNearConfiguration() != null; if (ctx.affinityNode()) { GridCacheVersion obsoleteVer = ctx.versions().next(); - List> dhtJobs = dht().splitClearLocally(); + List> dhtJobs = dht().splitClearLocally(srv, near, readers); List> res = new ArrayList<>(dhtJobs.size()); @@ -548,7 +547,7 @@ public abstract class GridNearCacheAdapter extends GridDistributedCacheAda return res; } else - return super.splitClearLocally(); + return super.splitClearLocally(srv, near, readers); } /** @@ -662,4 +661,4 @@ public abstract class GridNearCacheAdapter extends GridDistributedCacheAda @Override public String toString() { return S.toString(GridNearCacheAdapter.class, this); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a228c246/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheClearAllRunnable.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheClearAllRunnable.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheClearAllRunnable.java index 675ea8d..eea0b6e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheClearAllRunnable.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheClearAllRunnable.java @@ -19,11 +19,12 @@ package org.apache.ignite.internal.processors.cache.distributed.near; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheClearAllRunnable; +import org.apache.ignite.internal.processors.cache.IgniteInternalCache; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.typedef.internal.S; /** - * Runnable for {@link GridCacheAdapter#clearLocally()} routine for near cache. + * Runnable for {@link IgniteInternalCache#clearLocally(boolean, boolean, boolean)} routine for near cache. */ public class GridNearCacheClearAllRunnable extends GridCacheClearAllRunnable { /** Runnable for DHT cache. */ @@ -38,9 +39,7 @@ public class GridNearCacheClearAllRunnable extends GridCacheClearAllRunnab */ public GridNearCacheClearAllRunnable(GridCacheAdapter cache, GridCacheVersion obsoleteVer, GridCacheClearAllRunnable dhtJob) { - super(cache, obsoleteVer, dhtJob.id(), dhtJob.totalCount()); - - assert dhtJob != null; + super(cache, obsoleteVer, dhtJob.id(), dhtJob.totalCount(), dhtJob.readers()); this.dhtJob = dhtJob; } @@ -61,4 +60,4 @@ public class GridNearCacheClearAllRunnable extends GridCacheClearAllRunnab @Override public String toString() { return S.toString(GridNearCacheClearAllRunnable.class, this); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a228c246/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheClearSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheClearSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheClearSelfTest.java new file mode 100644 index 0000000..5e14f14 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheClearSelfTest.java @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.Collections; +import java.util.Set; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.Ignition; +import org.apache.ignite.cache.CacheMemoryMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.CachePeekMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.NearCacheConfiguration; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jetbrains.annotations.Nullable; + +/** + * Tests for cache clear. + */ +public class GridCacheClearSelfTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(disco); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGridsMultiThreaded(3); + + Ignition.setClientMode(true); + + startGrid("client1"); + startGrid("client2"); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testClearPartitioned() throws Exception { + testClear(CacheMode.PARTITIONED, CacheMemoryMode.ONHEAP_TIERED, false, null); + } + + /** + * @throws Exception If failed. + */ + public void testClearPartitionedOffHeap() throws Exception { + testClear(CacheMode.PARTITIONED, CacheMemoryMode.OFFHEAP_TIERED, false, null); + } + + /** + * @throws Exception If failed. + */ + public void testClearPartitionedNear() throws Exception { + testClear(CacheMode.PARTITIONED, CacheMemoryMode.ONHEAP_TIERED, true, null); + } + + /** + * @throws Exception If failed. + */ + public void testClearPartitionedOffHeapNear() throws Exception { + testClear(CacheMode.PARTITIONED, CacheMemoryMode.OFFHEAP_TIERED, true, null); + } + + /** + * @throws Exception If failed. + */ + public void testClearReplicated() throws Exception { + testClear(CacheMode.REPLICATED, CacheMemoryMode.ONHEAP_TIERED, false, null); + } + + /** + * @throws Exception If failed. + */ + public void testClearReplicatedOffHeap() throws Exception { + testClear(CacheMode.REPLICATED, CacheMemoryMode.OFFHEAP_TIERED, false, null); + } + + /** + * @throws Exception If failed. + */ + public void testClearReplicatedNear() throws Exception { + testClear(CacheMode.REPLICATED, CacheMemoryMode.ONHEAP_TIERED, true, null); + } + + /** + * @throws Exception If failed. + */ + public void testClearReplicatedOffHeapNear() throws Exception { + testClear(CacheMode.REPLICATED, CacheMemoryMode.OFFHEAP_TIERED, true, null); + } + + /** + * @throws Exception If failed. + */ + public void testClearKeyPartitioned() throws Exception { + testClear(CacheMode.PARTITIONED, CacheMemoryMode.ONHEAP_TIERED, false, Collections.singleton(3)); + } + + /** + * @throws Exception If failed. + */ + public void testClearKeyPartitionedOffHeap() throws Exception { + testClear(CacheMode.PARTITIONED, CacheMemoryMode.OFFHEAP_TIERED, false, Collections.singleton(3)); + } + + /** + * @throws Exception If failed. + */ + public void testClearKeyPartitionedNear() throws Exception { + testClear(CacheMode.PARTITIONED, CacheMemoryMode.ONHEAP_TIERED, true, Collections.singleton(3)); + } + + /** + * @throws Exception If failed. + */ + public void testClearKeyPartitionedOffHeapNear() throws Exception { + testClear(CacheMode.PARTITIONED, CacheMemoryMode.OFFHEAP_TIERED, true, Collections.singleton(3)); + } + + /** + * @throws Exception If failed. + */ + public void testClearKeyReplicated() throws Exception { + testClear(CacheMode.REPLICATED, CacheMemoryMode.ONHEAP_TIERED, false, Collections.singleton(3)); + } + + /** + * @throws Exception If failed. + */ + public void testClearKeyReplicatedOffHeap() throws Exception { + testClear(CacheMode.REPLICATED, CacheMemoryMode.OFFHEAP_TIERED, false, Collections.singleton(3)); + } + + /** + * @throws Exception If failed. + */ + public void testClearKeyReplicatedNear() throws Exception { + testClear(CacheMode.REPLICATED, CacheMemoryMode.ONHEAP_TIERED, true, Collections.singleton(3)); + } + + /** + * @throws Exception If failed. + */ + public void testClearKeyReplicatedOffHeapNear() throws Exception { + testClear(CacheMode.REPLICATED, CacheMemoryMode.OFFHEAP_TIERED, true, Collections.singleton(3)); + } + + /** + * @throws Exception If failed. + */ + public void testClearKeysPartitioned() throws Exception { + testClear(CacheMode.PARTITIONED, CacheMemoryMode.ONHEAP_TIERED, false, F.asSet(2, 6, 9)); + } + + /** + * @throws Exception If failed. + */ + public void testClearKeysPartitionedOffHeap() throws Exception { + testClear(CacheMode.PARTITIONED, CacheMemoryMode.OFFHEAP_TIERED, false, F.asSet(2, 6, 9)); + } + + /** + * @throws Exception If failed. + */ + public void testClearKeysPartitionedNear() throws Exception { + testClear(CacheMode.PARTITIONED, CacheMemoryMode.ONHEAP_TIERED, true, F.asSet(2, 6, 9)); + } + + /** + * @throws Exception If failed. + */ + public void testClearKeysPartitionedOffHeapNear() throws Exception { + testClear(CacheMode.PARTITIONED, CacheMemoryMode.OFFHEAP_TIERED, true, F.asSet(2, 6, 9)); + } + + /** + * @throws Exception If failed. + */ + public void testClearKeysReplicated() throws Exception { + testClear(CacheMode.REPLICATED, CacheMemoryMode.ONHEAP_TIERED, false, F.asSet(2, 6, 9)); + } + + /** + * @throws Exception If failed. + */ + public void testClearKeysReplicatedOffHeap() throws Exception { + testClear(CacheMode.REPLICATED, CacheMemoryMode.OFFHEAP_TIERED, false, F.asSet(2, 6, 9)); + } + + /** + * @throws Exception If failed. + */ + public void testClearKeysReplicatedNear() throws Exception { + testClear(CacheMode.REPLICATED, CacheMemoryMode.ONHEAP_TIERED, true, F.asSet(2, 6, 9)); + } + + /** + * @throws Exception If failed. + */ + public void testClearKeysReplicatedOffHeapNear() throws Exception { + testClear(CacheMode.REPLICATED, CacheMemoryMode.OFFHEAP_TIERED, true, F.asSet(2, 6, 9)); + } + + /** + * @param cacheMode Cache mode. + * @param memMode Memory mode. + * @param near Near cache flag. + * @param keys Keys to clear. + */ + private void testClear(CacheMode cacheMode, CacheMemoryMode memMode, boolean near, @Nullable Set keys) { + Ignite client1 = client1(); + Ignite client2 = client2(); + + try { + CacheConfiguration cfg = new CacheConfiguration<>("cache"); + + cfg.setCacheMode(cacheMode); + cfg.setMemoryMode(memMode); + + IgniteCache cache1 = near ? + client1.createCache(cfg, new NearCacheConfiguration()) : + client1.createCache(cfg); + + IgniteCache cache2 = near ? + client2.createNearCache("cache", new NearCacheConfiguration()) : + client2.cache("cache"); + + for (int i = 0; i < 10; i++) + cache1.put(i, i); + + for (int i = 0; i < 10; i++) + cache2.get(i); + + assertEquals(10, cache1.size(CachePeekMode.PRIMARY)); + assertEquals(10, cache2.size(CachePeekMode.PRIMARY)); + assertEquals(near ? 10 : 0, cache1.localSize(CachePeekMode.NEAR)); + assertEquals(near ? 10 : 0, cache2.localSize(CachePeekMode.NEAR)); + + if (F.isEmpty(keys)) + cache1.clear(); + else if (keys.size() == 1) + cache1.clear(F.first(keys)); + else + cache1.clearAll(keys); + + int expSize = F.isEmpty(keys) ? 0 : 10 - keys.size(); + + assertEquals(expSize, cache1.size(CachePeekMode.PRIMARY)); + assertEquals(expSize, cache2.size(CachePeekMode.PRIMARY)); + assertEquals(near ? expSize : 0, cache1.localSize(CachePeekMode.NEAR)); + assertEquals(near ? expSize : 0, cache2.localSize(CachePeekMode.NEAR)); + } + finally { + client1.destroyCache("cache"); + } + } + + /** + * @return Client 1. + */ + private Ignite client1() { + return grid("client1"); + } + + /** + * @return Client 2. + */ + private Ignite client2() { + return grid("client2"); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a228c246/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtEntrySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtEntrySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtEntrySelfTest.java index 26548b9..62fee5e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtEntrySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtEntrySelfTest.java @@ -314,4 +314,4 @@ public class GridCacheDhtEntrySelfTest extends GridCommonAbstractTest { return F.t(primary, other); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a228c246/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFullApiSelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFullApiSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFullApiSelfTestSuite.java index ff53250..c2f27fe 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFullApiSelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFullApiSelfTestSuite.java @@ -18,6 +18,7 @@ package org.apache.ignite.testsuites; import junit.framework.TestSuite; +import org.apache.ignite.internal.processors.cache.GridCacheClearSelfTest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheAtomicFullApiSelfTest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheAtomicNearEnabledFullApiSelfTest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheAtomicNearEnabledPrimaryWriteOrderFullApiSelfTest; @@ -204,11 +205,14 @@ public class IgniteCacheFullApiSelfTestSuite extends TestSuite { suite.addTestSuite(GridCachePartitionedNearDisabledOffHeapTieredMultiNodeFullApiSelfTest.class); suite.addTestSuite(GridCachePartitionedNearDisabledAtomicOffHeapTieredMultiNodeFullApiSelfTest.class); - // Multithreaded + // Multithreaded. suite.addTestSuite(GridCacheLocalFullApiMultithreadedSelfTest.class); suite.addTestSuite(GridCacheReplicatedFullApiMultithreadedSelfTest.class); suite.addTestSuite(GridCachePartitionedFullApiMultithreadedSelfTest.class); + // Other. + suite.addTestSuite(GridCacheClearSelfTest.class); + return suite; } -} \ No newline at end of file +}