hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a...@apache.org
Subject [03/13] hbase git commit: HBASE-19114 Split out o.a.h.h.zookeeper from hbase-server and hbase-client
Date Fri, 17 Nov 2017 21:27:39 GMT
http://git-wip-us.apache.org/repos/asf/hbase/blob/e0c4f374/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKClusterId.java
----------------------------------------------------------------------
diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKClusterId.java
b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKClusterId.java
new file mode 100644
index 0000000..2f2b036
--- /dev/null
+++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKClusterId.java
@@ -0,0 +1,99 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.zookeeper;
+
+import java.util.UUID;
+
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.ClusterId;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Publishes and synchronizes the unique identifier of an HBase cluster via ZooKeeper.
+ * The active master reads the stored identifier from the file system on startup;
+ * afterwards it is available to all watchers (including clients).
+ */
+@InterfaceAudience.Private
+public class ZKClusterId {
+  private ZKWatcher watcher;
+  private Abortable abortable;
+  private String id;
+
+  public ZKClusterId(ZKWatcher watcher, Abortable abortable) {
+    this.watcher = watcher;
+    this.abortable = abortable;
+  }
+
+  /**
+   * @return true if a cluster ID could be obtained, false otherwise
+   */
+  public boolean hasId() {
+    String current = getId();
+    return current != null;
+  }
+
+  /**
+   * Returns the cluster ID, reading it from ZooKeeper when no value has been cached yet.
+   * A ZooKeeper failure is reported through the {@link Abortable} given at construction.
+   * @return the cluster ID, or null if none is available
+   */
+  public String getId() {
+    if (id != null) {
+      return id;
+    }
+    try {
+      id = readClusterIdZNode(watcher);
+    } catch (KeeperException ke) {
+      abortable.abort("Unexpected exception from ZooKeeper reading cluster ID",
+          ke);
+    }
+    return id;
+  }
+
+  /**
+   * Reads the cluster ID znode and parses its content.
+   * @param watcher watcher connected to an ensemble
+   * @return the cluster ID as a string, or null if the znode is missing, holds no data,
+   *   or the calling thread was interrupted while reading
+   * @throws KeeperException on ZooKeeper errors or if the stored data cannot be parsed
+   */
+  public static String readClusterIdZNode(ZKWatcher watcher)
+  throws KeeperException {
+    if (ZKUtil.checkExists(watcher, watcher.znodePaths.clusterIdZNode) == -1) {
+      return null;
+    }
+    byte [] raw;
+    try {
+      raw = ZKUtil.getData(watcher, watcher.znodePaths.clusterIdZNode);
+    } catch (InterruptedException e) {
+      // Keep the interrupt visible to the caller.
+      Thread.currentThread().interrupt();
+      return null;
+    }
+    if (raw == null) {
+      return null;
+    }
+    try {
+      return ClusterId.parseFrom(raw).toString();
+    } catch (DeserializationException e) {
+      throw ZKUtil.convert(e);
+    }
+  }
+
+  /**
+   * Writes the serialized cluster ID to the cluster ID znode.
+   * @param watcher watcher connected to an ensemble
+   * @param id cluster ID to store
+   * @throws KeeperException on ZooKeeper errors
+   */
+  public static void setClusterId(ZKWatcher watcher, ClusterId id)
+      throws KeeperException {
+    String clusterIdPath = watcher.znodePaths.clusterIdZNode;
+    byte [] serialized = id.toByteArray();
+    ZKUtil.createSetData(watcher, clusterIdPath, serialized);
+  }
+
+  /**
+   * Get the UUID for the provided ZK watcher. Doesn't handle any ZK exceptions
+   * @param zkw watcher connected to an ensemble
+   * @return the UUID read from zookeeper, or null if no cluster ID could be read
+   * @throws KeeperException on ZooKeeper errors
+   */
+  public static UUID getUUIDForCluster(ZKWatcher zkw) throws KeeperException {
+    String clusterId = readClusterIdZNode(zkw);
+    if (clusterId == null) {
+      return null;
+    }
+    return UUID.fromString(clusterId);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/e0c4f374/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKLeaderManager.java
----------------------------------------------------------------------
diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKLeaderManager.java
b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKLeaderManager.java
new file mode 100644
index 0000000..edd2ccd
--- /dev/null
+++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKLeaderManager.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.zookeeper;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Handles coordination of a single "leader" instance among many possible
+ * candidates.  The first {@link ZKLeaderManager} to successfully create
+ * the given znode becomes the leader, allowing the instance to continue
+ * with whatever processing must be protected.  Other {@link ZKLeaderManager}
+ * instances will wait to be notified of changes to the leader znode.
+ * If the current master instance fails, the ephemeral leader znode will
+ * be removed, and all waiting instances will be notified, with the race
+ * to claim the leader znode beginning all over again.
+ * @deprecated Not used
+ */
+@Deprecated
+@InterfaceAudience.Private
+public class ZKLeaderManager extends ZKListener {
+  private static final Log LOG = LogFactory.getLog(ZKLeaderManager.class);
+
+  private final AtomicBoolean leaderExists = new AtomicBoolean();
+  private String leaderZNode;
+  private byte[] nodeId;
+  private Stoppable candidate;
+
+  public ZKLeaderManager(ZKWatcher watcher, String leaderZNode,
+                         byte[] identifier, Stoppable candidate) {
+    super(watcher);
+    this.leaderZNode = leaderZNode;
+    this.nodeId = identifier;
+    this.candidate = candidate;
+  }
+
+  public void start() {
+    try {
+      watcher.registerListener(this);
+      String parent = ZKUtil.getParent(leaderZNode);
+      if (ZKUtil.checkExists(watcher, parent) < 0) {
+        ZKUtil.createWithParents(watcher, parent);
+      }
+    } catch (KeeperException ke) {
+      watcher.abort("Unhandled zk exception when starting", ke);
+      candidate.stop("Unhandled zk exception starting up: "+ke.getMessage());
+    }
+  }
+
+  @Override
+  public void nodeCreated(String path) {
+    if (leaderZNode.equals(path) && !candidate.isStopped()) {
+      handleLeaderChange();
+    }
+  }
+
+  @Override
+  public void nodeDeleted(String path) {
+    if (leaderZNode.equals(path) && !candidate.isStopped()) {
+      handleLeaderChange();
+    }
+  }
+
+  private void handleLeaderChange() {
+    try {
+      synchronized(leaderExists) {
+        if (ZKUtil.watchAndCheckExists(watcher, leaderZNode)) {
+          LOG.info("Found new leader for znode: "+leaderZNode);
+          leaderExists.set(true);
+        } else {
+          LOG.info("Leader change, but no new leader found");
+          leaderExists.set(false);
+          leaderExists.notifyAll();
+        }
+      }
+    } catch (KeeperException ke) {
+      watcher.abort("ZooKeeper error checking for leader znode", ke);
+      candidate.stop("ZooKeeper error checking for leader: "+ke.getMessage());
+    }
+  }
+
+  /**
+   * Blocks until this instance has claimed the leader ZNode in ZooKeeper
+   */
+  public void waitToBecomeLeader() {
+    while (!candidate.isStopped()) {
+      try {
+        if (ZKUtil.createEphemeralNodeAndWatch(watcher, leaderZNode, nodeId)) {
+          // claimed the leader znode
+          leaderExists.set(true);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Claimed the leader znode as '"+
+                Bytes.toStringBinary(nodeId)+"'");
+          }
+          return;
+        }
+
+        // if claiming the node failed, there should be another existing node
+        byte[] currentId = ZKUtil.getDataAndWatch(watcher, leaderZNode);
+        if (currentId != null && Bytes.equals(currentId, nodeId)) {
+          // claimed with our ID, but we didn't grab it, possibly restarted?
+          LOG.info("Found existing leader with our ID ("+
+              Bytes.toStringBinary(nodeId)+"), removing");
+          ZKUtil.deleteNode(watcher, leaderZNode);
+          leaderExists.set(false);
+        } else {
+          LOG.info("Found existing leader with ID: "+Bytes.toStringBinary(nodeId));
+          leaderExists.set(true);
+        }
+      } catch (KeeperException ke) {
+        watcher.abort("Unexpected error from ZK, stopping candidate", ke);
+        candidate.stop("Unexpected error from ZK: "+ke.getMessage());
+        return;
+      }
+
+      // wait for next chance
+      synchronized(leaderExists) {
+        while (leaderExists.get() && !candidate.isStopped()) {
+          try {
+            leaderExists.wait();
+          } catch (InterruptedException ie) {
+            LOG.debug("Interrupted waiting on leader", ie);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Removes the leader znode, if it is currently claimed by this instance.
+   */
+  public void stepDownAsLeader() {
+    try {
+      synchronized(leaderExists) {
+        if (!leaderExists.get()) {
+          return;
+        }
+        byte[] leaderId = ZKUtil.getData(watcher, leaderZNode);
+        if (leaderId != null && Bytes.equals(nodeId, leaderId)) {
+          LOG.info("Stepping down as leader");
+          ZKUtil.deleteNodeFailSilent(watcher, leaderZNode);
+          leaderExists.set(false);
+        } else {
+          LOG.info("Not current leader, no need to step down");
+        }
+      }
+    } catch (KeeperException ke) {
+      watcher.abort("Unhandled zookeeper exception removing leader node", ke);
+      candidate.stop("Unhandled zookeeper exception removing leader node: "
+          + ke.getMessage());
+    } catch (InterruptedException e) {
+      watcher.abort("Unhandled zookeeper exception removing leader node", e);
+      candidate.stop("Unhandled zookeeper exception removing leader node: "
+          + e.getMessage());
+    }
+  }
+
+  public boolean hasLeader() {
+    return leaderExists.get();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/e0c4f374/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKListener.java
----------------------------------------------------------------------
diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKListener.java
b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKListener.java
new file mode 100644
index 0000000..595e713
--- /dev/null
+++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKListener.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Base class for internal listeners of ZooKeeper events.
+ *
+ * The {@link ZKWatcher} of a process invokes the matching callback on every
+ * implementation of this class that registered itself through
+ * {@link ZKWatcher#registerListener}; unregistered listeners receive nothing.
+ *
+ * All callbacks default to doing nothing, so subclasses override only the
+ * events they care about.
+ *
+ * Callbacks run while the watcher is blocked, so they must return quickly.
+ */
+@InterfaceAudience.Private
+public abstract class ZKListener {
+
+  // The zk watcher this listener belongs to; it also carries configuration and constants
+  protected ZKWatcher watcher;
+
+  /**
+   * Construct a ZooKeeper event listener.
+   * @param watcher the watcher that will deliver events to this listener
+   */
+  public ZKListener(ZKWatcher watcher) {
+    this.watcher = watcher;
+  }
+
+  /**
+   * Callback for the creation of a node.
+   * @param path full path of the new node
+   */
+  public void nodeCreated(String path) {
+    // intentionally empty
+  }
+
+  /**
+   * Callback for the deletion of a node.
+   * @param path full path of the deleted node
+   */
+  public void nodeDeleted(String path) {
+    // intentionally empty
+  }
+
+  /**
+   * Callback for a data change on an existing node.
+   * @param path full path of the updated node
+   */
+  public void nodeDataChanged(String path) {
+    // intentionally empty
+  }
+
+  /**
+   * Callback for a child being added to or removed from an existing node.
+   * @param path full path of the node whose children have changed
+   */
+  public void nodeChildrenChanged(String path) {
+    // intentionally empty
+  }
+
+  /**
+   * @return The watcher associated with this listener
+   */
+  public ZKWatcher getWatcher() {
+    return watcher;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/e0c4f374/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKMainServer.java
----------------------------------------------------------------------
diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKMainServer.java
b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKMainServer.java
new file mode 100644
index 0000000..9cb0e7d
--- /dev/null
+++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKMainServer.java
@@ -0,0 +1,126 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.zookeeper;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.curator.shaded.com.google.common.base.Stopwatch;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeperMain;
+
+/**
+ * Tool for running ZookeeperMain from HBase by reading a ZooKeeper server
+ * from HBase XML configuration.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
+public class ZKMainServer {
+  private static final String SERVER_ARG = "-server";
+
+  /**
+   * @param c configuration to read the ZooKeeper ensemble from
+   * @return the quorum servers string that {@link ZKConfig} derives from {@code c}
+   */
+  public String parse(final Configuration c) {
+    return ZKConfig.getZKQuorumServersString(c);
+  }
+
+  /**
+   * ZooKeeper 3.4.6 broke being able to pass commands on command line.
+   * See ZOOKEEPER-1897.  This class is a hack to restore this facility.
+   */
+  private static class HACK_UNTIL_ZOOKEEPER_1897_ZooKeeperMain extends ZooKeeperMain {
+    /**
+     * Creates the client and waits (up to about ten seconds) for it to connect.
+     * @param args arguments handed to {@link ZooKeeperMain}
+     * @throws IOException propagated from the {@link ZooKeeperMain} constructor
+     * @throws InterruptedException if the wait is interrupted, or if the client has not
+     *   connected within the time limit
+     */
+    public HACK_UNTIL_ZOOKEEPER_1897_ZooKeeperMain(String[] args)
+    throws IOException, InterruptedException {
+      super(args);
+      // Make sure we are connected before we proceed. Can take a while on some systems. If we
+      // run the command without being connected, we get ConnectionLoss KeeperErrorConnection...
+      Stopwatch stopWatch = Stopwatch.createStarted();
+      while (!this.zk.getState().isConnected()) {
+        Thread.sleep(1);
+        if (stopWatch.elapsed(TimeUnit.SECONDS) > 10) {
+          throw new InterruptedException("Failed connect after waiting " +
+              stopWatch.elapsed(TimeUnit.SECONDS) + " seconds; state=" + this.zk.getState() +
+              "; " + this.zk);
+        }
+      }
+    }
+
+    /**
+     * Run the command-line args passed.  Calls System.exit when done.
+     * @throws KeeperException propagated from command processing
+     * @throws IOException propagated from command processing
+     * @throws InterruptedException propagated from command processing
+     */
+    void runCmdLine() throws KeeperException, IOException, InterruptedException {
+      processCmd(this.cl);
+      System.exit(0);
+    }
+  }
+
+  /**
+   * @param args command-line arguments
+   * @return True if the first argument is '-server'.
+   */
+  private static boolean hasServer(final String args[]) {
+    return args.length > 0 && args[0].equals(SERVER_ARG);
+  }
+
+  /**
+   * @param args command-line arguments
+   * @return True if command-line arguments were passed, not counting a leading
+   *   '-server' option and its value.
+   * @throws IllegalStateException if '-server' is given without a value
+   */
+  private static boolean hasCommandLineArguments(final String args[]) {
+    if (hasServer(args)) {
+      if (args.length < 2) {
+        throw new IllegalStateException("-server param but no value");
+      }
+      return args.length > 2;
+    }
+    return args.length > 0;
+  }
+
+  /**
+   * Run the tool.
+   * @param args Command line arguments. If they do not start with '-server', the ensemble
+   *   read from the HBase configuration (when non-empty) is prepended as '-server host:port'.
+   */
+  public static void main(String args[]) throws Exception {
+    String [] newArgs = args;
+    if (!hasServer(args)) {
+      // Add the zk ensemble from configuration if none passed on command-line.
+      Configuration conf = HBaseConfiguration.create();
+      String hostport = new ZKMainServer().parse(conf);
+      if (hostport != null && hostport.length() > 0) {
+        newArgs = new String[args.length + 2];
+        System.arraycopy(args, 0, newArgs, 2, args.length);
+        newArgs[0] = SERVER_ARG;
+        newArgs[1] = hostport;
+      }
+    }
+    // If command-line arguments, run our hack so they are executed.
+    // ZOOKEEPER-1897 was committed to zookeeper-3.4.6 but elsewhere in this class we say
+    // 3.4.6 breaks command-processing; TODO.
+    if (hasCommandLineArguments(args)) {
+      HACK_UNTIL_ZOOKEEPER_1897_ZooKeeperMain zkm =
+        new HACK_UNTIL_ZOOKEEPER_1897_ZooKeeperMain(newArgs);
+      zkm.runCmdLine();
+    } else {
+      ZooKeeperMain.main(newArgs);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/e0c4f374/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKMetrics.java
----------------------------------------------------------------------
diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKMetrics.java
b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKMetrics.java
new file mode 100644
index 0000000..20d4a55
--- /dev/null
+++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKMetrics.java
@@ -0,0 +1,108 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.zookeeper;
+
+import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Class used to push numbers about ZooKeeper into the metrics subsystem. This will take a
+ * single function call and turn it into multiple manipulations of the hadoop metrics system.
+ */
+@InterfaceAudience.Private
+public class ZKMetrics implements ZKMetricsListener {
+  /** Sink that every listener callback forwards to. */
+  private final MetricsZooKeeperSource metricsSource;
+
+  /**
+   * Creates an instance backed by the {@link MetricsZooKeeperSource} obtained from
+   * {@link CompatibilitySingletonFactory}.
+   */
+  public ZKMetrics() {
+    this(CompatibilitySingletonFactory.getInstance(MetricsZooKeeperSource.class));
+  }
+
+  /**
+   * @param s metrics source that receives all updates
+   */
+  @VisibleForTesting
+  public ZKMetrics(MetricsZooKeeperSource s) {
+    this.metricsSource = s;
+  }
+
+  /** Forwards to the auth-failed counter of the source. */
+  @Override
+  public void registerAuthFailedException() {
+    metricsSource.incrementAuthFailedCount();
+  }
+
+  /** Forwards to the connection-loss counter of the source. */
+  @Override
+  public void registerConnectionLossException() {
+    metricsSource.incrementConnectionLossCount();
+  }
+
+  /** Forwards to the data-inconsistency counter of the source. */
+  @Override
+  public void registerDataInconsistencyException() {
+    metricsSource.incrementDataInconsistencyCount();
+  }
+
+  /** Forwards to the invalid-ACL counter of the source. */
+  @Override
+  public void registerInvalidACLException() {
+    metricsSource.incrementInvalidACLCount();
+  }
+
+  /** Forwards to the no-auth counter of the source. */
+  @Override
+  public void registerNoAuthException() {
+    metricsSource.incrementNoAuthCount();
+  }
+
+  /** Forwards to the operation-timeout counter of the source. */
+  @Override
+  public void registerOperationTimeoutException() {
+    metricsSource.incrementOperationTimeoutCount();
+  }
+
+  /** Forwards to the runtime-inconsistency counter of the source. */
+  @Override
+  public void registerRuntimeInconsistencyException() {
+    metricsSource.incrementRuntimeInconsistencyCount();
+  }
+
+  /** Forwards to the session-expired counter of the source. */
+  @Override
+  public void registerSessionExpiredException() {
+    metricsSource.incrementSessionExpiredCount();
+  }
+
+  /** Forwards to the system-error counter of the source. */
+  @Override
+  public void registerSystemErrorException() {
+    metricsSource.incrementSystemErrorCount();
+  }
+
+  /** Forwards to the total-failed-calls counter of the source. */
+  @Override
+  public void registerFailedZKCall() {
+    metricsSource.incrementTotalFailedZKCalls();
+  }
+
+  /** Forwards the given read latency to the source. */
+  @Override
+  public void registerReadOperationLatency(long latency) {
+    metricsSource.recordReadOperationLatency(latency);
+  }
+
+  /** Forwards the given write latency to the source. */
+  @Override
+  public void registerWriteOperationLatency(long latency) {
+    metricsSource.recordWriteOperationLatency(latency);
+  }
+
+  /** Forwards the given sync latency to the source. */
+  @Override
+  public void registerSyncOperationLatency(long latency) {
+    metricsSource.recordSyncOperationLatency(latency);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/e0c4f374/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKMetricsListener.java
----------------------------------------------------------------------
diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKMetricsListener.java
b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKMetricsListener.java
new file mode 100644
index 0000000..f17925e
--- /dev/null
+++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKMetricsListener.java
@@ -0,0 +1,91 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.zookeeper;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Callbacks for reporting ZooKeeper-related events: one per exception kind that
+ * was seen, one for failed API calls in general, and one per operation type for
+ * recording latency.
+ */
+@InterfaceAudience.Private
+public interface ZKMetricsListener {
+
+  /**
+   * An AUTHFAILED Exception was seen.
+   */
+  void registerAuthFailedException();
+
+  /**
+   * A CONNECTIONLOSS Exception was seen.
+   */
+  void registerConnectionLossException();
+
+  /**
+   * A DATAINCONSISTENCY Exception was seen.
+   */
+  void registerDataInconsistencyException();
+
+  /**
+   * An INVALIDACL Exception was seen.
+   */
+  void registerInvalidACLException();
+
+  /**
+   * A NOAUTH Exception was seen.
+   */
+  void registerNoAuthException();
+
+  /**
+   * An OPERATIONTIMEOUT Exception was seen.
+   */
+  void registerOperationTimeoutException();
+
+  /**
+   * A RUNTIMEINCONSISTENCY Exception was seen.
+   */
+  void registerRuntimeInconsistencyException();
+
+  /**
+   * A SESSIONEXPIRED Exception was seen.
+   */
+  void registerSessionExpiredException();
+
+  /**
+   * A SYSTEMERROR Exception was seen.
+   */
+  void registerSystemErrorException();
+
+  /**
+   * A ZooKeeper API Call failed.
+   */
+  void registerFailedZKCall();
+
+  /**
+   * Register the latency incurred for read operations.
+   * @param latency observed latency (unit is not stated here; presumably milliseconds,
+   *   to be confirmed against callers)
+   */
+  void registerReadOperationLatency(long latency);
+
+  /**
+   * Register the latency incurred for write operations.
+   * @param latency observed latency (unit is not stated here; presumably milliseconds,
+   *   to be confirmed against callers)
+   */
+  void registerWriteOperationLatency(long latency);
+
+  /**
+   * Register the latency incurred for sync operations.
+   * @param latency observed latency (unit is not stated here; presumably milliseconds,
+   *   to be confirmed against callers)
+   */
+  void registerSyncOperationLatency(long latency);
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/e0c4f374/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKNodeTracker.java
----------------------------------------------------------------------
diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKNodeTracker.java
b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKNodeTracker.java
new file mode 100644
index 0000000..8ce41e3
--- /dev/null
+++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKNodeTracker.java
@@ -0,0 +1,251 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Tracks the availability and value of a single ZooKeeper node.
+ *
+ * <p>Utilizes the {@link ZKListener} interface to get the necessary
+ * ZooKeeper events related to the node.
+ *
+ * <p>This is the base class used by trackers in both the Master and
+ * RegionServers.
+ */
+@InterfaceAudience.Private
+public abstract class ZKNodeTracker extends ZKListener {
+  // LOG is being used in subclasses, hence keeping it protected
+  protected static final Log LOG = LogFactory.getLog(ZKNodeTracker.class);
+  /** Path of node being tracked */
+  protected final String node;
+
+  /** Data of the node being tracked */
+  private byte [] data;
+
+  /** Used to abort if a fatal error occurs */
+  protected final Abortable abortable;
+
+  /** Set by {@link #stop()}; makes {@link #blockUntilAvailable(long, boolean)} return. */
+  private boolean stopped = false;
+
+  /**
+   * Constructs a new ZK node tracker.
+   *
+   * <p>After construction, use {@link #start} to kick off tracking.
+   *
+   * @param watcher watcher used for all ZooKeeper operations and event delivery
+   * @param node path of the node to track
+   * @param abortable used to abort if a fatal error occurs
+   */
+  public ZKNodeTracker(ZKWatcher watcher, String node,
+                       Abortable abortable) {
+    super(watcher);
+    this.node = node;
+    this.abortable = abortable;
+    this.data = null;
+  }
+
+  /**
+   * Starts the tracking of the node in ZooKeeper.
+   *
+   * <p>Use {@link #blockUntilAvailable()} to block until the node is available
+   * or {@link #getData(boolean)} to get the data of the node if it is available.
+   */
+  public synchronized void start() {
+    this.watcher.registerListener(this);
+    try {
+      if (ZKUtil.watchAndCheckExists(watcher, node)) {
+        byte [] data = ZKUtil.getDataAndWatch(watcher, node);
+        if (data != null) {
+          this.data = data;
+        } else {
+          // It existed but now does not, try again to ensure a watch is set
+          // NOTE(review): the recursive call registers this listener a second time;
+          // confirm that registerListener tolerates duplicate registrations.
+          LOG.debug("Try starting again because there is no data from " + node);
+          start();
+        }
+      }
+    } catch (KeeperException e) {
+      abortable.abort("Unexpected exception during initialization, aborting", e);
+    }
+  }
+
+  /**
+   * Marks this tracker as stopped and wakes up all threads blocked in
+   * {@link #blockUntilAvailable(long, boolean)}.
+   */
+  public synchronized void stop() {
+    this.stopped = true;
+    notifyAll();
+  }
+
+  /**
+   * Gets the data of the node, blocking until the node is available.
+   *
+   * @return data of the node
+   * @throws InterruptedException if the waiting thread is interrupted
+   */
+  public synchronized byte [] blockUntilAvailable()
+  throws InterruptedException {
+    return blockUntilAvailable(0, false);
+  }
+
+  /**
+   * Gets the data of the node, blocking until the node is available or the
+   * specified timeout has elapsed.
+   *
+   * @param timeout maximum time to wait for the node data to be available,
+   * in milliseconds.  Pass 0 for no timeout.
+   * @param refresh whether to read the data from ZooKeeper directly before waiting
+   * @return data of the node, or null if the tracker was stopped or the timeout elapsed
+   *   before data became available
+   * @throws InterruptedException if the waiting thread is interrupted
+   * @throws IllegalArgumentException if {@code timeout} is negative
+   */
+  public synchronized byte [] blockUntilAvailable(long timeout, boolean refresh)
+  throws InterruptedException {
+    if (timeout < 0) {
+      throw new IllegalArgumentException("timeout must not be negative: " + timeout);
+    }
+    boolean notimeout = timeout == 0;
+    long startTime = System.currentTimeMillis();
+    long remaining = timeout;
+    if (refresh) {
+      try {
+        // This does not create a watch if the node does not exists
+        this.data = ZKUtil.getDataAndWatch(watcher, node);
+      } catch(KeeperException e) {
+        // We use to abort here, but in some cases the abort is ignored (
+        //  (empty Abortable), so it's better to log...
+        LOG.warn("Unexpected exception handling blockUntilAvailable", e);
+        abortable.abort("Unexpected exception handling blockUntilAvailable", e);
+      }
+    }
+    // Existence only needs re-checking when a refresh was requested and returned no data.
+    boolean nodeExistsChecked = (!refresh || data != null);
+    while (!this.stopped && (notimeout || remaining > 0) && this.data == null) {
+      if (!nodeExistsChecked) {
+        try {
+          nodeExistsChecked = (ZKUtil.checkExists(watcher, node) != -1);
+        } catch (KeeperException e) {
+          LOG.warn(
+            "Got exception while trying to check existence in  ZooKeeper" +
+            " of the node: "+node+", retrying if timeout not reached",e );
+        }
+
+        // It did not exists, and now it does.
+        if (nodeExistsChecked){
+          LOG.debug("Node " + node + " now exists, resetting a watcher");
+          try {
+            // This does not create a watch if the node does not exists
+            this.data = ZKUtil.getDataAndWatch(watcher, node);
+          } catch (KeeperException e) {
+            LOG.warn("Unexpected exception handling blockUntilAvailable", e);
+            abortable.abort("Unexpected exception handling blockUntilAvailable", e);
+          }
+        }
+      }
+      // We expect a notification; but we wait with a
+      //  a timeout to lower the impact of a race condition if any
+      wait(100);
+      remaining = timeout - (System.currentTimeMillis() - startTime);
+    }
+    return this.data;
+  }
+
+  /**
+   * Gets the data of the node.
+   *
+   * <p>If the node is currently available, the most up-to-date known version of
+   * the data is returned.  If the node is not currently available, null is
+   * returned.
+   * @param refresh whether to refresh the data by calling ZK directly.
+   * @return data of the node, null if unavailable
+   */
+  public synchronized byte [] getData(boolean refresh) {
+    if (refresh) {
+      try {
+        this.data = ZKUtil.getDataAndWatch(watcher, node);
+      } catch(KeeperException e) {
+        abortable.abort("Unexpected exception handling getData", e);
+      }
+    }
+    return this.data;
+  }
+
+  /**
+   * @return path of the node being tracked
+   */
+  public String getNode() {
+    return this.node;
+  }
+
+  /**
+   * Loads the data of the tracked node and wakes up waiters. If no data can be read,
+   * the event is handled as a deletion instead.
+   */
+  @Override
+  public synchronized void nodeCreated(String path) {
+    if (!path.equals(node)) {
+      return;
+    }
+    try {
+      byte [] data = ZKUtil.getDataAndWatch(watcher, node);
+      if (data != null) {
+        this.data = data;
+        notifyAll();
+      } else {
+        nodeDeleted(path);
+      }
+    } catch(KeeperException e) {
+      abortable.abort("Unexpected exception handling nodeCreated event", e);
+    }
+  }
+
+  /**
+   * Clears the cached data of the tracked node, unless the node turns out to exist
+   * again, in which case the event is handled as a creation.
+   */
+  @Override
+  public synchronized void nodeDeleted(String path) {
+    if (path.equals(node)) {
+      try {
+        if (ZKUtil.watchAndCheckExists(watcher, node)) {
+          nodeCreated(path);
+        } else {
+          this.data = null;
+        }
+      } catch(KeeperException e) {
+        abortable.abort("Unexpected exception handling nodeDeleted event", e);
+      }
+    }
+  }
+
+  /** Handles a data change on the tracked node the same way as a creation. */
+  @Override
+  public synchronized void nodeDataChanged(String path) {
+    if (path.equals(node)) {
+      nodeCreated(path);
+    }
+  }
+
+  /**
+   * Checks if the baseznode set as per the property 'zookeeper.znode.parent'
+   * exists.
+   * @return false if the baseznode does not exist; true if it exists, and also when
+   *   the check failed with a ZooKeeper error (which is reported through the abortable).
+   */
+  public boolean checkIfBaseNodeAvailable() {
+    try {
+      if (ZKUtil.checkExists(watcher, watcher.znodePaths.baseZNode) == -1) {
+        return false;
+      }
+    } catch (KeeperException e) {
+      abortable.abort("Exception while checking if basenode (" + watcher.znodePaths.baseZNode
+          + ") exists in ZooKeeper.",
+        e);
+    }
+    return true;
+  }
+
+  @Override
+  public String toString() {
+    // Close the quote opened before the node path.
+    return "ZKNodeTracker{" +
+        "node='" + node + "', stopped=" + stopped + '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/e0c4f374/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKServerTool.java
----------------------------------------------------------------------
diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKServerTool.java
b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKServerTool.java
new file mode 100644
index 0000000..8116c23
--- /dev/null
+++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKServerTool.java
@@ -0,0 +1,65 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.zookeeper;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Command-line helper that reads the ZooKeeper quorum from the HBase
+ * configuration and prints the hosts one per line for consumption by bash
+ * scripts.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
+public class ZKServerTool {
+  /**
+   * Parses the configured ZooKeeper quorum into server names.
+   * <p>
+   * The quorum value (falling back to {@code HConstants.LOCALHOST} when unset)
+   * is split on commas; each entry is split on ':' into a host and an optional
+   * port. Entries without a port use
+   * {@code HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT}.
+   * @param conf configuration to read the quorum from
+   * @return one {@link ServerName} per quorum entry, in configuration order
+   */
+  public static ServerName[] readZKNodes(Configuration conf) {
+    String quorum = conf.get(HConstants.ZOOKEEPER_QUORUM, HConstants.LOCALHOST);
+    String[] entries = quorum.split(",");
+
+    List<ServerName> servers = new LinkedList<>();
+    for (int i = 0; i < entries.length; i++) {
+      String[] hostAndPort = entries[i].split(":");
+      String host = hostAndPort[0];
+      int port = hostAndPort.length > 1
+          ? Integer.parseInt(hostAndPort[1])
+          : HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT;
+      servers.add(ServerName.valueOf(host, port, -1));
+    }
+    return servers.toArray(new ServerName[0]);
+  }
+
+  /**
+   * Run the tool: prints one "ZK host: &lt;hostname&gt;" line per quorum server.
+   * @param args Command line arguments (not used).
+   */
+  public static void main(String[] args) {
+    ServerName[] servers = readZKNodes(HBaseConfiguration.create());
+    for (ServerName server : servers) {
+      // bin/zookeeper.sh greps for the case-sensitive "ZK host" prefix; keep it as is.
+      System.out.println("ZK host: " + server.getHostname());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/e0c4f374/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java
----------------------------------------------------------------------
diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java
new file mode 100644
index 0000000..33cc43e
--- /dev/null
+++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java
@@ -0,0 +1,122 @@
+/*
+  * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.net.URLEncoder;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Common methods and attributes used by SplitLogManager and SplitLogWorker running distributed
+ * splitting of WAL logs.
+ */
+@InterfaceAudience.Private
+public class ZKSplitLog {
+  private static final Log LOG = LogFactory.getLog(ZKSplitLog.class);
+
+  /**
+   * Gets the full path node name for the log file being split.
+   * This method will url encode the filename.
+   * @param zkw zk reference
+   * @param filename log file name (only the basename)
+   * @return the split-log znode path joined with the URL-encoded file name
+   */
+  public static String getEncodedNodeName(ZKWatcher zkw, String filename) {
+    return ZNodePaths.joinZNode(zkw.znodePaths.splitLogZNode, encode(filename));
+  }
+
+  /**
+   * Recovers the log file name from a znode path.
+   * @param node znode path, or just its last component
+   * @return the URL-decoded text following the last '/' (the whole input if it has no '/')
+   */
+  public static String getFileName(String node) {
+    String basename = node.substring(node.lastIndexOf('/') + 1);
+    return decode(basename);
+  }
+
+  /**
+   * URL-encodes a string using UTF-8.
+   * @param s text to encode
+   * @return the encoded text
+   */
+  static String encode(String s) {
+    try {
+      return URLEncoder.encode(s, "UTF-8");
+    } catch (UnsupportedEncodingException e) {
+      // Keep the original exception as the cause so the failure stays diagnosable.
+      throw new RuntimeException("URLENCODER doesn't support UTF-8", e);
+    }
+  }
+
+  /**
+   * URL-decodes a string using UTF-8.
+   * @param s text to decode
+   * @return the decoded text
+   */
+  static String decode(String s) {
+    try {
+      return URLDecoder.decode(s, "UTF-8");
+    } catch (UnsupportedEncodingException e) {
+      // Keep the original exception as the cause so the failure stays diagnosable.
+      throw new RuntimeException("URLDecoder doesn't support UTF-8", e);
+    }
+  }
+
+  /**
+   * @param zkw zk reference
+   * @return the path of the "RESCAN" node under the split-log znode
+   */
+  public static String getRescanNode(ZKWatcher zkw) {
+    return ZNodePaths.joinZNode(zkw.znodePaths.splitLogZNode, "RESCAN");
+  }
+
+  /**
+   * @param name the last part in path
+   * @return whether the node name represents a rescan node
+   */
+  public static boolean isRescanNode(String name) {
+    return name.startsWith("RESCAN");
+  }
+
+  /**
+   * @param zkw zk reference used to build the rescan node path
+   * @param path the absolute path, starts with '/'
+   * @return whether the path represents a rescan node, i.e. it is strictly longer than the
+   *         rescan node path and starts with it
+   */
+  public static boolean isRescanNode(ZKWatcher zkw, String path) {
+    String prefix = getRescanNode(zkw);
+    // A path equal to the bare prefix is deliberately not treated as a rescan node.
+    return path.length() > prefix.length() && path.startsWith(prefix);
+  }
+
+  /**
+   * @param rootdir root directory
+   * @param tmpname name of the directory below the split-log directory
+   * @return {@code rootdir/<HConstants.SPLIT_LOGDIR_NAME>/tmpname}
+   */
+  public static Path getSplitLogDir(Path rootdir, String tmpname) {
+    return new Path(new Path(rootdir, HConstants.SPLIT_LOGDIR_NAME), tmpname);
+  }
+
+  /**
+   * Flags a log file as corrupted by creating a "corrupt" marker file in its split-log
+   * directory. This is best effort: an {@link IOException} is logged as a warning and not
+   * propagated.
+   * @param rootdir root directory
+   * @param logFileName name of the log file to flag
+   * @param fs file system on which the marker is created
+   */
+  public static void markCorrupted(Path rootdir, String logFileName,
+      FileSystem fs) {
+    Path file = getCorruptMarker(rootdir, logFileName);
+    try {
+      fs.createNewFile(file);
+    } catch (IOException e) {
+      LOG.warn("Could not flag a log file as corrupted. Failed to create " +
+          file, e);
+    }
+  }
+
+  /**
+   * @param rootdir root directory
+   * @param logFileName name of the log file to check
+   * @param fs file system to look the marker up on
+   * @return whether the "corrupt" marker file exists for the given log file
+   * @throws IOException if the existence check fails
+   */
+  public static boolean isCorrupted(Path rootdir, String logFileName,
+      FileSystem fs) throws IOException {
+    return fs.exists(getCorruptMarker(rootdir, logFileName));
+  }
+
+  /** Builds the path of the "corrupt" marker file for the given log file. */
+  private static Path getCorruptMarker(Path rootdir, String logFileName) {
+    return new Path(getSplitLogDir(rootdir, logFileName), "corrupt");
+  }
+}


Mime
View raw message