Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id C19E01743C for ; Wed, 24 Jun 2015 11:06:30 +0000 (UTC) Received: (qmail 91459 invoked by uid 500); 24 Jun 2015 11:06:30 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 91428 invoked by uid 500); 24 Jun 2015 11:06:30 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 91419 invoked by uid 99); 24 Jun 2015 11:06:30 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 24 Jun 2015 11:06:30 +0000 X-ASF-Spam-Status: No, hits=-2001.4 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Wed, 24 Jun 2015 11:04:14 +0000 Received: (qmail 87402 invoked by uid 99); 24 Jun 2015 11:06:01 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 24 Jun 2015 11:06:01 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 4D1F4E0544; Wed, 24 Jun 2015 11:06:01 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: anovikov@apache.org To: commits@ignite.incubator.apache.org Date: Wed, 24 Jun 2015 11:06:28 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [29/50] incubator-ignite git commit: ignite-973 - swap manager fix + offheap processor signatures fix X-Virus-Checked: Checked by ClamAV on apache.org ignite-973 - swap manager fix + offheap processor signatures fix Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/23512df8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/23512df8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/23512df8 Branch: refs/heads/master Commit: 23512df815c196f9a2292ad948b130380f123770 Parents: b23ea74 Author: S.Vladykin Authored: Fri Jun 19 13:31:27 2015 +0300 Committer: S.Vladykin Committed: Fri Jun 19 13:31:27 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheSwapManager.java | 12 +- .../offheap/GridOffHeapProcessor.java | 19 +- .../apache/ignite/internal/util/GridDebug.java | 37 ++-- .../cache/IgniteCacheOffheapEvictQueryTest.java | 179 +++++++++++++++++++ .../IgniteCacheQuerySelfTestSuite.java | 1 + 5 files changed, 220 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/23512df8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java index d0d9049..f709e03 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java @@ -977,15 +977,15 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { // First try offheap. if (offheapEnabled) { - byte[] val = offheap.remove(spaceName, part, key.value(cctx.cacheObjectContext(), false), - key.valueBytes(cctx.cacheObjectContext())); - - if(val != null && cctx.config().isStatisticsEnabled()) - cctx.cache().metrics0().onOffHeapRemove(); + // TODO Pass closure c to offheap.remove and apply it before the actual remove. + byte[] val = offheap.remove(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext())); if (val != null) { + if (cctx.config().isStatisticsEnabled()) + cctx.cache().metrics0().onOffHeapRemove(); + if (c != null) - c.apply(val); // Probably we should read value and apply closure before removing... + c.apply(val); return; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/23512df8/modules/core/src/main/java/org/apache/ignite/internal/processors/offheap/GridOffHeapProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/offheap/GridOffHeapProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/offheap/GridOffHeapProcessor.java index a99c4c0..81bf9f2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/offheap/GridOffHeapProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/offheap/GridOffHeapProcessor.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.offheap; import org.apache.ignite.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.*; +import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.offheap.*; @@ -101,7 +102,7 @@ public class GridOffHeapProcessor extends GridProcessorAdapter { * @return Key bytes * @throws IgniteCheckedException If failed. */ - private byte[] keyBytes(Object key, @Nullable byte[] keyBytes) throws IgniteCheckedException { + private byte[] keyBytes(KeyCacheObject key, @Nullable byte[] keyBytes) throws IgniteCheckedException { assert key != null; return keyBytes != null ? keyBytes : marsh.marshal(key); @@ -130,7 +131,7 @@ public class GridOffHeapProcessor extends GridProcessorAdapter { * @return {@code true} If offheap space contains value for the given key. * @throws IgniteCheckedException If failed. */ - public boolean contains(@Nullable String spaceName, int part, Object key, byte[] keyBytes) + public boolean contains(@Nullable String spaceName, int part, KeyCacheObject key, byte[] keyBytes) throws IgniteCheckedException { GridOffHeapPartitionedMap m = offheap(spaceName); @@ -147,7 +148,7 @@ public class GridOffHeapProcessor extends GridProcessorAdapter { * @return Value bytes. * @throws IgniteCheckedException If failed. */ - @Nullable public byte[] get(@Nullable String spaceName, int part, Object key, byte[] keyBytes) + @Nullable public byte[] get(@Nullable String spaceName, int part, KeyCacheObject key, byte[] keyBytes) throws IgniteCheckedException { GridOffHeapPartitionedMap m = offheap(spaceName); @@ -166,7 +167,7 @@ public class GridOffHeapProcessor extends GridProcessorAdapter { * @return Tuple where first value is pointer and second is value size. * @throws IgniteCheckedException If failed. */ - @Nullable public IgniteBiTuple valuePointer(@Nullable String spaceName, int part, Object key, + @Nullable public IgniteBiTuple valuePointer(@Nullable String spaceName, int part, KeyCacheObject key, byte[] keyBytes) throws IgniteCheckedException { GridOffHeapPartitionedMap m = offheap(spaceName); @@ -182,7 +183,7 @@ public class GridOffHeapProcessor extends GridProcessorAdapter { * @param keyBytes Key bytes. * @throws IgniteCheckedException If failed. */ - public void enableEviction(@Nullable String spaceName, int part, Object key, byte[] keyBytes) + public void enableEviction(@Nullable String spaceName, int part, KeyCacheObject key, byte[] keyBytes) throws IgniteCheckedException { GridOffHeapPartitionedMap m = offheap(spaceName); @@ -201,7 +202,7 @@ public class GridOffHeapProcessor extends GridProcessorAdapter { * @return Value bytes. * @throws IgniteCheckedException If failed. */ - @Nullable public T getValue(@Nullable String spaceName, int part, Object key, byte[] keyBytes, + @Nullable public T getValue(@Nullable String spaceName, int part, KeyCacheObject key, byte[] keyBytes, @Nullable ClassLoader ldr) throws IgniteCheckedException { byte[] valBytes = get(spaceName, part, key, keyBytes); @@ -221,7 +222,7 @@ public class GridOffHeapProcessor extends GridProcessorAdapter { * @return Value bytes. * @throws IgniteCheckedException If failed. */ - @Nullable public byte[] remove(@Nullable String spaceName, int part, Object key, byte[] keyBytes) throws IgniteCheckedException { + @Nullable public byte[] remove(@Nullable String spaceName, int part, KeyCacheObject key, byte[] keyBytes) throws IgniteCheckedException { GridOffHeapPartitionedMap m = offheap(spaceName); return m == null ? null : m.remove(part, U.hash(key), keyBytes(key, keyBytes)); @@ -237,7 +238,7 @@ public class GridOffHeapProcessor extends GridProcessorAdapter { * @param valBytes Value bytes. * @throws IgniteCheckedException If failed. */ - public void put(@Nullable String spaceName, int part, Object key, byte[] keyBytes, byte[] valBytes) + public void put(@Nullable String spaceName, int part, KeyCacheObject key, byte[] keyBytes, byte[] valBytes) throws IgniteCheckedException { GridOffHeapPartitionedMap m = offheap(spaceName); @@ -258,7 +259,7 @@ public class GridOffHeapProcessor extends GridProcessorAdapter { * @return {@code true} If succeeded. * @throws IgniteCheckedException If failed. */ - public boolean removex(@Nullable String spaceName, int part, Object key, byte[] keyBytes) throws IgniteCheckedException { + public boolean removex(@Nullable String spaceName, int part, KeyCacheObject key, byte[] keyBytes) throws IgniteCheckedException { GridOffHeapPartitionedMap m = offheap(spaceName); return m != null && m.removex(part, U.hash(key), keyBytes(key, keyBytes)); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/23512df8/modules/core/src/main/java/org/apache/ignite/internal/util/GridDebug.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridDebug.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridDebug.java index aadec74..98c8664 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridDebug.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridDebug.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.util; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; import org.jetbrains.annotations.*; import java.io.*; @@ -173,13 +174,6 @@ public class GridDebug { } /** - * Dump collected data to stdout. - */ - public static void dump() { - dump(que.get()); - } - - /** * Dumps given number of last events. * * @param n Number of last elements to dump. @@ -204,7 +198,7 @@ public class GridDebug { * @param que Queue. */ @SuppressWarnings("TypeMayBeWeakened") - public static void dump(ConcurrentLinkedQueue que) { + public static void dump(Collection que) { if (que == null) return; @@ -226,7 +220,7 @@ public class GridDebug { */ public static String dumpWithStop(Object... x) { debug(x); - return dumpWithReset(null); + return dumpWithReset(null, null); } /** @@ -235,16 +229,20 @@ public class GridDebug { * @return Empty string (useful for assertions like {@code assert x == 0 : D.dumpWithReset();} ). */ public static String dumpWithReset() { - return dumpWithReset(new ConcurrentLinkedQueue()); + return dumpWithReset(new ConcurrentLinkedQueue(), null); } /** * Dump existing queue to stdout and atomically replace it with given. * * @param q2 Queue. + * @param filter Filter for logged debug items. * @return Empty string. */ - private static String dumpWithReset(@Nullable ConcurrentLinkedQueue q2) { + public static String dumpWithReset( + @Nullable ConcurrentLinkedQueue q2, + @Nullable IgnitePredicate filter + ) { ConcurrentLinkedQueue q; do { @@ -255,7 +253,20 @@ public class GridDebug { } while (!que.compareAndSet(q, q2)); - dump(q); + Collection col = null; + + if (filter == null) + col = q; + else if (q != null) { + col = new ArrayList<>(); + + for (Item item : q) { + if (filter.apply(item)) + col.add(item); + } + } + + dump(col); return ""; } @@ -281,7 +292,7 @@ public class GridDebug { */ private static String formatEntry(long ts, String threadName, long threadId, Object... data) { return "<" + DEBUG_DATE_FMT.format(new Date(ts)) + "><~DBG~><" + threadName + " id:" + threadId + "> " + - Arrays.toString(data); + Arrays.deepToString(data); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/23512df8/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapEvictQueryTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapEvictQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapEvictQueryTest.java new file mode 100644 index 0000000..fc6c125 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapEvictQueryTest.java @@ -0,0 +1,179 @@ +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.spi.swapspace.inmemory.*; +import org.apache.ignite.testframework.junits.common.*; + +import javax.cache.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheMode.*; + +/** + */ +public class IgniteCacheOffheapEvictQueryTest extends GridCommonAbstractTest { + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(disco); + + cfg.setSwapSpaceSpi(new GridTestSwapSpaceSpi()); + + CacheConfiguration cacheCfg = defaultCacheConfiguration(); + + cacheCfg.setCacheMode(PARTITIONED); + cacheCfg.setAtomicityMode(TRANSACTIONAL); + cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + cacheCfg.setSwapEnabled(true); + cacheCfg.setBackups(0); + cacheCfg.setMemoryMode(CacheMemoryMode.ONHEAP_TIERED); + cacheCfg.setEvictionPolicy(null); + cacheCfg.setNearConfiguration(null); + + cacheCfg.setSqlOnheapRowCacheSize(128); + + cacheCfg.setIndexedTypes( + Integer.class, Integer.class + ); + + cacheCfg.setOffHeapMaxMemory(2000); // Small offheap for evictions from offheap to swap. + + cfg.setCacheConfiguration(cacheCfg); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testEvictAndRemove() throws Exception { + final int KEYS_CNT = 3000; + final int THREADS_CNT = 256; + + final IgniteCache c = startGrid().cache(null); + + for (int i = 0; i < KEYS_CNT; i++) { + c.put(i, i); + + if ((i & 1) == 0) + c.localEvict(F.asList(i)); + } + + X.println("___ Cache loaded..."); + + final CyclicBarrier b = new CyclicBarrier(THREADS_CNT, new Runnable() { + @Override public void run() { + X.println("___ go!"); + } + }); + + final AtomicInteger keys = new AtomicInteger(KEYS_CNT); + + IgniteInternalFuture fut = multithreadedAsync(new Runnable() { + @Override public void run() { + Random rnd = new GridRandom(); + + try { + b.await(); + } + catch (InterruptedException e) { + throw new IgniteInterruptedException(e); + } + catch (BrokenBarrierException e) { + throw new IllegalStateException(e); + } + + while (keys.get() > 0) { + int k = rnd.nextInt(KEYS_CNT); + + try { + switch (rnd.nextInt(4)) { + case 0: + c.localEvict(F.asList(k)); + + break; + + case 1: + c.get(k); + + break; + + case 2: + if (c.remove(k)) + keys.decrementAndGet(); + + break; + + case 3: + c.query(new SqlFieldsQuery("select _val from Integer where _key between ? and ?") + .setArgs(k, k + 20).setLocal(true)).getAll(); + + break; + } + } + catch (CacheException e) { + String msgStart = "Failed to get value for key:"; + + for (Throwable th = e; th != null; th = th.getCause()) { + String msg = th.getMessage(); + + if (msg != null && msg.startsWith(msgStart)) { + int dot = msg.indexOf('.', msgStart.length()); + + assertTrue(dot != -1); + + final Integer failedKey = Integer.parseInt(msg.substring(msgStart.length(), dot).trim()); + + X.println("___ failed key: " + failedKey); + + break; + } + } + + LT.warn(log, null, e.getMessage()); + + return; + } + } + } + }, THREADS_CNT); + + try { + fut.get(60_000); + + if (c.size(CachePeekMode.ALL) != 0) + fail("Not all keys removed."); + + X.println("___ all keys removed"); + } + catch (IgniteFutureTimeoutCheckedException e) { + X.println("___ timeout"); + X.println("___ keys: " + keys.get()); + + keys.set(0); + + fut.get(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/23512df8/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java index dee3078..b9205a9 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java @@ -63,6 +63,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite { suite.addTestSuite(IgniteCacheQueryEvictsMultiThreadedSelfTest.class); suite.addTestSuite(IgniteCacheQueryOffheapMultiThreadedSelfTest.class); // suite.addTestSuite(IgniteCacheQueryOffheapEvictsMultiThreadedSelfTest.class); TODO IGNITE-971. + suite.addTestSuite(IgniteCacheOffheapEvictQueryTest.class); suite.addTestSuite(IgniteCacheSqlQueryMultiThreadedSelfTest.class); suite.addTestSuite(IgniteCacheOffheapTieredMultithreadedSelfTest.class); suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest.class);