accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [04/16] git commit: ACCUMULO-3242 Add proper retry logic to ZooReader, ZooReaderWriter and ZooUtil removing the RetryingInstance.
Date Mon, 20 Oct 2014 01:05:45 GMT
ACCUMULO-3242 Add proper retry logic to ZooReader, ZooReaderWriter and ZooUtil removing the RetryingInstance.

The RetryingInvocationHandler was a nice way to encapsulate retry logic
around a ZooKeeper client; however, it falls short when you actually need
to do additional handling of the KeeperException. Removing the separate
retrying/non-retrying instances also simplifies the methods that clients
interact with.

ZooUtil still has some unaddressed issues, but I left some TODOs
in place to mark what still needs to be fixed, mostly with respect
to session expirations.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/c023f74e
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/c023f74e
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/c023f74e

Branch: refs/heads/master
Commit: c023f74e4329b8c764c46f9468681b71762e976a
Parents: 6ad16a9
Author: Josh Elser <elserj@apache.org>
Authored: Fri Oct 17 01:22:15 2014 -0400
Committer: Josh Elser <elserj@apache.org>
Committed: Sun Oct 19 20:15:59 2014 -0400

----------------------------------------------------------------------
 .../accumulo/fate/zookeeper/IZooReader.java     |  20 +-
 .../apache/accumulo/fate/zookeeper/Retry.java   |  74 +++++
 .../accumulo/fate/zookeeper/RetryFactory.java   |  36 +++
 .../zookeeper/RetryingInvocationHandler.java    |  63 ----
 .../accumulo/fate/zookeeper/ZooQueueLock.java   |  20 +-
 .../accumulo/fate/zookeeper/ZooReader.java      | 133 +++++++-
 .../fate/zookeeper/ZooReaderWriter.java         | 129 +++++---
 .../accumulo/fate/zookeeper/ZooSession.java     |  42 +--
 .../apache/accumulo/fate/zookeeper/ZooUtil.java | 301 ++++++++++++++-----
 .../RetryingInvocationHandlerTest.java          |  87 ------
 .../org/apache/accumulo/server/Accumulo.java    |   2 +-
 .../accumulo/server/master/LiveTServerSet.java  |   2 +-
 .../security/handler/ZKAuthenticator.java       |  42 +--
 .../server/security/handler/ZKAuthorizor.java   |   8 +-
 .../server/security/handler/ZKPermHandler.java  |  42 +--
 .../accumulo/server/tables/TableManager.java    |  16 +-
 .../server/tablets/UniqueNameAllocator.java     |   2 +-
 .../accumulo/server/zookeeper/ZooQueueLock.java |   8 +-
 .../server/zookeeper/ZooReaderWriter.java       |  45 ---
 .../zookeeper/ZooReaderWriterFactory.java       |  22 --
 .../java/org/apache/accumulo/master/Master.java |   4 +-
 .../master/tableOps/CancelCompactions.java      |  34 +--
 .../accumulo/master/tableOps/CompactRange.java  |   6 +-
 .../master/tableOps/RenameNamespace.java        |   2 +-
 .../accumulo/master/tableOps/RenameTable.java   |   2 +-
 .../apache/accumulo/master/tableOps/Utils.java  |  12 +-
 .../apache/accumulo/master/util/FateAdmin.java  |   2 +-
 .../org/apache/accumulo/tserver/Tablet.java     |  10 +-
 28 files changed, 688 insertions(+), 478 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/c023f74e/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReader.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReader.java b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReader.java
index 0610e79..610b1bd 100644
--- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReader.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReader.java
@@ -23,21 +23,23 @@ import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.data.Stat;
 
 public interface IZooReader {
-  
+
   byte[] getData(String zPath, Stat stat) throws KeeperException, InterruptedException;
-  
+
+  byte[] getData(String zPath, boolean watch, Stat stat) throws KeeperException, InterruptedException;
+
   Stat getStatus(String zPath) throws KeeperException, InterruptedException;
-  
+
   Stat getStatus(String zPath, Watcher watcher) throws KeeperException, InterruptedException;
-  
+
   List<String> getChildren(String zPath) throws KeeperException, InterruptedException;
-  
+
   List<String> getChildren(String zPath, Watcher watcher) throws KeeperException, InterruptedException;
-  
+
   boolean exists(String zPath) throws KeeperException, InterruptedException;
-  
+
   boolean exists(String zPath, Watcher watcher) throws KeeperException, InterruptedException;
-  
+
   void sync(final String path) throws KeeperException, InterruptedException;
-  
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c023f74e/fate/src/main/java/org/apache/accumulo/fate/zookeeper/Retry.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/Retry.java b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/Retry.java
new file mode 100644
index 0000000..4a37172
--- /dev/null
+++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/Retry.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate.zookeeper;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Tracks the state of a bounded retry loop: how many retries have been used and how long to sleep before the next attempt.
+ * <p>
+ * The first wait is {@code startWait} (capped at {@code maxWait}); each {@link #waitForNextAttempt()} then grows the wait linearly by
+ * {@code waitIncrement}, never exceeding {@code maxWait}.
+ * <p>
+ * Instances hold mutable, unsynchronized state, so each logical operation should use its own instance (see {@link RetryFactory}).
+ */
+public class Retry {
+  private static final Logger log = Logger.getLogger(Retry.class);
+
+  private final long maxRetries, maxWait, waitIncrement;
+  private long retriesDone, currentWait;
+
+  /**
+   * @param maxRetries
+   *          Maximum times to retry
+   * @param startWait
+   *          The amount of time (ms) to wait for the initial retry; capped at {@code maxWait}
+   * @param maxWait
+   *          The maximum wait (ms)
+   * @param waitIncrement
+   *          The amount of time (ms) to increment next wait time by
+   * @throws IllegalArgumentException
+   *           if {@code startWait} or {@code maxWait} is negative, since {@link Thread#sleep(long)} rejects negative durations
+   */
+  public Retry(long maxRetries, long startWait, long maxWait, long waitIncrement) {
+    if (startWait < 0 || maxWait < 0) {
+      throw new IllegalArgumentException("Wait times must be non-negative: startWait=" + startWait + ", maxWait=" + maxWait);
+    }
+    this.maxRetries = maxRetries;
+    this.maxWait = maxWait;
+    this.waitIncrement = waitIncrement;
+    this.retriesDone = 0L;
+    // Honor the documented maximum even for the very first wait
+    this.currentWait = Math.min(startWait, maxWait);
+  }
+
+  /**
+   * @return true if fewer than {@code maxRetries} retries have been used so far
+   */
+  public boolean canRetry() {
+    return retriesDone < maxRetries;
+  }
+
+  /**
+   * Consumes one retry.
+   *
+   * @throws IllegalStateException
+   *           if no retries are left
+   */
+  public void useRetry() {
+    if (!canRetry()) {
+      throw new IllegalStateException("No retries left");
+    }
+
+    retriesDone++;
+  }
+
+  /**
+   * @return true if at least one retry has been used
+   */
+  public boolean hasRetried() {
+    return retriesDone > 0;
+  }
+
+  /**
+   * @return the number of retries used so far
+   */
+  public long retriesCompleted() {
+    return retriesDone;
+  }
+
+  /**
+   * Sleeps for the current wait time, then grows the wait used by the next call by {@code waitIncrement}, up to {@code maxWait}.
+   *
+   * @throws InterruptedException
+   *           if the thread is interrupted while sleeping
+   */
+  public void waitForNextAttempt() throws InterruptedException {
+    log.debug("Sleeping for " + currentWait + "ms before retrying operation");
+    Thread.sleep(currentWait);
+    currentWait = Math.min(maxWait, currentWait + waitIncrement);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c023f74e/fate/src/main/java/org/apache/accumulo/fate/zookeeper/RetryFactory.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/RetryFactory.java b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/RetryFactory.java
new file mode 100644
index 0000000..3fcb738
--- /dev/null
+++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/RetryFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate.zookeeper;
+
+/**
+ * Creates fresh {@link Retry} instances (no retries used yet) that all share the same retry-count and wait settings.
+ */
+public class RetryFactory {
+
+  private final long maxRetries, startWait, maxWait, waitIncrement;
+
+  public RetryFactory(long maxRetries, long startWait, long maxWait, long waitIncrement) {
+    this.maxRetries = maxRetries;
+    this.startWait = startWait;
+    this.maxWait = maxWait;
+    this.waitIncrement = waitIncrement;
+  }
+
+  public Retry create() {
+    return new Retry(maxRetries, startWait, maxWait, waitIncrement);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c023f74e/fate/src/main/java/org/apache/accumulo/fate/zookeeper/RetryingInvocationHandler.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/RetryingInvocationHandler.java b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/RetryingInvocationHandler.java
deleted file mode 100644
index 4597036..0000000
--- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/RetryingInvocationHandler.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.fate.zookeeper;
-
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import org.apache.accumulo.fate.util.UtilWaitThread;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.KeeperException;
-
-/**
- * An invocation handler for ZooKeeper reader/writers that retries calls that fail due to connection loss.
- */
-public class RetryingInvocationHandler implements InvocationHandler {
-  private final IZooReaderWriter zrw;
-
-  /**
-   * Creates a new invocation handler.
-   *
-   * @param zrw
-   *          ZooKeeper reader/writer being handled
-   */
-  public RetryingInvocationHandler(IZooReaderWriter zrw) {
-    this.zrw = zrw;
-  }
-
-  private static final long INITIAL_RETRY_TIME = 250L;
-  private static final long RETRY_INCREMENT = 250L;
-  private static final long MAXIMUM_RETRY_TIME = 5000L;
-
-  @Override
-  public Object invoke(Object obj, Method method, Object[] args) throws Throwable {
-    long retryTime = INITIAL_RETRY_TIME;
-    while (true) {
-      try {
-        return method.invoke(zrw, args);
-      } catch (InvocationTargetException e) {
-        if (e.getCause() instanceof KeeperException.ConnectionLossException) {
-          Logger.getLogger(ZooReaderWriter.class).warn("Error connecting to zookeeper, will retry in " + String.format("%.2f secs", retryTime / 1000.0), e.getCause());
-          UtilWaitThread.sleep(retryTime);
-          retryTime = Math.min(MAXIMUM_RETRY_TIME, retryTime + RETRY_INCREMENT);
-        } else {
-          throw e.getCause();
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c023f74e/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooQueueLock.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooQueueLock.java b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooQueueLock.java
index f30cda3..f9195f3 100644
--- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooQueueLock.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooQueueLock.java
@@ -29,26 +29,26 @@ import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.apache.zookeeper.KeeperException.NotEmptyException;
 
 public class ZooQueueLock implements QueueLock {
-  
+
   private static final String PREFIX = "lock-";
-  
+
   // private static final Logger log = Logger.getLogger(ZooQueueLock.class);
-  
+
   private IZooReaderWriter zoo;
   private String path;
   private boolean ephemeral;
-  
+
   public ZooQueueLock(String zookeepers, int timeInMillis, String scheme, byte[] auth, String path, boolean ephemeral) throws KeeperException,
       InterruptedException {
-    this(ZooReaderWriter.getRetryingInstance(zookeepers, timeInMillis, scheme, auth), path, ephemeral);
+    this(ZooReaderWriter.getInstance(zookeepers, timeInMillis, scheme, auth), path, ephemeral);
   }
-  
+
   protected ZooQueueLock(IZooReaderWriter zrw, String path, boolean ephemeral) {
     this.zoo = zrw;
     this.path = path;
     this.ephemeral = ephemeral;
   }
-  
+
   @Override
   public long addEntry(byte[] data) {
     String newPath;
@@ -72,7 +72,7 @@ public class ZooQueueLock implements QueueLock {
       throw new RuntimeException(ex);
     }
   }
-  
+
   @Override
   public SortedMap<Long,byte[]> getEarlierEntries(long entry) {
     SortedMap<Long,byte[]> result = new TreeMap<Long,byte[]>();
@@ -83,7 +83,7 @@ public class ZooQueueLock implements QueueLock {
       } catch (KeeperException.NoNodeException ex) {
         // the path does not exist (it was deleted or not created yet), that is ok there are no earlier entries then
       }
-      
+
       for (String name : children) {
         // this try catch must be done inside the loop because some subset of the children may exist
         try {
@@ -100,7 +100,7 @@ public class ZooQueueLock implements QueueLock {
     }
     return result;
   }
-  
+
   @Override
   public void removeEntry(long entry) {
     try {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c023f74e/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReader.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReader.java b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReader.java
index 60660d6..994f395 100644
--- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReader.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReader.java
@@ -20,6 +20,7 @@ import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.log4j.Logger;
 import org.apache.zookeeper.AsyncCallback.VoidCallback;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.Code;
@@ -28,8 +29,11 @@ import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.Stat;
 
 public class ZooReader implements IZooReader {
+  private static final Logger log = Logger.getLogger(ZooReader.class);
+
   protected String keepers;
   protected int timeout;
+  protected final RetryFactory retryFactory;
 
   protected ZooKeeper getSession(String keepers, int timeout, String scheme, byte[] auth) {
     return ZooSession.getSession(keepers, timeout, scheme, auth);
@@ -39,39 +43,153 @@ public class ZooReader implements IZooReader {
     return getSession(keepers, timeout, null, null);
   }
 
+  protected void retryOrThrow(Retry retry, KeeperException e) throws KeeperException {
+    log.warn("Saw (possibly) transient exception communicating with ZooKeeper", e);
+    if (retry.canRetry()) {
+      retry.useRetry();
+      return;
+    }
+
+    log.error("Retry attempts (" + retry.retriesCompleted() + ") exceeded trying to communicate with ZooKeeper");
+    throw e;
+  }
+
   @Override
   public byte[] getData(String zPath, Stat stat) throws KeeperException, InterruptedException {
-    return getZooKeeper().getData(zPath, false, stat);
+    return getData(zPath, false, stat);
+  }
+
+  @Override
+  public byte[] getData(String zPath, boolean watch, Stat stat) throws KeeperException, InterruptedException {
+    final Retry retry = retryFactory.create();
+    while (true) {
+      try {
+        return getZooKeeper().getData(zPath, watch, stat);
+      } catch (KeeperException e) {
+        final Code code = e.code();
+        if (code == Code.CONNECTIONLOSS || code == Code.OPERATIONTIMEOUT || code == Code.SESSIONEXPIRED) {
+          retryOrThrow(retry, e);
+        } else {
+          throw e;
+        }
+      }
+
+      retry.waitForNextAttempt();
+    }
   }
 
   @Override
   public Stat getStatus(String zPath) throws KeeperException, InterruptedException {
-    return getZooKeeper().exists(zPath, false);
+    final Retry retry = retryFactory.create();
+    while (true) {
+      try {
+        return getZooKeeper().exists(zPath, false);
+      } catch (KeeperException e) {
+        final Code code = e.code();
+        if (code == Code.CONNECTIONLOSS || code == Code.OPERATIONTIMEOUT || code == Code.SESSIONEXPIRED) {
+          retryOrThrow(retry, e);
+        } else {
+          throw e;
+        }
+      }
+
+      retry.waitForNextAttempt();
+    }
   }
 
   @Override
   public Stat getStatus(String zPath, Watcher watcher) throws KeeperException, InterruptedException {
-    return getZooKeeper().exists(zPath, watcher);
+    final Retry retry = retryFactory.create();
+    while (true) {
+      try {
+        return getZooKeeper().exists(zPath, watcher);
+      } catch (KeeperException e) {
+        final Code code = e.code();
+        if (code == Code.CONNECTIONLOSS || code == Code.OPERATIONTIMEOUT || code == Code.SESSIONEXPIRED) {
+          retryOrThrow(retry, e);
+        } else {
+          throw e;
+        }
+      }
+
+      retry.waitForNextAttempt();
+    }
   }
 
   @Override
   public List<String> getChildren(String zPath) throws KeeperException, InterruptedException {
-    return getZooKeeper().getChildren(zPath, false);
+    final Retry retry = retryFactory.create();
+    while (true) {
+      try {
+        return getZooKeeper().getChildren(zPath, false);
+      } catch (KeeperException e) {
+        final Code code = e.code();
+        if (code == Code.CONNECTIONLOSS || code == Code.OPERATIONTIMEOUT || code == Code.SESSIONEXPIRED) {
+          retryOrThrow(retry, e);
+        } else {
+          throw e;
+        }
+      }
+
+      retry.waitForNextAttempt();
+    }
   }
 
   @Override
   public List<String> getChildren(String zPath, Watcher watcher) throws KeeperException, InterruptedException {
-    return getZooKeeper().getChildren(zPath, watcher);
+    final Retry retry = retryFactory.create();
+    while (true) {
+      try {
+        return getZooKeeper().getChildren(zPath, watcher);
+      } catch (KeeperException e) {
+        final Code code = e.code();
+        if (code == Code.CONNECTIONLOSS || code == Code.OPERATIONTIMEOUT || code == Code.SESSIONEXPIRED) {
+          retryOrThrow(retry, e);
+        } else {
+          throw e;
+        }
+      }
+
+      retry.waitForNextAttempt();
+    }
   }
 
   @Override
   public boolean exists(String zPath) throws KeeperException, InterruptedException {
-    return getZooKeeper().exists(zPath, false) != null;
+    final Retry retry = retryFactory.create();
+    while (true) {
+      try {
+        return getZooKeeper().exists(zPath, false) != null;
+      } catch (KeeperException e) {
+        final Code code = e.code();
+        if (code == Code.CONNECTIONLOSS || code == Code.OPERATIONTIMEOUT || code == Code.SESSIONEXPIRED) {
+          retryOrThrow(retry, e);
+        } else {
+          throw e;
+        }
+      }
+
+      retry.waitForNextAttempt();
+    }
   }
 
   @Override
   public boolean exists(String zPath, Watcher watcher) throws KeeperException, InterruptedException {
-    return getZooKeeper().exists(zPath, watcher) != null;
+    final Retry retry = retryFactory.create();
+    while (true) {
+      try {
+        return getZooKeeper().exists(zPath, watcher) != null;
+      } catch (KeeperException e) {
+        final Code code = e.code();
+        if (code == Code.CONNECTIONLOSS || code == Code.OPERATIONTIMEOUT || code == Code.SESSIONEXPIRED) {
+          retryOrThrow(retry, e);
+        } else {
+          throw e;
+        }
+      }
+
+      retry.waitForNextAttempt();
+    }
   }
 
   @Override
@@ -101,5 +219,7 @@ public class ZooReader implements IZooReader {
   public ZooReader(String keepers, int timeout) {
     this.keepers = keepers;
     this.timeout = timeout;
+    // RetryFactory(maxRetries, startWait, maxWait, waitIncrement): 250ms first wait, +250ms per retry, capped at 5000ms
+    this.retryFactory = new RetryFactory(10L, 250L, 5000L, 250L);
   }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c023f74e/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriter.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriter.java b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriter.java
index 1f0ae14..b29b88a 100644
--- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriter.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriter.java
@@ -16,31 +16,29 @@
  */
 package org.apache.accumulo.fate.zookeeper;
 
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.Proxy;
 import java.security.SecurityPermission;
 import java.util.Arrays;
 import java.util.List;
 
 import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.log4j.Logger;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.BadVersionException;
-import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
 
 public class ZooReaderWriter extends ZooReader implements IZooReaderWriter {
-  
+  private static final Logger log = Logger.getLogger(ZooReaderWriter.class);
+
   private static SecurityPermission ZOOWRITER_PERMISSION = new SecurityPermission("zookeeperWriterPermission");
-  
+
   private static ZooReaderWriter instance = null;
-  private static IZooReaderWriter retryingInstance = null;
   private final String scheme;
   private final byte[] auth;
-  
+
   @Override
   public ZooKeeper getZooKeeper() {
     SecurityManager sm = System.getSecurityManager();
@@ -49,127 +47,158 @@ public class ZooReaderWriter extends ZooReader implements IZooReaderWriter {
     }
     return getSession(keepers, timeout, scheme, auth);
   }
-  
+
   public ZooReaderWriter(String string, int timeInMillis, String scheme, byte[] auth) {
     super(string, timeInMillis);
     this.scheme = scheme;
     this.auth = Arrays.copyOf(auth, auth.length);
   }
-  
+
   @Override
   public void recursiveDelete(String zPath, NodeMissingPolicy policy) throws KeeperException, InterruptedException {
     ZooUtil.recursiveDelete(getZooKeeper(), zPath, policy);
   }
-  
+
   @Override
   public void recursiveDelete(String zPath, int version, NodeMissingPolicy policy) throws KeeperException, InterruptedException {
     ZooUtil.recursiveDelete(getZooKeeper(), zPath, version, policy);
   }
-  
+
   /**
    * Create a persistent node with the default ACL
-   * 
+   *
    * @return true if the node was created or altered; false if it was skipped
    */
   @Override
   public boolean putPersistentData(String zPath, byte[] data, NodeExistsPolicy policy) throws KeeperException, InterruptedException {
     return ZooUtil.putPersistentData(getZooKeeper(), zPath, data, policy);
   }
-  
+
   @Override
   public boolean putPrivatePersistentData(String zPath, byte[] data, NodeExistsPolicy policy) throws KeeperException, InterruptedException {
     return ZooUtil.putPrivatePersistentData(getZooKeeper(), zPath, data, policy);
   }
-  
+
   @Override
   public void putPersistentData(String zPath, byte[] data, int version, NodeExistsPolicy policy) throws KeeperException, InterruptedException {
     ZooUtil.putPersistentData(getZooKeeper(), zPath, data, version, policy);
   }
-  
+
   @Override
   public String putPersistentSequential(String zPath, byte[] data) throws KeeperException, InterruptedException {
     return ZooUtil.putPersistentSequential(getZooKeeper(), zPath, data);
   }
-  
+
   @Override
   public String putEphemeralData(String zPath, byte[] data) throws KeeperException, InterruptedException {
     return ZooUtil.putEphemeralData(getZooKeeper(), zPath, data);
   }
-  
+
   @Override
   public String putEphemeralSequential(String zPath, byte[] data) throws KeeperException, InterruptedException {
     return ZooUtil.putEphemeralSequential(getZooKeeper(), zPath, data);
   }
-  
+
   @Override
   public void recursiveCopyPersistent(String source, String destination, NodeExistsPolicy policy) throws KeeperException, InterruptedException {
     ZooUtil.recursiveCopyPersistent(getZooKeeper(), source, destination, policy);
   }
-  
+
   @Override
   public void delete(String path, int version) throws InterruptedException, KeeperException {
-    getZooKeeper().delete(path, version);
+    final Retry retry = retryFactory.create();
+    while (true) {
+      try {
+        getZooKeeper().delete(path, version);
+        return;
+      } catch (KeeperException e) {
+        final Code code = e.code();
+        if (code == Code.NONODE) {
+          if (retry.hasRetried()) {
+            // A retried delete could have deleted the node, assume that was the case
+            log.debug("Delete saw no node on a retry. Assuming node was deleted");
+            return;
+          }
+
+          throw e;
+        } else if (code == Code.CONNECTIONLOSS || code == Code.OPERATIONTIMEOUT || code == Code.SESSIONEXPIRED) {
+          // retry if we have more attempts to do so
+          retryOrThrow(retry, e);
+        } else {
+          throw e;
+        }
+      }
+
+      retry.waitForNextAttempt();
+    }
   }
-  
+
   @Override
   public byte[] mutate(String zPath, byte[] createValue, List<ACL> acl, Mutator mutator) throws Exception {
     if (createValue != null) {
-      try {
-        getZooKeeper().create(zPath, createValue, acl, CreateMode.PERSISTENT);
-        return createValue;
-      } catch (NodeExistsException ex) {
-        // expected
+      final Retry retry = retryFactory.create(); // created once so retries are bounded across loop iterations
+      while (true) {
+        try {
+          getZooKeeper().create(zPath, createValue, acl, CreateMode.PERSISTENT);
+          return createValue;
+        } catch (KeeperException ex) {
+          final Code code = ex.code();
+          if (code == Code.NODEEXISTS) {
+            // expected
+            break;
+          } else if (code == Code.OPERATIONTIMEOUT || code == Code.CONNECTIONLOSS || code == Code.SESSIONEXPIRED) {
+            retryOrThrow(retry, ex);
+          } else {
+            throw ex;
+          }
+        }
+
+        retry.waitForNextAttempt();
       }
     }
+    final Retry retry = retryFactory.create(); // created once so retries are bounded across loop iterations
     do {
       Stat stat = new Stat();
-      byte[] data = getZooKeeper().getData(zPath, false, stat);
+      byte[] data = getData(zPath, false, stat);
       data = mutator.mutate(data);
       if (data == null)
         return data;
       try {
         getZooKeeper().setData(zPath, data, stat.getVersion());
         return data;
-      } catch (BadVersionException ex) {
-        //
+      } catch (KeeperException ex) {
+        final Code code = ex.code();
+        if (code == Code.BADVERSION) {
+          // Retry, but don't increment. This makes it backwards compatible with the infinite
+          // loop that previously happened. I'm not sure if that's really desirable though.
+        } else if (code == Code.OPERATIONTIMEOUT || code == Code.CONNECTIONLOSS || code == Code.SESSIONEXPIRED) {
+          retryOrThrow(retry, ex);
+          retry.waitForNextAttempt();
+        } else {
+          throw ex;
+        }
       }
     } while (true);
   }
-  
+
   public static synchronized ZooReaderWriter getInstance(String zookeepers, int timeInMillis, String scheme, byte[] auth) {
     if (instance == null)
       instance = new ZooReaderWriter(zookeepers, timeInMillis, scheme, auth);
     return instance;
   }
-  
-  /**
-   * get an instance that retries when zookeeper connection errors occur
-   * 
-   * @return an instance that retries when Zookeeper connection errors occur.
-   */
-  public static synchronized IZooReaderWriter getRetryingInstance(String zookeepers, int timeInMillis, String scheme, byte[] auth) {
-    
-    if (retryingInstance == null) {
-      IZooReaderWriter inst = getInstance(zookeepers, timeInMillis, scheme, auth);
-      InvocationHandler ih = new RetryingInvocationHandler(inst);
-      retryingInstance = (IZooReaderWriter) Proxy.newProxyInstance(ZooReaderWriter.class.getClassLoader(), new Class[] {IZooReaderWriter.class}, ih);
-    }
-    
-    return retryingInstance;
-  }
-  
+
   @Override
   public boolean isLockHeld(ZooUtil.LockID lockID) throws KeeperException, InterruptedException {
     return ZooUtil.isLockHeld(getZooKeeper(), lockID);
   }
-  
+
   @Override
   public void mkdirs(String path) throws KeeperException, InterruptedException {
     if (path.equals(""))
       return;
     if (!path.startsWith("/"))
       throw new IllegalArgumentException(path + "does not start with /");
-    if (getZooKeeper().exists(path, false) != null)
+    if (exists(path))
       return;
     String parent = path.substring(0, path.lastIndexOf("/"));
     mkdirs(parent);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c023f74e/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java
index 33bd77b..b6890c6 100644
--- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java
@@ -33,7 +33,7 @@ import org.apache.zookeeper.ZooKeeper.States;
 
 public class ZooSession {
   private static final Charset UTF8 = Charset.forName("UTF-8");
-  
+
   public static class ZooSessionShutdownException extends RuntimeException {
 
     private static final long serialVersionUID = 1L;
@@ -41,31 +41,32 @@ public class ZooSession {
   }
 
   private static final Logger log = Logger.getLogger(ZooSession.class);
-  
+
   private static class ZooSessionInfo {
     public ZooSessionInfo(ZooKeeper zooKeeper, ZooWatcher watcher) {
       this.zooKeeper = zooKeeper;
     }
-    
+
     ZooKeeper zooKeeper;
   }
-  
+
   private static Map<String,ZooSessionInfo> sessions = new HashMap<String,ZooSessionInfo>();
-  
+
   private static String sessionKey(String keepers, int timeout, String scheme, byte[] auth) {
     return keepers + ":" + timeout + ":" + (scheme == null ? "" : scheme) + ":" + (auth == null ? "" : new String(auth, UTF8));
   }
-  
+
   private static class ZooWatcher implements Watcher {
-    
+
+    @Override
     public void process(WatchedEvent event) {
       if (event.getState() == KeeperState.Expired) {
         log.debug("Session expired, state of current session : " + event.getState());
       }
     }
-    
+
   }
-  
+
   /**
    * @param host comma separated list of zk servers
    * @param timeout in milliseconds
@@ -79,9 +80,9 @@ public class ZooSession {
     boolean tryAgain = true;
     long sleepTime = 100;
     ZooKeeper zooKeeper = null;
-    
+
     long startTime = System.currentTimeMillis();
-    
+
     while (tryAgain) {
       try {
         zooKeeper = new ZooKeeper(host, timeout, watcher);
@@ -94,7 +95,7 @@ public class ZooSession {
           } else
             UtilWaitThread.sleep(TIME_BETWEEN_CONNECT_CHECKS_MS);
         }
-        
+
       } catch (IOException e) {
         if (e instanceof UnknownHostException) {
           /*
@@ -116,13 +117,13 @@ public class ZooSession {
       if (System.currentTimeMillis() - startTime > 2 * timeout) {
         throw new RuntimeException("Failed to connect to zookeeper (" + host + ") within 2x zookeeper timeout period " + timeout);
       }
-      
+
       if (tryAgain) {
         if (startTime + 2 * timeout < System.currentTimeMillis() + sleepTime + connectTimeWait)
           sleepTime = startTime + 2 * timeout - System.currentTimeMillis() - connectTimeWait;
         if (sleepTime < 0)
         {
-          connectTimeWait -= sleepTime; 
+          connectTimeWait -= sleepTime;
           sleepTime = 0;
         }
         UtilWaitThread.sleep(sleepTime);
@@ -130,31 +131,32 @@ public class ZooSession {
           sleepTime = sleepTime + (long)(sleepTime * Math.random());
       }
     }
-    
+
     return zooKeeper;
   }
-  
+
   public static synchronized ZooKeeper getSession(String zooKeepers, int timeout) {
     return getSession(zooKeepers, timeout, null, null);
   }
-  
+
   public static synchronized ZooKeeper getSession(String zooKeepers, int timeout, String scheme, byte[] auth) {
-    
+
     if (sessions == null)
       throw new ZooSessionShutdownException();
 
     String sessionKey = sessionKey(zooKeepers, timeout, scheme, auth);
-    
+
     // a read-only session can use a session with authorizations, so cache a copy for it w/out auths
     String readOnlySessionKey = sessionKey(zooKeepers, timeout, null, null);
     ZooSessionInfo zsi = sessions.get(sessionKey);
     if (zsi != null && zsi.zooKeeper.getState() == States.CLOSED) {
+      log.debug("Removing closed ZooKeeper session to " + zooKeepers);
       if (auth != null && sessions.get(readOnlySessionKey) == zsi)
         sessions.remove(readOnlySessionKey);
       zsi = null;
       sessions.remove(sessionKey);
     }
-    
+
     if (zsi == null) {
       ZooWatcher watcher = new ZooWatcher();
       log.debug("Connecting to " + zooKeepers + " with timeout " + timeout + " with auth");

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c023f74e/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooUtil.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooUtil.java b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooUtil.java
index 5b856c0..e0d8831 100644
--- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooUtil.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooUtil.java
@@ -21,12 +21,11 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
-import org.apache.accumulo.fate.util.UtilWaitThread;
+import org.apache.log4j.Logger;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.KeeperException.NoNodeException;
-import org.apache.zookeeper.KeeperException.NodeExistsException;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooDefs.Perms;
 import org.apache.zookeeper.ZooKeeper;
@@ -34,27 +33,29 @@ import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
 
 public class ZooUtil {
+  private static final Logger log = Logger.getLogger(ZooUtil.class);
+
   public enum NodeExistsPolicy {
     SKIP, OVERWRITE, FAIL
   }
-  
+
   public enum NodeMissingPolicy {
     SKIP, CREATE, FAIL
   }
-  
+
   public static class LockID {
     public long eid;
     public String path;
     public String node;
-    
+
     public LockID(String root, String serializedLID) {
       String sa[] = serializedLID.split("\\$");
       int lastSlash = sa[0].lastIndexOf('/');
-      
+
       if (sa.length != 2 || lastSlash < 0) {
         throw new IllegalArgumentException("Malformed serialized lock id " + serializedLID);
       }
-      
+
       if (lastSlash == 0)
         path = root;
       else
@@ -62,37 +63,50 @@ public class ZooUtil {
       node = sa[0].substring(lastSlash + 1);
       eid = new BigInteger(sa[1], 16).longValue();
     }
-    
+
     public LockID(String path, String node, long eid) {
       this.path = path;
       this.node = node;
       this.eid = eid;
     }
-    
+
     public String serialize(String root) {
-      
+
       return path.substring(root.length()) + "/" + node + "$" + Long.toHexString(eid);
     }
-    
+
     @Override
     public String toString() {
       return " path = " + path + " node = " + node + " eid = " + Long.toHexString(eid);
     }
   }
-  
+
   public static final List<ACL> PRIVATE;
   public static final List<ACL> PUBLIC;
+  private static final RetryFactory RETRY_FACTORY;
   static {
     PRIVATE = new ArrayList<ACL>();
     PRIVATE.addAll(Ids.CREATOR_ALL_ACL);
     PUBLIC = new ArrayList<ACL>();
     PUBLIC.addAll(PRIVATE);
     PUBLIC.add(new ACL(Perms.READ, Ids.ANYONE_ID_UNSAFE));
+    RETRY_FACTORY = new RetryFactory(10L, 250L, 250L, 5000L);
   }
-  
+
+  protected static void retryOrThrow(Retry retry, KeeperException e) throws KeeperException {
+    log.warn("Saw (possibly) transient exception communicating with ZooKeeper", e);
+    if (!retry.canRetry()) {
+      // retry budget exhausted: surface the original ZooKeeper failure to the caller
+      log.error("Retry attempts (" + retry.retriesCompleted() + ") exceeded trying to communicate with ZooKeeper");
+      throw e;
+    }
+
+    retry.useRetry();
+  }
+
   /**
    * This method will delete a node and all its children from zookeeper
-   * 
+   *
    * @param zPath
    *          the path to delete
    */
@@ -100,82 +114,180 @@ public class ZooUtil {
     if (policy.equals(NodeMissingPolicy.CREATE))
       throw new IllegalArgumentException(policy.name() + " is invalid for this operation");
     try {
-      for (String child : zk.getChildren(zPath, false))
+      List<String> children;
+      final Retry retry = RETRY_FACTORY.create();
+      while (true) {
+        try {
+          children = zk.getChildren(zPath, false);
+          break;
+        } catch (KeeperException e) {
+          final Code c = e.code();
+          if (c == Code.CONNECTIONLOSS || c == Code.OPERATIONTIMEOUT || c == Code.SESSIONEXPIRED) {
+            // TODO ZooKeeper needs to be recreated with SESSIONEXPIRED, should use ZooSession
+            retryOrThrow(retry, e);
+          } else {
+            throw e;
+          }
+        }
+        retry.waitForNextAttempt();
+      }
+      for (String child : children)
         recursiveDelete(zk, zPath + "/" + child, NodeMissingPolicy.SKIP);
-      
+
       Stat stat;
-      if ((stat = zk.exists(zPath, null)) != null)
-        zk.delete(zPath, stat.getVersion());
+      while (true) {
+        try {
+          stat = zk.exists(zPath, null);
+          // Node is already gone; returning here keeps this loop from spinning forever
+          if (stat == null)
+            return;
+          try {
+            // Try to delete it
+            zk.delete(zPath, stat.getVersion());
+            return;
+          } catch (NoNodeException e) {
+            // If the node vanished between exists() and delete(), it's ok if we have SKIP
+            if (policy.equals(NodeMissingPolicy.SKIP)) {
+              return;
+            }
+            throw e;
+          }
+          // Other KeeperExceptions (including a rethrown NoNodeException) go to the catch below
+        } catch (KeeperException e) {
+          final Code c = e.code();
+          if (c == Code.CONNECTIONLOSS || c == Code.OPERATIONTIMEOUT || c == Code.SESSIONEXPIRED) {
+            // TODO ZooKeeper needs to be recreated with SESSIONEXPIRED, should use ZooSession
+            retryOrThrow(retry, e);
+          } else {
+            throw e;
+          }
+        }
+
+        retry.waitForNextAttempt();
+      }
     } catch (KeeperException e) {
       if (policy.equals(NodeMissingPolicy.SKIP) && e.code().equals(KeeperException.Code.NONODE))
         return;
       throw e;
     }
   }
-  
+
   public static void recursiveDelete(ZooKeeper zk, String zPath, NodeMissingPolicy policy) throws KeeperException, InterruptedException {
     recursiveDelete(zk, zPath, -1, policy);
   }
-  
+
   /**
    * Create a persistent node with the default ACL
-   * 
+   *
    * @return true if the node was created or altered; false if it was skipped
    */
   public static boolean putPersistentData(ZooKeeper zk, String zPath, byte[] data, NodeExistsPolicy policy) throws KeeperException, InterruptedException {
     return putData(zk, zPath, data, CreateMode.PERSISTENT, -1, policy, PUBLIC);
   }
-  
+
   public static boolean putPersistentData(ZooKeeper zk, String zPath, byte[] data, int version, NodeExistsPolicy policy) throws KeeperException,
       InterruptedException {
     return putData(zk, zPath, data, CreateMode.PERSISTENT, version, policy, PUBLIC);
   }
-  
+
   public static boolean putPersistentData(ZooKeeper zk, String zPath, byte[] data, int version, NodeExistsPolicy policy, List<ACL> acls)
       throws KeeperException, InterruptedException {
     return putData(zk, zPath, data, CreateMode.PERSISTENT, version, policy, acls);
   }
-  
+
   private static boolean putData(ZooKeeper zk, String zPath, byte[] data, CreateMode mode, int version, NodeExistsPolicy policy, List<ACL> acls)
       throws KeeperException, InterruptedException {
     if (policy == null)
       policy = NodeExistsPolicy.FAIL;
-    
+
+    final Retry retry = RETRY_FACTORY.create();
     while (true) {
       try {
         zk.create(zPath, data, acls, mode);
         return true;
-      } catch (NodeExistsException nee) {
-        switch (policy) {
-          case SKIP:
-            return false;
-          case OVERWRITE:
-            try {
-              zk.setData(zPath, data, version);
-              return true;
-            } catch (NoNodeException nne) {
-              // node delete between create call and set data, so try create call again
-              continue;
-            }
-          default:
-            throw nee;
+      } catch (KeeperException e) {
+        final Code code = e.code();
+        if (code == Code.NODEEXISTS) {
+          switch (policy) {
+            case SKIP:
+              return false;
+            case OVERWRITE:
+              // overwrite the data in the node when it already exists
+              try {
+                zk.setData(zPath, data, version);
+                return true;
+              } catch (KeeperException e2) {
+                final Code code2 = e2.code();
+                if (code2 == Code.NONODE) {
+                  // node delete between create call and set data, so try create call again
+                  continue;
+                } else if (code2 == Code.CONNECTIONLOSS || code2 == Code.OPERATIONTIMEOUT || code2 == Code.SESSIONEXPIRED) {
+                  // TODO ZooKeeper needs to be recreated with SESSIONEXPIRED, should use ZooSession
+                  retryOrThrow(retry, e2);
+                  break; // leave the switch so the loop waits and retries instead of falling into default
+                }
+                // unhandled exception on setData(): report the setData failure, not the NODEEXISTS from create()
+                throw e2;
+              }
+            default:
+              throw e;
+          }
+        } else if (code == Code.CONNECTIONLOSS || code == Code.OPERATIONTIMEOUT || code == Code.SESSIONEXPIRED) {
+          // TODO ZooKeeper needs to be recreated with SESSIONEXPIRED, should use ZooSession
+          retryOrThrow(retry, e);
+        } else {
+          // unhandled exception on create()
+          throw e;
         }
       }
+
+      // Catch all to wait before retrying
+      retry.waitForNextAttempt();
     }
   }
-  
+
   public static byte[] getData(ZooKeeper zk, String zPath, Stat stat) throws KeeperException, InterruptedException {
-    return zk.getData(zPath, false, stat);
+    final Retry retry = RETRY_FACTORY.create();
+    while (true) {
+      try {
+        return zk.getData(zPath, false, stat);
+      } catch (KeeperException e) {
+        final Code c = e.code();
+        if (c == Code.CONNECTIONLOSS || c == Code.OPERATIONTIMEOUT || c == Code.SESSIONEXPIRED) {
+          // TODO ZooKeeper needs to be recreated with SESSIONEXPIRED, should use ZooSession
+          retryOrThrow(retry, e);
+        } else {
+          throw e;
+        }
+      }
+
+      retry.waitForNextAttempt();
+    }
   }
-  
+
   public static Stat getStatus(ZooKeeper zk, String zPath) throws KeeperException, InterruptedException {
-    return zk.exists(zPath, false);
+    final Retry retry = RETRY_FACTORY.create();
+    while (true) {
+      try {
+        return zk.exists(zPath, false);
+      } catch (KeeperException e) {
+        final Code c = e.code();
+        if (c == Code.CONNECTIONLOSS || c == Code.OPERATIONTIMEOUT || c == Code.SESSIONEXPIRED) {
+          // TODO ZooKeeper needs to be recreated with SESSIONEXPIRED, should use ZooSession
+          retryOrThrow(retry, e);
+        } else {
+          throw e;
+        }
+      }
+
+      retry.waitForNextAttempt();
+    }
   }
-  
+
   public static boolean exists(ZooKeeper zk, String zPath) throws KeeperException, InterruptedException {
     return getStatus(zk, zPath) != null;
   }
-  
+
   public static void recursiveCopyPersistent(ZooKeeper zk, String source, String destination, NodeExistsPolicy policy) throws KeeperException,
       InterruptedException {
     Stat stat = null;
@@ -192,9 +304,10 @@ public class ZooUtil {
           throw KeeperException.create(Code.NODEEXISTS, source);
       }
     }
-    
+
     stat = new Stat();
-    byte[] data = zk.getData(source, false, stat);
+    byte[] data = getData(zk, source, stat);
+
     if (stat.getEphemeralOwner() == 0) {
       if (data == null)
         throw KeeperException.create(Code.NONODE, source);
@@ -204,61 +317,113 @@ public class ZooUtil {
           recursiveCopyPersistent(zk, source + "/" + child, destination + "/" + child, policy);
     }
   }
-  
+
   public static boolean putPrivatePersistentData(ZooKeeper zk, String zPath, byte[] data, NodeExistsPolicy policy) throws KeeperException, InterruptedException {
     return putData(zk, zPath, data, CreateMode.PERSISTENT, -1, policy, PRIVATE);
   }
-  
+
   public static String putPersistentSequential(ZooKeeper zk, String zPath, byte[] data) throws KeeperException, InterruptedException {
-    return zk.create(zPath, data, ZooUtil.PUBLIC, CreateMode.PERSISTENT_SEQUENTIAL);
+    final Retry retry = RETRY_FACTORY.create();
+    while (true) {
+      try {
+        return zk.create(zPath, data, ZooUtil.PUBLIC, CreateMode.PERSISTENT_SEQUENTIAL);
+      } catch (KeeperException e) {
+        final Code c = e.code();
+        if (c == Code.CONNECTIONLOSS || c == Code.OPERATIONTIMEOUT || c == Code.SESSIONEXPIRED) {
+          // TODO ZooKeeper needs to be recreated with SESSIONEXPIRED, should use ZooSession
+          retryOrThrow(retry, e);
+        } else {
+          throw e;
+        }
+      }
+
+      retry.waitForNextAttempt();
+    }
   }
-  
+
   public static String putEphemeralData(ZooKeeper zk, String zPath, byte[] data) throws KeeperException, InterruptedException {
-    return zk.create(zPath, data, ZooUtil.PUBLIC, CreateMode.EPHEMERAL);
+    final Retry retry = RETRY_FACTORY.create();
+    while (true) {
+      try {
+        return zk.create(zPath, data, ZooUtil.PUBLIC, CreateMode.EPHEMERAL);
+      } catch (KeeperException e) {
+        final Code c = e.code();
+        if (c == Code.CONNECTIONLOSS || c == Code.OPERATIONTIMEOUT || c == Code.SESSIONEXPIRED) {
+          // TODO ZooKeeper needs to be recreated with SESSIONEXPIRED, should use ZooSession
+          retryOrThrow(retry, e);
+        } else {
+          throw e;
+        }
+      }
+
+      retry.waitForNextAttempt();
+    }
   }
 
   public static String putEphemeralSequential(ZooKeeper zk, String zPath, byte[] data) throws KeeperException, InterruptedException {
-    return zk.create(zPath, data, ZooUtil.PUBLIC, CreateMode.EPHEMERAL_SEQUENTIAL);
+    final Retry retry = RETRY_FACTORY.create();
+    while (true) {
+      try {
+        return zk.create(zPath, data, ZooUtil.PUBLIC, CreateMode.EPHEMERAL_SEQUENTIAL);
+      } catch (KeeperException e) {
+        final Code c = e.code();
+        if (c == Code.CONNECTIONLOSS || c == Code.OPERATIONTIMEOUT || c == Code.SESSIONEXPIRED) {
+          // TODO ZooKeeper needs to be recreated with SESSIONEXPIRED, should use ZooSession
+          retryOrThrow(retry, e);
+        } else {
+          throw e;
+        }
+      }
+
+      retry.waitForNextAttempt();
+    }
   }
-  
+
   public static byte[] getLockData(ZooCache zc, String path) {
-    
+
     List<String> children = zc.getChildren(path);
-    
+
     if (children == null || children.size() == 0) {
       return null;
     }
-    
+
     children = new ArrayList<String>(children);
     Collections.sort(children);
-    
+
     String lockNode = children.get(0);
-    
+
     return zc.get(path + "/" + lockNode);
   }
-  
+
   public static boolean isLockHeld(ZooKeeper zk, LockID lid) throws KeeperException, InterruptedException {
-    
+    final Retry retry = RETRY_FACTORY.create();
     while (true) {
       try {
+        // TODO push down retry to getChildren and exists
         List<String> children = zk.getChildren(lid.path, false);
-        
+
         if (children.size() == 0) {
           return false;
         }
-        
+
         Collections.sort(children);
-        
+
         String lockNode = children.get(0);
         if (!lid.node.equals(lockNode))
           return false;
-        
+
         Stat stat = zk.exists(lid.path + "/" + lid.node, false);
         return stat != null && stat.getEphemeralOwner() == lid.eid;
-      } catch (KeeperException.ConnectionLossException ex) {
-        UtilWaitThread.sleep(1000);
+      } catch (KeeperException ex) {
+        final Code c = ex.code();
+        if (c != Code.CONNECTIONLOSS && c != Code.OPERATIONTIMEOUT && c != Code.SESSIONEXPIRED)
+          throw ex; // non-transient failures propagate; swallowing them here would loop forever
+        // TODO ZooKeeper needs to be recreated with SESSIONEXPIRED, should use ZooSession
+        retryOrThrow(retry, ex);
       }
+
+      retry.waitForNextAttempt();
     }
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c023f74e/fate/src/test/java/org/apache/accumulo/fate/zookeeper/RetryingInvocationHandlerTest.java
----------------------------------------------------------------------
diff --git a/fate/src/test/java/org/apache/accumulo/fate/zookeeper/RetryingInvocationHandlerTest.java b/fate/src/test/java/org/apache/accumulo/fate/zookeeper/RetryingInvocationHandlerTest.java
deleted file mode 100644
index 0613a1f..0000000
--- a/fate/src/test/java/org/apache/accumulo/fate/zookeeper/RetryingInvocationHandlerTest.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.fate.zookeeper;
-
-import java.lang.reflect.Method;
-import org.apache.zookeeper.KeeperException.ConnectionLossException;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import static org.junit.Assert.assertEquals;
-import static org.easymock.EasyMock.aryEq;
-import static org.easymock.EasyMock.createMock;
-import static org.easymock.EasyMock.eq;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.verify;
-
-public class RetryingInvocationHandlerTest {
-  private static final String PATH = "/path/to/somewhere";
-  private static final byte[] DATA = {(byte) 1, (byte) 2};
-  private static final Object[] ARGS = {PATH, DATA};
-  private static final String RV = "OK";
-
-  private static Method putMethod;
-
-  @BeforeClass
-  public static void setUpClass() throws Exception {
-    putMethod = IZooReaderWriter.class.getMethod("putEphemeralData", String.class, byte[].class);
-  }
-
-  private IZooReaderWriter zrw;
-  private RetryingInvocationHandler ih;
-
-  @Before
-  public void setUp() throws Exception {
-    zrw = createMock(IZooReaderWriter.class);
-    ih = new RetryingInvocationHandler(zrw);
-  }
-
-  @Test
-  public void testInvokeSuccessful() throws Throwable {
-    expect(zrw.putEphemeralData(eq(PATH), aryEq(DATA))).andReturn(RV);
-    replay(zrw);
-    Object rv = ih.invoke(null, putMethod, ARGS);
-    verify(zrw);
-    assertEquals(RV, rv);
-  }
-
-  @Test
-  public void testInvokeRetrySuccessful() throws Throwable {
-    ConnectionLossException e = createMock(ConnectionLossException.class);
-    expect(zrw.putEphemeralData(eq(PATH), aryEq(DATA))).andThrow(e);
-    expect(zrw.putEphemeralData(eq(PATH), aryEq(DATA))).andThrow(e);
-    expect(zrw.putEphemeralData(eq(PATH), aryEq(DATA))).andReturn(RV);
-    replay(zrw);
-    Object rv = ih.invoke(null, putMethod, ARGS);
-    verify(zrw);
-    assertEquals(RV, rv);
-  }
-
-  @Test(expected = InterruptedException.class)
-  public void testInvokeRetryFailure() throws Throwable {
-    ConnectionLossException e = createMock(ConnectionLossException.class);
-    expect(zrw.putEphemeralData(eq(PATH), aryEq(DATA))).andThrow(e);
-    expect(zrw.putEphemeralData(eq(PATH), aryEq(DATA))).andThrow(new InterruptedException());
-    replay(zrw);
-    try {
-      ih.invoke(null, putMethod, ARGS);
-    } finally {
-      verify(zrw);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c023f74e/server/base/src/main/java/org/apache/accumulo/server/Accumulo.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/Accumulo.java b/server/base/src/main/java/org/apache/accumulo/server/Accumulo.java
index ca70efe..6ba05cc 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/Accumulo.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/Accumulo.java
@@ -301,7 +301,7 @@ public class Accumulo {
   public static void abortIfFateTransactions() {
     try {
       final ReadOnlyTStore<Accumulo> fate = new ReadOnlyStore<Accumulo>(new ZooStore<Accumulo>(ZooUtil.getRoot(HdfsZooInstance.getInstance()) + Constants.ZFATE,
-          ZooReaderWriter.getRetryingInstance()));
+          ZooReaderWriter.getInstance()));
       if (!(fate.list().isEmpty())) {
         throw new AccumuloException("Aborting upgrade because there are outstanding FATE transactions from a previous Accumulo version. Please see the README document for instructions on what to do under your previous version.");
       }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c023f74e/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java b/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
index 63bd894..e8d5bbf 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
@@ -388,7 +388,7 @@ public class LiveTServerSet implements Watcher {
     log.info("Removing zookeeper lock for " + server);
     String fullpath = ZooUtil.getRoot(instance) + Constants.ZTSERVERS + "/" + zPath;
     try {
-      ZooReaderWriter.getRetryingInstance().recursiveDelete(fullpath, SKIP);
+      ZooReaderWriter.getInstance().recursiveDelete(fullpath, SKIP);
     } catch (Exception e) {
       String msg = "error removing tablet server lock";
       log.fatal(msg, e);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c023f74e/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKAuthenticator.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKAuthenticator.java b/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKAuthenticator.java
index 1646a28..5f6ff71 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKAuthenticator.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKAuthenticator.java
@@ -39,40 +39,40 @@ import org.apache.zookeeper.KeeperException;
 public final class ZKAuthenticator implements Authenticator {
   static final Logger log = Logger.getLogger(ZKAuthenticator.class);
   private static Authenticator zkAuthenticatorInstance = null;
-  
+
   private String ZKUserPath;
   private final ZooCache zooCache;
-  
+
   public static synchronized Authenticator getInstance() {
     if (zkAuthenticatorInstance == null)
       zkAuthenticatorInstance = new ZKAuthenticator();
     return zkAuthenticatorInstance;
   }
-  
+
   public ZKAuthenticator() {
     zooCache = new ZooCache();
   }
-  
+
   @Override
   public void initialize(String instanceId, boolean initialize) {
     ZKUserPath = Constants.ZROOT + "/" + instanceId + "/users";
   }
-  
+
   @Override
   public void initializeSecurity(TCredentials credentials, String principal, byte[] token) throws AccumuloSecurityException {
     try {
       // remove old settings from zookeeper first, if any
-      IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+      IZooReaderWriter zoo = ZooReaderWriter.getInstance();
       synchronized (zooCache) {
         zooCache.clear();
         if (zoo.exists(ZKUserPath)) {
           zoo.recursiveDelete(ZKUserPath, NodeMissingPolicy.SKIP);
           log.info("Removed " + ZKUserPath + "/" + " from zookeeper");
         }
-        
+
         // prep parent node of users with root username
         zoo.putPersistentData(ZKUserPath, principal.getBytes(Constants.UTF8), NodeExistsPolicy.FAIL);
-        
+
         constructUser(principal, ZKSecurityTool.createPass(token));
       }
     } catch (KeeperException e) {
@@ -86,23 +86,23 @@ public final class ZKAuthenticator implements Authenticator {
       throw new RuntimeException(e);
     }
   }
-  
+
   /**
    * Sets up the user in ZK for the provided user. No checking for existence is done here, it should be done before calling.
    */
   private void constructUser(String user, byte[] pass) throws KeeperException, InterruptedException {
     synchronized (zooCache) {
       zooCache.clear();
-      IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+      IZooReaderWriter zoo = ZooReaderWriter.getInstance();
       zoo.putPrivatePersistentData(ZKUserPath + "/" + user, pass, NodeExistsPolicy.FAIL);
     }
   }
-  
+
   @Override
   public Set<String> listUsers() {
     return new TreeSet<String>(zooCache.getChildren(ZKUserPath));
   }
-  
+
   /**
    * Creates a user with no permissions whatsoever
    */
@@ -125,13 +125,13 @@ public final class ZKAuthenticator implements Authenticator {
       throw new AccumuloSecurityException(principal, SecurityErrorCode.DEFAULT_SECURITY_ERROR, e);
     }
   }
-  
+
   @Override
   public void dropUser(String user) throws AccumuloSecurityException {
     try {
       synchronized (zooCache) {
         zooCache.clear();
-        ZooReaderWriter.getRetryingInstance().recursiveDelete(ZKUserPath + "/" + user, NodeMissingPolicy.FAIL);
+        ZooReaderWriter.getInstance().recursiveDelete(ZKUserPath + "/" + user, NodeMissingPolicy.FAIL);
       }
     } catch (InterruptedException e) {
       log.error(e, e);
@@ -143,7 +143,7 @@ public final class ZKAuthenticator implements Authenticator {
       throw new AccumuloSecurityException(user, SecurityErrorCode.CONNECTION_ERROR, e);
     }
   }
-  
+
   @Override
   public void changePassword(String principal, AuthenticationToken token) throws AccumuloSecurityException {
     if (!(token instanceof PasswordToken))
@@ -153,7 +153,7 @@ public final class ZKAuthenticator implements Authenticator {
       try {
         synchronized (zooCache) {
           zooCache.clear(ZKUserPath + "/" + principal);
-          ZooReaderWriter.getRetryingInstance().putPrivatePersistentData(ZKUserPath + "/" + principal, ZKSecurityTool.createPass(pt.getPassword()),
+          ZooReaderWriter.getInstance().putPrivatePersistentData(ZKUserPath + "/" + principal, ZKSecurityTool.createPass(pt.getPassword()),
               NodeExistsPolicy.OVERWRITE);
         }
       } catch (KeeperException e) {
@@ -169,7 +169,7 @@ public final class ZKAuthenticator implements Authenticator {
     } else
       throw new AccumuloSecurityException(principal, SecurityErrorCode.USER_DOESNT_EXIST); // user doesn't exist
   }
-  
+
   /**
    * Checks if a user exists
    */
@@ -177,12 +177,12 @@ public final class ZKAuthenticator implements Authenticator {
   public boolean userExists(String user) {
     return zooCache.get(ZKUserPath + "/" + user) != null;
   }
-  
+
   @Override
   public boolean validSecurityHandlers(Authorizor auth, PermissionHandler pm) {
     return true;
   }
-  
+
   @Override
   public boolean authenticateUser(String principal, AuthenticationToken token) throws AccumuloSecurityException {
     if (!(token instanceof PasswordToken))
@@ -199,14 +199,14 @@ public final class ZKAuthenticator implements Authenticator {
     }
     return result;
   }
-  
+
   @Override
   public Set<Class<? extends AuthenticationToken>> getSupportedTokenTypes() {
     Set<Class<? extends AuthenticationToken>> cs = new HashSet<Class<? extends AuthenticationToken>>();
     cs.add(PasswordToken.class);
     return cs;
   }
-  
+
   @Override
   public boolean validTokenClass(String tokenClass) {
     return tokenClass.equals(PasswordToken.class.getName());

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c023f74e/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKAuthorizor.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKAuthorizor.java b/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKAuthorizor.java
index bbaf592..75b73fc 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKAuthorizor.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKAuthorizor.java
@@ -81,7 +81,7 @@ public class ZKAuthorizor implements Authorizor {
 
   @Override
   public void initializeSecurity(TCredentials itw, String rootuser) throws AccumuloSecurityException {
-    IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+    IZooReaderWriter zoo = ZooReaderWriter.getInstance();
 
     // create the root user with all system privileges, no table privileges, and no record-level authorizations
     Set<SystemPermission> rootPerms = new TreeSet<SystemPermission>();
@@ -110,7 +110,7 @@ public class ZKAuthorizor implements Authorizor {
 
   @Override
   public void initUser(String user) throws AccumuloSecurityException {
-    IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+    IZooReaderWriter zoo = ZooReaderWriter.getInstance();
     try {
       zoo.putPersistentData(ZKUserPath + "/" + user, new byte[0], NodeExistsPolicy.SKIP);
     } catch (KeeperException e) {
@@ -126,7 +126,7 @@ public class ZKAuthorizor implements Authorizor {
   public void dropUser(String user) throws AccumuloSecurityException {
     try {
       synchronized (zooCache) {
-        IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+        IZooReaderWriter zoo = ZooReaderWriter.getInstance();
         zoo.recursiveDelete(ZKUserPath + "/" + user + ZKUserAuths, NodeMissingPolicy.SKIP);
         zooCache.clear(ZKUserPath + "/" + user);
       }
@@ -147,7 +147,7 @@ public class ZKAuthorizor implements Authorizor {
     try {
       synchronized (zooCache) {
         zooCache.clear();
-        ZooReaderWriter.getRetryingInstance().putPersistentData(ZKUserPath + "/" + user + ZKUserAuths, ZKSecurityTool.convertAuthorizations(authorizations),
+        ZooReaderWriter.getInstance().putPersistentData(ZKUserPath + "/" + user + ZKUserAuths, ZKSecurityTool.convertAuthorizations(authorizations),
             NodeExistsPolicy.OVERWRITE);
       }
     } catch (KeeperException e) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c023f74e/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKPermHandler.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKPermHandler.java b/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKPermHandler.java
index 1b7e7d3..3cac2e9 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKPermHandler.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKPermHandler.java
@@ -81,14 +81,14 @@ public class ZKPermHandler implements PermissionHandler {
     byte[] serializedPerms;
     try {
       String path = ZKUserPath + "/" + user + ZKUserTablePerms + "/" + table;
-      ZooReaderWriter.getRetryingInstance().sync(path);
-      serializedPerms = ZooReaderWriter.getRetryingInstance().getData(path, null);
+      ZooReaderWriter.getInstance().sync(path);
+      serializedPerms = ZooReaderWriter.getInstance().getData(path, null);
     } catch (KeeperException e) {
       if (e.code() == Code.NONODE) {
         // maybe the table was just deleted?
         try {
           // check for existence:
-          ZooReaderWriter.getRetryingInstance().getData(ZKTablePath + "/" + table, null);
+          ZooReaderWriter.getInstance().getData(ZKTablePath + "/" + table, null);
           // it's there, you don't have permission
           return false;
         } catch (InterruptedException ex) {
@@ -129,14 +129,14 @@ public class ZKPermHandler implements PermissionHandler {
     byte[] serializedPerms;
     try {
       String path = ZKUserPath + "/" + user + ZKUserNamespacePerms + "/" + namespace;
-      ZooReaderWriter.getRetryingInstance().sync(path);
-      serializedPerms = ZooReaderWriter.getRetryingInstance().getData(path, null);
+      ZooReaderWriter.getInstance().sync(path);
+      serializedPerms = ZooReaderWriter.getInstance().getData(path, null);
     } catch (KeeperException e) {
       if (e.code() == Code.NONODE) {
         // maybe the namespace was just deleted?
         try {
           // check for existence:
-          ZooReaderWriter.getRetryingInstance().getData(ZKNamespacePath + "/" + namespace, null);
+          ZooReaderWriter.getInstance().getData(ZKNamespacePath + "/" + namespace, null);
           // it's there, you don't have permission
           return false;
         } catch (InterruptedException ex) {
@@ -187,7 +187,7 @@ public class ZKPermHandler implements PermissionHandler {
       if (perms.add(permission)) {
         synchronized (zooCache) {
           zooCache.clear();
-          ZooReaderWriter.getRetryingInstance().putPersistentData(ZKUserPath + "/" + user + ZKUserSysPerms, ZKSecurityTool.convertSystemPermissions(perms),
+          ZooReaderWriter.getInstance().putPersistentData(ZKUserPath + "/" + user + ZKUserSysPerms, ZKSecurityTool.convertSystemPermissions(perms),
               NodeExistsPolicy.OVERWRITE);
         }
       }
@@ -213,7 +213,7 @@ public class ZKPermHandler implements PermissionHandler {
       if (tablePerms.add(permission)) {
         synchronized (zooCache) {
           zooCache.clear(ZKUserPath + "/" + user + ZKUserTablePerms + "/" + table);
-          IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+          IZooReaderWriter zoo = ZooReaderWriter.getInstance();
           zoo.putPersistentData(ZKUserPath + "/" + user + ZKUserTablePerms + "/" + table, ZKSecurityTool.convertTablePermissions(tablePerms),
               NodeExistsPolicy.OVERWRITE);
         }
@@ -240,7 +240,7 @@ public class ZKPermHandler implements PermissionHandler {
       if (namespacePerms.add(permission)) {
         synchronized (zooCache) {
           zooCache.clear(ZKUserPath + "/" + user + ZKUserNamespacePerms + "/" + namespace);
-          IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+          IZooReaderWriter zoo = ZooReaderWriter.getInstance();
           zoo.putPersistentData(ZKUserPath + "/" + user + ZKUserNamespacePerms + "/" + namespace, ZKSecurityTool.convertNamespacePermissions(namespacePerms),
               NodeExistsPolicy.OVERWRITE);
         }
@@ -268,7 +268,7 @@ public class ZKPermHandler implements PermissionHandler {
       if (sysPerms.remove(permission)) {
         synchronized (zooCache) {
           zooCache.clear();
-          ZooReaderWriter.getRetryingInstance().putPersistentData(ZKUserPath + "/" + user + ZKUserSysPerms, ZKSecurityTool.convertSystemPermissions(sysPerms),
+          ZooReaderWriter.getInstance().putPersistentData(ZKUserPath + "/" + user + ZKUserSysPerms, ZKSecurityTool.convertSystemPermissions(sysPerms),
               NodeExistsPolicy.OVERWRITE);
         }
       }
@@ -293,7 +293,7 @@ public class ZKPermHandler implements PermissionHandler {
     try {
       if (tablePerms.remove(permission)) {
         zooCache.clear();
-        IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+        IZooReaderWriter zoo = ZooReaderWriter.getInstance();
         if (tablePerms.size() == 0)
           zoo.recursiveDelete(ZKUserPath + "/" + user + ZKUserTablePerms + "/" + table, NodeMissingPolicy.SKIP);
         else
@@ -321,7 +321,7 @@ public class ZKPermHandler implements PermissionHandler {
     try {
       if (namespacePerms.remove(permission)) {
         zooCache.clear();
-        IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+        IZooReaderWriter zoo = ZooReaderWriter.getInstance();
         if (namespacePerms.size() == 0)
           zoo.recursiveDelete(ZKUserPath + "/" + user + ZKUserNamespacePerms + "/" + namespace, NodeMissingPolicy.SKIP);
         else
@@ -342,7 +342,7 @@ public class ZKPermHandler implements PermissionHandler {
     try {
       synchronized (zooCache) {
         zooCache.clear();
-        IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+        IZooReaderWriter zoo = ZooReaderWriter.getInstance();
         for (String user : zooCache.getChildren(ZKUserPath))
           zoo.recursiveDelete(ZKUserPath + "/" + user + ZKUserTablePerms + "/" + table, NodeMissingPolicy.SKIP);
       }
@@ -360,7 +360,7 @@ public class ZKPermHandler implements PermissionHandler {
     try {
       synchronized (zooCache) {
         zooCache.clear();
-        IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+        IZooReaderWriter zoo = ZooReaderWriter.getInstance();
         for (String user : zooCache.getChildren(ZKUserPath))
           zoo.recursiveDelete(ZKUserPath + "/" + user + ZKUserNamespacePerms + "/" + namespace, NodeMissingPolicy.SKIP);
       }
@@ -375,7 +375,7 @@ public class ZKPermHandler implements PermissionHandler {
 
   @Override
   public void initializeSecurity(TCredentials itw, String rootuser) throws AccumuloSecurityException {
-    IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+    IZooReaderWriter zoo = ZooReaderWriter.getInstance();
 
     // create the root user with all system privileges, no table privileges, and no record-level authorizations
     Set<SystemPermission> rootPerms = new TreeSet<SystemPermission>();
@@ -412,7 +412,7 @@ public class ZKPermHandler implements PermissionHandler {
 
   @Override
   public void initUser(String user) throws AccumuloSecurityException {
-    IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+    IZooReaderWriter zoo = ZooReaderWriter.getInstance();
     try {
       zoo.putPersistentData(ZKUserPath + "/" + user, new byte[0], NodeExistsPolicy.SKIP);
       zoo.putPersistentData(ZKUserPath + "/" + user + ZKUserTablePerms, new byte[0], NodeExistsPolicy.SKIP);
@@ -432,7 +432,7 @@ public class ZKPermHandler implements PermissionHandler {
   private void createTablePerm(String user, String table, Set<TablePermission> perms) throws KeeperException, InterruptedException {
     synchronized (zooCache) {
       zooCache.clear();
-      ZooReaderWriter.getRetryingInstance().putPersistentData(ZKUserPath + "/" + user + ZKUserTablePerms + "/" + table,
+      ZooReaderWriter.getInstance().putPersistentData(ZKUserPath + "/" + user + ZKUserTablePerms + "/" + table,
           ZKSecurityTool.convertTablePermissions(perms), NodeExistsPolicy.FAIL);
     }
   }
@@ -443,7 +443,7 @@ public class ZKPermHandler implements PermissionHandler {
   private void createNamespacePerm(String user, String namespace, Set<NamespacePermission> perms) throws KeeperException, InterruptedException {
     synchronized (zooCache) {
       zooCache.clear();
-      ZooReaderWriter.getRetryingInstance().putPersistentData(ZKUserPath + "/" + user + ZKUserNamespacePerms + "/" + namespace,
+      ZooReaderWriter.getInstance().putPersistentData(ZKUserPath + "/" + user + ZKUserNamespacePerms + "/" + namespace,
           ZKSecurityTool.convertNamespacePermissions(perms), NodeExistsPolicy.FAIL);
     }
   }
@@ -452,7 +452,7 @@ public class ZKPermHandler implements PermissionHandler {
   public void cleanUser(String user) throws AccumuloSecurityException {
     try {
       synchronized (zooCache) {
-        IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+        IZooReaderWriter zoo = ZooReaderWriter.getInstance();
         zoo.recursiveDelete(ZKUserPath + "/" + user + ZKUserSysPerms, NodeMissingPolicy.SKIP);
         zoo.recursiveDelete(ZKUserPath + "/" + user + ZKUserTablePerms, NodeMissingPolicy.SKIP);
         zoo.recursiveDelete(ZKUserPath + "/" + user + ZKUserNamespacePerms, NodeMissingPolicy.SKIP);
@@ -475,8 +475,8 @@ public class ZKPermHandler implements PermissionHandler {
     byte[] perms;
     try {
       String path = ZKUserPath + "/" + user + ZKUserSysPerms;
-      ZooReaderWriter.getRetryingInstance().sync(path);
-      perms = ZooReaderWriter.getRetryingInstance().getData(path, null);
+      ZooReaderWriter.getInstance().sync(path);
+      perms = ZooReaderWriter.getInstance().getData(path, null);
     } catch (KeeperException e) {
       if (e.code() == Code.NONODE) {
         return false;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c023f74e/server/base/src/main/java/org/apache/accumulo/server/tables/TableManager.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/tables/TableManager.java b/server/base/src/main/java/org/apache/accumulo/server/tables/TableManager.java
index 7a61eb6..71b7155 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/tables/TableManager.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/tables/TableManager.java
@@ -63,7 +63,7 @@ public class TableManager {
     log.debug("Creating ZooKeeper entries for new namespace " + namespace + " (ID: " + namespaceId + ")");
     String zPath = Constants.ZROOT + "/" + instanceId + Constants.ZNAMESPACES + "/" + namespaceId;
 
-    IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+    IZooReaderWriter zoo = ZooReaderWriter.getInstance();
     zoo.putPersistentData(zPath, new byte[0], existsPolicy);
     zoo.putPersistentData(zPath + Constants.ZNAMESPACE_NAME, namespace.getBytes(Constants.UTF8), existsPolicy);
     zoo.putPersistentData(zPath + Constants.ZNAMESPACE_CONF, new byte[0], existsPolicy);
@@ -76,7 +76,7 @@ public class TableManager {
     Pair<String,String> qualifiedTableName = Tables.qualify(tableName);
     tableName = qualifiedTableName.getSecond();
     String zTablePath = Constants.ZROOT + "/" + instanceId + Constants.ZTABLES + "/" + tableId;
-    IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+    IZooReaderWriter zoo = ZooReaderWriter.getInstance();
     zoo.putPersistentData(zTablePath, new byte[0], existsPolicy);
     zoo.putPersistentData(zTablePath + Constants.ZTABLE_CONF, new byte[0], existsPolicy);
     zoo.putPersistentData(zTablePath + Constants.ZTABLE_NAMESPACE, namespaceId.getBytes(Constants.UTF8), existsPolicy);
@@ -132,7 +132,7 @@ public class TableManager {
     String statePath = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_STATE;
 
     try {
-      ZooReaderWriter.getRetryingInstance().mutate(statePath, newState.name().getBytes(Constants.UTF8), ZooUtil.PUBLIC, new Mutator() {
+      ZooReaderWriter.getInstance().mutate(statePath, newState.name().getBytes(Constants.UTF8), ZooUtil.PUBLIC, new Mutator() {
         @Override
         public byte[] mutate(byte[] oldData) throws Exception {
           TableState oldState = TableState.UNKNOWN;
@@ -205,13 +205,13 @@ public class TableManager {
 
     String srcTablePath = Constants.ZROOT + "/" + instance.getInstanceID() + Constants.ZTABLES + "/" + srcTable + Constants.ZTABLE_CONF;
     String newTablePath = Constants.ZROOT + "/" + instance.getInstanceID() + Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_CONF;
-    ZooReaderWriter.getRetryingInstance().recursiveCopyPersistent(srcTablePath, newTablePath, NodeExistsPolicy.OVERWRITE);
+    ZooReaderWriter.getInstance().recursiveCopyPersistent(srcTablePath, newTablePath, NodeExistsPolicy.OVERWRITE);
 
     for (Entry<String,String> entry : propertiesToSet.entrySet())
       TablePropUtil.setTableProperty(tableId, entry.getKey(), entry.getValue());
 
     for (String prop : propertiesToExclude)
-      ZooReaderWriter.getRetryingInstance().recursiveDelete(
+      ZooReaderWriter.getInstance().recursiveDelete(
           Constants.ZROOT + "/" + instance.getInstanceID() + Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_CONF + "/" + prop, NodeMissingPolicy.SKIP);
 
     updateTableStateCache(tableId);
@@ -220,9 +220,9 @@ public class TableManager {
   public void removeTable(String tableId) throws KeeperException, InterruptedException {
     synchronized (tableStateCache) {
       tableStateCache.remove(tableId);
-      ZooReaderWriter.getRetryingInstance().recursiveDelete(ZooUtil.getRoot(instance) + Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_STATE,
+      ZooReaderWriter.getInstance().recursiveDelete(ZooUtil.getRoot(instance) + Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_STATE,
           NodeMissingPolicy.SKIP);
-      ZooReaderWriter.getRetryingInstance().recursiveDelete(ZooUtil.getRoot(instance) + Constants.ZTABLES + "/" + tableId, NodeMissingPolicy.SKIP);
+      ZooReaderWriter.getInstance().recursiveDelete(ZooUtil.getRoot(instance) + Constants.ZTABLES + "/" + tableId, NodeMissingPolicy.SKIP);
     }
   }
 
@@ -312,7 +312,7 @@ public class TableManager {
   }
 
   public void removeNamespace(String namespaceId) throws KeeperException, InterruptedException {
-    ZooReaderWriter.getRetryingInstance().recursiveDelete(ZooUtil.getRoot(instance) + Constants.ZNAMESPACES + "/" + namespaceId, NodeMissingPolicy.SKIP);
+    ZooReaderWriter.getInstance().recursiveDelete(ZooUtil.getRoot(instance) + Constants.ZNAMESPACES + "/" + namespaceId, NodeMissingPolicy.SKIP);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c023f74e/server/base/src/main/java/org/apache/accumulo/server/tablets/UniqueNameAllocator.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/tablets/UniqueNameAllocator.java b/server/base/src/main/java/org/apache/accumulo/server/tablets/UniqueNameAllocator.java
index 4ae8335..eabdc0d 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/tablets/UniqueNameAllocator.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/tablets/UniqueNameAllocator.java
@@ -48,7 +48,7 @@ public class UniqueNameAllocator {
       final int allocate = 100 + rand.nextInt(100);
       
       try {
-        byte[] max = ZooReaderWriter.getRetryingInstance().mutate(nextNamePath, null, ZooUtil.PRIVATE, new ZooReaderWriter.Mutator() {
+        byte[] max = ZooReaderWriter.getInstance().mutate(nextNamePath, null, ZooUtil.PRIVATE, new ZooReaderWriter.Mutator() {
           public byte[] mutate(byte[] currentValue) throws Exception {
             long l = Long.parseLong(new String(currentValue, Constants.UTF8), Character.MAX_RADIX);
             l += allocate;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c023f74e/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooQueueLock.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooQueueLock.java b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooQueueLock.java
index f7c1e68..b02e5d4 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooQueueLock.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooQueueLock.java
@@ -24,11 +24,11 @@ import org.apache.accumulo.fate.zookeeper.DistributedReadWriteLock;
 import org.apache.zookeeper.KeeperException;
 
 public class ZooQueueLock extends org.apache.accumulo.fate.zookeeper.ZooQueueLock {
-  
+
   public ZooQueueLock(String path, boolean ephemeral) throws KeeperException, InterruptedException {
-    super(ZooReaderWriter.getRetryingInstance(), path, ephemeral);
+    super(ZooReaderWriter.getInstance(), path, ephemeral);
   }
-  
+
   public static void main(String args[]) throws InterruptedException, KeeperException {
     ZooQueueLock lock = new ZooQueueLock("/lock", true);
     DistributedReadWriteLock rlocker = new DistributedReadWriteLock(lock, "reader".getBytes(Constants.UTF8));
@@ -50,5 +50,5 @@ public class ZooQueueLock extends org.apache.accumulo.fate.zookeeper.ZooQueueLoc
     readLock.lock();
     System.out.println("success");
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c023f74e/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooReaderWriter.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooReaderWriter.java b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooReaderWriter.java
index f950077..435591d 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooReaderWriter.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooReaderWriter.java
@@ -16,25 +16,15 @@
  */
 package org.apache.accumulo.server.zookeeper;
 
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.fate.util.UtilWaitThread;
-import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
 import org.apache.accumulo.server.conf.ServerConfiguration;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.KeeperException;
 
 public class ZooReaderWriter extends org.apache.accumulo.fate.zookeeper.ZooReaderWriter {
   private static final String SCHEME = "digest";
   private static final String USER = "accumulo";
   private static ZooReaderWriter instance = null;
-  private static IZooReaderWriter retryingInstance = null;
   
   public ZooReaderWriter(String string, int timeInMillis, String secret) {
     super(string, timeInMillis, SCHEME, (USER + ":" + secret).getBytes(Constants.UTF8));
@@ -49,39 +39,4 @@ public class ZooReaderWriter extends org.apache.accumulo.fate.zookeeper.ZooReade
     return instance;
   }
   
-  /**
-   * get an instance that retries when zookeeper connection errors occur
-   * 
-   * @return an instance that retries when Zookeeper connection errors occur.
-   */
-  public static synchronized IZooReaderWriter getRetryingInstance() {
-    
-    if (retryingInstance == null) {
-      final IZooReaderWriter inst = getInstance();
-      
-      InvocationHandler ih = new InvocationHandler() {
-        @Override
-        public Object invoke(Object obj, Method method, Object[] args) throws Throwable {
-          long retryTime = 250;
-          while (true) {
-            try {
-              return method.invoke(inst, args);
-            } catch (InvocationTargetException e) {
-              if (e.getCause() instanceof KeeperException.ConnectionLossException) {
-                Logger.getLogger(ZooReaderWriter.class).warn("Error connecting to zookeeper, will retry in " + retryTime, e.getCause());
-                UtilWaitThread.sleep(retryTime);
-                retryTime = Math.min(5000, retryTime + 250);
-              } else {
-                throw e.getCause();
-              }
-            }
-          }
-        }
-      };
-      
-      retryingInstance = (IZooReaderWriter) Proxy.newProxyInstance(ZooReaderWriter.class.getClassLoader(), new Class[] {IZooReaderWriter.class}, ih);
-    }
-    
-    return retryingInstance;
-  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c023f74e/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooReaderWriterFactory.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooReaderWriterFactory.java b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooReaderWriterFactory.java
index 0a9508c..4a7111d 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooReaderWriterFactory.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooReaderWriterFactory.java
@@ -16,14 +16,10 @@
  */
 package org.apache.accumulo.server.zookeeper;
 
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.Proxy;
-
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
-import org.apache.accumulo.fate.zookeeper.RetryingInvocationHandler;
 import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.server.conf.ServerConfiguration;
 
@@ -34,7 +30,6 @@ public class ZooReaderWriterFactory {
   private static final String SCHEME = "digest";
   private static final String USER = "accumulo";
   private static IZooReaderWriter instance = null;
-  private static IZooReaderWriter retryingInstance = null;
 
   /**
    * Gets a new reader/writer.
@@ -66,21 +61,4 @@ public class ZooReaderWriterFactory {
       return instance;
     }
   }
-
-  /**
-   * Gets a reader/writer, retrieving ZooKeeper information from the site configuration, and that retries on connection loss. The same instance may be returned
-   * for multiple calls.
-   *
-   * @return retrying reader/writer
-   */
-  public IZooReaderWriter getRetryingInstance() {
-    synchronized (ZooReaderWriterFactory.class) {
-      if (retryingInstance == null) {
-        IZooReaderWriter inst = getInstance();
-        InvocationHandler ih = new RetryingInvocationHandler(inst);
-        retryingInstance = (IZooReaderWriter) Proxy.newProxyInstance(IZooReaderWriter.class.getClassLoader(), new Class[] {IZooReaderWriter.class}, ih);
-      }
-      return retryingInstance;
-    }
-  }
 }


Mime
View raw message