From: apurtell@apache.org
To: commits@hbase.apache.org
Date: Tue, 24 Oct 2017 01:03:01 -0000
Subject: [10/14] hbase git commit: HBASE-15631 Backport Regionserver Groups (HBASE-6721) to branch-1 (Francis Liu and Andrew Purtell)

http://git-wip-us.apache.org/repos/asf/hbase/blob/64328cae/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java
----------------------------------------------------------------------
diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java
new file mode 100644
index 0000000..eec03ce
--- /dev/null
+++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java
@@ -0,0 +1,795 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.rsgroup;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import com.google.protobuf.ServiceException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableStateManager;
+import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.constraint.ConstraintException;
+import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.ServerListener;
+import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
+import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
+import org.apache.hadoop.hbase.net.Address;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos;
+import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsRequest;
+import org.apache.hadoop.hbase.protobuf.generated.RSGroupProtos;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
+import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
+import org.apache.hadoop.hbase.security.access.AccessControlLists;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ModifyRegionUtils;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * This is an implementation of {@link RSGroupInfoManager}, which makes
+ * use of an HBase table as the persistence store for the group information.
+ * It also makes use of ZooKeeper to store group information needed
+ * for bootstrapping during offline mode.
+ */ +public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListener { + private static final Log LOG = LogFactory.getLog(RSGroupInfoManagerImpl.class); + + /** Table descriptor for hbase:rsgroup catalog table */ + private final static HTableDescriptor RSGROUP_TABLE_DESC; + static { + RSGROUP_TABLE_DESC = new HTableDescriptor(RSGROUP_TABLE_NAME); + RSGROUP_TABLE_DESC.addFamily(new HColumnDescriptor(META_FAMILY_BYTES)); + RSGROUP_TABLE_DESC.setRegionSplitPolicyClassName(DisabledRegionSplitPolicy.class.getName()); + try { + RSGROUP_TABLE_DESC.addCoprocessor( + MultiRowMutationEndpoint.class.getName(), + null, Coprocessor.PRIORITY_SYSTEM, null); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + } + + private volatile Map rsGroupMap; + private volatile Map tableMap; + private MasterServices master; + private ClusterConnection conn; + private ZooKeeperWatcher watcher; + private RSGroupStartupWorker rsGroupStartupWorker; + // contains list of groups that were last flushed to persistent store + private volatile Set prevRSGroups; + private RSGroupSerDe rsGroupSerDe; + private DefaultServerUpdater defaultServerUpdater; + private boolean isInit = false; + + public RSGroupInfoManagerImpl(MasterServices master) throws IOException { + this.rsGroupMap = Collections.emptyMap(); + this.tableMap = Collections.emptyMap(); + rsGroupSerDe = new RSGroupSerDe(); + this.master = master; + this.watcher = master.getZooKeeper(); + this.conn = master.getConnection(); + prevRSGroups = new HashSet(); + } + + public void init() throws IOException{ + rsGroupStartupWorker = new RSGroupStartupWorker(this, master, conn); + refresh(); + rsGroupStartupWorker.start(); + defaultServerUpdater = new DefaultServerUpdater(this); + master.getServerManager().registerListener(this); + defaultServerUpdater.start(); + isInit = true; + } + + boolean isInit() { + return isInit; + } + + /** + * Adds the group. + * + * @param rsGroupInfo the group name + */ + @Override + public synchronized void addRSGroup(RSGroupInfo rsGroupInfo) throws IOException { + checkGroupName(rsGroupInfo.getName()); + if (rsGroupMap.get(rsGroupInfo.getName()) != null || + rsGroupInfo.getName().equals(RSGroupInfo.DEFAULT_GROUP)) { + throw new DoNotRetryIOException("Group already exists: "+ rsGroupInfo.getName()); + } + Map newGroupMap = Maps.newHashMap(rsGroupMap); + newGroupMap.put(rsGroupInfo.getName(), rsGroupInfo); + flushConfig(newGroupMap); + } + + @Override + public synchronized Set
<Address> moveServers(Set<Address> servers, String srcGroup, + String dstGroup) throws IOException { + if (servers == null) { + throw new ConstraintException("The list of servers to move cannot be null."); + } + Set<Address>
movedServers = Sets.newHashSet(); + if (!rsGroupMap.containsKey(srcGroup)) { + throw new DoNotRetryIOException("Group "+srcGroup+" does not exist"); + } + if (!rsGroupMap.containsKey(dstGroup)) { + throw new DoNotRetryIOException("Group "+dstGroup+" does not exist"); + } + + RSGroupInfo src = new RSGroupInfo(getRSGroup(srcGroup)); + RSGroupInfo dst = new RSGroupInfo(getRSGroup(dstGroup)); + for(Address el: servers) { + if (src.removeServer(el)) { + movedServers.add(el); + } + dst.addServer(el); + } + + Map newGroupMap = Maps.newHashMap(rsGroupMap); + newGroupMap.put(src.getName(), src); + newGroupMap.put(dst.getName(), dst); + + flushConfig(newGroupMap); + return movedServers; + } + + /** + * Gets the group info of server. + * + * @param server the server + * @return An instance of GroupInfo. + */ + @Override + public RSGroupInfo getRSGroupOfServer(Address server) throws IOException { + for (RSGroupInfo info : rsGroupMap.values()) { + if (info.containsServer(server)){ + return info; + } + } + return null; + } + + /** + * Gets the group information. + * + * @param groupName + * the group name + * @return An instance of GroupInfo + */ + @Override + public RSGroupInfo getRSGroup(String groupName) throws IOException { + RSGroupInfo RSGroupInfo = rsGroupMap.get(groupName); + return RSGroupInfo; + } + + + + @Override + public String getRSGroupOfTable(TableName tableName) throws IOException { + return tableMap.get(tableName); + } + + @Override + public synchronized void moveTables( + Set tableNames, String groupName) throws IOException { + if (groupName != null && !rsGroupMap.containsKey(groupName)) { + throw new DoNotRetryIOException("Group "+groupName+" does not exist or is a special group"); + } + + Map newGroupMap = Maps.newHashMap(rsGroupMap); + for(TableName tableName: tableNames) { + if (tableMap.containsKey(tableName)) { + RSGroupInfo src = new RSGroupInfo(newGroupMap.get(tableMap.get(tableName))); + src.removeTable(tableName); + newGroupMap.put(src.getName(), src); + } + if(groupName != null) { + RSGroupInfo dst = new RSGroupInfo(newGroupMap.get(groupName)); + dst.addTable(tableName); + newGroupMap.put(dst.getName(), dst); + } + } + + flushConfig(newGroupMap); + } + + + /** + * Delete a region server group. + * + * @param groupName the group name + * @throws java.io.IOException Signals that an I/O exception has occurred. 
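+ * <p>Illustrative caller-side sketch (not part of this patch; the group name is
+ * made up): {@code manager.removeRSGroup("legacy_group")} succeeds only for an
+ * existing, non-default group; passing "default" or an unknown name raises a
+ * DoNotRetryIOException, as the checks below show.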
+ */ + @Override + public synchronized void removeRSGroup(String groupName) throws IOException { + if (!rsGroupMap.containsKey(groupName) || groupName.equals(RSGroupInfo.DEFAULT_GROUP)) { + throw new DoNotRetryIOException("Group "+groupName+" does not exist or is a reserved group"); + } + Map newGroupMap = Maps.newHashMap(rsGroupMap); + newGroupMap.remove(groupName); + flushConfig(newGroupMap); + } + + @Override + public List listRSGroups() throws IOException { + List list = Lists.newLinkedList(rsGroupMap.values()); + return list; + } + + @Override + public boolean isOnline() { + return rsGroupStartupWorker.isOnline(); + } + + @Override + public synchronized void refresh() throws IOException { + refresh(false); + } + + private synchronized void refresh(boolean forceOnline) throws IOException { + List groupList = new LinkedList(); + + // overwrite anything read from zk, group table is source of truth + // if online read from GROUP table + if (forceOnline || isOnline()) { + LOG.debug("Refreshing in Online mode."); + try (Table rsGroupTable = conn.getTable(RSGROUP_TABLE_NAME)) { + groupList.addAll(rsGroupSerDe.retrieveGroupList(rsGroupTable)); + } + } else { + LOG.debug("Refershing in Offline mode."); + String groupBasePath = ZKUtil.joinZNode(watcher.baseZNode, rsGroupZNode); + groupList.addAll(rsGroupSerDe.retrieveGroupList(watcher, groupBasePath)); + } + + // refresh default group, prune + NavigableSet orphanTables = new TreeSet(); + for(String entry: master.getTableDescriptors().getAll().keySet()) { + orphanTables.add(TableName.valueOf(entry)); + } + + List specialTables; + if(!master.isInitialized()) { + specialTables = new ArrayList(); + specialTables.add(AccessControlLists.ACL_TABLE_NAME); + specialTables.add(TableName.META_TABLE_NAME); + specialTables.add(TableName.NAMESPACE_TABLE_NAME); + specialTables.add(RSGROUP_TABLE_NAME); + } else { + specialTables = + master.listTableNamesByNamespace(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR); + } + + for(TableName table : specialTables) { + orphanTables.add(table); + } + for(RSGroupInfo group: groupList) { + if(!group.getName().equals(RSGroupInfo.DEFAULT_GROUP)) { + orphanTables.removeAll(group.getTables()); + } + } + + // This is added to the last of the list + // so it overwrites the default group loaded + // from region group table or zk + groupList.add(new RSGroupInfo(RSGroupInfo.DEFAULT_GROUP, + Sets.newHashSet(getDefaultServers()), + orphanTables)); + + + // populate the data + HashMap newGroupMap = Maps.newHashMap(); + HashMap newTableMap = Maps.newHashMap(); + for (RSGroupInfo group : groupList) { + newGroupMap.put(group.getName(), group); + for(TableName table: group.getTables()) { + newTableMap.put(table, group.getName()); + } + } + rsGroupMap = Collections.unmodifiableMap(newGroupMap); + tableMap = Collections.unmodifiableMap(newTableMap); + + prevRSGroups.clear(); + prevRSGroups.addAll(rsGroupMap.keySet()); + } + + private synchronized Map flushConfigTable(Map newGroupMap) + throws IOException { + Map newTableMap = Maps.newHashMap(); + List mutations = Lists.newArrayList(); + + // populate deletes + for(String groupName : prevRSGroups) { + if(!newGroupMap.containsKey(groupName)) { + Delete d = new Delete(Bytes.toBytes(groupName)); + mutations.add(d); + } + } + + // populate puts + for(RSGroupInfo RSGroupInfo : newGroupMap.values()) { + RSGroupProtos.RSGroupInfo proto = RSGroupProtobufUtil.toProtoGroupInfo(RSGroupInfo); + Put p = new Put(Bytes.toBytes(RSGroupInfo.getName())); + p.addColumn(META_FAMILY_BYTES, + 
META_QUALIFIER_BYTES, + proto.toByteArray()); + mutations.add(p); + for(TableName entry: RSGroupInfo.getTables()) { + newTableMap.put(entry, RSGroupInfo.getName()); + } + } + + if(mutations.size() > 0) { + multiMutate(mutations); + } + return newTableMap; + } + + private synchronized void flushConfig(Map newGroupMap) throws IOException { + Map newTableMap; + + // For offline mode persistence is still unavailable + // We're refreshing in-memory state but only for default servers + if (!isOnline()) { + Map m = Maps.newHashMap(rsGroupMap); + RSGroupInfo oldDefaultGroup = m.remove(RSGroupInfo.DEFAULT_GROUP); + RSGroupInfo newDefaultGroup = newGroupMap.remove(RSGroupInfo.DEFAULT_GROUP); + if (!m.equals(newGroupMap) || + !oldDefaultGroup.getTables().equals(newDefaultGroup.getTables())) { + throw new IOException("Only default servers can be updated during offline mode"); + } + newGroupMap.put(RSGroupInfo.DEFAULT_GROUP, newDefaultGroup); + rsGroupMap = newGroupMap; + return; + } + + newTableMap = flushConfigTable(newGroupMap); + + // make changes visible since it has been + // persisted in the source of truth + rsGroupMap = Collections.unmodifiableMap(newGroupMap); + tableMap = Collections.unmodifiableMap(newTableMap); + + + try { + String groupBasePath = ZKUtil.joinZNode(watcher.baseZNode, rsGroupZNode); + ZKUtil.createAndFailSilent(watcher, groupBasePath, ProtobufUtil.PB_MAGIC); + + List zkOps = new ArrayList(newGroupMap.size()); + for(String groupName : prevRSGroups) { + if(!newGroupMap.containsKey(groupName)) { + String znode = ZKUtil.joinZNode(groupBasePath, groupName); + zkOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(znode)); + } + } + + + for(RSGroupInfo RSGroupInfo : newGroupMap.values()) { + String znode = ZKUtil.joinZNode(groupBasePath, RSGroupInfo.getName()); + RSGroupProtos.RSGroupInfo proto = RSGroupProtobufUtil.toProtoGroupInfo(RSGroupInfo); + LOG.debug("Updating znode: "+znode); + ZKUtil.createAndFailSilent(watcher, znode); + zkOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(znode)); + zkOps.add(ZKUtil.ZKUtilOp.createAndFailSilent(znode, + ProtobufUtil.prependPBMagic(proto.toByteArray()))); + } + LOG.debug("Writing ZK GroupInfo count: " + zkOps.size()); + + ZKUtil.multiOrSequential(watcher, zkOps, false); + } catch (KeeperException e) { + LOG.error("Failed to write to rsGroupZNode", e); + master.abort("Failed to write to rsGroupZNode", e); + throw new IOException("Failed to write to rsGroupZNode",e); + } + + prevRSGroups.clear(); + prevRSGroups.addAll(newGroupMap.keySet()); + } + + private List getOnlineRS() throws IOException { + if (master != null) { + return master.getServerManager().getOnlineServersList(); + } + try { + LOG.debug("Reading online RS from zookeeper"); + List servers = new LinkedList(); + for (String el: ZKUtil.listChildrenNoWatch(watcher, watcher.rsZNode)) { + servers.add(ServerName.parseServerName(el)); + } + return servers; + } catch (KeeperException e) { + throw new IOException("Failed to retrieve server list from zookeeper", e); + } + } + + private List
<Address> getDefaultServers() throws IOException { + List<Address> defaultServers = new LinkedList<Address>
(); + for(ServerName server : getOnlineRS()) { + Address address = Address.fromParts(server.getHostname(), server.getPort()); + boolean found = false; + for(RSGroupInfo info : rsGroupMap.values()) { + if(!RSGroupInfo.DEFAULT_GROUP.equals(info.getName()) && + info.containsServer(address)) { + found = true; + break; + } + } + if(!found) { + defaultServers.add(address); + } + } + return defaultServers; + } + + private synchronized void updateDefaultServers( + Set
<Address> server) throws IOException { + RSGroupInfo info = rsGroupMap.get(RSGroupInfo.DEFAULT_GROUP); + RSGroupInfo newInfo = new RSGroupInfo(info.getName(), server, info.getTables()); + HashMap<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap); + newGroupMap.put(newInfo.getName(), newInfo); + flushConfig(newGroupMap); + } + + @Override + public void serverAdded(ServerName serverName) { + defaultServerUpdater.serverChanged(); + } + + @Override + public void serverRemoved(ServerName serverName) { + defaultServerUpdater.serverChanged(); + } + + private static class DefaultServerUpdater extends Thread { + private static final Log LOG = LogFactory.getLog(DefaultServerUpdater.class); + private RSGroupInfoManagerImpl mgr; + private boolean hasChanged = false; + + public DefaultServerUpdater(RSGroupInfoManagerImpl mgr) { + this.mgr = mgr; + } + + @Override + public void run() { + List<Address>
prevDefaultServers = new LinkedList<Address>
(); + while(!mgr.master.isAborted() || !mgr.master.isStopped()) { + try { + LOG.info("Updating default servers."); + List
<Address> servers = mgr.getDefaultServers(); + Collections.sort(servers, new Comparator<Address>
() { + @Override + public int compare(Address o1, Address o2) { + int diff = o1.getHostname().compareTo(o2.getHostname()); + if (diff != 0) { + return diff; + } + return o1.getPort() - o2.getPort(); + } + }); + if(!servers.equals(prevDefaultServers)) { + mgr.updateDefaultServers(Sets.
newHashSet(servers)); + prevDefaultServers = servers; + LOG.info("Updated with servers: "+servers.size()); + } + try { + synchronized (this) { + if(!hasChanged) { + wait(); + } + hasChanged = false; + } + } catch (InterruptedException e) { + } + } catch (IOException e) { + LOG.warn("Failed to update default servers", e); + } + } + } + + public void serverChanged() { + synchronized (this) { + hasChanged = true; + this.notify(); + } + } + } + + @Override + public void waiting() { + + } + + private static class RSGroupStartupWorker extends Thread { + private static final Log LOG = LogFactory.getLog(RSGroupStartupWorker.class); + + private volatile boolean isOnline = false; + private MasterServices masterServices; + private RSGroupInfoManagerImpl groupInfoManager; + private ClusterConnection conn; + + public RSGroupStartupWorker(RSGroupInfoManagerImpl groupInfoManager, + MasterServices masterServices, + ClusterConnection conn) { + this.masterServices = masterServices; + this.groupInfoManager = groupInfoManager; + this.conn = conn; + setName(RSGroupStartupWorker.class.getName()+"-"+masterServices.getServerName()); + setDaemon(true); + } + + @Override + public void run() { + if(waitForGroupTableOnline()) { + LOG.info("GroupBasedLoadBalancer is now online"); + } + } + + public boolean waitForGroupTableOnline() { + final List foundRegions = new LinkedList(); + final List assignedRegions = new LinkedList(); + final AtomicBoolean found = new AtomicBoolean(false); + final TableStateManager tsm = + masterServices.getAssignmentManager().getTableStateManager(); + boolean createSent = false; + while (!found.get() && isMasterRunning()) { + foundRegions.clear(); + assignedRegions.clear(); + found.set(true); + try { + boolean rootMetaFound = + masterServices.getMetaTableLocator().verifyMetaRegionLocation( + conn, + masterServices.getZooKeeper(), + 1); + final AtomicBoolean nsFound = new AtomicBoolean(false); + if (rootMetaFound) { + + MetaTableAccessor.Visitor visitor = new MetaTableAccessor.Visitor() { + @Override + public boolean visit(Result row) throws IOException { + + HRegionInfo info = MetaTableAccessor.getHRegionInfo(row); + if (info != null) { + Cell serverCell = + row.getColumnLatestCell(HConstants.CATALOG_FAMILY, + HConstants.SERVER_QUALIFIER); + if (RSGROUP_TABLE_NAME.equals(info.getTable()) && serverCell != null) { + ServerName sn = + ServerName.parseVersionedServerName(CellUtil.cloneValue(serverCell)); + if (sn == null) { + found.set(false); + } else if (tsm.isTableState(RSGROUP_TABLE_NAME, + ZooKeeperProtos.Table.State.ENABLED)) { + try { + ClientProtos.ClientService.BlockingInterface rs = conn.getClient(sn); + ClientProtos.GetRequest request = + RequestConverter.buildGetRequest(info.getRegionName(), + new Get(ROW_KEY)); + rs.get(null, request); + assignedRegions.add(info); + } catch(Exception ex) { + LOG.debug("Caught exception while verifying group region", ex); + } + } + foundRegions.add(info); + } + if (TableName.NAMESPACE_TABLE_NAME.equals(info.getTable())) { + Cell cell = row.getColumnLatestCell(HConstants.CATALOG_FAMILY, + HConstants.SERVER_QUALIFIER); + ServerName sn = null; + if(cell != null) { + sn = ServerName.parseVersionedServerName(CellUtil.cloneValue(cell)); + } + if (tsm.isTableState(TableName.NAMESPACE_TABLE_NAME, + ZooKeeperProtos.Table.State.ENABLED)) { + try { + ClientProtos.ClientService.BlockingInterface rs = conn.getClient(sn); + ClientProtos.GetRequest request = + RequestConverter.buildGetRequest(info.getRegionName(), + new Get(ROW_KEY)); + rs.get(null, request); + 
nsFound.set(true); + } catch(Exception ex) { + LOG.debug("Caught exception while verifying group region", ex); + } + } + } + } + return true; + } + }; + MetaTableAccessor.fullScan(conn, visitor); + // if no regions in meta then we have to create the table + if (foundRegions.size() < 1 && rootMetaFound && !createSent && nsFound.get()) { + groupInfoManager.createGroupTable(masterServices); + createSent = true; + } + LOG.info("Group table: " + RSGROUP_TABLE_NAME + " isOnline: " + found.get() + + ", regionCount: " + foundRegions.size() + ", assignCount: " + + assignedRegions.size() + ", rootMetaFound: "+rootMetaFound); + found.set(found.get() && assignedRegions.size() == foundRegions.size() + && foundRegions.size() > 0); + } else { + LOG.info("Waiting for catalog tables to come online"); + found.set(false); + } + if (found.get()) { + LOG.debug("With group table online, refreshing cached information."); + groupInfoManager.refresh(true); + isOnline = true; + //flush any inconsistencies between ZK and HTable + groupInfoManager.flushConfig(groupInfoManager.rsGroupMap); + } + } catch (RuntimeException e) { + throw e; + } catch(Exception e) { + found.set(false); + LOG.warn("Failed to perform check", e); + } + try { + Thread.sleep(100); + } catch (InterruptedException e) { + LOG.info("Sleep interrupted", e); + } + } + return found.get(); + } + + public boolean isOnline() { + return isOnline; + } + + private boolean isMasterRunning() { + return !masterServices.isAborted() && !masterServices.isStopped(); + } + } + + private void createGroupTable(MasterServices masterServices) throws IOException { + HRegionInfo[] newRegions = + ModifyRegionUtils.createHRegionInfos(RSGROUP_TABLE_DESC, null); + ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch(); + masterServices.getMasterProcedureExecutor().submitProcedure( + new CreateTableProcedure( + masterServices.getMasterProcedureExecutor().getEnvironment(), + RSGROUP_TABLE_DESC, + newRegions, + latch)); + latch.await(); + // wait for region to be online + int tries = 600; + while(masterServices.getAssignmentManager().getRegionStates() + .getRegionServerOfRegion(newRegions[0]) == null && tries > 0) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + throw new IOException("Wait interrupted", e); + } + tries--; + } + if(tries <= 0) { + throw new IOException("Failed to create group table."); + } + } + + private void multiMutate(List mutations) + throws IOException { + MutateRowsRequest.Builder mrmBuilder = MutateRowsRequest.newBuilder(); + for (Mutation mutation : mutations) { + if (mutation instanceof Put) { + mrmBuilder.addMutationRequest(ProtobufUtil.toMutation( + ClientProtos.MutationProto.MutationType.PUT, mutation)); + } else if (mutation instanceof Delete) { + mrmBuilder.addMutationRequest(ProtobufUtil.toMutation( + ClientProtos.MutationProto.MutationType.DELETE, mutation)); + } else { + throw new DoNotRetryIOException("multiMutate doesn't support " + + mutation.getClass().getName()); + } + } + MutateRowsRequest mrm = mrmBuilder.build(); + // Be robust against movement of the rsgroup table + // TODO: Why is this necessary sometimes? Should we be using our own connection? 
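+ // A plausible answer to the TODO above (not confirmed in this patch): the
+ // shared connection caches region locations, so if the hbase:rsgroup region
+ // has moved since the last lookup the cached location is stale; clearing the
+ // cache forces a fresh hbase:meta lookup before the coprocessor call below.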
+ conn.clearRegionCache(RSGROUP_TABLE_NAME); + try (Table rsGroupTable = conn.getTable(RSGROUP_TABLE_NAME)) { + CoprocessorRpcChannel channel = rsGroupTable.coprocessorService(ROW_KEY); + MultiRowMutationProtos.MultiRowMutationService.BlockingInterface service = + MultiRowMutationProtos.MultiRowMutationService.newBlockingStub(channel); + try { + service.mutateRows(null, mrm); + } catch (ServiceException ex) { + throw ProtobufUtil.toIOException(ex); + } + } + } + + private void checkGroupName(String groupName) throws ConstraintException { + if(!groupName.matches("[a-zA-Z0-9_]+")) { + throw new ConstraintException("Group name should only contain alphanumeric characters"); + } + } + + @Override + public void moveServersAndTables(Set<Address>
servers, Set tables, String srcGroup, + String dstGroup) throws IOException { + //get server's group + RSGroupInfo srcGroupInfo = getRSGroup(srcGroup); + RSGroupInfo dstGroupInfo = getRSGroup(dstGroup); + + //move servers + for (Address el: servers) { + srcGroupInfo.removeServer(el); + dstGroupInfo.addServer(el); + } + //move tables + for(TableName tableName: tables) { + srcGroupInfo.removeTable(tableName); + dstGroupInfo.addTable(tableName); + } + + //flush changed groupinfo + Map newGroupMap = Maps.newHashMap(rsGroupMap); + newGroupMap.put(srcGroupInfo.getName(), srcGroupInfo); + newGroupMap.put(dstGroupInfo.getName(), dstGroupInfo); + flushConfig(newGroupMap); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/64328cae/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupProtobufUtil.java ---------------------------------------------------------------------- diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupProtobufUtil.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupProtobufUtil.java new file mode 100644 index 0000000..0874210 --- /dev/null +++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupProtobufUtil.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.rsgroup; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.net.Address; +import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.RSGroupProtos; +import org.apache.hadoop.hbase.protobuf.generated.TableProtos; + +@InterfaceAudience.Private +class RSGroupProtobufUtil { + static RSGroupInfo toGroupInfo(RSGroupProtos.RSGroupInfo proto) { + RSGroupInfo RSGroupInfo = new RSGroupInfo(proto.getName()); + for(HBaseProtos.ServerName el: proto.getServersList()) { + RSGroupInfo.addServer(Address.fromParts(el.getHostName(), el.getPort())); + } + for(TableProtos.TableName pTableName: proto.getTablesList()) { + RSGroupInfo.addTable(ProtobufUtil.toTableName(pTableName)); + } + return RSGroupInfo; + } + + static RSGroupProtos.RSGroupInfo toProtoGroupInfo(RSGroupInfo pojo) { + List tables = new ArrayList<>(pojo.getTables().size()); + for(TableName arg: pojo.getTables()) { + tables.add(ProtobufUtil.toProtoTableName(arg)); + } + List servers = new ArrayList<>(pojo.getServers().size()); + for(Address el: pojo.getServers()) { + servers.add(HBaseProtos.ServerName.newBuilder() + .setHostName(el.getHostname()) + .setPort(el.getPort()) + .build()); + } + return RSGroupProtos.RSGroupInfo.newBuilder().setName(pojo.getName()) + .addAllServers(servers) + .addAllTables(tables).build(); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/64328cae/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupSerDe.java ---------------------------------------------------------------------- diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupSerDe.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupSerDe.java new file mode 100644 index 0000000..642cb4a --- /dev/null +++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupSerDe.java @@ -0,0 +1,88 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.rsgroup; + +import com.google.common.collect.Lists; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.exceptions.DeserializationException; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.RSGroupProtos; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; + +//TODO do better encapsulation of SerDe logic from GroupInfoManager and GroupTracker +public class RSGroupSerDe { + private static final Log LOG = LogFactory.getLog(RSGroupSerDe.class); + + public RSGroupSerDe() { + + } + + public List retrieveGroupList(Table groupTable) throws IOException { + List RSGroupInfoList = Lists.newArrayList(); + for (Result result : groupTable.getScanner(new Scan())) { + RSGroupProtos.RSGroupInfo proto = + RSGroupProtos.RSGroupInfo.parseFrom( + result.getValue( + RSGroupInfoManager.META_FAMILY_BYTES, + RSGroupInfoManager.META_QUALIFIER_BYTES)); + RSGroupInfoList.add(RSGroupProtobufUtil.toGroupInfo(proto)); + } + return RSGroupInfoList; + } + + public List retrieveGroupList(ZooKeeperWatcher watcher, + String groupBasePath) throws IOException { + List RSGroupInfoList = Lists.newArrayList(); + //Overwrite any info stored by table, this takes precedence + try { + if(ZKUtil.checkExists(watcher, groupBasePath) != -1) { + for(String znode: ZKUtil.listChildrenAndWatchForNewChildren(watcher, groupBasePath)) { + byte[] data = ZKUtil.getData(watcher, ZKUtil.joinZNode(groupBasePath, znode)); + if(data.length > 0) { + ProtobufUtil.expectPBMagicPrefix(data); + ByteArrayInputStream bis = new ByteArrayInputStream( + data, ProtobufUtil.lengthOfPBMagic(), data.length); + RSGroupInfoList.add(RSGroupProtobufUtil.toGroupInfo(RSGroupProtos.RSGroupInfo.parseFrom(bis))); + } + } + LOG.debug("Read ZK GroupInfo count:" + RSGroupInfoList.size()); + } + } catch (KeeperException e) { + throw new IOException("Failed to read rsGroupZNode",e); + } catch (DeserializationException e) { + throw new IOException("Failed to read rsGroupZNode",e); + } catch (InterruptedException e) { + throw new IOException("Failed to read rsGroupZNode",e); + } + return RSGroupInfoList; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/64328cae/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupableBalancer.java ---------------------------------------------------------------------- diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupableBalancer.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupableBalancer.java new file mode 100644 index 0000000..6c791a1 --- /dev/null +++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupableBalancer.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.rsgroup; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.master.LoadBalancer; + +/** + * Marker Interface. RSGroups feature will check for a LoadBalancer + * marked with this Interface before it runs. + */ +@InterfaceAudience.Private +public interface RSGroupableBalancer extends LoadBalancer { + /** Config for pluggable load balancers */ + String HBASE_RSGROUP_LOADBALANCER_CLASS = "hbase.rsgroup.grouploadbalancer.class"; +} http://git-wip-us.apache.org/repos/asf/hbase/blob/64328cae/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancer.java ---------------------------------------------------------------------- diff --git a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancer.java b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancer.java new file mode 100644 index 0000000..3b96de6 --- /dev/null +++ b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancer.java @@ -0,0 +1,573 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master.balancer; + +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Lists; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableDescriptors; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer; +import org.apache.hadoop.hbase.rsgroup.RSGroupInfo; +import org.apache.hadoop.hbase.rsgroup.RSGroupInfoManager; +import org.apache.hadoop.hbase.master.AssignmentManager; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.master.RegionPlan; +import org.apache.hadoop.hbase.net.Address; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.security.SecureRandom; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +//TODO use stochastic based load balancer instead +@Category(SmallTests.class) +public class TestRSGroupBasedLoadBalancer { + + private static final Log LOG = LogFactory.getLog(TestRSGroupBasedLoadBalancer.class); + private static RSGroupBasedLoadBalancer loadBalancer; + private static SecureRandom rand; + + static String[] groups = new String[] { RSGroupInfo.DEFAULT_GROUP, "dg2", "dg3", + "dg4" }; + static TableName[] tables = + new TableName[] { TableName.valueOf("dt1"), + TableName.valueOf("dt2"), + TableName.valueOf("dt3"), + TableName.valueOf("dt4")}; + static List servers; + static Map groupMap; + static Map tableMap; + static List tableDescs; + int[] regionAssignment = new int[] { 2, 5, 7, 10, 4, 3, 1 }; + static int regionId = 0; + + @BeforeClass + public static void beforeAllTests() throws Exception { + rand = new SecureRandom(); + servers = generateServers(7); + groupMap = constructGroupInfo(servers, groups); + tableMap = new HashMap(); + tableDescs = constructTableDesc(); + Configuration conf = HBaseConfiguration.create(); + conf.set("hbase.regions.slop", "0"); + conf.set("hbase.group.grouploadbalancer.class", SimpleLoadBalancer.class.getCanonicalName()); + loadBalancer = new RSGroupBasedLoadBalancer(getMockedGroupInfoManager()); + loadBalancer.setMasterServices(getMockedMaster()); + loadBalancer.setConf(conf); + loadBalancer.initialize(); + } + + /** + * Test the load balancing algorithm. 
+ * + * Invariant is that all servers of the group should be hosting either floor(average) or + * ceiling(average) + * + * @throws Exception + */ + @Test + public void testBalanceCluster() throws Exception { + Map> servers = mockClusterServers(); + ArrayListMultimap list = convertToGroupBasedMap(servers); + LOG.info("Mock Cluster : " + printStats(list)); + List plans = loadBalancer.balanceCluster(servers); + ArrayListMultimap balancedCluster = reconcile( + list, plans); + LOG.info("Mock Balance : " + printStats(balancedCluster)); + assertClusterAsBalanced(balancedCluster); + } + + /** + * Invariant is that all servers of a group have load between floor(avg) and + * ceiling(avg) number of regions. + */ + private void assertClusterAsBalanced( + ArrayListMultimap groupLoadMap) { + for (String gName : groupLoadMap.keySet()) { + List groupLoad = groupLoadMap.get(gName); + int numServers = groupLoad.size(); + int numRegions = 0; + int maxRegions = 0; + int minRegions = Integer.MAX_VALUE; + for (ServerAndLoad server : groupLoad) { + int nr = server.getLoad(); + if (nr > maxRegions) { + maxRegions = nr; + } + if (nr < minRegions) { + minRegions = nr; + } + numRegions += nr; + } + if (maxRegions - minRegions < 2) { + // less than 2 between max and min, can't balance + return; + } + int min = numRegions / numServers; + int max = numRegions % numServers == 0 ? min : min + 1; + + for (ServerAndLoad server : groupLoad) { + assertTrue(server.getLoad() <= max); + assertTrue(server.getLoad() >= min); + } + } + } + + /** + * All regions have an assignment. + * + * @param regions + * @param servers + * @param assignments + * @throws java.io.IOException + * @throws java.io.FileNotFoundException + */ + private void assertImmediateAssignment(List regions, + List servers, + Map assignments) + throws IOException { + for (HRegionInfo region : regions) { + assertTrue(assignments.containsKey(region)); + ServerName server = assignments.get(region); + TableName tableName = region.getTable(); + + String groupName = + getMockedGroupInfoManager().getRSGroupOfTable(tableName); + assertTrue(StringUtils.isNotEmpty(groupName)); + RSGroupInfo gInfo = getMockedGroupInfoManager().getRSGroup(groupName); + assertTrue("Region is not correctly assigned to group servers.", + gInfo.containsServer(server.getAddress())); + } + } + + /** + * Tests the bulk assignment used during cluster startup. + * + * Round-robin. Should yield a balanced cluster so same invariant as the + * load balancer holds, all servers holding either floor(avg) or + * ceiling(avg). 
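+ * <p>Worked example of the invariant (illustrative numbers only): a group with
+ * 25 regions across 7 servers has an average of 25/7, about 3.57, so after
+ * assignment each of the 7 servers must hold either floor(3.57) = 3 or
+ * ceil(3.57) = 4 regions.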
+ * + * @throws Exception + */ + @Test + public void testBulkAssignment() throws Exception { + List regions = randomRegions(25); + Map> assignments = loadBalancer + .roundRobinAssignment(regions, servers); + //test empty region/servers scenario + //this should not throw an NPE + loadBalancer.roundRobinAssignment(regions, Collections.emptyList()); + //test regular scenario + assertTrue(assignments.keySet().size() == servers.size()); + for (ServerName sn : assignments.keySet()) { + List regionAssigned = assignments.get(sn); + for (HRegionInfo region : regionAssigned) { + TableName tableName = region.getTable(); + String groupName = + getMockedGroupInfoManager().getRSGroupOfTable(tableName); + assertTrue(StringUtils.isNotEmpty(groupName)); + RSGroupInfo gInfo = getMockedGroupInfoManager().getRSGroup( + groupName); + assertTrue( + "Region is not correctly assigned to group servers.", + gInfo.containsServer(sn.getAddress())); + } + } + ArrayListMultimap loadMap = convertToGroupBasedMap(assignments); + assertClusterAsBalanced(loadMap); + } + + /** + * Test the cluster startup bulk assignment which attempts to retain + * assignment info. + * + * @throws Exception + */ + @Test + public void testRetainAssignment() throws Exception { + // Test simple case where all same servers are there + Map> currentAssignments = mockClusterServers(); + Map inputForTest = new HashMap(); + for (ServerName sn : currentAssignments.keySet()) { + for (HRegionInfo region : currentAssignments.get(sn)) { + inputForTest.put(region, sn); + } + } + //verify region->null server assignment is handled + inputForTest.put(randomRegions(1).get(0), null); + Map> newAssignment = loadBalancer + .retainAssignment(inputForTest, servers); + assertRetainedAssignment(inputForTest, servers, newAssignment); + } + + /** + * Asserts a valid retained assignment plan. + *

+ * <p> + * Must meet the following conditions: + * <ul> + * <li>Every input region has an assignment, and to an online server + * <li>If a region had an existing assignment to a server with the same + * address as a currently online server, it will be assigned to it + * </ul>
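+ * <p>Hypothetical example: a region previously hosted at host1:16020 must be
+ * reassigned to a server at that same address if one is still online;
+ * otherwise it may land on any online server of its region server group.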
+ * + * @param existing + * @param assignment + * @throws java.io.IOException + * @throws java.io.FileNotFoundException + */ + private void assertRetainedAssignment( + Map existing, List servers, + Map> assignment) + throws FileNotFoundException, IOException { + // Verify condition 1, every region assigned, and to online server + Set onlineServerSet = new TreeSet(servers); + Set assignedRegions = new TreeSet(); + for (Map.Entry> a : assignment.entrySet()) { + assertTrue( + "Region assigned to server that was not listed as online", + onlineServerSet.contains(a.getKey())); + for (HRegionInfo r : a.getValue()) + assignedRegions.add(r); + } + assertEquals(existing.size(), assignedRegions.size()); + + // Verify condition 2, every region must be assigned to correct server. + Set onlineHostNames = new TreeSet(); + for (ServerName s : servers) { + onlineHostNames.add(s.getHostname()); + } + + for (Map.Entry> a : assignment.entrySet()) { + ServerName currentServer = a.getKey(); + for (HRegionInfo r : a.getValue()) { + ServerName oldAssignedServer = existing.get(r); + TableName tableName = r.getTable(); + String groupName = + getMockedGroupInfoManager().getRSGroupOfTable(tableName); + assertTrue(StringUtils.isNotEmpty(groupName)); + RSGroupInfo gInfo = getMockedGroupInfoManager().getRSGroup( + groupName); + assertTrue( + "Region is not correctly assigned to group servers.", + gInfo.containsServer(currentServer.getAddress())); + if (oldAssignedServer != null + && onlineHostNames.contains(oldAssignedServer + .getHostname())) { + // this region was previously assigned somewhere, and that + // host is still around, then the host must have been is a + // different group. + if (!oldAssignedServer.getAddress().equals(currentServer.getAddress())) { + assertFalse(gInfo.containsServer(oldAssignedServer.getAddress())); + } + } + } + } + } + + private String printStats( + ArrayListMultimap groupBasedLoad) { + StringBuffer sb = new StringBuffer(); + sb.append("\n"); + for (String groupName : groupBasedLoad.keySet()) { + sb.append("Stats for group: " + groupName); + sb.append("\n"); + sb.append(groupMap.get(groupName).getServers()); + sb.append("\n"); + List groupLoad = groupBasedLoad.get(groupName); + int numServers = groupLoad.size(); + int totalRegions = 0; + sb.append("Per Server Load: \n"); + for (ServerAndLoad sLoad : groupLoad) { + sb.append("Server :" + sLoad.getServerName() + " Load : " + + sLoad.getLoad() + "\n"); + totalRegions += sLoad.getLoad(); + } + sb.append(" Group Statistics : \n"); + float average = (float) totalRegions / numServers; + int max = (int) Math.ceil(average); + int min = (int) Math.floor(average); + sb.append("[srvr=" + numServers + " rgns=" + totalRegions + " avg=" + + average + " max=" + max + " min=" + min + "]"); + sb.append("\n"); + sb.append("==============================="); + sb.append("\n"); + } + return sb.toString(); + } + + private ArrayListMultimap convertToGroupBasedMap( + final Map> serversMap) throws IOException { + ArrayListMultimap loadMap = ArrayListMultimap + .create(); + for (RSGroupInfo gInfo : getMockedGroupInfoManager().listRSGroups()) { + Set
groupServers = gInfo.getServers(); + for (Address server : groupServers) { + ServerName actual = null; + for(ServerName entry: servers) { + if(entry.getAddress().equals(server)) { + actual = entry; + break; + } + } + List regions = serversMap.get(actual); + assertTrue("No load for " + actual, regions != null); + loadMap.put(gInfo.getName(), + new ServerAndLoad(actual, regions.size())); + } + } + return loadMap; + } + + private ArrayListMultimap reconcile( + ArrayListMultimap previousLoad, + List plans) { + ArrayListMultimap result = ArrayListMultimap + .create(); + result.putAll(previousLoad); + if (plans != null) { + for (RegionPlan plan : plans) { + ServerName source = plan.getSource(); + updateLoad(result, source, -1); + ServerName destination = plan.getDestination(); + updateLoad(result, destination, +1); + } + } + return result; + } + + private void updateLoad( + ArrayListMultimap previousLoad, + final ServerName sn, final int diff) { + for (String groupName : previousLoad.keySet()) { + ServerAndLoad newSAL = null; + ServerAndLoad oldSAL = null; + for (ServerAndLoad sal : previousLoad.get(groupName)) { + if (ServerName.isSameAddress(sn, sal.getServerName())) { + oldSAL = sal; + newSAL = new ServerAndLoad(sn, sal.getLoad() + diff); + break; + } + } + if (newSAL != null) { + previousLoad.remove(groupName, oldSAL); + previousLoad.put(groupName, newSAL); + break; + } + } + } + + private Map> mockClusterServers() throws IOException { + assertTrue(servers.size() == regionAssignment.length); + Map> assignment = new TreeMap>(); + for (int i = 0; i < servers.size(); i++) { + int numRegions = regionAssignment[i]; + List regions = assignedRegions(numRegions, servers.get(i)); + assignment.put(servers.get(i), regions); + } + return assignment; + } + + /** + * Generate a list of regions evenly distributed between the tables. + * + * @param numRegions The number of regions to be generated. + * @return List of HRegionInfo. + */ + private List randomRegions(int numRegions) { + List regions = new ArrayList(numRegions); + byte[] start = new byte[16]; + byte[] end = new byte[16]; + rand.nextBytes(start); + rand.nextBytes(end); + int regionIdx = rand.nextInt(tables.length); + for (int i = 0; i < numRegions; i++) { + Bytes.putInt(start, 0, numRegions << 1); + Bytes.putInt(end, 0, (numRegions << 1) + 1); + int tableIndex = (i + regionIdx) % tables.length; + HRegionInfo hri = new HRegionInfo( + tables[tableIndex], start, end, false, regionId++); + regions.add(hri); + } + return regions; + } + + /** + * Generate assigned regions to a given server using group information. + * + * @param numRegions the num regions to generate + * @param sn the servername + * @return the list of regions + * @throws java.io.IOException Signals that an I/O exception has occurred. 
+ */ + private List assignedRegions(int numRegions, ServerName sn) throws IOException { + List regions = new ArrayList(numRegions); + byte[] start = new byte[16]; + byte[] end = new byte[16]; + Bytes.putInt(start, 0, numRegions << 1); + Bytes.putInt(end, 0, (numRegions << 1) + 1); + for (int i = 0; i < numRegions; i++) { + TableName tableName = getTableName(sn); + HRegionInfo hri = new HRegionInfo( + tableName, start, end, false, + regionId++); + regions.add(hri); + } + return regions; + } + + private static List generateServers(int numServers) { + List servers = new ArrayList(numServers); + for (int i = 0; i < numServers; i++) { + String host = "server" + rand.nextInt(100000); + int port = rand.nextInt(60000); + servers.add(ServerName.valueOf(host, port, -1)); + } + return servers; + } + + /** + * Construct group info, with each group having at least one server. + * + * @param servers the servers + * @param groups the groups + * @return the map + */ + private static Map constructGroupInfo( + List servers, String[] groups) { + assertTrue(servers != null); + assertTrue(servers.size() >= groups.length); + int index = 0; + Map groupMap = new HashMap(); + for (String grpName : groups) { + RSGroupInfo RSGroupInfo = new RSGroupInfo(grpName); + RSGroupInfo.addServer(servers.get(index).getAddress()); + groupMap.put(grpName, RSGroupInfo); + index++; + } + while (index < servers.size()) { + int grpIndex = rand.nextInt(groups.length); + groupMap.get(groups[grpIndex]).addServer(servers.get(index).getAddress()); + index++; + } + return groupMap; + } + + /** + * Construct table descriptors evenly distributed between the groups. + * + * @return the list + */ + private static List constructTableDesc() { + List tds = Lists.newArrayList(); + int index = rand.nextInt(groups.length); + for (int i = 0; i < tables.length; i++) { + HTableDescriptor htd = new HTableDescriptor(tables[i]); + int grpIndex = (i + index) % groups.length ; + String groupName = groups[grpIndex]; + tableMap.put(tables[i], groupName); + tds.add(htd); + } + return tds; + } + + private static MasterServices getMockedMaster() throws IOException { + TableDescriptors tds = Mockito.mock(TableDescriptors.class); + Mockito.when(tds.get(tables[0])).thenReturn(tableDescs.get(0)); + Mockito.when(tds.get(tables[1])).thenReturn(tableDescs.get(1)); + Mockito.when(tds.get(tables[2])).thenReturn(tableDescs.get(2)); + Mockito.when(tds.get(tables[3])).thenReturn(tableDescs.get(3)); + MasterServices services = Mockito.mock(HMaster.class); + Mockito.when(services.getTableDescriptors()).thenReturn(tds); + AssignmentManager am = Mockito.mock(AssignmentManager.class); + Mockito.when(services.getAssignmentManager()).thenReturn(am); + return services; + } + + private static RSGroupInfoManager getMockedGroupInfoManager() throws IOException { + RSGroupInfoManager gm = Mockito.mock(RSGroupInfoManager.class); + Mockito.when(gm.getRSGroup(groups[0])).thenReturn( + groupMap.get(groups[0])); + Mockito.when(gm.getRSGroup(groups[1])).thenReturn( + groupMap.get(groups[1])); + Mockito.when(gm.getRSGroup(groups[2])).thenReturn( + groupMap.get(groups[2])); + Mockito.when(gm.getRSGroup(groups[3])).thenReturn( + groupMap.get(groups[3])); + Mockito.when(gm.listRSGroups()).thenReturn( + Lists.newLinkedList(groupMap.values())); + Mockito.when(gm.isOnline()).thenReturn(true); + Mockito.when(gm.getRSGroupOfTable(Mockito.any(TableName.class))) + .thenAnswer(new Answer() { + @Override + public String answer(InvocationOnMock invocation) throws Throwable { + return 
tableMap.get(invocation.getArguments()[0]); + } + }); + return gm; + } + + private TableName getTableName(ServerName sn) throws IOException { + TableName tableName = null; + RSGroupInfoManager gm = getMockedGroupInfoManager(); + RSGroupInfo groupOfServer = null; + for(RSGroupInfo gInfo : gm.listRSGroups()){ + if(gInfo.containsServer(sn.getAddress())){ + groupOfServer = gInfo; + break; + } + } + + for(HTableDescriptor desc : tableDescs){ + if(gm.getRSGroupOfTable(desc.getTableName()).endsWith(groupOfServer.getName())){ + tableName = desc.getTableName(); + } + } + return tableName; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/64328cae/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroups.java ---------------------------------------------------------------------- diff --git a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroups.java b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroups.java new file mode 100644 index 0000000..3ad928f --- /dev/null +++ b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroups.java @@ -0,0 +1,300 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.hbase.rsgroup;
+
+import com.google.common.collect.Sets;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.Waiter.Predicate;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.ServerManager;
+import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
+import org.apache.hadoop.hbase.net.Address;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@Category({MediumTests.class})
+public class TestRSGroups extends TestRSGroupsBase {
+  protected static final Log LOG = LogFactory.getLog(TestRSGroups.class);
+  private static HMaster master;
+  private static boolean init = false;
+  private static RSGroupAdminEndpoint RSGroupAdminEndpoint;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    TEST_UTIL = new HBaseTestingUtility();
+    TEST_UTIL.getConfiguration().setFloat(
+        "hbase.master.balancer.stochastic.tableSkewCost", 6000);
+    TEST_UTIL.getConfiguration().set(
+        HConstants.HBASE_MASTER_LOADBALANCER_CLASS,
+        RSGroupBasedLoadBalancer.class.getName());
+    TEST_UTIL.getConfiguration().set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
+        RSGroupAdminEndpoint.class.getName());
+    TEST_UTIL.getConfiguration().setBoolean(
+        HConstants.ZOOKEEPER_USEMULTI,
+        true);
+    TEST_UTIL.startMiniCluster(NUM_SLAVES_BASE);
+    TEST_UTIL.getConfiguration().set(
+        ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART,
+        "" + NUM_SLAVES_BASE);
+    TEST_UTIL.getConfiguration().setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
+
+    admin = TEST_UTIL.getHBaseAdmin();
+    cluster = TEST_UTIL.getHBaseCluster();
+    master = ((MiniHBaseCluster)cluster).getMaster();
+
+    //wait for balancer to come online
+    TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        return master.isInitialized() &&
+            ((RSGroupBasedLoadBalancer) master.getLoadBalancer()).isOnline();
+      }
+    });
+    admin.setBalancerRunning(false, true);
+    rsGroupAdmin = new VerifyingRSGroupAdminClient(new RSGroupAdminClient(TEST_UTIL.getConnection()),
+        TEST_UTIL.getConfiguration());
+    RSGroupAdminEndpoint =
+        master.getMasterCoprocessorHost().findCoprocessors(RSGroupAdminEndpoint.class).get(0);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Before
+  public void beforeMethod() throws Exception {
+    if (!init) {
+      init = true;
+      afterMethod();
+    }
+  }
+
+  @After
+  public void afterMethod() throws Exception {
+    deleteTableIfNecessary();
+    deleteNamespaceIfNecessary();
+    deleteGroups();
+
+    int missing = NUM_SLAVES_BASE - getNumServers();
+    LOG.info("Restoring servers: " + missing);
+    for (int i = 0; i < missing; i++) {
+      ((MiniHBaseCluster)cluster).startRegionServer();
+    }
+    TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        LOG.info("Waiting for cleanup to finish " + rsGroupAdmin.listRSGroups());
+        //Might be greater since moving servers back to default
+        //is after starting a server
+        return rsGroupAdmin.getRSGroupInfo(RSGroupInfo.DEFAULT_GROUP).getServers().size()
+            == NUM_SLAVES_BASE;
+      }
+    });
+  }
+
+  @Test
+  public void testBasicStartUp() throws IOException {
+    RSGroupInfo defaultInfo = rsGroupAdmin.getRSGroupInfo(RSGroupInfo.DEFAULT_GROUP);
+    assertEquals(4, defaultInfo.getServers().size());
+    // Assignment of root and meta regions.
+    int count = master.getAssignmentManager().getRegionStates().getRegionAssignments().size();
+    //3 regions: meta, namespace, and group
+    assertEquals(3, count);
+  }
+
+  @Test
+  public void testNamespaceCreateAndAssign() throws Exception {
+    LOG.info("testNamespaceCreateAndAssign");
+    String nsName = tablePrefix + "_foo";
+    final TableName tableName = TableName.valueOf(nsName, tablePrefix + "_testCreateAndAssign");
+    RSGroupInfo appInfo = addGroup(rsGroupAdmin, "appInfo", 1);
+    admin.createNamespace(NamespaceDescriptor.create(nsName)
+        .addConfiguration(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP, "appInfo").build());
+    final HTableDescriptor desc = new HTableDescriptor(tableName);
+    desc.addFamily(new HColumnDescriptor("f"));
+    admin.createTable(desc);
+    //wait for created table to be assigned
+    TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        return getTableRegionMap().get(desc.getTableName()) != null;
+      }
+    });
+    ServerName targetServer =
+        ServerName.parseServerName(appInfo.getServers().iterator().next().toString());
+    AdminProtos.AdminService.BlockingInterface rs = admin.getConnection().getAdmin(targetServer);
+    //verify it was assigned to the right group
+    Assert.assertEquals(1, ProtobufUtil.getOnlineRegions(rs).size());
+  }
+
+  @Test
+  public void testDefaultNamespaceCreateAndAssign() throws Exception {
+    LOG.info("testDefaultNamespaceCreateAndAssign");
+    final byte[] tableName = Bytes.toBytes(tablePrefix + "_testCreateAndAssign");
+    admin.modifyNamespace(NamespaceDescriptor.create("default")
+        .addConfiguration(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP, "default").build());
+    final HTableDescriptor desc = new HTableDescriptor(tableName);
+    desc.addFamily(new HColumnDescriptor("f"));
+    admin.createTable(desc);
+    //wait for created table to be assigned
+    TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        return getTableRegionMap().get(desc.getTableName()) != null;
+      }
+    });
+  }
+
+  @Test
+  public void testNamespaceConstraint() throws Exception {
+    String nsName = tablePrefix + "_foo";
+    String groupName = tablePrefix + "_foo";
+    LOG.info("testNamespaceConstraint");
+    rsGroupAdmin.addRSGroup(groupName);
+    admin.createNamespace(NamespaceDescriptor.create(nsName)
+        .addConfiguration(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP, groupName)
+        .build());
+    //test removing a referenced group
+    try {
+      rsGroupAdmin.removeRSGroup(groupName);
+      fail("Expected a constraint exception");
+    } catch (IOException ex) {
+    }
+    //test modify group
+    //changing with the same name is fine
+    admin.modifyNamespace(
+        NamespaceDescriptor.create(nsName)
+            .addConfiguration(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP, groupName)
+            .build());
+    String anotherGroup = tablePrefix + "_anotherGroup";
+    rsGroupAdmin.addRSGroup(anotherGroup);
+    //test add non-existent group
+    admin.deleteNamespace(nsName);
+    rsGroupAdmin.removeRSGroup(groupName);
+    try {
+      admin.createNamespace(NamespaceDescriptor.create(nsName)
+          .addConfiguration(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP, "foo")
+          .build());
+      fail("Expected a constraint exception");
+    } catch (IOException ex) {
+    }
+  }
+
+  @Test
+  public void testGroupInfoMultiAccessing() throws Exception {
+    RSGroupInfoManager manager = RSGroupAdminEndpoint.getGroupInfoManager();
+    final RSGroupInfo defaultGroup = manager.getRSGroup("default");
+    // getRSGroup updates default group's server list
+    // this process must not affect other threads iterating the list
+    Iterator<Address> it = defaultGroup.getServers().iterator();
+    manager.getRSGroup("default");
+    it.next();
+  }
+
+  @Test
+  public void testMisplacedRegions() throws Exception {
+    final TableName tableName = TableName.valueOf(tablePrefix + "_testMisplacedRegions");
+    LOG.info("testMisplacedRegions");
+
+    final RSGroupInfo RSGroupInfo = addGroup(rsGroupAdmin, "testMisplacedRegions", 1);
+
+    TEST_UTIL.createMultiRegionTable(tableName, new byte[]{'f'}, 15);
+    TEST_UTIL.waitUntilAllRegionsAssigned(tableName);
+
+    RSGroupAdminEndpoint.getGroupInfoManager()
+        .moveTables(Sets.newHashSet(tableName), RSGroupInfo.getName());
+
+    assertTrue(rsGroupAdmin.balanceRSGroup(RSGroupInfo.getName()));
+
+    TEST_UTIL.waitFor(60000, new Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        ServerName serverName =
+            ServerName.valueOf(RSGroupInfo.getServers().iterator().next().toString(), 1);
+        return admin.getConnection().getAdmin()
+            .getOnlineRegions(serverName).size() == 15;
+      }
+    });
+  }
+
+  @Test
+  public void testCloneSnapshot() throws Exception {
+    final TableName tableName = TableName.valueOf(tablePrefix + "_testCloneSnapshot");
+    LOG.info("testCloneSnapshot");
+
+    byte[] FAMILY = Bytes.toBytes("test");
+    String snapshotName = tableName.getNameAsString() + "_snap";
+    TableName clonedTableName = TableName.valueOf(tableName.getNameAsString() + "_clone");
+
+    // create base table
+    TEST_UTIL.createTable(tableName, FAMILY);
+
+    // create snapshot
+    admin.snapshot(snapshotName, tableName);
+
+    // clone
+    admin.cloneSnapshot(snapshotName, clonedTableName);
+  }
+
+}
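
[Editor's note, not part of the patch] The tests above exercise the client-facing rsgroup
flow end to end: create a group, move servers and tables into it, then balance within that
group only. A minimal client-side sketch of the same flow, using only the RSGroupAdminClient
calls that appear in this patch, might look like the following. It assumes a running cluster
with the RSGroupAdminEndpoint coprocessor loaded on the master; the host name
"rs1.example.com", port 16020, and table "mytable" are placeholders.

    // Sketch only: drive the rsgroup admin API from a client, mirroring the
    // calls the tests above make. Host/port and table name are hypothetical.
    Configuration conf = HBaseConfiguration.create();
    try (Connection conn = ConnectionFactory.createConnection(conf)) {
      RSGroupAdmin rsGroupAdmin = new RSGroupAdminClient(conn);
      rsGroupAdmin.addRSGroup("app1");                    // create the group
      rsGroupAdmin.moveServers(                           // dedicate a region server to it
          Sets.newHashSet(Address.fromParts("rs1.example.com", 16020)), "app1");
      rsGroupAdmin.moveTables(                            // pin a table to the group
          Sets.newHashSet(TableName.valueOf("mytable")), "app1");
      rsGroupAdmin.balanceRSGroup("app1");                // rebalance only within the group
    }

As in afterMethod() above, moving a server that still hosts regions of other groups may fail
until its regions are reassigned, which is why the tests wrap moveServers in a retry/wait.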