incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cr...@apache.org
Subject [08/27] git commit: Adding in new layout code, this should help to reduce shard movement during new server setup and server failure.
Date Wed, 16 Oct 2013 03:32:51 GMT
Adding in new layout code; this should help reduce shard movement during new server setup
and server failure.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/b824561e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/b824561e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/b824561e

Branch: refs/heads/blur-console-v2
Commit: b824561e67fa5a409d092e9618315d9161f6e5bf
Parents: 3af3287
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Wed Oct 9 09:37:51 2013 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Wed Oct 9 09:39:02 2013 -0400

----------------------------------------------------------------------
 .../MasterBasedDistributedLayoutFactory.java    |  19 ++-
 .../manager/indexserver/MasterBasedLeveler.java | 158 +++++++++++++++++++
 ...MasterBasedDistributedLayoutFactoryTest.java |  94 ++++++++---
 3 files changed, 241 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b824561e/blur-core/src/main/java/org/apache/blur/manager/indexserver/MasterBasedDistributedLayoutFactory.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/indexserver/MasterBasedDistributedLayoutFactory.java
b/blur-core/src/main/java/org/apache/blur/manager/indexserver/MasterBasedDistributedLayoutFactory.java
index a7dbf39..b5fc5ab 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/indexserver/MasterBasedDistributedLayoutFactory.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/indexserver/MasterBasedDistributedLayoutFactory.java
@@ -23,14 +23,13 @@ import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -129,11 +128,11 @@ public class MasterBasedDistributedLayoutFactory implements DistributedLayoutFac
 
   private Map<String, String> calculateNewLayout(String table, MasterBasedDistributedLayout
existingLayout,
       List<String> shardList, List<String> shardServerList) {
-    Set<String> shardServerSet = new HashSet<String>(shardServerList);
+    Set<String> shardServerSet = new TreeSet<String>(shardServerList);
     if (existingLayout == null) {
       // blind setup, basic round robin
       LOG.info("Blind shard layout.");
-      Map<String, String> newLayoutMap = new HashMap<String, String>();
+      Map<String, String> newLayoutMap = new TreeMap<String, String>();
       Iterator<String> iterator = shardServerList.iterator();
       for (String shard : shardList) {
         if (!iterator.hasNext()) {
@@ -145,9 +144,9 @@ public class MasterBasedDistributedLayoutFactory implements DistributedLayoutFac
       return newLayoutMap;
     } else {
       LOG.info("Gather counts for table [{0}]", table);
+      Collection<String> shardsThatAreOffline = new TreeSet<String>();
+      Map<String, Integer> onlineServerShardCount = new TreeMap<String, Integer>();
       Map<String, String> existingLayoutMap = existingLayout.getLayout();
-      Collection<String> shardsThatAreOffline = new HashSet<String>();
-      Map<String, Integer> onlineServerShardCount = new HashMap<String, Integer>();
       for (Entry<String, String> e : existingLayoutMap.entrySet()) {
         String shard = e.getKey();
         String server = e.getValue();
@@ -169,7 +168,7 @@ public class MasterBasedDistributedLayoutFactory implements DistributedLayoutFac
 
       LOG.info("Assigning any missing shards [{1}] for table [{0}]", table, shardsThatAreOffline);
       // Assign missing shards
-      Map<String, String> newLayoutMap = new HashMap<String, String>(existingLayoutMap);
+      Map<String, String> newLayoutMap = new TreeMap<String, String>(existingLayoutMap);
       for (String offlineShard : shardsThatAreOffline) {
         // Find lowest shard count.
         String server = getServerWithTheLowest(onlineServerShardCount);
@@ -177,10 +176,10 @@ public class MasterBasedDistributedLayoutFactory implements DistributedLayoutFac
         newLayoutMap.put(offlineShard, server);
         increment(onlineServerShardCount, server);
       }
-      // Level shards
-      // code here
-
 
+      LOG.info("Leveling any shard hotspots for table [{0}]", table);
+      // Level shards
+      MasterBasedLeveler.level(shardList.size(), shardServerList.size(), onlineServerShardCount,
newLayoutMap);
       return newLayoutMap;
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b824561e/blur-core/src/main/java/org/apache/blur/manager/indexserver/MasterBasedLeveler.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/indexserver/MasterBasedLeveler.java
b/blur-core/src/main/java/org/apache/blur/manager/indexserver/MasterBasedLeveler.java
new file mode 100644
index 0000000..2bf7493
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/manager/indexserver/MasterBasedLeveler.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.manager.indexserver;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+
+public class MasterBasedLeveler {
+
+  private static final Log LOG = LogFactory.getLog(MasterBasedLeveler.class);
+
+  public static void level(int totalShards, int totalShardServers, Map<String, Integer>
onlineServerShardCount,
+      Map<String, String> newLayoutMap) {
+    List<Entry<String, Integer>> onlineServerShardCountList = new ArrayList<Map.Entry<String,
Integer>>(
+        onlineServerShardCount.entrySet());
+    Collections.sort(onlineServerShardCountList, new Comparator<Entry<String, Integer>>()
{
+      @Override
+      public int compare(Entry<String, Integer> o1, Entry<String, Integer> o2)
{
+        int value1 = o1.getValue();
+        int value2 = o2.getValue();
+        if (value1 == value2) {
+          return 0;
+        }
+        return value1 < value2 ? -1 : 1;
+      }
+    });
+
+    float opt = totalShards / (float) totalShardServers;
+
+    Set<String> overAllocatedSet = new HashSet<String>();
+    Set<String> underAllocatedSet = new HashSet<String>();
+    LOG.info("Optimum server shard count [{0}]", opt);
+    for (Entry<String, Integer> e : onlineServerShardCountList) {
+      int countInt = e.getValue();
+      float count = countInt;
+      String server = e.getKey();
+      if (isNotInOptBalance(opt, count)) {
+        if (count > opt) {
+          LOG.info("Level server [{0}] over allocated at [{1}]", server, e.getValue());
+          overAllocatedSet.add(server);
+        } else {
+          LOG.info("Level server [{0}] under allocated at [{1}]", server, e.getValue());
+          underAllocatedSet.add(server);
+        }
+      }
+    }
+
+    Map<String, SortedSet<String>> serverToShards = new HashMap<String, SortedSet<String>>();
+    for (Entry<String, String> e : newLayoutMap.entrySet()) {
+      String server = e.getValue();
+      SortedSet<String> shards = serverToShards.get(server);
+      if (shards == null) {
+        shards = new TreeSet<String>();
+        serverToShards.put(server, shards);
+      }
+      String shard = e.getKey();
+      shards.add(shard);
+    }
+
+    while (!underAllocatedSet.isEmpty() && !overAllocatedSet.isEmpty()) {
+      String overAllocatedServer = getFirst(overAllocatedSet);
+      String underAllocatedServer = getFirst(underAllocatedSet);
+      moveSingleShard(overAllocatedServer, underAllocatedServer, opt, overAllocatedSet, underAllocatedSet,
+          newLayoutMap, onlineServerShardCount, serverToShards);
+    }
+  }
+
+  private static boolean isNotInOptBalance(float opt, float count) {
+    return Math.abs(count - opt) >= 1.0f;
+  }
+
+  private static String getFirst(Set<String> set) {
+    return set.iterator().next();
+  }
+
+  private static void moveSingleShard(String srcServer, String distServer, float opt,
+      Set<String> overAllocatedServerSet, Set<String> underAllocatedServerSet,
Map<String, String> newLayoutMap,
+      Map<String, Integer> onlineServerShardCount, Map<String, SortedSet<String>>
serverToShards) {
+
+    SortedSet<String> srcShards = serverToShards.get(srcServer);
+    if (srcShards == null) {
+      srcShards = new TreeSet<String>();
+      serverToShards.put(srcServer, srcShards);
+    }
+    SortedSet<String> distShards = serverToShards.get(distServer);
+    if (distShards == null) {
+      distShards = new TreeSet<String>();
+      serverToShards.put(distServer, distShards);
+    }
+
+    String srcShard = getFirst(srcShards);
+
+    srcShards.remove(srcShard);
+    distShards.add(srcShard);
+
+    if (!isNotInOptBalance(opt, srcShards.size())) {
+      overAllocatedServerSet.remove(srcServer);
+      underAllocatedServerSet.remove(srcServer);
+    }
+
+    if (!isNotInOptBalance(opt, distShards.size())) {
+      overAllocatedServerSet.remove(distServer);
+      underAllocatedServerSet.remove(distServer);
+    }
+
+    newLayoutMap.put(srcShard, distServer);
+    decr(onlineServerShardCount, srcServer);
+    incr(onlineServerShardCount, distServer);
+  }
+
+  private static void incr(Map<String, Integer> map, String key) {
+    Integer i = map.get(key);
+    if (i == null) {
+      map.put(key, 1);
+    } else {
+      map.put(key, i + 1);
+    }
+  }
+
+  private static void decr(Map<String, Integer> map, String key) {
+    Integer i = map.get(key);
+    if (i == null) {
+      map.put(key, 0);
+    } else {
+      int value = i - 1;
+      if (value < 0) {
+        value = 0;  
+      }
+      map.put(key, value);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/b824561e/blur-core/src/test/java/org/apache/blur/manager/indexserver/MasterBasedDistributedLayoutFactoryTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/manager/indexserver/MasterBasedDistributedLayoutFactoryTest.java
b/blur-core/src/test/java/org/apache/blur/manager/indexserver/MasterBasedDistributedLayoutFactoryTest.java
index 92fb849..65bea5b 100644
--- a/blur-core/src/test/java/org/apache/blur/manager/indexserver/MasterBasedDistributedLayoutFactoryTest.java
+++ b/blur-core/src/test/java/org/apache/blur/manager/indexserver/MasterBasedDistributedLayoutFactoryTest.java
@@ -16,6 +16,9 @@
  */
 package org.apache.blur.manager.indexserver;
 
+import static org.junit.Assert.*;
+
+import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -23,22 +26,41 @@ import java.util.Map;
 import java.util.TreeMap;
 import java.util.Map.Entry;
 
+import org.apache.blur.MiniCluster;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.Stat;
+import org.junit.AfterClass;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
+@SuppressWarnings("unchecked")
 public class MasterBasedDistributedLayoutFactoryTest {
 
+  private static String path = "./target/test-zk-MasterBasedDistributedLayoutFactoryTest";
+  private static MiniCluster miniCluster;
+
   private ZooKeeper _zooKeeper;
-  private String storagePath = "/proto_layout";
+  private String storagePath = "/MasterBasedDistributedLayoutFactoryTest";
+
+  @BeforeClass
+  public static void startZooKeeper() throws IOException {
+    new File(path).mkdirs();
+    miniCluster = new MiniCluster();
+    miniCluster.startZooKeeper(path, true);
+  }
+
+  @AfterClass
+  public static void stopZooKeeper() throws InterruptedException {
+    miniCluster.shutdownZooKeeper();
+  }
 
   @Before
   public void setup() throws IOException, KeeperException, InterruptedException {
-    _zooKeeper = new ZooKeeper("127.0.0.1", 30000, new Watcher() {
+    _zooKeeper = new ZooKeeper(miniCluster.getZkConnectionString(), 20000, new Watcher()
{
       @Override
       public void process(WatchedEvent event) {
 
@@ -58,22 +80,23 @@ public class MasterBasedDistributedLayoutFactoryTest {
     String table = "t1";
 
     DistributedLayout layout1 = factory.createDistributedLayout(table, shardList, shardServerList,
offlineShardServers);
+    Map<String, String> expected1 = map(e("shard-0", "server-0"), e("shard-1", "server-1"),
e("shard-2", "server-2"),
+        e("shard-3", "server-3"), e("shard-4", "server-4"), e("shard-5", "server-5"));
 
-    Map<String, String> map1 = new TreeMap<String, String>(layout1.getLayout());
-    for (Entry<String, String> e : map1.entrySet()) {
-      System.out.println(e.getKey() + " " + e.getValue());
-    }
+    Map<String, String> actual1 = new TreeMap<String, String>(layout1.getLayout());
+
+    assertEquals(expected1, actual1);
 
     List<String> newShardServerList = list("server-0", "server-1", "server-2", "server-3");
     List<String> newOfflineShardServers = list("server-4", "server-5");
 
     DistributedLayout layout2 = factory.createDistributedLayout(table, shardList, newShardServerList,
         newOfflineShardServers);
-    System.out.println("================");
-    Map<String, String> map2 = new TreeMap<String, String>(layout2.getLayout());
-    for (Entry<String, String> e : map2.entrySet()) {
-      System.out.println(e.getKey() + " " + e.getValue());
-    }
+
+    Map<String, String> expected2 = map(e("shard-0", "server-0"), e("shard-1", "server-1"),
e("shard-2", "server-2"),
+        e("shard-3", "server-3"), e("shard-4", "server-0"), e("shard-5", "server-1"));
+    Map<String, String> actual2 = new TreeMap<String, String>(layout2.getLayout());
+    assertEquals(expected2, actual2);
   }
 
   @Test
@@ -87,22 +110,24 @@ public class MasterBasedDistributedLayoutFactoryTest {
     String table = "t1";
 
     DistributedLayout layout1 = factory.createDistributedLayout(table, shardList, shardServerList,
offlineShardServers);
+    Map<String, String> expected1 = map(e("shard-0", "server-0"), e("shard-1", "server-1"),
e("shard-2", "server-2"),
+        e("shard-3", "server-3"), e("shard-4", "server-0"), e("shard-5", "server-1"));
 
-    Map<String, String> map1 = new TreeMap<String, String>(layout1.getLayout());
-    for (Entry<String, String> e : map1.entrySet()) {
-      System.out.println(e.getKey() + " " + e.getValue());
-    }
+    Map<String, String> actual1 = new TreeMap<String, String>(layout1.getLayout());
+
+    assertEquals(expected1, actual1);
 
     List<String> newShardServerList = list("server-0", "server-1", "server-2", "server-3",
"server-4", "server-5");
     List<String> newOfflineShardServers = list();
 
     DistributedLayout layout2 = factory.createDistributedLayout(table, shardList, newShardServerList,
         newOfflineShardServers);
-    System.out.println("================");
-    Map<String, String> map2 = new TreeMap<String, String>(layout2.getLayout());
-    for (Entry<String, String> e : map2.entrySet()) {
-      System.out.println(e.getKey() + " " + e.getValue());
-    }
+
+    Map<String, String> expected2 = map(e("shard-0", "server-4"), e("shard-1", "server-5"),
e("shard-2", "server-2"),
+        e("shard-3", "server-3"), e("shard-4", "server-0"), e("shard-5", "server-1"));
+
+    Map<String, String> actual2 = new TreeMap<String, String>(layout2.getLayout());
+    assertEquals(expected2, actual2);
   }
 
   private void rmr(ZooKeeper zooKeeper, String storagePath) throws KeeperException, InterruptedException
{
@@ -124,4 +149,33 @@ public class MasterBasedDistributedLayoutFactoryTest {
     }
     return lst;
   }
+
+  private static Map<String, String> map(Entry<String, String>... entries) {
+    Map<String, String> map = new TreeMap<String, String>();
+    for (Entry<String, String> e : entries) {
+      map.put(e.getKey(), e.getValue());
+    }
+    return map;
+  }
+
+  private static Entry<String, String> e(final String key, final String value) {
+    return new Entry<String, String>() {
+
+      @Override
+      public String getKey() {
+        return key;
+      }
+
+      @Override
+      public String getValue() {
+        return value;
+      }
+
+      @Override
+      public String setValue(String value) {
+        throw new RuntimeException("Not Supported");
+      }
+
+    };
+  }
 }


Mime
View raw message