incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [14/50] [abbrv] git commit: Working on BLUR-229
Date Sun, 03 Nov 2013 15:20:08 GMT
Working on BLUR-229


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/43bdf5aa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/43bdf5aa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/43bdf5aa

Branch: refs/heads/0.3.0-lucene-upgrade
Commit: 43bdf5aa4db9c3e1d28b831c493ce2c1d4397410
Parents: 839dc46
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Wed Oct 23 10:51:23 2013 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Wed Oct 23 10:51:44 2013 -0400

----------------------------------------------------------------------
 .../blur/store/blockcache_v2/BaseCache.java     | 55 +++++++++++++++-----
 1 file changed, 42 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/43bdf5aa/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/BaseCache.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/BaseCache.java b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/BaseCache.java
index 549042b..4d7a2bb 100644
--- a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/BaseCache.java
+++ b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/BaseCache.java
@@ -68,7 +68,7 @@ public class BaseCache extends Cache implements Closeable {
     @Override
     public void onEviction(CacheKey key, CacheValue value) {
       _evictions.mark();
-      addToReleaseQueue(value);
+      addToReleaseQueue(key, value);
     }
   }
 
@@ -79,6 +79,26 @@ public class BaseCache extends Cache implements Closeable {
     }
   }
 
+  static class ReleaseEntry {
+    CacheKey _key;
+    CacheValue _value;
+    final long _createTime = System.currentTimeMillis();
+
+    @Override
+    public String toString() {
+      return "ReleaseEntry [_key=" + _key + ", _value=" + _value + "]";
+    }
+
+    public boolean hasLivedToLong(long warningTimeForEntryCleanup) {
+      long now = System.currentTimeMillis();
+      if (_createTime + warningTimeForEntryCleanup < now) {
+        return true;
+      }
+      return false;
+    }
+
+  }
+
   private final ConcurrentLinkedHashMap<CacheKey, CacheValue> _cacheMap;
   private final FileNameFilter _readFilter;
   private final FileNameFilter _writeFilter;
@@ -95,7 +115,8 @@ public class BaseCache extends Cache implements Closeable {
   private final Thread _oldFileDaemonThread;
   private final Thread _oldCacheValueDaemonThread;
   private final AtomicBoolean _running = new AtomicBoolean(true);
-  private final BlockingQueue<CacheValue> _releaseQueue;
+  private final BlockingQueue<ReleaseEntry> _releaseQueue;
+  private final long _warningTimeForEntryCleanup = TimeUnit.MINUTES.toMillis(1);
 
   public BaseCache(long totalNumberOfBytes, Size fileBufferSize, Size cacheBlockSize, FileNameFilter readFilter,
       FileNameFilter writeFilter, Quiet quiet, STORE store) {
@@ -111,7 +132,7 @@ public class BaseCache extends Cache implements Closeable {
     _misses = Metrics.newMeter(new MetricName(ORG_APACHE_BLUR, CACHE, MISS), MISS, TimeUnit.SECONDS);
     _evictions = Metrics.newMeter(new MetricName(ORG_APACHE_BLUR, CACHE, EVICTION), EVICTION, TimeUnit.SECONDS);
     _removals = Metrics.newMeter(new MetricName(ORG_APACHE_BLUR, CACHE, REMOVAL), REMOVAL, TimeUnit.SECONDS);
-    _releaseQueue = new LinkedBlockingQueue<CacheValue>();
+    _releaseQueue = new LinkedBlockingQueue<ReleaseEntry>();
     Metrics.newGauge(new MetricName(ORG_APACHE_BLUR, CACHE, ENTRIES), new Gauge<Long>() {
       @Override
       public Long value() {
@@ -141,7 +162,7 @@ public class BaseCache extends Cache implements Closeable {
     _oldFileDaemonThread.setName("BaseCacheOldFileCleanup");
     _oldFileDaemonThread.setPriority(Thread.MIN_PRIORITY);
     _oldFileDaemonThread.start();
-    
+
     _oldCacheValueDaemonThread = new Thread(new Runnable() {
       @Override
       public void run() {
@@ -161,20 +182,23 @@ public class BaseCache extends Cache implements Closeable {
   }
 
   protected void cleanupOldCacheValues() {
-    Iterator<CacheValue> iterator = _releaseQueue.iterator();
+    Iterator<ReleaseEntry> iterator = _releaseQueue.iterator();
     while (iterator.hasNext()) {
-      CacheValue value = iterator.next();
+      ReleaseEntry entry = iterator.next();
+      CacheValue value = entry._value;
       if (value.refCount() == 0) {
         value.release();
         iterator.remove();
         long capacity = _cacheMap.capacity();
         _cacheMap.setCapacity(capacity + value.size());
+      } else if (entry.hasLivedToLong(_warningTimeForEntryCleanup)) {
+        LOG.warn("CacheValue has not been released [{0}] for over [{1} ms]", entry, _warningTimeForEntryCleanup);
       }
     }
   }
 
   protected void cleanupOldFiles() {
-    LOG.info("Cleanup old files from cache.");
+    LOG.debug("Cleanup old files from cache.");
     Set<Long> validFileIds = new HashSet<Long>(_fileNameToId.values());
     for (CacheKey key : _cacheMap.keySet()) {
       long fileId = key.getFileId();
@@ -182,7 +206,7 @@ public class BaseCache extends Cache implements Closeable {
         CacheValue remove = _cacheMap.remove(key);
         if (remove != null) {
           _removals.mark();
-          addToReleaseQueue(remove);
+          addToReleaseQueue(key, remove);
         }
       }
     }
@@ -194,12 +218,12 @@ public class BaseCache extends Cache implements Closeable {
     _cacheMap.clear();
     _oldFileDaemonThread.interrupt();
     _oldCacheValueDaemonThread.interrupt();
-    for (CacheValue value : _releaseQueue) {
-      value.release();
+    for (ReleaseEntry entry : _releaseQueue) {
+      entry._value.release();
     }
   }
 
-  private void addToReleaseQueue(CacheValue value) {
+  private void addToReleaseQueue(CacheKey key, CacheValue value) {
     if (value != null) {
       if (value.refCount() == 0) {
         value.release();
@@ -207,8 +231,13 @@ public class BaseCache extends Cache implements Closeable {
       }
       long capacity = _cacheMap.capacity();
       _cacheMap.setCapacity(capacity - value.size());
-      LOG.info("CacheValue was not released [{0}]", value);
-      _releaseQueue.add(value);
+
+      ReleaseEntry releaseEntry = new ReleaseEntry();
+      releaseEntry._key = key;
+      releaseEntry._value = value;
+
+      LOG.info("CacheValue was not released [{0}]", releaseEntry);
+      _releaseQueue.add(releaseEntry);
     }
   }
 


Mime
View raw message