accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject svn commit: r1451708 - in /accumulo/branches/1.4: ./ src/ src/core/ src/server/ src/server/src/ src/server/src/main/java/org/apache/accumulo/server/gc/ src/server/src/main/java/org/apache/accumulo/server/master/ src/server/src/main/java/org/apache/accu...
Date Fri, 01 Mar 2013 20:17:40 GMT
Author: kturner
Date: Fri Mar  1 20:17:39 2013
New Revision: 1451708

URL: http://svn.apache.org/r1451708
Log:
merged some bug fixes from 1.5:
ACCUMULO-1125 delete distributed work queue task lock when task fails
ACCUMULO-1049 modified master to stop locking tserver nodes and just monitor tserver nodes
in zookeeper
ACCUMULO-954 made zoolock report when it is no longer able to monitor the lock node and therefore does
not know the status of the lock
ACCUMULO-954 Made zoolock rewatch its parent node and added some unit tests for zoolock

Modified:
    accumulo/branches/1.4/   (props changed)
    accumulo/branches/1.4/src/   (props changed)
    accumulo/branches/1.4/src/core/   (props changed)
    accumulo/branches/1.4/src/server/   (props changed)
    accumulo/branches/1.4/src/server/src/   (props changed)
    accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java
    accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
    accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/Master.java
    accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/TServerLockWatcher.java
    accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
    accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/functional/
  (props changed)
    accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/functional/SplitRecoveryTest.java
    accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/functional/ZombieTServer.java
    accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java
    accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/zookeeper/ZooLock.java
  (contents, props changed)

Propchange: accumulo/branches/1.4/
------------------------------------------------------------------------------
  Merged /accumulo/trunk:r1442429,1443790,1444259
  Merged /accumulo/branches/1.5:r1451015

Propchange: accumulo/branches/1.4/src/
------------------------------------------------------------------------------
  Merged /accumulo/trunk/src:r1442429,1443790,1444259
  Merged /accumulo/trunk:r1443790,1444259
  Merged /accumulo/branches/1.5:r1451015
  Merged /accumulo/branches/1.5/src:r1451015

Propchange: accumulo/branches/1.4/src/core/
------------------------------------------------------------------------------
  Merged /accumulo/trunk/src/core:r1442429,1443790,1444259
  Merged /accumulo/trunk/core:r1443790,1444259
  Merged /accumulo/branches/1.5/core:r1451015

Propchange: accumulo/branches/1.4/src/server/
------------------------------------------------------------------------------
  Merged /accumulo/trunk/src/server:r1442429,1443790,1444259
  Merged /accumulo/branches/1.5/server:r1451015
  Merged /accumulo/trunk/server:r1443790,1444259

Propchange: accumulo/branches/1.4/src/server/src/
------------------------------------------------------------------------------
  Merged /accumulo/branches/1.5/server/src:r1451015
  Merged /accumulo/trunk/server/src:r1443790,1444259
  Merged /accumulo/trunk/src/server/src:r1442429,1443790,1444259

Modified: accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java?rev=1451708&r1=1451707&r2=1451708&view=diff
==============================================================================
--- accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java
(original)
+++ accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java
Fri Mar  1 20:17:39 2013
@@ -369,6 +369,18 @@ public class SimpleGarbageCollector impl
       public void lostLock(LockLossReason reason) {
         Halt.halt("GC lock in zookeeper lost (reason = " + reason + "), exiting!");
       }
+      
+      @Override
+      public void unableToMonitorLockNode(final Throwable e) {
+        Halt.halt(-1, new Runnable() {
+          
+          @Override
+          public void run() {
+            log.fatal("No longer able to monitor lock node ", e);
+          }
+        });
+        
+      }
     };
     
     while (true) {

Modified: accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java?rev=1451708&r1=1451707&r2=1451708&view=diff
==============================================================================
--- accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
(original)
+++ accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
Fri Mar  1 20:17:39 2013
@@ -50,6 +50,8 @@ import org.apache.hadoop.io.Text;
 import org.apache.log4j.Logger;
 import org.apache.thrift.TException;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.KeeperException.NotEmptyException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.data.Stat;
@@ -191,20 +193,12 @@ public class LiveTServerSet implements W
   }
   
   static class TServerInfo {
-    ZooLock lock;
     TServerConnection connection;
     TServerInstance instance;
-    TServerLockWatcher watcher;
     
-    TServerInfo(ZooLock lock, TServerInstance instance, TServerConnection connection, TServerLockWatcher
watcher) {
-      this.lock = lock;
+    TServerInfo(TServerInstance instance, TServerConnection connection) {
       this.connection = connection;
       this.instance = instance;
-      this.watcher = watcher;
-    }
-    
-    void cleanup() throws InterruptedException, KeeperException {
-      lock.tryToCancelAsyncLockOrUnlock();
     }
   };
   
@@ -230,7 +224,7 @@ public class LiveTServerSet implements W
       public void run() {
         scanServers();
       }
-    }, 0, 1000);
+    }, 0, 5000);
   }
   
   public synchronized void scanServers() {
@@ -239,55 +233,14 @@ public class LiveTServerSet implements W
       final Set<TServerInstance> doomed = new HashSet<TServerInstance>();
       
       final String path = ZooUtil.getRoot(instance) + Constants.ZTSERVERS;
-      for (String server : getZooCache().getChildren(path)) {
-        // See if we have an async lock in place?
-        TServerInfo info = current.get(server);
-        TServerLockWatcher watcher;
-        ZooLock lock;
-        final String lockPath = path + "/" + server;
-        if (info != null) {
-          // yep: get out the lock/watcher so we can check on it
-          watcher = info.watcher;
-          lock = info.lock;
-        } else {
-          // nope: create a new lock and watcher
-          lock = new ZooLock(lockPath);
-          watcher = new TServerLockWatcher();
-          lock.lockAsync(watcher, "master".getBytes());
-        }
-        TServerInstance instance = null;
-        // Did we win the lock yet?
-        if (!lock.isLocked() && !watcher.gotLock && watcher.failureException
== null) {
-          // Nope... there's a server out there: is this is a new server?
-          if (info == null) {
-            // Yep: hold onto the information about this server
-            Stat stat = new Stat();
-            byte[] lockData = ZooLock.getLockData(lockPath, stat);
-            String lockString = new String(lockData == null ? new byte[] {} : lockData);
-            if (lockString.length() > 0 && !lockString.equals("master")) {
-              ServerServices services = new ServerServices(new String(lockData));
-              InetSocketAddress client = services.getAddress(ServerServices.Service.TSERV_CLIENT);
-              InetSocketAddress addr = AddressUtil.parseAddress(server, Property.TSERV_CLIENTPORT);
-              TServerConnection conn = new TServerConnection(addr);
-              instance = new TServerInstance(client, stat.getEphemeralOwner());
-              info = new TServerInfo(lock, instance, conn, watcher);
-              current.put(server, info);
-              updates.add(instance);
-            } else {
-              lock.tryToCancelAsyncLockOrUnlock();
-            }
-          }
-        } else {
-          // Yes... there is no server here any more
-          lock.tryToCancelAsyncLockOrUnlock();
-          if (info != null) {
-            doomed.add(info.instance);
-            current.remove(server);
-            info.cleanup();
-          }
-          ZooReaderWriter.getInstance().delete(lockPath, -1);
-        }
+      
+      HashSet<String> all = new HashSet<String>(current.keySet());
+      all.addAll(getZooCache().getChildren(path));
+      
+      for (String server : all) {
+        checkServer(updates, doomed, path, server);
       }
+
       // log.debug("Current: " + current.keySet());
       if (!doomed.isEmpty() || !updates.isEmpty())
         this.cback.update(this, doomed, updates);
@@ -295,10 +248,82 @@ public class LiveTServerSet implements W
       log.error(ex, ex);
     }
   }
+
+  private void deleteServerNode(String serverNode) throws InterruptedException, KeeperException
{
+    try {
+      ZooReaderWriter.getInstance().delete(serverNode, -1);
+    } catch (NotEmptyException ex) {
+      // race condition: tserver created the lock after our last check; we'll see it at the
next check
+    } catch (NoNodeException nne) {
+      // someone else deleted it
+    }
+  }
   
+  private synchronized void checkServer(final Set<TServerInstance> updates, final Set<TServerInstance>
doomed, final String path, final String server)
+      throws TException, InterruptedException, KeeperException {
+
+    TServerInfo info = current.get(server);
+    
+    final String lockPath = path + "/" + server;
+    Stat stat = new Stat();
+    byte[] lockData = ZooLock.getLockData(getZooCache(), lockPath, stat);
+
+    if (lockData == null) {
+      if (info != null) {
+        doomed.add(info.instance);
+        current.remove(server);
+      }
+      
+      deleteServerNode(path + "/" + server);
+    } else {
+      ServerServices services = new ServerServices(new String(lockData));
+      InetSocketAddress client = services.getAddress(ServerServices.Service.TSERV_CLIENT);
+      InetSocketAddress addr = AddressUtil.parseAddress(server, Property.TSERV_CLIENTPORT);
+      TServerInstance instance = new TServerInstance(client, stat.getEphemeralOwner());
+      
+      if (info == null) {
+        updates.add(instance);
+        current.put(server, new TServerInfo(instance, new TServerConnection(addr)));
+      } else if (!info.instance.equals(instance)) {
+        doomed.add(info.instance);
+        updates.add(instance);
+        current.put(server, new TServerInfo(instance, new TServerConnection(addr)));
+      }
+    }
+  }
+
   @Override
   public void process(WatchedEvent event) {
-    scanServers();
+
+    // its imporant that these event are propogated by ZooCache, because this ensures when
reading zoocache that is has already processed the event and cleared
+    // relevant nodes before code below reads from zoocache
+
+    if (event.getPath() != null) {
+      if (event.getPath().endsWith(Constants.ZTSERVERS)) {
+        scanServers();
+      } else if (event.getPath().contains(Constants.ZTSERVERS)) {
+        int pos = event.getPath().lastIndexOf('/');
+
+        // do only if ZTSERVER is parent
+        if (pos >= 0 && event.getPath().substring(0, pos).endsWith(Constants.ZTSERVERS))
{
+          
+          String server = event.getPath().substring(pos + 1);
+          
+          final Set<TServerInstance> updates = new HashSet<TServerInstance>();
+          final Set<TServerInstance> doomed = new HashSet<TServerInstance>();
+          
+          final String path = ZooUtil.getRoot(instance) + Constants.ZTSERVERS;
+
+          try {
+            checkServer(updates, doomed, path, server);
+            if (!doomed.isEmpty() || !updates.isEmpty())
+              this.cback.update(this, doomed, updates);
+          } catch (Exception ex) {
+            log.error(ex, ex);
+          }
+        }
+      }
+    }
   }
   
   public synchronized TServerConnection getConnection(TServerInstance server) throws TException
{
@@ -343,14 +368,8 @@ public class LiveTServerSet implements W
   }
   
   public synchronized void remove(TServerInstance server) {
-    TServerInfo remove = current.remove(server.hostPort());
-    if (remove != null) {
-      try {
-        remove.cleanup();
-      } catch (Exception e) {
-        log.info("error cleaning up connection to server", e);
-      }
-    }
+    current.remove(server.hostPort());
+
     log.info("Removing zookeeper lock for " + server);
     String zpath = ZooUtil.getRoot(instance) + Constants.ZTSERVERS + "/" + server.hostPort();
     try {

Modified: accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/Master.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/Master.java?rev=1451708&r1=1451707&r2=1451708&view=diff
==============================================================================
--- accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/Master.java
(original)
+++ accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/Master.java
Fri Mar  1 20:17:39 2013
@@ -2144,6 +2144,17 @@ public class Master implements LiveTServ
       public void lostLock(LockLossReason reason) {
         Halt.halt("Master lock in zookeeper lost (reason = " + reason + "), exiting!", -1);
       }
+      
+      @Override
+      public void unableToMonitorLockNode(final Throwable e) {
+        Halt.halt(-1, new Runnable() {
+          @Override
+          public void run() {
+            log.fatal("No longer able to monitor master lock node", e);
+          }
+        });
+        
+      }
     };
     long current = System.currentTimeMillis();
     final long waitTime = ServerConfiguration.getSystemConfiguration().getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT);

Modified: accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/TServerLockWatcher.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/TServerLockWatcher.java?rev=1451708&r1=1451707&r2=1451708&view=diff
==============================================================================
--- accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/TServerLockWatcher.java
(original)
+++ accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/TServerLockWatcher.java
Fri Mar  1 20:17:39 2013
@@ -40,4 +40,10 @@ class TServerLockWatcher implements Asyn
   @Override
   public void lostLock(LockLossReason reason) {}
   
-}
\ No newline at end of file
+  @Override
+  public void unableToMonitorLockNode(Throwable e) {
+    // TODO Auto-generated method stub
+    
+  }
+  
+}

Modified: accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java?rev=1451708&r1=1451707&r2=1451708&view=diff
==============================================================================
--- accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
(original)
+++ accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
Fri Mar  1 20:17:39 2013
@@ -2652,6 +2652,17 @@ public class TabletServer extends Abstra
             }
           });
         }
+        
+        @Override
+        public void unableToMonitorLockNode(final Throwable e) {
+          Halt.halt(0, new Runnable() {
+            @Override
+            public void run() {
+              log.fatal("Lost ability to monitor tablet server lock, exiting.", e);
+            }
+          });
+          
+        }
       };
       
       byte[] lockContent = new ServerServices(getClientAddressString(), Service.TSERV_CLIENT).toString().getBytes();

Propchange: accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/functional/
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Fri Mar  1 20:17:39 2013
@@ -0,0 +1,9 @@
+/accumulo/branches/1.3/src/server/src/main/java/org/apache/accumulo/server/test/functional:1309369,1328076,1330246,1330264,1349971,1354669
+/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/functional:1356400
+/accumulo/branches/1.4.2/src/server/src/main/java/org/apache/accumulo/server/test/functional:1399210,1402681
+/accumulo/branches/ACCUMULO-672/server/src/main/java/org/apache/accumulo/server/test/functional:1359163
+/accumulo/trunk/fate/src/main/java/org/apache/accumulo/server/test/functional:1433477
+/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/functional:1329425,1332224,1332278,1332347,1333047,1333070,1337210,1341000,1342452,1344302,1344358,1350779,1351691,1356400,1359721,1397746,1397928,1397975,1397990,1398090,1423994,1424036,1424115,1426302,1437054,1443790,1444075,1444259,1444826,1445876,1446290,1446314
+/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional:1329425,1332224,1332278,1332347,1333047,1333070,1341000,1342373,1350779,1351691,1356400,1359721,1397746,1397928,1397975,1397990,1398290,1415914,1423994,1424036,1424050,1424060,1424099,1424115,1426302,1437054,1442429,1443790,1444075,1444259,1444826,1445876,1446314
+/accumulo/trunk/test/src/main/java/org/apache/accumulo/test/functional:1443790
+/incubator/accumulo/branches/1.4.0rc/src/server/src/main/java/org/apache/accumulo/server/test/functional:1304025,1305326

Modified: accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/functional/SplitRecoveryTest.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/functional/SplitRecoveryTest.java?rev=1451708&r1=1451707&r2=1451708&view=diff
==============================================================================
--- accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/functional/SplitRecoveryTest.java
(original)
+++ accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/functional/SplitRecoveryTest.java
Fri Mar  1 20:17:39 2013
@@ -88,6 +88,11 @@ public class SplitRecoveryTest extends F
         System.exit(-1);
         
       }
+      
+      @Override
+      public void unableToMonitorLockNode(Throwable e) {
+        System.exit(-1);
+      }
     }, "foo".getBytes());
     
     if (!gotLock) {

Modified: accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/functional/ZombieTServer.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/functional/ZombieTServer.java?rev=1451708&r1=1451707&r2=1451708&view=diff
==============================================================================
--- accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/functional/ZombieTServer.java
(original)
+++ accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/functional/ZombieTServer.java
Fri Mar  1 20:17:39 2013
@@ -21,6 +21,7 @@ import java.net.InetSocketAddress;
 import java.util.HashMap;
 import java.util.Random;
 
+import org.apache.accumulo.cloudtrace.instrument.Tracer;
 import org.apache.accumulo.cloudtrace.thrift.TInfo;
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.master.thrift.TableInfo;
@@ -115,6 +116,16 @@ public class ZombieTServer {
           System.exit(1);
         }
       }
+      
+      @Override
+      public void unableToMonitorLockNode(Throwable e) {
+        try {
+          tch.halt(Tracer.traceInfo(), null, null);
+        } catch (Exception ex) {
+          log.error(ex, ex);
+          System.exit(1);
+        }
+      }
     };
     
     byte[] lockContent = new ServerServices(addressString, Service.TSERV_CLIENT).toString().getBytes();

Modified: accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java?rev=1451708&r1=1451707&r2=1451708&view=diff
==============================================================================
--- accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java
(original)
+++ accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java
Fri Mar  1 20:17:39 2013
@@ -107,16 +107,16 @@ public class DistributedWorkQueue {
                   log.error("Error received when trying to delete entry in zookeeper " +
childPath, e);
                 }
                 
-                try {
-                  zoo.recursiveDelete(lockPath, NodeMissingPolicy.SKIP);
-                } catch (Exception e) {
-                  log.error("Error received when trying to delete entry in zookeeper " +
childPath, e);
-                }
-                
               } catch (Exception e) {
                 log.warn("Failed to process work " + child, e);
               }
               
+              try {
+                zoo.recursiveDelete(lockPath, NodeMissingPolicy.SKIP);
+              } catch (Exception e) {
+                log.error("Error received when trying to delete entry in zookeeper " + childPath,
e);
+              }
+
             } finally {
               numTask.decrementAndGet();
             }

Modified: accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/zookeeper/ZooLock.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/zookeeper/ZooLock.java?rev=1451708&r1=1451707&r2=1451708&view=diff
==============================================================================
--- accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/zookeeper/ZooLock.java
(original)
+++ accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/zookeeper/ZooLock.java
Fri Mar  1 20:17:39 2013
@@ -43,6 +43,13 @@ public class ZooLock implements Watcher 
   
   public interface LockWatcher {
     void lostLock(LockLossReason reason);
+    
+    /**
+     * lost the ability to monitor the lock node, and its status is unknown
+     * 
+     * @param e
+     */
+    void unableToMonitorLockNode(Throwable e);
   }
   
   public interface AsyncLockWatcher extends LockWatcher {
@@ -56,7 +63,7 @@ public class ZooLock implements Watcher 
   final private IZooReaderWriter zooKeeper;
   private String lock;
   private LockWatcher lockWatcher;
-  
+  private boolean watchingParent = false;
   private String asyncLock;
   
   public ZooLock(String path) {
@@ -64,8 +71,10 @@ public class ZooLock implements Watcher 
     zooKeeper = ZooReaderWriter.getInstance();
     try {
       zooKeeper.getStatus(path, this);
+      watchingParent = true;
     } catch (Exception ex) {
       log.warn("Error getting setting initial watch on ZooLock", ex);
+      throw new RuntimeException(ex);
     }
   }
   
@@ -91,6 +100,11 @@ public class ZooLock implements Watcher 
       lw.lostLock(reason);
     }
     
+    @Override
+    public void unableToMonitorLockNode(Throwable e) {
+      lw.unableToMonitorLockNode(e);
+    }
+    
   }
   
   public synchronized boolean tryLock(LockWatcher lw, byte data[]) throws KeeperException,
InterruptedException {
@@ -126,6 +140,9 @@ public class ZooLock implements Watcher 
     Collections.sort(children);
     
     if (children.get(0).equals(myLock)) {
+      if (!watchingParent) {
+        throw new IllegalStateException("Can not acquire lock, no longer watching parent
: " + path);
+      }
       this.lockWatcher = lw;
       this.lock = myLock;
       asyncLock = null;
@@ -165,7 +182,7 @@ public class ZooLock implements Watcher 
             }
           }
         }
-        
+
         if (event.getState() == KeeperState.Expired) {
           synchronized (ZooLock.this) {
             if (lock == null) {
@@ -181,6 +198,14 @@ public class ZooLock implements Watcher 
       lockAsync(myLock, lw);
   }
   
+  private void lostLock(LockLossReason reason) {
+    LockWatcher localLw = lockWatcher;
+    lock = null;
+    lockWatcher = null;
+    
+    localLw.lostLock(reason);
+  }
+
   public synchronized void lockAsync(final AsyncLockWatcher lw, byte data[]) {
     
     if (lockWatcher != null || lock != null || asyncLock != null) {
@@ -190,22 +215,37 @@ public class ZooLock implements Watcher 
     lockWasAcquired = false;
     
     try {
-      String asyncLockPath = zooKeeper.putEphemeralSequential(path + "/" + LOCK_PREFIX, data);
+      final String asyncLockPath = zooKeeper.putEphemeralSequential(path + "/" + LOCK_PREFIX,
data);
       
       Stat stat = zooKeeper.getStatus(asyncLockPath, new Watcher() {
+        
+        private void failedToAcquireLock(){
+          lw.failedToAcquireLock(new Exception("Lock deleted before acquired"));
+          asyncLock = null;
+        }
+        
         public void process(WatchedEvent event) {
           synchronized (ZooLock.this) {
             if (lock != null && event.getType() == EventType.NodeDeleted &&
event.getPath().equals(path + "/" + lock)) {
-              LockWatcher localLw = lockWatcher;
-              lock = null;
-              lockWatcher = null;
-              
-              localLw.lostLock(LockLossReason.LOCK_DELETED);
-              
+              lostLock(LockLossReason.LOCK_DELETED);
             } else if (asyncLock != null && event.getType() == EventType.NodeDeleted
&& event.getPath().equals(path + "/" + asyncLock)) {
-              lw.failedToAcquireLock(new Exception("Lock deleted before acquired"));
-              asyncLock = null;
+              failedToAcquireLock();
+            } else if (event.getState() != KeeperState.Expired && (lock != null ||
asyncLock != null)) {
+              log.debug("Unexpected event watching lock node "+event+" "+asyncLockPath);
+              try {
+                Stat stat2 = zooKeeper.getStatus(asyncLockPath, this);
+                if(stat2 == null){
+                  if(lock != null)
+                    lostLock(LockLossReason.LOCK_DELETED);
+                  else if(asyncLock != null)
+                    failedToAcquireLock();
+                }
+              } catch (Throwable e) {
+                lockWatcher.unableToMonitorLockNode(e);
+                log.error("Failed to stat lock node " + asyncLockPath, e);
+              }
             }
+           
           }
         }
       });
@@ -293,12 +333,26 @@ public class ZooLock implements Watcher 
   public synchronized void process(WatchedEvent event) {
     log.debug("event " + event.getPath() + " " + event.getType() + " " + event.getState());
     
+    watchingParent = false;
+
     if (event.getState() == KeeperState.Expired && lock != null) {
-      LockWatcher localLw = lockWatcher;
-      lock = null;
-      lockWatcher = null;
-      localLw.lostLock(LockLossReason.SESSION_EXPIRED);
+      if (lock != null) {
+        lostLock(LockLossReason.SESSION_EXPIRED);
+      }
+    } else {
+      
+      try { // set the watch on the parent node again
+        zooKeeper.getStatus(path, this);
+        watchingParent = true;
+      } catch (Exception ex) {
+        if (lock != null || asyncLock != null) {
+          lockWatcher.unableToMonitorLockNode(ex);
+          log.error("Error resetting watch on ZooLock " + lock == null ? asyncLock : lock
+ " " + event, ex);
+        }
+      }
+       
     }
+
   }
   
   public static boolean isLockHeld(ZooKeeper zk, LockID lid) throws KeeperException, InterruptedException
{
@@ -455,34 +509,4 @@ public class ZooLock implements Watcher 
     return false;
   }
   
-  public static void main(String[] args) throws Exception {
-    String node = "/test/lock1";
-    ZooLock zl = new ZooLock(node);
-    
-    zl.lockAsync(new AsyncLockWatcher() {
-      
-      @Override
-      public void acquiredLock() {
-        System.out.println("I got the lock");
-      }
-      
-      @Override
-      public void lostLock(LockLossReason reason) {
-        System.out.println("OMG I lost my lock, reason = " + reason);
-        
-      }
-      
-      @Override
-      public void failedToAcquireLock(Exception e) {
-        System.out.println("Failed to acquire lock  ");
-        e.printStackTrace();
-      }
-      
-    }, new byte[0]);
-    
-    while (true) {
-      Thread.sleep(1000);
-    }
-  }
-  
 }

Propchange: accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/zookeeper/ZooLock.java
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Fri Mar  1 20:17:39 2013
@@ -0,0 +1,10 @@
+/accumulo/branches/1.3/src/server/src/main/java/org/apache/accumulo/server/zookeeper/ZooLock.java:1309369,1328076,1330246,1330264,1349971,1354669
+/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/zookeeper/ZooLock.java:1356400
+/accumulo/branches/1.4.2/src/server/src/main/java/org/apache/accumulo/server/zookeeper/ZooLock.java:1399210,1402681
+/accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/zookeeper/ZooLock.java:1451015
+/accumulo/branches/ACCUMULO-672/server/src/main/java/org/apache/accumulo/server/zookeeper/ZooLock.java:1359163
+/accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java:1442429,1443790
+/accumulo/trunk/fate/src/main/java/org/apache/accumulo/server/zookeeper/ZooLock.java:1433477
+/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/zookeeper/ZooLock.java:1329425,1332224,1332278,1332347,1333047,1333070,1337210,1341000,1342452,1344302,1344358,1350779,1351691,1356400,1359721,1397746,1397928,1397975,1397990,1398090,1423994,1424036,1424115,1426302,1437054,1443790,1444075,1444259,1444826,1445876,1446290,1446314
+/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/zookeeper/ZooLock.java:1329425,1332224,1332278,1332347,1333047,1333070,1341000,1342373,1350779,1351691,1356400,1359721,1397746,1397928,1397975,1397990,1398290,1415914,1423994,1424036,1424050,1424060,1424099,1424115,1426302,1437054,1442429,1443790,1444075,1444259,1444826,1445876,1446314
+/incubator/accumulo/branches/1.4.0rc/src/server/src/main/java/org/apache/accumulo/server/zookeeper/ZooLock.java:1304025,1305326



Mime
View raw message