ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [07/10] ignite git commit: IGNITE-3513 Cleanup worker is placed in the Thread's waiting queue using Thread.sleep method
Date Wed, 03 Aug 2016 08:16:38 GMT
IGNITE-3513 Cleanup worker is placed in the Thread's waiting queue using Thread.sleep method


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4f27a47b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4f27a47b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4f27a47b

Branch: refs/heads/master
Commit: 4f27a47b81314b2eb52a5bc5b1d938bb586ae2aa
Parents: 7b3d196
Author: EdShangGG <eshangareev@gridgain.com>
Authored: Mon Aug 1 20:25:54 2016 +0300
Committer: EdShangGG <eshangareev@gridgain.com>
Committed: Mon Aug 1 20:25:54 2016 +0300

----------------------------------------------------------------------
 modules/benchmarks/pom.xml                      |   4 +-
 .../jmh/notify/JmhParkVsNotifyBenchmark.java    | 105 ++++++++
 .../jmh/notify/JmhWaitStategyBenchmark.java     | 259 +++++++++++++++++++
 .../processors/cache/GridCacheTtlManager.java   |  65 ++++-
 .../GridCacheTtlManagerNotificationTest.java    | 202 +++++++++++++++
 .../IgniteCacheExpiryPolicyTestSuite.java       |   3 +
 6 files changed, 626 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/4f27a47b/modules/benchmarks/pom.xml
----------------------------------------------------------------------
diff --git a/modules/benchmarks/pom.xml b/modules/benchmarks/pom.xml
index a7d823d..00315a8 100644
--- a/modules/benchmarks/pom.xml
+++ b/modules/benchmarks/pom.xml
@@ -36,8 +36,8 @@
 
     <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <jmh.version>1.11.3</jmh.version>
-        <javac.target>1.6</javac.target>
+        <jmh.version>1.13</jmh.version>
+        <javac.target>1.7</javac.target>
         <uberjar.name>benchmarks</uberjar.name>
     </properties>
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/4f27a47b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/notify/JmhParkVsNotifyBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/notify/JmhParkVsNotifyBenchmark.java b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/notify/JmhParkVsNotifyBenchmark.java
new file mode 100644
index 0000000..b85f6d8
--- /dev/null
+++ b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/notify/JmhParkVsNotifyBenchmark.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.benchmarks.jmh.notify;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.LockSupport;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Group;
+import org.openjdk.jmh.annotations.GroupThreads;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+/** */
+@OutputTimeUnit(TimeUnit.MICROSECONDS)
+@BenchmarkMode({/*Mode.AverageTime,*/ Mode.Throughput})
+@Warmup(iterations = 5, time = 5, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 5, time = 10, timeUnit = TimeUnit.SECONDS)
+@Fork(1)
+@State(Scope.Group)
+public class JmhParkVsNotifyBenchmark {
+    /** level of concurrency */
+    private static final int THREAD_COUNT = 16;
+
+    /** Thread. */
+    private volatile Thread thread;
+
+    /**
+     *
+     */
+    @Setup(Level.Iteration)
+    public void setup() {
+        thread = null;
+    }
+
+    /**
+     *
+     */
+    @Benchmark
+    @Group("park")
+    public void park() {
+        if (thread == null)
+            thread = Thread.currentThread();
+
+        LockSupport.park(thread);
+    }
+
+    /**
+     *
+     */
+    @Benchmark
+    @GroupThreads(THREAD_COUNT)
+    @Group("park")
+    public void unpark() {
+        LockSupport.unpark(thread);
+    }
+
+    /** Mutex. */
+    private final Object mux = new Object();
+
+    /**
+     *
+     */
+    @Benchmark
+    @Group("condition")
+    @GroupThreads(THREAD_COUNT)
+    public void notifyAll0() {
+        synchronized (mux) {
+            mux.notify();
+        }
+    }
+
+    /**
+     *
+     */
+    @Benchmark
+    @Group("condition")
+    public void wait0() throws InterruptedException {
+        synchronized (mux) {
+            mux.wait();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4f27a47b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/notify/JmhWaitStategyBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/notify/JmhWaitStategyBenchmark.java b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/notify/JmhWaitStategyBenchmark.java
new file mode 100644
index 0000000..4a7ee23
--- /dev/null
+++ b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/notify/JmhWaitStategyBenchmark.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.benchmarks.jmh.notify;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.cache.configuration.Factory;
+import javax.cache.expiry.Duration;
+import javax.cache.expiry.ExpiryPolicy;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.benchmarks.jmh.cache.JmhCacheAbstractBenchmark;
+import org.apache.ignite.internal.benchmarks.model.IntValue;
+import org.jsr166.ThreadLocalRandom8;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.BenchmarkParams;
+import org.openjdk.jmh.results.RunResult;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.ChainedOptionsBuilder;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+import org.openjdk.jmh.runner.options.TimeValue;
+
+/**
+ *
+ */
+@OutputTimeUnit(TimeUnit.MICROSECONDS)
+@BenchmarkMode({/*Mode.AverageTime,*/ Mode.Throughput})
+@Warmup(iterations = 5, time = 5, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 5, time = 10, timeUnit = TimeUnit.SECONDS)
+@Fork(1)
+@State(Scope.Benchmark)
+public class JmhWaitStategyBenchmark extends JmhCacheAbstractBenchmark {
+    /** */
+    private static class RandomExpiryPolicy implements ExpiryPolicy {
+        /** rate duration will decrease with */
+        private final double rate;
+
+        /** current duration. */
+        private final AtomicLong duration = new AtomicLong(1_000_000_000);
+
+        /** */
+        RandomExpiryPolicy(double rate) {
+            this.rate = rate;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Duration getExpiryForCreation() {
+            boolean generateEvt = ThreadLocalRandom8.current().nextDouble() < rate;
+            return generateEvt ? new Duration(TimeUnit.MILLISECONDS, duration.decrementAndGet()) :
+                new Duration(TimeUnit.MILLISECONDS, duration.get());
+        }
+
+        /** {@inheritDoc} */
+        @Override public Duration getExpiryForAccess() {
+            boolean generateEvt = ThreadLocalRandom8.current().nextDouble() < rate;
+            return generateEvt ? new Duration(TimeUnit.MILLISECONDS, duration.decrementAndGet()) :
+                new Duration(TimeUnit.MILLISECONDS, duration.get());
+        }
+
+        /** {@inheritDoc} */
+        @Override public Duration getExpiryForUpdate() {
+            boolean generateEvt = ThreadLocalRandom8.current().nextDouble() < rate;
+            return generateEvt ? new Duration(TimeUnit.MILLISECONDS, duration.decrementAndGet()) :
+                new Duration(TimeUnit.MILLISECONDS, duration.get());
+        }
+    }
+
+    /** @param rate duration will decrease with */
+    private static Factory<ExpiryPolicy> getExpiryPolicyFactoryWithDecreasingRate(final double rate) {
+        return new Factory<ExpiryPolicy>() {
+            @Override public ExpiryPolicy create() {
+                return new RandomExpiryPolicy(rate);
+            }
+        };
+    }
+
+    /** Decreasing expiry policy. */
+    private static final ExpiryPolicy DECREASING_EXPIRY_POLICY = new ExpiryPolicy() {
+        AtomicLong duration = new AtomicLong(1_000_000_000);
+
+        @Override public Duration getExpiryForCreation() {
+            return new Duration(TimeUnit.MILLISECONDS, duration.decrementAndGet());
+        }
+
+        @Override public Duration getExpiryForAccess() {
+            return new Duration(TimeUnit.MILLISECONDS, duration.decrementAndGet());
+        }
+
+        @Override public Duration getExpiryForUpdate() {
+            return new Duration(TimeUnit.MILLISECONDS, duration.decrementAndGet());
+        }
+    };
+
+    /** Increasing expiry policy. */
+    private static final ExpiryPolicy INCREASING_EXPIRY_POLICY = new ExpiryPolicy() {
+        AtomicLong duration = new AtomicLong(1_000_000);
+
+        @Override public Duration getExpiryForCreation() {
+            return new Duration(TimeUnit.MILLISECONDS, duration.incrementAndGet());
+        }
+
+        @Override public Duration getExpiryForAccess() {
+            return new Duration(TimeUnit.MILLISECONDS, duration.incrementAndGet());
+        }
+
+        @Override public Duration getExpiryForUpdate() {
+            return new Duration(TimeUnit.MILLISECONDS, duration.incrementAndGet());
+        }
+    };
+
+    /** Decreasing policy factory. */
+    private final static Factory<ExpiryPolicy> DECREASING_POLICY_FACTORY = new Factory<ExpiryPolicy>() {
+        @Override public ExpiryPolicy create() {
+            return DECREASING_EXPIRY_POLICY;
+        }
+    };
+
+    /** Increasing policy factory. */
+    private final static Factory<ExpiryPolicy> INCREASING_POLICY_FACTORY = new Factory<ExpiryPolicy>() {
+        @Override public ExpiryPolicy create() {
+            return INCREASING_EXPIRY_POLICY;
+        }
+    };
+
+    /** {@inheritDoc} */
+    @Setup (Level.Iteration)
+    @Override public void setup() throws Exception {
+        Ignition.stopAll(true);
+
+        super.setup();
+
+        CacheConfiguration<Object, Object> cfg = new CacheConfiguration<>();
+        cfg.setName("cache");
+        cfg.setEagerTtl(true);
+        cfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+
+        String prop = System.getProperty("bench.exp.policy");
+
+        switch (prop) {
+            case "inc":
+                cfg.setExpiryPolicyFactory(INCREASING_POLICY_FACTORY);
+                break;
+            case "dec":
+                cfg.setExpiryPolicyFactory(DECREASING_POLICY_FACTORY);
+                break;
+            default:
+                assert prop.charAt(0) == 'r';
+                double rate = Double.parseDouble(prop.trim().substring(1)) / 100;
+                cfg.setExpiryPolicyFactory(getExpiryPolicyFactoryWithDecreasingRate(rate));
+                break;
+        }
+
+        node.createCache(cfg);
+
+        cache = node.getOrCreateCache("cache");
+
+        IgniteDataStreamer<Integer, IntValue> dataLdr = node.dataStreamer(cache.getName());
+
+        for (int i = 0; i < CNT; i++)
+            dataLdr.addData(i, new IntValue(i));
+
+        dataLdr.close();
+
+        System.out.println("Cache populated.");
+    }
+
+    /** {@inheritDoc} */
+    @TearDown
+    public void tearDown() throws Exception {
+        Ignition.stopAll(true);
+    }
+
+    /**
+     * Test PUT operation.
+     *
+     * @throws Exception If failed.
+     */
+    @Benchmark
+    public void put() throws Exception {
+        int key = ThreadLocalRandom.current().nextInt(CNT);
+
+        cache.put(key, new IntValue(key));
+    }
+
+    /**
+     * Benchmark runner
+     */
+    public static void main(String[] args) throws RunnerException {
+        List<String> policies = Arrays.asList("inc", "dec", "r25", "r50", "r75");
+        int[] threads = {2, 4, 8, 16, 32};
+
+        List<RunResult> results = new ArrayList<>();
+
+        for (String policy : policies) {
+            for (int thread : threads) {
+                ChainedOptionsBuilder builder = new OptionsBuilder()
+                    .jvmArgs()
+                    .timeUnit(TimeUnit.MILLISECONDS)
+                    .measurementIterations(10)
+                    .measurementTime(TimeValue.seconds(20))
+                    .warmupIterations(5)
+                    .warmupTime(TimeValue.seconds(10))
+                    .jvmArgs("-Dbench.exp.policy=" + policy)
+                    .forks(1)
+                    .threads(thread)
+                    .mode(Mode.Throughput)
+                    .include(JmhWaitStategyBenchmark.class.getSimpleName());
+
+                results.addAll(new Runner(builder.build()).run());
+            }
+        }
+
+        for (RunResult result : results) {
+            BenchmarkParams params = result.getParams();
+            Collection<String> args1 = params.getJvmArgs();
+            for (String s : args1) {
+                System.out.print(s.substring(s.length() -3, s.length()));
+                System.out.print(" x ");
+            }
+            System.out.print(params.getThreads());
+            System.out.print("\t\t");
+            System.out.println(result.getPrimaryResult().toString());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4f27a47b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
index 3e4561b..ae2895e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.cache;
 
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.configuration.CacheConfiguration;
@@ -41,9 +42,19 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter {
     /** Entries pending removal. */
     private final GridConcurrentSkipListSetEx pendingEntries = new GridConcurrentSkipListSetEx();
 
-    /** Cleanup worker thread. */
+    /** Cleanup worker. */
     private CleanupWorker cleanupWorker;
 
+    /** Mutex. */
+    private final Object mux = new Object();
+
+    /** Next expire time. */
+    private volatile long nextExpireTime;
+
+    /** Next expire time updater. */
+    private static final AtomicLongFieldUpdater<GridCacheTtlManager> nextExpireTimeUpdater =
+        AtomicLongFieldUpdater.newUpdater(GridCacheTtlManager.class, "nextExpireTime");
+
     /** {@inheritDoc} */
     @Override protected void start0() throws IgniteCheckedException {
         boolean cleanupDisabled = cctx.kernalContext().isDaemon() ||
@@ -80,7 +91,24 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter {
         assert Thread.holdsLock(entry);
         assert cleanupWorker != null;
 
-        pendingEntries.add(new EntryWrapper(entry));
+        EntryWrapper e = new EntryWrapper(entry);
+
+        pendingEntries.add(e);
+
+        while (true) {
+            long nextExpireTime = this.nextExpireTime;
+
+            if (e.expireTime < nextExpireTime) {
+                if (nextExpireTimeUpdater.compareAndSet(this, nextExpireTime, e.expireTime)) {
+                    synchronized (mux) {
+                        mux.notifyAll();
+                    }
+
+                    break;
+                }
+            } else
+                break;
+        }
     }
 
     /**
@@ -159,7 +187,7 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter {
         /**
          * Creates cleanup worker.
          */
-        protected CleanupWorker() {
+        CleanupWorker() {
             super(cctx.gridName(), "ttl-cleanup-worker-" + cctx.name(), cctx.logger(GridCacheTtlManager.class));
         }
 
@@ -168,16 +196,33 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter {
             while (!isCancelled()) {
                 expire();
 
-                EntryWrapper first = pendingEntries.firstx();
+                long waitTime;
 
-                if (first != null) {
-                    long waitTime = first.expireTime - U.currentTimeMillis();
+                while (true) {
+                    long curTime = U.currentTimeMillis();
+
+                    GridCacheTtlManager.EntryWrapper first = pendingEntries.firstx();
 
-                    if (waitTime > 0)
-                        U.sleep(waitTime);
+                    if (first == null) {
+                        waitTime = 500;
+                        nextExpireTime = curTime + 500;
+                    }
+                    else {
+                        long expireTime = first.expireTime;
+
+                        waitTime = expireTime - curTime;
+                        nextExpireTime = expireTime;
+                    }
+
+                    synchronized (mux) {
+                        if (pendingEntries.firstx() == first) {
+                            if (waitTime > 0)
+                                mux.wait(waitTime);
+
+                            break;
+                        }
+                    }
                 }
-                else
-                    U.sleep(500);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/4f27a47b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerNotificationTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerNotificationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerNotificationTest.java
new file mode 100644
index 0000000..85a491e
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerNotificationTest.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.expiry.CreatedExpiryPolicy;
+import javax.cache.expiry.Duration;
+import javax.cache.expiry.ExpiryPolicy;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.eclipse.jetty.util.BlockingArrayQueue;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+/**
+ *
+ */
+public class GridCacheTtlManagerNotificationTest extends GridCommonAbstractTest {
+    /** IP finder. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Test cache mode. */
+    protected CacheMode cacheMode;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+        discoSpi.setIpFinder(IP_FINDER);
+
+        cfg.setDiscoverySpi(discoSpi);
+
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setCacheMode(cacheMode);
+        ccfg.setEagerTtl(true);
+
+        cfg.setCacheConfiguration(ccfg);
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testThatNotificationWorkAsExpected() throws Exception {
+        try (final Ignite g = startGrid(0)) {
+            final BlockingArrayQueue<Event> queue = new BlockingArrayQueue<>();
+
+            g.events().localListen(new IgnitePredicate<Event>() {
+                @Override public boolean apply(Event evt) {
+                    queue.add(evt);
+
+                    return true;
+                }
+            }, EventType.EVT_CACHE_OBJECT_EXPIRED);
+
+            final String key = "key";
+
+            IgniteCache<Object, Object> cache = g.cache(null);
+
+            ExpiryPolicy plc1 = new CreatedExpiryPolicy(new Duration(MILLISECONDS, 100_000));
+
+            cache.withExpiryPolicy(plc1).put(key + 1, 1);
+
+            Thread.sleep(1_000); // Cleaner should see entry.
+
+            ExpiryPolicy plc2 = new CreatedExpiryPolicy(new Duration(MILLISECONDS, 1000));
+
+            cache.withExpiryPolicy(plc2).put(key + 2, 1);
+
+            assertNotNull(queue.poll(5, SECONDS)); // We should receive event about second entry expiration.
+        }
+    }
+
+    /**
+     * Add in several threads value to cache with different expiration policy.
+     * Wait for expiration of keys with small expiration duration.
+     */
+    public void testThatNotificationWorkAsExpectedInMultithreadedMode() throws Exception {
+        final CyclicBarrier barrier = new CyclicBarrier(21);
+        final AtomicInteger keysRangeGen = new AtomicInteger();
+        final AtomicInteger evtCnt = new AtomicInteger();
+        final int cnt = 1_000;
+
+        try (final Ignite g = startGrid(0)) {
+            final IgniteCache<Object, Object> cache = g.cache(null);
+
+            g.events().localListen(new IgnitePredicate<Event>() {
+                @Override public boolean apply(Event evt) {
+                    evtCnt.incrementAndGet();
+
+                    return true;
+                }
+            }, EventType.EVT_CACHE_OBJECT_EXPIRED);
+
+
+            int smallDuration = 2000;
+
+            int threadCnt = 10;
+
+            GridTestUtils.runMultiThreadedAsync(
+                new CacheFiller(cache, 100_000, barrier, keysRangeGen, cnt),
+                threadCnt, "");
+
+            GridTestUtils.runMultiThreadedAsync(
+                new CacheFiller(cache, smallDuration, barrier, keysRangeGen, cnt),
+                threadCnt, "");
+
+            barrier.await();
+
+            Thread.sleep(1_000); // Cleaner should see at least one entry.
+
+            barrier.await();
+
+            assertEquals(2 * threadCnt * cnt, cache.size());
+
+            Thread.sleep(2 * smallDuration);
+
+            assertEquals(threadCnt * cnt, cache.size());
+            assertEquals(threadCnt * cnt, evtCnt.get());
+        }
+    }
+
+    /** */
+    private static class CacheFiller implements Runnable {
+        /** Barrier. */
+        private final CyclicBarrier barrier;
+        /** Keys range generator. */
+        private final AtomicInteger keysRangeGenerator;
+        /** Count. */
+        private final int cnt;
+        /** Cache. */
+        private final IgniteCache<Object, Object> cache;
+        /** Expiration duration. */
+        private final int expirationDuration;
+
+        /**
+         * @param cache Cache.
+         * @param expirationDuration Expiration duration.
+         * @param barrier Barrier.
+         * @param keysRangeGenerator Keys.
+         * @param cnt Count.
+         */
+        CacheFiller(IgniteCache<Object, Object> cache, int expirationDuration, CyclicBarrier barrier,
+            AtomicInteger keysRangeGenerator, int cnt) {
+            this.expirationDuration = expirationDuration;
+            this.barrier = barrier;
+            this.keysRangeGenerator = keysRangeGenerator;
+            this.cnt = cnt;
+            this.cache = cache;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            try {
+                barrier.await();
+
+                ExpiryPolicy plc1 = new CreatedExpiryPolicy(new Duration(MILLISECONDS, expirationDuration));
+                int keyStart = keysRangeGenerator.getAndIncrement() * cnt;
+
+                for (int i = keyStart; i < keyStart + cnt; i++)
+                    cache.withExpiryPolicy(plc1).put("key" + i, 1);
+
+                barrier.await();
+            }
+            catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/4f27a47b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java
index 21935e5..28cb2da 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.expiry;
 
 import junit.framework.TestSuite;
 import org.apache.ignite.cache.store.IgniteCacheExpiryStoreLoadSelfTest;
+import org.apache.ignite.internal.processors.cache.GridCacheTtlManagerNotificationTest;
 import org.apache.ignite.internal.processors.cache.IgniteCacheEntryListenerExpiredEventsTest;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpireAndUpdateConsistencyTest;
 
@@ -71,6 +72,8 @@ public class IgniteCacheExpiryPolicyTestSuite extends TestSuite {
 
         suite.addTestSuite(IgniteCacheExpireAndUpdateConsistencyTest.class);
 
+        suite.addTestSuite(GridCacheTtlManagerNotificationTest.class);
+
         return suite;
     }
 }
\ No newline at end of file


Mime
View raw message