hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject svn commit: r1166857 - in /hbase/trunk/src/main/java/org/apache/hadoop/hbase: ipc/ master/ regionserver/ regionserver/handler/ zookeeper/
Date Thu, 08 Sep 2011 18:50:27 GMT
Author: stack
Date: Thu Sep  8 18:50:26 2011
New Revision: 1166857

URL: http://svn.apache.org/viewvc?rev=1166857&view=rev
Log:
HBASE-4105 HBASE-4015-Making the timeout monitor less racy; third attempt

Added:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/AssignCallable.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/UnAssignCallable.java
Modified:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenMetaHandler.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRootHandler.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java?rev=1166857&r1=1166856&r2=1166857&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java Thu Sep  8 18:50:26 2011
@@ -333,7 +333,7 @@ public interface HRegionInterface extend
    * @param region
    *          region to open
    * @return RegionOpeningState 
-   *         OPENED - if region opened succesfully.
+   *         OPENED         - if region open request was successful.
    *         ALREADY_OPENED - if the region was already opened. 
    *         FAILED_OPENING - if region opening failed.
    *
@@ -342,6 +342,22 @@ public interface HRegionInterface extend
   public RegionOpeningState openRegion(final HRegionInfo region) throws IOException;
 
   /**
+   * Opens the specified region, expecting its OFFLINE znode at a given version.
+   * @param region
+   *          region to open
+   * @param versionOfOfflineNode
+   *          expected version of the region's OFFLINE znode; the RS compares it
+   *          when transitioning the znode out of OFFLINE state.
+   * @return RegionOpeningState
+   *         OPENED         - if region open request was successful.
+   *         ALREADY_OPENED - if the region was already opened.
+   *         FAILED_OPENING - if region opening failed.
+   * @throws IOException
+   */
+  public RegionOpeningState openRegion(HRegionInfo region, int versionOfOfflineNode)
+      throws IOException;
+  
+  /**
    * Opens the specified regions.
    * @param regions regions to open
    * @throws IOException

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/AssignCallable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/AssignCallable.java?rev=1166857&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/AssignCallable.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/AssignCallable.java Thu Sep  8 18:50:26 2011
@@ -0,0 +1,47 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import java.util.concurrent.Callable;
+
+import org.apache.hadoop.hbase.HRegionInfo;
+
+/**
+ * A callable that reassigns a region in transition on behalf of the timeout
+ * monitor. Submitting it to an executor lets the monitor act on the timeout
+ * asynchronously. Fields are final so the task's references cannot change
+ * after it has been handed to the executor.
+ */
+public class AssignCallable implements Callable<Object> {
+  private final AssignmentManager assignmentManager;
+  private final HRegionInfo hri;
+
+  public AssignCallable(AssignmentManager assignmentManager, HRegionInfo hri) {
+    this.assignmentManager = assignmentManager;
+    this.hri = hri;
+  }
+
+  @Override
+  public Object call() throws Exception {
+    // setOfflineInZK, forceNewPlan and hijack are all true: force a reassign.
+    assignmentManager.assign(hri, true, true, true);
+    return null;
+  }
+}

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java?rev=1166857&r1=1166856&r2=1166857&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java Thu Sep  8 18:50:26 2011
@@ -38,6 +38,7 @@ import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -159,6 +160,9 @@ public class AssignmentManager extends Z
 
   private final ExecutorService executorService;
 
+  //Thread pool executor service for timeout monitor
+  private java.util.concurrent.ExecutorService threadPoolExecutorService;
+
   /**
    * Constructs a new assignment manager.
    *
@@ -190,6 +194,7 @@ public class AssignmentManager extends Z
     this.maximumAssignmentAttempts =
       this.master.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10);
     this.balancer = LoadBalancerFactory.getLoadBalancer(conf);
+    this.threadPoolExecutorService = Executors.newCachedThreadPool();
   }
 
   /**
@@ -475,9 +480,20 @@ public class AssignmentManager extends Z
 
         // Just insert region into RIT
         // If this never updates the timeout will trigger new assignment
-        regionsInTransition.put(encodedRegionName, new RegionState(
-            regionInfo, RegionState.State.OPENING,
-            data.getStamp(), data.getOrigin()));
+        if (regionInfo.isMetaRegion() || regionInfo.isRootRegion()) {
+          regionsInTransition.put(encodedRegionName, new RegionState(
+              regionInfo, RegionState.State.OPENING, data.getStamp(), data
+                  .getOrigin()));
+          // If ROOT or .META. table is waiting for timeout monitor to assign
+          // it may take lot of time when the assignment.timeout.period is
+          // the default value which may be very long.  We will not be able
+          // to serve any request during this time.
+          // So we will assign the ROOT and .META. region immediately.
+          processOpeningState(regionInfo);
+          break;
+        }
+        regionsInTransition.put(encodedRegionName, new RegionState(regionInfo,
+            RegionState.State.OPENING, data.getStamp(), data.getOrigin()));
         break;
 
       case RS_ZK_REGION_OPENED:
@@ -1109,12 +1125,21 @@ public class AssignmentManager extends Z
 
   public void assign(HRegionInfo region, boolean setOfflineInZK,
       boolean forceNewPlan) {
-    String tableName = region.getTableNameAsString();
-    boolean disabled = this.zkTable.isDisabledTable(tableName);
-    if (disabled || this.zkTable.isDisablingTable(tableName)) {
-      LOG.info("Table " + tableName + (disabled? " disabled;": " disabling;") +
-        " skipping assign of " + region.getRegionNameAsString());
-      offlineDisabledRegion(region);
+    assign(region, setOfflineInZK, forceNewPlan, false);
+  }
+
+  /**
+   * @param region
+   * @param setOfflineInZK
+   * @param forceNewPlan
+   * @param hijack
+   *          - true new assignment is needed, false otherwise
+   */
+  public void assign(HRegionInfo region, boolean setOfflineInZK,
+      boolean forceNewPlan, boolean hijack) {
+    //If hijack is true do not call disableRegionIfInRIT as 
+    // we have not yet moved the znode to OFFLINE state.
+    if (!hijack && isDisabledorDisablingRegionInRIT(region)) {
       return;
     }
     if (this.serverManager.isClusterShutdown()) {
@@ -1122,9 +1147,10 @@ public class AssignmentManager extends Z
         region.getRegionNameAsString());
       return;
     }
-    RegionState state = addToRegionsInTransition(region);
+    RegionState state = addToRegionsInTransition(region,
+        hijack);
     synchronized (state) {
-      assign(state, setOfflineInZK, forceNewPlan);
+      assign(region, state, setOfflineInZK, forceNewPlan, hijack);
     }
   }
 
@@ -1282,11 +1308,19 @@ public class AssignmentManager extends Z
    * @return The current RegionState
    */
   private RegionState addToRegionsInTransition(final HRegionInfo region) {
+    return addToRegionsInTransition(region, false);
+  }
+  /**
+   * @param region
+   * @param hijack
+   * @return The current RegionState
+   */
+  private RegionState addToRegionsInTransition(final HRegionInfo region,
+      boolean hijack) {
     synchronized (regionsInTransition) {
-      return forceRegionStateToOffline(region);
+      return forceRegionStateToOffline(region, hijack);
     }
   }
-
   /**
    * Sets regions {@link RegionState} to {@link RegionState.State#OFFLINE}.
    * Caller must hold lock on this.regionsInTransition.
@@ -1294,14 +1328,32 @@ public class AssignmentManager extends Z
    * @return Amended RegionState.
    */
   private RegionState forceRegionStateToOffline(final HRegionInfo region) {
+    return forceRegionStateToOffline(region, false);
+  }
+
+  /**
+   * Sets regions {@link RegionState} to {@link RegionState.State#OFFLINE}.
+   * Caller must hold lock on this.regionsInTransition.
+   * @param region
+   * @param hijack
+   * @return Amended RegionState.
+   */
+  private RegionState forceRegionStateToOffline(final HRegionInfo region,
+      boolean hijack) {
     String encodedName = region.getEncodedName();
     RegionState state = this.regionsInTransition.get(encodedName);
     if (state == null) {
       state = new RegionState(region, RegionState.State.OFFLINE);
       this.regionsInTransition.put(encodedName, state);
     } else {
-      LOG.debug("Forcing OFFLINE; was=" + state);
-      state.update(RegionState.State.OFFLINE);
+      // If we are reassigning the node do not force in-memory state to OFFLINE.
+      // Based on the znode state we will decide if to change
+      // in-memory state to OFFLINE or not. It will
+      // be done before setting the znode to OFFLINE state.
+      if (!hijack) {
+        LOG.debug("Forcing OFFLINE; was=" + state);
+        state.update(RegionState.State.OFFLINE);
+      }
     }
     return state;
   }
@@ -1311,11 +1363,29 @@ public class AssignmentManager extends Z
    * @param state
    * @param setOfflineInZK
    * @param forceNewPlan
+   * @param hijack
    */
-  private void assign(final RegionState state, final boolean setOfflineInZK,
-      final boolean forceNewPlan) {
+  private void assign(final HRegionInfo region, final RegionState state,
+      final boolean setOfflineInZK, final boolean forceNewPlan,
+      boolean hijack) {
     for (int i = 0; i < this.maximumAssignmentAttempts; i++) {
-      if (setOfflineInZK && !setOfflineInZooKeeper(state)) return;
+      int versionOfOfflineNode = -1;
+      if (setOfflineInZK) {
+        // get the version of the znode after setting it to OFFLINE.
+        // versionOfOfflineNode will be -1 if the znode was not set to OFFLINE
+        versionOfOfflineNode = setOfflineInZooKeeper(state,
+            hijack);
+        if(versionOfOfflineNode != -1){
+          if (isDisabledorDisablingRegionInRIT(region)) {
+            return;
+          }
+        }
+      }
+      
+      if (setOfflineInZK && versionOfOfflineNode == -1) {
+        return;
+      }
+      
       if (this.master.isStopped()) {
         LOG.debug("Server stopped; skipping assign of " + state);
         return;
@@ -1334,8 +1404,9 @@ public class AssignmentManager extends Z
         state.update(RegionState.State.PENDING_OPEN, System.currentTimeMillis(),
             plan.getDestination());
         // Send OPEN RPC. This can fail if the server on other end is is not up.
+        // Pass the version that was obtained while setting the node to OFFLINE.
         RegionOpeningState regionOpenState = serverManager.sendRegionOpen(plan
-            .getDestination(), state.getRegion());
+            .getDestination(), state.getRegion(), versionOfOfflineNode);
         if (regionOpenState == RegionOpeningState.ALREADY_OPENED) {
           // Remove region from in-memory transition and unassigned node from ZK
           // While trying to enable the table the regions of the table were
@@ -1389,31 +1460,69 @@ public class AssignmentManager extends Z
     }
   }
 
+  private boolean isDisabledorDisablingRegionInRIT(final HRegionInfo region) {
+    String tableName = region.getTableNameAsString();
+    boolean disabled = this.zkTable.isDisabledTable(tableName);
+    if (disabled || this.zkTable.isDisablingTable(tableName)) {
+      LOG.info("Table " + tableName + (disabled ? " disabled;" : " disabling;") +
+        " skipping assign of " + region.getRegionNameAsString());
+      offlineDisabledRegion(region);
+      return true;
+    }
+    return false;
+  }
+
   /**
    * Set region as OFFLINED up in zookeeper
+   * 
    * @param state
-   * @return True if we succeeded, false otherwise (State was incorrect or failed
-   * updating zk).
-   */
-  boolean setOfflineInZooKeeper(final RegionState state) {
-    if (!state.isClosed() && !state.isOffline()) {
+   * @param hijack
+   *          - true if needs to be hijacked and reassigned, false otherwise.
+   * @return the version of the offline node if setting of the OFFLINE node was
+   *         successful, -1 otherwise.
+   */
+  int setOfflineInZooKeeper(final RegionState state,
+      boolean hijack) {
+    // In case of reassignment the current state in memory need not be
+    // OFFLINE. 
+    if (!hijack && !state.isClosed() && !state.isOffline()) {
       this.master.abort("Unexpected state trying to OFFLINE; " + state,
-        new IllegalStateException());
-      return false;
+          new IllegalStateException());
+      return -1;
     }
-    state.update(RegionState.State.OFFLINE);
+    boolean allowZNodeCreation = false;
+    // Under reassignment if the current state is PENDING_OPEN
+    // or OPENING then refresh the in-memory state to PENDING_OPEN. This is
+    // important because if the region was in 
+    // RS_OPENING state for a long time the master will try to force the znode
+    // to OFFLINE state meanwhile the RS could have opened the corresponding
+    // region and the state in znode will be RS_ZK_REGION_OPENED.
+    // For all other cases we can change the in-memory state to OFFLINE.
+    if (hijack && 
+        (state.getState().equals(RegionState.State.PENDING_OPEN) || 
+            state.getState().equals(RegionState.State.OPENING))) {
+      state.update(RegionState.State.PENDING_OPEN);
+      allowZNodeCreation = false;
+    } else {
+      state.update(RegionState.State.OFFLINE);
+      allowZNodeCreation = true;
+    }
+    int versionOfOfflineNode = -1;
     try {
-      if(!ZKAssign.createOrForceNodeOffline(master.getZooKeeper(),
-          state.getRegion(), this.master.getServerName())) {
-        LOG.warn("Attempted to create/force node into OFFLINE state before " +
-          "completing assignment but failed to do so for " + state);
-        return false;
+      // get the version after setting the znode to OFFLINE
+      versionOfOfflineNode = ZKAssign.createOrForceNodeOffline(master.getZooKeeper(), 
+          state.getRegion(), this.master.getServerName(),
+          hijack, allowZNodeCreation);
+      if (versionOfOfflineNode == -1) {
+        LOG.warn("Attempted to create/force node into OFFLINE state before "
+            + "completing assignment but failed to do so for " + state);
+        return -1;
       }
     } catch (KeeperException e) {
       master.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
-      return false;
+      return -1;
     }
-    return true;
+    return versionOfOfflineNode;
   }
 
   /**
@@ -2279,134 +2388,119 @@ public class AssignmentManager extends Z
     protected void chore() {
       // If bulkAssign in progress, suspend checks
       if (this.bulkAssign) return;
-      List<HRegionInfo> unassigns = new ArrayList<HRegionInfo>();
-      Map<HRegionInfo, Boolean> assigns =
-        new HashMap<HRegionInfo, Boolean>();
       synchronized (regionsInTransition) {
         // Iterate all regions in transition checking for time outs
         long now = System.currentTimeMillis();
         for (RegionState regionState : regionsInTransition.values()) {
           if (regionState.getStamp() + timeout <= now) {
-            HRegionInfo regionInfo = regionState.getRegion();
-            LOG.info("Regions in transition timed out:  " + regionState);
-            // Expired!  Do a retry.
-            switch (regionState.getState()) {
-              case CLOSED:
-                LOG.info("Region " + regionInfo.getEncodedName() +
-                  " has been CLOSED for too long, waiting on queued " +
-                  "ClosedRegionHandler to run or server shutdown");
-                // Update our timestamp.
-                regionState.updateTimestampToNow();
-                break;
-              case OFFLINE:
-                LOG.info("Region has been OFFLINE for too long, " +
-                  "reassigning " + regionInfo.getRegionNameAsString() +
-                  " to a random server");
-                assigns.put(regionState.getRegion(), Boolean.FALSE);
-                break;
-              case PENDING_OPEN:
-                LOG.info("Region has been PENDING_OPEN for too " +
-                    "long, reassigning region=" +
-                    regionInfo.getRegionNameAsString());
-                assigns.put(regionState.getRegion(), Boolean.TRUE);
-                break;
-              case OPENING:
-                LOG.info("Region has been OPENING for too " +
-                  "long, reassigning region=" +
-                  regionInfo.getRegionNameAsString());
-                // Should have a ZK node in OPENING state
-                try {
-                  String node = ZKAssign.getNodeName(watcher,
-                      regionInfo.getEncodedName());
-                  Stat stat = new Stat();
-                  RegionTransitionData data = ZKAssign.getDataNoWatch(watcher,
-                      node, stat);
-                  if (data == null) {
-                    LOG.warn("Data is null, node " + node + " no longer exists");
-                    break;
-                  }
-                  if (data.getEventType() == EventType.RS_ZK_REGION_OPENED) {
-                    LOG.debug("Region has transitioned to OPENED, allowing " +
-                        "watched event handlers to process");
-                    break;
-                  } else if (data.getEventType() !=
-                      EventType.RS_ZK_REGION_OPENING) {
-                    LOG.warn("While timing out a region in state OPENING, " +
-                        "found ZK node in unexpected state: " +
-                        data.getEventType());
-                    break;
-                  }
-                  // Attempt to transition node into OFFLINE
-                  try {
-                    data = new RegionTransitionData(
-                      EventType.M_ZK_REGION_OFFLINE, regionInfo.getRegionName(),
-                        master.getServerName());
-                    if (ZKUtil.setData(watcher, node, data.getBytes(),
-                        stat.getVersion())) {
-                      // Node is now OFFLINE, let's trigger another assignment
-                      ZKUtil.getDataAndWatch(watcher, node); // re-set the watch
-                      LOG.info("Successfully transitioned region=" +
-                          regionInfo.getRegionNameAsString() + " into OFFLINE" +
-                          " and forcing a new assignment");
-                      assigns.put(regionState.getRegion(), Boolean.TRUE);
-                    }
-                  } catch (KeeperException.NoNodeException nne) {
-                    // Node did not exist, can't time this out
-                  }
-                } catch (KeeperException ke) {
-                  LOG.error("Unexpected ZK exception timing out CLOSING region",
-                      ke);
-                  break;
-                }
-                break;
-              case OPEN:
-                LOG.error("Region has been OPEN for too long, " +
-                "we don't know where region was opened so can't do anything");
-                synchronized(regionState) {
-                  regionState.updateTimestampToNow();
-                }
-                break;
-
-              case PENDING_CLOSE:
-                LOG.info("Region has been PENDING_CLOSE for too " +
-                    "long, running forced unassign again on region=" +
-                    regionInfo.getRegionNameAsString());
-                  try {
-                    // If the server got the RPC, it will transition the node
-                    // to CLOSING, so only do something here if no node exists
-                    if (!ZKUtil.watchAndCheckExists(watcher,
-                      ZKAssign.getNodeName(watcher, regionInfo.getEncodedName()))) {
-                      // Queue running of an unassign -- do actual unassign
-                      // outside of the regionsInTransition lock.
-                      unassigns.add(regionInfo);
-                    }
-                  } catch (NoNodeException e) {
-                    LOG.debug("Node no longer existed so not forcing another " +
-                      "unassignment");
-                  } catch (KeeperException e) {
-                    LOG.warn("Unexpected ZK exception timing out a region " +
-                      "close", e);
-                  }
-                  break;
-              case CLOSING:
-                LOG.info("Region has been CLOSING for too " +
-                  "long, this should eventually complete or the server will " +
-                  "expire, doing nothing");
-                break;
-            }
+           //decide on action upon timeout
+            actOnTimeOut(regionState);
           }
         }
       }
-      // Finish the work for regions in PENDING_CLOSE state
-      for (HRegionInfo hri: unassigns) {
-        unassign(hri, true);
+    }
+
+    private void actOnTimeOut(RegionState regionState) {
+      HRegionInfo regionInfo = regionState.getRegion();
+      LOG.info("Regions in transition timed out:  " + regionState);
+      // Expired! Per state: queue an assign/unassign, refresh the stamp, or log.
+      switch (regionState.getState()) {
+      case CLOSED:
+        LOG.info("Region " + regionInfo.getEncodedName()
+            + " has been CLOSED for too long, waiting on queued "
+            + "ClosedRegionHandler to run or server shutdown");
+        // Nothing to retry yet; restart this region's timeout clock.
+        regionState.updateTimestampToNow();
+        break;
+      case OFFLINE:
+        LOG.info("Region has been OFFLINE for too long, " + "reassigning "
+            + regionInfo.getRegionNameAsString() + " to a random server");
+        invokeAssign(regionInfo);
+        break;
+      case PENDING_OPEN:
+        LOG.info("Region has been PENDING_OPEN for too "
+            + "long, reassigning region=" + regionInfo.getRegionNameAsString());
+        invokeAssign(regionInfo);
+        break;
+      case OPENING:
+        processOpeningState(regionInfo);
+        break;
+      case OPEN:
+        LOG.error("Region has been OPEN for too long, " +
+            "we don't know where region was opened so can't do anything");
+        synchronized (regionState) {
+          regionState.updateTimestampToNow();
+        }
+        break;
+
+      case PENDING_CLOSE:
+        LOG.info("Region has been PENDING_CLOSE for too "
+            + "long, running forced unassign again on region="
+            + regionInfo.getRegionNameAsString());
+        try {
+          // If the server got the RPC, it will transition the node
+          // to CLOSING, so only do something here if no node exists
+          if (!ZKUtil.watchAndCheckExists(watcher, 
+              ZKAssign.getNodeName(watcher, regionInfo.getEncodedName()))) {
+            // Queue the unassign on the timeout-monitor pool so the actual
+            // unassign runs outside the regionsInTransition lock chore() holds.
+            invokeUnassign(regionInfo);
+          }
+        } catch (NoNodeException e) {
+          LOG.debug("Node no longer existed so not forcing another "
+              + "unassignment");
+        } catch (KeeperException e) {
+          LOG.warn("Unexpected ZK exception timing out a region close", e);
+        }
+        break;
+      case CLOSING:
+        LOG.info("Region has been CLOSING for too " +
+          "long, this should eventually complete or the server will " +
+          "expire, doing nothing");
+        break;
       }
-      for (Map.Entry<HRegionInfo, Boolean> e: assigns.entrySet()){
-        assign(e.getKey(), false, e.getValue());
+    }
+  }
+  // Reassigns a region in OPENING, but only if its znode still says OPENING.
+  private void processOpeningState(HRegionInfo regionInfo) {
+    LOG.info("Region has been OPENING for too " + "long, reassigning region="
+        + regionInfo.getRegionNameAsString());
+    // Re-read the znode: the RS may have moved it past OPENING meanwhile.
+    try {
+      String node = ZKAssign.getNodeName(watcher, regionInfo.getEncodedName());
+      Stat stat = new Stat();
+      RegionTransitionData dataInZNode = ZKAssign.getDataNoWatch(watcher, node,
+          stat);
+      if (dataInZNode == null) {
+        LOG.warn("Data is null, node " + node + " no longer exists");
+        return;
       }
+      if (dataInZNode.getEventType() == EventType.RS_ZK_REGION_OPENED) {
+        LOG.debug("Region has transitioned to OPENED, allowing "
+            + "watched event handlers to process");
+        return;
+      } else if (dataInZNode.getEventType() != EventType.RS_ZK_REGION_OPENING) {
+        LOG.warn("While timing out a region in state OPENING, "
+            + "found ZK node in unexpected state: "
+            + dataInZNode.getEventType());
+        return;
+      }
+      invokeAssign(regionInfo);
+    } catch (KeeperException ke) {
+      LOG.error("Unexpected ZK exception timing out OPENING region", ke);
+      return;
     }
+    return;
   }
-
+  // Queues a hijacking AssignCallable on the timeout-monitor pool (Future dropped).
+  private void invokeAssign(final HRegionInfo hri) {
+    this.threadPoolExecutorService.submit(new AssignCallable(this, hri));
+  }
+  // Queues an UnAssignCallable on the timeout-monitor pool (Future dropped).
+  private void invokeUnassign(final HRegionInfo hri) {
+    this.threadPoolExecutorService.submit(new UnAssignCallable(this, hri));
+  }
+  
   /**
    * Process shutdown server removing any assignments.
    * @param sn Server that went down.
@@ -2697,4 +2791,12 @@ public class AssignmentManager extends Z
   public boolean isServerOnline(ServerName serverName) {
     return this.serverManager.isServerOnline(serverName);
   }
+  /** Shuts down the timeout-monitor pool; queued tasks still run, no waiting. */
+  public void shutdown() {
+    final java.util.concurrent.ExecutorService pool = this.threadPoolExecutorService;
+    if (pool == null) {
+      return;
+    }
+    pool.shutdown();
+  }
 }

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=1166857&r1=1166856&r2=1166857&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Thu Sep  8 18:50:26 2011
@@ -188,7 +188,7 @@ implements HMasterInterface, HMasterRegi
   private final ServerName serverName;
 
   private TableDescriptors tableDescriptors;
-
+  
   /**
    * Initializes the HMaster. The steps are as follows:
    * <p>
@@ -1265,6 +1265,7 @@ implements HMasterInterface, HMasterRegi
         LOG.error("Error call master coprocessor preShutdown()", ioe);
       }
     }
+    this.assignmentManager.shutdown();
     this.serverManager.shutdownCluster();
     try {
       this.clusterStatusTracker.setClusterDown();

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java?rev=1166857&r1=1166856&r2=1166857&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java Thu Sep  8 18:50:26 2011
@@ -392,8 +392,11 @@ public class ServerManager {
    * <p>
    * @param server server to open a region
    * @param region region to open
+   * @param versionOfOfflineNode that needs to be present in the offline node
+   * when RS tries to change the state from OFFLINE to other states.
    */
-  public RegionOpeningState sendRegionOpen(final ServerName server, HRegionInfo region)
+  public RegionOpeningState sendRegionOpen(final ServerName server,
+      HRegionInfo region, int versionOfOfflineNode)
   throws IOException {
     HRegionInterface hri = getServerConnection(server);
     if (hri == null) {
@@ -401,7 +404,8 @@ public class ServerManager {
         " failed because no RPC connection found to this server");
       return RegionOpeningState.FAILED_OPENING;
     }
-    return hri.openRegion(region);
+    return (versionOfOfflineNode == -1) ? hri.openRegion(region) : hri
+        .openRegion(region, versionOfOfflineNode);
   }
 
   /**

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/UnAssignCallable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/UnAssignCallable.java?rev=1166857&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/UnAssignCallable.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/UnAssignCallable.java Thu Sep  8 18:50:26 2011
@@ -0,0 +1,46 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import java.util.concurrent.Callable;
+
+import org.apache.hadoop.hbase.HRegionInfo;
+
+/**
+ * A callable object that invokes the corresponding action that needs to be
+ * taken for unassignment of a region in transition. Implementing as future
+ * callable we are able to act on the timeout asynchronously.
+ */
+public class UnAssignCallable implements Callable<Object> {
+  private AssignmentManager assignmentManager;
+
+  private HRegionInfo hri;
+
+  public UnAssignCallable(AssignmentManager assignmentManager, HRegionInfo hri) {
+    this.assignmentManager = assignmentManager;
+    this.hri = hri;
+  }
+
+  @Override
+  public Object call() throws Exception {
+    assignmentManager.unassign(hri);
+    return null;
+  }
+}

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1166857&r1=1166856&r2=1166857&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Thu Sep  8 18:50:26 2011
@@ -2336,6 +2336,12 @@ public class HRegionServer implements HR
   @QosPriority(priority=HIGH_QOS)
   public RegionOpeningState openRegion(HRegionInfo region)
   throws IOException {
+    return openRegion(region, -1);
+  }
+  @Override
+  @QosPriority(priority = HIGH_QOS)
+  public RegionOpeningState openRegion(HRegionInfo region, int versionOfOfflineNode)
+      throws IOException {
     checkOpen();
     if (this.regionsInTransitionInRS.contains(region.getEncodedNameAsBytes())) {
       throw new RegionAlreadyInTransitionException("open", region.getEncodedName());
@@ -2350,12 +2356,16 @@ public class HRegionServer implements HR
       region.getRegionNameAsString());
     this.regionsInTransitionInRS.add(region.getEncodedNameAsBytes());
     HTableDescriptor htd = this.tableDescriptors.get(region.getTableName());
+    // Need to pass the expected version in the constructor.
     if (region.isRootRegion()) {
-      this.service.submit(new OpenRootHandler(this, this, region, htd));
-    } else if(region.isMetaRegion()) {
-      this.service.submit(new OpenMetaHandler(this, this, region, htd));
+      this.service.submit(new OpenRootHandler(this, this, region, htd,
+          versionOfOfflineNode));
+    } else if (region.isMetaRegion()) {
+      this.service.submit(new OpenMetaHandler(this, this, region, htd,
+          versionOfOfflineNode));
     } else {
-      this.service.submit(new OpenRegionHandler(this, this, region, htd));
+      this.service.submit(new OpenRegionHandler(this, this, region, htd,
+          versionOfOfflineNode));
     }
     return RegionOpeningState.OPENED;
   }
@@ -3104,7 +3114,4 @@ public class HRegionServer implements HR
     HLog wal = this.getWAL();
     return wal.rollWriter(true);
   }
-
-
-
 }

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenMetaHandler.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenMetaHandler.java?rev=1166857&r1=1166856&r2=1166857&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenMetaHandler.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenMetaHandler.java Thu Sep  8 18:50:26 2011
@@ -33,6 +33,12 @@ public class OpenMetaHandler extends Ope
   public OpenMetaHandler(final Server server,
       final RegionServerServices rsServices, HRegionInfo regionInfo,
       final HTableDescriptor htd) {
-    super(server,rsServices, regionInfo, htd, EventType.M_RS_OPEN_META);
+    this(server, rsServices, regionInfo, htd, -1);
+  }
+  public OpenMetaHandler(final Server server,
+      final RegionServerServices rsServices, HRegionInfo regionInfo,
+      final HTableDescriptor htd, int versionOfOfflineNode) {
+    super(server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_META,
+        versionOfOfflineNode);
   }
 }
\ No newline at end of file

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java?rev=1166857&r1=1166856&r2=1166857&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java Thu Sep  8 18:50:26 2011
@@ -52,20 +52,30 @@ public class OpenRegionHandler extends E
   // the total open. We'll fail the open if someone hijacks our znode; we can
   // tell this has happened if version is not as expected.
   private volatile int version = -1;
+  //version of the offline node that was set by the master
+  private volatile int versionOfOfflineNode = -1;
 
   public OpenRegionHandler(final Server server,
       final RegionServerServices rsServices, HRegionInfo regionInfo,
       HTableDescriptor htd) {
-    this (server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_REGION);
+    this(server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_REGION, -1);
+  }
+  public OpenRegionHandler(final Server server,
+      final RegionServerServices rsServices, HRegionInfo regionInfo,
+      HTableDescriptor htd, int versionOfOfflineNode) {
+    this(server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_REGION,
+        versionOfOfflineNode);
   }
 
   protected OpenRegionHandler(final Server server,
       final RegionServerServices rsServices, final HRegionInfo regionInfo,
-      final HTableDescriptor htd, EventType eventType) {
+      final HTableDescriptor htd, EventType eventType,
+      final int versionOfOfflineNode) {
     super(server, eventType);
     this.rsServices = rsServices;
     this.regionInfo = regionInfo;
     this.htd = htd;
+    this.versionOfOfflineNode = versionOfOfflineNode;
   }
 
   public HRegionInfo getRegionInfo() {
@@ -86,7 +96,8 @@ public class OpenRegionHandler extends E
 
       // If fails, just return.  Someone stole the region from under us.
       // Calling transitionZookeeperOfflineToOpening initalizes this.version.
-      if (!transitionZookeeperOfflineToOpening(encodedName)) {
+      if (!transitionZookeeperOfflineToOpening(encodedName,
+          versionOfOfflineNode)) {
         LOG.warn("Region was hijacked? It no longer exists, encodedName=" +
           encodedName);
         return;
@@ -325,15 +336,18 @@ public class OpenRegionHandler extends E
    * Transition ZK node from OFFLINE to OPENING.
    * @param encodedName Name of the znode file (Region encodedName is the znode
    * name).
+   * @param versionOfOfflineNode - version Of OfflineNode that needs to be compared
+   * before changing the node's state from OFFLINE 
    * @return True if successful transition.
    */
-  boolean transitionZookeeperOfflineToOpening(final String encodedName) {
+  boolean transitionZookeeperOfflineToOpening(final String encodedName,
+      int versionOfOfflineNode) {
     // TODO: should also handle transition from CLOSED?
     try {
       // Initialize the znode version.
-      this.version =
-        ZKAssign.transitionNodeOpening(server.getZooKeeper(),
-          regionInfo, server.getServerName());
+      this.version = ZKAssign.transitionNode(server.getZooKeeper(), regionInfo,
+          server.getServerName(), EventType.M_ZK_REGION_OFFLINE,
+          EventType.RS_ZK_REGION_OPENING, versionOfOfflineNode);
     } catch (KeeperException e) {
       LOG.error("Error transition from OFFLINE to OPENING for region=" +
         encodedName, e);

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRootHandler.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRootHandler.java?rev=1166857&r1=1166856&r2=1166857&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRootHandler.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRootHandler.java Thu Sep  8 18:50:26 2011
@@ -33,6 +33,12 @@ public class OpenRootHandler extends Ope
   public OpenRootHandler(final Server server,
       final RegionServerServices rsServices, HRegionInfo regionInfo,
       final HTableDescriptor htd) {
-    super(server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_ROOT);
+    super(server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_ROOT, -1);
+  }
+  public OpenRootHandler(final Server server,
+      final RegionServerServices rsServices, HRegionInfo regionInfo,
+      final HTableDescriptor htd, int versionOfOfflineNode) {
+    super(server, rsServices, regionInfo, htd, EventType.M_RS_OPEN_ROOT,
+        versionOfOfflineNode);
   }
 }

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java?rev=1166857&r1=1166856&r2=1166857&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java Thu Sep  8 18:50:26 2011
@@ -203,7 +203,6 @@ public class ZKAssign {
     ZKUtil.setData(zkw, node, data.getBytes());
   }
 
-
   /**
    * Creates or force updates an unassigned node to the OFFLINE state for the
    * specified region.
@@ -219,36 +218,108 @@ public class ZKAssign {
    * @param zkw zk reference
    * @param region region to be created as offline
    * @param serverName server event originates from
+   * @return the version of the znode created in OFFLINE state, -1 if
+   *         unsuccessful.
    * @throws KeeperException if unexpected zookeeper exception
    * @throws KeeperException.NodeExistsException if node already exists
    */
-  public static boolean createOrForceNodeOffline(ZooKeeperWatcher zkw,
-      HRegionInfo region, ServerName serverName)
+  public static int createOrForceNodeOffline(ZooKeeperWatcher zkw,
+      HRegionInfo region, ServerName serverName) throws KeeperException {
+    return createOrForceNodeOffline(zkw, region, serverName, false, true);
+  }
+
+  /**
+   * Creates or force updates an unassigned node to the OFFLINE state for the
+   * specified region.
+   * <p>
+   * Attempts to create the node but if it exists will force it to transition to
+   * and OFFLINE state.
+   * <p>
+   * Sets a watcher on the unassigned region node if the method is successful.
+   * 
+   * <p>
+   * This method should be used when assigning a region.
+   * 
+   * @param zkw
+   *          zk reference
+   * @param region
+   *          region to be created as offline
+   * @param serverName
+   *          server event originates from
+   * @param hijack
+   *          - true if to be hijacked and reassigned, false otherwise
+   * @param allowCreation
+   *          - true if the node has to be created newly, false otherwise
+   * @throws KeeperException
+   *           if unexpected zookeeper exception
+   * @return the version of the znode created in OFFLINE state, -1 if
+   *         unsuccessful.
+   * @throws KeeperException.NodeExistsException
+   *           if node already exists
+   */
+  public static int createOrForceNodeOffline(ZooKeeperWatcher zkw,
+      HRegionInfo region, ServerName serverName,
+      boolean hijack, boolean allowCreation)
   throws KeeperException {
     LOG.debug(zkw.prefix("Creating (or updating) unassigned node for " +
       region.getEncodedName() + " with OFFLINE state"));
     RegionTransitionData data = new RegionTransitionData(
         EventType.M_ZK_REGION_OFFLINE, region.getRegionName(), serverName);
     String node = getNodeName(zkw, region.getEncodedName());
+    Stat stat = new Stat();
     zkw.sync(node);
     int version = ZKUtil.checkExists(zkw, node);
     if (version == -1) {
-      ZKUtil.createAndWatch(zkw, node, data.getBytes());
+      // While trying to transit a node to OFFLINE that was in previously in 
+      // OPENING state but before it could transit to OFFLINE state if RS had 
+      // opened the region then the Master deletes the assigned region znode. 
+      // In that case the znode will not exist. So we should not
+      // create the znode again which will lead to double assignment.
+      if (hijack && !allowCreation) {
+        return -1;
+      }
+      return ZKUtil.createAndWatch(zkw, node, data.getBytes());
     } else {
-      if (!ZKUtil.setData(zkw, node, data.getBytes(), version)) {
-        return false;
+      RegionTransitionData curDataInZNode = ZKAssign.getDataNoWatch(zkw, region
+          .getEncodedName(), stat);
+      // Do not move the node to OFFLINE if znode is in any of the following
+      // state.
+      // Because these are already executed states.
+      if (hijack && null != curDataInZNode) {
+        EventType eventType = curDataInZNode.getEventType();
+        if (eventType.equals(EventType.RS_ZK_REGION_CLOSING)
+            || eventType.equals(EventType.RS_ZK_REGION_CLOSED)
+            || eventType.equals(EventType.RS_ZK_REGION_OPENED)) {
+          return -1;
+        }
+      }
+
+      boolean setData = false;
+      try {
+        setData = ZKUtil.setData(zkw, node, data.getBytes(), version);
+        // Setdata throws KeeperException which aborts the Master. So we are
+        // catching it here.
+        // If just before setting the znode to OFFLINE if the RS has made any
+        // change to the
+        // znode state then we need to return -1.
+      } catch (KeeperException kpe) {
+        LOG.info("Version mismatch while setting the node to OFFLINE state.");
+        return -1;
+      }
+      if (!setData) {
+        return -1;
       } else {
         // We successfully forced to OFFLINE, reset watch and handle if
         // the state changed in between our set and the watch
         RegionTransitionData curData =
-            ZKAssign.getData(zkw, region.getEncodedName());
+          ZKAssign.getData(zkw, region.getEncodedName());
         if (curData.getEventType() != data.getEventType()) {
           // state changed, need to process
-          return false;
+          return -1;
         }
       }
     }
-    return true;
+    return stat.getVersion() + 1;
   }
 
   /**
@@ -673,6 +744,18 @@ public class ZKAssign {
         "the node existed but was version " + stat.getVersion() +
         " not the expected version " + expectedVersion));
         return -1;
+    } else if (beginState.equals(EventType.M_ZK_REGION_OFFLINE)
+        && endState.equals(EventType.RS_ZK_REGION_OPENING)
+        && expectedVersion == -1 && stat.getVersion() != 0) {
+      // the below check ensures that double assignment doesnot happen.
+      // When the node is created for the first time then the expected version
+      // that is passed will be -1 and the version in znode will be 0.
+      // In all other cases the version in znode will be > 0.
+      LOG.warn(zkw.prefix("Attempt to transition the " + "unassigned node for "
+          + encoded + " from " + beginState + " to " + endState + " failed, "
+          + "the node existed but was version " + stat.getVersion()
+          + " not the expected version " + expectedVersion));
+      return -1;
     }
 
     // Verify it is in expected state



Mime
View raw message