From: sboikov@apache.org
To: commits@ignite.apache.org
Reply-To: dev@ignite.apache.org
Date: Tue, 06 Jun 2017 15:05:57 -0000
Message-Id: <884b947248e946fb874e4e218529fd21@git.apache.org>
In-Reply-To: <45bcf916120348db8a16a0dd155d74ad@git.apache.org>
References: <45bcf916120348db8a16a0dd155d74ad@git.apache.org>
Subject: [08/22] ignite git commit: ignite-5075 Implement logical 'cache groups' sharing the same physical caches
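Note for readers of this patch series: a "cache group" lets several caches share one set of partitions and one physical data store instead of allocating a full partition infrastructure per cache. A group is declared purely through configuration; a minimal sketch using the public API (cache and group names here are arbitrary examples, not taken from the patch):

    import org.apache.ignite.Ignite;
    import org.apache.ignite.Ignition;
    import org.apache.ignite.configuration.CacheConfiguration;
    import org.apache.ignite.configuration.IgniteConfiguration;

    public class CacheGroupExample {
        public static void main(String[] args) {
            // Two caches placed in the same logical group: they share
            // partition maps, the physical store and rebalancing.
            CacheConfiguration<Integer, String> c1 = new CacheConfiguration<>("cache1");
            c1.setGroupName("group1");

            CacheConfiguration<Integer, String> c2 = new CacheConfiguration<>("cache2");
            c2.setGroupName("group1");

            IgniteConfiguration cfg = new IgniteConfiguration().setCacheConfiguration(c1, c2);

            try (Ignite ignite = Ignition.start(cfg)) {
                ignite.cache("cache1").put(1, "a");
                ignite.cache("cache2").put(1, "b"); // same partitions, distinct cacheId
            }
        }
    }

The per-cacheId bookkeeping added throughout GridDhtLocalPartition below is what makes two such caches coexist inside one partition.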
http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index 6fb557a..cba9477 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -19,7 +19,10 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -30,30 +33,33 @@ import java.util.concurrent.locks.ReentrantLock;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionMetaStateRecord;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMapImpl;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
 import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.database.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
 import org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntryExtras;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.processors.query.QueryUtils;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.GridIterator;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.jsr166.ConcurrentHashMap8;
 import org.jsr166.ConcurrentLinkedDeque8;

 import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE;
@@ -70,6 +76,17 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
  * Key partition.
  */
 public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements Comparable, GridReservable {
+    /** */
+    private static final GridCacheMapEntryFactory ENTRY_FACTORY = new GridCacheMapEntryFactory() {
+        @Override public GridCacheMapEntry create(
+            GridCacheContext ctx,
+            AffinityTopologyVersion topVer,
+            KeyCacheObject key
+        ) {
+            return new GridDhtCacheEntry(ctx, topVer, key);
+        }
+    };
+
     /** Maximum size for delete queue. */
     public static final int MAX_DELETE_QUEUE_SIZE = Integer.getInteger(IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE, 200_000);

@@ -100,29 +117,48 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
     @GridToStringExclude
     private final GridFutureAdapter rent;

-    /** Context. */
-    private final GridCacheContext cctx;
+    /** */
+    @GridToStringExclude
+    private final GridCacheSharedContext ctx;
+
+    /** */
+    @GridToStringExclude
+    private final CacheGroupContext grp;

     /** Create time. */
     @GridToStringExclude
     private final long createTime = U.currentTimeMillis();

     /** Eviction history. */
+    @GridToStringExclude
     private volatile Map evictHist = new HashMap<>();

     /** Lock. */
+    @GridToStringExclude
     private final ReentrantLock lock = new ReentrantLock();

+    /** */
+    @GridToStringExclude
+    private final ConcurrentMap cacheMaps;
+
+    /** */
+    @GridToStringExclude
+    private final CacheMapHolder singleCacheEntryMap;
+
     /** Remove queue. */
+    @GridToStringExclude
     private final ConcurrentLinkedDeque8 rmvQueue = new ConcurrentLinkedDeque8<>();

     /** Group reservations. */
+    @GridToStringExclude
     private final CopyOnWriteArrayList reservations = new CopyOnWriteArrayList<>();

     /** */
+    @GridToStringExclude
     private final CacheDataStore store;

     /** Partition updates. */
+    @GridToStringExclude
     private final ConcurrentNavigableMap updates = new ConcurrentSkipListMap<>();

     /** Last applied update. */
@@ -133,18 +169,30 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
     private volatile boolean shouldBeRenting;

     /**
-     * @param cctx Context.
+     * @param ctx Context.
+     * @param grp Cache group.
      * @param id Partition ID.
-     * @param entryFactory Entry factory.
      */
     @SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor")
-    GridDhtLocalPartition(GridCacheContext cctx, int id, GridCacheMapEntryFactory entryFactory) {
-        super(cctx, entryFactory, Math.max(10, GridCacheAdapter.DFLT_START_CACHE_SIZE / cctx.affinity().partitions()));
+    GridDhtLocalPartition(GridCacheSharedContext ctx,
+        CacheGroupContext grp,
+        int id) {
+        super(ENTRY_FACTORY);

         this.id = id;
-        this.cctx = cctx;
+        this.ctx = ctx;
+        this.grp = grp;
+
+        log = U.logger(ctx.kernalContext(), logRef, this);

-        log = U.logger(cctx.kernalContext(), logRef, this);
+        if (grp.sharedGroup()) {
+            singleCacheEntryMap = null;
+            cacheMaps = new ConcurrentHashMap<>();
+        }
+        else {
+            singleCacheEntryMap = new CacheMapHolder(grp.singleCacheContext(), createEntriesMap());
+            cacheMaps = null;
+        }

         rent = new GridFutureAdapter() {
             @Override public String toString() {
@@ -152,15 +200,15 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
             }
         };

-        int delQueueSize = CU.isSystemCache(cctx.name()) ? 100 :
-            Math.max(MAX_DELETE_QUEUE_SIZE / cctx.affinity().partitions(), 20);
+        int delQueueSize = grp.systemCache() ? 100 :
+            Math.max(MAX_DELETE_QUEUE_SIZE / grp.affinity().partitions(), 20);

         rmvQueueMaxSize = U.ceilPow2(delQueueSize);

         rmvdEntryTtl = Long.getLong(IGNITE_CACHE_REMOVED_ENTRIES_TTL, 10_000);

         try {
-            store = cctx.offheap().createCacheDataStore(id);
+            store = grp.offheap().createCacheDataStore(id);
         }
         catch (IgniteCheckedException e) {
             // TODO ignite-db
@@ -169,6 +217,62 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
     }

     /**
+     * @return Entries map.
+     */
+    private ConcurrentMap createEntriesMap() {
+        return new ConcurrentHashMap8<>(Math.max(10, GridCacheAdapter.DFLT_START_CACHE_SIZE / grp.affinity().partitions()),
+            0.75f,
+            Runtime.getRuntime().availableProcessors() * 2);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int internalSize() {
+        if (grp.sharedGroup()) {
+            int size = 0;
+
+            for (CacheMapHolder hld : cacheMaps.values())
+                size += hld.map.size();
+
+            return size;
+        }
+
+        return singleCacheEntryMap.map.size();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheMapHolder entriesMap(GridCacheContext cctx) {
+        if (grp.sharedGroup())
+            return cacheMapHolder(cctx);
+
+        return singleCacheEntryMap;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override protected CacheMapHolder entriesMapIfExists(Integer cacheId) {
+        return grp.sharedGroup() ? cacheMaps.get(cacheId) : singleCacheEntryMap;
+    }
+
+    /**
+     * @param cctx Cache context.
+     * @return Map holder.
+     */
+    private CacheMapHolder cacheMapHolder(GridCacheContext cctx) {
+        assert grp.sharedGroup();
+
+        CacheMapHolder hld = cacheMaps.get(cctx.cacheIdBoxed());
+
+        if (hld != null)
+            return hld;
+
+        CacheMapHolder old = cacheMaps.putIfAbsent(cctx.cacheIdBoxed(), hld = new CacheMapHolder(cctx, createEntriesMap()));
+
+        if (old != null)
+            hld = old;
+
+        return hld;
+    }
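Note: cacheMapHolder() above is the standard racy-initialization idiom for a ConcurrentMap: an optimistic get, then putIfAbsent, then adopting whichever holder won the race. A standalone sketch of the same idiom with simplified types (HolderRegistry and its names are illustrative, not part of the patch):

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.function.Function;

    class HolderRegistry<K, V> {
        private final ConcurrentMap<K, V> holders = new ConcurrentHashMap<>();

        /** Returns the holder for {@code key}, creating it at most once per key. */
        V getOrCreate(K key, Function<K, V> factory) {
            V hld = holders.get(key); // fast path: usually already present

            if (hld != null)
                return hld;

            V created = factory.apply(key);

            V old = holders.putIfAbsent(key, created); // only one thread wins

            return old != null ? old : created; // losers adopt the winner's holder
        }
    }

The factory may run more than once under contention, so it must be side-effect free; that is exactly why the patch creates a fresh CacheMapHolder before the putIfAbsent and discards it if another thread won.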
+
+    /**
     * @return Data store.
     */
    public CacheDataStore dataStore() {
@@ -235,10 +339,10 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
     * @return {@code True} if partition is empty.
     */
    public boolean isEmpty() {
-        if (cctx.allowFastEviction())
+        if (grp.allowFastEviction())
            return internalSize() == 0;

-        return store.size() == 0 && internalSize() == 0;
+        return store.fullSize() == 0 && internalSize() == 0;
    }

    /**
@@ -307,6 +411,20 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
    }

    /**
+     * @param cacheId Cache ID.
+     * @param key Key.
+     * @param ver Version.
+     */
+    private void removeVersionedEntry(int cacheId, KeyCacheObject key, GridCacheVersion ver) {
+        CacheMapHolder hld = grp.sharedGroup() ? cacheMaps.get(cacheId) : singleCacheEntryMap;
+
+        GridCacheMapEntry entry = hld != null ? hld.map.get(key) : null;
+
+        if (entry != null && entry.markObsoleteVersion(ver))
+            removeEntry(entry);
+    }
+
+    /**
     *
     */
    public void cleanupRemoveQueue() {
@@ -314,10 +432,10 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
            RemovedEntryHolder item = rmvQueue.pollFirst();

            if (item != null)
-                cctx.dht().removeVersionedEntry(item.key(), item.version());
+                removeVersionedEntry(item.cacheId(), item.key(), item.version());
        }

-        if (!cctx.isDrEnabled()) {
+        if (!grp.isDrEnabled()) {
            RemovedEntryHolder item = rmvQueue.peekFirst();

            while (item != null && item.expireTime() < U.currentTimeMillis()) {
@@ -326,7 +444,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
                if (item == null)
                    break;

-                cctx.dht().removeVersionedEntry(item.key(), item.version());
+                removeVersionedEntry(item.cacheId(), item.key(), item.version());

                item = rmvQueue.peekFirst();
            }
@@ -334,13 +452,14 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
    }

    /**
+     * @param cacheId Cache ID.
     * @param key Removed key.
     * @param ver Removed version.
     */
-    public void onDeferredDelete(KeyCacheObject key, GridCacheVersion ver) {
+    public void onDeferredDelete(int cacheId, KeyCacheObject key, GridCacheVersion ver) {
        cleanupRemoveQueue();

-        rmvQueue.add(new RemovedEntryHolder(key, ver, rmvdEntryTtl));
+        rmvQueue.add(new RemovedEntryHolder(cacheId, key, ver, rmvdEntryTtl));
    }

    /**
@@ -438,7 +557,10 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
    }

    /** {@inheritDoc} */
-    @Override protected void release(int sizeChange, GridCacheEntryEx e) {
+    @Override protected void release(int sizeChange, CacheMapHolder hld, GridCacheEntryEx e) {
+        if (grp.sharedGroup() && sizeChange != 0)
+            hld.size.addAndGet(sizeChange);
+
        release0(sizeChange);
    }

@@ -486,16 +608,16 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
     * @return {@code true} if cas succeeds.
     */
    private boolean casState(long state, GridDhtPartitionState toState) {
-        if (cctx.shared().database().persistenceEnabled()) {
+        if (ctx.database().persistenceEnabled()) {
            synchronized (this) {
                boolean update = this.state.compareAndSet(state, setPartState(state, toState));

                if (update)
                    try {
-                        cctx.shared().wal().log(new PartitionMetaStateRecord(cctx.cacheId(), id, toState, updateCounter()));
+                        ctx.wal().log(new PartitionMetaStateRecord(grp.groupId(), id, toState, updateCounter()));
                    }
                    catch (IgniteCheckedException e) {
-                        log.error("Error while writing to log", e);
+                        U.error(log, "Error while writing to log", e);
                    }

                return update;
@@ -610,13 +732,13 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
     * @param updateSeq Update sequence.
     */
    void tryEvictAsync(boolean updateSeq) {
-        assert cctx.kernalContext().state().active();
+        assert ctx.kernalContext().state().active();

        long state = this.state.get();

        GridDhtPartitionState partState = getPartState(state);

-        if (isEmpty() && !QueryUtils.isEnabled(cctx.config()) && getSize(state) == 0 &&
+        if (isEmpty() && !grp.queriesEnabled() && getSize(state) == 0 &&
            partState == RENTING && getReservations(state) == 0 && !groupReserved() &&
            casState(state, EVICTED)) {
            if (log.isDebugEnabled())
@@ -626,7 +748,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
            finishDestroy(updateSeq);
        }
        else if (partState == RENTING || shouldBeRenting())
-            cctx.preloader().evictPartitionAsync(this);
+            grp.preloader().evictPartitionAsync(this);
    }

    /**
@@ -702,18 +824,13 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
        assert state() == EVICTED : this;
        assert evictGuard.get() == -1;

-        if (cctx.isDrEnabled())
-            cctx.dr().partitionEvicted(id);
-
-        cctx.continuousQueries().onPartitionEvicted(id);
-
-        cctx.dataStructures().onPartitionEvicted(id);
+        grp.onPartitionEvicted(id);

        destroyCacheDataStore();

        rent.onDone();

-        ((GridDhtPreloader)cctx.preloader()).onPartitionEvicted(this, updateSeq);
+        ((GridDhtPreloader)grp.preloader()).onPartitionEvicted(this, updateSeq);

        clearDeferredDeletes();
    }
@@ -753,7 +870,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
        try {
            CacheDataStore store = dataStore();

-            cctx.offheap().destroyCacheDataStore(id, store);
+            grp.offheap().destroyCacheDataStore(id, store);
        }
        catch (IgniteCheckedException e) {
            log.error("Unable to destroy cache data store on partition eviction [id=" + id + "]", e);
@@ -777,7 +894,9 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
     * @return {@code True} if local node is primary for this partition.
     */
    public boolean primary(AffinityTopologyVersion topVer) {
-        return cctx.affinity().primaryByPartition(cctx.localNode(), id, topVer);
+        List nodes = grp.affinity().cachedAffinity(topVer).get(id);
+
+        return !nodes.isEmpty() && ctx.localNode().equals(nodes.get(0));
    }

    /**
@@ -785,14 +904,23 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
     * @return {@code True} if local node is backup for this partition.
     */
    public boolean backup(AffinityTopologyVersion topVer) {
-        return cctx.affinity().backupByPartition(cctx.localNode(), id, topVer);
+        List nodes = grp.affinity().cachedAffinity(topVer).get(id);
+
+        return nodes.indexOf(ctx.localNode()) > 0;
    }
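Note: the rewritten primary()/backup() read the cached affinity assignment directly; for each partition the assignment is an ordered node list whose head is the primary and whose tail are the backups. The same ordering is exposed by the public Affinity API; a small sketch (the cache name is a placeholder):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.ignite.Ignite;
    import org.apache.ignite.cache.affinity.Affinity;
    import org.apache.ignite.cluster.ClusterNode;

    class AffinityInspector {
        /** Prints the roles for one partition; the first node in the list is the primary. */
        static void printRoles(Ignite ignite, String cacheName, int part) {
            Affinity<Object> aff = ignite.affinity(cacheName);

            List<ClusterNode> nodes = new ArrayList<>(aff.mapPartitionToPrimaryAndBackups(part));

            if (nodes.isEmpty())
                return;

            System.out.println("partition " + part + " primary: " + nodes.get(0).id());

            for (int i = 1; i < nodes.size(); i++)
                System.out.println("partition " + part + " backup:  " + nodes.get(i).id());
        }
    }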

    /**
+     * @param cacheId ID of cache initiated counter update.
+     * @param topVer Topology version for current operation.
     * @return Next update index.
     */
-    public long nextUpdateCounter() {
-        return store.nextUpdateCounter();
+    long nextUpdateCounter(int cacheId, AffinityTopologyVersion topVer, boolean primary, @Nullable Long primaryCntr) {
+        long nextCntr = store.nextUpdateCounter();
+
+        if (grp.sharedGroup())
+            grp.onPartitionCounterUpdate(cacheId, id, primaryCntr != null ? primaryCntr : nextCntr, topVer, primary);
+
+        return nextCntr;
    }
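Note on this hunk: with shared groups the partition keeps a single physical update counter, but each logical cache still needs to observe counter movement (the cacheId disambiguates which cache produced the update), hence the new onPartitionCounterUpdate callback. A toy model of the idea, with hypothetical names:

    import java.util.concurrent.atomic.AtomicLong;

    class PartitionCounter {
        interface CounterListener {
            void onUpdate(int cacheId, int partId, long counter);
        }

        private final AtomicLong cntr = new AtomicLong(); // one counter per partition
        private final CounterListener lsnr;
        private final int partId;

        PartitionCounter(int partId, CounterListener lsnr) {
            this.partId = partId;
            this.lsnr = lsnr;
        }

        /** Reserves the next counter value and notifies per-cache listeners. */
        long next(int cacheId) {
            long next = cntr.incrementAndGet();

            lsnr.onUpdate(cacheId, partId, next); // cacheId tells caches in a shared group apart

            return next;
        }
    }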

    /**
@@ -829,86 +957,53 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
     * @throws NodeStoppingException If node stopping.
     */
    public void clearAll() throws NodeStoppingException {
-        GridCacheVersion clearVer = cctx.versions().next();
-
-        boolean rec = cctx.events().isRecordable(EVT_CACHE_REBALANCE_OBJECT_UNLOADED);
-
-        Iterator it = allEntries().iterator();
+        GridCacheVersion clearVer = ctx.versions().next();

        GridCacheObsoleteEntryExtras extras = new GridCacheObsoleteEntryExtras(clearVer);

-        while (it.hasNext()) {
-            GridCacheMapEntry cached = null;
-
-            cctx.shared().database().checkpointReadLock();
-
-            try {
-                cached = it.next();
-
-                if (cached instanceof GridDhtCacheEntry && ((GridDhtCacheEntry)cached).clearInternal(clearVer, extras)) {
-                    removeEntry(cached);
-
-                    if (!cached.isInternal()) {
-                        if (rec) {
-                            cctx.events().addEvent(cached.partition(),
-                                cached.key(),
-                                cctx.localNodeId(),
-                                (IgniteUuid)null,
-                                null,
-                                EVT_CACHE_REBALANCE_OBJECT_UNLOADED,
-                                null,
-                                false,
-                                cached.rawGet(),
-                                cached.hasValue(),
-                                null,
-                                null,
-                                null,
-                                false);
-                        }
-                    }
-                }
-            }
-            catch (GridDhtInvalidPartitionException e) {
-                assert isEmpty() && state() == EVICTED : "Invalid error [e=" + e + ", part=" + this + ']';
-
-                break; // Partition is already concurrently cleared and evicted.
-            }
-            catch (NodeStoppingException e) {
-                if (log.isDebugEnabled())
-                    log.debug("Failed to clear cache entry for evicted partition: " + cached.partition());
-
-                rent.onDone(e);
+        boolean rec = grp.eventRecordable(EVT_CACHE_REBALANCE_OBJECT_UNLOADED);

-                throw e;
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to clear cache entry for evicted partition: " + cached, e);
-            }
-            finally {
-                cctx.shared().database().checkpointReadUnlock();
-            }
+        if (grp.sharedGroup()) {
+            for (CacheMapHolder hld : cacheMaps.values())
+                clear(hld.map, extras, rec);
        }
+        else
+            clear(singleCacheEntryMap.map, extras, rec);
+
+        if (!grp.allowFastEviction()) {
+            CacheMapHolder hld = grp.sharedGroup() ? null : singleCacheEntryMap;

-        if (!cctx.allowFastEviction()) {
            try {
-                GridIterator it0 = cctx.offheap().iterator(id);
+                GridIterator it0 = grp.offheap().partitionIterator(id);

                while (it0.hasNext()) {
-                    cctx.shared().database().checkpointReadLock();
+                    ctx.database().checkpointReadLock();

                    try {
                        CacheDataRow row = it0.next();

-                        GridCacheMapEntry cached = putEntryIfObsoleteOrAbsent(cctx.affinity().affinityTopologyVersion(),
+                        if (grp.sharedGroup() && (hld == null || hld.cctx.cacheId() != row.cacheId())) {
+                            hld = cacheMaps.get(row.cacheId());
+
+                            if (hld == null)
+                                continue;
+                        }
+
+                        assert hld != null;
+
+                        GridCacheMapEntry cached = putEntryIfObsoleteOrAbsent(
+                            hld,
+                            hld.cctx,
+                            grp.affinity().lastVersion(),
                            row.key(),
                            true,
                            false);

                        if (cached instanceof GridDhtCacheEntry && ((GridDhtCacheEntry)cached).clearInternal(clearVer, extras)) {
                            if (rec) {
-                                cctx.events().addEvent(cached.partition(),
+                                hld.cctx.events().addEvent(cached.partition(),
                                    cached.key(),
-                                    cctx.localNodeId(),
+                                    ctx.localNodeId(),
                                    (IgniteUuid)null,
                                    null,
                                    EVT_CACHE_REBALANCE_OBJECT_UNLOADED,
@@ -929,7 +1024,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
                        break; // Partition is already concurrently cleared and evicted.
                    }
                    finally {
-                        cctx.shared().database().checkpointReadUnlock();
+                        ctx.database().checkpointReadUnlock();
                    }
                }
            }
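Note: both clearing paths wrap each destructive step in checkpointReadLock()/checkpointReadUnlock() so that a page-memory checkpoint cannot run mid-operation. The general shape of that idiom, sketched against a plain ReadWriteLock (the real database manager API differs; this is only the pattern):

    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    class CheckpointGuard {
        // Many mutators may run concurrently (read lock); the checkpointer
        // takes the write lock to obtain a quiescent point.
        private final ReadWriteLock lock = new ReentrantReadWriteLock();

        void mutate(Runnable op) {
            lock.readLock().lock(); // blocks out the checkpointer, not other mutators

            try {
                op.run();
            }
            finally {
                lock.readLock().unlock(); // always released, even on exceptions
            }
        }

        void checkpoint(Runnable cp) {
            lock.writeLock().lock(); // waits until in-flight mutations finish

            try {
                cp.run();
            }
            finally {
                lock.writeLock().unlock();
            }
        }
    }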
@@ -948,11 +1043,70 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
    }

    /**
+     * @param map Map to clear.
+     * @param extras Obsolete extras.
+     * @param evt Unload event flag.
+     * @throws NodeStoppingException
+     */
+    private void clear(ConcurrentMap map,
+        GridCacheObsoleteEntryExtras extras,
+        boolean evt) throws NodeStoppingException {
+        Iterator it = map.values().iterator();
+
+        while (it.hasNext()) {
+            GridCacheMapEntry cached = null;
+
+            ctx.database().checkpointReadLock();
+
+            try {
+                cached = it.next();
+
+                if (cached instanceof GridDhtCacheEntry && ((GridDhtCacheEntry)cached).clearInternal(extras.obsoleteVersion(), extras)) {
+                    removeEntry(cached);
+
+                    if (!cached.isInternal()) {
+                        if (evt) {
+                            grp.addCacheEvent(cached.partition(),
+                                cached.key(),
+                                ctx.localNodeId(),
+                                EVT_CACHE_REBALANCE_OBJECT_UNLOADED,
+                                null,
+                                false,
+                                cached.rawGet(),
+                                cached.hasValue(),
+                                false);
+                        }
+                    }
+                }
+            }
+            catch (GridDhtInvalidPartitionException e) {
+                assert isEmpty() && state() == EVICTED : "Invalid error [e=" + e + ", part=" + this + ']';
+
+                break; // Partition is already concurrently cleared and evicted.
+            }
+            catch (NodeStoppingException e) {
+                if (log.isDebugEnabled())
+                    log.debug("Failed to clear cache entry for evicted partition: " + cached.partition());
+
+                rent.onDone(e);
+
+                throw e;
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to clear cache entry for evicted partition: " + cached, e);
+            }
+            finally {
+                ctx.database().checkpointReadUnlock();
+            }
+        }
+    }
+
+    /**
     *
     */
    private void clearDeferredDeletes() {
        for (RemovedEntryHolder e : rmvQueue)
-            cctx.dht().removeVersionedEntry(e.key(), e.version());
+            removeVersionedEntry(e.cacheId(), e.key(), e.version());
    }

    /** {@inheritDoc} */
@@ -977,6 +1131,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
    /** {@inheritDoc} */
    @Override public String toString() {
        return S.toString(GridDhtLocalPartition.class, this,
+            "grp", grp.cacheOrGroupName(),
            "state", state(),
            "reservations", reservations(),
            "empty", isEmpty(),
@@ -984,12 +1139,25 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
    }

    /** {@inheritDoc} */
-    @Override public int publicSize() {
+    @Override public int publicSize(int cacheId) {
+        if (grp.sharedGroup()) {
+            CacheMapHolder hld = cacheMaps.get(cacheId);
+
+            return hld != null ? hld.size.get() : 0;
+        }
+
        return getSize(state.get());
    }

    /** {@inheritDoc} */
-    @Override public void incrementPublicSize(GridCacheEntryEx e) {
+    @Override public void incrementPublicSize(@Nullable CacheMapHolder hld, GridCacheEntryEx e) {
+        if (grp.sharedGroup()) {
+            if (hld == null)
+                hld = cacheMapHolder(e.context());
+
+            hld.size.incrementAndGet();
+        }
+
        while (true) {
            long state = this.state.get();

@@ -999,7 +1167,14 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
    }

    /** {@inheritDoc} */
-    @Override public void decrementPublicSize(GridCacheEntryEx e) {
+    @Override public void decrementPublicSize(@Nullable CacheMapHolder hld, GridCacheEntryEx e) {
+        if (grp.sharedGroup()) {
+            if (hld == null)
+                hld = cacheMapHolder(e.context());
+
+            hld.size.decrementAndGet();
+        }
+
        while (true) {
            long state = this.state.get();

@@ -1011,6 +1186,22 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
    }

    /**
+     * @param cacheId Cache ID.
+     */
+    void onCacheStopped(int cacheId) {
+        assert grp.sharedGroup() : grp.cacheOrGroupName();
+
+        for (Iterator it = rmvQueue.iterator(); it.hasNext();) {
+            RemovedEntryHolder e = it.next();
+
+            if (e.cacheId() == cacheId)
+                it.remove();
+        }
+
+        cacheMaps.remove(cacheId);
+    }
+
+    /**
     * @param state Composite state.
     * @return Partition state.
     */
@@ -1065,6 +1256,9 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
     * Removed entry holder.
     */
    private static class RemovedEntryHolder {
+        /** */
+        private final int cacheId;
+
        /** Cache key */
        private final KeyCacheObject key;

@@ -1075,11 +1269,13 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
        private final long expireTime;

        /**
+         * @param cacheId Cache ID.
         * @param key Key.
         * @param ver Entry version.
         * @param ttl TTL.
         */
-        private RemovedEntryHolder(KeyCacheObject key, GridCacheVersion ver, long ttl) {
+        private RemovedEntryHolder(int cacheId, KeyCacheObject key, GridCacheVersion ver, long ttl) {
+            this.cacheId = cacheId;
            this.key = key;
            this.ver = ver;

@@ -1087,6 +1283,13 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
        }

        /**
+         * @return Cache ID.
+         */
+        int cacheId() {
+            return cacheId;
+        }
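Note: publicSize/incrementPublicSize/decrementPublicSize above operate on a composite "state" long; getPartState, getSize and getReservations (referenced earlier in this file) unpack state ordinal, size and reservation count from one AtomicLong so that casState can switch states atomically. A self-contained sketch of that bit-packing pattern, with an assumed field layout (illustrative only, not Ignite's actual layout):

    import java.util.concurrent.atomic.AtomicLong;

    class PackedPartState {
        // Assumed layout: bits 0-2 = state ordinal, bits 3-31 = public size,
        // bits 32-63 = reservation count.
        private final AtomicLong state = new AtomicLong();

        static int partState(long s) { return (int)(s & 0x7L); }

        static int size(long s) { return (int)((s >>> 3) & 0x1FFF_FFFFL); }

        static long reservations(long s) { return s >>> 32; }

        /** Single CAS attempt, the shape of casState(state, toState) above. */
        boolean casState(long expected, int toState) {
            return state.compareAndSet(expected, (expected & ~0x7L) | (toState & 0x7L));
        }

        /** Retry loop, the shape used by incrementPublicSize/decrementPublicSize. */
        void addSize(int delta) {
            while (true) {
                long cur = state.get();

                long next = (cur & ~0xFFFF_FFF8L) | ((long)((size(cur) + delta) & 0x1FFF_FFFF) << 3);

                if (state.compareAndSet(cur, next))
                    return; // lost races simply re-read and retry
            }
        }
    }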

        /**
         * @return Key.
         */
        KeyCacheObject key() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockResponse.java
index ea6ca06..87abd6c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockResponse.java
@@ -174,7 +174,7 @@ public class GridDhtLockResponse extends GridDistributedLockResponse {
        }

        if (preloadEntries != null)
-            marshalInfos(preloadEntries, cctx);
+            marshalInfos(preloadEntries, cctx.shared(), cctx.cacheObjectContext());
    }

    /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index ffc1d63..d365a8e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -88,9 +88,9 @@ public interface GridDhtPartitionTopology {
    public boolean stopping();

    /**
-     * @return Cache ID.
+     * @return Cache group ID.
     */
-    public int cacheId();
+    public int groupId();

    /**
     * Pre-initializes this topology.
@@ -134,13 +134,12 @@ public interface GridDhtPartitionTopology {
    public void releasePartitions(int... parts);

    /**
-     * @param key Cache key.
-     * @param create If {@code true}, then partition will be created if it's not there.
+     * @param part Partition number.
     * @return Local partition.
     * @throws GridDhtInvalidPartitionException If partition is evicted or absent and
     *      does not belong to this node.
     */
-    @Nullable public GridDhtLocalPartition localPartition(Object key, boolean create)
+    @Nullable public GridDhtLocalPartition localPartition(int part)
        throws GridDhtInvalidPartitionException;

    /**
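Note on the interface change just above: the key-based localPartition(Object key, boolean create) is replaced by a partition-number lookup, so callers resolve the partition from the key first. With the public API the same two-step lookup looks like this (cache name is a placeholder):

    import org.apache.ignite.Ignite;
    import org.apache.ignite.cache.affinity.Affinity;

    class PartitionLookup {
        /** Resolves a key to its partition id, the argument the new localPartition(int) expects. */
        static int partitionOf(Ignite ignite, String cacheName, Object key) {
            Affinity<Object> aff = ignite.affinity(cacheName);

            return aff.partition(key); // the affinity function maps key -> partition id
        }
    }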

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 7adce6e..248d44e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -42,9 +42,9 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.ClusterState;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
@@ -72,7 +72,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
 * Partition topology.
 */
@GridToStringExclude
-class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
+public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
    /** If true, then check consistency. */
    private static final boolean CONSISTENCY_CHECK = false;

@@ -82,8 +82,11 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
    /** */
    private static final Long ZERO = 0L;

-    /** Context. */
-    private final GridCacheContext cctx;
+    /** */
+    private final GridCacheSharedContext ctx;
+
+    /** */
+    private final CacheGroupContext grp;

    /** Logger. */
    private final IgniteLogger log;

@@ -118,9 +121,6 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
    /** Lock. */
    private final StripedCompositeReadWriteLock lock = new StripedCompositeReadWriteLock(16);

-    /** */
-    private final GridCacheMapEntryFactory entryFactory;
-
    /** Partition update counter. */
    private Map> cntrMap = new HashMap<>();

@@ -131,23 +131,25 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
    private volatile boolean treatAllPartAsLoc;

    /**
-     * @param cctx Context.
-     * @param entryFactory Entry factory.
+     * @param ctx Cache shared context.
+     * @param grp Cache group.
     */
-    GridDhtPartitionTopologyImpl(GridCacheContext cctx, GridCacheMapEntryFactory entryFactory) {
-        assert cctx != null;
+    public GridDhtPartitionTopologyImpl(GridCacheSharedContext ctx,
+        CacheGroupContext grp) {
+        assert ctx != null;
+        assert grp != null;

-        this.cctx = cctx;
-        this.entryFactory = entryFactory;
+        this.ctx = ctx;
+        this.grp = grp;

-        log = cctx.logger(getClass());
+        log = ctx.logger(getClass());

-        locParts = new AtomicReferenceArray<>(cctx.config().getAffinity().partitions());
+        locParts = new AtomicReferenceArray<>(grp.affinityFunction().partitions());
    }

    /** {@inheritDoc} */
-    @Override public int cacheId() {
-        return cctx.cacheId();
+    @Override public int groupId() {
+        return grp.groupId();
    }
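Note: locParts above is an AtomicReferenceArray with one slot per partition; partitions are created lazily and an EVICTED slot can later be repopulated. The create-if-absent shape, reduced to a sketch (the real code does this under a striped write lock rather than a per-slot CAS; PartitionSlots and its names are illustrative):

    import java.util.concurrent.atomic.AtomicReferenceArray;
    import java.util.function.IntFunction;

    class PartitionSlots<P> {
        private final AtomicReferenceArray<P> slots;

        PartitionSlots(int partitions) {
            slots = new AtomicReferenceArray<>(partitions); // one slot per partition id
        }

        /** Returns the partition at {@code p}, creating it if the slot is empty. */
        P getOrCreate(int p, IntFunction<P> factory) {
            while (true) {
                P part = slots.get(p);

                if (part != null)
                    return part;

                P created = factory.apply(p);

                if (slots.compareAndSet(p, null, created))
                    return created; // we won the race; losers loop and observe our value
            }
        }
    }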
*/ private void initPartitions0(GridDhtPartitionsExchangeFuture exchFut, long updateSeq) { - ClusterNode loc = cctx.localNode(); + ClusterNode loc = ctx.localNode(); ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache(); @@ -379,23 +381,23 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { assert topVer.equals(exchFut.topologyVersion()) : "Invalid topology [topVer=" + topVer + - ", cache=" + cctx.name() + + ", grp=" + grp.cacheOrGroupName() + ", futVer=" + exchFut.topologyVersion() + ", fut=" + exchFut + ']'; - assert cctx.affinity().affinityTopologyVersion().equals(exchFut.topologyVersion()) : - "Invalid affinity [topVer=" + cctx.affinity().affinityTopologyVersion() + - ", cache=" + cctx.name() + + assert grp.affinity().lastVersion().equals(exchFut.topologyVersion()) : + "Invalid affinity [topVer=" + grp.affinity().lastVersion() + + ", grp=" + grp.cacheOrGroupName() + ", futVer=" + exchFut.topologyVersion() + ", fut=" + exchFut + ']'; - List> aff = cctx.affinity().assignments(exchFut.topologyVersion()); + List> aff = grp.affinity().assignments(exchFut.topologyVersion()); - int num = cctx.affinity().partitions(); + int num = grp.affinity().partitions(); - if (cctx.rebalanceEnabled()) { - boolean added = exchFut.cacheAddedOnExchange(cctx.cacheId(), cctx.receivedFrom()); + if (grp.rebalanceEnabled()) { + boolean added = exchFut.cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom()); - boolean first = (loc.equals(oldest) && loc.id().equals(exchId.nodeId()) && exchId.isJoined()) || added; + boolean first = added || (loc.equals(oldest) && loc.id().equals(exchId.nodeId()) && exchId.isJoined()); if (first) { assert exchId.isJoined() || added; @@ -406,7 +408,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { boolean owned = locPart.own(); - assert owned : "Failed to own partition for oldest node [cacheName" + cctx.name() + + assert owned : "Failed to own partition for oldest node [grp=" + grp.cacheOrGroupName() + ", part=" + locPart + ']'; if (log.isDebugEnabled()) @@ -465,7 +467,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { * @param updateSeq Update sequence. */ private void createPartitions(List> aff, long updateSeq) { - int num = cctx.affinity().partitions(); + int num = grp.affinity().partitions(); for (int p = 0; p < num; p++) { if (node2part != null && node2part.valid()) { @@ -487,26 +489,25 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** {@inheritDoc} */ @Override public void beforeExchange(GridDhtPartitionsExchangeFuture exchFut, boolean affReady) throws IgniteCheckedException { - DiscoveryEvent discoEvt = exchFut.discoveryEvent(); ClusterState newState = exchFut.newClusterState(); treatAllPartAsLoc = (newState != null && newState == ClusterState.ACTIVE) - || (cctx.kernalContext().state().active() + || (ctx.kernalContext().state().active() && discoEvt.type() == EventType.EVT_NODE_JOINED && discoEvt.eventNode().isLocal() - && !cctx.kernalContext().clientNode() + && !ctx.kernalContext().clientNode() ); // Wait for rent outside of checkpoint lock. 
waitForRent(); - ClusterNode loc = cctx.localNode(); + ClusterNode loc = ctx.localNode(); - cctx.shared().database().checkpointReadLock(); + ctx.database().checkpointReadLock(); - synchronized (cctx.shared().exchange().interruptLock()) { + synchronized (ctx.exchange().interruptLock()) { if (Thread.currentThread().isInterrupted()) throw new IgniteInterruptedCheckedException("Thread is interrupted: " + Thread.currentThread()); @@ -514,7 +515,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { U.writeLock(lock); } catch (IgniteInterruptedCheckedException e) { - cctx.shared().database().checkpointReadUnlock(); + ctx.database().checkpointReadUnlock(); throw e; } @@ -541,7 +542,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { cntrMap.clear(); // If this is the oldest node. - if (oldest != null && (loc.equals(oldest) || exchFut.cacheAddedOnExchange(cctx.cacheId(), cctx.receivedFrom()))) { + if (oldest != null && (loc.equals(oldest) || exchFut.cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom()))) { if (node2part == null) { node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq); @@ -568,7 +569,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (affReady) initPartitions0(exchFut, updateSeq); else { - List> aff = cctx.affinity().idealAssignment(); + List> aff = grp.affinity().idealAssignment(); createPartitions(aff, updateSeq); } @@ -582,7 +583,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { finally { lock.writeLock().unlock(); - cctx.shared().database().checkpointReadUnlock(); + ctx.database().checkpointReadUnlock(); } } @@ -590,19 +591,28 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { waitForRent(); } + /** + * @param p Partition number. + * @param topVer Topology version. + * @return {@code True} if given partition belongs to local node. + */ + private boolean partitionLocalNode(int p, AffinityTopologyVersion topVer) { + return grp.affinity().nodes(p, topVer).contains(ctx.localNode()); + } + /** {@inheritDoc} */ @Override public boolean afterExchange(GridDhtPartitionsExchangeFuture exchFut) throws IgniteCheckedException { treatAllPartAsLoc = false; boolean changed = waitForRent(); - int num = cctx.affinity().partitions(); + int num = grp.affinity().partitions(); AffinityTopologyVersion topVer = exchFut.topologyVersion(); - assert cctx.affinity().affinityTopologyVersion().equals(topVer) : "Affinity is not initialized " + + assert grp.affinity().lastVersion().equals(topVer) : "Affinity is not initialized " + "[topVer=" + topVer + - ", affVer=" + cctx.affinity().affinityTopologyVersion() + + ", affVer=" + grp.affinity().lastVersion() + ", fut=" + exchFut + ']'; lock.writeLock().lock(); @@ -623,7 +633,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { for (int p = 0; p < num; p++) { GridDhtLocalPartition locPart = localPartition(p, topVer, false, false); - if (cctx.affinity().partitionLocalNode(p, topVer)) { + if (partitionLocalNode(p, topVer)) { // This partition will be created during next topology event, // which obviously has not happened at this point. if (locPart == null) { @@ -636,26 +646,28 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { GridDhtPartitionState state = locPart.state(); if (state == MOVING) { - if (cctx.rebalanceEnabled()) { + if (grp.rebalanceEnabled()) { Collection owners = owners(p); // If there are no other owners, then become an owner. 
if (F.isEmpty(owners)) { boolean owned = locPart.own(); - assert owned : "Failed to own partition [cacheName" + cctx.name() + ", locPart=" + + assert owned : "Failed to own partition [grp=" + grp.cacheOrGroupName() + ", locPart=" + locPart + ']'; updateSeq = updateLocal(p, locPart.state(), updateSeq); changed = true; - if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) { + if (grp.eventRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) { DiscoveryEvent discoEvt = exchFut.discoveryEvent(); - cctx.events().addPreloadEvent(p, - EVT_CACHE_REBALANCE_PART_DATA_LOST, discoEvt.eventNode(), - discoEvt.type(), discoEvt.timestamp()); + grp.addRebalanceEvent(p, + EVT_CACHE_REBALANCE_PART_DATA_LOST, + discoEvt.eventNode(), + discoEvt.type(), + discoEvt.timestamp()); } if (log.isDebugEnabled()) @@ -673,7 +685,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (locPart != null) { GridDhtPartitionState state = locPart.state(); - if (state == MOVING && cctx.kernalContext().state().active()) { + if (state == MOVING && ctx.kernalContext().state().active()) { locPart.rent(false); updateSeq = updateLocal(p, locPart.state(), updateSeq); @@ -687,7 +699,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } } - updateRebalanceVersion(cctx.affinity().assignments(topVer)); + updateRebalanceVersion(grp.affinity().assignments(topVer)); consistencyCheck(); } @@ -715,11 +727,11 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { GridDhtLocalPartition loc = locParts.get(p); if (loc == null || loc.state() == EVICTED) { - locParts.set(p, loc = new GridDhtLocalPartition(cctx, p, entryFactory)); + locParts.set(p, loc = new GridDhtLocalPartition(ctx, grp, p)); - if (cctx.shared().pageStore() != null) { + if (ctx.pageStore() != null) { try { - cctx.shared().pageStore().onPartitionCreated(cctx.cacheId(), p); + ctx.pageStore().onPartitionCreated(grp.groupId(), p); } catch (IgniteCheckedException e) { // TODO ignite-db @@ -749,7 +761,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { GridDhtPartitionState state = loc != null ? loc.state() : null; - if (loc != null && state != EVICTED && (state != RENTING || !cctx.allowFastEviction())) + if (loc != null && state != EVICTED && (state != RENTING || !grp.allowFastEviction())) return loc; if (!create) @@ -764,7 +776,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { state = loc != null ? 
loc.state() : null; - boolean belongs = cctx.affinity().partitionLocalNode(p, topVer); + boolean belongs = partitionLocalNode(p, topVer); if (loc != null && state == EVICTED) { locParts.set(p, loc = null); @@ -774,7 +786,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { "(often may be caused by inconsistent 'key.hashCode()' implementation) " + "[part=" + p + ", topVer=" + topVer + ", this.topVer=" + this.topVer + ']'); } - else if (loc != null && state == RENTING && cctx.allowFastEviction()) + else if (loc != null && state == RENTING && grp.allowFastEviction()) throw new GridDhtInvalidPartitionException(p, "Adding entry to partition that is concurrently evicted."); if (loc == null) { @@ -783,7 +795,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { "local node (often may be caused by inconsistent 'key.hashCode()' implementation) " + "[part=" + p + ", topVer=" + topVer + ", this.topVer=" + this.topVer + ']'); - locParts.set(p, loc = new GridDhtLocalPartition(cctx, p, entryFactory)); + locParts.set(p, loc = new GridDhtLocalPartition(ctx, grp, p)); if (updateSeq) this.updateSeq.incrementAndGet(); @@ -798,9 +810,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { lock.writeLock().unlock(); } - if (created && cctx.shared().pageStore() != null) { + if (created && ctx.pageStore() != null) { try { - cctx.shared().pageStore().onPartitionCreated(cctx.cacheId(), p); + ctx.pageStore().onPartitionCreated(grp.groupId(), p); } catch (IgniteCheckedException e) { // TODO ignite-db @@ -825,8 +837,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } /** {@inheritDoc} */ - @Override public GridDhtLocalPartition localPartition(Object key, boolean create) { - return localPartition(cctx.affinity().partition(key), AffinityTopologyVersion.NONE, create); + @Override public GridDhtLocalPartition localPartition(int part) { + return locParts.get(part); } /** {@inheritDoc} */ @@ -882,7 +894,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { map.put(i, part.state()); } - return new GridDhtPartitionMap(cctx.nodeId(), + return new GridDhtPartitionMap(ctx.localNodeId(), updateSeq.get(), topVer, Collections.unmodifiableMap(map), @@ -922,7 +934,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** {@inheritDoc} */ @Override public List nodes(int p, AffinityTopologyVersion topVer) { - AffinityAssignment affAssignment = cctx.affinity().assignment(topVer); + AffinityAssignment affAssignment = grp.affinity().cachedAffinity(topVer); List affNodes = affAssignment.get(p); @@ -945,8 +957,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { try { assert node2part != null && node2part.valid() : "Invalid node-to-partitions map [topVer1=" + topVer + ", topVer2=" + this.topVer + - ", node=" + cctx.igniteInstanceName() + - ", cache=" + cctx.name() + + ", node=" + ctx.igniteInstanceName() + + ", grp=" + grp.cacheOrGroupName() + ", node2part=" + node2part + ']'; List nodes = null; @@ -958,7 +970,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { HashSet affIds = affAssignment.getIds(p); if (!affIds.contains(nodeId) && hasState(p, nodeId, OWNING, MOVING, RENTING)) { - ClusterNode n = cctx.discovery().node(nodeId); + ClusterNode n = ctx.discovery().node(nodeId); if (n != null && (topVer.topologyVersion() < 0 || n.order() <= topVer.topologyVersion())) { if (nodes == null) { @@ -991,7 +1003,9 @@ class GridDhtPartitionTopologyImpl 
implements GridDhtPartitionTopology { AffinityTopologyVersion topVer, GridDhtPartitionState state, GridDhtPartitionState... states) { - Collection allIds = topVer.topologyVersion() > 0 ? F.nodeIds(discoCache.cacheAffinityNodes(cctx.cacheId())) : null; + Collection allIds = topVer.topologyVersion() > 0 ? + F.nodeIds(discoCache.cacheGroupAffinityNodes(grp.groupId())) : + null; lock.readLock().lock(); @@ -999,7 +1013,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { assert node2part != null && node2part.valid() : "Invalid node-to-partitions map [topVer=" + topVer + ", allIds=" + allIds + ", node2part=" + node2part + - ", cache=" + cctx.name() + ']'; + ", grp=" + grp.cacheOrGroupName() + ']'; Collection nodeIds = part2node.get(p); @@ -1016,7 +1030,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { continue; if (hasState(p, id, state, states)) { - ClusterNode n = cctx.discovery().node(id); + ClusterNode n = ctx.discovery().node(id); if (n != null && (topVer.topologyVersion() < 0 || n.order() <= topVer.topologyVersion())) nodes.add(n); @@ -1032,7 +1046,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** {@inheritDoc} */ @Override public List owners(int p, AffinityTopologyVersion topVer) { - if (!cctx.rebalanceEnabled()) + if (!grp.rebalanceEnabled()) return ownersAndMoving(p, topVer); return nodes(p, topVer, OWNING); @@ -1045,7 +1059,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** {@inheritDoc} */ @Override public List moving(int p) { - if (!cctx.rebalanceEnabled()) + if (!grp.rebalanceEnabled()) return ownersAndMoving(p, AffinityTopologyVersion.NONE); return nodes(p, AffinityTopologyVersion.NONE, MOVING); @@ -1070,12 +1084,14 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { lock.readLock().lock(); try { - assert node2part != null && node2part.valid() : "Invalid node2part [node2part: " + node2part + - ", cache=" + cctx.name() + - ", started=" + cctx.started() + + if (node2part == null || stopping) + return null; + + assert node2part.valid() : "Invalid node2part [node2part=" + node2part + + ", grp=" + grp.cacheOrGroupName() + ", stopping=" + stopping + - ", locNodeId=" + cctx.localNode().id() + - ", locName=" + cctx.igniteInstanceName() + ']'; + ", locNodeId=" + ctx.localNode().id() + + ", locName=" + ctx.igniteInstanceName() + ']'; GridDhtPartitionFullMap m = node2part; @@ -1157,7 +1173,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { // then we keep the newer value. 
if (newPart != null && (newPart.updateSequence() < part.updateSequence() || - (cctx.startTopologyVersion().compareTo(newPart.topologyVersion()) > 0)) + (grp.localStartVersion().compareTo(newPart.topologyVersion()) > 0)) ) { if (log.isDebugEnabled()) log.debug("Overriding partition map in full update map [exchId=" + exchId + ", curPart=" + @@ -1171,7 +1187,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { for (Iterator it = partMap.keySet().iterator(); it.hasNext(); ) { UUID nodeId = it.next(); - if (!cctx.discovery().alive(nodeId)) { + if (!ctx.discovery().alive(nodeId)) { if (log.isDebugEnabled()) log.debug("Removing left node from full map update [nodeId=" + nodeId + ", partMap=" + partMap + ']'); @@ -1183,7 +1199,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { node2part = partMap; - Map> p2n = new HashMap<>(cctx.affinity().partitions(), 1.0f); + Map> p2n = new HashMap<>(grp.affinity().partitions(), 1.0f); for (Map.Entry e : partMap.entrySet()) { for (Integer p : e.getValue().keySet()) { @@ -1202,11 +1218,11 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { boolean changed = false; - AffinityTopologyVersion affVer = cctx.affinity().affinityTopologyVersion(); + AffinityTopologyVersion affVer = grp.affinity().lastVersion(); - GridDhtPartitionMap nodeMap = partMap.get(cctx.localNodeId()); + GridDhtPartitionMap nodeMap = partMap.get(ctx.localNodeId()); - if (nodeMap != null && cctx.shared().database().persistenceEnabled()) { + if (nodeMap != null && ctx.database().persistenceEnabled()) { for (Map.Entry e : nodeMap.entrySet()) { int p = e.getKey(); GridDhtPartitionState state = e.getValue(); @@ -1233,7 +1249,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) { - List> aff = cctx.affinity().assignments(topVer); + List> aff = grp.affinity().assignments(topVer); changed |= checkEvictions(updateSeq, aff); @@ -1246,7 +1262,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { log.debug("Partition map after full update: " + fullMapString()); if (changed) - cctx.shared().exchange().scheduleResendPartitions(); + ctx.exchange().scheduleResendPartitions(); return changed ? 
localPartitionMap() : null; } @@ -1299,7 +1315,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (log.isDebugEnabled()) log.debug("Updating single partition map [exchId=" + exchId + ", parts=" + mapString(parts) + ']'); - if (!cctx.discovery().alive(parts.nodeId())) { + if (!ctx.discovery().alive(parts.nodeId())) { if (log.isDebugEnabled()) log.debug("Received partition update for non-existing node (will ignore) [exchId=" + exchId + ", parts=" + parts + ']'); @@ -1373,10 +1389,10 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } } - AffinityTopologyVersion affVer = cctx.affinity().affinityTopologyVersion(); + AffinityTopologyVersion affVer = grp.affinity().lastVersion(); if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) { - List> aff = cctx.affinity().assignments(topVer); + List> aff = grp.affinity().assignments(topVer); changed |= checkEvictions(updateSeq, aff); @@ -1389,7 +1405,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { log.debug("Partition map after single update: " + fullMapString()); if (changed) - cctx.shared().exchange().scheduleResendPartitions(); + ctx.exchange().scheduleResendPartitions(); return changed ? localPartitionMap() : null; } @@ -1403,7 +1419,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { lock.writeLock().lock(); try { - int parts = cctx.affinity().partitions(); + int parts = grp.affinity().partitions(); Collection lost = null; @@ -1437,7 +1453,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { boolean changed = false; if (lost != null) { - PartitionLossPolicy plc = cctx.config().getPartitionLossPolicy(); + PartitionLossPolicy plc = grp.config().getPartitionLossPolicy(); assert plc != null; @@ -1469,13 +1485,17 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } } - if (cctx.events().isRecordable(EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST)) - cctx.events().addPreloadEvent(part, EVT_CACHE_REBALANCE_PART_DATA_LOST, - discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp()); + if (grp.eventRecordable(EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST)) { + grp.addRebalanceEvent(part, + EVT_CACHE_REBALANCE_PART_DATA_LOST, + discoEvt.eventNode(), + discoEvt.type(), + discoEvt.timestamp()); + } } if (plc != PartitionLossPolicy.IGNORE) - cctx.needsRecovery(true); + grp.needsRecovery(true); } return changed; @@ -1490,7 +1510,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { lock.writeLock().lock(); try { - int parts = cctx.affinity().partitions(); + int parts = grp.affinity().partitions(); long updSeq = updateSeq.incrementAndGet(); for (int part = 0; part < parts; part++) { @@ -1529,9 +1549,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } } - checkEvictions(updSeq, cctx.affinity().assignments(topVer)); + checkEvictions(updSeq, grp.affinity().assignments(topVer)); - cctx.needsRecovery(false); + grp.needsRecovery(false); } finally { lock.writeLock().unlock(); @@ -1545,7 +1565,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { try { Collection res = null; - int parts = cctx.affinity().partitions(); + int parts = grp.affinity().partitions(); for (int part = 0; part < parts; part++) { Set nodeIds = part2node.get(part); @@ -1582,7 +1602,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { GridDhtLocalPartition locPart = locParts.get(p); if (locPart != null) { - if 
(locPart.state() == OWNING && !owners.contains(cctx.localNodeId())) + if (locPart.state() == OWNING && !owners.contains(ctx.localNodeId())) locPart.moving(); } @@ -1607,12 +1627,12 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { * @return {@code True} if state changed. */ private boolean checkEvictions(long updateSeq) { - AffinityTopologyVersion affVer = cctx.affinity().affinityTopologyVersion(); + AffinityTopologyVersion affVer = grp.affinity().lastVersion(); boolean changed = false; if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) { - List> aff = cctx.affinity().assignments(topVer); + List> aff = grp.affinity().assignments(topVer); changed = checkEvictions(updateSeq, aff); @@ -1644,12 +1664,12 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { * @return Checks if any of the local partitions need to be evicted. */ private boolean checkEvictions(long updateSeq, List> aff) { - if (!cctx.kernalContext().state().active()) + if (!ctx.kernalContext().state().active()) return false; boolean changed = false; - UUID locId = cctx.nodeId(); + UUID locId = ctx.localNodeId(); for (int p = 0; p < locParts.length(); p++) { GridDhtLocalPartition part = locParts.get(p); @@ -1662,7 +1682,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (state.active()) { List affNodes = aff.get(p); - if (!affNodes.contains(cctx.localNode())) { + if (!affNodes.contains(ctx.localNode())) { List nodes = nodes(p, topVer, OWNING); Collection nodeIds = F.nodeIds(nodes); @@ -1725,10 +1745,10 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { private long updateLocal(int p, GridDhtPartitionState state, long updateSeq) { ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache(); - assert oldest != null || cctx.kernalContext().clientNode(); + assert oldest != null || ctx.kernalContext().clientNode(); // If this node became the oldest node. - if (cctx.localNode().equals(oldest)) { + if (ctx.localNode().equals(oldest)) { long seq = node2part.updateSequence(); if (seq != updateSeq) { @@ -1755,7 +1775,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } if (node2part != null) { - UUID locNodeId = cctx.localNodeId(); + UUID locNodeId = ctx.localNodeId(); GridDhtPartitionMap map = node2part.get(locNodeId); @@ -1792,9 +1812,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { ClusterNode oldest = discoCache.oldestAliveServerNode(); - assert oldest != null || cctx.kernalContext().clientNode(); + assert oldest != null || ctx.kernalContext().clientNode(); - ClusterNode loc = cctx.localNode(); + ClusterNode loc = ctx.localNode(); if (node2part != null) { if (loc.equals(oldest) && !node2part.nodeId().equals(loc.id())) { @@ -1939,11 +1959,10 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { try { assert node2part != null && node2part.valid() : "Invalid node2part [node2part: " + node2part + - ", cache=" + cctx.name() + - ", started=" + cctx.started() + + ", grp=" + grp.cacheOrGroupName() + ", stopping=" + stopping + - ", locNodeId=" + cctx.localNode().id() + - ", locName=" + cctx.igniteInstanceName() + ']'; + ", locNodeId=" + ctx.localNodeId() + + ", locName=" + ctx.igniteInstanceName() + ']'; for (GridDhtPartitionMap map : node2part.values()) { if (map.hasMovingPartitions()) @@ -1957,10 +1976,25 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } } + /** + * @param cacheId Cache ID. 
@@ -1957,10 +1976,25 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
        }
    }
 
+    /**
+     * @param cacheId Cache ID.
+     */
+    public void onCacheStopped(int cacheId) {
+        if (!grp.sharedGroup())
+            return;
+
+        for (int i = 0; i < locParts.length(); i++) {
+            GridDhtLocalPartition part = locParts.get(i);
+
+            if (part != null)
+                part.onCacheStopped(cacheId);
+        }
+    }
+
    /** {@inheritDoc} */
    @Override public void printMemoryStats(int threshold) {
-        X.println(">>> Cache partition topology stats [igniteInstanceName=" + cctx.igniteInstanceName() +
-            ", cache=" + cctx.name() + ']');
+        X.println(">>> Cache partition topology stats [igniteInstanceName=" + ctx.igniteInstanceName() +
+            ", grp=" + grp.cacheOrGroupName() + ']');
 
        lock.readLock().lock();
@@ -1971,7 +2005,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
            if (part == null)
                continue;
 
-            int size = part.dataStore().size();
+            int size = part.dataStore().fullSize();
 
            if (size >= threshold)
                X.println(">>> Local partition [part=" + part.id() + ", size=" + size + ']');
@@ -1988,7 +2022,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     * @return {@code True} if given partition belongs to local node.
     */
    private boolean localNode(int part, List<List<ClusterNode>> aff) {
-        return aff.get(part).contains(cctx.localNode());
+        return aff.get(part).contains(ctx.localNode());
    }
 
@@ -1999,7 +2033,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
        if (node2part == null || !node2part.valid())
            return;
 
-        for (int i = 0; i < cctx.affinity().partitions(); i++) {
+        for (int i = 0; i < grp.affinity().partitions(); i++) {
            List<ClusterNode> affNodes = aff.get(i);
 
            // Topology doesn't contain server nodes (just clients).
@@ -2015,7 +2049,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
            rebalancedTopVer = topVer;
 
            if (log.isDebugEnabled())
-                log.debug("Updated rebalanced version [cache=" + cctx.name() + ", ver=" + rebalancedTopVer + ']');
+                log.debug("Updated rebalanced version [cache=" + grp.cacheOrGroupName() + ", ver=" + rebalancedTopVer + ']');
        }
    }
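onCacheStopped() above only does work for shared groups: several caches then live in the same physical partitions, so stopping one cache must discard that cache's entries while the partitions keep serving the remaining caches. The hunk does not show the partition-side body; a toy model of the assumed effect, keyed on the per-cache entry maps that GridDhtLocalPartition now carries:

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    /** Toy model only: the real GridDhtLocalPartition.onCacheStopped() body is not shown in this diff. */
    class PartitionSketch {
        /** Per-cache entry maps, keyed by cache ID (mirrors the cacheMaps field added by the patch). */
        private final ConcurrentMap<Integer, ConcurrentMap<Object, Object>> cacheMaps =
            new ConcurrentHashMap<>();

        private final boolean sharedGroup;

        PartitionSketch(boolean sharedGroup) {
            this.sharedGroup = sharedGroup;
        }

        /** Assumed effect of stopping one cache of a shared group. */
        void onCacheStopped(int cacheId) {
            if (!sharedGroup)
                return; // a dedicated group is destroyed together with its only cache

            // Drop only the stopped cache's entries; the partition itself, and the
            // entry maps of the other caches in the group, stay untouched.
            cacheMaps.remove(cacheId);
        }
    }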
http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index c91eb7a..d607ff1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -47,7 +47,8 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedUnlockRequest;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse;
@@ -118,51 +119,61 @@ public abstract class GridDhtTransactionalCacheAdapter extends GridDhtCacheAdapter {
     @Override public void start() throws IgniteCheckedException {
         super.start();
 
-        preldr = new GridDhtPreloader(ctx);
-
-        preldr.start();
-
-        ctx.io().addHandler(ctx.cacheId(), GridNearGetRequest.class, new CI2<UUID, GridNearGetRequest>() {
+        ctx.io().addCacheHandler(ctx.cacheId(), GridNearGetRequest.class, new CI2<UUID, GridNearGetRequest>() {
             @Override public void apply(UUID nodeId, GridNearGetRequest req) {
                 processNearGetRequest(nodeId, req);
             }
         });
 
-        ctx.io().addHandler(ctx.cacheId(), GridNearSingleGetRequest.class, new CI2<UUID, GridNearSingleGetRequest>() {
+        ctx.io().addCacheHandler(ctx.cacheId(), GridNearSingleGetRequest.class, new CI2<UUID, GridNearSingleGetRequest>() {
             @Override public void apply(UUID nodeId, GridNearSingleGetRequest req) {
                 processNearSingleGetRequest(nodeId, req);
             }
         });
 
-        ctx.io().addHandler(ctx.cacheId(), GridNearLockRequest.class, new CI2<UUID, GridNearLockRequest>() {
+        ctx.io().addCacheHandler(ctx.cacheId(), GridNearLockRequest.class, new CI2<UUID, GridNearLockRequest>() {
             @Override public void apply(UUID nodeId, GridNearLockRequest req) {
                 processNearLockRequest(nodeId, req);
             }
         });
 
-        ctx.io().addHandler(ctx.cacheId(), GridDhtLockRequest.class, new CI2<UUID, GridDhtLockRequest>() {
+        ctx.io().addCacheHandler(ctx.cacheId(), GridDhtLockRequest.class, new CI2<UUID, GridDhtLockRequest>() {
             @Override public void apply(UUID nodeId, GridDhtLockRequest req) {
                 processDhtLockRequest(nodeId, req);
             }
         });
 
-        ctx.io().addHandler(ctx.cacheId(), GridDhtLockResponse.class, new CI2<UUID, GridDhtLockResponse>() {
+        ctx.io().addCacheHandler(ctx.cacheId(), GridDhtLockResponse.class, new CI2<UUID, GridDhtLockResponse>() {
             @Override public void apply(UUID nodeId, GridDhtLockResponse req) {
                 processDhtLockResponse(nodeId, req);
             }
         });
 
-        ctx.io().addHandler(ctx.cacheId(), GridNearUnlockRequest.class, new CI2<UUID, GridNearUnlockRequest>() {
+        ctx.io().addCacheHandler(ctx.cacheId(), GridNearUnlockRequest.class, new CI2<UUID, GridNearUnlockRequest>() {
             @Override public void apply(UUID nodeId, GridNearUnlockRequest req) {
                 processNearUnlockRequest(nodeId, req);
             }
         });
 
-        ctx.io().addHandler(ctx.cacheId(), GridDhtUnlockRequest.class, new CI2<UUID, GridDhtUnlockRequest>() {
+        ctx.io().addCacheHandler(ctx.cacheId(), GridDhtUnlockRequest.class, new CI2<UUID, GridDhtUnlockRequest>() {
             @Override public void apply(UUID nodeId, GridDhtUnlockRequest req) {
                 processDhtUnlockRequest(nodeId, req);
             }
         });
+
+        ctx.io().addCacheHandler(ctx.cacheId(), GridDhtForceKeysRequest.class,
+            new MessageHandler<GridDhtForceKeysRequest>() {
+                @Override public void onMessage(ClusterNode node, GridDhtForceKeysRequest msg) {
+                    processForceKeysRequest(node, msg);
+                }
+            });
+
+        ctx.io().addCacheHandler(ctx.cacheId(), GridDhtForceKeysResponse.class,
+            new MessageHandler<GridDhtForceKeysResponse>() {
+                @Override public void onMessage(ClusterNode node, GridDhtForceKeysResponse msg) {
+                    processForceKeyResponse(node, msg);
+                }
+            });
     }
 
     /** {@inheritDoc} */
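The registrations above switch from addHandler to addCacheHandler, which, together with the new GridCacheMessage hooks below (handlerId(), cacheGroupMessage()), suggests cache IO now routes through two handler namespaces: per cache and per group. Note also that the force-keys handlers move here from GridDhtPreloader, which is no longer created and started per cache. A toy router, all names hypothetical, showing the two-level dispatch idea:

    import java.util.Map;
    import java.util.UUID;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.function.BiConsumer;

    /** Toy model of two handler namespaces; only the routing idea is taken from the patch. */
    class TwoLevelIoRouter {
        interface Message {
            int handlerId();            // per-cache or per-group handler key
            boolean cacheGroupMessage(); // which namespace the message targets
        }

        private final Map<Integer, BiConsumer<UUID, Message>> cacheHandlers = new ConcurrentHashMap<>();
        private final Map<Integer, BiConsumer<UUID, Message>> groupHandlers = new ConcurrentHashMap<>();

        void addCacheHandler(int cacheId, BiConsumer<UUID, Message> h) {
            cacheHandlers.put(cacheId, h);
        }

        void addCacheGroupHandler(int grpId, BiConsumer<UUID, Message> h) {
            groupHandlers.put(grpId, h);
        }

        void onMessage(UUID nodeId, Message msg) {
            // The message itself says whether it addresses one cache or the whole group.
            Map<Integer, BiConsumer<UUID, Message>> tbl =
                msg.cacheGroupMessage() ? groupHandlers : cacheHandlers;

            BiConsumer<UUID, Message> h = tbl.get(msg.handlerId());

            if (h != null)
                h.accept(nodeId, msg);
        }
    }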
@@ -382,7 +393,7 @@ public abstract class GridDhtTransactionalCacheAdapter extends GridDhtCacheAdapter {
         }
 
         IgniteInternalFuture<Object> keyFut = F.isEmpty(req.keys()) ? null :
-            ctx.dht().dhtPreloader().request(req.keys(), req.topologyVersion());
+            ctx.group().preloader().request(ctx, req.keys(), req.topologyVersion());
 
         if (keyFut == null || keyFut.isDone()) {
             if (keyFut != null) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
index d777a22..6d717eb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
@@ -173,19 +173,19 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
         }
 
         switch (writer.state()) {
-            case 7:
+            case 6:
                 if (!writer.writeByteArray("checkCommittedErrBytes", checkCommittedErrBytes))
                     return false;
 
                 writer.incrementState();
 
-            case 8:
+            case 7:
                 if (!writer.writeInt("miniId", miniId))
                     return false;
 
                 writer.incrementState();
 
-            case 9:
+            case 8:
                 if (!writer.writeMessage("retVal", retVal))
                     return false;
 
@@ -207,7 +207,7 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
             return false;
 
         switch (reader.state()) {
-            case 7:
+            case 6:
                 checkCommittedErrBytes = reader.readByteArray("checkCommittedErrBytes");
 
                 if (!reader.isLastRead())
@@ -215,7 +215,7 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
                 reader.incrementState();
 
-            case 8:
+            case 7:
                 miniId = reader.readInt("miniId");
 
                 if (!reader.isLastRead())
@@ -223,7 +223,7 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
                 reader.incrementState();
 
-            case 9:
+            case 8:
                 retVal = reader.readMessage("retVal");
 
                 if (!reader.isLastRead())
@@ -243,7 +243,7 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 10;
+        return 9;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxOnePhaseCommitAckRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxOnePhaseCommitAckRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxOnePhaseCommitAckRequest.java
index c483408..67eacd3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxOnePhaseCommitAckRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxOnePhaseCommitAckRequest.java
@@ -47,6 +47,16 @@ public class GridDhtTxOnePhaseCommitAckRequest extends GridCacheMessage {
         // No-op.
     }
 
+    /** {@inheritDoc} */
+    @Override public int handlerId() {
+        return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean cacheGroupMessage() {
+        return false;
+    }
+
     /**
     *
     * @param vers Near Tx xid Versions.
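The case renumbering here and in the next hunks is purely mechanical: Ignite's direct marshalling writes fields by position, with each switch case falling through to the next, so removing one serialized field from the GridCacheMessage header (apparently the per-message cache ID, now implied by the new routing methods) shifts every subclass's state indices and fieldsCount() down by one. Sender and receiver must agree exactly on this ordering. A self-contained sketch of the pattern with a hypothetical two-field message:

    /** Hypothetical writer contract mirroring the diff's writer usage. */
    interface SketchWriter {
        int state();
        void incrementState();
        boolean writeInt(String name, int val);
        boolean writeByteArray(String name, byte[] val);
    }

    /** Hypothetical message: fields are serialized by position, resumable mid-write. */
    class SketchMessage {
        private int miniId;
        private byte[] payload;

        /** Total field count, including any superclass fields (here: none). */
        byte fieldsCount() {
            return 2;
        }

        boolean writeTo(SketchWriter writer) {
            switch (writer.state()) {
                case 0:
                    if (!writer.writeInt("miniId", miniId))
                        return false; // buffer full: caller retries later at the same state

                    writer.incrementState();
                    // deliberate fall-through: continue with the next field

                case 1:
                    if (!writer.writeByteArray("payload", payload))
                        return false;

                    writer.incrementState();
            }

            return true; // all fields written
        }
    }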
@@ -87,7 +97,7 @@ public class GridDhtTxOnePhaseCommitAckRequest extends GridCacheMessage {
         }
 
         switch (writer.state()) {
-            case 3:
+            case 2:
                 if (!writer.writeCollection("vers", vers, MessageCollectionItemType.MSG))
                     return false;
 
@@ -109,7 +119,7 @@ public class GridDhtTxOnePhaseCommitAckRequest extends GridCacheMessage {
             return false;
 
         switch (reader.state()) {
-            case 3:
+            case 2:
                 vers = reader.readCollection("vers", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
@@ -129,6 +139,6 @@ public class GridDhtTxOnePhaseCommitAckRequest extends GridCacheMessage {
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 4;
+        return 3;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 8a674fb..75f8366 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -1086,7 +1086,9 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture
                 Collection<KeyCacheObject> keys = entry.getValue();
 
-                lastForceFut = cctx.cacheContext(cacheId).preloader().request(keys, tx.topologyVersion());
+                GridCacheContext ctx = cctx.cacheContext(cacheId);
+
+                lastForceFut = ctx.group().preloader().request(ctx, keys, tx.topologyVersion());
 
                 if (compFut != null && lastForceFut != null)
                     compFut.add(lastForceFut);

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 362432c..5543cec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -458,7 +458,7 @@ public class GridPartitionedGetFuture extends CacheDistributedGetFutureAdapter
                 GridCacheVersion ver = null;
 
                 if (readNoEntry) {
-                    CacheDataRow row = cctx.offheap().read(key);
+                    CacheDataRow row = cctx.offheap().read(cctx, key);
 
                     if (row != null) {
                         long expireTime = row.expireTime();
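offheap().read(key) growing a GridCacheContext parameter, both here and in GridPartitionedSingleGetFuture below, is the data-path consequence of shared groups: the offheap manager and its partition data stores are now per group, so a read must identify which cache's rows it targets, conceptually turning the stored key into (cacheId, key) when the group is shared. A rough, hypothetical model of that lookup:

    import java.util.concurrent.ConcurrentHashMap;

    /** Hypothetical structures; only the (cacheId, key) idea is implied by the patch. */
    final class GroupDataStoreSketch {
        /** Row key: cache ID plus user key, so caches sharing a group never collide. */
        private static final class RowKey {
            private final int cacheId;
            private final Object key;

            RowKey(int cacheId, Object key) {
                this.cacheId = cacheId;
                this.key = key;
            }

            @Override public boolean equals(Object o) {
                if (!(o instanceof RowKey))
                    return false;

                RowKey other = (RowKey)o;

                return cacheId == other.cacheId && key.equals(other.key);
            }

            @Override public int hashCode() {
                return 31 * cacheId + key.hashCode();
            }
        }

        private final ConcurrentHashMap<RowKey, Object> rows = new ConcurrentHashMap<>();

        private final boolean sharedGroup;

        GroupDataStoreSketch(boolean sharedGroup) {
            this.sharedGroup = sharedGroup;
        }

        /** Mirrors read(cctx, key): the cache context supplies the cache ID. */
        Object read(int cacheId, Object key) {
            // A dedicated group can ignore the cache ID; a shared group must not.
            return rows.get(new RowKey(sharedGroup ? cacheId : 0, key));
        }
    }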
http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index 63ed9a8..3f612f7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -359,7 +359,7 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter