ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [08/50] incubator-ignite git commit: ignite-728 Need to reimplement CREATE-TIME-TTL as eviction policy
Date Tue, 05 May 2015 06:56:54 GMT
ignite-728 Need to reimplement CREATE-TIME-TTL as eviction policy


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ef7d0114
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ef7d0114
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ef7d0114

Branch: refs/heads/ignite-157-2
Commit: ef7d0114c4466eefaff1098c41e5bdb6c3766a28
Parents: 2a68725
Author: agura <agura@gridgain.com>
Authored: Thu Apr 23 21:26:31 2015 +0300
Committer: agura <agura@gridgain.com>
Committed: Tue Apr 28 17:15:45 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheTtlManager.java   | 164 ++++++++++++-------
 .../processors/cache/GridCacheUtils.java        |   5 +-
 .../IgniteCacheEntryListenerAbstractTest.java   |   4 +-
 3 files changed, 111 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ef7d0114/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
index 5198b53..d8af2b9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
@@ -26,7 +26,8 @@ import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.internal.util.worker.*;
 import org.apache.ignite.thread.*;
 
-import java.util.*;
+import org.jetbrains.annotations.*;
+import org.jsr166.*;
 
 /**
  * Eagerly removes expired entries from cache when {@link org.apache.ignite.configuration.CacheConfiguration#isEagerTtl()} flag is set.
@@ -34,14 +35,11 @@ import java.util.*;
 @SuppressWarnings("NakedNotify")
 public class GridCacheTtlManager extends GridCacheManagerAdapter {
     /** Entries pending removal. */
-    private final GridConcurrentSkipListSet<EntryWrapper> pendingEntries = new GridConcurrentSkipListSet<>();
+    private final GridConcurrentSkipListSetEx pendingEntries = new GridConcurrentSkipListSetEx();
 
     /** Cleanup worker thread. */
     private CleanupWorker cleanupWorker;
 
-    /** Sync mutex. */
-    private final Object mux = new Object();
-
     /** {@inheritDoc} */
     @Override protected void start0() throws IgniteCheckedException {
         if (cctx.kernalContext().isDaemon() || !cctx.config().isEagerTtl())
@@ -68,24 +66,13 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter {
      * @param entry Entry to add.
      */
     public void addTrackedEntry(GridCacheMapEntry entry) {
-        EntryWrapper wrapper = new EntryWrapper(entry);
-
-        pendingEntries.add(wrapper);
-
-        // If entry is on the first position, notify waiting thread.
-        if (wrapper == pendingEntries.firstx()) {
-            synchronized (mux) {
-                mux.notifyAll();
-            }
-        }
+        pendingEntries.add(new EntryWrapper(entry));
     }
 
     /**
      * @param entry Entry to remove.
      */
     public void removeTrackedEntry(GridCacheMapEntry entry) {
-        // Remove must be called while holding lock on entry before updating expire time.
-        // No need to wake up waiting thread in this case.
         pendingEntries.remove(new EntryWrapper(entry));
     }
 
@@ -97,6 +84,45 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter {
     }
 
     /**
+     * Expires entries by TTL.
+     *
+     * @param sizeLimited Size limited.
+     */
+    public void expire(boolean sizeLimited) {
+        long now = U.currentTimeMillis();
+
+        GridCacheVersion obsoleteVer = null;
+
+        int size = pendingEntries.sizex();
+
+        while (!sizeLimited || size-- > 0) {
+            EntryWrapper e = pendingEntries.pollFirst();
+
+            if (e == null)
+                break;
+
+            if (e.expireTime > now) {
+                pendingEntries.add(e);
+
+                break;
+            }
+
+            if (obsoleteVer == null)
+                obsoleteVer = cctx.versions().next();
+
+            if (log.isDebugEnabled())
+                log.debug("Trying to remove expired entry from cache: " + e);
+
+            if (e.entry.onTtlExpired(obsoleteVer)) {
+                e.entry.context().cache().removeEntry(e.entry);
+
+                if (e.entry.context().cache().configuration().isStatisticsEnabled())
+                    e.entry.context().cache().metrics0().onEvict();
+            }
+        }
+    }
+
+    /**
      * Entry cleanup worker.
      */
     private class CleanupWorker extends GridWorker {
@@ -110,52 +136,18 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter {
         /** {@inheritDoc} */
         @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
             while (!isCancelled()) {
-                long now = U.currentTimeMillis();
-
-                GridCacheVersion obsoleteVer = null;
-
-                for (Iterator<EntryWrapper> it = pendingEntries.iterator(); it.hasNext(); ) {
-                    EntryWrapper wrapper = it.next();
-
-                    if (wrapper.expireTime <= now) {
-                        if (log.isDebugEnabled())
-                            log.debug("Trying to remove expired entry from cache: " + wrapper);
-
-                        if (obsoleteVer == null)
-                            obsoleteVer = cctx.versions().next();
-
-                        if (wrapper.entry.onTtlExpired(obsoleteVer))
-                            wrapper.entry.context().cache().removeEntry(wrapper.entry);
+                expire(false);
 
-                        if (wrapper.entry.context().cache().configuration().isStatisticsEnabled())
-                            wrapper.entry.context().cache().metrics0().onEvict();
+                EntryWrapper first = pendingEntries.firstx();
 
-                        it.remove();
-                    }
-                    else
-                        break;
-                }
+                if (first != null) {
+                    long waitTime = first.expireTime - U.currentTimeMillis();
 
-                synchronized (mux) {
-                    while (true) {
-                        // Access of the first element must be inside of
-                        // synchronization block, so we don't miss out
-                        // on thread notification events sent from
-                        // 'addTrackedEntry(..)' method.
-                        EntryWrapper first = pendingEntries.firstx();
-
-                        if (first != null) {
-                            long waitTime = first.expireTime - U.currentTimeMillis();
-
-                            if (waitTime > 0)
-                                mux.wait(waitTime);
-                            else
-                                break;
-                        }
-                        else
-                            mux.wait(5000);
-                    }
+                    if (waitTime > 0)
+                        U.sleep(waitTime);
                 }
+                else
+                    U.sleep(500);
             }
         }
     }
@@ -214,4 +206,58 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter {
             return res;
         }
     }
+
+    /**
+     * Provides additional method {@code #sizex()}. NOTE: Only the following methods supports this addition:
+     * <ul>
+     *     <li>{@code #add()}</li>
+     *     <li>{@code #remove()}</li>
+     *     <li>{@code #pollFirst()}</li>
+     * <ul/>
+     */
+    private static class GridConcurrentSkipListSetEx extends GridConcurrentSkipListSet<EntryWrapper> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Size. */
+        private final LongAdder8 size = new LongAdder8();
+
+        /**
+         * @return Size based on performed operations.
+         */
+        public int sizex() {
+            return size.intValue();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean add(EntryWrapper e) {
+            boolean res = super.add(e);
+
+            assert res;
+
+            size.increment();
+
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean remove(Object o) {
+            boolean res = super.remove(o);
+
+            if (res)
+                size.decrement();
+
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public EntryWrapper pollFirst() {
+            EntryWrapper e = super.pollFirst();
+
+            if (e != null)
+                size.decrement();
+
+            return e;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ef7d0114/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index e7c7f9d..a0e45e7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -1157,6 +1157,8 @@ public class GridCacheUtils {
 
         if (ctx.isNear())
             ctx.near().dht().context().evicts().unwind();
+
+        ctx.ttl().expire(true);
     }
 
     /**
@@ -1166,11 +1168,12 @@ public class GridCacheUtils {
         assert ctx != null;
 
         for (GridCacheContext<K, V> cacheCtx : ctx.cacheContexts()) {
-
             cacheCtx.evicts().unwind();
 
             if (cacheCtx.isNear())
                 cacheCtx.near().dht().context().evicts().unwind();
+
+            cacheCtx.ttl().expire(true);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ef7d0114/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
index a873bb0..544fe6c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
@@ -77,7 +77,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
         super.afterTest();
 
         for (int i = 0; i < gridCount(); i++) {
-            GridContinuousProcessor proc = ((IgniteKernal)grid(i)).context().continuous();
+            GridContinuousProcessor proc = grid(i).context().continuous();
 
             ConcurrentMap<?, ?> syncMsgFuts = GridTestUtils.getFieldValue(proc, "syncMsgFuts");
 
@@ -712,7 +712,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
 
         expirePlcCache.put(key, 10);
 
-        U.sleep(200);
+        U.sleep(500);
 
         if (!eagerTtl())
             assertNull(primaryCache(key, cache.getName()).get(key)); // Provoke expire event if eager ttl is disabled.


Mime
View raw message