Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 6B00B172C9 for ; Wed, 29 Apr 2015 14:29:56 +0000 (UTC) Received: (qmail 6857 invoked by uid 500); 29 Apr 2015 14:29:56 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 6830 invoked by uid 500); 29 Apr 2015 14:29:56 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 6821 invoked by uid 99); 29 Apr 2015 14:29:56 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 29 Apr 2015 14:29:56 +0000 X-ASF-Spam-Status: No, hits=-0.0 required=5.0 tests=SPF_PASS X-Spam-Check-By: apache.org Received-SPF: pass (nike.apache.org: local policy) Received: from [54.76.25.247] (HELO mx1-eu-west.apache.org) (54.76.25.247) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 29 Apr 2015 14:29:22 +0000 Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-eu-west.apache.org (ASF Mail Server at mx1-eu-west.apache.org) with SMTP id 5CFC02AA64 for ; Wed, 29 Apr 2015 14:29:18 +0000 (UTC) Received: (qmail 4458 invoked by uid 99); 29 Apr 2015 14:28:02 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 29 Apr 2015 14:28:02 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 82FBAE0215; Wed, 29 Apr 2015 14:28:02 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.incubator.apache.org Date: Wed, 29 Apr 2015 14:28:02 -0000 Message-Id: <4374faf20c7f4bbda87daf715dec37d2@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [01/42] incubator-ignite git commit: ignite-757 - wip X-Virus-Checked: Checked by ClamAV on apache.org Repository: incubator-ignite Updated Branches: refs/heads/ignite-286 3443afcf2 -> 5d5bdee8c ignite-757 - wip Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a3a91d45 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a3a91d45 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a3a91d45 Branch: refs/heads/ignite-286 Commit: a3a91d45e0d3192f591505f39ddd7185a9baece3 Parents: fddf2a3 Author: S.Vladykin Authored: Wed Apr 22 15:48:39 2015 +0300 Committer: S.Vladykin Committed: Wed Apr 22 15:48:39 2015 +0300 ---------------------------------------------------------------------- .../managers/indexing/GridIndexingManager.java | 14 +- .../internal/processors/cache/CacheObject.java | 5 + .../processors/cache/CacheObjectAdapter.java | 5 + .../cache/CacheObjectByteArrayImpl.java | 5 + .../processors/cache/CacheObjectImpl.java | 2 +- .../processors/cache/GridCacheMapEntry.java | 9 +- .../processors/cache/GridCacheProcessor.java | 3 +- .../processors/cache/GridCacheSwapManager.java | 35 ++--- .../cache/query/GridCacheQueryManager.java | 22 +-- .../processors/query/GridQueryIndexing.java | 11 +- .../processors/query/GridQueryProcessor.java | 69 +++++--- .../processors/query/h2/IgniteH2Indexing.java | 156 +++++++++++++++++-- .../query/h2/opt/GridH2AbstractKeyValueRow.java | 92 ++--------- .../query/h2/opt/GridH2KeyValueRowOffheap.java | 7 +- .../query/h2/opt/GridH2KeyValueRowOnheap.java | 6 +- .../query/h2/opt/GridH2RowDescriptor.java | 11 +- .../processors/query/h2/opt/GridH2Table.java | 10 +- .../query/h2/opt/GridLuceneIndex.java | 76 +++++---- .../IgniteCacheQueryMultiThreadedSelfTest.java | 6 +- .../h2/GridIndexingSpiAbstractSelfTest.java | 121 +++++++++++--- 20 files changed, 422 insertions(+), 243 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a3a91d45/modules/core/src/main/java/org/apache/ignite/internal/managers/indexing/GridIndexingManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/indexing/GridIndexingManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/indexing/GridIndexingManager.java index 5cb150e..9a81cd1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/indexing/GridIndexingManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/indexing/GridIndexingManager.java @@ -89,9 +89,7 @@ public class GridIndexingManager extends GridManagerAdapter { public void store(final String space, final K key, final V val, long expirationTime) throws IgniteCheckedException { assert key != null; assert val != null; - - if (!enabled()) - return; + assert enabled(); if (!busyLock.enterBusy()) throw new IllegalStateException("Failed to write to index (grid is stopping)."); @@ -115,9 +113,7 @@ public class GridIndexingManager extends GridManagerAdapter { @SuppressWarnings("unchecked") public void remove(String space, Object key) throws IgniteCheckedException { assert key != null; - - if (!enabled()) - return; + assert enabled(); if (!busyLock.enterBusy()) throw new IllegalStateException("Failed to remove from index (grid is stopping)."); @@ -190,8 +186,7 @@ public class GridIndexingManager extends GridManagerAdapter { * @throws IgniteSpiException If failed. */ public void onSwap(String spaceName, Object key) throws IgniteSpiException { - if (!enabled()) - return; + assert enabled(); if (!busyLock.enterBusy()) throw new IllegalStateException("Failed to process swap event (grid is stopping)."); @@ -214,8 +209,7 @@ public class GridIndexingManager extends GridManagerAdapter { */ public void onUnswap(String spaceName, Object key, Object val) throws IgniteSpiException { - if (!enabled()) - return; + assert enabled(); if (!busyLock.enterBusy()) throw new IllegalStateException("Failed to process swap event (grid is stopping)."); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a3a91d45/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java index 2f77e86..726466f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java @@ -46,6 +46,11 @@ public interface CacheObject extends Message { public byte[] valueBytes(CacheObjectContext ctx) throws IgniteCheckedException; /** + * @return {@code true} If this cache object contains serialized value representation. + */ + public boolean hasValueBytes(); + + /** * @return Object type. */ public byte type(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a3a91d45/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java index 173483d..75cf87e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java @@ -37,6 +37,11 @@ public abstract class CacheObjectAdapter implements CacheObject, Externalizable /** */ protected byte[] valBytes; + /** {@inheritDoc} */ + @Override public boolean hasValueBytes() { + return valBytes != null; + } + /** * @param ctx Context. * @return {@code True} need to copy value returned to user. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a3a91d45/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectByteArrayImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectByteArrayImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectByteArrayImpl.java index 96b838c..d920bd0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectByteArrayImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectByteArrayImpl.java @@ -53,6 +53,11 @@ public class CacheObjectByteArrayImpl implements CacheObject, Externalizable { } /** {@inheritDoc} */ + @Override public boolean hasValueBytes() { + return true; + } + + /** {@inheritDoc} */ @Override public void finishUnmarshal(CacheObjectContext ctx, ClassLoader ldr) throws IgniteCheckedException { // No-op. } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a3a91d45/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectImpl.java index 33a747d..ad033a7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectImpl.java @@ -75,7 +75,7 @@ public class CacheObjectImpl extends CacheObjectAdapter { return (T)val; } catch (IgniteCheckedException e) { - throw new IgniteException("Failed to unmarshal object.", e); + throw new IgniteException("Failed to unmarshall object.", e); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a3a91d45/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 77590f2..24bdf7c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -3466,10 +3466,8 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { GridCacheQueryManager qryMgr = cctx.queries(); if (qryMgr != null && qryMgr.enabled()) { - qryMgr.store(key.value(cctx.cacheObjectContext(), false), - null, - CU.value(val, cctx, false), - null, + qryMgr.store(key, + val, ver, expireTime); } @@ -3492,8 +3490,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { GridCacheQueryManager qryMgr = cctx.queries(); if (qryMgr != null) - qryMgr.remove(key().value(cctx.cacheObjectContext(), false), - prevVal == null ? null : prevVal.value(cctx.cacheObjectContext(), false)); + qryMgr.remove(key(), prevVal == null ? null : prevVal); } catch (IgniteCheckedException e) { throw new GridCacheIndexUpdateException(e); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a3a91d45/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 83f1fed..c0026ab 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -2381,8 +2381,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { assert val != null; - qryMgr.remove(key.value(cctx.cacheObjectContext(), false), - val.value(cctx.cacheObjectContext(), false)); + qryMgr.remove(key, val); } catch (IgniteCheckedException e) { U.error(log, "Failed to unmarshal key evicted from swap [swapSpaceName=" + spaceName + ']', e); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a3a91d45/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java index fb6b103..6444e37 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java @@ -551,11 +551,8 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { GridCacheQueryManager qryMgr = cctx.queries(); - if (qryMgr != null) { - qryMgr.onUnswap(key.value(cctx.cacheObjectContext(), false), - entry.value().value(cctx.cacheObjectContext(), false), - entry.valueBytes()); - } + if (qryMgr != null) + qryMgr.onUnswap(key, entry.value()); return entry; } @@ -619,11 +616,8 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { GridCacheQueryManager qryMgr = cctx.queries(); - if (qryMgr != null) { - qryMgr.onUnswap(key.value(cctx.cacheObjectContext(), false), - v.value(cctx.cacheObjectContext(), false), - valBytes); - } + if (qryMgr != null) + qryMgr.onUnswap(key, v); } catch (IgniteCheckedException e) { err.set(e); @@ -758,9 +752,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { EVT_CACHE_OBJECT_FROM_OFFHEAP, null, false, null, true, null, null, null); if (qryMgr != null) - qryMgr.onUnswap(key.value(cctx.cacheObjectContext(), false), - entry.value().value(cctx.cacheObjectContext(), false), - entry.valueBytes()); + qryMgr.onUnswap(key, entry.value()); GridCacheBatchSwapEntry unswapped = new GridCacheBatchSwapEntry(key, part, @@ -859,11 +851,8 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { // Always fire this event, since preloading depends on it. onUnswapped(swapKey.partition(), key, entry); - if (qryMgr != null) { - qryMgr.onUnswap(key.value(cctx.cacheObjectContext(), false), - entry.value().value(cctx.cacheObjectContext(), false), - entry.valueBytes()); - } + if (qryMgr != null) + qryMgr.onUnswap(key, entry.value()); } catch (IgniteCheckedException e) { err.set(e); @@ -941,9 +930,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { if (entry == null) return; - qryMgr.onUnswap(key.value(cctx.cacheObjectContext(), false), - entry.value().value(cctx.cacheObjectContext(), false), - entry.valueBytes()); + qryMgr.onUnswap(key, entry.value()); } catch (IgniteCheckedException e) { throw new IgniteException(e); @@ -1030,7 +1017,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { GridCacheQueryManager qryMgr = cctx.queries(); if (qryMgr != null) - qryMgr.onSwap(spaceName, key.value(cctx.cacheObjectContext(), false)); + qryMgr.onSwap(key); } /** @@ -1059,7 +1046,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { (IgniteUuid)null, null, EVT_CACHE_OBJECT_TO_OFFHEAP, null, false, null, true, null, null, null); if (qryMgr != null) - qryMgr.onSwap(spaceName, swapEntry.key().value(cctx.cacheObjectContext(), false)); + qryMgr.onSwap(swapEntry.key()); } } else { @@ -1081,7 +1068,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { (IgniteUuid)null, null, EVT_CACHE_OBJECT_SWAPPED, null, false, null, true, null, null, null); if (qryMgr != null) - qryMgr.onSwap(spaceName, batchSwapEntry.key().value(cctx.cacheObjectContext(), false)); + qryMgr.onSwap(batchSwapEntry.key()); } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a3a91d45/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index 16063af..8a2eccf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -286,11 +286,10 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte /** * Entry for given key unswapped. * - * @param swapSpaceName Swap space name. * @param key Key. * @throws IgniteCheckedException If failed. */ - public void onSwap(String swapSpaceName, K key) throws IgniteCheckedException { + public void onSwap(CacheObject key) throws IgniteCheckedException { if (!enterBusy()) return; // Ignore index update when node is stopping. @@ -307,15 +306,14 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte * * @param key Key. * @param val Value - * @param valBytes Value bytes. * @throws IgniteCheckedException If failed. */ - public void onUnswap(K key, V val, byte[] valBytes) throws IgniteCheckedException { + public void onUnswap(CacheObject key, CacheObject val) throws IgniteCheckedException { if (!enterBusy()) return; // Ignore index update when node is stopping. try { - qryProc.onUnswap(space, key, val, valBytes); + qryProc.onUnswap(space, key, val); } finally { leaveBusy(); @@ -334,18 +332,15 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte * Writes key-value pair to index. * * @param key Key. - * @param keyBytes Byte array with key data. * @param val Value. - * @param valBytes Value bytes. * @param ver Cache entry version. * @param expirationTime Expiration time or 0 if never expires. * @throws IgniteCheckedException In case of error. */ - public void store(K key, @Nullable byte[] keyBytes, @Nullable V val, @Nullable byte[] valBytes, - GridCacheVersion ver, long expirationTime) + public void store(CacheObject key, CacheObject val, GridCacheVersion ver, long expirationTime) throws IgniteCheckedException { assert key != null; - assert val != null || valBytes != null; + assert val != null; assert enabled(); if (key instanceof GridCacheInternal) @@ -355,10 +350,7 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte return; // Ignore index update when node is stopping. try { - if (val == null) - val = cctx.marshaller().unmarshal(valBytes, cctx.deploy().globalLoader()); - - qryProc.store(space, key, keyBytes, val, valBytes, CU.versionToBytes(ver), expirationTime); + qryProc.store(space, key, val, CU.versionToBytes(ver), expirationTime); } finally { invalidateResultCache(); @@ -373,7 +365,7 @@ public abstract class GridCacheQueryManager extends GridCacheManagerAdapte * @throws IgniteCheckedException Thrown in case of any errors. */ @SuppressWarnings("SimplifiableIfStatement") - public void remove(Object key, Object val) throws IgniteCheckedException { + public void remove(CacheObject key, CacheObject val) throws IgniteCheckedException { assert key != null; if (!GridQueryProcessor.isEnabled(cctx.config()) && !(key instanceof GridCacheInternal)) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a3a91d45/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java index 12f774c..3f5e234 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java @@ -176,8 +176,8 @@ public interface GridQueryIndexing { * @param expirationTime Expiration time or 0 if never expires. * @throws IgniteCheckedException If failed. */ - public void store(@Nullable String spaceName, GridQueryTypeDescriptor type, Object key, Object val, byte[] ver, - long expirationTime) throws IgniteCheckedException; + public void store(@Nullable String spaceName, GridQueryTypeDescriptor type, CacheObject key, CacheObject val, + byte[] ver, long expirationTime) throws IgniteCheckedException; /** * Removes index entry by key. @@ -187,7 +187,7 @@ public interface GridQueryIndexing { * @param val Value. * @throws IgniteCheckedException If failed. */ - public void remove(@Nullable String spaceName, Object key, Object val) throws IgniteCheckedException; + public void remove(@Nullable String spaceName, CacheObject key, CacheObject val) throws IgniteCheckedException; /** * Will be called when entry with given key is swapped. @@ -196,7 +196,7 @@ public interface GridQueryIndexing { * @param key Key. * @throws IgniteCheckedException If failed. */ - public void onSwap(@Nullable String spaceName, Object key) throws IgniteCheckedException; + public void onSwap(@Nullable String spaceName, CacheObject key) throws IgniteCheckedException; /** * Will be called when entry with given key is unswapped. @@ -204,10 +204,9 @@ public interface GridQueryIndexing { * @param spaceName Space name. * @param key Key. * @param val Value. - * @param valBytes Value bytes. * @throws IgniteCheckedException If failed. */ - public void onUnswap(@Nullable String spaceName, Object key, Object val, byte[] valBytes) throws IgniteCheckedException; + public void onUnswap(@Nullable String spaceName, CacheObject key, CacheObject val) throws IgniteCheckedException; /** * Rebuilds all indexes of given type. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a3a91d45/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index 7ce894d..77e493d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -388,27 +388,39 @@ public class GridQueryProcessor extends GridProcessorAdapter { } /** + * @param space Space name. + * @return Cache object context. + */ + private CacheObjectContext cacheObjectContext(String space) { + return ctx.cache().internalCache(space).context().cacheObjectContext(); + } + + /** * Writes key-value pair to index. * * @param space Space. * @param key Key. - * @param keyBytes Byte array with key data. * @param val Value. - * @param valBytes Byte array with value data. * @param ver Cache entry version. * @param expirationTime Expiration time or 0 if never expires. * @throws IgniteCheckedException In case of error. */ @SuppressWarnings("unchecked") - public void store(final String space, final K key, @Nullable byte[] keyBytes, final V val, - @Nullable byte[] valBytes, byte[] ver, long expirationTime) throws IgniteCheckedException { + public void store(final String space, final CacheObject key, final CacheObject val, + byte[] ver, long expirationTime) throws IgniteCheckedException { assert key != null; assert val != null; if (log.isDebugEnabled()) log.debug("Store [space=" + space + ", key=" + key + ", val=" + val + "]"); - ctx.indexing().store(space, key, val, expirationTime); + CacheObjectContext coctx = null; + + if (ctx.indexing().enabled()) { + coctx = cacheObjectContext(space); + + ctx.indexing().store(space, key.value(coctx, false), val.value(coctx, false), expirationTime); + } if (idx == null) return; @@ -417,7 +429,10 @@ public class GridQueryProcessor extends GridProcessorAdapter { throw new IllegalStateException("Failed to write to index (grid is stopping)."); try { - final Class valCls = val.getClass(); + if (coctx == null) + coctx = cacheObjectContext(space); + + Class valCls = null; TypeId id; @@ -428,8 +443,11 @@ public class GridQueryProcessor extends GridProcessorAdapter { id = new TypeId(space, typeId); } - else + else { + valCls = val.value(coctx, false).getClass(); + id = new TypeId(space, valCls); + } TypeDescriptor desc = types.get(id); @@ -441,9 +459,13 @@ public class GridQueryProcessor extends GridProcessorAdapter { "(multiple classes with same simple name are stored in the same cache) " + "[expCls=" + desc.valueClass().getName() + ", actualCls=" + valCls.getName() + ']'); - if (!ctx.cacheObjects().isPortableObject(key) && !desc.keyClass().isAssignableFrom(key.getClass())) - throw new IgniteCheckedException("Failed to update index, incorrect key class [expCls=" + - desc.keyClass().getName() + ", actualCls=" + key.getClass().getName() + "]"); + if (!ctx.cacheObjects().isPortableObject(key)) { + Class keyCls = key.value(coctx, false).getClass(); + + if (!desc.keyClass().isAssignableFrom(keyCls)) + throw new IgniteCheckedException("Failed to update index, incorrect key class [expCls=" + + desc.keyClass().getName() + ", actualCls=" + keyCls.getName() + "]"); + } idx.store(space, desc, key, val, ver, expirationTime); } @@ -684,13 +706,17 @@ public class GridQueryProcessor extends GridProcessorAdapter { * @throws IgniteCheckedException Thrown in case of any errors. */ @SuppressWarnings("unchecked") - public void remove(String space, Object key, Object val) throws IgniteCheckedException { + public void remove(String space, CacheObject key, CacheObject val) throws IgniteCheckedException { assert key != null; if (log.isDebugEnabled()) log.debug("Remove [space=" + space + ", key=" + key + ", val=" + val + "]"); - ctx.indexing().remove(space, key); + if (ctx.indexing().enabled()) { + CacheObjectContext coctx = cacheObjectContext(space); + + ctx.indexing().remove(space, key.value(coctx, false)); + } if (idx == null) return; @@ -792,11 +818,15 @@ public class GridQueryProcessor extends GridProcessorAdapter { * @param key key. * @throws IgniteCheckedException If failed. */ - public void onSwap(String spaceName, Object key) throws IgniteCheckedException { + public void onSwap(String spaceName, CacheObject key) throws IgniteCheckedException { if (log.isDebugEnabled()) log.debug("Swap [space=" + spaceName + ", key=" + key + "]"); - ctx.indexing().onSwap(spaceName, key); + if (ctx.indexing().enabled()) { + CacheObjectContext coctx = cacheObjectContext(spaceName); + + ctx.indexing().onSwap(spaceName, key.value(coctx, false)); + } if (idx == null) return; @@ -818,15 +848,18 @@ public class GridQueryProcessor extends GridProcessorAdapter { * @param spaceName Space name. * @param key Key. * @param val Value. - * @param valBytes Value bytes. * @throws IgniteCheckedException If failed. */ - public void onUnswap(String spaceName, Object key, Object val, byte[] valBytes) + public void onUnswap(String spaceName, CacheObject key, CacheObject val) throws IgniteCheckedException { if (log.isDebugEnabled()) log.debug("Unswap [space=" + spaceName + ", key=" + key + ", val=" + val + "]"); - ctx.indexing().onUnswap(spaceName, key, val); + if (ctx.indexing().enabled()) { + CacheObjectContext coctx = cacheObjectContext(spaceName); + + ctx.indexing().onUnswap(spaceName, key.value(coctx, false), val.value(coctx, false)); + } if (idx == null) return; @@ -835,7 +868,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { throw new IllegalStateException("Failed to process swap event (grid is stopping)."); try { - idx.onUnswap(spaceName, key, val, valBytes); + idx.onUnswap(spaceName, key, val); } finally { busyLock.leaveBusy(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a3a91d45/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index b85a393..7c91ca4 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -63,6 +63,7 @@ import java.io.*; import java.lang.reflect.*; import java.math.*; import java.sql.*; +import java.sql.Date; import java.text.*; import java.util.*; import java.util.concurrent.*; @@ -291,13 +292,15 @@ public class IgniteH2Indexing implements GridQueryIndexing { * @param tblToUpdate Table to update. * @throws IgniteCheckedException In case of error. */ - private void removeKey(@Nullable String spaceName, Object key, TableDescriptor tblToUpdate) + private void removeKey(@Nullable String spaceName, CacheObject key, TableDescriptor tblToUpdate) throws IgniteCheckedException { try { Collection tbls = tables(schema(spaceName)); + Class keyCls = getClass(objectContext(spaceName), key); + for (TableDescriptor tbl : tbls) { - if (tbl != tblToUpdate && tbl.type().keyClass().isAssignableFrom(key.getClass())) { + if (tbl != tblToUpdate && tbl.type().keyClass().isAssignableFrom(keyCls)) { if (tbl.tbl.update(key, null, 0, true)) { if (tbl.luceneIdx != null) tbl.luceneIdx.remove(key); @@ -350,8 +353,8 @@ public class IgniteH2Indexing implements GridQueryIndexing { } /** {@inheritDoc} */ - @Override public void store(@Nullable String spaceName, GridQueryTypeDescriptor type, Object k, Object v, byte[] ver, - long expirationTime) throws IgniteCheckedException { + @Override public void store(@Nullable String spaceName, GridQueryTypeDescriptor type, CacheObject k, CacheObject v, + byte[] ver, long expirationTime) throws IgniteCheckedException { TableDescriptor tbl = tableDescriptor(spaceName, type); removeKey(spaceName, k, tbl); @@ -368,14 +371,46 @@ public class IgniteH2Indexing implements GridQueryIndexing { tbl.luceneIdx.store(k, v, ver, expirationTime); } + /** + * @param o Object. + * @return {@code true} If it is a portable object. + */ + private boolean isPortable(CacheObject o) { + return ctx.cacheObjects().isPortableObject(o); + } + + /** + * @param coctx Cache object context. + * @param o Object. + * @return Object class. + */ + private Class getClass(CacheObjectContext coctx, CacheObject o) { + return isPortable(o) ? + Object.class : + o.value(coctx, false).getClass(); + } + + /** + * @param space Space. + * @return Cache object context. + */ + private CacheObjectContext objectContext(String space) { + return ctx.cache().internalCache(space).context().cacheObjectContext(); + } + /** {@inheritDoc} */ - @Override public void remove(@Nullable String spaceName, Object key, Object val) throws IgniteCheckedException { + @Override public void remove(@Nullable String spaceName, CacheObject key, CacheObject val) throws IgniteCheckedException { if (log.isDebugEnabled()) - log.debug("Removing key from cache query index [locId=" + nodeId + ", key=" + key + ']'); + log.debug("Removing key from cache query index [locId=" + nodeId + ", key=" + key + ", val=" + val + ']'); + + CacheObjectContext coctx = objectContext(spaceName); + + Class keyCls = getClass(coctx, key); + Class valCls = val == null ? null : getClass(coctx, val); for (TableDescriptor tbl : tables(schema(spaceName))) { - if (tbl.type().keyClass().isAssignableFrom(key.getClass()) - && (val == null || tbl.type().valueClass().isAssignableFrom(val.getClass()))) { + if (tbl.type().keyClass().isAssignableFrom(keyCls) + && (val == null || tbl.type().valueClass().isAssignableFrom(valCls))) { if (tbl.tbl.update(key, val, 0, true)) { if (tbl.luceneIdx != null) tbl.luceneIdx.remove(key); @@ -387,14 +422,16 @@ public class IgniteH2Indexing implements GridQueryIndexing { } /** {@inheritDoc} */ - @Override public void onSwap(@Nullable String spaceName, Object key) throws IgniteCheckedException { + @Override public void onSwap(@Nullable String spaceName, CacheObject key) throws IgniteCheckedException { Schema schema = schemas.get(schema(spaceName)); if (schema == null) return; + Class keyCls = getClass(objectContext(spaceName), key); + for (TableDescriptor tbl : schema.tbls.values()) { - if (tbl.type().keyClass().isAssignableFrom(key.getClass())) { + if (tbl.type().keyClass().isAssignableFrom(keyCls)) { try { if (tbl.tbl.onSwap(key)) return; @@ -407,13 +444,18 @@ public class IgniteH2Indexing implements GridQueryIndexing { } /** {@inheritDoc} */ - @Override public void onUnswap(@Nullable String spaceName, Object key, Object val, byte[] valBytes) + @Override public void onUnswap(@Nullable String spaceName, CacheObject key, CacheObject val) throws IgniteCheckedException { assert val != null; + CacheObjectContext coctx = objectContext(spaceName); + + Class keyCls = getClass(coctx, key); + Class valCls = getClass(coctx, val); + for (TableDescriptor tbl : tables(schema(spaceName))) { - if (tbl.type().keyClass().isAssignableFrom(key.getClass()) - && tbl.type().valueClass().isAssignableFrom(val.getClass())) { + if (tbl.type().keyClass().isAssignableFrom(keyCls) + && tbl.type().valueClass().isAssignableFrom(valCls)) { try { if (tbl.tbl.onUnswap(key, val)) return; @@ -514,7 +556,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { meta = meta(rs.getMetaData()); } catch (SQLException e) { - throw new IgniteSpiException("Failed to get meta data.", e); + throw new IgniteCheckedException("Failed to get meta data.", e); } } @@ -1114,6 +1156,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { log.debug("Starting cache query index..."); System.setProperty("h2.serializeJavaObject", "false"); + System.setProperty("h2.objectCacheMaxPerElementSize", "0"); // Avoid ValueJavaObject caching. if (SysProperties.serializeJavaObject) { U.warn(log, "Serialization of Java objects in H2 was enabled."); @@ -1551,7 +1594,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { if (type().valueClass() == String.class) { try { - luceneIdx = new GridLuceneIndex(ctx, marshaller, schema.offheap, schema.spaceName, type, true); + luceneIdx = new GridLuceneIndex(ctx, schema.offheap, schema.spaceName, type); } catch (IgniteCheckedException e1) { throw new IgniteException(e1); @@ -1564,7 +1607,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { if (idx.type() == FULLTEXT) { try { - luceneIdx = new GridLuceneIndex(ctx, marshaller, schema.offheap, schema.spaceName, type, true); + luceneIdx = new GridLuceneIndex(ctx, schema.offheap, schema.spaceName, type); } catch (IgniteCheckedException e1) { throw new IgniteException(e1); @@ -1906,8 +1949,87 @@ public class IgniteH2Indexing implements GridQueryIndexing { return IgniteH2Indexing.this; } + /** + * Wraps object to respective {@link Value}. + * + * @param obj Object. + * @param type Value type. + * @return Value. + * @throws IgniteCheckedException If failed. + */ + public Value wrap(Object obj, int type) throws IgniteCheckedException { + assert obj != null; + + CacheObjectContext coctx = null; + + CacheObject co = null; + + if (obj instanceof CacheObject) { // Unwrap cache object. + co = (CacheObject)obj; + + obj = co.value(coctx = objectContext(schema.spaceName), false); + } + + switch (type) { + case Value.BOOLEAN: + return ValueBoolean.get((Boolean)obj); + case Value.BYTE: + return ValueByte.get((Byte)obj); + case Value.SHORT: + return ValueShort.get((Short)obj); + case Value.INT: + return ValueInt.get((Integer)obj); + case Value.FLOAT: + return ValueFloat.get((Float)obj); + case Value.LONG: + return ValueLong.get((Long)obj); + case Value.DOUBLE: + return ValueDouble.get((Double)obj); + case Value.UUID: + UUID uuid = (UUID)obj; + return ValueUuid.get(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()); + case Value.DATE: + return ValueDate.get((Date)obj); + case Value.TIME: + return ValueTime.get((Time)obj); + case Value.TIMESTAMP: + if (obj instanceof java.util.Date && !(obj instanceof Timestamp)) + obj = new Timestamp(((java.util.Date) obj).getTime()); + + return GridH2Utils.toValueTimestamp((Timestamp)obj); + case Value.DECIMAL: + return ValueDecimal.get((BigDecimal)obj); + case Value.STRING: + return ValueString.get(obj.toString()); + case Value.BYTES: + return ValueBytes.get((byte[])obj); + case Value.JAVA_OBJECT: + return ValueJavaObject.getNoCopy(obj, + co != null && co.hasValueBytes() ? co.valueBytes(coctx) : null, + null); + + case Value.ARRAY: + Object[] arr = (Object[])obj; + + Value[] valArr = new Value[arr.length]; + + for (int i = 0; i < arr.length; i++) { + Object o = arr[i]; + + valArr[i] = o == null ? ValueNull.INSTANCE : wrap(o, DataType.getTypeFromClass(o.getClass())); + } + + return ValueArray.get(valArr); + + case Value.GEOMETRY: + return ValueGeometry.getFromGeometry(obj); + } + + throw new IgniteCheckedException("Failed to wrap value[type=" + type + ", value=" + obj + "]"); + } + /** {@inheritDoc} */ - @Override public GridH2Row createRow(Object key, @Nullable Object val, long expirationTime) + @Override public GridH2Row createRow(CacheObject key, @Nullable CacheObject val, long expirationTime) throws IgniteCheckedException { try { if (val == null) // Only can happen for remove operation, can create simple search row. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a3a91d45/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java index 4a0809a..2ce91cf 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java @@ -20,17 +20,13 @@ package org.apache.ignite.internal.processors.query.h2.opt; import org.apache.ignite.*; import org.apache.ignite.internal.processors.query.*; import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.spi.*; import org.h2.message.*; import org.h2.result.*; import org.h2.value.*; import org.jetbrains.annotations.*; import java.lang.ref.*; -import java.math.*; -import java.sql.Date; import java.sql.*; -import java.util.*; /** * Table row implementation based on {@link GridQueryTypeDescriptor}. @@ -61,12 +57,16 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row { * @param val Value. * @param valType Value type. * @param expirationTime Expiration time. - * @throws IgniteSpiException If failed. + * @throws IgniteCheckedException If failed. */ protected GridH2AbstractKeyValueRow(GridH2RowDescriptor desc, Object key, int keyType, @Nullable Object val, - int valType, long expirationTime) throws IgniteSpiException { - super(wrap(key, keyType), - val == null ? null : wrap(val, valType)); // We remove by key only, so value can be null here. + int valType, long expirationTime) throws IgniteCheckedException { + super(null, null); + + setValue(KEY_COL, desc.wrap(key, keyType)); + + if (val != null) // We remove by key only, so value can be null here. + setValue(VAL_COL, desc.wrap(val, valType)); this.desc = desc; this.expirationTime = expirationTime; @@ -84,72 +84,6 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row { } /** - * Wraps object to respective {@link Value}. - * - * @param obj Object. - * @param type Value type. - * @return Value. - * @throws IgniteSpiException If failed. - */ - public static Value wrap(Object obj, int type) throws IgniteSpiException { - assert obj != null; - - switch (type) { - case Value.BOOLEAN: - return ValueBoolean.get((Boolean)obj); - case Value.BYTE: - return ValueByte.get((Byte)obj); - case Value.SHORT: - return ValueShort.get((Short)obj); - case Value.INT: - return ValueInt.get((Integer)obj); - case Value.FLOAT: - return ValueFloat.get((Float)obj); - case Value.LONG: - return ValueLong.get((Long)obj); - case Value.DOUBLE: - return ValueDouble.get((Double)obj); - case Value.UUID: - UUID uuid = (UUID)obj; - return ValueUuid.get(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()); - case Value.DATE: - return ValueDate.get((Date)obj); - case Value.TIME: - return ValueTime.get((Time)obj); - case Value.TIMESTAMP: - if (obj instanceof java.util.Date && !(obj instanceof Timestamp)) - obj = new Timestamp(((java.util.Date) obj).getTime()); - - return GridH2Utils.toValueTimestamp((Timestamp)obj); - case Value.DECIMAL: - return ValueDecimal.get((BigDecimal)obj); - case Value.STRING: - return ValueString.get(obj.toString()); - case Value.BYTES: - return ValueBytes.get((byte[])obj); - case Value.JAVA_OBJECT: - return ValueJavaObject.getNoCopy(obj, null, null); - case Value.ARRAY: - Object[] arr = (Object[])obj; - - Value[] valArr = new Value[arr.length]; - - for (int i = 0; i < arr.length; i++) { - Object o = arr[i]; - - valArr[i] = o == null ? ValueNull.INSTANCE : wrap(o, DataType.getTypeFromClass(o.getClass())); - } - - return ValueArray.get(valArr); - - case Value.GEOMETRY: - return ValueGeometry.getFromGeometry(obj); - } - - throw new IgniteSpiException("Failed to wrap value[type=" + type + ", value=" + obj + "]"); - } - - /** * @return Expiration time of respective cache entry. */ public long expirationTime() { @@ -164,7 +98,7 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row { /** * Should be called to remove reference on value. * - * @throws IgniteSpiException If failed. + * @throws IgniteCheckedException If failed. */ public synchronized void onSwap() throws IgniteCheckedException { setValue(VAL_COL, null); @@ -178,7 +112,7 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row { * @throws IgniteCheckedException If failed. */ public synchronized void onUnswap(Object val, boolean beforeRmv) throws IgniteCheckedException { - setValue(VAL_COL, wrap(val, desc.valueType())); + setValue(VAL_COL, desc.wrap(val, desc.valueType())); notifyAll(); } @@ -258,7 +192,7 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row { Object valObj = desc.readFromSwap(k); if (valObj != null) { - Value upd = wrap(valObj, desc.valueType()); + Value upd = desc.wrap(valObj, desc.valueType()); v = updateWeakValue(upd); @@ -317,9 +251,9 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row { return ValueNull.INSTANCE; try { - return wrap(res, desc.fieldType(col)); + return desc.wrap(res, desc.fieldType(col)); } - catch (IgniteSpiException e) { + catch (IgniteCheckedException e) { throw DbException.convert(e); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a3a91d45/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java index 9c2c1b2..c47f122 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java @@ -21,7 +21,6 @@ import org.apache.ignite.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.offheap.unsafe.*; import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.spi.*; import org.h2.store.*; import org.h2.value.*; import org.jetbrains.annotations.*; @@ -87,10 +86,10 @@ public class GridH2KeyValueRowOffheap extends GridH2AbstractKeyValueRow { * @param val Value. * @param valType Value type. * @param expirationTime Expiration time. - * @throws IgniteSpiException If failed. + * @throws IgniteCheckedException If failed. */ public GridH2KeyValueRowOffheap(GridH2RowDescriptor desc, Object key, int keyType, @Nullable Object val, int valType, - long expirationTime) throws IgniteSpiException { + long expirationTime) throws IgniteCheckedException { super(desc, key, keyType, val, valType, expirationTime); } @@ -247,7 +246,7 @@ public class GridH2KeyValueRowOffheap extends GridH2AbstractKeyValueRow { Value v = peekValue(VAL_COL); if (v == null) { - setValue(VAL_COL, wrap(val, desc.valueType())); + setValue(VAL_COL, desc.wrap(val, desc.valueType())); v = peekValue(VAL_COL); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a3a91d45/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOnheap.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOnheap.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOnheap.java index e998d9b..ee7c79e 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOnheap.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOnheap.java @@ -18,7 +18,7 @@ package org.apache.ignite.internal.processors.query.h2.opt; -import org.apache.ignite.spi.*; +import org.apache.ignite.*; import org.h2.value.*; import org.jetbrains.annotations.*; @@ -35,10 +35,10 @@ public class GridH2KeyValueRowOnheap extends GridH2AbstractKeyValueRow { * @param val Value. * @param valType Value type. * @param expirationTime Expiration time. - * @throws IgniteSpiException If failed. + * @throws IgniteCheckedException If failed. */ public GridH2KeyValueRowOnheap(GridH2RowDescriptor desc, Object key, int keyType, @Nullable Object val, int valType, - long expirationTime) throws IgniteSpiException { + long expirationTime) throws IgniteCheckedException { super(desc, key, keyType, val, valType, expirationTime); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a3a91d45/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java index c5f9551..a7c690e 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java @@ -18,8 +18,10 @@ package org.apache.ignite.internal.processors.query.h2.opt; import org.apache.ignite.*; +import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.query.h2.*; import org.apache.ignite.internal.util.offheap.unsafe.*; +import org.h2.value.*; import org.jetbrains.annotations.*; /** @@ -40,7 +42,7 @@ public interface GridH2RowDescriptor extends GridOffHeapSmartPointerFactory curr; + /** */ + private CacheObjectContext coctx; + /** * Constructor. * @@ -313,6 +319,8 @@ public class GridLuceneIndex implements Closeable { this.docs = docs; this.filters = filters; + coctx = objectContext(); + findNext(); } @@ -328,10 +336,22 @@ public class GridLuceneIndex implements Closeable { } /** + * @param bytes Bytes. + * @param ldr Class loader. + * @return Object. + * @throws IgniteCheckedException If failed. + */ + @SuppressWarnings("unchecked") + private Z unmarshall(byte[] bytes, ClassLoader ldr) throws IgniteCheckedException { + return (Z)coctx.processor().unmarshal(coctx, bytes, ldr); + } + + /** * Finds next element. * * @throws IgniteCheckedException If failed. */ + @SuppressWarnings("unchecked") private void findNext() throws IgniteCheckedException { curr = null; @@ -345,26 +365,22 @@ public class GridLuceneIndex implements Closeable { throw new IgniteCheckedException(e); } - String keyStr = doc.get(KEY_FIELD_NAME); - ClassLoader ldr = null; if (ctx != null && ctx.deploy().enabled()) ldr = ctx.cache().internalCache(spaceName).context().deploy().globalLoader(); - K k = marshaller.unmarshal(org.apache.commons.codec.binary.Base64.decodeBase64(keyStr), ldr); + K k = unmarshall(org.apache.commons.codec.binary.Base64.decodeBase64(doc.get(KEY_FIELD_NAME)), ldr); - byte[] valBytes = doc.getBinaryValue(VAL_FIELD_NAME); + V v = type.valueClass() == String.class ? + (V)doc.get(VAL_STR_FIELD_NAME) : + this.unmarshall(doc.getBinaryValue(VAL_FIELD_NAME), ldr); - V v = valBytes != null ? marshaller.unmarshal(valBytes, ldr) : - type.valueClass() == String.class ? - (V)doc.get(VAL_STR_FIELD_NAME): null; + assert v != null; if (!filter(k, v)) continue; -// byte[] ver = doc.getBinaryValue(VER_FIELD_NAME); TODO rm version - curr = new IgniteBiTuple<>(k, v); break; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a3a91d45/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryMultiThreadedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryMultiThreadedSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryMultiThreadedSelfTest.java index 51fc92d..8acc6b0 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryMultiThreadedSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryMultiThreadedSelfTest.java @@ -117,15 +117,15 @@ public class IgniteCacheQueryMultiThreadedSelfTest extends GridCommonAbstractTes * */ private static class FakeIndexing extends IgniteH2Indexing { - @Override public void onSwap(@Nullable String spaceName, Object key) throws IgniteCheckedException { + @Override public void onSwap(@Nullable String spaceName, CacheObject key) throws IgniteCheckedException { super.onSwap(spaceName, key); idxSwapCnt.incrementAndGet(); } - @Override public void onUnswap(@Nullable String spaceName, Object key, Object val, byte[] valBytes) + @Override public void onUnswap(@Nullable String spaceName, CacheObject key, CacheObject val) throws IgniteCheckedException { - super.onUnswap(spaceName, key, val, valBytes); + super.onUnswap(spaceName, key, val); idxUnswapCnt.incrementAndGet(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a3a91d45/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java index e9a9ad4..ba6912a 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java @@ -19,14 +19,18 @@ package org.apache.ignite.internal.processors.query.h2; import org.apache.ignite.*; import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.query.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; +import org.apache.ignite.plugin.extensions.communication.*; import org.apache.ignite.spi.*; import org.apache.ignite.testframework.*; import org.apache.ignite.testframework.junits.common.*; +import org.jetbrains.annotations.*; +import java.nio.*; import java.util.*; import java.util.concurrent.*; @@ -108,14 +112,14 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract * @param age Age. * @return AA. */ - private Map aa(long id, String name, int age) { + private CacheObject aa(long id, String name, int age) { Map map = new HashMap<>(); map.put("id", id); map.put("name", name); map.put("age", age); - return map; + return new TestCacheObject(map); } /** @@ -125,12 +129,12 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract * @param txt Text. * @return AB. */ - private Map ab(long id, String name, int age, String txt) { - Map map = aa(id, name, age); + private CacheObject ab(long id, String name, int age, String txt) { + Map map = aa(id, name, age).value(null, false); map.put("txt", txt); - return map; + return new CacheObjectImpl(map, null); } /** @@ -140,12 +144,12 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract * @param sex Sex. * @return BA. */ - private Map ba(long id, String name, int age, boolean sex) { - Map map = aa(id, name, age); + private CacheObject ba(long id, String name, int age, boolean sex) { + Map map = aa(id, name, age).value(null, false); map.put("sex", sex); - return map; + return new CacheObjectImpl(map, null); } /** @@ -169,6 +173,14 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract } /** + * @param key Key. + * @return Cache object. + */ + private CacheObject key(int key) { + return new TestCacheObject(key); + } + + /** * @throws Exception If failed. */ public void testSpi() throws Exception { @@ -202,16 +214,16 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract assertFalse(spi.query(typeBA.space(), "select * from B.A", Collections.emptySet(), typeBA, null).hasNext()); // Nothing to remove. - spi.remove("A", 1, aa(1, "", 10)); - spi.remove("B", 1, ba(1, "", 10, true)); + spi.remove("A", key(1), aa(1, "", 10)); + spi.remove("B", key(1), ba(1, "", 10, true)); - spi.store(typeAA.space(), typeAA, 1, aa(1, "Vasya", 10), "v1".getBytes(), 0); + spi.store(typeAA.space(), typeAA, key(1), aa(1, "Vasya", 10), "v1".getBytes(), 0); assertEquals(1, spi.size(typeAA.space(), typeAA, null)); assertEquals(0, spi.size(typeAB.space(), typeAB, null)); assertEquals(0, spi.size(typeBA.space(), typeBA, null)); - spi.store(typeAB.space(), typeAB, 1, ab(1, "Vasya", 20, "Some text about Vasya goes here."), + spi.store(typeAB.space(), typeAB, key(1), ab(1, "Vasya", 20, "Some text about Vasya goes here."), "v2".getBytes(), 0); // In one space all keys must be unique. @@ -219,33 +231,33 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract assertEquals(1, spi.size(typeAB.space(), typeAB, null)); assertEquals(0, spi.size(typeBA.space(), typeBA, null)); - spi.store(typeBA.space(), typeBA, 1, ba(2, "Petya", 25, true), "v3".getBytes(), 0); + spi.store(typeBA.space(), typeBA, key(1), ba(2, "Petya", 25, true), "v3".getBytes(), 0); // No replacement because of different space. assertEquals(0, spi.size(typeAA.space(), typeAA, null)); assertEquals(1, spi.size(typeAB.space(), typeAB, null)); assertEquals(1, spi.size(typeBA.space(), typeBA, null)); - spi.store(typeBA.space(), typeBA, 1, ba(2, "Kolya", 25, true), "v4".getBytes(), 0); + spi.store(typeBA.space(), typeBA, key(1), ba(2, "Kolya", 25, true), "v4".getBytes(), 0); // Replacement in the same table. assertEquals(0, spi.size(typeAA.space(), typeAA, null)); assertEquals(1, spi.size(typeAB.space(), typeAB, null)); assertEquals(1, spi.size(typeBA.space(), typeBA, null)); - spi.store(typeAA.space(), typeAA, 2, aa(2, "Valera", 19), "v5".getBytes(), 0); + spi.store(typeAA.space(), typeAA, key(2), aa(2, "Valera", 19), "v5".getBytes(), 0); assertEquals(1, spi.size(typeAA.space(), typeAA, null)); assertEquals(1, spi.size(typeAB.space(), typeAB, null)); assertEquals(1, spi.size(typeBA.space(), typeBA, null)); - spi.store(typeAA.space(), typeAA, 3, aa(3, "Borya", 18), "v6".getBytes(), 0); + spi.store(typeAA.space(), typeAA, key(3), aa(3, "Borya", 18), "v6".getBytes(), 0); assertEquals(2, spi.size(typeAA.space(), typeAA, null)); assertEquals(1, spi.size(typeAB.space(), typeAB, null)); assertEquals(1, spi.size(typeBA.space(), typeBA, null)); - spi.store(typeAB.space(), typeAB, 4, ab(4, "Vitalya", 20, "Very Good guy"), "v7".getBytes(), 0); + spi.store(typeAB.space(), typeAB, key(4), ab(4, "Vitalya", 20, "Very Good guy"), "v7".getBytes(), 0); assertEquals(2, spi.size(typeAA.space(), typeAA, null)); assertEquals(2, spi.size(typeAB.space(), typeAB, null)); @@ -307,13 +319,13 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract assertFalse(fieldsRes.iterator().hasNext()); // Remove - spi.remove(typeAA.space(), 2, aa(2, "Valera", 19)); + spi.remove(typeAA.space(), key(2), aa(2, "Valera", 19)); assertEquals(1, spi.size(typeAA.space(), typeAA, null)); assertEquals(2, spi.size(typeAB.space(), typeAB, null)); assertEquals(1, spi.size(typeBA.space(), typeBA, null)); - spi.remove(typeBA.space(), 1, ba(2, "Kolya", 25, true)); + spi.remove(typeBA.space(), key(1), ba(2, "Kolya", 25, true)); assertEquals(1, spi.size(typeAA.space(), typeAA, null)); assertEquals(2, spi.size(typeAB.space(), typeAB, null)); @@ -352,7 +364,7 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract spi.unregisterType(typeBA.space(), typeBA); // Should not store but should not fail as well. - spi.store(typeAA.space(), typeAA, 10, aa(1, "Fail", 100500), "v220".getBytes(), 0); + spi.store(typeAA.space(), typeAA, key(10), aa(1, "Fail", 100500), "v220".getBytes(), 0); assertEquals(-1, spi.size(typeAA.space(), typeAA, null)); } @@ -526,4 +538,73 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract return textIdx == null; } } + + /** + */ + private static class TestCacheObject implements CacheObject { + /** */ + private Object val; + + /** + * @param val Value. + */ + private TestCacheObject(Object val) { + this.val = val; + } + + /** {@inheritDoc} */ + @Nullable @Override public T value(CacheObjectContext ctx, boolean cpy) { + return (T)val; + } + + /** {@inheritDoc} */ + @Override public byte[] valueBytes(CacheObjectContext ctx) throws IgniteCheckedException { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public boolean hasValueBytes() { + return false; + } + + /** {@inheritDoc} */ + @Override public byte type() { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public CacheObject prepareForCache(CacheObjectContext ctx) { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public void finishUnmarshal(CacheObjectContext ctx, ClassLoader ldr) throws IgniteCheckedException { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public void prepareMarshal(CacheObjectContext ctx) throws IgniteCheckedException { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public byte directType() { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + throw new UnsupportedOperationException(); + } + } }