hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject [1/4] git commit: HBASE-11108 Split ZKTable into interface and implementation (Mikhail Antononv)
Date Thu, 22 May 2014 23:16:48 GMT
Repository: hbase
Updated Branches:
  refs/heads/master 33f842855 -> ea0731d60


HBASE-11108 Split ZKTable into interface and implementation (Mikhail Antononv)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c5d5a5d1
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c5d5a5d1
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c5d5a5d1

Branch: refs/heads/master
Commit: c5d5a5d1bc4ffab220fa9bf93aaa18fed7d6c8c8
Parents: 33f8428
Author: Michael Stack <stack@duboce.net>
Authored: Thu May 22 16:15:35 2014 -0700
Committer: Michael Stack <stack@duboce.net>
Committed: Thu May 22 16:15:35 2014 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/CoordinatedStateException.java |  46 +++
 .../hadoop/hbase/CoordinatedStateManager.java   |  66 ++++
 .../apache/hadoop/hbase/TableStateManager.java  | 115 +++++++
 .../zookeeper/ZKTableStateClientSideReader.java | 168 ++++++++++
 .../hbase/zookeeper/ZKTableStateManager.java    | 330 +++++++++++++++++++
 .../hbase/CoordinatedStateManagerFactory.java   |  43 +++
 .../consensus/BaseCoordinatedStateManager.java  |  55 ++++
 .../consensus/ZkCoordinatedStateManager.java    |  59 ++++
 .../zookeeper/TestZKTableStateManager.java      | 114 +++++++
 9 files changed, 996 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/c5d5a5d1/hbase-client/src/main/java/org/apache/hadoop/hbase/CoordinatedStateException.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/CoordinatedStateException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/CoordinatedStateException.java
new file mode 100644
index 0000000..a28a0c1
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/CoordinatedStateException.java
@@ -0,0 +1,46 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.exceptions.HBaseException;
+
+/**
+ * Thrown by operations requiring coordination state access or manipulation
+ * when an internal error occurs within the coordination engine (or other internal implementation).
+ */
+@InterfaceAudience.Private
+@SuppressWarnings("serial")
+public class CoordinatedStateException extends HBaseException {
+  public CoordinatedStateException() {
+    super();
+  }
+
+  public CoordinatedStateException(final String message) {
+    super(message);
+  }
+
+  public CoordinatedStateException(final String message, final Throwable t) {
+    super(message, t);
+  }
+
+  public CoordinatedStateException(final Throwable t) {
+    super(t);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/c5d5a5d1/hbase-client/src/main/java/org/apache/hadoop/hbase/CoordinatedStateManager.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/CoordinatedStateManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/CoordinatedStateManager.java
new file mode 100644
index 0000000..2642e29
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/CoordinatedStateManager.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Implementations of this interface will keep and return to clients 
+ * implementations of classes providing API to execute
+ * coordinated operations. This interface is client-side, so it does NOT
+ * include methods to retrieve the particular interface implementations.
+ *
+ * For each coarse-grained area of operations there will be a separate
+ * interface with implementation, providing API for relevant operations
+ * requiring coordination.
+ *
+ * Property hbase.coordinated.state.manager.class in hbase-site.xml controls
+ * which provider to use.
+ */
+@InterfaceAudience.Private
+public interface CoordinatedStateManager {
+
+  /**
+   * Initialize coordinated state management service.
+   * @param server server instance to run within.
+   */
+  void initialize(Server server);
+
+  /**
+   * Starts service.
+   */
+  void start();
+
+  /**
+   * Stops service.
+   */
+  void stop();
+
+  /**
+   * @return instance of Server coordinated state manager runs within
+   */
+  Server getServer();
+
+  /**
+   * Returns implementation of TableStateManager.
+   * @throws InterruptedException if operation is interrupted
+   * @throws CoordinatedStateException if error happens in underlying coordination mechanism
+   */
+  TableStateManager getTableStateManager() throws InterruptedException,
+    CoordinatedStateException;
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/c5d5a5d1/hbase-client/src/main/java/org/apache/hadoop/hbase/TableStateManager.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/TableStateManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/TableStateManager.java
new file mode 100644
index 0000000..56cd4ae
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/TableStateManager.java
@@ -0,0 +1,115 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
+
+import java.io.InterruptedIOException;
+import java.util.Set;
+
+/**
+ * Helper class for table state management for operations running inside
+ * RegionServer or HMaster.
+ * Depending on implementation, fetches information from HBase system table,
+ * local data store, ZooKeeper ensemble or somewhere else.
+ * Code running on client side (with no coordinated state context) shall instead use
+ * {@link org.apache.hadoop.hbase.zookeeper.ZKTableStateClientSideReader}
+ */
+@InterfaceAudience.Private
+public interface TableStateManager {
+
+  /**
+   * Sets the table into desired state. Fails silently if the table is already in this state.
+   * @param tableName table to process
+   * @param state new state of this table
+   * @throws CoordinatedStateException if error happened when trying to set table state
+   */
+  void setTableState(TableName tableName, ZooKeeperProtos.Table.State state)
+    throws CoordinatedStateException;
+
+  /**
+   * Sets the specified table into the newState, but only if the table is already in
+   * one of the given {@code states} (otherwise no operation is performed).
+   * @param tableName table to process
+   * @param newState new state for the table
+   * @param states table should be in one of these states for the operation
+   *                              to be performed
+   * @throws CoordinatedStateException if error happened while performing operation
+   * @return true if operation succeeded, false otherwise
+   */
+  boolean setTableStateIfInStates(TableName tableName, ZooKeeperProtos.Table.State newState,
+                                  ZooKeeperProtos.Table.State... states)
+    throws CoordinatedStateException;
+
+  /**
+   * Sets the specified table into the newState, but only if the table is NOT in
+   * one of the given {@code states} (otherwise no operation is performed).
+   * @param tableName table to process
+   * @param newState new state for the table
+   * @param states table should NOT be in one of these states for the operation
+   *                              to be performed
+   * @throws CoordinatedStateException if error happened while performing operation
+   * @return true if operation succeeded, false otherwise
+   */
+  boolean setTableStateIfNotInStates(TableName tableName, ZooKeeperProtos.Table.State newState,
+                                     ZooKeeperProtos.Table.State... states)
+    throws CoordinatedStateException;
+
+  /**
+   * @return true if the table is in any one of the listed states, false otherwise.
+   */
+  boolean isTableState(TableName tableName, ZooKeeperProtos.Table.State... states);
+
+  /**
+   * Mark table as deleted.  Fails silently if the table is not currently marked as disabled.
+   * @param tableName table to be deleted
+   * @throws CoordinatedStateException if error happened while performing operation
+   */
+  void setDeletedTable(TableName tableName) throws CoordinatedStateException;
+
+  /**
+   * Checks if table is present.
+   *
+   * @param tableName table we're checking
+   * @return true if the table is present, false otherwise
+   */
+  boolean isTablePresent(TableName tableName);
+
+  /**
+   * @return set of tables which are in any one of the listed states, empty Set if none
+   */
+  Set<TableName> getTablesInStates(ZooKeeperProtos.Table.State... states)
+    throws InterruptedIOException, CoordinatedStateException;
+
+  /**
+   * If the table is found in the given state the in-memory state is removed. This
+   * helps in cases where CreateTable is to be retried by the client in case of
+   * failures.  If deletePermanentState is true - the flag kept permanently is
+   * also reset.
+   *
+   * @param tableName table we're working on
+   * @param states the single state the table is expected to be in; otherwise operation aborts
+   * @param deletePermanentState if true, reset the permanent flag
+   * @throws CoordinatedStateException if error happened in underlying coordination engine
+   */
+  void checkAndRemoveTableState(TableName tableName, ZooKeeperProtos.Table.State states,
+                            boolean deletePermanentState)
+    throws CoordinatedStateException;
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/c5d5a5d1/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKTableStateClientSideReader.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKTableStateClientSideReader.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKTableStateClientSideReader.java
new file mode 100644
index 0000000..94bd31e
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKTableStateClientSideReader.java
@@ -0,0 +1,168 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
+import org.apache.zookeeper.KeeperException;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Non-instantiable class that provides helper functions to learn
+ * about HBase table state for code running on client side (hence, not having
+ * access to consensus context).
+ *
+ * Doesn't cache any table state, just goes directly to ZooKeeper.
+ * TODO: decouple this class from ZooKeeper.
+ */
+@InterfaceAudience.Private
+public class ZKTableStateClientSideReader {
+
+  private ZKTableStateClientSideReader() {}
+  
+  /**
+   * Go to zookeeper and see if state of table is {@code ZooKeeperProtos.Table.State#DISABLED}.
+   * This method does not use cache.
+   * This method is for clients other than AssignmentManager
+   * @param zkw ZooKeeperWatcher instance to use
+   * @param tableName table we're checking
+   * @return True if table is disabled.
+   * @throws KeeperException
+   */
+  public static boolean isDisabledTable(final ZooKeeperWatcher zkw,
+      final TableName tableName)
+      throws KeeperException, InterruptedException {
+    ZooKeeperProtos.Table.State state = getTableState(zkw, tableName);
+    return isTableState(ZooKeeperProtos.Table.State.DISABLED, state);
+  }
+
+  /**
+   * Go to zookeeper and see if state of table is {@code ZooKeeperProtos.Table.State#ENABLED}.
+   * This method does not use cache.
+   * This method is for clients other than AssignmentManager
+   * @param zkw ZooKeeperWatcher instance to use
+   * @param tableName table we're checking
+   * @return True if table is enabled.
+   * @throws KeeperException
+   */
+  public static boolean isEnabledTable(final ZooKeeperWatcher zkw,
+      final TableName tableName)
+      throws KeeperException, InterruptedException {
+    return getTableState(zkw, tableName) == ZooKeeperProtos.Table.State.ENABLED;
+  }
+
+  /**
+   * Go to zookeeper and see if state of table is {@code ZooKeeperProtos.Table.State#DISABLING}
+   * or {@code ZooKeeperProtos.Table.State#DISABLED}.
+   * This method does not use cache.
+   * This method is for clients other than AssignmentManager.
+   * @param zkw ZooKeeperWatcher instance to use
+   * @param tableName table we're checking
+   * @return True if table is disabling or disabled.
+   * @throws KeeperException
+   */
+  public static boolean isDisablingOrDisabledTable(final ZooKeeperWatcher zkw,
+      final TableName tableName)
+      throws KeeperException, InterruptedException {
+    ZooKeeperProtos.Table.State state = getTableState(zkw, tableName);
+    return isTableState(ZooKeeperProtos.Table.State.DISABLING, state) ||
+      isTableState(ZooKeeperProtos.Table.State.DISABLED, state);
+  }
+
+  /**
+   * Gets a list of all the tables set as disabled in zookeeper.
+   * @return Set of disabled tables, empty Set if none
+   * @throws KeeperException
+   */
+  public static Set<TableName> getDisabledTables(ZooKeeperWatcher zkw)
+      throws KeeperException, InterruptedException {
+    Set<TableName> disabledTables = new HashSet<TableName>();
+    List<String> children =
+      ZKUtil.listChildrenNoWatch(zkw, zkw.tableZNode);
+    for (String child: children) {
+      TableName tableName =
+          TableName.valueOf(child);
+      ZooKeeperProtos.Table.State state = getTableState(zkw, tableName);
+      if (state == ZooKeeperProtos.Table.State.DISABLED) disabledTables.add(tableName);
+    }
+    return disabledTables;
+  }
+
+  /**
+   * Gets a list of all the tables set as disabled or disabling in zookeeper.
+   * @return Set of disabled or disabling tables, empty Set if none
+   * @throws KeeperException
+   */
+  public static Set<TableName> getDisabledOrDisablingTables(ZooKeeperWatcher zkw)
+      throws KeeperException, InterruptedException {
+    Set<TableName> disabledTables = new HashSet<TableName>();
+    List<String> children =
+      ZKUtil.listChildrenNoWatch(zkw, zkw.tableZNode);
+    for (String child: children) {
+      TableName tableName =
+          TableName.valueOf(child);
+      ZooKeeperProtos.Table.State state = getTableState(zkw, tableName);
+      if (state == ZooKeeperProtos.Table.State.DISABLED ||
+          state == ZooKeeperProtos.Table.State.DISABLING)
+        disabledTables.add(tableName);
+    }
+    return disabledTables;
+  }
+
+  static boolean isTableState(final ZooKeeperProtos.Table.State expectedState,
+      final ZooKeeperProtos.Table.State currentState) {
+    return currentState != null && currentState.equals(expectedState);
+  }
+
+  /**
+   * @param zkw ZooKeeperWatcher instance to use
+   * @param tableName table we're checking
+   * @return Null or {@link ZooKeeperProtos.Table.State} found in znode.
+   * @throws KeeperException
+   */
+  static ZooKeeperProtos.Table.State getTableState(final ZooKeeperWatcher zkw,
+      final TableName tableName)
+      throws KeeperException, InterruptedException {
+    String znode = ZKUtil.joinZNode(zkw.tableZNode, tableName.getNameAsString());
+    byte [] data = ZKUtil.getData(zkw, znode);
+    if (data == null || data.length <= 0) return null;
+    try {
+      ProtobufUtil.expectPBMagicPrefix(data);
+      ZooKeeperProtos.Table.Builder builder = ZooKeeperProtos.Table.newBuilder();
+      int magicLen = ProtobufUtil.lengthOfPBMagic();
+      ZooKeeperProtos.Table t = builder.mergeFrom(data, magicLen, data.length - magicLen).build();
+      return t.getState();
+    } catch (InvalidProtocolBufferException e) {
+      KeeperException ke = new KeeperException.DataInconsistencyException();
+      ke.initCause(e);
+      throw ke;
+    } catch (DeserializationException e) {
+      throw ZKUtil.convert(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/c5d5a5d1/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKTableStateManager.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKTableStateManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKTableStateManager.java
new file mode 100644
index 0000000..1aff12f
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKTableStateManager.java
@@ -0,0 +1,330 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.CoordinatedStateException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableStateManager;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.InterruptedIOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Implementation of TableStateManager which reads, caches and sets state
+ * up in ZooKeeper.  If multiple read/write clients, will make for confusion.
+ * Code running on client side without consensus context should use
+ * {@link ZKTableStateClientSideReader} instead.
+ *
+ * <p>To save on trips to the zookeeper ensemble, internally we cache table
+ * state.
+ */
+@InterfaceAudience.Private
+public class ZKTableStateManager implements TableStateManager {
+  // A znode will exist under the table directory if it is in any of the
+  // following states: {@link TableState#ENABLING} , {@link TableState#DISABLING},
+  // or {@link TableState#DISABLED}.  If {@link TableState#ENABLED}, there will
+  // be no entry for a table in zk.  That's how it currently works.
+
+  private static final Log LOG = LogFactory.getLog(ZKTableStateManager.class);
+  private final ZooKeeperWatcher watcher;
+
+  /**
+   * Cache of what we found in zookeeper so we don't have to go to zk ensemble
+   * for every query.  Synchronize access rather than use concurrent Map because
+   * synchronization needs to span query of zk.
+   */
+  private final Map<TableName, ZooKeeperProtos.Table.State> cache =
+    new HashMap<TableName, ZooKeeperProtos.Table.State>();
+
+  public ZKTableStateManager(final ZooKeeperWatcher zkw) throws KeeperException,
+      InterruptedException {
+    super();
+    this.watcher = zkw;
+    populateTableStates();
+  }
+
+  /**
+   * Populates the cache with the states of the tables found under the table znode in zookeeper.
+   * @throws KeeperException, InterruptedException
+   */
+  private void populateTableStates() throws KeeperException, InterruptedException {
+    synchronized (this.cache) {
+      List<String> children = ZKUtil.listChildrenNoWatch(this.watcher, this.watcher.tableZNode);
+      if (children == null) return;
+      for (String child: children) {
+        TableName tableName = TableName.valueOf(child);
+        ZooKeeperProtos.Table.State state = getTableState(this.watcher, tableName);
+        if (state != null) this.cache.put(tableName, state);
+      }
+    }
+  }
+
+  /**
+   * Sets table state in ZK. Sets no watches.
+   *
+   * {@inheritDoc}
+   */
+  @Override
+  public void setTableState(TableName tableName, ZooKeeperProtos.Table.State state)
+  throws CoordinatedStateException {
+    synchronized (this.cache) {
+      LOG.warn("Moving table " + tableName + " state from " + this.cache.get(tableName)
+        + " to " + state);
+      try {
+        setTableStateInZK(tableName, state);
+      } catch (KeeperException e) {
+        throw new CoordinatedStateException(e);
+      }
+    }
+  }
+
+  /**
+   * Checks and sets table state in ZK. Sets no watches.
+   * {@inheritDoc}
+   */
+  @Override
+  public boolean setTableStateIfInStates(TableName tableName,
+                                         ZooKeeperProtos.Table.State newState,
+                                         ZooKeeperProtos.Table.State... states)
+      throws CoordinatedStateException {
+    synchronized (this.cache) {
+      // Transition ENABLED->DISABLING has to be performed with a hack, because
+      // we treat empty state as enabled in this case because of 0.92- clusters.
+      if (
+          (newState == ZooKeeperProtos.Table.State.DISABLING) &&
+               this.cache.get(tableName) != null && !isTableState(tableName, states) ||
+          (newState != ZooKeeperProtos.Table.State.DISABLING &&
+               !isTableState(tableName, states) )) {
+        return false;
+      }
+      try {
+        setTableStateInZK(tableName, newState);
+      } catch (KeeperException e) {
+        throw new CoordinatedStateException(e);
+      }
+      return true;
+    }
+  }
+
+  /**
+   * Checks and sets table state in ZK. Sets no watches.
+   * {@inheritDoc}
+   */
+  @Override
+  public boolean setTableStateIfNotInStates(TableName tableName,
+                                            ZooKeeperProtos.Table.State newState,
+                                            ZooKeeperProtos.Table.State... states)
+    throws CoordinatedStateException {
+    synchronized (this.cache) {
+      if (isTableState(tableName, states)) {
+        return false;
+      }
+      try {
+        setTableStateInZK(tableName, newState);
+      } catch (KeeperException e) {
+        throw new CoordinatedStateException(e);
+      }
+      return true;
+    }
+  }
+
+  private void setTableStateInZK(final TableName tableName,
+                                 final ZooKeeperProtos.Table.State state)
+      throws KeeperException {
+    String znode = ZKUtil.joinZNode(this.watcher.tableZNode, tableName.getNameAsString());
+    if (ZKUtil.checkExists(this.watcher, znode) == -1) {
+      ZKUtil.createAndFailSilent(this.watcher, znode);
+    }
+    synchronized (this.cache) {
+      ZooKeeperProtos.Table.Builder builder = ZooKeeperProtos.Table.newBuilder();
+      builder.setState(state);
+      byte [] data = ProtobufUtil.prependPBMagic(builder.build().toByteArray());
+      ZKUtil.setData(this.watcher, znode, data);
+      this.cache.put(tableName, state);
+    }
+  }
+
+  /**
+   * Checks if table is marked in specified state in ZK.
+   *
+   * {@inheritDoc}
+   */
+  @Override
+  public boolean isTableState(final TableName tableName,
+      final ZooKeeperProtos.Table.State... states) {
+    synchronized (this.cache) {
+      ZooKeeperProtos.Table.State currentState = this.cache.get(tableName);
+      return isTableInState(Arrays.asList(states), currentState);
+    }
+  }
+
+  /**
+   * Deletes the table in zookeeper.  Fails silently if the
+   * table is not currently disabled in zookeeper.  Sets no watches.
+   *
+   * {@inheritDoc}
+   */
+  @Override
+  public void setDeletedTable(final TableName tableName)
+  throws CoordinatedStateException {
+    synchronized (this.cache) {
+      if (this.cache.remove(tableName) == null) {
+        LOG.warn("Moving table " + tableName + " state to deleted but was " +
+          "already deleted");
+      }
+      try {
+        ZKUtil.deleteNodeFailSilent(this.watcher,
+          ZKUtil.joinZNode(this.watcher.tableZNode, tableName.getNameAsString()));
+      } catch (KeeperException e) {
+        throw new CoordinatedStateException(e);
+      }
+    }
+  }
+
+  /**
+   * check if table is present.
+   *
+   * @param tableName table we're working on
+   * @return true if the table is present
+   */
+  @Override
+  public boolean isTablePresent(final TableName tableName) {
+    synchronized (this.cache) {
+      ZooKeeperProtos.Table.State state = this.cache.get(tableName);
+      return !(state == null);
+    }
+  }
+
+  /**
+   * Gets a list of all the tables that are in any of the given states in zookeeper.
+   * @return Set of tables in any of the given states, empty Set if none
+   * @throws CoordinatedStateException if error happened in underlying coordination engine
+   */
+  @Override
+  public Set<TableName> getTablesInStates(ZooKeeperProtos.Table.State... states)
+    throws InterruptedIOException, CoordinatedStateException {
+    try {
+      return getAllTables(states);
+    } catch (KeeperException e) {
+      throw new CoordinatedStateException(e);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void checkAndRemoveTableState(TableName tableName, ZooKeeperProtos.Table.State states,
+                                       boolean deletePermanentState)
+      throws CoordinatedStateException {
+    synchronized (this.cache) {
+      if (isTableState(tableName, states)) {
+        this.cache.remove(tableName);
+        if (deletePermanentState) {
+          try {
+            ZKUtil.deleteNodeFailSilent(this.watcher,
+                ZKUtil.joinZNode(this.watcher.tableZNode, tableName.getNameAsString()));
+          } catch (KeeperException e) {
+            throw new CoordinatedStateException(e);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Gets a list of all the tables of specified states in zookeeper.
+   * @return Set of tables of specified states, empty Set if none
+   * @throws KeeperException
+   */
+  Set<TableName> getAllTables(final ZooKeeperProtos.Table.State... states)
+      throws KeeperException, InterruptedIOException {
+
+    Set<TableName> allTables = new HashSet<TableName>();
+    List<String> children =
+      ZKUtil.listChildrenNoWatch(watcher, watcher.tableZNode);
+    if(children == null) return allTables;
+    for (String child: children) {
+      TableName tableName = TableName.valueOf(child);
+      ZooKeeperProtos.Table.State state;
+      try {
+        state = getTableState(watcher, tableName);
+      } catch (InterruptedException e) {
+        throw new InterruptedIOException();
+      }
+      for (ZooKeeperProtos.Table.State expectedState: states) {
+        if (state == expectedState) {
+          allTables.add(tableName);
+          break;
+        }
+      }
+    }
+    return allTables;
+  }
+
+  /**
+   * Gets table state from ZK.
+   * @param zkw ZooKeeperWatcher instance to use
+   * @param tableName table we're checking
+   * @return Null or {@link ZooKeeperProtos.Table.State} found in znode.
+   * @throws KeeperException
+   */
+  private ZooKeeperProtos.Table.State getTableState(final ZooKeeperWatcher zkw,
+                                                   final TableName tableName)
+    throws KeeperException, InterruptedException {
+    String znode = ZKUtil.joinZNode(zkw.tableZNode, tableName.getNameAsString());
+    byte [] data = ZKUtil.getData(zkw, znode);
+    if (data == null || data.length <= 0) return null;
+    try {
+      ProtobufUtil.expectPBMagicPrefix(data);
+      ZooKeeperProtos.Table.Builder builder = ZooKeeperProtos.Table.newBuilder();
+      int magicLen = ProtobufUtil.lengthOfPBMagic();
+      ZooKeeperProtos.Table t = builder.mergeFrom(data, magicLen, data.length - magicLen).build();
+      return t.getState();
+    } catch (InvalidProtocolBufferException e) {
+      KeeperException ke = new KeeperException.DataInconsistencyException();
+      ke.initCause(e);
+      throw ke;
+    } catch (DeserializationException e) {
+      throw ZKUtil.convert(e);
+    }
+  }
+
+  /**
+   * @return true if current state isn't null and is contained
+   * in the list of expected states.
+   */
+  private boolean isTableInState(final List<ZooKeeperProtos.Table.State> expectedStates,
+                       final ZooKeeperProtos.Table.State currentState) {
+    return currentState != null && expectedStates.contains(currentState);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/c5d5a5d1/hbase-server/src/main/java/org/apache/hadoop/hbase/CoordinatedStateManagerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/CoordinatedStateManagerFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/CoordinatedStateManagerFactory.java
new file mode 100644
index 0000000..77ef217
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/CoordinatedStateManagerFactory.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.consensus.ZkCoordinatedStateManager;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * Factory that instantiates the {@link CoordinatedStateManager} implementation
+ * selected by configuration.
+ */
+@InterfaceAudience.Private
+public class CoordinatedStateManagerFactory {
+
+  /**
+   * Creates the coordinated state manager named by the given configuration.
+   * The class is looked up under {@link HConstants#HBASE_COORDINATED_STATE_MANAGER_CLASS};
+   * {@link ZkCoordinatedStateManager} is passed as the default.
+   * @param conf Configuration
+   * @return Implementation of {@link CoordinatedStateManager}
+   */
+  public static CoordinatedStateManager getCoordinatedStateManager(Configuration conf) {
+    final Class<? extends CoordinatedStateManager> implClass = conf.getClass(
+      HConstants.HBASE_COORDINATED_STATE_MANAGER_CLASS,
+      ZkCoordinatedStateManager.class,
+      CoordinatedStateManager.class);
+    return ReflectionUtils.newInstance(implClass, conf);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/c5d5a5d1/hbase-server/src/main/java/org/apache/hadoop/hbase/consensus/BaseCoordinatedStateManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/consensus/BaseCoordinatedStateManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/consensus/BaseCoordinatedStateManager.java
new file mode 100644
index 0000000..7f4e510
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/consensus/BaseCoordinatedStateManager.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.consensus;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.CoordinatedStateException;
+import org.apache.hadoop.hbase.CoordinatedStateManager;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.TableStateManager;
+
+/**
+ * Base class for {@link org.apache.hadoop.hbase.CoordinatedStateManager} implementations.
+ * Defines methods to retrieve consensus objects for relevant areas. CoordinatedStateManager
+ * reference returned from Server interface has to be cast to this type to
+ * access those methods.
+ * <p>
+ * The lifecycle methods here are no-ops and {@link #getServer()} returns null;
+ * only {@link #getTableStateManager()} is left abstract.
+ */
+@InterfaceAudience.Private
+public abstract class BaseCoordinatedStateManager implements CoordinatedStateManager {
+
+  /**
+   * Does nothing in this base class.
+   * @param server ignored here
+   */
+  @Override
+  public void initialize(Server server) {
+  }
+
+  /** Does nothing in this base class. */
+  @Override
+  public void start() {
+  }
+
+  /** Does nothing in this base class. */
+  @Override
+  public void stop() {
+  }
+
+  /**
+   * @return always null in this base class
+   */
+  @Override
+  public Server getServer() {
+    return null;
+  }
+
+  /**
+   * Left abstract: each concrete manager supplies its own {@link TableStateManager}.
+   */
+  @Override
+  public abstract TableStateManager getTableStateManager() throws InterruptedException,
+    CoordinatedStateException;
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/c5d5a5d1/hbase-server/src/main/java/org/apache/hadoop/hbase/consensus/ZkCoordinatedStateManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/consensus/ZkCoordinatedStateManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/consensus/ZkCoordinatedStateManager.java
new file mode 100644
index 0000000..27e09ca
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/consensus/ZkCoordinatedStateManager.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.consensus;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.CoordinatedStateException;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.TableStateManager;
+import org.apache.hadoop.hbase.zookeeper.ZKTableStateManager;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * {@link org.apache.hadoop.hbase.CoordinatedStateManager} implementation backed by ZooKeeper.
+ */
+@InterfaceAudience.Private
+public class ZkCoordinatedStateManager extends BaseCoordinatedStateManager {
+  private static final Log LOG = LogFactory.getLog(ZkCoordinatedStateManager.class);
+  private Server server;
+  // NOTE(review): assigned in initialize() but not read anywhere in this class.
+  private ZooKeeperWatcher watcher;
+
+  /**
+   * Remembers the server and the ZooKeeper watcher it provides.
+   * @param server server this manager is attached to
+   */
+  @Override
+  public void initialize(Server server) {
+    this.server = server;
+    this.watcher = server.getZooKeeper();
+  }
+
+  /**
+   * @return the server handed to {@link #initialize(Server)}
+   */
+  @Override
+  public Server getServer() {
+    return server;
+  }
+
+  /**
+   * Builds a new {@link ZKTableStateManager} on the server's ZooKeeper watcher.
+   * @return a newly constructed table state manager
+   * @throws InterruptedException declared by the overridden method
+   * @throws CoordinatedStateException wrapping a KeeperException raised during construction
+   */
+  @Override
+  public TableStateManager getTableStateManager() throws InterruptedException,
+      CoordinatedStateException {
+    try {
+      final ZooKeeperWatcher zk = server.getZooKeeper();
+      return new ZKTableStateManager(zk);
+    } catch (KeeperException ke) {
+      throw new CoordinatedStateException(ke);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/c5d5a5d1/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKTableStateManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKTableStateManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKTableStateManager.java
new file mode 100644
index 0000000..f5210cc
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKTableStateManager.java
@@ -0,0 +1,114 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.CoordinatedStateException;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableStateManager;
+import org.apache.zookeeper.KeeperException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.Table;
+
+/**
+ * Exercises {@link ZKTableStateManager} against a mini ZooKeeper cluster.
+ */
+@Category(MediumTests.class)
+public class TestZKTableStateManager {
+  private static final Log LOG = LogFactory.getLog(TestZKTableStateManager.class);
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.startMiniZKCluster();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniZKCluster();
+  }
+
+  /**
+   * Moves one table through DISABLING, DISABLED, ENABLING and ENABLED, then
+   * deletes it, checking the state queries and table presence after each step.
+   */
+  @Test
+  public void testTableStates()
+      throws CoordinatedStateException, IOException, KeeperException, InterruptedException {
+    final TableName name =
+        TableName.valueOf("testDisabled");
+    Abortable abortable = new Abortable() {
+      @Override
+      public void abort(String why, Throwable e) {
+        LOG.info(why, e);
+      }
+
+      @Override
+      public boolean isAborted() {
+        return false;
+      }
+
+    };
+    ZooKeeperWatcher zkw = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
+      name.getNameAsString(), abortable, true);
+    try {
+      TableStateManager zkt = new ZKTableStateManager(zkw);
+      // Before any state is set, the table matches no state and is not present.
+      assertFalse(zkt.isTableState(name, Table.State.ENABLED));
+      assertFalse(zkt.isTableState(name, Table.State.DISABLING));
+      assertFalse(zkt.isTableState(name, Table.State.DISABLED));
+      assertFalse(zkt.isTableState(name, Table.State.ENABLING));
+      assertFalse(zkt.isTableState(name, Table.State.DISABLED, Table.State.DISABLING));
+      assertFalse(zkt.isTableState(name, Table.State.DISABLED, Table.State.ENABLING));
+      assertFalse(zkt.isTablePresent(name));
+      zkt.setTableState(name, Table.State.DISABLING);
+      assertTrue(zkt.isTableState(name, Table.State.DISABLING));
+      assertTrue(zkt.isTableState(name, Table.State.DISABLED, Table.State.DISABLING));
+      assertFalse(zkt.getTablesInStates(Table.State.DISABLED).contains(name));
+      assertTrue(zkt.isTablePresent(name));
+      zkt.setTableState(name, Table.State.DISABLED);
+      assertTrue(zkt.isTableState(name, Table.State.DISABLED));
+      assertTrue(zkt.isTableState(name, Table.State.DISABLED, Table.State.DISABLING));
+      assertFalse(zkt.isTableState(name, Table.State.DISABLING));
+      assertTrue(zkt.getTablesInStates(Table.State.DISABLED).contains(name));
+      assertTrue(zkt.isTablePresent(name));
+      zkt.setTableState(name, Table.State.ENABLING);
+      assertTrue(zkt.isTableState(name, Table.State.ENABLING));
+      assertTrue(zkt.isTableState(name, Table.State.DISABLED, Table.State.ENABLING));
+      assertFalse(zkt.isTableState(name, Table.State.DISABLED));
+      assertFalse(zkt.getTablesInStates(Table.State.DISABLED).contains(name));
+      assertTrue(zkt.isTablePresent(name));
+      zkt.setTableState(name, Table.State.ENABLED);
+      assertTrue(zkt.isTableState(name, Table.State.ENABLED));
+      assertFalse(zkt.isTableState(name, Table.State.ENABLING));
+      assertTrue(zkt.isTablePresent(name));
+      // After deletion the table again matches no state and is not present.
+      zkt.setDeletedTable(name);
+      assertFalse(zkt.isTableState(name, Table.State.ENABLED));
+      assertFalse(zkt.isTableState(name, Table.State.DISABLING));
+      assertFalse(zkt.isTableState(name, Table.State.DISABLED));
+      assertFalse(zkt.isTableState(name, Table.State.ENABLING));
+      assertFalse(zkt.isTableState(name, Table.State.DISABLED, Table.State.DISABLING));
+      assertFalse(zkt.isTableState(name, Table.State.DISABLED, Table.State.ENABLING));
+      assertFalse(zkt.isTablePresent(name));
+    } finally {
+      // Release the ZooKeeper connection even when an assertion fails.
+      zkw.close();
+    }
+  }
+}


Mime
View raw message