hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From apurt...@apache.org
Subject [10/14] hbase git commit: HBASE-15631 Backport Regionserver Groups (HBASE-6721) to branch-1 (Francis Liu and Andrew Purtell)
Date Tue, 24 Oct 2017 01:03:01 GMT
http://git-wip-us.apache.org/repos/asf/hbase/blob/64328cae/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java
----------------------------------------------------------------------
diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java
new file mode 100644
index 0000000..eec03ce
--- /dev/null
+++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java
@@ -0,0 +1,795 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.rsgroup;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import com.google.common.collect.Sets;
+import com.google.protobuf.ServiceException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableStateManager;
+import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.constraint.ConstraintException;
+import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.ServerListener;
+import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
+import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
+import org.apache.hadoop.hbase.net.Address;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos;
+import org.apache.hadoop.hbase.protobuf.generated.RSGroupProtos;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
+import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsRequest;
+import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
+import org.apache.hadoop.hbase.security.access.AccessControlLists;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ModifyRegionUtils;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * This is an implementation of {@link RSGroupInfoManager} that makes
+ * use of an HBase table as the persistence store for the group information.
+ * It also makes use of ZooKeeper to store the group information needed
+ * for bootstrapping during offline mode.
+ */
+public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListener {
+  private static final Log LOG = LogFactory.getLog(RSGroupInfoManagerImpl.class);
+
+  /**
+   * Table descriptor for <code>hbase:rsgroup</code> catalog table.
+   * Built once in the static initializer below and passed to the create-table
+   * procedure in {@code createGroupTable}.
+   */
+  private final static HTableDescriptor RSGROUP_TABLE_DESC;
+  static {
+    RSGROUP_TABLE_DESC = new HTableDescriptor(RSGROUP_TABLE_NAME);
+    // single column family holding the serialized group info
+    RSGROUP_TABLE_DESC.addFamily(new HColumnDescriptor(META_FAMILY_BYTES));
+    // region splitting is disabled for this table via the split policy
+    RSGROUP_TABLE_DESC.setRegionSplitPolicyClassName(DisabledRegionSplitPolicy.class.getName());
+    try {
+      // multiMutate() talks to the MultiRowMutationService, so the endpoint
+      // coprocessor is attached to the table itself
+      RSGROUP_TABLE_DESC.addCoprocessor(
+        MultiRowMutationEndpoint.class.getName(),
+          null, Coprocessor.PRIORITY_SYSTEM, null);
+    } catch (IOException ex) {
+      // a static initializer cannot throw a checked exception; wrap it
+      throw new RuntimeException(ex);
+    }
+  }
+
+  // group name -> group info; reassigned by refresh() and flushConfig()
+  private volatile Map<String, RSGroupInfo> rsGroupMap;
+  // table -> name of the group that holds it; reassigned by refresh() and flushConfig()
+  private volatile Map<TableName, String> tableMap;
+  private MasterServices master;
+  // connection obtained from the master in the constructor
+  private ClusterConnection conn;
+  // ZooKeeper watcher obtained from the master in the constructor
+  private ZooKeeperWatcher watcher;
+  // thread that waits for the rsgroup table; isOnline() delegates to it
+  private RSGroupStartupWorker rsGroupStartupWorker;
+  // contains list of groups that were last flushed to persistent store
+  private volatile Set<String> prevRSGroups;
+  private RSGroupSerDe rsGroupSerDe;
+  // thread that recomputes the default group's servers on server add/remove
+  private DefaultServerUpdater defaultServerUpdater;
+  // set to true at the end of init()
+  private boolean isInit = false;
+
+  /**
+   * Creates a manager with empty group and table maps. No background work is
+   * started here; call {@link #init()} for that.
+   *
+   * @param master the master services this manager reads its ZooKeeper watcher
+   *          and cluster connection from
+   */
+  public RSGroupInfoManagerImpl(MasterServices master) throws IOException {
+    // start with empty in-memory state
+    this.rsGroupMap = Collections.<String, RSGroupInfo>emptyMap();
+    this.tableMap = Collections.<TableName, String>emptyMap();
+    this.rsGroupSerDe = new RSGroupSerDe();
+
+    // collaborators handed out by the master
+    this.master = master;
+    this.watcher = master.getZooKeeper();
+    this.conn = master.getConnection();
+
+    this.prevRSGroups = new HashSet<String>();
+  }
+
+  /**
+   * Loads the initial group information and starts the background threads.
+   * The order of the statements matters: the startup worker must exist before
+   * {@code refresh()} runs because {@code isOnline()} delegates to it.
+   *
+   * @throws IOException if the initial refresh fails
+   */
+  public void init() throws IOException{
+    rsGroupStartupWorker = new RSGroupStartupWorker(this, master, conn);
+    // the worker has not been started yet, so this refresh takes the offline (ZooKeeper) path
+    refresh();
+    rsGroupStartupWorker.start();
+    defaultServerUpdater = new DefaultServerUpdater(this);
+    // serverAdded/serverRemoved callbacks wake up the default server updater
+    master.getServerManager().registerListener(this);
+    defaultServerUpdater.start();
+    isInit = true;
+  }
+
+  /**
+   * @return true once {@link #init()} has run to completion
+   */
+  boolean isInit() {
+    return isInit;
+  }
+
+  /**
+   * Adds the group.
+   *
+   * @param rsGroupInfo the group name
+   */
+  @Override
+  public synchronized void addRSGroup(RSGroupInfo rsGroupInfo) throws IOException {
+    checkGroupName(rsGroupInfo.getName());
+    if (rsGroupMap.get(rsGroupInfo.getName()) != null ||
+        rsGroupInfo.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
+      throw new DoNotRetryIOException("Group already exists: "+ rsGroupInfo.getName());
+    }
+    Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
+    newGroupMap.put(rsGroupInfo.getName(), rsGroupInfo);
+    flushConfig(newGroupMap);
+  }
+
+  @Override
+  public synchronized Set<Address> moveServers(Set<Address> servers, String srcGroup,
+                                          String dstGroup) throws IOException {
+    if (servers == null) {
+      throw new ConstraintException("The list of servers to move cannot be null.");
+    }
+    Set<Address> movedServers = Sets.newHashSet();
+    if (!rsGroupMap.containsKey(srcGroup)) {
+      throw new DoNotRetryIOException("Group "+srcGroup+" does not exist");
+    }
+    if (!rsGroupMap.containsKey(dstGroup)) {
+      throw new DoNotRetryIOException("Group "+dstGroup+" does not exist");
+    }
+
+    RSGroupInfo src = new RSGroupInfo(getRSGroup(srcGroup));
+    RSGroupInfo dst = new RSGroupInfo(getRSGroup(dstGroup));
+    for(Address el: servers) {
+      if (src.removeServer(el)) {
+        movedServers.add(el);
+      }
+      dst.addServer(el);
+    }
+
+    Map<String,RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
+    newGroupMap.put(src.getName(), src);
+    newGroupMap.put(dst.getName(), dst);
+
+    flushConfig(newGroupMap);
+    return movedServers;
+  }
+
+  /**
+   * Looks up the group that contains the given server.
+   *
+   * @param server the server address to look for
+   * @return the first group found that contains the server, or null if no
+   *         group contains it
+   */
+  @Override
+  public RSGroupInfo getRSGroupOfServer(Address server) throws IOException {
+    for (RSGroupInfo candidate : rsGroupMap.values()) {
+      if (!candidate.containsServer(server)) {
+        continue;
+      }
+      return candidate;
+    }
+    return null;
+  }
+
+  /**
+   * Looks up a group by name in the current in-memory group map.
+   *
+   * @param groupName the group name
+   * @return the matching group, or null if the map has no entry for the name
+   */
+  @Override
+  public RSGroupInfo getRSGroup(String groupName) throws IOException {
+    return rsGroupMap.get(groupName);
+  }
+
+
+
+  /**
+   * Looks up the group a table belongs to.
+   *
+   * @param tableName the table to look up
+   * @return the group name recorded for the table in the in-memory table map,
+   *         or null if there is no entry
+   */
+  @Override
+  public String getRSGroupOfTable(TableName tableName) throws IOException {
+    return tableMap.get(tableName);
+  }
+
+  /**
+   * Moves tables into the given group, or only detaches them from their
+   * current group when {@code groupName} is null, and flushes the result.
+   *
+   * @param tableNames the tables to move
+   * @param groupName the destination group, or null to only remove the tables
+   *          from the group they are currently recorded in
+   * @throws DoNotRetryIOException if a non-null destination group is unknown
+   */
+  @Override
+  public synchronized void moveTables(
+      Set<TableName> tableNames, String groupName) throws IOException {
+    final boolean hasDestination = groupName != null;
+    if (hasDestination && !rsGroupMap.containsKey(groupName)) {
+      throw new DoNotRetryIOException("Group "+groupName+" does not exist or is a special group");
+    }
+
+    Map<String,RSGroupInfo> updated = new HashMap<String,RSGroupInfo>(rsGroupMap);
+    for (TableName table : tableNames) {
+      // detach from the group the table is currently recorded in, if any
+      if (tableMap.containsKey(table)) {
+        String currentOwner = tableMap.get(table);
+        RSGroupInfo source = new RSGroupInfo(updated.get(currentOwner));
+        source.removeTable(table);
+        updated.put(source.getName(), source);
+      }
+      // attach to the destination; re-read from the working map so earlier
+      // iterations' changes are carried along
+      if (hasDestination) {
+        RSGroupInfo destination = new RSGroupInfo(updated.get(groupName));
+        destination.addTable(table);
+        updated.put(destination.getName(), destination);
+      }
+    }
+
+    flushConfig(updated);
+  }
+
+
+  /**
+   * Delete a region server group.
+   *
+   * @param groupName the group name
+   * @throws java.io.IOException Signals that an I/O exception has occurred.
+   */
+  @Override
+  public synchronized void removeRSGroup(String groupName) throws IOException {
+    if (!rsGroupMap.containsKey(groupName) || groupName.equals(RSGroupInfo.DEFAULT_GROUP)) {
+      throw new DoNotRetryIOException("Group "+groupName+" does not exist or is a reserved group");
+    }
+    Map<String,RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
+    newGroupMap.remove(groupName);
+    flushConfig(newGroupMap);
+  }
+
+  /**
+   * @return a new list holding every group in the current in-memory group map
+   */
+  @Override
+  public List<RSGroupInfo> listRSGroups() throws IOException {
+    return new LinkedList<RSGroupInfo>(rsGroupMap.values());
+  }
+
+  /**
+   * @return the online flag of the startup worker, which it sets once the
+   *         rsgroup table has been verified and the cache refreshed from it
+   */
+  @Override
+  public boolean isOnline() {
+    return rsGroupStartupWorker.isOnline();
+  }
+
+  /**
+   * Reloads the in-memory group information, reading from the rsgroup table
+   * when online and from ZooKeeper otherwise.
+   */
+  @Override
+  public synchronized void refresh() throws IOException {
+    refresh(false);
+  }
+
+  /**
+   * Rebuilds the in-memory group and table maps from the persisted group list
+   * and recomputes the default group.
+   *
+   * @param forceOnline when true, read from the rsgroup table even if
+   *          {@code isOnline()} still reports false
+   * @throws IOException if reading the group list or the table listing fails
+   */
+  private synchronized void refresh(boolean forceOnline) throws IOException {
+    List<RSGroupInfo> groupList = new LinkedList<RSGroupInfo>();
+
+    // overwrite anything read from zk, group table is source of truth
+    // if online read from GROUP table
+    if (forceOnline || isOnline()) {
+      LOG.debug("Refreshing in Online mode.");
+      try (Table rsGroupTable = conn.getTable(RSGROUP_TABLE_NAME)) {
+        groupList.addAll(rsGroupSerDe.retrieveGroupList(rsGroupTable));
+      }
+    } else {
+      LOG.debug("Refershing in Offline mode.");
+      String groupBasePath = ZKUtil.joinZNode(watcher.baseZNode, rsGroupZNode);
+      groupList.addAll(rsGroupSerDe.retrieveGroupList(watcher, groupBasePath));
+    }
+
+    // refresh default group, prune
+    // start from every table the master has a descriptor for ...
+    NavigableSet<TableName> orphanTables = new TreeSet<TableName>();
+    for(String entry: master.getTableDescriptors().getAll().keySet()) {
+      orphanTables.add(TableName.valueOf(entry));
+    }
+
+    // ... add the system tables: a fixed list while the master is still
+    // initializing, otherwise whatever the system namespace lists
+    List<TableName> specialTables;
+    if(!master.isInitialized()) {
+      specialTables = new ArrayList<TableName>();
+      specialTables.add(AccessControlLists.ACL_TABLE_NAME);
+      specialTables.add(TableName.META_TABLE_NAME);
+      specialTables.add(TableName.NAMESPACE_TABLE_NAME);
+      specialTables.add(RSGROUP_TABLE_NAME);
+    } else {
+      specialTables =
+          master.listTableNamesByNamespace(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR);
+    }
+
+    for(TableName table : specialTables) {
+      orphanTables.add(table);
+    }
+    // ... and drop every table that a non-default group already claims
+    for(RSGroupInfo group: groupList) {
+      if(!group.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
+        orphanTables.removeAll(group.getTables());
+      }
+    }
+
+    // This is added to the last of the list
+    // so it overwrites the default group loaded
+    // from region group table or zk
+    // NOTE: getDefaultServers() consults the current (not yet replaced) rsGroupMap
+    groupList.add(new RSGroupInfo(RSGroupInfo.DEFAULT_GROUP,
+        Sets.newHashSet(getDefaultServers()),
+        orphanTables));
+
+
+    // populate the data
+    HashMap<String, RSGroupInfo> newGroupMap = Maps.newHashMap();
+    HashMap<TableName, String> newTableMap = Maps.newHashMap();
+    for (RSGroupInfo group : groupList) {
+      newGroupMap.put(group.getName(), group);
+      for(TableName table: group.getTables()) {
+        newTableMap.put(table, group.getName());
+      }
+    }
+    // publish read-only views of the freshly built maps
+    rsGroupMap = Collections.unmodifiableMap(newGroupMap);
+    tableMap = Collections.unmodifiableMap(newTableMap);
+
+    // remember which groups exist so the next flush can compute deletes
+    prevRSGroups.clear();
+    prevRSGroups.addAll(rsGroupMap.keySet());
+  }
+
+  /**
+   * Writes the given group map to the rsgroup table: one Delete for every
+   * previously flushed group that is no longer present and one Put per group
+   * in the new map, sent together through {@code multiMutate}.
+   *
+   * @param newGroupMap the complete set of groups to persist, keyed by name
+   * @return the table to group-name mapping derived from {@code newGroupMap}
+   * @throws IOException if the mutation request fails
+   */
+  private synchronized Map<TableName,String> flushConfigTable(Map<String,RSGroupInfo> newGroupMap)
+      throws IOException {
+    List<Mutation> pending = new ArrayList<Mutation>();
+
+    // deletes first: groups flushed last time that have since disappeared
+    for (String previousName : prevRSGroups) {
+      if (newGroupMap.containsKey(previousName)) {
+        continue;
+      }
+      pending.add(new Delete(Bytes.toBytes(previousName)));
+    }
+
+    // then one put per group, collecting the table mapping along the way
+    Map<TableName,String> tableToGroup = new HashMap<TableName,String>();
+    for (RSGroupInfo group : newGroupMap.values()) {
+      RSGroupProtos.RSGroupInfo serialized = RSGroupProtobufUtil.toProtoGroupInfo(group);
+      Put put = new Put(Bytes.toBytes(group.getName()));
+      put.addColumn(META_FAMILY_BYTES, META_QUALIFIER_BYTES, serialized.toByteArray());
+      pending.add(put);
+      for (TableName table : group.getTables()) {
+        tableToGroup.put(table, group.getName());
+      }
+    }
+
+    if (!pending.isEmpty()) {
+      multiMutate(pending);
+    }
+    return tableToGroup;
+  }
+
+  /**
+   * Persists a new group configuration and makes it the visible in-memory state.
+   * <p>
+   * While offline, nothing is persisted: the only accepted change is a new
+   * server set for the default group, and the given map becomes the in-memory
+   * state. While online, the map is written to the rsgroup table first, then
+   * published in memory, then mirrored to ZooKeeper.
+   *
+   * @param newGroupMap the complete new group set keyed by name; in offline
+   *          mode the default group entry is temporarily removed from it and
+   *          put back, so it must be modifiable in that case
+   * @throws IOException if offline and anything other than the default
+   *           group's servers changed, if the table write fails, or if the
+   *           ZooKeeper write fails (in which case the master is also aborted)
+   */
+  private synchronized void flushConfig(Map<String, RSGroupInfo> newGroupMap) throws IOException {
+    Map<TableName, String> newTableMap;
+
+    // For offline mode persistence is still unavailable
+    // We're refreshing in-memory state but only for default servers
+    if (!isOnline()) {
+      Map<String, RSGroupInfo> m = Maps.newHashMap(rsGroupMap);
+      RSGroupInfo oldDefaultGroup = m.remove(RSGroupInfo.DEFAULT_GROUP);
+      RSGroupInfo newDefaultGroup = newGroupMap.remove(RSGroupInfo.DEFAULT_GROUP);
+      // with the default group taken out, old and new must be identical, and
+      // the default group itself may differ only in its servers
+      if (!m.equals(newGroupMap) ||
+          !oldDefaultGroup.getTables().equals(newDefaultGroup.getTables())) {
+        throw new IOException("Only default servers can be updated during offline mode");
+      }
+      newGroupMap.put(RSGroupInfo.DEFAULT_GROUP, newDefaultGroup);
+      // publish a read-only view, consistent with refresh() and the online path below
+      rsGroupMap = Collections.unmodifiableMap(newGroupMap);
+      return;
+    }
+
+    newTableMap = flushConfigTable(newGroupMap);
+
+    // make changes visible since it has been
+    // persisted in the source of truth
+    rsGroupMap = Collections.unmodifiableMap(newGroupMap);
+    tableMap = Collections.unmodifiableMap(newTableMap);
+
+
+    try {
+      String groupBasePath = ZKUtil.joinZNode(watcher.baseZNode, rsGroupZNode);
+      ZKUtil.createAndFailSilent(watcher, groupBasePath, ProtobufUtil.PB_MAGIC);
+
+      List<ZKUtil.ZKUtilOp> zkOps = new ArrayList<ZKUtil.ZKUtilOp>(newGroupMap.size());
+      // drop znodes of groups that were flushed before but are gone now
+      for(String groupName : prevRSGroups) {
+        if(!newGroupMap.containsKey(groupName)) {
+          String znode = ZKUtil.joinZNode(groupBasePath, groupName);
+          zkOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(znode));
+        }
+      }
+
+
+      // rewrite every remaining group's znode as a delete followed by a create
+      for(RSGroupInfo RSGroupInfo : newGroupMap.values()) {
+        String znode = ZKUtil.joinZNode(groupBasePath, RSGroupInfo.getName());
+        RSGroupProtos.RSGroupInfo proto = RSGroupProtobufUtil.toProtoGroupInfo(RSGroupInfo);
+        LOG.debug("Updating znode: "+znode);
+        ZKUtil.createAndFailSilent(watcher, znode);
+        zkOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(znode));
+        zkOps.add(ZKUtil.ZKUtilOp.createAndFailSilent(znode,
+            ProtobufUtil.prependPBMagic(proto.toByteArray())));
+      }
+      LOG.debug("Writing ZK GroupInfo count: " + zkOps.size());
+
+      ZKUtil.multiOrSequential(watcher, zkOps, false);
+    } catch (KeeperException e) {
+      LOG.error("Failed to write to rsGroupZNode", e);
+      master.abort("Failed to write to rsGroupZNode", e);
+      throw new IOException("Failed to write to rsGroupZNode",e);
+    }
+
+    // remember what was flushed so the next flush can compute deletes
+    prevRSGroups.clear();
+    prevRSGroups.addAll(newGroupMap.keySet());
+  }
+
+  /**
+   * Lists the region servers that are currently online, asking the master's
+   * server manager when a master is available and falling back to the
+   * children of the region server znode otherwise.
+   *
+   * @return the online servers
+   * @throws IOException if the ZooKeeper listing fails
+   */
+  private List<ServerName> getOnlineRS() throws IOException {
+    if (master == null) {
+      try {
+        LOG.debug("Reading online RS from zookeeper");
+        List<ServerName> fromZk = new LinkedList<ServerName>();
+        List<String> children = ZKUtil.listChildrenNoWatch(watcher, watcher.rsZNode);
+        for (String child : children) {
+          fromZk.add(ServerName.parseServerName(child));
+        }
+        return fromZk;
+      } catch (KeeperException e) {
+        throw new IOException("Failed to retrieve server list from zookeeper", e);
+      }
+    }
+    return master.getServerManager().getOnlineServersList();
+  }
+
+  /**
+   * Computes the servers that belong to the default group: every online
+   * region server that no non-default group contains.
+   *
+   * @return addresses of the online servers not claimed by a non-default group
+   * @throws IOException if the online server list cannot be obtained
+   */
+  private List<Address> getDefaultServers() throws IOException {
+    List<Address> unclaimed = new LinkedList<Address>();
+    for (ServerName online : getOnlineRS()) {
+      Address candidate = Address.fromParts(online.getHostname(), online.getPort());
+      boolean claimed = false;
+      for (RSGroupInfo group : rsGroupMap.values()) {
+        if (RSGroupInfo.DEFAULT_GROUP.equals(group.getName())) {
+          // membership in the default group itself does not count
+          continue;
+        }
+        if (group.containsServer(candidate)) {
+          claimed = true;
+          break;
+        }
+      }
+      if (claimed) {
+        continue;
+      }
+      unclaimed.add(candidate);
+    }
+    return unclaimed;
+  }
+
+  /**
+   * Replaces the server set of the default group, keeping its tables, and
+   * flushes the result.
+   *
+   * @param server the new complete server set of the default group
+   * @throws IOException if the flush fails
+   */
+  private synchronized void updateDefaultServers(
+      Set<Address> server) throws IOException {
+    RSGroupInfo current = rsGroupMap.get(RSGroupInfo.DEFAULT_GROUP);
+    RSGroupInfo replacement = new RSGroupInfo(current.getName(), server, current.getTables());
+
+    Map<String, RSGroupInfo> updated = new HashMap<String, RSGroupInfo>(rsGroupMap);
+    updated.put(replacement.getName(), replacement);
+    flushConfig(updated);
+  }
+
+  /**
+   * Server listener callback: signals the default server updater thread that
+   * the set of servers changed. The given server name itself is not used.
+   */
+  @Override
+  public void serverAdded(ServerName serverName) {
+    defaultServerUpdater.serverChanged();
+  }
+
+  /**
+   * Server listener callback: signals the default server updater thread that
+   * the set of servers changed. The given server name itself is not used.
+   */
+  @Override
+  public void serverRemoved(ServerName serverName) {
+    defaultServerUpdater.serverChanged();
+  }
+
+  /**
+   * Background thread that keeps the default group's server set up to date.
+   * Each round it recomputes the default servers, pushes them to the manager
+   * when they differ from the previous round, and then blocks until
+   * {@link #serverChanged()} is called.
+   */
+  private static class DefaultServerUpdater extends Thread {
+    private static final Log LOG = LogFactory.getLog(DefaultServerUpdater.class);
+    private RSGroupInfoManagerImpl mgr;
+    // guarded by this thread object's monitor; set by serverChanged()
+    private boolean hasChanged = false;
+
+    public DefaultServerUpdater(RSGroupInfoManagerImpl mgr) {
+      this.mgr = mgr;
+    }
+
+    @Override
+    public void run() {
+      List<Address> prevDefaultServers = new LinkedList<Address>();
+      // Run only while the master is neither aborted nor stopped. (Joining the
+      // two checks with || would keep looping until both are true at once.)
+      while(!mgr.master.isAborted() && !mgr.master.isStopped()) {
+        try {
+          LOG.info("Updating default servers.");
+          List<Address> servers = mgr.getDefaultServers();
+          // sort by host, then port, so the list comparison below is order-independent
+          Collections.sort(servers, new Comparator<Address>() {
+            @Override
+            public int compare(Address o1, Address o2) {
+              int diff = o1.getHostname().compareTo(o2.getHostname());
+              if (diff != 0) {
+                return diff;
+              }
+              return Integer.compare(o1.getPort(), o2.getPort());
+            }
+          });
+          if(!servers.equals(prevDefaultServers)) {
+            mgr.updateDefaultServers(Sets.<Address>newHashSet(servers));
+            prevDefaultServers = servers;
+            LOG.info("Updated with servers: "+servers.size());
+          }
+          try {
+            synchronized (this) {
+              // skip the wait when a change was signalled while we were busy
+              if(!hasChanged) {
+                wait();
+              }
+              hasChanged = false;
+            }
+          } catch (InterruptedException e) {
+            // Intentionally not propagated: an interrupt only wakes the thread
+            // up early, and the loop condition above decides whether to go on.
+          }
+        } catch (IOException e) {
+          LOG.warn("Failed to update default servers", e);
+        }
+      }
+    }
+
+    /**
+     * Records that the set of servers changed and wakes the updater thread if
+     * it is waiting.
+     */
+    public void serverChanged() {
+      synchronized (this) {
+        hasChanged = true;
+        this.notify();
+      }
+    }
+  }
+
+  /**
+   * Server listener callback; this implementation does nothing.
+   */
+  @Override
+  public void waiting() {
+
+  }
+
+  /**
+   * Daemon thread that waits for the rsgroup table to become usable, creating
+   * it when hbase:meta has no regions for it, and then switches the owning
+   * manager to online mode.
+   */
+  private static class RSGroupStartupWorker extends Thread {
+    private static final Log LOG = LogFactory.getLog(RSGroupStartupWorker.class);
+
+    // flipped to true once the cache has been refreshed from the rsgroup table
+    private volatile boolean isOnline = false;
+    private MasterServices masterServices;
+    private RSGroupInfoManagerImpl groupInfoManager;
+    private ClusterConnection conn;
+
+    public RSGroupStartupWorker(RSGroupInfoManagerImpl groupInfoManager,
+                                MasterServices masterServices,
+                                ClusterConnection conn) {
+      this.masterServices = masterServices;
+      this.groupInfoManager = groupInfoManager;
+      this.conn = conn;
+      setName(RSGroupStartupWorker.class.getName()+"-"+masterServices.getServerName());
+      setDaemon(true);
+    }
+
+    @Override
+    public void run() {
+      if(waitForGroupTableOnline()) {
+        LOG.info("GroupBasedLoadBalancer is now online");
+      }
+    }
+
+    /**
+     * Polls, sleeping 100 ms between rounds, until the rsgroup table is
+     * verified online or the master is no longer running. A round succeeds
+     * when meta is reachable, at least one rsgroup region is listed in meta,
+     * and every listed region answered a Get. On success the manager's cache
+     * is refreshed from the table and flushed back.
+     *
+     * @return true if the table was found online, false if the loop ended
+     *         because the master stopped or aborted
+     */
+    public boolean waitForGroupTableOnline() {
+      final List<HRegionInfo> foundRegions = new LinkedList<HRegionInfo>();
+      final List<HRegionInfo> assignedRegions = new LinkedList<HRegionInfo>();
+      final AtomicBoolean found = new AtomicBoolean(false);
+      final TableStateManager tsm =
+          masterServices.getAssignmentManager().getTableStateManager();
+      // the create-table procedure is submitted at most once
+      boolean createSent = false;
+      while (!found.get() && isMasterRunning()) {
+        foundRegions.clear();
+        assignedRegions.clear();
+        // optimistic start; any failed check below resets it to false
+        found.set(true);
+        try {
+          boolean rootMetaFound =
+              masterServices.getMetaTableLocator().verifyMetaRegionLocation(
+                  conn,
+                  masterServices.getZooKeeper(),
+                  1);
+          final AtomicBoolean nsFound = new AtomicBoolean(false);
+          if (rootMetaFound) {
+
+            // visits every meta row, collecting rsgroup regions and probing
+            // the namespace table
+            MetaTableAccessor.Visitor visitor = new MetaTableAccessor.Visitor() {
+              @Override
+              public boolean visit(Result row) throws IOException {
+
+                HRegionInfo info = MetaTableAccessor.getHRegionInfo(row);
+                if (info != null) {
+                  Cell serverCell =
+                      row.getColumnLatestCell(HConstants.CATALOG_FAMILY,
+                          HConstants.SERVER_QUALIFIER);
+                  if (RSGROUP_TABLE_NAME.equals(info.getTable()) && serverCell != null) {
+                    ServerName sn =
+                        ServerName.parseVersionedServerName(CellUtil.cloneValue(serverCell));
+                    if (sn == null) {
+                      found.set(false);
+                    } else if (tsm.isTableState(RSGROUP_TABLE_NAME,
+                        ZooKeeperProtos.Table.State.ENABLED)) {
+                      try {
+                        // a successful Get against the hosting server counts as assigned
+                        ClientProtos.ClientService.BlockingInterface rs = conn.getClient(sn);
+                        ClientProtos.GetRequest request =
+                            RequestConverter.buildGetRequest(info.getRegionName(),
+                                new Get(ROW_KEY));
+                        rs.get(null, request);
+                        assignedRegions.add(info);
+                      } catch(Exception ex) {
+                        LOG.debug("Caught exception while verifying group region", ex);
+                      }
+                    }
+                    foundRegions.add(info);
+                  }
+                  if (TableName.NAMESPACE_TABLE_NAME.equals(info.getTable())) {
+                    Cell cell = row.getColumnLatestCell(HConstants.CATALOG_FAMILY,
+                        HConstants.SERVER_QUALIFIER);
+                    ServerName sn = null;
+                    if(cell != null) {
+                      sn = ServerName.parseVersionedServerName(CellUtil.cloneValue(cell));
+                    }
+                    if (tsm.isTableState(TableName.NAMESPACE_TABLE_NAME,
+                        ZooKeeperProtos.Table.State.ENABLED)) {
+                      try {
+                        // NOTE(review): sn may still be null here; any resulting
+                        // exception is absorbed by the catch below, leaving nsFound false
+                        ClientProtos.ClientService.BlockingInterface rs = conn.getClient(sn);
+                        ClientProtos.GetRequest request =
+                            RequestConverter.buildGetRequest(info.getRegionName(),
+                                new Get(ROW_KEY));
+                        rs.get(null, request);
+                        nsFound.set(true);
+                      } catch(Exception ex) {
+                        LOG.debug("Caught exception while verifying group region", ex);
+                      }
+                    }
+                  }
+                }
+                // always continue the scan
+                return true;
+              }
+            };
+            MetaTableAccessor.fullScan(conn, visitor);
+            // if no regions in meta then we have to create the table
+            if (foundRegions.size() < 1 && rootMetaFound && !createSent && nsFound.get()) {
+              groupInfoManager.createGroupTable(masterServices);
+              createSent = true;
+            }
+            LOG.info("Group table: " + RSGROUP_TABLE_NAME + " isOnline: " + found.get()
+                + ", regionCount: " + foundRegions.size() + ", assignCount: "
+                + assignedRegions.size() + ", rootMetaFound: "+rootMetaFound);
+            // online only if every region found in meta also answered the Get
+            found.set(found.get() && assignedRegions.size() == foundRegions.size()
+                && foundRegions.size() > 0);
+          } else {
+            LOG.info("Waiting for catalog tables to come online");
+            found.set(false);
+          }
+          if (found.get()) {
+            LOG.debug("With group table online, refreshing cached information.");
+            // forceOnline: isOnline is still false at this point
+            groupInfoManager.refresh(true);
+            isOnline = true;
+            //flush any inconsistencies between ZK and HTable
+            groupInfoManager.flushConfig(groupInfoManager.rsGroupMap);
+          }
+        } catch (RuntimeException e) {
+          // runtime exceptions are not retried; they end this thread
+          throw e;
+        } catch(Exception e) {
+          found.set(false);
+          LOG.warn("Failed to perform check", e);
+        }
+        try {
+          Thread.sleep(100);
+        } catch (InterruptedException e) {
+          LOG.info("Sleep interrupted", e);
+        }
+      }
+      return found.get();
+    }
+
+    /**
+     * @return true once the rsgroup table has been verified and the manager's
+     *         cache refreshed from it
+     */
+    public boolean isOnline() {
+      return isOnline;
+    }
+
+    // true while the master is neither aborted nor stopped
+    private boolean isMasterRunning() {
+      return !masterServices.isAborted() && !masterServices.isStopped();
+    }
+  }
+
+  /**
+   * Submits a create-table procedure for the rsgroup table and waits for its
+   * first region to be assigned to a region server.
+   * <p>
+   * The outcome is decided by the region's actual assignment state rather than
+   * by the remaining retry count, so a region that comes online during the
+   * very last sleep is still reported as success.
+   *
+   * @param masterServices the master used to submit the procedure and to
+   *          query region states
+   * @throws IOException if the wait is interrupted or the region is still
+   *           unassigned after 600 checks spaced 100 ms apart
+   */
+  private void createGroupTable(MasterServices masterServices) throws IOException {
+    HRegionInfo[] newRegions =
+        ModifyRegionUtils.createHRegionInfos(RSGROUP_TABLE_DESC, null);
+    ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch();
+    masterServices.getMasterProcedureExecutor().submitProcedure(
+        new CreateTableProcedure(
+            masterServices.getMasterProcedureExecutor().getEnvironment(),
+            RSGROUP_TABLE_DESC,
+            newRegions,
+            latch));
+    latch.await();
+    // wait for region to be online
+    int tries = 600;
+    boolean assigned = masterServices.getAssignmentManager().getRegionStates()
+        .getRegionServerOfRegion(newRegions[0]) != null;
+    while(!assigned && tries > 0) {
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException e) {
+        throw new IOException("Wait interrupted", e);
+      }
+      tries--;
+      // re-check after every sleep so the final attempt is not lost
+      assigned = masterServices.getAssignmentManager().getRegionStates()
+          .getRegionServerOfRegion(newRegions[0]) != null;
+    }
+    if(!assigned) {
+      throw new IOException("Failed to create group table.");
+    }
+  }
+
+  private void multiMutate(List<Mutation> mutations)
+      throws IOException {
+    MutateRowsRequest.Builder mrmBuilder = MutateRowsRequest.newBuilder();
+    for (Mutation mutation : mutations) {
+      if (mutation instanceof Put) {
+        mrmBuilder.addMutationRequest(ProtobufUtil.toMutation(
+          ClientProtos.MutationProto.MutationType.PUT, mutation));
+      } else if (mutation instanceof Delete) {
+        mrmBuilder.addMutationRequest(ProtobufUtil.toMutation(
+          ClientProtos.MutationProto.MutationType.DELETE, mutation));
+      } else {
+        throw new DoNotRetryIOException("multiMutate doesn't support "
+          + mutation.getClass().getName());
+      }
+    }
+    MutateRowsRequest mrm = mrmBuilder.build();
+    // Be robust against movement of the rsgroup table
+    // TODO: Why is this necessary sometimes? Should we be using our own connection?
+    conn.clearRegionCache(RSGROUP_TABLE_NAME);
+    try (Table rsGroupTable = conn.getTable(RSGROUP_TABLE_NAME)) {
+      CoprocessorRpcChannel channel = rsGroupTable.coprocessorService(ROW_KEY);
+      MultiRowMutationProtos.MultiRowMutationService.BlockingInterface service =
+          MultiRowMutationProtos.MultiRowMutationService.newBlockingStub(channel);
+      try {
+        service.mutateRows(null, mrm);
+      } catch (ServiceException ex) {
+        ProtobufUtil.toIOException(ex);
+      }
+    }
+  }
+
+  /**
+   * Validates a group name: it must consist of one or more ASCII letters,
+   * digits or underscores.
+   *
+   * @param groupName the name to validate
+   * @throws ConstraintException if the name contains any other character or is empty
+   */
+  private void checkGroupName(String groupName) throws ConstraintException {
+    boolean wellFormed = groupName.matches("[a-zA-Z0-9_]+");
+    if (wellFormed) {
+      return;
+    }
+    throw new ConstraintException("Group name should only contain alphanumeric characters");
+  }
+
+  /**
+   * Moves servers and tables from one group to another in a single flush.
+   * <p>
+   * Like the other mutators of this class, the method is synchronized and
+   * works on copies of the group objects, so the published in-memory state is
+   * left untouched if the flush fails.
+   *
+   * @param servers the servers to remove from the source and add to the destination
+   * @param tables the tables to remove from the source and add to the destination
+   * @param srcGroup name of the source group
+   * @param dstGroup name of the destination group
+   * @throws DoNotRetryIOException if either group is unknown
+   * @throws IOException if flushing the new configuration fails
+   */
+  @Override
+  public synchronized void moveServersAndTables(Set<Address> servers, Set<TableName> tables,
+      String srcGroup, String dstGroup) throws IOException {
+    //get server's group
+    RSGroupInfo currentSrc = getRSGroup(srcGroup);
+    if (currentSrc == null) {
+      throw new DoNotRetryIOException("Group "+srcGroup+" does not exist");
+    }
+    RSGroupInfo currentDst = getRSGroup(dstGroup);
+    if (currentDst == null) {
+      throw new DoNotRetryIOException("Group "+dstGroup+" does not exist");
+    }
+    // never modify the instances held by the published map
+    RSGroupInfo srcGroupInfo = new RSGroupInfo(currentSrc);
+    RSGroupInfo dstGroupInfo = new RSGroupInfo(currentDst);
+
+    //move servers
+    for (Address el: servers) {
+      srcGroupInfo.removeServer(el);
+      dstGroupInfo.addServer(el);
+    }
+    //move tables
+    for(TableName tableName: tables) {
+      srcGroupInfo.removeTable(tableName);
+      dstGroupInfo.addTable(tableName);
+    }
+
+    //flush changed groupinfo
+    Map<String,RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
+    newGroupMap.put(srcGroupInfo.getName(), srcGroupInfo);
+    newGroupMap.put(dstGroupInfo.getName(), dstGroupInfo);
+    flushConfig(newGroupMap);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/64328cae/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupProtobufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupProtobufUtil.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupProtobufUtil.java
new file mode 100644
index 0000000..0874210
--- /dev/null
+++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupProtobufUtil.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.rsgroup;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.net.Address;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.RSGroupProtos;
+import org.apache.hadoop.hbase.protobuf.generated.TableProtos;
+
+/**
+ * Conversions between the {@link RSGroupInfo} POJO and its protobuf message
+ * {@link RSGroupProtos.RSGroupInfo}.
+ */
+@InterfaceAudience.Private
+class RSGroupProtobufUtil {
+  /**
+   * Builds an {@link RSGroupInfo} from its protobuf message.
+   *
+   * @param proto the protobuf message to read the name, servers and tables from
+   * @return a new group carrying the name, server addresses and tables of {@code proto}
+   */
+  static RSGroupInfo toGroupInfo(RSGroupProtos.RSGroupInfo proto) {
+    RSGroupInfo groupInfo = new RSGroupInfo(proto.getName());
+    for (HBaseProtos.ServerName protoServer : proto.getServersList()) {
+      // Only host name and port are carried over into the Address.
+      Address address = Address.fromParts(protoServer.getHostName(), protoServer.getPort());
+      groupInfo.addServer(address);
+    }
+    for (TableProtos.TableName protoTable : proto.getTablesList()) {
+      groupInfo.addTable(ProtobufUtil.toTableName(protoTable));
+    }
+    return groupInfo;
+  }
+
+  /**
+   * Builds the protobuf message for an {@link RSGroupInfo}.
+   *
+   * @param pojo the group to serialize
+   * @return a message holding the group's name, its servers (host name and port) and its tables
+   */
+  static RSGroupProtos.RSGroupInfo toProtoGroupInfo(RSGroupInfo pojo) {
+    RSGroupProtos.RSGroupInfo.Builder builder =
+        RSGroupProtos.RSGroupInfo.newBuilder().setName(pojo.getName());
+    for (Address address : pojo.getServers()) {
+      HBaseProtos.ServerName protoServer = HBaseProtos.ServerName.newBuilder()
+          .setHostName(address.getHostname())
+          .setPort(address.getPort())
+          .build();
+      builder.addServers(protoServer);
+    }
+    for (TableName table : pojo.getTables()) {
+      builder.addTables(ProtobufUtil.toProtoTableName(table));
+    }
+    return builder.build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/64328cae/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupSerDe.java
----------------------------------------------------------------------
diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupSerDe.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupSerDe.java
new file mode 100644
index 0000000..642cb4a
--- /dev/null
+++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupSerDe.java
@@ -0,0 +1,88 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.rsgroup;
+
+import com.google.common.collect.Lists;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.RSGroupProtos;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+//TODO do better encapsulation of SerDe logic from GroupInfoManager and GroupTracker
+/**
+ * Reads serialized {@link RSGroupInfo} records back from either the rsgroup table or
+ * the children of a ZooKeeper znode.
+ */
+public class RSGroupSerDe {
+  private static final Log LOG = LogFactory.getLog(RSGroupSerDe.class);
+
+  public RSGroupSerDe() {
+
+  }
+
+  /**
+   * Scans the whole group table and decodes one {@link RSGroupInfo} per row.
+   *
+   * @param groupTable table whose rows hold a serialized group in the
+   *     {@code META_FAMILY_BYTES}/{@code META_QUALIFIER_BYTES} cell
+   * @return the decoded groups, in scan order; empty if the table has no rows
+   * @throws IOException if the scan or the protobuf parsing fails
+   */
+  public List<RSGroupInfo> retrieveGroupList(Table groupTable) throws IOException {
+    List<RSGroupInfo> groupInfos = Lists.newArrayList();
+    // Close the scanner on every path, including a parse failure halfway through the scan.
+    try (ResultScanner scanner = groupTable.getScanner(new Scan())) {
+      for (Result result : scanner) {
+        RSGroupProtos.RSGroupInfo proto =
+            RSGroupProtos.RSGroupInfo.parseFrom(
+                result.getValue(
+                    RSGroupInfoManager.META_FAMILY_BYTES,
+                    RSGroupInfoManager.META_QUALIFIER_BYTES));
+        groupInfos.add(RSGroupProtobufUtil.toGroupInfo(proto));
+      }
+    }
+    return groupInfos;
+  }
+
+  /**
+   * Decodes one {@link RSGroupInfo} from each non-empty child znode of {@code groupBasePath}.
+   *
+   * @param watcher ZooKeeper watcher used for all reads
+   * @param groupBasePath parent znode whose children each hold one serialized group
+   * @return the decoded groups; empty if the parent znode does not exist or has no data
+   * @throws IOException if ZooKeeper access fails, the payload lacks the PB magic prefix,
+   *     parsing fails, or the thread is interrupted (the interrupt flag is restored)
+   */
+  public List<RSGroupInfo> retrieveGroupList(ZooKeeperWatcher watcher,
+                                             String groupBasePath) throws IOException {
+    List<RSGroupInfo> groupInfos = Lists.newArrayList();
+    //Overwrite any info stored by table, this takes precedence
+    try {
+      if (ZKUtil.checkExists(watcher, groupBasePath) != -1) {
+        List<String> children =
+            ZKUtil.listChildrenAndWatchForNewChildren(watcher, groupBasePath);
+        // The parent may vanish between the existence check and the listing.
+        if (children != null) {
+          for (String znode : children) {
+            byte[] data = ZKUtil.getData(watcher, ZKUtil.joinZNode(groupBasePath, znode));
+            // A child deleted concurrently yields no data; skip it like an empty one.
+            if (data != null && data.length > 0) {
+              ProtobufUtil.expectPBMagicPrefix(data);
+              int magicLength = ProtobufUtil.lengthOfPBMagic();
+              // Third argument is a length, not an end offset: pass only what follows the magic.
+              ByteArrayInputStream bis = new ByteArrayInputStream(
+                  data, magicLength, data.length - magicLength);
+              groupInfos.add(
+                  RSGroupProtobufUtil.toGroupInfo(RSGroupProtos.RSGroupInfo.parseFrom(bis)));
+            }
+          }
+        }
+        LOG.debug("Read ZK GroupInfo count:" + groupInfos.size());
+      }
+    } catch (KeeperException | DeserializationException e) {
+      throw new IOException("Failed to read rsGroupZNode",e);
+    } catch (InterruptedException e) {
+      // Keep the interrupt visible to callers further up the stack.
+      Thread.currentThread().interrupt();
+      throw new IOException("Failed to read rsGroupZNode",e);
+    }
+    return groupInfos;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/64328cae/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupableBalancer.java
----------------------------------------------------------------------
diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupableBalancer.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupableBalancer.java
new file mode 100644
index 0000000..6c791a1
--- /dev/null
+++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupableBalancer.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.rsgroup;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.master.LoadBalancer;
+
+/**
+ * Marker Interface. RSGroups feature will check for a LoadBalancer
+ * marked with this Interface before it runs.
+ * <p>
+ * Declares no methods of its own beyond those inherited from {@link LoadBalancer};
+ * it only contributes the configuration key below.
+ */
+@InterfaceAudience.Private
+public interface RSGroupableBalancer extends LoadBalancer {
+  /**
+   * Config for pluggable load balancers.
+   * NOTE(review): presumably the value stored under this key is a LoadBalancer
+   * implementation class name; confirm against the code that reads it.
+   */
+  String HBASE_RSGROUP_LOADBALANCER_CLASS = "hbase.rsgroup.grouploadbalancer.class";
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/64328cae/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancer.java
----------------------------------------------------------------------
diff --git a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancer.java b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancer.java
new file mode 100644
index 0000000..3b96de6
--- /dev/null
+++ b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancer.java
@@ -0,0 +1,573 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.balancer;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Lists;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableDescriptors;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer;
+import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
+import org.apache.hadoop.hbase.rsgroup.RSGroupInfoManager;
+import org.apache.hadoop.hbase.master.AssignmentManager;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.RegionPlan;
+import org.apache.hadoop.hbase.net.Address;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+//TODO use stochastic based load balancer instead
+@Category(SmallTests.class)
+public class TestRSGroupBasedLoadBalancer {
+
+  private static final Log LOG = LogFactory.getLog(TestRSGroupBasedLoadBalancer.class);
+  // Balancer under test; created and initialized once in beforeAllTests().
+  private static RSGroupBasedLoadBalancer loadBalancer;
+  // Source of randomness for host names, ports, keys and group/table placement.
+  private static SecureRandom rand;
+
+  // Names of the groups the mocked servers are spread over.
+  static String[]  groups = new String[] { RSGroupInfo.DEFAULT_GROUP, "dg2", "dg3",
+      "dg4" };
+  // Tables used by the tests; constructTableDesc() maps each one to a group.
+  static TableName[] tables =
+      new TableName[] { TableName.valueOf("dt1"),
+          TableName.valueOf("dt2"),
+          TableName.valueOf("dt3"),
+          TableName.valueOf("dt4")};
+  // Mock servers produced by generateServers() in beforeAllTests().
+  static List<ServerName> servers;
+  // Group name -> group info, built by constructGroupInfo().
+  static Map<String, RSGroupInfo> groupMap;
+  // Table -> owning group name, filled in by constructTableDesc().
+  static Map<TableName, String> tableMap;
+  // One descriptor per entry of tables, in the same order.
+  static List<HTableDescriptor> tableDescs;
+  // Number of regions mockClusterServers() places on servers.get(i).
+  int[] regionAssignment = new int[] { 2, 5, 7, 10, 4, 3, 1 };
+  // Incremented for every HRegionInfo the helpers create.
+  static int regionId = 0;
+
+  /**
+   * Builds the shared fixtures (servers, groups, table-to-group mapping, descriptors)
+   * and initializes the balancer under test against mocked collaborators.
+   *
+   * @throws Exception if fixture construction or balancer initialization fails
+   */
+  @BeforeClass
+  public static void beforeAllTests() throws Exception {
+    rand = new SecureRandom();
+    servers = generateServers(7);
+    groupMap = constructGroupInfo(servers, groups);
+    // tableMap must exist before constructTableDesc(), which records each table's group in it.
+    tableMap = new HashMap<TableName, String>();
+    tableDescs = constructTableDesc();
+    Configuration conf = HBaseConfiguration.create();
+    conf.set("hbase.regions.slop", "0");
+    // Use the key declared as RSGroupableBalancer.HBASE_RSGROUP_LOADBALANCER_CLASS; the
+    // former "hbase.group.grouploadbalancer.class" spelling does not match that constant.
+    conf.set("hbase.rsgroup.grouploadbalancer.class",
+        SimpleLoadBalancer.class.getCanonicalName());
+    loadBalancer = new RSGroupBasedLoadBalancer(getMockedGroupInfoManager());
+    loadBalancer.setMasterServices(getMockedMaster());
+    loadBalancer.setConf(conf);
+    loadBalancer.initialize();
+  }
+
+  /**
+   * Runs the balancer over the mocked cluster and checks the outcome per group.
+   *
+   * After applying the returned plans, every server of a group is expected to host
+   * either floor(average) or ceiling(average) regions.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testBalanceCluster() throws Exception {
+    Map<ServerName, List<HRegionInfo>> clusterState = mockClusterServers();
+    ArrayListMultimap<String, ServerAndLoad> loadBefore = convertToGroupBasedMap(clusterState);
+    LOG.info("Mock Cluster :  " + printStats(loadBefore));
+
+    List<RegionPlan> regionPlans = loadBalancer.balanceCluster(clusterState);
+
+    ArrayListMultimap<String, ServerAndLoad> loadAfter = reconcile(loadBefore, regionPlans);
+    LOG.info("Mock Balance : " + printStats(loadAfter));
+    assertClusterAsBalanced(loadAfter);
+  }
+
+  /**
+   * Invariant is that all servers of a group have load between floor(avg) and
+   * ceiling(avg) number of regions. The invariant is verified for every group
+   * in the map independently.
+   *
+   * @param groupLoadMap per-group list of servers with their region counts
+   */
+  private void assertClusterAsBalanced(
+      ArrayListMultimap<String, ServerAndLoad> groupLoadMap) {
+    for (String gName : groupLoadMap.keySet()) {
+      List<ServerAndLoad> groupLoad = groupLoadMap.get(gName);
+      int numServers = groupLoad.size();
+      int numRegions = 0;
+      int maxRegions = 0;
+      int minRegions = Integer.MAX_VALUE;
+      for (ServerAndLoad server : groupLoad) {
+        int nr = server.getLoad();
+        maxRegions = Math.max(maxRegions, nr);
+        minRegions = Math.min(minRegions, nr);
+        numRegions += nr;
+      }
+      if (maxRegions - minRegions < 2) {
+        // less than 2 between max and min, can't balance this group any further;
+        // move on to the next group instead of abandoning the remaining checks
+        continue;
+      }
+      int min = numRegions / numServers;
+      int max = numRegions % numServers == 0 ? min : min + 1;
+
+      for (ServerAndLoad server : groupLoad) {
+        assertTrue(server.getLoad() <= max);
+        assertTrue(server.getLoad() >= min);
+      }
+    }
+  }
+
+  /**
+   * Checks that every region received an assignment, and that the chosen server
+   * belongs to the group of the region's table.
+   *
+   * @param regions regions that must all appear in {@code assignments}
+   * @param servers not consulted by this check
+   * @param assignments region to server mapping under test
+   * @throws java.io.IOException
+   */
+  private void assertImmediateAssignment(List<HRegionInfo> regions,
+                                         List<ServerName> servers,
+                                         Map<HRegionInfo, ServerName> assignments)
+      throws IOException {
+    for (HRegionInfo hri : regions) {
+      assertTrue(assignments.containsKey(hri));
+      ServerName assignedTo = assignments.get(hri);
+
+      RSGroupInfoManager manager = getMockedGroupInfoManager();
+      String owningGroup = manager.getRSGroupOfTable(hri.getTable());
+      assertTrue(StringUtils.isNotEmpty(owningGroup));
+
+      RSGroupInfo groupInfo = manager.getRSGroup(owningGroup);
+      boolean serverInGroup = groupInfo.containsServer(assignedTo.getAddress());
+      assertTrue("Region is not correctly assigned to group servers.", serverInGroup);
+    }
+  }
+
+  /**
+   * Exercises the round-robin bulk assignment used during cluster startup.
+   *
+   * Each region must land on a server of its table's group, and the resulting
+   * placement must satisfy the same floor(avg)/ceiling(avg) invariant as balancing.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testBulkAssignment() throws Exception {
+    List<HRegionInfo> regions = randomRegions(25);
+    Map<ServerName, List<HRegionInfo>> assignments =
+        loadBalancer.roundRobinAssignment(regions, servers);
+    // An empty server list must be tolerated without an NPE.
+    loadBalancer.roundRobinAssignment(regions, Collections.<ServerName>emptyList());
+
+    // Regular scenario: every server shows up as a key of the result.
+    assertTrue(assignments.size() == servers.size());
+    for (Map.Entry<ServerName, List<HRegionInfo>> entry : assignments.entrySet()) {
+      ServerName host = entry.getKey();
+      for (HRegionInfo hri : entry.getValue()) {
+        RSGroupInfoManager manager = getMockedGroupInfoManager();
+        String owningGroup = manager.getRSGroupOfTable(hri.getTable());
+        assertTrue(StringUtils.isNotEmpty(owningGroup));
+        RSGroupInfo groupInfo = manager.getRSGroup(owningGroup);
+        assertTrue(
+            "Region is not correctly assigned to group servers.",
+            groupInfo.containsServer(host.getAddress()));
+      }
+    }
+    assertClusterAsBalanced(convertToGroupBasedMap(assignments));
+  }
+
+  /**
+   * Exercises the startup bulk assignment that tries to keep regions where they were.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testRetainAssignment() throws Exception {
+    // Simple case: every previously used server is still present.
+    Map<ServerName, List<HRegionInfo>> previous = mockClusterServers();
+    Map<HRegionInfo, ServerName> regionToServer = new HashMap<HRegionInfo, ServerName>();
+    for (Map.Entry<ServerName, List<HRegionInfo>> entry : previous.entrySet()) {
+      for (HRegionInfo hri : entry.getValue()) {
+        regionToServer.put(hri, entry.getKey());
+      }
+    }
+    // A region mapped to a null server must be handled as well.
+    HRegionInfo unassigned = randomRegions(1).get(0);
+    regionToServer.put(unassigned, null);
+
+    Map<ServerName, List<HRegionInfo>> retained =
+        loadBalancer.retainAssignment(regionToServer, servers);
+    assertRetainedAssignment(regionToServer, servers, retained);
+  }
+
+  /**
+   * Asserts a valid retained assignment plan.
+   * <p>
+   * Must meet the following conditions:
+   * <ul>
+   * <li>Every input region has an assignment, and to an online server
+   * <li>Every region sits on a server of its table's group; if it moved away from a
+   * host that is still online, that old server must not belong to the group
+   * </ul>
+   *
+   * @param existing region to server mapping before the assignment
+   * @param servers servers considered online
+   * @param assignment the plan to verify
+   * @throws java.io.IOException
+   * @throws java.io.FileNotFoundException
+   */
+  private void assertRetainedAssignment(
+      Map<HRegionInfo, ServerName> existing, List<ServerName> servers,
+      Map<ServerName, List<HRegionInfo>> assignment)
+      throws FileNotFoundException, IOException {
+    // Condition 1: every region is assigned, and only to online servers.
+    Set<ServerName> online = new TreeSet<ServerName>(servers);
+    Set<HRegionInfo> placedRegions = new TreeSet<HRegionInfo>();
+    for (Map.Entry<ServerName, List<HRegionInfo>> entry : assignment.entrySet()) {
+      assertTrue(
+          "Region assigned to server that was not listed as online",
+          online.contains(entry.getKey()));
+      placedRegions.addAll(entry.getValue());
+    }
+    assertEquals(existing.size(), placedRegions.size());
+
+    // Condition 2: every region is on a server of the correct group.
+    Set<String> onlineHosts = new TreeSet<String>();
+    for (ServerName online1 : servers) {
+      onlineHosts.add(online1.getHostname());
+    }
+
+    for (Map.Entry<ServerName, List<HRegionInfo>> entry : assignment.entrySet()) {
+      ServerName newServer = entry.getKey();
+      for (HRegionInfo hri : entry.getValue()) {
+        ServerName oldServer = existing.get(hri);
+        RSGroupInfoManager manager = getMockedGroupInfoManager();
+        String owningGroup = manager.getRSGroupOfTable(hri.getTable());
+        assertTrue(StringUtils.isNotEmpty(owningGroup));
+        RSGroupInfo groupInfo = manager.getRSGroup(owningGroup);
+        assertTrue(
+            "Region is not correctly assigned to group servers.",
+            groupInfo.containsServer(newServer.getAddress()));
+
+        // The region had a home whose host is still around but it was moved anyway:
+        // that is only acceptable when the old server is in a different group.
+        boolean movedOffOnlineHost = oldServer != null
+            && onlineHosts.contains(oldServer.getHostname())
+            && !oldServer.getAddress().equals(newServer.getAddress());
+        if (movedOffOnlineHost) {
+          assertFalse(groupInfo.containsServer(oldServer.getAddress()));
+        }
+      }
+    }
+  }
+
+  /**
+   * Renders a per-group report: the group's servers, each server's load, and the
+   * group's server count, region total, average, ceiling and floor.
+   *
+   * @param groupBasedLoad per-group list of servers with their region counts
+   * @return the multi-line report, starting with a newline
+   */
+  private String printStats(
+      ArrayListMultimap<String, ServerAndLoad> groupBasedLoad) {
+    StringBuilder report = new StringBuilder("\n");
+    for (String groupName : groupBasedLoad.keySet()) {
+      report.append("Stats for group: ").append(groupName).append("\n");
+      report.append(groupMap.get(groupName).getServers()).append("\n");
+
+      List<ServerAndLoad> members = groupBasedLoad.get(groupName);
+      int serverCount = members.size();
+      int regionTotal = 0;
+      report.append("Per Server Load: \n");
+      for (ServerAndLoad member : members) {
+        report.append("Server :").append(member.getServerName())
+            .append(" Load : ").append(member.getLoad()).append("\n");
+        regionTotal += member.getLoad();
+      }
+
+      report.append(" Group Statistics : \n");
+      float average = (float) regionTotal / serverCount;
+      int ceiling = (int) Math.ceil(average);
+      int floor = (int) Math.floor(average);
+      report.append("[srvr=").append(serverCount)
+          .append(" rgns=").append(regionTotal)
+          .append(" avg=").append(average)
+          .append(" max=").append(ceiling)
+          .append(" min=").append(floor)
+          .append("]");
+      report.append("\n");
+      report.append("===============================");
+      report.append("\n");
+    }
+    return report.toString();
+  }
+
+  /**
+   * Regroups a server to regions map by group: for each server address of each
+   * group, records the matching server and the number of regions it holds.
+   *
+   * @param serversMap regions currently held by each server
+   * @return group name to (server, region count) entries
+   * @throws IOException propagated from the mocked group info manager
+   */
+  private ArrayListMultimap<String, ServerAndLoad> convertToGroupBasedMap(
+      final Map<ServerName, List<HRegionInfo>> serversMap) throws IOException {
+    ArrayListMultimap<String, ServerAndLoad> byGroup = ArrayListMultimap.create();
+    for (RSGroupInfo groupInfo : getMockedGroupInfoManager().listRSGroups()) {
+      for (Address address : groupInfo.getServers()) {
+        // Resolve the group's address back to the first test server with that address.
+        ServerName resolved = null;
+        for (ServerName candidate : servers) {
+          if (candidate.getAddress().equals(address)) {
+            resolved = candidate;
+            break;
+          }
+        }
+        List<HRegionInfo> hosted = serversMap.get(resolved);
+        assertTrue("No load for " + resolved, hosted != null);
+        ServerAndLoad load = new ServerAndLoad(resolved, hosted.size());
+        byGroup.put(groupInfo.getName(), load);
+      }
+    }
+    return byGroup;
+  }
+
+  /**
+   * Applies region plans to a copy of the given load map: each plan takes one
+   * region off its source and adds one to its destination.
+   *
+   * @param previousLoad load before the plans; copied, not passed to updateLoad
+   * @param plans plans to apply, may be null
+   * @return the load after all plans have been applied
+   */
+  private ArrayListMultimap<String, ServerAndLoad> reconcile(
+      ArrayListMultimap<String, ServerAndLoad> previousLoad,
+      List<RegionPlan> plans) {
+    ArrayListMultimap<String, ServerAndLoad> updated = ArrayListMultimap.create();
+    updated.putAll(previousLoad);
+    if (plans == null) {
+      return updated;
+    }
+    for (RegionPlan plan : plans) {
+      updateLoad(updated, plan.getSource(), -1);
+      updateLoad(updated, plan.getDestination(), +1);
+    }
+    return updated;
+  }
+
+  /**
+   * Adjusts the load of the first entry whose server has the same address as
+   * {@code sn}, replacing that entry in its group. Does nothing if no entry matches.
+   *
+   * @param previousLoad load map to modify in place
+   * @param sn server whose load changes
+   * @param diff amount added to the current load
+   */
+  private void updateLoad(
+      ArrayListMultimap<String, ServerAndLoad> previousLoad,
+      final ServerName sn, final int diff) {
+    for (String groupName : previousLoad.keySet()) {
+      // Locate the entry first; the map is only modified once iteration is over.
+      ServerAndLoad matched = null;
+      for (ServerAndLoad candidate : previousLoad.get(groupName)) {
+        if (ServerName.isSameAddress(sn, candidate.getServerName())) {
+          matched = candidate;
+          break;
+        }
+      }
+      if (matched == null) {
+        continue;
+      }
+      ServerAndLoad replacement = new ServerAndLoad(sn, matched.getLoad() + diff);
+      previousLoad.remove(groupName, matched);
+      previousLoad.put(groupName, replacement);
+      return;
+    }
+  }
+
+  /**
+   * Builds a cluster state in which servers.get(i) holds regionAssignment[i] regions.
+   *
+   * @return sorted map from each test server to its generated regions
+   * @throws IOException propagated from region generation
+   */
+  private Map<ServerName, List<HRegionInfo>> mockClusterServers() throws IOException {
+    assertTrue(servers.size() == regionAssignment.length);
+    Map<ServerName, List<HRegionInfo>> cluster = new TreeMap<ServerName, List<HRegionInfo>>();
+    int slot = 0;
+    for (ServerName server : servers) {
+      cluster.put(server, assignedRegions(regionAssignment[slot], server));
+      slot++;
+    }
+    return cluster;
+  }
+
+  /**
+   * Generates regions that cycle through the test tables, starting at a random table.
+   *
+   * @param numRegions The number of regions to be generated.
+   * @return List of HRegionInfo.
+   */
+  private List<HRegionInfo> randomRegions(int numRegions) {
+    List<HRegionInfo> generated = new ArrayList<HRegionInfo>(numRegions);
+    byte[] startKey = new byte[16];
+    byte[] endKey = new byte[16];
+    rand.nextBytes(startKey);
+    rand.nextBytes(endKey);
+    int firstTable = rand.nextInt(tables.length);
+    // The key prefixes depend only on numRegions, so they are written once up front.
+    Bytes.putInt(startKey, 0, numRegions << 1);
+    Bytes.putInt(endKey, 0, (numRegions << 1) + 1);
+    for (int n = 0; n < numRegions; n++) {
+      TableName table = tables[(n + firstTable) % tables.length];
+      generated.add(new HRegionInfo(table, startKey, endKey, false, regionId++));
+    }
+    return generated;
+  }
+
+  /**
+   * Generates regions for a server, all belonging to the table that
+   * {@code getTableName(sn)} resolves for it.
+   *
+   * @param numRegions the num regions to generate
+   * @param sn the servername
+   * @return the list of regions
+   * @throws java.io.IOException Signals that an I/O exception has occurred.
+   */
+  private List<HRegionInfo> assignedRegions(int numRegions, ServerName sn) throws IOException {
+    byte[] startKey = new byte[16];
+    Bytes.putInt(startKey, 0, numRegions << 1);
+    byte[] endKey = new byte[16];
+    Bytes.putInt(endKey, 0, (numRegions << 1) + 1);
+
+    List<HRegionInfo> generated = new ArrayList<HRegionInfo>(numRegions);
+    for (int remaining = numRegions; remaining > 0; remaining--) {
+      TableName table = getTableName(sn);
+      generated.add(new HRegionInfo(table, startKey, endKey, false, regionId++));
+    }
+    return generated;
+  }
+
+  /**
+   * Creates server names with a random host suffix and a random port.
+   *
+   * @param numServers how many servers to create
+   * @return the generated servers
+   */
+  private static List<ServerName> generateServers(int numServers) {
+    List<ServerName> generated = new ArrayList<ServerName>(numServers);
+    while (generated.size() < numServers) {
+      // Host is drawn before port; both come from the shared random source.
+      String hostName = "server" + rand.nextInt(100000);
+      int portNumber = rand.nextInt(60000);
+      generated.add(ServerName.valueOf(hostName, portNumber, -1));
+    }
+    return generated;
+  }
+
+  /**
+   * Construct group info, with each group having at least one server.
+   *
+   * The first servers are handed out one per group in order; each remaining
+   * server joins a randomly chosen group.
+   *
+   * @param servers the servers
+   * @param groups the groups
+   * @return the map
+   */
+  private static Map<String, RSGroupInfo> constructGroupInfo(
+      List<ServerName> servers, String[] groups) {
+    assertTrue(servers != null);
+    assertTrue(servers.size() >= groups.length);
+    Map<String, RSGroupInfo> byName = new HashMap<String, RSGroupInfo>();
+    int next = 0;
+    for (String groupName : groups) {
+      RSGroupInfo groupInfo = new RSGroupInfo(groupName);
+      groupInfo.addServer(servers.get(next++).getAddress());
+      byName.put(groupName, groupInfo);
+    }
+    for (; next < servers.size(); next++) {
+      String randomGroup = groups[rand.nextInt(groups.length)];
+      byName.get(randomGroup).addServer(servers.get(next).getAddress());
+    }
+    return byName;
+  }
+
+  /**
+   * Construct table descriptors evenly distributed between the groups.
+   *
+   * Starting from a random group, consecutive tables go to consecutive groups
+   * (wrapping around); the chosen group is recorded in {@code tableMap}.
+   *
+   * @return the list
+   */
+  private static List<HTableDescriptor> constructTableDesc() {
+    List<HTableDescriptor> descriptors = Lists.newArrayList();
+    int offset = rand.nextInt(groups.length);
+    int position = 0;
+    for (TableName table : tables) {
+      String owningGroup = groups[(position + offset) % groups.length];
+      tableMap.put(table, owningGroup);
+      descriptors.add(new HTableDescriptor(table));
+      position++;
+    }
+    return descriptors;
+  }
+
+  /**
+   * Creates a mocked master whose table descriptors resolve each test table to its
+   * descriptor and whose assignment manager is a plain mock.
+   *
+   * @return the mocked master services
+   * @throws IOException declared by the stubbed descriptor lookup
+   */
+  private static MasterServices getMockedMaster() throws IOException {
+    TableDescriptors descriptors = Mockito.mock(TableDescriptors.class);
+    // tables and tableDescs are parallel: entry i of one describes entry i of the other.
+    for (int i = 0; i < tables.length; i++) {
+      Mockito.when(descriptors.get(tables[i])).thenReturn(tableDescs.get(i));
+    }
+    MasterServices master = Mockito.mock(HMaster.class);
+    Mockito.when(master.getTableDescriptors()).thenReturn(descriptors);
+    AssignmentManager assignmentManager = Mockito.mock(AssignmentManager.class);
+    Mockito.when(master.getAssignmentManager()).thenReturn(assignmentManager);
+    return master;
+  }
+
+  /**
+   * Creates a mocked group info manager backed by {@code groupMap} and {@code tableMap}:
+   * group lookups and listings come from the former, table-to-group lookups from the latter.
+   *
+   * @return a freshly stubbed manager that reports itself as online
+   * @throws IOException declared by the stubbed manager methods
+   */
+  private static RSGroupInfoManager getMockedGroupInfoManager() throws IOException {
+    RSGroupInfoManager manager = Mockito.mock(RSGroupInfoManager.class);
+    for (String groupName : groups) {
+      Mockito.when(manager.getRSGroup(groupName)).thenReturn(groupMap.get(groupName));
+    }
+    Mockito.when(manager.listRSGroups()).thenReturn(
+        Lists.newLinkedList(groupMap.values()));
+    Mockito.when(manager.isOnline()).thenReturn(true);
+    // Answer lazily so the lookup always reflects the current content of tableMap.
+    Answer<String> groupOfTable = new Answer<String>() {
+      @Override
+      public String answer(InvocationOnMock invocation) throws Throwable {
+        return tableMap.get(invocation.getArguments()[0]);
+      }
+    };
+    Mockito.when(manager.getRSGroupOfTable(Mockito.any(TableName.class)))
+        .thenAnswer(groupOfTable);
+    return manager;
+  }
+
+  /**
+   * Finds a table that belongs to the same group as the given server.
+   *
+   * @param sn server whose group is looked up by address
+   * @return the last table in {@code tableDescs} mapped to that group, or null if
+   *     no table is mapped to it
+   * @throws IOException propagated from the mocked group info manager
+   */
+  private TableName getTableName(ServerName sn) throws IOException {
+    RSGroupInfoManager gm = getMockedGroupInfoManager();
+    RSGroupInfo groupOfServer = null;
+    for (RSGroupInfo gInfo : gm.listRSGroups()) {
+      if (gInfo.containsServer(sn.getAddress())) {
+        groupOfServer = gInfo;
+        break;
+      }
+    }
+    // Fail with a readable message instead of an NPE when the fixture is inconsistent.
+    assertTrue("No group contains server " + sn, groupOfServer != null);
+
+    TableName tableName = null;
+    for (HTableDescriptor desc : tableDescs) {
+      // Exact match: a suffix match would also accept e.g. "xdg2" for group "dg2".
+      // Calling equals() on the known non-null name also tolerates an unmapped table.
+      if (groupOfServer.getName().equals(gm.getRSGroupOfTable(desc.getTableName()))) {
+        tableName = desc.getTableName();
+      }
+    }
+    return tableName;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/64328cae/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroups.java
----------------------------------------------------------------------
diff --git a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroups.java b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroups.java
new file mode 100644
index 0000000..3ad928f
--- /dev/null
+++ b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroups.java
@@ -0,0 +1,300 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.rsgroup;
+
+import com.google.common.collect.Sets;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.Waiter.Predicate;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.ServerManager;
+import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
+import org.apache.hadoop.hbase.net.Address;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * RSGroup tests run against a mini cluster whose master is configured with
+ * {@link RSGroupBasedLoadBalancer} as load balancer and {@link RSGroupAdminEndpoint} as
+ * master coprocessor.
+ */
+@Category({MediumTests.class})
+public class TestRSGroups extends TestRSGroupsBase {
+  protected static final Log LOG = LogFactory.getLog(TestRSGroups.class);
+  private static HMaster master;
+  /** Set once the first {@link #beforeMethod()} has run the initial cleanup. */
+  private static boolean init = false;
+  /** Endpoint instance looked up from the master's coprocessor host in {@link #setUp()}. */
+  private static RSGroupAdminEndpoint rsGroupAdminEndpoint;
+
+
+  /**
+   * Starts the mini cluster with the RSGroup balancer and endpoint configured, waits for
+   * the master and the balancer to come online, switches the balancer off and creates the
+   * verifying RSGroup admin client used by the tests.
+   *
+   * @throws Exception if the cluster cannot be started or the wait times out
+   */
+  @BeforeClass
+  public static void setUp() throws Exception {
+    TEST_UTIL = new HBaseTestingUtility();
+    TEST_UTIL.getConfiguration().setFloat(
+            "hbase.master.balancer.stochastic.tableSkewCost", 6000);
+    TEST_UTIL.getConfiguration().set(
+        HConstants.HBASE_MASTER_LOADBALANCER_CLASS,
+        RSGroupBasedLoadBalancer.class.getName());
+    TEST_UTIL.getConfiguration().set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
+        RSGroupAdminEndpoint.class.getName());
+    TEST_UTIL.getConfiguration().setBoolean(
+        HConstants.ZOOKEEPER_USEMULTI,
+        true);
+    TEST_UTIL.startMiniCluster(NUM_SLAVES_BASE);
+    // NOTE(review): the two settings below are applied after the cluster has been
+    // started - confirm that they still take effect where they are needed.
+    TEST_UTIL.getConfiguration().set(
+        ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART,
+        ""+NUM_SLAVES_BASE);
+    TEST_UTIL.getConfiguration().setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
+
+    admin = TEST_UTIL.getHBaseAdmin();
+    cluster = TEST_UTIL.getHBaseCluster();
+    master = ((MiniHBaseCluster)cluster).getMaster();
+
+    //wait for balancer to come online
+    TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        return master.isInitialized() &&
+            ((RSGroupBasedLoadBalancer) master.getLoadBalancer()).isOnline();
+      }
+    });
+    admin.setBalancerRunning(false,true);
+    rsGroupAdmin = new VerifyingRSGroupAdminClient(new RSGroupAdminClient(TEST_UTIL.getConnection()),
+        TEST_UTIL.getConfiguration());
+    rsGroupAdminEndpoint =
+        master.getMasterCoprocessorHost().findCoprocessors(RSGroupAdminEndpoint.class).get(0);
+  }
+
+  /**
+   * Shuts the mini cluster down.
+   *
+   * @throws Exception if the shutdown fails
+   */
+  @AfterClass
+  public static void tearDown() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Runs the per-test cleanup once before the very first test so that it starts from the
+   * same state as every later test.
+   *
+   * @throws Exception if the cleanup fails
+   */
+  @Before
+  public void beforeMethod() throws Exception {
+    if(!init) {
+      init = true;
+      afterMethod();
+    }
+
+  }
+
+  /**
+   * Cleans up after a test: deletes tables, namespaces and groups, restarts region servers
+   * until {@code NUM_SLAVES_BASE} are present, tries to move the master into a "master"
+   * group and waits until the default group holds {@code NUM_SLAVES_BASE} servers.
+   *
+   * @throws Exception if the cleanup fails or the wait times out
+   */
+  @After
+  public void afterMethod() throws Exception {
+    deleteTableIfNecessary();
+    deleteNamespaceIfNecessary();
+    deleteGroups();
+
+    int missing = NUM_SLAVES_BASE - getNumServers();
+    LOG.info("Restoring servers: "+missing);
+    for(int i=0; i<missing; i++) {
+      ((MiniHBaseCluster)cluster).startRegionServer();
+    }
+
+    rsGroupAdmin.addRSGroup("master");
+    ServerName masterServerName =
+        ((MiniHBaseCluster)cluster).getMaster().getServerName();
+
+    try {
+      rsGroupAdmin.moveServers(
+          Sets.newHashSet(masterServerName.getAddress()),
+          "master");
+    } catch (Exception ex) {
+      // Best effort: a failed move is tolerated, but keep a trace of it for debugging.
+      LOG.debug("Ignoring failure while moving the master server to the 'master' group", ex);
+    }
+    TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        LOG.info("Waiting for cleanup to finish " + rsGroupAdmin.listRSGroups());
+        //Might be greater since moving servers back to default
+        //is after starting a server
+
+        return rsGroupAdmin.getRSGroupInfo(RSGroupInfo.DEFAULT_GROUP).getServers().size()
+            == NUM_SLAVES_BASE;
+      }
+    });
+  }
+
+  /**
+   * Checks the default group's server count and the number of assigned regions right
+   * after start-up.
+   */
+  @Test
+  public void testBasicStartUp() throws IOException {
+    RSGroupInfo defaultInfo = rsGroupAdmin.getRSGroupInfo(RSGroupInfo.DEFAULT_GROUP);
+    // Same expectation the cleanup in afterMethod() waits for.
+    assertEquals(NUM_SLAVES_BASE, defaultInfo.getServers().size());
+    // Regions assigned right after start-up.
+    int count = master.getAssignmentManager().getRegionStates().getRegionAssignments().size();
+    //3 meta,namespace, group
+    assertEquals(3, count);
+  }
+
+  /**
+   * Creates a namespace bound to a one-server group, creates a table in it and checks
+   * that the group's server ends up with exactly one online region.
+   */
+  @Test
+  public void testNamespaceCreateAndAssign() throws Exception {
+    LOG.info("testNamespaceCreateAndAssign");
+    String nsName = tablePrefix+"_foo";
+    final TableName tableName = TableName.valueOf(nsName, tablePrefix + "_testCreateAndAssign");
+    RSGroupInfo appInfo = addGroup(rsGroupAdmin, "appInfo", 1);
+    admin.createNamespace(NamespaceDescriptor.create(nsName)
+        .addConfiguration(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP, "appInfo").build());
+    final HTableDescriptor desc = new HTableDescriptor(tableName);
+    desc.addFamily(new HColumnDescriptor("f"));
+    admin.createTable(desc);
+    //wait for created table to be assigned
+    TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        return getTableRegionMap().get(desc.getTableName()) != null;
+      }
+    });
+    ServerName targetServer =
+        ServerName.parseServerName(appInfo.getServers().iterator().next().toString());
+    AdminProtos.AdminService.BlockingInterface rs = admin.getConnection().getAdmin(targetServer);
+    //verify it was assigned to the right group
+    Assert.assertEquals(1, ProtobufUtil.getOnlineRegions(rs).size());
+  }
+
+  /**
+   * Binds the "default" namespace to the "default" group, creates a table in it and waits
+   * until the table shows up in the table-to-region map.
+   */
+  @Test
+  public void testDefaultNamespaceCreateAndAssign() throws Exception {
+    LOG.info("testDefaultNamespaceCreateAndAssign");
+    // TableName form instead of the deprecated HTableDescriptor(byte[]) constructor.
+    final TableName tableName = TableName.valueOf(tablePrefix + "_testCreateAndAssign");
+    admin.modifyNamespace(NamespaceDescriptor.create("default")
+        .addConfiguration(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP, "default").build());
+    final HTableDescriptor desc = new HTableDescriptor(tableName);
+    desc.addFamily(new HColumnDescriptor("f"));
+    admin.createTable(desc);
+    //wait for created table to be assigned
+    TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        return getTableRegionMap().get(desc.getTableName()) != null;
+      }
+    });
+  }
+
+  /**
+   * Checks the constraints between namespaces and groups: a group referenced by a
+   * namespace cannot be removed, and a namespace cannot be created with a group that
+   * does not exist.
+   */
+  @Test
+  public void testNamespaceConstraint() throws Exception {
+    String nsName = tablePrefix+"_foo";
+    String groupName = tablePrefix+"_foo";
+    LOG.info("testNamespaceConstraint");
+    rsGroupAdmin.addRSGroup(groupName);
+    admin.createNamespace(NamespaceDescriptor.create(nsName)
+        .addConfiguration(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP, groupName)
+        .build());
+    //test removing a referenced group
+    try {
+      rsGroupAdmin.removeRSGroup(groupName);
+      fail("Expected a constraint exception");
+    } catch (IOException expected) {
+      // expected: the group is still referenced by the namespace
+    }
+    //test modify group
+    //changing with the same name is fine
+    admin.modifyNamespace(
+        NamespaceDescriptor.create(nsName)
+          .addConfiguration(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP, groupName)
+          .build());
+    String anotherGroup = tablePrefix+"_anotherGroup";
+    rsGroupAdmin.addRSGroup(anotherGroup);
+    // drop the namespace and its group before testing creation with a missing group
+    admin.deleteNamespace(nsName);
+    rsGroupAdmin.removeRSGroup(groupName);
+    //test add non-existent group
+    try {
+      admin.createNamespace(NamespaceDescriptor.create(nsName)
+          .addConfiguration(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP, "foo")
+          .build());
+      fail("Expected a constraint exception");
+    } catch (IOException expected) {
+      // expected: no group named "foo" was created by this test
+    }
+  }
+
+  /**
+   * Checks that an iterator obtained over the default group's servers can still be
+   * advanced after the default group has been fetched again.
+   */
+  @Test
+  public void testGroupInfoMultiAccessing() throws Exception {
+    RSGroupInfoManager manager = rsGroupAdminEndpoint.getGroupInfoManager();
+    final RSGroupInfo defaultGroup = manager.getRSGroup("default");
+    // getRSGroup updates default group's server list
+    // this process must not affect other threads iterating the list
+    Iterator<Address> it = defaultGroup.getServers().iterator();
+    manager.getRSGroup("default");
+    it.next();
+  }
+
+  /**
+   * Moves a 15-region table into a one-server group through the group info manager,
+   * balances the group and waits until that server reports all 15 regions online.
+   */
+  @Test
+  public void testMisplacedRegions() throws Exception {
+    final TableName tableName = TableName.valueOf(tablePrefix+"_testMisplacedRegions");
+    LOG.info("testMisplacedRegions");
+
+    final RSGroupInfo rsGroupInfo = addGroup(rsGroupAdmin, "testMisplacedRegions", 1);
+
+    TEST_UTIL.createMultiRegionTable(tableName, new byte[]{'f'}, 15);
+    TEST_UTIL.waitUntilAllRegionsAssigned(tableName);
+
+    rsGroupAdminEndpoint.getGroupInfoManager()
+        .moveTables(Sets.newHashSet(tableName), rsGroupInfo.getName());
+
+    assertTrue(rsGroupAdmin.balanceRSGroup(rsGroupInfo.getName()));
+
+    TEST_UTIL.waitFor(60000, new Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        ServerName serverName =
+            ServerName.valueOf(rsGroupInfo.getServers().iterator().next().toString(), 1);
+        // Use the shared admin: opening a fresh Admin on every poll would leak it,
+        // since nothing closes the instances.
+        return admin.getOnlineRegions(serverName).size() == 15;
+      }
+    });
+  }
+
+  /**
+   * Creates a table, snapshots it and clones the snapshot into a new table. The test
+   * passes when none of these calls throws.
+   */
+  @Test
+  public void testCloneSnapshot() throws Exception {
+    final TableName tableName = TableName.valueOf(tablePrefix+"_testCloneSnapshot");
+    LOG.info("testCloneSnapshot");
+
+    byte[] family = Bytes.toBytes("test");
+    String snapshotName = tableName.getNameAsString() + "_snap";
+    TableName clonedTableName = TableName.valueOf(tableName.getNameAsString() + "_clone");
+
+    // create base table
+    TEST_UTIL.createTable(tableName, family);
+
+    // create snapshot
+    admin.snapshot(snapshotName, tableName);
+
+    // clone
+    admin.cloneSnapshot(snapshotName, clonedTableName);
+  }
+
+}


Mime
View raw message