Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1914E1791B for ; Thu, 2 Jul 2015 23:08:35 +0000 (UTC) Received: (qmail 23623 invoked by uid 500); 2 Jul 2015 23:08:35 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 23591 invoked by uid 500); 2 Jul 2015 23:08:35 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 23576 invoked by uid 99); 2 Jul 2015 23:08:35 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 02 Jul 2015 23:08:35 +0000 X-ASF-Spam-Status: No, hits=-2000.7 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Thu, 02 Jul 2015 23:06:21 +0000 Received: (qmail 19167 invoked by uid 99); 2 Jul 2015 23:06:53 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 02 Jul 2015 23:06:53 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8F717E367A; Thu, 2 Jul 2015 23:06:52 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vkulichenko@apache.org To: commits@ignite.incubator.apache.org Date: Thu, 02 Jul 2015 23:06:56 -0000 Message-Id: <377078f83ffc4b8ca4c6a3c389a71390@git.apache.org> In-Reply-To: <44ccb87102ce4a0184a88f896859ca82@git.apache.org> References: <44ccb87102ce4a0184a88f896859ca82@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [05/34] incubator-ignite git commit: ignite-973-2 - read offheap value before remove X-Virus-Checked: Checked by ClamAV on apache.org ignite-973-2 - read offheap value before remove Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/260dc2dd Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/260dc2dd Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/260dc2dd Branch: refs/heads/ignite-1026 Commit: 260dc2dd4978d0a57732b7edd0aa0b043d4eff4c Parents: 285d790 Author: S.Vladykin Authored: Tue Jun 23 15:28:24 2015 +0300 Committer: S.Vladykin Committed: Tue Jun 23 15:28:24 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheSwapManager.java | 192 +++++++++++-------- .../query/h2/opt/GridH2KeyValueRowOffheap.java | 8 +- 2 files changed, 118 insertions(+), 82 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/260dc2dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java index f709e03..e45ec2d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java @@ -535,21 +535,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { // First try removing from offheap. if (offheapEnabled) { - byte[] entryBytes = offheap.remove(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext())); - - if (cctx.config().isStatisticsEnabled()) { - if (entryBytes != null) - cctx.cache().metrics0().onOffHeapRemove(); - - cctx.cache().metrics0().onOffHeapRead(entryBytes != null); - } - - if (entryBytes != null) { - GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(entryBytes)); - - if (entry == null) - return null; + GridCacheSwapEntry entry = removeFromOffheap(key, key.valueBytes(cctx.cacheObjectContext()), part); + if (entry != null) { // Always fire this event, since preloading depends on it. onOffHeaped(part, key, entry); @@ -569,11 +557,6 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { null, null); - GridCacheQueryManager qryMgr = cctx.queries(); - - if (qryMgr != null) - qryMgr.onUnswap(key, entry.value()); - return entry; } } @@ -737,6 +720,47 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { } /** + * @param key Key. + * @param keyBytes Key bytes. + * @param part Partition. + * @return Swap entry. + * @throws IgniteCheckedException If failed. + */ + @Nullable private GridCacheSwapEntry removeFromOffheap(KeyCacheObject key, byte[] keyBytes, int part) + throws IgniteCheckedException { + final GridCacheQueryManager qryMgr = cctx.queries(); + + GridCacheSwapEntry entry; + + if (qryMgr != null) { + entry = readOffheapBeforeRemove(key, keyBytes, part); + + if (entry != null) { + if (offheap.removex(spaceName, part, key, keyBytes)) { + if (cctx.config().isStatisticsEnabled()) + cctx.cache().metrics0().onOffHeapRemove(); + } + else + entry = null; // Failed to remove -> reset to null. + } + } + else { + byte[] entryBytes = offheap.remove(spaceName, part, key, keyBytes); + + if (entryBytes != null) { + if (cctx.config().isStatisticsEnabled()) + cctx.cache().metrics0().onOffHeapRemove(); + + entry = swapEntry(unmarshalSwapEntry(entryBytes)); + } + else + entry = null; + } + + return entry; + } + + /** * @param keys Collection of keys to remove from swap. * @return Collection of swap entries. * @throws IgniteCheckedException If failed, @@ -759,40 +783,30 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { for (KeyCacheObject key : keys) { int part = cctx.affinity().partition(key); - byte[] entryBytes = offheap.remove(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext())); + GridCacheSwapEntry entry = removeFromOffheap(key, key.valueBytes(cctx.cacheObjectContext()), part); - if(entryBytes != null && cctx.config().isStatisticsEnabled()) - cctx.cache().metrics0().onOffHeapRemove(); + if (entry != null) { + // Always fire this event, since preloading depends on it. + onOffHeaped(part, key, entry); - if (entryBytes != null) { - GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(entryBytes)); + if (cctx.events().isRecordable(EVT_CACHE_OBJECT_FROM_OFFHEAP)) + cctx.events().addEvent(part, key, cctx.nodeId(), (IgniteUuid)null, null, + EVT_CACHE_OBJECT_FROM_OFFHEAP, null, false, null, true, null, null, null); - if (entry != null) { - // Always fire this event, since preloading depends on it. - onOffHeaped(part, key, entry); - - if (cctx.events().isRecordable(EVT_CACHE_OBJECT_FROM_OFFHEAP)) - cctx.events().addEvent(part, key, cctx.nodeId(), (IgniteUuid)null, null, - EVT_CACHE_OBJECT_FROM_OFFHEAP, null, false, null, true, null, null, null); - - if (qryMgr != null) - qryMgr.onUnswap(key, entry.value()); - - GridCacheBatchSwapEntry unswapped = new GridCacheBatchSwapEntry(key, - part, - ByteBuffer.wrap(entry.valueBytes()), - entry.type(), - entry.version(), entry.ttl(), - entry.expireTime(), - entry.keyClassLoaderId(), - entry.valueClassLoaderId()); + GridCacheBatchSwapEntry unswapped = new GridCacheBatchSwapEntry(key, + part, + ByteBuffer.wrap(entry.valueBytes()), + entry.type(), + entry.version(), entry.ttl(), + entry.expireTime(), + entry.keyClassLoaderId(), + entry.valueClassLoaderId()); - unswapped.value(entry.value()); + unswapped.value(entry.value()); - res.add(unswapped); + res.add(unswapped); - continue; - } + continue; } if (swapEnabled) { @@ -940,6 +954,34 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { } /** + * Reads value from offheap and unswaps it for indexing. + * + * @param key Key. + * @param keyBytes Key bytes. + * @param part Partition. + * @return Swap entry. + * @throws IgniteCheckedException If failed. + */ + public GridCacheSwapEntry readOffheapBeforeRemove(KeyCacheObject key, byte[] keyBytes, int part) + throws IgniteCheckedException { + assert cctx.queries() != null; + + byte[] val = offheap.get(spaceName, part, key, keyBytes); + + if (val != null) { + GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(val)); + + if (entry != null) { + cctx.queries().onUnswap(key, entry.value()); + + return entry; + } + } + + return null; + } + + /** * @param key Key to remove. * @throws IgniteCheckedException If failed. */ @@ -951,42 +993,17 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { final GridCacheQueryManager qryMgr = cctx.queries(); - CI1 c = qryMgr == null ? null : new CI1() { - @Override public void apply(byte[] rmv) { - if (rmv == null) - return; - - try { - if (cctx.config().isStatisticsEnabled()) - cctx.cache().metrics0().onSwapRemove(); - - GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(rmv)); - - if (entry == null) - return; - - qryMgr.onUnswap(key, entry.value()); - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); - } - } - }; - int part = cctx.affinity().partition(key); // First try offheap. if (offheapEnabled) { - // TODO Pass closure c to offheap.remove and apply it before the actual remove. - byte[] val = offheap.remove(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext())); + byte[] keyBytes = key.valueBytes(cctx.cacheObjectContext()); - if (val != null) { + if ((qryMgr == null || readOffheapBeforeRemove(key, keyBytes, part) != null) && + offheap.removex(spaceName, part, key, keyBytes)) { if (cctx.config().isStatisticsEnabled()) cctx.cache().metrics0().onOffHeapRemove(); - if (c != null) - c.apply(val); - return; } } @@ -998,7 +1015,30 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { swapMgr.remove(spaceName, swapKey, - c, + new CI1() { + @Override public void apply(byte[] rmv) { + if (rmv == null) + return; + + try { + if (cctx.config().isStatisticsEnabled()) + cctx.cache().metrics0().onSwapRemove(); + + if (qryMgr == null) + return; + + GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(rmv)); + + if (entry == null) + return; + + qryMgr.onUnswap(key, entry.value()); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + }, cctx.deploy().globalLoader()); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/260dc2dd/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java index f89591a..1f54713 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java @@ -236,12 +236,8 @@ public class GridH2KeyValueRowOffheap extends GridH2AbstractKeyValueRow { try { GridUnsafeMemory mem = desc.memory(); - if (mem.readLongVolatile(p + OFFSET_VALUE_REF) != 0) { - if (beforeRmv) - return; // The offheap value is in its place, nothing to do here. - else - throw new IllegalStateException("Unswap without swap: " + p); - } + if (mem.readLongVolatile(p + OFFSET_VALUE_REF) != 0) + return; // The offheap value is in its place, nothing to do here. Value v = peekValue(VAL_COL);