hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject git commit: HBASE-10915 Decouple region closing (HM and HRS) from ZK (Mikhail Antonov)
Date Wed, 04 Jun 2014 17:42:30 GMT
Repository: hbase
Updated Branches:
  refs/heads/master d20feaf1e -> ec9c12edf


HBASE-10915 Decouple region closing (HM and HRS) from ZK (Mikhail Antonov)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ec9c12ed
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ec9c12ed
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ec9c12ed

Branch: refs/heads/master
Commit: ec9c12edff8d2868a4ebb6bc76538abfb0d105d8
Parents: d20feaf
Author: Michael Stack <stack@duboce.net>
Authored: Wed Jun 4 10:42:10 2014 -0700
Committer: Michael Stack <stack@duboce.net>
Committed: Wed Jun 4 10:42:10 2014 -0700

----------------------------------------------------------------------
 .../BaseCoordinatedStateManager.java            |   8 +-
 .../coordination/CloseRegionCoordination.java   |  69 +++++++
 .../coordination/ZkCloseRegionCoordination.java | 197 +++++++++++++++++++
 .../coordination/ZkCoordinatedStateManager.java |   7 +
 .../hbase/regionserver/HRegionServer.java       |  28 +--
 .../hbase/regionserver/RSRpcServices.java       |  13 +-
 .../regionserver/handler/CloseMetaHandler.java  |  13 +-
 .../handler/CloseRegionHandler.java             | 104 +++-------
 .../TestSplitTransactionOnCluster.java          |   3 +-
 .../handler/TestCloseRegionHandler.java         |  46 ++++-
 10 files changed, 370 insertions(+), 118 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/ec9c12ed/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java
index 63697ee..5e4e53e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java
@@ -52,8 +52,14 @@ public abstract class BaseCoordinatedStateManager implements CoordinatedStateMan
   @Override
   public abstract TableStateManager getTableStateManager() throws InterruptedException,
     CoordinatedStateException;
+
   /**
-   * Method to retrieve coordination for split transaction
+   * Method to retrieve coordination for split transaction.
    */
   abstract public SplitTransactionCoordination getSplitTransactionCoordination();
+
+  /**
+   * Method to retrieve coordination for closing region operations.
+   */
+  public abstract CloseRegionCoordination getCloseRegionCoordination();
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ec9c12ed/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/CloseRegionCoordination.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/CloseRegionCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/CloseRegionCoordination.java
new file mode 100644
index 0000000..503e4fc
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/CloseRegionCoordination.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coordination;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+
+/**
+ * Coordinated operations for close region handlers.
+ */
+@InterfaceAudience.Private
+public interface CloseRegionCoordination {
+
+  /**
+   * Called before actual region closing to check that we can do close operation
+   * on this region.
+   * @param regionInfo region being closed
+   * @param crd details about closing operation
+   * @return true if closing must be aborted (bad coordination state), false if caller shall proceed.
+   */
+  boolean checkClosingState(HRegionInfo regionInfo, CloseRegionDetails crd);
+
+  /**
+   * Called after region is closed to notify all interested parties / "register"
+   * region as finally closed.
+   * @param region region being closed
+   * @param sn ServerName on which task runs
+   * @param crd details about closing operation
+   */
+  void setClosedState(HRegion region, ServerName sn, CloseRegionDetails crd);
+
+  /**
+   * Construct CloseRegionDetails instance from CloseRegionRequest.
+   * @return instance of CloseRegionDetails
+   */
+  CloseRegionDetails parseFromProtoRequest(AdminProtos.CloseRegionRequest request);
+
+  /**
+   * Get details object with params for case when we're closing on
+   * regionserver side internally (not because of RPC call from master),
+   * so we don't parse details from protobuf request.
+   */
+  CloseRegionDetails getDetaultDetails();
+
+  /**
+   * Marker interface for region closing tasks. Used to carry implementation details in
+   * encapsulated way through Handlers to the consensus API.
+   */
+  static interface CloseRegionDetails {
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/ec9c12ed/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCloseRegionCoordination.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCloseRegionCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCloseRegionCoordination.java
new file mode 100644
index 0000000..77ed84d
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCloseRegionCoordination.java
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coordination;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.CoordinatedStateManager;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.zookeeper.ZKAssign;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.IOException;
+
+/**
+ * ZK-based implementation of {@link CloseRegionCoordination}.
+ */
+@InterfaceAudience.Private
+public class ZkCloseRegionCoordination implements CloseRegionCoordination {
+  private static final Log LOG = LogFactory.getLog(ZkCloseRegionCoordination.class);
+
+  private final static int FAILED_VERSION = -1;
+
+  private CoordinatedStateManager csm;
+  private final ZooKeeperWatcher watcher;
+
+  public ZkCloseRegionCoordination(CoordinatedStateManager csm, ZooKeeperWatcher watcher) {
+    this.csm = csm;
+    this.watcher = watcher;
+  }
+
+  /**
+   * In ZK-based version we're checking for bad znode state, e.g. if we're
+   * trying to delete the znode, and it's not ours (version doesn't match).
+   */
+  @Override
+  public boolean checkClosingState(HRegionInfo regionInfo, CloseRegionDetails crd) {
+    ZkCloseRegionDetails zkCrd = (ZkCloseRegionDetails) crd;
+
+    try {
+      return zkCrd.isPublishStatusInZk() && !ZKAssign.checkClosingState(watcher,
+        regionInfo, ((ZkCloseRegionDetails) crd).getExpectedVersion());
+    } catch (KeeperException ke) {
+       csm.getServer().abort("Unrecoverable exception while checking state with zk " +
+          regionInfo.getRegionNameAsString() + ", still finishing close", ke);
+        throw new RuntimeException(ke);
+    }
+  }
+
+  /**
+   * In ZK-based version we do some znodes transitioning.
+   */
+  @Override
+  public void setClosedState(HRegion region, ServerName sn, CloseRegionDetails crd) {
+    ZkCloseRegionDetails zkCrd = (ZkCloseRegionDetails) crd;
+    String name = region.getRegionInfo().getRegionNameAsString();
+
+    if (zkCrd.isPublishStatusInZk()) {
+      if (setClosedState(region,sn, zkCrd)) {
+        LOG.debug("Set closed state in zk for " + name + " on " + sn);
+      } else {
+        LOG.debug("Set closed state in zk UNSUCCESSFUL for " + name + " on " + sn);
+      }
+    }
+  }
+
+  /**
+   * Parse ZK-related fields from request.
+   */
+  @Override
+  public CloseRegionDetails parseFromProtoRequest(AdminProtos.CloseRegionRequest request) {
+    ZkCloseRegionCoordination.ZkCloseRegionDetails zkCrd =
+      new ZkCloseRegionCoordination.ZkCloseRegionDetails();
+    zkCrd.setPublishStatusInZk(request.getTransitionInZK());
+    int versionOfClosingNode = -1;
+    if (request.hasVersionOfClosingNode()) {
+      versionOfClosingNode = request.getVersionOfClosingNode();
+    }
+    zkCrd.setExpectedVersion(versionOfClosingNode);
+
+    return zkCrd;
+  }
+
+  /**
+   * No ZK tracking will be performed for that case.
+   * This method should be used when we want to construct CloseRegionDetails,
+   * but don't want any coordination on that (when it's initiated by regionserver),
+   * so no znode state transitions will be performed.
+   */
+  @Override
+  public CloseRegionDetails getDetaultDetails() {
+    ZkCloseRegionCoordination.ZkCloseRegionDetails zkCrd =
+      new ZkCloseRegionCoordination.ZkCloseRegionDetails();
+    zkCrd.setPublishStatusInZk(false);
+    zkCrd.setExpectedVersion(FAILED_VERSION);
+
+    return zkCrd;
+  }
+
+  /**
+   * Transition ZK node to CLOSED
+   * @param region HRegion instance being closed
+   * @param sn ServerName on which task runs
+   * @param zkCrd  details about region closing operation.
+   * @return If the state is set successfully
+   */
+  private boolean setClosedState(final HRegion region,
+                                 ServerName sn,
+                                 ZkCloseRegionDetails zkCrd) {
+    final int expectedVersion = zkCrd.getExpectedVersion();
+
+    try {
+      if (ZKAssign.transitionNodeClosed(watcher, region.getRegionInfo(),
+        sn, expectedVersion) == FAILED_VERSION) {
+        LOG.warn("Completed the CLOSE of a region but when transitioning from " +
+          " CLOSING to CLOSED got a version mismatch, someone else clashed " +
+          "so now unassigning");
+        region.close();
+        return false;
+      }
+    } catch (NullPointerException e) {
+      // I've seen NPE when table was deleted while close was running in unit tests.
+      LOG.warn("NPE during close -- catching and continuing...", e);
+      return false;
+    } catch (KeeperException e) {
+      LOG.error("Failed transitioning node from CLOSING to CLOSED", e);
+      return false;
+    } catch (IOException e) {
+      LOG.error("Failed to close region after failing to transition", e);
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * ZK-based implementation. Has details about whether the state transition should be
+   * reflected in ZK, as well as expected version of znode.
+   */
+  public static class ZkCloseRegionDetails implements CloseRegionCoordination.CloseRegionDetails {
+
+    /**
+     * True if we are to update zk about the region close; if the close
+     * was orchestrated by master, then update zk.  If the close is being run by
+     * the regionserver because it's going down, don't update zk.
+     * */
+    private boolean publishStatusInZk;
+
+    /**
+     * The version of znode to compare when RS transitions the znode from
+     * CLOSING state.
+     */
+    private int expectedVersion = FAILED_VERSION;
+
+    public ZkCloseRegionDetails() {
+    }
+
+    public ZkCloseRegionDetails(boolean publishStatusInZk, int expectedVersion) {
+      this.publishStatusInZk = publishStatusInZk;
+      this.expectedVersion = expectedVersion;
+    }
+
+    public boolean isPublishStatusInZk() {
+      return publishStatusInZk;
+    }
+
+    public void setPublishStatusInZk(boolean publishStatusInZk) {
+      this.publishStatusInZk = publishStatusInZk;
+    }
+
+    public int getExpectedVersion() {
+      return expectedVersion;
+    }
+
+    public void setExpectedVersion(int expectedVersion) {
+      this.expectedVersion = expectedVersion;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/ec9c12ed/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java
index 2ef2db9..dab0262 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java
@@ -36,6 +36,7 @@ public class ZkCoordinatedStateManager extends BaseCoordinatedStateManager {
   protected Server server;
   protected ZooKeeperWatcher watcher;
   protected SplitTransactionCoordination splitTransactionCoordination;
+  protected CloseRegionCoordination closeRegionCoordination;
 
   @Override
   public void initialize(Server server) {
@@ -43,6 +44,7 @@ public class ZkCoordinatedStateManager extends BaseCoordinatedStateManager {
     this.watcher = server.getZooKeeper();
 
     splitTransactionCoordination = new ZKSplitTransactionCoordination(this, watcher);
+    closeRegionCoordination = new ZkCloseRegionCoordination(this, watcher);
   }
 
   @Override
@@ -64,4 +66,9 @@ public class ZkCoordinatedStateManager extends BaseCoordinatedStateManager {
   public SplitTransactionCoordination getSplitTransactionCoordination() {
     return splitTransactionCoordination;
   }
+
+  @Override
+  public CloseRegionCoordination getCloseRegionCoordination() {
+    return closeRegionCoordination;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ec9c12ed/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index ea67bf1..7c198ad 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -77,6 +77,8 @@ import org.apache.hadoop.hbase.catalog.MetaEditor;
 import org.apache.hadoop.hbase.client.ConnectionUtils;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
+import org.apache.hadoop.hbase.coordination.CloseRegionCoordination;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
 import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
@@ -392,7 +394,7 @@ public class HRegionServer extends HasThread implements
 
   protected final RSRpcServices rpcServices;
 
-  protected CoordinatedStateManager csm;
+  protected BaseCoordinatedStateManager csm;
 
   /**
    * Starts a HRegionServer at the default location.
@@ -483,7 +485,7 @@ public class HRegionServer extends HasThread implements
       zooKeeper = new ZooKeeperWatcher(conf, getProcessName() + ":" +
         rpcServices.isa.getPort(), this, canCreateBaseZNode());
 
-      this.csm = csm;
+      this.csm = (BaseCoordinatedStateManager) csm;
       this.csm.initialize(this);
       this.csm.start();
 
@@ -2157,7 +2159,7 @@ public class HRegionServer extends HasThread implements
   }
 
   @Override
-  public CoordinatedStateManager getCoordinatedStateManager() {
+  public BaseCoordinatedStateManager getCoordinatedStateManager() {
     return csm;
   }
 
@@ -2320,7 +2322,9 @@ public class HRegionServer extends HasThread implements
    */
   private void closeRegionIgnoreErrors(HRegionInfo region, final boolean abort) {
     try {
-      if (!closeRegion(region.getEncodedName(), abort, false, -1, null)) {
+      CloseRegionCoordination.CloseRegionDetails details =
+        csm.getCloseRegionCoordination().getDetaultDetails();
+      if (!closeRegion(region.getEncodedName(), abort, details, null)) {
         LOG.warn("Failed to close " + region.getRegionNameAsString() +
             " - ignoring and continuing");
       }
@@ -2345,17 +2349,13 @@ public class HRegionServer extends HasThread implements
    *
    * @param encodedName Region to close
    * @param abort True if we are aborting
-   * @param zk True if we are to update zk about the region close; if the close
-   * was orchestrated by master, then update zk.  If the close is being run by
-   * the regionserver because its going down, don't update zk.
-   * @param versionOfClosingNode the version of znode to compare when RS transitions the znode from
-   *   CLOSING state.
+   * @param crd details about closing region coordination-coordinated task
    * @return True if closed a region.
    * @throws NotServingRegionException if the region is not online
    * @throws RegionAlreadyInTransitionException if the region is already closing
    */
   protected boolean closeRegion(String encodedName, final boolean abort,
-      final boolean zk, final int versionOfClosingNode, final ServerName sn)
+      CloseRegionCoordination.CloseRegionDetails crd, final ServerName sn)
       throws NotServingRegionException, RegionAlreadyInTransitionException {
     //Check for permissions to close.
     HRegion actualRegion = this.getFromOnlineRegions(encodedName);
@@ -2379,7 +2379,7 @@ public class HRegionServer extends HasThread implements
         // We're going to try to do a standard close then.
         LOG.warn("The opening for region " + encodedName + " was done before we could cancel it." +
             " Doing a standard close now");
-        return closeRegion(encodedName, abort, zk, versionOfClosingNode, sn);
+        return closeRegion(encodedName, abort, crd, sn);
       }
       // Let's get the region from the online region list again
       actualRegion = this.getFromOnlineRegions(encodedName);
@@ -2413,9 +2413,11 @@ public class HRegionServer extends HasThread implements
     CloseRegionHandler crh;
     final HRegionInfo hri = actualRegion.getRegionInfo();
     if (hri.isMetaRegion()) {
-      crh = new CloseMetaHandler(this, this, hri, abort, zk, versionOfClosingNode);
+      crh = new CloseMetaHandler(this, this, hri, abort,
+        csm.getCloseRegionCoordination(), crd);
     } else {
-      crh = new CloseRegionHandler(this, this, hri, abort, zk, versionOfClosingNode, sn);
+      crh = new CloseRegionHandler(this, this, hri, abort,
+        csm.getCloseRegionCoordination(), crd, sn);
     }
     this.service.submit(crh);
     return true;

http://git-wip-us.apache.org/repos/asf/hbase/blob/ec9c12ed/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 2e7b022..7e24610 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -63,6 +63,7 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coordination.CloseRegionCoordination;
 import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
 import org.apache.hadoop.hbase.exceptions.OperationConflictException;
 import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
@@ -883,11 +884,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
   @QosPriority(priority=HConstants.HIGH_QOS)
   public CloseRegionResponse closeRegion(final RpcController controller,
       final CloseRegionRequest request) throws ServiceException {
-    int versionOfClosingNode = -1;
-    if (request.hasVersionOfClosingNode()) {
-      versionOfClosingNode = request.getVersionOfClosingNode();
-    }
-    boolean zk = request.getTransitionInZK();
     final ServerName sn = (request.hasDestinationServer() ?
       ProtobufUtil.toServerName(request.getDestinationServer()) : null);
 
@@ -911,10 +907,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       }
 
       requestCount.increment();
-      LOG.info("Close " + encodedRegionName + ", via zk=" + (zk ? "yes" : "no")
-        + ", znode version=" + versionOfClosingNode + ", on " + sn);
+      LOG.info("Close " + encodedRegionName + ", on " + sn);
+      CloseRegionCoordination.CloseRegionDetails crd = regionServer.getCoordinatedStateManager()
+        .getCloseRegionCoordination().parseFromProtoRequest(request);
 
-      boolean closed = regionServer.closeRegion(encodedRegionName, false, zk, versionOfClosingNode, sn);
+      boolean closed = regionServer.closeRegion(encodedRegionName, false, crd, sn);
       CloseRegionResponse.Builder builder = CloseRegionResponse.newBuilder().setClosed(closed);
       return builder.build();
     } catch (IOException ie) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/ec9c12ed/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseMetaHandler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseMetaHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseMetaHandler.java
index fe69f28..df8ae23 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseMetaHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseMetaHandler.java
@@ -23,24 +23,21 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.executor.EventType;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.coordination.CloseRegionCoordination;
 
 /**
  * Handles closing of the root region on a region server.
  */
 @InterfaceAudience.Private
 public class CloseMetaHandler extends CloseRegionHandler {
-  // Called when master tells us shutdown a region via close rpc
-  public CloseMetaHandler(final Server server,
-      final RegionServerServices rsServices, final HRegionInfo regionInfo) {
-    this(server, rsServices, regionInfo, false, true, -1);
-  }
 
   // Called when regionserver determines its to go down; not master orchestrated
   public CloseMetaHandler(final Server server,
       final RegionServerServices rsServices,
       final HRegionInfo regionInfo,
-      final boolean abort, final boolean zk, final int versionOfClosingNode) {
-    super(server, rsServices, regionInfo, abort, zk, versionOfClosingNode,
-      EventType.M_RS_CLOSE_META);
+      final boolean abort, CloseRegionCoordination closeRegionCoordination,
+      CloseRegionCoordination.CloseRegionDetails crd) {
+    super(server, rsServices, regionInfo, abort, closeRegionCoordination,
+      crd, EventType.M_RS_CLOSE_META);
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ec9c12ed/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java
index 591728e..e7d3ef8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java
@@ -26,12 +26,11 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.coordination.CloseRegionCoordination;
 import org.apache.hadoop.hbase.executor.EventHandler;
 import org.apache.hadoop.hbase.executor.EventType;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
-import org.apache.hadoop.hbase.zookeeper.ZKAssign;
-import org.apache.zookeeper.KeeperException;
 
 /**
  * Handles closing of a region on a region server.
@@ -45,29 +44,15 @@ public class CloseRegionHandler extends EventHandler {
   // have a running queue of user regions to close?
   private static final Log LOG = LogFactory.getLog(CloseRegionHandler.class);
 
-  private final int FAILED = -1;
-  int expectedVersion = FAILED;
-
   private final RegionServerServices rsServices;
-
   private final HRegionInfo regionInfo;
 
   // If true, the hosting server is aborting.  Region close process is different
   // when we are aborting.
   private final boolean abort;
-
-  // Update zk on closing transitions. Usually true.  Its false if cluster
-  // is going down.  In this case, its the rs that initiates the region
-  // close -- not the master process so state up in zk will unlikely be
-  // CLOSING.
-  private final boolean zk;
   private ServerName destination;
-
-  // This is executed after receiving an CLOSE RPC from the master.
-  public CloseRegionHandler(final Server server,
-      final RegionServerServices rsServices, HRegionInfo regionInfo) {
-    this(server, rsServices, regionInfo, false, true, -1, EventType.M_RS_CLOSE_REGION, null);
-  }
+  private CloseRegionCoordination closeRegionCoordination;
+  private CloseRegionCoordination.CloseRegionDetails closeRegionDetails;
 
   /**
    * This method used internally by the RegionServer to close out regions.
@@ -75,43 +60,48 @@ public class CloseRegionHandler extends EventHandler {
    * @param rsServices
    * @param regionInfo
    * @param abort If the regionserver is aborting.
-   * @param zk If the close should be noted out in zookeeper.
+   * @param closeRegionCoordination consensus for closing regions
+   * @param crd object carrying details about region close task.
    */
   public CloseRegionHandler(final Server server,
       final RegionServerServices rsServices,
-      final HRegionInfo regionInfo, final boolean abort, final boolean zk,
-      final int versionOfClosingNode) {
-    this(server, rsServices,  regionInfo, abort, zk, versionOfClosingNode,
+      final HRegionInfo regionInfo, final boolean abort,
+      CloseRegionCoordination closeRegionCoordination,
+      CloseRegionCoordination.CloseRegionDetails crd) {
+    this(server, rsServices,  regionInfo, abort, closeRegionCoordination, crd,
       EventType.M_RS_CLOSE_REGION, null);
   }
 
   public CloseRegionHandler(final Server server,
       final RegionServerServices rsServices,
-      final HRegionInfo regionInfo, final boolean abort, final boolean zk,
-      final int versionOfClosingNode, ServerName destination) {
-    this(server, rsServices, regionInfo, abort, zk, versionOfClosingNode,
+      final HRegionInfo regionInfo, final boolean abort,
+      CloseRegionCoordination closeRegionCoordination,
+      CloseRegionCoordination.CloseRegionDetails crd,
+      ServerName destination) {
+    this(server, rsServices, regionInfo, abort, closeRegionCoordination, crd,
       EventType.M_RS_CLOSE_REGION, destination);
   }
 
   public CloseRegionHandler(final Server server,
       final RegionServerServices rsServices, HRegionInfo regionInfo,
-      boolean abort, final boolean zk, final int versionOfClosingNode,
-      EventType eventType) {
-    this(server, rsServices, regionInfo, abort, zk, versionOfClosingNode, eventType, null);
+      boolean abort, CloseRegionCoordination closeRegionCoordination,
+      CloseRegionCoordination.CloseRegionDetails crd, EventType eventType) {
+    this(server, rsServices, regionInfo, abort, closeRegionCoordination, crd, eventType, null);
   }
 
     protected CloseRegionHandler(final Server server,
       final RegionServerServices rsServices, HRegionInfo regionInfo,
-      boolean abort, final boolean zk, final int versionOfClosingNode,
+      boolean abort, CloseRegionCoordination closeRegionCoordination,
+      CloseRegionCoordination.CloseRegionDetails crd,
       EventType eventType, ServerName destination) {
     super(server, eventType);
     this.server = server;
     this.rsServices = rsServices;
     this.regionInfo = regionInfo;
     this.abort = abort;
-    this.zk = zk;
-    this.expectedVersion = versionOfClosingNode;
     this.destination = destination;
+    this.closeRegionCoordination = closeRegionCoordination;
+    this.closeRegionDetails = crd;
   }
 
   public HRegionInfo getRegionInfo() {
@@ -128,18 +118,14 @@ public class CloseRegionHandler extends EventHandler {
       HRegion region = this.rsServices.getFromOnlineRegions(encodedRegionName);
       if (region == null) {
         LOG.warn("Received CLOSE for region " + name + " but currently not serving - ignoring");
-        if (zk){
-          LOG.error("The znode is not modified as we are not serving " + name);
-        }
         // TODO: do better than a simple warning
         return;
       }
 
       // Close the region
       try {
-        if (zk && !ZKAssign.checkClosingState(server.getZooKeeper(), regionInfo, expectedVersion)){
-          // bad znode state
-          return; // We're node deleting the znode, but it's not ours...
+        if (closeRegionCoordination.checkClosingState(regionInfo, closeRegionDetails)) {
+          return;
         }
 
         // TODO: If we need to keep updating CLOSING stamp to prevent against
@@ -152,10 +138,6 @@ public class CloseRegionHandler extends EventHandler {
             regionInfo.getRegionNameAsString());
           return;
         }
-      } catch (KeeperException ke) {
-          server.abort("Unrecoverable exception while checking state with zk " +
-            regionInfo.getRegionNameAsString() + ", still finishing close", ke);
-        throw new RuntimeException(ke);
       } catch (IOException ioe) {
         // An IOException here indicates that we couldn't successfully flush the
         // memstore before closing. So, we need to abort the server and allow
@@ -166,15 +148,8 @@ public class CloseRegionHandler extends EventHandler {
       }
 
       this.rsServices.removeFromOnlineRegions(region, destination);
-
-      if (this.zk) {
-        if (setClosedState(this.expectedVersion, region)) {
-          LOG.debug("Set closed state in zk for " + name + " on " + this.server.getServerName());
-        } else {
-          LOG.debug("Set closed state in zk UNSUCCESSFUL for " + name + " on " +
-            this.server.getServerName());
-        }
-      }
+      closeRegionCoordination.setClosedState(region, this.server.getServerName(),
+        closeRegionDetails);
 
       // Done!  Region is closed on this RS
       LOG.debug("Closed " + region.getRegionNameAsString());
@@ -183,33 +158,4 @@ public class CloseRegionHandler extends EventHandler {
           remove(this.regionInfo.getEncodedNameAsBytes());
     }
   }
-
-  /**
-   * Transition ZK node to CLOSED
-   * @param expectedVersion
-   * @return If the state is set successfully
-   */
-  private boolean setClosedState(final int expectedVersion, final HRegion region) {
-    try {
-      if (ZKAssign.transitionNodeClosed(server.getZooKeeper(), regionInfo,
-          server.getServerName(), expectedVersion) == FAILED) {
-        LOG.warn("Completed the CLOSE of a region but when transitioning from " +
-            " CLOSING to CLOSED got a version mismatch, someone else clashed " +
-            "so now unassigning");
-        region.close();
-        return false;
-      }
-    } catch (NullPointerException e) {
-      // I've seen NPE when table was deleted while close was running in unit tests.
-      LOG.warn("NPE during close -- catching and continuing...", e);
-      return false;
-    } catch (KeeperException e) {
-      LOG.error("Failed transitioning node from CLOSING to CLOSED", e);
-      return false;
-    } catch (IOException e) {
-      LOG.error("Failed to close region after failing to transition", e);
-      return false;
-    }
-    return true;
-  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ec9c12ed/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
index 44f56c1..f03c81d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
@@ -67,6 +67,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.CoordinatedStateManager;
 import org.apache.hadoop.hbase.coordination.ZKSplitTransactionCoordination;
+import org.apache.hadoop.hbase.coordination.ZkCloseRegionCoordination;
 import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
@@ -1095,7 +1096,7 @@ public class TestSplitTransactionOnCluster {
           this.server = server;
           this.watcher = server.getZooKeeper();
           splitTransactionCoordination = new MockedSplitTransactionCoordination(this, watcher, region);
-
+          closeRegionCoordination = new ZkCloseRegionCoordination(this, watcher);
         }
       }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/ec9c12ed/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestCloseRegionHandler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestCloseRegionHandler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestCloseRegionHandler.java
index a86ff09..6793ff5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestCloseRegionHandler.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestCloseRegionHandler.java
@@ -33,10 +33,12 @@ import org.apache.hadoop.hbase.MediumTests;
 import org.apache.hadoop.hbase.RegionTransition;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.executor.EventType;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.coordination.ZkCloseRegionCoordination;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.MockServer;
 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
@@ -110,8 +112,18 @@ public class TestCloseRegionHandler {
       rss.addToOnlineRegions(spy);
       // Assert the Server is NOT stopped before we call close region.
       assertFalse(server.isStopped());
-      CloseRegionHandler handler =
-          new CloseRegionHandler(server, rss, hri, false, false, -1);
+
+      ZkCoordinatedStateManager consensusProvider = new ZkCoordinatedStateManager();
+      consensusProvider.initialize(server);
+      consensusProvider.start();
+
+      ZkCloseRegionCoordination.ZkCloseRegionDetails zkCrd =
+        new ZkCloseRegionCoordination.ZkCloseRegionDetails();
+      zkCrd.setPublishStatusInZk(false);
+      zkCrd.setExpectedVersion(-1);
+
+      CloseRegionHandler handler = new CloseRegionHandler(server, rss, hri, false,
+            consensusProvider.getCloseRegionCoordination(), zkCrd);
       boolean throwable = false;
       try {
         handler.process();
@@ -153,9 +165,18 @@ public class TestCloseRegionHandler {
        // The CloseRegionHandler will validate the expected version
        // Given it is set to invalid versionOfClosingNode+1,
        // CloseRegionHandler should be M_ZK_REGION_CLOSING
-       CloseRegionHandler handler =
-         new CloseRegionHandler(server, rss, hri, false, true,
-         versionOfClosingNode+1);
+
+       ZkCoordinatedStateManager consensusProvider = new ZkCoordinatedStateManager();
+       consensusProvider.initialize(server);
+       consensusProvider.start();
+
+       ZkCloseRegionCoordination.ZkCloseRegionDetails zkCrd =
+         new ZkCloseRegionCoordination.ZkCloseRegionDetails();
+       zkCrd.setPublishStatusInZk(true);
+       zkCrd.setExpectedVersion(versionOfClosingNode+1);
+
+       CloseRegionHandler handler = new CloseRegionHandler(server, rss, hri, false,
+         consensusProvider.getCloseRegionCoordination(), zkCrd);
        handler.process();
    
        // Handler should remain in M_ZK_REGION_CLOSING
@@ -190,9 +211,18 @@ public class TestCloseRegionHandler {
        // The CloseRegionHandler will validate the expected version
        // Given it is set to correct versionOfClosingNode,
        // CloseRegionHandlerit should be RS_ZK_REGION_CLOSED
-       CloseRegionHandler handler =
-         new CloseRegionHandler(server, rss, hri, false, true,
-         versionOfClosingNode);
+
+       ZkCoordinatedStateManager consensusProvider = new ZkCoordinatedStateManager();
+       consensusProvider.initialize(server);
+       consensusProvider.start();
+
+       ZkCloseRegionCoordination.ZkCloseRegionDetails zkCrd =
+         new ZkCloseRegionCoordination.ZkCloseRegionDetails();
+       zkCrd.setPublishStatusInZk(true);
+       zkCrd.setExpectedVersion(versionOfClosingNode);
+
+       CloseRegionHandler handler = new CloseRegionHandler(server, rss, hri, false,
+         consensusProvider.getCloseRegionCoordination(), zkCrd);
        handler.process();
        // Handler should have transitioned it to RS_ZK_REGION_CLOSED
        RegionTransition rt = RegionTransition.parseFrom(


Mime
View raw message