Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 791101739A for ; Thu, 30 Apr 2015 13:15:00 +0000 (UTC) Received: (qmail 96020 invoked by uid 500); 30 Apr 2015 13:15:00 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 95946 invoked by uid 500); 30 Apr 2015 13:15:00 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 95851 invoked by uid 99); 30 Apr 2015 13:15:00 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 30 Apr 2015 13:15:00 +0000 X-ASF-Spam-Status: No, hits=-0.0 required=5.0 tests=SPF_PASS X-Spam-Check-By: apache.org Received-SPF: pass (athena.apache.org: local policy) Received: from [54.164.171.186] (HELO mx1-us-east.apache.org) (54.164.171.186) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 30 Apr 2015 13:14:50 +0000 Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-east.apache.org (ASF Mail Server at mx1-us-east.apache.org) with SMTP id BB7AB43E68 for ; Thu, 30 Apr 2015 13:14:29 +0000 (UTC) Received: (qmail 94747 invoked by uid 99); 30 Apr 2015 13:14:29 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 30 Apr 2015 13:14:29 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 3D778E00C5; Thu, 30 Apr 2015 13:14:29 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.incubator.apache.org Date: Thu, 30 Apr 2015 13:14:41 -0000 Message-Id: <89dc4bf89d104857990cdbb16fcae97c@git.apache.org> In-Reply-To: <6365773cb52340069df21b275484c71f@git.apache.org> References: <6365773cb52340069df21b275484c71f@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [13/33] incubator-ignite git commit: ignite-728 Need to reimplement CREATE-TIME-TTL as eviction policy X-Virus-Checked: Checked by ClamAV on apache.org ignite-728 Need to reimplement CREATE-TIME-TTL as eviction policy Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b58e1ac8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b58e1ac8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b58e1ac8 Branch: refs/heads/ignite-sprint-5 Commit: b58e1ac8001ff11e2faff2a55c61d8955ca73d95 Parents: 5fb7948 Author: agura Authored: Thu Apr 23 21:26:31 2015 +0300 Committer: agura Committed: Mon Apr 27 17:02:54 2015 +0300 ---------------------------------------------------------------------- .../eviction/sorted/SortedEvictionPolicy.java | 2 +- .../processors/cache/GridCacheTtlManager.java | 164 ++++++++++++------- .../processors/cache/GridCacheUtils.java | 5 +- 3 files changed, 110 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b58e1ac8/modules/core/src/main/java/org/apache/ignite/cache/eviction/sorted/SortedEvictionPolicy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/sorted/SortedEvictionPolicy.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/sorted/SortedEvictionPolicy.java index 0065244..7965c97 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/eviction/sorted/SortedEvictionPolicy.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/eviction/sorted/SortedEvictionPolicy.java @@ -381,7 +381,7 @@ public class SortedEvictionPolicy implements EvictionPolicy, SortedE private static final long serialVersionUID = 0L; /** Size. */ - private volatile LongAdder8 size = new LongAdder8(); + private final LongAdder8 size = new LongAdder8(); /** * @param comp Comparator. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b58e1ac8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java index 5198b53..d8af2b9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java @@ -26,7 +26,8 @@ import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.internal.util.worker.*; import org.apache.ignite.thread.*; -import java.util.*; +import org.jetbrains.annotations.*; +import org.jsr166.*; /** * Eagerly removes expired entries from cache when {@link org.apache.ignite.configuration.CacheConfiguration#isEagerTtl()} flag is set. @@ -34,14 +35,11 @@ import java.util.*; @SuppressWarnings("NakedNotify") public class GridCacheTtlManager extends GridCacheManagerAdapter { /** Entries pending removal. */ - private final GridConcurrentSkipListSet pendingEntries = new GridConcurrentSkipListSet<>(); + private final GridConcurrentSkipListSetEx pendingEntries = new GridConcurrentSkipListSetEx(); /** Cleanup worker thread. */ private CleanupWorker cleanupWorker; - /** Sync mutex. */ - private final Object mux = new Object(); - /** {@inheritDoc} */ @Override protected void start0() throws IgniteCheckedException { if (cctx.kernalContext().isDaemon() || !cctx.config().isEagerTtl()) @@ -68,24 +66,13 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter { * @param entry Entry to add. */ public void addTrackedEntry(GridCacheMapEntry entry) { - EntryWrapper wrapper = new EntryWrapper(entry); - - pendingEntries.add(wrapper); - - // If entry is on the first position, notify waiting thread. - if (wrapper == pendingEntries.firstx()) { - synchronized (mux) { - mux.notifyAll(); - } - } + pendingEntries.add(new EntryWrapper(entry)); } /** * @param entry Entry to remove. */ public void removeTrackedEntry(GridCacheMapEntry entry) { - // Remove must be called while holding lock on entry before updating expire time. - // No need to wake up waiting thread in this case. pendingEntries.remove(new EntryWrapper(entry)); } @@ -97,6 +84,45 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter { } /** + * Expires entries by TTL. + * + * @param sizeLimited Size limited. + */ + public void expire(boolean sizeLimited) { + long now = U.currentTimeMillis(); + + GridCacheVersion obsoleteVer = null; + + int size = pendingEntries.sizex(); + + while (!sizeLimited || size-- > 0) { + EntryWrapper e = pendingEntries.pollFirst(); + + if (e == null) + break; + + if (e.expireTime > now) { + pendingEntries.add(e); + + break; + } + + if (obsoleteVer == null) + obsoleteVer = cctx.versions().next(); + + if (log.isDebugEnabled()) + log.debug("Trying to remove expired entry from cache: " + e); + + if (e.entry.onTtlExpired(obsoleteVer)) { + e.entry.context().cache().removeEntry(e.entry); + + if (e.entry.context().cache().configuration().isStatisticsEnabled()) + e.entry.context().cache().metrics0().onEvict(); + } + } + } + + /** * Entry cleanup worker. */ private class CleanupWorker extends GridWorker { @@ -110,52 +136,18 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter { /** {@inheritDoc} */ @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { while (!isCancelled()) { - long now = U.currentTimeMillis(); - - GridCacheVersion obsoleteVer = null; - - for (Iterator it = pendingEntries.iterator(); it.hasNext(); ) { - EntryWrapper wrapper = it.next(); - - if (wrapper.expireTime <= now) { - if (log.isDebugEnabled()) - log.debug("Trying to remove expired entry from cache: " + wrapper); - - if (obsoleteVer == null) - obsoleteVer = cctx.versions().next(); - - if (wrapper.entry.onTtlExpired(obsoleteVer)) - wrapper.entry.context().cache().removeEntry(wrapper.entry); + expire(false); - if (wrapper.entry.context().cache().configuration().isStatisticsEnabled()) - wrapper.entry.context().cache().metrics0().onEvict(); + EntryWrapper first = pendingEntries.firstx(); - it.remove(); - } - else - break; - } + if (first != null) { + long waitTime = first.expireTime - U.currentTimeMillis(); - synchronized (mux) { - while (true) { - // Access of the first element must be inside of - // synchronization block, so we don't miss out - // on thread notification events sent from - // 'addTrackedEntry(..)' method. - EntryWrapper first = pendingEntries.firstx(); - - if (first != null) { - long waitTime = first.expireTime - U.currentTimeMillis(); - - if (waitTime > 0) - mux.wait(waitTime); - else - break; - } - else - mux.wait(5000); - } + if (waitTime > 0) + U.sleep(waitTime); } + else + U.sleep(500); } } } @@ -214,4 +206,58 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter { return res; } } + + /** + * Provides additional method {@code #sizex()}. NOTE: Only the following methods supports this addition: + *
    + *
  • {@code #add()}
  • + *
  • {@code #remove()}
  • + *
  • {@code #pollFirst()}
  • + *
      + */ + private static class GridConcurrentSkipListSetEx extends GridConcurrentSkipListSet { + /** */ + private static final long serialVersionUID = 0L; + + /** Size. */ + private final LongAdder8 size = new LongAdder8(); + + /** + * @return Size based on performed operations. + */ + public int sizex() { + return size.intValue(); + } + + /** {@inheritDoc} */ + @Override public boolean add(EntryWrapper e) { + boolean res = super.add(e); + + assert res; + + size.increment(); + + return res; + } + + /** {@inheritDoc} */ + @Override public boolean remove(Object o) { + boolean res = super.remove(o); + + if (res) + size.decrement(); + + return res; + } + + /** {@inheritDoc} */ + @Nullable @Override public EntryWrapper pollFirst() { + EntryWrapper e = super.pollFirst(); + + if (e != null) + size.decrement(); + + return e; + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b58e1ac8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index e7c7f9d..a0e45e7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -1157,6 +1157,8 @@ public class GridCacheUtils { if (ctx.isNear()) ctx.near().dht().context().evicts().unwind(); + + ctx.ttl().expire(true); } /** @@ -1166,11 +1168,12 @@ public class GridCacheUtils { assert ctx != null; for (GridCacheContext cacheCtx : ctx.cacheContexts()) { - cacheCtx.evicts().unwind(); if (cacheCtx.isNear()) cacheCtx.near().dht().context().evicts().unwind(); + + cacheCtx.ttl().expire(true); } }