incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cr...@apache.org
Subject [10/30] git commit: Fixed BLUR-284
Date Thu, 07 Nov 2013 02:41:33 GMT
Fixed BLUR-284


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/57810c8f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/57810c8f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/57810c8f

Branch: refs/heads/blur-console-v2
Commit: 57810c8f7f569a02def87da5830c1f90dbd34bcf
Parents: 1c3a4a1
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Tue Oct 22 15:39:45 2013 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Tue Oct 22 15:40:49 2013 -0400

----------------------------------------------------------------------
 .../blur/manager/writer/BlurNRTIndex.java       |  1 -
 .../refcounter/DirectoryReferenceFileGC.java    |  7 +-
 .../store/BlockCacheDirectoryFactoryV2.java     |  4 +-
 .../blur/store/blockcache_v2/BaseCache.java     | 60 +++++++++++++--
 .../store/blockcache_v2/CacheIndexInput.java    | 80 ++++++++++++++++++++
 .../cachevalue/BaseCacheValue.java              |  1 +
 6 files changed, 141 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/57810c8f/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
index f9365d4..a9428c6 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurNRTIndex.java
@@ -114,7 +114,6 @@ public class BlurNRTIndex extends BlurIndex {
       sdp = new SnapshotDeletionPolicy(_tableContext.getIndexDeletionPolicy());
     }
     conf.setIndexDeletionPolicy(sdp);
-//    conf.setMergedSegmentWarmer(new FieldBasedWarmer(shardContext, _isClosed));
 
     TieredMergePolicy mergePolicy = (TieredMergePolicy) conf.getMergePolicy();
     mergePolicy.setUseCompoundFile(false);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/57810c8f/blur-store/src/main/java/org/apache/blur/lucene/store/refcounter/DirectoryReferenceFileGC.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/lucene/store/refcounter/DirectoryReferenceFileGC.java b/blur-store/src/main/java/org/apache/blur/lucene/store/refcounter/DirectoryReferenceFileGC.java
index fa03be6..8fda709 100644
--- a/blur-store/src/main/java/org/apache/blur/lucene/store/refcounter/DirectoryReferenceFileGC.java
+++ b/blur-store/src/main/java/org/apache/blur/lucene/store/refcounter/DirectoryReferenceFileGC.java
@@ -17,6 +17,7 @@ package org.apache.blur.lucene.store.refcounter;
  * limitations under the License.
  */
 import java.io.Closeable;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.Map;
@@ -32,6 +33,7 @@ import org.apache.lucene.store.Directory;
 import com.yammer.metrics.Metrics;
 import com.yammer.metrics.core.Gauge;
 import com.yammer.metrics.core.MetricName;
+
 import static org.apache.blur.metrics.MetricsConstants.*;
 
 public class DirectoryReferenceFileGC extends TimerTask implements Closeable {
@@ -67,7 +69,7 @@ public class DirectoryReferenceFileGC extends TimerTask implements Closeable {
       }
     }
   }
-  
+
   public DirectoryReferenceFileGC() {
     _timer = new Timer("Blur-File-GC", true);
     _timer.scheduleAtFixedRate(this, _delay, _delay);
@@ -105,6 +107,9 @@ public class DirectoryReferenceFileGC extends TimerTask implements Closeable {
         } else {
           count++;
         }
+      } catch (FileNotFoundException e) {
+        LOG.error("File [{0}] already deleted.", value);
+        iterator.remove();
       } catch (IOException e) {
         LOG.error("Unknown error", e);
       }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/57810c8f/blur-store/src/main/java/org/apache/blur/store/BlockCacheDirectoryFactoryV2.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/BlockCacheDirectoryFactoryV2.java b/blur-store/src/main/java/org/apache/blur/store/BlockCacheDirectoryFactoryV2.java
index 6135266..840d0cd 100644
--- a/blur-store/src/main/java/org/apache/blur/store/BlockCacheDirectoryFactoryV2.java
+++ b/blur-store/src/main/java/org/apache/blur/store/BlockCacheDirectoryFactoryV2.java
@@ -57,7 +57,7 @@ public class BlockCacheDirectoryFactoryV2 extends BlockCacheDirectoryFactory {
     FileNameFilter readFilter = new FileNameFilter() {
       @Override
       public boolean accept(CacheDirectory directory, String fileName) {
-        if (fileName.endsWith(".fdt") || fileName.endsWith(".fdx")) {
+        if (fileName.endsWith(".fdt")) {
           return false;
         }
         return true;
@@ -67,7 +67,7 @@ public class BlockCacheDirectoryFactoryV2 extends BlockCacheDirectoryFactory {
     FileNameFilter writeFilter = new FileNameFilter() {
       @Override
       public boolean accept(CacheDirectory directory, String fileName) {
-        if (fileName.endsWith(".fdt") || fileName.endsWith(".fdx")) {
+        if (fileName.endsWith(".fdt")) {
           return false;
         }
         return true;

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/57810c8f/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/BaseCache.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/BaseCache.java b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/BaseCache.java
index 1c69e5e..549042b 100644
--- a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/BaseCache.java
+++ b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/BaseCache.java
@@ -33,7 +33,9 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -56,6 +58,7 @@ public class BaseCache extends Cache implements Closeable {
 
   private static final Log LOG = LogFactory.getLog(BaseCache.class);
   private static final long _1_MINUTE = TimeUnit.MINUTES.toMillis(1);
+  protected static final long _1_SECOND = TimeUnit.SECONDS.toMillis(1);
 
   public enum STORE {
     ON_HEAP, OFF_HEAP
@@ -64,6 +67,7 @@ public class BaseCache extends Cache implements Closeable {
   class BaseCacheEvictionListener implements EvictionListener<CacheKey, CacheValue> {
     @Override
     public void onEviction(CacheKey key, CacheValue value) {
+      _evictions.mark();
       addToReleaseQueue(value);
     }
   }
@@ -88,8 +92,10 @@ public class BaseCache extends Cache implements Closeable {
   private final Meter _misses;
   private final Meter _evictions;
   private final Meter _removals;
-  private final Thread _daemonThread;
+  private final Thread _oldFileDaemonThread;
+  private final Thread _oldCacheValueDaemonThread;
   private final AtomicBoolean _running = new AtomicBoolean(true);
+  private final BlockingQueue<CacheValue> _releaseQueue;
 
   public BaseCache(long totalNumberOfBytes, Size fileBufferSize, Size cacheBlockSize, FileNameFilter readFilter,
       FileNameFilter writeFilter, Quiet quiet, STORE store) {
@@ -105,6 +111,7 @@ public class BaseCache extends Cache implements Closeable {
     _misses = Metrics.newMeter(new MetricName(ORG_APACHE_BLUR, CACHE, MISS), MISS, TimeUnit.SECONDS);
     _evictions = Metrics.newMeter(new MetricName(ORG_APACHE_BLUR, CACHE, EVICTION), EVICTION, TimeUnit.SECONDS);
     _removals = Metrics.newMeter(new MetricName(ORG_APACHE_BLUR, CACHE, REMOVAL), REMOVAL, TimeUnit.SECONDS);
+    _releaseQueue = new LinkedBlockingQueue<CacheValue>();
     Metrics.newGauge(new MetricName(ORG_APACHE_BLUR, CACHE, ENTRIES), new Gauge<Long>() {
       @Override
       public Long value() {
@@ -117,7 +124,7 @@ public class BaseCache extends Cache implements Closeable {
         return _cacheMap.weightedSize();
       }
     });
-    _daemonThread = new Thread(new Runnable() {
+    _oldFileDaemonThread = new Thread(new Runnable() {
       @Override
       public void run() {
         while (_running.get()) {
@@ -130,10 +137,40 @@ public class BaseCache extends Cache implements Closeable {
         }
       }
     });
-    _daemonThread.setDaemon(true);
-    _daemonThread.setName("BaseCacheCleanup");
-    _daemonThread.setPriority(Thread.MIN_PRIORITY);
-    _daemonThread.start();
+    _oldFileDaemonThread.setDaemon(true);
+    _oldFileDaemonThread.setName("BaseCacheOldFileCleanup");
+    _oldFileDaemonThread.setPriority(Thread.MIN_PRIORITY);
+    _oldFileDaemonThread.start();
+    
+    _oldCacheValueDaemonThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        while (_running.get()) {
+          cleanupOldCacheValues();
+          try {
+            Thread.sleep(_1_SECOND);
+          } catch (InterruptedException e) {
+            return;
+          }
+        }
+      }
+    });
+    _oldCacheValueDaemonThread.setDaemon(true);
+    _oldCacheValueDaemonThread.setName("BaseCacheCleanupCacheValues");
+    _oldCacheValueDaemonThread.start();
+  }
+
+  protected void cleanupOldCacheValues() {
+    Iterator<CacheValue> iterator = _releaseQueue.iterator();
+    while (iterator.hasNext()) {
+      CacheValue value = iterator.next();
+      if (value.refCount() == 0) {
+        value.release();
+        iterator.remove();
+        long capacity = _cacheMap.capacity();
+        _cacheMap.setCapacity(capacity + value.size());
+      }
+    }
   }
 
   protected void cleanupOldFiles() {
@@ -145,6 +182,7 @@ public class BaseCache extends Cache implements Closeable {
         CacheValue remove = _cacheMap.remove(key);
         if (remove != null) {
           _removals.mark();
+          addToReleaseQueue(remove);
         }
       }
     }
@@ -154,17 +192,23 @@ public class BaseCache extends Cache implements Closeable {
   public void close() throws IOException {
     _running.set(false);
     _cacheMap.clear();
-    _daemonThread.interrupt();
+    _oldFileDaemonThread.interrupt();
+    _oldCacheValueDaemonThread.interrupt();
+    for (CacheValue value : _releaseQueue) {
+      value.release();
+    }
   }
 
   private void addToReleaseQueue(CacheValue value) {
     if (value != null) {
-      _evictions.mark();
       if (value.refCount() == 0) {
         value.release();
         return;
       }
+      long capacity = _cacheMap.capacity();
+      _cacheMap.setCapacity(capacity - value.size());
       LOG.info("CacheValue was not released [{0}]", value);
+      _releaseQueue.add(value);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/57810c8f/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheIndexInput.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheIndexInput.java b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheIndexInput.java
index 8967f99..a91cf2e 100644
--- a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheIndexInput.java
+++ b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheIndexInput.java
@@ -60,6 +60,79 @@ public class CacheIndexInput extends IndexInput {
   }
 
   @Override
+  public int readVInt() throws IOException {
+    if (_cacheValue != null && remaining() >= 5) {
+      byte b = readByteFromCache();
+      if (b >= 0)
+        return b;
+      int i = b & 0x7F;
+      b = readByteFromCache();
+      i |= (b & 0x7F) << 7;
+      if (b >= 0)
+        return i;
+      b = readByteFromCache();
+      i |= (b & 0x7F) << 14;
+      if (b >= 0)
+        return i;
+      b = readByteFromCache();
+      i |= (b & 0x7F) << 21;
+      if (b >= 0)
+        return i;
+      b = readByteFromCache();
+      // Warning: the next ands use 0x0F / 0xF0 - beware copy/paste errors:
+      i |= (b & 0x0F) << 28;
+      if ((b & 0xF0) == 0)
+        return i;
+      throw new IOException("Invalid vInt detected (too many bits)");
+    }
+    return super.readVInt();
+  }
+
+  @Override
+  public long readVLong() throws IOException {
+    if (_cacheValue != null && remaining() >= 9) {
+      byte b = readByteFromCache();
+      if (b >= 0)
+        return b;
+      long i = b & 0x7FL;
+      b = readByteFromCache();
+      i |= (b & 0x7FL) << 7;
+      if (b >= 0)
+        return i;
+      b = readByteFromCache();
+      i |= (b & 0x7FL) << 14;
+      if (b >= 0)
+        return i;
+      b = readByteFromCache();
+      i |= (b & 0x7FL) << 21;
+      if (b >= 0)
+        return i;
+      b = readByteFromCache();
+      i |= (b & 0x7FL) << 28;
+      if (b >= 0)
+        return i;
+      b = readByteFromCache();
+      i |= (b & 0x7FL) << 35;
+      if (b >= 0)
+        return i;
+      b = readByteFromCache();
+      i |= (b & 0x7FL) << 42;
+      if (b >= 0)
+        return i;
+      b = readByteFromCache();
+      i |= (b & 0x7FL) << 49;
+      if (b >= 0)
+        return i;
+      b = readByteFromCache();
+      i |= (b & 0x7FL) << 56;
+      if (b >= 0)
+        return i;
+      throw new IOException("Invalid vLong detected (negative values disallowed)");
+    }
+    return super.readVLong();
+  }
+
+  @Override
   public byte readByte() throws IOException {
     ensureOpen();
     tryToFill();
@@ -185,6 +258,13 @@ public class CacheIndexInput extends IndexInput {
     close();
   }
 
+  private byte readByteFromCache() {
+    byte b = _cacheValue.read(_blockPosition);
+    _position++;
+    _blockPosition++;
+    return b;
+  }
+
   private int remaining() {
     return _cacheValue.length() - _blockPosition;
   }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/57810c8f/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/cachevalue/BaseCacheValue.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/cachevalue/BaseCacheValue.java b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/cachevalue/BaseCacheValue.java
index 13b6299..5c6136f 100644
--- a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/cachevalue/BaseCacheValue.java
+++ b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/cachevalue/BaseCacheValue.java
@@ -120,6 +120,7 @@ public abstract class BaseCacheValue extends AtomicLong implements CacheValue {
 
   @Override
   protected void finalize() throws Throwable {
+    // @TODO this may not be needed.
     if (!_released) {
       release();
     }


Mime
View raw message