incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [1/3] git commit: Adding more logging to the layout calculation, and now the system will recalculate once every 5 minutes by default.
Date Fri, 06 Dec 2013 15:54:05 GMT
Updated Branches:
  refs/heads/apache-blur-0.2 4c2882f4a -> 2da9ab814


Adding more logging to the layout calculation, and now the system will recalculate once every
5 minutes by default.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/8a560362
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/8a560362
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/8a560362

Branch: refs/heads/apache-blur-0.2
Commit: 8a560362d45e3a99bc8b9d00f725750c765e0db1
Parents: 4c2882f
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Fri Dec 6 10:14:00 2013 -0500
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Fri Dec 6 10:14:00 2013 -0500

----------------------------------------------------------------------
 .../indexserver/DistributedIndexServer.java       | 15 ++++++++++++---
 .../MasterBasedDistributedLayoutFactory.java      | 17 ++++++++++++-----
 .../manager/indexserver/MasterBasedLeveler.java   | 18 +++++++++++++-----
 .../apache/blur/thrift/ThriftBlurShardServer.java |  4 +++-
 .../java/org/apache/blur/utils/BlurConstants.java |  1 +
 .../org/apache/blur/zookeeper/WatchChildren.java  | 10 ++++++++--
 .../src/main/resources/blur-default.properties    |  3 +++
 7 files changed, 52 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8a560362/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java b/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
index 6e91666..a0b83de 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/indexserver/DistributedIndexServer.java
@@ -116,13 +116,15 @@ public class DistributedIndexServer extends AbstractDistributedIndexServer {
   private final ConcurrentMap<String, Map<String, BlurIndex>> _indexes = new
ConcurrentHashMap<String, Map<String, BlurIndex>>();
   private final ShardStateManager _shardStateManager = new ShardStateManager();
   private final Closer _closer;
+  private final long _balancerTime;
 
   public DistributedIndexServer(Configuration configuration, ZooKeeper zookeeper, ClusterStatus
clusterStatus,
       BlurIndexWarmup warmup, BlurFilterCache filterCache, BlockCacheDirectoryFactory blockCacheDirectoryFactory,
       DistributedLayoutFactory distributedLayoutFactory, String cluster, String nodeName,
long safeModeDelay,
-      int shardOpenerThreadCount, int internalSearchThreads, int warmupThreads, int maxMergeThreads)
+      int shardOpenerThreadCount, int internalSearchThreads, int warmupThreads, int maxMergeThreads,
long balancerTime)
       throws KeeperException, InterruptedException {
     super(clusterStatus, configuration, nodeName, cluster);
+    _balancerTime = balancerTime;
     _closer = Closer.create();
     _shardOpenerThreadCount = shardOpenerThreadCount;
     _zookeeper = zookeeper;
@@ -248,6 +250,7 @@ public class DistributedIndexServer extends AbstractDistributedIndexServer {
   }
 
   private WatchChildren watchForShardServerChanges() {
+    
     WatchChildren watchOnlineShards = new WatchChildren(_zookeeper,
         ZookeeperPathConstants.getOnlineShardsPath(_cluster)).watch(new OnChange() {
       private List<String> _prevOnlineShards = new ArrayList<String>();
@@ -257,22 +260,28 @@ public class DistributedIndexServer extends AbstractDistributedIndexServer {
         List<String> oldOnlineShards = _prevOnlineShards;
         _prevOnlineShards = onlineShards;
         _layout.clear();
-        LOG.info("Online shard servers changed, clearing layout managers and cache.");
+        LOG.info("Layouts cleared, possible node change or rebalance.");
+        boolean change = false;
         if (oldOnlineShards == null) {
           oldOnlineShards = new ArrayList<String>();
         }
         for (String oldOnlineShard : oldOnlineShards) {
           if (!onlineShards.contains(oldOnlineShard)) {
             LOG.info("Node went offline [{0}]", oldOnlineShard);
+            change = true;
           }
         }
         for (String onlineShard : onlineShards) {
           if (!oldOnlineShards.contains(onlineShard)) {
             LOG.info("Node came online [{0}]", onlineShard);
+            change = true;
           }
         }
+        if (change) {
+          LOG.info("Online shard servers changed, clearing layout managers and cache.");
+        }
       }
-    });
+    }, _balancerTime, TimeUnit.MILLISECONDS);
     return _closer.register(watchOnlineShards);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8a560362/blur-core/src/main/java/org/apache/blur/manager/indexserver/MasterBasedDistributedLayoutFactory.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/indexserver/MasterBasedDistributedLayoutFactory.java b/blur-core/src/main/java/org/apache/blur/manager/indexserver/MasterBasedDistributedLayoutFactory.java
index ee51ee5..8102115 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/indexserver/MasterBasedDistributedLayoutFactory.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/indexserver/MasterBasedDistributedLayoutFactory.java
@@ -95,14 +95,17 @@ public class MasterBasedDistributedLayoutFactory implements DistributedLayoutFac
     try {
       _zooKeeperLockManager.lock(table);
       String storagePath = getStoragePath(table);
+      LOG.info("Checking for existing layout for table [{0}]", table);
       Stat stat = _zooKeeper.exists(storagePath, false);
       MasterBasedDistributedLayout existingLayout = null;
       if (stat != null) {
+        LOG.info("Existing layout found for table [{0}]", table);
         byte[] data = _zooKeeper.getData(storagePath, false, stat);
         if (data != null) {
           MasterBasedDistributedLayout storedLayout = fromBytes(data);
+          LOG.info("Checking if layout is out of date for table [{0}]", table);
           if (!storedLayout.isOutOfDate(onlineShardServerList, shardServerList)) {
-            LOG.info("New layout fetched.");
+            LOG.info("Layout is up-to-date for table [{0}]", table);
             return storedLayout;
           }
           // If there was a stored layout, use the stored layout as a
@@ -110,11 +113,13 @@ public class MasterBasedDistributedLayoutFactory implements DistributedLayoutFac
           existingLayout = storedLayout;
         }
       }
+      LOG.info("Calculating new layout for table [{0}]", table);
       // recreate
       Map<String, String> newCalculatedLayout = calculateNewLayout(table, existingLayout,
onlineShardServerList,
           shardServerList);
       MasterBasedDistributedLayout layout = new MasterBasedDistributedLayout(newCalculatedLayout,
           onlineShardServerList, shardServerList);
+      LOG.info("New layout created for table [{0}]", table);
       if (_zooKeeper.exists(storagePath, false) == null) {
         _zooKeeper.create(storagePath, toBytes(layout), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
       } else {
@@ -168,12 +173,14 @@ public class MasterBasedDistributedLayoutFactory implements DistributedLayoutFac
           increment(onlineServerShardCount, server);
         }
       }
+      LOG.info("Existing layout counts for table [{0}] are [{1}] and offline shards are [{2}]",
table,
+          onlineServerShardCount, shardsThatAreOffline);
 
-      LOG.info("Adding in new shard servers for table [{0}]", table);
+      LOG.info("Adding in new shard servers for table [{0}] current shard servers are [{1}]",
table, shardServerSet);
       // Add counts for new shard servers
       for (String server : shardServerSet) {
         if (!onlineServerShardCount.containsKey(server)) {
-          LOG.info("New shard server [{0}]", server);
+          LOG.info("New shard server found [{0}]", server);
           onlineServerShardCount.put(server, 0);
         }
       }
@@ -189,9 +196,9 @@ public class MasterBasedDistributedLayoutFactory implements DistributedLayoutFac
         increment(onlineServerShardCount, server);
       }
 
-      LOG.info("Leveling any shard hotspots for table [{0}]", table);
+      LOG.info("Leveling any shard hotspots for table [{0}] for layout [{1}]", table, newLayoutMap);
       // Level shards
-      MasterBasedLeveler.level(shardList.size(), shardServerSet.size(), onlineServerShardCount,
newLayoutMap);
+      MasterBasedLeveler.level(shardList.size(), shardServerSet.size(), onlineServerShardCount,
newLayoutMap, table);
       return newLayoutMap;
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8a560362/blur-core/src/main/java/org/apache/blur/manager/indexserver/MasterBasedLeveler.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/indexserver/MasterBasedLeveler.java b/blur-core/src/main/java/org/apache/blur/manager/indexserver/MasterBasedLeveler.java
index 2bf7493..18fb611 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/indexserver/MasterBasedLeveler.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/indexserver/MasterBasedLeveler.java
@@ -36,7 +36,7 @@ public class MasterBasedLeveler {
   private static final Log LOG = LogFactory.getLog(MasterBasedLeveler.class);
 
   public static void level(int totalShards, int totalShardServers, Map<String, Integer>
onlineServerShardCount,
-      Map<String, String> newLayoutMap) {
+      Map<String, String> newLayoutMap, String table) {
     List<Entry<String, Integer>> onlineServerShardCountList = new ArrayList<Map.Entry<String,
Integer>>(
         onlineServerShardCount.entrySet());
     Collections.sort(onlineServerShardCountList, new Comparator<Entry<String, Integer>>()
{
@@ -55,7 +55,7 @@ public class MasterBasedLeveler {
 
     Set<String> overAllocatedSet = new HashSet<String>();
     Set<String> underAllocatedSet = new HashSet<String>();
-    LOG.info("Optimum server shard count [{0}]", opt);
+    LOG.info("Optimum server shard count [{0}] for table [{1}]", opt, table);
     for (Entry<String, Integer> e : onlineServerShardCountList) {
       int countInt = e.getValue();
       float count = countInt;
@@ -86,8 +86,9 @@ public class MasterBasedLeveler {
     while (!underAllocatedSet.isEmpty() && !overAllocatedSet.isEmpty()) {
       String overAllocatedServer = getFirst(overAllocatedSet);
       String underAllocatedServer = getFirst(underAllocatedSet);
+      LOG.info("Over allocated server [{0}] under allocated server [{1}]", overAllocatedServer,
underAllocatedServer);
       moveSingleShard(overAllocatedServer, underAllocatedServer, opt, overAllocatedSet, underAllocatedSet,
-          newLayoutMap, onlineServerShardCount, serverToShards);
+          newLayoutMap, onlineServerShardCount, serverToShards, table);
     }
   }
 
@@ -101,7 +102,7 @@ public class MasterBasedLeveler {
 
   private static void moveSingleShard(String srcServer, String distServer, float opt,
       Set<String> overAllocatedServerSet, Set<String> underAllocatedServerSet,
Map<String, String> newLayoutMap,
-      Map<String, Integer> onlineServerShardCount, Map<String, SortedSet<String>>
serverToShards) {
+      Map<String, Integer> onlineServerShardCount, Map<String, SortedSet<String>>
serverToShards, String table) {
 
     SortedSet<String> srcShards = serverToShards.get(srcServer);
     if (srcShards == null) {
@@ -114,17 +115,24 @@ public class MasterBasedLeveler {
       serverToShards.put(distServer, distShards);
     }
 
+    LOG.info("Source server shard list for table [{0}] is [{1}]", table, srcShards);
+    LOG.info("Destination server shard list for table [{0}] is [{1}]", table, distShards);
+
     String srcShard = getFirst(srcShards);
 
+    LOG.info("Moving shard [{0}] from [{1}] to [{2}] for table [{3}]", srcShard, srcServer,
distServer, table);
+
     srcShards.remove(srcShard);
     distShards.add(srcShard);
 
     if (!isNotInOptBalance(opt, srcShards.size())) {
+      LOG.info("Source server [{0}] is in balance with size [{1}] optimum size [{2}]", srcServer,
srcShards.size(), opt);
       overAllocatedServerSet.remove(srcServer);
       underAllocatedServerSet.remove(srcServer);
     }
 
     if (!isNotInOptBalance(opt, distShards.size())) {
+      LOG.info("Source server [{0}] is in balance with size [{1}] optimum size [{2}]", distServer,
distShards.size(), opt);
       overAllocatedServerSet.remove(distServer);
       underAllocatedServerSet.remove(distServer);
     }
@@ -150,7 +158,7 @@ public class MasterBasedLeveler {
     } else {
       int value = i - 1;
       if (value < 0) {
-        value = 0;  
+        value = 0;
       }
       map.put(key, value);
     }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8a560362/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
index 4159b91..a511ea8 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
@@ -26,6 +26,7 @@ import static org.apache.blur.utils.BlurConstants.BLUR_INDEXMANAGER_SEARCH_THREA
 import static org.apache.blur.utils.BlurConstants.BLUR_MAX_CLAUSE_COUNT;
 import static org.apache.blur.utils.BlurConstants.BLUR_MAX_HEAP_PER_ROW_FETCH;
 import static org.apache.blur.utils.BlurConstants.BLUR_MAX_RECORDS_PER_ROW_FETCH_REQUEST;
+import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_BALANCER_PERIOD;
 import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_BIND_ADDRESS;
 import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_BIND_PORT;
 import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_BLOCK_CACHE_TOTAL_SIZE;
@@ -207,9 +208,10 @@ public class ThriftBlurShardServer extends ThriftServer {
     int internalSearchThreads = configuration.getInt(BLUR_SHARD_WARMUP_THREAD_COUNT, 16);
     int warmupThreads = configuration.getInt(BLUR_SHARD_WARMUP_THREAD_COUNT, 16);
     int maxMergeThreads = configuration.getInt(BLUR_SHARD_MERGE_THREAD_COUNT, 3);
+    long shardBalancerPeriod = configuration.getLong(BLUR_SHARD_BALANCER_PERIOD, TimeUnit.MINUTES.toMillis(5));
     final DistributedIndexServer indexServer = new DistributedIndexServer(config, zooKeeper,
clusterStatus,
         indexWarmup, filterCache, blockCacheDirectoryFactory, distributedLayoutFactory, cluster,
nodeName,
-        safeModeDelay, shardOpenerThreadCount, internalSearchThreads, warmupThreads, maxMergeThreads);
+        safeModeDelay, shardOpenerThreadCount, internalSearchThreads, warmupThreads, maxMergeThreads,
shardBalancerPeriod);
 
     BooleanQuery.setMaxClauseCount(configuration.getInt(BLUR_MAX_CLAUSE_COUNT, 1024));
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8a560362/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
----------------------------------------------------------------------
diff --git a/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java b/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
index 8aa6f07..513409a 100644
--- a/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
+++ b/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java
@@ -36,6 +36,7 @@ public class BlurConstants {
   public static final String SUPER = "super";
   public static final String SEP = ".";
 
+  public static final String BLUR_SHARD_BALANCER_PERIOD = "blur.shard.balancer.period";
   public static final String BLUR_TABLE_PATH = "blur.table.path";
   public static final String BLUR_ZOOKEEPER_CONNECTION = "blur.zookeeper.connection";
   public static final String BLUR_ZOOKEEPER_TRACE_PATH = "blur.zookeeper.trace.path";

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8a560362/blur-util/src/main/java/org/apache/blur/zookeeper/WatchChildren.java
----------------------------------------------------------------------
diff --git a/blur-util/src/main/java/org/apache/blur/zookeeper/WatchChildren.java b/blur-util/src/main/java/org/apache/blur/zookeeper/WatchChildren.java
index 2d64511..56d7eb1 100644
--- a/blur-util/src/main/java/org/apache/blur/zookeeper/WatchChildren.java
+++ b/blur-util/src/main/java/org/apache/blur/zookeeper/WatchChildren.java
@@ -58,6 +58,10 @@ public class WatchChildren implements Closeable {
   }
 
   public WatchChildren watch(final OnChange onChange) {
+    return watch(onChange, 0, TimeUnit.SECONDS);
+  }
+
+  public WatchChildren watch(final OnChange onChange, long fireAnywayTime, TimeUnit timeUnit)
{
     if (_debug) {
       StringWriter writer = new StringWriter();
       PrintWriter printWriter = new PrintWriter(writer);
@@ -65,6 +69,7 @@ public class WatchChildren implements Closeable {
       printWriter.close();
       _debugStackTrace = writer.toString();
     }
+    final long timeToFireAnywayTime = TimeUnit.MILLISECONDS.convert(fireAnywayTime, timeUnit);
     _watchThread = new Thread(new Runnable() {
       @Override
       public void run() {
@@ -87,7 +92,7 @@ public class WatchChildren implements Closeable {
               } catch (Throwable t) {
                 LOG.error("Unknown error during onchange action [" + this + "].", t);
               }
-              _lock.wait();
+              _lock.wait(timeToFireAnywayTime);
             } catch (KeeperException e) {
               if (!_running.get()) {
                 LOG.info("Error [{0}]", e.getMessage());
@@ -95,7 +100,8 @@ public class WatchChildren implements Closeable {
               }
               if (e.code() == Code.NONODE) {
                 if (_debug) {
-                  LOG.debug("Path for watching not found [{0}], no longer watching, debug
[{1}].", _path, _debugStackTrace);
+                  LOG.debug("Path for watching not found [{0}], no longer watching, debug
[{1}].", _path,
+                      _debugStackTrace);
                 } else {
                   LOG.debug("Path for watching not found [{0}], no longer watching.", _path);
                 }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/8a560362/blur-util/src/main/resources/blur-default.properties
----------------------------------------------------------------------
diff --git a/blur-util/src/main/resources/blur-default.properties b/blur-util/src/main/resources/blur-default.properties
index 75b487a..07f7193 100644
--- a/blur-util/src/main/resources/blur-default.properties
+++ b/blur-util/src/main/resources/blur-default.properties
@@ -142,6 +142,9 @@ blur.shard.time.between.refreshs=3000
 # The max number of threads used during index merges
 blur.shard.merge.thread.count=3
 
+# The time period (in milliseconds) in which the balancer will recalculate the shard distribution.  NOTE: This period applies to each shard server independently, so across the cluster the check may occur more often.
+blur.shard.balancer.period=300000
+
 # The maximum number of clauses in a BooleanQuery
 blur.max.clause.count=1024
 


Mime
View raw message