accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vi...@apache.org
Subject svn commit: r1294288 - in /incubator/accumulo/branches/1.4/src/server/src: main/java/org/apache/accumulo/server/master/balancer/ChaoticLoadBalancer.java test/java/org/apache/accumulo/server/master/balancer/ChaoticLoadBalancerTest.java
Date Mon, 27 Feb 2012 19:21:06 GMT
Author: vines
Date: Mon Feb 27 19:21:05 2012
New Revision: 1294288

URL: http://svn.apache.org/viewvc?rev=1294288&view=rev
Log:
ACCUMULO-426 - ChaoticLoadBalancer - has some light semblance of balance, but moves things
with 0 regard to locality, tablets wanting to be stable, or any tablets which are already
in migration. I have a suspicion that this will kill performance so badly that ingest may
never happen with it, so we may need to up the rerun time from 5ms

Added:
    incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/balancer/ChaoticLoadBalancer.java
  (with props)
    incubator/accumulo/branches/1.4/src/server/src/test/java/org/apache/accumulo/server/master/balancer/ChaoticLoadBalancerTest.java
  (with props)

Added: incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/balancer/ChaoticLoadBalancer.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/balancer/ChaoticLoadBalancer.java?rev=1294288&view=auto
==============================================================================
--- incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/balancer/ChaoticLoadBalancer.java
(added)
+++ incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/balancer/ChaoticLoadBalancer.java
Mon Feb 27 19:21:05 2012
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.master.balancer;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+import java.util.SortedMap;
+
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.master.thrift.TableInfo;
+import org.apache.accumulo.core.master.thrift.TabletServerStatus;
+import org.apache.accumulo.core.security.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.master.state.TabletMigration;
+import org.apache.thrift.TException;
+
+/**
+ * A chaotic load balancer used for testing. It constantly shuffles tablets, preventing them
from resting in a single location for very long. This is not
+ * designed for performance, do not use on production systems. I'm calling it the LokiLoadBalancer.
+ */
+public class ChaoticLoadBalancer extends TabletBalancer {
+  Random r = new Random();
+  
+  /* (non-Javadoc)
+   * @see org.apache.accumulo.server.master.balancer.TabletBalancer#getAssignments(java.util.SortedMap,
java.util.Map, java.util.Map)
+   */
+  @Override
+  public void getAssignments(SortedMap<TServerInstance,TabletServerStatus> current,
Map<KeyExtent,TServerInstance> unassigned,
+      Map<KeyExtent,TServerInstance> assignments) {
+    long total = assignments.size() + unassigned.size();
+    long avg = (long) Math.ceil(((double) total) / current.size());
+    Map<TServerInstance,Long> toAssign = new HashMap<TServerInstance,Long>();
+    List<TServerInstance> tServerArray = new ArrayList<TServerInstance>();
+    for (Entry<TServerInstance,TabletServerStatus> e : current.entrySet()) {
+      long numTablets = 0;
+      for (TableInfo ti : e.getValue().getTableMap().values()) {
+        numTablets += ti.tablets;
+      }
+      if (numTablets < avg) {
+        tServerArray.add(e.getKey());
+        toAssign.put(e.getKey(), avg - numTablets);
+      }
+    }
+
+    for (KeyExtent ke : unassigned.keySet())
+    {
+      int index = r.nextInt(tServerArray.size());
+      TServerInstance dest = tServerArray.get(index);
+      assignments.put(ke, dest);
+      long remaining = toAssign.get(dest).longValue() - 1;
+      if (remaining == 0) {
+        tServerArray.remove(index);
+        toAssign.remove(dest);
+      } else {
+        toAssign.put(dest, remaining);
+      }
+    }
+  }
+  
+  /**
+   * Will balance randomly, maintaining distribution
+   */
+  @Override
+  public long balance(SortedMap<TServerInstance,TabletServerStatus> current, Set<KeyExtent>
migrations, List<TabletMigration> migrationsOut) {
+    Map<TServerInstance,Long> numTablets = new HashMap<TServerInstance,Long>();
+    List<TServerInstance> underCapacityTServer = new ArrayList<TServerInstance>();
+
+    long totalTablets = 0;
+    for (Entry<TServerInstance,TabletServerStatus> e : current.entrySet()) {
+      long tabletCount = 0;
+      for (TableInfo ti : e.getValue().getTableMap().values()) {
+        tabletCount += ti.tablets;
+      }
+      numTablets.put(e.getKey(), tabletCount);
+      underCapacityTServer.add(e.getKey());
+      totalTablets += tabletCount;
+    }
+    // totalTablets is fuzzy due to asynchronicity of the stats
+    // *1.2 to handle fuzziness, and prevent locking for 'perfect' balancing scenarios
+    long avg = (long) Math.ceil(((double) totalTablets) / current.size() * 1.2);
+    
+    for (Entry<TServerInstance, TabletServerStatus> e : current.entrySet())
+    {
+      for (String table : e.getValue().getTableMap().keySet())
+      {
+        try {
+          for (TabletStats ts : getOnlineTabletsForTable(e.getKey(), table)) {
+            
+            int index = r.nextInt(underCapacityTServer.size());
+            TServerInstance dest = underCapacityTServer.get(index);
+            if (dest.equals(e.getKey()))
+              continue;
+            migrationsOut.add(new TabletMigration(new KeyExtent(ts.extent), e.getKey(), dest));
+            if (numTablets.put(dest, numTablets.get(dest) + 1) > avg)
+              underCapacityTServer.remove(index);
+            if (numTablets.put(e.getKey(), numTablets.get(e.getKey()) - 1) <= avg &&
!underCapacityTServer.contains(e.getKey()))
+              underCapacityTServer.add(e.getKey());
+
+          }
+        } catch (ThriftSecurityException e1) {
+          // Shouldn't happen, but carry on if it does
+          e1.printStackTrace();
+        } catch (TException e1) {
+          // Shouldn't happen, but carry on if it does
+          e1.printStackTrace();
+        }
+      }
+    }
+    
+    // Yes, it can run every 5ms
+    return 5;
+  }
+  
+}

Propchange: incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/balancer/ChaoticLoadBalancer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/accumulo/branches/1.4/src/server/src/test/java/org/apache/accumulo/server/master/balancer/ChaoticLoadBalancerTest.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/server/src/test/java/org/apache/accumulo/server/master/balancer/ChaoticLoadBalancerTest.java?rev=1294288&view=auto
==============================================================================
--- incubator/accumulo/branches/1.4/src/server/src/test/java/org/apache/accumulo/server/master/balancer/ChaoticLoadBalancerTest.java
(added)
+++ incubator/accumulo/branches/1.4/src/server/src/test/java/org/apache/accumulo/server/master/balancer/ChaoticLoadBalancerTest.java
Mon Feb 27 19:21:05 2012
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.master.balancer;
+
+import static org.junit.Assert.assertEquals;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.master.thrift.TableInfo;
+import org.apache.accumulo.core.master.thrift.TabletServerStatus;
+import org.apache.accumulo.core.security.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
+import org.apache.accumulo.core.util.AddressUtil;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.master.state.TabletMigration;
+import org.apache.hadoop.io.Text;
+import org.apache.thrift.TException;
+import org.junit.Test;
+
+public class ChaoticLoadBalancerTest {
+  
+  class FakeTServer {
+    List<KeyExtent> extents = new ArrayList<KeyExtent>();
+    
+    TabletServerStatus getStatus(TServerInstance server) {
+      TabletServerStatus result = new TabletServerStatus();
+      result.tableMap = new HashMap<String,TableInfo>();
+      for (KeyExtent extent : extents) {
+        String table = extent.getTableId().toString();
+        TableInfo info = result.tableMap.get(table);
+        if (info == null)
+          result.tableMap.put(table, info = new TableInfo());
+        info.onlineTablets++;
+        info.recs = info.onlineTablets;
+        info.ingestRate = 123.;
+        info.queryRate = 456.;
+      }
+      return result;
+    }
+  }
+  
+  Map<TServerInstance,FakeTServer> servers = new HashMap<TServerInstance,FakeTServer>();
+  
+  class TestChaoticLoadBalancer extends ChaoticLoadBalancer {
+    
+    @Override
+    public List<TabletStats> getOnlineTabletsForTable(TServerInstance tserver, String
table) throws ThriftSecurityException, TException {
+      List<TabletStats> result = new ArrayList<TabletStats>();
+      for (KeyExtent extent : servers.get(tserver).extents) {
+        if (extent.getTableId().toString().equals(table)) {
+          result.add(new TabletStats(extent.toThrift(), null, null, null, 0l, 0., 0., 0));
+        }
+      }
+      return result;
+    }
+  }
+  
+  @Test
+  public void testAssignMigrations() {
+    servers.clear();
+    servers.put(new TServerInstance(AddressUtil.parseAddress("127.0.0.1", 1234), "a"), new
FakeTServer());
+    servers.put(new TServerInstance(AddressUtil.parseAddress("127.0.0.1", 1235), "b"), new
FakeTServer());
+    servers.put(new TServerInstance(AddressUtil.parseAddress("127.0.0.1", 1236), "c"), new
FakeTServer());
+    Map<KeyExtent,TServerInstance> metadataTable = new TreeMap<KeyExtent,TServerInstance>();
+    String table = "t1";
+    metadataTable.put(makeExtent(table, null, null), null);
+    table = "t2";
+    metadataTable.put(makeExtent(table, "a", null), null);
+    metadataTable.put(makeExtent(table, null, "a"), null);
+    table = "t3";
+    metadataTable.put(makeExtent(table, "a", null), null);
+    metadataTable.put(makeExtent(table, "b", "a"), null);
+    metadataTable.put(makeExtent(table, "c", "b"), null);
+    metadataTable.put(makeExtent(table, "d", "c"), null);
+    metadataTable.put(makeExtent(table, "e", "d"), null);
+    metadataTable.put(makeExtent(table, null, "e"), null);
+    
+    TestChaoticLoadBalancer balancer = new TestChaoticLoadBalancer();
+    
+    SortedMap<TServerInstance,TabletServerStatus> current = new TreeMap<TServerInstance,TabletServerStatus>();
+    for (Entry<TServerInstance,FakeTServer> entry : servers.entrySet()) {
+      current.put(entry.getKey(), entry.getValue().getStatus(entry.getKey()));
+    }
+    
+    Map<KeyExtent, TServerInstance> assignments = new HashMap<KeyExtent, TServerInstance>();
+    balancer.getAssignments(getAssignments(servers), metadataTable, assignments);
+    
+    assertEquals(assignments.size(), metadataTable.size());
+  }
+  
+  SortedMap<TServerInstance,TabletServerStatus> getAssignments(Map<TServerInstance,FakeTServer>
servers) {
+    SortedMap<TServerInstance,TabletServerStatus> result = new TreeMap<TServerInstance,TabletServerStatus>();
+    for (Entry<TServerInstance,FakeTServer> entry : servers.entrySet()) {
+      result.put(entry.getKey(), entry.getValue().getStatus(entry.getKey()));
+    }
+    return result;
+  }
+  
+  @Test
+  public void testUnevenAssignment() {
+    servers.clear();
+    for (char c : "abcdefghijklmnopqrstuvwxyz".toCharArray()) {
+      String cString = Character.toString(c);
+      InetSocketAddress fakeAddress = AddressUtil.parseAddress("127.0.0.1", (int) c);
+      String fakeInstance = cString;
+      TServerInstance tsi = new TServerInstance(fakeAddress, fakeInstance);
+      FakeTServer fakeTServer = new FakeTServer();
+      servers.put(tsi, fakeTServer);
+      fakeTServer.extents.add(makeExtent(cString, null, null));
+    }
+    // Put more tablets on one server, but not more than the number of servers
+    Entry<TServerInstance,FakeTServer> first = servers.entrySet().iterator().next();
+    first.getValue().extents.add(makeExtent("newTable", "a", null));
+    first.getValue().extents.add(makeExtent("newTable", "b", "a"));
+    first.getValue().extents.add(makeExtent("newTable", "c", "b"));
+    first.getValue().extents.add(makeExtent("newTable", "d", "c"));
+    first.getValue().extents.add(makeExtent("newTable", "e", "d"));
+    first.getValue().extents.add(makeExtent("newTable", "f", "e"));
+    first.getValue().extents.add(makeExtent("newTable", "g", "f"));
+    first.getValue().extents.add(makeExtent("newTable", "h", "g"));
+    first.getValue().extents.add(makeExtent("newTable", "i", null));
+    TestChaoticLoadBalancer balancer = new TestChaoticLoadBalancer();
+    Set<KeyExtent> migrations = Collections.emptySet();
+    
+    // Just want to make sure it gets some migrations, randomness prevents guarantee of a
defined amount, or even expected amount
+    List<TabletMigration> migrationsOut = new ArrayList<TabletMigration>();
+    while (migrationsOut.size() != 0) {
+      balancer.balance(getAssignments(servers), migrations, migrationsOut);
+    }
+  }
+  
+  private static KeyExtent makeExtent(String table, String end, String prev) {
+    return new KeyExtent(new Text(table), toText(end), toText(prev));
+  }
+  
+  private static Text toText(String value) {
+    if (value != null)
+      return new Text(value);
+    return null;
+  }
+  
+}

Propchange: incubator/accumulo/branches/1.4/src/server/src/test/java/org/apache/accumulo/server/master/balancer/ChaoticLoadBalancerTest.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message