Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id EB0DD17E0F for ; Wed, 10 Jun 2015 16:27:39 +0000 (UTC) Received: (qmail 24082 invoked by uid 500); 10 Jun 2015 16:27:39 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 24049 invoked by uid 500); 10 Jun 2015 16:27:39 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 24040 invoked by uid 99); 10 Jun 2015 16:27:39 -0000 Received: from Unknown (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 10 Jun 2015 16:27:39 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id 2E8A21A49DB for ; Wed, 10 Jun 2015 16:27:39 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.77 X-Spam-Level: * X-Spam-Status: No, score=1.77 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, T_RP_MATCHES_RCVD=-0.01] autolearn=disabled Received: from mx1-us-east.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id 7wnen31zOLVU for ; Wed, 10 Jun 2015 16:27:26 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-east.apache.org (ASF Mail Server at mx1-us-east.apache.org) with SMTP id 0DF2743ACE for ; Wed, 10 Jun 2015 16:27:25 +0000 (UTC) Received: (qmail 23602 invoked by uid 99); 10 Jun 2015 16:27:25 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 10 Jun 2015 16:27:25 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8BF46E0332; Wed, 10 Jun 2015 16:27:25 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.incubator.apache.org Date: Wed, 10 Jun 2015 16:27:50 -0000 Message-Id: <0a3bac9fd3e040f3baa0a47965b59194@git.apache.org> In-Reply-To: <8eb7a652192c4f07b1b9bed4629f3c65@git.apache.org> References: <8eb7a652192c4f07b1b9bed4629f3c65@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [27/31] incubator-ignite git commit: ignite-471-2: huge merge from sprint-6 http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java index eccd9f9..47f222e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java @@ -32,6 +32,8 @@ import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; +import static org.apache.ignite.internal.events.DiscoveryCustomEvent.*; + /** * Affinity cached function. */ @@ -221,6 +223,35 @@ public class GridAffinityAssignmentCache { } /** + * Copies previous affinity assignment when discovery event does not cause affinity assignment changes + * (e.g. client node joins on leaves). + * + * @param evt Event. + * @param topVer Topology version. + */ + public void clientEventTopologyChange(DiscoveryEvent evt, AffinityTopologyVersion topVer) { + GridAffinityAssignment aff = head.get(); + + assert evt.type() == EVT_DISCOVERY_CUSTOM_EVT || aff.primaryPartitions(evt.eventNode().id()).isEmpty() : evt; + assert evt.type() == EVT_DISCOVERY_CUSTOM_EVT || aff.backupPartitions(evt.eventNode().id()).isEmpty() : evt; + + GridAffinityAssignment assignmentCpy = new GridAffinityAssignment(topVer, aff); + + affCache.put(topVer, assignmentCpy); + head.set(assignmentCpy); + + for (Map.Entry entry : readyFuts.entrySet()) { + if (entry.getKey().compareTo(topVer) <= 0) { + if (log.isDebugEnabled()) + log.debug("Completing topology ready future (use previous affinity) " + + "[locNodeId=" + ctx.localNodeId() + ", futVer=" + entry.getKey() + ", topVer=" + topVer + ']'); + + entry.getValue().onDone(topVer); + } + } + } + + /** * @return Last calculated affinity version. */ public AffinityTopologyVersion lastVersion() { @@ -375,7 +406,10 @@ public class GridAffinityAssignmentCache { if (cache == null) { throw new IllegalStateException("Getting affinity for topology version earlier than affinity is " + - "calculated [locNodeId=" + ctx.localNodeId() + ", topVer=" + topVer + + "calculated [locNodeId=" + ctx.localNodeId() + + ", cache=" + cacheName + + ", history=" + affCache.keySet() + + ", topVer=" + topVer + ", head=" + head.get().topologyVersion() + ']'); } } @@ -422,6 +456,7 @@ public class GridAffinityAssignmentCache { /** * + * @param reqTopVer Required topology version. */ private AffinityReadyFuture(AffinityTopologyVersion reqTopVer) { this.reqTopVer = reqTopVer; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java index daa2bc2..aac63c8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java @@ -164,14 +164,17 @@ public class GridAffinityProcessor extends GridProcessorAdapter { * * @param cacheName Cache name. * @param key Key to map. + * @param topVer Topology version. * @return Affinity nodes, primary first. * @throws IgniteCheckedException If failed. */ - public List mapKeyToPrimaryAndBackups(@Nullable String cacheName, K key) throws IgniteCheckedException { + public List mapKeyToPrimaryAndBackups(@Nullable String cacheName, + K key, + AffinityTopologyVersion topVer) + throws IgniteCheckedException + { A.notNull(key, "key"); - AffinityTopologyVersion topVer = ctx.discovery().topologyVersionEx(); - AffinityInfo affInfo = affinityCache(cacheName, topVer); if (affInfo == null) @@ -181,6 +184,20 @@ public class GridAffinityProcessor extends GridProcessorAdapter { } /** + * Map single key to primary and backup nodes. + * + * @param cacheName Cache name. + * @param key Key to map. + * @return Affinity nodes, primary first. + * @throws IgniteCheckedException If failed. + */ + public List mapKeyToPrimaryAndBackups(@Nullable String cacheName, K key) + throws IgniteCheckedException + { + return mapKeyToPrimaryAndBackups(cacheName, key, ctx.discovery().topologyVersionEx()); + } + + /** * Gets affinity key for cache key. * * @param cacheName Cache name. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEvictableEntryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEvictableEntryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEvictableEntryImpl.java index 5d6062e..7a3fbee 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEvictableEntryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEvictableEntryImpl.java @@ -23,6 +23,7 @@ import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.internal.*; + import org.jetbrains.annotations.*; import java.util.*; @@ -91,6 +92,36 @@ public class CacheEvictableEntryImpl implements EvictableEntry { } /** {@inheritDoc} */ + public int size() { + try { + GridCacheContext cctx = cached.context(); + + KeyCacheObject key = cached.key(); + + byte[] keyBytes = key.valueBytes(cctx.cacheObjectContext()); + + byte[] valBytes = null; + + if (cctx.useOffheapEntry()) + valBytes = cctx.offheap().get(cctx.swap().spaceName(), cached.partition(), key, keyBytes); + else { + CacheObject cacheObj = cached.valueBytes(); + + if (cacheObj != null) + valBytes = cacheObj.valueBytes(cctx.cacheObjectContext()); + } + + return valBytes == null ? keyBytes.length : keyBytes.length + valBytes.length; + } + catch (GridCacheEntryRemovedException e) { + return 0; + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public V getValue() { try { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java index 560de97..74ba100 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java @@ -19,12 +19,15 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; import org.apache.ignite.internal.processors.cache.store.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.internal.*; import java.util.concurrent.atomic.*; +import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.*; + /** * Adapter for cache metrics. */ @@ -63,7 +66,7 @@ public class CacheMetricsImpl implements CacheMetrics { private AtomicLong getTimeNanos = new AtomicLong(); /** Remove time taken nanos. */ - private AtomicLong removeTimeNanos = new AtomicLong(); + private AtomicLong rmvTimeNanos = new AtomicLong(); /** Commit transaction time taken nanos. */ private AtomicLong commitTimeNanos = new AtomicLong(); @@ -71,6 +74,39 @@ public class CacheMetricsImpl implements CacheMetrics { /** Commit transaction time taken nanos. */ private AtomicLong rollbackTimeNanos = new AtomicLong(); + /** Number of reads from off-heap memory. */ + private AtomicLong offHeapGets = new AtomicLong(); + + /** Number of writes to off-heap memory. */ + private AtomicLong offHeapPuts = new AtomicLong(); + + /** Number of removed entries from off-heap memory. */ + private AtomicLong offHeapRemoves = new AtomicLong(); + + /** Number of evictions from off-heap memory. */ + private AtomicLong offHeapEvicts = new AtomicLong(); + + /** Number of off-heap hits. */ + private AtomicLong offHeapHits = new AtomicLong(); + + /** Number of off-heap misses. */ + private AtomicLong offHeapMisses = new AtomicLong(); + + /** Number of reads from swap. */ + private AtomicLong swapGets = new AtomicLong(); + + /** Number of writes to swap. */ + private AtomicLong swapPuts = new AtomicLong(); + + /** Number of removed entries from swap. */ + private AtomicLong swapRemoves = new AtomicLong(); + + /** Number of swap hits. */ + private AtomicLong swapHits = new AtomicLong(); + + /** Number of swap misses. */ + private AtomicLong swapMisses = new AtomicLong(); + /** Cache metrics. */ @GridToStringExclude private transient CacheMetricsImpl delegate; @@ -118,7 +154,9 @@ public class CacheMetricsImpl implements CacheMetrics { /** {@inheritDoc} */ @Override public long getOverflowSize() { try { - return cctx.cache().overflowSize(); + GridCacheAdapter cache = cctx.cache(); + + return cache != null ? cache.overflowSize() : -1; } catch (IgniteCheckedException ignored) { return -1; @@ -126,35 +164,192 @@ public class CacheMetricsImpl implements CacheMetrics { } /** {@inheritDoc} */ + @Override public long getOffHeapGets() { + return offHeapGets.get(); + } + + /** {@inheritDoc} */ + @Override public long getOffHeapPuts() { + return offHeapPuts.get(); + } + + /** {@inheritDoc} */ + @Override public long getOffHeapRemovals() { + return offHeapRemoves.get(); + } + + /** {@inheritDoc} */ + @Override public long getOffHeapEvictions() { + return offHeapEvicts.get(); + } + + /** {@inheritDoc} */ + @Override public long getOffHeapHits() { + return offHeapHits.get(); + } + + /** {@inheritDoc} */ + @Override public float getOffHeapHitPercentage() { + long hits0 = offHeapHits.get(); + long gets0 = offHeapGets.get(); + + if (hits0 == 0) + return 0; + + return (float) hits0 / gets0 * 100.0f; + } + + /** {@inheritDoc} */ + @Override public long getOffHeapMisses() { + return offHeapMisses.get(); + } + + /** {@inheritDoc} */ + @Override public float getOffHeapMissPercentage() { + long misses0 = offHeapMisses.get(); + long reads0 = offHeapGets.get(); + + if (misses0 == 0) + return 0; + + return (float) misses0 / reads0 * 100.0f; + } + + /** {@inheritDoc} */ @Override public long getOffHeapEntriesCount() { - return cctx.cache().offHeapEntriesCount(); + GridCacheAdapter cache = cctx.cache(); + + return cache != null ? cache.offHeapEntriesCount() : -1; + } + + /** {@inheritDoc} */ + @Override public long getOffHeapPrimaryEntriesCount() { + try { + return cctx.swap().offheapEntriesCount(true, false, cctx.affinity().affinityTopologyVersion()); + } + catch (IgniteCheckedException e) { + return 0; + } + } + + /** {@inheritDoc} */ + @Override public long getOffHeapBackupEntriesCount() { + try { + return cctx.swap().offheapEntriesCount(false, true, cctx.affinity().affinityTopologyVersion()); + } + catch (IgniteCheckedException e) { + return 0; + } } /** {@inheritDoc} */ @Override public long getOffHeapAllocatedSize() { - return cctx.cache().offHeapAllocatedSize(); + GridCacheAdapter cache = cctx.cache(); + + return cache != null ? cache.offHeapAllocatedSize() : -1; + } + + /** {@inheritDoc} */ + @Override public long getOffHeapMaxSize() { + return cctx.config().getOffHeapMaxMemory(); + } + + /** {@inheritDoc} */ + @Override public long getSwapGets() { + return swapGets.get(); + } + + /** {@inheritDoc} */ + @Override public long getSwapPuts() { + return swapPuts.get(); + } + + /** {@inheritDoc} */ + @Override public long getSwapRemovals() { + return swapRemoves.get(); + } + + /** {@inheritDoc} */ + @Override public long getSwapHits() { + return swapHits.get(); + } + + /** {@inheritDoc} */ + @Override public long getSwapMisses() { + return swapMisses.get(); + } + + /** {@inheritDoc} */ + @Override public long getSwapEntriesCount() { + try { + return cctx.cache().swapKeys(); + } + catch (IgniteCheckedException e) { + return 0; + } + } + + /** {@inheritDoc} */ + @Override public long getSwapSize() { + try { + return cctx.cache().swapSize(); + } + catch (IgniteCheckedException e) { + return 0; + } + } + + /** {@inheritDoc} */ + @Override public float getSwapHitPercentage() { + long hits0 = swapHits.get(); + long gets0 = swapGets.get(); + + if (hits0 == 0) + return 0; + + return (float) hits0 / gets0 * 100.0f; + } + + /** {@inheritDoc} */ + @Override public float getSwapMissPercentage() { + long misses0 = swapMisses.get(); + long reads0 = swapGets.get(); + + if (misses0 == 0) + return 0; + + return (float) misses0 / reads0 * 100.0f; } /** {@inheritDoc} */ @Override public int getSize() { - return cctx.cache().size(); + GridCacheAdapter cache = cctx.cache(); + + return cache != null ? cache.size() : 0; } /** {@inheritDoc} */ @Override public int getKeySize() { - return cctx.cache().size(); + return getSize(); } /** {@inheritDoc} */ @Override public boolean isEmpty() { - return cctx.cache().isEmpty(); + GridCacheAdapter cache = cctx.cache(); + + return cache == null || cache.isEmpty(); } /** {@inheritDoc} */ @Override public int getDhtEvictQueueCurrentSize() { - return cctx.isNear() ? - dhtCtx != null ? dhtCtx.evicts().evictQueueSize() : -1 - : cctx.evicts().evictQueueSize(); + GridCacheContext ctx = cctx.isNear() ? dhtCtx : cctx; + + if (ctx == null) + return -1; + + GridCacheEvictionManager evictMgr = ctx.evicts(); + + return evictMgr != null ? evictMgr.evictQueueSize() : -1; } /** {@inheritDoc} */ @@ -317,11 +512,24 @@ public class CacheMetricsImpl implements CacheMetrics { txCommits.set(0); txRollbacks.set(0); putTimeNanos.set(0); - removeTimeNanos.set(0); + rmvTimeNanos.set(0); getTimeNanos.set(0); commitTimeNanos.set(0); rollbackTimeNanos.set(0); + offHeapGets.set(0); + offHeapPuts.set(0); + offHeapRemoves.set(0); + offHeapHits.set(0); + offHeapMisses.set(0); + offHeapEvicts.set(0); + + swapGets.set(0); + swapPuts.set(0); + swapRemoves.set(0); + swapHits.set(0); + swapMisses.set(0); + if (delegate != null) delegate.clear(); } @@ -402,7 +610,7 @@ public class CacheMetricsImpl implements CacheMetrics { /** {@inheritDoc} */ @Override public float getAverageRemoveTime() { - long timeNanos = removeTimeNanos.get(); + long timeNanos = rmvTimeNanos.get(); long removesCnt = rmCnt.get(); if (timeNanos == 0 || removesCnt == 0) @@ -483,7 +691,6 @@ public class CacheMetricsImpl implements CacheMetrics { delegate.onTxRollback(duration); } - /** * Increments the get time accumulator. * @@ -514,7 +721,7 @@ public class CacheMetricsImpl implements CacheMetrics { * @param duration the time taken in nanoseconds. */ public void addRemoveTimeNanos(long duration) { - removeTimeNanos.addAndGet(duration); + rmvTimeNanos.addAndGet(duration); if (delegate != null) delegate.addRemoveTimeNanos(duration); @@ -526,7 +733,7 @@ public class CacheMetricsImpl implements CacheMetrics { * @param duration the time taken in nanoseconds. */ public void addRemoveAndGetTimeNanos(long duration) { - removeTimeNanos.addAndGet(duration); + rmvTimeNanos.addAndGet(duration); getTimeNanos.addAndGet(duration); if (delegate != null) @@ -548,37 +755,153 @@ public class CacheMetricsImpl implements CacheMetrics { /** {@inheritDoc} */ @Override public String getKeyType() { - return cctx.config().getKeyType().getName(); + CacheConfiguration ccfg = cctx.config(); + + return ccfg != null ? ccfg.getKeyType().getName() : null; } /** {@inheritDoc} */ @Override public String getValueType() { - return cctx.config().getValueType().getName(); + CacheConfiguration ccfg = cctx.config(); + + return ccfg != null ? ccfg.getValueType().getName() : null; } /** {@inheritDoc} */ @Override public boolean isReadThrough() { - return cctx.config().isReadThrough(); + CacheConfiguration ccfg = cctx.config(); + + return ccfg != null && ccfg.isReadThrough(); } /** {@inheritDoc} */ @Override public boolean isWriteThrough() { - return cctx.config().isWriteThrough(); + CacheConfiguration ccfg = cctx.config(); + + return ccfg != null && ccfg.isWriteThrough(); } /** {@inheritDoc} */ @Override public boolean isStoreByValue() { - return cctx.config().isStoreByValue(); + CacheConfiguration ccfg = cctx.config(); + + return ccfg != null && ccfg.isStoreByValue(); } /** {@inheritDoc} */ @Override public boolean isStatisticsEnabled() { - return cctx.config().isStatisticsEnabled(); + CacheConfiguration ccfg = cctx.config(); + + return ccfg != null && ccfg.isStatisticsEnabled(); } /** {@inheritDoc} */ @Override public boolean isManagementEnabled() { - return cctx.config().isManagementEnabled(); + CacheConfiguration ccfg = cctx.config(); + + return ccfg != null && ccfg.isManagementEnabled(); + } + + /** + * Off-heap read callback. + * + * @param hit Hit or miss flag. + */ + public void onOffHeapRead(boolean hit) { + offHeapGets.incrementAndGet(); + + if (hit) + offHeapHits.incrementAndGet(); + else + offHeapMisses.incrementAndGet(); + + if (delegate != null) + delegate.onOffHeapRead(hit); + } + + /** + * Off-heap write callback. + */ + public void onOffHeapWrite() { + offHeapPuts.incrementAndGet(); + + if (delegate != null) + delegate.onOffHeapWrite(); + } + + /** + * Off-heap remove callback. + */ + public void onOffHeapRemove() { + offHeapRemoves.incrementAndGet(); + + if (delegate != null) + delegate.onOffHeapRemove(); + } + + /** + * Off-heap evict callback. + */ + public void onOffHeapEvict() { + offHeapEvicts.incrementAndGet(); + + if (delegate != null) + delegate.onOffHeapRemove(); + } + + /** + * Swap read callback. + * + * @param hit Hit or miss flag. + */ + public void onSwapRead(boolean hit) { + swapGets.incrementAndGet(); + + if (hit) + swapHits.incrementAndGet(); + else + swapMisses.incrementAndGet(); + + if (delegate != null) + delegate.onSwapRead(hit); + } + + /** + * Swap write callback. + */ + public void onSwapWrite() { + onSwapWrite(1); + } + + /** + * Swap write callback. + * + * @param cnt Amount of entries. + */ + public void onSwapWrite(int cnt) { + swapPuts.addAndGet(cnt); + + if (delegate != null) + delegate.onSwapWrite(cnt); + } + + /** + * Swap remove callback. + */ + public void onSwapRemove() { + onSwapRemove(1); + } + + /** + * Swap remove callback. + * + * @param cnt Amount of entries. + */ + public void onSwapRemove(int cnt) { + swapRemoves.addAndGet(cnt); + + if (delegate != null) + delegate.onSwapRemove(cnt); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsMXBeanImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsMXBeanImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsMXBeanImpl.java index e9d547c..966027a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsMXBeanImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsMXBeanImpl.java @@ -49,16 +49,116 @@ class CacheMetricsMXBeanImpl implements CacheMetricsMXBean { } /** {@inheritDoc} */ + @Override public long getOffHeapGets() { + return cache.metrics0().getOffHeapGets(); + } + + /** {@inheritDoc} */ + @Override public long getOffHeapPuts() { + return cache.metrics0().getOffHeapPuts(); + } + + /** {@inheritDoc} */ + @Override public long getOffHeapRemovals() { + return cache.metrics0().getOffHeapRemovals(); + } + + /** {@inheritDoc} */ + @Override public long getOffHeapEvictions() { + return cache.metrics0().getOffHeapEvictions(); + } + + /** {@inheritDoc} */ + @Override public long getOffHeapHits() { + return cache.metrics0().getOffHeapHits(); + } + + /** {@inheritDoc} */ + @Override public float getOffHeapHitPercentage() { + return cache.metrics0().getOffHeapHitPercentage(); + } + + /** {@inheritDoc} */ + @Override public long getOffHeapMisses() { + return cache.metrics0().getOffHeapMisses(); + } + + /** {@inheritDoc} */ + @Override public float getOffHeapMissPercentage() { + return cache.metrics0().getOffHeapMissPercentage(); + } + + /** {@inheritDoc} */ @Override public long getOffHeapEntriesCount() { return cache.metrics0().getOffHeapEntriesCount(); } /** {@inheritDoc} */ + @Override public long getOffHeapPrimaryEntriesCount() { + return cache.metrics0().getOffHeapPrimaryEntriesCount(); + } + + /** {@inheritDoc} */ + @Override public long getOffHeapBackupEntriesCount() { + return cache.metrics0().getOffHeapBackupEntriesCount(); + } + + /** {@inheritDoc} */ @Override public long getOffHeapAllocatedSize() { return cache.metrics0().getOffHeapAllocatedSize(); } /** {@inheritDoc} */ + @Override public long getOffHeapMaxSize() { + return cache.metrics0().getOffHeapMaxSize(); + } + + /** {@inheritDoc} */ + @Override public long getSwapGets() { + return cache.metrics0().getSwapGets(); + } + + /** {@inheritDoc} */ + @Override public long getSwapPuts() { + return cache.metrics0().getSwapPuts(); + } + + /** {@inheritDoc} */ + @Override public long getSwapRemovals() { + return cache.metrics0().getSwapRemovals(); + } + + /** {@inheritDoc} */ + @Override public long getSwapHits() { + return cache.metrics0().getSwapHits(); + } + + /** {@inheritDoc} */ + @Override public long getSwapMisses() { + return cache.metrics0().getSwapMisses(); + } + + /** {@inheritDoc} */ + @Override public float getSwapHitPercentage() { + return cache.metrics0().getSwapHitPercentage(); + } + + /** {@inheritDoc} */ + @Override public float getSwapMissPercentage() { + return cache.metrics0().getSwapMissPercentage(); + } + + /** {@inheritDoc} */ + @Override public long getSwapEntriesCount() { + return cache.metrics0().getSwapEntriesCount(); + } + + /** {@inheritDoc} */ + @Override public long getSwapSize() { + return cache.metrics0().getSwapSize(); + } + + /** {@inheritDoc} */ @Override public int getSize() { return cache.metrics0().getSize(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java index 4fe152a..cf16d9d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java @@ -61,7 +61,7 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable { private float getAvgTimeNanos = 0; /** Remove time taken nanos. */ - private float removeAvgTimeNanos = 0; + private float rmvAvgTimeNanos = 0; /** Commit transaction time taken nanos. */ private float commitAvgTimeNanos = 0; @@ -75,12 +75,60 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable { /** Number of entries that was swapped to disk. */ private long overflowSize; + /** Number of reads from off-heap. */ + private long offHeapGets; + + /** Number of writes to off-heap. */ + private long offHeapPuts; + + /** Number of removed entries from off-heap. */ + private long offHeapRemoves; + + /** Number of evictions from off-heap. */ + private long offHeapEvicts; + + /** Off-heap hits number. */ + private long offHeapHits; + + /** Off-heap misses number. */ + private long offHeapMisses; + /** Number of entries stored in off-heap memory. */ - private long offHeapEntriesCount; + private long offHeapEntriesCnt; + + /** Number of primary entries stored in off-heap memory. */ + private long offHeapPrimaryEntriesCnt; + + /** Number of backup entries stored in off-heap memory. */ + private long offHeapBackupEntriesCnt; /** Memory size allocated in off-heap. */ private long offHeapAllocatedSize; + /** Off-heap memory maximum size*/ + private long offHeapMaxSize; + + /** Number of reads from swap. */ + private long swapGets; + + /** Number of writes to swap. */ + private long swapPuts; + + /** Number of removed entries from swap. */ + private long swapRemoves; + + /** Number of entries stored in swap. */ + private long swapEntriesCnt; + + /** Swap hits number. */ + private long swapHits; + + /** Swap misses number. */ + private long swapMisses; + + /** Swap size. */ + private long swapSize; + /** Number of non-{@code null} values in the cache. */ private int size; @@ -91,7 +139,7 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable { private boolean isEmpty; /** Gets current size of evict queue used to batch up evictions. */ - private int dhtEvictQueueCurrentSize; + private int dhtEvictQueueCurrSize; /** Transaction per-thread map size. */ private int txThreadMapSize; @@ -106,7 +154,7 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable { private int txPrepareQueueSize; /** Start version counts map size. */ - private int txStartVersionCountsSize; + private int txStartVerCountsSize; /** Number of cached committed transaction IDs. */ private int txCommittedVersionsSize; @@ -127,7 +175,7 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable { private int txDhtPrepareQueueSize; /** DHT start version counts map size. */ - private int txDhtStartVersionCountsSize; + private int txDhtStartVerCountsSize; /** Number of cached committed DHT transaction IDs. */ private int txDhtCommittedVersionsSize; @@ -142,34 +190,34 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable { private int writeBehindFlushSize; /** Count of worker threads. */ - private int writeBehindFlushThreadCount; + private int writeBehindFlushThreadCnt; /** Flush frequency in milliseconds. */ - private long writeBehindFlushFrequency; + private long writeBehindFlushFreq; /** Maximum size of batch. */ private int writeBehindStoreBatchSize; /** Count of cache overflow events since start. */ - private int writeBehindTotalCriticalOverflowCount; + private int writeBehindTotalCriticalOverflowCnt; /** Count of cache overflow events since start. */ - private int writeBehindCriticalOverflowCount; + private int writeBehindCriticalOverflowCnt; /** Count of entries in store-retry state. */ - private int writeBehindErrorRetryCount; + private int writeBehindErrorRetryCnt; /** Total count of entries in cache store internal buffer. */ - private int writeBehindBufferSize; + private int writeBehindBufSize; /** */ private String keyType; /** */ - private String valueType; + private String valType; /** */ - private boolean isStoreByValue; + private boolean isStoreByVal; /** */ private boolean isStatisticsEnabled; @@ -207,45 +255,64 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable { putAvgTimeNanos = m.getAveragePutTime(); getAvgTimeNanos = m.getAverageGetTime(); - removeAvgTimeNanos = m.getAverageRemoveTime(); + rmvAvgTimeNanos = m.getAverageRemoveTime(); commitAvgTimeNanos = m.getAverageTxCommitTime(); rollbackAvgTimeNanos = m.getAverageTxRollbackTime(); cacheName = m.name(); overflowSize = m.getOverflowSize(); - offHeapEntriesCount = m.getOffHeapEntriesCount(); + + offHeapGets = m.getOffHeapGets(); + offHeapPuts = m.getOffHeapPuts(); + offHeapRemoves = m.getOffHeapRemovals(); + offHeapEvicts = m.getOffHeapEvictions(); + offHeapHits = m.getOffHeapHits(); + offHeapMisses = m.getOffHeapMisses(); + offHeapEntriesCnt = m.getOffHeapEntriesCount(); + offHeapPrimaryEntriesCnt = m.getOffHeapPrimaryEntriesCount(); + offHeapBackupEntriesCnt = m.getOffHeapBackupEntriesCount(); offHeapAllocatedSize = m.getOffHeapAllocatedSize(); + offHeapMaxSize = m.getOffHeapMaxSize(); + + swapGets = m.getSwapGets(); + swapPuts = m.getSwapPuts(); + swapRemoves = m.getSwapRemovals(); + swapHits = m.getSwapHits(); + swapMisses = m.getSwapMisses(); + swapEntriesCnt = m.getSwapEntriesCount(); + swapSize = m.getSwapSize(); + size = m.getSize(); keySize = m.getKeySize(); isEmpty = m.isEmpty(); - dhtEvictQueueCurrentSize = m.getDhtEvictQueueCurrentSize(); + dhtEvictQueueCurrSize = m.getDhtEvictQueueCurrentSize(); txThreadMapSize = m.getTxThreadMapSize(); txXidMapSize = m.getTxXidMapSize(); txCommitQueueSize = m.getTxCommitQueueSize(); txPrepareQueueSize = m.getTxPrepareQueueSize(); - txStartVersionCountsSize = m.getTxStartVersionCountsSize(); + txStartVerCountsSize = m.getTxStartVersionCountsSize(); txCommittedVersionsSize = m.getTxCommittedVersionsSize(); txRolledbackVersionsSize = m.getTxRolledbackVersionsSize(); txDhtThreadMapSize = m.getTxDhtThreadMapSize(); txDhtXidMapSize = m.getTxDhtXidMapSize(); txDhtCommitQueueSize = m.getTxDhtCommitQueueSize(); txDhtPrepareQueueSize = m.getTxDhtPrepareQueueSize(); - txDhtStartVersionCountsSize = m.getTxDhtStartVersionCountsSize(); + txDhtStartVerCountsSize = m.getTxDhtStartVersionCountsSize(); txDhtCommittedVersionsSize = m.getTxDhtCommittedVersionsSize(); txDhtRolledbackVersionsSize = m.getTxDhtRolledbackVersionsSize(); isWriteBehindEnabled = m.isWriteBehindEnabled(); writeBehindFlushSize = m.getWriteBehindFlushSize(); - writeBehindFlushThreadCount = m.getWriteBehindFlushThreadCount(); - writeBehindFlushFrequency = m.getWriteBehindFlushFrequency(); + writeBehindFlushThreadCnt = m.getWriteBehindFlushThreadCount(); + writeBehindFlushFreq = m.getWriteBehindFlushFrequency(); writeBehindStoreBatchSize = m.getWriteBehindStoreBatchSize(); - writeBehindTotalCriticalOverflowCount = m.getWriteBehindTotalCriticalOverflowCount(); - writeBehindCriticalOverflowCount = m.getWriteBehindCriticalOverflowCount(); - writeBehindErrorRetryCount = m.getWriteBehindErrorRetryCount(); - writeBehindBufferSize = m.getWriteBehindBufferSize(); + writeBehindTotalCriticalOverflowCnt = m.getWriteBehindTotalCriticalOverflowCount(); + writeBehindCriticalOverflowCnt = m.getWriteBehindCriticalOverflowCount(); + writeBehindErrorRetryCnt = m.getWriteBehindErrorRetryCount(); + writeBehindBufSize = m.getWriteBehindBufferSize(); keyType = m.getKeyType(); - valueType = m.getValueType(); - isStoreByValue = m.isStoreByValue(); + valType = m.getValueType(); + isStoreByVal = m.isStoreByValue(); isStatisticsEnabled = m.isStatisticsEnabled(); isManagementEnabled = m.isManagementEnabled(); isReadThrough = m.isReadThrough(); @@ -263,21 +330,23 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable { isEmpty = loc.isEmpty(); isWriteBehindEnabled = loc.isWriteBehindEnabled(); writeBehindFlushSize = loc.getWriteBehindFlushSize(); - writeBehindFlushThreadCount = loc.getWriteBehindFlushThreadCount(); - writeBehindFlushFrequency = loc.getWriteBehindFlushFrequency(); + writeBehindFlushThreadCnt = loc.getWriteBehindFlushThreadCount(); + writeBehindFlushFreq = loc.getWriteBehindFlushFrequency(); writeBehindStoreBatchSize = loc.getWriteBehindStoreBatchSize(); - writeBehindBufferSize = loc.getWriteBehindBufferSize(); + writeBehindBufSize = loc.getWriteBehindBufferSize(); size = loc.getSize(); keySize = loc.getKeySize(); keyType = loc.getKeyType(); - valueType = loc.getValueType(); - isStoreByValue = loc.isStoreByValue(); + valType = loc.getValueType(); + isStoreByVal = loc.isStoreByValue(); isStatisticsEnabled = loc.isStatisticsEnabled(); isManagementEnabled = loc.isManagementEnabled(); isReadThrough = loc.isReadThrough(); isWriteThrough = loc.isWriteThrough(); + offHeapMaxSize = loc.getOffHeapMaxSize(); + for (CacheMetrics e : metrics) { reads += e.getCacheGets(); puts += e.getCachePuts(); @@ -290,7 +359,7 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable { putAvgTimeNanos += e.getAveragePutTime(); getAvgTimeNanos += e.getAverageGetTime(); - removeAvgTimeNanos += e.getAverageRemoveTime(); + rmvAvgTimeNanos += e.getAverageRemoveTime(); commitAvgTimeNanos += e.getAverageTxCommitTime(); rollbackAvgTimeNanos += e.getAverageTxRollbackTime(); @@ -299,19 +368,35 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable { else overflowSize = -1; - offHeapEntriesCount += e.getOffHeapEntriesCount(); + offHeapGets += e.getOffHeapGets(); + offHeapPuts += e.getOffHeapPuts(); + offHeapRemoves += e.getOffHeapRemovals(); + offHeapEvicts += e.getOffHeapEvictions(); + offHeapHits += e.getOffHeapHits(); + offHeapMisses += e.getOffHeapMisses(); + offHeapEntriesCnt += e.getOffHeapEntriesCount(); + offHeapPrimaryEntriesCnt += e.getOffHeapPrimaryEntriesCount(); + offHeapBackupEntriesCnt += e.getOffHeapBackupEntriesCount(); offHeapAllocatedSize += e.getOffHeapAllocatedSize(); + swapGets += e.getSwapGets(); + swapPuts += e.getSwapPuts(); + swapRemoves += e.getSwapRemovals(); + swapHits += e.getSwapHits(); + swapMisses += e.getSwapMisses(); + swapEntriesCnt += e.getSwapEntriesCount(); + swapSize += e.getSwapSize(); + if (e.getDhtEvictQueueCurrentSize() > -1) - dhtEvictQueueCurrentSize += e.getDhtEvictQueueCurrentSize(); + dhtEvictQueueCurrSize += e.getDhtEvictQueueCurrentSize(); else - dhtEvictQueueCurrentSize = -1; + dhtEvictQueueCurrSize = -1; txThreadMapSize += e.getTxThreadMapSize(); txXidMapSize += e.getTxXidMapSize(); txCommitQueueSize += e.getTxCommitQueueSize(); txPrepareQueueSize += e.getTxPrepareQueueSize(); - txStartVersionCountsSize += e.getTxStartVersionCountsSize(); + txStartVerCountsSize += e.getTxStartVersionCountsSize(); txCommittedVersionsSize += e.getTxCommittedVersionsSize(); txRolledbackVersionsSize += e.getTxRolledbackVersionsSize(); @@ -336,9 +421,9 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable { txDhtPrepareQueueSize = -1; if (e.getTxDhtStartVersionCountsSize() > -1) - txDhtStartVersionCountsSize += e.getTxDhtStartVersionCountsSize(); + txDhtStartVerCountsSize += e.getTxDhtStartVersionCountsSize(); else - txDhtStartVersionCountsSize = -1; + txDhtStartVerCountsSize = -1; if (e.getTxDhtCommittedVersionsSize() > -1) txDhtCommittedVersionsSize += e.getTxDhtCommittedVersionsSize(); @@ -351,19 +436,19 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable { txDhtRolledbackVersionsSize = -1; if (e.getWriteBehindTotalCriticalOverflowCount() > -1) - writeBehindTotalCriticalOverflowCount += e.getWriteBehindTotalCriticalOverflowCount(); + writeBehindTotalCriticalOverflowCnt += e.getWriteBehindTotalCriticalOverflowCount(); else - writeBehindTotalCriticalOverflowCount = -1; + writeBehindTotalCriticalOverflowCnt = -1; if (e.getWriteBehindCriticalOverflowCount() > -1) - writeBehindCriticalOverflowCount += e.getWriteBehindCriticalOverflowCount(); + writeBehindCriticalOverflowCnt += e.getWriteBehindCriticalOverflowCount(); else - writeBehindCriticalOverflowCount = -1; + writeBehindCriticalOverflowCnt = -1; if (e.getWriteBehindErrorRetryCount() > -1) - writeBehindErrorRetryCount += e.getWriteBehindErrorRetryCount(); + writeBehindErrorRetryCnt += e.getWriteBehindErrorRetryCount(); else - writeBehindErrorRetryCount = -1; + writeBehindErrorRetryCnt = -1; } int size = metrics.size(); @@ -371,7 +456,7 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable { if (size > 1) { putAvgTimeNanos /= size; getAvgTimeNanos /= size; - removeAvgTimeNanos /= size; + rmvAvgTimeNanos /= size; commitAvgTimeNanos /= size; rollbackAvgTimeNanos /= size; } @@ -435,7 +520,7 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable { /** {@inheritDoc} */ @Override public float getAverageRemoveTime() { - return removeAvgTimeNanos; + return rmvAvgTimeNanos; } /** {@inheritDoc} */ @@ -469,8 +554,63 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable { } /** {@inheritDoc} */ + @Override public long getOffHeapGets() { + return offHeapGets; + } + + /** {@inheritDoc} */ + @Override public long getOffHeapPuts() { + return offHeapPuts; + } + + /** {@inheritDoc} */ + @Override public long getOffHeapRemovals() { + return offHeapRemoves; + } + + /** {@inheritDoc} */ + @Override public long getOffHeapEvictions() { + return offHeapEvicts; + } + + /** {@inheritDoc} */ + @Override public long getOffHeapHits() { + return offHeapHits; + } + + /** {@inheritDoc} */ + @Override public float getOffHeapHitPercentage() { + if (offHeapHits == 0 || offHeapGets == 0) + return 0; + + return (float) offHeapHits / offHeapGets * 100.0f; + } + + /** {@inheritDoc} */ + @Override public long getOffHeapMisses() { + return offHeapMisses; + } + + /** {@inheritDoc} */ + @Override public float getOffHeapMissPercentage() { + if (offHeapMisses == 0 || offHeapGets == 0) + return 0; + + return (float) offHeapMisses / offHeapGets * 100.0f; + } + /** {@inheritDoc} */ @Override public long getOffHeapEntriesCount() { - return offHeapEntriesCount; + return offHeapEntriesCnt; + } + + /** {@inheritDoc} */ + @Override public long getOffHeapPrimaryEntriesCount() { + return offHeapPrimaryEntriesCnt; + } + + /** {@inheritDoc} */ + @Override public long getOffHeapBackupEntriesCount() { + return offHeapBackupEntriesCnt; } /** {@inheritDoc} */ @@ -479,6 +619,62 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable { } /** {@inheritDoc} */ + @Override public long getOffHeapMaxSize() { + return offHeapMaxSize; + } + + /** {@inheritDoc} */ + @Override public long getSwapGets() { + return swapGets; + } + + /** {@inheritDoc} */ + @Override public long getSwapPuts() { + return swapPuts; + } + + /** {@inheritDoc} */ + @Override public long getSwapRemovals() { + return swapRemoves; + } + + /** {@inheritDoc} */ + @Override public long getSwapHits() { + return swapHits; + } + + /** {@inheritDoc} */ + @Override public long getSwapMisses() { + return swapMisses; + } + + /** {@inheritDoc} */ + @Override public float getSwapHitPercentage() { + if (swapHits == 0 || swapGets == 0) + return 0; + + return (float) swapHits / swapGets * 100.0f; + } + + /** {@inheritDoc} */ + @Override public float getSwapMissPercentage() { + if (swapMisses == 0 || swapGets == 0) + return 0; + + return (float) swapMisses / swapGets * 100.0f; + } + + /** {@inheritDoc} */ + @Override public long getSwapEntriesCount() { + return swapEntriesCnt; + } + + /** {@inheritDoc} */ + @Override public long getSwapSize() { + return swapSize; + } + + /** {@inheritDoc} */ @Override public int getSize() { return size; } @@ -495,7 +691,7 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable { /** {@inheritDoc} */ @Override public int getDhtEvictQueueCurrentSize() { - return dhtEvictQueueCurrentSize; + return dhtEvictQueueCurrSize; } /** {@inheritDoc} */ @@ -520,7 +716,7 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable { /** {@inheritDoc} */ @Override public int getTxStartVersionCountsSize() { - return txStartVersionCountsSize; + return txStartVerCountsSize; } /** {@inheritDoc} */ @@ -555,7 +751,7 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable { /** {@inheritDoc} */ @Override public int getTxDhtStartVersionCountsSize() { - return txDhtStartVersionCountsSize; + return txDhtStartVerCountsSize; } /** {@inheritDoc} */ @@ -580,12 +776,12 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable { /** {@inheritDoc} */ @Override public int getWriteBehindFlushThreadCount() { - return writeBehindFlushThreadCount; + return writeBehindFlushThreadCnt; } /** {@inheritDoc} */ @Override public long getWriteBehindFlushFrequency() { - return writeBehindFlushFrequency; + return writeBehindFlushFreq; } /** {@inheritDoc} */ @@ -595,22 +791,22 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable { /** {@inheritDoc} */ @Override public int getWriteBehindTotalCriticalOverflowCount() { - return writeBehindTotalCriticalOverflowCount; + return writeBehindTotalCriticalOverflowCnt; } /** {@inheritDoc} */ @Override public int getWriteBehindCriticalOverflowCount() { - return writeBehindCriticalOverflowCount; + return writeBehindCriticalOverflowCnt; } /** {@inheritDoc} */ @Override public int getWriteBehindErrorRetryCount() { - return writeBehindErrorRetryCount; + return writeBehindErrorRetryCnt; } /** {@inheritDoc} */ @Override public int getWriteBehindBufferSize() { - return writeBehindBufferSize; + return writeBehindBufSize; } /** {@inheritDoc} */ @@ -620,12 +816,12 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable { /** {@inheritDoc} */ @Override public String getValueType() { - return valueType; + return valType; } /** {@inheritDoc} */ @Override public boolean isStoreByValue() { - return isStoreByValue; + return isStoreByVal; } /** {@inheritDoc} */ @@ -666,31 +862,49 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable { out.writeFloat(putAvgTimeNanos); out.writeFloat(getAvgTimeNanos); - out.writeFloat(removeAvgTimeNanos); + out.writeFloat(rmvAvgTimeNanos); out.writeFloat(commitAvgTimeNanos); out.writeFloat(rollbackAvgTimeNanos); out.writeLong(overflowSize); - out.writeLong(offHeapEntriesCount); + out.writeLong(offHeapGets); + out.writeLong(offHeapPuts); + out.writeLong(offHeapRemoves); + out.writeLong(offHeapEvicts); + out.writeLong(offHeapHits); + out.writeLong(offHeapMisses); + out.writeLong(offHeapEntriesCnt); + out.writeLong(offHeapPrimaryEntriesCnt); + out.writeLong(offHeapBackupEntriesCnt); out.writeLong(offHeapAllocatedSize); - out.writeInt(dhtEvictQueueCurrentSize); + out.writeLong(offHeapMaxSize); + + out.writeLong(swapGets); + out.writeLong(swapPuts); + out.writeLong(swapRemoves); + out.writeLong(swapHits); + out.writeLong(swapMisses); + out.writeLong(swapEntriesCnt); + out.writeLong(swapSize); + + out.writeInt(dhtEvictQueueCurrSize); out.writeInt(txThreadMapSize); out.writeInt(txXidMapSize); out.writeInt(txCommitQueueSize); out.writeInt(txPrepareQueueSize); - out.writeInt(txStartVersionCountsSize); + out.writeInt(txStartVerCountsSize); out.writeInt(txCommittedVersionsSize); out.writeInt(txRolledbackVersionsSize); out.writeInt(txDhtThreadMapSize); out.writeInt(txDhtXidMapSize); out.writeInt(txDhtCommitQueueSize); out.writeInt(txDhtPrepareQueueSize); - out.writeInt(txDhtStartVersionCountsSize); + out.writeInt(txDhtStartVerCountsSize); out.writeInt(txDhtCommittedVersionsSize); out.writeInt(txDhtRolledbackVersionsSize); - out.writeInt(writeBehindTotalCriticalOverflowCount); - out.writeInt(writeBehindCriticalOverflowCount); - out.writeInt(writeBehindErrorRetryCount); + out.writeInt(writeBehindTotalCriticalOverflowCnt); + out.writeInt(writeBehindCriticalOverflowCnt); + out.writeInt(writeBehindErrorRetryCnt); } /** {@inheritDoc} */ @@ -706,30 +920,48 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable { putAvgTimeNanos = in.readFloat(); getAvgTimeNanos = in.readFloat(); - removeAvgTimeNanos = in.readFloat(); + rmvAvgTimeNanos = in.readFloat(); commitAvgTimeNanos = in.readFloat(); rollbackAvgTimeNanos = in.readFloat(); overflowSize = in.readLong(); - offHeapEntriesCount = in.readLong(); + offHeapGets = in.readLong(); + offHeapPuts = in.readLong(); + offHeapRemoves = in.readLong(); + offHeapEvicts = in.readLong(); + offHeapHits = in.readLong(); + offHeapMisses = in.readLong(); + offHeapEntriesCnt = in.readLong(); + offHeapPrimaryEntriesCnt = in.readLong(); + offHeapBackupEntriesCnt = in.readLong(); offHeapAllocatedSize = in.readLong(); - dhtEvictQueueCurrentSize = in.readInt(); + offHeapMaxSize = in.readLong(); + + swapGets = in.readLong(); + swapPuts = in.readLong(); + swapRemoves = in.readLong(); + swapHits = in.readLong(); + swapMisses = in.readLong(); + swapEntriesCnt = in.readLong(); + swapSize = in.readLong(); + + dhtEvictQueueCurrSize = in.readInt(); txThreadMapSize = in.readInt(); txXidMapSize = in.readInt(); txCommitQueueSize = in.readInt(); txPrepareQueueSize = in.readInt(); - txStartVersionCountsSize = in.readInt(); + txStartVerCountsSize = in.readInt(); txCommittedVersionsSize = in.readInt(); txRolledbackVersionsSize = in.readInt(); txDhtThreadMapSize = in.readInt(); txDhtXidMapSize = in.readInt(); txDhtCommitQueueSize = in.readInt(); txDhtPrepareQueueSize = in.readInt(); - txDhtStartVersionCountsSize = in.readInt(); + txDhtStartVerCountsSize = in.readInt(); txDhtCommittedVersionsSize = in.readInt(); txDhtRolledbackVersionsSize = in.readInt(); - writeBehindTotalCriticalOverflowCount = in.readInt(); - writeBehindCriticalOverflowCount = in.readInt(); - writeBehindErrorRetryCount = in.readInt(); + writeBehindTotalCriticalOverflowCnt = in.readInt(); + writeBehindCriticalOverflowCnt = in.readInt(); + writeBehindErrorRetryCnt = in.readInt(); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java index ac3660e..dfc39c1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java @@ -17,16 +17,18 @@ package org.apache.ignite.internal.processors.cache; +import org.apache.ignite.internal.managers.discovery.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.jetbrains.annotations.*; -import java.io.*; import java.util.*; /** * Cache change batch. */ -public class DynamicCacheChangeBatch implements Serializable { +public class DynamicCacheChangeBatch implements DiscoveryCustomMessage { /** */ private static final long serialVersionUID = 0L; @@ -38,6 +40,9 @@ public class DynamicCacheChangeBatch implements Serializable { @GridToStringInclude private Map> clientNodes; + /** Custom message ID. */ + private IgniteUuid id = IgniteUuid.randomUuid(); + /** * @param reqs Requests. */ @@ -47,6 +52,11 @@ public class DynamicCacheChangeBatch implements Serializable { this.reqs = reqs; } + /** {@inheritDoc} */ + @Override public IgniteUuid id() { + return id; + } + /** * @return Collection of change requests. */ @@ -69,6 +79,21 @@ public class DynamicCacheChangeBatch implements Serializable { } /** {@inheritDoc} */ + @Override public boolean incrementMinorTopologyVersion() { + return true; + } + + /** {@inheritDoc} */ + @Nullable @Override public DiscoveryCustomMessage ackMessage() { + return null; + } + + /** {@inheritDoc} */ + @Override public boolean isMutable() { + return false; + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(DynamicCacheChangeBatch.class, this); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java index 6f6f422..9c6cc43 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java @@ -61,7 +61,11 @@ public class DynamicCacheDescriptor { /** Cache plugin manager. */ private final CachePluginManager pluginMgr; + /** */ + private boolean updatesAllowed = true; + /** + * @param ctx Context. * @param cacheCfg Cache configuration. * @param cacheType Cache type. * @param template {@code True} if this is template configuration. @@ -76,6 +80,7 @@ public class DynamicCacheDescriptor { this.cacheType = cacheType; this.template = template; this.deploymentId = deploymentId; + pluginMgr = new CachePluginManager(ctx, cacheCfg); } @@ -206,6 +211,20 @@ public class DynamicCacheDescriptor { rmtCfgs = null; } + /** + * @return Updates allowed flag. + */ + public boolean updatesAllowed() { + return updatesAllowed; + } + + /** + * @param updatesAllowed Updates allowed flag. + */ + public void updatesAllowed(boolean updatesAllowed) { + this.updatesAllowed = updatesAllowed; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(DynamicCacheDescriptor.class, this, "cacheName", U.maskName(cacheCfg.getName())); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index d390037..d8d029e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -348,7 +348,7 @@ public abstract class GridCacheAdapter implements IgniteInternalCache preloader(); + public abstract GridCachePreloader preloader(); /** {@inheritDoc} */ @Override public Affinity affinity() { @@ -395,6 +395,10 @@ public abstract class GridCacheAdapter implements IgniteInternalCache withExpiryPolicy(ExpiryPolicy plc) { + assert !CU.isUtilityCache(ctx.name()); + assert !CU.isAtomicsCache(ctx.name()); + assert !CU.isMarshallerCache(ctx.name()); + CacheOperationContext opCtx = new CacheOperationContext(false, null, false, plc); return new GridCacheProxyImpl<>(ctx, this, opCtx); @@ -902,7 +906,12 @@ public abstract class GridCacheAdapter implements IgniteInternalCache keySet() { - return keySet((CacheEntryPredicate[]) null); + return keySet((CacheEntryPredicate[])null); + } + + /** {@inheritDoc} */ + @Override public Set keySetx() { + return keySetx((CacheEntryPredicate[])null); } /** {@inheritDoc} */ @@ -1215,11 +1224,22 @@ public abstract class GridCacheAdapter implements IgniteInternalCache getAllOutTx(List keys) throws IgniteCheckedException { + @Nullable @Override public Map getAllOutTx(Set keys) throws IgniteCheckedException { + return getAllOutTxAsync(keys).get(); + } + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture> getAllOutTxAsync(Set keys) { String taskName = ctx.kernalContext().job().currentTaskName(); - return getAllAsync(keys, !ctx.config().isReadFromBackup(), /*skip tx*/true, null, null, taskName, true, false) - .get(); + return getAllAsync(keys, + !ctx.config().isReadFromBackup(), + /*skip tx*/true, + null, + null, + taskName, + !ctx.keepPortable(), + false); } /** @@ -3249,7 +3269,9 @@ public abstract class GridCacheAdapter implements IgniteInternalCache implements IgniteInternalCache implements IgniteInternalCache keySetx(@Nullable CacheEntryPredicate... filter) { + return map.keySetx(filter); + } + + /** * @param filter Primary key set. * @return Primary key set. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java index fe7efd5..ea17df1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java @@ -140,6 +140,7 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter { * * @param topVer Topology version to calculate affinity for. * @param discoEvt Discovery event that causes this topology change. + * @return Affinity assignments. */ public List> calculateAffinity(AffinityTopologyVersion topVer, DiscoveryEvent discoEvt) { assert !cctx.isLocal(); @@ -148,6 +149,19 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter { } /** + * Copies previous affinity assignment when discovery event does not cause affinity assignment changes + * (e.g. client node joins on leaves). + * + * @param evt Event. + * @param topVer Topology version. + */ + public void clientEventTopologyChange(DiscoveryEvent evt, AffinityTopologyVersion topVer) { + assert !cctx.isLocal(); + + aff.clientEventTopologyChange(evt, topVer); + } + + /** * @return Partition count. */ public int partitions() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java index bd3e0f2..db5eed1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java @@ -626,7 +626,19 @@ public class GridCacheConcurrentMap { public Set keySet(CacheEntryPredicate... filter) { checkWeakQueue(); - return new KeySet<>(this, filter); + return new KeySet<>(this, filter, false); + } + + /** + * Key set including internal keys. + * + * @param filter Filter. + * @return Set of the keys contained in this map. + */ + public Set keySetx(CacheEntryPredicate... filter) { + checkWeakQueue(); + + return new KeySet<>(this, filter, true); } /** @@ -1921,7 +1933,7 @@ public class GridCacheConcurrentMap { /** {@inheritDoc} */ @Override public void clear() { - ctx.cache().clearLocally0(new KeySet(map, filter)); + ctx.cache().clearLocally0(new KeySet(map, filter, false)); } /** {@inheritDoc} */ @@ -2171,11 +2183,12 @@ public class GridCacheConcurrentMap { /** * @param map Base map. * @param filter Key filter. + * @param internal Whether to allow internal keys. */ - private KeySet(GridCacheConcurrentMap map, CacheEntryPredicate[] filter) { + private KeySet(GridCacheConcurrentMap map, CacheEntryPredicate[] filter, boolean internal) { assert map != null; - set = new Set0<>(map, nonInternal(filter)); + set = new Set0<>(map, internal ? filter : nonInternal(filter)); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index 2eeaed6..8a4e3b9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -196,6 +196,9 @@ public class GridCacheContext implements Externalizable { /** Dynamic cache deployment ID. */ private IgniteUuid dynamicDeploymentId; + /** Updates allowed flag. */ + private boolean updatesAllowed; + /** * Empty constructor required for {@link Externalizable}. */ @@ -209,6 +212,7 @@ public class GridCacheContext implements Externalizable { * @param cacheCfg Cache configuration. * @param cacheType Cache type. * @param affNode {@code True} if local node is affinity node. + * @param updatesAllowed Updates allowed flag. * @param evtMgr Cache event manager. * @param swapMgr Cache swap manager. * @param storeMgr Store manager. @@ -230,6 +234,7 @@ public class GridCacheContext implements Externalizable { CacheConfiguration cacheCfg, CacheType cacheType, boolean affNode, + boolean updatesAllowed, /* * Managers in starting order! @@ -271,6 +276,7 @@ public class GridCacheContext implements Externalizable { this.cacheCfg = cacheCfg; this.cacheType = cacheType; this.affNode = affNode; + this.updatesAllowed = updatesAllowed; /* * Managers in starting order! @@ -348,7 +354,7 @@ public class GridCacheContext implements Externalizable { public void awaitStarted() throws IgniteCheckedException { U.await(startLatch); - GridCachePreloader prldr = preloader(); + GridCachePreloader prldr = preloader(); if (prldr != null) prldr.startFuture().get(); @@ -361,7 +367,7 @@ public class GridCacheContext implements Externalizable { if (startLatch.getCount() != 0) return false; - GridCachePreloader prldr = preloader(); + GridCachePreloader prldr = preloader(); return prldr == null || prldr.startFuture().isDone(); } @@ -682,7 +688,7 @@ public class GridCacheContext implements Externalizable { /** * @return Preloader. */ - public GridCachePreloader preloader() { + public GridCachePreloader preloader() { return cache().preloader(); } @@ -1469,9 +1475,6 @@ public class GridCacheContext implements Externalizable { Collection dhtNodeIds = new ArrayList<>(dhtRemoteNodes); Collection nearNodeIds = F.isEmpty(nearRemoteNodes) ? null : new ArrayList<>(nearRemoteNodes); - if (!F.isEmpty(nearNodeIds)) - U.dumpStack("Added near mapped nodes: " + entry + ", " + nearNodeIds); - entry.mappings(explicitLockVer, dhtNodeIds, nearNodeIds); } @@ -1809,6 +1812,13 @@ public class GridCacheContext implements Externalizable { } /** + * @return Updates allowed. + */ + public boolean updatesAllowed() { + return updatesAllowed; + } + + /** * Nulling references to potentially leak-prone objects. */ public void cleanup() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java index 8d3d089..3857b35 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java @@ -21,6 +21,7 @@ import org.apache.ignite.*; import org.apache.ignite.cache.eviction.*; import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.distributed.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.processors.dr.*; @@ -943,4 +944,9 @@ public interface GridCacheEntryEx { * @return {@code True} if value was removed, {@code false} otherwise. */ public boolean removeMeta(UUID name, V val); + + /** + * Calls {@link GridDhtLocalPartition#onUnlock()} for this entry's partition. + */ + public void onUnlock(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java index 9d680ef..d9d151c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java @@ -127,7 +127,7 @@ public class GridCacheGateway { try { GridCacheAdapter cache = ctx.cache(); - GridCachePreloader preldr = cache != null ? cache.preloader() : null; + GridCachePreloader preldr = cache != null ? cache.preloader() : null; if (preldr == null) throw new IllegalStateException("Grid is in invalid state to perform this operation. " + http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index 02f16c0..eef9fde 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -472,7 +472,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { req.miniId(), false, 0, - req.classError()); + req.classError(), + null); sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy()); } @@ -488,7 +489,10 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { req.miniId(), req.version(), req.version(), - null, null, null); + null, + null, + null, + null); res.error(req.classError()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 92035af..4680994 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -61,6 +61,9 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { private static final byte IS_UNSWAPPED_MASK = 0x02; /** */ + private static final byte IS_OFFHEAP_PTR_MASK = 0x04; + + /** */ public static final GridCacheAtomicVersionComparator ATOMIC_VER_COMPARATOR = new GridCacheAtomicVersionComparator(); /** @@ -433,6 +436,8 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { if (e.offheapPointer() > 0) { offHeapPointer(e.offheapPointer()); + flags |= IS_OFFHEAP_PTR_MASK; + if (needVal) { CacheObject val = cctx.fromOffheap(offHeapPointer(), false); @@ -498,7 +503,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { return; } - if (val == null && cctx.offheapTiered() && hasOffHeapPointer()) { + if (cctx.offheapTiered() && hasOffHeapPointer()) { if (log.isDebugEnabled()) log.debug("Value did not change, skip write swap entry: " + this); @@ -509,10 +514,16 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { } IgniteUuid valClsLdrId = null; + IgniteUuid keyClsLdrId = null; - if (val != null) { - valClsLdrId = cctx.deploy().getClassLoaderId( - val.value(cctx.cacheObjectContext(), false).getClass().getClassLoader()); + if (cctx.kernalContext().config().isPeerClassLoadingEnabled()) { + if (val != null) { + valClsLdrId = cctx.deploy().getClassLoaderId( + U.detectObjectClassLoader(val.value(cctx.cacheObjectContext(), false))); + } + + keyClsLdrId = cctx.deploy().getClassLoaderId( + U.detectObjectClassLoader(key.value(cctx.cacheObjectContext(), false))); } IgniteBiTuple valBytes = valueBytes0(); @@ -523,7 +534,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { ver, ttlExtras(), expireTime, - cctx.deploy().getClassLoaderId(U.detectObjectClassLoader(key.value(cctx.cacheObjectContext(), false))), + keyClsLdrId, valClsLdrId); if (log.isDebugEnabled()) @@ -3617,6 +3628,8 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { return true; } + else + evictFailed(prev); } } else { @@ -3660,8 +3673,11 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { return true; } - else + else { + evictFailed(prevVal); + return false; + } } } } @@ -3680,6 +3696,27 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { return false; } + /** + * @param prevVal Previous value. + * @throws IgniteCheckedException If failed. + */ + private void evictFailed(@Nullable CacheObject prevVal) throws IgniteCheckedException { + if (cctx.offheapTiered() && ((flags & IS_OFFHEAP_PTR_MASK) != 0)) { + flags &= ~IS_OFFHEAP_PTR_MASK; + + if (prevVal != null) { + cctx.swap().removeOffheap(key()); + + value(prevVal); + + GridCacheQueryManager qryMgr = cctx.queries(); + + if (qryMgr != null) + qryMgr.onUnswap(key, prevVal); + } + } + } + /** {@inheritDoc} */ @Override public GridCacheBatchSwapEntry evictInBatchInternal(GridCacheVersion obsoleteVer) throws IgniteCheckedException { @@ -3692,10 +3729,17 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { if (!hasReaders() && markObsolete0(obsoleteVer, false)) { if (!isStartVersion() && hasValueUnlocked()) { IgniteUuid valClsLdrId = null; + IgniteUuid keyClsLdrId = null; - if (val != null) - valClsLdrId = cctx.deploy().getClassLoaderId( - U.detectObjectClassLoader(val.value(cctx.cacheObjectContext(), false))); + if (cctx.kernalContext().config().isPeerClassLoadingEnabled()) { + if (val != null) { + valClsLdrId = cctx.deploy().getClassLoaderId( + U.detectObjectClassLoader(val.value(cctx.cacheObjectContext(), false))); + } + + keyClsLdrId = cctx.deploy().getClassLoaderId( + U.detectObjectClassLoader(key.value(cctx.cacheObjectContext(), false))); + } IgniteBiTuple valBytes = valueBytes0(); @@ -3706,7 +3750,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { ver, ttlExtras(), expireTimeExtras(), - cctx.deploy().getClassLoaderId(U.detectObjectClassLoader(key.value(cctx.cacheObjectContext(), false))), + keyClsLdrId, valClsLdrId); } @@ -4100,6 +4144,11 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { } /** {@inheritDoc} */ + @Override public void onUnlock() { + // No-op. + } + + /** {@inheritDoc} */ @Override public boolean equals(Object o) { // Identity comparison left on purpose. return o == this; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java index c05e4b4..c528e08 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java @@ -955,6 +955,21 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { } /** + * @param topVer Topology version. + * @return Locked keys. + */ + public Map> unfinishedLocks(AffinityTopologyVersion topVer) { + Map> cands = new HashMap<>(); + + for (FinishLockFuture fut : finishFuts) { + if (fut.topologyVersion().equals(topVer)) + cands.putAll(fut.pendingLocks()); + } + + return cands; + } + + /** * Creates a future that will wait for all explicit locks acquired on given topology * version to be released. * @@ -1041,8 +1056,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { finishFuts.add(finishFut); finishFut.listen(new CI1>() { - @Override - public void apply(IgniteInternalFuture e) { + @Override public void apply(IgniteInternalFuture e) { finishFuts.remove(finishFut); // This call is required to make sure that the concurrent queue @@ -1117,6 +1131,20 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { } /** + * @return Topology version. + */ + AffinityTopologyVersion topologyVersion() { + return topVer; + } + + /** + * @return Pending locks. + */ + Map> pendingLocks() { + return pendingLocks; + } + + /** * @return Filter. */ private IgnitePredicate versionFilter() {