incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject git commit: Improving the leveling logic, and adding a test.
Date Sun, 08 Dec 2013 20:04:39 GMT
Updated Branches:
  refs/heads/master 043e52e5c -> 346a20119


Improving the leveling logic, and adding a test.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/346a2011
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/346a2011
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/346a2011

Branch: refs/heads/master
Commit: 346a201190916ad1bbae92ffd83992e15963f466
Parents: 043e52e
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Sun Dec 8 15:03:41 2013 -0500
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Sun Dec 8 15:03:41 2013 -0500

----------------------------------------------------------------------
 .../MasterBasedDistributedLayoutFactory.java    |  10 +-
 .../manager/indexserver/MasterBasedLeveler.java |  68 ++++-
 .../indexserver/MasterBasedLevelerTest.java     | 267 +++++++++++++++++++
 3 files changed, 333 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/346a2011/blur-core/src/main/java/org/apache/blur/manager/indexserver/MasterBasedDistributedLayoutFactory.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/indexserver/MasterBasedDistributedLayoutFactory.java
b/blur-core/src/main/java/org/apache/blur/manager/indexserver/MasterBasedDistributedLayoutFactory.java
index 8102115..b9faf54 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/indexserver/MasterBasedDistributedLayoutFactory.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/indexserver/MasterBasedDistributedLayoutFactory.java
@@ -28,6 +28,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Random;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeMap;
@@ -57,6 +58,12 @@ public class MasterBasedDistributedLayoutFactory implements DistributedLayoutFac
   private final ZooKeeperLockManager _zooKeeperLockManager;
   private final String _tableStoragePath;
   private final String _locksStoragePath;
+  /**
+   * Per-thread {@link Random} passed to {@link MasterBasedLeveler#level}; each thread lazily gets its own
+   * instance on first {@code get()}, so no single {@link Random} is shared between threads.
+   */
+  private final ThreadLocal<Random> _random = new ThreadLocal<Random>() {
+    @Override
+    protected Random initialValue() {
+      final Random threadRandom = new Random();
+      return threadRandom;
+    }
+  };
 
   public MasterBasedDistributedLayoutFactory(ZooKeeper zooKeeper, String storagePath) {
     _zooKeeper = zooKeeper;
@@ -198,7 +205,8 @@ public class MasterBasedDistributedLayoutFactory implements DistributedLayoutFac
 
       LOG.info("Leveling any shard hotspots for table [{0}] for layout [{1}]", table, newLayoutMap);
       // Level shards
-      MasterBasedLeveler.level(shardList.size(), shardServerSet.size(), onlineServerShardCount,
newLayoutMap, table);
+      MasterBasedLeveler.level(shardList.size(), shardServerSet.size(), onlineServerShardCount,
newLayoutMap, table,
+          _random.get());
       return newLayoutMap;
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/346a2011/blur-core/src/main/java/org/apache/blur/manager/indexserver/MasterBasedLeveler.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/indexserver/MasterBasedLeveler.java
b/blur-core/src/main/java/org/apache/blur/manager/indexserver/MasterBasedLeveler.java
index 18fb611..5ffea89 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/indexserver/MasterBasedLeveler.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/indexserver/MasterBasedLeveler.java
@@ -24,6 +24,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Random;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
@@ -35,8 +36,8 @@ public class MasterBasedLeveler {
 
   private static final Log LOG = LogFactory.getLog(MasterBasedLeveler.class);
 
-  public static void level(int totalShards, int totalShardServers, Map<String, Integer>
onlineServerShardCount,
-      Map<String, String> newLayoutMap, String table) {
+  public static int level(int totalShards, int totalShardServers, Map<String, Integer>
onlineServerShardCount,
+      Map<String, String> newLayoutMap, String table, Random random) {
     List<Entry<String, Integer>> onlineServerShardCountList = new ArrayList<Map.Entry<String,
Integer>>(
         onlineServerShardCount.entrySet());
     Collections.sort(onlineServerShardCountList, new Comparator<Entry<String, Integer>>()
{
@@ -55,17 +56,17 @@ public class MasterBasedLeveler {
 
     Set<String> overAllocatedSet = new HashSet<String>();
     Set<String> underAllocatedSet = new HashSet<String>();
-    LOG.info("Optimum server shard count [{0}] for table [{1}]", opt, table);
+    LOG.debug("Optimum server shard count [{0}] for table [{1}]", opt, table);
     for (Entry<String, Integer> e : onlineServerShardCountList) {
       int countInt = e.getValue();
       float count = countInt;
       String server = e.getKey();
       if (isNotInOptBalance(opt, count)) {
         if (count > opt) {
-          LOG.info("Level server [{0}] over allocated at [{1}]", server, e.getValue());
+          LOG.debug("Level server [{0}] over allocated at [{1}]", server, e.getValue());
           overAllocatedSet.add(server);
         } else {
-          LOG.info("Level server [{0}] under allocated at [{1}]", server, e.getValue());
+          LOG.debug("Level server [{0}] under allocated at [{1}]", server, e.getValue());
           underAllocatedSet.add(server);
         }
       }
@@ -83,13 +84,56 @@ public class MasterBasedLeveler {
       shards.add(shard);
     }
 
+    int moves = 0;
     while (!underAllocatedSet.isEmpty() && !overAllocatedSet.isEmpty()) {
       String overAllocatedServer = getFirst(overAllocatedSet);
       String underAllocatedServer = getFirst(underAllocatedSet);
-      LOG.info("Over allocated server [{0}] under allocated server [{1}]", overAllocatedServer,
underAllocatedServer);
+      LOG.debug("Over allocated server [{0}] under allocated server [{1}]", overAllocatedServer,
underAllocatedServer);
       moveSingleShard(overAllocatedServer, underAllocatedServer, opt, overAllocatedSet, underAllocatedSet,
           newLayoutMap, onlineServerShardCount, serverToShards, table);
+      moves++;
     }
+
+    if (overAllocatedSet.size() > 0) {
+      int count = (int) opt;
+      LOG.debug("There are still [{0}] over allocated servers for table [{1}]", overAllocatedSet.size(),
table);
+      while (!overAllocatedSet.isEmpty()) {
+        String overAllocatedServer = getFirst(overAllocatedSet);
+        String underAllocatedServer = findServerWithCount(onlineServerShardCount, count,
table, random);
+        LOG.debug("Over allocated server [{0}] under allocated server [{1}]", overAllocatedServer,
underAllocatedServer);
+        moveSingleShard(overAllocatedServer, underAllocatedServer, opt, overAllocatedSet,
underAllocatedSet,
+            newLayoutMap, onlineServerShardCount, serverToShards, table);
+        moves++;
+      }
+    }
+
+    if (underAllocatedSet.size() > 0) {
+      int count = (int) Math.ceil(opt);
+      LOG.info("There are still [{0}] under allocated servers for table [{1}]", underAllocatedSet.size(),
table);
+      while (!underAllocatedSet.isEmpty()) {
+        String overAllocatedServer = findServerWithCount(onlineServerShardCount, count, table,
random);
+        String underAllocatedServer = getFirst(underAllocatedSet);
+        LOG.debug("Over allocated server [{0}] under allocated server [{1}]", overAllocatedServer,
underAllocatedServer);
+        moveSingleShard(overAllocatedServer, underAllocatedServer, opt, overAllocatedSet,
underAllocatedSet,
+            newLayoutMap, onlineServerShardCount, serverToShards, table);
+        moves++;
+      }
+    }
+    return moves;
+  }
+
+  /**
+   * Picks a random online server that currently holds exactly {@code count} shards.
+   * The candidate entries are shuffled with the supplied {@link Random} so repeated calls do not
+   * always favour the same server.
+   *
+   * @param onlineServerShardCount map of server name to the number of shards currently counted for it
+   * @param count the exact shard count the returned server must have
+   * @param table table name, used only for logging and error messages
+   * @param random source of randomness for the shuffle
+   * @return the name of a server whose shard count equals {@code count}
+   * @throws IllegalStateException if no server in {@code onlineServerShardCount} has exactly {@code count} shards
+   */
+  private static String findServerWithCount(Map<String, Integer> onlineServerShardCount, int count, String table,
+      Random random) {
+    LOG.debug("Looking for server with shard count [{0}] for table [{1}]", count, table);
+    List<Entry<String, Integer>> list = new ArrayList<Entry<String, Integer>>(onlineServerShardCount.entrySet());
+    Collections.shuffle(list, random);
+    for (Entry<String, Integer> e : list) {
+      int serverCount = e.getValue();
+      if (serverCount == count) {
+        return e.getKey();
+      }
+    }
+    // The leveling loops assume such a server exists; if not, report enough context to diagnose why.
+    throw new IllegalStateException("No online server with shard count [" + count + "] for table [" + table
+        + "] among [" + onlineServerShardCount.size() + "] servers");
   }
 
   private static boolean isNotInOptBalance(float opt, float count) {
@@ -115,24 +159,26 @@ public class MasterBasedLeveler {
       serverToShards.put(distServer, distShards);
     }
 
-    LOG.info("Source server shard list for table [{0}] is [{1}]", table, srcShards);
-    LOG.info("Destination server shard list for table [{0}] is [{1}]", table, distShards);
+    LOG.debug("Source server shard list for table [{0}] is [{1}]", table, srcShards);
+    LOG.debug("Destination server shard list for table [{0}] is [{1}]", table, distShards);
 
     String srcShard = getFirst(srcShards);
 
-    LOG.info("Moving shard [{0}] from [{1}] to [{2}] for table [{3}]", srcShard, srcServer,
distServer, table);
+    LOG.debug("Moving shard [{0}] from [{1}] to [{2}] for table [{3}]", srcShard, srcServer,
distServer, table);
 
     srcShards.remove(srcShard);
     distShards.add(srcShard);
 
     if (!isNotInOptBalance(opt, srcShards.size())) {
-      LOG.info("Source server [{0}] is in balance with size [{1}] optimum size [{2}]", srcServer,
srcShards.size(), opt);
+      LOG.debug("Source server [{0}] is in balance with size [{1}] optimum size [{2}]", srcServer,
srcShards.size(),
+          opt);
       overAllocatedServerSet.remove(srcServer);
       underAllocatedServerSet.remove(srcServer);
     }
 
     if (!isNotInOptBalance(opt, distShards.size())) {
-      LOG.info("Source server [{0}] is in balance with size [{1}] optimum size [{2}]", distServer,
distShards.size(), opt);
+      LOG.debug("Source server [{0}] is in balance with size [{1}] optimum size [{2}]", distServer,
distShards.size(),
+          opt);
       overAllocatedServerSet.remove(distServer);
       underAllocatedServerSet.remove(distServer);
     }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/346a2011/blur-core/src/test/java/org/apache/blur/manager/indexserver/MasterBasedLevelerTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/manager/indexserver/MasterBasedLevelerTest.java
b/blur-core/src/test/java/org/apache/blur/manager/indexserver/MasterBasedLevelerTest.java
new file mode 100644
index 0000000..712a218
--- /dev/null
+++ b/blur-core/src/test/java/org/apache/blur/manager/indexserver/MasterBasedLevelerTest.java
@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.manager.indexserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.blur.utils.BlurUtil;
+import org.junit.Before;
+import org.junit.Test;
+
+public class MasterBasedLevelerTest {
+
+  private static final String TABLE = "test";
+  private static final int MAX_SERVERS = 10000;
+  private static final int MAX_SHARDS = 100000;
+  private long _seed;
+  private Random _random;
+
+  @Before
+  public void setup() {
+    _random = new Random();
+    _seed = _random.nextLong();
+//    _seed = 9095818143550884754L;
+  }
+
+  @Test
+  public void testLeveler1() {
+    try {
+      Random random = new Random(_seed);
+
+      int totalShardServers = atLeastOne(random.nextInt(MAX_SERVERS));
+      int shards = atLeastOne(random.nextInt(MAX_SHARDS));
+
+      Map<String, String> newLayoutMap = new TreeMap<String, String>();
+      List<String> onlineServers = getOnlineServers(totalShardServers);
+      populateCurrentLayout(random, newLayoutMap, shards, onlineServers);
+
+      testLeveler(random, shards, newLayoutMap, onlineServers);
+
+      // now test adding a server. no more that ceil(opt) should move
+      onlineServers.add("newserver");
+      {
+        float opt = shards / (float) onlineServers.size();
+        int moves = testLeveler(random, shards, newLayoutMap, onlineServers);
+        assertTrue(moves <= Math.ceil(opt));
+      }
+
+      // now test removing a server. no more that ceil(opt) should move
+      int index = random.nextInt(onlineServers.size());
+      String serverToRemove = onlineServers.remove(index);
+      System.out.println("removing [" + serverToRemove + "]");
+      reassign(serverToRemove, newLayoutMap, onlineServers);
+      {
+        float opt = shards / (float) onlineServers.size();
+        int moves = testLeveler(random, shards, newLayoutMap, onlineServers);
+        assertTrue(moves <= Math.ceil(opt));
+      }
+
+    } catch (Throwable t) {
+      t.printStackTrace();
+      fail("Seed [" + _seed + "] exception");
+    }
+  }
+
+  private int atLeastOne(int i) {
+    if (i == 0) {
+      return 1;
+    }
+    return i;
+  }
+
+  private void reassign(String serverToRemove, Map<String, String> newLayoutMap, List<String>
onlineServers) {
+    System.out.println("reassign - starting");
+    Set<String> offlineShards = new HashSet<String>();
+    for (Entry<String, String> entry : newLayoutMap.entrySet()) {
+      if (entry.getValue().equals(serverToRemove)) {
+        offlineShards.add(entry.getKey());
+      }
+    }
+    Map<String, Integer> counts = populateCounts(newLayoutMap, onlineServers);
+    counts.remove(serverToRemove);
+    for (String offlineShard : offlineShards) {
+      int count = Integer.MAX_VALUE;
+      String server = null;
+      for (Entry<String, Integer> e : counts.entrySet()) {
+        if (e.getValue() < count) {
+          count = e.getValue();
+          server = e.getKey();
+        }
+      }
+      if (server == null) {
+        fail("Seed [" + _seed + "]");
+      }
+      newLayoutMap.put(offlineShard, server);
+      Integer serverCount = counts.get(server);
+      if (serverCount == null) {
+        counts.put(server, 1);
+      } else {
+        counts.put(server, count + 1);
+      }
+    }
+    System.out.println("reassign - ending");
+  }
+
+  private int testLeveler(Random random, int shards, Map<String, String> newLayoutMap,
List<String> onlineServers) {
+    int totalShardServers = onlineServers.size();
+    float opt = shards / (float) totalShardServers;
+
+    Map<String, Integer> beforeOnlineServerShardCount = populateCounts(newLayoutMap,
onlineServers);
+    int beforeLowCount = getLowCount(beforeOnlineServerShardCount);
+    int beforeHighCount = getHighCount(beforeOnlineServerShardCount);
+    System.out.println("Opt [" + opt + "] Before Low [" + beforeLowCount + "] High [" + beforeHighCount
+ "]");
+    long s = System.nanoTime();
+    int moves = MasterBasedLeveler.level(shards, totalShardServers, beforeOnlineServerShardCount,
newLayoutMap, TABLE,
+        random);
+    long e = System.nanoTime();
+
+    Map<String, Integer> afterOnlineServerShardCount = populateCounts(newLayoutMap,
onlineServers);
+    int afterLowCount = getLowCount(afterOnlineServerShardCount);
+    int afterHighCount = getHighCount(afterOnlineServerShardCount);
+    System.out.println("Opt [" + opt + "] After Low [" + afterLowCount + "] High [" + afterHighCount
+ "]");
+
+    System.out.println("Total servers [" + totalShardServers + "] Total Shards [" + shards
+ "] Total moves [" + moves
+        + "] in [" + (e - s) / 1000000.0 + " ms]");
+    if (afterLowCount == afterHighCount) {
+      assertEquals("Seed [" + _seed + "]", Math.round(opt), afterLowCount);
+    } else if (afterLowCount + 1 == afterHighCount) {
+      assertEquals("Seed [" + _seed + "]", (int) opt, afterLowCount);
+    } else {
+      fail("Seed [" + _seed + "]");
+    }
+    return moves;
+  }
+
+  @Test
+  public void testLeveler2() {
+    testLeveler1();
+  }
+
+  @Test
+  public void testLeveler3() {
+    testLeveler1();
+  }
+
+  @Test
+  public void testLeveler4() {
+    testLeveler1();
+  }
+
+  @Test
+  public void testLeveler5() {
+    testLeveler1();
+  }
+
+  @Test
+  public void testLeveler6() {
+    testLeveler1();
+  }
+
+  @Test
+  public void testLeveler7() {
+    testLeveler1();
+  }
+
+  @Test
+  public void testLeveler8() {
+    testLeveler1();
+  }
+
+  @Test
+  public void testLeveler9() {
+    testLeveler1();
+  }
+
+  @Test
+  public void testLeveler10() {
+    testLeveler1();
+  }
+
+  public void testLevelerALot() {
+    for (int i = 0; i < 1000; i++) {
+      _seed = _random.nextLong();
+      testLeveler1();
+    }
+  }
+
+  private List<String> getOnlineServers(int totalShardServers) {
+    List<String> servers = new ArrayList<String>();
+    for (int i = 0; i < totalShardServers; i++) {
+      servers.add("server-" + i);
+    }
+    return servers;
+  }
+
+  private int getLowCount(Map<String, Integer> onlineServerShardCount) {
+    int lowCount = Integer.MAX_VALUE;
+    for (Entry<String, Integer> e : onlineServerShardCount.entrySet()) {
+      int count = e.getValue();
+      if (lowCount > count) {
+        lowCount = count;
+      }
+    }
+    return lowCount;
+  }
+
+  private int getHighCount(Map<String, Integer> onlineServerShardCount) {
+    int highCount = Integer.MIN_VALUE;
+    for (Entry<String, Integer> e : onlineServerShardCount.entrySet()) {
+      int count = e.getValue();
+      if (highCount < count) {
+        highCount = count;
+      }
+    }
+    return highCount;
+  }
+
+  private Map<String, Integer> populateCounts(Map<String, String> newLayoutMap,
List<String> servers) {
+    Map<String, Integer> onlineServerShardCount = new TreeMap<String, Integer>();
+    for (String server : servers) {
+      onlineServerShardCount.put(server, 0);
+    }
+    for (Entry<String, String> e : newLayoutMap.entrySet()) {
+      String value = e.getValue();
+      Integer count = onlineServerShardCount.get(value);
+      if (count == null) {
+        onlineServerShardCount.put(value, 1);
+      } else {
+        onlineServerShardCount.put(value, count + 1);
+      }
+    }
+    return onlineServerShardCount;
+  }
+
+  private void populateCurrentLayout(Random random, Map<String, String> newLayoutMap,
int shards, List<String> servers) {
+    for (int i = 0; i < shards; i++) {
+      String shardName = BlurUtil.getShardName(i);
+      int server = random.nextInt(servers.size());
+      newLayoutMap.put(shardName, servers.get(server));
+    }
+  }
+
+}


Mime
View raw message