accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [10/16] git commit: ACCUMULO-3242 Rip out the rest of the improper ZK usages in ZooUtil.
Date Mon, 20 Oct 2014 01:05:51 GMT
ACCUMULO-3242 Rip out the rest of the improper ZK usages in ZooUtil.

The way ZooUtil was written made it really hard to correctly handle
session expirations. Remove ZooKeeper as an argument and use ZooSession
(like ZooReader and ZooReaderWriter) which should correctly account
for a new connection.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/1ce12aea
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/1ce12aea
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/1ce12aea

Branch: refs/heads/1.6
Commit: 1ce12aea2c61ad804e8c27a9a9d1b125bf9d2752
Parents: 6e71636
Author: Josh Elser <elserj@apache.org>
Authored: Sat Oct 18 15:46:56 2014 -0400
Committer: Josh Elser <elserj@apache.org>
Committed: Sun Oct 19 20:16:04 2014 -0400

----------------------------------------------------------------------
 .../fate/zookeeper/IZooReaderWriter.java        |  35 ++---
 .../apache/accumulo/fate/zookeeper/Retry.java   |   4 +-
 .../accumulo/fate/zookeeper/RetryFactory.java   |   6 +-
 .../accumulo/fate/zookeeper/ZooReader.java      |   2 +-
 .../fate/zookeeper/ZooReaderWriter.java         |  29 ++--
 .../apache/accumulo/fate/zookeeper/ZooUtil.java | 143 +++++++++++--------
 .../apache/accumulo/server/init/Initialize.java |  10 +-
 7 files changed, 137 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/1ce12aea/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReaderWriter.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReaderWriter.java b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReaderWriter.java
index afc2250..2f3ed62 100644
--- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReaderWriter.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReaderWriter.java
@@ -25,44 +25,47 @@ import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.ACL;
 
 public interface IZooReaderWriter extends IZooReader {
-  
+
   ZooKeeper getZooKeeper();
-  
+
   void recursiveDelete(String zPath, NodeMissingPolicy policy) throws KeeperException, InterruptedException;
-  
+
   void recursiveDelete(String zPath, int version, NodeMissingPolicy policy) throws KeeperException,
InterruptedException;
-  
+
   /**
    * Create a persistent node with the default ACL
-   * 
+   *
    * @return true if the node was created or altered; false if it was skipped
    */
   boolean putPersistentData(String zPath, byte[] data, NodeExistsPolicy policy) throws KeeperException,
InterruptedException;
-  
+
   boolean putPrivatePersistentData(String zPath, byte[] data, NodeExistsPolicy policy) throws
KeeperException, InterruptedException;
-  
+
   void putPersistentData(String zPath, byte[] data, int version, NodeExistsPolicy policy)
throws KeeperException, InterruptedException;
   
+  boolean putPersistentData(String zPath, byte[] data, int version, NodeExistsPolicy policy,
List<ACL> acls) throws KeeperException, InterruptedException;
+
   String putPersistentSequential(String zPath, byte[] data) throws KeeperException, InterruptedException;
-  
+
   String putEphemeralSequential(String zPath, byte[] data) throws KeeperException, InterruptedException;
-  
+
   String putEphemeralData(String zPath, byte[] data) throws KeeperException, InterruptedException;
 
   void recursiveCopyPersistent(String source, String destination, NodeExistsPolicy policy)
throws KeeperException, InterruptedException;
-  
+
   void delete(String path, int version) throws InterruptedException, KeeperException;
-  
+
   interface Mutator {
     byte[] mutate(byte[] currentValue) throws Exception;
   }
-  
+
   byte[] mutate(String zPath, byte[] createValue, List<ACL> acl, Mutator mutator) throws
Exception;
-  
+
   boolean isLockHeld(ZooUtil.LockID lockID) throws KeeperException, InterruptedException;
-  
+
   void mkdirs(String path) throws KeeperException, InterruptedException;
-  
+
+  @Override
   void sync(String path) throws KeeperException, InterruptedException;
-  
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1ce12aea/fate/src/main/java/org/apache/accumulo/fate/zookeeper/Retry.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/Retry.java b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/Retry.java
index 4a37172..63aa902 100644
--- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/Retry.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/Retry.java
@@ -19,7 +19,7 @@ package org.apache.accumulo.fate.zookeeper;
 import org.apache.log4j.Logger;
 
 /**
- *
+ * Encapsulates the retrying implementation for some operation. Provides bounded retry attempts
with a bounded, linear backoff.
  */
 public class Retry {
   private static final Logger log = Logger.getLogger(Retry.class);
@@ -37,7 +37,7 @@ public class Retry {
    * @param waitIncrement
    *          The amount of time (ms) to increment next wait time by
    */
-  public Retry(long maxRetries, long startWait, long maxWait, long waitIncrement) {
+  public Retry(long maxRetries, long startWait, long waitIncrement, long maxWait) {
     this.maxRetries = maxRetries;
     this.maxWait = maxWait;
     this.waitIncrement = waitIncrement;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1ce12aea/fate/src/main/java/org/apache/accumulo/fate/zookeeper/RetryFactory.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/RetryFactory.java b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/RetryFactory.java
index 3fcb738..ff96350 100644
--- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/RetryFactory.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/RetryFactory.java
@@ -17,13 +17,15 @@
 package org.apache.accumulo.fate.zookeeper;
 
 /**
- *
+ * Creates {@link Retry} instances with the given parameters
  */
 public class RetryFactory {
+  public static final long DEFAULT_MAX_RETRIES = 10l, DEFAULT_START_WAIT = 250l, DEFAULT_WAIT_INCREMENT
= 250l, DEFAULT_MAX_WAIT = 5000l;
+  public static final RetryFactory DEFAULT_INSTANCE = new RetryFactory(DEFAULT_MAX_RETRIES,
DEFAULT_START_WAIT, DEFAULT_WAIT_INCREMENT, DEFAULT_MAX_WAIT);
 
   private final long maxRetries, startWait, maxWait, waitIncrement;
 
-  public RetryFactory(long maxRetries, long startWait, long maxWait, long waitIncrement)
{
+  public RetryFactory(long maxRetries, long startWait, long waitIncrement, long maxWait)
{
     this.maxRetries = maxRetries;
     this.startWait = startWait;
     this.maxWait = maxWait;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1ce12aea/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReader.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReader.java b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReader.java
index fd7938a..5706cf3 100644
--- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReader.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReader.java
@@ -223,6 +223,6 @@ public class ZooReader implements IZooReader {
   public ZooReader(String keepers, int timeout) {
     this.keepers = keepers;
     this.timeout = timeout;
-    this.retryFactory = new RetryFactory(10l, 250l, 250l, 5000l);
+    this.retryFactory = RetryFactory.DEFAULT_INSTANCE;
   }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1ce12aea/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriter.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriter.java b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriter.java
index 79c2219..5b6447c 100644
--- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriter.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriter.java
@@ -22,6 +22,7 @@ import java.util.List;
 
 import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.ZooKeeperConnectionInfo;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -38,6 +39,7 @@ public class ZooReaderWriter extends ZooReader implements IZooReaderWriter
{
   private static ZooReaderWriter instance = null;
   private final String scheme;
   private final byte[] auth;
+  private final ZooKeeperConnectionInfo info;
 
   @Override
   public ZooKeeper getZooKeeper() {
@@ -52,16 +54,17 @@ public class ZooReaderWriter extends ZooReader implements IZooReaderWriter
{
     super(string, timeInMillis);
     this.scheme = scheme;
     this.auth = Arrays.copyOf(auth, auth.length);
+    this.info = new ZooKeeperConnectionInfo(string, timeInMillis, scheme, this.auth);
   }
 
   @Override
   public void recursiveDelete(String zPath, NodeMissingPolicy policy) throws KeeperException,
InterruptedException {
-    ZooUtil.recursiveDelete(getZooKeeper(), zPath, policy);
+    ZooUtil.recursiveDelete(info, zPath, policy);
   }
 
   @Override
   public void recursiveDelete(String zPath, int version, NodeMissingPolicy policy) throws
KeeperException, InterruptedException {
-    ZooUtil.recursiveDelete(getZooKeeper(), zPath, version, policy);
+    ZooUtil.recursiveDelete(info, zPath, version, policy);
   }
 
   /**
@@ -71,37 +74,43 @@ public class ZooReaderWriter extends ZooReader implements IZooReaderWriter
{
    */
   @Override
   public boolean putPersistentData(String zPath, byte[] data, NodeExistsPolicy policy) throws
KeeperException, InterruptedException {
-    return ZooUtil.putPersistentData(getZooKeeper(), zPath, data, policy);
+    return ZooUtil.putPersistentData(info, zPath, data, policy);
+  }
+
+  @Override
+  public boolean putPersistentData(String zPath, byte[] data, int version, NodeExistsPolicy
policy, List<ACL> acls) throws KeeperException,
+      InterruptedException {
+    return ZooUtil.putPersistentData(info, zPath, data, version, policy, acls);
   }
 
   @Override
   public boolean putPrivatePersistentData(String zPath, byte[] data, NodeExistsPolicy policy)
throws KeeperException, InterruptedException {
-    return ZooUtil.putPrivatePersistentData(getZooKeeper(), zPath, data, policy);
+    return ZooUtil.putPrivatePersistentData(info, zPath, data, policy);
   }
 
   @Override
   public void putPersistentData(String zPath, byte[] data, int version, NodeExistsPolicy
policy) throws KeeperException, InterruptedException {
-    ZooUtil.putPersistentData(getZooKeeper(), zPath, data, version, policy);
+    ZooUtil.putPersistentData(info, zPath, data, version, policy);
   }
 
   @Override
   public String putPersistentSequential(String zPath, byte[] data) throws KeeperException,
InterruptedException {
-    return ZooUtil.putPersistentSequential(getZooKeeper(), zPath, data);
+    return ZooUtil.putPersistentSequential(info, zPath, data);
   }
 
   @Override
   public String putEphemeralData(String zPath, byte[] data) throws KeeperException, InterruptedException
{
-    return ZooUtil.putEphemeralData(getZooKeeper(), zPath, data);
+    return ZooUtil.putEphemeralData(info, zPath, data);
   }
 
   @Override
   public String putEphemeralSequential(String zPath, byte[] data) throws KeeperException,
InterruptedException {
-    return ZooUtil.putEphemeralSequential(getZooKeeper(), zPath, data);
+    return ZooUtil.putEphemeralSequential(info, zPath, data);
   }
 
   @Override
   public void recursiveCopyPersistent(String source, String destination, NodeExistsPolicy
policy) throws KeeperException, InterruptedException {
-    ZooUtil.recursiveCopyPersistent(getZooKeeper(), source, destination, policy);
+    ZooUtil.recursiveCopyPersistent(info, source, destination, policy);
   }
 
   @Override
@@ -189,7 +198,7 @@ public class ZooReaderWriter extends ZooReader implements IZooReaderWriter
{
 
   @Override
   public boolean isLockHeld(ZooUtil.LockID lockID) throws KeeperException, InterruptedException
{
-    return ZooUtil.isLockHeld(getZooKeeper(), lockID);
+    return ZooUtil.isLockHeld(info, lockID);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1ce12aea/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooUtil.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooUtil.java b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooUtil.java
index e0d8831..674c1d8 100644
--- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooUtil.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooUtil.java
@@ -81,6 +81,19 @@ public class ZooUtil {
     }
   }
 
+  protected static class ZooKeeperConnectionInfo {
+    String keepers, scheme;
+    int timeout;
+    byte[] auth;
+
+    public ZooKeeperConnectionInfo(String keepers, int timeout, String scheme, byte[] auth)
{
+      this.keepers = keepers;
+      this.timeout = timeout;
+      this.scheme = scheme;
+      this.auth = auth;
+    }
+  }
+
   public static final List<ACL> PRIVATE;
   public static final List<ACL> PUBLIC;
   private static final RetryFactory RETRY_FACTORY;
@@ -90,7 +103,15 @@ public class ZooUtil {
     PUBLIC = new ArrayList<ACL>();
     PUBLIC.addAll(PRIVATE);
     PUBLIC.add(new ACL(Perms.READ, Ids.ANYONE_ID_UNSAFE));
-    RETRY_FACTORY = new RetryFactory(10l, 250l, 250l, 5000l);
+    RETRY_FACTORY = RetryFactory.DEFAULT_INSTANCE;
+  }
+
+  protected static ZooKeeper getZooKeeper(ZooKeeperConnectionInfo info) {
+    return getZooKeeper(info.keepers, info.timeout, info.scheme, info.auth);
+  }
+
+  protected static ZooKeeper getZooKeeper(String keepers, int timeout, String scheme, byte[]
auth) {
+    return ZooSession.getSession(keepers, timeout, scheme, auth);
   }
 
   protected static void retryOrThrow(Retry retry, KeeperException e) throws KeeperException
{
@@ -110,7 +131,7 @@ public class ZooUtil {
    * @param zPath
    *          the path to delete
    */
-  public static void recursiveDelete(ZooKeeper zk, String zPath, int version, NodeMissingPolicy
policy) throws KeeperException, InterruptedException {
+  static void recursiveDelete(ZooKeeperConnectionInfo info, String zPath, int version, NodeMissingPolicy
policy) throws KeeperException, InterruptedException {
     if (policy.equals(NodeMissingPolicy.CREATE))
       throw new IllegalArgumentException(policy.name() + " is invalid for this operation");
     try {
@@ -118,12 +139,11 @@ public class ZooUtil {
       final Retry retry = RETRY_FACTORY.create();
       while (true) {
         try {
-          children = zk.getChildren(zPath, false);
+          children = getZooKeeper(info).getChildren(zPath, false);
           break;
         } catch (KeeperException e) {
           final Code c = e.code();
           if (c == Code.CONNECTIONLOSS || c == Code.OPERATIONTIMEOUT || c == Code.SESSIONEXPIRED)
{
-            // TODO ZooKeeper needs to be recreated with SESSIONEXPIRED, should use ZooSession
             retryOrThrow(retry, e);
           } else {
             throw e;
@@ -132,17 +152,17 @@ public class ZooUtil {
         retry.waitForNextAttempt();
       }
       for (String child : children)
-        recursiveDelete(zk, zPath + "/" + child, NodeMissingPolicy.SKIP);
+        recursiveDelete(info, zPath + "/" + child, NodeMissingPolicy.SKIP);
 
       Stat stat;
       while (true) {
         try {
-          stat = zk.exists(zPath, null);
+          stat = getZooKeeper(info).exists(zPath, null);
           // Node exists
           if (stat != null) {
             try {
               // Try to delete it
-              zk.delete(zPath, stat.getVersion());
+              getZooKeeper(info).delete(zPath, stat.getVersion());
               return;
             } catch (NoNodeException e) {
               // If the node is gone now, it's ok if we have SKIP
@@ -156,7 +176,6 @@ public class ZooUtil {
         } catch (KeeperException e) {
           final Code c = e.code();
           if (c == Code.CONNECTIONLOSS || c == Code.OPERATIONTIMEOUT || c == Code.SESSIONEXPIRED)
{
-            // TODO ZooKeeper needs to be recreated with SESSIONEXPIRED, should use ZooSession
             retryOrThrow(retry, e);
           } else {
             throw e;
@@ -172,8 +191,8 @@ public class ZooUtil {
     }
   }
 
-  public static void recursiveDelete(ZooKeeper zk, String zPath, NodeMissingPolicy policy)
throws KeeperException, InterruptedException {
-    recursiveDelete(zk, zPath, -1, policy);
+  public static void recursiveDelete(ZooKeeperConnectionInfo info, String zPath, NodeMissingPolicy
policy) throws KeeperException, InterruptedException {
+    recursiveDelete(info, zPath, -1, policy);
   }
 
   /**
@@ -181,21 +200,24 @@ public class ZooUtil {
    *
    * @return true if the node was created or altered; false if it was skipped
    */
-  public static boolean putPersistentData(ZooKeeper zk, String zPath, byte[] data, NodeExistsPolicy
policy) throws KeeperException, InterruptedException {
-    return putData(zk, zPath, data, CreateMode.PERSISTENT, -1, policy, PUBLIC);
+  public static boolean putPersistentData(ZooKeeperConnectionInfo info, String zPath, byte[]
data, NodeExistsPolicy policy) throws KeeperException,
+      InterruptedException {
+    return putData(info, zPath, data, CreateMode.PERSISTENT, -1, policy, PUBLIC);
   }
 
-  public static boolean putPersistentData(ZooKeeper zk, String zPath, byte[] data, int version,
NodeExistsPolicy policy) throws KeeperException,
+  public static boolean putPersistentData(ZooKeeperConnectionInfo info, String zPath, byte[]
data, int version, NodeExistsPolicy policy)
+      throws KeeperException,
       InterruptedException {
-    return putData(zk, zPath, data, CreateMode.PERSISTENT, version, policy, PUBLIC);
+    return putData(info, zPath, data, CreateMode.PERSISTENT, version, policy, PUBLIC);
   }
 
-  public static boolean putPersistentData(ZooKeeper zk, String zPath, byte[] data, int version,
NodeExistsPolicy policy, List<ACL> acls)
+  public static boolean putPersistentData(ZooKeeperConnectionInfo info, String zPath, byte[]
data, int version, NodeExistsPolicy policy, List<ACL> acls)
       throws KeeperException, InterruptedException {
-    return putData(zk, zPath, data, CreateMode.PERSISTENT, version, policy, acls);
+    return putData(info, zPath, data, CreateMode.PERSISTENT, version, policy, acls);
   }
 
-  private static boolean putData(ZooKeeper zk, String zPath, byte[] data, CreateMode mode,
int version, NodeExistsPolicy policy, List<ACL> acls)
+  private static boolean putData(ZooKeeperConnectionInfo info, String zPath, byte[] data,
CreateMode mode, int version,
+      NodeExistsPolicy policy, List<ACL> acls)
       throws KeeperException, InterruptedException {
     if (policy == null)
       policy = NodeExistsPolicy.FAIL;
@@ -203,7 +225,7 @@ public class ZooUtil {
     final Retry retry = RETRY_FACTORY.create();
     while (true) {
       try {
-        zk.create(zPath, data, acls, mode);
+        getZooKeeper(info).create(zPath, data, acls, mode);
         return true;
       } catch (KeeperException e) {
         final Code code = e.code();
@@ -214,26 +236,24 @@ public class ZooUtil {
             case OVERWRITE:
               // overwrite the data in the node when it already exists
               try {
-                zk.setData(zPath, data, version);
+                getZooKeeper(info).setData(zPath, data, version);
                 return true;
               } catch (KeeperException e2) {
                 final Code code2 = e2.code();
                 if (code2 == Code.NONODE) {
                   // node delete between create call and set data, so try create call again
                   continue;
-                } else if (code == Code.CONNECTIONLOSS || code == Code.OPERATIONTIMEOUT ||
code == Code.SESSIONEXPIRED) {
-                  // TODO ZooKeeper needs to be recreated with SESSIONEXPIRED, should use
ZooSession
-                  retryOrThrow(retry, e);
+                } else if (code2 == Code.CONNECTIONLOSS || code2 == Code.OPERATIONTIMEOUT
|| code2 == Code.SESSIONEXPIRED) {
+                  retryOrThrow(retry, e2);
                 } else {
                   // unhandled exception on setData()
-                  throw e;
+                  throw e2;
                 }
               }
             default:
               throw e;
           }
         } else if (code == Code.CONNECTIONLOSS || code == Code.OPERATIONTIMEOUT || code ==
Code.SESSIONEXPIRED) {
-          // TODO ZooKeeper needs to be recreated with SESSIONEXPIRED, should use ZooSession
           retryOrThrow(retry, e);
         } else {
           // unhandled exception on create()
@@ -246,15 +266,14 @@ public class ZooUtil {
     }
   }
 
-  public static byte[] getData(ZooKeeper zk, String zPath, Stat stat) throws KeeperException,
InterruptedException {
+  public static byte[] getData(ZooKeeperConnectionInfo info, String zPath, Stat stat) throws
KeeperException, InterruptedException {
     final Retry retry = RETRY_FACTORY.create();
     while (true) {
       try {
-        return zk.getData(zPath, false, stat);
+        return getZooKeeper(info).getData(zPath, false, stat);
       } catch (KeeperException e) {
         final Code c = e.code();
         if (c == Code.CONNECTIONLOSS || c == Code.OPERATIONTIMEOUT || c == Code.SESSIONEXPIRED)
{
-          // TODO ZooKeeper needs to be recreated with SESSIONEXPIRED, should use ZooSession
           retryOrThrow(retry, e);
         } else {
           throw e;
@@ -265,15 +284,14 @@ public class ZooUtil {
     }
   }
 
-  public static Stat getStatus(ZooKeeper zk, String zPath) throws KeeperException, InterruptedException
{
+  public static Stat getStatus(ZooKeeperConnectionInfo info, String zPath) throws KeeperException,
InterruptedException {
     final Retry retry = RETRY_FACTORY.create();
     while (true) {
       try {
-        return zk.exists(zPath, false);
+        return getZooKeeper(info).exists(zPath, false);
       } catch (KeeperException e) {
         final Code c = e.code();
         if (c == Code.CONNECTIONLOSS || c == Code.OPERATIONTIMEOUT || c == Code.SESSIONEXPIRED)
{
-          // TODO ZooKeeper needs to be recreated with SESSIONEXPIRED, should use ZooSession
           retryOrThrow(retry, e);
         } else {
           throw e;
@@ -284,16 +302,16 @@ public class ZooUtil {
     }
   }
 
-  public static boolean exists(ZooKeeper zk, String zPath) throws KeeperException, InterruptedException
{
-    return getStatus(zk, zPath) != null;
+  public static boolean exists(ZooKeeperConnectionInfo info, String zPath) throws KeeperException,
InterruptedException {
+    return getStatus(info, zPath) != null;
   }
 
-  public static void recursiveCopyPersistent(ZooKeeper zk, String source, String destination,
NodeExistsPolicy policy) throws KeeperException,
+  public static void recursiveCopyPersistent(ZooKeeperConnectionInfo info, String source,
String destination, NodeExistsPolicy policy) throws KeeperException,
       InterruptedException {
     Stat stat = null;
-    if (!exists(zk, source))
+    if (!exists(info, source))
       throw KeeperException.create(Code.NONODE, source);
-    if (exists(zk, destination)) {
+    if (exists(info, destination)) {
       switch (policy) {
         case OVERWRITE:
           break;
@@ -306,31 +324,48 @@ public class ZooUtil {
     }
 
     stat = new Stat();
-    byte[] data = getData(zk, source, stat);
+    byte[] data = getData(info, source, stat);
 
     if (stat.getEphemeralOwner() == 0) {
       if (data == null)
         throw KeeperException.create(Code.NONODE, source);
-      putPersistentData(zk, destination, data, policy);
-      if (stat.getNumChildren() > 0)
-        for (String child : zk.getChildren(source, false))
-          recursiveCopyPersistent(zk, source + "/" + child, destination + "/" + child, policy);
+      putPersistentData(info, destination, data, policy);
+      if (stat.getNumChildren() > 0) {
+        List<String> children;
+        final Retry retry = RETRY_FACTORY.create();
+        while (true) {
+          try {
+            children = getZooKeeper(info).getChildren(source, false);
+            break;
+          } catch (KeeperException e) {
+            final Code c = e.code();
+            if (c == Code.CONNECTIONLOSS || c == Code.OPERATIONTIMEOUT || c == Code.SESSIONEXPIRED)
{
+              retryOrThrow(retry, e);
+            } else {
+              throw e;
+            }
+          }
+          retry.waitForNextAttempt();
+        }
+        for (String child : children)
+          recursiveCopyPersistent(info, source + "/" + child, destination + "/" + child,
policy);
+      }
     }
   }
 
-  public static boolean putPrivatePersistentData(ZooKeeper zk, String zPath, byte[] data,
NodeExistsPolicy policy) throws KeeperException, InterruptedException {
-    return putData(zk, zPath, data, CreateMode.PERSISTENT, -1, policy, PRIVATE);
+  public static boolean putPrivatePersistentData(ZooKeeperConnectionInfo info, String zPath,
byte[] data, NodeExistsPolicy policy) throws KeeperException,
+      InterruptedException {
+    return putData(info, zPath, data, CreateMode.PERSISTENT, -1, policy, PRIVATE);
   }
 
-  public static String putPersistentSequential(ZooKeeper zk, String zPath, byte[] data) throws
KeeperException, InterruptedException {
+  public static String putPersistentSequential(ZooKeeperConnectionInfo info, String zPath,
byte[] data) throws KeeperException, InterruptedException {
     final Retry retry = RETRY_FACTORY.create();
     while (true) {
       try {
-        return zk.create(zPath, data, ZooUtil.PUBLIC, CreateMode.PERSISTENT_SEQUENTIAL);
+        return getZooKeeper(info).create(zPath, data, ZooUtil.PUBLIC, CreateMode.PERSISTENT_SEQUENTIAL);
       } catch (KeeperException e) {
         final Code c = e.code();
         if (c == Code.CONNECTIONLOSS || c == Code.OPERATIONTIMEOUT || c == Code.SESSIONEXPIRED)
{
-          // TODO ZooKeeper needs to be recreated with SESSIONEXPIRED, should use ZooSession
           retryOrThrow(retry, e);
         } else {
           throw e;
@@ -341,15 +376,14 @@ public class ZooUtil {
     }
   }
 
-  public static String putEphemeralData(ZooKeeper zk, String zPath, byte[] data) throws KeeperException,
InterruptedException {
+  public static String putEphemeralData(ZooKeeperConnectionInfo info, String zPath, byte[]
data) throws KeeperException, InterruptedException {
     final Retry retry = RETRY_FACTORY.create();
     while (true) {
       try {
-        return zk.create(zPath, data, ZooUtil.PUBLIC, CreateMode.EPHEMERAL);
+        return getZooKeeper(info).create(zPath, data, ZooUtil.PUBLIC, CreateMode.EPHEMERAL);
       } catch (KeeperException e) {
         final Code c = e.code();
         if (c == Code.CONNECTIONLOSS || c == Code.OPERATIONTIMEOUT || c == Code.SESSIONEXPIRED)
{
-          // TODO ZooKeeper needs to be recreated with SESSIONEXPIRED, should use ZooSession
           retryOrThrow(retry, e);
         } else {
           throw e;
@@ -360,15 +394,14 @@ public class ZooUtil {
     }
   }
 
-  public static String putEphemeralSequential(ZooKeeper zk, String zPath, byte[] data) throws
KeeperException, InterruptedException {
+  public static String putEphemeralSequential(ZooKeeperConnectionInfo info, String zPath,
byte[] data) throws KeeperException, InterruptedException {
     final Retry retry = RETRY_FACTORY.create();
     while (true) {
       try {
-        return zk.create(zPath, data, ZooUtil.PUBLIC, CreateMode.EPHEMERAL_SEQUENTIAL);
+        return getZooKeeper(info).create(zPath, data, ZooUtil.PUBLIC, CreateMode.EPHEMERAL_SEQUENTIAL);
       } catch (KeeperException e) {
         final Code c = e.code();
         if (c == Code.CONNECTIONLOSS || c == Code.OPERATIONTIMEOUT || c == Code.SESSIONEXPIRED)
{
-          // TODO ZooKeeper needs to be recreated with SESSIONEXPIRED, should use ZooSession
           retryOrThrow(retry, e);
         } else {
           throw e;
@@ -395,12 +428,11 @@ public class ZooUtil {
     return zc.get(path + "/" + lockNode);
   }
 
-  public static boolean isLockHeld(ZooKeeper zk, LockID lid) throws KeeperException, InterruptedException
{
+  public static boolean isLockHeld(ZooKeeperConnectionInfo info, LockID lid) throws KeeperException,
InterruptedException {
     final Retry retry = RETRY_FACTORY.create();
     while (true) {
       try {
-        // TODO push down retry to getChildren and exists
-        List<String> children = zk.getChildren(lid.path, false);
+        List<String> children = getZooKeeper(info).getChildren(lid.path, false);
 
         if (children.size() == 0) {
           return false;
@@ -412,12 +444,11 @@ public class ZooUtil {
         if (!lid.node.equals(lockNode))
           return false;
 
-        Stat stat = zk.exists(lid.path + "/" + lid.node, false);
+        Stat stat = getZooKeeper(info).exists(lid.path + "/" + lid.node, false);
         return stat != null && stat.getEphemeralOwner() == lid.eid;
       } catch (KeeperException ex) {
         final Code c = ex.code();
         if (c == Code.CONNECTIONLOSS || c == Code.OPERATIONTIMEOUT || c == Code.SESSIONEXPIRED)
{
-          // TODO ZooKeeper needs to be recreated with SESSIONEXPIRED, should use ZooSession
           retryOrThrow(retry, ex);
         }
       }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/1ce12aea/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
index 5cbffc3..fcecc37 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
@@ -83,7 +83,7 @@ import com.beust.jcommander.Parameter;
 
 /**
  * This class is used to setup the directory structure and the root tablet to get an instance
started
- * 
+ *
  */
 public class Initialize {
   private static final Logger log = Logger.getLogger(Initialize.class);
@@ -101,7 +101,7 @@ public class Initialize {
 
   /**
    * Sets this class's ZooKeeper reader/writer.
-   * 
+   *
    * @param izoo
    *          reader/writer
    */
@@ -111,7 +111,7 @@ public class Initialize {
 
   /**
    * Gets this class's ZooKeeper reader/writer.
-   * 
+   *
    * @return reader/writer
    */
   static IZooReaderWriter getZooReaderWriter() {
@@ -421,8 +421,8 @@ public class Initialize {
 
   private static void initZooKeeper(Opts opts, String uuid, String instanceNamePath, Path
rootTablet) throws KeeperException, InterruptedException {
     // setup basic data in zookeeper
-    ZooUtil.putPersistentData(zoo.getZooKeeper(), Constants.ZROOT, new byte[0], -1, NodeExistsPolicy.SKIP,
Ids.OPEN_ACL_UNSAFE);
-    ZooUtil.putPersistentData(zoo.getZooKeeper(), Constants.ZROOT + Constants.ZINSTANCES,
new byte[0], -1, NodeExistsPolicy.SKIP, Ids.OPEN_ACL_UNSAFE);
+    zoo.putPersistentData(Constants.ZROOT, new byte[0], -1, NodeExistsPolicy.SKIP, Ids.OPEN_ACL_UNSAFE);
+    zoo.putPersistentData(Constants.ZROOT + Constants.ZINSTANCES, new byte[0], -1, NodeExistsPolicy.SKIP,
Ids.OPEN_ACL_UNSAFE);
 
     // setup instance name
     if (opts.clearInstanceName)


Mime
View raw message