Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 871DF200CB4 for ; Mon, 12 Jun 2017 17:33:19 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 85FDB160BFA; Mon, 12 Jun 2017 15:33:19 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 07AD7160BEC for ; Mon, 12 Jun 2017 17:33:16 +0200 (CEST) Received: (qmail 2719 invoked by uid 500); 12 Jun 2017 15:33:16 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 2616 invoked by uid 99); 12 Jun 2017 15:33:16 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 12 Jun 2017 15:33:16 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 42111E8EFD; Mon, 12 Jun 2017 15:33:15 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: akuznetsov@apache.org To: commits@ignite.apache.org Date: Mon, 12 Jun 2017 15:33:19 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [05/62] [abbrv] ignite git commit: ignite-5075 Implement logical 'cache groups' sharing the same physical caches archived-at: Mon, 12 Jun 2017 15:33:19 -0000 http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java index 06a3416..b112e1d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java @@ -104,7 +104,7 @@ public class GridCacheDistributedQueryManager extends GridCacheQueryManage assert cctx.config().getCacheMode() != LOCAL; - cctx.io().addHandler(cctx.cacheId(), GridCacheQueryRequest.class, new CI2() { + cctx.io().addCacheHandler(cctx.cacheId(), GridCacheQueryRequest.class, new CI2() { @Override public void apply(UUID nodeId, GridCacheQueryRequest req) { processQueryRequest(nodeId, req); } @@ -560,11 +560,11 @@ public class GridCacheDistributedQueryManager extends GridCacheQueryManage final Object topic = topic(cctx.nodeId(), req.id()); - cctx.io().addOrderedHandler(topic, resHnd); + cctx.io().addOrderedCacheHandler(topic, resHnd); fut.listen(new CI1>() { @Override public void apply(IgniteInternalFuture fut) { - cctx.io().removeOrderedHandler(topic); + cctx.io().removeOrderedHandler(false, topic); } }); @@ -744,11 +744,11 @@ public class GridCacheDistributedQueryManager extends GridCacheQueryManage final Object topic = topic(cctx.nodeId(), req.id()); - cctx.io().addOrderedHandler(topic, resHnd); + cctx.io().addOrderedCacheHandler(topic, resHnd); fut.listen(new CI1>() { @Override public void apply(IgniteInternalFuture fut) { - cctx.io().removeOrderedHandler(topic); + cctx.io().removeOrderedHandler(false, topic); } }); http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index 07545a5..0c264db 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -863,12 +863,12 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte locPart = locPart0; - it = cctx.offheap().iterator(part); + it = cctx.offheap().cachePartitionIterator(cctx.cacheId(), part); } else { locPart = null; - it = cctx.offheap().iterator(true, backups, topVer); + it = cctx.offheap().cacheIterator(cctx.cacheId(), true, backups, topVer); } return new PeekValueExpiryAwareIterator(it, plc, topVer, keyValFilter, qry.keepBinary(), locNode) { http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java index 00ddff8..9dc7817 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java @@ -25,6 +25,7 @@ import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheDeployable; +import org.apache.ignite.internal.processors.cache.GridCacheIdMessage; import org.apache.ignite.internal.processors.cache.GridCacheMessage; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -47,7 +48,7 @@ import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryTy /** * Query request. */ -public class GridCacheQueryRequest extends GridCacheMessage implements GridCacheDeployable { +public class GridCacheQueryRequest extends GridCacheIdMessage implements GridCacheDeployable { /** */ private static final long serialVersionUID = 0L; http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java index 4d8e658..521aacf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java @@ -29,7 +29,7 @@ import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.processors.cache.CacheObjectContext; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheDeployable; -import org.apache.ignite.internal.processors.cache.GridCacheMessage; +import org.apache.ignite.internal.processors.cache.GridCacheIdMessage; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata; @@ -46,7 +46,7 @@ import org.jetbrains.annotations.Nullable; /** * Query request. */ -public class GridCacheQueryResponse extends GridCacheMessage implements GridCacheDeployable { +public class GridCacheQueryResponse extends GridCacheIdMessage implements GridCacheDeployable { /** */ private static final long serialVersionUID = 0L; http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java index 76147ee..ef0157e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java @@ -21,7 +21,7 @@ import java.nio.ByteBuffer; import java.util.Map; import java.util.UUID; import org.apache.ignite.internal.GridDirectMap; -import org.apache.ignite.internal.processors.cache.GridCacheMessage; +import org.apache.ignite.internal.processors.cache.GridCacheIdMessage; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; @@ -31,7 +31,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter; /** * Batch acknowledgement. */ -public class CacheContinuousQueryBatchAck extends GridCacheMessage { +public class CacheContinuousQueryBatchAck extends GridCacheIdMessage { /** */ private static final long serialVersionUID = 0L; http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java index 336f650..7a7c045 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java @@ -155,9 +155,12 @@ public class CacheContinuousQueryEventBuffer { batch = initBatch(entry.topologyVersion()); if (batch == null || cntr < batch.startCntr) { - if (backup) + if (backup) { backupQ.add(entry); + return null; + } + return entry; } http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index e5347c8..2b696a5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -487,6 +487,70 @@ public class CacheContinuousQueryHandler implements GridContinuousHandler onEntryUpdated(evt, primary, false, null); } + @Override public CounterSkipContext skipUpdateCounter(final GridCacheContext cctx, + @Nullable CounterSkipContext skipCtx, + int part, + long cntr, + AffinityTopologyVersion topVer, + boolean primary) { + if (skipCtx == null) + skipCtx = new CounterSkipContext(part, cntr, topVer); + + if (loc) { + assert !locCache; + + final Collection> evts = handleEvent(ctx, skipCtx.entry()); + + if (!evts.isEmpty()) { + if (asyncCb) { + ctx.asyncCallbackPool().execute(new Runnable() { + @Override public void run() { + locLsnr.onUpdated(evts); + } + }, part); + } + else + skipCtx.addProcessClosure(new Runnable() { + @Override public void run() { + locLsnr.onUpdated(evts); + } + }); + } + + return skipCtx; + } + + CacheContinuousQueryEventBuffer buf = partitionBuffer(cctx, part); + + final Object entryOrList = buf.processEntry(skipCtx.entry(), !primary); + + if (entryOrList != null) { + skipCtx.addProcessClosure(new Runnable() { + @Override public void run() { + try { + ctx.continuous().addNotification(nodeId, + routineId, + entryOrList, + topic, + false, + true); + } + catch (ClusterTopologyCheckedException ex) { + if (log.isDebugEnabled()) + log.debug("Failed to send event notification to node, node left cluster " + + "[node=" + nodeId + ", err=" + ex + ']'); + } + catch (IgniteCheckedException ex) { + U.error(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY), + "Failed to send event notification to node: " + nodeId, ex); + } + } + }); + } + + return skipCtx; + } + @Override public void onPartitionEvicted(int part) { entryBufs.remove(part); } @@ -1011,7 +1075,7 @@ public class CacheContinuousQueryHandler implements GridContinuousHandler t.get1()); for (AffinityTopologyVersion topVer : t.get2()) { - for (ClusterNode node : ctx.discovery().cacheAffinityNodes(cctx.name(), topVer)) { + for (ClusterNode node : ctx.discovery().cacheGroupAffinityNodes(cctx.groupId(), topVer)) { if (!node.isLocal()) { try { cctx.io().send(node, msg, GridIoPolicy.SYSTEM_POOL); http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java index 84b22f9..7da657f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.query.continuous; import java.util.Map; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture; import org.jetbrains.annotations.Nullable; @@ -76,6 +77,25 @@ public interface CacheContinuousQueryListener { public void skipUpdateEvent(CacheContinuousQueryEvent evt, AffinityTopologyVersion topVer, boolean primary); /** + * For cache updates in shared cache group need notify others caches CQ listeners + * that generated counter should be skipped. + * + * @param cctx Cache context. + * @param skipCtx Context. + * @param part Partition. + * @param cntr Counter to skip. + * @param topVer Topology version. + * @return Context. + */ + @Nullable public CounterSkipContext skipUpdateCounter( + GridCacheContext cctx, + @Nullable CounterSkipContext skipCtx, + int part, + long cntr, + AffinityTopologyVersion topVer, + boolean primary); + + /** * @param part Partition. */ public void onPartitionEvicted(int part); http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index 1a655e9..f264056 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -127,7 +127,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { topicPrefix = "CONTINUOUS_QUERY" + (cctx.name() == null ? "" : "_" + cctx.name()); if (cctx.affinityNode()) { - cctx.io().addHandler(cctx.cacheId(), CacheContinuousQueryBatchAck.class, + cctx.io().addCacheHandler(cctx.cacheId(), CacheContinuousQueryBatchAck.class, new CI2() { @Override public void apply(UUID uuid, CacheContinuousQueryBatchAck msg) { CacheContinuousQueryListener lsnr = lsnrs.get(msg.routineId()); @@ -175,7 +175,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { * @param primary Primary. * @param topVer Topology version. */ - public void skipUpdateEvent(Map lsnrs, + private void skipUpdateEvent(Map lsnrs, KeyCacheObject key, int partId, long updCntr, @@ -204,6 +204,24 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { } /** + * @param skipCtx Context. + * @param part Partition number. + * @param cntr Update counter. + * @param topVer Topology version. + * @return Context. + */ + @Nullable public CounterSkipContext skipUpdateCounter(@Nullable CounterSkipContext skipCtx, + int part, + long cntr, + AffinityTopologyVersion topVer, + boolean primary) { + for (CacheContinuousQueryListener lsnr : lsnrs.values()) + skipCtx = lsnr.skipUpdateCounter(cctx, skipCtx, part, cntr, topVer, primary); + + return skipCtx; + } + + /** * @param internal Internal entry flag (internal key or not user cache). * @param preload Whether update happened during preloading. * @return Registered listeners. @@ -633,7 +651,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { hnd.localCache(cctx.isLocal()); IgnitePredicate pred = (loc || cctx.config().getCacheMode() == CacheMode.LOCAL) ? - F.nodeForNodeId(cctx.localNodeId()) : cctx.config().getNodeFilter(); + F.nodeForNodeId(cctx.localNodeId()) : cctx.group().nodeFilter(); assert pred != null : cctx.config(); @@ -658,7 +676,10 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { } if (notifyExisting) { - final Iterator it = cctx.offheap().iterator(true, true, AffinityTopologyVersion.NONE); + final Iterator it = cctx.offheap().cacheIterator(cctx.cacheId(), + true, + true, + AffinityTopologyVersion.NONE); locLsnr.onUpdated(new Iterable() { @Override public Iterator iterator() { @@ -807,16 +828,24 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { intLsnrCnt.incrementAndGet(); } else { - added = lsnrs.putIfAbsent(lsnrId, lsnr) == null; + synchronized (this) { + if (lsnrCnt.get() == 0) { + if (cctx.group().sharedGroup() && !cctx.isLocal()) + cctx.group().addCacheWithContinuousQuery(cctx); + } - if (added) { - lsnrCnt.incrementAndGet(); + added = lsnrs.putIfAbsent(lsnrId, lsnr) == null; - lsnr.onExecution(); + if (added) + lsnrCnt.incrementAndGet(); } + + if (added) + lsnr.onExecution(); } - return added ? GridContinuousHandler.RegisterStatus.REGISTERED : GridContinuousHandler.RegisterStatus.NOT_REGISTERED; + return added ? GridContinuousHandler.RegisterStatus.REGISTERED : + GridContinuousHandler.RegisterStatus.NOT_REGISTERED; } /** @@ -834,11 +863,17 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { } } else { - if ((lsnr = lsnrs.remove(id)) != null) { - lsnrCnt.decrementAndGet(); + synchronized (this) { + if ((lsnr = lsnrs.remove(id)) != null) { + int cnt = lsnrCnt.decrementAndGet(); - lsnr.onUnregister(); + if (cctx.group().sharedGroup() && cnt == 0 && !cctx.isLocal()) + cctx.group().removeCacheWithContinuousQuery(cctx); + } } + + if (lsnr != null) + lsnr.onUnregister(); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CounterSkipContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CounterSkipContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CounterSkipContext.java new file mode 100644 index 0000000..23702bc --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CounterSkipContext.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import java.util.ArrayList; +import java.util.List; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +public class CounterSkipContext { + /** */ + private final CacheContinuousQueryEntry entry; + + /** */ + private List procC; + + /** + * @param part Partition. + * @param cntr Filtered counter. + * @param topVer Topology version. + */ + CounterSkipContext(int part, long cntr, AffinityTopologyVersion topVer) { + entry = new CacheContinuousQueryEntry(0, + null, + null, + null, + null, + false, + part, + cntr, + topVer, + (byte)0); + + entry.markFiltered(); + } + + /** + * @return Entry for filtered counter. + */ + CacheContinuousQueryEntry entry() { + return entry; + } + + /** + * @return Entries + */ + @Nullable public List processClosures() { + return procC; + } + + /** + * @param c Closure send + */ + void addProcessClosure(Runnable c) { + if (procC == null) + procC = new ArrayList<>(); + + procC.add(c); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index 5cba0cf..96af425 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -1480,7 +1480,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement if (modified) cacheVal = cacheCtx.toCacheObject(cacheCtx.unwrapTemporary(val)); - GridCacheOperation op = modified ? (val == null ? DELETE : UPDATE) : NOOP; + GridCacheOperation op = modified ? (cacheVal == null ? DELETE : UPDATE) : NOOP; if (op == NOOP) { ExpiryPolicy expiry = cacheCtx.expiryForTxEntry(txEntry); http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java index 163ed99..30aa335 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java @@ -565,7 +565,9 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { */ public void cached(GridCacheEntryEx entry) { assert entry == null || entry.context() == ctx : "Invalid entry assigned to tx entry [txEntry=" + this + - ", entry=" + entry + ", ctxNear=" + ctx.isNear() + ", ctxDht=" + ctx.isDht() + ']'; + ", entry=" + entry + + ", ctxNear=" + ctx.isNear() + + ", ctxDht=" + ctx.isDht() + ']'; this.entry = entry; } http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index a591517..ba3b2b6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -136,68 +136,68 @@ public class IgniteTxHandler { txPrepareMsgLog = ctx.logger(CU.TX_MSG_PREPARE_LOG_CATEGORY); txFinishMsgLog = ctx.logger(CU.TX_MSG_FINISH_LOG_CATEGORY); - ctx.io().addHandler(0, GridNearTxPrepareRequest.class, new CI2() { + ctx.io().addCacheHandler(0, GridNearTxPrepareRequest.class, new CI2() { @Override public void apply(UUID nodeId, GridCacheMessage msg) { processNearTxPrepareRequest(nodeId, (GridNearTxPrepareRequest)msg); } }); - ctx.io().addHandler(0, GridNearTxPrepareResponse.class, new CI2() { + ctx.io().addCacheHandler(0, GridNearTxPrepareResponse.class, new CI2() { @Override public void apply(UUID nodeId, GridCacheMessage msg) { processNearTxPrepareResponse(nodeId, (GridNearTxPrepareResponse)msg); } }); - ctx.io().addHandler(0, GridNearTxFinishRequest.class, new CI2() { + ctx.io().addCacheHandler(0, GridNearTxFinishRequest.class, new CI2() { @Override public void apply(UUID nodeId, GridCacheMessage msg) { processNearTxFinishRequest(nodeId, (GridNearTxFinishRequest)msg); } }); - ctx.io().addHandler(0, GridNearTxFinishResponse.class, new CI2() { + ctx.io().addCacheHandler(0, GridNearTxFinishResponse.class, new CI2() { @Override public void apply(UUID nodeId, GridCacheMessage msg) { processNearTxFinishResponse(nodeId, (GridNearTxFinishResponse)msg); } }); - ctx.io().addHandler(0, GridDhtTxPrepareRequest.class, new CI2() { + ctx.io().addCacheHandler(0, GridDhtTxPrepareRequest.class, new CI2() { @Override public void apply(UUID nodeId, GridCacheMessage msg) { processDhtTxPrepareRequest(nodeId, (GridDhtTxPrepareRequest)msg); } }); - ctx.io().addHandler(0, GridDhtTxPrepareResponse.class, new CI2() { + ctx.io().addCacheHandler(0, GridDhtTxPrepareResponse.class, new CI2() { @Override public void apply(UUID nodeId, GridCacheMessage msg) { processDhtTxPrepareResponse(nodeId, (GridDhtTxPrepareResponse)msg); } }); - ctx.io().addHandler(0, GridDhtTxFinishRequest.class, new CI2() { + ctx.io().addCacheHandler(0, GridDhtTxFinishRequest.class, new CI2() { @Override public void apply(UUID nodeId, GridCacheMessage msg) { processDhtTxFinishRequest(nodeId, (GridDhtTxFinishRequest)msg); } }); - ctx.io().addHandler(0, GridDhtTxOnePhaseCommitAckRequest.class, new CI2() { + ctx.io().addCacheHandler(0, GridDhtTxOnePhaseCommitAckRequest.class, new CI2() { @Override public void apply(UUID nodeId, GridCacheMessage msg) { processDhtTxOnePhaseCommitAckRequest(nodeId, (GridDhtTxOnePhaseCommitAckRequest)msg); } }); - ctx.io().addHandler(0, GridDhtTxFinishResponse.class, new CI2() { + ctx.io().addCacheHandler(0, GridDhtTxFinishResponse.class, new CI2() { @Override public void apply(UUID nodeId, GridCacheMessage msg) { processDhtTxFinishResponse(nodeId, (GridDhtTxFinishResponse)msg); } }); - ctx.io().addHandler(0, GridCacheTxRecoveryRequest.class, + ctx.io().addCacheHandler(0, GridCacheTxRecoveryRequest.class, new CI2() { @Override public void apply(UUID nodeId, GridCacheTxRecoveryRequest req) { processCheckPreparedTxRequest(nodeId, req); } }); - ctx.io().addHandler(0, GridCacheTxRecoveryResponse.class, + ctx.io().addCacheHandler(0, GridCacheTxRecoveryResponse.class, new CI2() { @Override public void apply(UUID nodeId, GridCacheTxRecoveryResponse res) { processCheckPreparedTxResponse(nodeId, res); @@ -506,8 +506,8 @@ public class IgniteTxHandler { for (IgniteTxEntry e : F.concat(false, req.reads(), req.writes())) { GridCacheContext ctx = e.context(); - Collection cacheNodes0 = ctx.discovery().cacheAffinityNodes(ctx.cacheId(), expVer); - Collection cacheNodes1 = ctx.discovery().cacheAffinityNodes(ctx.cacheId(), curVer); + Collection cacheNodes0 = ctx.discovery().cacheGroupAffinityNodes(ctx.groupId(), expVer); + Collection cacheNodes1 = ctx.discovery().cacheGroupAffinityNodes(ctx.groupId(), curVer); if (!cacheNodes0.equals(cacheNodes1) || ctx.affinity().affinityTopologyVersion().compareTo(curVer) < 0) return true; http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 5a708d7..52a0f56 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -660,6 +660,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig txEntry.updateCounter()))); if (op == CREATE || op == UPDATE) { + assert val != null : txEntry; + GridCacheUpdateTxResult updRes = cached.innerSet( this, eventNodeId(), http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLocksRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLocksRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLocksRequest.java index d1d6afd..94fe005 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLocksRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLocksRequest.java @@ -70,6 +70,16 @@ public class TxLocksRequest extends GridCacheMessage { this.txKeys = txKeys; } + /** {@inheritDoc} */ + @Override public int handlerId() { + return 0; + } + + /** {@inheritDoc} */ + @Override public boolean cacheGroupMessage() { + return false; + } + /** * @return Future ID. */ @@ -139,13 +149,13 @@ public class TxLocksRequest extends GridCacheMessage { } switch (writer.state()) { - case 3: + case 2: if (!writer.writeLong("futId", futId)) return false; writer.incrementState(); - case 4: + case 3: if (!writer.writeObjectArray("txKeysArr", txKeysArr, MessageCollectionItemType.MSG)) return false; @@ -167,7 +177,7 @@ public class TxLocksRequest extends GridCacheMessage { return false; switch (reader.state()) { - case 3: + case 2: futId = reader.readLong("futId"); if (!reader.isLastRead()) @@ -175,7 +185,7 @@ public class TxLocksRequest extends GridCacheMessage { reader.incrementState(); - case 4: + case 3: txKeysArr = reader.readObjectArray("txKeysArr", MessageCollectionItemType.MSG, IgniteTxKey.class); if (!reader.isLastRead()) @@ -195,7 +205,7 @@ public class TxLocksRequest extends GridCacheMessage { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 5; + return 4; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLocksResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLocksResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLocksResponse.java index 7856eaa..a5c8f09 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLocksResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLocksResponse.java @@ -73,6 +73,16 @@ public class TxLocksResponse extends GridCacheMessage { // No-op. } + /** {@inheritDoc} */ + @Override public int handlerId() { + return 0; + } + + /** {@inheritDoc} */ + @Override public boolean cacheGroupMessage() { + return false; + } + /** * @return Future ID. */ @@ -229,25 +239,25 @@ public class TxLocksResponse extends GridCacheMessage { } switch (writer.state()) { - case 3: + case 2: if (!writer.writeLong("futId", futId)) return false; writer.incrementState(); - case 4: + case 3: if (!writer.writeObjectArray("locksArr", locksArr, MessageCollectionItemType.MSG)) return false; writer.incrementState(); - case 5: + case 4: if (!writer.writeObjectArray("nearTxKeysArr", nearTxKeysArr, MessageCollectionItemType.MSG)) return false; writer.incrementState(); - case 6: + case 5: if (!writer.writeObjectArray("txKeysArr", txKeysArr, MessageCollectionItemType.MSG)) return false; @@ -269,7 +279,7 @@ public class TxLocksResponse extends GridCacheMessage { return false; switch (reader.state()) { - case 3: + case 2: futId = reader.readLong("futId"); if (!reader.isLastRead()) @@ -277,7 +287,7 @@ public class TxLocksResponse extends GridCacheMessage { reader.incrementState(); - case 4: + case 3: locksArr = reader.readObjectArray("locksArr", MessageCollectionItemType.MSG, TxLockList.class); if (!reader.isLastRead()) @@ -285,7 +295,7 @@ public class TxLocksResponse extends GridCacheMessage { reader.incrementState(); - case 5: + case 4: nearTxKeysArr = reader.readObjectArray("nearTxKeysArr", MessageCollectionItemType.MSG, IgniteTxKey.class); if (!reader.isLastRead()) @@ -293,7 +303,7 @@ public class TxLocksResponse extends GridCacheMessage { reader.incrementState(); - case 6: + case 5: txKeysArr = reader.readObjectArray("txKeysArr", MessageCollectionItemType.MSG, IgniteTxKey.class); if (!reader.isLastRead()) @@ -313,7 +323,7 @@ public class TxLocksResponse extends GridCacheMessage { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 7; + return 6; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java index b25b229..56f1183 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java @@ -134,7 +134,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter { cacheProc = ctx.cache(); sharedCtx = cacheProc.context(); - sharedCtx.io().addHandler(0, + sharedCtx.io().addCacheHandler(0, GridChangeGlobalStateMessageResponse.class, new CI2() { @Override public void apply(UUID nodeId, GridChangeGlobalStateMessageResponse msg) { @@ -194,7 +194,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter { @Override public void stop(boolean cancel) throws IgniteCheckedException { super.stop(cancel); - sharedCtx.io().removeHandler(0, GridChangeGlobalStateMessageResponse.class); + sharedCtx.io().removeHandler(false, 0, GridChangeGlobalStateMessageResponse.class); ctx.event().removeLocalEventListener(lsr, EVT_NODE_LEFT, EVT_NODE_FAILED); IgniteCheckedException stopErr = new IgniteInterruptedCheckedException( @@ -377,7 +377,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter { actx.setFail(); - // revert change if activation request fail + // Revert change if activation request fail. if (actx.activate) { try { cacheProc.onKernalStopCaches(true); http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index 175bcea..edf8dc1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -1561,7 +1561,8 @@ public class GridQueryProcessor extends GridProcessorAdapter { fut.onDone(e); } catch (Throwable e) { - log.error("Failed to rebuild indexes for type: " + typeName, e); + U.error(log, "Failed to rebuild indexes for type [cache=" + cacheName + + ", type=" + typeName + ']', e); fut.onDone(e); http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java index 6ac2390..fd498ad 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java @@ -17,6 +17,19 @@ package org.apache.ignite.internal.processors.query; +import java.lang.reflect.Method; +import java.math.BigDecimal; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.cache.QueryEntity; @@ -31,32 +44,18 @@ import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheDefaultAffinityKeyMapper; import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl; -import org.apache.ignite.internal.processors.query.schema.SchemaOperationException; import org.apache.ignite.internal.processors.query.property.QueryBinaryProperty; import org.apache.ignite.internal.processors.query.property.QueryClassProperty; import org.apache.ignite.internal.processors.query.property.QueryFieldAccessor; import org.apache.ignite.internal.processors.query.property.QueryMethodsAccessor; import org.apache.ignite.internal.processors.query.property.QueryPropertyAccessor; import org.apache.ignite.internal.processors.query.property.QueryReadOnlyMethodsAccessor; +import org.apache.ignite.internal.processors.query.schema.SchemaOperationException; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.U; import org.jetbrains.annotations.Nullable; -import java.lang.reflect.Method; -import java.math.BigDecimal; -import java.sql.Time; -import java.sql.Timestamp; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.TimeUnit; - import static org.apache.ignite.IgniteSystemProperties.IGNITE_INDEXING_DISCOVERY_HISTORY_SIZE; import static org.apache.ignite.IgniteSystemProperties.getInteger; http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCachePartitionsTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCachePartitionsTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCachePartitionsTask.java index 69188c5..af65de0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCachePartitionsTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCachePartitionsTask.java @@ -124,7 +124,7 @@ public class VisorCachePartitionsTask extends VisorMultiNodeTask rmvQueue = GridTestUtils.getFieldValue(p, "rmvQueue"); - if (!rmvQueue.isEmpty() || p.dataStore().size() != 0) + if (!rmvQueue.isEmpty() || p.dataStore().fullSize() != 0) return false; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheDhtLocalPartitionAfterRemoveSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheDhtLocalPartitionAfterRemoveSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheDhtLocalPartitionAfterRemoveSelfTest.java index c060eb3..7263656 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheDhtLocalPartitionAfterRemoveSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheDhtLocalPartitionAfterRemoveSelfTest.java @@ -76,7 +76,7 @@ public class CacheDhtLocalPartitionAfterRemoveSelfTest extends GridCommonAbstrac cache = grid(g).cache(DEFAULT_CACHE_NAME); for (GridDhtLocalPartition p : dht(cache).topology().localPartitions()) { - int size = p.dataStore().size(); + int size = p.dataStore().fullSize(); assertTrue("Unexpected size: " + size, size <= 32); } http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheExchangeMessageDuplicatedStateTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheExchangeMessageDuplicatedStateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheExchangeMessageDuplicatedStateTest.java index d8a2065..f3a2204 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheExchangeMessageDuplicatedStateTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheExchangeMessageDuplicatedStateTest.java @@ -283,30 +283,33 @@ public class CacheExchangeMessageDuplicatedStateTest extends GridCommonAbstractT Map dupPartsData, GridDhtPartitionsFullMessage msg) { - Integer cacheId; - Integer dupCacheId; + int cache1Grp = groupIdForCache(ignite(0), cache1); + int cache2Grp = groupIdForCache(ignite(0), cache2); - if (dupPartsData.containsKey(CU.cacheId(cache1))) { - cacheId = CU.cacheId(cache1); - dupCacheId = CU.cacheId(cache2); + Integer grpId; + Integer dupGrpId; + + if (dupPartsData.containsKey(cache1Grp)) { + grpId = cache1Grp; + dupGrpId = cache2Grp; } else { - cacheId = CU.cacheId(cache2); - dupCacheId = CU.cacheId(cache1); + grpId = cache2Grp; + dupGrpId = cache1Grp; } - assertTrue(dupPartsData.containsKey(cacheId)); - assertEquals(dupCacheId, dupPartsData.get(cacheId)); - assertFalse(dupPartsData.containsKey(dupCacheId)); + assertTrue(dupPartsData.containsKey(grpId)); + assertEquals(dupGrpId, dupPartsData.get(grpId)); + assertFalse(dupPartsData.containsKey(dupGrpId)); Map parts = msg.partitions(); - GridDhtPartitionFullMap emptyFullMap = parts.get(cacheId); + GridDhtPartitionFullMap emptyFullMap = parts.get(grpId); for (GridDhtPartitionMap map : emptyFullMap.values()) assertEquals(0, map.map().size()); - GridDhtPartitionFullMap fullMap = parts.get(dupCacheId); + GridDhtPartitionFullMap fullMap = parts.get(dupGrpId); for (GridDhtPartitionMap map : fullMap.values()) assertFalse(map.map().isEmpty()); @@ -323,29 +326,32 @@ public class CacheExchangeMessageDuplicatedStateTest extends GridCommonAbstractT Map dupPartsData, GridDhtPartitionsSingleMessage msg) { - Integer cacheId; - Integer dupCacheId; + int cache1Grp = groupIdForCache(ignite(0), cache1); + int cache2Grp = groupIdForCache(ignite(0), cache2); + + Integer grpId; + Integer dupGrpId; - if (dupPartsData.containsKey(CU.cacheId(cache1))) { - cacheId = CU.cacheId(cache1); - dupCacheId = CU.cacheId(cache2); + if (dupPartsData.containsKey(cache1Grp)) { + grpId = cache1Grp; + dupGrpId = cache2Grp; } else { - cacheId = CU.cacheId(cache2); - dupCacheId = CU.cacheId(cache1); + grpId = cache2Grp; + dupGrpId = cache1Grp; } - assertTrue(dupPartsData.containsKey(cacheId)); - assertEquals(dupCacheId, dupPartsData.get(cacheId)); - assertFalse(dupPartsData.containsKey(dupCacheId)); + assertTrue(dupPartsData.containsKey(grpId)); + assertEquals(dupGrpId, dupPartsData.get(grpId)); + assertFalse(dupPartsData.containsKey(dupGrpId)); Map parts = msg.partitions(); - GridDhtPartitionMap emptyMap = parts.get(cacheId); + GridDhtPartitionMap emptyMap = parts.get(grpId); assertEquals(0, emptyMap.map().size()); - GridDhtPartitionMap map = parts.get(dupCacheId); + GridDhtPartitionMap map = parts.get(dupGrpId); assertFalse(map.map().isEmpty()); } http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheOffheapMapEntrySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheOffheapMapEntrySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheOffheapMapEntrySelfTest.java index 1886c76..bdc5507 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheOffheapMapEntrySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheOffheapMapEntrySelfTest.java @@ -22,8 +22,7 @@ import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.IgniteKernal; -import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCacheEntry; -import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtColocatedCacheEntry; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry; import org.apache.ignite.internal.processors.cache.local.GridLocalCacheEntry; @@ -97,9 +96,9 @@ public class CacheOffheapMapEntrySelfTest extends GridCacheAbstractSelfTest { checkCacheMapEntry(TRANSACTIONAL, PARTITIONED, GridNearCacheEntry.class); - checkCacheMapEntry(ATOMIC, REPLICATED, GridDhtAtomicCacheEntry.class); + checkCacheMapEntry(ATOMIC, REPLICATED, GridDhtCacheEntry.class); - checkCacheMapEntry(TRANSACTIONAL, REPLICATED, GridDhtColocatedCacheEntry.class); + checkCacheMapEntry(TRANSACTIONAL, REPLICATED, GridDhtCacheEntry.class); } /** @@ -135,7 +134,7 @@ public class CacheOffheapMapEntrySelfTest extends GridCacheAbstractSelfTest { assertNotNull(entry); - assertEquals(entry.getClass(), entryCls); + assertEquals(entryCls, entry.getClass()); } finally { jcache.destroy(); http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConditionalDeploymentSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConditionalDeploymentSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConditionalDeploymentSelfTest.java index eefdf9d..b0d9f0e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConditionalDeploymentSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConditionalDeploymentSelfTest.java @@ -167,10 +167,16 @@ public class GridCacheConditionalDeploymentSelfTest extends GridCommonAbstractTe } } + /** + * @return Cache context. + */ protected GridCacheContext cacheContext() { return ((IgniteCacheProxy)grid(0).cache(DEFAULT_CACHE_NAME)).context(); } + /** + * @return IO manager. + */ protected GridCacheIoManager cacheIoManager() { return grid(0).context().cache().context().io(); } @@ -182,10 +188,22 @@ public class GridCacheConditionalDeploymentSelfTest extends GridCommonAbstractTe /** */ public static final short DIRECT_TYPE = 302; + /** {@inheritDoc} */ + @Override public int handlerId() { + return 0; + } + + /** {@inheritDoc} */ + @Override public boolean cacheGroupMessage() { + return false; + } + + /** {@inheritDoc} */ @Override public short directType() { return DIRECT_TYPE; } + /** {@inheritDoc} */ @Override public byte fieldsCount() { return 3; } http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLeakTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLeakTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLeakTest.java index a13ad64..cff9745 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLeakTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLeakTest.java @@ -24,6 +24,7 @@ import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; @@ -128,7 +129,7 @@ public class GridCacheLeakTest extends GridCommonAbstractTest { GridCacheConcurrentMap map = ((IgniteKernal)grid(g)).internalCache(CACHE_NAME).map(); info("Map size for cache [g=" + g + ", size=" + map.internalSize() + - ", pubSize=" + map.publicSize() + ']'); + ", pubSize=" + map.publicSize(CU.cacheId(CACHE_NAME)) + ']'); assertTrue("Wrong map size: " + map.internalSize(), map.internalSize() <= 8192); } http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOrderedPreloadingSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOrderedPreloadingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOrderedPreloadingSelfTest.java index 7562fe5..bc4f2cb 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOrderedPreloadingSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOrderedPreloadingSelfTest.java @@ -68,8 +68,10 @@ public class GridCacheOrderedPreloadingSelfTest extends GridCommonAbstractTest { /** Caches rebalance finish times. */ private ConcurrentHashMap8> times; + /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { super.beforeTestsStarted(); + times = new ConcurrentHashMap8<>(); for (int i = 0; i < GRID_CNT; i++) @@ -93,8 +95,8 @@ public class GridCacheOrderedPreloadingSelfTest extends GridCommonAbstractTest { Map, int[]> listeners = new HashMap<>(); listeners.put(new IgnitePredicate() { - @Override public boolean apply(CacheRebalancingEvent event) { - times.get(gridIdx(event)).putIfAbsent(event.cacheName(), event.timestamp()); + @Override public boolean apply(CacheRebalancingEvent evt) { + times.get(gridIdx(evt)).putIfAbsent(evt.cacheName(), evt.timestamp()); return true; } }, new int[]{EventType.EVT_CACHE_REBALANCE_STOPPED}); @@ -194,7 +196,11 @@ public class GridCacheOrderedPreloadingSelfTest extends GridCommonAbstractTest { } } - private int gridIdx(Event event) { - return getTestIgniteInstanceIndex((String)event.node().attributes().get(GRID_NAME_ATTR)); + /** + * @param evt Event. + * @return Index event node. + */ + private int gridIdx(Event evt) { + return getTestIgniteInstanceIndex((String)evt.node().attributes().get(GRID_NAME_ATTR)); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerSelfTest.java index 18c0b32..52f19b7 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerSelfTest.java @@ -112,7 +112,8 @@ public class GridCacheTtlManagerSelfTest extends GridCommonAbstractTest { assertNull(g.cache(DEFAULT_CACHE_NAME).get(key)); if (!g.internalCache(DEFAULT_CACHE_NAME).context().deferredDelete()) - assertNull(g.internalCache(DEFAULT_CACHE_NAME).map().getEntry(g.internalCache(DEFAULT_CACHE_NAME).context().toCacheKeyObject(key))); + assertNull(g.internalCache(DEFAULT_CACHE_NAME).map().getEntry(g.internalCache(DEFAULT_CACHE_NAME).context(), + g.internalCache(DEFAULT_CACHE_NAME).context().toCacheKeyObject(key))); } }); }