Return-Path: Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: (qmail 27378 invoked from network); 5 Aug 2010 07:36:28 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 5 Aug 2010 07:36:28 -0000 Received: (qmail 91628 invoked by uid 500); 5 Aug 2010 07:36:28 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 91530 invoked by uid 500); 5 Aug 2010 07:36:27 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 91420 invoked by uid 99); 5 Aug 2010 07:36:24 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 05 Aug 2010 07:36:24 +0000 X-ASF-Spam-Status: No, hits=-1998.0 required=10.0 tests=ALL_TRUSTED,FB_GET_MEDS X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 05 Aug 2010 07:36:20 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id D04A523889D5; Thu, 5 Aug 2010 07:35:03 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r982489 [2/7] - in /hbase/branches/0.90_master_rewrite: ./ src/main/java/org/apache/hadoop/hbase/ src/main/java/org/apache/hadoop/hbase/catalog/ src/main/java/org/apache/hadoop/hbase/client/ src/main/java/org/apache/hadoop/hbase/executor/ s... 
Date: Thu, 05 Aug 2010 07:35:02 -0000 To: commits@hbase.apache.org From: jgray@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100805073503.D04A523889D5@eris.apache.org> Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/executor/HBaseExecutorService.java URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/executor/HBaseExecutorService.java?rev=982489&r1=982488&r2=982489&view=diff ============================================================================== --- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/executor/HBaseExecutorService.java (original) +++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/executor/HBaseExecutorService.java Thu Aug 5 07:35:00 2010 @@ -24,7 +24,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -43,88 +43,77 @@ import org.apache.commons.logging.LogFac * HBEventHandler class and create an event type that submits to this service. 
* */ -public class HBaseExecutorService -{ +public class HBaseExecutorService { private static final Log LOG = LogFactory.getLog(HBaseExecutorService.class); // default number of threads in the pool private int corePoolSize = 1; - // max number of threads - maximum concurrency - private int maximumPoolSize = 5; // how long to retain excess threads private long keepAliveTimeInMillis = 1000; // the thread pool executor that services the requests ThreadPoolExecutor threadPoolExecutor; // work queue to use - unbounded queue - BlockingQueue workQueue = new LinkedBlockingQueue(); + BlockingQueue workQueue = new PriorityBlockingQueue(); // name for this executor service String name; // hold the all the executors created in a map addressable by their names static Map executorServicesMap = Collections.synchronizedMap(new HashMap()); - /** - * The following is a list of names for the various executor services in both + * The following is a list of names for the various executor services in both * the master and the region server. 
*/ public enum HBaseExecutorServiceType { - NONE (-1), - MASTER_CLOSEREGION (1), - MASTER_OPENREGION (2); - - private final int value; - - HBaseExecutorServiceType(int intValue) { - this.value = intValue; - } - - public void startExecutorService(String serverName) { - // if this is NONE then there is no executor to start - if(value == NONE.value) { - throw new RuntimeException("Cannot start NONE executor type."); - } + + // Master executor services + MASTER_CLOSE_REGION (1), + MASTER_OPEN_REGION (2), + MASTER_SERVER_OPERATIONS (3), + MASTER_TABLE_OPERATIONS (4), + + // RegionServer executor services + RS_OPEN_REGION (20), + RS_OPEN_ROOT (21), + RS_OPEN_META (22), + RS_CLOSE_REGION (23), + RS_CLOSE_ROOT (24), + RS_CLOSE_META (25); + + HBaseExecutorServiceType(int value) {} + + public void startExecutorService(String serverName, int maxThreads) { String name = getExecutorName(serverName); if(HBaseExecutorService.isExecutorServiceRunning(name)) { LOG.debug("Executor service " + toString() + " already running on " + serverName); return; } LOG.debug("Starting executor service [" + name + "]"); - HBaseExecutorService.startExecutorService(name); + HBaseExecutorService.startExecutorService(name, maxThreads); } - + public HBaseExecutorService getExecutor(String serverName) { - // if this is NONE then there is no executor - if(value == NONE.value) { - return null; - } return HBaseExecutorService.getExecutorService(getExecutorName(serverName)); } - + public String getExecutorName(String serverName) { - // if this is NONE then there is no executor - if(value == NONE.value) { - return null; - } return (this.toString() + "-" + serverName); } } - - /** * Start an executor service with a given name. If there was a service already * started with the same name, this throws a RuntimeException. * @param name Name of the service to start. 
*/ - public static void startExecutorService(String name) { + public static void startExecutorService(String name, int maxThreads) { if(executorServicesMap.get(name) != null) { throw new RuntimeException("An executor service with the name " + name + " is already running!"); } - HBaseExecutorService hbes = new HBaseExecutorService(name); + HBaseExecutorService hbes = new HBaseExecutorService(name, maxThreads); executorServicesMap.put(name, hbes); LOG.debug("Starting executor service: " + name); } - + public static boolean isExecutorServiceRunning(String name) { return (executorServicesMap.containsKey(name)); } @@ -140,7 +129,7 @@ public class HBaseExecutorService } return executor; } - + public static void shutdown() { for(Entry entry : executorServicesMap.entrySet()) { entry.getValue().threadPoolExecutor.shutdown(); @@ -148,16 +137,11 @@ public class HBaseExecutorService executorServicesMap.clear(); } - protected HBaseExecutorService(String name) { + protected HBaseExecutorService(String name, int maxThreads) { this.name = name; // create the thread pool executor - threadPoolExecutor = new ThreadPoolExecutor( - corePoolSize, - maximumPoolSize, - keepAliveTimeInMillis, - TimeUnit.MILLISECONDS, - workQueue - ); + threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maxThreads, + keepAliveTimeInMillis, TimeUnit.MILLISECONDS, workQueue); // name the threads for this threadpool threadPoolExecutor.setThreadFactory(new NamedThreadFactory(name)); } Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/executor/RegionTransitionData.java URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/executor/RegionTransitionData.java?rev=982489&r1=982488&r2=982489&view=diff ============================================================================== --- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/executor/RegionTransitionData.java (original) +++ 
hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/executor/RegionTransitionData.java Thu Aug 5 07:35:00 2010 @@ -23,8 +23,8 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import org.apache.hadoop.hbase.HMsg; -import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventType; +import org.apache.hadoop.hbase.executor.EventHandler.EventType; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.io.Writable; @@ -36,19 +36,16 @@ public class RegionTransitionData implem * Type of transition event (offline, opening, opened, closing, closed). * Required. */ - private HBaseEventType eventType; + private EventType eventType; /** Region being transitioned. Required. */ - private String regionName; + private byte [] regionName; /** Server event originated from. Optional. */ private String serverName; /** Time the event was created. Required but automatically set. */ - private long timeStamp; - - /** Temporary. Holds payload used doing transitions via heartbeats. */ - private HMsg hmsg; // to be removed shortly once we stop using heartbeats + private long stamp; /** * Writable constructor. Do not use directly. @@ -68,12 +65,12 @@ public class RegionTransitionData implem * assignment. * *

Since only the master uses this constructor, the type should always be - * {@link HBaseEventType#M2ZK_REGION_OFFLINE}. + * {@link EventType#M2ZK_REGION_OFFLINE}. * * @param eventType type of event * @param regionName name of region */ - public RegionTransitionData(HBaseEventType eventType, String regionName) { + public RegionTransitionData(EventType eventType, byte [] regionName) { this(eventType, regionName, null); } @@ -83,37 +80,20 @@ public class RegionTransitionData implem * *

Used when the server name is known (a regionserver is setting it). * - *

Valid types for this constructor are {@link HBaseEventType#RS2ZK_REGION_CLOSING}, - * {@link HBaseEventType#RS2ZK_REGION_CLOSED}, {@link HBaseEventType#RS2ZK_REGION_OPENING}, - * and {@link HBaseEventType#RS2ZK_REGION_OPENED}. + *

Valid types for this constructor are {@link EventType#RS2ZK_REGION_CLOSING}, + * {@link EventType#RS2ZK_REGION_CLOSED}, {@link EventType#RS2ZK_REGION_OPENING}, + * and {@link EventType#RS2ZK_REGION_OPENED}. * * @param eventType type of event * @param regionName name of region * @param serverName name of server setting data */ - public RegionTransitionData(HBaseEventType eventType, String regionName, + public RegionTransitionData(EventType eventType, byte [] regionName, String serverName) { - this(eventType, regionName, serverName, null); - } - - /** - * Construct data for a fully-specified, old-format region transition event - * which uses HMsg/heartbeats. - * - * TODO: Remove this constructor once we stop using heartbeats. - * - * @param eventType - * @param regionName - * @param serverName - * @param hmsg - */ - public RegionTransitionData(HBaseEventType eventType, String regionName, - String serverName, HMsg hmsg) { this.eventType = eventType; - this.timeStamp = System.currentTimeMillis(); + this.stamp = System.currentTimeMillis(); this.regionName = regionName; this.serverName = serverName; - this.hmsg = hmsg; } /** @@ -121,25 +101,25 @@ public class RegionTransitionData implem * *

One of: *

    - *
  • {@link HBaseEventType#M2ZK_REGION_OFFLINE} - *
  • {@link HBaseEventType#RS2ZK_REGION_CLOSING} - *
  • {@link HBaseEventType#RS2ZK_REGION_CLOSED} - *
  • {@link HBaseEventType#RS2ZK_REGION_OPENING} - *
  • {@link HBaseEventType#RS2ZK_REGION_OPENED} + *
  • {@link EventType#M2ZK_REGION_OFFLINE} + *
  • {@link EventType#RS2ZK_REGION_CLOSING} + *
  • {@link EventType#RS2ZK_REGION_CLOSED} + *
  • {@link EventType#RS2ZK_REGION_OPENING} + *
  • {@link EventType#RS2ZK_REGION_OPENED} *
* @return type of region transition event */ - public HBaseEventType getEventType() { + public EventType getEventType() { return eventType; } /** - * Gets the encoded name of the region being transitioned. + * Gets the name of the region being transitioned. * *

Region name is required so this never returns null. * @return region name */ - public String getRegionName() { + public byte [] getRegionName() { return regionName; } @@ -156,54 +136,39 @@ public class RegionTransitionData implem /** * Gets the timestamp when this event was created. * - * @return time event was created + * @return stamp event was created */ - public long getTimeStamp() { - return timeStamp; - } - - /** - * Gets the {@link HMsg} payload of this region transition event. - * @return heartbeat payload - */ - public HMsg getHmsg() { - return hmsg; + public long getStamp() { + return stamp; } @Override public void readFields(DataInput in) throws IOException { // the event type byte - eventType = HBaseEventType.fromByte(in.readByte()); + eventType = EventType.values()[in.readShort()]; // the timestamp - timeStamp = in.readLong(); + stamp = in.readLong(); // the encoded name of the region being transitioned - regionName = in.readUTF(); + regionName = Bytes.readByteArray(in); // remaining fields are optional so prefixed with boolean // the name of the regionserver sending the data if(in.readBoolean()) { serverName = in.readUTF(); - } - // hmsg - if(in.readBoolean()) { - hmsg = new HMsg(); - hmsg.readFields(in); + } else { + serverName = null; } } @Override public void write(DataOutput out) throws IOException { - out.writeByte(eventType.getByteValue()); + out.writeShort(eventType.ordinal()); out.writeLong(System.currentTimeMillis()); - out.writeUTF(regionName); + Bytes.writeByteArray(out, regionName); // remaining fields are optional so prefixed with boolean out.writeBoolean(serverName != null); if(serverName != null) { out.writeUTF(serverName); } - out.writeBoolean(hmsg != null); - if(hmsg != null) { - hmsg.write(out); - } } /** Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java URL: 
http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java?rev=982489&r1=982488&r2=982489&view=diff ============================================================================== --- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java (original) +++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java Thu Aug 5 07:35:00 2010 @@ -19,13 +19,11 @@ */ package org.apache.hadoop.hbase.ipc; +import java.io.IOException; + import org.apache.hadoop.hbase.ClusterStatus; import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.io.Writable; - -import java.io.IOException; /** * Clients interact with the HMasterInterface to gain access to meta-level @@ -110,11 +108,10 @@ public interface HMasterInterface extend * Modify a table's metadata * * @param tableName table to modify - * @param op the operation to do - * @param args arguments for operation + * @param htd new descriptor for table * @throws IOException e */ - public void modifyTable(byte[] tableName, HConstants.Modify op, Writable[] args) + public void modifyTable(byte[] tableName, HTableDescriptor htd) throws IOException; /** Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java?rev=982489&r1=982488&r2=982489&view=diff ============================================================================== --- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (original) +++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java Thu Aug 5 07:35:00 2010 @@ -19,6 +19,9 @@ */ package 
org.apache.hadoop.hbase.ipc; +import java.io.IOException; +import java.util.List; + import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HServerInfo; import org.apache.hadoop.hbase.NotServingRegionException; @@ -31,9 +34,6 @@ import org.apache.hadoop.hbase.client.Re import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.regionserver.HRegion; -import java.io.IOException; -import java.util.List; - /** * Clients interact with HRegionServers using a handle to the HRegionInterface. * @@ -279,10 +279,64 @@ public interface HRegionInterface extend * @throws IOException e */ public MultiPutResponse multiPut(MultiPut puts) throws IOException; - + /** * Bulk load an HFile into an open region */ public void bulkLoadHFile(String hfilePath, byte[] regionName, byte[] familyName) throws IOException; + + // Master methods + + /** + * Opens the specified region. + * @param region region to open + */ + public void openRegion(final HRegionInfo region); + + /** + * Closes the specified region. + * @param region region to close + * @return true if closing region, false if not + */ + public boolean closeRegion(final HRegionInfo region) + throws NotServingRegionException; + + // Region administrative methods + + /** + * Flushes the MemStore of the specified region. + *

+ * This method is synchronous. + * @param regionInfo region to flush + * @throws NotServingRegionException + * @throws IOException + */ + void flushRegion(HRegionInfo regionInfo) + throws NotServingRegionException, IOException; + + /** + * Splits the specified region. + *

+ * This method currently flushes the region and then forces a compaction which + * will then trigger a split. The flush is done synchronously but the + * compaction is asynchronous. + * @param regionInfo region to split + * @throws NotServingRegionException + * @throws IOException + */ + void splitRegion(HRegionInfo regionInfo) + throws NotServingRegionException, IOException; + + /** + * Compacts the specified region. Performs a major compaction if specified. + *

+ * This method is asynchronous. + * @param regionInfo region to compact + * @param major true to force major compaction + * @throws NotServingRegionException + * @throws IOException + */ + void compactRegion(HRegionInfo regionInfo, boolean major) + throws NotServingRegionException, IOException; } Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java?rev=982489&r1=982488&r2=982489&view=diff ============================================================================== --- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java (original) +++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java Thu Aug 5 07:35:00 2010 @@ -48,10 +48,10 @@ public class ActiveMasterManager extends final AtomicBoolean clusterHasActiveMaster = new AtomicBoolean(false); private final HServerAddress address; - private final MasterStatus status; + private final MasterController status; ActiveMasterManager(ZooKeeperWatcher watcher, HServerAddress address, - MasterStatus status) { + MasterController status) { super(watcher); this.address = address; this.status = status; Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java?rev=982489&r1=982488&r2=982489&view=diff ============================================================================== --- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (original) +++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java Thu Aug 5 07:35:00 2010 @@ -19,41 +19,91 @@ 
*/ package org.apache.hadoop.hbase.master; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Chore; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HServerAddress; +import org.apache.hadoop.hbase.HServerInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.catalog.CatalogTracker; +import org.apache.hadoop.hbase.catalog.MetaReader; +import org.apache.hadoop.hbase.client.MetaScanner; import org.apache.hadoop.hbase.executor.RegionTransitionData; -import org.apache.hadoop.hbase.master.handler.MasterCloseRegionHandler; -import org.apache.hadoop.hbase.master.handler.MasterOpenRegionHandler; +import org.apache.hadoop.hbase.master.LoadBalancer.RegionPlan; +import org.apache.hadoop.hbase.master.handler.ClosedRegionHandler; +import org.apache.hadoop.hbase.master.handler.OpenedRegionHandler; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.zookeeper.ZKAssign; +import org.apache.hadoop.hbase.zookeeper.ZKTableDisable; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.hbase.zookeeper.ZKUtil.NodeAndData; +import org.apache.hadoop.io.Writable; import org.apache.zookeeper.KeeperException; /** - * Manages region assignment. - * - *

Monitors ZooKeeper for events related to regions in transition. - * - *

Handles existing regions in transition during master failover. + * Manages and performs region assignment. + *

+ * Monitors ZooKeeper for events related to regions in transition. + *

+ * Handles existing regions in transition during master failover. */ public class AssignmentManager extends ZooKeeperListener { private static final Log LOG = LogFactory.getLog(AssignmentManager.class); - private MasterStatus status; + protected MasterController master; private ServerManager serverManager; - private RegionManager regionManager; + private CatalogTracker catalogTracker; - private String serverName; + private TimeoutMonitor timeoutMonitor; -// TODO: Eventually RIT will move here? -// private final Map regionsInTransition = -// new TreeMap(); + /** Regions currently in transition. */ + private final Map regionsInTransition = + new TreeMap(); + + /** Plans for region movement. */ + private final Map regionPlans = + new TreeMap(); + + /** Set of tables that have been disabled. */ + private final Set disabledTables = + Collections.synchronizedSet(new HashSet()); + + /** + * Server to regions assignment map. + * Contains the set of regions currently assigned to a given server. + */ + private final SortedMap> servers = + new TreeMap>(); + + /** + * Region to server assignment map. + * Contains the server a given region is currently assigned to. + * This object should be used for all synchronization around servers/regions. + */ + private final SortedMap regions = + new TreeMap(); /** * Constructs a new assignment manager. 
@@ -63,85 +113,211 @@ public class AssignmentManager extends Z * @param watcher zookeeper watcher * @param status master status */ - public AssignmentManager(ZooKeeperWatcher watcher, MasterStatus status, - ServerManager serverManager, RegionManager regionManager) { + public AssignmentManager(ZooKeeperWatcher watcher, MasterController master, + ServerManager serverManager, CatalogTracker catalogTracker) { super(watcher); - this.status = status; + this.master = master; this.serverManager = serverManager; - this.regionManager = regionManager; - serverName = status.getHServerAddress().toString(); + this.catalogTracker = catalogTracker; + Configuration conf = master.getConfiguration(); + this.timeoutMonitor = new TimeoutMonitor( + conf.getInt("hbase.master.assignment.timeoutmonitor.period", 30000), + master.getClosed(), + conf.getInt("hbase.master.assignment.timeoutmonitor.timeout", 15000)); + Threads.setDaemonThreadRunning(timeoutMonitor, + master.getServerName() + ".timeoutMonitor"); } /** - * Starts the assignment manager. - * - *

This includes registering itself with ZooKeeper and handling - * the initial state of whatever unassigned nodes already exist. + * Cluster startup. Reset all unassigned nodes and assign all user regions. + * @throws IOException * @throws KeeperException */ - public void start() throws KeeperException { - watcher.registerListener(this); - if(status.isClusterStartup()) { - processStartup(); - } else { - processFailover(); - } - } - - public synchronized void processStartup() throws KeeperException { + void processStartup() throws IOException, KeeperException { + // Cleanup any existing ZK nodes and start watching ZKAssign.deleteAllNodes(watcher); - ZKUtil.listChildrenAndWatchForNewChildren(watcher, watcher.assignmentZNode); + ZKUtil.listChildrenAndWatchForNewChildren(watcher, + watcher.assignmentZNode); + // Assign all existing user regions out + assignAllUserRegions(); } /** - * Handle failover. + * Handle failover. Restore state from META and ZK. Handle any regions in + * transition. * @throws KeeperException + * @throws IOException */ - public synchronized void processFailover() throws KeeperException { + void processFailover() throws KeeperException, IOException { + // Scan META to build list of existing regions, servers, and assignment + rebuildUserRegions(); + // Pickup any disabled tables + rebuildDisabledTables(); + // Check existing regions in transition List nodes = ZKUtil.listChildrenAndWatchForNewChildren(watcher, watcher.assignmentZNode); if(nodes.isEmpty()) { - LOG.info("No regions in transition in ZK, nothing to do for failover"); + LOG.info("No regions in transition in ZK to process on failover"); return; } LOG.info("Failed-over master needs to process " + nodes.size() + " regions in transition"); for(String regionName : nodes) { RegionTransitionData data = ZKAssign.getData(watcher, regionName); + HRegionInfo regionInfo = + MetaReader.getRegion(catalogTracker, data.getRegionName()).getFirst(); + String encodedName = regionInfo.getEncodedName(); 
switch(data.getEventType()) { - case M2ZK_REGION_OFFLINE: - // TODO: Generate new assignment and send OPEN RPC - break; case RS2ZK_REGION_CLOSING: - // TODO: Only need to deal with timeouts. + // Just insert region into RIT + // If this never updates the timeout will trigger new assignment + regionsInTransition.put(encodedName, + new RegionState(regionInfo, RegionState.State.CLOSING, + data.getStamp())); break; + case RS2ZK_REGION_CLOSED: - // TODO: Generate new assignment and send OPEN RPC + // Region is closed, insert into RIT and handle it + regionsInTransition.put(encodedName, + new RegionState(regionInfo, RegionState.State.CLOSED, + data.getStamp())); + new ClosedRegionHandler(master, this, data, regionInfo).execute(); break; + case RS2ZK_REGION_OPENING: - // TODO: Only need to deal with timeouts. + // Just insert region into RIT + // If this never updates the timeout will trigger new assignment + regionsInTransition.put(encodedName, + new RegionState(regionInfo, RegionState.State.OPENING, + data.getStamp())); break; + case RS2ZK_REGION_OPENED: - // TODO: Delete the node from ZK. Region successfully opened but not - // acknowledged. 
+ // Region is opened, insert into RIT and handle it + regionsInTransition.put(encodedName, + new RegionState(regionInfo, RegionState.State.OPENING, + data.getStamp())); + new OpenedRegionHandler(master, this, data, regionInfo, + serverManager.getServerInfo(data.getServerName())).execute(); break; } } } - private synchronized void handleRegion(RegionTransitionData data) { - switch(data.getEventType()) { - case RS2ZK_REGION_CLOSED: - new MasterCloseRegionHandler(data.getEventType(), serverManager, - serverName, data.getRegionName(), data.getBytes()) - .submit(); - break; - case RS2ZK_REGION_OPENED: - case RS2ZK_REGION_OPENING: - new MasterOpenRegionHandler(data.getEventType(), serverManager, - serverName, data.getRegionName(), data.getBytes()) - .submit(); - break; + /** + * Gets the region info for the region with the specified encoded name. + *

+ * Currently this does a full scan of the regions map looking for a region + * with the specified encoded name. + *

+ * Returns null if none found. + * @param regionName + * @return + * @deprecated should be able to remove this now? + */ + @Deprecated + private HRegionInfo getRegionInfoFromEncoded(String encodedName) { + for(HRegionInfo regionInfo : regions.keySet()) { + if(regionInfo.getEncodedName().equals(encodedName)) { + return regionInfo; + } + } + return null; + } + + /** + * Handles various states an unassigned node can be in. + *

+ * Method is called when a state change is suspected for an unassigned node. + *

+ * This deals with skipped transitions (we got a CLOSED but didn't see CLOSING + * yet). + * @param data + */ + private void handleRegion(RegionTransitionData data) { + synchronized(regionsInTransition) { + // Verify this is a known server + if(!serverManager.isServerOnline(data.getServerName())) { + LOG.warn("Attempted to handle region transition for server " + + data.getServerName() + " but server is not online"); + return; + } + String encodedName = HRegionInfo.encodeRegionName(data.getRegionName()); + LOG.debug("Attempting to handle region transition for server " + + data.getServerName() + " and region " + encodedName); + RegionState regionState = regionsInTransition.get(encodedName); + switch(data.getEventType()) { + + case RS2ZK_REGION_CLOSING: + // Should see CLOSING after we have asked it to CLOSE or additional + // times after already being in state of CLOSING + if(regionState == null || + (!regionState.isPendingClose() && !regionState.isClosing())) { + LOG.warn("Received CLOSING for region " + encodedName + + " from server " + data.getServerName() + " but region was in " + + " the state " + regionState + " and not " + + "in expected PENDING_CLOSE or CLOSING states"); + return; + } + // Transition to CLOSING (or update stamp if already CLOSING) + regionState.update(RegionState.State.CLOSING, data.getStamp()); + break; + + case RS2ZK_REGION_CLOSED: + // Should see CLOSED after CLOSING but possible after PENDING_CLOSE + if(regionState == null || + (!regionState.isPendingClose() && !regionState.isClosing())) { + LOG.warn("Received CLOSED for region " + encodedName + + " from server " + data.getServerName() + " but region was in " + + " the state " + regionState + " and not " + + "in expected PENDING_CLOSE or CLOSING states"); + return; + } + // Handle CLOSED by assigning elsewhere or stopping if a disable + new ClosedRegionHandler(master, this, data, regionState.getRegion()) + .submit(); + break; + + case RS2ZK_REGION_OPENING: + // Should see OPENING after 
we have asked it to OPEN or additional + // times after already being in state of OPENING + if(regionState == null || + (!regionState.isPendingOpen() && !regionState.isOpening())) { + LOG.warn("Received OPENING for region " + encodedName + + " from server " + data.getServerName() + " but region was in " + + " the state " + regionState + " and not " + + "in expected PENDING_OPEN or OPENING states"); + return; + } + // Transition to OPENING (or update stamp if already OPENING) + regionState.update(RegionState.State.OPENING, data.getStamp()); + break; + + case RS2ZK_REGION_OPENED: + // Should see OPENED after OPENING but possible after PENDING_OPEN + if(regionState == null || + (!regionState.isPendingOpen() && !regionState.isOpening())) { + LOG.warn("Received OPENED for region " + encodedName + + " from server " + data.getServerName() + " but region was in " + + " the state " + regionState + " and not " + + "in expected PENDING_OPEN or OPENING states"); + return; + } + // If this is a catalog table, update catalog manager accordingly + // Moving root and meta editing over to RS who does the opening + LOG.debug("Processing OPENED for region " + regionState.getRegion() + + " which isMeta[" + regionState.getRegion().isMetaRegion() + "] " + + " isRoot[" + regionState.getRegion().isRootRegion() + "]"); + + // Used to have updating of root/meta locations here but it's + // automatic in CatalogTracker now + + // Handle OPENED by removing from transition and deleted zk node + new OpenedRegionHandler(master, this, data, regionState.getRegion(), + serverManager.getServerInfo(data.getServerName())) + .submit(); + break; + } } } @@ -160,17 +336,19 @@ public class AssignmentManager extends Z * */ @Override - public synchronized void nodeCreated(String path) { + public void nodeCreated(String path) { if(path.startsWith(watcher.assignmentZNode)) { - try { - RegionTransitionData data = ZKAssign.getData(watcher, path); - if(data == null) { - return; + synchronized(regionsInTransition) 
{ + try { + RegionTransitionData data = ZKAssign.getData(watcher, path); + if(data == null) { + return; + } + handleRegion(data); + } catch (KeeperException e) { + LOG.error("Unexpected ZK exception reading unassigned node data", e); + master.abort(); } - handleRegion(data); - } catch (KeeperException e) { - LOG.error("Unexpected ZK exception reading unassigned node data", e); - status.abort(); } } } @@ -188,17 +366,19 @@ public class AssignmentManager extends Z * */ @Override - public synchronized void nodeDataChanged(String path) { + public void nodeDataChanged(String path) { if(path.startsWith(watcher.assignmentZNode)) { - try { - RegionTransitionData data = ZKAssign.getData(watcher, path); - if(data == null) { - return; + synchronized(regionsInTransition) { + try { + RegionTransitionData data = ZKAssign.getData(watcher, path); + if(data == null) { + return; + } + handleRegion(data); + } catch (KeeperException e) { + LOG.error("Unexpected ZK exception reading unassigned node data", e); + master.abort(); } - handleRegion(data); - } catch (KeeperException e) { - LOG.error("Unexpected ZK exception reading unassigned node data", e); - status.abort(); } } } @@ -217,19 +397,589 @@ public class AssignmentManager extends Z * */ @Override - public synchronized void nodeChildrenChanged(String path) { + public void nodeChildrenChanged(String path) { if(path.equals(watcher.assignmentZNode)) { + synchronized(regionsInTransition) { + try { + List newNodes = ZKUtil.watchAndGetNewChildren(watcher, + watcher.assignmentZNode); + for(NodeAndData newNode : newNodes) { + LOG.debug("Handling new unassigned node: " + newNode); + handleRegion(RegionTransitionData.fromBytes(newNode.getData())); + } + } catch(KeeperException e) { + LOG.error("Unexpected ZK exception reading unassigned children", e); + master.abort(); + } + } + } + } + + /** + * Marks the region as online. Removes it from regions in transition and + * updates the in-memory assignment information. + *

+ * Used when a region has been successfully opened on a region server. + * @param regionInfo + * @param serverInfo + */ + public void regionOnline(HRegionInfo regionInfo, HServerInfo serverInfo) { + synchronized(regionsInTransition) { + regionsInTransition.remove(regionInfo.getEncodedName()); + regionsInTransition.notifyAll(); + } + synchronized(regions) { + regions.put(regionInfo, serverInfo); + Set regionSet = servers.get(serverInfo); + if(regionSet == null) { + regionSet = new TreeSet(); + servers.put(serverInfo, regionSet); + } + regionSet.add(regionInfo); + } + } + + /** + * Marks the region as offline. Removes it from regions in transition and + * removes in-memory assignment information. + *

+ * Used when a region has been closed and should remain closed. + * @param regionInfo + * @param serverInfo + */ + public void regionOffline(HRegionInfo regionInfo) { + synchronized(regionsInTransition) { + regionsInTransition.remove(regionInfo.getEncodedName()); + regionsInTransition.notifyAll(); + } + synchronized(regions) { + HServerInfo serverInfo = regions.remove(regionInfo); + Set serverRegions = servers.get(serverInfo); + serverRegions.remove(regionInfo); + } + } + + /** + * Sets the region as offline by removing in-memory assignment information but + * retaining transition information. + *

+ * Used when a region has been closed but should be reassigned. + * @param regionInfo + */ + public void setOffline(HRegionInfo regionInfo) { + synchronized(regions) { + HServerInfo serverInfo = regions.remove(regionInfo); + Set serverRegions = servers.get(serverInfo); + serverRegions.remove(regionInfo); + } + } + + // Assignment methods + + /** + * Assigns the specified region. + *

+ * If a RegionPlan is available with a valid destination then it will be used + * to determine what server region is assigned to. If no RegionPlan is + * available, region will be assigned to a random available server. + *

+ * Updates the RegionState and sends the OPEN RPC. + *

+ * This will only succeed if the region is in transition and in a CLOSED or + * OFFLINE state or not in transition (in-memory not zk). If the in-memory + * checks pass, the zk node is forced to OFFLINE before assigning. + * + * @param regionName server to be assigned + */ + public void assign(HRegionInfo region) { + LOG.debug("Starting assignment for region " + region); + // Grab the state of this region and synchronize on it + String regionName = region.getEncodedName(); + RegionState state; + synchronized(regionsInTransition) { + state = regionsInTransition.get(regionName); + if(state == null) { + state = new RegionState(region, RegionState.State.OFFLINE); + regionsInTransition.put(regionName, state); + } + } + synchronized(state) { + if(!state.isClosed() && !state.isOffline()) { + LOG.info("Attempting to assign region but it is in transition and in " + + "an unexpected state:" + state); + return; + } else { + state.update(RegionState.State.OFFLINE); + } + try { + if(!ZKAssign.createOrForceNodeOffline(master.getZooKeeper(), region, + master.getServerName())) { + LOG.warn("Attempted to create/force node into OFFLINE state before " + + "completing assignment but failed to do so"); + return; + } + } catch (KeeperException e) { + LOG.error("Unexpected ZK exception creating/setting node OFFLINE", e); + master.abort(); + return; + } + // Pickup existing plan or make a new one + RegionPlan plan; + synchronized(regionPlans) { + plan = regionPlans.get(regionName); + if(plan == null) { + LOG.debug("No previous transition plan for " + regionName + + " so generating a random one from " + serverManager.numServers() + + " ( " + serverManager.getOnlineServers().size() + ") available servers"); + plan = new RegionPlan(regionName, null, + LoadBalancer.randomAssignment(serverManager.getOnlineServersList())); + regionPlans.put(regionName, plan); + } + } + // Transition RegionState to PENDING_OPEN and send OPEN RPC + state.update(RegionState.State.PENDING_OPEN); + 
serverManager.sendRegionOpen(plan.getDestination(), state.getRegion()); + } + } + + /** + * Unassigns the specified region. + *

+ * Updates the RegionState and sends the CLOSE RPC. + *

+ * If a RegionPlan is already set, it will remain. If this is being used + * to disable a table, be sure to use {@link #disableTable(String)} to ensure + * regions are not onlined after being closed. + * + * @param region region to be unassigned + */ + public void unassign(HRegionInfo region) { + LOG.debug("Starting unassignment of region " + region + " (offlining)"); + // Check if this region is currently assigned + if (!regions.containsKey(region)) { + LOG.debug("Attempted to unassign region " + region + " but it is not " + + "currently assigned anywhere"); + return; + } + String regionName = region.getEncodedName(); + // Grab the state of this region and synchronize on it + RegionState state; + synchronized(regionsInTransition) { + state = regionsInTransition.get(regionName); + if(state == null) { + state = new RegionState(region, RegionState.State.PENDING_CLOSE); + regionsInTransition.put(regionName, state); + } else { + LOG.debug("Attempting to unassign region " + region + " but it is " + + "already in transition (" + state.getState() + ")"); + return; + } + } + // Send CLOSE RPC + try { + serverManager.sendRegionClose(regions.get(region), state.getRegion()); + } catch (NotServingRegionException e) { + LOG.warn("Attempted to close region " + region + " but got an NSRE", e); + } + } + + /** + * Waits until the specified region has completed assignment. + *

+ * If the region is already assigned, returns immediately. Otherwise, method + * blocks until the region is assigned. + * @param regionInfo region to wait on assignment for + * @throws InterruptedException + */ + public void waitForAssignment(HRegionInfo regionInfo) + throws InterruptedException { + synchronized(regions) { + while(!regions.containsKey(regionInfo)) { + regions.wait(); + } + } + } + + /** + * Assigns the ROOT region. + *

+ * Assumes that ROOT is currently closed and is not being actively served by + * any RegionServer. + *

+ * Forcibly unsets the current root region location in ZooKeeper and assigns + * ROOT to a random RegionServer. + */ + public void assignRoot() { + // Force assignment to a random server + assign(HRegionInfo.ROOT_REGIONINFO); + } + + /** + * Assigns the META region. + *

+ * Assumes that META is currently closed and is not being actively served by + * any RegionServer. + *

+ * Forcibly assigns META to a random RegionServer. + */ + public void assignMeta() { + // Force assignment to a random server + assign(HRegionInfo.FIRST_META_REGIONINFO); + } + + /** + * Assigns all user regions, if any exist. Used during cluster startup. + *

+ * This is a synchronous call and will return once every region has been + * assigned. If anything fails, an exception is thrown and the cluster + * should be shutdown. + */ + public void assignAllUserRegions() throws IOException { + // First experiment at synchronous assignment + // Simpler because just wait for no regions in transition + + // Scan META for all user regions + List allRegions = MetaScanner.listAllRegions( + master.getConfiguration()); + if(allRegions == null || allRegions.isEmpty()) { + return; + } + + // Get all available servers + List servers = serverManager.getOnlineServersList(); + + LOG.info("Assigning " + allRegions.size() + " across " + servers.size() + + " servers"); + + // Generate a cluster startup region placement plan + Map> bulkPlan = + LoadBalancer.bulkAssignment(allRegions, servers); + + // For each server, create OFFLINE nodes and send OPEN RPCs + for(Map.Entry> entry : bulkPlan.entrySet()) { + HServerInfo server = entry.getKey(); + List regions = entry.getValue(); + LOG.debug("Assigning " + regions.size() + " regions to " + server); + for(HRegionInfo region : regions) { + LOG.debug("Assigning " + region + " to " + server); + String regionName = region.getEncodedName(); + RegionPlan plan = new RegionPlan(regionName, null,server); + regionPlans.put(regionName, plan); + assign(region); + } + } + + // Wait for no regions to be in transition + try { + waitUntilNoRegionsInTransition(); + } catch (InterruptedException e) { + LOG.error("Interrupted waiting for regions to be assigned", e); + throw new IOException(e); + } + + LOG.info("\n\nAll user regions have been assigned"); + } + + private void rebuildUserRegions() throws IOException { + Map allRegions = + MetaReader.fullScan(catalogTracker); + for(Map.Entry region : allRegions.entrySet()) { + HServerAddress regionLocation = region.getValue(); + HRegionInfo regionInfo = region.getKey(); + if(regionLocation == null) { + regions.put(regionInfo, null); + continue; + } + HServerInfo 
serverInfo = serverManager.getHServerInfo(regionLocation); + regions.put(regionInfo, serverInfo); + Set regionSet = servers.get(serverInfo); + if(regionSet == null) { + regionSet = new TreeSet(); + servers.put(serverInfo, regionSet); + } + regionSet.add(regionInfo); + } + } + + /** + * Blocks until there are no regions in transition. It is possible that there + * are regions in transition immediately after this returns but guarantees + * that if it returns without an exception that there was a period of time + * with no regions in transition from the point-of-view of the in-memory + * state of the Master. + * @throws InterruptedException + */ + public void waitUntilNoRegionsInTransition() throws InterruptedException { + synchronized(regionsInTransition) { + while(regionsInTransition.size() > 0) { + regionsInTransition.wait(); + } + } + } + + /** + * Gets the map of regions currently in transition. + * @return + */ + public Map getRegionsInTransition() { + return regionsInTransition; + } + + /** + * Checks if the specified table has been disabled by the user. + * @param tableName + * @return + */ + public boolean isTableDisabled(String tableName) { + synchronized(disabledTables) { + return disabledTables.contains(tableName); + } + } + + /** + * Checks if the table of the specified region has been disabled by the user. + * @param regionName + * @return + */ + public boolean isTableOfRegionDisabled(byte [] regionName) { + return isTableDisabled(Bytes.toString( + HRegionInfo.getTableName(regionName))); + } + + /** + * Sets the specified table to be disabled. 
+ * @param tableName table to be disabled + */ + public void disableTable(String tableName) { + synchronized(disabledTables) { + if(!isTableDisabled(tableName)) { + disabledTables.add(tableName); + try { + ZKTableDisable.disableTable(master.getZooKeeper(), tableName); + } catch (KeeperException e) { + LOG.warn("ZK error setting table as disabled", e); + } + } + } + } + + /** + * Unsets the specified table from being disabled. + *

+ * This operation removes the table from the in-memory set of disabled + * tables and also asks ZooKeeper (via ZKTableDisable) to undisable it. + * @param tableName table to be undisabled + */ + public void undisableTable(String tableName) { + synchronized(disabledTables) { + if(isTableDisabled(tableName)) { + disabledTables.remove(tableName); + try { + ZKTableDisable.undisableTable(master.getZooKeeper(), tableName); + } catch (KeeperException e) { + LOG.warn("ZK error setting table as disabled", e); + } + } + } + } + + /** + * Rebuild the set of disabled tables from zookeeper. Used during master + * failover. + */ + private void rebuildDisabledTables() { + synchronized(disabledTables) { + List disabledTables; try { - List newNodes = ZKUtil.watchAndGetNewChildren(watcher, - watcher.assignmentZNode); - for(NodeAndData newNode : newNodes) { - LOG.debug("Handling new unassigned node: " + newNode); - handleRegion(RegionTransitionData.fromBytes(newNode.getData())); - } - } catch(KeeperException e) { - LOG.error("Unexpected ZK exception reading unassigned children", e); - status.abort(); + disabledTables = ZKTableDisable.getDisabledTables(master.getZooKeeper()); + } catch (KeeperException e) { + LOG.warn("ZK error getting list of disabled tables", e); + return; } + if(!disabledTables.isEmpty()) { + LOG.info("Rebuilt list of " + disabledTables.size() + " disabled " + + "tables from zookeeper"); + disabledTables.addAll(disabledTables); + } + } + } + + /** + * Gets the online regions of the specified table. + * @param tableName + * @return + */ + public List getRegionsOfTable(byte[] tableName) { + List tableRegions = new ArrayList(); + for(HRegionInfo regionInfo : regions.tailMap(new HRegionInfo( + new HTableDescriptor(tableName), null, null)).keySet()) { + if(Bytes.equals(regionInfo.getTableDesc().getName(), tableName)) { + tableRegions.add(regionInfo); + } else { + break; + } + } + return tableRegions; + } + + /** + * Periodic chore that checks regions in transition for timed-out operations and retries them.
+ */ + public class TimeoutMonitor extends Chore { + + private final int timeout; + + /** + * Creates a periodic monitor to check for time outs on region transition + * operations. This will deal with retries if for some reason something + * doesn't happen within the specified timeout. + * @param period + * @param stop + * @param timeout + */ + public TimeoutMonitor(final int period, final AtomicBoolean stop, + final int timeout) { + super("AssignmentTimeoutMonitor", period, stop); + this.timeout = timeout; + } + + @Override + protected void chore() { + synchronized(regionsInTransition) { + // Iterate all regions in transition checking for time outs + long now = System.currentTimeMillis(); + for(RegionState regionState : regionsInTransition.values()) { + if(regionState.getStamp() + timeout <= now) { + HRegionInfo regionInfo = regionState.getRegion(); + String regionName = regionInfo.getEncodedName(); + LOG.info("Region transition timed out for region " + regionName); + // Expired! Do a retry. + switch(regionState.getState()) { + case OFFLINE: + case CLOSED: + LOG.info("Region has been OFFLINE or CLOSED for too long, " + + "reassigning " + regionInfo.getRegionNameAsString()); + assign(regionState.getRegion()); + break; + case PENDING_OPEN: + case OPENING: + LOG.info("Region has been PENDING_OPEN or OPENING for too " + + "long, reassigning " + regionInfo.getRegionNameAsString()); + assign(regionState.getRegion()); + break; + case OPEN: + LOG.warn("Long-running region in OPEN state? 
This should " + + "not happen"); + break; + case PENDING_CLOSE: + case CLOSING: + LOG.info("Region has been PENDING_CLOSE or CLOSING for too " + + "long, resending close rpc"); + unassign(regionInfo); + break; + } + } + } + } + } + } + + public static class RegionState implements Writable { + private HRegionInfo region; + + public enum State { + OFFLINE, // region is in an offline state + PENDING_OPEN, // sent rpc to server to open but has not begun + OPENING, // server has begun to open but not yet done + OPEN, // server opened region and updated meta + PENDING_CLOSE, // sent rpc to server to close but has not begun + CLOSING, // server has begun to close but not yet done + CLOSED // server closed region and updated meta + } + + private State state; + private long stamp; + + public RegionState() {} + + RegionState(HRegionInfo region, State state) { + this(region, state, System.currentTimeMillis()); + } + + RegionState(HRegionInfo region, State state, long stamp) { + this.region = region; + this.state = state; + this.stamp = stamp; + } + + public void update(State state, long stamp) { + this.state = state; + this.stamp = stamp; + } + + public void update(State state) { + this.state = state; + this.stamp = System.currentTimeMillis(); + } + + public State getState() { + return state; + } + + public long getStamp() { + return stamp; + } + + public HRegionInfo getRegion() { + return region; + } + + public boolean isClosing() { + return state == State.CLOSING; + } + + public boolean isClosed() { + return state == State.CLOSED; + } + + public boolean isPendingClose() { + return state == State.PENDING_CLOSE; + } + + public boolean isOpening() { + return state == State.OPENING; + } + + public boolean isOpened() { + return state == State.OPEN; + } + + public boolean isPendingOpen() { + return state == State.PENDING_OPEN; + } + + public boolean isOffline() { + return state == State.OFFLINE; + } + + @Override + public String toString() { + return "RegionState (" + 
region.getRegionNameAsString() + ") " + state + + " at time " + stamp; + } + + @Override + public void readFields(DataInput in) throws IOException { + region = new HRegionInfo(); + region.readFields(in); + state = State.valueOf(in.readUTF()); + stamp = in.readLong(); + } + + @Override + public void write(DataOutput out) throws IOException { + region.write(out); + out.writeUTF(state.name()); + out.writeLong(stamp); } } } Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/FileSystemManager.java URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/FileSystemManager.java?rev=982489&r1=982488&r2=982489&view=diff ============================================================================== --- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/FileSystemManager.java (original) +++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/FileSystemManager.java Thu Aug 5 07:35:00 2010 @@ -34,13 +34,14 @@ import org.apache.hadoop.hbase.HConstant import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.RemoteExceptionHandler; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; /** - * This class abstract a bunch of operations the HMaster needs to interact with - * the underlying file system, including splitting log files, checking file + * This class abstract a bunch of operations the HMaster needs to interact with + * the underlying file system, including splitting log files, checking file * system status, etc. 
*/ public class FileSystemManager { @@ -48,7 +49,7 @@ public class FileSystemManager { // HBase configuration Configuration conf; // master status - MasterStatus masterStatus; + MasterController masterStatus; // Keep around for convenience. private final FileSystem fs; // Is the fileystem ok? @@ -59,8 +60,8 @@ public class FileSystemManager { private final Path rootdir; // create the split log lock final Lock splitLogLock = new ReentrantLock(); - - public FileSystemManager(Configuration conf, MasterStatus masterStatus) throws IOException { + + public FileSystemManager(Configuration conf, MasterController masterStatus) throws IOException { this.conf = conf; this.masterStatus = masterStatus; // Set filesystem to be that of this.rootdir else we get complaints about @@ -105,7 +106,7 @@ public class FileSystemManager { public Path getOldLogDir() { return this.oldLogDir; } - + /** * Checks to see if the file system is still accessible. * If not, sets closed @@ -123,7 +124,7 @@ public class FileSystemManager { } return this.fsOk; } - + /** * @return HBase root dir. * @throws IOException @@ -131,20 +132,22 @@ public class FileSystemManager { public Path getRootDir() { return this.rootdir; } - + public Lock getSplitLogLock() { return splitLogLock; } - - /* + + /** * Inspect the log directory to recover any log file without - * ad active region server. + * an active region server. */ public void splitLogAfterStartup() { Path logsDirPath = new Path(this.rootdir, HConstants.HREGION_LOGDIR_NAME); try { - if (!this.fs.exists(logsDirPath)) return; + if (!this.fs.exists(logsDirPath)) { + return; + } } catch (IOException e) { throw new RuntimeException("Could exists for " + logsDirPath, e); } @@ -179,8 +182,8 @@ public class FileSystemManager { } } } - - /* + + /** * Get the rootdir. Make sure its wholesome and exists before returning. 
* @param rd * @param conf @@ -238,7 +241,7 @@ public class FileSystemManager { } } - /* + /** * @param hri Set all family block caching to b * @param b */ @@ -250,4 +253,29 @@ public class FileSystemManager { } } } + + public void deleteRegion(HRegionInfo region) throws IOException { + fs.delete(HRegion.getRegionDir(rootdir, region), true); + } + + public void deleteTable(byte[] tableName) throws IOException { + fs.delete(new Path(rootdir, Bytes.toString(tableName)), true); + } + + public void updateRegionInfo(HRegionInfo region) { + // TODO implement this. i think this is currently broken in trunk i don't + // see this getting updated. + // @see HRegion.checkRegioninfoOnFilesystem() + } + + public void addFamily(HRegionInfo region, byte[] familyName) { + // TODO Looks like the family directory is just created on the first flush? + } + + public void deleteFamily(HRegionInfo region, byte[] familyName) + throws IOException { + fs.delete(Store.getStoreHomedir( + new Path(rootdir, region.getTableDesc().getNameAsString()), + region.getEncodedName(), familyName), true); + } }