accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bil...@apache.org
Subject svn commit: r1355481 [2/4] - in /accumulo/trunk: ./ bin/ core/ core/src/main/java/org/apache/accumulo/core/client/ core/src/main/java/org/apache/accumulo/core/client/admin/ core/src/main/java/org/apache/accumulo/core/client/impl/ core/src/main/java/org...
Date Fri, 29 Jun 2012 17:42:56 GMT
Added: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReaderWriter.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReaderWriter.java?rev=1355481&view=auto
==============================================================================
--- accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReaderWriter.java (added)
+++ accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReaderWriter.java Fri Jun 29 17:42:35 2012
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate.zookeeper;
+
+import java.util.List;
+
+import org.apache.accumulo.fate.zookeeper.ZooReaderWriter.Mutator;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+
+/**
+ * Write-side companion of {@link IZooReader}: adds node creation, update and deletion operations to the read operations inherited from that interface.
+ * 
+ * <p>
+ * Only the signatures are declared here. Unless stated otherwise, the per-method notes below are inferred from names and parameter types and should be
+ * confirmed against the implementing class.
+ */
+public interface IZooReaderWriter extends IZooReader {
+  
+  /**
+   * @return a {@link ZooKeeper} handle (presumably the one used by the other operations - confirm against the implementation)
+   */
+  ZooKeeper getZooKeeper();
+  
+  /**
+   * Delete the node at zPath; judging by the name, its descendants are deleted as well. The policy is expected to select the behavior when the node is missing.
+   */
+  void recursiveDelete(String zPath, NodeMissingPolicy policy) throws KeeperException, InterruptedException;
+  
+  /**
+   * Variant of {@link #recursiveDelete(String, NodeMissingPolicy)} that additionally takes a node version (presumably the expected version of zPath).
+   */
+  void recursiveDelete(String zPath, int version, NodeMissingPolicy policy) throws KeeperException, InterruptedException;
+  
+  /**
+   * Create a persistent node with the default ACL
+   * 
+   * @return true if the node was created or altered; false if it was skipped
+   */
+  boolean putPersistentData(String zPath, byte[] data, NodeExistsPolicy policy) throws KeeperException, InterruptedException;
+  
+  /**
+   * Same shape as {@link #putPersistentData(String, byte[], NodeExistsPolicy)}; the name suggests a more restrictive ACL is applied - confirm against the
+   * implementation.
+   */
+  boolean putPrivatePersistentData(String zPath, byte[] data, NodeExistsPolicy policy) throws KeeperException, InterruptedException;
+  
+  /**
+   * Variant of {@link #putPersistentData(String, byte[], NodeExistsPolicy)} that takes a node version and returns nothing.
+   */
+  void putPersistentData(String zPath, byte[] data, int version, NodeExistsPolicy policy) throws KeeperException, InterruptedException;
+  
+  /**
+   * @return a path for the node that was written (presumably zPath with a sequence suffix appended by zookeeper)
+   */
+  String putPersistentSequential(String zPath, byte[] data) throws KeeperException, InterruptedException;
+  
+  /**
+   * @return a path for the node that was written (presumably zPath with a sequence suffix appended by zookeeper)
+   */
+  String putEphemeralSequential(String zPath, byte[] data) throws KeeperException, InterruptedException;
+  
+  /**
+   * Copy from source to destination; by its name this covers persistent nodes and recurses into children.
+   */
+  void recursiveCopyPersistent(String source, String destination, NodeExistsPolicy policy) throws KeeperException, InterruptedException;
+  
+  /**
+   * Delete a single node, given its path and a version.
+   */
+  void delete(String path, int version) throws InterruptedException, KeeperException;
+  
+  /**
+   * Apply the mutator to the node at zPath. createValue and acl are presumably used when the node has to be created first - confirm against the
+   * implementation.
+   * 
+   * @return a byte array (presumably the value that ended up stored)
+   */
+  byte[] mutate(String zPath, byte[] createValue, List<ACL> acl, Mutator mutator) throws Exception;
+  
+  /**
+   * @return whether the lock identified by lockID is held, as determined by the implementation
+   */
+  boolean isLockHeld(ZooUtil.LockID lockID) throws KeeperException, InterruptedException;
+  
+  /**
+   * Create the node at path; the name suggests missing parent nodes are created too.
+   */
+  void mkdirs(String path) throws KeeperException, InterruptedException;
+  
+}

Propchange: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReaderWriter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/TransactionWatcher.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/TransactionWatcher.java?rev=1355481&view=auto
==============================================================================
--- accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/TransactionWatcher.java (added)
+++ accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/TransactionWatcher.java Fri Jun 29 17:42:35 2012
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate.zookeeper;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.log4j.Logger;
+
+public class TransactionWatcher {
+  
+  private static final Logger log = Logger.getLogger(TransactionWatcher.class);
+  final private Map<Long,AtomicInteger> counts = new HashMap<Long,AtomicInteger>();
+  final private Arbitrator arbitrator;
+  
+  public interface Arbitrator {
+    boolean transactionAlive(String type, long tid) throws Exception;
+  }
+  
+  public TransactionWatcher(Arbitrator arbitrator) {
+    this.arbitrator = arbitrator;
+  }
+  
+  public <T> T run(String ztxBulk, long tid, Callable<T> callable) throws Exception {
+    synchronized (counts) {
+      if (!arbitrator.transactionAlive(ztxBulk, tid)) {
+        throw new Exception("Transaction " + tid + " of type " + ztxBulk + " is no longer active");
+      }
+      AtomicInteger count = counts.get(tid);
+      if (count == null)
+        counts.put(tid, count = new AtomicInteger());
+      count.incrementAndGet();
+    }
+    try {
+      return callable.call();
+    } finally {
+      synchronized (counts) {
+        AtomicInteger count = counts.get(tid);
+        if (count == null) {
+          log.error("unexpected missing count for transaction" + tid);
+        } else {
+          if (count.decrementAndGet() == 0)
+            counts.remove(tid);
+        }
+      }
+    }
+  }
+  
+  public boolean isActive(long tid) {
+    synchronized (counts) {
+      log.debug("Transactions in progress " + counts);
+      AtomicInteger count = counts.get(tid);
+      return count != null && count.get() > 0;
+    }
+  }
+  
+}

Propchange: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/TransactionWatcher.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java?rev=1355481&view=auto
==============================================================================
--- accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java (added)
+++ accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java Fri Jun 29 17:42:35 2012
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate.zookeeper;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.ConcurrentModificationException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * Caches values stored in zookeeper and keeps them up to date as they change in zookeeper.
+ * 
+ * <p>
+ * Node data, node stats and child lists are kept in three separate maps. Reads that go to zookeeper register a watcher; when the watcher fires, the affected
+ * path is dropped from the maps so the next read fetches it again. The maps are only touched from code that holds this object's monitor.
+ */
+public class ZooCache {
+  private static final Logger log = Logger.getLogger(ZooCache.class);
+  
+  // watcher handed to every zookeeper read made by this cache; it evicts entries when they change
+  private final ZCacheWatcher watcher = new ZCacheWatcher();
+  // optional caller supplied watcher that is handed every event after the cache has processed it; may be null
+  private final Watcher externalWatcher;
+  
+  // path -> node data; a null value is stored when the node did not exist at lookup time
+  private final HashMap<String,byte[]> cache;
+  // path -> stat captured together with the data; null when the node did not exist
+  private final HashMap<String,Stat> statCache;
+  // path -> child names as returned by zookeeper
+  private final HashMap<String,List<String>> childrenCache;
+  
+  private final ZooReader zReader;
+  
+  private ZooKeeper getZooKeeper() {
+    return zReader.getZooKeeper();
+  }
+  
+  /**
+   * Watcher that invalidates cached entries in response to zookeeper events and then forwards the event to the external watcher, if one was supplied.
+   */
+  private class ZCacheWatcher implements Watcher {
+    @Override
+    public void process(WatchedEvent event) {
+      
+      if (log.isTraceEnabled())
+        log.trace(event);
+      
+      switch (event.getType()) {
+        case NodeDataChanged:
+        case NodeChildrenChanged:
+        case NodeCreated:
+        case NodeDeleted:
+          // anything that happened to a node makes its cached data, stat and children stale
+          remove(event.getPath());
+          break;
+        case None:
+          // connection state change rather than a node event
+          switch (event.getState()) {
+            case Disconnected:
+              if (log.isTraceEnabled())
+                log.trace("Zoo keeper connection disconnected, clearing cache");
+              clear();
+              break;
+            case SyncConnected:
+              break;
+            case Expired:
+              if (log.isTraceEnabled())
+                log.trace("Zoo keeper connection expired, clearing cache");
+              clear();
+              break;
+            default:
+              log.warn("Unhandled: " + event);
+          }
+          break;
+        default:
+          log.warn("Unhandled: " + event);
+      }
+      
+      if (externalWatcher != null) {
+        externalWatcher.process(event);
+      }
+    }
+  }
+  
+  /**
+   * Create a cache backed by a new {@link ZooReader} with no external watcher.
+   */
+  public ZooCache(String zooKeepers, int sessionTimeout) {
+    this(zooKeepers, sessionTimeout, null);
+  }
+  
+  /**
+   * Create a cache backed by a new {@link ZooReader}.
+   * 
+   * @param watcher
+   *          receives every event seen by the cache's own watcher; may be null
+   */
+  public ZooCache(String zooKeepers, int sessionTimeout, Watcher watcher) {
+    this(new ZooReader(zooKeepers, sessionTimeout), watcher);
+  }
+  
+  /**
+   * Create a cache that reads through the given reader.
+   * 
+   * @param watcher
+   *          receives every event seen by the cache's own watcher; may be null
+   */
+  public ZooCache(ZooReader reader, Watcher watcher) {
+    this.zReader = reader;
+    this.cache = new HashMap<String,byte[]>();
+    this.statCache = new HashMap<String,Stat>();
+    this.childrenCache = new HashMap<String,List<String>>();
+    this.externalWatcher = watcher;
+  }
+  
+  /**
+   * A zookeeper operation that {@link ZooCache#retry(ZooRunnable)} may run more than once.
+   */
+  private static interface ZooRunnable {
+    void run(ZooKeeper zooKeeper) throws KeeperException, InterruptedException;
+  }
+  
+  /**
+   * Run the operation until it completes without throwing. After a KeeperException, InterruptedException or ConcurrentModificationException the failure is
+   * logged and the operation is attempted again after a pause; the pause starts at 100 ms and grows by a random factor until it reaches at least 10 seconds.
+   * This method never gives up.
+   */
+  private synchronized void retry(ZooRunnable op) {
+    
+    int sleepTime = 100;
+    
+    while (true) {
+      
+      // fetched on every attempt rather than once up front
+      ZooKeeper zooKeeper = getZooKeeper();
+      
+      try {
+        op.run(zooKeeper);
+        return;
+        
+      } catch (KeeperException e) {
+        if (e.code() == Code.NONODE) {
+          log.error("Looked up non existant node in cache " + e.getPath(), e);
+        }
+        log.warn("Zookeeper error, will retry", e);
+      } catch (InterruptedException e) {
+        // NOTE(review): the interrupt is consumed here and the operation is retried; callers never see it
+        log.info("Zookeeper error, will retry", e);
+      } catch (ConcurrentModificationException e) {
+        log.debug("Zookeeper was modified, will retry");
+      }
+      
+      try {
+        // do not hold lock while sleeping
+        wait(sleepTime);
+      } catch (InterruptedException e) {
+        // report through the logger instead of stderr; the loop simply continues with the next attempt
+        log.info("Interrupted while waiting to retry zookeeper operation", e);
+      }
+      if (sleepTime < 10000)
+        sleepTime = (int) (sleepTime + sleepTime * Math.random());
+      
+    }
+  }
+  
+  /**
+   * Get the names of the children of a node, reading them from zookeeper (and setting a watch) on the first request.
+   * 
+   * @return an unmodifiable list of child names, or null if nothing is cached for the path after the lookup, which is the case when zookeeper reported that
+   *         the node does not exist
+   */
+  public synchronized List<String> getChildren(final String zPath) {
+    
+    ZooRunnable zr = new ZooRunnable() {
+      
+      @Override
+      public void run(ZooKeeper zooKeeper) throws KeeperException, InterruptedException {
+        
+        if (childrenCache.containsKey(zPath))
+          return;
+        
+        try {
+          List<String> children = zooKeeper.getChildren(zPath, watcher);
+          childrenCache.put(zPath, children);
+        } catch (KeeperException ke) {
+          // a missing node is not an error here and is not cached; everything else goes to retry()
+          if (ke.code() != Code.NONODE) {
+            throw ke;
+          }
+        }
+      }
+      
+    };
+    
+    retry(zr);
+    
+    List<String> children = childrenCache.get(zPath);
+    if (children == null) {
+      return null;
+    }
+    return Collections.unmodifiableList(children);
+  }
+  
+  /**
+   * Get the data of a node, reading it from zookeeper on the first request.
+   * 
+   * @return the cached data, or null if the node did not exist
+   */
+  public synchronized byte[] get(final String zPath) {
+    return get(zPath, null);
+  }
+  
+  /**
+   * Get the data of a node, reading it from zookeeper on the first request.
+   * 
+   * @param stat
+   *          if not null, and a stat is cached for the path, it is overwritten with a copy of the cached stat
+   * @return the cached data, or null if the node did not exist
+   */
+  public synchronized byte[] get(final String zPath, Stat stat) {
+    ZooRunnable zr = new ZooRunnable() {
+      
+      @Override
+      public void run(ZooKeeper zooKeeper) throws KeeperException, InterruptedException {
+        
+        if (cache.containsKey(zPath))
+          return;
+        
+        /*
+         * The following call to exists() is important, since we are caching that a node does not exist. Once the node comes into existence, it will be added to
+         * the cache. But this notification of a node coming into existence will only be given if exists() was previously called.
+         * 
+         * If the call to exists() is bypassed and only getData() is called with a special case that looks for Code.NONODE in the KeeperException, then
+         * non-existence can not be cached.
+         */
+        
+        // note: this local deliberately hides the stat parameter of the enclosing method
+        Stat stat = zooKeeper.exists(zPath, watcher);
+        
+        byte[] data = null;
+        
+        if (stat == null) {
+          if (log.isTraceEnabled())
+            log.trace("zookeeper did not contain " + zPath);
+        } else {
+          try {
+            data = zooKeeper.getData(zPath, watcher, stat);
+          } catch (KeeperException.BadVersionException e1) {
+            // signal retry() to start the whole lookup again
+            throw new ConcurrentModificationException();
+          } catch (KeeperException.NoNodeException e2) {
+            // the node vanished between exists() and getData(); start over
+            throw new ConcurrentModificationException();
+          }
+          if (log.isTraceEnabled())
+            log.trace("zookeeper contained " + zPath + " " + (data == null ? null : new String(data)));
+        }
+        if (log.isTraceEnabled())
+          log.trace("putting " + zPath + " " + (data == null ? null : new String(data)) + " in cache");
+        put(zPath, data, stat);
+      }
+      
+    };
+    
+    retry(zr);
+    
+    if (stat != null) {
+      Stat cstat = statCache.get(zPath);
+      if (cstat != null) {
+        // copy the cached stat into the caller's object by serializing it and reading it back
+        try {
+          ByteArrayOutputStream baos = new ByteArrayOutputStream();
+          DataOutputStream dos = new DataOutputStream(baos);
+          cstat.write(dos);
+          dos.close();
+          
+          ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+          DataInputStream dis = new DataInputStream(bais);
+          stat.readFields(dis);
+          
+          dis.close();
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    }
+    
+    return cache.get(zPath);
+  }
+  
+  // record data and stat for a path; both may be null to remember that the node did not exist
+  private synchronized void put(String zPath, byte[] data, Stat stat) {
+    cache.put(zPath, data);
+    statCache.put(zPath, stat);
+  }
+  
+  // forget everything cached for exactly this path
+  private synchronized void remove(String zPath) {
+    if (log.isTraceEnabled())
+      log.trace("removing " + zPath + " from cache");
+    cache.remove(zPath);
+    childrenCache.remove(zPath);
+    statCache.remove(zPath);
+  }
+  
+  /**
+   * Clears this cache.
+   */
+  public synchronized void clear() {
+    cache.clear();
+    childrenCache.clear();
+    statCache.clear();
+  }
+  
+  /**
+   * Clears every cached entry whose path starts with the given string. The match is a plain string prefix test, so clearing "/a" also clears "/ab".
+   */
+  public synchronized void clear(String zPath) {
+    removeKeysWithPrefix(cache, zPath);
+    removeKeysWithPrefix(childrenCache, zPath);
+    removeKeysWithPrefix(statCache, zPath);
+  }
+  
+  // remove all entries of the map whose key starts with prefix
+  private static void removeKeysWithPrefix(Map<String,?> map, String prefix) {
+    for (Iterator<String> i = map.keySet().iterator(); i.hasNext();) {
+      if (i.next().startsWith(prefix))
+        i.remove();
+    }
+  }
+  
+  // shared caches, keyed by "<zooKeepers>:<sessionTimeout>"; guarded by the class monitor via getInstance()
+  private static final Map<String,ZooCache> instances = new HashMap<String,ZooCache>();
+  
+  /**
+   * Get the shared cache for the given connection string and session timeout, creating it on first use.
+   */
+  public static synchronized ZooCache getInstance(String zooKeepers, int sessionTimeout) {
+    String key = zooKeepers + ":" + sessionTimeout;
+    ZooCache zc = instances.get(key);
+    if (zc == null) {
+      zc = new ZooCache(zooKeepers, sessionTimeout);
+      instances.put(key, zc);
+    }
+    
+    return zc;
+  }
+}

Propchange: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java?rev=1355481&view=auto
==============================================================================
--- accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java (added)
+++ accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java Fri Jun 29 17:42:35 2012
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate.zookeeper;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.accumulo.fate.zookeeper.ZooUtil.LockID;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * A lock built on ephemeral sequential child nodes of a zookeeper path. Each candidate creates a child named {@link #LOCK_PREFIX} plus a sequence number;
+ * the candidate whose node sorts first holds the lock, and every other candidate watches the node that sorts immediately before its own.
+ * 
+ * <p>
+ * Instance state is guarded by this object's monitor. Note that {@link LockWatcher} callbacks are invoked while that monitor is held.
+ */
+public class ZooLock implements Watcher {
+  
+  protected static final Logger log = Logger.getLogger(ZooLock.class);
+  
+  public static final String LOCK_PREFIX = "zlock-";
+  
+  public enum LockLossReason {
+    LOCK_DELETED, SESSION_EXPIRED
+  }
+  
+  /**
+   * Callback for a holder that needs to know when its lock goes away.
+   */
+  public interface LockWatcher {
+    void lostLock(LockLossReason reason);
+  }
+  
+  /**
+   * Callback for {@link ZooLock#lockAsync(AsyncLockWatcher, byte[])}; additionally reports the outcome of the acquisition attempt.
+   */
+  public interface AsyncLockWatcher extends LockWatcher {
+    void acquiredLock();
+    
+    void failedToAcquireLock(Exception e);
+  }
+  
+  // true once the most recent lockAsync() attempt obtained the lock; reset at the start of every attempt
+  private boolean lockWasAcquired;
+  // parent node under which lock nodes are created
+  final private String path;
+  protected final IZooReaderWriter zooKeeper;
+  // name (not full path) of the held lock node, or null when the lock is not held
+  private String lock;
+  // watcher to notify when the held lock is lost; null when the lock is not held
+  private LockWatcher lockWatcher;
+  
+  // name (not full path) of this object's node while an acquisition is pending, otherwise null
+  private String asyncLock;
+  
+  public ZooLock(String zookeepers, int timeInMillis, String auth, String path) {
+    this(new ZooCache(zookeepers, timeInMillis), ZooReaderWriter.getInstance(zookeepers, timeInMillis, auth), path);
+  }
+  
+  /**
+   * @param zc
+   *          cache used by the static {@link #getLockData(String)} helpers and by {@link #getSessionId()}
+   * @param zrw
+   *          used for all reads and writes made by this lock
+   * @param path
+   *          parent node under which lock nodes are created
+   */
+  protected ZooLock(ZooCache zc, IZooReaderWriter zrw, String path) {
+    // NOTE(review): this overwrites a static field, so the cache of the most recently constructed ZooLock is used by all instances
+    getLockDataZooCache = zc;
+    this.path = path;
+    zooKeeper = zrw;
+    try {
+      // registers this object as a watcher so that process() is called; failure is logged and otherwise ignored
+      zooKeeper.getStatus(path, this);
+    } catch (Exception ex) {
+      log.warn("Error getting setting initial watch on ZooLock", ex);
+    }
+  }
+  
+  /**
+   * Adapter used by {@link ZooLock#tryLock(LockWatcher, byte[])}: remembers whether the lock was obtained and forwards lock loss to the wrapped watcher.
+   */
+  private static class TryLockAsyncLockWatcher implements AsyncLockWatcher {
+    
+    boolean acquiredLock = false;
+    LockWatcher lw;
+    
+    public TryLockAsyncLockWatcher(LockWatcher lw2) {
+      this.lw = lw2;
+    }
+    
+    @Override
+    public void acquiredLock() {
+      acquiredLock = true;
+    }
+    
+    // failures are deliberately ignored; tryLock() only looks at whether acquiredLock was set
+    @Override
+    public void failedToAcquireLock(Exception e) {}
+    
+    @Override
+    public void lostLock(LockLossReason reason) {
+      lw.lostLock(reason);
+    }
+    
+  }
+  
+  /**
+   * Attempt to take the lock without waiting for other holders.
+   * 
+   * @param lw
+   *          notified if the lock is later lost
+   * @param data
+   *          stored in the lock node
+   * @return true if the lock was obtained; false otherwise, in which case the pending lock node (if any) has been deleted
+   */
+  public synchronized boolean tryLock(LockWatcher lw, byte data[]) throws KeeperException, InterruptedException {
+    
+    TryLockAsyncLockWatcher tlalw = new TryLockAsyncLockWatcher(lw);
+    
+    lockAsync(tlalw, data);
+    
+    if (tlalw.acquiredLock) {
+      return true;
+    }
+    
+    // not first in line: withdraw the candidate node instead of waiting
+    if (asyncLock != null) {
+      zooKeeper.recursiveDelete(path + "/" + asyncLock, NodeMissingPolicy.SKIP);
+      asyncLock = null;
+    }
+    
+    return false;
+  }
+  
+  /**
+   * Check whether myLock is now first in line. If so, record the lock as held and notify lw; otherwise watch the node sorting immediately before it and run
+   * this check again when that node is deleted.
+   */
+  private synchronized void lockAsync(final String myLock, final AsyncLockWatcher lw) throws KeeperException, InterruptedException {
+    
+    if (asyncLock == null) {
+      throw new IllegalStateException("Called lockAsync() when asyncLock == null");
+    }
+    
+    List<String> children = zooKeeper.getChildren(path);
+    
+    if (!children.contains(myLock)) {
+      throw new RuntimeException("Lock attempt ephemeral node no longer exist " + myLock);
+    }
+    
+    Collections.sort(children);
+    
+    if (children.get(0).equals(myLock)) {
+      this.lockWatcher = lw;
+      this.lock = myLock;
+      asyncLock = null;
+      lockWasAcquired = true;
+      lw.acquiredLock();
+      return;
+    }
+    // find the node that sorts directly before ours; it is non-null because ours is present and not first
+    String prev = null;
+    for (String child : children) {
+      if (child.equals(myLock)) {
+        break;
+      }
+      
+      prev = child;
+    }
+    
+    final String lockToWatch = path + "/" + prev;
+    
+    Stat stat = zooKeeper.getStatus(lockToWatch, new Watcher() {
+      
+      @Override
+      public void process(WatchedEvent event) {
+        
+        if (event.getType() == EventType.NodeDeleted && event.getPath().equals(lockToWatch)) {
+          synchronized (ZooLock.this) {
+            try {
+              if (asyncLock != null) {
+                // predecessor went away and we are still waiting: re-evaluate our position
+                lockAsync(myLock, lw);
+              } else if (log.isTraceEnabled()) {
+                log.trace("While waiting for another lock " + lockToWatch + " " + myLock + " was deleted");
+              }
+            } catch (Exception e) {
+              if (lock == null) {
+                // have not acquired lock yet
+                lw.failedToAcquireLock(e);
+              }
+            }
+          }
+        }
+        
+        if (event.getState() == KeeperState.Expired) {
+          synchronized (ZooLock.this) {
+            if (lock == null) {
+              lw.failedToAcquireLock(new Exception("Zookeeper Session expired"));
+            }
+          }
+        }
+      }
+      
+    });
+    
+    // the predecessor disappeared between getChildren() and getStatus(), so no event will arrive for it: check again now
+    if (stat == null)
+      lockAsync(myLock, lw);
+  }
+  
+  /**
+   * Start acquiring the lock. A lock node holding data is created, and lw is told when the lock is acquired, when the attempt fails, and when an acquired
+   * lock is lost. If no other candidate is ahead, the lock is acquired before this method returns.
+   * 
+   * @throws IllegalStateException
+   *           if this object already holds the lock or has an acquisition pending
+   */
+  public synchronized void lockAsync(final AsyncLockWatcher lw, byte data[]) {
+    
+    if (lockWatcher != null || lock != null || asyncLock != null) {
+      throw new IllegalStateException();
+    }
+    
+    lockWasAcquired = false;
+    
+    try {
+      String asyncLockPath = zooKeeper.putEphemeralSequential(path + "/" + LOCK_PREFIX, data);
+      
+      // watch our own node so that its deletion is noticed both before and after the lock is acquired
+      Stat stat = zooKeeper.getStatus(asyncLockPath, new Watcher() {
+        @Override
+        public void process(WatchedEvent event) {
+          synchronized (ZooLock.this) {
+            if (lock != null && event.getType() == EventType.NodeDeleted && event.getPath().equals(path + "/" + lock)) {
+              // clear state before calling out so the callback sees the lock as released
+              LockWatcher localLw = lockWatcher;
+              lock = null;
+              lockWatcher = null;
+              
+              localLw.lostLock(LockLossReason.LOCK_DELETED);
+              
+            } else if (asyncLock != null && event.getType() == EventType.NodeDeleted && event.getPath().equals(path + "/" + asyncLock)) {
+              lw.failedToAcquireLock(new Exception("Lock deleted before acquired"));
+              asyncLock = null;
+            }
+          }
+        }
+      });
+      
+      if (stat == null) {
+        lw.failedToAcquireLock(new Exception("Lock does not exist after create"));
+        return;
+      }
+      
+      // keep only the node name; path + "/" is stripped from the front
+      asyncLock = asyncLockPath.substring(path.length() + 1);
+      
+      lockAsync(asyncLock, lw);
+      
+    } catch (KeeperException e) {
+      lw.failedToAcquireLock(e);
+    } catch (InterruptedException e) {
+      lw.failedToAcquireLock(e);
+    }
+  }
+  
+  /**
+   * Delete the pending lock node, if any, and release the held lock, if any.
+   * 
+   * @return true if either of those was done
+   */
+  public synchronized boolean tryToCancelAsyncLockOrUnlock() throws InterruptedException, KeeperException {
+    boolean del = false;
+    
+    if (asyncLock != null) {
+      // asyncLock itself is not reset here; the watcher installed by lockAsync() resets it when it sees the deletion
+      zooKeeper.recursiveDelete(path + "/" + asyncLock, NodeMissingPolicy.SKIP);
+      del = true;
+    }
+    
+    if (lock != null) {
+      unlock();
+      del = true;
+    }
+    
+    return del;
+  }
+  
+  /**
+   * Release the held lock by deleting its node, then notify the lock watcher with {@link LockLossReason#LOCK_DELETED}.
+   * 
+   * @throws IllegalStateException
+   *           if the lock is not held
+   */
+  public synchronized void unlock() throws InterruptedException, KeeperException {
+    if (lock == null) {
+      throw new IllegalStateException();
+    }
+    
+    LockWatcher localLw = lockWatcher;
+    String localLock = lock;
+    
+    // clear state first so that the node watcher, when it fires for this deletion, does not report the loss a second time
+    lock = null;
+    lockWatcher = null;
+    
+    zooKeeper.recursiveDelete(path + "/" + localLock, NodeMissingPolicy.SKIP);
+    
+    localLw.lostLock(LockLossReason.LOCK_DELETED);
+  }
+  
+  /**
+   * @return the full path of the held lock node, or null if the lock is not held
+   */
+  public synchronized String getLockPath() {
+    if (lock == null) {
+      return null;
+    }
+    return path + "/" + lock;
+  }
+  
+  /**
+   * @return the name of the held lock node, or null if the lock is not held
+   */
+  public synchronized String getLockName() {
+    return lock;
+  }
+  
+  /**
+   * @return an id made of the lock path, the lock node name and the current zookeeper session id
+   * @throws IllegalStateException
+   *           if the lock is not held
+   */
+  public synchronized LockID getLockID() {
+    if (lock == null) {
+      throw new IllegalStateException("Lock not held");
+    }
+    return new LockID(path, lock, zooKeeper.getZooKeeper().getSessionId());
+  }
+  
+  /**
+   * indicates if the lock was acquired in the past.... helps discriminate between the case where the lock was never held, or held and lost....
+   * 
+   * @return true if the lock was acquired, otherwise false.
+   */
+  public synchronized boolean wasLockAcquired() {
+    return lockWasAcquired;
+  }
+  
+  /**
+   * @return true if this object currently considers the lock held
+   */
+  public synchronized boolean isLocked() {
+    return lock != null;
+  }
+  
+  /**
+   * Handles events for the watch set in the constructor: on session expiration a held lock is dropped and the lock watcher is notified with
+   * {@link LockLossReason#SESSION_EXPIRED}.
+   */
+  @Override
+  public synchronized void process(WatchedEvent event) {
+    log.debug("event " + event.getPath() + " " + event.getType() + " " + event.getState());
+    
+    if (event.getState() == KeeperState.Expired && lock != null) {
+      LockWatcher localLw = lockWatcher;
+      lock = null;
+      lockWatcher = null;
+      localLw.lostLock(LockLossReason.SESSION_EXPIRED);
+    }
+  }
+  
+  /**
+   * Check directly against zookeeper whether the given lock is held.
+   * 
+   * @return true if the node named in lid sorts first among the children of the lock path and is owned by the session recorded in lid
+   */
+  public static boolean isLockHeld(ZooKeeper zk, LockID lid) throws KeeperException, InterruptedException {
+    
+    List<String> children = zk.getChildren(lid.path, false);
+    
+    if (children == null || children.size() == 0) {
+      return false;
+    }
+    
+    Collections.sort(children);
+    
+    String lockNode = children.get(0);
+    if (!lid.node.equals(lockNode))
+      return false;
+    
+    Stat stat = zk.exists(lid.path + "/" + lid.node, false);
+    return stat != null && stat.getEphemeralOwner() == lid.eid;
+  }
+  
+  /**
+   * Same check as {@link #isLockHeld(ZooKeeper, LockID)}, answered from a {@link ZooCache}.
+   */
+  public static boolean isLockHeld(ZooCache zc, LockID lid) {
+    
+    List<String> children = zc.getChildren(lid.path);
+    
+    if (children == null || children.size() == 0) {
+      return false;
+    }
+    
+    // the cache hands out an unmodifiable list, so sort a copy
+    children = new ArrayList<String>(children);
+    Collections.sort(children);
+    
+    String lockNode = children.get(0);
+    if (!lid.node.equals(lockNode))
+      return false;
+    
+    Stat stat = new Stat();
+    return zc.get(lid.path + "/" + lid.node, stat) != null && stat.getEphemeralOwner() == lid.eid;
+  }
+  
+  /**
+   * @return the data of the first child (in sorted order) of path, or null if path has no children
+   */
+  public static byte[] getLockData(ZooKeeper zk, String path) throws KeeperException, InterruptedException {
+    List<String> children = zk.getChildren(path, false);
+    
+    if (children == null || children.size() == 0) {
+      return null;
+    }
+    
+    Collections.sort(children);
+    
+    String lockNode = children.get(0);
+    
+    return zk.getData(path + "/" + lockNode, false, null);
+  }
+  
+  /**
+   * @param stat
+   *          passed through to {@link ZooCache#get(String, Stat)}; may be null
+   * @return the data of the first child (in sorted order) of path, or null if path has no children
+   * @throws RuntimeException
+   *           if the first child is not named like a lock node
+   */
+  public static byte[] getLockData(org.apache.accumulo.fate.zookeeper.ZooCache zc, String path, Stat stat) {
+    
+    List<String> children = zc.getChildren(path);
+    
+    if (children == null || children.size() == 0) {
+      return null;
+    }
+    
+    // the cache hands out an unmodifiable list, so sort a copy
+    children = new ArrayList<String>(children);
+    Collections.sort(children);
+    
+    String lockNode = children.get(0);
+    
+    if (!lockNode.startsWith(LOCK_PREFIX)) {
+      throw new RuntimeException("Node " + lockNode + " at " + path + " is not a lock node");
+    }
+    
+    return zc.get(path + "/" + lockNode, stat);
+  }
+  
+  // set by the constructor; shared by all instances and null until the first ZooLock has been created
+  private static ZooCache getLockDataZooCache;
+  
+  /**
+   * Like {@link #getLockData(ZooCache, String, Stat)}, using the cache given to the most recently constructed ZooLock and no stat.
+   */
+  public static byte[] getLockData(String path) {
+    return getLockData(path, null);
+  }
+  
+  /**
+   * Like {@link #getLockData(ZooCache, String, Stat)}, using the cache given to the most recently constructed ZooLock.
+   */
+  public static byte[] getLockData(String path, Stat stat) {
+    return getLockData(getLockDataZooCache, path, stat);
+  }
+  
+  /**
+   * @return the ephemeral owner (session id) of the first child (in sorted order) of path, or 0 if path has no children or the child has no cached data
+   */
+  public static long getSessionId(ZooCache zc, String path) throws KeeperException, InterruptedException {
+    List<String> children = zc.getChildren(path);
+    
+    if (children == null || children.size() == 0) {
+      return 0;
+    }
+    
+    // the cache hands out an unmodifiable list, so sort a copy
+    children = new ArrayList<String>(children);
+    Collections.sort(children);
+    
+    String lockNode = children.get(0);
+    
+    Stat stat = new Stat();
+    if (zc.get(path + "/" + lockNode, stat) != null)
+      return stat.getEphemeralOwner();
+    return 0;
+  }
+  
+  /**
+   * @return the session id of whoever holds the lock at this lock's path, as computed by {@link #getSessionId(ZooCache, String)}
+   */
+  public long getSessionId() throws KeeperException, InterruptedException {
+    return getSessionId(getLockDataZooCache, path);
+  }
+  
+  /**
+   * Delete the first child (in sorted order) of path, i.e. the node of the current lock holder.
+   * 
+   * @throws IllegalStateException
+   *           if path has no children
+   * @throws RuntimeException
+   *           if the first child is not named like a lock node
+   */
+  public static void deleteLock(IZooReaderWriter zk, String path) throws InterruptedException, KeeperException {
+    List<String> children;
+    
+    children = zk.getChildren(path);
+    
+    if (children == null || children.size() == 0) {
+      throw new IllegalStateException("No lock is held at " + path);
+    }
+    
+    Collections.sort(children);
+    
+    String lockNode = children.get(0);
+    
+    if (!lockNode.startsWith(LOCK_PREFIX)) {
+      throw new RuntimeException("Node " + lockNode + " at " + path + " is not a lock node");
+    }
+    
+    zk.recursiveDelete(path + "/" + lockNode, NodeMissingPolicy.SKIP);
+    
+  }
+  
+  /**
+   * Delete the node of the current lock holder, but only if its data equals lockData.
+   * 
+   * @return true if the node was deleted; false if its data did not match (a node holding no data never matches)
+   * @throws IllegalStateException
+   *           if path has no children
+   * @throws RuntimeException
+   *           if the first child is not named like a lock node
+   */
+  public static boolean deleteLock(IZooReaderWriter zk, String path, String lockData) throws InterruptedException, KeeperException {
+    List<String> children;
+    
+    children = zk.getChildren(path);
+    
+    if (children == null || children.size() == 0) {
+      throw new IllegalStateException("No lock is held at " + path);
+    }
+    
+    Collections.sort(children);
+    
+    String lockNode = children.get(0);
+    
+    if (!lockNode.startsWith(LOCK_PREFIX)) {
+      throw new RuntimeException("Node " + lockNode + " at " + path + " is not a lock node");
+    }
+    
+    Stat stat = new Stat();
+    byte[] data = zk.getData(path + "/" + lockNode, stat);
+    
+    // zookeeper nodes can hold no data at all; without the null check new String(data) would throw a NullPointerException
+    if (data != null && lockData.equals(new String(data))) {
+      // the version read above is passed along so the delete applies to the node state that was just compared
+      zk.recursiveDelete(path + "/" + lockNode, stat.getVersion(), NodeMissingPolicy.FAIL);
+      return true;
+    }
+    
+    return false;
+  }
+}

Propchange: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooQueueLock.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooQueueLock.java?rev=1355481&view=auto
==============================================================================
--- accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooQueueLock.java (added)
+++ accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooQueueLock.java Fri Jun 29 17:42:35 2012
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate.zookeeper;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.accumulo.fate.zookeeper.DistributedReadWriteLock.QueueLock;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.KeeperException.NotEmptyException;
+
+/**
+ * A {@link QueueLock} whose entries are sequential child nodes, named with the {@code lock-} prefix, kept under a single ZooKeeper path.
+ */
+public class ZooQueueLock implements QueueLock {
+  
+  private static final String PREFIX = "lock-";
+  
+  private final IZooReaderWriter zoo;
+  private final String path;
+  private final boolean ephemeral;
+  
+  /**
+   * Builds a queue lock on top of the shared retrying reader/writer.
+   * 
+   * @param zookeepers
+   *          ZooKeeper connection string
+   * @param timeInMillis
+   *          session timeout handed to the reader/writer
+   * @param auth
+   *          auth string handed to the reader/writer
+   * @param path
+   *          parent node under which entries are created
+   * @param ephemeral
+   *          true to create ephemeral sequential entries, false for persistent sequential entries
+   */
+  public ZooQueueLock(String zookeepers, int timeInMillis, String auth, String path, boolean ephemeral) throws KeeperException, InterruptedException {
+    this(ZooReaderWriter.getRetryingInstance(zookeepers, timeInMillis, auth), path, ephemeral);
+  }
+  
+  /**
+   * Builds a queue lock on top of the supplied reader/writer.
+   */
+  protected ZooQueueLock(IZooReaderWriter zrw, String path, boolean ephemeral) {
+    this.zoo = zrw;
+    this.path = path;
+    this.ephemeral = ephemeral;
+  }
+  
+  /**
+   * Creates a new sequential entry holding {@code data} and returns the sequence number parsed from the created node's name. If the parent path is missing
+   * it is created and the attempt is repeated.
+   * 
+   * @throws RuntimeException
+   *           wrapping any other failure
+   */
+  @Override
+  public long addEntry(byte[] data) {
+    try {
+      for (;;) {
+        try {
+          final String entryPrefix = path + "/" + PREFIX;
+          final String created = ephemeral ? zoo.putEphemeralSequential(entryPrefix, data) : zoo.putPersistentSequential(entryPrefix, data);
+          final String[] segments = created.split("/");
+          final String nodeName = segments[segments.length - 1];
+          return Long.parseLong(nodeName.substring(PREFIX.length()));
+        } catch (NoNodeException nne) {
+          // the parent is missing: create it, then go around the loop again
+          zoo.putPersistentData(path, new byte[] {}, NodeExistsPolicy.SKIP);
+        }
+      }
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+  
+  /**
+   * Collects the data of every entry whose sequence number is less than or equal to {@code entry}, keyed by sequence number.
+   * 
+   * @throws RuntimeException
+   *           wrapping any failure other than a missing node
+   */
+  @Override
+  public SortedMap<Long,byte[]> getEarlierEntries(long entry) {
+    final SortedMap<Long,byte[]> earlier = new TreeMap<Long,byte[]>();
+    try {
+      List<String> names;
+      try {
+        names = zoo.getChildren(path);
+      } catch (KeeperException.NoNodeException ex) {
+        // no parent node means no entries at all
+        names = Collections.emptyList();
+      }
+      
+      for (String name : names) {
+        final byte[] data;
+        try {
+          data = zoo.getData(path + "/" + name, null);
+        } catch (KeeperException.NoNodeException ex) {
+          // handled per child: an entry may vanish between the listing and the read
+          continue;
+        }
+        final long order = Long.parseLong(name.substring(PREFIX.length()));
+        if (order <= entry) {
+          earlier.put(order, data);
+        }
+      }
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+    return earlier;
+  }
+  
+  /**
+   * Deletes the entry with the given sequence number, then attempts to delete the parent path as well; the parent delete is allowed to fail when the parent
+   * still has children or is already gone.
+   * 
+   * @throws RuntimeException
+   *           wrapping any other failure
+   */
+  @Override
+  public void removeEntry(long entry) {
+    try {
+      final String entryPath = path + String.format("/%s%010d", PREFIX, entry);
+      zoo.recursiveDelete(entryPath, NodeMissingPolicy.SKIP);
+      try {
+        zoo.delete(path, -1);
+      } catch (NotEmptyException nee) {
+        // other entries remain under the parent, leave it in place
+      } catch (NoNodeException nne) {
+        // the parent was already removed by someone else
+      }
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+}

Propchange: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooQueueLock.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReader.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReader.java?rev=1355481&view=auto
==============================================================================
--- accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReader.java (added)
+++ accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReader.java Fri Jun 29 17:42:35 2012
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate.zookeeper;
+
+import java.util.List;
+
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+
+public class ZooReader implements IZooReader {
+  
+  protected String keepers;
+  protected int timeout;
+  
+  protected ZooKeeper getSession(String keepers, int timeout, String auth) {
+    return ZooSession.getSession(keepers, timeout, auth);
+  }
+  
+  protected ZooKeeper getZooKeeper() {
+    return getSession(keepers, timeout, null);
+  }
+  
+  @Override
+  public byte[] getData(String zPath, Stat stat) throws KeeperException, InterruptedException {
+    return getZooKeeper().getData(zPath, false, stat);
+  }
+  
+  @Override
+  public Stat getStatus(String zPath) throws KeeperException, InterruptedException {
+    return getZooKeeper().exists(zPath, false);
+  }
+  
+  @Override
+  public Stat getStatus(String zPath, Watcher watcher) throws KeeperException, InterruptedException {
+    return getZooKeeper().exists(zPath, watcher);
+  }
+  
+  @Override
+  public List<String> getChildren(String zPath) throws KeeperException, InterruptedException {
+    return getZooKeeper().getChildren(zPath, false);
+  }
+  
+  @Override
+  public List<String> getChildren(String zPath, Watcher watcher) throws KeeperException, InterruptedException {
+    return getZooKeeper().getChildren(zPath, watcher);
+  }
+  
+  @Override
+  public boolean exists(String zPath) throws KeeperException, InterruptedException {
+    return getZooKeeper().exists(zPath, false) != null;
+  }
+  
+  @Override
+  public boolean exists(String zPath, Watcher watcher) throws KeeperException, InterruptedException {
+    return getZooKeeper().exists(zPath, watcher) != null;
+  }
+  
+  public ZooReader(String keepers, int timeout) {
+    this.keepers = keepers;
+    this.timeout = timeout;
+  }
+}

Propchange: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReader.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriter.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriter.java?rev=1355481&view=auto
==============================================================================
--- accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriter.java (added)
+++ accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriter.java Fri Jun 29 17:42:35 2012
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate.zookeeper;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.security.SecurityPermission;
+import java.util.List;
+
+import org.apache.accumulo.fate.util.UtilWaitThread;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.BadVersionException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * A {@link ZooReader} that can also modify ZooKeeper. Write operations delegate to the static helpers in {@link ZooUtil}, using a handle that is requested
+ * with this instance's auth string. Obtaining that handle checks {@code zookeeperWriterPermission} when a {@link SecurityManager} is installed.
+ */
+public class ZooReaderWriter extends ZooReader implements IZooReaderWriter {
+  
+  private static final SecurityPermission ZOOWRITER_PERMISSION = new SecurityPermission("zookeeperWriterPermission");
+  
+  private static ZooReaderWriter instance = null;
+  private static IZooReaderWriter retryingInstance = null;
+  private final String auth;
+  
+  /**
+   * Returns the handle used for all operations of this class, requested with this instance's auth string.
+   * 
+   * @throws SecurityException
+   *           if a security manager is installed and denies {@code zookeeperWriterPermission}
+   */
+  @Override
+  public ZooKeeper getZooKeeper() {
+    SecurityManager sm = System.getSecurityManager();
+    if (sm != null) {
+      sm.checkPermission(ZOOWRITER_PERMISSION);
+    }
+    return getSession(keepers, timeout, auth);
+  }
+  
+  /**
+   * @param string
+   *          ZooKeeper connection string
+   * @param timeInMillis
+   *          session timeout
+   * @param auth
+   *          secret part of the auth string; it is stored prefixed with {@code "accumulo:"}
+   */
+  public ZooReaderWriter(String string, int timeInMillis, String auth) {
+    super(string, timeInMillis);
+    this.auth = "accumulo:" + auth;
+  }
+  
+  @Override
+  public void recursiveDelete(String zPath, NodeMissingPolicy policy) throws KeeperException, InterruptedException {
+    ZooUtil.recursiveDelete(getZooKeeper(), zPath, policy);
+  }
+  
+  @Override
+  public void recursiveDelete(String zPath, int version, NodeMissingPolicy policy) throws KeeperException, InterruptedException {
+    ZooUtil.recursiveDelete(getZooKeeper(), zPath, version, policy);
+  }
+  
+  /**
+   * Create a persistent node with the default ACL
+   * 
+   * @return true if the node was created or altered; false if it was skipped
+   */
+  @Override
+  public boolean putPersistentData(String zPath, byte[] data, NodeExistsPolicy policy) throws KeeperException, InterruptedException {
+    return ZooUtil.putPersistentData(getZooKeeper(), zPath, data, policy);
+  }
+  
+  @Override
+  public boolean putPrivatePersistentData(String zPath, byte[] data, NodeExistsPolicy policy) throws KeeperException, InterruptedException {
+    return ZooUtil.putPrivatePersistentData(getZooKeeper(), zPath, data, policy);
+  }
+  
+  @Override
+  public void putPersistentData(String zPath, byte[] data, int version, NodeExistsPolicy policy) throws KeeperException, InterruptedException {
+    ZooUtil.putPersistentData(getZooKeeper(), zPath, data, version, policy);
+  }
+  
+  @Override
+  public String putPersistentSequential(String zPath, byte[] data) throws KeeperException, InterruptedException {
+    return ZooUtil.putPersistentSequential(getZooKeeper(), zPath, data);
+  }
+  
+  @Override
+  public String putEphemeralSequential(String zPath, byte[] data) throws KeeperException, InterruptedException {
+    return ZooUtil.putEphemeralSequential(getZooKeeper(), zPath, data);
+  }
+  
+  @Override
+  public void recursiveCopyPersistent(String source, String destination, NodeExistsPolicy policy) throws KeeperException, InterruptedException {
+    ZooUtil.recursiveCopyPersistent(getZooKeeper(), source, destination, policy);
+  }
+  
+  @Override
+  public void delete(String path, int version) throws InterruptedException, KeeperException {
+    getZooKeeper().delete(path, version);
+  }
+  
+  /**
+   * Callback used by {@link ZooReaderWriter#mutate(String, byte[], List, Mutator)} to compute a node's new value from its current value.
+   */
+  public interface Mutator {
+    byte[] mutate(byte[] currentValue) throws Exception;
+  }
+  
+  /**
+   * Applies {@code mutator} to the data of {@code zPath} using a read / modify / versioned-write loop.
+   * 
+   * @param zPath
+   *          node to update
+   * @param createValue
+   *          if non-null, the node is first created with this value; when that create succeeds the value is returned without calling the mutator
+   * @param acl
+   *          ACL used for the create
+   * @param mutator
+   *          computes the new value; returning null leaves the node untouched
+   * @return the value that was created or written, or null if the mutator returned null
+   */
+  @Override
+  public byte[] mutate(String zPath, byte[] createValue, List<ACL> acl, Mutator mutator) throws Exception {
+    if (createValue != null) {
+      try {
+        getZooKeeper().create(zPath, createValue, acl, CreateMode.PERSISTENT);
+        return createValue;
+      } catch (NodeExistsException ex) {
+        // expected: the node is already there, fall through and mutate it
+      }
+    }
+    do {
+      Stat stat = new Stat();
+      byte[] data = getZooKeeper().getData(zPath, false, stat);
+      data = mutator.mutate(data);
+      if (data == null)
+        return data;
+      try {
+        getZooKeeper().setData(zPath, data, stat.getVersion());
+        return data;
+      } catch (BadVersionException ex) {
+        // the node changed between the read and the write: re-read and mutate again
+      }
+    } while (true);
+  }
+  
+  /**
+   * Returns the shared instance, creating it on first use. NOTE: the arguments are only used by the call that creates the instance; later calls return that
+   * same instance regardless of the arguments they pass.
+   */
+  public static synchronized ZooReaderWriter getInstance(String zookeepers, int timeInMillis, String auth) {
+    if (instance == null)
+      instance = new ZooReaderWriter(zookeepers, timeInMillis, auth);
+    return instance;
+  }
+  
+  /**
+   * get an instance that retries when zookeeper connection errors occur
+   * 
+   * The returned object is a dynamic proxy around {@link #getInstance(String, int, String)}; like that method, it is created once and later calls ignore
+   * their arguments.
+   * 
+   * @return an instance that retries when Zookeeper connection errors occur.
+   */
+  public static synchronized IZooReaderWriter getRetryingInstance(String zookeepers, int timeInMillis, String auth) {
+    
+    if (retryingInstance == null) {
+      final IZooReaderWriter inst = getInstance(zookeepers, timeInMillis, auth);
+      
+      InvocationHandler ih = new InvocationHandler() {
+        @Override
+        public Object invoke(Object obj, Method method, Object[] args) throws Throwable {
+          // wait grows by 250 ms per failed attempt, capped at 5000 ms
+          long retryTime = 250;
+          while (true) {
+            try {
+              return method.invoke(inst, args);
+            } catch (InvocationTargetException e) {
+              if (e.getCause() instanceof KeeperException.ConnectionLossException) {
+                Logger.getLogger(ZooReaderWriter.class).warn("Error connecting to zookeeper, will retry in " + retryTime, e.getCause());
+                UtilWaitThread.sleep(retryTime);
+                retryTime = Math.min(5000, retryTime + 250);
+              } else {
+                // anything other than connection loss is rethrown unwrapped
+                throw e.getCause();
+              }
+            }
+          }
+        }
+      };
+      
+      retryingInstance = (IZooReaderWriter) Proxy.newProxyInstance(ZooReaderWriter.class.getClassLoader(), new Class<?>[] {IZooReaderWriter.class}, ih);
+    }
+    
+    return retryingInstance;
+  }
+  
+  @Override
+  public boolean isLockHeld(ZooUtil.LockID lockID) throws KeeperException, InterruptedException {
+    return ZooUtil.isLockHeld(getZooKeeper(), lockID);
+  }
+  
+  /**
+   * Creates {@code path} and any missing ancestors as persistent nodes with empty data. Does nothing for the empty string or for a path that already exists.
+   * 
+   * @throws IllegalArgumentException
+   *           if a non-empty {@code path} does not start with {@code /}
+   */
+  @Override
+  public void mkdirs(String path) throws KeeperException, InterruptedException {
+    if (path.equals(""))
+      return;
+    if (!path.startsWith("/"))
+      throw new IllegalArgumentException(path + " does not start with /");
+    if (getZooKeeper().exists(path, false) != null)
+      return;
+    // create the ancestors first, then this node; SKIP tolerates a concurrent creator
+    String parent = path.substring(0, path.lastIndexOf("/"));
+    mkdirs(parent);
+    putPersistentData(path, new byte[] {}, NodeExistsPolicy.SKIP);
+  }
+  
+}

Propchange: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReservation.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReservation.java?rev=1355481&view=auto
==============================================================================
--- accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReservation.java (added)
+++ accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReservation.java Fri Jun 29 17:42:35 2012
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate.zookeeper;
+
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * Reservations backed by a single ZooKeeper node whose data has the form {@code reservationID:debugInfo}.
+ */
+public class ZooReservation {
+  
+  /**
+   * Tries to take the reservation at {@code path} by creating the node.
+   * 
+   * @param reservationID
+   *          id of the requester; must not contain {@code :}
+   * @param debugInfo
+   *          free-form text stored after the id
+   * @return true if the node was created, or if it already exists and carries the same reservation id; false if another id holds it
+   * @throws IllegalArgumentException
+   *           if {@code reservationID} contains {@code :}
+   */
+  public static boolean attempt(IZooReaderWriter zk, String path, String reservationID, String debugInfo) throws KeeperException, InterruptedException {
+    if (reservationID.contains(":"))
+      throw new IllegalArgumentException();
+    
+    for (;;) {
+      try {
+        byte[] payload = (reservationID + ":" + debugInfo).getBytes();
+        zk.putPersistentData(path, payload, NodeExistsPolicy.FAIL);
+        return true;
+      } catch (NodeExistsException nee) {
+        byte[] existing;
+        try {
+          existing = zk.getData(path, new Stat());
+        } catch (NoNodeException nne) {
+          // the node disappeared after the failed create: start over
+          continue;
+        }
+        return reservationID.equals(idOf(existing));
+      }
+    }
+  }
+  
+  /**
+   * Releases the reservation at {@code path}. A missing node is logged at debug level and otherwise ignored.
+   * 
+   * @throws IllegalStateException
+   *           if the node is held under a different reservation id
+   */
+  public static void release(IZooReaderWriter zk, String path, String reservationID) throws KeeperException, InterruptedException {
+    Stat nodeStat = new Stat();
+    byte[] existing;
+    
+    try {
+      existing = zk.getData(path, nodeStat);
+    } catch (NoNodeException e) {
+      // TODO log warning? this may happen as a normal course of business.... could return a boolean...
+      Logger.getLogger(ZooReservation.class).debug("Node does not exist " + path);
+      return;
+    }
+    
+    String holderId = idOf(existing);
+    if (!reservationID.equals(holderId)) {
+      throw new IllegalStateException("Tried to release reservation " + path + " with data mismatch " + reservationID + " " + new String(existing));
+    }
+    
+    zk.recursiveDelete(path, nodeStat.getVersion(), NodeMissingPolicy.SKIP);
+  }
+  
+  /** Extracts the reservation id, i.e. the text before the first {@code :}, from the node data. */
+  private static String idOf(byte[] zooData) {
+    return new String(zooData).split(":")[0];
+  }
+  
+}

Propchange: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReservation.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java?rev=1355481&view=auto
==============================================================================
--- accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java (added)
+++ accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java Fri Jun 29 17:42:35 2012
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate.zookeeper;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+import org.apache.accumulo.fate.util.UtilWaitThread;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooKeeper.States;
+
+/**
+ * Creates ZooKeeper client handles and caches them in a static map keyed by connection string, timeout and auth, so callers asking for the same settings
+ * receive the same handle until it is closed.
+ */
+class ZooSession {
+  
+  private static final Logger log = Logger.getLogger(ZooSession.class);
+  
+  /** Value stored in the session cache: holds the ZooKeeper handle. */
+  private static class ZooSessionInfo {
+    // NOTE(review): the watcher argument is accepted but not stored
+    public ZooSessionInfo(ZooKeeper zooKeeper, ZooWatcher watcher) {
+      this.zooKeeper = zooKeeper;
+    }
+    
+    ZooKeeper zooKeeper;
+  }
+  
+  // cache of handles; only touched from the static synchronized getSession methods
+  private static Map<String,ZooSessionInfo> sessions = new HashMap<String,ZooSessionInfo>();
+  
+  /** Builds the cache key; a null auth is keyed as the empty string. */
+  private static String sessionKey(String keepers, int timeout, String auth) {
+    return keepers + ":" + timeout + ":" + (auth == null ? "" : auth);
+  }
+  
+  /**
+   * Watcher handed to each new ZooKeeper handle. It forwards every event to the watchers in its set and logs session expiration at debug level.
+   */
+  private static class ZooWatcher implements Watcher {
+    
+    // NOTE(review): nothing in this class adds to this set
+    private HashSet<Watcher> watchers = new HashSet<Watcher>();
+    
+    public void process(WatchedEvent event) {
+      // copy the watchers, in case the callback adds() more Watchers
+      // otherwise we get a ConcurrentModificationException
+      Collection<Watcher> watcherCopy = new ArrayList<Watcher>(watchers);
+      
+      for (Watcher watcher : watcherCopy) {
+        watcher.process(event);
+      }
+      
+      if (event.getState() == KeeperState.Expired) {
+        log.debug("Session expired, state of current session : " + event.getState());
+      }
+    }
+    
+  }
+  
+  /**
+   * Opens a ZooKeeper handle and keeps retrying until it reports the CONNECTED state.
+   * 
+   * @param host
+   *          ZooKeeper connection string
+   * @param timeout
+   *          session timeout passed to the ZooKeeper constructor
+   * @param auth
+   *          if non-null, added to the connected handle as "digest" auth info
+   * @param watcher
+   *          watcher passed to the ZooKeeper constructor
+   * @return the connected handle
+   * @throws RuntimeException
+   *           wrapping an UnknownHostException, which is not retried
+   */
+  public static ZooKeeper connect(String host, int timeout, String auth, Watcher watcher) {
+    final int TIME_BETWEEN_CONNECT_CHECKS_MS = 100;
+    final int TOTAL_CONNECT_TIME_WAIT_MS = 10 * 1000;
+    boolean tryAgain = true;
+    int sleepTime = 100;
+    ZooKeeper zooKeeper = null;
+    
+    while (tryAgain) {
+      try {
+        zooKeeper = new ZooKeeper(host, timeout, watcher);
+        // it may take some time to get connected to zookeeper if some of the servers are down
+        // poll the state every TIME_BETWEEN_CONNECT_CHECKS_MS, for at most TOTAL_CONNECT_TIME_WAIT_MS per attempt
+        for (int i = 0; i < TOTAL_CONNECT_TIME_WAIT_MS / TIME_BETWEEN_CONNECT_CHECKS_MS && tryAgain; i++) {
+          if (zooKeeper.getState().equals(States.CONNECTED)) {
+            if (auth != null)
+              zooKeeper.addAuthInfo("digest", auth.getBytes());
+            tryAgain = false;
+          } else
+            UtilWaitThread.sleep(TIME_BETWEEN_CONNECT_CHECKS_MS);
+        }
+      } catch (UnknownHostException uhe) {
+        // do not expect to recover from this
+        log.warn(uhe.getClass().getName() + " : " + uhe.getMessage());
+        throw new RuntimeException(uhe);
+      } catch (IOException e) {
+        log.warn("Connection to zooKeeper failed, will try again in " + String.format("%.2f secs", sleepTime / 1000.0), e);
+      } finally {
+        // a handle that never reached CONNECTED is closed before the next attempt
+        if (tryAgain && zooKeeper != null)
+          try {
+            zooKeeper.close();
+            zooKeeper = null;
+          } catch (InterruptedException e) {
+            log.warn("interrupted", e);
+          }
+      }
+      
+      if (tryAgain) {
+        UtilWaitThread.sleep(sleepTime);
+        // grow the pause by a random fraction of itself until it reaches 10000 ms
+        if (sleepTime < 10000)
+          sleepTime = (int) (sleepTime + sleepTime * Math.random());
+      }
+    }
+    
+    return zooKeeper;
+  }
+  
+  /** Returns the cached handle for the given settings without auth, connecting if needed. */
+  public static synchronized ZooKeeper getSession(String zooKeepers, int timeout) {
+    return getSession(zooKeepers, timeout, null);
+  }
+  
+  /**
+   * Returns the cached handle for the given settings, connecting (and caching the result) when there is no cached handle or the cached one is CLOSED.
+   * 
+   * @param auth
+   *          auth info for the handle, or null for none
+   */
+  public static synchronized ZooKeeper getSession(String zooKeepers, int timeout, String auth) {
+    
+    String sessionKey = sessionKey(zooKeepers, timeout, auth);
+    
+    // a read-only session can use a session with authorizations, so cache a copy for it w/out auths
+    String readOnlySessionKey = sessionKey(zooKeepers, timeout, null);
+    
+    ZooSessionInfo zsi = sessions.get(sessionKey);
+    if (zsi != null && zsi.zooKeeper.getState() == States.CLOSED) {
+      // evict the closed handle, including its read-only alias if that alias points at the same entry
+      if (auth != null && sessions.get(readOnlySessionKey) == zsi)
+        sessions.remove(readOnlySessionKey);
+      zsi = null;
+      sessions.remove(sessionKey);
+    }
+    
+    if (zsi == null) {
+      ZooWatcher watcher = new ZooWatcher();
+      // NOTE(review): this message says "with auth" even when auth is null
+      log.debug("Connecting to " + zooKeepers + " with timeout " + timeout + " with auth");
+      zsi = new ZooSessionInfo(connect(zooKeepers, timeout, auth, watcher), watcher);
+      sessions.put(sessionKey, zsi);
+      // register the authenticated handle under the read-only key too, unless a read-only entry already exists
+      if (auth != null && !sessions.containsKey(readOnlySessionKey))
+        sessions.put(readOnlySessionKey, zsi);
+    }
+    return zsi.zooKeeper;
+  }
+}

Propchange: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooUtil.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooUtil.java?rev=1355481&view=auto
==============================================================================
--- accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooUtil.java (added)
+++ accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooUtil.java Fri Jun 29 17:42:35 2012
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate.zookeeper;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooDefs.Perms;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+
+public class ZooUtil {
+  public enum NodeExistsPolicy {
+    SKIP, OVERWRITE, FAIL
+  }
+  
+  public enum NodeMissingPolicy {
+    SKIP, CREATE, FAIL
+  }
+  
+  /**
+   * Identifies a lock by the path of its parent node, the name of the lock node and an id ({@code eid}). The serialized form is
+   * {@code <path relative to root>/<node>$<eid in hex>}.
+   */
+  public static class LockID {
+    public long eid;
+    public String path;
+    public String node;
+    
+    /**
+     * Parses a string produced by {@link #serialize(String)}.
+     * 
+     * @param root
+     *          prefix the relative path in {@code serializedLID} is appended to, separated by {@code /}
+     * @param serializedLID
+     *          text of the form {@code relative/path/node$hexEid}
+     * @throws IllegalArgumentException
+     *           if the text does not contain exactly one {@code $}-separated id, or has no {@code /} before the node name
+     * @throws NumberFormatException
+     *           if the id is not valid hexadecimal
+     */
+    public LockID(String root, String serializedLID) {
+      String sa[] = serializedLID.split("\\$");
+      int lastSlash = sa[0].lastIndexOf('/');
+      
+      if (sa.length != 2 || lastSlash < 0) {
+        throw new IllegalArgumentException("Malformed serialized lock id " + serializedLID);
+      }
+      
+      // a leading slash directly before the node name means the lock sits right under root
+      if (lastSlash == 0)
+        path = root;
+      else
+        path = root + "/" + sa[0].substring(0, lastSlash);
+      node = sa[0].substring(lastSlash + 1);
+      // serialize() writes eid with Long.toHexString, which renders a negative value as 16 unsigned hex digits; Long.parseLong rejects those, so parse
+      // through BigInteger and keep the low 64 bits
+      eid = new java.math.BigInteger(sa[1], 16).longValue();
+    }
+    
+    public LockID(String path, String node, long eid) {
+      this.path = path;
+      this.node = node;
+      this.eid = eid;
+    }
+    
+    /**
+     * Renders this id relative to {@code root}: the first {@code root.length()} characters of the path are dropped, then {@code /node$hexEid} is appended.
+     */
+    public String serialize(String root) {
+      
+      return path.substring(root.length()) + "/" + node + "$" + Long.toHexString(eid);
+    }
+    
+    @Override
+    public String toString() {
+      return " path = " + path + " node = " + node + " eid = " + Long.toHexString(eid);
+    }
+  }
+  
+  /** Copy of ZooKeeper's {@code Ids.CREATOR_ALL_ACL}. */
+  public static final List<ACL> PRIVATE = new ArrayList<ACL>(Ids.CREATOR_ALL_ACL);
+  /** The entries of {@link #PRIVATE} plus a READ entry for {@code Ids.ANYONE_ID_UNSAFE}. */
+  public static final List<ACL> PUBLIC = new ArrayList<ACL>(PRIVATE);
+  static {
+    PUBLIC.add(new ACL(Perms.READ, Ids.ANYONE_ID_UNSAFE));
+  }
+  
+  /**
+   * This method will delete a node and all its children from zookeeper
+   * 
+   * Children are removed first, without any version check and ignoring children that are already gone. The node itself is then deleted with the caller's
+   * {@code version}, so a caller that read the node earlier can make the delete conditional on the node being unchanged.
+   * 
+   * @param zk
+   *          the ZooKeeper handle to use
+   * @param zPath
+   *          the path to delete
+   * @param version
+   *          version {@code zPath} must have for the delete to succeed, or -1 to delete whatever version is present
+   * @param policy
+   *          SKIP to ignore a missing node, FAIL to let the NoNode error propagate; CREATE is not valid here
+   * @throws IllegalArgumentException
+   *           if {@code policy} is CREATE
+   * @throws KeeperException
+   *           if ZooKeeper reports an error, including a version mismatch on {@code zPath}
+   */
+  public static void recursiveDelete(ZooKeeper zk, String zPath, int version, NodeMissingPolicy policy) throws KeeperException, InterruptedException {
+    if (policy.equals(NodeMissingPolicy.CREATE))
+      throw new IllegalArgumentException(policy.name() + " is invalid for this operation");
+    try {
+      for (String child : zk.getChildren(zPath, false))
+        recursiveDelete(zk, zPath + "/" + child, NodeMissingPolicy.SKIP);
+      
+      // honor the requested version instead of re-reading it, otherwise the version check could never fail
+      if (zk.exists(zPath, null) != null)
+        zk.delete(zPath, version);
+    } catch (KeeperException e) {
+      if (policy.equals(NodeMissingPolicy.SKIP) && e.code().equals(KeeperException.Code.NONODE))
+        return;
+      throw e;
+    }
+  }
+  
+  /**
+   * Deletes a node and all its children, passing -1 as the version to the four-argument overload.
+   */
+  public static void recursiveDelete(ZooKeeper zk, String zPath, NodeMissingPolicy policy) throws KeeperException, InterruptedException {
+    final int anyVersion = -1;
+    recursiveDelete(zk, zPath, anyVersion, policy);
+  }
+  
+  /**
+   * Writes a persistent node using the {@code PUBLIC} ACL list and -1 as the version for overwrites.
+   * 
+   * @return true if the node was created or altered; false if it was skipped
+   */
+  public static boolean putPersistentData(ZooKeeper zk, String zPath, byte[] data, NodeExistsPolicy policy) throws KeeperException, InterruptedException {
+    final int anyVersion = -1;
+    return putData(zk, zPath, data, CreateMode.PERSISTENT, anyVersion, policy, PUBLIC);
+  }
+  
+  /**
+   * Writes a persistent node using the {@code PUBLIC} ACL list; {@code version} is the version used if an existing node is overwritten.
+   * 
+   * @return true if the node was created or altered; false if it was skipped
+   */
+  public static boolean putPersistentData(ZooKeeper zk, String zPath, byte[] data, int version, NodeExistsPolicy policy) throws KeeperException,
+      InterruptedException {
+    final List<ACL> acls = PUBLIC;
+    return putData(zk, zPath, data, CreateMode.PERSISTENT, version, policy, acls);
+  }
+  
+  /**
+   * Writes a persistent node using the supplied ACL list; {@code version} is the version used if an existing node is overwritten.
+   * 
+   * @return true if the node was created or altered; false if it was skipped
+   */
+  public static boolean putPersistentData(ZooKeeper zk, String zPath, byte[] data, int version, NodeExistsPolicy policy, List<ACL> acls)
+      throws KeeperException, InterruptedException {
+    final CreateMode mode = CreateMode.PERSISTENT;
+    return putData(zk, zPath, data, mode, version, policy, acls);
+  }
+  
+  private static boolean putData(ZooKeeper zk, String zPath, byte[] data, CreateMode mode, int version, NodeExistsPolicy policy, List<ACL> acls)
+      throws KeeperException, InterruptedException {
+    if (policy == null)
+      policy = NodeExistsPolicy.FAIL;
+    
+    while (true) {
+      try {
+        zk.create(zPath, data, acls, mode);
+        return true;
+      } catch (NodeExistsException nee) {
+        switch (policy) {
+          case SKIP:
+            return false;
+          case OVERWRITE:
+            try {
+              zk.setData(zPath, data, version);
+              return true;
+            } catch (NoNodeException nne) {
+              // node delete between create call and set data, so try create call again
+              continue;
+            }
+          default:
+            throw nee;
+        }
+      }
+    }
+  }
+  
+  /**
+   * Reads the data stored at a node without registering a watch.
+   * 
+   * @param zPath
+   *          the path of the node to read
+   * @param stat
+   *          passed straight through to the zookeeper get-data call
+   * @return the bytes returned by zookeeper for the node
+   */
+  public static byte[] getData(ZooKeeper zk, String zPath, Stat stat) throws KeeperException, InterruptedException {
+    final boolean watch = false;
+    return zk.getData(zPath, watch, stat);
+  }
+  
+  /**
+   * Looks up the status of a node without registering a watch.
+   * 
+   * @param zPath
+   *          the path of the node to check
+   * @return whatever the zookeeper exists call returns for the path; {@link #exists(ZooKeeper, String)} treats null as "no such node"
+   */
+  public static Stat getStatus(ZooKeeper zk, String zPath) throws KeeperException, InterruptedException {
+    final boolean watch = false;
+    return zk.exists(zPath, watch);
+  }
+  
+  public static boolean exists(ZooKeeper zk, String zPath) throws KeeperException, InterruptedException {
+    return getStatus(zk, zPath) != null;
+  }
+  
+  /**
+   * Copies a node and, recursively, its children from {@code source} to {@code destination}. Ephemeral nodes are not copied.
+   * 
+   * @param zk
+   *          the zookeeper handle to operate on
+   * @param source
+   *          the path to copy from; must exist
+   * @param destination
+   *          the path to copy to
+   * @param policy
+   *          what to do when {@code destination} already exists: OVERWRITE continues, SKIP returns without copying this node or its children, anything else
+   *          (including null) fails
+   * @throws KeeperException
+   *           with code NONODE if {@code source} is missing or holds no data, with code NODEEXISTS if {@code destination} exists and the policy does not
+   *           allow that
+   */
+  public static void recursiveCopyPersistent(ZooKeeper zk, String source, String destination, NodeExistsPolicy policy) throws KeeperException,
+      InterruptedException {
+    // same default as putData, so a null policy no longer blows up in the switch below
+    if (policy == null)
+      policy = NodeExistsPolicy.FAIL;
+    
+    if (!exists(zk, source))
+      throw KeeperException.create(Code.NONODE, source);
+    if (exists(zk, destination)) {
+      switch (policy) {
+        case OVERWRITE:
+          break;
+        case SKIP:
+          return;
+        case FAIL:
+        default:
+          // the node that is in the way is the destination, so that is the path to report
+          throw KeeperException.create(Code.NODEEXISTS, destination);
+      }
+    }
+    
+    Stat stat = new Stat();
+    byte[] data = zk.getData(source, false, stat);
+    // an ephemeral owner of 0 means the node is not ephemeral
+    if (stat.getEphemeralOwner() == 0) {
+      if (data == null)
+        throw KeeperException.create(Code.NONODE, source);
+      putPersistentData(zk, destination, data, policy);
+      if (stat.getNumChildren() > 0)
+        for (String child : zk.getChildren(source, false))
+          recursiveCopyPersistent(zk, source + "/" + child, destination + "/" + child, policy);
+    }
+  }
+  
+  /**
+   * Stores data in a persistent node using the {@code PRIVATE} ACL list and a version of -1.
+   * 
+   * @param policy
+   *          what to do when the node already exists
+   * @return true if the node was created or altered; false if it was skipped
+   */
+  public static boolean putPrivatePersistentData(ZooKeeper zk, String zPath, byte[] data, NodeExistsPolicy policy) throws KeeperException, InterruptedException {
+    final int unspecifiedVersion = -1;
+    return putData(zk, zPath, data, CreateMode.PERSISTENT, unspecifiedVersion, policy, PRIVATE);
+  }
+  
+  public static String putPersistentSequential(ZooKeeper zk, String zPath, byte[] data) throws KeeperException, InterruptedException {
+    return zk.create(zPath, data, ZooUtil.PUBLIC, CreateMode.PERSISTENT_SEQUENTIAL);
+  }
+  
+  public static String putEphemeralSequential(ZooKeeper zk, String zPath, byte[] data) throws KeeperException, InterruptedException {
+    return zk.create(zPath, data, ZooUtil.PUBLIC, CreateMode.EPHEMERAL_SEQUENTIAL);
+  }
+  
+  /**
+   * Returns the data of the child of {@code path} whose name sorts first.
+   * 
+   * @param zc
+   *          the cache to read the children and the data from
+   * @param path
+   *          the parent path whose children are examined
+   * @return the cached data of the first child in sorted order, or null if there are no children
+   */
+  public static byte[] getLockData(ZooCache zc, String path) {
+    
+    List<String> children = zc.getChildren(path);
+    
+    // NOTE(review): the cache looks able to hand back null for a path it has no children for - confirm; treated here the same as an empty list
+    if (children == null || children.size() == 0) {
+      return null;
+    }
+    
+    // sort a copy so the list owned by the cache is left untouched
+    children = new ArrayList<String>(children);
+    Collections.sort(children);
+    
+    String lockNode = children.get(0);
+    
+    return zc.get(path + "/" + lockNode);
+  }
+  
+  /**
+   * Checks whether the lock identified by {@code lid} is currently held.
+   * 
+   * The lock counts as held when the node named in {@code lid} sorts first among the children of the lock path, still exists, and has the ephemeral owner
+   * recorded in {@code lid}.
+   * 
+   * @param zk
+   *          the zookeeper handle to query
+   * @param lid
+   *          the lock to check; its path, node and eid fields are read
+   * @return true if all of the conditions above hold, false otherwise
+   */
+  public static boolean isLockHeld(ZooKeeper zk, LockID lid) throws KeeperException, InterruptedException {
+    
+    List<String> lockNodes = zk.getChildren(lid.path, false);
+    
+    if (lockNodes.size() > 0) {
+      Collections.sort(lockNodes);
+      
+      String first = lockNodes.get(0);
+      if (lid.node.equals(first)) {
+        Stat holder = zk.exists(lid.path + "/" + lid.node, false);
+        return holder != null && holder.getEphemeralOwner() == lid.eid;
+      }
+    }
+    
+    return false;
+  }
+  
+}

Propchange: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooUtil.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: accumulo/trunk/fate/src/test/java/org/apache/accumulo/fate/zookeeper/DistributedReadWriteLockTest.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/fate/src/test/java/org/apache/accumulo/fate/zookeeper/DistributedReadWriteLockTest.java?rev=1355481&view=auto
==============================================================================
--- accumulo/trunk/fate/src/test/java/org/apache/accumulo/fate/zookeeper/DistributedReadWriteLockTest.java (added)
+++ accumulo/trunk/fate/src/test/java/org/apache/accumulo/fate/zookeeper/DistributedReadWriteLockTest.java Fri Jun 29 17:42:35 2012
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate.zookeeper;
+
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+
+import junit.framework.Assert;
+
+import org.apache.accumulo.fate.zookeeper.DistributedReadWriteLock.QueueLock;
+import org.junit.Test;
+
+/**
+ * Exercises {@link DistributedReadWriteLock} against an in-memory {@link QueueLock}, first from a single thread and then from a writer and a reader thread
+ * running concurrently.
+ */
+public class DistributedReadWriteLockTest {
+  
+  // Non-zookeeper version of QueueLock
+  public static class MockQueueLock implements QueueLock {
+    
+    // id handed to the next entry that is added
+    long next = 0L;
+    SortedMap<Long,byte[]> locks = new TreeMap<Long,byte[]>();
+    
+    /**
+     * Returns a copy of every entry whose id is less than or equal to {@code entry}.
+     */
+    @Override
+    synchronized public SortedMap<Long,byte[]> getEarlierEntries(long entry) {
+      SortedMap<Long,byte[]> result = new TreeMap<Long,byte[]>();
+      // headMap excludes its bound, hence the + 1 to include the entry itself
+      result.putAll(locks.headMap(entry + 1));
+      return result;
+    }
+    
+    /**
+     * Removes the entry with the given id and notifies anything waiting on the lock map.
+     */
+    @Override
+    synchronized public void removeEntry(long entry) {
+      synchronized (locks) {
+        locks.remove(entry);
+        locks.notifyAll();
+      }
+    }
+    
+    /**
+     * Stores {@code data} under the next free id and notifies anything waiting on the lock map.
+     * 
+     * @return the id assigned to the new entry
+     */
+    @Override
+    synchronized public long addEntry(byte[] data) {
+      long result;
+      synchronized (locks) {
+        locks.put(result = next++, data);
+        locks.notifyAll();
+      }
+      return result;
+    }
+  }
+  
+  // some data that is probably not going to update atomically
+  static class SomeData {
+    volatile int[] data = new int[100];
+    volatile int counter;
+    
+    // fails unless every slot carries the current counter value
+    void read() {
+      for (int i = 0; i < data.length; i++)
+        Assert.assertEquals(counter, data[i]);
+    }
+    
+    // bumps the counter, then fills the slots from the last one down to the first
+    void write() {
+      ++counter;
+      for (int i = data.length - 1; i >= 0; i--)
+        data[i] = counter;
+    }
+  }
+  
+  @Test
+  public void testLock() throws Exception {
+    final SomeData data = new SomeData();
+    data.write();
+    data.read();
+    QueueLock qlock = new MockQueueLock();
+    
+    final ReadWriteLock locker = new DistributedReadWriteLock(qlock, "locker1".getBytes());
+    Lock readLock = locker.readLock();
+    Lock writeLock = locker.writeLock();
+    readLock.lock();
+    readLock.unlock();
+    writeLock.lock();
+    writeLock.unlock();
+    readLock.lock();
+    readLock.unlock();
+    
+    // First failure seen by a worker. An assertion error thrown inside a worker would otherwise only end that thread and the test would still pass.
+    final java.util.concurrent.atomic.AtomicReference<Throwable> failure = new java.util.concurrent.atomic.AtomicReference<Throwable>();
+    
+    // do a bunch of reads/writes in separate threads, look for inconsistent updates
+    Thread[] threads = new Thread[2];
+    for (int i = 0; i < threads.length; i++) {
+      final int which = i;
+      threads[i] = new Thread() {
+        @Override
+        public void run() {
+          try {
+            if (which % 2 == 0) {
+              Lock wl = locker.writeLock();
+              wl.lock();
+              try {
+                data.write();
+              } finally {
+                wl.unlock();
+              }
+            } else {
+              Lock rl = locker.readLock();
+              rl.lock();
+              // the read happens inside the try so a failed assertion cannot leave the read lock held
+              try {
+                data.read();
+              } finally {
+                rl.unlock();
+              }
+            }
+          } catch (Throwable t) {
+            failure.compareAndSet(null, t);
+          }
+        }
+      };
+    }
+    for (Thread t : threads) {
+      t.start();
+    }
+    for (Thread t : threads) {
+      t.join();
+    }
+    
+    // surface whatever went wrong in a worker on the thread that runs the test
+    Throwable problem = failure.get();
+    if (problem instanceof Error)
+      throw (Error) problem;
+    if (problem instanceof Exception)
+      throw (Exception) problem;
+    if (problem != null)
+      throw new RuntimeException(problem);
+  }
+  
+}

Propchange: accumulo/trunk/fate/src/test/java/org/apache/accumulo/fate/zookeeper/DistributedReadWriteLockTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: accumulo/trunk/fate/src/test/java/org/apache/accumulo/fate/zookeeper/TransactionWatcherTest.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/fate/src/test/java/org/apache/accumulo/fate/zookeeper/TransactionWatcherTest.java?rev=1355481&view=auto
==============================================================================
--- accumulo/trunk/fate/src/test/java/org/apache/accumulo/fate/zookeeper/TransactionWatcherTest.java (added)
+++ accumulo/trunk/fate/src/test/java/org/apache/accumulo/fate/zookeeper/TransactionWatcherTest.java Fri Jun 29 17:42:35 2012
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate.zookeeper;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+import junit.framework.Assert;
+
+import org.junit.Test;
+
+public class TransactionWatcherTest {
+  
+  static class SimpleArbitrator implements TransactionWatcher.Arbitrator {
+    Map<String,List<Long>> map = new HashMap<String,List<Long>>();
+    
+    public synchronized void start(String txType, Long txid) throws Exception {
+      List<Long> txids = map.get(txType);
+      if (txids == null)
+        txids = new ArrayList<Long>();
+      if (txids.contains(txid))
+        throw new Exception("transaction already started");
+      txids.add(txid);
+      map.put(txType, txids);
+    }
+    
+    public synchronized void stop(String txType, Long txid) throws Exception {
+      List<Long> txids = map.get(txType);
+      if (txids != null && txids.contains(txid)) {
+        txids.remove(txids.indexOf(txid));
+        return;
+      }
+      throw new Exception("transaction does not exist");
+    }
+    
+    @Override
+    synchronized public boolean transactionAlive(String txType, long tid) throws Exception {
+      List<Long> txids = map.get(txType);
+      if (txids == null)
+        return false;
+      return txids.contains(tid);
+    }
+    
+  }
+  
+  @Test
+  public void testTransactionWatcher() throws Exception {
+    final String txType = "someName";
+    final long txid = 7;
+    final SimpleArbitrator sa = new SimpleArbitrator();
+    final TransactionWatcher txw = new TransactionWatcher(sa);
+    sa.start(txType, txid);
+    try {
+      sa.start(txType, txid);
+      Assert.fail("simple arbitrator did not throw an exception");
+    } catch (Exception ex) {
+      // expected
+    }
+    txw.isActive(txid);
+    Assert.assertFalse(txw.isActive(txid));
+    txw.run(txType, txid, new Callable<Object>() {
+      @Override
+      public Object call() throws Exception {
+        Assert.assertTrue(txw.isActive(txid));
+        return null;
+      }
+    });
+    Assert.assertFalse(txw.isActive(txid));
+    sa.stop(txType, txid);
+    Assert.assertFalse(sa.transactionAlive(txType, txid));
+    try {
+      txw.run(txType, txid, new Callable<Object>() {
+        @Override
+        public Object call() throws Exception {
+          Assert.fail("Should not be able to start a new work on a discontinued transaction");
+          return null;
+        }
+      });
+      Assert.fail("work against stopped transaction should fail");
+    } catch (Exception ex) {
+      ;
+    }
+    final long txid2 = 9;
+    sa.start(txType, txid2);
+    txw.run(txType, txid2, new Callable<Object>() {
+      @Override
+      public Object call() throws Exception {
+        Assert.assertTrue(txw.isActive(txid2));
+        sa.stop(txType, txid2);
+        try {
+          txw.run(txType, txid2, new Callable<Object>() {
+            @Override
+            public Object call() throws Exception {
+              Assert.fail("Should not be able to start a new work on a discontinued transaction");
+              return null;
+            }
+          });
+          Assert.fail("work against a stopped transaction should fail");
+        } catch (Exception ex) {
+          // expected
+        }
+        Assert.assertTrue(txw.isActive(txid2));
+        return null;
+      }
+    });
+    
+  }
+  
+}

Propchange: accumulo/trunk/fate/src/test/java/org/apache/accumulo/fate/zookeeper/TransactionWatcherTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: accumulo/trunk/pom.xml
URL: http://svn.apache.org/viewvc/accumulo/trunk/pom.xml?rev=1355481&r1=1355480&r2=1355481&view=diff
==============================================================================
--- accumulo/trunk/pom.xml (original)
+++ accumulo/trunk/pom.xml Fri Jun 29 17:42:35 2012
@@ -48,6 +48,7 @@
   <modules>
     <module>trace</module>
     <module>core</module>
+    <module>fate</module>
     <module>server</module>
     <module>start</module>
     <module>examples</module>
@@ -528,6 +529,11 @@
       </dependency>
       <dependency>
         <groupId>org.apache.accumulo</groupId>
+        <artifactId>accumulo-fate</artifactId>
+        <version>1.5.0-SNAPSHOT</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.accumulo</groupId>
         <artifactId>accumulo-start</artifactId>
         <version>1.5.0-SNAPSHOT</version>
       </dependency>

Modified: accumulo/trunk/server/pom.xml
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/pom.xml?rev=1355481&r1=1355480&r2=1355481&view=diff
==============================================================================
--- accumulo/trunk/server/pom.xml (original)
+++ accumulo/trunk/server/pom.xml Fri Jun 29 17:42:35 2012
@@ -56,6 +56,10 @@
     </dependency>
     <dependency>
       <groupId>org.apache.accumulo</groupId>
+      <artifactId>accumulo-fate</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.accumulo</groupId>
       <artifactId>accumulo-core</artifactId>
     </dependency>
     <dependency>

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java?rev=1355481&r1=1355480&r2=1355481&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/HdfsZooInstance.java Fri Jun 29 17:42:35 2012
@@ -35,8 +35,8 @@ import org.apache.accumulo.core.util.Byt
 import org.apache.accumulo.core.util.OpTimer;
 import org.apache.accumulo.core.util.StringUtil;
 import org.apache.accumulo.core.util.TextUtil;
-import org.apache.accumulo.core.zookeeper.ZooCache;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooCache;
 import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.conf.ServerConfiguration;
 import org.apache.accumulo.server.zookeeper.ZooLock;

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java?rev=1355481&r1=1355480&r2=1355481&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java Fri Jun 29 17:42:35 2012
@@ -30,8 +30,8 @@ import org.apache.accumulo.core.client.I
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.ConfigurationObserver;
 import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.zookeeper.ZooCache;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooCache;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.log4j.Logger;
 

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/conf/ZooConfiguration.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/conf/ZooConfiguration.java?rev=1355481&r1=1355480&r2=1355481&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/conf/ZooConfiguration.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/conf/ZooConfiguration.java Fri Jun 29 17:42:35 2012
@@ -29,8 +29,8 @@ import org.apache.accumulo.core.client.I
 import org.apache.accumulo.core.client.ZooKeeperInstance;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.zookeeper.ZooCache;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooCache;
 import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.client.HdfsZooInstance.AccumuloNotInitializedException;
 import org.apache.log4j.Logger;



Mime
View raw message