Author: jgray Date: Wed Jul 14 03:41:11 2010 New Revision: 963935 URL: http://svn.apache.org/viewvc?rev=963935&view=rev Log: HBASE-2695 [MasterStartupCleanup-v4] Cleans up the master startup, adds new ZK tool ActiveMasterManager for master-side (part of master cleanup and refactor) Added: hbase/branches/0.90_master_rewrite/BRANCH_TODO.txt hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java Modified: hbase/branches/0.90_master_rewrite/BRANCH_CHANGES.txt hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/FileSystemManager.java hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/HMaster.java hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/MasterStatus.java hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ZKMasterAddressWatcher.java hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/MasterAddressManager.java hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/TestMasterAddressManager.java Modified: hbase/branches/0.90_master_rewrite/BRANCH_CHANGES.txt URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/BRANCH_CHANGES.txt?rev=963935&r1=963934&r2=963935&view=diff ============================================================================== --- hbase/branches/0.90_master_rewrite/BRANCH_CHANGES.txt (original) +++ 
hbase/branches/0.90_master_rewrite/BRANCH_CHANGES.txt Wed Jul 14 03:41:11 2010 @@ -14,6 +14,9 @@ Branch 0.90.0 - Master Rewrite Branch HBASE-2696 Re-enabled TestZooKeeper.testRegionServerSessionExpired HBASE-2699 [LoadBalancer-v5] Reimplement load balancing to be a background process and to not use heartbeats + HBASE-2695 [MasterStartupCleanup-v4] Cleans up the master startup, adds + new ZK tool ActiveMasterManager for master-side (part of + master cleanup and refactor) NEW FEATURES Added: hbase/branches/0.90_master_rewrite/BRANCH_TODO.txt URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/BRANCH_TODO.txt?rev=963935&view=auto ============================================================================== --- hbase/branches/0.90_master_rewrite/BRANCH_TODO.txt (added) +++ hbase/branches/0.90_master_rewrite/BRANCH_TODO.txt Wed Jul 14 03:41:11 2010 @@ -0,0 +1,38 @@ +List of things todo for branch, including comments from reviews not yet +implemented. + + +Now: + +* synchronize all access to the boolean in ActiveMasterManager + + +Think about: + +* renaming master file manager? MasterFS/MasterFileSystem + + +Later: + +* ServerStatus/MasterStatus + + - These need new names to be more descriptive (ServerControl?) 
+ - They should have a very clear purpose that adds value beyond passing + HMaster directly + - The current idea is that these would just have accessors/setters for + the server status booleans and abort() methods (like closed, closing, + abortRequested) + +* HBaseEventHandler/HBaseEventType/HBaseExecutorService + + - After ZK changes, rename to EventHandler/EventType + - Currently multiple types map to a single handler; we may want 1-to-1 + - Need to do a full review of the semantics of these once the bulk of + the master rewrite is done + +* LoadBalancer + + - Need to finish or back out code related to block locations + (if finished, need to use files rather than the directory, and use the right location) + - Put notes from reviewboard/jira into LB javadoc or hbase "book" + Added: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java?rev=963935&view=auto ============================================================================== --- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java (added) +++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java Wed Jul 14 03:41:11 2010 @@ -0,0 +1,154 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master; + +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HServerAddress; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; + +/** + * Handles everything on master-side related to master election. + * + * Listens and responds to ZooKeeper notifications on the master znode, + * both nodeCreated and nodeDeleted. + * + * Contains blocking methods which will hold up backup masters, waiting + * for the active master to fail. + * + * This class is instantiated in the HMaster constructor and the method + * {@link #blockUntilBecomingActiveMaster()} is called to wait until becoming + * the active master of the cluster. 
+ */
+public class ActiveMasterManager extends ZooKeeperListener {
+  private static final Log LOG = LogFactory.getLog(ActiveMasterManager.class);
+
+  /**
+   * True while some master holds the master znode. This object is also the
+   * monitor that a waiting backup master blocks on. Every update of the flag
+   * that is derived from a ZooKeeper read is done while holding this monitor,
+   * so a read-then-set can never interleave with a concurrent notification.
+   */
+  final AtomicBoolean clusterHasActiveMaster = new AtomicBoolean(false);
+
+  private final HServerAddress address;
+  private final MasterStatus status;
+
+  /**
+   * @param watcher zookeeper watcher this listener reads the master znode from
+   * @param address address this master writes into the master znode when it
+   *                tries to become the active master
+   * @param status status of the owning master; used to check whether it is
+   *               closed and to abort it on unexpected ZooKeeper errors
+   */
+  ActiveMasterManager(ZooKeeperWatcher watcher, HServerAddress address,
+      MasterStatus status) {
+    super(watcher);
+    this.address = address;
+    this.status = status;
+  }
+
+  @Override
+  public void nodeCreated(String path) {
+    if (path.equals(watcher.masterAddressZNode) && !status.isClosed()) {
+      handleMasterNodeChange();
+    }
+  }
+
+  @Override
+  public void nodeDeleted(String path) {
+    if (path.equals(watcher.masterAddressZNode) && !status.isClosed()) {
+      handleMasterNodeChange();
+    }
+  }
+
+  /**
+   * Handle a change in the master node. Doesn't matter whether this was called
+   * from a nodeCreated or nodeDeleted event because there are no guarantees
+   * that the current state of the master node matches the event at the time of
+   * our next ZK request.
+   *
+   * Uses the watchAndCheckExists method which watches the master address node
+   * regardless of whether it exists or not. If it does exist (there is an
+   * active master), it returns true. Otherwise it returns false.
+   *
+   * A watcher is set which guarantees that this method will get called again if
+   * there is another change in the master node.
+   */
+  private void handleMasterNodeChange() {
+    // Watch the node and check if it exists.
+    try {
+      synchronized (clusterHasActiveMaster) {
+        if (ZKUtil.watchAndCheckExists(watcher, watcher.masterAddressZNode)) {
+          // A master node exists, there is an active master
+          LOG.debug("A master is now available");
+          clusterHasActiveMaster.set(true);
+        } else {
+          // Node is no longer there, cluster does not have an active master
+          LOG.debug("No master available. notifying waiting threads");
+          clusterHasActiveMaster.set(false);
+          // Notify any thread waiting to become the active master
+          clusterHasActiveMaster.notifyAll();
+        }
+      }
+    } catch (KeeperException ke) {
+      LOG.fatal("Received an unexpected KeeperException, aborting", ke);
+      status.abortServer();
+    }
+  }
+
+  /**
+   * Block until becoming the active master.
+   *
+   * Method blocks until there is not another active master and our attempt
+   * to become the new active master is successful.
+   *
+   * This also makes sure that we are watching the master znode so will be
+   * notified if another master dies.
+   *
+   * Returns without becoming active in two cases:
+   * - the master is closed while waiting;
+   * - ZooKeeper throws an unexpected KeeperException, in which case the
+   *   server is aborted.
+   *
+   * An interrupt while waiting does not end the wait. The thread's interrupt
+   * status is restored before returning.
+   */
+  void blockUntilBecomingActiveMaster() {
+    boolean interrupted = false;
+    try {
+      while (true) {
+        // Try to become the active master, watch if there is another master
+        if (ZKUtil.setAddressAndWatch(watcher, watcher.masterAddressZNode,
+            address)) {
+          // We are the master, return
+          clusterHasActiveMaster.set(true);
+          return;
+        }
+        // There is another active master, this is not a cluster startup
+        // and we must wait until the active master dies
+        LOG.info("Another master is already the active master, waiting to become " +
+          "the next active master");
+        status.setClusterStartup(false);
+        synchronized (clusterHasActiveMaster) {
+          // Re-read the znode under the monitor instead of blindly setting the
+          // flag to true. The other master may have died after our failed
+          // attempt above. handleMasterNodeChange() may then already have
+          // cleared the flag and called notifyAll() before we got here.
+          // Forcing the flag to true would make us wait for a notification
+          // that never comes.
+          clusterHasActiveMaster.set(
+              ZKUtil.watchAndCheckExists(watcher, watcher.masterAddressZNode));
+          // NOTE(review): nodeCreated/nodeDeleted ignore events once closed,
+          // so closing while parked here may not wake this thread by itself.
+          // Confirm the owner wakes it on shutdown.
+          while (clusterHasActiveMaster.get() && !status.isClosed()) {
+            try {
+              clusterHasActiveMaster.wait();
+            } catch (InterruptedException e) {
+              // Master death is signalled via notifyAll(), not by interrupt.
+              // Remember the interrupt, keep re-checking the condition, and
+              // restore the interrupt status when the method exits.
+              LOG.debug("Interrupted waiting for master to die", e);
+              interrupted = true;
+            }
+          }
+        }
+        if (status.isClosed()) {
+          return;
+        }
+        // No active master now; loop and try to become the active master
+        // again (iteratively, so no recursion while holding the monitor).
+      }
+    } catch (KeeperException ke) {
+      LOG.fatal("Received an unexpected KeeperException, aborting", ke);
+      status.abortServer();
+    } finally {
+      if (interrupted) {
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+}
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/FileSystemManager.java URL: 
http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/FileSystemManager.java?rev=963935&r1=963934&r2=963935&view=diff ============================================================================== --- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/FileSystemManager.java (original) +++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/FileSystemManager.java Wed Jul 14 03:41:11 2010 @@ -72,12 +72,23 @@ public class FileSystemManager { // We're supposed to run on 0.20 and 0.21 anyways. conf.set("fs.default.name", this.rootdir.toString()); conf.set("fs.defaultFS", this.rootdir.toString()); + // setup the filesystem variable this.fs = FileSystem.get(conf); - + // set up the archived logs path + this.oldLogDir = new Path(this.rootdir, HConstants.HREGION_OLDLOGDIR_NAME); + } + + /** + *
    + *
  1. Check if the root region exists and is readable, if not create it
  2. + *
  3. Create a log archive directory for RS to put archived logs
  4. + *
+ */ + public void initialize() throws IOException { + // check if the root directory exists checkRootDir(this.rootdir, conf, this.fs); // Make sure the region servers can archive their old logs - this.oldLogDir = new Path(this.rootdir, HConstants.HREGION_OLDLOGDIR_NAME); if(!this.fs.exists(this.oldLogDir)) { this.fs.mkdirs(this.oldLogDir); } Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/HMaster.java URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=963935&r1=963934&r2=963935&view=diff ============================================================================== --- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (original) +++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Wed Jul 14 03:41:11 2010 @@ -29,19 +29,13 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.NavigableMap; import java.util.Set; -import java.util.SortedMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.ClusterStatus; import org.apache.hadoop.hbase.HBaseConfiguration; @@ -52,7 +46,6 @@ import org.apache.hadoop.hbase.HRegionIn import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HServerAddress; import org.apache.hadoop.hbase.HServerInfo; -import org.apache.hadoop.hbase.HServerLoad; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; 
import org.apache.hadoop.hbase.LocalHBaseCluster; @@ -77,10 +70,10 @@ import org.apache.hadoop.hbase.ipc.HBase import org.apache.hadoop.hbase.ipc.HMasterInterface; import org.apache.hadoop.hbase.ipc.HMasterRegionInterface; import org.apache.hadoop.hbase.ipc.HRegionInterface; +import org.apache.hadoop.hbase.master.RegionServerOperationQueue.ProcessingResultCode; import org.apache.hadoop.hbase.master.metrics.MasterMetrics; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; -import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.InfoServer; @@ -127,122 +120,231 @@ public class HMaster extends Thread impl // TODO: Is this separate flag necessary? private final AtomicBoolean shutdownRequested = new AtomicBoolean(false); + // The configuration for the Master private final Configuration conf; + // server for the web ui private InfoServer infoServer; - private final int numRetries; - - // Metrics is set when we call run. + // Reporting to track master metrics. private final MasterMetrics metrics; // Our zk client. TODO: rename variable once we settle on naming private ZooKeeperWatcher zooKeeperWrapper; - // Watcher for master address and for cluster shutdown. - private final ZKMasterAddressWatcher zkMasterAddressWatcher; + // Manager and zk listener for master election + private ActiveMasterManager activeMasterManager; // A Sleeper that sleeps for threadWakeFrequency; sleep if nothing todo. 
private final Sleeper sleeper; - + // RPC server for the HMaster private final HBaseServer rpcServer; + // Address of the HMaster private final HServerAddress address; - // file system manager for the master FS operations private final FileSystemManager fileSystemManager; private final ServerConnection connection; + // server manager to deal with region server info private final ServerManager serverManager; + // region manager to deal with region specific stuff private final RegionManager regionManager; - private long lastFragmentationQuery = -1L; - private Map fragmentation = null; - private final RegionServerOperationQueue regionServerOperationQueue; - // True if this is the master that started the cluster. boolean isClusterStartup; + // TODO: the following should eventually be removed from here + private final RegionServerOperationQueue regionServerOperationQueue; + private long lastFragmentationQuery = -1L; + private Map fragmentation = null; + /** - * Constructor - * @param conf configuration - * @throws IOException + * Initializes the HMaster. The steps are as follows: + * + *
    + *
  1. Initialize HMaster RPC and address + *
  2. Connect to ZooKeeper and figure out if this is a fresh cluster start or + * a failed over master + *
  3. Initialize master components - server manager, region manager, metrics, + * region server queue, file system manager, etc + *
  4. Block until becoming active master + *
*/ public HMaster(Configuration conf) throws IOException { + // initialize some variables this.conf = conf; - - // Get my address and create an rpc server instance. The rpc-server port - // can be ephemeral...ensure we have the correct info - HServerAddress a = new HServerAddress(getMyAddress(this.conf)); - this.rpcServer = HBaseRPC.getServer(this, a.getBindAddress(), - a.getPort(), conf.getInt("hbase.regionserver.handler.count", 10), - false, conf); - this.address = new HServerAddress(this.rpcServer.getListenerAddress()); + // set the thread name + setName(MASTER + "-" + this.address); - this.numRetries = conf.getInt("hbase.client.retries.number", 2); - int threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000); - this.sleeper = new Sleeper(threadWakeFrequency, this.closed); - this.connection = ServerConnectionManager.getConnection(conf); + /* + * 1. Determine address and initialize RPC server (but do not start) + * + * Get the master address and create an RPC server instance. The RPC + * server ports can be ephemeral. + */ + HServerAddress a = new HServerAddress(getMyAddress(this.conf)); + int numHandlers = conf.getInt("hbase.regionserver.handler.count", 10); + this.rpcServer = HBaseRPC.getServer(this, a.getBindAddress(), a.getPort(), + numHandlers, false, conf); + this.address = new HServerAddress(rpcServer.getListenerAddress()); - // Figure out if this is a fresh cluster start. This is done by checking the - // number of RS ephemeral nodes. RS ephemeral nodes are created only after - // the primary master has written the address to ZK. So this has to be done - // before we race to write our address to zookeeper. - zooKeeperWrapper = + /* + * 2. Determine if this is a fresh cluster startup or failed over master + * + * This is done by checking for the existence of any ephemeral + * RegionServer nodes in ZooKeeper. These nodes are created by RSs on + * their initialization but only after they find the primary master. 
As + * long as this check is done before we write our address into ZK, this + * will work. Note that multiple masters could find this to be true on + * startup (none have become active master yet), which is why there is + * an additional check if this master does not become primary on its + * first attempt. + */ + zooKeeperWrapper = new ZooKeeperWatcher(conf, getHServerAddress().toString(), this); isClusterStartup = (zooKeeperWrapper.scanRSDirectory().size() == 0); - - // Create the filesystem manager, which in turn does the following: - // - Creates the root hbase directory in the FS - // - Checks the FS to make sure the root directory is readable - // - Creates the archive directory for logs + + /* + * 3. Initialize master components. + * + * This includes the filesystem manager, server manager, region manager, + * metrics, queues, sleeper, etc... + */ + this.connection = ServerConnectionManager.getConnection(conf); + this.regionServerOperationQueue = new RegionServerOperationQueue(conf, closed); + this.metrics = new MasterMetrics(this.getName()); fileSystemManager = new FileSystemManager(conf, this); - - // Get our zookeeper wrapper and then try to write our address to zookeeper. - // We'll succeed if we are only master or if we win the race when many - // masters. Otherwise we park here inside in writeAddressToZooKeeper. - // TODO: Bring up the UI to redirect to active Master. 
- zooKeeperWrapper.registerListener(this); - this.zkMasterAddressWatcher = - new ZKMasterAddressWatcher(this.zooKeeperWrapper, this.shutdownRequested); - zooKeeperWrapper.registerListener(zkMasterAddressWatcher); - this.zkMasterAddressWatcher.writeAddressToZooKeeper(this.address, true); - this.regionServerOperationQueue = - new RegionServerOperationQueue(this.conf, this.closed); + serverManager = new ServerManager(this, metrics, regionServerOperationQueue); + regionManager = new RegionManager(this); + // create a sleeper to sleep for a configured wait frequency + int threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000); + this.sleeper = new Sleeper(threadWakeFrequency, this.closed); - // set the thread name - setName(MASTER); - // create the master metrics object - this.metrics = new MasterMetrics(MASTER); + /* + * 4. Block on becoming the active master. + * + * We race with other masters to write our address into ZooKeeper. If we + * succeed, we are the primary/active master and finish initialization. + * + * If we do not succeed, there is another active master and we should + * now wait until it dies to try and become the next active master. If + * we do not succeed on our first attempt, this is no longer a cluster + * startup. + */ + activeMasterManager = new ActiveMasterManager(zooKeeperWrapper, address, + this); + zooKeeperWrapper.registerListener(activeMasterManager); + // Wait here until we are the active master + activeMasterManager.blockUntilBecomingActiveMaster(); - serverManager = new ServerManager(this, metrics, regionServerOperationQueue); + // We are the active master now. - - // Start the unassigned watcher - which will create the unassigned region - // in ZK. This is needed before RegionManager() constructor tries to assign - // the root region. 
- ZKUnassignedWatcher.start(this.conf, this); - // start the "close region" executor service - HBaseEventType.RS2ZK_REGION_CLOSED.startMasterExecutorService(address.toString()); - // start the "open region" executor service - HBaseEventType.RS2ZK_REGION_OPENED.startMasterExecutorService(address.toString()); + LOG.info("Server has become the active/primary master. Address is " + + this.address.toString()); - - // start the region manager - regionManager = new RegionManager(this); + // run() is executed next + } - // We're almost open for business - this.closed.set(false); - LOG.info("HMaster initialized on " + this.address.toString()); + /** + * Main processing loop for the HMaster. + * 1. Handle both fresh cluster start as well as failed over initialization of + * the HMaster. + * 2. Start the necessary services + * 3. Reassign the root region + * 4. The master is no longer closed - set "closed" to false + */ + @Override + public void run() { + try { + // If this is a fresh cluster start, make sure the root region exists. + if(isClusterStartup()) { + // Initialize the filesystem, which does the following: + // - Creates the root hbase directory in the FS + // - Checks the FS to make sure the root directory is readable + // - Creates the archive directory for logs + fileSystemManager.initialize(); + // Do any log splitting necessary + // TODO: Should do this in background rather than block master startup + fileSystemManager.splitLogAfterStartup(); + } + // TODO: fix the logic and naming for joinCluster() + joinCluster(); + // start up all service threads. 
+ startServiceThreads(); + // assign the root region + regionManager.reassignRootRegion(); + // set the master as opened + this.closed.set(false); + LOG.info("HMaster started on " + this.address.toString()); + + while (!this.closed.get()) { + // check if we should be shutting down + if (this.shutdownRequested.get()) { + // The region servers won't all exit until we stop scanning the + // meta regions + this.regionManager.stopScanners(); + if (this.serverManager.numServers() == 0) { + startShutdown(); + break; + } + else { + LOG.debug("Waiting on " + + this.serverManager.getServersToServerInfo().keySet().toString()); + } + } + + // process the operation, handle the result + ProcessingResultCode resultCode = regionServerOperationQueue.process(); + // If FAILED op processing, bad. Will exit. + if(resultCode == ProcessingResultCode.FAILED) { + break; + } + // If bad filesystem, exit + else if(resultCode == ProcessingResultCode.REQUEUED_BUT_PROBLEM) { + if (!fileSystemManager.checkFileSystem()) { + break; + } + } + // Continue run loop if conditions are PROCESSED, NOOP, REQUEUED + } + } catch (Throwable t) { + LOG.fatal("Unhandled exception. Starting shutdown.", t); + setClosed(); + } + + // Wait for all the remaining region servers to report in. + this.serverManager.letRegionServersShutdown(); + + // Clean up and close up shop + if (this.infoServer != null) { + LOG.info("Stopping infoServer"); + try { + this.infoServer.stop(); + } catch (Exception ex) { + ex.printStackTrace(); + } + } + this.rpcServer.stop(); + this.regionManager.stop(); + this.zooKeeperWrapper.close(); + HBaseExecutorService.shutdown(); + LOG.info("HMaster main thread exiting"); } - + /** - * Returns true if this master process was responsible for starting the - * cluster. + * Returns true if this master process was responsible for starting the + * cluster, false if not. 
*/ public boolean isClusterStartup() { return isClusterStartup; } - - public void resetClusterStartup() { - isClusterStartup = false; + + /** + * Sets whether this is a cluster startup or not. Used by the + * {@link ActiveMasterManager} to set to false if we determine another master + * has become the primary. + * @param isClusterStartup false if another master became active before us + */ + public void setClusterStartup(boolean isClusterStartup) { + this.isClusterStartup = isClusterStartup; } - + public HServerAddress getHServerAddress() { return address; } @@ -274,7 +376,7 @@ public class HMaster extends Thread impl public InfoServer getInfoServer() { return this.infoServer; } - + /** * Return the file systen manager instance */ @@ -304,7 +406,7 @@ public class HMaster extends Thread impl public void setClosed() { this.closed.set(true); } - + public AtomicBoolean getClosed() { return this.closed; } @@ -330,71 +432,11 @@ public class HMaster extends Thread impl public Path getRootDir() { return fileSystemManager.getRootDir(); } - + public RegionServerOperationQueue getRegionServerOperationQueue() { return this.regionServerOperationQueue; } - /** Main processing loop */ - @Override - public void run() { - joinCluster(); - startServiceThreads(); - /* Main processing loop */ - try { - FINISHED: while (!this.closed.get()) { - // check if we should be shutting down - if (this.shutdownRequested.get()) { - // The region servers won't all exit until we stop scanning the - // meta regions - this.regionManager.stopScanners(); - if (this.serverManager.numServers() == 0) { - startShutdown(); - break; - } else { - LOG.debug("Waiting on " + - this.serverManager.getServersToServerInfo().keySet().toString()); - } - } - switch (this.regionServerOperationQueue.process()) { - case FAILED: - // If FAILED op processing, bad. Exit. - break FINISHED; - case REQUEUED_BUT_PROBLEM: - if (!fileSystemManager.checkFileSystem()) - // If bad filesystem, exit. 
- break FINISHED; - default: - // Continue run loop if conditions are PROCESSED, NOOP, REQUEUED - break; - } - } - } catch (Throwable t) { - LOG.fatal("Unhandled exception. Starting shutdown.", t); - setClosed(); - } - - // Wait for all the remaining region servers to report in. - this.serverManager.letRegionServersShutdown(); - - /* - * Clean up and close up shop - */ - if (this.infoServer != null) { - LOG.info("Stopping infoServer"); - try { - this.infoServer.stop(); - } catch (Exception ex) { - ex.printStackTrace(); - } - } - this.rpcServer.stop(); - this.regionManager.stop(); - this.zooKeeperWrapper.close(); - HBaseExecutorService.shutdown(); - LOG.info("HMaster main thread exiting"); - } - /* * Joins cluster. Checks to see if this instance of HBase is fresh or the * master was started following a failover. In the second case, it inspects @@ -461,6 +503,15 @@ public class HMaster extends Thread impl */ private void startServiceThreads() { try { + // Start the unassigned watcher - which will create the unassigned region + // in ZK. This is needed before RegionManager() constructor tries to assign + // the root region. + ZKUnassignedWatcher.start(this.conf, this); + // start the "close region" executor service + HBaseEventType.RS2ZK_REGION_CLOSED.startMasterExecutorService(address.toString()); + // start the "open region" executor service + HBaseEventType.RS2ZK_REGION_OPENED.startMasterExecutorService(address.toString()); + // start the region manager this.regionManager.start(); // Put up info server. int port = this.conf.getInt("hbase.master.info.port", 60010); @@ -578,7 +629,8 @@ public class HMaster extends Thread impl startKey = endKey; } } - for (int tries = 0; tries < this.numRetries; tries++) { + int numRetries = conf.getInt("hbase.client.retries.number", 2); + for (int tries = 0; tries < numRetries; tries++) { try { // We can not create a table unless meta regions have already been // assigned and scanned. 
@@ -594,7 +646,7 @@ public class HMaster extends Thread impl } catch (TableExistsException e) { throw e; } catch (IOException e) { - if (tries == this.numRetries - 1) { + if (tries == numRetries - 1) { throw RemoteExceptionHandler.checkIOException(e); } this.sleeper.sleep(); @@ -688,11 +740,14 @@ public class HMaster extends Thread impl new MetaScannerVisitor() { @Override public boolean processRow(Result data) throws IOException { - if (data == null || data.size() <= 0) + if (data == null || data.size() <= 0) { return true; + } Pair pair = metaRowToRegionPair(data); - if (pair == null) return false; + if (pair == null) { + return false; + } if (!Bytes.equals(pair.getFirst().getTableDesc().getName(), tableName)) { return false; @@ -702,10 +757,10 @@ public class HMaster extends Thread impl } }; - MetaScanner.metaScan(conf, visitor, tableName); + MetaScanner.metaScan(conf, visitor, tableName); return result; } - + private Pair metaRowToRegionPair( Result data) throws IOException { HRegionInfo info = Writables.getHRegionInfo( @@ -719,7 +774,7 @@ public class HMaster extends Thread impl } else { //undeployed return new Pair(info, null); - } + } } /** @@ -733,16 +788,19 @@ public class HMaster extends Thread impl throws IOException { final AtomicReference> result = new AtomicReference>(null); - + MetaScannerVisitor visitor = new MetaScannerVisitor() { @Override public boolean processRow(Result data) throws IOException { - if (data == null || data.size() <= 0) + if (data == null || data.size() <= 0) { return true; + } Pair pair = metaRowToRegionPair(data); - if (pair == null) return false; + if (pair == null) { + return false; + } if (!Bytes.equals(pair.getFirst().getTableDesc().getName(), tableName)) { return false; @@ -755,12 +813,12 @@ public class HMaster extends Thread impl MetaScanner.metaScan(conf, visitor, tableName, rowKey, 1); return result.get(); } - + Pair getTableRegionFromName( final byte [] regionName) throws IOException { byte [] tableName = 
HRegionInfo.parseRegionName(regionName)[0]; - + Set regions = regionManager.getMetaRegionsForTable(tableName); for (MetaRegion m: regions) { byte [] metaRegionName = m.getRegionName(); @@ -770,7 +828,9 @@ public class HMaster extends Thread impl HConstants.REGIONINFO_QUALIFIER); get.addColumn(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER); Result data = srvr.get(metaRegionName, get); - if(data == null || data.size() <= 0) continue; + if(data == null || data.size() <= 0) { + continue; + } return metaRowToRegionPair(data); } return null; @@ -808,8 +868,9 @@ public class HMaster extends Thread impl switch (op) { case TABLE_SET_HTD: if (args == null || args.length < 1 || - !(args[0] instanceof HTableDescriptor)) + !(args[0] instanceof HTableDescriptor)) { throw new IOException("SET_HTD request requires an HTableDescriptor"); + } HTableDescriptor htd = (HTableDescriptor) args[0]; LOG.info("modifyTable(SET_HTD): " + htd); new ModifyTableMeta(this, tableName, htd).process(); @@ -820,9 +881,10 @@ public class HMaster extends Thread impl case TABLE_MAJOR_COMPACT: case TABLE_FLUSH: if (args != null && args.length > 0) { - if (!(args[0] instanceof ImmutableBytesWritable)) + if (!(args[0] instanceof ImmutableBytesWritable)) { throw new IOException( "request argument must be ImmutableBytesWritable"); + } Pair pair = null; if(tableName == null) { byte [] regionName = ((ImmutableBytesWritable)args[0]).get(); @@ -837,7 +899,9 @@ public class HMaster extends Thread impl } } else { for (Pair pair: getTableRegions(tableName)) { - if (pair.getSecond() == null) continue; // undeployed + if (pair.getSecond() == null) { + continue; // undeployed + } this.regionManager.startAction(pair.getFirst().getRegionName(), pair.getFirst(), pair.getSecond(), op); } @@ -869,7 +933,9 @@ public class HMaster extends Thread impl // an open or whatever. this.regionManager.clearFromInTransition(regionname); // If hostnameAndPort is still null, then none, exit. 
- if (hostnameAndPort == null) break; + if (hostnameAndPort == null) { + break; + } long startCode = Bytes.toLong(rr.getValue(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER)); @@ -905,8 +971,8 @@ public class HMaster extends Thread impl */ @Override public void process(WatchedEvent event) { - LOG.debug("Event " + event.getType() + - " with state " + event.getState() + + LOG.debug("Event " + event.getType() + + " with state " + event.getState() + " with path " + event.getPath()); // Master should kill itself if its session expired or if its // znode was deleted manually (usually for testing purposes) @@ -925,15 +991,13 @@ public class HMaster extends Thread impl zooKeeperWrapper = new ZooKeeperWatcher(conf, HMaster.class.getName(), this); zooKeeperWrapper.registerListener(this); - this.zkMasterAddressWatcher.setZookeeper(zooKeeperWrapper); - if(!this.zkMasterAddressWatcher. - writeAddressToZooKeeper(this.address,false)) { - throw new Exception("Another Master is currently active"); - } + activeMasterManager = new ActiveMasterManager(zooKeeperWrapper, + this.address, this); + activeMasterManager.blockUntilBecomingActiveMaster(); - // we are a failed over master, reset the fact that we started the + // we are a failed over master, reset the fact that we started the // cluster - resetClusterStartup(); + setClusterStartup(false); // Verify the cluster to see if anything happened while we were away joinCluster(); } catch (Exception e) { Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/MasterStatus.java URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/MasterStatus.java?rev=963935&r1=963934&r2=963935&view=diff ============================================================================== --- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/MasterStatus.java (original) +++ 
hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/MasterStatus.java Wed Jul 14 03:41:11 2010 @@ -25,10 +25,10 @@ import org.apache.hadoop.hbase.ServerSta import org.apache.hadoop.hbase.client.ServerConnection; /** - * These are the set of functions implemented by the HMaster and accessed by + * These are the set of functions implemented by the HMaster and accessed by * the other packages in the master. - * - * TODO: this list has to be cleaned up, this is a re-factor only change that + * + * TODO: this list has to be cleaned up, this is a re-factor only change that * preserves the functions in the interface. */ public interface MasterStatus extends ServerStatus { @@ -42,26 +42,32 @@ public interface MasterStatus extends Se * Return the region manager for region related info */ public RegionManager getRegionManager(); - + /** * Return the file system manager for dealing with FS related stuff */ public FileSystemManager getFileSystemManager(); /** - * Is this the master that is starting the cluster up? If true, yes. + * Is this the master that is starting the cluster up? If true, yes. * Otherwise this is a failed over master. */ public boolean isClusterStartup(); /** + * Set whether this is a cluster starting up. + * @param isClusterStartup whether this is a cluster startup or failover + */ + public void setClusterStartup(boolean isClusterStartup); + + /** * Return the server RPC connection */ public ServerConnection getServerConnection(); - - // TODO: the semantics of the following methods should be defined. Once that + + // TODO: the semantics of the following methods should be defined. 
Once that // is clear, most of these should move to server status - + // start shutting down the server public void startShutdown(); // is a shutdown requested Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java?rev=963935&r1=963934&r2=963935&view=diff ============================================================================== --- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java (original) +++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java Wed Jul 14 03:41:11 2010 @@ -154,8 +154,6 @@ public class RegionManager { HConstants.DEFAULT_ZOOKEEPER_RETRIES); zooKeeperPause = conf.getInt(HConstants.ZOOKEEPER_PAUSE, HConstants.DEFAULT_ZOOKEEPER_PAUSE); - - reassignRootRegion(); } void start() { @@ -174,7 +172,7 @@ public class RegionManager { } } - void reassignRootRegion() { + public void reassignRootRegion() { unsetRootRegion(); if (!masterStatus.getShutdownRequested().get()) { synchronized (regionsInTransition) { Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ZKMasterAddressWatcher.java URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ZKMasterAddressWatcher.java?rev=963935&r1=963934&r2=963935&view=diff ============================================================================== --- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ZKMasterAddressWatcher.java (original) +++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ZKMasterAddressWatcher.java Wed Jul 14 03:41:11 2010 @@ -1,129 +0,0 @@ -/** - * Copyright 2010 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * 
or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.master; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.HServerAddress; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.Watcher.Event.EventType; - -import java.util.concurrent.atomic.AtomicBoolean; - - -/** - * ZooKeeper watcher for the master address. Also watches the cluster state - * flag so will shutdown this master if cluster has been shutdown. - *

Used by the Master. Waits on the master address ZNode delete event. When - * multiple masters are brought up, they race to become master by writing their - * address to ZooKeeper. Whoever wins becomes the master, and the rest wait for - * that ephemeral node in ZooKeeper to evaporate (meaning the master went down), - * at which point they try to write their own address to become the new master. - */ -class ZKMasterAddressWatcher implements Watcher { - private static final Log LOG = LogFactory.getLog(ZKMasterAddressWatcher.class); - - private ZooKeeperWrapper zookeeper; - private final AtomicBoolean requestShutdown; - - /** - * Create this watcher using passed ZooKeeperWrapper instance. - * @param zk ZooKeeper - * @param flag Flag to set to request shutdown. - */ - ZKMasterAddressWatcher(final ZooKeeperWrapper zk, final AtomicBoolean flag) { - this.requestShutdown = flag; - this.zookeeper = zk; - } - - /** - * @see org.apache.zookeeper.Watcher#process(org.apache.zookeeper.WatchedEvent) - */ - @Override - public synchronized void process (WatchedEvent event) { - EventType type = event.getType(); - LOG.debug(("Got event " + type + " with path " + event.getPath())); - if (type.equals(EventType.NodeDeleted)) { - if (event.getPath().equals(this.zookeeper.clusterStateZNode)) { - LOG.info("Cluster shutdown while waiting, shutting down" + - " this master."); - this.requestShutdown.set(true); - } else { - LOG.debug("Master address ZNode deleted, notifying waiting masters"); - notifyAll(); - } - } else if(type.equals(EventType.NodeCreated) && - event.getPath().equals(this.zookeeper.clusterStateZNode)) { - LOG.debug("Resetting watch on cluster state node."); - this.zookeeper.setClusterStateWatch(); - } - } - - /** - * Wait for master address to be available. This sets a watch in ZooKeeper and - * blocks until the master address ZNode gets deleted. 
- */ - public synchronized void waitForMasterAddressAvailability() { - while (zookeeper.readMasterAddress(this) != null) { - try { - LOG.debug("Waiting for master address ZNode to be deleted " + - "(Also watching cluster state node)"); - this.zookeeper.setClusterStateWatch(); - wait(); - } catch (InterruptedException e) { - } - } - } - - /** - * Write address to zookeeper. Parks here until we successfully write our - * address (or until cluster shutdown). - * @param address Address whose format is HServerAddress.toString - */ - boolean writeAddressToZooKeeper( - final HServerAddress address, boolean retry) { - do { - waitForMasterAddressAvailability(); - // Check if we need to shutdown instead of taking control - if (this.requestShutdown.get()) { - LOG.debug("Won't start Master because cluster is shuting down"); - return false; - } - if(this.zookeeper.writeMasterAddress(address)) { - this.zookeeper.setClusterState(true); - this.zookeeper.setClusterStateWatch(); - // Watch our own node - this.zookeeper.readMasterAddress(this); - return true; - } - } while(retry); - return false; - } - - /** - * Reset the ZK in case a new connection is required - * @param zookeeper new instance - */ - public void setZookeeper(ZooKeeperWrapper zookeeper) { - this.zookeeper = zookeeper; - } -} \ No newline at end of file Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=963935&r1=963934&r2=963935&view=diff ============================================================================== --- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original) +++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Wed Jul 14 03:41:11 2010 @@ -327,7 +327,7 @@ public class HRegionServer 
implements HR zooKeeper = new ZooKeeperWatcher(conf, serverInfo.getServerName(), this); // create the master address manager, register with zk, and start it - masterAddressManager = new MasterAddressManager(zooKeeper); + masterAddressManager = new MasterAddressManager(zooKeeper, this); zooKeeper.registerListener(masterAddressManager); masterAddressManager.monitorMaster(); } Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/MasterAddressManager.java URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/MasterAddressManager.java?rev=963935&r1=963934&r2=963935&view=diff ============================================================================== --- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/MasterAddressManager.java (original) +++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/MasterAddressManager.java Wed Jul 14 03:41:11 2010 @@ -22,21 +22,23 @@ package org.apache.hadoop.hbase.regionse import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HServerAddress; +import org.apache.hadoop.hbase.ServerStatus; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; /** * Manages the location of the current active Master for this RegionServer. - * - * Listens for ZooKeeper events related to the master address. The node /master - * will contain the address of the current master. This listener is interested + * + * Listens for ZooKeeper events related to the master address. The node /master + * will contain the address of the current master. This listener is interested * in NodeDeleted and NodeCreated events on /master. 
- * + * * This class is thread-safe and takes care of re-setting all watchers to * ensure it always knows the up-to-date master. To kick it off, instantiate * the class and run the {@link #monitorMaster()} method. - * + * * You can get the current master via {@link #getMasterAddress()} or the * blocking method {@link #waitMasterAddress()}. */ @@ -45,25 +47,29 @@ public class MasterAddressManager extend // Address of the current primary master, null if no primary master private HServerAddress masterAddress; - + + // Status and controller for the regionserver + private ServerStatus status; + /** * Construct a master address listener with the specified zookeeper reference. - * + * * This constructor does not trigger any actions, you must call methods * explicitly. Normally you will just want to execute {@link #monitorMaster()} - * and you will ensure to - * + * and you will ensure to + * * @param watcher zk reference and watcher */ - public MasterAddressManager(ZooKeeperWatcher watcher) { + public MasterAddressManager(ZooKeeperWatcher watcher, ServerStatus status) { super(watcher); - masterAddress = null; + this.status = status; + this.masterAddress = null; } - + /** * Get the address of the current master if one is available. Returns null * if no current master. - * + * * Use {@link #waitMasterAddress} if you want to block until the master is * available. * @return server address of current active master, or null if none available @@ -71,7 +77,7 @@ public class MasterAddressManager extend public synchronized HServerAddress getMasterAddress() { return masterAddress; } - + /** * Check if there is a master available. * @return true if there is a master set, false if not. @@ -79,14 +85,14 @@ public class MasterAddressManager extend public synchronized boolean hasMaster() { return masterAddress != null; } - + /** * Get the address of the current master. 
If no master is available, method * will block until one is available, the thread is interrupted, or timeout * has passed. - * + * * TODO: Make this work, currently unused, kept with existing retry semantics. - * + * * @return server address of current active master, null if timed out * @throws InterruptedException if the thread is interrupted while waiting */ @@ -94,22 +100,28 @@ public class MasterAddressManager extend throws InterruptedException { return masterAddress; } - + /** * Setup to watch for the primary master of the cluster. - * + * * If the master is already available in ZooKeeper, this method will ensure * it gets set and that any further changes are also watched for. - * + * * If no master is available, this method ensures we become aware of it and * will take care of setting it. */ public void monitorMaster() { - if(ZKUtil.watchAndCheckExists(watcher, watcher.masterAddressZNode)) { - handleNewMaster(); + try { + if(ZKUtil.watchAndCheckExists(watcher, watcher.masterAddressZNode)) { + handleNewMaster(); + } + } catch(KeeperException ke) { + // If we have a ZK exception trying to find the master we must abort + LOG.fatal("Unexpected ZooKeeper exception", ke); + status.abortServer(); } } - + @Override public void nodeCreated(String path) { LOG.info("nodeCreated(" + path + ")"); @@ -118,7 +130,7 @@ public class MasterAddressManager extend } monitorMaster(); } - + @Override public void nodeDeleted(String path) { if(path.equals(watcher.masterAddressZNode)) { @@ -126,7 +138,7 @@ public class MasterAddressManager extend } monitorMaster(); } - + /** * Set the master address to the specified address. This operation is * idempotent, a master will only be set if there is currently no master set. @@ -137,7 +149,7 @@ public class MasterAddressManager extend masterAddress = address; } } - + /** * Unsets the master address. Used when the master goes offline so none is * available. 
@@ -148,34 +160,40 @@ public class MasterAddressManager extend masterAddress = null; } } - + /** * Handle a new master being set. - * + * * This method should be called to check if there is a new master. If there * is already a master set, this method returns immediately. If none is set, * this will attempt to grab the master location from ZooKeeper and will set * it. - * - * This method uses an atomic operation to ensure a new master is only set + * + * This method uses an atomic operation to ensure a new master is only set * once. */ private void handleNewMaster() { if(hasMaster()) { return; } - HServerAddress address = - ZKUtil.getDataAsAddress(watcher, watcher.masterAddressZNode); + HServerAddress address = null; + try { + address = ZKUtil.getDataAsAddress(watcher, watcher.masterAddressZNode); + } catch (KeeperException ke) { + // If we have a ZK exception trying to find the master we must abort + LOG.fatal("Unexpected ZooKeeper exception", ke); + status.abortServer(); + } if(address != null) { setMasterAddress(address); } } - + /** * Handle a master failure. - * + * * Triggered when a master node is deleted. - * + * * TODO: Other ways we figure master is "dead"? What do we do if set in ZK * but we can't communicate with TCP? 
*/ Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java?rev=963935&r1=963934&r2=963935&view=diff ============================================================================== --- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java (original) +++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java Wed Jul 14 03:41:11 2010 @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.zookeeper; import java.io.IOException; +import java.util.List; import java.util.Properties; import org.apache.commons.logging.Log; @@ -27,28 +28,34 @@ import org.apache.commons.logging.LogFac import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HServerAddress; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.data.Stat; /** * Internal HBase utility class for ZooKeeper. - * - * Contains only static methods and constants. + * + *

Contains only static methods and constants. + * + *

Methods all throw {@link KeeperException} if there is an unexpected + * zookeeper exception, so callers of these methods must handle appropriately. + * If ZK is required for the operation, the server will need to be aborted. */ public class ZKUtil { private static final Log LOG = LogFactory.getLog(ZKUtil.class); // TODO: Replace this with ZooKeeper constant when ZOOKEEPER-277 is resolved. private static final char ZNODE_PATH_SEPARATOR = '/'; - + /** * Creates a new connection to ZooKeeper, pulling settings and quorum config * from the specified configuration object using methods from {@link ZKConfig}. - * + * * Sets the connection status monitoring watcher to the specified watcher. - * + * * @param conf configuration to pull quorum and other settings from * @param watcher watcher to monitor connection changes * @return connection to zookeeper @@ -66,12 +73,16 @@ public class ZKUtil { return new ZooKeeper(quorum, timeout, watcher); } + // + // Helper methods + // + /** * Join the prefix znode name with the suffix znode name to generate a proper * full znode name. - * + * * Assumes prefix does not end with slash and suffix does not begin with it. - * + * * @param prefix beginning of znode name * @param suffix ending of znode name * @return result of properly joining prefix with suffix @@ -80,16 +91,22 @@ public class ZKUtil { return prefix + ZNODE_PATH_SEPARATOR + suffix; } + // + // Existence checks and watches + // + /** * Watch the specified znode for delete/create/change events. The watcher is * set whether or not the node exists. If the node already exists, the method * returns true. If the node does not exist, the method returns false. 
- * + * * @param zkw zk reference * @param znode path of node to watch * @return true if znode exists, false if does not exist or error + * @throws KeeperException if unexpected zookeeper exception */ - public static boolean watchAndCheckExists(ZooKeeperWatcher zkw, String znode) { + public static boolean watchAndCheckExists(ZooKeeperWatcher zkw, String znode) + throws KeeperException { try { Stat s = zkw.getZooKeeper().exists(znode, zkw); zkw.debug("Set watcher on existing znode (" + znode + ")"); @@ -104,21 +121,103 @@ public class ZKUtil { return false; } } - + + // + // Znode listings + // + + /** + * Lists the children znodes of the specified znode. Also sets a watch on + * the specified znode which will capture a NodeDeleted event on the specified + * znode as well as NodeChildrenChanged if any children of the specified znode + * are created or deleted. + * + * Returns null if the specified node does not exist. Otherwise returns a + * list of children of the specified node. If the node exists but it has no + * children, an empty list will be returned. 
+ * + * @param zkw zk reference + * @param znode path of node to list and watch children of + * @returns list of children of the specified node, an empty list if the node + * exists but has no children, and null if the node does not exist + * @throws KeeperException if unexpected zookeeper exception + */ + public static List listChildrenAndWatchForNewChildren( + ZooKeeperWatcher zkw, String znode) + throws KeeperException { + try { + List children = zkw.getZooKeeper().getChildren(znode, zkw); + return children; + } catch(KeeperException.NoNodeException ke) { + zkw.debug("Unable to list children of znode (" + znode + ") " + + "because node does not exist (not an error)"); + return null; + } catch (KeeperException e) { + zkw.warn("Unable to list children of znode (" + znode + ")", e); + zkw.keeperException(e); + return null; + } catch (InterruptedException e) { + zkw.warn("Unable to list children of znode (" + znode + ")", e); + zkw.interruptedException(e); + return null; + } + } + + /** + * Checks if the specified znode has any children. Sets no watches. + * + * Returns true if the node exists and has children. Returns false if the + * node does not exist or if the node does not have any children. + * + * Used during master initialization to determine if the master is a + * failed-over-to master or the first master during initial cluster startup. + * If the directory for regionserver ephemeral nodes is empty then this is + * a cluster startup, if not then it is not cluster startup. 
+ * + * @param zkw zk reference + * @param znode path of node to check for children of + * @return true if node has children, false if not or node does not exist + * @throws KeeperException if unexpected zookeeper exception + */ + public static boolean nodeHasChildren(ZooKeeperWatcher zkw, String znode) + throws KeeperException { + try { + return !zkw.getZooKeeper().getChildren(znode, null).isEmpty(); + } catch(KeeperException.NoNodeException ke) { + zkw.debug("Unable to list children of znode (" + znode + ") " + + "because node does not exist (not an error)"); + return false; + } catch (KeeperException e) { + zkw.warn("Unable to list children of znode (" + znode + ")", e); + zkw.keeperException(e); + return false; + } catch (InterruptedException e) { + zkw.warn("Unable to list children of znode (" + znode + ")", e); + zkw.interruptedException(e); + return false; + } + } + + // + // Data retrieval + // + /** * Get the data at the specified znode and set a watch. - * + * * Returns the data and sets a watch if the node exists. Returns null and no * watch is set if the node does not exist or there is an exception. - * + * * @param zkw zk reference * @param znode path of node * @return data of the specified znode, or null + * @throws KeeperException if unexpected zookeeper exception */ - public static byte [] getDataAndWatch(ZooKeeperWatcher zkw, String znode) { + public static byte [] getDataAndWatch(ZooKeeperWatcher zkw, String znode) + throws KeeperException { try { byte [] data = zkw.getZooKeeper().getData(znode, zkw, null); - zkw.debug("Retrieved " + data.length + " bytes of data from znode (" + + zkw.debug("Retrieved " + data.length + " bytes of data from znode (" + znode + ") and set a watcher"); return data; } catch (KeeperException.NoNodeException e) { @@ -135,21 +234,23 @@ public class ZKUtil { return null; } } - + /** * Get the data at the specified znode, deserialize it as an HServerAddress, * and set a watch. 
- * + * * Returns the data as a server address and sets a watch if the node exists. * Returns null and no watch is set if the node does not exist or there is an * exception. - * + * * @param zkw zk reference * @param znode path of node * @return data of the specified node as a server address, or null + * @throws KeeperException if unexpected zookeeper exception */ - public static HServerAddress getDataAsAddress(ZooKeeperWatcher zkw, - String znode) { + public static HServerAddress getDataAsAddress(ZooKeeperWatcher zkw, + String znode) + throws KeeperException { byte [] data = getDataAndWatch(zkw, znode); if(data == null) { return null; @@ -158,4 +259,92 @@ public class ZKUtil { zkw.debug("Read server address from znode (" + znode + "): " + addrString); return new HServerAddress(addrString); } + + /** + * Set the specified znode to be an ephemeral node carrying the specified + * server address. Used by masters for their ephemeral node and regionservers + * for their ephemeral node. + * + * If the node is created successfully, a watcher is also set on the node. + * + * If the node is not created successfully because it already exists, this + * method will also set a watcher on the node. + * + * If there is another problem, a KeeperException will be thrown. + * + * @param zkw zk reference + * @param znode path of node + * @param address server address + * @return true if address set, false if not (watch set in both cases unless interrupted) + * @throws KeeperException if unexpected zookeeper exception + */ + public static boolean setAddressAndWatch(ZooKeeperWatcher zkw, + String znode, HServerAddress address) + throws KeeperException { + return createEphemeralNodeAndWatch(zkw, znode, + Bytes.toBytes(address.toString())); + } + /** + * + * Set the specified znode to be an ephemeral node carrying the specified + * data. + * + * If the node is created successfully, a watcher is also set on the node.
+ * + * If the node is not created successfully because it already exists, this + * method will also set a watcher on the node. + * + * If there is another problem, a KeeperException will be thrown. + * + * If the calling thread is interrupted, the node is not created, no watch + * is set, and false is returned. + * + * @param zkw zk reference + * @param znode path of node + * @param data data of node + * @return true if node created, false if not (watch set in both cases unless interrupted) + * @throws KeeperException if unexpected zookeeper exception + */ + public static boolean createEphemeralNodeAndWatch(ZooKeeperWatcher zkw, + String znode, byte [] data) + throws KeeperException { + try { + zkw.getZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE, + CreateMode.EPHEMERAL); + } catch (KeeperException.NodeExistsException nee) { + if(!watchAndCheckExists(zkw, znode)) { + // It did exist but now it doesn't, try again + return createEphemeralNodeAndWatch(zkw, znode, data); + } + return false; + } catch (InterruptedException e) { + zkw.interruptedException(e); + // Node was not created and no watch was set, so never report success + return false; + } + return true; + } + + /** + * Creates the specified node, if the node does not exist. Does not set a + * watch and fails silently if the node already exists. + * + * The node created is persistent and open access.
+ * + * @param zkw zk reference + * @param znode path of node + * @throws KeeperException if unexpected zookeeper exception + */ + public static void createIfNotExists(ZooKeeperWatcher zkw, + String znode) + throws KeeperException { + try { + zkw.getZooKeeper().create(znode, new byte[0], Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + } catch(KeeperException.NodeExistsException nee) { + // Node already exists; that is the desired end state, nothing to do + } catch(InterruptedException ie) { + zkw.interruptedException(ie); + } + } } \ No newline at end of file Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java?rev=963935&r1=963934&r2=963935&view=diff ============================================================================== --- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java (original) +++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java Wed Jul 14 03:41:11 2010 @@ -88,6 +88,12 @@ public class ZooKeeperWatcher extends Zo this.server = server; info("Connected to ZooKeeper"); setNodeNames(conf); + try { + ZKUtil.createIfNotExists(this, baseZNode); + } catch (KeeperException e) { + error("Unexpected KeeperException creating base node", e); + throw new IOException(e); + } } /** @@ -198,9 +204,10 @@ public class ZooKeeperWatcher extends Zo - // Abort the server if Disconnected or Expired + // Abort the server if Expired; Disconnected is only logged // TODO: Any reason to handle these two differently?
case Disconnected: + info("Received Disconnected from ZooKeeper, ignoring"); + break; case Expired: - error("Received Disconnected/Expired [" + event.getState() + "] " + - "from ZooKeeper, aborting server"); + error("Received Expired from ZooKeeper, aborting server"); if(server != null) { server.abortServer(); } @@ -213,13 +220,15 @@ public class ZooKeeperWatcher extends Zo * * This may be temporary but for now this gives one place to deal with these. * - * TODO: Currently this method aborts the server. + * TODO: Currently this method rethrows the exception to let the caller handle it. * * @param ke + * @throws KeeperException always; the given exception is rethrown */ - public void keeperException(KeeperException ke) { - error("Received unexpected KeeperException, aborting server", ke); - server.abortServer(); + public void keeperException(KeeperException ke) + throws KeeperException { + error("Received unexpected KeeperException, re-throwing exception", ke); + throw ke; } /** @@ -229,6 +238,7 @@ public class ZooKeeperWatcher extends Zo * * TODO: Currently, this method does nothing. * Is this ever expected to happen? Do we abort or can we let it run? + * Maybe this should be logged as WARN? It shouldn't happen?
* * @param ie */ Added: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java?rev=963935&view=auto ============================================================================== --- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java (added) +++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestActiveMasterManager.java Wed Jul 14 03:41:11 2010 @@ -0,0 +1,269 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HServerAddress; +import org.apache.hadoop.hbase.client.ServerConnection; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestActiveMasterManager { + private static final Log LOG = LogFactory.getLog(TestActiveMasterManager.class); + + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.startMiniZKCluster(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniZKCluster(); + } + /** + * Unit test that uses ZooKeeper but does not use the master-side methods + * but rather acts directly on ZK.
+ * @throws Exception + */ + @Test + public void testActiveMasterManagerFromZK() throws Exception { + + ZooKeeperWatcher zk = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(), + "testActiveMasterManagerFromZK", null); + zk.createZNodeIfNotExists(zk.baseZNode); + try { + zk.deleteZNode(zk.masterAddressZNode); + } catch(KeeperException.NoNodeException nne) { /* no master node yet, nothing to delete */ } + + // Create the master node with a dummy address + HServerAddress firstMasterAddress = new HServerAddress("firstMaster", 1234); + HServerAddress secondMasterAddress = new HServerAddress("secondMaster", 1234); + + // Should not have a master yet + DummyMasterStatus ms1 = new DummyMasterStatus(); + ActiveMasterManager activeMasterManager = new ActiveMasterManager(zk, + firstMasterAddress, ms1); + zk.registerListener(activeMasterManager); + assertFalse(activeMasterManager.clusterHasActiveMaster.get()); + + // First test becoming the active master uninterrupted + activeMasterManager.blockUntilBecomingActiveMaster(); + assertTrue(activeMasterManager.clusterHasActiveMaster.get()); + assertMaster(zk, firstMasterAddress); + + // New manager will now try to become the active master in another thread + WaitToBeMasterThread t = new WaitToBeMasterThread(zk, secondMasterAddress); + zk.registerListener(t.manager); + t.start(); + // Wait for this guy to figure out there is another active master + // Wait for 1 second at most + int sleeps = 0; + while(!t.manager.clusterHasActiveMaster.get() && sleeps < 100) { + Thread.sleep(10); + sleeps++; + } + + // Both should see that there is an active master + assertTrue(activeMasterManager.clusterHasActiveMaster.get()); + assertTrue(t.manager.clusterHasActiveMaster.get()); + // But secondary one should not be the active master + assertFalse(t.isActiveMaster); + + // Close the first server and delete its master node + ms1.setClosed(); + + // Use a listener to capture when the node is actually deleted + NodeDeletionListener listener = new NodeDeletionListener(zk,
zk.masterAddressZNode); + zk.registerListener(listener); + + LOG.info("Deleting master node"); + zk.deleteZNode(zk.masterAddressZNode); + + // Wait for the node to be deleted + LOG.info("Waiting for active master manager to be notified"); + listener.waitForDeletion(); + LOG.info("Master node deleted"); + + // Now we expect the secondary manager to have and be the active master + // Wait for 1 second at most + sleeps = 0; + while(!t.isActiveMaster && sleeps < 100) { + Thread.sleep(10); + sleeps++; + } + LOG.debug("Slept " + sleeps + " times"); + + assertTrue(t.manager.clusterHasActiveMaster.get()); + assertTrue(t.isActiveMaster); + } + + /** + * Assert there is an active master and that it has the specified address. + * @param zk zk reference + * @param expectedAddress address the active master is expected to have + * @throws KeeperException if reading the master znode fails unexpectedly + */ + private void assertMaster(ZooKeeperWatcher zk, + HServerAddress expectedAddress) throws KeeperException { + HServerAddress readAddress = ZKUtil.getDataAsAddress(zk, zk.masterAddressZNode); + assertNotNull(readAddress); + assertTrue(expectedAddress.equals(readAddress)); + } + + public static class WaitToBeMasterThread extends Thread { + + ActiveMasterManager manager; + volatile boolean isActiveMaster; // written by run(), polled by the test thread + + public WaitToBeMasterThread(ZooKeeperWatcher zk, + HServerAddress address) { + this.manager = new ActiveMasterManager(zk, address, + new DummyMasterStatus()); + isActiveMaster = false; + } + + @Override + public void run() { + manager.blockUntilBecomingActiveMaster(); + LOG.info("Second master has become the active master!"); + isActiveMaster = true; + } + } + + public static class NodeDeletionListener extends ZooKeeperListener { + private static final Log LOG = LogFactory.getLog(NodeDeletionListener.class); + + private Semaphore lock; + private String node; + + public NodeDeletionListener(ZooKeeperWatcher watcher, String node) { + super(watcher); + lock = new Semaphore(0); + this.node = node; + } + + @Override + public void nodeDeleted(String path) { + if(path.equals(node)) {
+ LOG.debug("nodeDeleted(" + path + ")"); + lock.release(); + } + } + + public void waitForDeletion() throws InterruptedException { + lock.acquire(); + } + } + + public static class DummyMasterStatus implements MasterStatus { + + private AtomicBoolean closed = new AtomicBoolean(false); + + @Override + public AtomicBoolean getClosed() { + return closed; + } + + @Override + public FileSystemManager getFileSystemManager() { + return null; + } + + @Override + public RegionManager getRegionManager() { + return null; + } + + @Override + public ServerConnection getServerConnection() { + return null; + } + + @Override + public ServerManager getServerManager() { + return null; + } + + @Override + public AtomicBoolean getShutdownRequested() { + return null; + } + + @Override + public boolean isClosed() { + return closed.get(); + } + + @Override + public boolean isClusterStartup() { + return false; + } + + @Override + public void setClosed() { + closed.set(true); + } + + @Override + public void setClusterStartup(boolean isClusterStartup) {} + + @Override + public void shutdown() {} + + @Override + public void startShutdown() {} + + @Override + public void abortServer() {} + + @Override + public Configuration getConfiguration() { + return null; + } + + @Override + public HServerAddress getHServerAddress() { + return null; + } + + @Override + public ZooKeeperWatcher getZooKeeper() { + return null; + } + + } +} Modified: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/TestMasterAddressManager.java URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/TestMasterAddressManager.java?rev=963935&r1=963934&r2=963935&view=diff ============================================================================== --- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/TestMasterAddressManager.java (original) +++
hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/TestMasterAddressManager.java Wed Jul 14 03:41:11 2010 @@ -40,12 +40,12 @@ public class TestMasterAddressManager { private static final Log LOG = LogFactory.getLog(TestMasterAddressManager.class); private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - + @BeforeClass public static void setUpBeforeClass() throws Exception { TEST_UTIL.startMiniZKCluster(); } - + @AfterClass public static void tearDownAfterClass() throws Exception { TEST_UTIL.shutdownMiniZKCluster(); @@ -57,29 +57,29 @@ public class TestMasterAddressManager { */ @Test public void testMasterAddressManagerFromZK() throws Exception { - - ZooKeeperWatcher zk = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(), + + ZooKeeperWatcher zk = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(), "testMasterAddressManagerFromZK", null); zk.createZNodeIfNotExists(zk.baseZNode); - + // Should not have a master yet - MasterAddressManager addressManager = new MasterAddressManager(zk); + MasterAddressManager addressManager = new MasterAddressManager(zk, null); addressManager.monitorMaster(); assertFalse(addressManager.hasMaster()); zk.registerListener(addressManager); - + // Use a listener to capture when the node is actually created NodeCreationListener listener = new NodeCreationListener(zk, zk.masterAddressZNode); zk.registerListener(listener); - + // Create the master node with a dummy address String host = "hostname"; int port = 1234; HServerAddress dummyAddress = new HServerAddress(host, port); LOG.info("Creating master node"); - zk.createZNodeIfNotExists(zk.masterAddressZNode, + zk.createZNodeIfNotExists(zk.masterAddressZNode, Bytes.toBytes(dummyAddress.toString()), CreateMode.EPHEMERAL, false); - + // Wait for the node to be created LOG.info("Waiting for master address manager to be notified"); listener.waitForCreation(); @@ -87,21 +87,21 @@ public class TestMasterAddressManager { 
assertTrue(addressManager.hasMaster()); HServerAddress pulledAddress = addressManager.getMasterAddress(); assertTrue(pulledAddress.equals(dummyAddress)); - + } - + public static class NodeCreationListener extends ZooKeeperListener { private static final Log LOG = LogFactory.getLog(NodeCreationListener.class); - + private Semaphore lock; private String node; - + public NodeCreationListener(ZooKeeperWatcher watcher, String node) { super(watcher); lock = new Semaphore(0); this.node = node; } - + @Override public void nodeCreated(String path) { if(path.equals(node)) { @@ -109,7 +109,7 @@ public class TestMasterAddressManager { lock.release(); } } - + public void waitForCreation() throws InterruptedException { lock.acquire(); }