Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 06B4D1859D for ; Wed, 18 Nov 2015 08:08:14 +0000 (UTC) Received: (qmail 2648 invoked by uid 500); 18 Nov 2015 08:08:13 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 2330 invoked by uid 500); 18 Nov 2015 08:08:13 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 99469 invoked by uid 99); 18 Nov 2015 08:08:11 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 18 Nov 2015 08:08:11 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 58B8DE10A7; Wed, 18 Nov 2015 08:08:11 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vozerov@apache.org To: commits@ignite.apache.org Date: Wed, 18 Nov 2015 08:08:43 -0000 Message-Id: <86446b46b71b47358bd3b81a9dedeb37@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [34/50] [abbrv] ignite git commit: IGNITE-1910: .Net Fixed leak in ScanQuery. IGNITE-1910: .Net Fixed leak in ScanQuery. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1de65393 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1de65393 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1de65393 Branch: refs/heads/ignite-1816 Commit: 1de6539322cf8b33eceeb8d1ffdf50ceb398cc89 Parents: c00e4ac Author: Pavel Tupitsyn Authored: Tue Nov 17 16:40:35 2015 +0300 Committer: vozerov-gridgain Committed: Tue Nov 17 16:40:35 2015 +0300 ---------------------------------------------------------------------- .../cache/query/GridCacheQueryManager.java | 289 ++++++++++--------- .../Cache/Query/CacheQueriesTest.cs | 17 ++ .../Cache/Store/CacheStoreTest.cs | 35 +++ .../Apache.Ignite.Core/Cache/Query/ScanQuery.cs | 15 +- .../Apache.Ignite.Core/Impl/Cache/CacheImpl.cs | 13 +- 5 files changed, 230 insertions(+), 139 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/1de65393/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index 58a8424..bef587a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -816,201 +816,218 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte final IgniteBiPredicate keyValFilter = qry.scanFilter(); - injectResources(keyValFilter); + try { + injectResources(keyValFilter); - final GridDhtCacheAdapter dht = cctx.isLocal() ? null : (cctx.isNear() ? cctx.near().dht() : cctx.dht()); + final GridDhtCacheAdapter dht = cctx.isLocal() ? null : (cctx.isNear() ? cctx.near().dht() : cctx.dht()); - final GridCacheAdapter cache = dht != null ? dht : cctx.cache(); + final GridCacheAdapter cache = dht != null ? dht : cctx.cache(); - final ExpiryPolicy plc = cctx.expiry(); + final ExpiryPolicy plc = cctx.expiry(); - final AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion(); + final AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion(); - final boolean backups = qry.includeBackups() || cctx.isReplicated(); + final boolean backups = qry.includeBackups() || cctx.isReplicated(); - final GridCloseableIteratorAdapter> heapIt = - new GridCloseableIteratorAdapter>() { - private IgniteBiTuple next; + final GridCloseableIteratorAdapter> heapIt = + new GridCloseableIteratorAdapter>() { + private IgniteBiTuple next; - private IgniteCacheExpiryPolicy expiryPlc = cctx.cache().expiryPolicy(plc); + private IgniteCacheExpiryPolicy expiryPlc = cctx.cache().expiryPolicy(plc); - private Iterator iter; + private Iterator iter; - private GridDhtLocalPartition locPart; + private GridDhtLocalPartition locPart; - { - Integer part = qry.partition(); + { + Integer part = qry.partition(); - if (part == null || dht == null) - iter = backups ? prj.keySetx().iterator() : prj.primaryKeySet().iterator(); - else if (part < 0 || part >= cctx.affinity().partitions()) - iter = F.emptyIterator(); - else { - locPart = dht.topology().localPartition(part, topVer, false); + if (part == null || dht == null) + iter = backups ? prj.keySetx().iterator() : prj.primaryKeySet().iterator(); + else if (part < 0 || part >= cctx.affinity().partitions()) + iter = F.emptyIterator(); + else { + locPart = dht.topology().localPartition(part, topVer, false); - // double check for owning state - if (locPart == null || locPart.state() != OWNING || !locPart.reserve() || - locPart.state() != OWNING) - throw new GridDhtUnreservedPartitionException(part, - cctx.affinity().affinityTopologyVersion(), "Partition can not be reserved"); + // double check for owning state + if (locPart == null || locPart.state() != OWNING || !locPart.reserve() || + locPart.state() != OWNING) + throw new GridDhtUnreservedPartitionException(part, + cctx.affinity().affinityTopologyVersion(), "Partition can not be reserved"); - iter = new Iterator() { - private Iterator iter0 = locPart.keySet().iterator(); + iter = new Iterator() { + private Iterator iter0 = locPart.keySet().iterator(); - @Override public boolean hasNext() { - return iter0.hasNext(); - } + @Override public boolean hasNext() { + return iter0.hasNext(); + } - @Override public K next() { - KeyCacheObject key = iter0.next(); + @Override public K next() { + KeyCacheObject key = iter0.next(); - return key.value(cctx.cacheObjectContext(), false); - } + return key.value(cctx.cacheObjectContext(), false); + } - @Override public void remove() { - iter0.remove(); - } - }; + @Override public void remove() { + iter0.remove(); + } + }; + } + + advance(); } - advance(); - } + @Override public boolean onHasNext() { + return next != null; + } - @Override public boolean onHasNext() { - return next != null; - } + @Override public IgniteBiTuple onNext() { + if (next == null) + throw new NoSuchElementException(); - @Override public IgniteBiTuple onNext() { - if (next == null) - throw new NoSuchElementException(); + IgniteBiTuple next0 = next; - IgniteBiTuple next0 = next; + advance(); - advance(); + return next0; + } - return next0; - } + private void advance() { + IgniteBiTuple next0 = null; - private void advance() { - IgniteBiTuple next0 = null; + while (iter.hasNext()) { + next0 = null; - while (iter.hasNext()) { - next0 = null; + K key = iter.next(); - K key = iter.next(); + V val; - V val; + try { + GridCacheEntryEx entry = cache.peekEx(key); - try { - GridCacheEntryEx entry = cache.peekEx(key); + CacheObject cacheVal = + entry != null ? entry.peek(true, false, false, topVer, expiryPlc) : null; - CacheObject cacheVal = - entry != null ? entry.peek(true, false, false, topVer, expiryPlc) : null; + // TODO 950 nocopy + val = (V)cctx.cacheObjectContext().unwrapPortableIfNeeded(cacheVal, qry.keepPortable()); + } + catch (GridCacheEntryRemovedException e) { + val = null; + } + catch (IgniteCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to peek value: " + e); - // TODO 950 nocopy - val = (V)cctx.cacheObjectContext().unwrapPortableIfNeeded(cacheVal, qry.keepPortable()); - } - catch (GridCacheEntryRemovedException e) { - val = null; - } - catch (IgniteCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed to peek value: " + e); + val = null; + } - val = null; - } + if (dht != null && expiryPlc != null && expiryPlc.readyToFlush(100)) { + dht.sendTtlUpdateRequest(expiryPlc); - if (dht != null && expiryPlc != null && expiryPlc.readyToFlush(100)) { - dht.sendTtlUpdateRequest(expiryPlc); + expiryPlc = cctx.cache().expiryPolicy(plc); + } - expiryPlc = cctx.cache().expiryPolicy(plc); + if (val != null) { + next0 = F.t(key, val); + + if (checkPredicate(next0)) + break; + else + next0 = null; + } } - if (val != null) { - next0 = F.t(key, val); + next = next0 != null ? + new IgniteBiTuple<>(next0.getKey(), next0.getValue()) : + null; - if (checkPredicate(next0)) - break; - else - next0 = null; - } + if (next == null) + sendTtlUpdate(); } - next = next0 != null ? - new IgniteBiTuple<>(next0.getKey(), next0.getValue()) : - null; - - if (next == null) + @Override protected void onClose() { sendTtlUpdate(); - } - @Override protected void onClose() { - sendTtlUpdate(); - - if (locPart != null) - locPart.release(); - } - - private void sendTtlUpdate() { - if (dht != null && expiryPlc != null) { - dht.sendTtlUpdateRequest(expiryPlc); - - expiryPlc = null; + if (locPart != null) + locPart.release(); } - } - private boolean checkPredicate(Map.Entry e) { - if (keyValFilter != null) { - Map.Entry e0 = (Map.Entry)cctx.unwrapPortableIfNeeded(e, qry.keepPortable()); + private void sendTtlUpdate() { + if (dht != null && expiryPlc != null) { + dht.sendTtlUpdateRequest(expiryPlc); - return keyValFilter.apply(e0.getKey(), e0.getValue()); + expiryPlc = null; + } } - return true; - } - }; + private boolean checkPredicate(Map.Entry e) { + if (keyValFilter != null) { + Map.Entry e0 = (Map.Entry)cctx.unwrapPortableIfNeeded(e, qry.keepPortable()); - final GridIterator> it; + return keyValFilter.apply(e0.getKey(), e0.getValue()); + } - if (cctx.isSwapOrOffheapEnabled()) { - List>> iters = new ArrayList<>(3); + return true; + } + }; - iters.add(heapIt); + final GridIterator> it; - if (cctx.isOffHeapEnabled()) - iters.add(offheapIterator(qry, backups)); + if (cctx.isSwapOrOffheapEnabled()) { + List>> iters = new ArrayList<>(3); - if (cctx.swap().swapEnabled()) - iters.add(swapIterator(qry, backups)); + iters.add(heapIt); - it = new CompoundIterator<>(iters); - } - else - it = heapIt; + if (cctx.isOffHeapEnabled()) + iters.add(offheapIterator(qry, backups)); - return new GridCloseableIteratorAdapter>() { - @Override protected boolean onHasNext() { - return it.hasNext(); - } + if (cctx.swap().swapEnabled()) + iters.add(swapIterator(qry, backups)); - @Override protected IgniteBiTuple onNext() { - return it.next(); + it = new CompoundIterator<>(iters); } + else + it = heapIt; - @Override protected void onRemove() { - it.remove(); - } + return new GridCloseableIteratorAdapter>() { + @Override protected boolean onHasNext() { + return it.hasNext(); + } - @Override protected void onClose() throws IgniteCheckedException { - try { - heapIt.close(); + @Override protected IgniteBiTuple onNext() { + return it.next(); } - finally { - if (keyValFilter instanceof PlatformCacheEntryFilter) - ((PlatformCacheEntryFilter)keyValFilter).onClose(); + + @Override protected void onRemove() { + it.remove(); } - } - }; + + @Override protected void onClose() throws IgniteCheckedException { + try { + heapIt.close(); + } + finally { + closeScanFilter(keyValFilter); + } + } + }; + } + catch (IgniteCheckedException | RuntimeException e) + { + closeScanFilter(keyValFilter); + + throw e; + } + } + + /** + * Closes a filter if it is closeable. + * + * @param f Filter. + */ + private static void closeScanFilter(Object f) { + if (f instanceof PlatformCacheEntryFilter) + ((PlatformCacheEntryFilter)f).onClose(); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/1de65393/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheQueriesTest.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheQueriesTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheQueriesTest.cs index 7c7fe35..74c4801 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheQueriesTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheQueriesTest.cs @@ -21,6 +21,7 @@ namespace Apache.Ignite.Core.Tests.Cache.Query using System.Collections; using System.Collections.Generic; using System.Diagnostics.CodeAnalysis; + using System.Linq; using System.Text; using Apache.Ignite.Core.Binary; using Apache.Ignite.Core.Cache; @@ -115,6 +116,9 @@ namespace Apache.Ignite.Core.Tests.Cache.Query Assert.IsTrue(cache.IsEmpty()); } + TestUtils.AssertHandleRegistryIsEmpty(300, + Enumerable.Range(0, GridCnt).Select(x => Ignition.GetIgnite("grid-" + x)).ToArray()); + Console.WriteLine("Test finished: " + TestContext.CurrentContext.Test.Name); } @@ -625,6 +629,11 @@ namespace Apache.Ignite.Core.Tests.Cache.Query qry = new ScanQuery(new PortableScanQueryFilter()); ValidateQueryResults(cache, qry, exp, keepPortable); + // Invalid + exp = PopulateCache(cache, loc, cnt, x => x < 50); + qry = new ScanQuery(new InvalidScanQueryFilter()); + Assert.Throws(() => ValidateQueryResults(cache, qry, exp, keepPortable)); + // Exception exp = PopulateCache(cache, loc, cnt, x => x < 50); qry = new ScanQuery(new ScanQueryFilter {ThrowErr = true}); @@ -917,4 +926,12 @@ namespace Apache.Ignite.Core.Tests.Cache.Query ThrowErr = r.ReadBoolean(); } } + + /// + /// Filter that can't be serialized. + /// + public class InvalidScanQueryFilter : ScanQueryFilter + { + // No-op. + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/1de65393/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs index 0dc9912..eb148f0 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreTest.cs @@ -22,6 +22,7 @@ namespace Apache.Ignite.Core.Tests.Cache.Store using System.Collections.Generic; using Apache.Ignite.Core.Binary; using Apache.Ignite.Core.Cache; + using Apache.Ignite.Core.Cache.Store; using Apache.Ignite.Core.Impl; using NUnit.Framework; @@ -90,6 +91,27 @@ namespace Apache.Ignite.Core.Tests.Cache.Store } /// + /// Cache entry predicate that throws an exception. + /// + [Serializable] + public class ExceptionalEntryFilter : ICacheEntryFilter + { + /** */ + public bool Invoke(ICacheEntry entry) + { + throw new Exception("Expected exception in ExceptionalEntryFilter"); + } + } + + /// + /// Filter that can't be serialized. + /// + public class InvalidCacheEntryFilter : CacheEntryFilter + { + // No-op. + } + + /// /// /// public class CacheStoreTest @@ -106,6 +128,9 @@ namespace Apache.Ignite.Core.Tests.Cache.Store /** */ private const string TemplateStoreCacheName = "template_store*"; + /** */ + private volatile int _storeCount = 3; + /// /// /// @@ -166,6 +191,8 @@ namespace Apache.Ignite.Core.Tests.Cache.Store CacheTestStore.Reset(); + TestUtils.AssertHandleRegistryHasItems(300, _storeCount, Ignition.GetIgnite(GridName())); + Console.WriteLine("Test finished: " + TestContext.CurrentContext.Test.Name); } @@ -182,6 +209,12 @@ namespace Apache.Ignite.Core.Tests.Cache.Store for (int i = 105; i < 110; i++) Assert.AreEqual("val_" + i, cache.Get(i)); + + // Test invalid filter + Assert.Throws(() => cache.LoadCache(new InvalidCacheEntryFilter(), 100, 10)); + + // Test exception in filter + Assert.Throws(() => cache.LoadCache(new ExceptionalEntryFilter(), 100, 10)); } [Test] @@ -443,6 +476,8 @@ namespace Apache.Ignite.Core.Tests.Cache.Store cache.Put(1, cache.Name); Assert.AreEqual(cache.Name, CacheTestStore.Map[1]); + + _storeCount++; } /// http://git-wip-us.apache.org/repos/asf/ignite/blob/1de65393/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Query/ScanQuery.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Query/ScanQuery.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Query/ScanQuery.cs index e1478f3..12fb363 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Query/ScanQuery.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Query/ScanQuery.cs @@ -17,6 +17,7 @@ namespace Apache.Ignite.Core.Cache.Query { + using System; using Apache.Ignite.Core.Impl.Binary; using Apache.Ignite.Core.Impl.Cache; @@ -62,8 +63,18 @@ namespace Apache.Ignite.Core.Cache.Query { var holder = new CacheEntryFilterHolder(Filter, (key, val) => Filter.Invoke( new CacheEntry((TK) key, (TV) val)), writer.Marshaller, keepBinary); - - writer.WriteObject(holder); + + try + { + writer.WriteObject(holder); + } + catch (Exception) + { + writer.Marshaller.Ignite.HandleRegistry.Release(holder.Handle); + + throw; + } + writer.WriteLong(holder.Handle); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/1de65393/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs index a6dfe7e..b1870d7 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs @@ -272,7 +272,18 @@ namespace Apache.Ignite.Core.Impl.Cache { var p0 = new CacheEntryFilterHolder(p, (k, v) => p.Invoke(new CacheEntry((TK)k, (TV)v)), Marshaller, IsKeepBinary); - writer.WriteObject(p0); + + try + { + writer.WriteObject(p0); + } + catch (Exception) + { + writer.Marshaller.Ignite.HandleRegistry.Release(p0.Handle); + + throw; + } + writer.WriteLong(p0.Handle); } else