incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [2/4] git commit: Preventing the same queries from occurring at the same time. The result of one request can now answer all blocking queries.
Date Mon, 02 Mar 2015 14:54:50 GMT
Preventing the same queries from occurring at the same time.  The result of one request
can now answer all blocking queries.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/b4dfa04f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/b4dfa04f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/b4dfa04f

Branch: refs/heads/master
Commit: b4dfa04f57db095cbeec526577825cacd792df85
Parents: 8f26d68
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Mon Mar 2 09:45:56 2015 -0500
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Mon Mar 2 09:45:56 2015 -0500

----------------------------------------------------------------------
 .../blur/server/cache/ThriftCacheServer.java    | 139 ++++++++++++++-----
 1 file changed, 108 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b4dfa04f/blur-core/src/main/java/org/apache/blur/server/cache/ThriftCacheServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/server/cache/ThriftCacheServer.java b/blur-core/src/main/java/org/apache/blur/server/cache/ThriftCacheServer.java
index 7264801..a2f3610 100644
--- a/blur-core/src/main/java/org/apache/blur/server/cache/ThriftCacheServer.java
+++ b/blur-core/src/main/java/org/apache/blur/server/cache/ThriftCacheServer.java
@@ -24,6 +24,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.blur.BlurConfiguration;
 import org.apache.blur.manager.IndexServer;
@@ -43,6 +49,7 @@ public class ThriftCacheServer extends FilteredBlurServer {
 
   private final ThriftCache _thriftCache;
   private final IndexServer _indexServer;
+  private final ConcurrentMap<ThriftCacheKey<?>, Lock> _lockMap = new ConcurrentHashMap<ThriftCacheKey<?>,
Lock>();
 
   public ThriftCacheServer(BlurConfiguration configuration, Iface iface, IndexServer indexServer,
       ThriftCache thriftCache) {
@@ -54,26 +61,16 @@ public class ThriftCacheServer extends FilteredBlurServer {
   @Override
   public TableStats tableStats(String table) throws BlurException, TException {
     ThriftCacheKey<TableStats> key = _thriftCache.getKey(table, getShards(table), null,
TableStats.class);
-    TableStats results = _thriftCache.get(key, TableStats.class);
-    if (results != null) {
-      return results;
-    }
-    return _thriftCache.put(key, super.tableStats(table));
-  }
-
-  private int[] getShards(String table) throws BlurException {
+    Lock lock = getOrCreateLock(key);
     try {
-      Set<String> keySet = _indexServer.getIndexes(table).keySet();
-      int[] shards = new int[keySet.size()];
-      int i = 0;
-      for (String s : keySet) {
-        int shardIndex = ShardUtil.getShardIndex(s);
-        shards[i++] = shardIndex;
+      lock.lock();
+      TableStats results = _thriftCache.get(key, TableStats.class);
+      if (results != null) {
+        return results;
       }
-      Arrays.sort(shards);
-      return shards;
-    } catch (IOException e) {
-      throw new BException("Unknown error while trying to get current shards for table [{0}]",
e, table);
+      return _thriftCache.put(key, super.tableStats(table));
+    } finally {
+      lock.unlock();
     }
   }
 
@@ -92,28 +89,40 @@ public class ThriftCacheServer extends FilteredBlurServer {
     copy.cacheResult = false;
     copy.startTime = 0;
     ThriftCacheKey<BlurQuery> key = _thriftCache.getKey(table, getShards(table), copy,
BlurQuery.class);
-    if (useCacheIfPresent) {
-      BlurResults results = _thriftCache.get(key, BlurResults.class);
-      if (results != null) {
-        return results;
+    Lock lock = getOrCreateLock(key);
+    try {
+      lock.lock();
+      if (useCacheIfPresent) {
+        BlurResults results = _thriftCache.get(key, BlurResults.class);
+        if (results != null) {
+          return results;
+        }
       }
+      BlurResults blurResults = super.query(table, blurQuery);
+      if (cacheResult) {
+        return _thriftCache.put(key, blurResults);
+      }
+      return blurResults;
+    } finally {
+      lock.unlock();
     }
-    BlurResults blurResults = super.query(table, blurQuery);
-    if (cacheResult) {
-      return _thriftCache.put(key, blurResults);
-    }
-    return blurResults;
   }
 
   @Override
   public FetchResult fetchRow(String table, Selector selector) throws BlurException, TException
{
     Selector copy = new Selector(selector);
     ThriftCacheKey<Selector> key = _thriftCache.getKey(table, getShards(table), copy,
Selector.class);
-    FetchResult results = _thriftCache.get(key, FetchResult.class);
-    if (results != null) {
-      return results;
+    Lock lock = getOrCreateLock(key);
+    try {
+      lock.lock();
+      FetchResult results = _thriftCache.get(key, FetchResult.class);
+      if (results != null) {
+        return results;
+      }
+      return _thriftCache.put(key, super.fetchRow(table, selector));
+    } finally {
+      lock.unlock();
     }
-    return _thriftCache.put(key, super.fetchRow(table, selector));
   }
 
   @Override
@@ -158,4 +167,72 @@ public class ThriftCacheServer extends FilteredBlurServer {
     return new ArrayList<FetchResult>(resultMap.values());
   }
 
+  private Lock getOrCreateLock(ThriftCacheKey<?> key) {
+    Lock lock = _lockMap.get(key);
+    if (lock == null) {
+      Lock nLock = new ReentrantReadWriteLock().writeLock();
+      Lock pLock = _lockMap.putIfAbsent(key, new InternalLock(nLock, _lockMap, key));
+      if (pLock == null) {
+        return nLock;
+      } else {
+        return pLock;
+      }
+    } else {
+      return lock;
+    }
+  }
+
+  private static class InternalLock implements Lock {
+    private final Lock _lock;
+    private final ConcurrentMap<ThriftCacheKey<?>, Lock> _lockMap;
+    private final ThriftCacheKey<?> _key;
+
+    public InternalLock(Lock lock, ConcurrentMap<ThriftCacheKey<?>, Lock> lockMap,
ThriftCacheKey<?> key) {
+      _lock = lock;
+      _lockMap = lockMap;
+      _key = key;
+    }
+
+    public void lock() {
+      _lock.lock();
+    }
+
+    public void lockInterruptibly() throws InterruptedException {
+      _lock.lockInterruptibly();
+    }
+
+    public boolean tryLock() {
+      return _lock.tryLock();
+    }
+
+    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
+      return _lock.tryLock(time, unit);
+    }
+
+    public void unlock() {
+      _lock.unlock();
+      _lockMap.remove(_key);
+    }
+
+    public Condition newCondition() {
+      return _lock.newCondition();
+    }
+
+  }
+
+  private int[] getShards(String table) throws BlurException {
+    try {
+      Set<String> keySet = _indexServer.getIndexes(table).keySet();
+      int[] shards = new int[keySet.size()];
+      int i = 0;
+      for (String s : keySet) {
+        int shardIndex = ShardUtil.getShardIndex(s);
+        shards[i++] = shardIndex;
+      }
+      Arrays.sort(shards);
+      return shards;
+    } catch (IOException e) {
+      throw new BException("Unknown error while trying to get current shards for table [{0}]",
e, table);
+    }
+  }
 }


Mime
View raw message