Return-Path: Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: (qmail 40586 invoked from network); 31 Aug 2010 23:53:15 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 31 Aug 2010 23:53:15 -0000 Received: (qmail 13959 invoked by uid 500); 31 Aug 2010 23:53:14 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 13918 invoked by uid 500); 31 Aug 2010 23:53:14 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 13902 invoked by uid 99); 31 Aug 2010 23:53:14 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 31 Aug 2010 23:53:14 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 31 Aug 2010 23:53:10 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 42C0B2388A41; Tue, 31 Aug 2010 23:51:53 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r991397 [4/15] - in /hbase/trunk: ./ bin/ conf/ src/main/java/org/apache/hadoop/hbase/ src/main/java/org/apache/hadoop/hbase/avro/ src/main/java/org/apache/hadoop/hbase/catalog/ src/main/java/org/apache/hadoop/hbase/client/ src/main/java/or... 
Date: Tue, 31 Aug 2010 23:51:50 -0000 To: commits@hbase.apache.org From: stack@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100831235153.42C0B2388A41@eris.apache.org> Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/executor/RegionTransitionData.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/executor/RegionTransitionData.java?rev=991397&view=auto ============================================================================== --- hbase/trunk/src/main/java/org/apache/hadoop/hbase/executor/RegionTransitionData.java (added) +++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/executor/RegionTransitionData.java Tue Aug 31 23:51:44 2010 @@ -0,0 +1,210 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.executor; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.hbase.executor.EventHandler.EventType; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Writables; +import org.apache.hadoop.io.Writable; + +/** + * Data serialized into ZooKeeper for region transitions. 
+ */ +public class RegionTransitionData implements Writable { + /** + * Type of transition event (offline, opening, opened, closing, closed). + * Required. + */ + private EventType eventType; + + /** Region being transitioned. Required. */ + private byte [] regionName; + + /** Server event originated from. Optional. */ + private String serverName; + + /** Time the event was created. Required but automatically set. */ + private long stamp; + + /** + * Writable constructor. Do not use directly. + */ + public RegionTransitionData() {} + + /** + * Construct data for a new region transition event with the specified event + * type and region name. + * + *

Used when the server name is not known (the master is setting it). This + * happens during cluster startup or during failure scenarios. When + * processing a failed regionserver, the master assigns the regions from that + * server to other servers though the region was never 'closed'. During + * master failover, the new master may have regions stuck in transition + * without a destination so may have to set regions offline and generate a new + * assignment. + * + *

Since only the master uses this constructor, the type should always be + * {@link EventType#M2ZK_REGION_OFFLINE}. + * + * @param eventType type of event + * @param regionName name of region + */ + public RegionTransitionData(EventType eventType, byte [] regionName) { + this(eventType, regionName, null); + } + + /** + * Construct data for a new region transition event with the specified event + * type, region name, and server name. + * + *

Used when the server name is known (a regionserver is setting it). + * + *

Valid types for this constructor are {@link EventType#RS2ZK_REGION_CLOSING}, + * {@link EventType#RS2ZK_REGION_CLOSED}, {@link EventType#RS2ZK_REGION_OPENING}, + * and {@link EventType#RS2ZK_REGION_OPENED}. + * + * @param eventType type of event + * @param regionName name of region + * @param serverName name of server setting data + */ + public RegionTransitionData(EventType eventType, byte [] regionName, + String serverName) { + this.eventType = eventType; + this.stamp = System.currentTimeMillis(); + this.regionName = regionName; + this.serverName = serverName; + } + + /** + * Gets the type of region transition event. + * + *

One of: + *

    + *
  • {@link EventType#M2ZK_REGION_OFFLINE} + *
  • {@link EventType#RS2ZK_REGION_CLOSING} + *
  • {@link EventType#RS2ZK_REGION_CLOSED} + *
  • {@link EventType#RS2ZK_REGION_OPENING} + *
  • {@link EventType#RS2ZK_REGION_OPENED} + *
+ * @return type of region transition event + */ + public EventType getEventType() { + return eventType; + } + + /** + * Gets the name of the region being transitioned. + * + *

Region name is required so this never returns null. + * @return region name + */ + public byte [] getRegionName() { + return regionName; + } + + /** + * Gets the server the event originated from. If null, this event originated + * from the master. + * + * @return server name of originating regionserver, or null if from master + */ + public String getServerName() { + return serverName; + } + + /** + * Gets the timestamp when this event was created. + * + * @return stamp event was created + */ + public long getStamp() { + return stamp; + } + + @Override + public void readFields(DataInput in) throws IOException { + // the event type byte + eventType = EventType.values()[in.readShort()]; + // the timestamp + stamp = in.readLong(); + // the encoded name of the region being transitioned + regionName = Bytes.readByteArray(in); + // remaining fields are optional so prefixed with boolean + // the name of the regionserver sending the data + if(in.readBoolean()) { + serverName = in.readUTF(); + } else { + serverName = null; + } + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeShort(eventType.ordinal()); + out.writeLong(System.currentTimeMillis()); + Bytes.writeByteArray(out, regionName); + // remaining fields are optional so prefixed with boolean + out.writeBoolean(serverName != null); + if(serverName != null) { + out.writeUTF(serverName); + } + } + + /** + * Get the bytes for this instance. Throws a {@link RuntimeException} if + * there is an error serializing this instance because it represents a code + * bug. + * @return binary representation of this instance + */ + public byte [] getBytes() { + try { + return Writables.getBytes(this); + } catch(IOException e) { + throw new RuntimeException(e); + } + } + + /** + * Get an instance from bytes. Throws a {@link RuntimeException} if + * there is an error deserializing this instance from bytes because it + * represents a code bug. 
+ * @param bytes binary representation of this instance + * @return instance of this class + */ + public static RegionTransitionData fromBytes(byte [] bytes) { + try { + RegionTransitionData data = new RegionTransitionData(); + Writables.getWritable(bytes, data); + return data; + } catch(IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public String toString() { + return "region=" + Bytes.toString(regionName) + ", server=" + serverName + + ", state=" + eventType; + } +} Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java?rev=991397&r1=991396&r2=991397&view=diff ============================================================================== --- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java (original) +++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java Tue Aug 31 23:51:44 2010 @@ -23,11 +23,10 @@ import java.io.DataOutput; import java.io.IOException; import java.lang.reflect.Array; import java.util.ArrayList; -import java.util.Collection; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.NavigableSet; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -45,26 +44,37 @@ import org.apache.hadoop.hbase.HTableDes import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.MultiAction; -import org.apache.hadoop.hbase.client.Action; -import org.apache.hadoop.hbase.client.MultiResponse; +import org.apache.hadoop.hbase.client.MultiPut; +import org.apache.hadoop.hbase.client.MultiPutResponse; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.Row; import 
org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.MultiPutResponse; -import org.apache.hadoop.hbase.client.MultiPut; -import org.apache.hadoop.hbase.filter.*; -import org.apache.hadoop.hbase.io.HbaseMapWritable; +import org.apache.hadoop.hbase.filter.BinaryComparator; +import org.apache.hadoop.hbase.filter.ColumnCountGetFilter; +import org.apache.hadoop.hbase.filter.ColumnPrefixFilter; +import org.apache.hadoop.hbase.filter.CompareFilter; +import org.apache.hadoop.hbase.filter.DependentColumnFilter; +import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; +import org.apache.hadoop.hbase.filter.InclusiveStopFilter; +import org.apache.hadoop.hbase.filter.PageFilter; +import org.apache.hadoop.hbase.filter.PrefixFilter; +import org.apache.hadoop.hbase.filter.QualifierFilter; +import org.apache.hadoop.hbase.filter.RowFilter; +import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter; +import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; +import org.apache.hadoop.hbase.filter.SkipFilter; +import org.apache.hadoop.hbase.filter.ValueFilter; +import org.apache.hadoop.hbase.filter.WhileMatchFilter; +import org.apache.hadoop.hbase.filter.WritableByteArrayComparable; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.wal.HLog; +import org.apache.hadoop.hbase.regionserver.wal.HLogKey; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.ObjectWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableFactories; -import org.apache.hadoop.hbase.regionserver.HRegion; -import org.apache.hadoop.hbase.regionserver.wal.HLog; -import org.apache.hadoop.hbase.regionserver.wal.HLogKey; -import org.apache.hadoop.hbase.util.Bytes; /** * This is a customized version of the polymorphic hadoop @@ -173,16 +183,10 @@ public class HbaseObjectWritable impleme 
addToMap(HLog.Entry[].class, code++); addToMap(HLogKey.class, code++); - // List addToMap(List.class, code++); + + addToMap(NavigableSet.class, code++); addToMap(ColumnPrefixFilter.class, code++); - - // Multi - addToMap(Row.class, code++); - addToMap(Action.class, code++); - addToMap(MultiAction.class, code++); - addToMap(MultiResponse.class, code++); - } private Class declaredClass; @@ -512,4 +516,4 @@ public class HbaseObjectWritable impleme public Configuration getConf() { return this.conf; } -} \ No newline at end of file +} Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java?rev=991397&r1=991396&r2=991397&view=diff ============================================================================== --- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java (original) +++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java Tue Aug 31 23:51:44 2010 @@ -345,8 +345,9 @@ public class HBaseRPC { if (maxAttempts >= 0 && ++reconnectAttempts >= maxAttempts) { LOG.info("Server at " + addr + " could not be reached after " + reconnectAttempts + " tries, giving up."); - throw new RetriesExhaustedException("Failed setting up proxy to " + - addr.toString() + " after attempts=" + reconnectAttempts); + throw new RetriesExhaustedException("Failed setting up proxy " + + protocol + " to " + addr.toString() + " after attempts=" + + reconnectAttempts, se); } } catch(SocketTimeoutException te) { // namenode is busy LOG.info("Problem connecting to server: " + addr); Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPCProtocolVersion.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPCProtocolVersion.java?rev=991397&r1=991396&r2=991397&view=diff ============================================================================== --- 
hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPCProtocolVersion.java (original) +++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPCProtocolVersion.java Tue Aug 31 23:51:44 2010 @@ -76,7 +76,8 @@ public interface HBaseRPCProtocolVersion *

  • Version 22: HBASE-2209. Added List support to RPC
  • *
  • Version 23: HBASE-2066, multi-put.
  • *
  • Version 24: HBASE-2473, create table with regions.
  • + *
  • Version 25: Added openRegion and Stoppable/Abortable to API.
  • * */ - public static final long versionID = 24L; + public static final long versionID = 25L; } Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java?rev=991397&r1=991396&r2=991397&view=diff ============================================================================== --- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java (original) +++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java Tue Aug 31 23:51:44 2010 @@ -19,13 +19,12 @@ */ package org.apache.hadoop.hbase.ipc; +import java.io.IOException; + import org.apache.hadoop.hbase.ClusterStatus; import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.io.Writable; - -import java.io.IOException; +import org.apache.hadoop.hbase.UnknownRegionException; /** * Clients interact with the HMasterInterface to gain access to meta-level @@ -73,12 +72,10 @@ public interface HMasterInterface extend /** * Modifies an existing column on the specified table * @param tableName table name - * @param columnName name of the column to edit * @param descriptor new column descriptor * @throws IOException e */ - public void modifyColumn(final byte [] tableName, final byte [] columnName, - HColumnDescriptor descriptor) + public void modifyColumn(final byte [] tableName, HColumnDescriptor descriptor) throws IOException; @@ -110,12 +107,11 @@ public interface HMasterInterface extend * Modify a table's metadata * * @param tableName table to modify - * @param op the operation to do - * @param args arguments for operation + * @param htd new descriptor for table * @throws IOException e */ - public void modifyTable(byte[] tableName, HConstants.Modify op, Writable[] args) - throws IOException; + public void modifyTable(byte[] tableName, 
HTableDescriptor htd) + throws IOException; /** * Shutdown an HBase cluster. @@ -124,8 +120,32 @@ public interface HMasterInterface extend public void shutdown() throws IOException; /** + * Stop HBase Master only. + * Does not shutdown the cluster. + * @throws IOException e + */ + public void stopMaster() throws IOException; + + /** * Return cluster status. * @return status object */ public ClusterStatus getClusterStatus(); -} + + + /** + * Move the region r to dest. + * @param encodedRegionName The encoded region name. + * @param destServerName The servername of the destination regionserver + * @throws UnknownRegionException Thrown if we can't find a region named + * encodedRegionName + */ + public void move(final byte [] encodedRegionName, final byte [] destServerName) + throws UnknownRegionException; + + /** + * @param b If true, enable balancer. If false, disable balancer. + * @return Previous balancer value + */ + public boolean balance(final boolean b); +} \ No newline at end of file Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java?rev=991397&r1=991396&r2=991397&view=diff ============================================================================== --- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (original) +++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java Tue Aug 31 23:51:44 2010 @@ -19,31 +19,31 @@ */ package org.apache.hadoop.hbase.ipc; +import java.io.IOException; +import java.util.List; +import java.util.NavigableSet; + +import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HServerInfo; import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; -import 
org.apache.hadoop.hbase.client.MultiAction; -import org.apache.hadoop.hbase.client.MultiResponse; import org.apache.hadoop.hbase.client.MultiPut; import org.apache.hadoop.hbase.client.MultiPutResponse; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.wal.HLog; -import java.io.IOException; -import java.util.List; - /** * Clients interact with HRegionServers using a handle to the HRegionInterface. * *

    NOTE: if you change the interface, you must change the RPC version * number in HBaseRPCProtocolVersion */ -public interface HRegionInterface extends HBaseRPCProtocolVersion { +public interface HRegionInterface extends HBaseRPCProtocolVersion, Stoppable, Abortable { /** * Get metainfo about an HRegion * @@ -70,12 +70,6 @@ public interface HRegionInterface extend throws IOException; /** - * - * @return the regions served by this regionserver - */ - public HRegion [] getOnlineRegionsAsArray(); - - /** * Perform Get operation. * @param regionName name of region to get from * @param get Get operation @@ -260,11 +254,10 @@ public interface HRegionInterface extend /** - * Method used when a master is taking the place of another failed one. - * @return All regions assigned on this region server + * @return All regions online on this region server * @throws IOException e */ - public HRegionInfo[] getRegionsAssignment() throws IOException; + public NavigableSet getOnlineRegions(); /** * Method used when a master is taking the place of another failed one. @@ -273,13 +266,6 @@ public interface HRegionInterface extend */ public HServerInfo getHServerInfo() throws IOException; - /** - * Method used for doing multiple actions(Deletes, Gets and Puts) in one call - * @param multi - * @return MultiResult - * @throws IOException - */ - public MultiResponse multi(MultiAction multi) throws IOException; /** * Multi put for putting multiple regions worth of puts at once. @@ -296,6 +282,60 @@ public interface HRegionInterface extend public void bulkLoadHFile(String hfilePath, byte[] regionName, byte[] familyName) throws IOException; + // Master methods + + /** + * Opens the specified region. + * @param region region to open + */ + public void openRegion(final HRegionInfo region); + + /** + * Closes the specified region. 
+ * @param region region to close + * @return true if closing region, false if not + */ + public boolean closeRegion(final HRegionInfo region) + throws NotServingRegionException; + + // Region administrative methods + + /** + * Flushes the MemStore of the specified region. + *

    + * This method is synchronous. + * @param regionInfo region to flush + * @throws NotServingRegionException + * @throws IOException + */ + void flushRegion(HRegionInfo regionInfo) + throws NotServingRegionException, IOException; + + /** + * Splits the specified region. + *

    + * This method currently flushes the region and then forces a compaction which + * will then trigger a split. The flush is done synchronously but the + * compaction is asynchronous. + * @param regionInfo region to split + * @throws NotServingRegionException + * @throws IOException + */ + void splitRegion(HRegionInfo regionInfo) + throws NotServingRegionException, IOException; + + /** + * Compacts the specified region. Performs a major compaction if specified. + *

    + * This method is asynchronous. + * @param regionInfo region to compact + * @param major true to force major compaction + * @throws NotServingRegionException + * @throws IOException + */ + void compactRegion(HRegionInfo regionInfo, boolean major) + throws NotServingRegionException, IOException; + /** * Replicates the given entries. The guarantee is that the given entries * will be durable on the slave cluster if this method returns without @@ -306,5 +346,4 @@ public interface HRegionInterface extend * @throws IOException */ public void replicateLogEntries(HLog.Entry[] entries) throws IOException; - } Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java?rev=991397&view=auto ============================================================================== --- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java (added) +++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java Tue Aug 31 23:51:44 2010 @@ -0,0 +1,173 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master; + +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HServerAddress; +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; + +/** + * Handles everything on master-side related to master election. + * + *

    Listens and responds to ZooKeeper notifications on the master znode, + * both nodeCreated and nodeDeleted. + * + *

    Contains blocking methods which will hold up backup masters, waiting + * for the active master to fail. + * + *

    This class is instantiated in the HMaster constructor and the method + * {@link #blockUntilBecomingActiveMaster()} is called to wait until becoming + * the active master of the cluster. + */ +class ActiveMasterManager extends ZooKeeperListener { + private static final Log LOG = LogFactory.getLog(ActiveMasterManager.class); + + final AtomicBoolean clusterHasActiveMaster = new AtomicBoolean(false); + + private final HServerAddress address; + private final Server master; + + ActiveMasterManager(ZooKeeperWatcher watcher, HServerAddress address, + Server master) { + super(watcher); + this.address = address; + this.master = master; + } + + @Override + public void nodeCreated(String path) { + if(path.equals(watcher.masterAddressZNode) && !master.isStopped()) { + handleMasterNodeChange(); + } + } + + @Override + public void nodeDeleted(String path) { + if(path.equals(watcher.masterAddressZNode) && !master.isStopped()) { + handleMasterNodeChange(); + } + } + + /** + * Handle a change in the master node. Doesn't matter whether this was called + * from a nodeCreated or nodeDeleted event because there are no guarantees + * that the current state of the master node matches the event at the time of + * our next ZK request. + * + *

    Uses the watchAndCheckExists method which watches the master address node + * regardless of whether it exists or not. If it does exist (there is an + * active master), it returns true. Otherwise it returns false. + * + *

    A watcher is set which guarantees that this method will get called again if + * there is another change in the master node. + */ + private void handleMasterNodeChange() { + // Watch the node and check if it exists. + try { + synchronized(clusterHasActiveMaster) { + if(ZKUtil.watchAndCheckExists(watcher, watcher.masterAddressZNode)) { + // A master node exists, there is an active master + LOG.debug("A master is now available"); + clusterHasActiveMaster.set(true); + } else { + // Node is no longer there, cluster does not have an active master + LOG.debug("No master available. notifying waiting threads"); + clusterHasActiveMaster.set(false); + // Notify any thread waiting to become the active master + clusterHasActiveMaster.notifyAll(); + } + } + } catch (KeeperException ke) { + master.abort("Received an unexpected KeeperException, aborting", ke); + } + } + + /** + * Block until becoming the active master. + * + * Method blocks until there is not another active master and our attempt + * to become the new active master is successful. + * + * This also makes sure that we are watching the master znode so will be + * notified if another master dies. 
+ * @return False if we did not start up this cluster, another + * master did, or if a problem (zookeeper, stop flag has been set on this + * Master) + */ + boolean blockUntilBecomingActiveMaster() { + boolean thisMasterStartedCluster = true; + // Try to become the active master, watch if there is another master + try { + if(ZKUtil.setAddressAndWatch(watcher, watcher.masterAddressZNode, + address)) { + // We are the master, return + clusterHasActiveMaster.set(true); + return thisMasterStartedCluster; + } + } catch (KeeperException ke) { + master.abort("Received an unexpected KeeperException, aborting", ke); + return false; + } + // There is another active master, this is not a cluster startup + // and we must wait until the active master dies + LOG.info("Another master is already the active master, waiting to become " + + "the next active master"); + clusterHasActiveMaster.set(true); + thisMasterStartedCluster = false; + synchronized(clusterHasActiveMaster) { + while(clusterHasActiveMaster.get() && !master.isStopped()) { + try { + clusterHasActiveMaster.wait(); + } catch (InterruptedException e) { + // We expect to be interrupted when a master dies, will fall out if so + LOG.debug("Interrupted waiting for master to die", e); + } + } + if(master.isStopped()) { + return thisMasterStartedCluster; + } + // Try to become active master again now that there is no active master + blockUntilBecomingActiveMaster(); + } + return thisMasterStartedCluster; + } + + public void stop() { + try { + // If our address is in ZK, delete it on our way out + HServerAddress zkAddress = + ZKUtil.getDataAsAddress(watcher, watcher.masterAddressZNode); + // TODO: redo this to make it atomic (only added for tests) + if(zkAddress != null && + zkAddress.equals(address)) { + ZKUtil.deleteNode(watcher, watcher.masterAddressZNode); + } + } catch (KeeperException e) { + watcher.error("Error deleting our own master address node", e); + } + } +} \ No newline at end of file Added: 
hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java?rev=991397&view=auto ============================================================================== --- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (added) +++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java Tue Aug 31 23:51:44 2010 @@ -0,0 +1,1188 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Chore; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HServerAddress; +import org.apache.hadoop.hbase.HServerInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.catalog.CatalogTracker; +import org.apache.hadoop.hbase.catalog.MetaReader; +import org.apache.hadoop.hbase.catalog.RootLocationEditor; +import org.apache.hadoop.hbase.client.MetaScanner; +import org.apache.hadoop.hbase.executor.ExecutorService; +import org.apache.hadoop.hbase.executor.RegionTransitionData; +import org.apache.hadoop.hbase.master.LoadBalancer.RegionPlan; +import org.apache.hadoop.hbase.master.handler.ClosedRegionHandler; +import org.apache.hadoop.hbase.master.handler.OpenedRegionHandler; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.zookeeper.ZKAssign; +import org.apache.hadoop.hbase.zookeeper.ZKTableDisable; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZKUtil.NodeAndData; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; +import org.apache.hadoop.io.Writable; +import 
org.apache.zookeeper.KeeperException; + +/** + * Manages and performs region assignment. + *

    + * Monitors ZooKeeper for events related to regions in transition. + *

    + * Handles existing regions in transition during master failover. + */ +public class AssignmentManager extends ZooKeeperListener { + private static final Log LOG = LogFactory.getLog(AssignmentManager.class); + + protected Server master; + + private ServerManager serverManager; + + private CatalogTracker catalogTracker; + + private TimeoutMonitor timeoutMonitor; + + /** Regions currently in transition. */ + private final Map regionsInTransition = + new TreeMap(); + + /** Plans for region movement. */ + // TODO: When do plans get cleaned out? Ever? + // Its cleaned on server shutdown processing -- St.Ack + private final Map regionPlans = + new TreeMap(); + + /** Set of tables that have been disabled. */ + private final Set disabledTables = + Collections.synchronizedSet(new HashSet()); + + /** + * Server to regions assignment map. + * Contains the set of regions currently assigned to a given server. + * This Map and {@link #regions} are tied. Always update this in tandem + * with the other under a lock on {@link #regions} + * @see #regions + */ + private final NavigableMap> servers = + new TreeMap>(); + + /** + * Region to server assignment map. + * Contains the server a given region is currently assigned to. + * This Map and {@link #servers} are tied. Always update this in tandem + * with the other under a lock on {@link #regions} + * @see #servers + */ + private final SortedMap regions = + new TreeMap(); + + private final ReentrantLock assignLock = new ReentrantLock(); + + private final ExecutorService executorService; + + /** + * Constructs a new assignment manager. + * + *

    The constructor also creates the {@link TimeoutMonitor} and hands it to + * {@link Threads#setDaemonThreadRunning}; no separate start() method is + * defined in this class. + * + * @param master server hosting this manager; supplies the ZooKeeper watcher, + * configuration and server name used below + * @param serverManager used to check server liveness and send open/close RPCs + * @param catalogTracker used to read region information from the catalog tables + * @param service executor that region transition handlers are submitted to + */ + public AssignmentManager(Server master, ServerManager serverManager, + CatalogTracker catalogTracker, final ExecutorService service) { + super(master.getZooKeeper()); + this.master = master; + this.serverManager = serverManager; + this.catalogTracker = catalogTracker; + this.executorService = service; + Configuration conf = master.getConfiguration(); + this.timeoutMonitor = new TimeoutMonitor( + conf.getInt("hbase.master.assignment.timeoutmonitor.period", 30000), + master, + conf.getInt("hbase.master.assignment.timeoutmonitor.timeout", 15000)); + Threads.setDaemonThreadRunning(timeoutMonitor, + master.getServerName() + ".timeoutMonitor"); + } + + /** + * Reset all unassigned znodes. Called on startup of master. + * Call {@link #assignAllUserRegions()} after root and meta have been assigned. + * @throws IOException + * @throws KeeperException + */ + void cleanoutUnassigned() throws IOException, KeeperException { + // Cleanup any existing ZK nodes and start watching + ZKAssign.deleteAllNodes(watcher); + ZKUtil.listChildrenAndWatchForNewChildren(watcher, + watcher.assignmentZNode); + } + + /** + * Handle failover. Restore state from META and ZK. Handle any regions in + * transition. + * @throws KeeperException + * @throws IOException + */ + void processFailover() throws KeeperException, IOException { + // Concurrency note: In the below the accesses on regionsInTransition are + // outside of a synchronization block where usually all accesses to RIT are + // synchronized. The presumption is that in this case it is safe since this + // method is being played by a single thread on startup. 
+ + // Scan META to build list of existing regions, servers, and assignment + rebuildUserRegions(); + // Pickup any disabled tables + rebuildDisabledTables(); + // Check existing regions in transition + List nodes = ZKUtil.listChildrenAndWatchForNewChildren(watcher, + watcher.assignmentZNode); + if(nodes.isEmpty()) { + LOG.info("No regions in transition in ZK to process on failover"); + return; + } + LOG.info("Failed-over master needs to process " + nodes.size() + + " regions in transition"); + for(String regionName : nodes) { + RegionTransitionData data = ZKAssign.getData(watcher, regionName); + HRegionInfo regionInfo = + MetaReader.getRegion(catalogTracker, data.getRegionName()).getFirst(); + String encodedName = regionInfo.getEncodedName(); + switch(data.getEventType()) { + case RS2ZK_REGION_CLOSING: + // Just insert region into RIT. + // If this never updates the timeout will trigger new assignment + regionsInTransition.put(encodedName, + new RegionState(regionInfo, RegionState.State.CLOSING, + data.getStamp())); + break; + + case RS2ZK_REGION_CLOSED: + // Region is closed, insert into RIT and handle it + regionsInTransition.put(encodedName, + new RegionState(regionInfo, RegionState.State.CLOSED, + data.getStamp())); + new ClosedRegionHandler(master, this, data, regionInfo).process(); + break; + + case RS2ZK_REGION_OPENING: + // Just insert region into RIT + // If this never updates the timeout will trigger new assignment + regionsInTransition.put(encodedName, + new RegionState(regionInfo, RegionState.State.OPENING, + data.getStamp())); + break; + + case RS2ZK_REGION_OPENED: + // Region is opened, insert into RIT and handle it + regionsInTransition.put(encodedName, + new RegionState(regionInfo, RegionState.State.OPENING, + data.getStamp())); + new OpenedRegionHandler(master, this, data, regionInfo, + serverManager.getServerInfo(data.getServerName())).process(); + break; + } + } + } + + /** + * Handles various states an unassigned node can be in. + *

    + * Method is called when a state change is suspected for an unassigned node. + *

    + * This deals with skipped transitions (we got a CLOSED but didn't see CLOSING + * yet). + * @param data + */ + private void handleRegion(RegionTransitionData data) { + synchronized(regionsInTransition) { + // Verify this is a known server + if(!serverManager.isServerOnline(data.getServerName())) { + LOG.warn("Attempted to handle region transition for server " + + data.getServerName() + " but server is not online"); + return; + } + String encodedName = HRegionInfo.encodeRegionName(data.getRegionName()); + String prettyPrintedRegionName = HRegionInfo.prettyPrint(encodedName); + LOG.debug("Handling transition=" + data.getEventType() + + ", server=" + data.getServerName() + ", region=" + prettyPrintedRegionName); + RegionState regionState = regionsInTransition.get(encodedName); + switch(data.getEventType()) { + + case RS2ZK_REGION_CLOSING: + // Should see CLOSING after we have asked it to CLOSE or additional + // times after already being in state of CLOSING + if (regionState == null || + (!regionState.isPendingClose() && !regionState.isClosing())) { + LOG.warn("Received CLOSING for region " + prettyPrintedRegionName + + " from server " + data.getServerName() + " but region was in " + + " the state " + regionState + " and not " + + "in expected PENDING_CLOSE or CLOSING states"); + return; + } + // Transition to CLOSING (or update stamp if already CLOSING) + regionState.update(RegionState.State.CLOSING, data.getStamp()); + break; + + case RS2ZK_REGION_CLOSED: + // Should see CLOSED after CLOSING but possible after PENDING_CLOSE + if (regionState == null || + (!regionState.isPendingClose() && !regionState.isClosing())) { + LOG.warn("Received CLOSED for region " + prettyPrintedRegionName + + " from server " + data.getServerName() + " but region was in " + + " the state " + regionState + " and not " + + "in expected PENDING_CLOSE or CLOSING states"); + return; + } + // Handle CLOSED by assigning elsewhere or stopping if a disable + // If we got here all is good. 
Need to update RegionState -- else + // what follows will fail because not in expected state. + regionState.update(RegionState.State.CLOSED, data.getStamp()); + this.executorService.submit(new ClosedRegionHandler(master, + this, data, regionState.getRegion())); + break; + + case RS2ZK_REGION_OPENING: + // Should see OPENING after we have asked it to OPEN or additional + // times after already being in state of OPENING + if(regionState == null || + (!regionState.isPendingOpen() && !regionState.isOpening())) { + LOG.warn("Received OPENING for region " + + prettyPrintedRegionName + + " from server " + data.getServerName() + " but region was in " + + " the state " + regionState + " and not " + + "in expected PENDING_OPEN or OPENING states"); + return; + } + // Transition to OPENING (or update stamp if already OPENING) + regionState.update(RegionState.State.OPENING, data.getStamp()); + break; + + case RS2ZK_REGION_OPENED: + // Should see OPENED after OPENING but possible after PENDING_OPEN + if(regionState == null || + (!regionState.isPendingOpen() && !regionState.isOpening())) { + LOG.warn("Received OPENED for region " + + prettyPrintedRegionName + + " from server " + data.getServerName() + " but region was in " + + " the state " + regionState + " and not " + + "in expected PENDING_OPEN or OPENING states"); + return; + } + // Handle OPENED by removing from transition and deleted zk node + this.executorService.submit( + new OpenedRegionHandler(master, this, data, regionState.getRegion(), + this.serverManager.getServerInfo(data.getServerName()))); + break; + } + } + } + + // ZooKeeper events + + /** + * New unassigned node has been created. + * + *

    This happens when an RS begins the OPENING or CLOSING of a region by + * creating an unassigned node. + * + *

    When this happens we must: + *

      + *
    1. Watch the node for further events
    2. + *
    3. Read and handle the state in the node
    4. + *
    + */ + @Override + public void nodeCreated(String path) { + if(path.startsWith(watcher.assignmentZNode)) { + synchronized(regionsInTransition) { + try { + RegionTransitionData data = ZKAssign.getData(watcher, path); + if(data == null) { + return; + } + handleRegion(data); + } catch (KeeperException e) { + master.abort("Unexpected ZK exception reading unassigned node data", e); + } + } + } + } + + /** + * Existing unassigned node has had data changed. + * + *

    This happens when an RS transitions from OFFLINE to OPENING, or between + * OPENING/OPENED and CLOSING/CLOSED. + * + *

    When this happens we must: + *

      + *
    1. Watch the node for further events
    2. + *
    3. Read and handle the state in the node
    4. + *
    + */ + @Override + public void nodeDataChanged(String path) { + if(path.startsWith(watcher.assignmentZNode)) { + synchronized(regionsInTransition) { + try { + RegionTransitionData data = ZKAssign.getData(watcher, path); + if(data == null) { + return; + } + handleRegion(data); + } catch (KeeperException e) { + master.abort("Unexpected ZK exception reading unassigned node data", e); + } + } + } + } + + /** + * New unassigned node has been created. + * + *

    This happens when an RS begins the OPENING or CLOSING of a region by + * creating an unassigned node. + * + *

    When this happens we must: + *

      + *
    1. Watch the node for further children changed events
    2. + *
    3. Watch all new children for changed events
    4. + *
    5. Read all children and handle them
    6. + *
    + */ + @Override + public void nodeChildrenChanged(String path) { + if(path.equals(watcher.assignmentZNode)) { + synchronized(regionsInTransition) { + try { + List newNodes = ZKUtil.watchAndGetNewChildren(watcher, + watcher.assignmentZNode); + for(NodeAndData newNode : newNodes) { + LOG.debug("Handling new unassigned node: " + newNode); + handleRegion(RegionTransitionData.fromBytes(newNode.getData())); + } + } catch(KeeperException e) { + master.abort("Unexpected ZK exception reading unassigned children", e); + } + } + } + } + + /** + * Marks the region as online. Removes it from regions in transition and + * updates the in-memory assignment information. + *

    + * Used when a region has been successfully opened on a region server. + * @param regionInfo + * @param serverInfo + */ + public void regionOnline(HRegionInfo regionInfo, HServerInfo serverInfo) { + synchronized(regionsInTransition) { + regionsInTransition.remove(regionInfo.getEncodedName()); + regionsInTransition.notifyAll(); + } + synchronized(regions) { + regions.put(regionInfo, serverInfo); + addToServers(serverInfo, regionInfo); + } + } + + /** + * Marks the region as offline. Removes it from regions in transition and + * removes in-memory assignment information. + *

    + * Used when a region has been closed and should remain closed. + * @param regionInfo region to mark offline + */ + public void regionOffline(HRegionInfo regionInfo) { + synchronized(regionsInTransition) { + regionsInTransition.remove(regionInfo.getEncodedName()); + regionsInTransition.notifyAll(); + } + synchronized(regions) { + HServerInfo serverInfo = regions.remove(regionInfo); + // No server recorded for the region (absent, or mapped to null by + // rebuildUserRegions): servers.get(null) would throw on the TreeMap. + if (serverInfo != null) { + List serverRegions = servers.get(serverInfo); + if (serverRegions != null) { + serverRegions.remove(regionInfo); + } + } + } + } + + /** + * Sets the region as offline by removing in-memory assignment information but + * retaining transition information. + *

    + * Used when a region has been closed but should be reassigned. + * @param regionInfo + */ + public void setOffline(HRegionInfo regionInfo) { + synchronized(regions) { + HServerInfo serverInfo = regions.remove(regionInfo); + List serverRegions = servers.get(serverInfo); + serverRegions.remove(regionInfo); + } + } + + // Assignment methods + + /** + * Assigns the specified region. + *

    + * If a RegionPlan is available with a valid destination then it will be used + * to determine what server region is assigned to. If no RegionPlan is + * available, region will be assigned to a random available server. + *

    + * Updates the RegionState and sends the OPEN RPC. + *

    + * This will only succeed if the region is in transition and in a CLOSED or + * OFFLINE state or not in transition (in-memory not zk), and of course, the + * chosen server is up and running (It may have just crashed!). If the + * in-memory checks pass, the zk node is forced to OFFLINE before assigning. + * + * @param region region to be assigned + */ + public void assign(HRegionInfo region) { + LOG.debug("Starting assignment for region " + region.getRegionNameAsString()); + // Grab the state of this region and synchronize on it + String encodedName = region.getEncodedName(); + RegionState state; + // This assignLock is used bridging the two synchronization blocks. Once + // we've made it into the 'state' synchronization block, then we can let + // go of this lock. There must be a better construct than this -- St.Ack 20100811 + this.assignLock.lock(); + try { + synchronized(regionsInTransition) { + state = regionsInTransition.get(encodedName); + if(state == null) { + state = new RegionState(region, RegionState.State.OFFLINE); + regionsInTransition.put(encodedName, state); + } + } + synchronized(state) { + this.assignLock.unlock(); + assign(state); + } + } finally { + if (this.assignLock.isHeldByCurrentThread()) this.assignLock.unlock(); + } + } + + /** + * Caller must hold lock on the passed state object. 
+ * @param state + */ + private void assign(final RegionState state) { + if(!state.isClosed() && !state.isOffline()) { + LOG.info("Attempting to assign region but it is in transition and in " + + "an unexpected state:" + state); + return; + } else { + state.update(RegionState.State.OFFLINE); + } + try { + if(!ZKAssign.createOrForceNodeOffline(master.getZooKeeper(), + state.getRegion(), master.getServerName())) { + LOG.warn("Attempted to create/force node into OFFLINE state before " + + "completing assignment but failed to do so"); + return; + } + } catch (KeeperException e) { + master.abort("Unexpected ZK exception creating/setting node OFFLINE", e); + return; + } + // Pickup existing plan or make a new one + String encodedName = state.getRegion().getEncodedName(); + RegionPlan plan; + synchronized(regionPlans) { + plan = regionPlans.get(encodedName); + if (plan == null) { + LOG.debug("No previous transition plan for " + + state.getRegion().getRegionNameAsString() + + " so generating a random one from " + serverManager.numServers() + + " ( " + serverManager.getOnlineServers().size() + ") available servers"); + plan = new RegionPlan(state.getRegion(), null, + LoadBalancer.randomAssignment(serverManager.getOnlineServersList())); + regionPlans.put(encodedName, plan); + } + } + try { + // Send OPEN RPC. This can fail if the server on other end is is not up. + serverManager.sendRegionOpen(plan.getDestination(), state.getRegion()); + // Transition RegionState to PENDING_OPEN + state.update(RegionState.State.PENDING_OPEN); + } catch (Throwable t) { + LOG.warn("Failed assignment of " + + state.getRegion().getRegionNameAsString() + " to " + + plan.getDestination(), t); + // Clean out plan we failed execute and one that doesn't look like it'll + // succeed anyways; we need a new plan! + synchronized(regionPlans) { + this.regionPlans.remove(encodedName); + } + } + } + + /** + * Unassigns the specified region. + *

    + * Updates the RegionState and sends the CLOSE RPC. + *

    + * If a RegionPlan is already set, it will remain. If this is being used + * to disable a table, be sure to use {@link #disableTable(String)} to ensure + * regions are not onlined after being closed. + * + * @param region region to be unassigned + */ + public void unassign(HRegionInfo region) { + LOG.debug("Starting unassignment of region " + + region.getRegionNameAsString() + " (offlining)"); + // Check if this region is currently assigned + // NOTE(review): regions is read here and below without holding its lock, + // unlike the other accessors in this class -- confirm this is safe + if (!regions.containsKey(region)) { + LOG.debug("Attempted to unassign region " + region.getRegionNameAsString() + + " but it is not " + + "currently assigned anywhere"); + return; + } + String encodedName = region.getEncodedName(); + // Grab the state of this region and synchronize on it + RegionState state; + synchronized(regionsInTransition) { + state = regionsInTransition.get(encodedName); + if(state == null) { + state = new RegionState(region, RegionState.State.PENDING_CLOSE); + regionsInTransition.put(encodedName, state); + } else { + // Already in transition: return without sending a CLOSE RPC + LOG.debug("Attempting to unassign region " + + region.getRegionNameAsString() + " but it is " + + "already in transition (" + state.getState() + ")"); + return; + } + } + // Send CLOSE RPC + try { + serverManager.sendRegionClose(regions.get(region), state.getRegion()); + } catch (NotServingRegionException e) { + // NOTE(review): the NSRE is only logged; the PENDING_CLOSE state added + // above stays in regionsInTransition -- confirm that is intended + LOG.warn("Attempted to close region " + region.getRegionNameAsString() + + " but got an NSRE", e); + } + } + + /** + * Waits until the specified region has completed assignment. + *

    + * If the region is already assigned, returns immediately. Otherwise, method + * blocks until the region is assigned. + * @param regionInfo region to wait on assignment for + * @throws InterruptedException + */ + public void waitForAssignment(HRegionInfo regionInfo) + throws InterruptedException { + synchronized(regions) { + while(!regions.containsKey(regionInfo)) { + regions.wait(); + } + } + } + + /** + * Assigns the ROOT region. + *

    + * Assumes that ROOT is currently closed and is not being actively served by + * any RegionServer. + *

    + * Forcibly unsets the current root region location in ZooKeeper and assigns + * ROOT to a random RegionServer. + * @throws KeeperException + */ + public void assignRoot() throws KeeperException { + RootLocationEditor.deleteRootLocation(this.master.getZooKeeper()); + assign(HRegionInfo.ROOT_REGIONINFO); + } + + /** + * Assigns the META region. + *

    + * Assumes that META is currently closed and is not being actively served by + * any RegionServer. + *

    + * Forcibly assigns META to a random RegionServer. + */ + public void assignMeta() { + // Force assignment to a random server + assign(HRegionInfo.FIRST_META_REGIONINFO); + } + + /** + * Assigns all user regions, if any exist. Used during cluster startup. + *

    + * This is a synchronous call and will return once every region has been + * assigned. If anything fails, an exception is thrown and the cluster + * should be shutdown. + */ + public void assignAllUserRegions() throws IOException { + // First experiment at synchronous assignment + // Simpler because just wait for no regions in transition + + // Scan META for all user regions + List allRegions = + MetaScanner.listAllRegions(master.getConfiguration()); + if (allRegions == null || allRegions.isEmpty()) { + return; + } + + // Get all available servers + List servers = serverManager.getOnlineServersList(); + + LOG.info("Assigning " + allRegions.size() + " across " + servers.size() + + " servers"); + + // Generate a cluster startup region placement plan + Map> bulkPlan = + LoadBalancer.bulkAssignment(allRegions, servers); + + // For each server, create OFFLINE nodes and send OPEN RPCs + for(Map.Entry> entry : bulkPlan.entrySet()) { + HServerInfo server = entry.getKey(); + List regions = entry.getValue(); + LOG.debug("Assigning " + regions.size() + " regions to " + server); + for(HRegionInfo region : regions) { + LOG.debug("Assigning " + region.getRegionNameAsString() + " to " + server); + String regionName = region.getEncodedName(); + RegionPlan plan = new RegionPlan(region, null,server); + regionPlans.put(regionName, plan); + assign(region); + } + } + + // Wait for no regions to be in transition + try { + waitUntilNoRegionsInTransition(); + } catch (InterruptedException e) { + LOG.error("Interrupted waiting for regions to be assigned", e); + throw new IOException(e); + } + + LOG.info("All user regions have been assigned"); + } + + private void rebuildUserRegions() throws IOException { + Map allRegions = + MetaReader.fullScan(catalogTracker); + for(Map.Entry region : allRegions.entrySet()) { + HServerAddress regionLocation = region.getValue(); + HRegionInfo regionInfo = region.getKey(); + if(regionLocation == null) { + regions.put(regionInfo, null); + continue; + } 
+ HServerInfo serverInfo = serverManager.getHServerInfo(regionLocation); + regions.put(regionInfo, serverInfo); + addToServers(serverInfo, regionInfo); + } + } + + /* + * Presumes caller has taken care of necessary locking modifying servers Map. + * @param hsi + * @param hri + */ + private void addToServers(final HServerInfo hsi, final HRegionInfo hri) { + List hris = servers.get(hsi); + if (hris == null) { + hris = new ArrayList(); + servers.put(hsi, hris); + } + hris.add(hri); + } + + /** + * Blocks until there are no regions in transition. It is possible that there + * are regions in transition immediately after this returns but guarantees + * that if it returns without an exception that there was a period of time + * with no regions in transition from the point-of-view of the in-memory + * state of the Master. + * @throws InterruptedException + */ + public void waitUntilNoRegionsInTransition() throws InterruptedException { + synchronized(regionsInTransition) { + while(regionsInTransition.size() > 0) { + regionsInTransition.wait(); + } + } + } + + /** + * @return A copy of the Map of regions currently in transition. + */ + public NavigableMap getRegionsInTransition() { + return new TreeMap(this.regionsInTransition); + } + + /** + * @return True if regions in transition. + */ + public boolean isRegionsInTransition() { + return !this.regionsInTransition.isEmpty(); + } + + /** + * Checks if the specified table has been disabled by the user. + * @param tableName + * @return + */ + public boolean isTableDisabled(String tableName) { + synchronized(disabledTables) { + return disabledTables.contains(tableName); + } + } + + /** + * Checks if the table of the specified region has been disabled by the user. + * @param regionName + * @return + */ + public boolean isTableOfRegionDisabled(byte [] regionName) { + return isTableDisabled(Bytes.toString( + HRegionInfo.getTableName(regionName))); + } + + /** + * Sets the specified table to be disabled. 
+ * @param tableName table to be disabled + */ + public void disableTable(String tableName) { + synchronized(disabledTables) { + if(!isTableDisabled(tableName)) { + disabledTables.add(tableName); + try { + ZKTableDisable.disableTable(master.getZooKeeper(), tableName); + } catch (KeeperException e) { + LOG.warn("ZK error setting table as disabled", e); + } + } + } + } + + /** + * Unsets the specified table from being disabled. + *

    + * This operation only acts on the in-memory + * @param tableName table to be undisabled + */ + public void undisableTable(String tableName) { + synchronized(disabledTables) { + if(isTableDisabled(tableName)) { + disabledTables.remove(tableName); + try { + ZKTableDisable.undisableTable(master.getZooKeeper(), tableName); + } catch (KeeperException e) { + LOG.warn("ZK error setting table as disabled", e); + } + } + } + } + + /** + * Rebuild the set of disabled tables from zookeeper. Used during master + * failover. + */ + private void rebuildDisabledTables() { + synchronized(disabledTables) { + List disabledTables; + try { + disabledTables = ZKTableDisable.getDisabledTables(master.getZooKeeper()); + } catch (KeeperException e) { + LOG.warn("ZK error getting list of disabled tables", e); + return; + } + if(!disabledTables.isEmpty()) { + LOG.info("Rebuilt list of " + disabledTables.size() + " disabled " + + "tables from zookeeper"); + disabledTables.addAll(disabledTables); + } + } + } + + /** + * Gets the online regions of the specified table. + * @param tableName + * @return + */ + public List getRegionsOfTable(byte[] tableName) { + List tableRegions = new ArrayList(); + for(HRegionInfo regionInfo : regions.tailMap(new HRegionInfo( + new HTableDescriptor(tableName), null, null)).keySet()) { + if(Bytes.equals(regionInfo.getTableDesc().getName(), tableName)) { + tableRegions.add(regionInfo); + } else { + break; + } + } + return tableRegions; + } + + /** + * Unsets the specified table as disabled (enables it). + */ + public class TimeoutMonitor extends Chore { + + private final int timeout; + + /** + * Creates a periodic monitor to check for time outs on region transition + * operations. This will deal with retries if for some reason something + * doesn't happen within the specified timeout. + * @param period + * @param stopper When {@link Stoppable#isStopped()} is true, this thread will + * cleanup and exit cleanly. 
+ * @param timeout + */ + public TimeoutMonitor(final int period, final Stoppable stopper, + final int timeout) { + super("AssignmentTimeoutMonitor", period, stopper); + this.timeout = timeout; + } + + @Override + protected void chore() { + synchronized (regionsInTransition) { + // Iterate all regions in transition checking for time outs + long now = System.currentTimeMillis(); + for (RegionState regionState : regionsInTransition.values()) { + if(regionState.getStamp() + timeout <= now) { + HRegionInfo regionInfo = regionState.getRegion(); + LOG.info("Regions in transition timed out: " + regionState); + // Expired! Do a retry. + switch(regionState.getState()) { + case OFFLINE: + case CLOSED: + LOG.info("Region has been OFFLINE or CLOSED for too long, " + + "reassigning " + regionInfo.getRegionNameAsString()); + assign(regionState.getRegion()); + break; + case PENDING_OPEN: + case OPENING: + LOG.info("Region has been PENDING_OPEN or OPENING for too " + + "long, reassigning " + regionInfo.getRegionNameAsString()); + assign(regionState.getRegion()); + break; + case OPEN: + LOG.warn("Long-running region in OPEN state? This should " + + "not happen"); + break; + case PENDING_CLOSE: + case CLOSING: + LOG.info("Region has been PENDING_CLOSE or CLOSING for too " + + "long, resending close rpc"); + unassign(regionInfo); + break; + } + } + } + } + } + } + + /** + * Process shutdown server removing any assignments. + * @param hsi Server that went down. 
+ */ + public void processServerShutdown(final HServerInfo hsi) { + // Clean out any existing assignment plans for this server + synchronized (this.regionPlans) { + for (Iterator > i = + this.regionPlans.entrySet().iterator(); i.hasNext();) { + Map.Entry e = i.next(); + if (e.getValue().getDestination().equals(hsi)) { + // Use iterator's remove else we'll get a ConcurrentModificationException + i.remove(); + } + } + } + synchronized (regionsInTransition) { + // Iterate all regions in transition checking if were on this server + final String serverName = hsi.getServerName(); + for (Map.Entry e: this.regionsInTransition.entrySet()) { + // NOTE(review): keys of regionsInTransition are encoded region names (see + // assign), so this comparison against a server name looks like it can + // never match -- confirm and key the check on the hosting server instead + if (!e.getKey().equals(serverName)) continue; + RegionState regionState = e.getValue(); + switch(regionState.getState()) { + case PENDING_OPEN: + case OPENING: + case OFFLINE: + case CLOSED: + case PENDING_CLOSE: + case CLOSING: + LOG.info("Region " + regionState.getRegion().getRegionNameAsString() + + " was in state=" + regionState.getState() + " on shutdown server=" + + serverName + ", reassigning"); + assign(regionState.getRegion()); + break; + + case OPEN: + LOG.warn("Long-running region in OPEN state? Should not happen"); + break; + } + } + } + } + + /** + * Update in-memory structures after a split: the parent is removed from the + * region and server maps and both daughters are recorded on the reporting server. + * @param hsi Server that reported the split + * @param parent Parent region that was split + * @param a Daughter region A + * @param b Daughter region B + */ + public void handleSplitReport(final HServerInfo hsi, final HRegionInfo parent, + final HRegionInfo a, final HRegionInfo b) { + synchronized (this.regions) { + checkRegion(hsi, parent, true); + // Neither daughter is expected to be known yet; check both, matching the + // removeFromServers calls below + checkRegion(hsi, a, false); + checkRegion(hsi, b, false); + this.regions.put(a, hsi); + this.regions.put(b, hsi); + removeFromServers(hsi, parent, true); + removeFromServers(hsi, a, false); + removeFromServers(hsi, b, false); + addToServers(hsi, a); + addToServers(hsi, b); + } + } + + /* + * Caller must hold locks on regions Map. 
+ * @param hsi + * @param hri + * @param expected + */ + private void checkRegion(final HServerInfo hsi, final HRegionInfo hri, + final boolean expected) { + HServerInfo serverInfo = regions.remove(hri); + if (expected) { + if (serverInfo == null) { + LOG.info("Region not on a server: " + hri.getRegionNameAsString()); + } + } else { + if (serverInfo != null) { + LOG.warn("Region present on " + hsi + "; unexpected"); + } + } + } + + /* + * Caller must hold locks on servers Map. + * @param hsi + * @param hri + * @param expected + */ + private void removeFromServers(final HServerInfo hsi, final HRegionInfo hri, + final boolean expected) { + List serverRegions = this.servers.get(hsi); + boolean removed = serverRegions.remove(hri); + if (expected) { + if (!removed) { + LOG.warn(hri.getRegionNameAsString() + " not found on " + hsi + + "; unexpected"); + } + } else { + if (removed) { + LOG.warn(hri.getRegionNameAsString() + " found on " + hsi + + "; unexpected"); + } + } + } + + /** + * @return A clone of current assignments. Note, this is assignments only. + * If a new server has come in and it has no regions, it will not be included + * in the returned Map. + */ + Map> getAssignments() { + // This is an EXPENSIVE clone. Cloning though is the safest thing to do. + // Can't let out original since it can change and at least the loadbalancer + // wants to iterate this exported list. We need to synchronize on regions + // since all access to this.servers is under a lock on this.regions. + Map> result = null; + synchronized (this.regions) { + result = new HashMap>(this.servers.size()); + for (Map.Entry> e: this.servers.entrySet()) { + List shallowCopy = new ArrayList(e.getValue()); + HServerInfo clone = new HServerInfo(e.getKey()); + // Set into server load the number of regions this server is carrying + // The load balancer calculation needs it at least and its handy. 
+ clone.getLoad().setNumberOfRegions(e.getValue().size()); + result.put(clone, shallowCopy); + } + } + return result; + } + + /** + * @param encodedRegionName Region encoded name. + * @return Null or a {@link Pair} instance that holds the full {@link HRegionInfo} + * and the hosting servers {@link HServerInfo}. + */ + Pair getAssignment(final byte [] encodedRegionName) { + String name = Bytes.toString(encodedRegionName); + synchronized(this.regions) { + for (Map.Entry e: this.regions.entrySet()) { + if (e.getKey().getEncodedName().equals(name)) { + return new Pair(e.getKey(), e.getValue()); + } + } + } + return null; + } + + /** + * @param plan Plan to execute. + */ + void balance(final RegionPlan plan) { + synchronized (this.regionPlans) { + this.regionPlans.put(plan.getRegionName(), plan); + } + unassign(plan.getRegionInfo()); + } + + public static class RegionState implements Writable { + private HRegionInfo region; + + public enum State { + OFFLINE, // region is in an offline state + PENDING_OPEN, // sent rpc to server to open but has not begun + OPENING, // server has begun to open but not yet done + OPEN, // server opened region and updated meta + PENDING_CLOSE, // sent rpc to server to close but has not begun + CLOSING, // server has begun to close but not yet done + CLOSED // server closed region and updated meta + } + + private State state; + private long stamp; + + public RegionState() {} + + RegionState(HRegionInfo region, State state) { + this(region, state, System.currentTimeMillis()); + } + + RegionState(HRegionInfo region, State state, long stamp) { + this.region = region; + this.state = state; + this.stamp = stamp; + } + + public void update(State state, long stamp) { + this.state = state; + this.stamp = stamp; + } + + public void update(State state) { + this.state = state; + this.stamp = System.currentTimeMillis(); + } + + public State getState() { + return state; + } + + public long getStamp() { + return stamp; + } + + public HRegionInfo 
getRegion() { + return region; + } + + public boolean isClosing() { + return state == State.CLOSING; + } + + public boolean isClosed() { + return state == State.CLOSED; + } + + public boolean isPendingClose() { + return state == State.PENDING_CLOSE; + } + + public boolean isOpening() { + return state == State.OPENING; + } + + public boolean isOpened() { + return state == State.OPEN; + } + + public boolean isPendingOpen() { + return state == State.PENDING_OPEN; + } + + public boolean isOffline() { + return state == State.OFFLINE; + } + + @Override + public String toString() { + return region.getRegionNameAsString() + " state=" + state + + ", ts=" + stamp; + } + + @Override + public void readFields(DataInput in) throws IOException { + region = new HRegionInfo(); + region.readFields(in); + state = State.valueOf(in.readUTF()); + stamp = in.readLong(); + } + + @Override + public void write(DataOutput out) throws IOException { + region.write(out); + out.writeUTF(state.name()); + out.writeLong(stamp); + } + } +} Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/DeadServer.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/DeadServer.java?rev=991397&view=auto ============================================================================== --- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/DeadServer.java (added) +++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/DeadServer.java Tue Aug 31 23:51:44 2010 @@ -0,0 +1,133 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; + +import org.apache.commons.lang.NotImplementedException; +import org.apache.hadoop.hbase.HServerInfo; + +/** + * Class to hold dead servers list and utility querying dead server list. + */ +public class DeadServer implements Set { + /** + * Set of known dead servers. On znode expiration, servers are added here. + * This is needed in case of a network partitioning where the server's lease + * expires, but the server is still running. After the network is healed, + * and it's server logs are recovered, it will be told to call server startup + * because by then, its regions have probably been reassigned. + */ + private final Set deadServers = new HashSet(); + + + /** + * @param serverName + * @return true if server is dead + */ + public boolean isDeadServer(final String serverName) { + return isDeadServer(serverName, false); + } + + /** + * @param serverName Servername as either host:port or + * host,port,startcode. + * @param hostAndPortOnly True if serverName is host and + * port only (host:port) and if so, then we do a prefix compare + * (ignoring start codes) looking for dead server. 
+ * @return true if server is dead + */ + boolean isDeadServer(final String serverName, final boolean hostAndPortOnly) { + return HServerInfo.isServer(this, serverName, hostAndPortOnly); + } + + public synchronized Set clone() { + Set clone = new HashSet(this.deadServers.size()); + clone.addAll(this.deadServers); + return clone; + } + + public synchronized int size() { + return deadServers.size(); + } + + public synchronized boolean isEmpty() { + return deadServers.isEmpty(); + } + + public synchronized boolean contains(Object o) { + return deadServers.contains(o); + } + + public Iterator iterator() { + return this.deadServers.iterator(); + } + + public synchronized Object[] toArray() { + return deadServers.toArray(); + } + + public synchronized T[] toArray(T[] a) { + return deadServers.toArray(a); + } + + public synchronized boolean add(String e) { + return deadServers.add(e); + } + + public synchronized boolean remove(Object o) { + return deadServers.remove(o); + } + + public synchronized boolean containsAll(Collection c) { + return deadServers.containsAll(c); + } + + public synchronized boolean addAll(Collection c) { + return deadServers.addAll(c); + } + + public synchronized boolean retainAll(Collection c) { + return deadServers.retainAll(c); + } + + public synchronized boolean removeAll(Collection c) { + return deadServers.removeAll(c); + } + + public synchronized void clear() { + throw new NotImplementedException(); + } + + public synchronized boolean equals(Object o) { + return deadServers.equals(o); + } + + public synchronized int hashCode() { + return deadServers.hashCode(); + } + + public synchronized String toString() { + return this.deadServers.toString(); + } +} \ No newline at end of file