accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ctubb...@apache.org
Subject [accumulo] 01/01: Merge branch '1.9' into 2.0
Date Fri, 07 Jun 2019 16:41:59 GMT
This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 3bdb6cd94d0fbdd88bff6da4aeb682bd4391c847
Merge: c8776a9 3311218
Author: Christopher Tubbs <ctubbsii@apache.org>
AuthorDate: Fri Jun 7 12:41:25 2019 -0400

    Merge branch '1.9' into 2.0

 core/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReader.java | 4 +---
 core/src/main/java/org/apache/accumulo/fate/zookeeper/ZooUtil.java   | 4 ++--
 2 files changed, 3 insertions(+), 5 deletions(-)

diff --cc core/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReader.java
index 98e49d3,0000000..0bddb07
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReader.java
+++ b/core/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReader.java
@@@ -1,263 -1,0 +1,261 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.fate.zookeeper;
 +
 +import java.util.List;
 +import java.util.concurrent.CountDownLatch;
 +import java.util.concurrent.atomic.AtomicInteger;
 +
 +import org.apache.accumulo.fate.util.Retry;
 +import org.apache.accumulo.fate.util.Retry.RetryFactory;
 +import org.apache.accumulo.fate.zookeeper.ZooUtil.ZooKeeperConnectionInfo;
 +import org.apache.zookeeper.KeeperException;
 +import org.apache.zookeeper.KeeperException.Code;
 +import org.apache.zookeeper.Watcher;
 +import org.apache.zookeeper.ZooKeeper;
 +import org.apache.zookeeper.data.ACL;
 +import org.apache.zookeeper.data.Stat;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +public class ZooReader implements IZooReader {
 +  private static final Logger log = LoggerFactory.getLogger(ZooReader.class);
 +
 +  protected String keepers;
 +  protected int timeout;
 +  private final RetryFactory retryFactory;
-   private final ZooKeeperConnectionInfo info;
 +
 +  protected ZooKeeper getSession(String keepers, int timeout, String scheme, byte[] auth)
{
 +    return ZooSession.getSession(keepers, timeout, scheme, auth);
 +  }
 +
 +  protected ZooKeeper getZooKeeper() {
 +    return getSession(keepers, timeout, null, null);
 +  }
 +
 +  protected RetryFactory getRetryFactory() {
 +    return retryFactory;
 +  }
 +
 +  protected void retryOrThrow(Retry retry, KeeperException e) throws KeeperException {
 +    log.warn("Saw (possibly) transient exception communicating with ZooKeeper", e);
 +    if (retry.canRetry()) {
 +      retry.useRetry();
 +      return;
 +    }
 +
 +    log.error("Retry attempts ({}) exceeded trying to communicate with ZooKeeper",
 +        retry.retriesCompleted());
 +    throw e;
 +  }
 +
 +  @Override
 +  public byte[] getData(String zPath, Stat stat) throws KeeperException, InterruptedException
{
 +    return getData(zPath, false, stat);
 +  }
 +
 +  @Override
 +  public byte[] getData(String zPath, boolean watch, Stat stat)
 +      throws KeeperException, InterruptedException {
 +    final Retry retry = getRetryFactory().createRetry();
 +    while (true) {
 +      try {
 +        return getZooKeeper().getData(zPath, watch, stat);
 +      } catch (KeeperException e) {
 +        final Code code = e.code();
 +        if (code == Code.CONNECTIONLOSS || code == Code.OPERATIONTIMEOUT
 +            || code == Code.SESSIONEXPIRED) {
 +          retryOrThrow(retry, e);
 +        } else {
 +          throw e;
 +        }
 +      }
 +
 +      retry.waitForNextAttempt();
 +    }
 +  }
 +
 +  @Override
 +  public byte[] getData(String zPath, Watcher watcher, Stat stat)
 +      throws KeeperException, InterruptedException {
 +    final Retry retry = getRetryFactory().createRetry();
 +    while (true) {
 +      try {
 +        return getZooKeeper().getData(zPath, watcher, stat);
 +      } catch (KeeperException e) {
 +        final Code code = e.code();
 +        if (code == Code.CONNECTIONLOSS || code == Code.OPERATIONTIMEOUT
 +            || code == Code.SESSIONEXPIRED) {
 +          retryOrThrow(retry, e);
 +        } else {
 +          throw e;
 +        }
 +      }
 +
 +      retry.waitForNextAttempt();
 +    }
 +  }
 +
 +  @Override
 +  public Stat getStatus(String zPath) throws KeeperException, InterruptedException {
 +    final Retry retry = getRetryFactory().createRetry();
 +    while (true) {
 +      try {
 +        return getZooKeeper().exists(zPath, false);
 +      } catch (KeeperException e) {
 +        final Code code = e.code();
 +        if (code == Code.CONNECTIONLOSS || code == Code.OPERATIONTIMEOUT
 +            || code == Code.SESSIONEXPIRED) {
 +          retryOrThrow(retry, e);
 +        } else {
 +          throw e;
 +        }
 +      }
 +
 +      retry.waitForNextAttempt();
 +    }
 +  }
 +
 +  @Override
 +  public Stat getStatus(String zPath, Watcher watcher)
 +      throws KeeperException, InterruptedException {
 +    final Retry retry = getRetryFactory().createRetry();
 +    while (true) {
 +      try {
 +        return getZooKeeper().exists(zPath, watcher);
 +      } catch (KeeperException e) {
 +        final Code code = e.code();
 +        if (code == Code.CONNECTIONLOSS || code == Code.OPERATIONTIMEOUT
 +            || code == Code.SESSIONEXPIRED) {
 +          retryOrThrow(retry, e);
 +        } else {
 +          throw e;
 +        }
 +      }
 +
 +      retry.waitForNextAttempt();
 +    }
 +  }
 +
 +  @Override
 +  public List<String> getChildren(String zPath) throws KeeperException, InterruptedException
{
 +    final Retry retry = getRetryFactory().createRetry();
 +    while (true) {
 +      try {
 +        return getZooKeeper().getChildren(zPath, false);
 +      } catch (KeeperException e) {
 +        final Code code = e.code();
 +        if (code == Code.CONNECTIONLOSS || code == Code.OPERATIONTIMEOUT
 +            || code == Code.SESSIONEXPIRED) {
 +          retryOrThrow(retry, e);
 +        } else {
 +          throw e;
 +        }
 +      }
 +
 +      retry.waitForNextAttempt();
 +    }
 +  }
 +
 +  @Override
 +  public List<String> getChildren(String zPath, Watcher watcher)
 +      throws KeeperException, InterruptedException {
 +    final Retry retry = getRetryFactory().createRetry();
 +    while (true) {
 +      try {
 +        return getZooKeeper().getChildren(zPath, watcher);
 +      } catch (KeeperException e) {
 +        final Code code = e.code();
 +        if (code == Code.CONNECTIONLOSS || code == Code.OPERATIONTIMEOUT
 +            || code == Code.SESSIONEXPIRED) {
 +          retryOrThrow(retry, e);
 +        } else {
 +          throw e;
 +        }
 +      }
 +
 +      retry.waitForNextAttempt();
 +    }
 +  }
 +
 +  @Override
 +  public boolean exists(String zPath) throws KeeperException, InterruptedException {
 +    final Retry retry = getRetryFactory().createRetry();
 +    while (true) {
 +      try {
 +        return getZooKeeper().exists(zPath, false) != null;
 +      } catch (KeeperException e) {
 +        final Code code = e.code();
 +        if (code == Code.CONNECTIONLOSS || code == Code.OPERATIONTIMEOUT
 +            || code == Code.SESSIONEXPIRED) {
 +          retryOrThrow(retry, e);
 +        } else {
 +          throw e;
 +        }
 +      }
 +
 +      retry.waitForNextAttempt();
 +    }
 +  }
 +
 +  @Override
 +  public boolean exists(String zPath, Watcher watcher)
 +      throws KeeperException, InterruptedException {
 +    final Retry retry = getRetryFactory().createRetry();
 +    while (true) {
 +      try {
 +        return getZooKeeper().exists(zPath, watcher) != null;
 +      } catch (KeeperException e) {
 +        final Code code = e.code();
 +        if (code == Code.CONNECTIONLOSS || code == Code.OPERATIONTIMEOUT
 +            || code == Code.SESSIONEXPIRED) {
 +          retryOrThrow(retry, e);
 +        } else {
 +          throw e;
 +        }
 +      }
 +
 +      retry.waitForNextAttempt();
 +    }
 +  }
 +
 +  @Override
 +  public void sync(final String path) throws KeeperException, InterruptedException {
 +    final AtomicInteger rc = new AtomicInteger();
 +    final CountDownLatch waiter = new CountDownLatch(1);
 +    getZooKeeper().sync(path, (code, arg1, arg2) -> {
 +      rc.set(code);
 +      waiter.countDown();
 +    }, null);
 +    waiter.await();
 +    Code code = Code.get(rc.get());
 +    if (code != KeeperException.Code.OK) {
 +      throw KeeperException.create(code);
 +    }
 +  }
 +
 +  @Override
 +  public List<ACL> getACL(String zPath, Stat stat) throws KeeperException, InterruptedException
{
-     return ZooUtil.getACL(info, zPath, stat);
++    return ZooUtil.getACL(getZooKeeper(), zPath, stat);
 +  }
 +
 +  public ZooReader(String keepers, int timeout) {
 +    this.keepers = keepers;
 +    this.timeout = timeout;
 +    this.retryFactory = ZooUtil.DEFAULT_RETRY;
-     this.info = new ZooKeeperConnectionInfo(keepers, timeout, null, null);
 +  }
 +}
diff --cc core/src/main/java/org/apache/accumulo/fate/zookeeper/ZooUtil.java
index 032986f,0000000..6573614
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/fate/zookeeper/ZooUtil.java
+++ b/core/src/main/java/org/apache/accumulo/fate/zookeeper/ZooUtil.java
@@@ -1,617 -1,0 +1,617 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.fate.zookeeper;
 +
 +import static java.util.Objects.requireNonNull;
 +import static java.util.concurrent.TimeUnit.MILLISECONDS;
 +
 +import java.io.FileNotFoundException;
 +import java.io.IOException;
 +import java.math.BigInteger;
 +import java.net.UnknownHostException;
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.Collections;
 +import java.util.List;
 +import java.util.concurrent.TimeUnit;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.volume.VolumeConfiguration;
 +import org.apache.accumulo.fate.util.Retry;
 +import org.apache.accumulo.fate.util.Retry.RetryFactory;
 +import org.apache.commons.lang.builder.HashCodeBuilder;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FileStatus;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.zookeeper.CreateMode;
 +import org.apache.zookeeper.KeeperException;
 +import org.apache.zookeeper.KeeperException.Code;
 +import org.apache.zookeeper.KeeperException.NoNodeException;
 +import org.apache.zookeeper.ZooDefs.Ids;
 +import org.apache.zookeeper.ZooDefs.Perms;
 +import org.apache.zookeeper.ZooKeeper;
 +import org.apache.zookeeper.data.ACL;
 +import org.apache.zookeeper.data.Stat;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +public class ZooUtil {
 +
 +  public static final RetryFactory DEFAULT_RETRY = Retry.builder().maxRetries(10)
 +      .retryAfter(250, MILLISECONDS).incrementBy(250, MILLISECONDS).maxWait(5, TimeUnit.SECONDS)
 +      .backOffFactor(1.5).logInterval(3, TimeUnit.MINUTES).createFactory();
 +
 +  private static final Logger log = LoggerFactory.getLogger(ZooUtil.class);
 +
 +  public enum NodeExistsPolicy {
 +    SKIP, OVERWRITE, FAIL
 +  }
 +
 +  public enum NodeMissingPolicy {
 +    SKIP, CREATE, FAIL
 +  }
 +
 +  public static class LockID {
 +    public long eid;
 +    public String path;
 +    public String node;
 +
 +    public LockID(String root, String serializedLID) {
 +      String[] sa = serializedLID.split("\\$");
 +      int lastSlash = sa[0].lastIndexOf('/');
 +
 +      if (sa.length != 2 || lastSlash < 0) {
 +        throw new IllegalArgumentException("Malformed serialized lock id " + serializedLID);
 +      }
 +
 +      if (lastSlash == 0)
 +        path = root;
 +      else
 +        path = root + "/" + sa[0].substring(0, lastSlash);
 +      node = sa[0].substring(lastSlash + 1);
 +      eid = new BigInteger(sa[1], 16).longValue();
 +    }
 +
 +    public LockID(String path, String node, long eid) {
 +      this.path = path;
 +      this.node = node;
 +      this.eid = eid;
 +    }
 +
 +    public String serialize(String root) {
 +
 +      return path.substring(root.length()) + "/" + node + "$" + Long.toHexString(eid);
 +    }
 +
 +    @Override
 +    public String toString() {
 +      return " path = " + path + " node = " + node + " eid = " + Long.toHexString(eid);
 +    }
 +  }
 +
 +  protected static class ZooKeeperConnectionInfo {
 +    String keepers, scheme;
 +    int timeout;
 +    byte[] auth;
 +
 +    public ZooKeeperConnectionInfo(String keepers, int timeout, String scheme, byte[] auth)
{
 +      requireNonNull(keepers);
 +      this.keepers = keepers;
 +      this.timeout = timeout;
 +      this.scheme = scheme;
 +      this.auth = auth;
 +    }
 +
 +    @Override
 +    public int hashCode() {
 +      final HashCodeBuilder hcb = new HashCodeBuilder(31, 47);
 +      hcb.append(keepers).append(timeout);
 +      if (scheme != null) {
 +        hcb.append(scheme);
 +      }
 +      if (auth != null) {
 +        hcb.append(auth);
 +      }
 +      return hcb.toHashCode();
 +    }
 +
 +    @Override
 +    public boolean equals(Object o) {
 +      if (o instanceof ZooKeeperConnectionInfo) {
 +        ZooKeeperConnectionInfo other = (ZooKeeperConnectionInfo) o;
 +        if (!keepers.equals(other.keepers) || timeout != other.timeout) {
 +          return false;
 +        }
 +
 +        if (scheme != null) {
 +          if (other.scheme == null) {
 +            // Ours is non-null, theirs is null
 +            return false;
 +          } else if (!scheme.equals(other.scheme)) {
 +            // Both non-null but not equal
 +            return false;
 +          }
 +        }
 +
 +        if (auth != null) {
 +          if (other.auth == null) {
 +            return false;
 +          } else {
 +            return Arrays.equals(auth, other.auth);
 +          }
 +        }
 +
 +        return true;
 +      }
 +
 +      return false;
 +    }
 +
 +    @Override
 +    public String toString() {
 +      StringBuilder sb = new StringBuilder(64);
 +      sb.append("zookeepers=").append(keepers);
 +      sb.append(", timeout=").append(timeout);
 +      sb.append(", scheme=").append(scheme);
 +      sb.append(", auth=").append(auth == null ? "null" : "REDACTED");
 +      return sb.toString();
 +    }
 +  }
 +
 +  public static final List<ACL> PRIVATE;
 +  public static final List<ACL> PUBLIC;
 +  private static final RetryFactory RETRY_FACTORY;
 +
 +  static {
 +    PRIVATE = new ArrayList<>();
 +    PRIVATE.addAll(Ids.CREATOR_ALL_ACL);
 +    PUBLIC = new ArrayList<>();
 +    PUBLIC.addAll(PRIVATE);
 +    PUBLIC.add(new ACL(Perms.READ, Ids.ANYONE_ID_UNSAFE));
 +    RETRY_FACTORY = DEFAULT_RETRY;
 +  }
 +
 +  protected static ZooKeeper getZooKeeper(ZooKeeperConnectionInfo info) {
 +    return getZooKeeper(info.keepers, info.timeout, info.scheme, info.auth);
 +  }
 +
 +  protected static ZooKeeper getZooKeeper(String keepers, int timeout, String scheme, byte[]
auth) {
 +    return ZooSession.getSession(keepers, timeout, scheme, auth);
 +  }
 +
 +  protected static void retryOrThrow(Retry retry, KeeperException e) throws KeeperException
{
 +    log.warn("Saw (possibly) transient exception communicating with ZooKeeper", e);
 +    if (retry.canRetry()) {
 +      retry.useRetry();
 +      return;
 +    }
 +
 +    log.error("Retry attempts ({}) exceeded trying to communicate with ZooKeeper",
 +        retry.retriesCompleted());
 +    throw e;
 +  }
 +
 +  /**
 +   * This method will delete a node and all its children from zookeeper
 +   *
 +   * @param zPath
 +   *          the path to delete
 +   */
 +  static void recursiveDelete(ZooKeeperConnectionInfo info, String zPath, NodeMissingPolicy
policy)
 +      throws KeeperException, InterruptedException {
 +    if (policy.equals(NodeMissingPolicy.CREATE))
 +      throw new IllegalArgumentException(policy.name() + " is invalid for this operation");
 +    try {
 +      List<String> children;
 +      final Retry retry = RETRY_FACTORY.createRetry();
 +      while (true) {
 +        try {
 +          children = getZooKeeper(info).getChildren(zPath, false);
 +          break;
 +        } catch (KeeperException e) {
 +          final Code c = e.code();
 +          if (c == Code.CONNECTIONLOSS || c == Code.OPERATIONTIMEOUT || c == Code.SESSIONEXPIRED)
{
 +            retryOrThrow(retry, e);
 +          } else {
 +            throw e;
 +          }
 +        }
 +        retry.waitForNextAttempt();
 +      }
 +      for (String child : children)
 +        recursiveDelete(info, zPath + "/" + child, NodeMissingPolicy.SKIP);
 +
 +      Stat stat;
 +      while (true) {
 +        try {
 +          stat = getZooKeeper(info).exists(zPath, null);
 +          // Node exists
 +          if (stat != null) {
 +            try {
 +              // Try to delete it. We don't care if there was an update to the node
 +              // since we got the Stat, just delete all versions (-1).
 +              getZooKeeper(info).delete(zPath, -1);
 +              return;
 +            } catch (NoNodeException e) {
 +              // If the node is gone now, it's ok if we have SKIP
 +              if (policy.equals(NodeMissingPolicy.SKIP)) {
 +                return;
 +              }
 +              throw e;
 +            }
 +            // Let other KeeperException bubble to the outer catch
 +          } else {
 +            // If the stat is null, the node is now gone which is fine.
 +            return;
 +          }
 +        } catch (KeeperException e) {
 +          final Code c = e.code();
 +          if (c == Code.CONNECTIONLOSS || c == Code.OPERATIONTIMEOUT || c == Code.SESSIONEXPIRED)
{
 +            retryOrThrow(retry, e);
 +          } else {
 +            throw e;
 +          }
 +        }
 +
 +        retry.waitForNextAttempt();
 +      }
 +    } catch (KeeperException e) {
 +      if (policy.equals(NodeMissingPolicy.SKIP) && e.code().equals(KeeperException.Code.NONODE))
 +        return;
 +      throw e;
 +    }
 +  }
 +
 +  /**
 +   * Create a persistent node with the default ACL
 +   *
 +   * @return true if the node was created or altered; false if it was skipped
 +   */
 +  public static boolean putPersistentData(ZooKeeperConnectionInfo info, String zPath, byte[]
data,
 +      NodeExistsPolicy policy) throws KeeperException, InterruptedException {
 +    return putData(info, zPath, data, CreateMode.PERSISTENT, -1, policy, PUBLIC);
 +  }
 +
 +  public static boolean putPersistentData(ZooKeeperConnectionInfo info, String zPath, byte[]
data,
 +      int version, NodeExistsPolicy policy) throws KeeperException, InterruptedException
{
 +    return putData(info, zPath, data, CreateMode.PERSISTENT, version, policy, PUBLIC);
 +  }
 +
 +  public static boolean putPersistentData(ZooKeeperConnectionInfo info, String zPath, byte[]
data,
 +      int version, NodeExistsPolicy policy, List<ACL> acls)
 +      throws KeeperException, InterruptedException {
 +    return putData(info, zPath, data, CreateMode.PERSISTENT, version, policy, acls);
 +  }
 +
 +  private static boolean putData(ZooKeeperConnectionInfo info, String zPath, byte[] data,
 +      CreateMode mode, int version, NodeExistsPolicy policy, List<ACL> acls)
 +      throws KeeperException, InterruptedException {
 +    if (policy == null)
 +      policy = NodeExistsPolicy.FAIL;
 +
 +    final Retry retry = RETRY_FACTORY.createRetry();
 +    while (true) {
 +      try {
 +        getZooKeeper(info).create(zPath, data, acls, mode);
 +        return true;
 +      } catch (KeeperException e) {
 +        final Code code = e.code();
 +        if (code == Code.NODEEXISTS) {
 +          switch (policy) {
 +            case SKIP:
 +              return false;
 +            case OVERWRITE:
 +              // overwrite the data in the node when it already exists
 +              try {
 +                getZooKeeper(info).setData(zPath, data, version);
 +                return true;
 +              } catch (KeeperException e2) {
 +                final Code code2 = e2.code();
 +                if (code2 == Code.NONODE) {
 +                  // node delete between create call and set data, so try create call again
 +                  continue;
 +                } else if (code2 == Code.CONNECTIONLOSS || code2 == Code.OPERATIONTIMEOUT
 +                    || code2 == Code.SESSIONEXPIRED) {
 +                  retryOrThrow(retry, e2);
 +                  break;
 +                } else {
 +                  // unhandled exception on setData()
 +                  throw e2;
 +                }
 +              }
 +            default:
 +              throw e;
 +          }
 +        } else if (code == Code.CONNECTIONLOSS || code == Code.OPERATIONTIMEOUT
 +            || code == Code.SESSIONEXPIRED) {
 +          retryOrThrow(retry, e);
 +        } else {
 +          // unhandled exception on create()
 +          throw e;
 +        }
 +      }
 +
 +      // Catch all to wait before retrying
 +      retry.waitForNextAttempt();
 +    }
 +  }
 +
 +  public static byte[] getData(ZooKeeperConnectionInfo info, String zPath, Stat stat)
 +      throws KeeperException, InterruptedException {
 +    final Retry retry = RETRY_FACTORY.createRetry();
 +    while (true) {
 +      try {
 +        return getZooKeeper(info).getData(zPath, false, stat);
 +      } catch (KeeperException e) {
 +        final Code c = e.code();
 +        if (c == Code.CONNECTIONLOSS || c == Code.OPERATIONTIMEOUT || c == Code.SESSIONEXPIRED)
{
 +          retryOrThrow(retry, e);
 +        } else {
 +          throw e;
 +        }
 +      }
 +
 +      retry.waitForNextAttempt();
 +    }
 +  }
 +
 +  public static Stat getStatus(ZooKeeperConnectionInfo info, String zPath)
 +      throws KeeperException, InterruptedException {
 +    final Retry retry = RETRY_FACTORY.createRetry();
 +    while (true) {
 +      try {
 +        return getZooKeeper(info).exists(zPath, false);
 +      } catch (KeeperException e) {
 +        final Code c = e.code();
 +        if (c == Code.CONNECTIONLOSS || c == Code.OPERATIONTIMEOUT || c == Code.SESSIONEXPIRED)
{
 +          retryOrThrow(retry, e);
 +        } else {
 +          throw e;
 +        }
 +      }
 +
 +      retry.waitForNextAttempt();
 +    }
 +  }
 +
 +  public static boolean exists(ZooKeeperConnectionInfo info, String zPath)
 +      throws KeeperException, InterruptedException {
 +    return getStatus(info, zPath) != null;
 +  }
 +
 +  public static void recursiveCopyPersistent(ZooKeeperConnectionInfo info, String source,
 +      String destination, NodeExistsPolicy policy) throws KeeperException, InterruptedException
{
 +    Stat stat = null;
 +    if (!exists(info, source))
 +      throw KeeperException.create(Code.NONODE, source);
 +    if (exists(info, destination)) {
 +      switch (policy) {
 +        case OVERWRITE:
 +          break;
 +        case SKIP:
 +          return;
 +        case FAIL:
 +        default:
 +          throw KeeperException.create(Code.NODEEXISTS, source);
 +      }
 +    }
 +
 +    stat = new Stat();
 +    byte[] data = getData(info, source, stat);
 +
 +    if (stat.getEphemeralOwner() == 0) {
 +      if (data == null)
 +        throw KeeperException.create(Code.NONODE, source);
 +      putPersistentData(info, destination, data, policy);
 +      if (stat.getNumChildren() > 0) {
 +        List<String> children;
 +        final Retry retry = RETRY_FACTORY.createRetry();
 +        while (true) {
 +          try {
 +            children = getZooKeeper(info).getChildren(source, false);
 +            break;
 +          } catch (KeeperException e) {
 +            final Code c = e.code();
 +            if (c == Code.CONNECTIONLOSS || c == Code.OPERATIONTIMEOUT
 +                || c == Code.SESSIONEXPIRED) {
 +              retryOrThrow(retry, e);
 +            } else {
 +              throw e;
 +            }
 +          }
 +          retry.waitForNextAttempt();
 +        }
 +        for (String child : children) {
 +          recursiveCopyPersistent(info, source + "/" + child, destination + "/" + child,
policy);
 +        }
 +      }
 +    }
 +  }
 +
 +  public static boolean putPrivatePersistentData(ZooKeeperConnectionInfo info, String zPath,
 +      byte[] data, NodeExistsPolicy policy) throws KeeperException, InterruptedException
{
 +    return putData(info, zPath, data, CreateMode.PERSISTENT, -1, policy, PRIVATE);
 +  }
 +
 +  public static String putPersistentSequential(ZooKeeperConnectionInfo info, String zPath,
 +      byte[] data) throws KeeperException, InterruptedException {
 +    final Retry retry = RETRY_FACTORY.createRetry();
 +    while (true) {
 +      try {
 +        return getZooKeeper(info).create(zPath, data, ZooUtil.PUBLIC,
 +            CreateMode.PERSISTENT_SEQUENTIAL);
 +      } catch (KeeperException e) {
 +        final Code c = e.code();
 +        if (c == Code.CONNECTIONLOSS || c == Code.OPERATIONTIMEOUT || c == Code.SESSIONEXPIRED)
{
 +          retryOrThrow(retry, e);
 +        } else {
 +          throw e;
 +        }
 +      }
 +
 +      retry.waitForNextAttempt();
 +    }
 +  }
 +
 +  public static String putEphemeralData(ZooKeeperConnectionInfo info, String zPath, byte[]
data)
 +      throws KeeperException, InterruptedException {
 +    final Retry retry = RETRY_FACTORY.createRetry();
 +    while (true) {
 +      try {
 +        return getZooKeeper(info).create(zPath, data, ZooUtil.PUBLIC, CreateMode.EPHEMERAL);
 +      } catch (KeeperException e) {
 +        final Code c = e.code();
 +        if (c == Code.CONNECTIONLOSS || c == Code.OPERATIONTIMEOUT || c == Code.SESSIONEXPIRED)
{
 +          retryOrThrow(retry, e);
 +        } else {
 +          throw e;
 +        }
 +      }
 +
 +      retry.waitForNextAttempt();
 +    }
 +  }
 +
 +  public static String putEphemeralSequential(ZooKeeperConnectionInfo info, String zPath,
 +      byte[] data) throws KeeperException, InterruptedException {
 +    final Retry retry = RETRY_FACTORY.createRetry();
 +    while (true) {
 +      try {
 +        return getZooKeeper(info).create(zPath, data, ZooUtil.PUBLIC,
 +            CreateMode.EPHEMERAL_SEQUENTIAL);
 +      } catch (KeeperException e) {
 +        final Code c = e.code();
 +        if (c == Code.CONNECTIONLOSS || c == Code.OPERATIONTIMEOUT || c == Code.SESSIONEXPIRED)
{
 +          retryOrThrow(retry, e);
 +        } else {
 +          throw e;
 +        }
 +      }
 +
 +      retry.waitForNextAttempt();
 +    }
 +  }
 +
 +  public static byte[] getLockData(ZooCache zc, String path) {
 +
 +    List<String> children = zc.getChildren(path);
 +
 +    if (children == null || children.size() == 0) {
 +      return null;
 +    }
 +
 +    children = new ArrayList<>(children);
 +    Collections.sort(children);
 +
 +    String lockNode = children.get(0);
 +
 +    return zc.get(path + "/" + lockNode);
 +  }
 +
 +  public static boolean isLockHeld(ZooKeeperConnectionInfo info, LockID lid)
 +      throws KeeperException, InterruptedException {
 +    final Retry retry = RETRY_FACTORY.createRetry();
 +    while (true) {
 +      try {
 +        List<String> children = getZooKeeper(info).getChildren(lid.path, false);
 +
 +        if (children.size() == 0) {
 +          return false;
 +        }
 +
 +        Collections.sort(children);
 +
 +        String lockNode = children.get(0);
 +        if (!lid.node.equals(lockNode))
 +          return false;
 +
 +        Stat stat = getZooKeeper(info).exists(lid.path + "/" + lid.node, false);
 +        return stat != null && stat.getEphemeralOwner() == lid.eid;
 +      } catch (KeeperException ex) {
 +        final Code c = ex.code();
 +        if (c == Code.CONNECTIONLOSS || c == Code.OPERATIONTIMEOUT || c == Code.SESSIONEXPIRED)
{
 +          retryOrThrow(retry, ex);
 +        }
 +      }
 +
 +      retry.waitForNextAttempt();
 +    }
 +  }
 +
-   public static List<ACL> getACL(ZooKeeperConnectionInfo info, String zPath, Stat
stat)
++  public static List<ACL> getACL(ZooKeeper zk, String zPath, Stat stat)
 +      throws KeeperException, InterruptedException {
 +    final Retry retry = RETRY_FACTORY.createRetry();
 +    while (true) {
 +      try {
-         return getZooKeeper(info).getACL(zPath, stat);
++        return zk.getACL(zPath, stat);
 +      } catch (KeeperException e) {
 +        final Code c = e.code();
 +        if (c == Code.CONNECTIONLOSS || c == Code.OPERATIONTIMEOUT || c == Code.SESSIONEXPIRED)
{
 +          retryOrThrow(retry, e);
 +        } else {
 +          throw e;
 +        }
 +      }
 +
 +      retry.waitForNextAttempt();
 +    }
 +  }
 +
 +  public static String getRoot(final String instanceId) {
 +    return Constants.ZROOT + "/" + instanceId;
 +  }
 +
 +  public static String getInstanceIDFromHdfs(Path instanceDirectory, AccumuloConfiguration
conf,
 +      Configuration hadoopConf) {
 +    try {
 +      FileSystem fs = VolumeConfiguration.getVolume(instanceDirectory.toString(), hadoopConf,
conf)
 +          .getFileSystem();
 +      FileStatus[] files = null;
 +      try {
 +        files = fs.listStatus(instanceDirectory);
 +      } catch (FileNotFoundException ex) {
 +        // ignored
 +      }
 +      log.debug("Trying to read instance id from {}", instanceDirectory);
 +      if (files == null || files.length == 0) {
 +        log.error("unable obtain instance id at {}", instanceDirectory);
 +        throw new RuntimeException(
 +            "Accumulo not initialized, there is no instance id at " + instanceDirectory);
 +      } else if (files.length != 1) {
 +        log.error("multiple potential instances in {}", instanceDirectory);
 +        throw new RuntimeException(
 +            "Accumulo found multiple possible instance ids in " + instanceDirectory);
 +      } else {
 +        return files[0].getPath().getName();
 +      }
 +    } catch (IOException e) {
 +      log.error("Problem reading instance id out of hdfs at " + instanceDirectory, e);
 +      throw new RuntimeException(
 +          "Can't tell if Accumulo is initialized; can't read instance id at " + instanceDirectory,
 +          e);
 +    } catch (IllegalArgumentException exception) {
 +      /* HDFS throws this when there's a UnknownHostException due to DNS troubles. */
 +      if (exception.getCause() instanceof UnknownHostException) {
 +        log.error("Problem reading instance id out of hdfs at " + instanceDirectory, exception);
 +      }
 +      throw exception;
 +    }
 +  }
 +
 +}


Mime
View raw message