Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 9112F200B49 for ; Wed, 3 Aug 2016 10:16:35 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 8FD2F160A64; Wed, 3 Aug 2016 08:16:35 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 1D1B0160AB5 for ; Wed, 3 Aug 2016 10:16:33 +0200 (CEST) Received: (qmail 73659 invoked by uid 500); 3 Aug 2016 08:16:33 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 73458 invoked by uid 99); 3 Aug 2016 08:16:33 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 03 Aug 2016 08:16:33 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E64F9EE696; Wed, 3 Aug 2016 08:16:32 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vozerov@apache.org To: commits@ignite.apache.org Date: Wed, 03 Aug 2016 08:16:38 -0000 Message-Id: <89a160d33a114206ba5a2db6e224b77e@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [07/10] ignite git commit: IGNITE-3513 Cleanup worker is placed in the Thread's waiting queue using Thread.sleep method archived-at: Wed, 03 Aug 2016 08:16:35 -0000 IGNITE-3513 Cleanup worker is placed in the Thread's waiting queue using Thread.sleep method Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4f27a47b Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4f27a47b Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4f27a47b Branch: refs/heads/master Commit: 4f27a47b81314b2eb52a5bc5b1d938bb586ae2aa Parents: 7b3d196 Author: EdShangGG Authored: Mon Aug 1 20:25:54 2016 +0300 Committer: EdShangGG Committed: Mon Aug 1 20:25:54 2016 +0300 ---------------------------------------------------------------------- modules/benchmarks/pom.xml | 4 +- .../jmh/notify/JmhParkVsNotifyBenchmark.java | 105 ++++++++ .../jmh/notify/JmhWaitStategyBenchmark.java | 259 +++++++++++++++++++ .../processors/cache/GridCacheTtlManager.java | 65 ++++- .../GridCacheTtlManagerNotificationTest.java | 202 +++++++++++++++ .../IgniteCacheExpiryPolicyTestSuite.java | 3 + 6 files changed, 626 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/4f27a47b/modules/benchmarks/pom.xml ---------------------------------------------------------------------- diff --git a/modules/benchmarks/pom.xml b/modules/benchmarks/pom.xml index a7d823d..00315a8 100644 --- a/modules/benchmarks/pom.xml +++ b/modules/benchmarks/pom.xml @@ -36,8 +36,8 @@ UTF-8 - 1.11.3 - 1.6 + 1.13 + 1.7 benchmarks http://git-wip-us.apache.org/repos/asf/ignite/blob/4f27a47b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/notify/JmhParkVsNotifyBenchmark.java ---------------------------------------------------------------------- diff --git a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/notify/JmhParkVsNotifyBenchmark.java b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/notify/JmhParkVsNotifyBenchmark.java new file mode 100644 index 0000000..b85f6d8 --- /dev/null +++ b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/notify/JmhParkVsNotifyBenchmark.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.benchmarks.jmh.notify; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.LockSupport; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Group; +import org.openjdk.jmh.annotations.GroupThreads; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; + +/** */ +@OutputTimeUnit(TimeUnit.MICROSECONDS) +@BenchmarkMode({/*Mode.AverageTime,*/ Mode.Throughput}) +@Warmup(iterations = 5, time = 5, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 5, time = 10, timeUnit = TimeUnit.SECONDS) +@Fork(1) +@State(Scope.Group) +public class JmhParkVsNotifyBenchmark { + /** level of concurrency */ + private static final int THREAD_COUNT = 16; + + /** Thread. */ + private volatile Thread thread; + + /** + * + */ + @Setup(Level.Iteration) + public void setup() { + thread = null; + } + + /** + * + */ + @Benchmark + @Group("park") + public void park() { + if (thread == null) + thread = Thread.currentThread(); + + LockSupport.park(thread); + } + + /** + * + */ + @Benchmark + @GroupThreads(THREAD_COUNT) + @Group("park") + public void unpark() { + LockSupport.unpark(thread); + } + + /** Mutex. */ + private final Object mux = new Object(); + + /** + * + */ + @Benchmark + @Group("condition") + @GroupThreads(THREAD_COUNT) + public void notifyAll0() { + synchronized (mux) { + mux.notify(); + } + } + + /** + * + */ + @Benchmark + @Group("condition") + public void wait0() throws InterruptedException { + synchronized (mux) { + mux.wait(); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/4f27a47b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/notify/JmhWaitStategyBenchmark.java ---------------------------------------------------------------------- diff --git a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/notify/JmhWaitStategyBenchmark.java b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/notify/JmhWaitStategyBenchmark.java new file mode 100644 index 0000000..4a7ee23 --- /dev/null +++ b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/notify/JmhWaitStategyBenchmark.java @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.benchmarks.jmh.notify; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import javax.cache.configuration.Factory; +import javax.cache.expiry.Duration; +import javax.cache.expiry.ExpiryPolicy; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.Ignition; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.internal.benchmarks.jmh.cache.JmhCacheAbstractBenchmark; +import org.apache.ignite.internal.benchmarks.model.IntValue; +import org.jsr166.ThreadLocalRandom8; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.BenchmarkParams; +import org.openjdk.jmh.results.RunResult; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.ChainedOptionsBuilder; +import org.openjdk.jmh.runner.options.OptionsBuilder; +import org.openjdk.jmh.runner.options.TimeValue; + +/** + * + */ +@OutputTimeUnit(TimeUnit.MICROSECONDS) +@BenchmarkMode({/*Mode.AverageTime,*/ Mode.Throughput}) +@Warmup(iterations = 5, time = 5, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 5, time = 10, timeUnit = TimeUnit.SECONDS) +@Fork(1) +@State(Scope.Benchmark) +public class JmhWaitStategyBenchmark extends JmhCacheAbstractBenchmark { + /** */ + private static class RandomExpiryPolicy implements ExpiryPolicy { + /** rate duration will decrease with */ + private final double rate; + + /** current duration. */ + private final AtomicLong duration = new AtomicLong(1_000_000_000); + + /** */ + RandomExpiryPolicy(double rate) { + this.rate = rate; + } + + /** {@inheritDoc} */ + @Override public Duration getExpiryForCreation() { + boolean generateEvt = ThreadLocalRandom8.current().nextDouble() < rate; + return generateEvt ? new Duration(TimeUnit.MILLISECONDS, duration.decrementAndGet()) : + new Duration(TimeUnit.MILLISECONDS, duration.get()); + } + + /** {@inheritDoc} */ + @Override public Duration getExpiryForAccess() { + boolean generateEvt = ThreadLocalRandom8.current().nextDouble() < rate; + return generateEvt ? new Duration(TimeUnit.MILLISECONDS, duration.decrementAndGet()) : + new Duration(TimeUnit.MILLISECONDS, duration.get()); + } + + /** {@inheritDoc} */ + @Override public Duration getExpiryForUpdate() { + boolean generateEvt = ThreadLocalRandom8.current().nextDouble() < rate; + return generateEvt ? new Duration(TimeUnit.MILLISECONDS, duration.decrementAndGet()) : + new Duration(TimeUnit.MILLISECONDS, duration.get()); + } + } + + /** @param rate duration will decrease with */ + private static Factory getExpiryPolicyFactoryWithDecreasingRate(final double rate) { + return new Factory() { + @Override public ExpiryPolicy create() { + return new RandomExpiryPolicy(rate); + } + }; + } + + /** Decreasing expiry policy. */ + private static final ExpiryPolicy DECREASING_EXPIRY_POLICY = new ExpiryPolicy() { + AtomicLong duration = new AtomicLong(1_000_000_000); + + @Override public Duration getExpiryForCreation() { + return new Duration(TimeUnit.MILLISECONDS, duration.decrementAndGet()); + } + + @Override public Duration getExpiryForAccess() { + return new Duration(TimeUnit.MILLISECONDS, duration.decrementAndGet()); + } + + @Override public Duration getExpiryForUpdate() { + return new Duration(TimeUnit.MILLISECONDS, duration.decrementAndGet()); + } + }; + + /** Increasing expiry policy. */ + private static final ExpiryPolicy INCREASING_EXPIRY_POLICY = new ExpiryPolicy() { + AtomicLong duration = new AtomicLong(1_000_000); + + @Override public Duration getExpiryForCreation() { + return new Duration(TimeUnit.MILLISECONDS, duration.incrementAndGet()); + } + + @Override public Duration getExpiryForAccess() { + return new Duration(TimeUnit.MILLISECONDS, duration.incrementAndGet()); + } + + @Override public Duration getExpiryForUpdate() { + return new Duration(TimeUnit.MILLISECONDS, duration.incrementAndGet()); + } + }; + + /** Decreasing policy factory. */ + private final static Factory DECREASING_POLICY_FACTORY = new Factory() { + @Override public ExpiryPolicy create() { + return DECREASING_EXPIRY_POLICY; + } + }; + + /** Increasing policy factory. */ + private final static Factory INCREASING_POLICY_FACTORY = new Factory() { + @Override public ExpiryPolicy create() { + return INCREASING_EXPIRY_POLICY; + } + }; + + /** {@inheritDoc} */ + @Setup (Level.Iteration) + @Override public void setup() throws Exception { + Ignition.stopAll(true); + + super.setup(); + + CacheConfiguration cfg = new CacheConfiguration<>(); + cfg.setName("cache"); + cfg.setEagerTtl(true); + cfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); + + String prop = System.getProperty("bench.exp.policy"); + + switch (prop) { + case "inc": + cfg.setExpiryPolicyFactory(INCREASING_POLICY_FACTORY); + break; + case "dec": + cfg.setExpiryPolicyFactory(DECREASING_POLICY_FACTORY); + break; + default: + assert prop.charAt(0) == 'r'; + double rate = Double.parseDouble(prop.trim().substring(1)) / 100; + cfg.setExpiryPolicyFactory(getExpiryPolicyFactoryWithDecreasingRate(rate)); + break; + } + + node.createCache(cfg); + + cache = node.getOrCreateCache("cache"); + + IgniteDataStreamer dataLdr = node.dataStreamer(cache.getName()); + + for (int i = 0; i < CNT; i++) + dataLdr.addData(i, new IntValue(i)); + + dataLdr.close(); + + System.out.println("Cache populated."); + } + + /** {@inheritDoc} */ + @TearDown + public void tearDown() throws Exception { + Ignition.stopAll(true); + } + + /** + * Test PUT operation. + * + * @throws Exception If failed. + */ + @Benchmark + public void put() throws Exception { + int key = ThreadLocalRandom.current().nextInt(CNT); + + cache.put(key, new IntValue(key)); + } + + /** + * Benchmark runner + */ + public static void main(String[] args) throws RunnerException { + List policies = Arrays.asList("inc", "dec", "r25", "r50", "r75"); + int[] threads = {2, 4, 8, 16, 32}; + + List results = new ArrayList<>(); + + for (String policy : policies) { + for (int thread : threads) { + ChainedOptionsBuilder builder = new OptionsBuilder() + .jvmArgs() + .timeUnit(TimeUnit.MILLISECONDS) + .measurementIterations(10) + .measurementTime(TimeValue.seconds(20)) + .warmupIterations(5) + .warmupTime(TimeValue.seconds(10)) + .jvmArgs("-Dbench.exp.policy=" + policy) + .forks(1) + .threads(thread) + .mode(Mode.Throughput) + .include(JmhWaitStategyBenchmark.class.getSimpleName()); + + results.addAll(new Runner(builder.build()).run()); + } + } + + for (RunResult result : results) { + BenchmarkParams params = result.getParams(); + Collection args1 = params.getJvmArgs(); + for (String s : args1) { + System.out.print(s.substring(s.length() -3, s.length())); + System.out.print(" x "); + } + System.out.print(params.getThreads()); + System.out.print("\t\t"); + System.out.println(result.getPrimaryResult().toString()); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/4f27a47b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java index 3e4561b..ae2895e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.cache; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.configuration.CacheConfiguration; @@ -41,9 +42,19 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter { /** Entries pending removal. */ private final GridConcurrentSkipListSetEx pendingEntries = new GridConcurrentSkipListSetEx(); - /** Cleanup worker thread. */ + /** Cleanup worker. */ private CleanupWorker cleanupWorker; + /** Mutex. */ + private final Object mux = new Object(); + + /** Next expire time. */ + private volatile long nextExpireTime; + + /** Next expire time updater. */ + private static final AtomicLongFieldUpdater nextExpireTimeUpdater = + AtomicLongFieldUpdater.newUpdater(GridCacheTtlManager.class, "nextExpireTime"); + /** {@inheritDoc} */ @Override protected void start0() throws IgniteCheckedException { boolean cleanupDisabled = cctx.kernalContext().isDaemon() || @@ -80,7 +91,24 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter { assert Thread.holdsLock(entry); assert cleanupWorker != null; - pendingEntries.add(new EntryWrapper(entry)); + EntryWrapper e = new EntryWrapper(entry); + + pendingEntries.add(e); + + while (true) { + long nextExpireTime = this.nextExpireTime; + + if (e.expireTime < nextExpireTime) { + if (nextExpireTimeUpdater.compareAndSet(this, nextExpireTime, e.expireTime)) { + synchronized (mux) { + mux.notifyAll(); + } + + break; + } + } else + break; + } } /** @@ -159,7 +187,7 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter { /** * Creates cleanup worker. */ - protected CleanupWorker() { + CleanupWorker() { super(cctx.gridName(), "ttl-cleanup-worker-" + cctx.name(), cctx.logger(GridCacheTtlManager.class)); } @@ -168,16 +196,33 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter { while (!isCancelled()) { expire(); - EntryWrapper first = pendingEntries.firstx(); + long waitTime; - if (first != null) { - long waitTime = first.expireTime - U.currentTimeMillis(); + while (true) { + long curTime = U.currentTimeMillis(); + + GridCacheTtlManager.EntryWrapper first = pendingEntries.firstx(); - if (waitTime > 0) - U.sleep(waitTime); + if (first == null) { + waitTime = 500; + nextExpireTime = curTime + 500; + } + else { + long expireTime = first.expireTime; + + waitTime = expireTime - curTime; + nextExpireTime = expireTime; + } + + synchronized (mux) { + if (pendingEntries.firstx() == first) { + if (waitTime > 0) + mux.wait(waitTime); + + break; + } + } } - else - U.sleep(500); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/4f27a47b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerNotificationTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerNotificationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerNotificationTest.java new file mode 100644 index 0000000..85a491e --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerNotificationTest.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.atomic.AtomicInteger; +import javax.cache.expiry.CreatedExpiryPolicy; +import javax.cache.expiry.Duration; +import javax.cache.expiry.ExpiryPolicy; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.events.Event; +import org.apache.ignite.events.EventType; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.eclipse.jetty.util.BlockingArrayQueue; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; + +/** + * + */ +public class GridCacheTtlManagerNotificationTest extends GridCommonAbstractTest { + /** IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Test cache mode. */ + protected CacheMode cacheMode; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(discoSpi); + + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setCacheMode(cacheMode); + ccfg.setEagerTtl(true); + + cfg.setCacheConfiguration(ccfg); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testThatNotificationWorkAsExpected() throws Exception { + try (final Ignite g = startGrid(0)) { + final BlockingArrayQueue queue = new BlockingArrayQueue<>(); + + g.events().localListen(new IgnitePredicate() { + @Override public boolean apply(Event evt) { + queue.add(evt); + + return true; + } + }, EventType.EVT_CACHE_OBJECT_EXPIRED); + + final String key = "key"; + + IgniteCache cache = g.cache(null); + + ExpiryPolicy plc1 = new CreatedExpiryPolicy(new Duration(MILLISECONDS, 100_000)); + + cache.withExpiryPolicy(plc1).put(key + 1, 1); + + Thread.sleep(1_000); // Cleaner should see entry. + + ExpiryPolicy plc2 = new CreatedExpiryPolicy(new Duration(MILLISECONDS, 1000)); + + cache.withExpiryPolicy(plc2).put(key + 2, 1); + + assertNotNull(queue.poll(5, SECONDS)); // We should receive event about second entry expiration. + } + } + + /** + * Add in several threads value to cache with different expiration policy. + * Wait for expiration of keys with small expiration duration. + */ + public void testThatNotificationWorkAsExpectedInMultithreadedMode() throws Exception { + final CyclicBarrier barrier = new CyclicBarrier(21); + final AtomicInteger keysRangeGen = new AtomicInteger(); + final AtomicInteger evtCnt = new AtomicInteger(); + final int cnt = 1_000; + + try (final Ignite g = startGrid(0)) { + final IgniteCache cache = g.cache(null); + + g.events().localListen(new IgnitePredicate() { + @Override public boolean apply(Event evt) { + evtCnt.incrementAndGet(); + + return true; + } + }, EventType.EVT_CACHE_OBJECT_EXPIRED); + + + int smallDuration = 2000; + + int threadCnt = 10; + + GridTestUtils.runMultiThreadedAsync( + new CacheFiller(cache, 100_000, barrier, keysRangeGen, cnt), + threadCnt, ""); + + GridTestUtils.runMultiThreadedAsync( + new CacheFiller(cache, smallDuration, barrier, keysRangeGen, cnt), + threadCnt, ""); + + barrier.await(); + + Thread.sleep(1_000); // Cleaner should see at least one entry. + + barrier.await(); + + assertEquals(2 * threadCnt * cnt, cache.size()); + + Thread.sleep(2 * smallDuration); + + assertEquals(threadCnt * cnt, cache.size()); + assertEquals(threadCnt * cnt, evtCnt.get()); + } + } + + /** */ + private static class CacheFiller implements Runnable { + /** Barrier. */ + private final CyclicBarrier barrier; + /** Keys range generator. */ + private final AtomicInteger keysRangeGenerator; + /** Count. */ + private final int cnt; + /** Cache. */ + private final IgniteCache cache; + /** Expiration duration. */ + private final int expirationDuration; + + /** + * @param cache Cache. + * @param expirationDuration Expiration duration. + * @param barrier Barrier. + * @param keysRangeGenerator Keys. + * @param cnt Count. + */ + CacheFiller(IgniteCache cache, int expirationDuration, CyclicBarrier barrier, + AtomicInteger keysRangeGenerator, int cnt) { + this.expirationDuration = expirationDuration; + this.barrier = barrier; + this.keysRangeGenerator = keysRangeGenerator; + this.cnt = cnt; + this.cache = cache; + } + + /** {@inheritDoc} */ + @Override public void run() { + try { + barrier.await(); + + ExpiryPolicy plc1 = new CreatedExpiryPolicy(new Duration(MILLISECONDS, expirationDuration)); + int keyStart = keysRangeGenerator.getAndIncrement() * cnt; + + for (int i = keyStart; i < keyStart + cnt; i++) + cache.withExpiryPolicy(plc1).put("key" + i, 1); + + barrier.await(); + } + catch (Exception e) { + e.printStackTrace(); + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/4f27a47b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java index 21935e5..28cb2da 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.expiry; import junit.framework.TestSuite; import org.apache.ignite.cache.store.IgniteCacheExpiryStoreLoadSelfTest; +import org.apache.ignite.internal.processors.cache.GridCacheTtlManagerNotificationTest; import org.apache.ignite.internal.processors.cache.IgniteCacheEntryListenerExpiredEventsTest; import org.apache.ignite.internal.processors.cache.IgniteCacheExpireAndUpdateConsistencyTest; @@ -71,6 +72,8 @@ public class IgniteCacheExpiryPolicyTestSuite extends TestSuite { suite.addTestSuite(IgniteCacheExpireAndUpdateConsistencyTest.class); + suite.addTestSuite(GridCacheTtlManagerNotificationTest.class); + return suite; } } \ No newline at end of file