hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jmhs...@apache.org
Subject svn commit: r1446173 [2/5] - in /hbase/branches/hbase-7290v2: ./ bin/ hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/ hbase-common/src/main/java/org/apache/hadoop/hbase/util/ hbase-common/src/main/java/org/apache/hadoop/hbase/util/test/...
Date Thu, 14 Feb 2013 13:35:59 GMT
Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java?rev=1446173&r1=1446172&r2=1446173&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java Thu Feb 14 13:35:54 2013
@@ -23,8 +23,10 @@ import java.lang.reflect.InvocationTarge
 import java.lang.reflect.Method;
 import java.net.URL;
 import java.net.URLDecoder;
+import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 
 import org.apache.commons.logging.Log;
@@ -45,6 +47,7 @@ import org.apache.hadoop.hbase.util.Base
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.util.StringUtils;
@@ -218,6 +221,67 @@ public class TableMapReduceUtil {
       initTableMapperJob(table, scan, mapper, outputKeyClass,
               outputValueClass, job, addDependencyJars, TableInputFormat.class);
   }
+  
+  /**
+   * Use this before submitting a Multi TableMap job. It will appropriately set
+   * up the job.
+   *
+   * @param scans The list of {@link Scan} objects to read from.
+   * @param mapper The mapper class to use.
+   * @param outputKeyClass The class of the output key.
+   * @param outputValueClass The class of the output value.
+   * @param job The current job to adjust. Make sure the passed job is carrying
+   *          all necessary HBase configuration.
+   * @throws IOException When setting up the details fails.
+   */
+  public static void initTableMapperJob(List<Scan> scans,
+      Class<? extends TableMapper> mapper,
+      Class<? extends WritableComparable> outputKeyClass,
+      Class<? extends Writable> outputValueClass, Job job) throws IOException {
+    initTableMapperJob(scans, mapper, outputKeyClass, outputValueClass, job,
+        true);
+  }
+
+  /**
+   * Use this before submitting a Multi TableMap job. It will appropriately set
+   * up the job.
+   *
+   * @param scans The list of {@link Scan} objects to read from.
+   * @param mapper The mapper class to use.
+   * @param outputKeyClass The class of the output key.
+   * @param outputValueClass The class of the output value.
+   * @param job The current job to adjust. Make sure the passed job is carrying
+   *          all necessary HBase configuration.
+   * @param addDependencyJars upload HBase jars and jars for any of the
+   *          configured job classes via the distributed cache (tmpjars).
+   * @throws IOException When setting up the details fails.
+   */
+  public static void initTableMapperJob(List<Scan> scans,
+      Class<? extends TableMapper> mapper,
+      Class<? extends WritableComparable> outputKeyClass,
+      Class<? extends Writable> outputValueClass, Job job,
+      boolean addDependencyJars) throws IOException {
+    job.setInputFormatClass(MultiTableInputFormat.class);
+    if (outputValueClass != null) {
+      job.setMapOutputValueClass(outputValueClass);
+    }
+    if (outputKeyClass != null) {
+      job.setMapOutputKeyClass(outputKeyClass);
+    }
+    job.setMapperClass(mapper);
+    HBaseConfiguration.addHbaseResources(job.getConfiguration());
+    List<String> scanStrings = new ArrayList<String>();
+
+    for (Scan scan : scans) {
+      scanStrings.add(convertScanToString(scan));
+    }
+    job.getConfiguration().setStrings(MultiTableInputFormat.SCANS,
+      scanStrings.toArray(new String[scanStrings.size()]));
+
+    if (addDependencyJars) {
+      addDependencyJars(job);
+    }
+  }
 
   public static void initCredentials(Job job) throws IOException {
     if (User.isHBaseSecurityEnabled(job.getConfiguration())) {

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java?rev=1446173&r1=1446172&r2=1446173&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java Thu Feb 14 13:35:54 2013
@@ -23,30 +23,68 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Arrays;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapreduce.InputSplit;
 
 /**
- * A table split corresponds to a key range (low, high). All references to row
- * below refer to the key of the row.
+ * A table split corresponds to a key range (low, high) and an optional scanner.
+ * All references to row below refer to the key of the row.
  */
 @InterfaceAudience.Public
-@InterfaceStability.Stable
+@InterfaceStability.Evolving
 public class TableSplit extends InputSplit
 implements Writable, Comparable<TableSplit> {
+  public static final Log LOG = LogFactory.getLog(TableSplit.class);
+  
+  // should be < 0 (@see #readFields(DataInput))
+  // version 1 supports Scan data member
+  enum Version {
+    UNVERSIONED(0),
+    // Initial number we put on TableSplit when we introduced versioning.
+    INITIAL(-1);
+
+    final int code;
+    static final Version[] byCode;
+    static {
+      byCode = Version.values();
+      for (int i = 0; i < byCode.length; i++) {
+        if (byCode[i].code != -1 * i) {
+          throw new AssertionError("Values in this enum should be descending by one");
+        }
+      }
+    }
+
+    Version(int code) {
+      this.code = code;
+    }
+
+    boolean atLeast(Version other) {
+      return code <= other.code;
+    }
 
+    static Version fromCode(int code) {
+      return byCode[code * -1];
+    }
+  }
+  
+  private static final Version VERSION = Version.INITIAL;
   private byte [] tableName;
   private byte [] startRow;
   private byte [] endRow;
   private String regionLocation;
+  private String scan = ""; // stores the serialized form of the Scan
 
   /** Default constructor. */
   public TableSplit() {
-    this(HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY,
+    this(HConstants.EMPTY_BYTE_ARRAY, null, HConstants.EMPTY_BYTE_ARRAY,
       HConstants.EMPTY_BYTE_ARRAY, "");
   }
 
@@ -54,17 +92,47 @@ implements Writable, Comparable<TableSpl
    * Creates a new instance while assigning all variables.
    *
    * @param tableName  The name of the current table.
+   * @param scan The scan associated with this split.
    * @param startRow  The start row of the split.
    * @param endRow  The end row of the split.
    * @param location  The location of the region.
    */
-  public TableSplit(byte [] tableName, byte [] startRow, byte [] endRow,
+  public TableSplit(byte [] tableName, Scan scan, byte [] startRow, byte [] endRow,
       final String location) {
     this.tableName = tableName;
+    try {
+      this.scan =
+        (null == scan) ? "" : TableMapReduceUtil.convertScanToString(scan);
+    } catch (IOException e) {
+      LOG.warn("Failed to convert Scan to String", e);
+    }
     this.startRow = startRow;
     this.endRow = endRow;
     this.regionLocation = location;
   }
+  
+  /**
+   * Creates a new instance without a scanner.
+   *
+   * @param tableName The name of the current table.
+   * @param startRow The start row of the split.
+   * @param endRow The end row of the split.
+   * @param location The location of the region.
+   */
+  public TableSplit(byte[] tableName, byte[] startRow, byte[] endRow,
+      final String location) {
+    this(tableName, null, startRow, endRow, location);
+  }
+
+  /**
+   * Returns a Scan object from the stored string representation.
+   *
+   * @return Returns a Scan object based on the stored scanner.
+   * @throws IOException
+   */
+  public Scan getScan() throws IOException {
+    return TableMapReduceUtil.convertStringToScan(this.scan);
+  }
 
   /**
    * Returns the table name.
@@ -133,10 +201,29 @@ implements Writable, Comparable<TableSpl
    */
   @Override
   public void readFields(DataInput in) throws IOException {
-    tableName = Bytes.readByteArray(in);
+    Version version = Version.UNVERSIONED;
+    // TableSplit was not versioned in the beginning.
+    // In order to introduce it now, we make use of the fact
+    // that tableName was written with Bytes.writeByteArray,
+    // which encodes the array length as a vint which is >= 0.
+    // Hence if the vint is >= 0 we have an old version and the vint
+    // encodes the length of tableName.
+    // If < 0 we just read the version and the next vint is the length.
+    // @see Bytes#readByteArray(DataInput)
+    int len = WritableUtils.readVInt(in);
+    if (len < 0) {
+      // what we just read was the version
+      version = Version.fromCode(len);
+      len = WritableUtils.readVInt(in);
+    }
+    tableName = new byte[len];
+    in.readFully(tableName);
     startRow = Bytes.readByteArray(in);
     endRow = Bytes.readByteArray(in);
     regionLocation = Bytes.toString(Bytes.readByteArray(in));
+    if (version.atLeast(Version.INITIAL)) {
+      scan = Bytes.toString(Bytes.readByteArray(in));
+    }
   }
 
   /**
@@ -147,10 +234,12 @@ implements Writable, Comparable<TableSpl
    */
   @Override
   public void write(DataOutput out) throws IOException {
+    WritableUtils.writeVInt(out, VERSION.code);
     Bytes.writeByteArray(out, tableName);
     Bytes.writeByteArray(out, startRow);
     Bytes.writeByteArray(out, endRow);
     Bytes.writeByteArray(out, Bytes.toBytes(regionLocation));
+    Bytes.writeByteArray(out, Bytes.toBytes(scan));
   }
 
   /**
@@ -174,7 +263,12 @@ implements Writable, Comparable<TableSpl
    */
   @Override
   public int compareTo(TableSplit split) {
-    return Bytes.compareTo(getStartRow(), split.getStartRow());
+    // If the table name of the two splits is the same then compare start row
+    // otherwise compare based on table names
+    int tableNameComparison =
+        Bytes.compareTo(getTableName(), split.getTableName());
+    return tableNameComparison != 0 ? tableNameComparison : Bytes.compareTo(
+        getStartRow(), split.getStartRow());
   }
 
   @Override
@@ -191,6 +285,7 @@ implements Writable, Comparable<TableSpl
     @Override
     public int hashCode() {
         int result = tableName != null ? Arrays.hashCode(tableName) : 0;
+        result = 31 * result + (scan != null ? scan.hashCode() : 0);
         result = 31 * result + (startRow != null ? Arrays.hashCode(startRow) : 0);
         result = 31 * result + (endRow != null ? Arrays.hashCode(endRow) : 0);
         result = 31 * result + (regionLocation != null ? regionLocation.hashCode() : 0);

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java?rev=1446173&r1=1446172&r2=1446173&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java Thu Feb 14 13:35:54 2013
@@ -68,6 +68,7 @@ import org.apache.hadoop.hbase.master.ha
 import org.apache.hadoop.hbase.regionserver.RegionAlreadyInTransitionException;
 import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.KeyLocker;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
@@ -157,6 +158,18 @@ public class AssignmentManager extends Z
 
   private final RegionStates regionStates;
 
+  // The threshold to use bulk assigning. Using bulk assignment
+  // only if assigning at least this many regions to at least this
+  // many servers. If assigning fewer regions to fewer servers,
+  // bulk assigning may be not as efficient.
+  private final int bulkAssignThresholdRegions;
+  private final int bulkAssignThresholdServers;
+
+  // Should bulk assignment wait until all regions are assigned,
+  // or until it times out?  This is useful to measure bulk assignment
+  // performance, but not needed in most use cases.
+  private final boolean bulkAssignWaitTillAllAssigned;
+
   /**
    * Indicator that AssignmentManager has recovered the region states so
    * that ServerShutdownHandler can be fully enabled and re-assign regions
@@ -206,6 +219,11 @@ public class AssignmentManager extends Z
     this.metricsMaster = metricsMaster;// can be null only with tests.
     this.regionStates = new RegionStates(server, serverManager);
 
+    this.bulkAssignWaitTillAllAssigned =
+      conf.getBoolean("hbase.bulk.assignment.waittillallassigned", false);
+    this.bulkAssignThresholdRegions = conf.getInt("hbase.bulk.assignment.threshold.regions", 7);
+    this.bulkAssignThresholdServers = conf.getInt("hbase.bulk.assignment.threshold.servers", 3);
+
     int workers = conf.getInt("hbase.assignment.zkevent.workers", 20);
     ThreadFactory threadFactory = Threads.newDaemonThreadFactory("hbase-am-zkevent-worker");
     zkEventWorkers = Threads.getBoundedCachedThreadPool(workers, 60L,
@@ -393,7 +411,7 @@ public class AssignmentManager extends Z
       LOG.info("Found regions out on cluster or in RIT; failover");
       // Process list of dead servers and regions in RIT.
       // See HBASE-4580 for more information.
-      processDeadServersAndRecoverLostRegions(deadServers, nodes);
+      processDeadServersAndRecoverLostRegions(deadServers);
     } else {
       // Fresh cluster startup.
       LOG.info("Clean cluster startup. Assigning userregions");
@@ -474,87 +492,122 @@ public class AssignmentManager extends Z
    */
   void processRegionsInTransition(
       final RegionTransition rt, final HRegionInfo regionInfo,
-      int expectedVersion) throws KeeperException {
+      final int expectedVersion) throws KeeperException {
     EventType et = rt.getEventType();
     // Get ServerName.  Could not be null.
-    ServerName sn = rt.getServerName();
+    final ServerName sn = rt.getServerName();
     String encodedRegionName = regionInfo.getEncodedName();
     LOG.info("Processing region " + regionInfo.getRegionNameAsString() + " in state " + et);
 
+
     if (regionStates.isRegionInTransition(encodedRegionName)) {
       // Just return
       return;
     }
     switch (et) {
-    case M_ZK_REGION_CLOSING:
-      // If zk node of the region was updated by a live server skip this
-      // region and just add it into RIT.
-      if (!serverManager.isServerOnline(sn)) {
-        // If was not online, its closed now. Force to OFFLINE and this
-        // will get it reassigned if appropriate
-        forceOffline(regionInfo, rt);
-      } else {
-        // Just insert region into RIT.
-        // If this never updates the timeout will trigger new assignment
-        regionStates.updateRegionState(rt, RegionState.State.CLOSING);
-      }
-      break;
+      case M_ZK_REGION_CLOSING:
+        // If zk node of the region was updated by a live server skip this
+        // region and just add it into RIT.
+        if (!serverManager.isServerOnline(sn)) {
+          // If was not online, its closed now. Force to OFFLINE and this
+          // will get it reassigned if appropriate
+          forceOffline(regionInfo, rt);
+        } else {
+          // Insert into RIT & resend the query to the region server: may be the previous master
+          // died before sending the query the first time.
+          regionStates.updateRegionState(rt, RegionState.State.CLOSING);
+          final RegionState rs = regionStates.getRegionState(regionInfo);
+          this.executorService.submit(
+              new EventHandler(server, EventType.M_MASTER_RECOVERY) {
+                @Override
+                public void process() throws IOException {
+                  ReentrantLock lock = locker.acquireLock(regionInfo.getEncodedName());
+                  try {
+                    unassign(regionInfo, rs, expectedVersion, sn, true);
+                  } finally {
+                    lock.unlock();
+                  }
+                }
+              });
+        }
+        break;
 
-    case RS_ZK_REGION_CLOSED:
-    case RS_ZK_REGION_FAILED_OPEN:
-      // Region is closed, insert into RIT and handle it
-      addToRITandCallClose(regionInfo, RegionState.State.CLOSED, rt);
-      break;
+      case RS_ZK_REGION_CLOSED:
+      case RS_ZK_REGION_FAILED_OPEN:
+        // Region is closed, insert into RIT and handle it
+        addToRITandCallClose(regionInfo, RegionState.State.CLOSED, rt);
+        break;
 
-    case M_ZK_REGION_OFFLINE:
-      // If zk node of the region was updated by a live server skip this
-      // region and just add it into RIT.
-      if (!serverManager.isServerOnline(sn)) {
-        // Region is offline, insert into RIT and handle it like a closed
-        addToRITandCallClose(regionInfo, RegionState.State.OFFLINE, rt);
-      } else {
-        // Just insert region into RIT.
-        // If this never updates the timeout will trigger new assignment
-        regionStates.updateRegionState(rt, RegionState.State.PENDING_OPEN);
-      }
-      break;
+      case M_ZK_REGION_OFFLINE:
+        // If zk node of the region was updated by a live server skip this
+        // region and just add it into RIT.
+        if (!serverManager.isServerOnline(sn)) {
+          // Region is offline, insert into RIT and handle it like a closed
+          addToRITandCallClose(regionInfo, RegionState.State.OFFLINE, rt);
+        } else {
+          // Insert in RIT and resend to the regionserver
+          regionStates.updateRegionState(rt, RegionState.State.PENDING_OPEN);
+          final RegionState rs = regionStates.getRegionState(regionInfo);
+          this.executorService.submit(
+              new EventHandler(server, EventType.M_MASTER_RECOVERY) {
+                @Override
+                public void process() throws IOException {
+                  ReentrantLock lock = locker.acquireLock(regionInfo.getEncodedName());
+                  try {
+                    assign(rs, false, false);
+                  } finally {
+                    lock.unlock();
+                  }
+                }
+              });
+        }
+        break;
 
-    case RS_ZK_REGION_OPENING:
-      regionStates.updateRegionState(rt, RegionState.State.OPENING);
-      if (regionInfo.isMetaTable() || !serverManager.isServerOnline(sn)) {
-        // If ROOT or .META. table is waiting for timeout monitor to assign
-        // it may take lot of time when the assignment.timeout.period is
-        // the default value which may be very long.  We will not be able
-        // to serve any request during this time.
-        // So we will assign the ROOT and .META. region immediately.
-        // For a user region, if the server is not online, it takes
-        // some time for timeout monitor to kick in.  We know the region
-        // won't open. So we will assign the opening
-        // region immediately too.
-        //
-        // Otherwise, just insert region into RIT. If the state never
-        // updates, the timeout will trigger new assignment
-        processOpeningState(regionInfo);
-      }
-      break;
+      case RS_ZK_REGION_OPENING:
+        if (!serverManager.isServerOnline(sn)) {
+          forceOffline(regionInfo, rt);
+        } else {
+          regionStates.updateRegionState(rt, RegionState.State.OPENING);
+        }
+        break;
 
-    case RS_ZK_REGION_OPENED:
-      if (!serverManager.isServerOnline(sn)) {
-        forceOffline(regionInfo, rt);
-      } else {
-        // Region is opened, insert into RIT and handle it
-        regionStates.updateRegionState(rt, RegionState.State.OPEN);
-        new OpenedRegionHandler(server, this, regionInfo, sn, expectedVersion).process();
-      }
-      break;
-    case RS_ZK_REGION_SPLITTING:
-      LOG.debug("Processed region in state : " + et);
-      break;
-    case RS_ZK_REGION_SPLIT:
-      LOG.debug("Processed region in state : " + et);
-      break;
-    default:
-      throw new IllegalStateException("Received region in state :" + et + " is not valid");
+      case RS_ZK_REGION_OPENED:
+        if (!serverManager.isServerOnline(sn)) {
+          forceOffline(regionInfo, rt);
+        } else {
+          // Region is opened, insert into RIT and handle it
+          // This could be done asynchronously, we would need then to acquire the lock in the
+          //  handler.
+          regionStates.updateRegionState(rt, RegionState.State.OPEN);
+          new OpenedRegionHandler(server, this, regionInfo, sn, expectedVersion).process();
+        }
+        break;
+      case RS_ZK_REGION_SPLITTING:
+        if (!serverManager.isServerOnline(sn)) {
+          // The regionserver started the split, but died before updating the status.
+          // It means (hopefully) that the split was not finished
+          // TBD - to study. In the meantime, do nothing as in the past.
+          LOG.warn("Processed region " + regionInfo.getEncodedName() + " in state : " + et +
+              " on a dead regionserver: " + sn + " doing nothing");
+        } else {
+          LOG.info("Processed region " + regionInfo.getEncodedName() + " in state : " +
+              et + " nothing to do.");
+          // We don't do anything. The way the code is written in RS_ZK_REGION_SPLIT management,
+          //  it adds the RS_ZK_REGION_SPLITTING state if needed. So we don't have to do it here.
+        }
+        break;
+      case RS_ZK_REGION_SPLIT:
+        if (!serverManager.isServerOnline(sn)) {
+          forceOffline(regionInfo, rt);
+        } else {
+          LOG.info("Processed region " + regionInfo.getEncodedName() + " in state : " +
+              et + " nothing to do.");
+          // We don't do anything. The regionserver is supposed to update the znode
+          // multiple times so if it's still up we will receive an update soon.
+        }
+        break;
+      default:
+        throw new IllegalStateException("Received region in state :" + et + " is not valid.");
     }
   }
 
@@ -694,7 +747,7 @@ public class AssignmentManager extends Z
           }
           // Check it has daughters.
           byte [] payload = rt.getPayload();
-          List<HRegionInfo> daughters = null;
+          List<HRegionInfo> daughters;
           try {
             daughters = HRegionInfo.parseDelimitedFrom(payload, 0, payload.length);
           } catch (IOException e) {
@@ -834,7 +887,7 @@ public class AssignmentManager extends Z
    */
   private boolean convertPendingCloseToSplitting(final RegionState rs) {
     if (!rs.isPendingClose()) return false;
-    LOG.debug("Converting PENDING_CLOSE to SPLITING; rs=" + rs);
+    LOG.debug("Converting PENDING_CLOSE to SPLITTING; rs=" + rs);
     regionStates.updateRegionState(
       rs.getRegion(), RegionState.State.SPLITTING);
     // Clean up existing state.  Clear from region plans seems all we
@@ -857,7 +910,7 @@ public class AssignmentManager extends Z
     RegionState regionState = regionStates.getRegionTransitionState(encodedName);
     switch (rt.getEventType()) {
       case M_ZK_REGION_OFFLINE:
-        HRegionInfo regionInfo = null;
+        HRegionInfo regionInfo;
         if (regionState != null) {
           regionInfo = regionState.getRegion();
         } else {
@@ -1350,7 +1403,7 @@ public class AssignmentManager extends Z
         HRegionInfo region = state.getRegion();
         String encodedRegionName = region.getEncodedName();
         Integer nodeVersion = offlineNodesVersions.get(encodedRegionName);
-        if (nodeVersion == null || nodeVersion.intValue() == -1) {
+        if (nodeVersion == null || nodeVersion == -1) {
           LOG.warn("failed to offline in zookeeper: " + region);
           failedToOpenRegions.add(region); // assign individually later
           Lock lock = locks.remove(encodedRegionName);
@@ -1555,12 +1608,13 @@ public class AssignmentManager extends Z
     RegionPlan plan = null;
     long maxRegionServerStartupWaitTime = -1;
     HRegionInfo region = state.getRegion();
+    RegionOpeningState regionOpenState;
     for (int i = 1; i <= maximumAttempts && !server.isStopped(); i++) {
       if (plan == null) { // Get a server for the region at first
         plan = getRegionPlan(region, forceNewPlan);
       }
       if (plan == null) {
-        LOG.debug("Unable to determine a plan to assign " + region);
+        LOG.warn("Unable to determine a plan to assign " + region);
         this.timeoutMonitor.setAllRegionServersOffline(true);
         return; // Should get reassigned later when RIT times out.
       }
@@ -1592,118 +1646,119 @@ public class AssignmentManager extends Z
         LOG.debug("Server stopped; skipping assign of " + region);
         return;
       }
-      try {
-        LOG.info("Assigning region " + region.getRegionNameAsString() +
+      LOG.info("Assigning region " + region.getRegionNameAsString() +
           " to " + plan.getDestination().toString());
-        // Transition RegionState to PENDING_OPEN
-        currentState = regionStates.updateRegionState(region,
+      // Transition RegionState to PENDING_OPEN
+      currentState = regionStates.updateRegionState(region,
           RegionState.State.PENDING_OPEN, plan.getDestination());
-        // Send OPEN RPC. This can fail if the server on other end is is not up.
-        // Pass the version that was obtained while setting the node to OFFLINE.
-        RegionOpeningState regionOpenState = serverManager.sendRegionOpen(plan
-            .getDestination(), region, versionOfOfflineNode);
-        if (regionOpenState == RegionOpeningState.ALREADY_OPENED) {
-          processAlreadyOpenedRegion(region, plan.getDestination());
-        } else if (regionOpenState == RegionOpeningState.FAILED_OPENING) {
-          // Failed opening this region
-          throw new Exception("Get regionOpeningState=" + regionOpenState);
+
+      boolean needNewPlan;
+      final String assignMsg = "Failed assignment of " + region.getRegionNameAsString() +
+          " to " + plan.getDestination();
+      try {
+        regionOpenState = serverManager.sendRegionOpen(
+            plan.getDestination(), region, versionOfOfflineNode);
+
+        if (regionOpenState == RegionOpeningState.FAILED_OPENING) {
+          // Failed opening this region, looping again on a new server.
+          needNewPlan = true;
+          LOG.warn(assignMsg + ", regionserver says 'FAILED_OPENING', " +
+              " trying to assign elsewhere instead; " +
+              "try=" + i + " of " + this.maximumAttempts);
+        } else {
+          // we're done
+          if (regionOpenState == RegionOpeningState.ALREADY_OPENED) {
+            processAlreadyOpenedRegion(region, plan.getDestination());
+          }
+          return;
         }
-        break;
+
       } catch (Throwable t) {
         if (t instanceof RemoteException) {
           t = ((RemoteException) t).unwrapRemoteException();
         }
-        boolean regionAlreadyInTransitionException = false;
-        boolean serverNotRunningYet = false;
-        boolean socketTimedOut = false;
-        if (t instanceof RegionAlreadyInTransitionException) {
-          regionAlreadyInTransitionException = true;
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Failed assignment in: " + plan.getDestination() + " due to "
-              + t.getMessage());
-          }
-        } else if (t instanceof ServerNotRunningYetException) {
+
+        // Should we wait a little before retrying? If the server is starting, yes.
+        // If the region is already in transition, yes as well: we want to be sure that
+        //  the region will get opened but we don't want a double assignment.
+        boolean hold = (t instanceof RegionAlreadyInTransitionException ||
+            t instanceof ServerNotRunningYetException);
+
+        // In case socket is timed out and the region server is still online,
+        // the openRegion RPC could have been accepted by the server and
+        // just the response didn't go through.  So we will retry to
+        // open the region on the same server to avoid possible
+        // double assignment.
+        boolean retry = !hold && (t instanceof java.net.SocketTimeoutException
+            && this.serverManager.isServerOnline(plan.getDestination()));
+
+
+        if (hold) {
+          LOG.warn(assignMsg + ", waiting a little before trying on the same region server " +
+              "try=" + i + " of " + this.maximumAttempts, t);
+
           if (maxRegionServerStartupWaitTime < 0) {
-            maxRegionServerStartupWaitTime = System.currentTimeMillis() +
-              this.server.getConfiguration().
-                getLong("hbase.regionserver.rpc.startup.waittime", 60000);
+            maxRegionServerStartupWaitTime = EnvironmentEdgeManager.currentTimeMillis() +
+                this.server.getConfiguration().
+                    getLong("hbase.regionserver.rpc.startup.waittime", 60000);
           }
           try {
-            long now = System.currentTimeMillis();
+            long now = EnvironmentEdgeManager.currentTimeMillis();
             if (now < maxRegionServerStartupWaitTime) {
               LOG.debug("Server is not yet up; waiting up to " +
-                (maxRegionServerStartupWaitTime - now) + "ms", t);
-              serverNotRunningYet = true;
+                  (maxRegionServerStartupWaitTime - now) + "ms", t);
               Thread.sleep(100);
               i--; // reset the try count
+              needNewPlan = false;
             } else {
               LOG.debug("Server is not up for a while; try a new one", t);
+              needNewPlan = true;
             }
           } catch (InterruptedException ie) {
             LOG.warn("Failed to assign "
-              + region.getRegionNameAsString() + " since interrupted", ie);
+                + region.getRegionNameAsString() + " since interrupted", ie);
             Thread.currentThread().interrupt();
             return;
           }
-        } else if (t instanceof java.net.SocketTimeoutException
-            && this.serverManager.isServerOnline(plan.getDestination())) {
-          // In case socket is timed out and the region server is still online,
-          // the openRegion RPC could have been accepted by the server and
-          // just the response didn't go through.  So we will retry to
-          // open the region on the same server to avoid possible
-          // double assignment.
-          socketTimedOut = true;
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Call openRegion() to " + plan.getDestination()
-              + " has timed out when trying to assign "
-              + region.getRegionNameAsString()
-              + ", but the region might already be opened on "
-              + plan.getDestination() + ".", t);
-          }
+        } else if (retry) {
+          needNewPlan = false;
+          LOG.warn(assignMsg + ", trying to assign to the same region server " +
+              "try=" + i + " of " + this.maximumAttempts, t);
+        } else {
+          needNewPlan = true;
+          LOG.warn(assignMsg + ", trying to assign elsewhere instead;" +
+              " try=" + i + " of " + this.maximumAttempts, t);
         }
+      }
+
+      if (i == this.maximumAttempts) {
+        // Don't reset the region state or get a new plan any more.
+        // This is the last try.
+        continue;
+      }
+
+      // If the region opened on the destination of the present plan, reassigning to a
+      // new RS may cause double assignment. In case of RegionAlreadyInTransitionException
+      // we reassign to the same RS.
+      if (needNewPlan) {
+        // Force a new plan and reassign. Will return null if no servers.
+        // The new plan could be the same as the existing plan since we don't
+        // exclude the server of the original plan, which should not be
+        // excluded since it could be the only server up now.
+        RegionPlan newPlan = getRegionPlan(region, true);
 
-        LOG.warn("Failed assignment of "
-          + region.getRegionNameAsString()
-          + " to "
-          + plan.getDestination()
-          + ", trying to assign "
-          + (regionAlreadyInTransitionException || serverNotRunningYet || socketTimedOut
-            ? "to the same region server because of RegionAlreadyInTransitionException"
-              + "/ServerNotRunningYetException/SocketTimeoutException;"
-              : "elsewhere instead; ")
-          + "try=" + i + " of " + this.maximumAttempts, t);
-
-        if (i == this.maximumAttempts) {
-          // Don't reset the region state or get a new plan any more.
-          // This is the last try.
-          continue;
-        }
-
-        // If region opened on destination of present plan, reassigning to new
-        // RS may cause double assignments. In case of RegionAlreadyInTransitionException
-        // reassigning to same RS.
-        RegionPlan newPlan = plan;
-        if (!(regionAlreadyInTransitionException
-            || serverNotRunningYet || socketTimedOut)) {
-          // Force a new plan and reassign. Will return null if no servers.
-          // The new plan could be the same as the existing plan since we don't
-          // exclude the server of the original plan, which should not be
-          // excluded since it could be the only server up now.
-          newPlan = getRegionPlan(region, true);
-        }
         if (newPlan == null) {
           this.timeoutMonitor.setAllRegionServersOffline(true);
           LOG.warn("Unable to find a viable location to assign region " +
-            region.getRegionNameAsString());
+              region.getRegionNameAsString());
           return;
         }
-        if (plan != newPlan
-            && !plan.getDestination().equals(newPlan.getDestination())) {
+
+        if (plan != newPlan && !plan.getDestination().equals(newPlan.getDestination())) {
           // Clean out plan we failed execute and one that doesn't look like it'll
           // succeed anyways; we need a new plan!
           // Transition back to OFFLINE
-          currentState = regionStates.updateRegionState(
-            region, RegionState.State.OFFLINE);
+          currentState = regionStates.updateRegionState(region, RegionState.State.OFFLINE);
           versionOfOfflineNode = -1;
           plan = newPlan;
         }
@@ -1723,7 +1778,7 @@ public class AssignmentManager extends Z
     } catch (KeeperException.NoNodeException e) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("The unassigned node " + encodedRegionName
-            + " doesnot exist.");
+            + " does not exist.");
       }
     } catch (KeeperException e) {
       server.abort(
@@ -1761,7 +1816,7 @@ public class AssignmentManager extends Z
     }
     regionStates.updateRegionState(state.getRegion(),
       RegionState.State.OFFLINE);
-    int versionOfOfflineNode = -1;
+    int versionOfOfflineNode;
     try {
       // get the version after setting the znode to OFFLINE
       versionOfOfflineNode = ZKAssign.createOrForceNodeOffline(watcher,
@@ -1812,7 +1867,7 @@ public class AssignmentManager extends Z
 
     RegionPlan randomPlan = null;
     boolean newPlan = false;
-    RegionPlan existingPlan = null;
+    RegionPlan existingPlan;
 
     synchronized (this.regionPlans) {
       existingPlan = this.regionPlans.get(encodedName);
@@ -2017,7 +2072,6 @@ public class AssignmentManager extends Z
       server.abort(
         "Unexpected ZK exception deleting node CLOSING/CLOSED for the region "
           + encodedName, ke);
-      return;
     }
   }
 
@@ -2115,11 +2169,8 @@ public class AssignmentManager extends Z
     Map<ServerName, List<HRegionInfo>> bulkPlan =
       balancer.retainAssignment(regions, servers);
 
-    LOG.info("Bulk assigning " + regions.size() + " region(s) across " +
-      servers.size() + " server(s), retainAssignment=true");
-    BulkAssigner ba = new GeneralBulkAssigner(this.server, bulkPlan, this);
-    ba.bulkAssign();
-    LOG.info("Bulk assigning done");
+    assign(regions.size(), servers.size(),
+      "retainAssignment=true", bulkPlan);
   }
 
   /**
@@ -2135,6 +2186,7 @@ public class AssignmentManager extends Z
     if (regions == null || regions.isEmpty()) {
       return;
     }
+
     List<ServerName> servers = serverManager.createDestinationServersList();
     if (servers == null || servers.isEmpty()) {
       throw new IOException("Found no destination server to assign region(s)");
@@ -2144,13 +2196,36 @@ public class AssignmentManager extends Z
     Map<ServerName, List<HRegionInfo>> bulkPlan
       = balancer.roundRobinAssignment(regions, servers);
 
-    LOG.info("Bulk assigning " + regions.size() + " region(s) round-robin across "
-      + servers.size() + " server(s)");
+    assign(regions.size(), servers.size(),
+      "round-robin=true", bulkPlan);
+  }
+
+  private void assign(int regions, int totalServers,
+      String message, Map<ServerName, List<HRegionInfo>> bulkPlan)
+          throws InterruptedException, IOException {
+
+    int servers = bulkPlan.size();
+    if (servers == 1 || (regions < bulkAssignThresholdRegions
+        && servers < bulkAssignThresholdServers)) {
+
+      // Not using bulk assignment. This can be more efficient in a small
+      // cluster, especially a mini cluster for testing, so that tests won't time out
+      LOG.info("Not use bulk assigning since we are assigning only "
+        + regions + " region(s) to " + servers + " server(s)");
+
+      for (Map.Entry<ServerName, List<HRegionInfo>> plan: bulkPlan.entrySet()) {
+        assign(plan.getKey(), plan.getValue());
+      }
+    } else {
+      LOG.info("Bulk assigning " + regions + " region(s) across "
+        + totalServers + " server(s), " + message);
 
-    // Use fixed count thread pool assigning.
-    BulkAssigner ba = new GeneralBulkAssigner(this.server, bulkPlan, this);
-    ba.bulkAssign();
-    LOG.info("Bulk assigning done");
+      // Use fixed count thread pool assigning.
+      BulkAssigner ba = new GeneralBulkAssigner(
+        this.server, bulkPlan, this, bulkAssignWaitTillAllAssigned);
+      ba.bulkAssign();
+      LOG.info("Bulk assigning done");
+    }
   }
 
   /**
@@ -2372,16 +2447,15 @@ public class AssignmentManager extends Z
    * that were in RIT.
    * <p>
    *
+   *
    * @param deadServers
    *          The list of dead servers which failed while there was no active
    *          master. Can be null.
-   * @param nodes
-   *          The regions in RIT
    * @throws IOException
    * @throws KeeperException
    */
   private void processDeadServersAndRecoverLostRegions(
-      Map<ServerName, List<HRegionInfo>> deadServers, List<String> nodes)
+      Map<ServerName, List<HRegionInfo>> deadServers)
           throws IOException, KeeperException {
     if (deadServers != null) {
       for (Map.Entry<ServerName, List<HRegionInfo>> server: deadServers.entrySet()) {
@@ -2391,7 +2465,7 @@ public class AssignmentManager extends Z
         }
       }
     }
-    nodes = ZKUtil.listChildrenAndWatchForNewChildren(
+    List<String> nodes = ZKUtil.listChildrenAndWatchForNewChildren(
       this.watcher, this.watcher.assignmentZNode);
     if (!nodes.isEmpty()) {
       for (String encodedRegionName : nodes) {
@@ -2634,12 +2708,9 @@ public class AssignmentManager extends Z
       invokeAssign(regionInfo);
     } catch (KeeperException ke) {
       LOG.error("Unexpected ZK exception timing out CLOSING region", ke);
-      return;
     } catch (DeserializationException e) {
       LOG.error("Unexpected exception parsing CLOSING region", e);
-      return;
     }
-    return;
   }
 
   void invokeAssign(HRegionInfo regionInfo) {
@@ -2705,7 +2776,7 @@ public class AssignmentManager extends Z
    * @param sn Server that went down.
    * @return list of regions in transition on this server
    */
-  public List<RegionState> processServerShutdown(final ServerName sn) {
+  public List<HRegionInfo> processServerShutdown(final ServerName sn) {
     // Clean out any existing assignment plans for this server
     synchronized (this.regionPlans) {
       for (Iterator <Map.Entry<String, RegionPlan>> i =
@@ -2719,7 +2790,30 @@ public class AssignmentManager extends Z
         }
       }
     }
-    return regionStates.serverOffline(sn);
+    List<HRegionInfo> regions = regionStates.serverOffline(sn);
+    for (Iterator<HRegionInfo> it = regions.iterator(); it.hasNext(); ) {
+      HRegionInfo hri = it.next();
+      String encodedName = hri.getEncodedName();
+
+      // We need a lock on the region as we could update it
+      Lock lock = locker.acquireLock(encodedName);
+      try {
+        RegionState regionState =
+          regionStates.getRegionTransitionState(encodedName);
+        if (regionState == null
+            || !regionState.isPendingOpenOrOpeningOnServer(sn)) {
+          LOG.info("Skip region " + hri
+            + " since it is not opening on the dead server any more: " + sn);
+          it.remove();
+        } else {
+          // Mark the region closed and assign it again by SSH
+          regionStates.updateRegionState(hri, RegionState.State.CLOSED);
+        }
+      } finally {
+        lock.unlock();
+      }
+    }
+    return regions;
   }
 
   /**

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/DeadServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/DeadServer.java?rev=1446173&r1=1446172&r2=1446173&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/DeadServer.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/DeadServer.java Thu Feb 14 13:35:54 2013
@@ -18,20 +18,26 @@
  */
 package org.apache.hadoop.hbase.master;
 
-import java.util.Collection;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
-import org.apache.commons.lang.NotImplementedException;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.ServerName;
-
 /**
  * Class to hold dead servers list and utility querying dead server list.
  */
 @InterfaceAudience.Private
-public class DeadServer implements Set<ServerName> {
+public class DeadServer {
   /**
    * Set of known dead servers.  On znode expiration, servers are added here.
    * This is needed in case of a network partitioning where the server's lease
@@ -39,75 +45,66 @@ public class DeadServer implements Set<S
    * and it's server logs are recovered, it will be told to call server startup
    * because by then, its regions have probably been reassigned.
    */
-  private final Set<ServerName> deadServers = new HashSet<ServerName>();
-
-  /** Number of dead servers currently being processed */
-  private int numProcessing;
-
-  public DeadServer() {
-    super();
-    this.numProcessing = 0;
-  }
+  private final Map<ServerName, Long> deadServers = new HashMap<ServerName, Long>();
 
   /**
-   * @param serverName Server name
-   * @return true if server is dead
+   * Number of dead servers currently being processed
    */
-  public boolean isDeadServer(final String serverName) {
-    return isDeadServer(new ServerName(serverName));
-  }
+  private int numProcessing = 0;
 
   /**
-   * A dead server that comes back alive has a different start code.
+   * A dead server that comes back alive has a different start code. The new start code should be
+   *  greater than the old one, but we don't take this into account in this method.
+   *
    * @param newServerName Servername as either <code>host:port</code> or
-   * <code>host,port,startcode</code>.
+   *                      <code>host,port,startcode</code>.
    * @return true if this server was dead before and coming back alive again
    */
-  public boolean cleanPreviousInstance(final ServerName newServerName) {
-    ServerName sn =
-      ServerName.findServerWithSameHostnamePort(this.deadServers, newServerName);
-    if (sn == null) return false;
-    return this.deadServers.remove(sn);
+  public synchronized boolean cleanPreviousInstance(final ServerName newServerName) {
+    Iterator<ServerName> it = deadServers.keySet().iterator();
+    while (it.hasNext()) {
+      ServerName sn = it.next();
+      if (ServerName.isSameHostnameAndPort(sn, newServerName)) {
+        it.remove();
+        return true;
+      }
+    }
+
+    return false;
   }
 
   /**
    * @param serverName
    * @return true if this server is on the dead servers list.
    */
-  boolean isDeadServer(final ServerName serverName) {
-    return this.deadServers.contains(serverName);
-  }
-
-  /**
-   * @return True if we have a server with matching hostname and port.
-   */
-  boolean isDeadServerWithSameHostnamePort(final ServerName serverName) {
-    return ServerName.findServerWithSameHostnamePort(this.deadServers,
-      serverName) != null;
+  public synchronized boolean isDeadServer(final ServerName serverName) {
+    return deadServers.containsKey(serverName);
   }
 
   /**
    * Checks if there are currently any dead servers being processed by the
    * master.  Returns true if at least one region server is currently being
    * processed as dead.
+   *
    * @return true if any RS are being processed as dead
    */
-  public boolean areDeadServersInProgress() {
+  public synchronized boolean areDeadServersInProgress() {
     return numProcessing != 0;
   }
 
-  public synchronized Set<ServerName> clone() {
-    Set<ServerName> clone = new HashSet<ServerName>(this.deadServers.size());
-    clone.addAll(this.deadServers);
+  public synchronized Set<ServerName> copyServerNames() {
+    Set<ServerName> clone = new HashSet<ServerName>(deadServers.size());
+    clone.addAll(deadServers.keySet());
     return clone;
   }
 
   public synchronized boolean add(ServerName e) {
     this.numProcessing++;
-    return deadServers.add(e);
+    return deadServers.put(e, EnvironmentEdgeManager.currentTimeMillis()) != null;
   }
 
-  public synchronized void finish(ServerName e) {
+  @SuppressWarnings("UnusedParameters")
+  public synchronized void finish(ServerName ignored) {
     this.numProcessing--;
   }
 
@@ -119,55 +116,51 @@ public class DeadServer implements Set<S
     return deadServers.isEmpty();
   }
 
-  public synchronized boolean contains(Object o) {
-    return deadServers.contains(o);
-  }
-
-  public Iterator<ServerName> iterator() {
-    return this.deadServers.iterator();
-  }
-
-  public synchronized Object[] toArray() {
-    return deadServers.toArray();
-  }
-
-  public synchronized <T> T[] toArray(T[] a) {
-    return deadServers.toArray(a);
-  }
-
-  public synchronized boolean remove(Object o) {
-    return this.deadServers.remove(o);
-  }
-
-  public synchronized boolean containsAll(Collection<?> c) {
-    return deadServers.containsAll(c);
-  }
-
-  public synchronized boolean addAll(Collection<? extends ServerName> c) {
-    return deadServers.addAll(c);
-  }
-
-  public synchronized boolean retainAll(Collection<?> c) {
-    return deadServers.retainAll(c);
-  }
-
-  public synchronized boolean removeAll(Collection<?> c) {
-    return deadServers.removeAll(c);
-  }
-
-  public synchronized void clear() {
-    throw new NotImplementedException();
-  }
-
-  public synchronized boolean equals(Object o) {
-    return deadServers.equals(o);
-  }
-
-  public synchronized int hashCode() {
-    return deadServers.hashCode();
+  public synchronized void cleanAllPreviousInstances(final ServerName newServerName) {
+    Iterator<ServerName> it = deadServers.keySet().iterator();
+    while (it.hasNext()) {
+      ServerName sn = it.next();
+      if (ServerName.isSameHostnameAndPort(sn, newServerName)) {
+        it.remove();
+      }
+    }
   }
 
   public synchronized String toString() {
-    return this.deadServers.toString();
+    StringBuilder sb = new StringBuilder();
+    for (ServerName sn : deadServers.keySet()) {
+      if (sb.length() > 0) {
+        sb.append(", ");
+      }
+      sb.append(sn.toString());
+    }
+    return sb.toString();
   }
+
+  /**
+   * Extract all the servers dead since a given time, and sort them.
+   * @param ts the time, 0 for all
+   * @return a sorted array list, by death time, lowest values first.
+   */
+  public synchronized List<Pair<ServerName, Long>> copyDeadServersSince(long ts){
+    List<Pair<ServerName, Long>> res =  new ArrayList<Pair<ServerName, Long>>(size());
+
+    for (Map.Entry<ServerName, Long> entry:deadServers.entrySet()){
+      if (entry.getValue() >= ts){
+        res.add(new Pair<ServerName, Long>(entry.getKey(), entry.getValue()));
+      }
+    }
+
+    Collections.sort(res, ServerNameDeathDateComparator);
+    return res;
+  }
+
+  private static Comparator<Pair<ServerName, Long>> ServerNameDeathDateComparator =
+      new Comparator<Pair<ServerName, Long>>(){
+
+    @Override
+    public int compare(Pair<ServerName, Long> o1, Pair<ServerName, Long> o2) {
+      return o1.getSecond().compareTo(o2.getSecond());
+    }
+  };
 }

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java?rev=1446173&r1=1446172&r2=1446173&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java Thu Feb 14 13:35:54 2013
@@ -50,13 +50,15 @@ public class GeneralBulkAssigner extends
 
   final Map<ServerName, List<HRegionInfo>> bulkPlan;
   final AssignmentManager assignmentManager;
+  final boolean waitTillAllAssigned;
 
   GeneralBulkAssigner(final Server server,
       final Map<ServerName, List<HRegionInfo>> bulkPlan,
-      final AssignmentManager am) {
+      final AssignmentManager am, final boolean waitTillAllAssigned) {
     super(server);
     this.bulkPlan = bulkPlan;
     this.assignmentManager = am;
+    this.waitTillAllAssigned = waitTillAllAssigned;
   }
 
   @Override
@@ -133,6 +135,10 @@ public class GeneralBulkAssigner extends
           regionInfoIterator.remove();
         }
       }
+      if (!waitTillAllAssigned) {
+        // No need to wait; let the assignment proceed asynchronously
+        break;
+      }
       if (!regionSet.isEmpty()) {
         regionStates.waitForUpdate(100);
       }
@@ -142,7 +148,7 @@ public class GeneralBulkAssigner extends
       long elapsedTime = System.currentTimeMillis() - startTime;
       String status = "successfully";
       if (!regionSet.isEmpty()) {
-        status = "with " + regionSet.size() + " regions still not assigned yet";
+        status = "with " + regionSet.size() + " regions still in transition";
       }
       LOG.debug("bulk assigning total " + regionCount + " regions to "
         + serverCount + " servers, took " + elapsedTime + "ms, " + status);

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=1446173&r1=1446172&r2=1446173&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Thu Feb 14 13:35:54 2013
@@ -600,7 +600,6 @@ Server {
    * @param abortable If fatal exception we'll call abort on this.  May be null.
    * If it is we'll use the Connection associated with the passed
    * {@link Configuration} as our {@link Abortable}.
-   * @param defaultTimeout Timeout to use.  Pass zero for no timeout
    * ({@link Object#wait(long)} when passed a <code>0</code> waits for ever).
    * @throws IOException
    */
@@ -1429,7 +1428,7 @@ Server {
    * @param b If false, the catalog janitor won't do anything.
    */
   public void setCatalogJanitorEnabled(final boolean b) {
-    ((CatalogJanitor)this.catalogJanitorChore).setEnabled(b);
+    this.catalogJanitorChore.setEnabled(b);
   }
 
   @Override
@@ -1880,7 +1879,7 @@ Server {
     return new ClusterStatus(VersionInfo.getVersion(),
       this.fileSystemManager.getClusterId().toString(),
       this.serverManager.getOnlineServers(),
-      this.serverManager.getDeadServers(),
+      this.serverManager.getDeadServers().copyServerNames(),
       this.serverName,
       backupMasters,
       this.assignmentManager.getRegionStates().getRegionsInTransition(),
@@ -1929,7 +1928,7 @@ Server {
   public String[] getCoprocessors() {
     Set<String> masterCoprocessors =
         getCoprocessorHost().getCoprocessors();
-    return masterCoprocessors.toArray(new String[0]);
+    return masterCoprocessors.toArray(new String[masterCoprocessors.size()]);
   }
 
   @Override

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMasterCommandLine.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMasterCommandLine.java?rev=1446173&r1=1446172&r2=1446173&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMasterCommandLine.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMasterCommandLine.java Thu Feb 14 13:35:54 2013
@@ -121,8 +121,7 @@ public class HMasterCommandLine extends 
       // If 'local', defer to LocalHBaseCluster instance.  Starts master
       // and regionserver both in the one JVM.
       if (LocalHBaseCluster.isLocal(conf)) {
-        final MiniZooKeeperCluster zooKeeperCluster =
-          new MiniZooKeeperCluster();
+        final MiniZooKeeperCluster zooKeeperCluster = new MiniZooKeeperCluster(conf);
         File zkDataPath = new File(conf.get(HConstants.ZOOKEEPER_DATA_DIR));
         int zkClientPort = conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, 0);
         if (zkClientPort == 0) {

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterStatusServlet.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterStatusServlet.java?rev=1446173&r1=1446172&r2=1446173&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterStatusServlet.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterStatusServlet.java Thu Feb 14 13:35:54 2013
@@ -63,7 +63,7 @@ public class MasterStatusServlet extends
     ServerName rootLocation = getRootLocationOrNull(master);
     ServerName metaLocation = master.getCatalogTracker().getMetaLocation();
     List<ServerName> servers = master.getServerManager().getOnlineServersList();
-    Set<ServerName> deadServers = master.getServerManager().getDeadServers();
+    Set<ServerName> deadServers = master.getServerManager().getDeadServers().copyServerNames();
 
     response.setContentType("text/html");
     MasterStatusTmpl tmpl;

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java?rev=1446173&r1=1446172&r2=1446173&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java Thu Feb 14 13:35:54 2013
@@ -300,7 +300,7 @@ public class RegionStates {
       State state = oldState.getState();
       ServerName sn = oldState.getServerName();
       if (state != State.OFFLINE || sn != null) {
-        LOG.debug("Online a region with current state=" + state + ", expected state=OFFLINE"
+        LOG.debug("Offline a region with current state=" + state + ", expected state=OFFLINE"
           + ", assigned to server: " + sn + ", expected null");
       }
     }
@@ -317,10 +317,10 @@ public class RegionStates {
   /**
    * A server is offline, all regions on it are dead.
    */
-  public synchronized List<RegionState> serverOffline(final ServerName sn) {
+  public synchronized List<HRegionInfo> serverOffline(final ServerName sn) {
     // Clean up this server from map of servers to regions, and remove all regions
     // of this server from online map of regions.
-    List<RegionState> rits = new ArrayList<RegionState>();
+    List<HRegionInfo> rits = new ArrayList<HRegionInfo>();
     Set<HRegionInfo> assignedRegions = serverHoldings.get(sn);
     if (assignedRegions == null) {
       assignedRegions = new HashSet<HRegionInfo>();
@@ -330,19 +330,23 @@ public class RegionStates {
       regionAssignments.remove(region);
     }
 
-    // See if any of the regions that were online on this server were in RIT
-    // If they are, normal timeouts will deal with them appropriately so
-    // let's skip a manual re-assignment.
     for (RegionState state : regionsInTransition.values()) {
-      if (assignedRegions.contains(state.getRegion())) {
-        rits.add(state);
+      HRegionInfo hri = state.getRegion();
+      if (assignedRegions.contains(hri)) {
+        // Region is open on this region server, but in transition.
+        // This region must be moving away from this server.
+        // SSH will handle it, either skip assigning, or re-assign.
+        LOG.info("Transitioning region "
+          + state + " will be handled by SSH for " + sn);
       } else if (sn.equals(state.getServerName())) {
         // Region is in transition on this region server, and this
         // region is not open on this server. So the region must be
         // moving to this server from another one (i.e. opening or
         // pending open on this server, was open on another one
         if (state.isPendingOpen() || state.isOpening()) {
-          state.setTimestamp(0); // timeout it, let timeout monitor reassign
+          LOG.info("Found opening region "
+            + state + " to be reassigned by SSH for " + sn);
+          rits.add(hri);
         } else {
           LOG.warn("THIS SHOULD NOT HAPPEN: unexpected state "
             + state + " of region in transition on server " + sn);

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java?rev=1446173&r1=1446172&r2=1446173&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java Thu Feb 14 13:35:54 2013
@@ -130,7 +130,7 @@ public class ServerManager {
   private final MasterServices services;
   private final HConnection connection;
 
-  private final DeadServer deadservers;
+  private final DeadServer deadservers = new DeadServer();
 
   private final long maxSkew;
   private final long warningSkew;
@@ -188,7 +188,6 @@ public class ServerManager {
     Configuration c = master.getConfiguration();
     maxSkew = c.getLong("hbase.master.maxclockskew", 30000);
     warningSkew = c.getLong("hbase.master.warningclockskew", 10000);
-    this.deadservers = new DeadServer();
     this.connection = connect ? HConnectionManager.getConnection(c) : null;
   }
 
@@ -405,8 +404,9 @@ public class ServerManager {
     }
   }
 
-  public Set<ServerName> getDeadServers() {
-    return this.deadservers.clone();
+
+  public DeadServer getDeadServers() {
+    return this.deadservers;
   }
 
   /**
@@ -458,7 +458,7 @@ public class ServerManager {
       LOG.warn("Received expiration of " + serverName +
         " but server is not currently online");
     }
-    if (this.deadservers.contains(serverName)) {
+    if (this.deadservers.isDeadServer(serverName)) {
       // TODO: Can this happen?  It shouldn't be online in this case?
       LOG.warn("Received expiration of " + serverName +
           " but server shutdown is already in progress");
@@ -886,13 +886,8 @@ public class ServerManager {
    * To clear any dead server with same host name and port of any online server
    */
   void clearDeadServersWithSameHostNameAndPortOfOnlineServer() {
-    ServerName sn;
     for (ServerName serverName : getOnlineServersList()) {
-      while ((sn = ServerName.
-          findServerWithSameHostnamePort(this.deadservers, serverName)) != null) {
-        this.deadservers.remove(sn);
-      }
+      deadservers.cleanAllPreviousInstances(serverName);
     }
   }
-
 }

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java?rev=1446173&r1=1446172&r2=1446173&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java Thu Feb 14 13:35:54 2013
@@ -75,7 +75,7 @@ public class ServerShutdownHandler exten
     this.server = server;
     this.services = services;
     this.deadServers = deadServers;
-    if (!this.deadServers.contains(this.serverName)) {
+    if (!this.deadServers.isDeadServer(this.serverName)) {
       LOG.warn(this.serverName + " is NOT in deadservers; it should be!");
     }
     this.shouldSplitHlog = shouldSplitHlog;
@@ -198,25 +198,30 @@ public class ServerShutdownHandler exten
       // OFFLINE? -- and then others after like CLOSING that depend on log
       // splitting.
       AssignmentManager am = services.getAssignmentManager();
-      List<RegionState> regionsInTransition = am.processServerShutdown(serverName);
+      List<HRegionInfo> regionsInTransition = am.processServerShutdown(serverName);
       LOG.info("Reassigning " + ((hris == null)? 0: hris.size()) +
         " region(s) that " + (serverName == null? "null": serverName)  +
-        " was carrying (skipping " + regionsInTransition.size() +
-        " regions(s) that are already in transition)");
+        " was carrying (and " + regionsInTransition.size() +
+        " regions(s) that were opening on this server)");
+
+      List<HRegionInfo> toAssignRegions = new ArrayList<HRegionInfo>();
+      toAssignRegions.addAll(regionsInTransition);
 
       // Iterate regions that were on this server and assign them
       if (hris != null) {
         RegionStates regionStates = am.getRegionStates();
-        List<HRegionInfo> toAssignRegions = new ArrayList<HRegionInfo>();
         for (Map.Entry<HRegionInfo, Result> e: hris.entrySet()) {
           HRegionInfo hri = e.getKey();
+          if (regionsInTransition.contains(hri)) {
+            continue;
+          }
           RegionState rit = regionStates.getRegionTransitionState(hri);
           if (processDeadRegion(hri, e.getValue(), am, server.getCatalogTracker())) {
             ServerName addressFromAM = regionStates.getRegionServerOfRegion(hri);
             if (addressFromAM != null && !addressFromAM.equals(this.serverName)) {
               // If this region is in transition on the dead server, it must be
-              // opening or pending_open, which is covered by AM#processServerShutdown
-              LOG.debug("Skip assigning region " + hri.getRegionNameAsString()
+              // opening or pending_open, which should have been covered by AM#processServerShutdown
+              LOG.info("Skip assigning region " + hri.getRegionNameAsString()
                 + " because it has been opened in " + addressFromAM.getServerName());
               continue;
             }
@@ -262,12 +267,12 @@ public class ServerShutdownHandler exten
             }
           }
         }
-        try {
-          am.assign(toAssignRegions);
-        } catch (InterruptedException ie) {
-          LOG.error("Caught " + ie + " during round-robin assignment");
-          throw new IOException(ie);
-        }
+      }
+      try {
+        am.assign(toAssignRegions);
+      } catch (InterruptedException ie) {
+        LOG.error("Caught " + ie + " during round-robin assignment");
+        throw new IOException(ie);
       }
     } finally {
       this.deadServers.finish(serverName);

Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1446173&r1=1446172&r2=1446173&view=diff
==============================================================================
--- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Thu Feb 14 13:35:54 2013
@@ -136,7 +136,6 @@ import org.apache.hadoop.util.StringUtil
 import org.cliffc.high_scale_lib.Counter;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.io.Closeables;
@@ -394,16 +393,6 @@ public class HRegion implements HeapSize
   private final MetricsRegionWrapperImpl metricsRegionWrapper;
 
   /**
-   * HRegion copy constructor. Useful when reopening a closed region (normally
-   * for unit tests)
-   * @param other original object
-   */
-  public HRegion(HRegion other) {
-    this(other.getTableDir(), other.getLog(), other.getFilesystem(),
-        other.baseConf, other.getRegionInfo(), other.getTableDesc(), null);
-  }
-
-  /**
    * HRegion constructor.  his constructor should only be used for testing and
    * extensions.  Instances of HRegion should be instantiated with the
    * {@link HRegion#newHRegion(Path, HLog, FileSystem, Configuration, HRegionInfo, HTableDescriptor, RegionServerServices)} method.
@@ -519,9 +508,13 @@ public class HRegion implements HeapSize
 
   /**
    * Initialize this region.
+   * Used only by tests and SplitTransaction to reopen the region.
+   * You should use createHRegion() or openHRegion()
    * @return What the next sequence (edit) id should be.
    * @throws IOException e
+   * @deprecated use HRegion.createHRegion() or HRegion.openHRegion()
    */
+  @Deprecated
   public long initialize() throws IOException {
     return initialize(null);
   }
@@ -533,8 +526,7 @@ public class HRegion implements HeapSize
    * @return What the next sequence (edit) id should be.
    * @throws IOException e
    */
-  public long initialize(final CancelableProgressable reporter)
-  throws IOException {
+  private long initialize(final CancelableProgressable reporter) throws IOException {
     MonitoredTask status = TaskMonitor.get().createStatus("Initializing region " + this);
     long nextSeqId = -1;
     try {
@@ -681,10 +673,7 @@ public class HRegion implements HeapSize
    */
   public boolean hasReferences() {
     for (Store store : this.stores.values()) {
-      for (StoreFile sf : store.getStorefiles()) {
-        // Found a reference, return.
-        if (sf.isReference()) return true;
-      }
+      if (store.hasReferences()) return true;
     }
     return false;
   }
@@ -1036,24 +1025,22 @@ public class HRegion implements HeapSize
         ThreadPoolExecutor storeCloserThreadPool =
           getStoreOpenAndCloseThreadPool("StoreCloserThread-"
             + this.regionInfo.getRegionNameAsString());
-        CompletionService<ImmutableList<StoreFile>> completionService =
-          new ExecutorCompletionService<ImmutableList<StoreFile>>(
-            storeCloserThreadPool);
+        CompletionService<Collection<StoreFile>> completionService =
+          new ExecutorCompletionService<Collection<StoreFile>>(storeCloserThreadPool);
 
         // close each store in parallel
         for (final Store store : stores.values()) {
           completionService
-              .submit(new Callable<ImmutableList<StoreFile>>() {
-                public ImmutableList<StoreFile> call() throws IOException {
+              .submit(new Callable<Collection<StoreFile>>() {
+                public Collection<StoreFile> call() throws IOException {
                   return store.close();
                 }
               });
         }
         try {
           for (int i = 0; i < stores.size(); i++) {
-            Future<ImmutableList<StoreFile>> future = completionService
-                .take();
-            ImmutableList<StoreFile> storeFileList = future.get();
+            Future<Collection<StoreFile>> future = completionService.take();
+            Collection<StoreFile> storeFileList = future.get();
             result.addAll(storeFileList);
           }
         } catch (InterruptedException e) {
@@ -2497,7 +2484,7 @@ public class HRegion implements HeapSize
       // 2.1. build the snapshot reference directory for the store
       Path dstStoreDir = TakeSnapshotUtils.getStoreSnapshotDirectory(snapshotRegionDir,
         Bytes.toString(store.getFamily().getName()));
-      List<StoreFile> storeFiles = store.getStorefiles();
+      List<StoreFile> storeFiles = new ArrayList<StoreFile>(store.getStorefiles());
       if (LOG.isDebugEnabled()) {
         LOG.debug("Adding snapshot references for " + storeFiles  + " hfiles");
       }
@@ -3115,8 +3102,7 @@ public class HRegion implements HeapSize
           throw new IllegalArgumentException("No column family : " +
               new String(column) + " available");
         }
-        List<StoreFile> storeFiles = store.getStorefiles();
-        for (StoreFile storeFile: storeFiles) {
+        for (StoreFile storeFile: store.getStorefiles()) {
           storeFileNames.add(storeFile.getPath().toString());
         }
       }
@@ -3742,10 +3728,11 @@ public class HRegion implements HeapSize
           if (this.joinedHeap != null) {
             KeyValue nextJoinedKv = joinedHeap.peek();
             // If joinedHeap is pointing to some other row, try to seek to a correct one.
-            // We don't need to recheck that row here - populateResult will take care of that.
             boolean mayHaveData =
               (nextJoinedKv != null && nextJoinedKv.matchingRow(currentRow, offset, length))
-              || this.joinedHeap.seek(KeyValue.createFirstOnRow(currentRow, offset, length));
+              || (this.joinedHeap.seek(KeyValue.createFirstOnRow(currentRow, offset, length))
+                  && joinedHeap.peek() != null
+                  && joinedHeap.peek().matchingRow(currentRow, offset, length));
             if (mayHaveData) {
               joinedContinuationRow = current;
               populateFromJoinedHeap(results, limit, metric);
@@ -3865,7 +3852,7 @@ public class HRegion implements HeapSize
    * @param rsServices
    * @return the new instance
    */
-  public static HRegion newHRegion(Path tableDir, HLog log, FileSystem fs,
+  static HRegion newHRegion(Path tableDir, HLog log, FileSystem fs,
       Configuration conf, HRegionInfo regionInfo, final HTableDescriptor htd,
       RegionServerServices rsServices) {
     try {
@@ -4032,7 +4019,7 @@ public class HRegion implements HeapSize
    * HLog#setSequenceNumber(long) passing the result of the call to
    * HRegion#getMinSequenceId() to ensure the log id is properly kept
    * up.  HRegionStore does this every time it opens a new region.
-   * @param conf
+   * @param conf The Configuration object to use.
    * @param rsServices An interface we can request flushes against.
    * @param reporter An interface we can report progress against.
    * @return new HRegion
@@ -4044,26 +4031,22 @@ public class HRegion implements HeapSize
     final RegionServerServices rsServices,
     final CancelableProgressable reporter)
   throws IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Opening region: " + info);
-    }
-    if (info == null) {
-      throw new NullPointerException("Passed region info is null");
-    }
-    Path dir = HTableDescriptor.getTableDir(FSUtils.getRootDir(conf),
-      info.getTableName());
-    FileSystem fs = null;
-    if (rsServices != null) {
-      fs = rsServices.getFileSystem();
-    }
-    if (fs == null) {
-      fs = FileSystem.get(conf);
-    }
-    HRegion r = HRegion.newHRegion(dir, wal, fs, conf, info,
-      htd, rsServices);
-    return r.openHRegion(reporter);
+    return openHRegion(FSUtils.getRootDir(conf), info, htd, wal, conf, rsServices, reporter);
   }
 
+  /**
+   * Open a Region.
+   * @param rootDir Root directory for HBase instance
+   * @param info Info for region to be opened.
+   * @param htd the table descriptor
+   * @param wal HLog for region to use. This method will call
+   * HLog#setSequenceNumber(long) passing the result of the call to
+   * HRegion#getMinSequenceId() to ensure the log id is properly kept
+   * up.  HRegionStore does this every time it opens a new region.
+   * @param conf The Configuration object to use.
+   * @return new HRegion
+   * @throws IOException
+   */
   public static HRegion openHRegion(Path rootDir, final HRegionInfo info,
       final HTableDescriptor htd, final HLog wal, final Configuration conf)
   throws IOException {
@@ -4074,14 +4057,15 @@ public class HRegion implements HeapSize
    * Open a Region.
    * @param rootDir Root directory for HBase instance
    * @param info Info for region to be opened.
+   * @param htd the table descriptor
    * @param wal HLog for region to use. This method will call
    * HLog#setSequenceNumber(long) passing the result of the call to
    * HRegion#getMinSequenceId() to ensure the log id is properly kept
    * up.  HRegionStore does this every time it opens a new region.
-   * @param conf
+   * @param conf The Configuration object to use.
+   * @param rsServices An interface we can request flushes against.
    * @param reporter An interface we can report progress against.
    * @return new HRegion
-   *
    * @throws IOException
    */
   public static HRegion openHRegion(final Path rootDir, final HRegionInfo info,
@@ -4089,16 +4073,79 @@ public class HRegion implements HeapSize
       final RegionServerServices rsServices,
       final CancelableProgressable reporter)
   throws IOException {
+    FileSystem fs = null;
+    if (rsServices != null) {
+      fs = rsServices.getFileSystem();
+    }
+    if (fs == null) {
+      fs = FileSystem.get(conf);
+    }
+    return openHRegion(conf, fs, rootDir, info, htd, wal, rsServices, reporter);
+  }
+
+  /**
+   * Open a Region.
+   * @param conf The Configuration object to use.
+   * @param fs Filesystem to use
+   * @param rootDir Root directory for HBase instance
+   * @param info Info for region to be opened.
+   * @param htd the table descriptor
+   * @param wal HLog for region to use. This method will call
+   * HLog#setSequenceNumber(long) passing the result of the call to
+   * HRegion#getMinSequenceId() to ensure the log id is properly kept
+   * up.  HRegionStore does this every time it opens a new region.
+   * @return new HRegion
+   * @throws IOException
+   */
+  public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
+      final Path rootDir, final HRegionInfo info, final HTableDescriptor htd, final HLog wal)
+      throws IOException {
+    return openHRegion(conf, fs, rootDir, info, htd, wal, null, null);
+  }
+
+  /**
+   * Open a Region.
+   * @param conf The Configuration object to use.
+   * @param fs Filesystem to use
+   * @param rootDir Root directory for HBase instance
+   * @param info Info for region to be opened.
+   * @param htd the table descriptor
+   * @param wal HLog for region to use. This method will call
+   * HLog#setSequenceNumber(long) passing the result of the call to
+   * HRegion#getMinSequenceId() to ensure the log id is properly kept
+   * up.  HRegionStore does this every time it opens a new region.
+   * @param rsServices An interface we can request flushes against.
+   * @param reporter An interface we can report progress against.
+   * @return new HRegion
+   * @throws IOException
+   */
+  public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
+      final Path rootDir, final HRegionInfo info, final HTableDescriptor htd, final HLog wal,
+      final RegionServerServices rsServices, final CancelableProgressable reporter)
+      throws IOException {
     if (info == null) throw new NullPointerException("Passed region info is null");
     LOG.info("HRegion.openHRegion Region name ==" + info.getRegionNameAsString());
     if (LOG.isDebugEnabled()) {
       LOG.debug("Opening region: " + info);
     }
     Path dir = HTableDescriptor.getTableDir(rootDir, info.getTableName());
-    HRegion r = HRegion.newHRegion(dir, wal, FileSystem.get(conf), conf, info, htd, rsServices);
+    HRegion r = HRegion.newHRegion(dir, wal, fs, conf, info, htd, rsServices);
     return r.openHRegion(reporter);
   }
 
+  /**
+   * Useful when reopening a closed region (normally for unit tests)
+   * @param other original object
+   * @param reporter An interface we can report progress against.
+   * @return new HRegion
+   * @throws IOException
+   */
+  public static HRegion openHRegion(final HRegion other, final CancelableProgressable reporter)
+      throws IOException {
+    HRegion r = newHRegion(other.getTableDir(), other.getLog(), other.getFilesystem(),
+        other.baseConf, other.getRegionInfo(), other.getTableDesc(), null);
+    return r.openHRegion(reporter);
+  }
 
   /**
    * Open HRegion.
@@ -4127,6 +4174,22 @@ public class HRegion implements HeapSize
   }
 
   /**
+   * Create a daughter region from given a temp directory with the region data.
+   * @param hri Spec. for daughter region to open.
+   * @param daughterTmpDir Directory that contains region files.
+   * @throws IOException
+   */
+  HRegion createDaughterRegion(final HRegionInfo hri, final Path daughterTmpDir)
+      throws IOException {
+    HRegion r = HRegion.newHRegion(this.getTableDir(), this.getLog(), fs,
+        this.getBaseConf(), hri, this.getTableDesc(), rsServices);
+    r.readRequestsCount.set(this.getReadRequestsCount() / 2);
+    r.writeRequestsCount.set(this.getWriteRequestsCount() / 2);
+    moveInitialFilesIntoPlace(fs, daughterTmpDir, r.getRegionDir());
+    return r;
+  }
+
+  /**
    * Inserts a new region's meta information into the passed
    * <code>meta</code> region. Used by the HMaster bootstrap code adding
    * new table to ROOT table.
@@ -4731,6 +4794,7 @@ public class HRegion implements HeapSize
     long size = 0;
     long txid = 0;
 
+    checkReadOnly();
     // Lock row
     startRegionOperation();
     this.writeRequestsCount.increment();
@@ -4895,6 +4959,7 @@ public class HRegion implements HeapSize
     long size = 0;
     long txid = 0;
 
+    checkReadOnly();
     // Lock row
     startRegionOperation();
     this.writeRequestsCount.increment();



Mime
View raw message