Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 2FB4E200D42 for ; Fri, 17 Nov 2017 22:27:44 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 2E242160C11; Fri, 17 Nov 2017 21:27:44 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 7A01D160BFB for ; Fri, 17 Nov 2017 22:27:42 +0100 (CET) Received: (qmail 67209 invoked by uid 500); 17 Nov 2017 21:27:38 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 67105 invoked by uid 99); 17 Nov 2017 21:27:38 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 17 Nov 2017 21:27:38 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 4B78CDFC2E; Fri, 17 Nov 2017 21:27:37 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: appy@apache.org To: commits@hbase.apache.org Date: Fri, 17 Nov 2017 21:27:39 -0000 Message-Id: <025f5bb61ec0467a980cf5c053b70972@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [03/13] hbase git commit: HBASE-19114 Split out o.a.h.h.zookeeper from hbase-server and hbase-client archived-at: Fri, 17 Nov 2017 21:27:44 -0000 http://git-wip-us.apache.org/repos/asf/hbase/blob/e0c4f374/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKClusterId.java ---------------------------------------------------------------------- diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKClusterId.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKClusterId.java new file mode 100644 index 0000000..2f2b036 --- /dev/null +++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKClusterId.java @@ -0,0 +1,99 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.zookeeper; + +import java.util.UUID; + +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.ClusterId; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hadoop.hbase.exceptions.DeserializationException; +import org.apache.zookeeper.KeeperException; + +/** + * Publishes and synchronizes a unique identifier specific to a given HBase + * cluster. The stored identifier is read from the file system by the active + * master on startup, and is subsequently available to all watchers (including + * clients). + */ +@InterfaceAudience.Private +public class ZKClusterId { + private ZKWatcher watcher; + private Abortable abortable; + private String id; + + public ZKClusterId(ZKWatcher watcher, Abortable abortable) { + this.watcher = watcher; + this.abortable = abortable; + } + + public boolean hasId() { + return getId() != null; + } + + public String getId() { + try { + if (id == null) { + id = readClusterIdZNode(watcher); + } + } catch (KeeperException ke) { + abortable.abort("Unexpected exception from ZooKeeper reading cluster ID", + ke); + } + return id; + } + + public static String readClusterIdZNode(ZKWatcher watcher) + throws KeeperException { + if (ZKUtil.checkExists(watcher, watcher.znodePaths.clusterIdZNode) != -1) { + byte [] data; + try { + data = ZKUtil.getData(watcher, watcher.znodePaths.clusterIdZNode); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return null; + } + if (data != null) { + try { + return ClusterId.parseFrom(data).toString(); + } catch (DeserializationException e) { + throw ZKUtil.convert(e); + } + } + } + return null; + } + + public static void setClusterId(ZKWatcher watcher, ClusterId id) + throws KeeperException { + ZKUtil.createSetData(watcher, watcher.znodePaths.clusterIdZNode, id.toByteArray()); + } + + /** + * Get the UUID for the provided ZK watcher. Doesn't handle any ZK exceptions + * @param zkw watcher connected to an ensemble + * @return the UUID read from zookeeper + * @throws KeeperException + */ + public static UUID getUUIDForCluster(ZKWatcher zkw) throws KeeperException { + String uuid = readClusterIdZNode(zkw); + return uuid == null ? null : UUID.fromString(uuid); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/e0c4f374/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKLeaderManager.java ---------------------------------------------------------------------- diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKLeaderManager.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKLeaderManager.java new file mode 100644 index 0000000..edd2ccd --- /dev/null +++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKLeaderManager.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.zookeeper; + +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Stoppable; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.zookeeper.KeeperException; + +/** + * Handles coordination of a single "leader" instance among many possible + * candidates. The first {@link ZKLeaderManager} to successfully create + * the given znode becomes the leader, allowing the instance to continue + * with whatever processing must be protected. Other {@link ZKLeaderManager} + * instances will wait to be notified of changes to the leader znode. + * If the current master instance fails, the ephemeral leader znode will + * be removed, and all waiting instances will be notified, with the race + * to claim the leader znode beginning all over again. + * @deprecated Not used + */ +@Deprecated +@InterfaceAudience.Private +public class ZKLeaderManager extends ZKListener { + private static final Log LOG = LogFactory.getLog(ZKLeaderManager.class); + + private final AtomicBoolean leaderExists = new AtomicBoolean(); + private String leaderZNode; + private byte[] nodeId; + private Stoppable candidate; + + public ZKLeaderManager(ZKWatcher watcher, String leaderZNode, + byte[] identifier, Stoppable candidate) { + super(watcher); + this.leaderZNode = leaderZNode; + this.nodeId = identifier; + this.candidate = candidate; + } + + public void start() { + try { + watcher.registerListener(this); + String parent = ZKUtil.getParent(leaderZNode); + if (ZKUtil.checkExists(watcher, parent) < 0) { + ZKUtil.createWithParents(watcher, parent); + } + } catch (KeeperException ke) { + watcher.abort("Unhandled zk exception when starting", ke); + candidate.stop("Unhandled zk exception starting up: "+ke.getMessage()); + } + } + + @Override + public void nodeCreated(String path) { + if (leaderZNode.equals(path) && !candidate.isStopped()) { + handleLeaderChange(); + } + } + + @Override + public void nodeDeleted(String path) { + if (leaderZNode.equals(path) && !candidate.isStopped()) { + handleLeaderChange(); + } + } + + private void handleLeaderChange() { + try { + synchronized(leaderExists) { + if (ZKUtil.watchAndCheckExists(watcher, leaderZNode)) { + LOG.info("Found new leader for znode: "+leaderZNode); + leaderExists.set(true); + } else { + LOG.info("Leader change, but no new leader found"); + leaderExists.set(false); + leaderExists.notifyAll(); + } + } + } catch (KeeperException ke) { + watcher.abort("ZooKeeper error checking for leader znode", ke); + candidate.stop("ZooKeeper error checking for leader: "+ke.getMessage()); + } + } + + /** + * Blocks until this instance has claimed the leader ZNode in ZooKeeper + */ + public void waitToBecomeLeader() { + while (!candidate.isStopped()) { + try { + if (ZKUtil.createEphemeralNodeAndWatch(watcher, leaderZNode, nodeId)) { + // claimed the leader znode + leaderExists.set(true); + if (LOG.isDebugEnabled()) { + LOG.debug("Claimed the leader znode as '"+ + Bytes.toStringBinary(nodeId)+"'"); + } + return; + } + + // if claiming the node failed, there should be another existing node + byte[] currentId = ZKUtil.getDataAndWatch(watcher, leaderZNode); + if (currentId != null && Bytes.equals(currentId, nodeId)) { + // claimed with our ID, but we didn't grab it, possibly restarted? + LOG.info("Found existing leader with our ID ("+ + Bytes.toStringBinary(nodeId)+"), removing"); + ZKUtil.deleteNode(watcher, leaderZNode); + leaderExists.set(false); + } else { + LOG.info("Found existing leader with ID: "+Bytes.toStringBinary(nodeId)); + leaderExists.set(true); + } + } catch (KeeperException ke) { + watcher.abort("Unexpected error from ZK, stopping candidate", ke); + candidate.stop("Unexpected error from ZK: "+ke.getMessage()); + return; + } + + // wait for next chance + synchronized(leaderExists) { + while (leaderExists.get() && !candidate.isStopped()) { + try { + leaderExists.wait(); + } catch (InterruptedException ie) { + LOG.debug("Interrupted waiting on leader", ie); + } + } + } + } + } + + /** + * Removes the leader znode, if it is currently claimed by this instance. + */ + public void stepDownAsLeader() { + try { + synchronized(leaderExists) { + if (!leaderExists.get()) { + return; + } + byte[] leaderId = ZKUtil.getData(watcher, leaderZNode); + if (leaderId != null && Bytes.equals(nodeId, leaderId)) { + LOG.info("Stepping down as leader"); + ZKUtil.deleteNodeFailSilent(watcher, leaderZNode); + leaderExists.set(false); + } else { + LOG.info("Not current leader, no need to step down"); + } + } + } catch (KeeperException ke) { + watcher.abort("Unhandled zookeeper exception removing leader node", ke); + candidate.stop("Unhandled zookeeper exception removing leader node: " + + ke.getMessage()); + } catch (InterruptedException e) { + watcher.abort("Unhandled zookeeper exception removing leader node", e); + candidate.stop("Unhandled zookeeper exception removing leader node: " + + e.getMessage()); + } + } + + public boolean hasLeader() { + return leaderExists.get(); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/e0c4f374/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKListener.java ---------------------------------------------------------------------- diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKListener.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKListener.java new file mode 100644 index 0000000..595e713 --- /dev/null +++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKListener.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.zookeeper; + +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Base class for internal listeners of ZooKeeper events. + * + * The {@link ZKWatcher} for a process will execute the appropriate + * methods of implementations of this class. In order to receive events from + * the watcher, every listener must register itself via {@link ZKWatcher#registerListener}. + * + * Subclasses need only override those methods in which they are interested. + * + * Note that the watcher will be blocked when invoking methods in listeners so + * they must not be long-running. + */ +@InterfaceAudience.Private +public abstract class ZKListener { + + // Reference to the zk watcher which also contains configuration and constants + protected ZKWatcher watcher; + + /** + * Construct a ZooKeeper event listener. + */ + public ZKListener(ZKWatcher watcher) { + this.watcher = watcher; + } + + /** + * Called when a new node has been created. + * @param path full path of the new node + */ + public void nodeCreated(String path) { + // no-op + } + + /** + * Called when a node has been deleted + * @param path full path of the deleted node + */ + public void nodeDeleted(String path) { + // no-op + } + + /** + * Called when an existing node has changed data. + * @param path full path of the updated node + */ + public void nodeDataChanged(String path) { + // no-op + } + + /** + * Called when an existing node has a child node added or removed. + * @param path full path of the node whose children have changed + */ + public void nodeChildrenChanged(String path) { + // no-op + } + + /** + * @return The watcher associated with this listener + */ + public ZKWatcher getWatcher() { + return this.watcher; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/e0c4f374/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKMainServer.java ---------------------------------------------------------------------- diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKMainServer.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKMainServer.java new file mode 100644 index 0000000..9cb0e7d --- /dev/null +++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKMainServer.java @@ -0,0 +1,126 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.zookeeper; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +import org.apache.curator.shaded.com.google.common.base.Stopwatch; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooKeeperMain; + +/** + * Tool for running ZookeeperMain from HBase by reading a ZooKeeper server + * from HBase XML configuration. + */ +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS) +public class ZKMainServer { + private static final String SERVER_ARG = "-server"; + + public String parse(final Configuration c) { + return ZKConfig.getZKQuorumServersString(c); + } + + /** + * ZooKeeper 3.4.6 broke being able to pass commands on command line. + * See ZOOKEEPER-1897. This class is a hack to restore this faclity. + */ + private static class HACK_UNTIL_ZOOKEEPER_1897_ZooKeeperMain extends ZooKeeperMain { + public HACK_UNTIL_ZOOKEEPER_1897_ZooKeeperMain(String[] args) + throws IOException, InterruptedException { + super(args); + // Make sure we are connected before we proceed. Can take a while on some systems. If we + // run the command without being connected, we get ConnectionLoss KeeperErrorConnection... + Stopwatch stopWatch = Stopwatch.createStarted(); + while (!this.zk.getState().isConnected()) { + Thread.sleep(1); + if (stopWatch.elapsed(TimeUnit.SECONDS) > 10) { + throw new InterruptedException("Failed connect after waiting " + + stopWatch.elapsed(TimeUnit.SECONDS) + "seconds; state=" + this.zk.getState() + + "; " + this.zk); + } + } + } + + /** + * Run the command-line args passed. Calls System.exit when done. + * @throws KeeperException + * @throws IOException + * @throws InterruptedException + */ + void runCmdLine() throws KeeperException, IOException, InterruptedException { + processCmd(this.cl); + System.exit(0); + } + } + + /** + * @param args + * @return True if argument strings have a '-server' in them. + */ + private static boolean hasServer(final String args[]) { + return args.length > 0 && args[0].equals(SERVER_ARG); + } + + /** + * @param args + * @return True if command-line arguments were passed. + */ + private static boolean hasCommandLineArguments(final String args[]) { + if (hasServer(args)) { + if (args.length < 2) throw new IllegalStateException("-server param but no value"); + return args.length > 2; + } + return args.length > 0; + } + + /** + * Run the tool. + * @param args Command line arguments. First arg is path to zookeepers file. + */ + public static void main(String args[]) throws Exception { + String [] newArgs = args; + if (!hasServer(args)) { + // Add the zk ensemble from configuration if none passed on command-line. + Configuration conf = HBaseConfiguration.create(); + String hostport = new ZKMainServer().parse(conf); + if (hostport != null && hostport.length() > 0) { + newArgs = new String[args.length + 2]; + System.arraycopy(args, 0, newArgs, 2, args.length); + newArgs[0] = "-server"; + newArgs[1] = hostport; + } + } + // If command-line arguments, run our hack so they are executed. + // ZOOKEEPER-1897 was committed to zookeeper-3.4.6 but elsewhere in this class we say + // 3.4.6 breaks command-processing; TODO. + if (hasCommandLineArguments(args)) { + HACK_UNTIL_ZOOKEEPER_1897_ZooKeeperMain zkm = + new HACK_UNTIL_ZOOKEEPER_1897_ZooKeeperMain(newArgs); + zkm.runCmdLine(); + } else { + ZooKeeperMain.main(newArgs); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/e0c4f374/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKMetrics.java ---------------------------------------------------------------------- diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKMetrics.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKMetrics.java new file mode 100644 index 0000000..20d4a55 --- /dev/null +++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKMetrics.java @@ -0,0 +1,108 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.zookeeper; + +import org.apache.hadoop.hbase.CompatibilitySingletonFactory; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; + +/** + * Class used to push numbers about ZooKeeper into the metrics subsystem. This will take a + * single function call and turn it into multiple manipulations of the hadoop metrics system. + */ +@InterfaceAudience.Private +public class ZKMetrics implements ZKMetricsListener { + private final MetricsZooKeeperSource source; + + public ZKMetrics() { + this(CompatibilitySingletonFactory.getInstance(MetricsZooKeeperSource.class)); + } + + @VisibleForTesting + public ZKMetrics(MetricsZooKeeperSource s) { + this.source = s; + } + + @Override + public void registerAuthFailedException() { + source.incrementAuthFailedCount(); + } + + @Override + public void registerConnectionLossException() { + source.incrementConnectionLossCount(); + } + + @Override + public void registerDataInconsistencyException() { + source.incrementDataInconsistencyCount(); + } + + @Override + public void registerInvalidACLException() { + source.incrementInvalidACLCount(); + } + + @Override + public void registerNoAuthException() { + source.incrementNoAuthCount(); + } + + @Override + public void registerOperationTimeoutException() { + source.incrementOperationTimeoutCount(); + } + + @Override + public void registerRuntimeInconsistencyException() { + source.incrementRuntimeInconsistencyCount(); + } + + @Override + public void registerSessionExpiredException() { + source.incrementSessionExpiredCount(); + } + + @Override + public void registerSystemErrorException() { + source.incrementSystemErrorCount(); + } + + @Override + public void registerFailedZKCall() { + source.incrementTotalFailedZKCalls(); + } + + @Override + public void registerReadOperationLatency(long latency) { + source.recordReadOperationLatency(latency); + } + + @Override + public void registerWriteOperationLatency(long latency) { + source.recordWriteOperationLatency(latency); + } + + @Override + public void registerSyncOperationLatency(long latency) { + source.recordSyncOperationLatency(latency); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/e0c4f374/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKMetricsListener.java ---------------------------------------------------------------------- diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKMetricsListener.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKMetricsListener.java new file mode 100644 index 0000000..f17925e --- /dev/null +++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKMetricsListener.java @@ -0,0 +1,91 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.zookeeper; + +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +public interface ZKMetricsListener { + + /** + * An AUTHFAILED Exception was seen. + */ + void registerAuthFailedException(); + + /** + * A CONNECTIONLOSS Exception was seen. + */ + void registerConnectionLossException(); + + /** + * A DATAINCONSISTENCY Exception was seen. + */ + void registerDataInconsistencyException(); + + /** + * An INVALIDACL Exception was seen. + */ + void registerInvalidACLException(); + + /** + * A NOAUTH Exception was seen. + */ + void registerNoAuthException(); + + /** + * A OPERATIONTIMEOUT Exception was seen. + */ + void registerOperationTimeoutException(); + + /** + * A RUNTIMEINCONSISTENCY Exception was seen. + */ + void registerRuntimeInconsistencyException(); + + /** + * A SESSIONEXPIRED Exception was seen. + */ + void registerSessionExpiredException(); + + /** + * A SYSTEMERROR Exception was seen. + */ + void registerSystemErrorException(); + + /** + * A ZooKeeper API Call failed. + */ + void registerFailedZKCall(); + + /** + * Register the latency incurred for read operations. + */ + void registerReadOperationLatency(long latency); + + /** + * Register the latency incurred for write operations. + */ + void registerWriteOperationLatency(long latency); + + /** + * Register the latency incurred for sync operations. + */ + void registerSyncOperationLatency(long latency); +} http://git-wip-us.apache.org/repos/asf/hbase/blob/e0c4f374/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKNodeTracker.java ---------------------------------------------------------------------- diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKNodeTracker.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKNodeTracker.java new file mode 100644 index 0000000..8ce41e3 --- /dev/null +++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKNodeTracker.java @@ -0,0 +1,251 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.zookeeper; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Abortable; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.zookeeper.KeeperException; + +/** + * Tracks the availability and value of a single ZooKeeper node. + * + *

Utilizes the {@link ZKListener} interface to get the necessary + * ZooKeeper events related to the node. + * + *

This is the base class used by trackers in both the Master and + * RegionServers. + */ +@InterfaceAudience.Private +public abstract class ZKNodeTracker extends ZKListener { + // LOG is being used in subclasses, hence keeping it protected + protected static final Log LOG = LogFactory.getLog(ZKNodeTracker.class); + /** Path of node being tracked */ + protected final String node; + + /** Data of the node being tracked */ + private byte [] data; + + /** Used to abort if a fatal error occurs */ + protected final Abortable abortable; + + private boolean stopped = false; + + /** + * Constructs a new ZK node tracker. + * + *

After construction, use {@link #start} to kick off tracking. + * + * @param watcher + * @param node + * @param abortable + */ + public ZKNodeTracker(ZKWatcher watcher, String node, + Abortable abortable) { + super(watcher); + this.node = node; + this.abortable = abortable; + this.data = null; + } + + /** + * Starts the tracking of the node in ZooKeeper. + * + *

Use {@link #blockUntilAvailable()} to block until the node is available + * or {@link #getData(boolean)} to get the data of the node if it is available. + */ + public synchronized void start() { + this.watcher.registerListener(this); + try { + if(ZKUtil.watchAndCheckExists(watcher, node)) { + byte [] data = ZKUtil.getDataAndWatch(watcher, node); + if(data != null) { + this.data = data; + } else { + // It existed but now does not, try again to ensure a watch is set + LOG.debug("Try starting again because there is no data from " + node); + start(); + } + } + } catch (KeeperException e) { + abortable.abort("Unexpected exception during initialization, aborting", e); + } + } + + public synchronized void stop() { + this.stopped = true; + notifyAll(); + } + + /** + * Gets the data of the node, blocking until the node is available. + * + * @return data of the node + * @throws InterruptedException if the waiting thread is interrupted + */ + public synchronized byte [] blockUntilAvailable() + throws InterruptedException { + return blockUntilAvailable(0, false); + } + + /** + * Gets the data of the node, blocking until the node is available or the + * specified timeout has elapsed. + * + * @param timeout maximum time to wait for the node data to be available, + * n milliseconds. Pass 0 for no timeout. + * @return data of the node + * @throws InterruptedException if the waiting thread is interrupted + */ + public synchronized byte [] blockUntilAvailable(long timeout, boolean refresh) + throws InterruptedException { + if (timeout < 0) throw new IllegalArgumentException(); + boolean notimeout = timeout == 0; + long startTime = System.currentTimeMillis(); + long remaining = timeout; + if (refresh) { + try { + // This does not create a watch if the node does not exists + this.data = ZKUtil.getDataAndWatch(watcher, node); + } catch(KeeperException e) { + // We use to abort here, but in some cases the abort is ignored ( + // (empty Abortable), so it's better to log... + LOG.warn("Unexpected exception handling blockUntilAvailable", e); + abortable.abort("Unexpected exception handling blockUntilAvailable", e); + } + } + boolean nodeExistsChecked = (!refresh ||data!=null); + while (!this.stopped && (notimeout || remaining > 0) && this.data == null) { + if (!nodeExistsChecked) { + try { + nodeExistsChecked = (ZKUtil.checkExists(watcher, node) != -1); + } catch (KeeperException e) { + LOG.warn( + "Got exception while trying to check existence in ZooKeeper" + + " of the node: "+node+", retrying if timeout not reached",e ); + } + + // It did not exists, and now it does. + if (nodeExistsChecked){ + LOG.debug("Node " + node + " now exists, resetting a watcher"); + try { + // This does not create a watch if the node does not exists + this.data = ZKUtil.getDataAndWatch(watcher, node); + } catch (KeeperException e) { + LOG.warn("Unexpected exception handling blockUntilAvailable", e); + abortable.abort("Unexpected exception handling blockUntilAvailable", e); + } + } + } + // We expect a notification; but we wait with a + // a timeout to lower the impact of a race condition if any + wait(100); + remaining = timeout - (System.currentTimeMillis() - startTime); + } + return this.data; + } + + /** + * Gets the data of the node. + * + *

If the node is currently available, the most up-to-date known version of + * the data is returned. If the node is not currently available, null is + * returned. + * @param refresh whether to refresh the data by calling ZK directly. + * @return data of the node, null if unavailable + */ + public synchronized byte [] getData(boolean refresh) { + if (refresh) { + try { + this.data = ZKUtil.getDataAndWatch(watcher, node); + } catch(KeeperException e) { + abortable.abort("Unexpected exception handling getData", e); + } + } + return this.data; + } + + public String getNode() { + return this.node; + } + + @Override + public synchronized void nodeCreated(String path) { + if (!path.equals(node)) return; + try { + byte [] data = ZKUtil.getDataAndWatch(watcher, node); + if (data != null) { + this.data = data; + notifyAll(); + } else { + nodeDeleted(path); + } + } catch(KeeperException e) { + abortable.abort("Unexpected exception handling nodeCreated event", e); + } + } + + @Override + public synchronized void nodeDeleted(String path) { + if(path.equals(node)) { + try { + if(ZKUtil.watchAndCheckExists(watcher, node)) { + nodeCreated(path); + } else { + this.data = null; + } + } catch(KeeperException e) { + abortable.abort("Unexpected exception handling nodeDeleted event", e); + } + } + } + + @Override + public synchronized void nodeDataChanged(String path) { + if(path.equals(node)) { + nodeCreated(path); + } + } + + /** + * Checks if the baseznode set as per the property 'zookeeper.znode.parent' + * exists. + * @return true if baseznode exists. + * false if doesnot exists. + */ + public boolean checkIfBaseNodeAvailable() { + try { + if (ZKUtil.checkExists(watcher, watcher.znodePaths.baseZNode) == -1) { + return false; + } + } catch (KeeperException e) { + abortable.abort("Exception while checking if basenode (" + watcher.znodePaths.baseZNode + + ") exists in ZooKeeper.", + e); + } + return true; + } + + @Override + public String toString() { + return "ZKNodeTracker{" + + "node='" + node + ", stopped=" + stopped + '}'; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/e0c4f374/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKServerTool.java ---------------------------------------------------------------------- diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKServerTool.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKServerTool.java new file mode 100644 index 0000000..8116c23 --- /dev/null +++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKServerTool.java @@ -0,0 +1,65 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.zookeeper; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; +import org.apache.yetus.audience.InterfaceAudience; + +import java.util.LinkedList; +import java.util.List; + +/** + * Tool for reading ZooKeeper servers from HBase XML configuration and producing + * a line-by-line list for use by bash scripts. + */ +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS) +public class ZKServerTool { + public static ServerName[] readZKNodes(Configuration conf) { + List hosts = new LinkedList<>(); + String quorum = conf.get(HConstants.ZOOKEEPER_QUORUM, HConstants.LOCALHOST); + + String[] values = quorum.split(","); + for (String value : values) { + String[] parts = value.split(":"); + String host = parts[0]; + int port = HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT; + if (parts.length > 1) { + port = Integer.parseInt(parts[1]); + } + hosts.add(ServerName.valueOf(host, port, -1)); + } + return hosts.toArray(new ServerName[hosts.size()]); + } + + /** + * Run the tool. + * @param args Command line arguments. + */ + public static void main(String args[]) { + for(ServerName server: readZKNodes(HBaseConfiguration.create())) { + // bin/zookeeper.sh relies on the "ZK host" string for grepping which is case sensitive. + System.out.println("ZK host: " + server.getHostname()); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/e0c4f374/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java ---------------------------------------------------------------------- diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java new file mode 100644 index 0000000..33cc43e --- /dev/null +++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.zookeeper; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.net.URLDecoder; +import java.net.URLEncoder; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HConstants; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Common methods and attributes used by SplitLogManager and SplitLogWorker running distributed splitting of WAL logs. + */ +@InterfaceAudience.Private +public class ZKSplitLog { + private static final Log LOG = LogFactory.getLog(ZKSplitLog.class); + + /** + * Gets the full path node name for the log file being split. + * This method will url encode the filename. + * @param zkw zk reference + * @param filename log file name (only the basename) + */ + public static String getEncodedNodeName(ZKWatcher zkw, String filename) { + return ZNodePaths.joinZNode(zkw.znodePaths.splitLogZNode, encode(filename)); + } + + public static String getFileName(String node) { + String basename = node.substring(node.lastIndexOf('/') + 1); + return decode(basename); + } + + static String encode(String s) { + try { + return URLEncoder.encode(s, "UTF-8"); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException("URLENCODER doesn't support UTF-8"); + } + } + + static String decode(String s) { + try { + return URLDecoder.decode(s, "UTF-8"); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException("URLDecoder doesn't support UTF-8"); + } + } + + public static String getRescanNode(ZKWatcher zkw) { + return ZNodePaths.joinZNode(zkw.znodePaths.splitLogZNode, "RESCAN"); + } + + /** + * @param name the last part in path + * @return whether the node name represents a rescan node + */ + public static boolean isRescanNode(String name) { + return name.startsWith("RESCAN"); + } + + /** + * @param zkw + * @param path the absolute path, starts with '/' + * @return whether the path represents a rescan node + */ + public static boolean isRescanNode(ZKWatcher zkw, String path) { + String prefix = getRescanNode(zkw); + if (path.length() <= prefix.length()) { + return false; + } + for (int i = 0; i < prefix.length(); i++) { + if (prefix.charAt(i) != path.charAt(i)) { + return false; + } + } + return true; + } + + public static Path getSplitLogDir(Path rootdir, String tmpname) { + return new Path(new Path(rootdir, HConstants.SPLIT_LOGDIR_NAME), tmpname); + } + + public static void markCorrupted(Path rootdir, String logFileName, + FileSystem fs) { + Path file = new Path(getSplitLogDir(rootdir, logFileName), "corrupt"); + try { + fs.createNewFile(file); + } catch (IOException e) { + LOG.warn("Could not flag a log file as corrupted. Failed to create " + + file, e); + } + } + + public static boolean isCorrupted(Path rootdir, String logFileName, + FileSystem fs) throws IOException { + Path file = new Path(getSplitLogDir(rootdir, logFileName), "corrupt"); + boolean isCorrupt; + isCorrupt = fs.exists(file); + return isCorrupt; + } +}