Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id B4F8D179B1 for ; Tue, 21 Apr 2015 13:35:50 +0000 (UTC) Received: (qmail 23195 invoked by uid 500); 21 Apr 2015 13:35:50 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 23166 invoked by uid 500); 21 Apr 2015 13:35:50 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 23157 invoked by uid 99); 21 Apr 2015 13:35:50 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 21 Apr 2015 13:35:50 +0000 X-ASF-Spam-Status: No, hits=-0.0 required=5.0 tests=SPF_PASS X-Spam-Check-By: apache.org Received-SPF: pass (athena.apache.org: domain of root@apache.org designates 54.164.171.186 as permitted sender) Received: from [54.164.171.186] (HELO mx1-us-east.apache.org) (54.164.171.186) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 21 Apr 2015 13:35:45 +0000 Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-east.apache.org (ASF Mail Server at mx1-us-east.apache.org) with SMTP id AF70C43EDA for ; Tue, 21 Apr 2015 13:34:51 +0000 (UTC) Received: (qmail 18408 invoked by uid 99); 21 Apr 2015 13:34:51 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 21 Apr 2015 13:34:51 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 0AF2DE0D65; Tue, 21 Apr 2015 13:34:51 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.incubator.apache.org Date: Tue, 21 Apr 2015 13:35:00 -0000 Message-Id: <54e188d189b84b5097ef37d715cfbfdc@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [11/50] incubator-ignite git commit: Merge remote-tracking branch 'remotes/origin/ignite-sprint-4' into ignite-758 X-Virus-Checked: Checked by ClamAV on apache.org Merge remote-tracking branch 'remotes/origin/ignite-sprint-4' into ignite-758 Conflicts: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/960b0a3e Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/960b0a3e Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/960b0a3e Branch: refs/heads/ignite-646 Commit: 960b0a3ee91bdeb92696538b38bd51a8440ed924 Parents: 3b5fa57 357a715 Author: ivasilinets Authored: Fri Apr 17 14:30:19 2015 +0300 Committer: ivasilinets Committed: Fri Apr 17 14:30:19 2015 +0300 ---------------------------------------------------------------------- .../discovery/GridDiscoveryManager.java | 28 ++ .../processors/cache/GridCacheAtomicFuture.java | 7 - .../processors/cache/GridCacheGateway.java | 111 +++++-- .../processors/cache/IgniteCacheProxy.java | 295 +++++++++-------- .../dht/atomic/GridDhtAtomicCache.java | 51 +-- .../dht/atomic/GridDhtAtomicUpdateFuture.java | 19 -- .../dht/atomic/GridNearAtomicUpdateFuture.java | 14 - .../datastreamer/DataStreamerCacheUpdaters.java | 2 +- .../datastreamer/DataStreamerImpl.java | 81 +++-- .../datastreamer/DataStreamerUpdateJob.java | 16 +- .../dr/IgniteDrDataStreamerCacheUpdater.java | 2 - .../internal/GridLifecycleBeanSelfTest.java | 36 +++ .../GridCacheAtomicTimeoutSelfTest.java | 314 ------------------- .../IgniteCacheAtomicMessageRecoveryTest.java | 32 ++ .../IgniteCacheMessageRecoveryAbstractTest.java | 175 +++++++++++ .../IgniteCacheTxMessageRecoveryTest.java | 32 ++ ...eAtomicInvalidPartitionHandlingSelfTest.java | 14 +- .../ignite/testsuites/IgniteCacheTestSuite.java | 3 +- .../query/h2/sql/GridSqlFunction.java | 6 +- .../query/h2/sql/GridSqlPlaceholder.java | 51 +++ .../query/h2/sql/GridSqlQueryParser.java | 6 +- .../h2/sql/AbstractH2CompareQueryTest.java | 49 ++- .../query/h2/sql/GridQueryParsingTest.java | 9 + .../query/h2/sql/H2CompareBigQueryTest.java | 2 +- .../processors/query/h2/sql/bigQuery.sql | 2 +- 25 files changed, 711 insertions(+), 646 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/960b0a3e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java index b654afd,aa73414..04c721a --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java @@@ -106,10 -123,8 +123,8 @@@ public class GridCacheGateway * @param prj Projection to guard. * @return Previous projection set on this thread. */ - @Nullable public GridCacheProjectionImpl enter(@Nullable GridCacheProjectionImpl prj) { + @Nullable public CacheOperationContext enter(@Nullable CacheOperationContext prj) { try { - ctx.itHolder().checkWeakQueue(); - GridCacheAdapter cache = ctx.cache(); GridCachePreloader preldr = cache != null ? cache.preloader() : null; @@@ -155,18 -163,39 +163,39 @@@ } /** + * @param prj Projection to guard. + * @return Previous projection set on this thread. + */ + @Nullable public GridCacheProjectionImpl enterNoLock(@Nullable GridCacheProjectionImpl prj) { + onEnter(); + + if (stopped) + throw new IllegalStateException("Cache has been stopped: " + ctx.name()); + + return setProjectionPerCall(prj); + } + + /** + * Set thread local projection per call. + * + * @param prj Projection to guard. + * @return Previous projection set on this thread. + */ - private GridCacheProjectionImpl setProjectionPerCall(@Nullable GridCacheProjectionImpl prj) { - GridCacheProjectionImpl prev = ctx.projectionPerCall(); ++ private CacheOperationContext setProjectionPerCall(@Nullable CacheOperationContext prj) { ++ CacheOperationContext prev = ctx.operationContextPerCall(); + + if (prev != null || prj != null) - ctx.projectionPerCall(prj); ++ ctx.operationContextPerCall(prj); + + return prev; + } + + /** * @param prev Previous. */ - public void leave(GridCacheProjectionImpl prev) { + public void leave(CacheOperationContext prev) { try { - ctx.tm().resetContext(); - ctx.mvcc().contextReset(); - - // Unwind eviction notifications. - CU.unwindEvicts(ctx); - - // Return back previous thread local operation context per call. - ctx.operationContextPerCall(prev); + leaveNoLock(prev); } finally { rwLock.readUnlock(); @@@ -174,6 -203,30 +203,30 @@@ } /** + * @param prev Previous. + */ - public void leaveNoLock(GridCacheProjectionImpl prev) { ++ public void leaveNoLock(CacheOperationContext prev) { + ctx.tm().resetContext(); + ctx.mvcc().contextReset(); + + // Unwind eviction notifications. + CU.unwindEvicts(ctx); + + // Return back previous thread local projection per call. - ctx.projectionPerCall(prev); ++ ctx.operationContextPerCall(prev); + } + + /** + * + */ + private void onEnter() { + ctx.itHolder().checkWeakQueue(); + + if (ctx.deploymentEnabled()) + ctx.deploy().onEnter(); + } + + /** * */ public void block() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/960b0a3e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index 64d19ba,244e200..91f12b3 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@@ -97,10 -101,27 +101,27 @@@ public class IgniteCacheProxy ext */ public IgniteCacheProxy( GridCacheContext ctx, - GridCacheProjectionEx delegate, - @Nullable GridCacheProjectionImpl prj, + IgniteInternalCache delegate, + CacheOperationContext opCtx, boolean async ) { - this(ctx, delegate, prj, async, true); ++ this(ctx, delegate, opCtx, async, true); + } + + /** + * @param ctx Context. + * @param delegate Delegate. + * @param prj Projection. + * @param async Async support flag. + * @param lock If {@code false} does not acquire read lock on gateway enter. + */ + private IgniteCacheProxy( + GridCacheContext ctx, - GridCacheProjectionEx delegate, - @Nullable GridCacheProjectionImpl prj, ++ IgniteInternalCache delegate, ++ @Nullable CacheOperationContext opCtx, + boolean async, + boolean lock + ) { super(async); assert ctx != null; @@@ -112,7 -133,19 +133,19 @@@ gate = ctx.gate(); - legacyProxy = new GridCacheProxyImpl<>(ctx, delegate, prj); + legacyProxy = new GridCacheProxyImpl<>(ctx, delegate, opCtx); + + this.lock = lock; + } + + /** + * Gets cache proxy which does not acquire read lock on gateway enter, should be + * used only if grid read lock is externally acquired. + * + * @return Ignite cache proxy with simple gate. + */ + public IgniteCacheProxy cacheNoGate() { - return new IgniteCacheProxy<>(ctx, delegate, prj, isAsync(), false); ++ return new IgniteCacheProxy<>(ctx, delegate, opCtx, isAsync(), false); } /** @@@ -131,7 -164,7 +164,7 @@@ /** {@inheritDoc} */ @Override public CacheMetrics metrics() { - CacheOperationContext prev = gate.enter(opCtx); - GridCacheProjectionImpl prev = onEnter(prj); ++ CacheOperationContext prev = onEnter(opCtx); try { return ctx.cache().metrics(); @@@ -143,7 -176,7 +176,7 @@@ /** {@inheritDoc} */ @Override public CacheMetrics metrics(ClusterGroup grp) { - CacheOperationContext prev = gate.enter(opCtx); - GridCacheProjectionImpl prev = onEnter(prj); ++ CacheOperationContext prev = onEnter(opCtx); try { List metrics = new ArrayList<>(grp.nodes().size()); @@@ -168,7 -201,7 +201,7 @@@ /** {@inheritDoc} */ @Override public CacheMetricsMXBean mxBean() { - CacheOperationContext prev = gate.enter(opCtx); - GridCacheProjectionImpl prev = onEnter(prj); ++ GridCacheProjectionImpl prev = gate.enter(prj); try { return ctx.cache().mxBean(); @@@ -190,7 -223,7 +223,7 @@@ /** {@inheritDoc} */ @Nullable @Override public Entry randomEntry() { - CacheOperationContext prev = gate.enter(opCtx); - GridCacheProjectionImpl prev = onEnter(prj); ++ GridCacheProjectionImpl prev = gate.enter(prj); try { return ctx.cache().randomEntry(); @@@ -202,15 -235,19 +235,15 @@@ /** {@inheritDoc} */ @Override public IgniteCache withExpiryPolicy(ExpiryPolicy plc) { - CacheOperationContext prev = gate.enter(opCtx); - GridCacheProjectionImpl prev = onEnter(prj); ++ GridCacheProjectionImpl prev = gate.enter(prj); try { - CacheOperationContext prj0 = opCtx.withExpiryPolicy(plc); + GridCacheProjectionEx prj0 = prj != null ? prj.withExpiryPolicy(plc) : delegate.withExpiryPolicy(plc); - return new IgniteCacheProxy<>(ctx, delegate, prj0, isAsync()); - return new IgniteCacheProxy<>(ctx, - prj0, - (GridCacheProjectionImpl)prj0, - isAsync(), - lock); ++ return new IgniteCacheProxy<>(ctx, prj0, (GridCacheProjectionImpl)prj0, isAsync()); } finally { - onLeave(prev); + gate.leave(prev); } } @@@ -222,7 -259,7 +255,7 @@@ /** {@inheritDoc} */ @Override public void loadCache(@Nullable IgniteBiPredicate p, @Nullable Object... args) { try { - CacheOperationContext prev = gate.enter(opCtx); - GridCacheProjectionImpl prev = onEnter(prj); ++ GridCacheProjectionImpl prev = gate.enter(prj); try { if (isAsync()) @@@ -242,7 -279,7 +275,7 @@@ /** {@inheritDoc} */ @Override public void localLoadCache(@Nullable IgniteBiPredicate p, @Nullable Object... args) { try { - CacheOperationContext prev = gate.enter(opCtx); - GridCacheProjectionImpl prev = onEnter(prj); ++ GridCacheProjectionImpl prev = gate.enter(prj); try { if (isAsync()) @@@ -262,7 -299,7 +295,7 @@@ /** {@inheritDoc} */ @Nullable @Override public V getAndPutIfAbsent(K key, V val) throws CacheException { try { - CacheOperationContext prev = gate.enter(opCtx); - GridCacheProjectionImpl prev = onEnter(prj); ++ GridCacheProjectionImpl prev = gate.enter(prj); try { if (isAsync()) { @@@ -294,7 -331,7 +327,7 @@@ /** {@inheritDoc} */ @Override public boolean isLocalLocked(K key, boolean byCurrThread) { - CacheOperationContext prev = gate.enter(opCtx); - GridCacheProjectionImpl prev = onEnter(prj); ++ GridCacheProjectionImpl prev = gate.enter(prj); try { return byCurrThread ? delegate.isLockedByThread(key) : delegate.isLocked(key); @@@ -449,7 -486,7 +482,7 @@@ @Override public QueryCursor query(Query qry) { A.notNull(qry, "qry"); - CacheOperationContext prev = gate.enter(opCtx); - GridCacheProjectionImpl prev = onEnter(prj); ++ GridCacheProjectionImpl prev = gate.enter(prj); try { ctx.checkSecurity(SecurityPermission.CACHE_READ); @@@ -511,7 -548,7 +544,7 @@@ /** {@inheritDoc} */ @Override public Iterable> localEntries(CachePeekMode... peekModes) throws CacheException { - CacheOperationContext prev = gate.enter(opCtx); - GridCacheProjectionImpl prev = onEnter(prj); ++ GridCacheProjectionImpl prev = gate.enter(prj); try { return delegate.localEntries(peekModes); @@@ -526,19 -563,19 +559,19 @@@ /** {@inheritDoc} */ @Override public QueryMetrics queryMetrics() { - CacheOperationContext prev = gate.enter(opCtx); - GridCacheProjectionImpl prev = onEnter(prj); ++ GridCacheProjectionImpl prev = gate.enter(prj); try { - return delegate.context().queries().metrics(); + return delegate.queries().metrics(); } finally { - onLeave(prev); + gate.leave(prev); } } /** {@inheritDoc} */ @Override public void localEvict(Collection keys) { - CacheOperationContext prev = gate.enter(opCtx); - GridCacheProjectionImpl prev = onEnter(prj); ++ GridCacheProjectionImpl prev = gate.enter(prj); try { delegate.evictAll(keys); @@@ -550,7 -587,7 +583,7 @@@ /** {@inheritDoc} */ @Nullable @Override public V localPeek(K key, CachePeekMode... peekModes) { - CacheOperationContext prev = gate.enter(opCtx); - GridCacheProjectionImpl prev = onEnter(prj); ++ GridCacheProjectionImpl prev = gate.enter(prj); try { return delegate.localPeek(key, peekModes, null); @@@ -566,7 -603,7 +599,7 @@@ /** {@inheritDoc} */ @Override public void localPromote(Set keys) throws CacheException { try { - CacheOperationContext prev = gate.enter(opCtx); - GridCacheProjectionImpl prev = onEnter(prj); ++ GridCacheProjectionImpl prev = gate.enter(prj); try { delegate.promoteAll(keys); @@@ -582,7 -619,7 +615,7 @@@ /** {@inheritDoc} */ @Override public int size(CachePeekMode... peekModes) throws CacheException { - CacheOperationContext prev = gate.enter(opCtx); - GridCacheProjectionImpl prev = onEnter(prj); ++ GridCacheProjectionImpl prev = gate.enter(prj); try { if (isAsync()) { @@@ -603,7 -640,7 +636,7 @@@ /** {@inheritDoc} */ @Override public int localSize(CachePeekMode... peekModes) { - CacheOperationContext prev = gate.enter(opCtx); - GridCacheProjectionImpl prev = onEnter(prj); ++ GridCacheProjectionImpl prev = gate.enter(prj); try { return delegate.localSize(peekModes); @@@ -619,7 -656,7 +652,7 @@@ /** {@inheritDoc} */ @Override public V get(K key) { try { - CacheOperationContext prev = gate.enter(opCtx); - GridCacheProjectionImpl prev = onEnter(prj); ++ GridCacheProjectionImpl prev = gate.enter(prj); try { if (isAsync()) { @@@ -642,7 -679,7 +675,7 @@@ /** {@inheritDoc} */ @Override public Map getAll(Set keys) { try { - CacheOperationContext prev = gate.enter(opCtx); - GridCacheProjectionImpl prev = onEnter(prj); ++ GridCacheProjectionImpl prev = gate.enter(prj); try { if (isAsync()) { @@@ -668,7 -705,7 +701,7 @@@ */ public Map getAll(Collection keys) { try { - CacheOperationContext prev = gate.enter(opCtx); - GridCacheProjectionImpl prev = onEnter(prj); ++ GridCacheProjectionImpl prev = gate.enter(prj); try { if (isAsync()) { @@@ -695,7 -732,7 +728,7 @@@ * @return Entry set. */ public Set> entrySetx(CacheEntryPredicate... filter) { - CacheOperationContext prev = gate.enter(opCtx); - GridCacheProjectionImpl prev = onEnter(prj); ++ GridCacheProjectionImpl prev = gate.enter(prj); try { return delegate.entrySetx(filter); @@@ -707,7 -744,7 +740,7 @@@ /** {@inheritDoc} */ @Override public boolean containsKey(K key) { - CacheOperationContext prev = gate.enter(opCtx); - GridCacheProjectionImpl prev = onEnter(prj); ++ GridCacheProjectionImpl prev = gate.enter(prj); try { if (isAsync()) { @@@ -725,7 -762,7 +758,7 @@@ /** {@inheritDoc} */ @Override public boolean containsKeys(Set keys) { - CacheOperationContext prev = gate.enter(opCtx); - GridCacheProjectionImpl prev = onEnter(prj); ++ GridCacheProjectionImpl prev = gate.enter(prj); try { if (isAsync()) { @@@ -747,7 -784,7 +780,7 @@@ boolean replaceExisting, @Nullable final CompletionListener completionLsnr ) { - CacheOperationContext prev = gate.enter(opCtx); - GridCacheProjectionImpl prev = onEnter(prj); ++ GridCacheProjectionImpl prev = gate.enter(prj); try { IgniteInternalFuture fut = ctx.cache().loadAll(keys, replaceExisting); @@@ -775,7 -812,7 +808,7 @@@ /** {@inheritDoc} */ @Override public void put(K key, V val) { try { - CacheOperationContext prev = gate.enter(opCtx); - GridCacheProjectionImpl prev = onEnter(prj); ++ GridCacheProjectionImpl prev = gate.enter(prj); try { if (isAsync()) @@@ -795,7 -832,7 +828,7 @@@ /** {@inheritDoc} */ @Override public V getAndPut(K key, V val) { try { - CacheOperationContext prev = gate.enter(opCtx); - GridCacheProjectionImpl prev = onEnter(prj); ++ GridCacheProjectionImpl prev = gate.enter(prj); try { if (isAsync()) { @@@ -818,7 -855,7 +851,7 @@@ /** {@inheritDoc} */ @Override public void putAll(Map map) { try { - CacheOperationContext prev = gate.enter(opCtx); - GridCacheProjectionImpl prev = onEnter(prj); ++ GridCacheProjectionImpl prev = gate.enter(prj); try { if (isAsync()) @@@ -838,7 -875,7 +871,7 @@@ /** {@inheritDoc} */ @Override public boolean putIfAbsent(K key, V val) { try { - CacheOperationContext prev = gate.enter(opCtx); - GridCacheProjectionImpl prev = onEnter(prj); ++ GridCacheProjectionImpl prev = gate.enter(prj); try { if (isAsync()) { @@@ -861,7 -898,7 +894,7 @@@ /** {@inheritDoc} */ @Override public boolean remove(K key) { try { - CacheOperationContext prev = gate.enter(opCtx); - GridCacheProjectionImpl prev = onEnter(prj); ++ GridCacheProjectionImpl prev = gate.enter(prj); try { if (isAsync()) { @@@ -884,7 -921,7 +917,7 @@@ /** {@inheritDoc} */ @Override public boolean remove(K key, V oldVal) { try { - CacheOperationContext prev = gate.enter(opCtx); - GridCacheProjectionImpl prev = onEnter(prj); ++ GridCacheProjectionImpl prev = gate.enter(prj); try { if (isAsync()) { @@@ -907,7 -944,7 +940,7 @@@ /** {@inheritDoc} */ @Override public V getAndRemove(K key) { try { - CacheOperationContext prev = gate.enter(opCtx); - GridCacheProjectionImpl prev = onEnter(prj); ++ GridCacheProjectionImpl prev = gate.enter(prj); try { if (isAsync()) { @@@ -930,7 -967,7 +963,7 @@@ /** {@inheritDoc} */ @Override public boolean replace(K key, V oldVal, V newVal) { try { - CacheOperationContext prev = gate.enter(opCtx); - GridCacheProjectionImpl prev = onEnter(prj); ++ GridCacheProjectionImpl prev = gate.enter(prj); try { if (isAsync()) { @@@ -953,7 -990,7 +986,7 @@@ /** {@inheritDoc} */ @Override public boolean replace(K key, V val) { try { - CacheOperationContext prev = gate.enter(opCtx); - GridCacheProjectionImpl prev = onEnter(prj); ++ GridCacheProjectionImpl prev = gate.enter(prj); try { if (isAsync()) { @@@ -976,7 -1013,7 +1009,7 @@@ /** {@inheritDoc} */ @Override public V getAndReplace(K key, V val) { try { - CacheOperationContext prev = gate.enter(opCtx); - GridCacheProjectionImpl prev = onEnter(prj); ++ CacheOperationContext prev = onEnter(opCtx); try { if (isAsync()) { @@@ -999,7 -1036,7 +1032,7 @@@ /** {@inheritDoc} */ @Override public void removeAll(Set keys) { try { - CacheOperationContext prev = gate.enter(opCtx); - GridCacheProjectionImpl prev = onEnter(prj); ++ CacheOperationContext prev = onEnter(opCtx); try { if (isAsync()) @@@ -1018,7 -1055,7 +1051,7 @@@ /** {@inheritDoc} */ @Override public void removeAll() { - CacheOperationContext prev = gate.enter(opCtx); - GridCacheProjectionImpl prev = onEnter(prj); ++ CacheOperationContext prev = onEnter(opCtx); try { if (isAsync()) @@@ -1036,7 -1073,7 +1069,7 @@@ /** {@inheritDoc} */ @Override public void clear(K key) { - CacheOperationContext prev = gate.enter(opCtx); - GridCacheProjectionImpl prev = onEnter(prj); ++ CacheOperationContext prev = onEnter(opCtx); try { if (isAsync()) @@@ -1054,7 -1091,7 +1087,7 @@@ /** {@inheritDoc} */ @Override public void clearAll(Set keys) { - CacheOperationContext prev = gate.enter(opCtx); - GridCacheProjectionImpl prev = onEnter(prj); ++ CacheOperationContext prev = onEnter(opCtx); try { if (isAsync()) @@@ -1072,7 -1109,7 +1105,7 @@@ /** {@inheritDoc} */ @Override public void clear() { - CacheOperationContext prev = gate.enter(opCtx); - GridCacheProjectionImpl prev = onEnter(prj); ++ CacheOperationContext prev = onEnter(opCtx); try { if (isAsync()) @@@ -1090,7 -1127,7 +1123,7 @@@ /** {@inheritDoc} */ @Override public void localClear(K key) { - CacheOperationContext prev = gate.enter(opCtx); - GridCacheProjectionImpl prev = onEnter(prj); ++ CacheOperationContext prev = onEnter(opCtx); try { delegate.clearLocally(key); @@@ -1102,7 -1139,7 +1135,7 @@@ /** {@inheritDoc} */ @Override public void localClearAll(Set keys) { - CacheOperationContext prev = gate.enter(opCtx); - GridCacheProjectionImpl prev = onEnter(prj); ++ CacheOperationContext prev = onEnter(opCtx); try { for (K key : keys) @@@ -1117,7 -1154,7 +1150,7 @@@ @Override public T invoke(K key, EntryProcessor entryProcessor, Object... args) throws EntryProcessorException { try { - CacheOperationContext prev = gate.enter(opCtx); - GridCacheProjectionImpl prev = onEnter(prj); ++ CacheOperationContext prev = onEnter(opCtx); try { if (isAsync()) { @@@ -1155,7 -1192,7 +1188,7 @@@ @Override public T invoke(K key, CacheEntryProcessor entryProcessor, Object... args) throws EntryProcessorException { try { - CacheOperationContext prev = gate.enter(opCtx); - GridCacheProjectionImpl prev = onEnter(prj); ++ CacheOperationContext prev = onEnter(opCtx); try { if (isAsync()) { @@@ -1194,7 -1231,7 +1227,7 @@@ EntryProcessor entryProcessor, Object... args) { try { - CacheOperationContext prev = gate.enter(opCtx); - GridCacheProjectionImpl prev = onEnter(prj); ++ CacheOperationContext prev = onEnter(opCtx); try { if (isAsync()) { @@@ -1219,7 -1256,7 +1252,7 @@@ CacheEntryProcessor entryProcessor, Object... args) { try { - CacheOperationContext prev = gate.enter(opCtx); - GridCacheProjectionImpl prev = onEnter(prj); ++ CacheOperationContext prev = onEnter(opCtx); try { if (isAsync()) { @@@ -1244,7 -1281,7 +1277,7 @@@ Map> map, Object... args) { try { - CacheOperationContext prev = gate.enter(opCtx); - GridCacheProjectionImpl prev = onEnter(prj); ++ CacheOperationContext prev = onEnter(opCtx); try { if (isAsync()) { @@@ -1317,9 -1354,9 +1350,9 @@@ } /** - * + * @return Proxy delegate. */ - public GridCacheProjectionEx delegate() { + public IgniteInternalCache delegate() { return delegate; } @@@ -1336,7 -1375,7 +1369,7 @@@ /** {@inheritDoc} */ @Override public void registerCacheEntryListener(CacheEntryListenerConfiguration lsnrCfg) { - CacheOperationContext prev = gate.enter(opCtx); - GridCacheProjectionImpl prev = onEnter(prj); ++ CacheOperationContext prev = onEnter(opCtx); try { ctx.continuousQueries().executeJCacheQuery(lsnrCfg, false); @@@ -1351,7 -1390,7 +1384,7 @@@ /** {@inheritDoc} */ @Override public void deregisterCacheEntryListener(CacheEntryListenerConfiguration lsnrCfg) { - CacheOperationContext prev = gate.enter(opCtx); - GridCacheProjectionImpl prev = onEnter(prj); ++ CacheOperationContext prev = onEnter(opCtx); try { ctx.continuousQueries().cancelJCacheQuery(lsnrCfg); @@@ -1366,7 -1405,7 +1399,7 @@@ /** {@inheritDoc} */ @Override public Iterator> iterator() { - CacheOperationContext prev = gate.enter(opCtx); - GridCacheProjectionImpl prev = onEnter(prj); ++ CacheOperationContext prev = onEnter(opCtx); try { return ctx.cache().igniteIterator(); @@@ -1378,7 -1417,7 +1411,7 @@@ /** {@inheritDoc} */ @Override protected IgniteCache createAsyncInstance() { - return new IgniteCacheProxy<>(ctx, delegate, opCtx, true); - return new IgniteCacheProxy<>(ctx, delegate, prj, true, lock); ++ return new IgniteCacheProxy<>(ctx, delegate, opCtx, true, lock); } /** @@@ -1405,19 -1444,25 +1438,20 @@@ * @return Projection for portable objects. */ public IgniteCache keepPortable() { - CacheOperationContext prev = gate.enter(opCtx); - GridCacheProjectionImpl prev = onEnter(prj); ++ CacheOperationContext prev = onEnter(opCtx); try { - GridCacheProjectionImpl prj0 = new GridCacheProjectionImpl<>( - (CacheProjection)(prj != null ? prj : delegate), - (GridCacheContext)ctx, - prj != null ? prj.skipStore() : false, - prj != null ? prj.subjectId() : null, - true, - prj != null ? prj.expiry() : null); + CacheOperationContext prj = + new CacheOperationContext(opCtx.skipStore(), opCtx.subjectId(), true, opCtx.expiry()); return new IgniteCacheProxy<>((GridCacheContext)ctx, - prj0, - prj0, + (GridCacheAdapter)delegate, + prj, - isAsync()); + isAsync(), + lock); } finally { - gate.leave(prev); + onLeave(prev); } } @@@ -1425,24 -1470,30 +1459,25 @@@ * @return Cache with skip store enabled. */ public IgniteCache skipStore() { - CacheOperationContext prev = gate.enter(opCtx); - GridCacheProjectionImpl prev = onEnter(prj); ++ CacheOperationContext prev = onEnter(opCtx); try { - boolean skip = prj != null && prj.skipStore(); + boolean skip = opCtx != null && opCtx.skipStore(); if (skip) return this; - GridCacheProjectionImpl prj0 = new GridCacheProjectionImpl<>( - (prj != null ? prj : delegate), - ctx, - true, - prj != null ? prj.subjectId() : null, - prj != null && prj.isKeepPortable(), - prj != null ? prj.expiry() : null); + CacheOperationContext prj0 = + new CacheOperationContext(true, opCtx.subjectId(), opCtx.isKeepPortable(), opCtx.expiry()); return new IgniteCacheProxy<>(ctx, - prj0, + delegate, prj0, - isAsync()); + isAsync(), + lock); } finally { - gate.leave(prev); + onLeave(prev); } } @@@ -1469,13 -1520,58 +1504,58 @@@ return legacyProxy; } + /** - * @param prj Projection to guard. ++ * @param opCtx Cache operation context to guard. + * @return Previous projection set on this thread. + */ - private GridCacheProjectionImpl onEnter(GridCacheProjectionImpl prj) { ++ private CacheOperationContext onEnter(CacheOperationContext opCtx) { + if (lock) - return gate.enter(prj); ++ return gate.enter(opCtx); + else - return gate.enterNoLock(prj); ++ return gate.enterNoLock(opCtx); + } + + /** + * On enter. + * + * @return {@code True} if enter successful. + */ + private boolean onEnterIfNoClose() { + if (lock) + return gate.enterIfNotClosed(); + else + return gate.enterIfNotClosedNoLock(); + } + + /** - * @param prj Projection to guard.. ++ * @param opCtx Operation context to guard. + */ - private void onLeave(GridCacheProjectionImpl prj) { ++ private void onLeave(CacheOperationContext opCtx) { + if (lock) - gate.leave(prj); ++ gate.leave(opCtx); + else - gate.leaveNoLock(prj); ++ gate.leaveNoLock(opCtx); + } + + /** + * On leave. + */ + private void onLeave() { + if (lock) + gate.leave(); + else + gate.leaveNoLock(); + } + /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { out.writeObject(ctx); out.writeObject(delegate); - out.writeObject(prj); + out.writeObject(opCtx); + + out.writeBoolean(lock); } /** {@inheritDoc} */ @@@ -1483,11 -1579,13 +1563,13 @@@ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { ctx = (GridCacheContext)in.readObject(); - delegate = (GridCacheProjectionEx)in.readObject(); + delegate = (IgniteInternalCache)in.readObject(); - prj = (GridCacheProjectionImpl)in.readObject(); + opCtx = (CacheOperationContext)in.readObject(); gate = ctx.gate(); + + lock = in.readBoolean(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/960b0a3e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/960b0a3e/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerCacheUpdaters.java ----------------------------------------------------------------------