Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 06EE6184CF for ; Fri, 23 Oct 2015 11:52:04 +0000 (UTC) Received: (qmail 22087 invoked by uid 500); 23 Oct 2015 11:52:03 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 22015 invoked by uid 500); 23 Oct 2015 11:52:03 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 21656 invoked by uid 99); 23 Oct 2015 11:52:03 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 23 Oct 2015 11:52:03 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 9AFEBE3908; Fri, 23 Oct 2015 11:52:03 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ntikhonov@apache.org To: commits@ignite.apache.org Date: Fri, 23 Oct 2015 11:52:17 -0000 Message-Id: <037b29cf096f4edf8ecbddb3db806e27@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [15/19] ignite git commit: IGNITE-426 Cleanup code. IGNITE-426 Cleanup code. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bd8d930f Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bd8d930f Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bd8d930f Branch: refs/heads/ignite-426-2-reb Commit: bd8d930fedf07fbff2e6e835e3b65995e32472aa Parents: 33910f5 Author: nikolay_tikhonov Authored: Thu Oct 22 13:30:26 2015 +0300 Committer: nikolay_tikhonov Committed: Fri Oct 23 14:50:11 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheMapEntry.java | 10 ++--- .../cache/GridCacheUpdateTxResult.java | 8 +--- .../dht/GridClientPartitionTopology.java | 2 - .../distributed/dht/GridDhtLocalPartition.java | 40 +++++++++----------- .../dht/GridDhtPartitionTopologyImpl.java | 6 +-- .../dht/atomic/GridDhtAtomicCache.java | 12 +++++- .../dht/atomic/GridDhtAtomicUpdateFuture.java | 6 +-- .../dht/atomic/GridDhtAtomicUpdateRequest.java | 19 +++++++++- .../distributed/near/GridNearAtomicCache.java | 2 +- .../continuous/CacheContinuousQueryEntry.java | 7 ++++ .../continuous/CacheContinuousQueryHandler.java | 1 - .../CacheContinuousQueryListener.java | 4 +- .../continuous/CacheContinuousQueryManager.java | 3 +- 13 files changed, 65 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/bd8d930f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index c550e7c..061d20a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -1707,7 +1707,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme CacheObject oldVal; CacheObject updated; - GridCacheVersion rmvVer = null; + GridCacheVersion enqueueVer = null; GridCacheVersionConflictContext conflictCtx = null; @@ -2264,7 +2264,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } } - rmvVer = newVer; + enqueueVer = newVer; boolean hasValPtr = hasOffHeapPointer(); @@ -2343,7 +2343,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme invokeRes, newSysTtl, newSysExpireTime, - rmvVer, + enqueueVer, conflictCtx, true, updateIdx0); @@ -4045,9 +4045,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme */ protected void deletedUnlocked(boolean deleted) { assert Thread.holdsLock(this); - - if (!cctx.deferredDelete()) - return; + assert cctx.deferredDelete(); if (deleted) { assert !deletedUnlocked() : this; http://git-wip-us.apache.org/repos/asf/ignite/blob/bd8d930f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java index 0f63777..bea1000 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java @@ -59,14 +59,8 @@ public class GridCacheUpdateTxResult { } /** - * Sets partition idx. - * - * @param partIdx Partition idx. + * @return Partition idx. */ - public void partIdx(long partIdx) { - this.partIdx = partIdx; - } - public long partIdx() { return partIdx; } http://git-wip-us.apache.org/repos/asf/ignite/blob/bd8d930f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index 516b7bd..217073a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -882,8 +882,6 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { /** {@inheritDoc} */ @Override public boolean rebalanceFinished(AffinityTopologyVersion topVer) { - assert false; - return false; } http://git-wip-us.apache.org/repos/asf/ignite/blob/bd8d930f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java index baa8520..410b2f2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@ -17,18 +17,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; -import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.atomic.AtomicStampedReference; -import java.util.concurrent.locks.ReentrantLock; -import javax.cache.CacheException; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.IgniteInternalFuture; @@ -58,6 +46,19 @@ import org.jetbrains.annotations.NotNull; import org.jsr166.ConcurrentHashMap8; import org.jsr166.LongAdder8; +import javax.cache.CacheException; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicStampedReference; +import java.util.concurrent.locks.ReentrantLock; + import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE; import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_OBJECT_UNLOADED; import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.EVICTED; @@ -111,7 +112,7 @@ public class GridDhtLocalPartition implements Comparable, private final LongAdder8 mapPubSize = new LongAdder8(); /** Remove queue. */ - private GridCircularBuffer> rmvQueue; + private final GridCircularBuffer> rmvQueue; /** Group reservations. */ private final CopyOnWriteArrayList reservations = new CopyOnWriteArrayList<>(); @@ -144,8 +145,7 @@ public class GridDhtLocalPartition implements Comparable, int delQueueSize = CU.isSystemCache(cctx.name()) ? 100 : Math.max(MAX_DELETE_QUEUE_SIZE / cctx.affinity().partitions(), 20); - if (cctx.deferredDelete()) - rmvQueue = new GridCircularBuffer<>(U.ceilPow2(delQueueSize)); + rmvQueue = new GridCircularBuffer<>(U.ceilPow2(delQueueSize)); } /** @@ -299,8 +299,6 @@ public class GridDhtLocalPartition implements Comparable, * @throws IgniteCheckedException If failed. */ public void onDeferredDelete(KeyCacheObject key, GridCacheVersion ver) throws IgniteCheckedException { - assert cctx.deferredDelete(); - try { T2 evicted = rmvQueue.add(new T2<>(key, ver)); @@ -502,8 +500,7 @@ public class GridDhtLocalPartition implements Comparable, ((GridDhtPreloader)cctx.preloader()).onPartitionEvicted(this, updateSeq); - if (cctx.deferredDelete()) - clearDeferredDeletes(); + clearDeferredDeletes(); return new GridFinishedFuture<>(true); } @@ -556,8 +553,7 @@ public class GridDhtLocalPartition implements Comparable, ((GridDhtPreloader)cctx.preloader()).onPartitionEvicted(this, updateSeq); - if (cctx.deferredDelete()) - clearDeferredDeletes(); + clearDeferredDeletes(); return true; } @@ -792,8 +788,6 @@ public class GridDhtLocalPartition implements Comparable, * */ private void clearDeferredDeletes() { - assert cctx.deferredDelete(); - rmvQueue.forEach(new CI1>() { @Override public void apply(T2 t) { cctx.dht().removeVersionedEntry(t.get1(), t.get2()); http://git-wip-us.apache.org/repos/asf/ignite/blob/bd8d930f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 4616b17..1195ddd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -229,7 +229,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { topReadyFut = exchFut; - rebalancedTopVer = AffinityTopologyVersion.NONE;; + rebalancedTopVer = AffinityTopologyVersion.NONE; } finally { lock.writeLock().unlock(); @@ -1339,13 +1339,13 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** {@inheritDoc} */ @Override public void printMemoryStats(int threshold) { - X.println(">>> Cache partition topology stats [grid=" + cctx.gridName() + ", cache=" + cctx.name() + ']'); + X.println(">>> Cache partition topology stats [grid=" + cctx.gridName() + ", cache=" + cctx.name() + ']'); for (GridDhtLocalPartition part : locParts.values()) { int size = part.size(); if (size >= threshold) - X.println(">>> Local partition [part=" + part.id() + ", size=" + size + ']'); + X.println(">>> Local partition [part=" + part.id() + ", size=" + size + ']'); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/bd8d930f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index a487593..a9ee79a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -1815,10 +1815,12 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { else if (conflictCtx.isMerge()) newConflictVer = null; // Conflict version is discarded in case of merge. + EntryProcessor entryProcessor = null; + if (!readersOnly) { dhtFut.addWriteEntry(entry, updRes.newValue(), - op == TRANSFORM ? req.entryProcessor(i) : null, + entryProcessor, updRes.newTtl(), updRes.conflictExpireTime(), newConflictVer, @@ -1831,6 +1833,7 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { dhtFut.addNearWriteEntries(filteredReaders, entry, updRes.newValue(), + entryProcessor, updRes.newTtl(), updRes.conflictExpireTime()); } @@ -2101,10 +2104,13 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { } if (dhtFut != null) { + EntryProcessor entryProcessor = + entryProcessorMap == null ? null : entryProcessorMap.get(entry.key()); + if (!batchRes.readersOnly()) dhtFut.addWriteEntry(entry, writeVal, - entryProcessorMap == null ? null : entryProcessorMap.get(entry.key()), + entryProcessor, updRes.newTtl(), CU.EXPIRE_TIME_CALCULATE, null, @@ -2116,6 +2122,7 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { dhtFut.addNearWriteEntries(filteredReaders, entry, writeVal, + entryProcessor, updRes.newTtl(), CU.EXPIRE_TIME_CALCULATE); } @@ -2519,6 +2526,7 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { CacheObject val = req.value(i); CacheObject prevVal = req.previousValue(i); + EntryProcessor entryProcessor = req.entryProcessor(i); Long updateIdx = req.updateIdx(i); http://git-wip-us.apache.org/repos/asf/ignite/blob/bd8d930f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index 4019579..f48b0e9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@ -133,9 +133,6 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter boolean topLocked = updateReq.topologyLocked() || (updateReq.fastMap() && !updateReq.clientRequest()); waitForExchange = !topLocked; - - // We can send entry processor instead of value to backup if updates are ordered. - forceTransformBackups = updateReq.operation() == GridCacheOperation.TRANSFORM; } /** {@inheritDoc} */ @@ -265,12 +262,14 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter * @param readers Entry readers. * @param entry Entry. * @param val Value. + * @param entryProcessor Entry processor.. * @param ttl TTL for near cache update (optional). * @param expireTime Expire time for near cache update (optional). */ public void addNearWriteEntries(Iterable readers, GridDhtCacheEntry entry, @Nullable CacheObject val, + EntryProcessor entryProcessor, long ttl, long expireTime) { CacheWriteSynchronizationMode syncMode = updateReq.writeSynchronizationMode(); @@ -311,6 +310,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter updateReq.addNearWriteValue(entry.key(), val, + entryProcessor, ttl, expireTime); } http://git-wip-us.apache.org/repos/asf/ignite/blob/bd8d930f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java index 380b194..00513a4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java @@ -295,21 +295,36 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid /** * @param key Key to add. * @param val Value, {@code null} if should be removed. + * @param entryProcessor Entry processor. * @param ttl TTL. * @param expireTime Expire time. */ public void addNearWriteValue(KeyCacheObject key, @Nullable CacheObject val, + EntryProcessor entryProcessor, long ttl, long expireTime) { if (nearKeys == null) { nearKeys = new ArrayList<>(); - nearVals = new ArrayList<>(); + + if (forceTransformBackups) { + nearEntryProcessors = new ArrayList<>(); + nearEntryProcessorsBytes = new ArrayList<>(); + } + else + nearVals = new ArrayList<>(); } nearKeys.add(key); - nearVals.add(val); + + if (forceTransformBackups) { + assert entryProcessor != null; + + nearEntryProcessors.add(entryProcessor); + } + else + nearVals.add(val); if (ttl >= 0) { if (nearTtls == null) { http://git-wip-us.apache.org/repos/asf/ignite/blob/bd8d930f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java index eaeb5f7..5373b2e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -361,7 +361,7 @@ public class GridNearAtomicCache extends GridNearCacheAdapter { expireTime, null, false, - /*intercept*/false, + intercept, req.subjectId(), taskName, null, http://git-wip-us.apache.org/repos/asf/ignite/blob/bd8d930f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java index eefbbae..d96c824 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java @@ -143,6 +143,13 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { } /** + * @return Cache ID. + */ + int cacheId() { + return cacheId; + } + + /** * @return Event type. */ EventType eventType() { http://git-wip-us.apache.org/repos/asf/ignite/blob/bd8d930f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index 520dd46..5298a2f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; http://git-wip-us.apache.org/repos/asf/ignite/blob/bd8d930f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java index 2f9e111..4937ee7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java @@ -38,9 +38,7 @@ interface CacheContinuousQueryListener { * @param primary Primary flag. * @param recordIgniteEvt Whether to record event. */ - public void onEntryUpdated(CacheContinuousQueryEvent evt, - boolean primary, - boolean recordIgniteEvt); + public void onEntryUpdated(CacheContinuousQueryEvent evt, boolean primary, boolean recordIgniteEvt); /** * Listener unregistered callback. http://git-wip-us.apache.org/repos/asf/ignite/blob/bd8d930f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index 680a96c..b8922a3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -183,7 +183,6 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { { assert e != null; assert key != null; - assert Thread.holdsLock(e) : e; boolean internal = e.isInternal() || !e.context().userCache(); @@ -663,7 +662,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { CacheEntryEventFilter fltr = null; if (cfg.getCacheEntryEventFilterFactory() != null) { - fltr = (CacheEntryEventFilter) cfg.getCacheEntryEventFilterFactory().create(); + fltr = (CacheEntryEventFilter)cfg.getCacheEntryEventFilterFactory().create(); if (!(fltr instanceof Serializable)) throw new IgniteCheckedException("Cache entry event filter must implement java.io.Serializable: "