accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [13/16] git commit: And some other files...
Date Mon, 22 Sep 2014 20:48:13 GMT
And some other files...



git-svn-id: https://svn.apache.org/repos/asf/accumulo/branches/ACCUMULO-CURATOR@1497620 13f79535-47bb-0310-9956-ffa450edef68


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/4640866b
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/4640866b
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/4640866b

Branch: refs/heads/ACCUMULO-CURATOR
Commit: 4640866b2ea94d0b2dc3c52f9c4fec5155343b43
Parents: e261f8f
Author: John Vines <vines@apache.org>
Authored: Fri Jun 28 00:23:52 2013 +0000
Committer: John Vines <vines@apache.org>
Committed: Fri Jun 28 00:23:52 2013 +0000

----------------------------------------------------------------------
 .../accumulo/fate/curator/CuratorCaches.java    | 218 +++++++++++++++
 .../accumulo/fate/curator/CuratorException.java |  35 +++
 .../accumulo/fate/curator/CuratorReader.java    | 122 +++++++++
 .../fate/curator/CuratorReaderWriter.java       | 215 +++++++++++++++
 .../accumulo/fate/curator/CuratorSession.java   |  81 ++++++
 .../accumulo/fate/curator/CuratorUtil.java      | 180 +++++++++++++
 .../accumulo/fate/zookeeper/IZooReader.java     |  43 ---
 .../fate/zookeeper/IZooReaderWriter.java        |  65 -----
 .../accumulo/fate/zookeeper/ZooCache.java       | 219 ---------------
 .../apache/accumulo/fate/zookeeper/ZooUtil.java | 263 -------------------
 .../server/curator/CuratorReaderWriter.java     |  48 ++++
 11 files changed, 899 insertions(+), 590 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/4640866b/fate/src/main/java/org/apache/accumulo/fate/curator/CuratorCaches.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/curator/CuratorCaches.java b/fate/src/main/java/org/apache/accumulo/fate/curator/CuratorCaches.java
new file mode 100644
index 0000000..eac9b8f
--- /dev/null
+++ b/fate/src/main/java/org/apache/accumulo/fate/curator/CuratorCaches.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate.curator;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.NodeCache;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent.Type;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.log4j.Logger;
+
+/**
+ * Caches values stored in zookeeper and keeps them up to date as they change in zookeeper.
+ * 
+ */
+public class CuratorCaches {
+  private static final Logger log = Logger.getLogger(CuratorCaches.class);
+
+  /** Shared instances handed out by {@link #getInstance(String, int)}, keyed by "zooKeepers:sessionTimeout". */
+  private static final Map<String,CuratorCaches> instances = new HashMap<String,CuratorCaches>();
+
+  /** Single-node caches keyed by node path; only touched from synchronized methods. */
+  private final HashMap<String,NodeCache> nodeCache;
+  /** Children caches keyed by parent path; only touched from synchronized methods. */
+  private final HashMap<String,PathChildrenCache> childrenCache;
+
+  private final CuratorFramework curator;
+
+  /**
+   * Builds a cache on top of the session that CuratorSession returns for the given connect string and timeout.
+   */
+  public CuratorCaches(String zooKeepers, int sessionTimeout) {
+    this(CuratorSession.getSession(zooKeepers, sessionTimeout));
+  }
+
+  /**
+   * Builds an empty cache that uses the given Curator client for every cache it starts.
+   */
+  public CuratorCaches(CuratorFramework curator) {
+    this.curator = curator;
+    this.nodeCache = new HashMap<String,NodeCache>();
+    this.childrenCache = new HashMap<String,PathChildrenCache>();
+  }
+
+  /**
+   * Same as {@link #getChildren(String, PathChildrenCacheListener)} without a listener.
+   */
+  public synchronized List<ChildData> getChildren(final String zPath) {
+    return getChildren(zPath, null);
+  }
+
+  /**
+   * Returns the cached children of zPath, starting a children cache for that path on first use.
+   * 
+   * @param zPath
+   *          parent path whose children are wanted
+   * @param listener
+   *          optional listener; it is only registered when this call is the one that creates the cache
+   * @return the current child data, or null if the cache could not be started
+   */
+  public synchronized List<ChildData> getChildren(String zPath, PathChildrenCacheListener listener) {
+    PathChildrenCache cache = childrenCache.get(zPath);
+    if (cache != null) {
+      if (listener != null) {
+        // The cache already exists, so the listener is NOT attached; make the log say what actually happened
+        log.debug("LISTENER- cache already exists for path " + zPath + ", so listener " + listener.getClass() + " was not registered. this is a broken case!");
+      }
+      return cache.getCurrentData();
+    }
+
+    cache = new PathChildrenCache(curator, zPath, true);
+    if (listener != null) {
+      cache.getListenable().addListener(listener);
+    }
+    try {
+      log.debug("Starting cache against " + zPath + (listener != null ? " using listener " + listener : ""));
+      cache.start(StartMode.BUILD_INITIAL_CACHE);
+      // Replay the initial children to the listener by hand; presumably starting in
+      // BUILD_INITIAL_CACHE mode does not notify listeners about pre-existing children
+      if (listener != null) {
+        for (ChildData cd : cache.getCurrentData()) {
+          listener.childEvent(curator, new PathChildrenCacheEvent(Type.INITIALIZED, cd));
+        }
+      }
+
+      // Because parent's children are being watched, we don't need to cache the individual node
+      // UNLESS we have a listener on it
+      for (ChildData child : cache.getCurrentData()) {
+        String childPath = child.getPath();
+        NodeCache childCache = nodeCache.get(childPath);
+        if (childCache != null && childCache.getListenable().size() == 0) {
+          // log the map key rather than childCache.getCurrentData().getPath(), which can be null
+          log.debug("Removing cache " + childPath + " because parent cache was added");
+          childCache.close();
+          nodeCache.remove(childPath);
+        }
+      }
+    } catch (Exception e) {
+      log.error(e, e);
+      // We're already in a bad state at this point, I think, but close just in case
+      closeQuietly(cache);
+      return null;
+    }
+    childrenCache.put(zPath, cache);
+    return cache.getCurrentData();
+  }
+
+  /**
+   * Returns the node names (last path element) of the cached children of zPath.
+   * 
+   * @return the child names; empty when there are no children or when the children cache could not be started (that failure is logged by getChildren)
+   */
+  public List<String> getChildKeys(final String zPath) {
+    List<String> toRet = new ArrayList<String>();
+    List<ChildData> children = getChildren(zPath);
+    // getChildren returns null on failure; iterating that directly used to throw a NullPointerException
+    if (children == null)
+      return toRet;
+    for (ChildData child : children) {
+      toRet.add(CuratorUtil.getNodeName(child));
+    }
+    return toRet;
+  }
+
+  /**
+   * Returns the cached data for a single node. An existing children cache on the node's parent is used when the node has no cache of its own; otherwise a node
+   * cache is started for zPath.
+   * 
+   * @return the current data as reported by the cache that served the request, or null if a new node cache could not be started
+   */
+  public synchronized ChildData get(final String zPath) {
+    NodeCache cache = nodeCache.get(zPath);
+    if (cache == null) {
+      PathChildrenCache cCache = childrenCache.get(CuratorUtil.getNodeParent(zPath));
+      if (cCache != null) {
+        return cCache.getCurrentData(zPath);
+      }
+      cache = new NodeCache(curator, zPath);
+      try {
+        cache.start(true);
+      } catch (Exception e) {
+        log.error(e, e);
+        // We're already in a bad state at this point, I think, but close just in case
+        closeQuietly(cache);
+        return null;
+      }
+      nodeCache.put(zPath, cache);
+    }
+
+    return cache.getCurrentData();
+  }
+
+  /** Closes a node cache, logging (not propagating) the IOException that close itself may raise. */
+  private static void closeQuietly(NodeCache cache) {
+    try {
+      cache.close();
+    } catch (IOException e) {
+      log.error(e, e);
+    }
+  }
+
+  /** Closes a children cache, logging (not propagating) the IOException that close itself may raise. */
+  private static void closeQuietly(PathChildrenCache cache) {
+    try {
+      cache.close();
+    } catch (IOException e) {
+      log.error(e, e);
+    }
+  }
+
+  /** Closes and forgets both the node cache and the children cache registered under exactly zPath. */
+  private synchronized void remove(String zPath) {
+    if (log.isTraceEnabled())
+      log.trace("removing " + zPath + " from cache");
+
+    NodeCache nc = nodeCache.remove(zPath);
+    if (nc != null)
+      closeQuietly(nc);
+
+    PathChildrenCache pc = childrenCache.remove(zPath);
+    if (pc != null)
+      closeQuietly(pc);
+  }
+
+  /** Closes every cache held by this instance and empties both maps. */
+  public synchronized void clear() {
+    for (NodeCache nc : nodeCache.values())
+      closeQuietly(nc);
+    for (PathChildrenCache pc : childrenCache.values())
+      closeQuietly(pc);
+
+    nodeCache.clear();
+    childrenCache.clear();
+  }
+
+  public CuratorFramework getCurator() {
+    return curator;
+  }
+
+  /**
+   * Closes and forgets every cache whose path starts with zPath. This is a plain string-prefix match.
+   */
+  public synchronized void clear(String zPath) {
+    // Collect first: remove() mutates the maps, which must not happen while their key sets are being iterated
+    List<String> pathsToRemove = new ArrayList<String>();
+    for (String path : nodeCache.keySet()) {
+      if (path.startsWith(zPath))
+        pathsToRemove.add(path);
+    }
+    for (String path : childrenCache.keySet()) {
+      if (path.startsWith(zPath))
+        pathsToRemove.add(path);
+    }
+
+    for (String path : pathsToRemove)
+      remove(path);
+  }
+
+  /**
+   * Returns the shared CuratorCaches for the given connect string and session timeout, creating it on first request.
+   */
+  public static synchronized CuratorCaches getInstance(String zooKeepers, int sessionTimeout) {
+    String key = zooKeepers + ":" + sessionTimeout;
+    CuratorCaches zc = instances.get(key);
+    if (zc == null) {
+      zc = new CuratorCaches(zooKeepers, sessionTimeout);
+      instances.put(key, zc);
+    }
+
+    return zc;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/4640866b/fate/src/main/java/org/apache/accumulo/fate/curator/CuratorException.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/curator/CuratorException.java b/fate/src/main/java/org/apache/accumulo/fate/curator/CuratorException.java
new file mode 100644
index 0000000..4e8c91c
--- /dev/null
+++ b/fate/src/main/java/org/apache/accumulo/fate/curator/CuratorException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate.curator;
+
+
+/**
+ * Simple wrapper for exceptions thrown by Curator that are neither KeeperExceptions nor InterruptedExceptions.
+ */
+public class CuratorException extends Exception {
+  private static final long serialVersionUID = 4874809817861147889L;
+  
+  /** The wrapped exception, also recorded as this exception's cause. Null when the caller passed null. */
+  Exception e;
+  
+  /**
+   * @param e
+   *          the exception to wrap; it is passed to the superclass as the cause
+   */
+  public CuratorException(Exception e) {
+    super(e);
+    this.e = e;
+  }
+  
+  /**
+   * @return this class's name, a colon, and the wrapped exception's string form ("null" when nothing was wrapped)
+   */
+  @Override
+  public String toString() {
+    // String.valueOf tolerates a null cause, where the direct e.toString() call threw a NullPointerException
+    return this.getClass().getName() + ": " + String.valueOf(e);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/4640866b/fate/src/main/java/org/apache/accumulo/fate/curator/CuratorReader.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/curator/CuratorReader.java b/fate/src/main/java/org/apache/accumulo/fate/curator/CuratorReader.java
new file mode 100644
index 0000000..2f9c46f
--- /dev/null
+++ b/fate/src/main/java/org/apache/accumulo/fate/curator/CuratorReader.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate.curator;
+
+import java.util.List;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+
+public class CuratorReader {
+  
+  protected String keepers;
+  protected int timeout;
+  
+  private CuratorFramework curator;
+  
+  public CuratorReader(String zooKeepers, int sessionTimeout) {
+    this(CuratorSession.getSession(zooKeepers, sessionTimeout));
+  }
+  
+  public CuratorReader(String zooKeepers, int sessionTimeout, String scheme, byte[] auth) {
+    this(CuratorSession.getSession(zooKeepers, sessionTimeout, scheme, auth));
+  }
+  
+  public CuratorReader(CuratorFramework curator) {
+    this.curator = curator;
+  }
+  
+  @Deprecated
+  public CuratorFramework getCurator() {
+    return curator;
+  }
+  
+  public byte[] getData(String zPath) throws KeeperException, InterruptedException {
+    try {
+      return getCurator().getData().forPath(zPath);
+    } catch (Exception e) {
+      throw CuratorUtil.manageException(e);
+    }
+  }
+  
+  public byte[] getData(String zPath, Stat stat) throws KeeperException, InterruptedException {
+    try {
+      return getCurator().getData().storingStatIn(stat).forPath(zPath);
+    } catch (Exception e) {
+      throw CuratorUtil.manageException(e);
+    }
+  }
+  
+  public Stat getStatus(String zPath) throws KeeperException, InterruptedException {
+    try {
+      return getCurator().checkExists().forPath(zPath);
+    } catch (Exception e) {
+      throw CuratorUtil.manageException(e);
+    }
+  }
+  
+  public Stat getStatus(String zPath, Watcher watcher) throws KeeperException, InterruptedException {
+    try {
+      return getCurator().checkExists().usingWatcher(watcher).forPath(zPath);
+    } catch (Exception e) {
+      throw CuratorUtil.manageException(e);
+    }
+  }
+  
+  public List<String> getChildren(String zPath) throws KeeperException, InterruptedException {
+    try {
+      return getCurator().getChildren().forPath(zPath);
+    } catch (Exception e) {
+      throw CuratorUtil.manageException(e);
+    }
+  }
+  
+  public List<String> getChildren(String zPath, Watcher watcher) throws KeeperException, InterruptedException {
+    try {
+      return getCurator().getChildren().usingWatcher(watcher).forPath(zPath);
+    } catch (Exception e) {
+      throw CuratorUtil.manageException(e);
+    }
+  }
+  
+  public boolean exists(String zPath) throws KeeperException, InterruptedException {
+    return getStatus(zPath) != null;
+  }
+  
+  public boolean exists(String zPath, Watcher watcher) throws KeeperException, InterruptedException {
+    return getStatus(zPath, watcher) != null;
+  }
+  
+  public void sync(final String path) throws KeeperException, InterruptedException {
+    try {
+      getCurator().sync().forPath(path);
+    } catch (Exception e) {
+      throw CuratorUtil.manageException(e);
+    }
+  }
+  
+  public List<ACL> getACL(String zPath) throws KeeperException, InterruptedException {
+    try {
+      return getCurator().getACL().forPath(zPath);
+    } catch (Exception e) {
+      throw CuratorUtil.manageException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/4640866b/fate/src/main/java/org/apache/accumulo/fate/curator/CuratorReaderWriter.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/curator/CuratorReaderWriter.java b/fate/src/main/java/org/apache/accumulo/fate/curator/CuratorReaderWriter.java
new file mode 100644
index 0000000..c03cadb
--- /dev/null
+++ b/fate/src/main/java/org/apache/accumulo/fate/curator/CuratorReaderWriter.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate.curator;
+
+import java.security.SecurityPermission;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooDefs.Perms;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+
+public class CuratorReaderWriter extends CuratorReader {
+  private static final SecurityPermission ZOOWRITER_PERMISSION = new SecurityPermission("zookeeperWriterPermission");
+  
+  private static CuratorReaderWriter instance = null;
+  
+  /** ACL granting everything to the creator only. */
+  public static final List<ACL> PRIVATE;
+  /** PRIVATE plus read permission for anyone. */
+  private static final List<ACL> PUBLIC;
+  static {
+    PRIVATE = new ArrayList<ACL>();
+    PRIVATE.addAll(Ids.CREATOR_ALL_ACL);
+    PUBLIC = new ArrayList<ACL>();
+    PUBLIC.addAll(PRIVATE);
+    PUBLIC.add(new ACL(Perms.READ, Ids.ANYONE_ID_UNSAFE));
+  }
+  
+  /** What a put should do when the target node already exists. */
+  public enum NodeExistsPolicy {
+    SKIP, OVERWRITE, FAIL, SEQUENTIAL
+  }
+  
+  /** Callback used by {@link #mutate(String, byte[], boolean, Mutator)} to compute a node's new value from its current one. */
+  public interface Mutator {
+    byte[] mutate(byte[] currentValue) throws Exception;
+  }
+  
+  protected CuratorReaderWriter(String zooKeepers, int sessionTimeout, String scheme, byte[] auth) {
+    super(constructCurator(zooKeepers, sessionTimeout, scheme, auth));
+  }
+  
+  /**
+   * Returns the process-wide instance, creating it from the given arguments on the first call. Arguments of later calls are ignored.
+   */
+  public static synchronized CuratorReaderWriter getInstance(String zookeepers, int timeInMillis, String scheme, byte[] auth) {
+    if (instance == null)
+      instance = new CuratorReaderWriter(zookeepers, timeInMillis, scheme, auth);
+    return instance;
+  }
+  
+  /** Checks the writer permission with the installed SecurityManager (if any) before obtaining the session. */
+  private static CuratorFramework constructCurator(String zooKeepers, int sessionTimeout, String scheme, byte[] auth) {
+    SecurityManager sm = System.getSecurityManager();
+    if (sm != null) {
+      sm.checkPermission(ZOOWRITER_PERMISSION);
+    }
+    return CuratorSession.getSession(zooKeepers, sessionTimeout, scheme, auth);
+  }
+  
+  /** Recursively deletes zPath without a version check (version -1). */
+  public void recursiveDelete(String zPath) throws KeeperException, InterruptedException {
+    recursiveDelete(zPath, -1);
+  }
+  
+  @Deprecated
+  public void recursiveDelete(String zPath, int version) throws KeeperException, InterruptedException {
+    CuratorUtil.recursiveDelete(getCurator(), zPath, version);
+  }
+  
+  /**
+   * Creates the node, or overwrites its data, depending on whether it exists and on the policy.
+   * 
+   * @param version
+   *          expected node version for the OVERWRITE case; -1 skips the version check
+   * @param policy
+   *          null is treated as FAIL
+   * @return the path created, zPath when existing data was overwritten, or null when the node exists and the policy is SKIP or FAIL (FAIL does not throw here)
+   */
+  private String putData(String zPath, byte[] data, CreateMode mode, int version, NodeExistsPolicy policy, List<ACL> acls) throws KeeperException,
+      InterruptedException {
+    if (policy == null)
+      policy = NodeExistsPolicy.FAIL;
+    
+    CuratorFramework curator = getCurator();
+    try {
+      boolean exists = curator.checkExists().forPath(zPath) != null;
+      
+      if (!exists || policy == NodeExistsPolicy.SEQUENTIAL) {
+        return curator.create().withMode(mode).withACL(acls).forPath(zPath, data);
+      }
+      if (policy == NodeExistsPolicy.OVERWRITE) {
+        curator.setData().withVersion(version).forPath(zPath, data);
+        return zPath;
+      }
+      return null;
+    } catch (Exception e) {
+      throw CuratorUtil.manageException(e);
+    }
+  }
+  
+  /**
+   * Create a persistent node with the default ACL
+   * 
+   * @return true if the node was created or altered; false if it was skipped
+   */
+  public boolean putPersistentData(String zPath, byte[] data, NodeExistsPolicy policy) throws KeeperException, InterruptedException {
+    return putPersistentData(zPath, data, -1, policy);
+  }
+  
+  /**
+   * Create a persistent node with the supplied ACL
+   * 
+   * @return true if the node was created or altered; false if it was skipped
+   */
+  public boolean putPersistentDataWithACL(String zPath, byte[] data, NodeExistsPolicy policy, List<ACL> acls)
+      throws KeeperException, InterruptedException {
+    return putData(zPath, data, CreateMode.PERSISTENT, -1, policy, acls) != null;
+  }
+  
+  /**
+   * Create a persistent node with the default ACL, or overwrite it at the expected version
+   * 
+   * @param version
+   *          expected version of an existing node when the policy is OVERWRITE; -1 matches any version
+   * @return true if the node was created or altered; false if it was skipped
+   */
+  public boolean putPersistentData(String zPath, byte[] data, int version, NodeExistsPolicy policy) throws KeeperException, InterruptedException {
+    // honor the caller's version; this used to pass -1 unconditionally, silently disabling the version check
+    return putData(zPath, data, CreateMode.PERSISTENT, version, policy, PUBLIC) != null;
+  }
+  
+  /**
+   * Create a persistent node with the creator-only ACL
+   * 
+   * @return true if the node was created or altered; false if it was skipped
+   */
+  public boolean putPrivatePersistentData(String zPath, byte[] data, NodeExistsPolicy policy) throws KeeperException, InterruptedException {
+    return putData(zPath, data, CreateMode.PERSISTENT, -1, policy, PRIVATE) != null;
+  }
+  
+  /** @return the path of the persistent sequential node that was created */
+  public String putPersistentSequential(String zPath, byte[] data) throws KeeperException, InterruptedException {
+    return putData(zPath, data, CreateMode.PERSISTENT_SEQUENTIAL, -1, NodeExistsPolicy.SEQUENTIAL, PUBLIC);
+  }
+  
+  /** @return true if the ephemeral node was created; false if a node already existed at zPath */
+  public boolean putEphemeralData(String zPath, byte[] data) throws KeeperException, InterruptedException {
+    return putData(zPath, data, CreateMode.EPHEMERAL, -1, null, PUBLIC) != null;
+  }
+  
+  /** @return the path of the ephemeral sequential node that was created */
+  public String putEphemeralSequential(String zPath, byte[] data) throws KeeperException, InterruptedException {
+    return putData(zPath, data, CreateMode.EPHEMERAL_SEQUENTIAL, -1, NodeExistsPolicy.SEQUENTIAL, PUBLIC);
+  }
+  
+  /**
+   * Copies source and, recursively, its children to destination. Nodes with an ephemeral owner are not copied.
+   * 
+   * @param policy
+   *          applied when destination already exists: OVERWRITE proceeds, SKIP and SEQUENTIAL return without copying, FAIL (or null) throws
+   * @throws KeeperException
+   *           NONODE if source is missing or has no data, NODEEXISTS if destination exists and the policy is FAIL
+   */
+  public void recursiveCopyPersistent(String source, String destination, NodeExistsPolicy policy) throws KeeperException, InterruptedException {
+    // putData treats a missing policy as FAIL; do the same here instead of hitting a NullPointerException in the switch
+    if (policy == null)
+      policy = NodeExistsPolicy.FAIL;
+    
+    if (!exists(source))
+      throw KeeperException.create(KeeperException.Code.NONODE, source);
+    if (exists(destination)) {
+      switch (policy) {
+        case OVERWRITE:
+          break;
+        case SKIP:
+        case SEQUENTIAL:
+          return;
+        case FAIL:
+        default:
+          // the node that already exists is the destination, so that is the path to report
+          throw KeeperException.create(KeeperException.Code.NODEEXISTS, destination);
+      }
+    }
+    
+    Stat stat = new Stat();
+    byte[] data = getData(source, stat);
+    if (stat.getEphemeralOwner() == 0) {
+      if (data == null)
+        throw KeeperException.create(KeeperException.Code.NONODE, source);
+      putPersistentData(destination, data, policy);
+      if (stat.getNumChildren() > 0)
+        for (String child : getChildren(source))
+          recursiveCopyPersistent(source + "/" + child, destination + "/" + child, policy);
+    }
+  }
+  
+  /** Deletes path if its version matches; -1 matches any version. */
+  public void delete(String path, int version) throws InterruptedException, KeeperException {
+    try {
+      getCurator().delete().withVersion(version).forPath(path);
+    } catch (Exception e) {
+      throw CuratorUtil.manageException(e);
+    }
+  }
+  
+  /**
+   * Replaces the data at zPath with the mutator's result.
+   * 
+   * @param createValue
+   *          if non-null and the node does not exist, the node is created with this value and the mutator is not invoked
+   * @param privateACL
+   *          selects the creator-only ACL over the default one for the write
+   * @param mutator
+   *          receives the node's current data; returning null leaves the node untouched
+   * @return createValue when the node was created, otherwise the mutator's result (possibly null)
+   */
+  public byte[] mutate(String zPath, byte[] createValue, boolean privateACL, Mutator mutator) throws Exception {
+    // Test existence explicitly: ZooKeeper reports a missing node from getData as NoNodeException rather than
+    // as null data, so the old "getData(zPath) == null" test was not reached for a missing node
+    if (createValue != null && !exists(zPath)) {
+      if (privateACL)
+        putPrivatePersistentData(zPath, createValue, NodeExistsPolicy.FAIL);
+      else
+        putPersistentData(zPath, createValue, NodeExistsPolicy.FAIL);
+      return createValue;
+    }
+    
+    byte[] data = mutator.mutate(getData(zPath));
+    if (data == null)
+      return data;
+    // store the mutated value; this used to write createValue back, discarding the mutation
+    if (privateACL)
+      putPrivatePersistentData(zPath, data, NodeExistsPolicy.OVERWRITE);
+    else
+      putPersistentData(zPath, data, NodeExistsPolicy.OVERWRITE);
+    return data;
+  }
+  
+  /** Delegates to CuratorUtil.isLockHeld using the ZooKeeper handle underneath the Curator client. */
+  public boolean isLockHeld(CuratorUtil.LockID lockID) throws KeeperException, InterruptedException {
+    try {
+      return CuratorUtil.isLockHeld(getCurator().getZookeeperClient().getZooKeeper(), lockID);
+    } catch (Exception e) {
+      throw CuratorUtil.manageException(e);
+    }
+  }
+  
+  /** Creates path, letting Curator create any missing parent nodes. */
+  public void mkdirs(String path) throws KeeperException, InterruptedException {
+    try {
+      getCurator().create().creatingParentsIfNeeded().forPath(path);
+    } catch (Exception e) {
+      throw CuratorUtil.manageException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/4640866b/fate/src/main/java/org/apache/accumulo/fate/curator/CuratorSession.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/curator/CuratorSession.java b/fate/src/main/java/org/apache/accumulo/fate/curator/CuratorSession.java
new file mode 100644
index 0000000..51b3476
--- /dev/null
+++ b/fate/src/main/java/org/apache/accumulo/fate/curator/CuratorSession.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate.curator;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.CuratorFrameworkState;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.log4j.Logger;
+
+/**
+ * Hands out Curator clients, sharing one client per distinct combination of connect string, timeout, scheme and auth.
+ */
+class CuratorSession {
+  
+  private static final Logger log = Logger.getLogger(CuratorSession.class);
+  
+  /** Retry policy shared by every client built here. */
+  private static final RetryPolicy retry = new ExponentialBackoffRetry(1000, 5);
+  
+  /** Clients by session key; only touched from the synchronized getSession methods. */
+  private static final Map<String,CuratorFramework> sessions = new HashMap<String,CuratorFramework>();
+  
+  /** Builds the map key for a session; null scheme and null auth each contribute an empty string. */
+  private static String sessionKey(String keepers, int timeout, String scheme, byte[] auth) {
+    return keepers + ":" + timeout + ":" + (scheme == null ? "" : scheme) + ":" + (auth == null ? "" : new String(auth));
+  }
+  
+  /**
+   * Builds and starts a new client. Authorization is only applied when both scheme and bytes are given; the namespace only when it is non-null.
+   */
+  private static CuratorFramework constructCurator(String zookeeperConnectString, int sessionTimeoutMs, String namespace, String scheme, byte[] bytes) {
+    CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder().canBeReadOnly(true).sessionTimeoutMs(sessionTimeoutMs).retryPolicy(retry)
+        .connectString(zookeeperConnectString);
+    if (scheme != null && bytes != null)
+      builder = builder.authorization(scheme, bytes);
+    if (namespace != null)
+      builder = builder.namespace(namespace);
+    
+    CuratorFramework toRet = builder.build();
+    toRet.start();
+    return toRet;
+  }
+  
+  /** Returns the shared client for the given connect string and timeout, without credentials. */
+  public static synchronized CuratorFramework getSession(String zooKeepers, int timeout) {
+    return getSession(zooKeepers, timeout, null, null);
+  }
+  
+  /**
+   * Returns the shared client for the given settings, building a new one when none is cached or the cached one has been stopped.
+   * 
+   * @param scheme
+   *          authorization scheme, may be null
+   * @param auth
+   *          authorization bytes, may be null
+   */
+  public static synchronized CuratorFramework getSession(String zooKeepers, int timeout, String scheme, byte[] auth) {
+    
+    String sessionKey = sessionKey(zooKeepers, timeout, scheme, auth);
+    
+    // a read-only session can use a session with authorizations, so cache a copy for it w/out auths
+    String readOnlySessionKey = sessionKey(zooKeepers, timeout, null, null);
+    CuratorFramework curator = sessions.get(sessionKey);
+    if (curator != null && curator.getState() == CuratorFrameworkState.STOPPED) {
+      // drop the stopped client, including its read-only alias if that alias points at the same client
+      if (auth != null && sessions.get(readOnlySessionKey) == curator)
+        sessions.remove(readOnlySessionKey);
+      curator = null;
+      sessions.remove(sessionKey);
+    }
+    
+    if (curator == null) {
+      // never write the credential itself to the log; only record whether one was supplied
+      log.debug("Connecting to " + zooKeepers + " with timeout " + timeout + " with auth " + (auth == null ? "null" : "<redacted>"));
+      curator = constructCurator(zooKeepers, timeout, null, scheme, auth);
+      sessions.put(sessionKey, curator);
+      if (auth != null && !sessions.containsKey(readOnlySessionKey))
+        sessions.put(readOnlySessionKey, curator);
+    }
+    return curator;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/4640866b/fate/src/main/java/org/apache/accumulo/fate/curator/CuratorUtil.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/curator/CuratorUtil.java b/fate/src/main/java/org/apache/accumulo/fate/curator/CuratorUtil.java
new file mode 100644
index 0000000..fee9712
--- /dev/null
+++ b/fate/src/main/java/org/apache/accumulo/fate/curator/CuratorUtil.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate.curator;
+
+import java.io.File;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.accumulo.fate.util.UtilWaitThread;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.common.PathUtils;
+import org.apache.zookeeper.data.Stat;
+
+public class CuratorUtil {
+  // Choices for how an operation should treat a node that does not exist.
+  // NOTE(review): nothing else in this class reads these values; the exact meaning of each constant is
+  // defined by the callers - confirm there.
+  public enum NodeMissingPolicy {
+    SKIP, CREATE, FAIL
+  }
+  
+  /**
+   * Returns the last path component of the given cached node's path.
+   */
+  public static String getNodeName(ChildData node) {
+    final String fullPath = node.getPath();
+    return getNodeName(fullPath);
+  }
+  
+  public static String getNodeName(String nodePath) {
+    return new File(nodePath).getName();
+  }
+  
+  /**
+   * Returns the parent path of the given cached node's path.
+   */
+  public static String getNodeParent(ChildData node) {
+    final String fullPath = node.getPath();
+    return getNodeParent(fullPath);
+  }
+  
+  public static String getNodeParent(String nodePath) {
+    return new File(nodePath).getParent();
+  }
+  
+  public static void recursiveDelete(CuratorFramework curator, final String pathRoot, int version) throws KeeperException, InterruptedException {
+    PathUtils.validatePath(pathRoot);
+    
+    List<String> tree = listSubTreeBFS(curator, pathRoot);
+    for (int i = tree.size() - 1; i >= 0; --i) {
+      // Delete the leaves first and eventually get rid of the root
+      try {
+        curator.delete().withVersion(version).forPath(tree.get(i));
+      } catch (Exception e) {
+        throw CuratorUtil.manageException(e);
+      } // Delete all versions of the node
+    }
+  }
+  
+  /**
+   * Lists pathRoot and all of its descendants in breadth-first order, pathRoot first.
+   * 
+   * @return full paths of every node in the subtree; each parent appears before its children
+   */
+  private static List<String> listSubTreeBFS(CuratorFramework curator, final String pathRoot) throws KeeperException, InterruptedException {
+    final List<String> visited = new ArrayList<String>();
+    final Deque<String> pending = new LinkedList<String>();
+    pending.add(pathRoot);
+    visited.add(pathRoot);
+    
+    // pollFirst() hands back null once the queue is drained, which ends the walk
+    for (String current = pending.pollFirst(); current != null; current = pending.pollFirst()) {
+      final List<String> names;
+      try {
+        names = curator.getChildren().forPath(current);
+      } catch (Exception e) {
+        throw CuratorUtil.manageException(e);
+      }
+      for (final String name : names) {
+        final String fullChildPath = current + "/" + name;
+        pending.add(fullChildPath);
+        visited.add(fullChildPath);
+      }
+    }
+    return visited;
+  }
+  
+  /**
+   * Identifies a lock node by the path that contains it, the node's name and the id (eid) that
+   * isLockHeld compares against the node's ephemeral owner.
+   */
+  public static class LockID {
+    public long eid;
+    public String path;
+    public String node;
+    
+    /**
+     * Parses a serialized lock id of the form {@code [dir]/node$hexEid} and resolves it under root.
+     * 
+     * @param root
+     *          prefix for the resulting path; used as-is when the serialized form starts with "/node"
+     * @param serializedLID
+     *          text in the format produced by {@link #serialize(String)}
+     * @throws IllegalArgumentException
+     *           if the text does not consist of exactly two '$'-separated parts, the first part has no
+     *           '/', or the eid is not valid hexadecimal
+     */
+    public LockID(String root, String serializedLID) {
+      String sa[] = serializedLID.split("\\$");
+      
+      // Check the part count before touching sa[0]: split() drops trailing empty strings, so an
+      // input such as "$" produces an empty array.
+      if (sa.length != 2) {
+        throw new IllegalArgumentException("Malformed serialized lock id " + serializedLID);
+      }
+      
+      int lastSlash = sa[0].lastIndexOf('/');
+      if (lastSlash < 0) {
+        throw new IllegalArgumentException("Malformed serialized lock id " + serializedLID);
+      }
+      
+      if (lastSlash == 0)
+        path = root;
+      else
+        path = root + "/" + sa[0].substring(0, lastSlash);
+      node = sa[0].substring(lastSlash + 1);
+      // Parse via BigInteger and narrow, so hex text for values with the top bit set is accepted
+      eid = new BigInteger(sa[1], 16).longValue();
+    }
+    
+    public LockID(String path, String node, long eid) {
+      this.path = path;
+      this.node = node;
+      this.eid = eid;
+    }
+    
+    /**
+     * Renders this id as the path with the first root.length() characters removed, then "/", the
+     * node name, "$" and the eid in hexadecimal.
+     */
+    public String serialize(String root) {
+      
+      return path.substring(root.length()) + "/" + node + "$" + Long.toHexString(eid);
+    }
+    
+    @Override
+    public String toString() {
+      return " path = " + path + " node = " + node + " eid = " + Long.toHexString(eid);
+    }
+  }
+  
+  /**
+   * Returns the data of the first child of path after sorting the children by ChildData's natural
+   * ordering, or null when the cache reports no children.
+   */
+  public static byte[] getLockData(CuratorCaches zc, String path) {
+    final List<ChildData> cached = zc.getChildren(path);
+    if (cached == null || cached.isEmpty())
+      return null;
+    
+    // sort a private copy so the list handed back by the cache is left untouched
+    final List<ChildData> ordered = new ArrayList<ChildData>(cached);
+    Collections.sort(ordered);
+    
+    final ChildData first = ordered.get(0);
+    return first.getData();
+  }
+  
+  /**
+   * Reports whether the lock described by lid is currently held.
+   * <p>
+   * The lock counts as held only when lid.node sorts first among the children of lid.path, the node
+   * still exists, and its ephemeral owner equals lid.eid. A connection loss is retried after a
+   * one-second pause, indefinitely.
+   */
+  public static boolean isLockHeld(ZooKeeper zk, LockID lid) throws KeeperException, InterruptedException {
+    for (;;) {
+      try {
+        final List<String> candidates = zk.getChildren(lid.path, false);
+        if (candidates.isEmpty())
+          return false;
+        
+        Collections.sort(candidates);
+        final String lowest = candidates.get(0);
+        if (!lid.node.equals(lowest))
+          return false;
+        
+        final Stat nodeStat = zk.exists(lid.path + "/" + lid.node, false);
+        if (nodeStat == null)
+          return false;
+        return nodeStat.getEphemeralOwner() == lid.eid;
+      } catch (KeeperException.ConnectionLossException lost) {
+        // connection dropped: wait, then run the whole check again
+        UtilWaitThread.sleep(1000);
+      }
+    }
+  }
+
+  /**
+   * Narrows a broadly caught exception back to the checked types that callers declare.
+   * <p>
+   * A KeeperException or InterruptedException is rethrown unchanged from inside this method. Anything
+   * else is wrapped in a RuntimeException that is returned rather than thrown, so call sites can
+   * write {@code throw manageException(e)} and keep the compiler's flow analysis satisfied.
+   * 
+   * @return a RuntimeException wrapping e, when e is neither of the two rethrown types
+   */
+  public static RuntimeException manageException(Exception e) throws KeeperException, InterruptedException {
+    if (e instanceof InterruptedException)
+      throw (InterruptedException) e;
+    if (e instanceof KeeperException)
+      throw (KeeperException) e;
+    return new RuntimeException(e);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/4640866b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReader.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReader.java b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReader.java
deleted file mode 100644
index 3902a5c..0000000
--- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReader.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.fate.zookeeper;
-
-import java.util.List;
-
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.data.Stat;
-
-public interface IZooReader {
-  
-  public abstract byte[] getData(String zPath, Stat stat) throws KeeperException, InterruptedException;
-  
-  public abstract Stat getStatus(String zPath) throws KeeperException, InterruptedException;
-  
-  public abstract Stat getStatus(String zPath, Watcher watcher) throws KeeperException, InterruptedException;
-  
-  public abstract List<String> getChildren(String zPath) throws KeeperException, InterruptedException;
-  
-  public abstract List<String> getChildren(String zPath, Watcher watcher) throws KeeperException, InterruptedException;
-  
-  public abstract boolean exists(String zPath) throws KeeperException, InterruptedException;
-  
-  public abstract boolean exists(String zPath, Watcher watcher) throws KeeperException, InterruptedException;
-  
-  public abstract void sync(final String path) throws KeeperException, InterruptedException;
-  
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/4640866b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReaderWriter.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReaderWriter.java b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReaderWriter.java
deleted file mode 100644
index 5dcad23..0000000
--- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReaderWriter.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.fate.zookeeper;
-
-import java.util.List;
-
-import org.apache.accumulo.fate.zookeeper.ZooReaderWriter.Mutator;
-import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
-import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.ACL;
-
-public interface IZooReaderWriter extends IZooReader {
-  
-  public abstract ZooKeeper getZooKeeper();
-  
-  public abstract void recursiveDelete(String zPath, NodeMissingPolicy policy) throws KeeperException, InterruptedException;
-  
-  public abstract void recursiveDelete(String zPath, int version, NodeMissingPolicy policy) throws KeeperException, InterruptedException;
-  
-  /**
-   * Create a persistent node with the default ACL
-   * 
-   * @return true if the node was created or altered; false if it was skipped
-   */
-  public abstract boolean putPersistentData(String zPath, byte[] data, NodeExistsPolicy policy) throws KeeperException, InterruptedException;
-  
-  public abstract boolean putPrivatePersistentData(String zPath, byte[] data, NodeExistsPolicy policy) throws KeeperException, InterruptedException;
-  
-  public abstract void putPersistentData(String zPath, byte[] data, int version, NodeExistsPolicy policy) throws KeeperException, InterruptedException;
-  
-  public abstract String putPersistentSequential(String zPath, byte[] data) throws KeeperException, InterruptedException;
-  
-  public abstract String putEphemeralSequential(String zPath, byte[] data) throws KeeperException, InterruptedException;
-  
-  public String putEphemeralData(String zPath, byte[] data) throws KeeperException, InterruptedException;
-
-  public abstract void recursiveCopyPersistent(String source, String destination, NodeExistsPolicy policy) throws KeeperException, InterruptedException;
-  
-  public abstract void delete(String path, int version) throws InterruptedException, KeeperException;
-  
-  public abstract byte[] mutate(String zPath, byte[] createValue, List<ACL> acl, Mutator mutator) throws Exception;
-  
-  public abstract boolean isLockHeld(ZooUtil.LockID lockID) throws KeeperException, InterruptedException;
-  
-  public abstract void mkdirs(String path) throws KeeperException, InterruptedException;
-  
-  public abstract void sync(String path) throws KeeperException, InterruptedException;
-  
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/4640866b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java
deleted file mode 100644
index a1b1dc1..0000000
--- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.fate.zookeeper;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.accumulo.fate.curator.CuratorUtil;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.cache.ChildData;
-import org.apache.curator.framework.recipes.cache.NodeCache;
-import org.apache.curator.framework.recipes.cache.PathChildrenCache;
-import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
-import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent.Type;
-import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
-import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
-import org.apache.log4j.Logger;
-
-/**
- * Caches values stored in zookeeper and keeps them up to date as they change in zookeeper.
- * 
- */
-public class ZooCache {
-  private static final Logger log = Logger.getLogger(ZooCache.class);
-  
-  private HashMap<String,NodeCache> nodeCache;
-  private HashMap<String,PathChildrenCache> childrenCache;
-  
-  private CuratorFramework curator;
-  
-  public ZooCache(String zooKeepers, int sessionTimeout) {
-    this(CuratorUtil.constructCurator(zooKeepers, sessionTimeout, null));
-  }
-  
-  public ZooCache(CuratorFramework curator) {
-    this.curator = curator;
-    this.nodeCache = new HashMap<String,NodeCache>();
-    this.childrenCache = new HashMap<String,PathChildrenCache>();
-  }
-  
-  public synchronized List<ChildData> getChildren(final String zPath) {
-    return getChildren(zPath, null);
-  }
-  
-  public synchronized List<ChildData> getChildren(String zPath, PathChildrenCacheListener listener) {
-    PathChildrenCache cache = childrenCache.get(zPath);
-    if (cache == null) {
-      cache = new PathChildrenCache(curator, zPath, true);
-      if (listener != null) {
-        cache.getListenable().addListener(listener);
-      }
-      try {
-        log.debug("Starting cache against " + zPath + (listener!=null? " using listener " + listener:""));
-        cache.start(StartMode.BUILD_INITIAL_CACHE);
-        // I'll do it myself!
-        if (listener != null)
-          for (ChildData cd : cache.getCurrentData()) {
-            listener.childEvent(curator, new PathChildrenCacheEvent(Type.INITIALIZED, cd));
-          }
-        
-        // Because parent's children are being watched, we don't need to cache the individual node
-        // UNLESS we have a listener on it
-        for (ChildData child : cache.getCurrentData()) {
-          NodeCache childCache = nodeCache.get(child.getPath());
-          if (childCache != null && childCache.getListenable().size() == 0) {
-            log.debug("Removing cache " + childCache.getCurrentData().getPath() + " because parent cache was added");
-            childCache.close();
-            nodeCache.remove(child.getPath());
-          }
-        }
-      } catch (Exception e) {
-        log.error(e, e);
-        try {
-          cache.close();
-        } catch (IOException e1) {
-          // We're already in a bad state at this point, I think, but just in case
-          log.error(e, e);
-        }
-        return null;
-      }
-      childrenCache.put(zPath, cache);
-    } else if (listener != null) {
-      log.debug("LISTENER- cache is null for path " + zPath + ", but got listener " + listener.getClass() + ". this is a broken case!");
-    }
-    return cache.getCurrentData();
-  }
-  
-  public List<String> getChildKeys(final String zPath) {
-    List<String> toRet = new ArrayList<String>();
-    for (ChildData child : getChildren(zPath)) {
-      toRet.add(CuratorUtil.getNodeName(child));
-    }
-    return toRet;
-  }
-  
-  public synchronized ChildData get(final String zPath) {
-    NodeCache cache = nodeCache.get(zPath);
-    if (cache == null) {
-      PathChildrenCache cCache = childrenCache.get(CuratorUtil.getNodeParent(zPath));
-      if (cCache != null) {
-        return cCache.getCurrentData(zPath);
-      }
-      cache = new NodeCache(curator, zPath);
-      try {
-        cache.start(true);
-      } catch (Exception e) {
-        log.error(e, e);
-        try {
-          cache.close();
-        } catch (IOException e1) {
-          // We're already in a bad state at this point, I think, but just in case
-          log.error(e, e);
-        }
-        return null;
-      }
-      nodeCache.put(zPath, cache);
-    }
-    
-    return cache.getCurrentData();
-  }
-  
-  private synchronized void remove(String zPath) {
-    if (log.isTraceEnabled())
-      log.trace("removing " + zPath + " from cache");
-    NodeCache nc = nodeCache.get(zPath);
-    if (nc != null) {
-      try {
-        nc.close();
-      } catch (IOException e) {
-        log.error(e, e);
-      }
-    }
-    
-    PathChildrenCache pc = childrenCache.get(zPath);
-    if (pc != null) {
-      try {
-        pc.close();
-      } catch (IOException e) {
-        log.error(e, e);
-      }
-    }
-    
-    nodeCache.remove(zPath);
-    childrenCache.remove(zPath);
-  }
-  
-  public synchronized void clear() {
-    for (NodeCache nc : nodeCache.values()) {
-      try {
-        nc.close();
-      } catch (IOException e) {
-        log.error(e, e);
-      }
-    }
-    for (PathChildrenCache pc : childrenCache.values()) {
-      try {
-        pc.close();
-      } catch (IOException e) {
-        log.error(e, e);
-      }
-    }
-    
-    nodeCache.clear();
-    childrenCache.clear();
-  }
-  
-  public CuratorFramework getCurator() {
-    return curator;
-  }
-  
-  public synchronized void clear(String zPath) {
-    List<String> pathsToRemove = new ArrayList<String>();
-    for (Iterator<String> i = nodeCache.keySet().iterator(); i.hasNext();) {
-      String path = i.next();
-      if (path.startsWith(zPath))
-        pathsToRemove.add(path);
-    }
-    
-    for (Iterator<String> i = childrenCache.keySet().iterator(); i.hasNext();) {
-      String path = i.next();
-      if (path.startsWith(zPath))
-        pathsToRemove.add(path);
-    }
-    
-    for (String path : pathsToRemove)
-      remove(path);
-  }
-  
-  private static Map<String,ZooCache> instances = new HashMap<String,ZooCache>();
-  
-  public static synchronized ZooCache getInstance(String zooKeepers, int sessionTimeout) {
-    String key = zooKeepers + ":" + sessionTimeout;
-    ZooCache zc = instances.get(key);
-    if (zc == null) {
-      zc = new ZooCache(zooKeepers, sessionTimeout);
-      instances.put(key, zc);
-    }
-    
-    return zc;
-  }
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/4640866b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooUtil.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooUtil.java b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooUtil.java
deleted file mode 100644
index 9ad3402..0000000
--- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooUtil.java
+++ /dev/null
@@ -1,263 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.fate.zookeeper;
-
-import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.accumulo.fate.util.UtilWaitThread;
-import org.apache.curator.framework.recipes.cache.ChildData;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.Code;
-import org.apache.zookeeper.KeeperException.NoNodeException;
-import org.apache.zookeeper.KeeperException.NodeExistsException;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooDefs.Perms;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.ACL;
-import org.apache.zookeeper.data.Stat;
-
-public class ZooUtil {
-  public enum NodeExistsPolicy {
-    SKIP, OVERWRITE, FAIL
-  }
-  
-  public enum NodeMissingPolicy {
-    SKIP, CREATE, FAIL
-  }
-  
-  public static class LockID {
-    public long eid;
-    public String path;
-    public String node;
-    
-    public LockID(String root, String serializedLID) {
-      String sa[] = serializedLID.split("\\$");
-      int lastSlash = sa[0].lastIndexOf('/');
-      
-      if (sa.length != 2 || lastSlash < 0) {
-        throw new IllegalArgumentException("Malformed serialized lock id " + serializedLID);
-      }
-      
-      if (lastSlash == 0)
-        path = root;
-      else
-        path = root + "/" + sa[0].substring(0, lastSlash);
-      node = sa[0].substring(lastSlash + 1);
-      eid = new BigInteger(sa[1], 16).longValue();
-    }
-    
-    public LockID(String path, String node, long eid) {
-      this.path = path;
-      this.node = node;
-      this.eid = eid;
-    }
-    
-    public String serialize(String root) {
-      
-      return path.substring(root.length()) + "/" + node + "$" + Long.toHexString(eid);
-    }
-    
-    @Override
-    public String toString() {
-      return " path = " + path + " node = " + node + " eid = " + Long.toHexString(eid);
-    }
-  }
-  
-  public static final List<ACL> PRIVATE;
-  public static final List<ACL> PUBLIC;
-  static {
-    PRIVATE = new ArrayList<ACL>();
-    PRIVATE.addAll(Ids.CREATOR_ALL_ACL);
-    PUBLIC = new ArrayList<ACL>();
-    PUBLIC.addAll(PRIVATE);
-    PUBLIC.add(new ACL(Perms.READ, Ids.ANYONE_ID_UNSAFE));
-  }
-  
-  /**
-   * This method will delete a node and all its children from zookeeper
-   * 
-   * @param zPath
-   *          the path to delete
-   */
-  public static void recursiveDelete(ZooKeeper zk, String zPath, int version, NodeMissingPolicy policy) throws KeeperException, InterruptedException {
-    if (policy.equals(NodeMissingPolicy.CREATE))
-      throw new IllegalArgumentException(policy.name() + " is invalid for this operation");
-    try {
-      for (String child : zk.getChildren(zPath, false))
-        recursiveDelete(zk, zPath + "/" + child, NodeMissingPolicy.SKIP);
-      
-      Stat stat;
-      if ((stat = zk.exists(zPath, null)) != null)
-        zk.delete(zPath, stat.getVersion());
-    } catch (KeeperException e) {
-      if (policy.equals(NodeMissingPolicy.SKIP) && e.code().equals(KeeperException.Code.NONODE))
-        return;
-      throw e;
-    }
-  }
-  
-  public static void recursiveDelete(ZooKeeper zk, String zPath, NodeMissingPolicy policy) throws KeeperException, InterruptedException {
-    recursiveDelete(zk, zPath, -1, policy);
-  }
-  
-  /**
-   * Create a persistent node with the default ACL
-   * 
-   * @return true if the node was created or altered; false if it was skipped
-   */
-  public static boolean putPersistentData(ZooKeeper zk, String zPath, byte[] data, NodeExistsPolicy policy) throws KeeperException, InterruptedException {
-    return putData(zk, zPath, data, CreateMode.PERSISTENT, -1, policy, PUBLIC);
-  }
-  
-  public static boolean putPersistentData(ZooKeeper zk, String zPath, byte[] data, int version, NodeExistsPolicy policy) throws KeeperException,
-      InterruptedException {
-    return putData(zk, zPath, data, CreateMode.PERSISTENT, version, policy, PUBLIC);
-  }
-  
-  public static boolean putPersistentData(ZooKeeper zk, String zPath, byte[] data, int version, NodeExistsPolicy policy, List<ACL> acls)
-      throws KeeperException, InterruptedException {
-    return putData(zk, zPath, data, CreateMode.PERSISTENT, version, policy, acls);
-  }
-  
-  private static boolean putData(ZooKeeper zk, String zPath, byte[] data, CreateMode mode, int version, NodeExistsPolicy policy, List<ACL> acls)
-      throws KeeperException, InterruptedException {
-    if (policy == null)
-      policy = NodeExistsPolicy.FAIL;
-    
-    while (true) {
-      try {
-        zk.create(zPath, data, acls, mode);
-        return true;
-      } catch (NodeExistsException nee) {
-        switch (policy) {
-          case SKIP:
-            return false;
-          case OVERWRITE:
-            try {
-              zk.setData(zPath, data, version);
-              return true;
-            } catch (NoNodeException nne) {
-              // node delete between create call and set data, so try create call again
-              continue;
-            }
-          default:
-            throw nee;
-        }
-      }
-    }
-  }
-  
-  public static byte[] getData(ZooKeeper zk, String zPath, Stat stat) throws KeeperException, InterruptedException {
-    return zk.getData(zPath, false, stat);
-  }
-  
-  public static Stat getStatus(ZooKeeper zk, String zPath) throws KeeperException, InterruptedException {
-    return zk.exists(zPath, false);
-  }
-  
-  public static boolean exists(ZooKeeper zk, String zPath) throws KeeperException, InterruptedException {
-    return getStatus(zk, zPath) != null;
-  }
-  
-  public static void recursiveCopyPersistent(ZooKeeper zk, String source, String destination, NodeExistsPolicy policy) throws KeeperException,
-      InterruptedException {
-    Stat stat = null;
-    if (!exists(zk, source))
-      throw KeeperException.create(Code.NONODE, source);
-    if (exists(zk, destination)) {
-      switch (policy) {
-        case OVERWRITE:
-          break;
-        case SKIP:
-          return;
-        case FAIL:
-        default:
-          throw KeeperException.create(Code.NODEEXISTS, source);
-      }
-    }
-    
-    stat = new Stat();
-    byte[] data = zk.getData(source, false, stat);
-    if (stat.getEphemeralOwner() == 0) {
-      if (data == null)
-        throw KeeperException.create(Code.NONODE, source);
-      putPersistentData(zk, destination, data, policy);
-      if (stat.getNumChildren() > 0)
-        for (String child : zk.getChildren(source, false))
-          recursiveCopyPersistent(zk, source + "/" + child, destination + "/" + child, policy);
-    }
-  }
-  
-  public static boolean putPrivatePersistentData(ZooKeeper zk, String zPath, byte[] data, NodeExistsPolicy policy) throws KeeperException, InterruptedException {
-    return putData(zk, zPath, data, CreateMode.PERSISTENT, -1, policy, PRIVATE);
-  }
-  
-  public static String putPersistentSequential(ZooKeeper zk, String zPath, byte[] data) throws KeeperException, InterruptedException {
-    return zk.create(zPath, data, ZooUtil.PUBLIC, CreateMode.PERSISTENT_SEQUENTIAL);
-  }
-  
-  public static String putEphemeralData(ZooKeeper zk, String zPath, byte[] data) throws KeeperException, InterruptedException {
-    return zk.create(zPath, data, ZooUtil.PUBLIC, CreateMode.EPHEMERAL);
-  }
-
-  public static String putEphemeralSequential(ZooKeeper zk, String zPath, byte[] data) throws KeeperException, InterruptedException {
-    return zk.create(zPath, data, ZooUtil.PUBLIC, CreateMode.EPHEMERAL_SEQUENTIAL);
-  }
-  
-  public static byte[] getLockData(ZooCache zc, String path) {
-    
-    List<ChildData> children = zc.getChildren(path);
-    
-    if (children == null || children.size() == 0) {
-      return null;
-    }
-    
-    children = new ArrayList<ChildData>(children);
-    Collections.sort(children);
-    
-    return children.get(0).getData();
-  }
-  
-  public static boolean isLockHeld(ZooKeeper zk, LockID lid) throws KeeperException, InterruptedException {
-    
-    while (true) {
-      try {
-        List<String> children = zk.getChildren(lid.path, false);
-        
-        if (children.size() == 0) {
-          return false;
-        }
-        
-        Collections.sort(children);
-        
-        String lockNode = children.get(0);
-        if (!lid.node.equals(lockNode))
-          return false;
-        
-        Stat stat = zk.exists(lid.path + "/" + lid.node, false);
-        return stat != null && stat.getEphemeralOwner() == lid.eid;
-      } catch (KeeperException.ConnectionLossException ex) {
-        UtilWaitThread.sleep(1000);
-      }
-    }
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/4640866b/server/src/main/java/org/apache/accumulo/server/curator/CuratorReaderWriter.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/curator/CuratorReaderWriter.java b/server/src/main/java/org/apache/accumulo/server/curator/CuratorReaderWriter.java
new file mode 100644
index 0000000..0fd1bcb
--- /dev/null
+++ b/server/src/main/java/org/apache/accumulo/server/curator/CuratorReaderWriter.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.curator;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+
+public class CuratorReaderWriter extends org.apache.accumulo.fate.curator.CuratorReaderWriter { // subclass that passes "digest" auth built from a secret to the fate class and hands out one shared instance
+  private static final String SCHEME = "digest"; // auth scheme passed to the superclass constructor
+  private static final String USER = "accumulo"; // user part of the "user:secret" auth string
+  private static CuratorReaderWriter instance = null; // shared instance, created by the first getInstance(zkHosts, timeout, secret) call
+  
+  private CuratorReaderWriter(String string, int timeInMillis, String secret) { // private: instances are only created through getInstance
+    super(string, timeInMillis, SCHEME, (USER + ":" + secret).getBytes()); // NOTE(review): getBytes() without a charset uses the platform default charset
+  }
+  
+  public static CuratorReaderWriter getInstance() { // takes hosts, timeout and secret from the site configuration
+    AccumuloConfiguration conf = ServerConfiguration.getSiteConfiguration();
+    return getInstance(conf.get(Property.INSTANCE_ZK_HOST), (int) conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT), conf.get(Property.INSTANCE_SECRET));
+  }
+  
+  public static CuratorReaderWriter getInstance(String secret) { // like getInstance(), but with the caller's secret instead of Property.INSTANCE_SECRET
+    AccumuloConfiguration conf = ServerConfiguration.getSiteConfiguration();
+    return getInstance(conf.get(Property.INSTANCE_ZK_HOST), (int) conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT), secret);
+  }
+  
+  public static synchronized CuratorReaderWriter getInstance(String zkHosts, int timeout, String secret) { // synchronized: the null check and the assignment run under the class lock
+    if (instance == null) {
+      instance = new CuratorReaderWriter(zkHosts, timeout, secret);
+    }
+    return instance; // NOTE(review): once instance is set, the arguments of later calls are not used
+  }
+}


Mime
View raw message