incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [3/3] git commit: Working on BLUR-433
Date Mon, 08 Jun 2015 14:12:33 GMT
Working on BLUR-433


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/131ddb6d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/131ddb6d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/131ddb6d

Branch: refs/heads/master
Commit: 131ddb6d9669f558b88aef699c68d493049fd1f5
Parents: 562f6c6
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Mon Jun 8 10:12:04 2015 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Mon Jun 8 10:12:04 2015 -0400

----------------------------------------------------------------------
 .../blur/store/blockcache_v2/BaseCache.java     |   2 -
 .../SimpleCacheValueBufferPool.java             |  10 +-
 .../SlabAllocationCacheValueBufferPool.java     | 169 +++++++++++++++++--
 .../SimpleCacheValueBufferPoolTest.java         |  73 ++++++++
 .../SlabAllocationCacheValueBufferPoolTest.java |  62 +++++++
 5 files changed, 293 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/131ddb6d/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/BaseCache.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/BaseCache.java b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/BaseCache.java
index 5b0da07..5dc473c 100644
--- a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/BaseCache.java
+++ b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/BaseCache.java
@@ -66,7 +66,6 @@ public class BaseCache extends Cache implements Closeable {
   class BaseCacheEvictionListener implements EvictionListener<CacheKey, CacheValue>
{
     @Override
     public void onEviction(CacheKey key, CacheValue value) {
-      System.out.println("onEviction [" + key + "] [" + value + "]");
       _evictions.mark();
       _cacheValueBufferPool.returnToPool(value.detachFromCache());
     }
@@ -290,7 +289,6 @@ public class BaseCache extends Cache implements Closeable {
   public void put(CacheDirectory directory, String fileName, CacheKey key, CacheValue value)
{
     CacheValue cacheValue = _cacheMap.put(key, value);
     if (cacheValue != null) {
-      System.out.println("put [" + key + "] [" + value + "]");
       _evictions.mark();
       _cacheValueBufferPool.returnToPool(cacheValue.detachFromCache());
     }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/131ddb6d/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/SimpleCacheValueBufferPool.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/SimpleCacheValueBufferPool.java
b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/SimpleCacheValueBufferPool.java
index e6fa9a2..de69df9 100644
--- a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/SimpleCacheValueBufferPool.java
+++ b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/SimpleCacheValueBufferPool.java
@@ -33,11 +33,11 @@ public class SimpleCacheValueBufferPool extends BaseCacheValueBufferPool
{
   private static final Log LOG = LogFactory.getLog(SimpleCacheValueBufferPool.class);
 
   private final ConcurrentMap<Integer, BlockingQueue<CacheValue>> _cacheValuePool
= new ConcurrentHashMap<Integer, BlockingQueue<CacheValue>>();
-  private final int _capacity;
+  private final int _queueDepth;
 
-  public SimpleCacheValueBufferPool(STORE store, int capacity) {
+  public SimpleCacheValueBufferPool(STORE store, int queueDepth) {
     super(store);
-    _capacity = capacity;
+    _queueDepth = queueDepth;
   }
 
   public CacheValue getCacheValue(int cacheBlockSize) {
@@ -60,8 +60,8 @@ public class SimpleCacheValueBufferPool extends BaseCacheValueBufferPool
{
   }
 
   private BlockingQueue<CacheValue> buildNewBlockQueue(int cacheBlockSize) {
-    LOG.info("Allocating new ArrayBlockingQueue with capacity [{0}]", _capacity);
-    BlockingQueue<CacheValue> value = new ArrayBlockingQueue<CacheValue>(_capacity);
+    LOG.info("Allocating new ArrayBlockingQueue with capacity [{0}]", _queueDepth);
+    BlockingQueue<CacheValue> value = new ArrayBlockingQueue<CacheValue>(_queueDepth);
     _cacheValuePool.putIfAbsent(cacheBlockSize, value);
     return _cacheValuePool.get(cacheBlockSize);
   }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/131ddb6d/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/SlabAllocationCacheValueBufferPool.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/SlabAllocationCacheValueBufferPool.java
b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/SlabAllocationCacheValueBufferPool.java
index 7891fbf..3b9a5c5 100644
--- a/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/SlabAllocationCacheValueBufferPool.java
+++ b/blur-store/src/main/java/org/apache/blur/store/blockcache_v2/SlabAllocationCacheValueBufferPool.java
@@ -17,53 +17,190 @@
 package org.apache.blur.store.blockcache_v2;
 
 import java.io.IOException;
-import java.util.List;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.blur.store.blockcache.BlockLocks;
 import org.apache.blur.store.blockcache_v2.BaseCache.STORE;
+import org.apache.blur.store.blockcache_v2.cachevalue.UnsafeWrappedCacheValue;
+import org.apache.blur.store.blockcache_v2.cachevalue.UnsafeWrapperMultiCacheValue;
+import org.apache.blur.store.util.UnsafeUtil;
+
+import sun.misc.Unsafe;
 
 public class SlabAllocationCacheValueBufferPool extends BaseCacheValueBufferPool {
 
-  public SlabAllocationCacheValueBufferPool(STORE store) {
-    super(store);
+  private static final Unsafe _unsafe;
+
+  private final Collection<Slab> _slabs;
+  private final int _chunkSize;
+  private final int _slabSize;
+  private final int _numberOfBlocksPerSlab;
+
+  static {
+    _unsafe = UnsafeUtil.getUnsafe();
+  }
+
+  public SlabAllocationCacheValueBufferPool(int chunkSize, int slabSize) {
+    super(STORE.OFF_HEAP);
+    _chunkSize = chunkSize;
+    _slabSize = slabSize;
+    _numberOfBlocksPerSlab = _slabSize / _chunkSize;
+    _slabs = Collections.newSetFromMap(new ConcurrentHashMap<Slab, Boolean>());
   }
 
   static class Slab {
     final long _address;
     final BlockLocks _locks;
+    final int _chunkSize;
+    final long _maxAddress;
 
-    Slab(long address, int numberOfBlocks) {
+    Slab(long address, int numberOfChunks, int chunkSize) {
       _address = address;
-      _locks = new BlockLocks(numberOfBlocks);
+      _maxAddress = _address + ((long) numberOfChunks * (long) chunkSize);
+      _locks = new BlockLocks(numberOfChunks);
+      _chunkSize = chunkSize;
+    }
+
+    long findChunk() {
+      while (true) {
+        int chunkId = _locks.nextClearBit(0);
+        if (chunkId < 0) {
+          return -1L;
+        }
+        if (_locks.set(chunkId)) {
+          return _address + ((long) chunkId * (long) _chunkSize);
+        }
+      }
+    }
+
+    /**
+     * Clears the lock bit of the chunk at {@code address} when that address
+     * falls inside this slab's range {@code [_address, _maxAddress)}.
+     *
+     * @param address chunk address to release
+     * @return {@code true} if the address belonged to this slab and its chunk
+     *         was released, {@code false} if the address lies outside this slab
+     */
+    boolean releaseIfValid(long address) {
+      if (address < _address || address >= _maxAddress) {
+        return false;
+      }
+      long offset = address - _address;
+      int index = (int) (offset / _chunkSize);
+      _locks.clear(index);
+      // Report success so that releaseChunk() can stop scanning the other slabs.
+      return true;
+    }
 
-    int findChunk() {
-      throw new RuntimeException("Not implemented.");
+    void release() {
+      _unsafe.freeMemory(_address);
     }
   }
 
   @Override
   public CacheValue getCacheValue(int cacheBlockSize) {
-    
-    List<Slab> slabs = getSlabs();
-    for (Slab slab : slabs) {
-      int chunkId = slab.findChunk();
+    validCacheBlockSize(cacheBlockSize);
+    int numberOfChunks = getNumberOfChunks(cacheBlockSize);
+    if (numberOfChunks == 1) {
+      while (true) {
+        Collection<Slab> slabs = getSlabs();
+        for (Slab slab : slabs) {
+          final long chunkAddress = slab.findChunk();
+          if (chunkAddress >= 0) {
+            // found one!
+            return new UnsafeWrappedCacheValue(cacheBlockSize, chunkAddress) {
+              @Override
+              protected void releaseInternal() {
+                releaseChunk(chunkAddress);
+              }
+            };
+          }
+        }
+        maybeAllocateNewSlab(slabs.size());
+      }
+    } else {
+      final long[] chunkAddresses = new long[numberOfChunks];
+      int chunksFound = 0;
+      while (true) {
+        Collection<Slab> slabs = getSlabs();
+        for (Slab slab : slabs) {
+          INNER: while (chunksFound < numberOfChunks) {
+            long chunkAddress = slab.findChunk();
+            if (chunkAddress >= 0) {
+              // found one!
+              chunkAddresses[chunksFound] = chunkAddress;
+              chunksFound++;
+            } else {
+              break INNER;
+            }
+          }
+        }
+        if (chunksFound == numberOfChunks) {
+          return new UnsafeWrapperMultiCacheValue(cacheBlockSize, chunkAddresses, _chunkSize)
{
+            @Override
+            protected void releaseInternal() {
+              releaseChunks(chunkAddresses);
+            }
+          };
+        }
+        maybeAllocateNewSlab(slabs.size());
+      }
+    }
+
+  }
+
+  private synchronized void maybeAllocateNewSlab(int numberOfSlabs) {
+    Collection<Slab> slabs = getSlabs();
+    if (slabs.size() == numberOfSlabs) {
+      allocateNewSlab();
     }
-    throw new RuntimeException("Not implemented.");
+    return;
+  }
+
+  private void allocateNewSlab() {
+    long address = _unsafe.allocateMemory(_slabSize);
+    _slabs.add(new Slab(address, _numberOfBlocksPerSlab, _chunkSize));
   }
 
-  private List<Slab> getSlabs() {
-    throw new RuntimeException("Not implemented.");
+  private Collection<Slab> getSlabs() {
+    return _slabs;
+  }
+
+  private void releaseChunks(long[] addresses) {
+    for (long address : addresses) {
+      releaseChunk(address);
+    }
+  }
+
+  private void releaseChunk(long address) {
+    Collection<Slab> slabs = getSlabs();
+    for (Slab slab : slabs) {
+      if (slab.releaseIfValid(address)) {
+        return;
+      }
+    }
   }
 
   @Override
   public void returnToPool(CacheValue cacheValue) {
-    throw new RuntimeException("Not implemented.");
+    cacheValue.release();
   }
 
   @Override
   public void close() throws IOException {
-    throw new RuntimeException("Not implemented.");
+    // Detach each slab from the pool before freeing it: only the caller that
+    // actually removes a slab frees its memory, so a repeated (or concurrent)
+    // close() never passes the same address to freeMemory twice. The backing
+    // set is built on a ConcurrentHashMap, so removing while iterating is safe.
+    Collection<Slab> slabs = getSlabs();
+    for (Slab slab : slabs) {
+      if (slabs.remove(slab)) {
+        slab.release();
+      }
+    }
+  }
+
+  private void validCacheBlockSize(int cacheBlockSize) {
+    if (cacheBlockSize >= 1) {
+      return;
+    }
+    throw new RuntimeException("CacheBlockSize requested [" + cacheBlockSize + "] is invalid.");
   }
 
+  /**
+   * Computes how many chunks of {@code _chunkSize} are needed for a block of
+   * {@code cacheBlockSize}.
+   *
+   * @param cacheBlockSize requested block size
+   * @return 1 when the block fits in a single chunk; otherwise
+   *         {@code cacheBlockSize / _chunkSize}, plus one if there is a remainder
+   */
+  private int getNumberOfChunks(int cacheBlockSize) {
+    if (cacheBlockSize > _chunkSize) {
+      final int wholeChunks = cacheBlockSize / _chunkSize;
+      final boolean hasRemainder = (cacheBlockSize % _chunkSize) != 0;
+      return hasRemainder ? wholeChunks + 1 : wholeChunks;
+    }
+    return 1;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/131ddb6d/blur-store/src/test/java/org/apache/blur/store/blockcache_v2/SimpleCacheValueBufferPoolTest.java
----------------------------------------------------------------------
diff --git a/blur-store/src/test/java/org/apache/blur/store/blockcache_v2/SimpleCacheValueBufferPoolTest.java
b/blur-store/src/test/java/org/apache/blur/store/blockcache_v2/SimpleCacheValueBufferPoolTest.java
new file mode 100644
index 0000000..b108dc5
--- /dev/null
+++ b/blur-store/src/test/java/org/apache/blur/store/blockcache_v2/SimpleCacheValueBufferPoolTest.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.store.blockcache_v2;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.blur.store.blockcache_v2.BaseCache.STORE;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class SimpleCacheValueBufferPoolTest {
+
+  private static final int QUEUE_DEPTH = 100;
+
+  private SimpleCacheValueBufferPool _pool;
+
+  @Before
+  public void setup() {
+    _pool = new SimpleCacheValueBufferPool(STORE.ON_HEAP, QUEUE_DEPTH);
+  }
+
+  @After
+  public void teardown() throws IOException {
+    _pool.close();
+  }
+
+  @Test
+  public void test1() {
+    long count = _pool._created.count();
+    CacheValue cacheValue1 = _pool.getCacheValue(1);
+    assertEquals(count + 1L, _pool._created.count());
+    _pool.returnToPool(cacheValue1);
+    long count2 = _pool._reused.count();
+    CacheValue cacheValue2 = _pool.getCacheValue(1);
+    assertEquals(count2 + 1L, _pool._reused.count());
+    _pool.returnToPool(cacheValue2);
+  }
+
+  @Test
+  public void test2() {
+    List<CacheValue> list = new ArrayList<CacheValue>();
+    long count = _pool._created.count();
+    int total = 10000;
+    for (int i = 0; i < total; i++) {
+      list.add(_pool.getCacheValue(10000));
+    }
+    assertEquals(count + total, _pool._created.count());
+    long count2 = _pool._detroyed.count();
+    for (CacheValue cacheValue : list) {
+      _pool.returnToPool(cacheValue);
+    }
+    assertEquals(count2 + (total - QUEUE_DEPTH), _pool._detroyed.count());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/131ddb6d/blur-store/src/test/java/org/apache/blur/store/blockcache_v2/SlabAllocationCacheValueBufferPoolTest.java
----------------------------------------------------------------------
diff --git a/blur-store/src/test/java/org/apache/blur/store/blockcache_v2/SlabAllocationCacheValueBufferPoolTest.java
b/blur-store/src/test/java/org/apache/blur/store/blockcache_v2/SlabAllocationCacheValueBufferPoolTest.java
new file mode 100644
index 0000000..b3a77a9
--- /dev/null
+++ b/blur-store/src/test/java/org/apache/blur/store/blockcache_v2/SlabAllocationCacheValueBufferPoolTest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.store.blockcache_v2;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class SlabAllocationCacheValueBufferPoolTest {
+
+  private SlabAllocationCacheValueBufferPool _pool;
+
+  @Before
+  public void setup() {
+    int chunkSize = 1000;
+    int slabSize = chunkSize * 10000;
+    _pool = new SlabAllocationCacheValueBufferPool(chunkSize, slabSize);
+  }
+
+  @After
+  public void teardown() throws IOException {
+    _pool.close();
+  }
+
+  @Test
+  public void test1() {
+    CacheValue cacheValue1 = _pool.getCacheValue(1);
+    _pool.returnToPool(cacheValue1);
+    CacheValue cacheValue2 = _pool.getCacheValue(1);
+    _pool.returnToPool(cacheValue2);
+  }
+
+  @Test
+  public void test2() {
+    List<CacheValue> list = new ArrayList<CacheValue>();
+    int total = 10000;
+    for (int i = 0; i < total; i++) {
+      list.add(_pool.getCacheValue(10000));
+    }
+    for (CacheValue cacheValue : list) {
+      _pool.returnToPool(cacheValue);
+    }
+  }
+}


Mime
View raw message