hbase-commits mailing list archives

From mbau...@apache.org
Subject svn commit: r1301790 [1/2] - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/client/ main/java/org/apache/hadoop/hbase/io/ main/java/org/apache/hadoop/hbase/ipc/ main/java/org/apache/hadoop/hbase/mas...
Date Fri, 16 Mar 2012 22:09:13 GMT
Author: mbautin
Date: Fri Mar 16 22:09:11 2012
New Revision: 1301790

URL: http://svn.apache.org/viewvc?rev=1301790&view=rev
Log:
[master] Integrate the favoredNodes into the new assignment manager

Summary:
There are several changes in this revision. All of them have been tested on the dev cluster.

1) The master reads the assignment plan (region -> favored nodes) only from the META region and keeps it up to date through the base scanner. The assignment manager and the new load balancer assign regions according to the assignment plan.

2) The RegionPlacement tool can run either as a separate process or inside the master. In V1, RegionPlacement runs as an independent process.

2.1) The main job of the RegionPlacement tool is to generate a new assignment plan based on the current locality information, region assignment, and online region servers.

2.2) The tool can also update META with any assignment plan and update each region server with its corresponding assignment plan. The assignment plan can be generated by the tool or customized manually.

2.3) In addition, the tool supports lightweight assignment verification, such as printing the current assignment plan and verifying the consistency between the actual region assignment and the assignment plan (for example, the number of regions running on their primary region servers).
In practice, if all the region servers are live, all user regions should be assigned to their primary region servers only.

3) Added a unit test that covers most of the use cases.

3.1) In the normal case, all user regions are assigned to their primary region servers.

3.2) The second case verifies that the region placement tool correctly updates META and the region servers with a new assignment plan, and that the master correctly assigns the regions based on the new plan read from META. The number of region movements matches expectations, and all regions still run on their primary region servers.

3.3) The last case verifies that when any region server dies, including the one hosting META or ROOT, the number of region movements and the number of regions running on their primary region servers still match expectations. All regions from the dead region server are reassigned to their secondary or tertiary region servers.
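
Items 1 and 3.3 describe the assignment rule: a region goes to its primary region server when that server is live, and otherwise falls back to the secondary, then the tertiary. The sketch below illustrates that rule under simplifying assumptions; it is not the committed code. Servers are plain host:port strings, liveness is a caller-supplied set, and the class name FavoredNodeSelection is hypothetical. The real AssignmentManager.executeAssignmentPlan instead asks the master's ServerManager for server info, dead servers, and load reports.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Simplified sketch of favored-node fallback; not the HBase implementation. */
class FavoredNodeSelection {
  /** Mirrors HConstants.FAVORED_NODES_NUM: primary, secondary, tertiary. */
  static final int FAVORED_NODES_NUM = 3;

  /**
   * Returns the first live server in favored order (primary, then secondary,
   * then tertiary), or null if there is no plan or no favored server is live.
   */
  static String chooseServer(List<String> favoredNodes, Set<String> liveServers) {
    if (favoredNodes == null) {
      return null; // no assignment plan for this region
    }
    // Only the first FAVORED_NODES_NUM entries are considered favored nodes.
    int limit = Math.min(favoredNodes.size(), FAVORED_NODES_NUM);
    for (int i = 0; i < limit; i++) {
      String server = favoredNodes.get(i);
      if (liveServers.contains(server)) {
        return server;
      }
    }
    return null;
  }

  public static void main(String[] args) {
    List<String> favored = Arrays.asList("rs1:60020", "rs2:60020", "rs3:60020");
    // The primary (rs1) is dead, so only rs2 and rs3 are live.
    Set<String> live = new HashSet<>(Arrays.asList("rs2:60020", "rs3:60020"));
    System.out.println(chooseServer(favored, live)); // rs2:60020
  }
}
```

Bounding the loop by the number of favored positions keeps the sketch safe if a plan ever lists more than three servers.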

Test Plan:
1) Tested on dev cluster
2) Testing in the migrate002 cluster

Reviewers: kannan, kranganathan

Reviewed By: kranganathan

Differential Revision: https://phabricator.fb.com/D407485

Task ID: 713834

Added:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/AssignmentPlan.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RegionPlacement.java
      - copied, changed from r1301789, hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/RegionPlacement.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestRegionPlacement.java
Removed:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/PreferredAssignmentManager.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/RegionPlacement.java
Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HServerAddress.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Result.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/BaseScanner.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/MetaRegion.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RegionPlacementPolicy.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ThrottledRegionReopener.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/metrics/MasterMetrics.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionFavoredNodes.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1301790&r1=1301789&r2=1301790&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java Fri Mar 16 22:09:11 2012
@@ -455,6 +455,9 @@ public final class HConstants {
 
   public static final int REGION_SERVER_MSG_INTERVAL = 1 * 1000;
 
+  /** The number of favored nodes for each region */
+  public static final int FAVORED_NODES_NUM = 3;
+
   private HConstants() {
     // Can't be instantiated with this ctor.
   }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HServerAddress.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HServerAddress.java?rev=1301790&r1=1301789&r2=1301790&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HServerAddress.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HServerAddress.java Fri Mar 16 22:09:11 2012
@@ -33,7 +33,7 @@ import org.apache.hadoop.io.WritableComp
  */
 public class HServerAddress implements WritableComparable<HServerAddress> {
   private InetSocketAddress address;
-  String stringValue;
+  private String stringValue;
 
   public HServerAddress() {
     this.address = null;
@@ -46,8 +46,7 @@ public class HServerAddress implements W
    */
   public HServerAddress(InetSocketAddress address) {
     this.address = address;
-    this.stringValue = address.getAddress().getHostAddress() + ":" +
-      address.getPort();
+    this.stringValue = getHostAddressWithPort();
     checkBindAddressCanBeResolved();
   }
 
@@ -62,7 +61,7 @@ public class HServerAddress implements W
     String host = hostAndPort.substring(0, colonIndex);
     int port = Integer.parseInt(hostAndPort.substring(colonIndex + 1));
     this.address = new InetSocketAddress(host, port);
-    this.stringValue = hostAndPort;
+    this.stringValue = getHostAddressWithPort();
     checkBindAddressCanBeResolved();
   }
 
@@ -88,6 +87,27 @@ public class HServerAddress implements W
     checkBindAddressCanBeResolved();
   }
 
+  /**
+   * Get the normalized hostAddress:port string.
+   * The host part comes from getBindAddress(); returns null if the address is unset.
+   * @return the normalized hostAddress:port string
+   */
+  public String getHostAddressWithPort() {
+    if (address == null) return null;
+    return this.getBindAddress() + ":" + address.getPort();
+  }
+
+  /**
+   * Get the normalized hostName:port string.
+   * Returns null if the address is unset.
+   * @return the normalized hostName:port string
+   */
+  public String getHostNameWithPort() {
+    if (address == null) return null;
+    return address.getHostName() + ":" +
+      address.getPort();
+  }
+
   /** @return Bind address */
   public String getBindAddress() {
     final InetAddress addr = address.getAddress();
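
The constructor changes above replace the caller-supplied text with a normalized hostAddress:port string, so the same endpoint yields the same stringValue however it was specified. The sketch below shows that normalization using only the standard java.net.InetSocketAddress API; AddressNormalization and normalize are hypothetical names. It assumes the bind address is the resolved IP (the body of getBindAddress() is cut off in this hunk), and, unlike HServerAddress, it falls back to the host string for an unresolved address.

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;

/** Sketch of normalizing a server address to "ip:port"; helper names are hypothetical. */
class AddressNormalization {
  /** Returns "ip:port", or null for a null address. */
  static String normalize(InetSocketAddress address) {
    if (address == null) {
      return null;
    }
    InetAddress ip = address.getAddress();
    // An unresolved address has no InetAddress, so fall back to its host string.
    String host = (ip != null) ? ip.getHostAddress() : address.getHostString();
    return host + ":" + address.getPort();
  }

  /** Splits "host:port" at the last colon and normalizes the result. */
  static String normalize(String hostAndPort) {
    int colonIndex = hostAndPort.lastIndexOf(':');
    String host = hostAndPort.substring(0, colonIndex);
    int port = Integer.parseInt(hostAndPort.substring(colonIndex + 1));
    return normalize(new InetSocketAddress(host, port));
  }

  public static void main(String[] args) {
    // Both forms of the same endpoint yield the same key.
    System.out.println(normalize("127.0.0.1:60020")); // 127.0.0.1:60020
    System.out.println(normalize(new InetSocketAddress("127.0.0.1", 60020))); // 127.0.0.1:60020
  }
}
```

With a hostname such as localhost, the output depends on how the name resolves on the local machine.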

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Result.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Result.java?rev=1301790&r1=1301789&r2=1301790&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Result.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Result.java Fri Mar 16 22:09:11 2012
@@ -20,13 +20,6 @@
 
 package org.apache.hadoop.hbase.client;
 
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValue.SplitKeyValue;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.io.WritableWithSize;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.Writable;
-
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -38,6 +31,13 @@ import java.util.Map;
 import java.util.NavigableMap;
 import java.util.TreeMap;
 
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.SplitKeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.WritableWithSize;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Writable;
+
 /**
  * Single row result of a {@link Get} or {@link Scan} query.<p>
  *
@@ -320,6 +320,17 @@ public class Result implements Writable,
     return entry == null? null: entry.getValue();
   }
 
+  /**
+   * Get the latest time stamp.
+   * @param family family name
+   * @param qualifier column qualifier
+   * @return the latest time stamp
+   */
+  public long getLastestTimeStamp(byte [] family, byte [] qualifier) {
+    Map.Entry<Long,byte[]> entry = getKeyValue(family, qualifier);
+    return entry == null? Long.MIN_VALUE: entry.getKey();
+  }
+
   private Map.Entry<Long,byte[]> getKeyValue(byte[] family, byte[] qualifier) {
     if(this.familyMap == null) {
       getMap();
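
The new Result.getLastestTimeStamp returns the timestamp of the newest version of a cell, or Long.MIN_VALUE when the cell is absent, and relies on getKeyValue returning the newest entry. The sketch below reproduces that lookup on a plain version map, assuming versions are ordered newest-first (a descending TreeMap); LatestTimestamp and its methods are hypothetical names, not part of the Result API.

```java
import java.util.Collections;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Sketch of a newest-first version map and a latest-timestamp lookup. */
class LatestTimestamp {
  /** Creates a version map ordered by descending timestamp. */
  static NavigableMap<Long, byte[]> newVersionMap() {
    return new TreeMap<Long, byte[]>(Collections.reverseOrder());
  }

  /** Returns the newest timestamp, or Long.MIN_VALUE if there are no versions. */
  static long latestTimestamp(NavigableMap<Long, byte[]> versions) {
    if (versions == null) {
      return Long.MIN_VALUE;
    }
    // Newest-first ordering means the first entry is the latest version.
    Map.Entry<Long, byte[]> entry = versions.firstEntry();
    return entry == null ? Long.MIN_VALUE : entry.getKey();
  }

  public static void main(String[] args) {
    NavigableMap<Long, byte[]> versions = newVersionMap();
    versions.put(100L, new byte[] {1});
    versions.put(300L, new byte[] {3});
    versions.put(200L, new byte[] {2});
    System.out.println(latestTimestamp(versions)); // 300
  }
}
```

Long.MIN_VALUE works as a "no value" sentinel because real cell timestamps are not negative.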

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java?rev=1301790&r1=1301789&r2=1301790&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java Fri Mar 16 22:09:11 2012
@@ -23,9 +23,7 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.lang.reflect.Array;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
@@ -45,23 +43,41 @@ import org.apache.hadoop.hbase.HTableDes
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.MultiPut;
+import org.apache.hadoop.hbase.client.MultiPutResponse;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.MultiPutResponse;
-import org.apache.hadoop.hbase.client.MultiPut;
-import org.apache.hadoop.hbase.filter.*;
-import org.apache.hadoop.hbase.io.HbaseMapWritable;
-import org.apache.hadoop.io.MapWritable;
-import org.apache.hadoop.io.ObjectWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.ColumnCountGetFilter;
+import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
+import org.apache.hadoop.hbase.filter.ColumnRangeFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.DependentColumnFilter;
+import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
+import org.apache.hadoop.hbase.filter.InclusiveStopFilter;
+import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
+import org.apache.hadoop.hbase.filter.PageFilter;
+import org.apache.hadoop.hbase.filter.PrefixFilter;
+import org.apache.hadoop.hbase.filter.QualifierFilter;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.filter.SkipFilter;
+import org.apache.hadoop.hbase.filter.ValueFilter;
+import org.apache.hadoop.hbase.filter.WhileMatchFilter;
+import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
+import org.apache.hadoop.hbase.master.AssignmentPlan;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
 
 /**
  * This is a customized version of the polymorphic hadoop
@@ -180,6 +196,9 @@ public class HbaseObjectWritable impleme
     // Online schema change
     addToMap(Integer.class, code++);
     addToMap(Pair.class, code++);
+
+    // Favored Assignment
+    addToMap(AssignmentPlan.class, code++);
   }
 
   private Class<?> declaredClass;
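
HbaseObjectWritable identifies each serializable class on the wire by a small integer code handed out in registration order (addToMap(..., code++)). That is why AssignmentPlan is appended after the existing entries: registering it earlier would shift the code of every later class and could break RPC compatibility between processes running different builds. The sketch below shows an order-based registry of that kind; ClassCodeRegistry is a hypothetical name, and its codes are illustrative, not HBase's actual values.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical order-based class-code registry, in the spirit of addToMap(..., code++). */
class ClassCodeRegistry {
  private final Map<Class<?>, Byte> classToCode = new HashMap<>();
  private final Map<Byte, Class<?>> codeToClass = new HashMap<>();
  private byte nextCode = 1;

  /** Assigns the next code to clazz; codes depend only on registration order. */
  void register(Class<?> clazz) {
    byte code = nextCode++;
    classToCode.put(clazz, code);
    codeToClass.put(code, clazz);
  }

  /** Code written to the stream for clazz. */
  byte codeOf(Class<?> clazz) {
    Byte code = classToCode.get(clazz);
    if (code == null) {
      throw new IllegalArgumentException("Unregistered class: " + clazz.getName());
    }
    return code;
  }

  /** Class a reader reconstructs from a code, or null if unknown. */
  Class<?> classOf(byte code) {
    return codeToClass.get(code);
  }

  public static void main(String[] args) {
    ClassCodeRegistry registry = new ClassCodeRegistry();
    registry.register(Integer.class);
    registry.register(String.class);
    // Appending a new class leaves the existing codes unchanged.
    registry.register(java.util.ArrayList.class);
    System.out.println(registry.codeOf(Integer.class)); // 1
    System.out.println(registry.codeOf(java.util.ArrayList.class)); // 3
  }
}
```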

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java?rev=1301790&r1=1301789&r2=1301790&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java Fri Mar 16 22:09:11 2012
@@ -20,20 +20,6 @@
 
 package org.apache.hadoop.hbase.ipc;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.ObjectWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import javax.net.SocketFactory;
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
@@ -52,6 +38,21 @@ import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
+import javax.net.SocketFactory;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
+
 /** A client for an IPC service.  IPC calls take a single {@link Writable} as a
  * parameter, and return a {@link Writable} as their value.  A service runs on
  * a port and is defined by a parameter class and a value class.

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java?rev=1301790&r1=1301789&r2=1301790&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java Fri Mar 16 22:09:11 2012
@@ -19,6 +19,7 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
+import java.io.IOException;
 import java.util.List;
 
 import org.apache.hadoop.hbase.ClusterStatus;
@@ -29,8 +30,6 @@ import org.apache.hadoop.hbase.HTableDes
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.Writable;
 
-import java.io.IOException;
-
 /**
  * Clients interact with the HMasterInterface to gain access to meta-level
  * HBase functionality, like finding an HRegionServer and creating/destroying

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java?rev=1301790&r1=1301789&r2=1301790&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java Fri Mar 16 22:09:11 2012
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.client.Mu
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.master.AssignmentPlan;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 
@@ -361,4 +362,11 @@ public interface HRegionInterface extend
    */
   public void closeRegion(final HRegionInfo hri, final boolean reportWhenCompleted)
   throws IOException;
+
+  /**
+   * Update the favored nodes on this region server from the assignment plan.
+   * @param plan the assignment plan containing each region's favored nodes
+   */
+  public int updateFavoredNodes(AssignmentPlan plan)
+  throws IOException;
 }

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java?rev=1301790&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java Fri Mar 16 22:09:11 2012
@@ -0,0 +1,313 @@
+package org.apache.hadoop.hbase.master;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.HServerInfo;
+import org.apache.hadoop.hbase.master.AssignmentPlan.POSITION;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Threads;
+
+/**
+ * Manages the preferences for assigning regions to specific servers.
+ * It gets the assignment plan by scanning the META region and keeps the
+ * plan up to date.
+ *
+ * The assignment manager executes the assignment plan by adding each region,
+ * paired with its most favored live region server, to the transient assignments.
+ * Each transient assignment is valid only for a configurable period before
+ * it expires. During that period, the region is assigned only according to
+ * its transient assignment.
+ *
+ * All access to this class is thread-safe.
+ */
+public class AssignmentManager {
+  protected static final Log LOG = LogFactory.getLog(
+      AssignmentManager.class);
+
+  /**
+   * The transient assignment map between each region server and
+   * the regions currently being assigned to it
+   */
+  private Map<HServerAddress, Set<HRegionInfo>> transientAssignments;
+
+  /**
+   * Set of all regions that have a transient preferred assignment, used for
+   * quick lookup.
+   */
+  private Set<HRegionInfo> regionsWithTransientAssignment;
+
+  /**
+   * Queue of transient assignments and their upcoming timeouts. When a
+   * transient assignment expires from this queue, it will be removed from
+   * the transient assignment map.
+   */
+  private DelayQueue<TransisentAssignment> transientAssignmentTimeout;
+
+  /**
+   * This thread polls the timeout queue and removes any assignments which
+   * have timed out.
+   */
+  private TransientAssignmentHandler transientAssignmentHandler;
+
+  /**
+   * The assignment plan which contains the mapping between each region and its
+   * favored region server list.
+   */
+  private AssignmentPlan assignmentPlan;
+
+  private final HMaster master;
+  private final Configuration conf;
+  private long millisecondDelay;
+  private POSITION[] positions;
+
+  public AssignmentManager(HMaster master) {
+    this.master = master;
+    this.conf = master.getConfiguration();
+
+    this.transientAssignmentTimeout = new DelayQueue<TransisentAssignment>();
+    this.transientAssignmentHandler = new TransientAssignmentHandler();
+    this.transientAssignments = new HashMap<HServerAddress, Set<HRegionInfo>>();
+    this.regionsWithTransientAssignment = new HashSet<HRegionInfo>();
+    this.millisecondDelay = conf.getLong(
+        "hbase.regionserver.transientAssignment.regionHoldPeriod", 30000);
+
+    this.assignmentPlan = new AssignmentPlan();
+    positions = AssignmentPlan.POSITION.values();
+  }
+
+  public void start() {
+    Threads.setDaemonThreadRunning(transientAssignmentHandler,
+        "RegionManager.transientAssignmentHandler");
+  }
+
+  /**
+   * Add a transient assignment for a region to a server. If the region already
+   * has a transient assignment, then this method will do nothing.
+   * @param server
+   * @param region
+   */
+  public void addTransientAssignment(HServerAddress server,
+      HRegionInfo region) {
+    synchronized (transientAssignments) {
+      if (regionsWithTransientAssignment.contains(region)) {
+        LOG.info("Attempted to add transient assignment for " +
+            region.getRegionNameAsString() + " to " +
+            server.getHostNameWithPort() +
+            " but already had assignment, new assignment ignored");
+        return;
+      }
+      Set<HRegionInfo> regions = transientAssignments.get(server);
+      if (regions == null) {
+        regions = new ConcurrentSkipListSet<HRegionInfo>();
+        transientAssignments.put(server, regions);
+      }
+      regions.add(region);
+      regionsWithTransientAssignment.add(region);
+      LOG.info("Add transient assignment for " +
+          region.getRegionNameAsString() + " to " + server.getHostname());
+    }
+    // Add to delay queue
+    transientAssignmentTimeout.add(new TransisentAssignment(region, server,
+        EnvironmentEdgeManager.currentTimeMillis(), millisecondDelay));
+  }
+
+  /**
+   * The assignment manager executes the assignment plan by adding the region,
+   * paired with its most favored live region server, to the transient assignments.
+   * @param region get a transient assignment for this region
+   */
+  public void executeAssignmentPlan(HRegionInfo region) {
+    List<HServerAddress> servers = this.getAssignmentFromPlan(region);
+    if (servers != null) {
+      for (int i = 0; i < servers.size(); i++) {
+        HServerAddress server = servers.get(i);
+        HServerInfo info = master.getServerManager().getHServerInfo(server);
+        // A preferred server is only eligible for assignment if the master
+        // knows about the server's info, the server is not in the collection of
+        // dead servers, and the server has load information. Absence of load
+        // information may indicate that the server is in the process of
+        // shutting down.
+        if (info != null &&
+            !master.getServerManager().isDead(info.getServerName()) &&
+            master.getServerManager().getServersToLoad()
+            .get(info.getServerName()) != null) {
+          LOG.info("Add a transient assignment from the assignment plan:"
+              + " region " + region.getRegionNameAsString() + " to the "
+              + positions[i] + " region server " + info.getHostnamePort());
+          addTransientAssignment(info.getServerAddress(), region);
+          return;
+        }
+      }
+      LOG.warn("There are no live favored region servers for the region " +
+          region.getRegionNameAsString());
+      return;
+    }
+    LOG.warn("There is no assignment plan for the region " +
+        region.getRegionNameAsString());
+  }
+
+  public boolean removeTransientAssignment(HServerAddress server,
+      HRegionInfo region) {
+    synchronized (transientAssignments) {
+      regionsWithTransientAssignment.remove(region);
+      Set<HRegionInfo> regions = transientAssignments.get(server);
+      if (regions != null) {
+        regions.remove(region);
+        if (regions.size() == 0) {
+          transientAssignments.remove(server);
+        }
+        LOG.debug("Removed the transient assignment: region " +
+            region.getRegionNameAsString() + " to " +
+            server.getHostNameWithPort());
+        return true;
+      }
+      return false;
+    }
+  }
+
+  public Set<HRegionInfo> getTransientAssignments(HServerAddress server) {
+    synchronized (transientAssignments) {
+      return transientAssignments.get(server);
+    }
+  }
+
+  public boolean hasTransientAssignment(HRegionInfo region) {
+    synchronized (transientAssignments) {
+      return regionsWithTransientAssignment.contains(region);
+    }
+  }
+
+  /**
+   * Add the assignment to the plan
+   * @param region
+   * @param servers
+   * @param updateTimeStamp
+   */
+  public void updateAssignmentPlan(HRegionInfo region,
+      List<HServerAddress> servers, long updateTimeStamp) {
+    this.assignmentPlan.updateAssignmentPlan(region, servers,
+        updateTimeStamp);
+  }
+
+  /**
+   * Remove the assignment from the plan
+   * @param region
+   */
+  public void removeAssignmentFromPlan(HRegionInfo region) {
+    this.assignmentPlan.removeAssignment(region);
+  }
+
+  /**
+   * @param region
+   * @return true if there is an assignment for the region
+   */
+  public boolean hasAssignmentFromPlan(HRegionInfo region) {
+    return this.assignmentPlan.hasAssignment(region);
+  }
+
+  /**
+   * @param region
+   * @return the list of servers the region is supposed to be assigned to,
+   * based on the plan.
+   */
+  public List<HServerAddress> getAssignmentFromPlan(HRegionInfo region) {
+    return this.assignmentPlan.getAssignment(region);
+  }
+
+  /**
+   * @param region
+   * @return the latest update time stamp for this region's favored assignment.
+   */
+  public long getAssignmentPlanUpdateTimeStamp(HRegionInfo region){
+    return this.assignmentPlan.getAssignmentUpdateTS(region);
+  }
+
+  private class TransientAssignmentHandler extends Thread {
+    @Override
+    public void run() {
+      LOG.debug("Started TransientAssignmentHandler");
+      TransisentAssignment plan = null;
+      int resetFrequency = master.getConfiguration().getInt(
+          "hbase.master.meta.thread.rescanfrequency", 60 * 1000);
+      while (!master.getClosed().get()) {
+        try {
+          // check whether any transient assignment's hold period has expired
+          plan = transientAssignmentTimeout.poll(resetFrequency,
+              TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+          // no problem, just continue
+          continue;
+        }
+        if (null == plan) {
+          continue;
+        }
+        if (removeTransientAssignment(plan.getServer(), plan.getRegionInfo())) {
+          LOG.info("Removed region from transient assignment: " +
+              plan.getRegionInfo().getRegionNameAsString());
+        }
+      }
+    }
+  }
+
+  private class TransisentAssignment implements Delayed {
+    private long creationTime;
+    private HRegionInfo region;
+    private HServerAddress server;
+    private long millisecondDelay;
+
+    TransisentAssignment(HRegionInfo region, HServerAddress addr,
+        long creationTime, long millisecondDelay) {
+      this.region = region;
+      this.server = addr;
+      this.creationTime = creationTime;
+      this.millisecondDelay = millisecondDelay;
+    }
+
+    public HServerAddress getServer() {
+      return this.server;
+    }
+
+    public HRegionInfo getRegionInfo() {
+      return this.region;
+    }
+
+    @Override
+    public int compareTo(Delayed arg0) {
+      long delta = this.getDelay(TimeUnit.MILLISECONDS)
+          - arg0.getDelay(TimeUnit.MILLISECONDS);
+      return (this.equals(arg0) ? 0 : (delta > 0 ? 1 : (delta < 0 ? -1 : 0)));
+    }
+
+    @Override
+    public long getDelay(TimeUnit unit) {
+      return unit.convert(
+          (this.creationTime + millisecondDelay) - System.currentTimeMillis(),
+          TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (o instanceof TransisentAssignment) {
+        TransisentAssignment assignment = (TransisentAssignment)o;
+        if (assignment.getServer().equals(this.getServer()) &&
+            assignment.getRegionInfo().equals(this.getRegionInfo())) {
+          return true;
+        }
+      }
+      return false;
+    }
+  }
+}
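
AssignmentManager expires transient assignments through a java.util.concurrent.DelayQueue: each entry reports its remaining delay through getDelay, and poll only returns entries whose delay has elapsed. This is what lets TransientAssignmentHandler drop them after hbase.regionserver.transientAssignment.regionHoldPeriod (30000 ms by default). The sketch below shows the same pattern in isolation. A hypothetical ExpiringEntry stands in for TransisentAssignment, and strings stand in for HRegionInfo and HServerAddress. It deliberately differs from the committed class in three ways: compareTo orders purely by remaining delay, hashCode is overridden alongside equals, and one clock is used for both the creation time and the delay (the committed code mixes EnvironmentEdgeManager.currentTimeMillis() and System.currentTimeMillis()).

```java
import java.util.Objects;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for TransisentAssignment: a region held for a server until a deadline. */
class ExpiringEntry implements Delayed {
  private final String region;
  private final String server;
  private final long expiresAtMillis;

  ExpiringEntry(String region, String server, long holdMillis) {
    this.region = region;
    this.server = server;
    // Same clock here and in getDelay().
    this.expiresAtMillis = System.currentTimeMillis() + holdMillis;
  }

  String getRegion() {
    return region;
  }

  @Override
  public long getDelay(TimeUnit unit) {
    return unit.convert(expiresAtMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
  }

  @Override
  public int compareTo(Delayed other) {
    // Order by remaining delay so the queue head is the next entry to expire.
    return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof ExpiringEntry)) {
      return false;
    }
    ExpiringEntry other = (ExpiringEntry) o;
    return region.equals(other.region) && server.equals(other.server);
  }

  @Override
  public int hashCode() {
    return Objects.hash(region, server);
  }

  public static void main(String[] args) throws InterruptedException {
    DelayQueue<ExpiringEntry> queue = new DelayQueue<>();
    queue.add(new ExpiringEntry("regionA", "rs1:60020", 200));
    queue.add(new ExpiringEntry("regionB", "rs2:60020", 60_000));
    // Waits up to 2 seconds; regionA becomes available after about 200 ms.
    ExpiringEntry expired = queue.poll(2, TimeUnit.SECONDS);
    System.out.println(expired.getRegion()); // regionA
    // regionB is still within its hold period, so a non-blocking poll returns null.
    System.out.println(queue.poll()); // null
  }
}
```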

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/AssignmentPlan.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/AssignmentPlan.java?rev=1301790&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/AssignmentPlan.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/AssignmentPlan.java Fri Mar 16 22:09:11 2012
@@ -0,0 +1,227 @@
+/**
+ * Copyright 2012 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * AssignmentPlan is a Writable object that holds the region assignment plan:
+ * the mapping between each region and its list of favored region servers.
+ *
+ * The update, remove, and lookup methods are synchronized. Note that
+ * getAssignmentMap() returns the internal map, so callers that iterate over
+ * it must provide their own synchronization.
+ */
+public class AssignmentPlan implements Writable {
+  protected static final Log LOG = LogFactory.getLog(
+      AssignmentPlan.class);
+  private static final int VERSION = 1;
+
+  /** the map between each region and its favored region server list */
+  private Map<HRegionInfo, List<HServerAddress>> assignmentMap;
+
+  /** the map between each region and the time stamp of the latest update
+   * to its favored server list
+   */
+  private Map<HRegionInfo, Long> assignmentUpdateTS;
+
+  public static enum POSITION {
+    PRIMARY,
+    SECONDARY,
+    TERTIARY;
+  };
+
+  public AssignmentPlan() {
+    assignmentMap = new HashMap<HRegionInfo, List<HServerAddress>>();
+    assignmentUpdateTS = new HashMap<HRegionInfo, Long>();
+  }
+
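+  /**
+   * Add or replace the assignment for a region and record the time stamp
+   * of the update. Does nothing if the region is null or the server list
+   * is null or empty.
+   * @param region the region to update
+   * @param servers the favored region servers for the region
+   * @param ts the time stamp of this update
+   */
+  public synchronized void updateAssignmentPlan(HRegionInfo region,
+      List<HServerAddress> servers, long ts) {
+    if (region == null || servers == null || servers.size() == 0)
+      return;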
+    this.assignmentUpdateTS.put(region, Long.valueOf(ts));
+    this.assignmentMap.put(region, servers);
+    LOG.info("Update the assignment plan for region " +
+        region.getRegionNameAsString() + " to favored nodes " +
+        RegionPlacement.getFavoredNodes(servers)
+        + " at time stamp " + ts);
+  }
+
+  /**
+   * Add or replace the assignment for a region without changing its
+   * recorded update time stamp. Does nothing if the region is null or the
+   * server list is null or empty.
+   * @param region the region to update
+   * @param servers the favored region servers for the region
+   */
+  public synchronized void updateAssignmentPlan(HRegionInfo region,
+      List<HServerAddress> servers) {
+    if (region == null || servers == null || servers.size() == 0)
+      return;
+    this.assignmentMap.put(region, servers);
+    LOG.info("Update the assignment plan for region " +
+        region.getRegionNameAsString() + " to favored nodes " +
+        RegionPlacement.getFavoredNodes(servers));
+  }
+
+  /**
+   * Remove one assignment from the plan
+   * @param region
+   */
+  public synchronized void removeAssignment(HRegionInfo region) {
+    this.assignmentMap.remove(region);
+    this.assignmentUpdateTS.remove(region);
+  }
+
+  /**
+   * @param region
+   * @return true if there is an assignment plan for the particular region.
+   */
+  public synchronized boolean hasAssignment(HRegionInfo region) {
+    return assignmentMap.containsKey(region);
+  }
+
+  /**
+   * @param region
+   * @return the list of favored region servers for this region based on the plan
+   */
+  public synchronized List<HServerAddress> getAssignment(HRegionInfo region) {
+    return assignmentMap.get(region);
+  }
+
+  /**
+   * @param region
+   * @return the last update time stamp for the region in the plan
+   */
+  public synchronized long getAssignmentUpdateTS(HRegionInfo region) {
+    Long updateTS = assignmentUpdateTS.get(region);
+    if (updateTS == null)
+      return Long.MIN_VALUE;
+    else
+      return updateTS.longValue();
+  }
+
+  /**
+   * @return the mapping between each region to its favored region server list
+   */
+  public synchronized Map<HRegionInfo, List<HServerAddress>> getAssignmentMap() {
+    return assignmentMap;
+  }
+
+  @Override
+  public synchronized void write(DataOutput out) throws IOException {
+    out.writeInt(VERSION);
+    if (this.assignmentMap == null) {
+      // an empty plan is written as a zero-sized map
+      out.writeInt(0);
+      return;
+    }
+    // write the size of the favored assignment map
+    out.writeInt(this.assignmentMap.size());
+    for (Map.Entry<HRegionInfo,  List<HServerAddress>> entry :
+      assignmentMap.entrySet()) {
+      // write the region info
+      entry.getKey().write(out);
+      // write the list of favored server list
+      List<HServerAddress> serverList = entry.getValue();
+      // write the size of the list
+      out.writeInt(serverList.size());
+      for (HServerAddress addr : serverList) {
+        // write the element of the list
+        addr.write(out);
+      }
+    }
+  }
+
+  @Override
+  public synchronized void readFields(DataInput in) throws IOException {
+    int version = in.readInt();
+    if (version != VERSION) {
+      throw new IOException("Assignment plan version mismatch: " +
+          "expected version " + VERSION + " but found version " + version);
+    }
+    // read the size of the assignment map
+    int assignmentMapSize = in.readInt();
+    for (int i = 0; i < assignmentMapSize; i++) {
+      // read each region info
+      HRegionInfo region = new HRegionInfo();
+      region.readFields(in);
+      // read the size of the favored server list
+      int serverListSize = in.readInt();
+      List<HServerAddress> serverList =
+        new ArrayList<HServerAddress>(serverListSize);
+      for (int j = 0; j < serverListSize; j++) {
+        HServerAddress addr = new HServerAddress();
+        addr.readFields(in);
+        serverList.add(addr);
+      }
+
+      // add the assignment to the assignment map
+      this.assignmentMap.put(region, serverList);
+    }
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null) {
+      return false;
+    }
+    if (getClass() != o.getClass()) {
+      return false;
+    }
+    // Check whether the map from object o is identical to this assignment map.
+    Map<HRegionInfo, List<HServerAddress>> comparedMap =
+      ((AssignmentPlan)o).getAssignmentMap();
+
+    // compare the size
+    if (comparedMap.size() != this.assignmentMap.size())
+      return false;
+
+    // compare each element in the assignment map
+    for (Map.Entry<HRegionInfo, List<HServerAddress>> entry :
+      comparedMap.entrySet()) {
+      List<HServerAddress> serverList = this.assignmentMap.get(entry.getKey());
+      if (serverList == null) {
+        if (entry.getValue() != null) {
+          return false;
+        }
+      } else if (!serverList.equals(entry.getValue())) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public synchronized int hashCode() {
+    // keep hashCode() consistent with equals(), which compares assignment maps
+    return this.assignmentMap.hashCode();
+  }
+}
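AssignmentPlan.write() and readFields() define a simple layout: a version int, the number of regions, then for each region its serialized HRegionInfo, the size of its favored-server list, and each serialized HServerAddress. The per-region update time stamps are not written, so a deserialized plan carries none. The sketch below illustrates only that layout; it is not the class's actual code. It uses hypothetical stand-ins, writeUTF strings in place of HRegionInfo.write() and HServerAddress.write(), so it runs without HBase on the classpath.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrates the AssignmentPlan wire layout with strings as stand-ins. */
public class AssignmentPlanFormatSketch {
  static final int VERSION = 1;

  public static void write(Map<String, List<String>> plan, DataOutput out)
      throws IOException {
    out.writeInt(VERSION);
    out.writeInt(plan.size());                 // number of regions
    for (Map.Entry<String, List<String>> e : plan.entrySet()) {
      out.writeUTF(e.getKey());                // stand-in for HRegionInfo.write()
      out.writeInt(e.getValue().size());       // size of the favored list
      for (String server : e.getValue()) {
        out.writeUTF(server);                  // stand-in for HServerAddress.write()
      }
    }
  }

  public static Map<String, List<String>> read(DataInput in) throws IOException {
    int version = in.readInt();
    if (version != VERSION) {
      throw new IOException("expected version " + VERSION + " but found " + version);
    }
    int regions = in.readInt();
    Map<String, List<String>> plan = new HashMap<>();
    for (int i = 0; i < regions; i++) {
      String region = in.readUTF();
      int servers = in.readInt();
      List<String> favored = new ArrayList<>(servers);
      for (int j = 0; j < servers; j++) {
        favored.add(in.readUTF());
      }
      plan.put(region, favored);
    }
    return plan;
  }

  public static void main(String[] args) throws IOException {
    Map<String, List<String>> plan = new HashMap<>();
    plan.put("region-1", Arrays.asList("host1:60020", "host2:60020", "host3:60020"));
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    write(plan, new DataOutputStream(bytes));
    Map<String, List<String>> copy =
        read(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    System.out.println("round trip equal: " + copy.equals(plan));
  }
}
```

Because readFields() adds entries to the existing map without clearing it first, reusing one AssignmentPlan instance for several reads merges the plans; the sketch sidesteps this by returning a fresh map from each read.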

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/BaseScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/BaseScanner.java?rev=1301790&r1=1301789&r2=1301790&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/BaseScanner.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/BaseScanner.java Fri Mar 16 22:09:11 2012
@@ -120,7 +120,6 @@ abstract class BaseScanner extends Chore
     }
   }
   private final boolean rootRegion;
-  private final boolean readFavoredNodes;
   protected final HMaster master;
 
   protected boolean initialScanComplete;
@@ -140,13 +139,6 @@ abstract class BaseScanner extends Chore
     this.rootRegion = rootRegion;
     this.master = master;
     this.initialScanComplete = false;
-
-    // Only read favored nodes if using the assignment-based load balancer.
-    this.readFavoredNodes = master.getConfiguration().getClass(
-        HConstants.LOAD_BALANCER_IMPL, Object.class).equals(
-        RegionManager.AssignmentLoadBalancer.class);
-    LOG.debug("Whether to read the favoredNodes from meta: " +
-        (readFavoredNodes ? "Yes" : "No"));
   }
 
   /** @return true if initial scan completed successfully */
@@ -165,14 +157,14 @@ abstract class BaseScanner extends Chore
   }
 
   /**
-   * @param region Region to scan
+   * @param metaRegion Region to scan
    * @throws IOException
    */
-  protected void scanRegion(final MetaRegion region) throws IOException {
+  protected void scanRegion(final MetaRegion metaRegion) throws IOException {
     HRegionInterface regionServer = null;
     long scannerId = -1L;
     LOG.info(Thread.currentThread().getName() + " scanning meta region " +
-      region.toString());
+      metaRegion.toString());
 
     // Array to hold list of split parents found.  Scan adds to list.  After
     // scan we go check if parents can be removed and that their daughters
@@ -182,46 +174,58 @@ abstract class BaseScanner extends Chore
     int rows = 0;
     try {
       regionServer =
-        this.master.getServerConnection().getHRegionConnection(region.getServer());
+        this.master.getServerConnection().getHRegionConnection(metaRegion.getServer());
       Scan s = new Scan().addFamily(HConstants.CATALOG_FAMILY);
       // Make this scan do a row at a time otherwise, data can be stale.
       s.setCaching(1);
-      scannerId = regionServer.openScanner(region.getRegionName(), s);
+      scannerId = regionServer.openScanner(metaRegion.getRegionName(), s);
       while (true) {
         Result values = regionServer.next(scannerId);
         if (values == null || values.size() == 0) {
           break;
         }
-        HRegionInfo info = master.getHRegionInfo(values.getRow(), values);
-        if (info == null) {
+        HRegionInfo region = master.getHRegionInfo(values.getRow(), values);
+        if (region == null) {
           emptyRows.add(values.getRow());
           continue;
         }
-        String serverAddress = getServerAddress(values);
-        long startCode = getStartCode(values);
-
-        // Note Region has been assigned.
-        checkAssigned(regionServer, region, info, serverAddress, startCode, true);
-        if (isSplitParent(info)) {
-          splitParents.put(info, values);
-        }
-        rows += 1;
-
-        if (this.readFavoredNodes) {
+        // Process the favored nodes
+        if (this.master.shouldAssignRegionsWithFavoredNodes()) {
           byte[] favoredNodes = values.getValue(HConstants.CATALOG_FAMILY,
               HConstants.FAVOREDNODES_QUALIFIER);
+          AssignmentManager assignmentManager =
+            this.master.getRegionManager().getAssignmentManager();
+
           if (favoredNodes != null) {
-            List<HServerAddress> addresses = new ArrayList<HServerAddress>();
-            for (String address : new String(favoredNodes).split(",")) {
-              addresses.add(new HServerAddress(address));
+            // compare the update TS
+            long updateTimeStamp =
+              values.getLastestTimeStamp(HConstants.CATALOG_FAMILY,
+                HConstants.FAVOREDNODES_QUALIFIER);
+            long lastUpdate =
+              assignmentManager.getAssignmentPlanUpdateTimeStamp(region);
+            if (lastUpdate < updateTimeStamp) {
+              // need to update the persistent assignment
+              List<HServerAddress> servers =
+                RegionPlacement.getFavoredNodesList(favoredNodes);
+              assignmentManager.updateAssignmentPlan(region,
+                  servers, updateTimeStamp);
             }
-            this.master.getRegionManager().assignmentManager
-                .addPersistentAssignment(info, addresses);
           } else {
-            this.master.getRegionManager().assignmentManager
-                .removePersistentAssignment(info);
+            assignmentManager.removeAssignmentFromPlan(region);
           }
         }
+
+        String serverAddress = getServerAddress(values);
+        long startCode = getStartCode(values);
+
+        // Verify region has been validly assigned.
+        checkAssigned(regionServer, metaRegion, region, serverAddress,
+            startCode, true);
+        if (isSplitParent(region)) {
+          splitParents.put(region, values);
+        }
+        rows += 1;
+
       }
       if (rootRegion) {
         this.master.getRegionManager().setNumMetaRegions(rows);
@@ -253,8 +257,8 @@ abstract class BaseScanner extends Chore
     // First clean up any meta region rows which had null HRegionInfos
     if (emptyRows.size() > 0) {
       LOG.warn("Found " + emptyRows.size() + " rows with empty HRegionInfo " +
-        "while scanning meta region " + Bytes.toString(region.getRegionName()));
-      this.master.deleteEmptyMetaRows(regionServer, region.getRegionName(),
+        "while scanning meta region " + Bytes.toString(metaRegion.getRegionName()));
+      this.master.deleteEmptyMetaRows(regionServer, metaRegion.getRegionName(),
           emptyRows);
     }
 
@@ -263,12 +267,12 @@ abstract class BaseScanner extends Chore
     if (splitParents.size() > 0) {
       for (Map.Entry<HRegionInfo, Result> e : splitParents.entrySet()) {
         HRegionInfo hri = e.getKey();
-        cleanupAndVerifySplits(region.getRegionName(), regionServer,
+        cleanupAndVerifySplits(metaRegion.getRegionName(), regionServer,
           hri, e.getValue());
       }
     }
     LOG.info(Thread.currentThread().getName() + " scan of " + rows +
-      " row(s) of meta region " + region.toString() + " complete");
+      " row(s) of meta region " + metaRegion.toString() + " complete");
   }
 
   /*
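The reworked BaseScanner loop reloads a region's favored nodes only when the META cell's time stamp is newer than the one the plan has recorded. It drops the region from the plan when the favored-nodes column is absent. If the assignment manager's time stamp lookup behaves like AssignmentPlan.getAssignmentUpdateTS(), which returns Long.MIN_VALUE for an unknown region, the first scan always loads the value. The sketch below shows that rule. It assumes the cell holds comma-separated host:port entries, which is the format the removed code split on. PlanRefreshSketch and its methods are hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for the plan refresh done while scanning META. */
public class PlanRefreshSketch {
  private final Map<String, List<String>> plan = new HashMap<>();
  private final Map<String, Long> updateTs = new HashMap<>();

  /** Unknown regions report Long.MIN_VALUE, so any real cell is newer. */
  public long getUpdateTs(String region) {
    Long ts = updateTs.get(region);
    return ts == null ? Long.MIN_VALUE : ts;
  }

  public List<String> getFavoredNodes(String region) {
    return plan.get(region);
  }

  /**
   * Called once per META row.
   * @param favoredNodesCell comma-separated host:port list, or null if absent
   * @param cellTimestamp time stamp of the favored-nodes cell
   */
  public void onMetaRow(String region, String favoredNodesCell, long cellTimestamp) {
    if (favoredNodesCell == null) {
      // No favored nodes in META: forget any plan entry for the region.
      plan.remove(region);
      updateTs.remove(region);
      return;
    }
    if (getUpdateTs(region) < cellTimestamp) {
      // The META cell is newer than what the plan holds: reload it.
      plan.put(region, Arrays.asList(favoredNodesCell.split(",")));
      updateTs.put(region, cellTimestamp);
    }
  }

  public static void main(String[] args) {
    PlanRefreshSketch sketch = new PlanRefreshSketch();
    sketch.onMetaRow("r1", "h1:60020,h2:60020,h3:60020", 100L);
    // An older cell does not overwrite the newer plan entry.
    sketch.onMetaRow("r1", "h9:60020,h2:60020,h3:60020", 50L);
    System.out.println(sketch.getFavoredNodes("r1"));
  }
}
```

Comparing time stamps keeps the periodic scan cheap: rows whose favored nodes have not changed since the last scan are skipped without rebuilding their server lists.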

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=1301790&r1=1301789&r2=1301790&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Fri Mar 16 22:09:11 2012
@@ -103,7 +103,6 @@ import org.apache.hadoop.hbase.util.Envi
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.InfoServer;
 import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.util.RegionPlacement;
 import org.apache.hadoop.hbase.util.RuntimeHaltAbortStrategy;
 import org.apache.hadoop.hbase.util.Sleeper;
 import org.apache.hadoop.hbase.util.Threads;
@@ -217,6 +216,8 @@ public class HMaster extends Thread impl
   /** Log directories split on startup for testing master failover */
   private List<String> logDirsSplitOnStartup;
 
+  private boolean shouldAssignRegionsWithFavoredNodes = false;
+
   /**
    * Constructor
    * @param conf configuration
@@ -321,6 +322,17 @@ public class HMaster extends Thread impl
         });
 
     regionPlacement = new RegionPlacement(this.conf);
+
+    // Only use favored nodes (read from META and applied when assigning
+    // regions) if the assignment-based load balancer is configured.
+    this.shouldAssignRegionsWithFavoredNodes = conf.getClass(
+        HConstants.LOAD_BALANCER_IMPL, Object.class).equals(
+        RegionManager.AssignmentLoadBalancer.class);
+    LOG.debug("Whether to read the favoredNodes from meta: " +
+        (shouldAssignRegionsWithFavoredNodes ? "Yes" : "No"));
+  }
+
+  public boolean shouldAssignRegionsWithFavoredNodes() {
+    return shouldAssignRegionsWithFavoredNodes;
   }
 
   /**
@@ -1257,7 +1269,7 @@ public class HMaster extends Thread impl
         if (!this.regionManager.areAllMetaRegionsOnline()) {
           throw new NotAllMetaRegionsOnlineException();
         }
-        if (!this.serverManager.canAssignUserRegions()) {
+        if (!this.serverManager.hasEnoughRegionServers()) {
           throw new IOException("not enough servers to create table yet");
         }
         createTable(newRegions);
@@ -1303,17 +1315,19 @@ public class HMaster extends Thread impl
       srvr.close(scannerid);
     }
 
-    // get the favorite nodes map from the regionPlacement
-    Map<HRegionInfo, List<HServerInfo>> favoriteNodesMap =
-      regionPlacement.getFaroredNodesForNewRegions(newRegions,
-          getClusterStatus().getServerInfo());
+    AssignmentPlan assignmentPlan = null;
+    if (this.shouldAssignRegionsWithFavoredNodes) {
+      // Get the assignment plan for the new regions
+      assignmentPlan = regionPlacement.getAssignmentPlan(newRegions);
+    }
 
     for(HRegionInfo newRegion : newRegions) {
-      if (favoriteNodesMap != null) {
+      if (assignmentPlan != null) {
+        // create the region with favored nodes.
-        List<HServerInfo> favoriteNodes = favoriteNodesMap.get(newRegion);
+        List<HServerAddress> favoredNodes =
+          assignmentPlan.getAssignment(newRegion);
         regionManager.createRegion(newRegion, srvr, metaRegionName,
-            favoriteNodes);
+            favoredNodes);
       } else {
         regionManager.createRegion(newRegion, srvr, metaRegionName);
       }
@@ -1653,7 +1667,8 @@ public class HMaster extends Thread impl
       HServerAddress serverAddress = new HServerAddress(hostnameAndPort);
 
       // Assign the specified host to be the preferred host for the specified region.
-      this.regionManager.assignmentManager.addTransientAssignment(serverAddress, hri);
+      this.regionManager.getAssignmentManager().
+        addTransientAssignment(serverAddress, hri);
 
       // Close the region so that it will be re-opened by the preferred host.
       modifyTable(tableName, HConstants.Modify.CLOSE_REGION, new Writable[]{args[0]});
@@ -1775,6 +1790,7 @@ public class HMaster extends Thread impl
     }
   }
 
+
   /**
    * @see org.apache.zookeeper.Watcher#process(org.apache.zookeeper.WatchedEvent)
    */
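With this commit the favored-nodes switch lives in HMaster. shouldAssignRegionsWithFavoredNodes is true only when HConstants.LOAD_BALANCER_IMPL names RegionManager.AssignmentLoadBalancer. RegionManager then picks AssignmentLoadBalancer or DefaultLoadBalancer from that flag instead of instantiating the configured class reflectively. The sketch below shows that decision with java.util.Properties standing in for the Hadoop Configuration. The key string and the nested stand-in classes are hypothetical. Falling back to Object.class for an unknown class name is a choice of this sketch, not necessarily how Configuration.getClass() behaves.

```java
import java.util.Properties;

/** Hypothetical sketch of the master's favored-nodes switch. */
public class FavoredNodesFlagSketch {
  // Stand-ins for RegionManager.AssignmentLoadBalancer and DefaultLoadBalancer.
  public static class AssignmentLoadBalancer {}
  public static class DefaultLoadBalancer {}

  // Hypothetical key; the real one is whatever HConstants.LOAD_BALANCER_IMPL holds.
  public static final String LOAD_BALANCER_KEY = "example.load.balancer.impl";

  /** True only when the configured balancer class is AssignmentLoadBalancer. */
  public static boolean shouldAssignRegionsWithFavoredNodes(Properties conf) {
    String className = conf.getProperty(LOAD_BALANCER_KEY);
    Class<?> configured = Object.class;  // default when nothing is configured
    if (className != null) {
      try {
        configured = Class.forName(className);
      } catch (ClassNotFoundException e) {
        // This sketch treats an unknown class like "not configured".
        configured = Object.class;
      }
    }
    return configured.equals(AssignmentLoadBalancer.class);
  }

  /** Mirrors the RegionManager constructor's choice between the two balancers. */
  public static Object chooseLoadBalancer(Properties conf) {
    return shouldAssignRegionsWithFavoredNodes(conf)
        ? new AssignmentLoadBalancer()
        : new DefaultLoadBalancer();
  }

  public static void main(String[] args) {
    Properties conf = new Properties();
    System.out.println("favored nodes (unset): "
        + shouldAssignRegionsWithFavoredNodes(conf));
    conf.setProperty(LOAD_BALANCER_KEY, AssignmentLoadBalancer.class.getName());
    System.out.println("favored nodes (assignment balancer): "
        + shouldAssignRegionsWithFavoredNodes(conf));
  }
}
```

Deriving one boolean in the master and reading it everywhere (BaseScanner, createTable, RegionManager) keeps the scanner, table creation, and assignment paths from disagreeing about whether favored nodes are in use.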

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/MetaRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/MetaRegion.java?rev=1301790&r1=1301789&r2=1301790&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/MetaRegion.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/MetaRegion.java Fri Mar 16 22:09:11 2012
@@ -42,7 +42,7 @@ public class MetaRegion implements Compa
 
   @Override
   public String toString() {
-    return "{server: " + this.server.toString() + ", regionname: " +
+    return "{server: " + this.server.getHostNameWithPort() + ", regionname: " +
         regionInfo.getRegionNameAsString() + ", startKey: <" +
         Bytes.toString(regionInfo.getStartKey()) + ">}";
   }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java?rev=1301790&r1=1301789&r2=1301790&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java Fri Mar 16 22:09:11 2012
@@ -20,7 +20,6 @@
 package org.apache.hadoop.hbase.master;
 
 import java.io.IOException;
-import java.lang.reflect.Constructor;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -37,7 +36,6 @@ import java.util.concurrent.ConcurrentSk
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
-import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.mutable.MutableBoolean;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -86,7 +84,7 @@ public class RegionManager {
 
   private static final byte [] META_REGION_PREFIX = Bytes.toBytes(".META.,");
 
-  final PreferredAssignmentManager assignmentManager;
+  private final AssignmentManager assignmentManager;
 
   /**
    * Map key -> tableName, value -> ThrottledRegionReopener
@@ -178,22 +176,12 @@ public class RegionManager {
         ZooKeeperWrapper.getInstance(conf, master.getZKWrapperName());
     this.maxAssignInOneGo = conf.getInt("hbase.regions.percheckin", 10);
 
-    LoadBalancer loadBalancerImpl;
-    try {
-      Class<? extends LoadBalancer> theClass = conf.getClass(
-          HConstants.LOAD_BALANCER_IMPL, DefaultLoadBalancer.class,
-          LoadBalancer.class);
-      Constructor<? extends LoadBalancer> meth =
-          theClass.getDeclaredConstructor(RegionManager.class);
-      meth.setAccessible(true);
-      loadBalancerImpl = meth.newInstance(this);
-    } catch (Exception e) {
-      loadBalancerImpl = new DefaultLoadBalancer();
+    if (master.shouldAssignRegionsWithFavoredNodes()) {
+      this.loadBalancer = new AssignmentLoadBalancer();
+    } else {
+      this.loadBalancer = new DefaultLoadBalancer();
     }
-    this.loadBalancer = loadBalancerImpl;
-    LOG.info("The load balancer is " + this.loadBalancer.getClass().getName());
-
-    this.assignmentManager = new PreferredAssignmentManager(master);
+    this.assignmentManager = new AssignmentManager(master);
 
     // The root region
     rootScannerThread = new RootScanner(master);
@@ -209,6 +197,10 @@ public class RegionManager {
     reassignRootRegion();
   }
 
+  public LoadBalancer getLoadBalancer() {
+    return this.loadBalancer;
+  }
+
   void start() {
     assignmentManager.start();
     Threads.setDaemonThreadRunning(rootScannerThread,
@@ -217,6 +209,10 @@ public class RegionManager {
       "RegionManager.metaScanner");
   }
 
+  public AssignmentManager getAssignmentManager() {
+    return this.assignmentManager;
+  }
+
   void unsetRootRegion() {
     synchronized (regionsInTransition) {
       rootRegionLocation.set(null);
@@ -263,6 +259,33 @@ public class RegionManager {
       // be assigned when the region server reports next
       return;
     }
+
+    if (this.master.shouldAssignRegionsWithFavoredNodes()) {
+      // assign regions with favored nodes
+      assignRegionsWithFavoredNodes(info, mostLoadedRegions, returnMsgs);
+    } else {
+      // assign regions without favored nodes
+      assignRegionsWithoutFavoredNodes(info, mostLoadedRegions, returnMsgs);
+    }
+  }
+
+  private void assignRegionsWithFavoredNodes(HServerInfo regionServer,
+      HRegionInfo[] mostLoadedRegions, ArrayList<HMsg> returnMsgs) {
+    // get the regions that are waiting for assignment for that region server
+    Set<RegionState> regionsToAssign = regionsAwaitingAssignment(regionServer);
+
+    if (regionsToAssign.isEmpty() &&
+        master.getRegionServerOperationQueue().isEmpty()) {
+      // There are no regions waiting to be assigned, so run the load
+      // balancer as before.
+      this.loadBalancer.loadBalancing(regionServer, mostLoadedRegions, returnMsgs);
+    } else {
+      assignRegionsToOneServer(regionsToAssign, regionServer, returnMsgs);
+    }
+  }
+
+  private void assignRegionsWithoutFavoredNodes(HServerInfo info,
+      HRegionInfo[] mostLoadedRegions, ArrayList<HMsg> returnMsgs) {
     // the region may assigned to this region server
     Set<RegionState> regionsToAssign = null;
 
@@ -459,11 +482,11 @@ public class RegionManager {
       this.regionsInTransition.put(regionName, rs);
     }
 
-    if (assignmentManager.hasPersistentAssignment(rs.getRegionInfo())) {
-      String nodes = StringUtils.join(assignmentManager
-          .getPreferredAssignments(rs.getRegionInfo()), ',');
+    if (assignmentManager.hasAssignmentFromPlan(rs.getRegionInfo())) {
+      String favoredNodes = RegionPlacement.getFavoredNodes(
+          assignmentManager.getAssignmentFromPlan(rs.getRegionInfo()));
       returnMsgs.add(new HMsg(HMsg.Type.MSG_REGION_OPEN, rs.getRegionInfo(),
-          nodes.getBytes()));
+          favoredNodes.getBytes()));
     } else {
       returnMsgs.add(new HMsg(HMsg.Type.MSG_REGION_OPEN, rs.getRegionInfo()));
     }
@@ -507,6 +530,122 @@ public class RegionManager {
    * only caller (assignRegions, whose caller is ServerManager.processMsgs) owns
    * the monitor for RegionManager
    */
+  private Set<RegionState> regionsAwaitingAssignment(HServerInfo server) {
+    // set of regions we want to assign to this server
+    Set<RegionState> regionsToAssign = new HashSet<RegionState>();
+    boolean isSingleServer = this.master.numServers() == 1;
+    HServerAddress addr = server.getServerAddress();
+    boolean isMetaServer = isMetaServer(addr);
+    RegionState rootState = null;
+    boolean isPreferredAssignment = false;
+    boolean reassigningMetas =
+      (numberOfMetaRegions.get() != onlineMetaRegions.size());
+    boolean isMetaOrRoot = isMetaServer || isRootServer(addr);
+
+    // Assign ROOT region if ROOT region is offline.
+    synchronized (this.regionsInTransition) {
+      rootState = regionsInTransition.get(HRegionInfo.ROOT_REGIONINFO
+          .getRegionNameAsString());
+    }
+    if (rootState != null && rootState.isUnassigned()) {
+      // just make sure it isn't hosting META regions (unless
+      // it's the only server left).
+      if (!isMetaServer || isSingleServer) {
+        regionsToAssign.add(rootState);
+        LOG.debug("Going to assign -ROOT- region to server " +
+            server.getHostnamePort());
+      }
+      return regionsToAssign;
+    }
+
+    // While META regions are being reassigned, don't assign any region to a
+    // server that already hosts ROOT or META, unless it is the only server.
+    if (isMetaOrRoot && reassigningMetas && !isSingleServer) {
+      return regionsToAssign;
+    }
+
+    // Get the set of the regions which are preserved
+    // for the current region server
+    Set<HRegionInfo> preservedRegionsForCurrentRS =
+      assignmentManager.getTransientAssignments(addr);
+
+    synchronized (this.regionsInTransition) {
+      int nonPreferredAssignment = 0;
+      for (RegionState regionState : regionsInTransition.values()) {
+        HRegionInfo regionInfo = regionState.getRegionInfo();
+        if (regionInfo == null) continue;
+        if (regionInfo.isRootRegion() && !regionState.isUnassigned()) {
+          LOG.debug("The -ROOT- region"
+              + " has been assigned and will be online soon. " +
+                  "Do nothing for server " + server.getHostnamePort());
+          break;
+        }
+        // Assign the META region here explicitly
+        if (regionInfo.isMetaRegion()) {
+          if (regionState.isUnassigned()) {
+            regionsToAssign.clear();
+            regionsToAssign.add(regionState);
+            LOG.debug("Going to assign META region: " +
+                regionInfo.getRegionNameAsString() + " to server " +
+                server.getHostnamePort());
+          } else {
+            LOG.debug("The .META. region " + regionInfo.getRegionNameAsString()
+                + " has been assigned and will be online soon. " +
+                "Do nothing for server " + server.getHostnamePort());
+          }
+          break;
+        }
+
+        // Can't assign user regions until all meta regions have been assigned,
+        // the initial meta scan is done and there are enough online
+        // region servers
+        if (reassigningMetas || !this.isInitialMetaScanComplete() ||
+            !master.getServerManager().hasEnoughRegionServers()) {
+          LOG.debug("Cannot assign region " + regionInfo.getRegionNameAsString()
+              + " because not all META regions are online, "
+              + "the initial META scan has not completed, or there are not "
+              + "enough online region servers");
+          continue;
+        }
+
+        // Cannot assign region which is NOT in the unassigned state
+        if (!regionState.isUnassigned()) {
+          continue;
+        }
+
+        if (preservedRegionsForCurrentRS == null ||
+            !preservedRegionsForCurrentRS.contains(regionInfo)) {
+          if (assignmentManager.hasTransientAssignment(regionInfo) ||
+              nonPreferredAssignment > this.maxAssignInOneGo) {
+            // Hold the region for its favored nodes and limit the number of
+            // non-preferred assignments for each region server.
+            continue;
+          }
+          // This is a non-preferred assignment.
+          isPreferredAssignment = false;
+          nonPreferredAssignment++;
+        } else {
+          isPreferredAssignment = true;
+        }
+
+        // Assign the current region to the region server.
+        regionsToAssign.add(regionState);
+        LOG.debug("Going to assign user region " +
+            regionInfo.getRegionNameAsString() +
+            " to server " + server.getHostnamePort() + " in a " +
+            (isPreferredAssignment ? "": "non-") + "preferred way");
+
+      }
+    }
+    return regionsToAssign;
+  }
+
+  /**
+   * Get the set of regions that should be assignable in this pass.
+   *
+   * Note that no synchronization on regionsInTransition is needed because the
+   * only caller (assignRegions, whose caller is ServerManager.processMsgs) owns
+   * the monitor for RegionManager
+   */
   private Set<RegionState> regionsAwaitingAssignment(HServerAddress addr,
       boolean isSingleServer, MutableBoolean isPreferredAssignment,
       boolean assignmentByLocality, boolean holdRegionForBestRegionserver,
@@ -515,7 +654,7 @@ public class RegionManager {
     // set of regions we want to assign to this server
     Set<RegionState> regionsToAssign = new HashSet<RegionState>();
 
-    Set<HRegionInfo> regions = assignmentManager.getPreferredAssignments(addr);
+    Set<HRegionInfo> regions = assignmentManager.getTransientAssignments(addr);
     if (null != regions) {
       isPreferredAssignment.setValue(true);
       // One could use regionsInTransition.keySet().containsAll(regions) but
@@ -588,7 +727,7 @@ public class RegionManager {
           continue;
         }
         if (!i.isMetaRegion()
-            && !master.getServerManager().canAssignUserRegions()) {
+            && !master.getServerManager().hasEnoughRegionServers()) {
           LOG.debug("user region " + i.getRegionNameAsString()
               + " is in transition but not enough servers yet");
           continue;
@@ -919,7 +1058,7 @@ public class RegionManager {
    * @throws IOException
    */
   public void createRegion(HRegionInfo newRegion, HRegionInterface server,
-      byte [] metaRegionName,  List<HServerInfo> favoriteNodeList)
+      byte [] metaRegionName,  List<HServerAddress> favoriteNodeList)
   throws IOException {
     // 2. Create the HRegion
     HRegion region = HRegion.createHRegion(newRegion, this.master.getRootDir(),
@@ -936,10 +1075,7 @@ public class RegionManager {
 
     // 3.2 Put the favorite nodes into meta.
     if (favoriteNodeList != null) {
-      String favoredNodes = "";
-      for (HServerInfo favoriteServer : favoriteNodeList) {
-        favoredNodes += favoriteServer.getHostnamePort() + ",";
-      }
+      String favoredNodes = RegionPlacement.getFavoredNodes(favoriteNodeList);
       favoredNodes = favoredNodes.substring(0, favoredNodes.length() - 1);
       put.add(HConstants.CATALOG_FAMILY, HConstants.FAVOREDNODES_QUALIFIER,
           EnvironmentEdgeManager.currentTimeMillis(), favoredNodes.getBytes());
@@ -1201,10 +1337,10 @@ public class RegionManager {
       }
     }
     if (force || (!s.isPendingOpen() && !s.isOpen())) {
-      s.setUnassigned();
       // Refresh assignment information when a region is marked unassigned so
       // that it opens on the preferred server.
-      this.assignmentManager.putTransientFromPersistent(info);
+      this.assignmentManager.executeAssignmentPlan(info);
+      s.setUnassigned();
     }
   }
 
@@ -1295,7 +1431,7 @@ public class RegionManager {
       if (!setOffline) {
         // Refresh assignment information when a region is closed and not
         // marked offline so that it opens on the preferred server.
-        this.assignmentManager.putTransientFromPersistent(regionInfo);
+        this.assignmentManager.executeAssignmentPlan(regionInfo);
       }
     }
   }
@@ -1720,7 +1856,7 @@ public class RegionManager {
       // primary is alive, unassign that region and let it move to the primary.
       for (HRegionInfo region : mostLoadedRegions) {
         List<HServerAddress> preferences =
-            assignmentManager.getPreferredAssignments(region);
+            assignmentManager.getAssignmentFromPlan(region);
         if (preferences == null || preferences.size() == 0) {
           // No prefered assignment, do nothing.
           continue;
@@ -1763,7 +1899,7 @@ public class RegionManager {
       int regionsUnassigned = 0;
       for (HRegionInfo region : mostLoadedRegions) {
         List<HServerAddress> preferences =
-            assignmentManager.getPreferredAssignments(region);
+            assignmentManager.getAssignmentFromPlan(region);
         if (preferences == null || preferences.size() == 0) {
           // No preferredAssignment, do nothing.
           continue;
@@ -1826,7 +1962,7 @@ public class RegionManager {
 
       for (HRegionInfo region : mostLoadedRegions) {
         List<HServerAddress> preferences =
-            assignmentManager.getPreferredAssignments(region);
+            assignmentManager.getAssignmentFromPlan(region);
         if (preferences == null || preferences.size() == 0) {
           // No preferredAssignment, do nothing.
           continue;
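The new regionsAwaitingAssignment(HServerInfo) gives each checking-in server the unassigned user regions transiently reserved for it. It holds regions reserved for other servers and caps the rest using maxAssignInOneGo. The sketch below isolates only that user-region loop. It omits the ROOT and META handling and the gate that waits for META and enough region servers, uses strings for regions, and its names are hypothetical. It keeps the source's check of nonPreferredAssignment > maxAssignInOneGo before incrementing, which admits up to maxAssignInOneGo + 1 non-preferred regions per pass.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical simplification of how user regions are picked for one server. */
public class AwaitingAssignmentSketch {
  /**
   * @param unassigned user regions currently in the unassigned state
   * @param reservedForThisServer regions transiently reserved for this server, or null
   * @param transientlyReserved every region that has a transient reservation
   * @param maxAssignInOneGo limit applied to non-preferred assignments
   */
  public static List<String> select(List<String> unassigned,
      Set<String> reservedForThisServer, Set<String> transientlyReserved,
      int maxAssignInOneGo) {
    List<String> toAssign = new ArrayList<>();
    int nonPreferred = 0;
    for (String region : unassigned) {
      boolean preferred = reservedForThisServer != null
          && reservedForThisServer.contains(region);
      if (!preferred) {
        // Hold regions reserved for another server, and stop taking
        // non-preferred regions once the counter exceeds the limit.
        if (transientlyReserved.contains(region)
            || nonPreferred > maxAssignInOneGo) {
          continue;
        }
        nonPreferred++;
      }
      toAssign.add(region);
    }
    return toAssign;
  }

  public static void main(String[] args) {
    List<String> picked = select(
        Arrays.asList("a", "b", "c", "d"),
        Collections.singleton("d"),
        new HashSet<>(Arrays.asList("a", "d")),
        1);
    System.out.println(picked);
  }
}
```

With maxAssignInOneGo = 1 the example in main picks b and c as non-preferred regions plus d as a preferred one, while a is held for the server that reserved it.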


