hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject git commit: HBASE-10985 Decouple Split Transaction from Zookeeper (Sergey Soldatov)
Date Mon, 02 Jun 2014 17:23:07 GMT
Repository: hbase
Updated Branches:
  refs/heads/master 6fdf73717 -> 43be19794


HBASE-10985 Decouple Split Transaction from Zookeeper (Sergey Soldatov)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/43be1979
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/43be1979
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/43be1979

Branch: refs/heads/master
Commit: 43be19794ab4d46d57e0609ad6631901740d94f9
Parents: 6fdf737
Author: Michael Stack <stack@duboce.net>
Authored: Mon Jun 2 10:22:28 2014 -0700
Committer: Michael Stack <stack@duboce.net>
Committed: Mon Jun 2 10:22:28 2014 -0700

----------------------------------------------------------------------
 .../src/main/resources/hbase-default.xml        |   2 +-
 .../hbase/CoordinatedStateManagerFactory.java   |   2 +-
 .../consensus/BaseCoordinatedStateManager.java  |  55 ----
 .../consensus/ZkCoordinatedStateManager.java    |  59 ----
 .../BaseCoordinatedStateManager.java            |  59 ++++
 .../SplitTransactionCoordination.java           | 101 ++++++
 .../ZKSplitTransactionCoordination.java         | 314 +++++++++++++++++++
 .../coordination/ZkCoordinatedStateManager.java |  67 ++++
 .../hadoop/hbase/master/AssignmentManager.java  |  22 +-
 .../hbase/regionserver/SplitTransaction.java    | 288 +++--------------
 .../apache/hadoop/hbase/TestDrainingServer.java |   2 +-
 .../hbase/master/TestAssignmentManager.java     |   2 +-
 .../TestEndToEndSplitTransaction.java           |   6 +-
 .../TestSplitTransactionOnCluster.java          |  67 ++--
 14 files changed, 648 insertions(+), 398 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/43be1979/hbase-common/src/main/resources/hbase-default.xml
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml
index 3d4be0f..7ff4cc2 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -1198,7 +1198,7 @@ possible configurations would overwhelm and obscure the important.
   </property>
   <property>
     <name>hbase.coordinated.state.manager.class</name>
-    <value>org.apache.hadoop.hbase.consensus.ZkCoordinatedStateManager</value>
+    <value>org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager</value>
     <description>Fully qualified name of class implementing coordinated state manager.</description>
   </property>
 </configuration>

http://git-wip-us.apache.org/repos/asf/hbase/blob/43be1979/hbase-server/src/main/java/org/apache/hadoop/hbase/CoordinatedStateManagerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/CoordinatedStateManagerFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/CoordinatedStateManagerFactory.java
index 77ef217..caf3621 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/CoordinatedStateManagerFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/CoordinatedStateManagerFactory.java
@@ -19,7 +19,7 @@ package org.apache.hadoop.hbase;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.consensus.ZkCoordinatedStateManager;
+import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
 import org.apache.hadoop.util.ReflectionUtils;
 
 /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/43be1979/hbase-server/src/main/java/org/apache/hadoop/hbase/consensus/BaseCoordinatedStateManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/consensus/BaseCoordinatedStateManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/consensus/BaseCoordinatedStateManager.java
deleted file mode 100644
index 7f4e510..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/consensus/BaseCoordinatedStateManager.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.consensus;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.CoordinatedStateException;
-import org.apache.hadoop.hbase.CoordinatedStateManager;
-import org.apache.hadoop.hbase.Server;
-import org.apache.hadoop.hbase.TableStateManager;
-
-/**
- * Base class for {@link org.apache.hadoop.hbase.CoordinatedStateManager} implementations.
- * Defines methods to retrieve consensus objects for relevant areas. CoordinatedStateManager
- * reference returned from Server interface has to be casted to this type to
- * access those methods.
- */
-@InterfaceAudience.Private
-public abstract class BaseCoordinatedStateManager implements CoordinatedStateManager {
-
-  @Override
-  public void initialize(Server server) {
-  }
-
-  @Override
-  public void start() {
-  }
-
-  @Override
-  public void stop() {
-  }
-
-  @Override
-  public Server getServer() {
-    return null;
-  }
-
-  @Override
-  public abstract TableStateManager getTableStateManager() throws InterruptedException,
-    CoordinatedStateException;
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/43be1979/hbase-server/src/main/java/org/apache/hadoop/hbase/consensus/ZkCoordinatedStateManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/consensus/ZkCoordinatedStateManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/consensus/ZkCoordinatedStateManager.java
deleted file mode 100644
index 27e09ca..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/consensus/ZkCoordinatedStateManager.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.consensus;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.CoordinatedStateException;
-import org.apache.hadoop.hbase.Server;
-import org.apache.hadoop.hbase.TableStateManager;
-import org.apache.hadoop.hbase.zookeeper.ZKTableStateManager;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.zookeeper.KeeperException;
-
-/**
- * ZooKeeper-based implementation of {@link org.apache.hadoop.hbase.CoordinatedStateManager}.
- */
-@InterfaceAudience.Private
-public class ZkCoordinatedStateManager extends BaseCoordinatedStateManager {
-  private static final Log LOG = LogFactory.getLog(ZkCoordinatedStateManager.class);
-  private Server server;
-  private ZooKeeperWatcher watcher;
-
-  @Override
-  public void initialize(Server server) {
-    this.server = server;
-    this.watcher = server.getZooKeeper();
-  }
-
-  @Override
-  public Server getServer() {
-    return server;
-  }
-
-  @Override
-  public TableStateManager getTableStateManager() throws InterruptedException,
-      CoordinatedStateException {
-    try {
-      return new ZKTableStateManager(server.getZooKeeper());
-    } catch (KeeperException e) {
-      throw new CoordinatedStateException(e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/43be1979/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java
new file mode 100644
index 0000000..63697ee
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coordination;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.CoordinatedStateException;
+import org.apache.hadoop.hbase.CoordinatedStateManager;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.TableStateManager;
+
+/**
+ * Base class for {@link org.apache.hadoop.hbase.CoordinatedStateManager} implementations.
+ * Defines methods to retrieve coordination objects for relevant areas. A
+ * CoordinatedStateManager reference returned from the Server interface has to be cast to this
+ * type to access those methods.
+ * <p>
+ * The lifecycle methods here do nothing and {@link #getServer()} returns {@code null};
+ * concrete managers override whichever of them they need.
+ */
+@InterfaceAudience.Private
+public abstract class BaseCoordinatedStateManager implements CoordinatedStateManager {
+
+  /** Does nothing by default. */
+  @Override
+  public void initialize(Server server) {
+  }
+
+  /** Does nothing by default. */
+  @Override
+  public void start() {
+  }
+
+  /** Does nothing by default. */
+  @Override
+  public void stop() {
+  }
+
+  /** @return {@code null} by default */
+  @Override
+  public Server getServer() {
+    return null;
+  }
+
+  @Override
+  public abstract TableStateManager getTableStateManager() throws InterruptedException,
+    CoordinatedStateException;
+
+  /**
+   * Method to retrieve coordination for split transaction.
+   * @return the split transaction coordination of this manager
+   */
+  public abstract SplitTransactionCoordination getSplitTransactionCoordination();
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/43be1979/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitTransactionCoordination.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitTransactionCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitTransactionCoordination.java
new file mode 100644
index 0000000..659d4e5
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitTransactionCoordination.java
@@ -0,0 +1,116 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coordination;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.regionserver.SplitTransaction;
+
+/**
+ * Coordination operations for split transaction. The split operation should be coordinated at the
+ * following stages:
+ * <ol>
+ * <li>startSplitTransaction - all preparation/initialization for split transaction should be done
+ * there.</li>
+ * <li>waitForSplitTransaction - the coordination should perform all logic related to split
+ * transaction and wait till it's finished.</li>
+ * <li>completeSplitTransaction - all steps that are required to complete the transaction.
+ * Called after PONR (point of no return).</li>
+ * </ol>
+ */
+@InterfaceAudience.Private
+public interface SplitTransactionCoordination {
+
+  /**
+   * Marker interface for split transaction details. Each coordination implementation defines
+   * its own details type; callers obtain one via {@link #getDefaultDetails()} and pass it
+   * through the split stages.
+   */
+  interface SplitTransactionDetails {
+  }
+
+  /**
+   * @return a new details object in its initial state
+   */
+  SplitTransactionDetails getDefaultDetails();
+
+  /**
+   * Init coordination for split transaction.
+   * @param parent region being split
+   * @param serverName server event originates from
+   * @param hri_a daughter region
+   * @param hri_b daughter region
+   * @throws IOException if the split transaction could not be started
+   */
+  void startSplitTransaction(HRegion parent, ServerName serverName,
+      HRegionInfo hri_a, HRegionInfo hri_b) throws IOException;
+
+  /**
+   * Wait while coordination processes the transaction.
+   * @param services Used to online/offline regions.
+   * @param parent region being split
+   * @param hri_a daughter region
+   * @param hri_b daughter region
+   * @param std split transaction details
+   * @throws IOException if the transaction can not proceed
+   */
+  void waitForSplitTransaction(final RegionServerServices services,
+      HRegion parent, HRegionInfo hri_a, HRegionInfo hri_b, SplitTransactionDetails std)
+      throws IOException;
+
+  /**
+   * Finish off split transaction.
+   * @param services Used to online/offline regions.
+   * @param first daughter region
+   * @param second daughter region
+   * @param std split transaction details
+   * @param parent region being split
+   * @throws IOException If thrown, transaction failed. Call
+   *           {@link SplitTransaction#rollback(Server, RegionServerServices)}
+   */
+  void completeSplitTransaction(RegionServerServices services, HRegion first,
+      HRegion second, SplitTransactionDetails std, HRegion parent) throws IOException;
+
+  /**
+   * Clean the split transaction.
+   * @param hri region whose split transaction state should be removed
+   */
+  void clean(final HRegionInfo hri);
+
+  /**
+   * Required by AssignmentManager.
+   * @param p parent region
+   * @param hri_a daughter region
+   * @param hri_b daughter region
+   * @param sn server name passed to the transition
+   * @param std split transaction details
+   * @return implementation-specific result; the ZooKeeper implementation returns the new znode
+   *         version, or -1 if the transition was unsuccessful
+   * @throws IOException if the transition fails
+   */
+  int processTransition(HRegionInfo p, HRegionInfo hri_a, HRegionInfo hri_b,
+      ServerName sn, SplitTransactionDetails std) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/43be1979/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitTransactionCoordination.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitTransactionCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitTransactionCoordination.java
new file mode 100644
index 0000000..de9f51f
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitTransactionCoordination.java
@@ -0,0 +1,335 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coordination;
+
+import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REGION_SPLIT;
+import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REGION_SPLITTING;
+import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REQUEST_REGION_SPLIT;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.CoordinatedStateManager;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.RegionTransition;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.executor.EventType;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.regionserver.SplitTransaction;
+import org.apache.hadoop.hbase.zookeeper.ZKAssign;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * ZooKeeper-based implementation of {@link SplitTransactionCoordination}. The split is tracked in
+ * an ephemeral znode for the parent region, which moves from RS_ZK_REQUEST_REGION_SPLIT (created
+ * by this region server) to RS_ZK_REGION_SPLITTING (set by master) and then RS_ZK_REGION_SPLIT.
+ */
+@InterfaceAudience.Private
+public class ZKSplitTransactionCoordination implements SplitTransactionCoordination {
+
+  private static final Log LOG = LogFactory.getLog(ZKSplitTransactionCoordination.class);
+
+  private final CoordinatedStateManager coordinationManager;
+  private final ZooKeeperWatcher watcher;
+
+  /**
+   * @param coordinationProvider manager supplying the server this coordination acts for
+   * @param watcher ZooKeeper watcher used for the split znode operations
+   */
+  public ZKSplitTransactionCoordination(CoordinatedStateManager coordinationProvider,
+      ZooKeeperWatcher watcher) {
+    this.coordinationManager = coordinationProvider;
+    this.watcher = watcher;
+  }
+
+  /**
+   * Creates a new ephemeral node in the PENDING_SPLIT state for the specified region. Create it
+   * ephemeral in case regionserver dies mid-split.
+   * <p>
+   * Does not transition nodes from other states. If a node already exists for this region, an
+   * Exception will be thrown.
+   * @param parent region being split
+   * @param serverName server event originates from
+   * @param hri_a daughter region
+   * @param hri_b daughter region
+   * @throws IOException if the znode could not be created
+   */
+  @Override
+  public void startSplitTransaction(HRegion parent, ServerName serverName, HRegionInfo hri_a,
+      HRegionInfo hri_b) throws IOException {
+    HRegionInfo region = parent.getRegionInfo();
+    try {
+      LOG.debug(watcher.prefix("Creating ephemeral node for " + region.getEncodedName()
+          + " in PENDING_SPLIT state"));
+      byte[] payload = HRegionInfo.toDelimitedByteArray(hri_a, hri_b);
+      RegionTransition rt =
+          RegionTransition.createRegionTransition(RS_ZK_REQUEST_REGION_SPLIT,
+            region.getRegionName(), serverName, payload);
+      String node = ZKAssign.getNodeName(watcher, region.getEncodedName());
+      if (!ZKUtil.createEphemeralNodeAndWatch(watcher, node, rt.toByteArray())) {
+        throw new IOException("Failed create of ephemeral " + node);
+      }
+    } catch (KeeperException e) {
+      throw new IOException("Failed creating PENDING_SPLIT znode on "
+          + parent.getRegionNameAsString(), e);
+    }
+  }
+
+  /**
+   * Transitions an existing ephemeral node for the specified region which is currently in the begin
+   * state to be in the end state. Master cleans up the final SPLIT znode when it reads it (or if we
+   * crash, zk will clean it up).
+   * <p>
+   * Does not transition nodes from other states. If for some reason the node could not be
+   * transitioned, the method returns -1. If the transition is successful, the version of the node
+   * after transition is returned.
+   * <p>
+   * This method can fail and return -1 for three different reasons:
+   * <ul>
+   * <li>Node for this region does not exist</li>
+   * <li>Node for this region is not in the begin state</li>
+   * <li>After verifying the begin state, update fails because of wrong version (this should never
+   * actually happen since an RS only does this transition following a transition to the begin
+   * state. If two RS are conflicting, one would fail the original transition to the begin state and
+   * not this transition)</li>
+   * </ul>
+   * <p>
+   * Does not set any watches.
+   * <p>
+   * This method should only be used by a RegionServer when splitting a region.
+   * @param parent region being split
+   * @param a Daughter a of split
+   * @param b Daughter b of split
+   * @param serverName server event originates from
+   * @param std split transaction details; must be a {@link ZkSplitTransactionDetails}
+   * @param beginState the expected current state the znode should be in
+   * @param endState the state to transition to
+   * @return version of node after transition, -1 if unsuccessful transition
+   * @throws IOException if ZooKeeper reports an error during the transition
+   */
+  private int transitionSplittingNode(HRegionInfo parent, HRegionInfo a, HRegionInfo b,
+      ServerName serverName, SplitTransactionDetails std, final EventType beginState,
+      final EventType endState) throws IOException {
+    ZkSplitTransactionDetails zstd = (ZkSplitTransactionDetails) std;
+    byte[] payload = HRegionInfo.toDelimitedByteArray(a, b);
+    try {
+      return ZKAssign.transitionNode(watcher, parent, serverName, beginState, endState,
+        zstd.getZnodeVersion(), payload);
+    } catch (KeeperException e) {
+      throw new IOException(
+          "Failed transition of splitting node " + parent.getRegionNameAsString(), e);
+    }
+  }
+
+  /**
+   * Wait for the splitting node to be transitioned from pending_split to splitting by master.
+   * That's how we are sure master has processed the event and is good with us to move on. If we
+   * don't get any update, we periodically transition the node so that master gets the callback. If
+   * the node is removed or is not in pending_split state any more, we abort the split.
+   * On success the znode version is recorded in {@code sptd}.
+   */
+  @Override
+  public void waitForSplitTransaction(final RegionServerServices services, HRegion parent,
+      HRegionInfo hri_a, HRegionInfo hri_b, SplitTransactionDetails sptd) throws IOException {
+    ZkSplitTransactionDetails zstd = (ZkSplitTransactionDetails) sptd;
+
+    // After creating the split node, wait for master to transition it
+    // from PENDING_SPLIT to SPLITTING so that we can move on. We want master
+    // to know about it and not transition any region which is splitting.
+    try {
+      int spins = 0;
+      Stat stat = new Stat();
+      ServerName expectedServer = coordinationManager.getServer().getServerName();
+      String node = parent.getRegionInfo().getEncodedName();
+      while (!(coordinationManager.getServer().isStopped() || services.isStopping())) {
+        if (spins % 5 == 0) {
+          LOG.debug("Still waiting for master to process the pending_split for " + node);
+          // Re-transition the node to its current state so master gets a fresh callback.
+          SplitTransactionDetails temp = getDefaultDetails();
+          transitionSplittingNode(parent.getRegionInfo(), hri_a, hri_b, expectedServer, temp,
+            RS_ZK_REQUEST_REGION_SPLIT, RS_ZK_REQUEST_REGION_SPLIT);
+        }
+        Thread.sleep(100);
+        spins++;
+        byte[] data = ZKAssign.getDataNoWatch(watcher, node, stat);
+        if (data == null) {
+          throw new IOException("Data is null, splitting node " + node + " no longer exists");
+        }
+        RegionTransition rt = RegionTransition.parseFrom(data);
+        EventType et = rt.getEventType();
+        if (et == RS_ZK_REGION_SPLITTING) {
+          ServerName serverName = rt.getServerName();
+          if (!serverName.equals(expectedServer)) {
+            throw new IOException("Splitting node " + node + " is for " + serverName + ", not us "
+                + expectedServer);
+          }
+          byte[] payloadOfSplitting = rt.getPayload();
+          List<HRegionInfo> splittingRegions =
+              HRegionInfo.parseDelimitedFrom(payloadOfSplitting, 0, payloadOfSplitting.length);
+          assert splittingRegions.size() == 2;
+          HRegionInfo a = splittingRegions.get(0);
+          HRegionInfo b = splittingRegions.get(1);
+          if (!(hri_a.equals(a) && hri_b.equals(b))) {
+            throw new IOException("Splitting node " + node + " is for " + a + ", " + b
+                + ", not expected daughters: " + hri_a + ", " + hri_b);
+          }
+          // Master has processed it.
+          zstd.setZnodeVersion(stat.getVersion());
+          return;
+        }
+        if (et != RS_ZK_REQUEST_REGION_SPLIT) {
+          throw new IOException("Splitting node " + node + " moved out of splitting to " + et);
+        }
+      }
+      // Server is stopping/stopped
+      throw new IOException("Server is " + (services.isStopping() ? "stopping" : "stopped"));
+    } catch (Exception e) {
+      if (e instanceof InterruptedException) {
+        Thread.currentThread().interrupt();
+      }
+      throw new IOException("Failed getting SPLITTING znode on " + parent.getRegionNameAsString(),
+          e);
+    }
+  }
+
+  /**
+   * Finish off split transaction: transition the znode from SPLITTING to SPLIT, then keep
+   * re-transitioning it until the transition returns -1 (the znode is gone once master has
+   * processed the split) or this server stops. Does nothing if the manager has no server.
+   * @param services Used to online/offline regions.
+   * @param a daughter region
+   * @param b daughter region
+   * @param std split transaction details
+   * @param parent region being split
+   * @throws IOException If thrown, transaction failed. Call
+   *           {@link SplitTransaction#rollback(Server, RegionServerServices)}
+   */
+  @Override
+  public void completeSplitTransaction(final RegionServerServices services, HRegion a, HRegion b,
+      SplitTransactionDetails std, HRegion parent) throws IOException {
+    ZkSplitTransactionDetails zstd = (ZkSplitTransactionDetails) std;
+    // Tell master about split by updating zk. If we fail, abort.
+    if (coordinationManager.getServer() != null) {
+      try {
+        zstd.setZnodeVersion(transitionSplittingNode(parent.getRegionInfo(), a.getRegionInfo(),
+          b.getRegionInfo(), coordinationManager.getServer().getServerName(), zstd,
+          RS_ZK_REGION_SPLITTING, RS_ZK_REGION_SPLIT));
+
+        int spins = 0;
+        // Now wait for the master to process the split. We know it's done
+        // when the znode is deleted. The reason we keep tickling the znode is
+        // that it's possible for the master to miss an event.
+        do {
+          if (spins % 10 == 0) {
+            LOG.debug("Still waiting on the master to process the split for "
+                + parent.getRegionInfo().getEncodedName());
+          }
+          Thread.sleep(100);
+          // When this returns -1 it means the znode doesn't exist
+          zstd.setZnodeVersion(transitionSplittingNode(parent.getRegionInfo(), a.getRegionInfo(),
+            b.getRegionInfo(), coordinationManager.getServer().getServerName(), zstd,
+            RS_ZK_REGION_SPLIT, RS_ZK_REGION_SPLIT));
+          spins++;
+        } while (zstd.getZnodeVersion() != -1 && !coordinationManager.getServer().isStopped()
+            && !services.isStopping());
+      } catch (Exception e) {
+        if (e instanceof InterruptedException) {
+          Thread.currentThread().interrupt();
+        }
+        throw new IOException("Failed telling master about split", e);
+      }
+    }
+
+    // Leaving here, the splitdir with its dross will be in place but since the
+    // split was successful, just leave it; it'll be cleaned when parent is
+    // deleted and cleaned up.
+  }
+
+  /**
+   * Deletes the split znode of the given region, but only if it is still in the
+   * RS_ZK_REQUEST_REGION_SPLIT or RS_ZK_REGION_SPLITTING state. A missing znode is only logged;
+   * any other ZooKeeper error aborts the server.
+   * @param hri region whose split znode should be removed
+   */
+  @Override
+  public void clean(final HRegionInfo hri) {
+    try {
+      ServerName serverName = coordinationManager.getServer().getServerName();
+      // Only delete if it's in expected state; could have been hijacked.
+      if (!ZKAssign.deleteNode(watcher, hri.getEncodedName(), RS_ZK_REQUEST_REGION_SPLIT,
+        serverName)) {
+        ZKAssign.deleteNode(watcher, hri.getEncodedName(), RS_ZK_REGION_SPLITTING, serverName);
+      }
+    } catch (KeeperException.NoNodeException e) {
+      LOG.info("Failed cleanup zk node of " + hri.getRegionNameAsString(), e);
+    } catch (KeeperException e) {
+      coordinationManager.getServer().abort("Failed cleanup of " + hri.getRegionNameAsString(), e);
+    }
+  }
+
+  /**
+   * ZK-based implementation of split transaction details. Holds the znode version that is passed
+   * as the expected version to the next transition; details obtained from
+   * {@link ZKSplitTransactionCoordination#getDefaultDetails()} start at -1.
+   */
+  public static class ZkSplitTransactionDetails implements
+      SplitTransactionCoordination.SplitTransactionDetails {
+    private int znodeVersion;
+
+    public ZkSplitTransactionDetails() {
+    }
+
+    /**
+     * @return znode current version
+     */
+    public int getZnodeVersion() {
+      return znodeVersion;
+    }
+
+    /**
+     * @param znodeVersion znode new version
+     */
+    public void setZnodeVersion(int znodeVersion) {
+      this.znodeVersion = znodeVersion;
+    }
+  }
+
+  /**
+   * @return new {@link ZkSplitTransactionDetails} with znode version -1
+   */
+  @Override
+  public SplitTransactionDetails getDefaultDetails() {
+    ZkSplitTransactionDetails zstd = new ZkSplitTransactionDetails();
+    zstd.setZnodeVersion(-1);
+    return zstd;
+  }
+
+  /**
+   * Transitions the parent region's znode from RS_ZK_REQUEST_REGION_SPLIT to
+   * RS_ZK_REGION_SPLITTING.
+   * @return version of the znode after transition, -1 if unsuccessful
+   */
+  @Override
+  public int processTransition(HRegionInfo p, HRegionInfo hri_a, HRegionInfo hri_b, ServerName sn,
+      SplitTransactionDetails std) throws IOException {
+    return transitionSplittingNode(p, hri_a, hri_b, sn, std, RS_ZK_REQUEST_REGION_SPLIT,
+      RS_ZK_REGION_SPLITTING);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/43be1979/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java
new file mode 100644
index 0000000..2ef2db9
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coordination;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.CoordinatedStateException;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.TableStateManager;
+import org.apache.hadoop.hbase.zookeeper.ZKTableStateManager;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * ZooKeeper-based implementation of {@link org.apache.hadoop.hbase.CoordinatedStateManager}.
+ */
+@InterfaceAudience.Private
+public class ZkCoordinatedStateManager extends BaseCoordinatedStateManager {
+  private static final Log LOG = LogFactory.getLog(ZkCoordinatedStateManager.class);
+  protected Server server;
+  protected ZooKeeperWatcher watcher;
+  protected SplitTransactionCoordination splitTransactionCoordination;
+
+  @Override
+  public void initialize(Server server) {
+    this.server = server;
+    this.watcher = server.getZooKeeper();
+
+    splitTransactionCoordination = new ZKSplitTransactionCoordination(this, watcher);
+  }
+
+  @Override
+  public Server getServer() {
+    return server;
+  }
+
+  @Override
+  public TableStateManager getTableStateManager() throws InterruptedException,
+      CoordinatedStateException {
+    try {
+      return new ZKTableStateManager(server.getZooKeeper());
+    } catch (KeeperException e) {
+      throw new CoordinatedStateException(e);
+    }
+  }
+
+  @Override
+  public SplitTransactionCoordination getSplitTransactionCoordination() {
+    return splitTransactionCoordination;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/43be1979/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
index 4dcd3e9..1c4804d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
@@ -58,6 +58,8 @@ import org.apache.hadoop.hbase.TableStateManager;
 import org.apache.hadoop.hbase.catalog.CatalogTracker;
 import org.apache.hadoop.hbase.catalog.MetaReader;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
+import org.apache.hadoop.hbase.coordination.SplitTransactionCoordination.SplitTransactionDetails;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.executor.EventHandler;
 import org.apache.hadoop.hbase.executor.EventType;
@@ -77,7 +79,6 @@ import org.apache.hadoop.hbase.regionserver.RegionAlreadyInTransitionException;
 import org.apache.hadoop.hbase.regionserver.RegionMergeTransaction;
 import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
-import org.apache.hadoop.hbase.regionserver.SplitTransaction;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.KeyLocker;
 import org.apache.hadoop.hbase.util.Pair;
@@ -3225,25 +3226,26 @@ public class AssignmentManager extends ZooKeeperListener {
     EventType et = rt.getEventType();
     if (et == EventType.RS_ZK_REQUEST_REGION_SPLIT) {
       try {
-        if (SplitTransaction.transitionSplittingNode(watcher, p,
-            hri_a, hri_b, sn, -1, EventType.RS_ZK_REQUEST_REGION_SPLIT,
-            EventType.RS_ZK_REGION_SPLITTING) == -1) {
+        SplitTransactionDetails std =
+            ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
+                .getSplitTransactionCoordination().getDefaultDetails();
+        if (((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
+            .getSplitTransactionCoordination().processTransition(p, hri_a, hri_b, sn, std) == -1) {
           byte[] data = ZKAssign.getData(watcher, encodedName);
           EventType currentType = null;
           if (data != null) {
             RegionTransition newRt = RegionTransition.parseFrom(data);
             currentType = newRt.getEventType();
           }
-          if (currentType == null || (currentType != EventType.RS_ZK_REGION_SPLIT
-              && currentType != EventType.RS_ZK_REGION_SPLITTING)) {
-            LOG.warn("Failed to transition pending_split node "
-              + encodedName + " to splitting, it's now " + currentType);
+          if (currentType == null
+              || (currentType != EventType.RS_ZK_REGION_SPLIT && currentType != EventType.RS_ZK_REGION_SPLITTING)) {
+            LOG.warn("Failed to transition pending_split node " + encodedName
+                + " to splitting, it's now " + currentType);
             return false;
           }
         }
       } catch (Exception e) {
-        LOG.warn("Failed to transition pending_split node "
-          + encodedName + " to splitting", e);
+        LOG.warn("Failed to transition pending_split node " + encodedName + " to splitting", e);
         return false;
       }
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/43be1979/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
index 3394ccd..db4dad9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
@@ -18,10 +18,6 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REQUEST_REGION_SPLIT;
-import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REGION_SPLIT;
-import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REGION_SPLITTING;
-
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
@@ -41,25 +37,20 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.RegionTransition;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.catalog.CatalogTracker;
 import org.apache.hadoop.hbase.catalog.MetaEditor;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.executor.EventType;
+import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
+import org.apache.hadoop.hbase.coordination.SplitTransactionCoordination;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.HasThread;
 import org.apache.hadoop.hbase.util.PairOfSameType;
-import org.apache.hadoop.hbase.zookeeper.ZKAssign;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.NodeExistsException;
-import org.apache.zookeeper.data.Stat;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
@@ -97,7 +88,7 @@ public class SplitTransaction {
   private HRegionInfo hri_a;
   private HRegionInfo hri_b;
   private long fileSplitTimeout = 30000;
-  private int znodeVersion = -1;
+  public SplitTransactionCoordination.SplitTransactionDetails std;
 
   /*
    * Row to split around
@@ -113,7 +104,7 @@ public class SplitTransaction {
     /**
      * Set region as in transition, set it into SPLITTING state.
      */
-    SET_SPLITTING_IN_ZK,
+    SET_SPLITTING,
     /**
      * We created the temporary split data directory.
      */
@@ -294,26 +285,24 @@ public class SplitTransaction {
     }
     return daughterRegions;
   }
-
   public PairOfSameType<HRegion> stepsBeforePONR(final Server server,
       final RegionServerServices services, boolean testing) throws IOException {
-    // Set ephemeral SPLITTING znode up in zk.  Mocked servers sometimes don't
-    // have zookeeper so don't do zk stuff if server or zookeeper is null
-    if (server != null && server.getZooKeeper() != null) {
-      try {
-        createNodeSplitting(server.getZooKeeper(),
-          parent.getRegionInfo(), server.getServerName(), hri_a, hri_b);
-      } catch (KeeperException e) {
-        throw new IOException("Failed creating PENDING_SPLIT znode on " +
-          this.parent.getRegionNameAsString(), e);
+
+    if (server != null && server.getCoordinatedStateManager() != null) {
+      if (std == null) {
+        std =
+            ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
+                .getSplitTransactionCoordination().getDefaultDetails();
       }
+      ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
+          .getSplitTransactionCoordination().startSplitTransaction(parent, server.getServerName(),
+            hri_a, hri_b);
     }
-    this.journal.add(JournalEntry.SET_SPLITTING_IN_ZK);
-    if (server != null && server.getZooKeeper() != null) {
-      // After creating the split node, wait for master to transition it
-      // from PENDING_SPLIT to SPLITTING so that we can move on. We want master
-      // knows about it and won't transition any region which is splitting.
-      znodeVersion = getZKNode(server, services);
+    this.journal.add(JournalEntry.SET_SPLITTING);
+    if (server != null && server.getCoordinatedStateManager() != null) {
+      ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
+          .getSplitTransactionCoordination().waitForSplitTransaction(services, parent, hri_a,
+            hri_b, std);
     }
 
     this.parent.getRegionFileSystem().createSplitsDir();
@@ -369,8 +358,7 @@ public class SplitTransaction {
 
   /**
    * Perform time consuming opening of the daughter regions.
-   * @param server Hosting server instance.  Can be null when testing (won't try
-   * and update in zk if a null server)
+   * @param server Hosting server instance.  Can be null when testing
    * @param services Used to online/offline regions.
    * @param a first daughter region
    * @param a second daughter region
@@ -425,136 +413,8 @@ public class SplitTransaction {
   }
 
   /**
-   * Finish off split transaction, transition the zknode
-   * @param server Hosting server instance.  Can be null when testing (won't try
-   * and update in zk if a null server)
-   * @param services Used to online/offline regions.
-   * @param a first daughter region
-   * @param a second daughter region
-   * @throws IOException If thrown, transaction failed.
-   *          Call {@link #rollback(Server, RegionServerServices)}
-   */
-  /* package */void transitionZKNode(final Server server,
-      final RegionServerServices services, HRegion a, HRegion b)
-      throws IOException {
-    // Tell master about split by updating zk.  If we fail, abort.
-    if (server != null && server.getZooKeeper() != null) {
-      try {
-        this.znodeVersion = transitionSplittingNode(server.getZooKeeper(),
-          parent.getRegionInfo(), a.getRegionInfo(), b.getRegionInfo(),
-          server.getServerName(), this.znodeVersion,
-          RS_ZK_REGION_SPLITTING, RS_ZK_REGION_SPLIT);
-
-        int spins = 0;
-        // Now wait for the master to process the split. We know it's done
-        // when the znode is deleted. The reason we keep tickling the znode is
-        // that it's possible for the master to miss an event.
-        do {
-          if (spins % 10 == 0) {
-            LOG.debug("Still waiting on the master to process the split for " +
-                this.parent.getRegionInfo().getEncodedName());
-          }
-          Thread.sleep(100);
-          // When this returns -1 it means the znode doesn't exist
-          this.znodeVersion = transitionSplittingNode(server.getZooKeeper(),
-            parent.getRegionInfo(), a.getRegionInfo(), b.getRegionInfo(),
-            server.getServerName(), this.znodeVersion,
-            RS_ZK_REGION_SPLIT, RS_ZK_REGION_SPLIT);
-          spins++;
-        } while (this.znodeVersion != -1 && !server.isStopped()
-            && !services.isStopping());
-      } catch (Exception e) {
-        if (e instanceof InterruptedException) {
-          Thread.currentThread().interrupt();
-        }
-        throw new IOException("Failed telling master about split", e);
-      }
-    }
-
-    // Coprocessor callback
-    if (this.parent.getCoprocessorHost() != null) {
-      this.parent.getCoprocessorHost().postSplit(a,b);
-    }
-
-    // Leaving here, the splitdir with its dross will be in place but since the
-    // split was successful, just leave it; it'll be cleaned when parent is
-    // deleted and cleaned up.
-  }
-
-  /**
-   * Wait for the splitting node to be transitioned from pending_split
-   * to splitting by master. That's how we are sure master has processed
-   * the event and is good with us to move on. If we don't get any update,
-   * we periodically transition the node so that master gets the callback.
-   * If the node is removed or is not in pending_split state any more,
-   * we abort the split.
-   */
-  private int getZKNode(final Server server,
-      final RegionServerServices services) throws IOException {
-    // Wait for the master to process the pending_split.
-    try {
-      int spins = 0;
-      Stat stat = new Stat();
-      ZooKeeperWatcher zkw = server.getZooKeeper();
-      ServerName expectedServer = server.getServerName();
-      String node = parent.getRegionInfo().getEncodedName();
-      while (!(server.isStopped() || services.isStopping())) {
-        if (spins % 5 == 0) {
-          LOG.debug("Still waiting for master to process "
-            + "the pending_split for " + node);
-          transitionSplittingNode(zkw, parent.getRegionInfo(),
-            hri_a, hri_b, expectedServer, -1, RS_ZK_REQUEST_REGION_SPLIT,
-            RS_ZK_REQUEST_REGION_SPLIT);
-        }
-        Thread.sleep(100);
-        spins++;
-        byte [] data = ZKAssign.getDataNoWatch(zkw, node, stat);
-        if (data == null) {
-          throw new IOException("Data is null, splitting node "
-            + node + " no longer exists");
-        }
-        RegionTransition rt = RegionTransition.parseFrom(data);
-        EventType et = rt.getEventType();
-        if (et == RS_ZK_REGION_SPLITTING) {
-          ServerName serverName = rt.getServerName();
-          if (!serverName.equals(expectedServer)) {
-            throw new IOException("Splitting node " + node + " is for "
-              + serverName + ", not us " + expectedServer);
-          }
-          byte [] payloadOfSplitting = rt.getPayload();
-          List<HRegionInfo> splittingRegions = HRegionInfo.parseDelimitedFrom(
-            payloadOfSplitting, 0, payloadOfSplitting.length);
-          assert splittingRegions.size() == 2;
-          HRegionInfo a = splittingRegions.get(0);
-          HRegionInfo b = splittingRegions.get(1);
-          if (!(hri_a.equals(a) && hri_b.equals(b))) {
-            throw new IOException("Splitting node " + node + " is for " + a + ", "
-              + b + ", not expected daughters: " + hri_a + ", " + hri_b);
-          }
-          // Master has processed it.
-          return stat.getVersion();
-        }
-        if (et != RS_ZK_REQUEST_REGION_SPLIT) {
-          throw new IOException("Splitting node " + node
-            + " moved out of splitting to " + et);
-        }
-      }
-      // Server is stopping/stopped
-      throw new IOException("Server is "
-        + (services.isStopping() ? "stopping" : "stopped"));
-    } catch (Exception e) {
-      if (e instanceof InterruptedException) {
-        Thread.currentThread().interrupt();
-      }
-      throw new IOException("Failed getting SPLITTING znode on "
-        + parent.getRegionNameAsString(), e);
-    }
-  }
-
-  /**
    * Run the transaction.
-   * @param server Hosting server instance.  Can be null when testing (won't try
-   * and update in zk if a null server)
+   * @param server Hosting server instance.  Can be null when testing
    * @param services Used to online/offline regions.
    * @throws IOException If thrown, transaction failed.
    *          Call {@link #rollback(Server, RegionServerServices)}
@@ -565,6 +425,11 @@ public class SplitTransaction {
   public PairOfSameType<HRegion> execute(final Server server,
       final RegionServerServices services)
   throws IOException {
+    if (server != null && server.getCoordinatedStateManager() != null) {
+      std =
+          ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
+              .getSplitTransactionCoordination().getDefaultDetails();
+    }
     PairOfSameType<HRegion> regions = createDaughters(server, services);
     if (this.parent.getCoprocessorHost() != null) {
       this.parent.getCoprocessorHost().preSplitAfterPONR();
@@ -576,7 +441,17 @@ public class SplitTransaction {
       final RegionServerServices services, PairOfSameType<HRegion> regions)
       throws IOException {
     openDaughters(server, services, regions.getFirst(), regions.getSecond());
-    transitionZKNode(server, services, regions.getFirst(), regions.getSecond());
+    if (server != null && server.getCoordinatedStateManager() != null) {
+      ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
+          .getSplitTransactionCoordination().completeSplitTransaction(services, regions.getFirst(),
+            regions.getSecond(), std, parent);
+    }
+    // Coprocessor callback
+    if (parent.getCoprocessorHost() != null) {
+      parent.getCoprocessorHost().postSplit(regions.getFirst(), regions.getSecond());
+    }
+
+
     return regions;
   }
 
@@ -800,9 +675,10 @@ public class SplitTransaction {
       JournalEntry je = iterator.previous();
       switch(je) {
 
-      case SET_SPLITTING_IN_ZK:
-        if (server != null && server.getZooKeeper() != null) {
-          cleanZK(server, this.parent.getRegionInfo());
+      case SET_SPLITTING:
+        if (server != null && server instanceof HRegionServer) {
+          ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
+              .getSplitTransactionCoordination().clean(this.parent.getRegionInfo());
         }
         break;
 
@@ -864,88 +740,4 @@ public class SplitTransaction {
     return hri_b;
   }
 
-  private static void cleanZK(final Server server, final HRegionInfo hri) {
-    try {
-      // Only delete if its in expected state; could have been hijacked.
-      if (!ZKAssign.deleteNode(server.getZooKeeper(), hri.getEncodedName(),
-          RS_ZK_REQUEST_REGION_SPLIT, server.getServerName())) {
-        ZKAssign.deleteNode(server.getZooKeeper(), hri.getEncodedName(),
-          RS_ZK_REGION_SPLITTING, server.getServerName());
-      }
-    } catch (KeeperException.NoNodeException e) {
-      LOG.info("Failed cleanup zk node of " + hri.getRegionNameAsString(), e);
-    } catch (KeeperException e) {
-      server.abort("Failed cleanup of " + hri.getRegionNameAsString(), e);
-    }
-  }
-
-  /**
-   * Creates a new ephemeral node in the PENDING_SPLIT state for the specified region.
-   * Create it ephemeral in case regionserver dies mid-split.
-   *
-   * <p>Does not transition nodes from other states.  If a node already exists
-   * for this region, a {@link NodeExistsException} will be thrown.
-   *
-   * @param zkw zk reference
-   * @param region region to be created as offline
-   * @param serverName server event originates from
-   * @throws KeeperException
-   * @throws IOException
-   */
-  public static void createNodeSplitting(final ZooKeeperWatcher zkw, final HRegionInfo region,
-      final ServerName serverName, final HRegionInfo a,
-      final HRegionInfo b) throws KeeperException, IOException {
-    LOG.debug(zkw.prefix("Creating ephemeral node for " +
-      region.getEncodedName() + " in PENDING_SPLIT state"));
-    byte [] payload = HRegionInfo.toDelimitedByteArray(a, b);
-    RegionTransition rt = RegionTransition.createRegionTransition(
-      RS_ZK_REQUEST_REGION_SPLIT, region.getRegionName(), serverName, payload);
-    String node = ZKAssign.getNodeName(zkw, region.getEncodedName());
-    if (!ZKUtil.createEphemeralNodeAndWatch(zkw, node, rt.toByteArray())) {
-      throw new IOException("Failed create of ephemeral " + node);
-    }
-  }
-
-  /**
-   * Transitions an existing ephemeral node for the specified region which is
-   * currently in the begin state to be in the end state. Master cleans up the
-   * final SPLIT znode when it reads it (or if we crash, zk will clean it up).
-   *
-   * <p>Does not transition nodes from other states. If for some reason the
-   * node could not be transitioned, the method returns -1. If the transition
-   * is successful, the version of the node after transition is returned.
-   *
-   * <p>This method can fail and return false for three different reasons:
-   * <ul><li>Node for this region does not exist</li>
-   * <li>Node for this region is not in the begin state</li>
-   * <li>After verifying the begin state, update fails because of wrong version
-   * (this should never actually happen since an RS only does this transition
-   * following a transition to the begin state. If two RS are conflicting, one would
-   * fail the original transition to the begin state and not this transition)</li>
-   * </ul>
-   *
-   * <p>Does not set any watches.
-   *
-   * <p>This method should only be used by a RegionServer when splitting a region.
-   *
-   * @param zkw zk reference
-   * @param parent region to be transitioned to opened
-   * @param a Daughter a of split
-   * @param b Daughter b of split
-   * @param serverName server event originates from
-   * @param znodeVersion expected version of data before modification
-   * @param beginState the expected current state the znode should be
-   * @param endState the state to be transition to
-   * @return version of node after transition, -1 if unsuccessful transition
-   * @throws KeeperException if unexpected zookeeper exception
-   * @throws IOException
-   */
-  public static int transitionSplittingNode(ZooKeeperWatcher zkw,
-      HRegionInfo parent, HRegionInfo a, HRegionInfo b, ServerName serverName,
-      final int znodeVersion, final EventType beginState,
-      final EventType endState) throws KeeperException, IOException {
-    byte [] payload = HRegionInfo.toDelimitedByteArray(a, b);
-    return ZKAssign.transitionNode(zkw, parent, serverName,
-      beginState, endState, znodeVersion, payload);
-  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/43be1979/hbase-server/src/test/java/org/apache/hadoop/hbase/TestDrainingServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestDrainingServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestDrainingServer.java
index b3bf1f0..777bdb1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestDrainingServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestDrainingServer.java
@@ -22,7 +22,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.catalog.CatalogTracker;
-import org.apache.hadoop.hbase.consensus.ZkCoordinatedStateManager;
+import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
 import org.apache.hadoop.hbase.executor.EventType;
 import org.apache.hadoop.hbase.executor.ExecutorService;
 import org.apache.hadoop.hbase.executor.ExecutorType;

http://git-wip-us.apache.org/repos/asf/hbase/blob/43be1979/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
index 5cf10f3..c7d3f1f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
@@ -53,7 +53,7 @@ import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.CoordinatedStateManager;
-import org.apache.hadoop.hbase.consensus.ZkCoordinatedStateManager;
+import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.executor.EventType;
 import org.apache.hadoop.hbase.executor.ExecutorService;

http://git-wip-us.apache.org/repos/asf/hbase/blob/43be1979/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java
index 82f357f..14a44fa 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.client.MetaScanner;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
@@ -139,8 +140,9 @@ public class TestEndToEndSplitTransaction {
     assertTrue(test(con, tableName, lastRow, server));
 
     // 4. phase III
-    split.transitionZKNode(server, server, regions.getFirst(),
-        regions.getSecond());
+    ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
+        .getSplitTransactionCoordination().completeSplitTransaction(server, regions.getFirst(),
+          regions.getSecond(), split.std, region);
     assertTrue(test(con, tableName, firstRow, server));
     assertTrue(test(con, tableName, lastRow, server));
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/43be1979/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
index 71bc6c6..44f56c1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
@@ -66,6 +66,8 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.CoordinatedStateManager;
+import org.apache.hadoop.hbase.coordination.ZKSplitTransactionCoordination;
+import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -190,6 +192,10 @@ public class TestSplitTransactionOnCluster {
       // find a splittable region
       final HRegion region = findSplittableRegion(regions);
       assertTrue("not able to find a splittable region", region != null);
+      MockedCoordinatedStateManager cp = new MockedCoordinatedStateManager();
+      cp.initialize(regionServer, region);
+      cp.start();
+      regionServer.csm = cp;
 
       new Thread() {
         @Override
@@ -1083,18 +1089,52 @@ public class TestSplitTransactionOnCluster {
       TESTING_UTIL.deleteTable(tableName);
     }
   }
+  public static class MockedCoordinatedStateManager extends ZkCoordinatedStateManager {
 
-  public static class MockedSplitTransaction extends SplitTransaction {
+    /** Same wiring as initialize(Server), but with a MockedSplitTransactionCoordination. */
+    public void initialize(Server server, HRegion region) {
+      this.server = server;
+      this.watcher = server.getZooKeeper();
+      splitTransactionCoordination = new MockedSplitTransactionCoordination(this, watcher, region);
+    }
+  }
+
+      public static class MockedSplitTransaction extends SplitTransaction {
+
+        private HRegion currentRegion;
+        public MockedSplitTransaction(HRegion region, byte[] splitrow) {
+          super(region, splitrow);
+          this.currentRegion = region;
+        }
+        @Override
+        public boolean rollback(Server server, RegionServerServices services) throws IOException {
+          if (this.currentRegion.getRegionInfo().getTable().getNameAsString()
+              .equals("testShouldFailSplitIfZNodeDoesNotExistDueToPrevRollBack")) {
+            if(secondSplit){
+              super.rollback(server, services);
+              latch.countDown();
+              return true;
+            }
+          }
+          return super.rollback(server, services);
+        }
+
+
+      }
+
+  public static class MockedSplitTransactionCoordination extends ZKSplitTransactionCoordination {
 
     private HRegion currentRegion;
-    public MockedSplitTransaction(HRegion r, byte[] splitrow) {
-      super(r, splitrow);
-      this.currentRegion = r;
+
+    public MockedSplitTransactionCoordination(CoordinatedStateManager coordinationProvider,
+        ZooKeeperWatcher watcher, HRegion region) {
+      super(coordinationProvider, watcher);
+      currentRegion = region;
     }
 
     @Override
-    void transitionZKNode(Server server, RegionServerServices services, HRegion a, HRegion b)
-        throws IOException {
+    public void completeSplitTransaction(RegionServerServices services, HRegion a, HRegion b,
+        SplitTransactionDetails std, HRegion parent) throws IOException {
       if (this.currentRegion.getRegionInfo().getTable().getNameAsString()
           .equals("testShouldFailSplitIfZNodeDoesNotExistDueToPrevRollBack")) {
         try {
@@ -1106,25 +1146,12 @@ public class TestSplitTransactionOnCluster {
         }
 
       }
-      super.transitionZKNode(server, services, a, b);
+      super.completeSplitTransaction(services, a, b, std, parent);
       if (this.currentRegion.getRegionInfo().getTable().getNameAsString()
           .equals("testShouldFailSplitIfZNodeDoesNotExistDueToPrevRollBack")) {
         firstSplitCompleted = true;
       }
     }
-    @Override
-    public boolean rollback(Server server, RegionServerServices services) throws IOException {
-      if (this.currentRegion.getRegionInfo().getTable().getNameAsString()
-          .equals("testShouldFailSplitIfZNodeDoesNotExistDueToPrevRollBack")) {
-        if(secondSplit){
-          super.rollback(server, services);
-          latch.countDown();
-          return true;
-        }
-      }
-      return super.rollback(server, services);
-    }
-
   }
 
   private HRegion findSplittableRegion(final List<HRegion> regions) throws InterruptedException {


Mime
View raw message