hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a...@apache.org
Subject [12/13] hbase git commit: HBASE-19114 Split out o.a.h.h.zookeeper from hbase-server and hbase-client
Date Fri, 17 Nov 2017 21:20:44 GMT
http://git-wip-us.apache.org/repos/asf/hbase/blob/330b0d05/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java
deleted file mode 100644
index 04f709f..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java
+++ /dev/null
@@ -1,814 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.zookeeper;
-
-import static org.apache.hadoop.hbase.zookeeper.ZKMetadata.appendMetaData;
-import static org.apache.hadoop.hbase.zookeeper.ZKMetadata.removeMetaData;
-
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.RetryCounter;
-import org.apache.hadoop.hbase.util.RetryCounterFactory;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.trace.TraceUtil;
-import org.apache.htrace.core.TraceScope;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.Op;
-import org.apache.zookeeper.OpResult;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.ZooKeeper.States;
-import org.apache.zookeeper.data.ACL;
-import org.apache.zookeeper.data.Stat;
-import org.apache.zookeeper.proto.CreateRequest;
-import org.apache.zookeeper.proto.SetDataRequest;
-
-/**
- * A zookeeper that can handle 'recoverable' errors.
- * To handle recoverable errors, developers need to realize that there are two
- * classes of requests: idempotent and non-idempotent requests. Read requests
- * and unconditional sets and deletes are examples of idempotent requests, they
- * can be reissued with the same results.
- * (Although, the delete may throw a NoNodeException on reissue its effect on
- * the ZooKeeper state is the same.) Non-idempotent requests need special
- * handling, application and library writers need to keep in mind that they may
- * need to encode information in the data or name of znodes to detect
- * retries. A simple example is a create that uses a sequence flag.
- * If a process issues a create("/x-", ..., SEQUENCE) and gets a connection
- * loss exception, that process will reissue another
- * create("/x-", ..., SEQUENCE) and get back x-111. When the process does a
- * getChildren("/"), it sees x-1,x-30,x-109,x-110,x-111, now it could be
- * that x-109 was the result of the previous create, so the process actually
- * owns both x-109 and x-111. An easy way around this is to use "x-process id-"
- * when doing the create. If the process is using an id of 352, before reissuing
- * the create it will do a getChildren("/") and see "x-222-1", "x-542-30",
- * "x-352-109", x-333-110". The process will know that the original create
- * succeeded an the znode it created is "x-352-109".
- * @see "http://wiki.apache.org/hadoop/ZooKeeper/ErrorHandling"
- */
-@InterfaceAudience.Private
-public class RecoverableZooKeeper {
-  private static final Log LOG = LogFactory.getLog(RecoverableZooKeeper.class);
-  // the actual ZooKeeper client instance
-  private ZooKeeper zk;
-  private final RetryCounterFactory retryCounterFactory;
-  // An identifier of this process in the cluster
-  private final String identifier;
-  private final byte[] id;
-  private Watcher watcher;
-  private int sessionTimeout;
-  private String quorumServers;
-  private final ZooKeeperMetricsListener metrics;
-
-  public RecoverableZooKeeper(String quorumServers, int sessionTimeout,
-      Watcher watcher, int maxRetries, int retryIntervalMillis, int maxSleepTime)
-  throws IOException {
-    this(quorumServers, sessionTimeout, watcher, maxRetries, retryIntervalMillis, maxSleepTime,
-        null);
-  }
-
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DE_MIGHT_IGNORE",
-      justification="None. Its always been this way.")
-  public RecoverableZooKeeper(String quorumServers, int sessionTimeout,
-      Watcher watcher, int maxRetries, int retryIntervalMillis, int maxSleepTime, String identifier)
-  throws IOException {
-    // TODO: Add support for zk 'chroot'; we don't add it to the quorumServers String as we should.
-    this.retryCounterFactory =
-      new RetryCounterFactory(maxRetries+1, retryIntervalMillis, maxSleepTime);
-
-    if (identifier == null || identifier.length() == 0) {
-      // the identifier = processID@hostName
-      identifier = ManagementFactory.getRuntimeMXBean().getName();
-    }
-    LOG.info("Process identifier=" + identifier +
-      " connecting to ZooKeeper ensemble=" + quorumServers);
-    this.identifier = identifier;
-    this.id = Bytes.toBytes(identifier);
-
-    this.watcher = watcher;
-    this.sessionTimeout = sessionTimeout;
-    this.quorumServers = quorumServers;
-    this.metrics = new MetricsZooKeeper();
-    try {checkZk();} catch (Exception x) {/* ignore */}
-  }
-
-  /**
-   * Try to create a ZooKeeper connection. Turns any exception encountered into a
-   * KeeperException.OperationTimeoutException so it can retried.
-   * @return The created ZooKeeper connection object
-   * @throws KeeperException
-   */
-  protected synchronized ZooKeeper checkZk() throws KeeperException {
-    if (this.zk == null) {
-      try {
-        this.zk = new ZooKeeper(quorumServers, sessionTimeout, watcher);
-      } catch (IOException ex) {
-        LOG.warn("Unable to create ZooKeeper Connection", ex);
-        throw new KeeperException.OperationTimeoutException();
-      }
-    }
-    return zk;
-  }
-
-  public synchronized void reconnectAfterExpiration()
-        throws IOException, KeeperException, InterruptedException {
-    if (zk != null) {
-      LOG.info("Closing dead ZooKeeper connection, session" +
-        " was: 0x"+Long.toHexString(zk.getSessionId()));
-      zk.close();
-      // reset the ZooKeeper connection
-      zk = null;
-    }
-    checkZk();
-    LOG.info("Recreated a ZooKeeper, session" +
-      " is: 0x"+Long.toHexString(zk.getSessionId()));
-  }
-
-  /**
-   * delete is an idempotent operation. Retry before throwing exception.
-   * This function will not throw NoNodeException if the path does not
-   * exist.
-   */
-  public void delete(String path, int version) throws InterruptedException, KeeperException {
-    try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.delete")) {
-      RetryCounter retryCounter = retryCounterFactory.create();
-      boolean isRetry = false; // False for first attempt, true for all retries.
-      while (true) {
-        try {
-          long startTime = EnvironmentEdgeManager.currentTime();
-          checkZk().delete(path, version);
-          this.metrics.registerWriteOperationLatency(Math.min(EnvironmentEdgeManager.currentTime() - startTime,  1));
-          return;
-        } catch (KeeperException e) {
-          this.metrics.registerFailedZKCall();
-          switch (e.code()) {
-            case NONODE:
-              if (isRetry) {
-                LOG.debug("Node " + path + " already deleted. Assuming a " +
-                    "previous attempt succeeded.");
-                return;
-              }
-              LOG.debug("Node " + path + " already deleted, retry=" + isRetry);
-              throw e;
-
-            case CONNECTIONLOSS:
-              this.metrics.registerConnectionLossException();
-              retryOrThrow(retryCounter, e, "delete");
-              break;
-            case OPERATIONTIMEOUT:
-              this.metrics.registerOperationTimeoutException();
-              retryOrThrow(retryCounter, e, "delete");
-              break;
-
-            default:
-              throw e;
-          }
-        }
-        retryCounter.sleepUntilNextRetry();
-        isRetry = true;
-      }
-    }
-  }
-
-  /**
-   * exists is an idempotent operation. Retry before throwing exception
-   * @return A Stat instance
-   */
-  public Stat exists(String path, Watcher watcher) throws KeeperException, InterruptedException {
-    try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.exists")) {
-      RetryCounter retryCounter = retryCounterFactory.create();
-      while (true) {
-        try {
-          long startTime = EnvironmentEdgeManager.currentTime();
-          Stat nodeStat = checkZk().exists(path, watcher);
-          this.metrics.registerReadOperationLatency(Math.min(EnvironmentEdgeManager.currentTime() - startTime, 1));
-          return nodeStat;
-        } catch (KeeperException e) {
-          this.metrics.registerFailedZKCall();
-          switch (e.code()) {
-            case CONNECTIONLOSS:
-              this.metrics.registerConnectionLossException();
-              retryOrThrow(retryCounter, e, "exists");
-              break;
-            case OPERATIONTIMEOUT:
-              this.metrics.registerOperationTimeoutException();
-              retryOrThrow(retryCounter, e, "exists");
-              break;
-
-            default:
-              throw e;
-          }
-        }
-        retryCounter.sleepUntilNextRetry();
-      }
-    }
-  }
-
-  /**
-   * exists is an idempotent operation. Retry before throwing exception
-   * @return A Stat instance
-   */
-  public Stat exists(String path, boolean watch) throws KeeperException, InterruptedException {
-    try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.exists")) {
-      RetryCounter retryCounter = retryCounterFactory.create();
-      while (true) {
-        try {
-          long startTime = EnvironmentEdgeManager.currentTime();
-          Stat nodeStat = checkZk().exists(path, watch);
-          this.metrics.registerReadOperationLatency(Math.min(EnvironmentEdgeManager.currentTime() - startTime,  1));
-          return nodeStat;
-        } catch (KeeperException e) {
-          this.metrics.registerFailedZKCall();
-          switch (e.code()) {
-            case CONNECTIONLOSS:
-              this.metrics.registerConnectionLossException();
-              retryOrThrow(retryCounter, e, "exists");
-              break;
-            case OPERATIONTIMEOUT:
-              this.metrics.registerOperationTimeoutException();
-              retryOrThrow(retryCounter, e, "exists");
-              break;
-
-            default:
-              throw e;
-          }
-        }
-        retryCounter.sleepUntilNextRetry();
-      }
-    }
-  }
-
-  private void retryOrThrow(RetryCounter retryCounter, KeeperException e,
-      String opName) throws KeeperException {
-    if (!retryCounter.shouldRetry()) {
-      LOG.error("ZooKeeper " + opName + " failed after "
-        + retryCounter.getMaxAttempts() + " attempts");
-      throw e;
-    }
-    LOG.debug("Retry, connectivity issue (JVM Pause?); quorum=" + quorumServers + "," +
-        "exception=" + e);
-  }
-
-  /**
-   * getChildren is an idempotent operation. Retry before throwing exception
-   * @return List of children znodes
-   */
-  public List<String> getChildren(String path, Watcher watcher)
-    throws KeeperException, InterruptedException {
-    try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.getChildren")) {
-      RetryCounter retryCounter = retryCounterFactory.create();
-      while (true) {
-        try {
-          long startTime = EnvironmentEdgeManager.currentTime();
-          List<String> children = checkZk().getChildren(path, watcher);
-          this.metrics.registerReadOperationLatency(Math.min(EnvironmentEdgeManager.currentTime() - startTime,  1));
-          return children;
-        } catch (KeeperException e) {
-          this.metrics.registerFailedZKCall();
-          switch (e.code()) {
-            case CONNECTIONLOSS:
-              this.metrics.registerConnectionLossException();
-              retryOrThrow(retryCounter, e, "getChildren");
-              break;
-            case OPERATIONTIMEOUT:
-              this.metrics.registerOperationTimeoutException();
-              retryOrThrow(retryCounter, e, "getChildren");
-              break;
-
-            default:
-              throw e;
-          }
-        }
-        retryCounter.sleepUntilNextRetry();
-      }
-    }
-  }
-
-  /**
-   * getChildren is an idempotent operation. Retry before throwing exception
-   * @return List of children znodes
-   */
-  public List<String> getChildren(String path, boolean watch)
-  throws KeeperException, InterruptedException {
-    try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.getChildren")) {
-      RetryCounter retryCounter = retryCounterFactory.create();
-      while (true) {
-        try {
-          long startTime = EnvironmentEdgeManager.currentTime();
-          List<String> children = checkZk().getChildren(path, watch);
-          this.metrics.registerReadOperationLatency(Math.min(EnvironmentEdgeManager.currentTime() - startTime,  1));
-          return children;
-        } catch (KeeperException e) {
-          this.metrics.registerFailedZKCall();
-          switch (e.code()) {
-            case CONNECTIONLOSS:
-              this.metrics.registerConnectionLossException();
-              retryOrThrow(retryCounter, e, "getChildren");
-              break;
-            case OPERATIONTIMEOUT:
-              this.metrics.registerOperationTimeoutException();
-              retryOrThrow(retryCounter, e, "getChildren");
-              break;
-
-            default:
-              throw e;
-          }
-        }
-        retryCounter.sleepUntilNextRetry();
-      }
-    }
-  }
-
-  /**
-   * getData is an idempotent operation. Retry before throwing exception
-   * @return Data
-   */
-  public byte[] getData(String path, Watcher watcher, Stat stat)
-  throws KeeperException, InterruptedException {
-    try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.getData")) {
-      RetryCounter retryCounter = retryCounterFactory.create();
-      while (true) {
-        try {
-          long startTime = EnvironmentEdgeManager.currentTime();
-          byte[] revData = checkZk().getData(path, watcher, stat);
-          this.metrics.registerReadOperationLatency(Math.min(EnvironmentEdgeManager.currentTime() - startTime,  1));
-          return removeMetaData(revData);
-        } catch (KeeperException e) {
-          this.metrics.registerFailedZKCall();
-          switch (e.code()) {
-            case CONNECTIONLOSS:
-              this.metrics.registerConnectionLossException();
-              retryOrThrow(retryCounter, e, "getData");
-              break;
-            case OPERATIONTIMEOUT:
-              this.metrics.registerOperationTimeoutException();
-              retryOrThrow(retryCounter, e, "getData");
-              break;
-
-            default:
-              throw e;
-          }
-        }
-        retryCounter.sleepUntilNextRetry();
-      }
-    }
-  }
-
-  /**
-   * getData is an idempotent operation. Retry before throwing exception
-   * @return Data
-   */
-  public byte[] getData(String path, boolean watch, Stat stat)
-  throws KeeperException, InterruptedException {
-    try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.getData")) {
-      RetryCounter retryCounter = retryCounterFactory.create();
-      while (true) {
-        try {
-          long startTime = EnvironmentEdgeManager.currentTime();
-          byte[] revData = checkZk().getData(path, watch, stat);
-          this.metrics.registerReadOperationLatency(Math.min(EnvironmentEdgeManager.currentTime() - startTime,  1));
-          return removeMetaData(revData);
-        } catch (KeeperException e) {
-          this.metrics.registerFailedZKCall();
-          switch (e.code()) {
-            case CONNECTIONLOSS:
-              this.metrics.registerConnectionLossException();
-              retryOrThrow(retryCounter, e, "getData");
-              break;
-            case OPERATIONTIMEOUT:
-              this.metrics.registerOperationTimeoutException();
-              retryOrThrow(retryCounter, e, "getData");
-              break;
-
-            default:
-              throw e;
-          }
-        }
-        retryCounter.sleepUntilNextRetry();
-      }
-    }
-  }
-
-  /**
-   * setData is NOT an idempotent operation. Retry may cause BadVersion Exception
-   * Adding an identifier field into the data to check whether
-   * badversion is caused by the result of previous correctly setData
-   * @return Stat instance
-   */
-  public Stat setData(String path, byte[] data, int version)
-  throws KeeperException, InterruptedException {
-    try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.setData")) {
-      RetryCounter retryCounter = retryCounterFactory.create();
-      byte[] newData = appendMetaData(id, data);
-      boolean isRetry = false;
-      long startTime;
-      while (true) {
-        try {
-          startTime = EnvironmentEdgeManager.currentTime();
-          Stat nodeStat = checkZk().setData(path, newData, version);
-          this.metrics.registerWriteOperationLatency(Math.min(EnvironmentEdgeManager.currentTime() - startTime,  1));
-          return nodeStat;
-        } catch (KeeperException e) {
-          this.metrics.registerFailedZKCall();
-          switch (e.code()) {
-            case CONNECTIONLOSS:
-              this.metrics.registerConnectionLossException();
-              retryOrThrow(retryCounter, e, "setData");
-              break;
-            case OPERATIONTIMEOUT:
-              this.metrics.registerOperationTimeoutException();
-              retryOrThrow(retryCounter, e, "setData");
-              break;
-            case BADVERSION:
-              if (isRetry) {
-                // try to verify whether the previous setData success or not
-                try{
-                  Stat stat = new Stat();
-                  startTime = EnvironmentEdgeManager.currentTime();
-                  byte[] revData = checkZk().getData(path, false, stat);
-                  this.metrics.registerReadOperationLatency(Math.min(EnvironmentEdgeManager.currentTime() - startTime,  1));
-                  if(Bytes.compareTo(revData, newData) == 0) {
-                    // the bad version is caused by previous successful setData
-                    return stat;
-                  }
-                } catch(KeeperException keeperException){
-                  this.metrics.registerFailedZKCall();
-                  // the ZK is not reliable at this moment. just throwing exception
-                  throw keeperException;
-                }
-              }
-            // throw other exceptions and verified bad version exceptions
-            default:
-              throw e;
-          }
-        }
-        retryCounter.sleepUntilNextRetry();
-        isRetry = true;
-      }
-    }
-  }
-
-  /**
-   * getAcl is an idempotent operation. Retry before throwing exception
-   * @return list of ACLs
-   */
-  public List<ACL> getAcl(String path, Stat stat)
-  throws KeeperException, InterruptedException {
-    try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.getAcl")) {
-      RetryCounter retryCounter = retryCounterFactory.create();
-      while (true) {
-        try {
-          long startTime = EnvironmentEdgeManager.currentTime();
-          List<ACL> nodeACL = checkZk().getACL(path, stat);
-          this.metrics.registerReadOperationLatency(Math.min(EnvironmentEdgeManager.currentTime() - startTime,  1));
-          return nodeACL;
-        } catch (KeeperException e) {
-          this.metrics.registerFailedZKCall();
-          switch (e.code()) {
-            case CONNECTIONLOSS:
-              this.metrics.registerConnectionLossException();
-              retryOrThrow(retryCounter, e, "getAcl");
-              break;
-            case OPERATIONTIMEOUT:
-              this.metrics.registerOperationTimeoutException();
-              retryOrThrow(retryCounter, e, "getAcl");
-              break;
-
-            default:
-              throw e;
-          }
-        }
-        retryCounter.sleepUntilNextRetry();
-      }
-    }
-  }
-
-  /**
-   * setAcl is an idempotent operation. Retry before throwing exception
-   * @return list of ACLs
-   */
-  public Stat setAcl(String path, List<ACL> acls, int version)
-  throws KeeperException, InterruptedException {
-    try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.setAcl")) {
-      RetryCounter retryCounter = retryCounterFactory.create();
-      while (true) {
-        try {
-          long startTime = EnvironmentEdgeManager.currentTime();
-          Stat nodeStat = checkZk().setACL(path, acls, version);
-          this.metrics.registerWriteOperationLatency(Math.min(EnvironmentEdgeManager.currentTime() - startTime,  1));
-          return nodeStat;
-        } catch (KeeperException e) {
-          this.metrics.registerFailedZKCall();
-          switch (e.code()) {
-            case CONNECTIONLOSS:
-              this.metrics.registerConnectionLossException();
-              retryOrThrow(retryCounter, e, "setAcl");
-              break;
-            case OPERATIONTIMEOUT:
-              this.metrics.registerOperationTimeoutException();
-              retryOrThrow(retryCounter, e, "setAcl");
-              break;
-
-            default:
-              throw e;
-          }
-        }
-        retryCounter.sleepUntilNextRetry();
-      }
-    }
-  }
-
-  /**
-   * <p>
-   * NONSEQUENTIAL create is idempotent operation.
-   * Retry before throwing exceptions.
-   * But this function will not throw the NodeExist exception back to the
-   * application.
-   * </p>
-   * <p>
-   * But SEQUENTIAL is NOT idempotent operation. It is necessary to add
-   * identifier to the path to verify, whether the previous one is successful
-   * or not.
-   * </p>
-   *
-   * @return Path
-   */
-  public String create(String path, byte[] data, List<ACL> acl,
-      CreateMode createMode)
-  throws KeeperException, InterruptedException {
-    try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.create")) {
-      byte[] newData = appendMetaData(id, data);
-      switch (createMode) {
-        case EPHEMERAL:
-        case PERSISTENT:
-          return createNonSequential(path, newData, acl, createMode);
-
-        case EPHEMERAL_SEQUENTIAL:
-        case PERSISTENT_SEQUENTIAL:
-          return createSequential(path, newData, acl, createMode);
-
-        default:
-          throw new IllegalArgumentException("Unrecognized CreateMode: " +
-              createMode);
-      }
-    }
-  }
-
-  private String createNonSequential(String path, byte[] data, List<ACL> acl,
-      CreateMode createMode) throws KeeperException, InterruptedException {
-    RetryCounter retryCounter = retryCounterFactory.create();
-    boolean isRetry = false; // False for first attempt, true for all retries.
-    long startTime;
-    while (true) {
-      try {
-        startTime = EnvironmentEdgeManager.currentTime();
-        String nodePath = checkZk().create(path, data, acl, createMode);
-        this.metrics.registerWriteOperationLatency(Math.min(EnvironmentEdgeManager.currentTime() - startTime,  1));
-        return nodePath;
-      } catch (KeeperException e) {
-        this.metrics.registerFailedZKCall();
-        switch (e.code()) {
-          case NODEEXISTS:
-            if (isRetry) {
-              // If the connection was lost, there is still a possibility that
-              // we have successfully created the node at our previous attempt,
-              // so we read the node and compare.
-              startTime = EnvironmentEdgeManager.currentTime();
-              byte[] currentData = checkZk().getData(path, false, null);
-              this.metrics.registerReadOperationLatency(Math.min(EnvironmentEdgeManager.currentTime() - startTime,  1));
-              if (currentData != null &&
-                  Bytes.compareTo(currentData, data) == 0) {
-                // We successfully created a non-sequential node
-                return path;
-              }
-              LOG.error("Node " + path + " already exists with " +
-                  Bytes.toStringBinary(currentData) + ", could not write " +
-                  Bytes.toStringBinary(data));
-              throw e;
-            }
-            LOG.debug("Node " + path + " already exists");
-            throw e;
-
-          case CONNECTIONLOSS:
-            this.metrics.registerConnectionLossException();
-            retryOrThrow(retryCounter, e, "create");
-            break;
-          case OPERATIONTIMEOUT:
-            this.metrics.registerOperationTimeoutException();
-            retryOrThrow(retryCounter, e, "create");
-            break;
-
-          default:
-            throw e;
-        }
-      }
-      retryCounter.sleepUntilNextRetry();
-      isRetry = true;
-    }
-  }
-
-  private String createSequential(String path, byte[] data,
-      List<ACL> acl, CreateMode createMode)
-  throws KeeperException, InterruptedException {
-    RetryCounter retryCounter = retryCounterFactory.create();
-    boolean first = true;
-    String newPath = path+this.identifier;
-    while (true) {
-      try {
-        if (!first) {
-          // Check if we succeeded on a previous attempt
-          String previousResult = findPreviousSequentialNode(newPath);
-          if (previousResult != null) {
-            return previousResult;
-          }
-        }
-        first = false;
-        long startTime = EnvironmentEdgeManager.currentTime();
-        String nodePath = checkZk().create(newPath, data, acl, createMode);
-        this.metrics.registerWriteOperationLatency(Math.min(EnvironmentEdgeManager.currentTime() - startTime,  1));
-        return nodePath;
-      } catch (KeeperException e) {
-        this.metrics.registerFailedZKCall();
-        switch (e.code()) {
-          case CONNECTIONLOSS:
-            this.metrics.registerConnectionLossException();
-            retryOrThrow(retryCounter, e, "create");
-            break;
-          case OPERATIONTIMEOUT:
-            this.metrics.registerOperationTimeoutException();
-            retryOrThrow(retryCounter, e, "create");
-            break;
-
-          default:
-            throw e;
-        }
-      }
-      retryCounter.sleepUntilNextRetry();
-    }
-  }
-  /**
-   * Convert Iterable of {@link org.apache.zookeeper.Op} we got into the ZooKeeper.Op
-   * instances to actually pass to multi (need to do this in order to appendMetaData).
-   */
-  private Iterable<Op> prepareZKMulti(Iterable<Op> ops)
-  throws UnsupportedOperationException {
-    if(ops == null) return null;
-
-    List<Op> preparedOps = new LinkedList<>();
-    for (Op op : ops) {
-      if (op.getType() == ZooDefs.OpCode.create) {
-        CreateRequest create = (CreateRequest)op.toRequestRecord();
-        preparedOps.add(Op.create(create.getPath(), appendMetaData(id, create.getData()),
-          create.getAcl(), create.getFlags()));
-      } else if (op.getType() == ZooDefs.OpCode.delete) {
-        // no need to appendMetaData for delete
-        preparedOps.add(op);
-      } else if (op.getType() == ZooDefs.OpCode.setData) {
-        SetDataRequest setData = (SetDataRequest)op.toRequestRecord();
-        preparedOps.add(Op.setData(setData.getPath(), appendMetaData(id, setData.getData()),
-          setData.getVersion()));
-      } else {
-        throw new UnsupportedOperationException("Unexpected ZKOp type: " + op.getClass().getName());
-      }
-    }
-    return preparedOps;
-  }
-
-  /**
-   * Run multiple operations in a transactional manner. Retry before throwing exception
-   */
-  public List<OpResult> multi(Iterable<Op> ops)
-  throws KeeperException, InterruptedException {
-    try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.multi")) {
-      RetryCounter retryCounter = retryCounterFactory.create();
-      Iterable<Op> multiOps = prepareZKMulti(ops);
-      while (true) {
-        try {
-          long startTime = EnvironmentEdgeManager.currentTime();
-          List<OpResult> opResults = checkZk().multi(multiOps);
-          this.metrics.registerWriteOperationLatency(Math.min(EnvironmentEdgeManager.currentTime() - startTime,  1));
-          return opResults;
-        } catch (KeeperException e) {
-          this.metrics.registerFailedZKCall();
-          switch (e.code()) {
-            case CONNECTIONLOSS:
-              this.metrics.registerConnectionLossException();
-              retryOrThrow(retryCounter, e, "multi");
-              break;
-            case OPERATIONTIMEOUT:
-              this.metrics.registerOperationTimeoutException();
-              retryOrThrow(retryCounter, e, "multi");
-              break;
-
-            default:
-              throw e;
-          }
-        }
-        retryCounter.sleepUntilNextRetry();
-      }
-    }
-  }
-
-  private String findPreviousSequentialNode(String path)
-    throws KeeperException, InterruptedException {
-    int lastSlashIdx = path.lastIndexOf('/');
-    assert(lastSlashIdx != -1);
-    String parent = path.substring(0, lastSlashIdx);
-    String nodePrefix = path.substring(lastSlashIdx+1);
-    long startTime = EnvironmentEdgeManager.currentTime();
-    List<String> nodes = checkZk().getChildren(parent, false);
-    this.metrics.registerReadOperationLatency(Math.min(EnvironmentEdgeManager.currentTime() - startTime,  1));
-    List<String> matching = filterByPrefix(nodes, nodePrefix);
-    for (String node : matching) {
-      String nodePath = parent + "/" + node;
-      startTime = EnvironmentEdgeManager.currentTime();
-      Stat stat = checkZk().exists(nodePath, false);
-      this.metrics.registerReadOperationLatency(Math.min(EnvironmentEdgeManager.currentTime() - startTime,  1));
-      if (stat != null) {
-        return nodePath;
-      }
-    }
-    return null;
-  }
-
-  public synchronized long getSessionId() {
-    return zk == null ? -1 : zk.getSessionId();
-  }
-
-  public synchronized void close() throws InterruptedException {
-    if (zk != null) zk.close();
-  }
-
-  public synchronized States getState() {
-    return zk == null ? null : zk.getState();
-  }
-
-  public synchronized ZooKeeper getZooKeeper() {
-    return zk;
-  }
-
-  public synchronized byte[] getSessionPasswd() {
-    return zk == null ? null : zk.getSessionPasswd();
-  }
-
-  public void sync(String path, AsyncCallback.VoidCallback cb, Object ctx) throws KeeperException {
-    long startTime = EnvironmentEdgeManager.currentTime();
-    checkZk().sync(path, cb, null);
-    this.metrics.registerSyncOperationLatency(Math.min(EnvironmentEdgeManager.currentTime() - startTime,  1));
-  }
-
-  /**
-   * Filters the given node list by the given prefixes.
-   * This method is all-inclusive--if any element in the node list starts
-   * with any of the given prefixes, then it is included in the result.
-   *
-   * @param nodes the nodes to filter
-   * @param prefixes the prefixes to include in the result
-   * @return list of every element that starts with one of the prefixes
-   */
-  private static List<String> filterByPrefix(List<String> nodes,
-      String... prefixes) {
-    List<String> lockChildren = new ArrayList<>();
-    for (String child : nodes){
-      for (String prefix : prefixes){
-        if (child.startsWith(prefix)){
-          lockChildren.add(child);
-          break;
-        }
-      }
-    }
-    return lockChildren;
-  }
-
-  public String getIdentifier() {
-    return identifier;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/330b0d05/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKClusterId.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKClusterId.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKClusterId.java
deleted file mode 100644
index 9ef7691..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKClusterId.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.zookeeper;
-
-import java.util.UUID;
-
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.ClusterId;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.zookeeper.KeeperException;
-
-/**
- * Publishes and synchronizes a unique identifier specific to a given HBase
- * cluster.  The stored identifier is read from the file system by the active
- * master on startup, and is subsequently available to all watchers (including
- * clients).
- */
-@InterfaceAudience.Private
-public class ZKClusterId {
-  private ZooKeeperWatcher watcher;
-  private Abortable abortable;
-  private String id;
-
-  public ZKClusterId(ZooKeeperWatcher watcher, Abortable abortable) {
-    this.watcher = watcher;
-    this.abortable = abortable;
-  }
-
-  public boolean hasId() {
-    return getId() != null;
-  }
-
-  public String getId() {
-    try {
-      if (id == null) {
-        id = readClusterIdZNode(watcher);
-      }
-    } catch (KeeperException ke) {
-      abortable.abort("Unexpected exception from ZooKeeper reading cluster ID",
-          ke);
-    }
-    return id;
-  }
-
-  public static String readClusterIdZNode(ZooKeeperWatcher watcher)
-  throws KeeperException {
-    if (ZKUtil.checkExists(watcher, watcher.znodePaths.clusterIdZNode) != -1) {
-      byte [] data;
-      try {
-        data = ZKUtil.getData(watcher, watcher.znodePaths.clusterIdZNode);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        return null;
-      }
-      if (data != null) {
-        try {
-          return ClusterId.parseFrom(data).toString();
-        } catch (DeserializationException e) {
-          throw ZKUtil.convert(e);
-        }
-      }
-    }
-    return null;
-  }
-
-  public static void setClusterId(ZooKeeperWatcher watcher, ClusterId id)
-      throws KeeperException {
-    ZKUtil.createSetData(watcher, watcher.znodePaths.clusterIdZNode, id.toByteArray());
-  }
-
-  /**
-   * Get the UUID for the provided ZK watcher. Doesn't handle any ZK exceptions
-   * @param zkw watcher connected to an ensemble
-   * @return the UUID read from zookeeper
-   * @throws KeeperException
-   */
-  public static UUID getUUIDForCluster(ZooKeeperWatcher zkw) throws KeeperException {
-    String uuid = readClusterIdZNode(zkw);
-    return uuid == null ? null : UUID.fromString(uuid);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/330b0d05/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKLeaderManager.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKLeaderManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKLeaderManager.java
deleted file mode 100644
index b0610b0..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKLeaderManager.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.zookeeper;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.Stoppable;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.zookeeper.KeeperException;
-
-/**
- * Handles coordination of a single "leader" instance among many possible
- * candidates.  The first {@link ZKLeaderManager} to successfully create
- * the given znode becomes the leader, allowing the instance to continue
- * with whatever processing must be protected.  Other {@link ZKLeaderManager}
- * instances will wait to be notified of changes to the leader znode.
- * If the current master instance fails, the ephemeral leader znode will
- * be removed, and all waiting instances will be notified, with the race
- * to claim the leader znode beginning all over again.
- * @deprecated Not used
- */
-@Deprecated
-@InterfaceAudience.Private
-public class ZKLeaderManager extends ZooKeeperListener {
-  private static final Log LOG = LogFactory.getLog(ZKLeaderManager.class);
-
-  private final AtomicBoolean leaderExists = new AtomicBoolean();
-  private String leaderZNode;
-  private byte[] nodeId;
-  private Stoppable candidate;
-
-  public ZKLeaderManager(ZooKeeperWatcher watcher, String leaderZNode,
-      byte[] identifier, Stoppable candidate) {
-    super(watcher);
-    this.leaderZNode = leaderZNode;
-    this.nodeId = identifier;
-    this.candidate = candidate;
-  }
-
-  public void start() {
-    try {
-      watcher.registerListener(this);
-      String parent = ZKUtil.getParent(leaderZNode);
-      if (ZKUtil.checkExists(watcher, parent) < 0) {
-        ZKUtil.createWithParents(watcher, parent);
-      }
-    } catch (KeeperException ke) {
-      watcher.abort("Unhandled zk exception when starting", ke);
-      candidate.stop("Unhandled zk exception starting up: "+ke.getMessage());
-    }
-  }
-
-  @Override
-  public void nodeCreated(String path) {
-    if (leaderZNode.equals(path) && !candidate.isStopped()) {
-      handleLeaderChange();
-    }
-  }
-
-  @Override
-  public void nodeDeleted(String path) {
-    if (leaderZNode.equals(path) && !candidate.isStopped()) {
-      handleLeaderChange();
-    }
-  }
-
-  private void handleLeaderChange() {
-    try {
-      synchronized(leaderExists) {
-        if (ZKUtil.watchAndCheckExists(watcher, leaderZNode)) {
-          LOG.info("Found new leader for znode: "+leaderZNode);
-          leaderExists.set(true);
-        } else {
-          LOG.info("Leader change, but no new leader found");
-          leaderExists.set(false);
-          leaderExists.notifyAll();
-        }
-      }
-    } catch (KeeperException ke) {
-      watcher.abort("ZooKeeper error checking for leader znode", ke);
-      candidate.stop("ZooKeeper error checking for leader: "+ke.getMessage());
-    }
-  }
-
-  /**
-   * Blocks until this instance has claimed the leader ZNode in ZooKeeper
-   */
-  public void waitToBecomeLeader() {
-    while (!candidate.isStopped()) {
-      try {
-        if (ZKUtil.createEphemeralNodeAndWatch(watcher, leaderZNode, nodeId)) {
-          // claimed the leader znode
-          leaderExists.set(true);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Claimed the leader znode as '"+
-                Bytes.toStringBinary(nodeId)+"'");
-          }
-          return;
-        }
-
-        // if claiming the node failed, there should be another existing node
-        byte[] currentId = ZKUtil.getDataAndWatch(watcher, leaderZNode);
-        if (currentId != null && Bytes.equals(currentId, nodeId)) {
-          // claimed with our ID, but we didn't grab it, possibly restarted?
-          LOG.info("Found existing leader with our ID ("+
-              Bytes.toStringBinary(nodeId)+"), removing");
-          ZKUtil.deleteNode(watcher, leaderZNode);
-          leaderExists.set(false);
-        } else {
-          LOG.info("Found existing leader with ID: "+Bytes.toStringBinary(nodeId));
-          leaderExists.set(true);
-        }
-      } catch (KeeperException ke) {
-        watcher.abort("Unexpected error from ZK, stopping candidate", ke);
-        candidate.stop("Unexpected error from ZK: "+ke.getMessage());
-        return;
-      }
-
-      // wait for next chance
-      synchronized(leaderExists) {
-        while (leaderExists.get() && !candidate.isStopped()) {
-          try {
-            leaderExists.wait();
-          } catch (InterruptedException ie) {
-            LOG.debug("Interrupted waiting on leader", ie);
-          }
-        }
-      }
-    }
-  }
-
-  /**
-   * Removes the leader znode, if it is currently claimed by this instance.
-   */
-  public void stepDownAsLeader() {
-    try {
-      synchronized(leaderExists) {
-        if (!leaderExists.get()) {
-          return;
-        }
-        byte[] leaderId = ZKUtil.getData(watcher, leaderZNode);
-        if (leaderId != null && Bytes.equals(nodeId, leaderId)) {
-          LOG.info("Stepping down as leader");
-          ZKUtil.deleteNodeFailSilent(watcher, leaderZNode);
-          leaderExists.set(false);
-        } else {
-          LOG.info("Not current leader, no need to step down");
-        }
-      }
-    } catch (KeeperException ke) {
-      watcher.abort("Unhandled zookeeper exception removing leader node", ke);
-      candidate.stop("Unhandled zookeeper exception removing leader node: "
-          + ke.getMessage());
-    } catch (InterruptedException e) {
-      watcher.abort("Unhandled zookeeper exception removing leader node", e);
-      candidate.stop("Unhandled zookeeper exception removing leader node: "
-          + e.getMessage());
-    }
-  }
-
-  public boolean hasLeader() {
-    return leaderExists.get();
-  }
-}


Mime
View raw message