accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [05/16] git commit: It seems to work at this point
Date Mon, 22 Sep 2014 20:48:05 GMT
It seems to work at this point



git-svn-id: https://svn.apache.org/repos/asf/accumulo/branches/ACCUMULO-CURATOR@1496155 13f79535-47bb-0310-9956-ffa450edef68


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/cba8ab61
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/cba8ab61
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/cba8ab61

Branch: refs/heads/ACCUMULO-CURATOR
Commit: cba8ab618b3188ad0858ae2f3db976cdfc2b159a
Parents: 518844b
Author: John Vines <vines@apache.org>
Authored: Mon Jun 24 18:38:06 2013 +0000
Committer: John Vines <vines@apache.org>
Committed: Mon Jun 24 18:38:06 2013 +0000

----------------------------------------------------------------------
 .../accumulo/fate/zookeeper/ZooCache.java       |  37 +++-
 .../accumulo/server/conf/TableConfWatcher.java  |  96 ++-------
 .../server/conf/TableConfiguration.java         |  23 +--
 .../accumulo/server/master/LiveTServerSet.java  | 198 ++++++++++---------
 .../master/state/MetaDataTableScanner.java      |   4 +-
 .../master/state/tables/TableManager.java       | 121 +++++-------
 6 files changed, 213 insertions(+), 266 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/cba8ab61/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java
index 9080ed9..14bb300 100644
--- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java
@@ -29,8 +29,10 @@ import org.apache.curator.framework.recipes.cache.ChildData;
 import org.apache.curator.framework.recipes.cache.NodeCache;
 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent.Type;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
 import org.apache.log4j.Logger;
-import org.apache.zookeeper.Watcher;
 
 /**
  * Caches values stored in zookeeper and keeps them up to date as they change in zookeeper.
@@ -45,31 +47,40 @@ public class ZooCache {
   private CuratorFramework curator;
   
   public ZooCache(String zooKeepers, int sessionTimeout) {
-    this(zooKeepers, sessionTimeout, null);
+    this(CuratorUtil.constructCurator(zooKeepers, sessionTimeout, null));
   }
   
-  public ZooCache(String zooKeepers, int sessionTimeout, Watcher watcher) {
-    this(CuratorUtil.constructCurator(zooKeepers, sessionTimeout, null), watcher);
-  }
-  
-  public ZooCache(CuratorFramework curator, Watcher watcher) {
+  public ZooCache(CuratorFramework curator) {
     this.curator = curator;
     this.nodeCache = new HashMap<String,NodeCache>();
     this.childrenCache = new HashMap<String,PathChildrenCache>();
   }
   
   public synchronized List<ChildData> getChildren(final String zPath) {
+    return getChildren(zPath, null);
+  }
+  
+  public synchronized List<ChildData> getChildren(String zPath, PathChildrenCacheListener listener) {
     PathChildrenCache cache = childrenCache.get(zPath);
     if (cache == null) {
       cache = new PathChildrenCache(curator, zPath, true);
+      if (listener != null) {
+        cache.getListenable().addListener(listener);
+      }
       try {
         cache.start(StartMode.BUILD_INITIAL_CACHE);
+        // I'll do it myself!
+        if (listener != null)
+          for (ChildData cd : cache.getCurrentData()) {
+            listener.childEvent(curator, new PathChildrenCacheEvent(Type.INITIALIZED, cd));
+          }
         
-        // Because parent's children are being watched, we don't need a node watcher on the individual node
+        // Because parent's children are being watched, we don't need to cache the individual node
+        // UNLESS we have a listener on it
         for (ChildData child : cache.getCurrentData()) {
           NodeCache childCache = nodeCache.get(child.getPath());
-          if (childCache != null)
-          {
+          if (childCache != null && childCache.getListenable().size() == 0) {
+            log.debug("Removing cache " + childCache.getCurrentData().getPath() + " because parent cache was added");
             childCache.close();
             nodeCache.remove(child.getPath());
           }
@@ -85,6 +96,8 @@ public class ZooCache {
         return null;
       }
       childrenCache.put(zPath, cache);
+    } else if (listener != null) {
+      log.debug("LISTENER- cache is null for path " + zPath + ", but got listener " + listener.getClass() + ". this is a broken case!");
     }
     return cache.getCurrentData();
   }
@@ -168,6 +181,10 @@ public class ZooCache {
     childrenCache.clear();
   }
   
+  public CuratorFramework getCurator() {
+    return curator;
+  }
+  
   public synchronized void clear(String zPath) {
     List<String> pathsToRemove = new ArrayList<String>();
     for (Iterator<String> i = nodeCache.keySet().iterator(); i.hasNext();) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cba8ab61/server/src/main/java/org/apache/accumulo/server/conf/TableConfWatcher.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/conf/TableConfWatcher.java b/server/src/main/java/org/apache/accumulo/server/conf/TableConfWatcher.java
index c407309..e8c18a5 100644
--- a/server/src/main/java/org/apache/accumulo/server/conf/TableConfWatcher.java
+++ b/server/src/main/java/org/apache/accumulo/server/conf/TableConfWatcher.java
@@ -16,95 +16,35 @@
  */
 package org.apache.accumulo.server.conf;
 
-import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.zookeeper.ZooUtil;
-import org.apache.log4j.Level;
+import org.apache.accumulo.fate.curator.CuratorUtil;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
 import org.apache.log4j.Logger;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
 
-class TableConfWatcher implements Watcher {
-  static {
-    Logger.getLogger("org.apache.zookeeper").setLevel(Level.WARN);
-    Logger.getLogger("org.apache.hadoop.io.compress").setLevel(Level.WARN);
-  }
-  
+class TableConfWatcher implements PathChildrenCacheListener {
   private static final Logger log = Logger.getLogger(TableConfWatcher.class);
-  private Instance instance = null;
-  
-  TableConfWatcher(Instance instance) {
-    this.instance = instance;
+  private TableConfiguration tableConfig;
+
+  TableConfWatcher(TableConfiguration tableConfiguration) {
+    tableConfig = tableConfiguration;
   }
   
   @Override
-  public void process(WatchedEvent event) {
-    String path = event.getPath();
+  public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
     if (log.isTraceEnabled())
-      log.trace("WatchEvent : " + path + " " + event.getState() + " " + event.getType());
-    
-    String tablesPrefix = ZooUtil.getRoot(instance) + Constants.ZTABLES + "/";
-    
-    String tableId = null;
-    String key = null;
-    
-    if (path != null) {
-      if (path.startsWith(tablesPrefix)) {
-        tableId = path.substring(tablesPrefix.length());
-        if (tableId.contains("/")) {
-          tableId = tableId.substring(0, tableId.indexOf('/'));
-          if (path.startsWith(tablesPrefix + tableId + Constants.ZTABLE_CONF + "/"))
-            key = path.substring((tablesPrefix + tableId + Constants.ZTABLE_CONF + "/").length());
-        }
-      }
-      
-      if (tableId == null) {
-        log.warn("Zookeeper told me about a path I was not watching " + path + " state=" + event.getState() + " type=" + event.getType());
-        return;
-      }
-    }
+      log.trace("WatchEvent : " + event.getData().getPath() + " " + event.getType());
+    String key = CuratorUtil.getNodeName(event.getData());
     
     switch (event.getType()) {
-      case NodeDataChanged:
-        if (log.isTraceEnabled())
-          log.trace("EventNodeDataChanged " + event.getPath());
-        if (key != null)
-          ServerConfiguration.getTableConfiguration(instance, tableId).propertyChanged(key);
-        break;
-      case NodeChildrenChanged:
-        ServerConfiguration.getTableConfiguration(instance, tableId).propertiesChanged(key);
-        break;
-      case NodeDeleted:
-        if (key == null) {
-          // only remove the AccumuloConfiguration object when a
-          // table node is deleted, not when a tables property is
-          // deleted.
-          ServerConfiguration.removeTableIdInstance(tableId);
-        }
-        break;
-      case None:
-        switch (event.getState()) {
-          case Expired:
-            ServerConfiguration.expireAllTableObservers();
-            break;
-          case SyncConnected:
-            break;
-          case Disconnected:
-            break;
-          default:
-            log.warn("EventNone event not handled path = " + event.getPath() + " state=" + event.getState());
-        }
-        break;
-      case NodeCreated:
-        switch (event.getState()) {
-          case SyncConnected:
-            break;
-          default:
-            log.warn("Event NodeCreated event not handled path = " + event.getPath() + " state=" + event.getState());
-        }
+      case INITIALIZED:
+      case CHILD_ADDED:
+      case CHILD_UPDATED:
+      case CHILD_REMOVED:
+        tableConfig.propertyChanged(key);
         break;
       default:
-        log.warn("Event not handled path = " + event.getPath() + " state=" + event.getState() + " type = " + event.getType());
+        log.debug("Unhandled state " + event.getType() + " encountered for table " + tableConfig.getTableId() + ". Ignoring.");
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cba8ab61/server/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java b/server/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java
index 12e83da..456cf34 100644
--- a/server/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java
+++ b/server/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java
@@ -26,14 +26,13 @@ import java.util.Set;
 import java.util.TreeMap;
 
 import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.ConfigurationObserver;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.fate.curator.CuratorUtil;
-import org.apache.accumulo.fate.zookeeper.ZooCache;
 import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.zookeeper.ZooCache;
 import org.apache.curator.framework.recipes.cache.ChildData;
 import org.apache.log4j.Logger;
 
@@ -53,20 +52,14 @@ public class TableConfiguration extends AccumuloConfiguration {
     this.parent = parent;
     
     this.observers = Collections.synchronizedSet(new HashSet<ConfigurationObserver>());
-  }
-  
-  /**
-   * @deprecated not for client use
-   */
-  @Deprecated
-  private static ZooCache getTablePropCache() {
-    Instance inst = HdfsZooInstance.getInstance();
+    
     if (tablePropCache == null)
       synchronized (TableConfiguration.class) {
         if (tablePropCache == null)
-          tablePropCache = new ZooCache(inst.getZooKeepers(), inst.getZooKeepersSessionTimeOut(), new TableConfWatcher(inst));
+          tablePropCache = new ZooCache(HdfsZooInstance.getInstance().getConfiguration());
       }
-    return tablePropCache;
+    String confPath = ZooUtil.getRoot(instanceId) + Constants.ZTABLES + '/' + table + Constants.ZTABLE_CONF;
+    tablePropCache.getChildren(confPath, new TableConfWatcher(this));
   }
   
   public void addObserver(ConfigurationObserver co) {
@@ -100,7 +93,7 @@ public class TableConfiguration extends AccumuloConfiguration {
       co.propertyChanged(key);
   }
   
-  public void propertiesChanged(String key) {
+  public void propertiesChanged() {
     Collection<ConfigurationObserver> copy = Collections.unmodifiableCollection(observers);
     for (ConfigurationObserver co : copy)
       co.propertiesChanged();
@@ -120,7 +113,7 @@ public class TableConfiguration extends AccumuloConfiguration {
   
   private String get(String key) {
     String zPath = ZooUtil.getRoot(instanceId) + Constants.ZTABLES + "/" + table + Constants.ZTABLE_CONF + "/" + key;
-    ChildData v = getTablePropCache().get(zPath);
+    ChildData v = tablePropCache.get(zPath);
     String value = null;
     if (v != null)
       value = new String(v.getData());
@@ -135,7 +128,7 @@ public class TableConfiguration extends AccumuloConfiguration {
       entries.put(parentEntry.getKey(), parentEntry.getValue());
     
     String path = ZooUtil.getRoot(instanceId) + Constants.ZTABLES + "/" + table + Constants.ZTABLE_CONF;
-    List<ChildData> children = getTablePropCache().getChildren(path);
+    List<ChildData> children = tablePropCache.getChildren(path);
     if (children != null) {
       for (ChildData child : children) {
         entries.put(CuratorUtil.getNodeName(child), new String(child.getData()));

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cba8ab61/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java b/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
index 85da665..6525512 100644
--- a/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
+++ b/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
@@ -20,9 +20,14 @@ import static org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy.SKIP;
 
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 
 import org.apache.accumulo.core.Constants;
@@ -36,6 +41,7 @@ import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
 import org.apache.accumulo.core.util.ServerServices;
 import org.apache.accumulo.core.util.ThriftUtil;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.curator.CuratorUtil;
 import org.apache.accumulo.server.master.state.TServerInstance;
 import org.apache.accumulo.server.security.SecurityConstants;
 import org.apache.accumulo.server.util.AddressUtil;
@@ -45,18 +51,18 @@ import org.apache.accumulo.server.zookeeper.ZooCache;
 import org.apache.accumulo.server.zookeeper.ZooLock;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.trace.instrument.Tracer;
+import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
 import org.apache.hadoop.io.Text;
 import org.apache.log4j.Logger;
 import org.apache.thrift.TException;
 import org.apache.thrift.transport.TTransport;
-import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.apache.zookeeper.KeeperException.NotEmptyException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
 
-public class LiveTServerSet implements Watcher {
+public class LiveTServerSet {
   
   public interface Listener {
     void update(LiveTServerSet current, Set<TServerInstance> deleted, Set<TServerInstance> added);
@@ -214,124 +220,133 @@ public class LiveTServerSet implements Watcher {
   
   public synchronized ZooCache getZooCache() {
     if (zooCache == null)
-      zooCache = new ZooCache(this);
+      zooCache = new ZooCache();
     return zooCache;
   }
   
   public synchronized void startListeningForTabletServerChanges() {
-    scanServers();
     SimpleTimer.getInstance().schedule(new Runnable() {
       @Override
       public void run() {
-        scanServers();
+        synchronized (locklessServers) {
+          if (!locklessServers.isEmpty()) {
+            List<String> toRemove = new ArrayList<String>();
+            for (Entry<String,Long> entry : locklessServers.entrySet()) {
+              if (System.currentTimeMillis() - entry.getValue() > 600000) {
+                deleteServerNode(entry.getKey());
+                toRemove.add(entry.getKey());
+              }
+            }
+            locklessServers.keySet().removeAll(toRemove);
+          }
+        }
       }
     }, 0, 5000);
+    
+    Collection<ChildData> result = getZooCache().getChildren(ZooUtil.getRoot(instance) + Constants.ZTSERVERS, serversListener);
+    log.debug("Attaching SERVERSLISTENER to " + (ZooUtil.getRoot(instance) + Constants.ZTSERVERS) + " - received " + result);
   }
   
-  public synchronized void scanServers() {
-    try {
-      final Set<TServerInstance> updates = new HashSet<TServerInstance>();
-      final Set<TServerInstance> doomed = new HashSet<TServerInstance>();
-      
-      final String path = ZooUtil.getRoot(instance) + Constants.ZTSERVERS;
-      
-      HashSet<String> all = new HashSet<String>(current.keySet());
-      all.addAll(getZooCache().getChildKeys(path));
-      
-      locklessServers.keySet().retainAll(all);
-      
-      for (String server : all) {
-        checkServer(updates, doomed, path, server);
-      }
-      
-      // log.debug("Current: " + current.keySet());
-      if (!doomed.isEmpty() || !updates.isEmpty())
-        this.cback.update(this, doomed, updates);
-    } catch (Exception ex) {
-      log.error(ex, ex);
-    }
-  }
-  
-  private void deleteServerNode(String serverNode) throws InterruptedException, KeeperException {
+  private void deleteServerNode(String server) {
     try {
-      ZooReaderWriter.getInstance().delete(serverNode, -1);
+      getZooCache().getCurator().delete().forPath(ZooUtil.getRoot(instance) + Constants.ZTSERVERS + '/' + server);
     } catch (NotEmptyException ex) {
       // race condition: tserver created the lock after our last check; we'll see it at the next check
     } catch (NoNodeException nne) {
       // someone else deleted it
+    } catch (Exception e) {
+      // Some other curator exception, we don't care that much here.
+      log.error(e,e);
     }
   }
   
-  private synchronized void checkServer(final Set<TServerInstance> updates, final Set<TServerInstance> doomed, final String path, final String server)
-      throws TException, InterruptedException, KeeperException {
-    
-    TServerInfo info = current.get(server);
+  private ServersDirectoryListener serversListener = new ServersDirectoryListener(this);
+  private TServerLockListener lockListener = new TServerLockListener(this);
+  
+  private class ServersDirectoryListener implements PathChildrenCacheListener {
+    LiveTServerSet liveTServerSet;
     
-    final String lockPath = path + "/" + server;
-    ChildData lockData = ZooLock.getLockData(getZooCache(), lockPath);
+    public ServersDirectoryListener(LiveTServerSet liveTServerSet) {
+      this.liveTServerSet = liveTServerSet;
+    }
     
-    if (lockData == null) {
-      if (info != null) {
-        doomed.add(info.instance);
-        current.remove(server);
-      }
-      
-      Long firstSeen = locklessServers.get(server);
-      if (firstSeen == null) {
-        locklessServers.put(server, System.currentTimeMillis());
-      } else if (System.currentTimeMillis() - firstSeen > 600000) {
-        deleteServerNode(path + "/" + server);
-        locklessServers.remove(server);
-      }
-    } else {
-      locklessServers.remove(server);
-      ServerServices services = new ServerServices(new String(lockData.getData()));
-      InetSocketAddress client = services.getAddress(ServerServices.Service.TSERV_CLIENT);
-      InetSocketAddress addr = AddressUtil.parseAddress(server);
-      TServerInstance instance = new TServerInstance(client, lockData.getStat().getEphemeralOwner());
+    @Override
+    public void childEvent(CuratorFramework curator, PathChildrenCacheEvent event) throws Exception {
+      final Set<TServerInstance> doomed = new HashSet<TServerInstance>();
+      log.debug("SERVERSLISTENER - Received event " + event.getType() + " for node " + event.getData().getPath());
+
+      String server = CuratorUtil.getNodeName(event.getData());
+      TServerInfo info = current.get(server);
       
-      if (info == null) {
-        updates.add(instance);
-        current.put(server, new TServerInfo(instance, new TServerConnection(addr)));
-      } else if (!info.instance.equals(instance)) {
-        doomed.add(info.instance);
-        updates.add(instance);
-        current.put(server, new TServerInfo(instance, new TServerConnection(addr)));
+      switch (event.getType()) {
+        case INITIALIZED:
+        case CHILD_ADDED:
+        case CHILD_UPDATED:
+          getZooCache().getChildren(event.getData().getPath(), lockListener);
+          break;
+        case CHILD_REMOVED:
+          getZooCache().clear(event.getData().getPath());
+          if (info != null) {
+            doomed.add(info.instance);
+            current.remove(server);
+          }
+          break;
+        default:
+          log.debug("Unhandled state " + event.getType() + " encountered for tserver manager. Ignoring.");
       }
+      if (!doomed.isEmpty())
+        liveTServerSet.cback.update(liveTServerSet, doomed, Collections.<TServerInstance> emptySet());
     }
   }
   
-  @Override
-  public void process(WatchedEvent event) {
+  private class TServerLockListener implements PathChildrenCacheListener {
+    LiveTServerSet liveTServerSet;
     
-    // its important that these event are propagated by ZooCache, because this ensures when reading zoocache that is has already processed the event and cleared
-    // relevant nodes before code below reads from zoocache
+    public TServerLockListener(LiveTServerSet liveTServerSet) {
+      this.liveTServerSet = liveTServerSet;
+    }
     
-    if (event.getPath() != null) {
-      if (event.getPath().endsWith(Constants.ZTSERVERS)) {
-        scanServers();
-      } else if (event.getPath().contains(Constants.ZTSERVERS)) {
-        int pos = event.getPath().lastIndexOf('/');
-        
-        // do only if ZTSERVER is parent
-        if (pos >= 0 && event.getPath().substring(0, pos).endsWith(Constants.ZTSERVERS)) {
-          
-          String server = event.getPath().substring(pos + 1);
-          
-          final Set<TServerInstance> updates = new HashSet<TServerInstance>();
-          final Set<TServerInstance> doomed = new HashSet<TServerInstance>();
-          
-          final String path = ZooUtil.getRoot(instance) + Constants.ZTSERVERS;
+    @Override
+    public void childEvent(CuratorFramework curator, PathChildrenCacheEvent event) throws Exception {
+      final Set<TServerInstance> updates = new HashSet<TServerInstance>();
+      final Set<TServerInstance> doomed = new HashSet<TServerInstance>();
+      log.debug("LOCKLISTENER - Received event " + event.getType() + " for node " + event.getData().getPath());
+
+      String server = CuratorUtil.getNodeName(CuratorUtil.getNodeParent(event.getData()));
+      TServerInfo info = current.get(server);
+      
+      switch (event.getType()) {
+        case INITIALIZED:
+        case CHILD_ADDED:
+        case CHILD_UPDATED:
+          synchronized (locklessServers) {
+            locklessServers.remove(server);
+          }
+          ServerServices services = new ServerServices(new String(event.getData().getData()));
+          InetSocketAddress client = services.getAddress(ServerServices.Service.TSERV_CLIENT);
+          InetSocketAddress addr = AddressUtil.parseAddress(server);
+          TServerInstance instance = new TServerInstance(client, event.getData().getStat().getEphemeralOwner());
           
-          try {
-            checkServer(updates, doomed, path, server);
-            if (!doomed.isEmpty() || !updates.isEmpty())
-              this.cback.update(this, doomed, updates);
-          } catch (Exception ex) {
-            log.error(ex, ex);
+          if (info == null) {
+            updates.add(instance);
+            current.put(server, new TServerInfo(instance, new TServerConnection(addr)));
+          } else if (!info.instance.equals(instance)) {
+            doomed.add(info.instance);
+            updates.add(instance);
+            current.put(server, new TServerInfo(instance, new TServerConnection(addr)));
           }
-        }
+          break;
+        case CHILD_REMOVED:
+          synchronized (locklessServers) {
+            locklessServers.put(server, System.currentTimeMillis());
+          }
+          break;
+        default:
+          log.debug("Unhandled state " + event.getType() + " encountered for tserver lock manager. Ignoring.");
       }
+
+      if (!doomed.isEmpty() || !updates.isEmpty())
+        liveTServerSet.cback.update(liveTServerSet, doomed, updates);
     }
   }
   
@@ -354,6 +369,7 @@ public class LiveTServerSet implements Watcher {
     for (TServerInfo c : current.values()) {
       result.add(c.instance);
     }
+    log.debug("Returning " + result + " for current tservers");
     return result;
   }
   

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cba8ab61/server/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java b/server/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java
index 7570908..ac9d97f 100644
--- a/server/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java
+++ b/server/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java
@@ -58,7 +58,9 @@ public class MetaDataTableScanner implements Iterator<TabletLocationState> {
       mdScanner.setRanges(Collections.singletonList(range));
       iter = mdScanner.iterator();
     } catch (Exception ex) {
-      mdScanner.close();
+      log.error(ex, ex);
+      if (mdScanner != null)
+        mdScanner.close();
       throw new RuntimeException(ex);
     }
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cba8ab61/server/src/main/java/org/apache/accumulo/server/master/state/tables/TableManager.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/master/state/tables/TableManager.java b/server/src/main/java/org/apache/accumulo/server/master/state/tables/TableManager.java
index 6b3a3c0..040d080 100644
--- a/server/src/main/java/org/apache/accumulo/server/master/state/tables/TableManager.java
+++ b/server/src/main/java/org/apache/accumulo/server/master/state/tables/TableManager.java
@@ -37,12 +37,13 @@ import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.util.TablePropUtil;
 import org.apache.accumulo.server.zookeeper.ZooCache;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent.Type;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.Watcher.Event.EventType;
 
 public class TableManager {
   private static SecurityPermission TABLE_MANAGER_PERMISSION = new SecurityPermission("tableManagerPermission");
@@ -82,7 +83,8 @@ public class TableManager {
   
   private TableManager() {
     instance = HdfsZooInstance.getInstance();
-    zooStateCache = new ZooCache(new TableStateWatcher());
+    zooStateCache = new ZooCache();
+    setupListeners();
     updateTableStateCache();
   }
   
@@ -159,10 +161,11 @@ public class TableManager {
     }
   }
   
-  public TableState updateTableStateCache(String tableId) {
+  // tableId argument for better debug statements
+  private TableState updateTableStateCache(ChildData node, String tableId) {
     synchronized (tableStateCache) {
       TableState tState = TableState.UNKNOWN;
-      byte[] data = zooStateCache.get(ZooUtil.getRoot(instance) + Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_STATE).getData();
+      byte[] data = node.getData();
       if (data != null) {
         String sState = new String(data);
         try {
@@ -177,6 +180,10 @@ public class TableManager {
     }
   }
   
+  public TableState updateTableStateCache(String tableId) {
+    return updateTableStateCache(zooStateCache.get(ZooUtil.getRoot(instance) + Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_STATE), tableId);
+  }
+  
   public void addTable(String tableId, String tableName, NodeExistsPolicy existsPolicy) throws KeeperException, InterruptedException {
     prepareNewTableState(instance.getInstanceID(), tableId, tableName, TableState.NEW, existsPolicy);
     updateTableStateCache(tableId);
@@ -221,74 +228,46 @@ public class TableManager {
     return observers.remove(to);
   }
   
-  private class TableStateWatcher implements Watcher {
+  // Sets up cache listeners for the zookeeper cache
+  private void setupListeners() {
+    zooStateCache.getChildren(ZooUtil.getRoot(instance) + Constants.ZTABLES, new AllTablesListener());
+  }
+  
+  // This just manages the listeners for each table. Let the table listener do the heavy lifting
+  private class AllTablesListener implements PathChildrenCacheListener {
     @Override
-    public void process(WatchedEvent event) {
-      if (log.isTraceEnabled())
-        log.trace(event);
-      
-      final String zPath = event.getPath();
-      final EventType zType = event.getType();
-      
-      String tablesPrefix = ZooUtil.getRoot(instance) + Constants.ZTABLES;
-      String tableId = null;
-      
-      if (zPath != null && zPath.startsWith(tablesPrefix + "/")) {
-        String suffix = zPath.substring(tablesPrefix.length() + 1);
-        if (suffix.contains("/")) {
-          String[] sa = suffix.split("/", 2);
-          if (Constants.ZTABLE_STATE.equals("/" + sa[1]))
-            tableId = sa[0];
-        }
-        if (tableId == null) {
-          log.warn("Unknown path in " + event);
-          return;
-        }
+    public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
+      if (event.getType().equals(Type.CHILD_ADDED)) {
+        zooStateCache.getChildren(event.getData().getPath(), new TableListener());
+      } else if (event.getType().equals(Type.CHILD_REMOVED)) {
+        zooStateCache.clear(event.getData().getPath());
       }
-      
-      switch (zType) {
-        case NodeChildrenChanged:
-          if (zPath != null && zPath.equals(tablesPrefix)) {
-            updateTableStateCache();
-          } else {
-            log.warn("Unexpected path " + zPath);
-          }
-          break;
-        case NodeCreated:
-        case NodeDataChanged:
-          // state transition
-          TableState tState = updateTableStateCache(tableId);
-          log.debug("State transition to " + tState + " @ " + event);
-          synchronized (observers) {
-            for (TableObserver to : observers)
-              to.stateChanged(tableId, tState);
-          }
-          break;
-        case NodeDeleted:
-          if (zPath != null
-              && tableId != null
-              && (zPath.equals(tablesPrefix + "/" + tableId + Constants.ZTABLE_STATE) || zPath.equals(tablesPrefix + "/" + tableId + Constants.ZTABLE_CONF) || zPath
-                  .equals(tablesPrefix + "/" + tableId + Constants.ZTABLE_NAME)))
+    }
+  }
+  
+  private class TableListener implements PathChildrenCacheListener {
+    @Override
+    public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
+      if (event.getData().getPath().endsWith(Constants.ZTABLE_STATE)) {
+        String tableId = CuratorUtil.getNodeName(CuratorUtil.getNodeParent(event.getData().getPath()));
+        String msg = null;
+        switch (event.getType()) {
+          case CHILD_ADDED:
+          case INITIALIZED:
+            msg = "Initializing ";
+          case CHILD_UPDATED:
+            if (msg == null)
+              msg = "Updating ";
+            TableState state = updateTableStateCache(event.getData(), tableId);
+            log.debug(msg + tableId + " to state " + state);
+            break;
+          case CHILD_REMOVED:
             tableStateCache.remove(tableId);
-          break;
-        case None:
-          switch (event.getState()) {
-            case Expired:
-              if (log.isTraceEnabled())
-                log.trace("Session expired " + event);
-              synchronized (observers) {
-                for (TableObserver to : observers)
-                  to.sessionExpired();
-              }
-              break;
-            case SyncConnected:
-            default:
-              if (log.isTraceEnabled())
-                log.trace("Ignored " + event);
-          }
-          break;
-        default:
-          log.warn("Unandled " + event);
+            log.debug("Table " + tableId + " removed.");
+            break;
+          default:
+            log.debug("Unhandled state " + event.getType() + " encountered for table " + tableId + ". Ignoring.");
+        }
       }
     }
   }


Mime
View raw message