incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject git commit: Fixing a couple of issues while performing a burn in test. In the IndexManager, multiple index readers were being used and during edge race conditions the document ds would change during a row fetch. The act of detaching cache values only o
Date Fri, 13 Dec 2013 15:38:52 GMT
Updated Branches:
  refs/heads/apache-blur-0.2 38f85e9a1 -> 2fd1011a9


Fixing a couple of issues while performing a burn in test.  In the IndexManager, multiple
index readers were being used and during edge race conditions the document ds would change
during a row fetch.  The act of detaching cache values only occurs now when it is necessary.
 Meaning that when another object is holding a reference to the cache value will it make a
safe copy.  This will reduce the GC overhead greatly.  The last is fixing the CacheDecompressor,
there was an issue with the copying from the cached bytesref to the destination bytesref.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/2fd1011a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/2fd1011a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/2fd1011a

Branch: refs/heads/apache-blur-0.2
Commit: 2fd1011a9903cc4ed35d5dc34ec165587b28dfb2
Parents: 38f85e9
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Fri Dec 13 10:38:50 2013 -0500
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Fri Dec 13 10:38:50 2013 -0500

----------------------------------------------------------------------
 .../org/apache/blur/manager/IndexManager.java   | 58 +++++++++++---------
 .../lucene/codec/Blur022StoredFieldsFormat.java | 24 +++++++-
 .../blur/lucene/codec/CachedDecompressor.java   | 46 ++++++++--------
 .../org/apache/blur/lucene/codec/CachedKey.java | 51 +++++++++++++++++
 .../store/blockcache_v2/CacheIndexInput.java    |  7 +++
 .../store/blockcache_v2/CacheIndexOutput.java   |  2 +
 .../blur/store/blockcache_v2/CacheValue.java    |  4 ++
 .../blockcache_v2/CacheValueBufferPool.java     |  3 +
 .../cachevalue/BaseCacheValue.java              | 11 ++++
 .../cachevalue/DetachableCacheValue.java        | 46 ++++++++++++----
 10 files changed, 190 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2fd1011a/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java b/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
index 1b10fc3..babe21f 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
@@ -235,14 +235,18 @@ public class IndexManager {
 
   public void fetchRow(String table, Selector selector, FetchResult fetchResult) throws BlurException
{
     validSelector(selector);
-    BlurIndex index;
-    String shard;
+    BlurIndex index = null;
+    String shard = null;
     Tracer trace = Trace.trace("manager fetch", Trace.param("table", table));
+    IndexSearcherClosable searcher = null;
     try {
       if (selector.getLocationId() == null) {
         // Not looking up by location id so we should resetSearchers.
         ShardServerContext.resetSearchers();
-        populateSelector(table, selector);
+        shard = MutationHelper.getShardName(table, selector.rowId, getNumberOfShards(table),
_blurPartitioner);
+        index = getBlurIndex(table,shard);
+        searcher = index.getIndexSearcher();
+        populateSelector(searcher, shard, table, selector);
       }
       String locationId = selector.getLocationId();
       if (locationId.equals(NOT_FOUND)) {
@@ -250,20 +254,11 @@ public class IndexManager {
         fetchResult.setExists(false);
         return;
       }
-      shard = getShard(locationId);
-      Map<String, BlurIndex> blurIndexes = _indexServer.getIndexes(table);
-      if (blurIndexes == null) {
-        LOG.error("Table [{0}] not found", table);
-        // @TODO probably should make a enum for not found on this server so the
-        // controller knows to try another server.
-        throw new BException("Table [" + table + "] not found");
+      if (shard == null) {
+        shard = getShard(locationId);
       }
-      index = blurIndexes.get(shard);
       if (index == null) {
-        LOG.error("Shard [{0}] not found in table [{1}]", shard, table);
-        // @TODO probably should make a enum for not found on this server so the
-        // controller knows to try another server.
-        throw new BException("Shard [" + shard + "] not found in table [" + table + "]");
+        index = getBlurIndex(table,shard);
       }
     } catch (BlurException e) {
       throw e;
@@ -271,7 +266,6 @@ public class IndexManager {
       LOG.error("Unknown error while trying to get the correct index reader for selector
[{0}].", e, selector);
       throw new BException(e.getMessage(), e);
     }
-    IndexSearcherClosable searcher = null;
     TimerContext timerContext = _fetchTimer.time();
     boolean usedCache = true;
     try {
@@ -280,10 +274,10 @@ public class IndexManager {
         searcher = shardServerContext.getIndexSearcherClosable(table, shard);
       }
       if (searcher == null) {
+        // Was not pulled from cache, get a fresh one from the index.
         searcher = index.getIndexSearcher();
         usedCache = false;
       }
-
       TableContext tableContext = getTableContext(table);
       FieldManager fieldManager = tableContext.getFieldManager();
 
@@ -318,6 +312,24 @@ public class IndexManager {
     }
   }
 
+  private BlurIndex getBlurIndex(String table, String shard) throws BException, IOException
{
+    Map<String, BlurIndex> blurIndexes = _indexServer.getIndexes(table);
+    if (blurIndexes == null) {
+      LOG.error("Table [{0}] not found", table);
+      // @TODO probably should make a enum for not found on this server so the
+      // controller knows to try another server.
+      throw new BException("Table [" + table + "] not found");
+    }
+    BlurIndex index = blurIndexes.get(shard);
+    if (index == null) {
+      LOG.error("Shard [{0}] not found in table [{1}]", shard, table);
+      // @TODO probably should make a enum for not found on this server so the
+      // controller knows to try another server.
+      throw new BException("Shard [" + shard + "] not found in table [" + table + "]");
+    }
+    return index;
+  }
+
   private Query getHighlightQuery(Selector selector, String table, FieldManager fieldManager)
throws ParseException,
       BlurException {
     HighlightOptions highlightOptions = selector.getHighlightOptions();
@@ -337,17 +349,11 @@ public class IndexManager {
         getScoreType(query.scoreType), context);
   }
 
-  private void populateSelector(String table, Selector selector) throws IOException, BlurException
{
+  private void populateSelector(IndexSearcherClosable searcher, String shardName, String
table, Selector selector)
+      throws IOException, BlurException {
     Tracer trace = Trace.trace("populate selector");
     String rowId = selector.rowId;
     String recordId = selector.recordId;
-    String shardName = MutationHelper.getShardName(table, rowId, getNumberOfShards(table),
_blurPartitioner);
-    Map<String, BlurIndex> indexes = _indexServer.getIndexes(table);
-    BlurIndex blurIndex = indexes.get(shardName);
-    if (blurIndex == null) {
-      throw new BException("Shard [" + shardName + "] is not being servered by this shardserver.");
-    }
-    IndexSearcherClosable searcher = blurIndex.getIndexSearcher();
     try {
       BooleanQuery query = new BooleanQuery();
       if (selector.recordOnly) {
@@ -372,8 +378,6 @@ public class IndexManager {
         selector.setLocationId(NOT_FOUND);
       }
     } finally {
-      // this will allow for closing of index
-      searcher.close();
       trace.done();
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2fd1011a/blur-store/src/main/java/org/apache/blur/lucene/codec/Blur022StoredFieldsFormat.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/lucene/codec/Blur022StoredFieldsFormat.java
b/blur-store/src/main/java/org/apache/blur/lucene/codec/Blur022StoredFieldsFormat.java
index c43b0b2..04b2526 100644
--- a/blur-store/src/main/java/org/apache/blur/lucene/codec/Blur022StoredFieldsFormat.java
+++ b/blur-store/src/main/java/org/apache/blur/lucene/codec/Blur022StoredFieldsFormat.java
@@ -34,6 +34,11 @@ import org.apache.lucene.index.FieldInfos;
 import org.apache.lucene.index.SegmentInfo;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IOContext;
+import org.apache.lucene.util.BytesRef;
+
+import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
+import com.googlecode.concurrentlinkedhashmap.Weigher;
+
 public final class Blur022StoredFieldsFormat extends StoredFieldsFormat {
 
   static final String STORED_FIELDS_FORMAT_CHUNK_SIZE = "StoredFieldsFormat.chunkSize";
@@ -42,18 +47,31 @@ public final class Blur022StoredFieldsFormat extends StoredFieldsFormat
{
   private static final String SEGMENT_SUFFIX = "";
   private final int _chunkSize;
   private final CompressionMode _compressionMode;
+  private final ConcurrentLinkedHashMap<CachedKey, BytesRef> _cache;
+  private final int capacity = 16 * 1024 * 1024;
 
   public Blur022StoredFieldsFormat(int chunkSize, CompressionMode compressionMode) {
     _chunkSize = chunkSize;
     _compressionMode = compressionMode;
+    _cache = new ConcurrentLinkedHashMap.Builder<CachedKey, BytesRef>().weigher(new
Weigher<BytesRef>() {
+      @Override
+      public int weightOf(BytesRef value) {
+        return value.bytes.length;
+      }
+    }).maximumWeightedCapacity(capacity).build();
   }
 
   static class CachedCompressionMode extends CompressionMode {
 
     final CompressionMode _compressionMode;
+    final SegmentInfo _si;
+    final ConcurrentLinkedHashMap<CachedKey, BytesRef> _cache;
 
-    CachedCompressionMode(CompressionMode compressionMode, Directory directory, SegmentInfo
si) {
+    CachedCompressionMode(CompressionMode compressionMode, SegmentInfo si,
+        ConcurrentLinkedHashMap<CachedKey, BytesRef> cache) {
       _compressionMode = compressionMode;
+      _si = si;
+      _cache = cache;
     }
 
     @Override
@@ -63,7 +81,7 @@ public final class Blur022StoredFieldsFormat extends StoredFieldsFormat
{
 
     @Override
     public Decompressor newDecompressor() {
-      return new CachedDecompressor(_compressionMode.newDecompressor());
+      return new CachedDecompressor(_compressionMode.newDecompressor(), _si, _cache);
     }
 
     @Override
@@ -76,7 +94,7 @@ public final class Blur022StoredFieldsFormat extends StoredFieldsFormat
{
   @Override
   public StoredFieldsReader fieldsReader(Directory directory, SegmentInfo si, FieldInfos
fn, IOContext context)
       throws IOException {
-    CompressionMode compressionMode = new CachedCompressionMode(getCompressionMode(si), directory,
si);
+    CompressionMode compressionMode = new CachedCompressionMode(getCompressionMode(si), si,
_cache);
     return new CompressingStoredFieldsReader(directory, si, SEGMENT_SUFFIX, fn, context,
FORMAT_NAME, compressionMode);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2fd1011a/blur-store/src/main/java/org/apache/blur/lucene/codec/CachedDecompressor.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/lucene/codec/CachedDecompressor.java
b/blur-store/src/main/java/org/apache/blur/lucene/codec/CachedDecompressor.java
index 5483891..69f892a 100644
--- a/blur-store/src/main/java/org/apache/blur/lucene/codec/CachedDecompressor.java
+++ b/blur-store/src/main/java/org/apache/blur/lucene/codec/CachedDecompressor.java
@@ -19,45 +19,47 @@ package org.apache.blur.lucene.codec;
 import java.io.IOException;
 
 import org.apache.lucene.codecs.compressing.Decompressor;
+import org.apache.lucene.index.SegmentInfo;
 import org.apache.lucene.store.DataInput;
 import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.util.ArrayUtil;
 import org.apache.lucene.util.BytesRef;
 
-public class CachedDecompressor extends Decompressor {
+import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
 
-  static class Entry {
-    String _inputName;
-    long _position = -1;
-    BytesRef _bytesRef = new BytesRef();
-  }
+public class CachedDecompressor extends Decompressor {
 
   private final Decompressor _decompressor;
-  private final ThreadLocal<Entry> _cache = new ThreadLocal<Entry>() {
-    @Override
-    protected Entry initialValue() {
-      return new Entry();
-    }
-  };
+  private final ConcurrentLinkedHashMap<CachedKey, BytesRef> _cache;
+  private final SegmentInfo _si;
 
-  public CachedDecompressor(Decompressor decompressor) {
+  public CachedDecompressor(Decompressor decompressor, SegmentInfo si,
+      ConcurrentLinkedHashMap<CachedKey, BytesRef> cache) {
+    _si = si;
     _decompressor = decompressor;
+    _cache = cache;
   }
 
   @Override
-  public void decompress(DataInput in, int originalLength, int offset, int length, BytesRef
bytes) throws IOException {
+  public void decompress(final DataInput in, final int originalLength, final int offset,
final int length,
+      final BytesRef bytes) throws IOException {
     if (in instanceof IndexInput) {
       IndexInput indexInput = (IndexInput) in;
       String name = indexInput.toString();
-      Entry entry = _cache.get();
       long filePointer = indexInput.getFilePointer();
-      BytesRef cachedRef = entry._bytesRef;
-      if (!name.equals(entry._inputName) || entry._position != filePointer) {
-        cachedRef.grow(originalLength);
-        _decompressor.decompress(in, originalLength, 0, originalLength, cachedRef);
-        entry._inputName = name;
-        entry._position = filePointer;
+      CachedKey key = new CachedKey(name, filePointer, _si);
+      BytesRef cachedRef = _cache.get(key);
+      if (cachedRef == null) {
+        cachedRef = new BytesRef(originalLength + 7);
+        _decompressor.decompress(indexInput, originalLength, 0, originalLength, cachedRef);
+        _cache.put(key, cachedRef);
+        cachedRef.length = originalLength;
+        cachedRef.offset = 0;
+      }
+      if (bytes.bytes.length < originalLength + 7) {
+        bytes.bytes = new byte[ArrayUtil.oversize(originalLength + 7, 1)];
       }
-      bytes.copyBytes(cachedRef);
+      System.arraycopy(cachedRef.bytes, cachedRef.offset, bytes.bytes, 0, length + offset);
       bytes.offset = offset;
       bytes.length = length;
     } else {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2fd1011a/blur-store/src/main/java/org/apache/blur/lucene/codec/CachedKey.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/lucene/codec/CachedKey.java b/blur-store/src/main/java/org/apache/blur/lucene/codec/CachedKey.java
new file mode 100644
index 0000000..d7b099c
--- /dev/null
+++ b/blur-store/src/main/java/org/apache/blur/lucene/codec/CachedKey.java
@@ -0,0 +1,51 @@
+package org.apache.blur.lucene.codec;
+
+import org.apache.lucene.index.SegmentInfo;
+
+class CachedKey {
+
+  final long _filePointer;
+  final String _name;
+  final SegmentInfo _si;
+
+  public CachedKey(String name, long filePointer, SegmentInfo si) {
+    _name = name;
+    _filePointer = filePointer;
+    _si = si;
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + (int) (_filePointer ^ (_filePointer >>> 32));
+    result = prime * result + ((_name == null) ? 0 : _name.hashCode());
+    result = prime * result + ((_si == null) ? 0 : _si.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    CachedKey other = (CachedKey) obj;
+    if (_filePointer != other._filePointer)
+      return false;
+    if (_name == null) {
+      if (other._name != null)
+        return false;
+    } else if (!_name.equals(other._name))
+      return false;
+    if (_si == null) {
+      if (other._si != null)
+        return false;
+    } else if (!_si.equals(other._si))
+      return false;
+    return true;
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2fd1011a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheIndexInput.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheIndexInput.java
b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheIndexInput.java
index 54f1097..df81832 100644
--- a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheIndexInput.java
+++ b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheIndexInput.java
@@ -257,6 +257,9 @@ public class CacheIndexInput extends IndexInput {
     clone._key = _key.clone();
     clone._indexInput = _indexInput.clone();
     clone._quiet = _cache.shouldBeQuiet(_directory, _fileName);
+    if (clone._cacheValue != null) {
+      clone._cacheValue.incRef();
+    }
     return clone;
   }
 
@@ -287,6 +290,7 @@ public class CacheIndexInput extends IndexInput {
 
   private void releaseCache() {
     if (_cacheValue != null) {
+      _cacheValue.decRef();
       _cacheValue = null;
     }
   }
@@ -296,6 +300,7 @@ public class CacheIndexInput extends IndexInput {
     _cacheValue = get(_key);
     if (_cacheValue == null) {
       _cacheValue = _cache.newInstance(_directory, _fileName);
+      _cacheValue.incRef();
       long filePosition = getFilePosition();
       _indexInput.seek(filePosition);
       byte[] buffer = _store.takeBuffer(_bufferSize);
@@ -310,6 +315,8 @@ public class CacheIndexInput extends IndexInput {
       }
       _store.putBuffer(buffer);
       _cache.put(_key.clone(), _cacheValue);
+    } else {
+      _cacheValue.incRef();
     }
     _blockPosition = getBlockPosition();
   }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2fd1011a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheIndexOutput.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheIndexOutput.java
b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheIndexOutput.java
index c5dde83..7c15cf2 100644
--- a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheIndexOutput.java
+++ b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheIndexOutput.java
@@ -94,11 +94,13 @@ public class CacheIndexOutput extends IndexOutput {
       return;
     }
     CacheValue cacheValue = _cache.newInstance(_directory, _fileName);
+    cacheValue.incRef();
     writeBufferToOutputStream(length);
     cacheValue.write(0, _buffer, 0, length);
     long blockId = (_position - length) / _cacheBlockSize;
     cacheValue = cacheValue.trim(length);
     _cache.put(new CacheKey(_fileId, blockId), cacheValue);
+    cacheValue.decRef();
     _bufferPosition = 0;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2fd1011a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheValue.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheValue.java
b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheValue.java
index 2e0a5bf..997f24d 100644
--- a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheValue.java
+++ b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheValue.java
@@ -130,4 +130,8 @@ public interface CacheValue {
    */
   CacheValue trim(int length);
 
+  void decRef();
+
+  void incRef();
+  
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2fd1011a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheValueBufferPool.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheValueBufferPool.java
b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheValueBufferPool.java
index 3904642..7929df2 100644
--- a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheValueBufferPool.java
+++ b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/CacheValueBufferPool.java
@@ -91,6 +91,9 @@ public class CacheValueBufferPool implements Closeable {
   }
 
   public void returnToPool(CacheValue cacheValue) {
+    if (cacheValue == null) {
+      return;
+    }
     BlockingQueue<CacheValue> blockingQueue = getPool(cacheValue.length());
     if (!blockingQueue.offer(cacheValue)) {
       _detroyed.mark();

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2fd1011a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/cachevalue/BaseCacheValue.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/cachevalue/BaseCacheValue.java
b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/cachevalue/BaseCacheValue.java
index 62aeb93..dc45eab 100644
--- a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/cachevalue/BaseCacheValue.java
+++ b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/cachevalue/BaseCacheValue.java
@@ -153,4 +153,15 @@ public abstract class BaseCacheValue implements CacheValue {
   public CacheValue detachFromCache() {
     throw new RuntimeException("Not implemented.");
   }
+
+  @Override
+  public void decRef() {
+
+  }
+
+  @Override
+  public void incRef() {
+
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/2fd1011a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/cachevalue/DetachableCacheValue.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/cachevalue/DetachableCacheValue.java
b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/cachevalue/DetachableCacheValue.java
index fe9e0b9..959e747 100644
--- a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/cachevalue/DetachableCacheValue.java
+++ b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/cachevalue/DetachableCacheValue.java
@@ -21,6 +21,7 @@ import static org.apache.blur.metrics.MetricsConstants.DETACHES;
 import static org.apache.blur.metrics.MetricsConstants.ORG_APACHE_BLUR;
 
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.blur.store.blockcache_v2.CacheValue;
 
@@ -28,29 +29,44 @@ import com.yammer.metrics.Metrics;
 import com.yammer.metrics.core.Meter;
 import com.yammer.metrics.core.MetricName;
 
-public class DetachableCacheValue implements CacheValue {
+@SuppressWarnings("serial")
+public class DetachableCacheValue extends AtomicInteger implements CacheValue {
 
   private static final Meter _detaches;
 
   static {
     _detaches = Metrics.newMeter(new MetricName(ORG_APACHE_BLUR, CACHE, DETACHES), DETACHES,
TimeUnit.SECONDS);
   }
-  
+
   private volatile CacheValue _baseCacheValue;
-  
+
   public DetachableCacheValue(CacheValue cacheValue) {
     _baseCacheValue = cacheValue;
   }
 
   @Override
   public CacheValue detachFromCache() {
-    _detaches.mark();
-    CacheValue result = _baseCacheValue;
-    int length = _baseCacheValue.length();
-    ByteArrayCacheValue byteArrayCacheValue = new ByteArrayCacheValue(length);
-    _baseCacheValue.read(0, byteArrayCacheValue._buffer, 0, length);
-    _baseCacheValue = byteArrayCacheValue;
-    return result;
+    if (_baseCacheValue instanceof ByteArrayCacheValue) {
+      // already detached
+      return null;
+    } else if (_baseCacheValue instanceof UnsafeCacheValue) {
+      final CacheValue result = _baseCacheValue;
+      if (get() == 0) {
+        // No one is using this so don't copy
+        // NULL out reference so just in case there can't be a seg fault.
+        _baseCacheValue = null;
+      } else {
+        // Copy data, because someone might access at some point
+        _detaches.mark();
+        int length = _baseCacheValue.length();
+        ByteArrayCacheValue byteArrayCacheValue = new ByteArrayCacheValue(length);
+        _baseCacheValue.read(0, byteArrayCacheValue._buffer, 0, length);
+        _baseCacheValue = byteArrayCacheValue;
+      }
+      return result;
+    } else {
+      throw new RuntimeException("Unsupported type of [" + _baseCacheValue + "]");
+    }
   }
 
   @Override
@@ -98,4 +114,14 @@ public class DetachableCacheValue implements CacheValue {
     return this;
   }
 
+  @Override
+  public void decRef() {
+    decrementAndGet();
+  }
+
+  @Override
+  public void incRef() {
+    incrementAndGet();
+  }
+
 }
\ No newline at end of file


Mime
View raw message