manifoldcf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kwri...@apache.org
Subject svn commit: r1545506 - in /manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core: interfaces/ILockManager.java lockmanager/BaseLockManager.java lockmanager/ZooKeeperConnection.java lockmanager/ZooKeeperLockManager.java
Date Tue, 26 Nov 2013 01:55:11 GMT
Author: kwright
Date: Tue Nov 26 01:55:10 2013
New Revision: 1545506

URL: http://svn.apache.org/r1545506
Log:
Finish zookeeper implementation of service registry

Modified:
    manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/ILockManager.java
    manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/BaseLockManager.java
    manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/ZooKeeperConnection.java
    manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/ZooKeeperLockManager.java

Modified: manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/ILockManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/ILockManager.java?rev=1545506&r1=1545505&r2=1545506&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/ILockManager.java
(original)
+++ manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/ILockManager.java
Tue Nov 26 01:55:10 2013
@@ -32,8 +32,6 @@ public interface ILockManager
   // can perform any necessary cleanup if one of the agents processes goes away unexpectedly.
 There is a
   // registration primitive (which can fail if the same guid is used as is already registered
and active), a
   // shutdown primitive (which makes a process id go inactive), and various inspection primitives.
-  // Note well: The transient activity cycle MUST be done within a single thread using a
single
-  // ILockManager implementation - much like a lock.
   
   /** Register a service and begin service activity.
   * This atomic operation creates a permanent registration entry for a service.

Modified: manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/BaseLockManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/BaseLockManager.java?rev=1545506&r1=1545505&r2=1545506&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/BaseLockManager.java
(original)
+++ manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/BaseLockManager.java
Tue Nov 26 01:55:10 2013
@@ -68,8 +68,6 @@ public class BaseLockManager implements 
   // can perform any necessary cleanup if one of the agents processes goes away unexpectedly.
 There is a
   // registration primitive (which can fail if the same guid is used as is already registered
and active), a
   // shutdown primitive (which makes a process id go inactive), and various inspection primitives.
-  // Note well: The transient activity cycle MUST be done within a single thread using a
single
-  // ILockManager implementation - much like a lock.
   
   // This implementation of the node infrastructure uses other primitives implemented by
the lock
   // manager for the implementation.  Specifically, instead of synchronizers, we use a write
lock

Modified: manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/ZooKeeperConnection.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/ZooKeeperConnection.java?rev=1545506&r1=1545505&r2=1545506&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/ZooKeeperConnection.java
(original)
+++ manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/ZooKeeperConnection.java
Tue Nov 26 01:55:10 2013
@@ -68,6 +68,119 @@ public class ZooKeeperConnection
     }
   }
 
+  /** Create a transient (ephemeral) node carrying an empty payload.
+  *@param nodePath is the path of the node to create.
+  */
+  public void createNode(String nodePath)
+    throws ManifoldCFException, InterruptedException
+  {
+    final byte[] emptyPayload = new byte[0];
+    try
+    {
+      zookeeper.create(nodePath, emptyPayload, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+    }
+    catch (KeeperException keeperException)
+    {
+      // Surface ZooKeeper failures as ManifoldCF exceptions, keeping the cause.
+      throw new ManifoldCFException(keeperException.getMessage(),keeperException);
+    }
+  }
+  
+  /** Determine whether a node is present.
+  * No watch is set on the node.
+  *@param nodePath is the path of the node.
+  *@return true if the node exists, false if it does not.
+  */
+  public boolean checkNodeExists(String nodePath)
+    throws ManifoldCFException, InterruptedException
+  {
+    try
+    {
+      // exists() yields null when there is no such node.
+      boolean nodeMissing = (zookeeper.exists(nodePath,false) == null);
+      return !nodeMissing;
+    }
+    catch (KeeperException keeperException)
+    {
+      throw new ManifoldCFException(keeperException.getMessage(),keeperException);
+    }
+  }
+  
+  /** Delete a node, whatever its current version.
+  * Any ZooKeeper failure is reported as a ManifoldCFException.
+  *@param nodePath is the path of the node to delete.
+  */
+  public void deleteNode(String nodePath)
+    throws ManifoldCFException, InterruptedException
+  {
+    // A version of -1 asks ZooKeeper to delete regardless of the node's version.
+    final int anyVersion = -1;
+    try
+    {
+      zookeeper.delete(nodePath,anyVersion);
+    }
+    catch (KeeperException keeperException)
+    {
+      throw new ManifoldCFException(keeperException.getMessage(),keeperException);
+    }
+  }
+  
+  /** List the relative paths of a node's children.
+  * A node that does not exist is treated as having no children.
+  *@param nodePath is the path of the parent node.
+  *@return the child names, or an empty list if the node does not exist.
+  */
+  public List<String> getNodeChildren(String nodePath)
+    throws ManifoldCFException, InterruptedException
+  {
+    List<String> childNames;
+    try
+    {
+      childNames = zookeeper.getChildren(nodePath,false);
+    }
+    catch (KeeperException.NoNodeException missingNode)
+    {
+      // No such node: report an empty child list rather than an error.
+      childNames = new ArrayList<String>();
+    }
+    catch (KeeperException keeperException)
+    {
+      throw new ManifoldCFException(keeperException.getMessage(),keeperException);
+    }
+    return childNames;
+  }
+  
+  /** Create a persistent child of a node.
+  * If the parent node does not exist yet, it is created (also persistent) and the
+  * child creation is retried.  Any other ZooKeeper failure, including the child
+  * already existing, is reported as a ManifoldCFException.
+  *@param nodePath is the path of the parent node.
+  *@param childName is the name of the child, relative to the parent.
+  */
+  public void createChild(String nodePath, String childName)
+    throws ManifoldCFException, InterruptedException
+  {
+    final String childPath = nodePath + "/" + childName;
+    try
+    {
+      while (true)
+      {
+        try
+        {
+          zookeeper.create(childPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+          // Success: stop here.  Without this the loop would try to create the
+          // same child again and fail with NodeExistsException.
+          return;
+        }
+        catch (KeeperException.NoNodeException e)
+        {
+          // The parent is missing; create it and go around again.
+          try
+          {
+            zookeeper.create(nodePath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+          }
+          catch (KeeperException.NodeExistsException e2)
+          {
+            // The parent appeared in the meantime; that is what we wanted, so just retry.
+          }
+        }
+      }
+    }
+    catch (KeeperException e)
+    {
+      throw new ManifoldCFException(e.getMessage(),e);
+    }
+  }
+  
+  /** Delete one child of a node, whatever its current version.
+  *@param nodePath is the path of the parent node.
+  *@param childName is the name of the child, relative to the parent.
+  */
+  public void deleteChild(String nodePath, String childName)
+    throws ManifoldCFException, InterruptedException
+  {
+    final String childPath = nodePath + "/" + childName;
+    try
+    {
+      // A version of -1 asks ZooKeeper to delete regardless of the node's version.
+      zookeeper.delete(childPath, -1);
+    }
+    catch (KeeperException keeperException)
+    {
+      throw new ManifoldCFException(keeperException.getMessage(),keeperException);
+    }
+  }
+  
   /** Obtain a write lock, with no wait.
   *@param lockPath is the lock node path.
   *@return true if the lock was obtained, false otherwise.

Modified: manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/ZooKeeperLockManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/ZooKeeperLockManager.java?rev=1545506&r1=1545505&r2=1545506&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/ZooKeeperLockManager.java
(original)
+++ manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/ZooKeeperLockManager.java
Tue Nov 26 01:55:10 2013
@@ -40,7 +40,10 @@ public class ZooKeeperLockManager extend
   private final static String CONFIGURATION_PATH = "/org.apache.manifoldcf.configuration";
   private final static String RESOURCE_PATH_PREFIX = "/org.apache.manifoldcf.resources-";
   private final static String FLAG_PATH_PREFIX = "/org.apache.manifoldcf.flags-";
-    
+  private final static String SERVICETYPE_LOCK_PATH_PREFIX = "/org.apache.manifoldcf.servicelock-";
+  private final static String SERVICETYPE_ACTIVE_PATH_PREFIX = "/org.apache.manifoldcf.serviceactive-";
+  private final static String SERVICETYPE_REGISTER_PATH_PREFIX = "/org.apache.manifoldcf.service-";
+  
   // ZooKeeper connection pool
   protected static Integer connectionPoolLock = new Integer(0);
   protected static ZooKeeperConnectionPool pool = null;
@@ -73,6 +76,337 @@ public class ZooKeeperLockManager extend
     }
   }
   
+  // The node synchronization model involves keeping track of active agents entities, so
that other entities
+  // can perform any necessary cleanup if one of the agents processes goes away unexpectedly.
 There is a
+  // registration primitive (which can fail if the same guid is used as is already registered
and active), a
+  // shutdown primitive (which makes a process id go inactive), and various inspection primitives.
+  
+  // For the zookeeper implementation, we'll need the following:
+  // - a service-type-specific global write lock transient node
+  // - a service-type-specific permanent root node that has registered services as children
+  // - a transient node per active service, whose path is built from the service type and the service name
+  //
+  // This is not necessarily the best implementation that meets the constraints, but it is
straightforward
+  // and will serve until we come up with a better one.
+  
+  /** Register a service and begin service activity.
+  * While holding the registry lock for the service type, this method makes sure a
+  * permanent registration entry exists for the service (an existing entry is not an
+  * error) and then creates the service's transient "active" node.
+  * If the service's "active" node already exists, an exception is thrown and nothing
+  * is changed.
+  * Before anything is modified, the cleanup object (when not null) is called:
+  * - if no service of this type is registered: clusterInit();
+  * - if this service is registered and some other service is active: cleanUpService(serviceName);
+  * - if services are registered but none is active: cleanUpAllServices() and then
+  *   clusterInit(), after which the registrations of all other services are removed.
+  * Ideally, the corresponding endServiceActivity method will be called when the
+  * service shuts down.
+  *@param serviceType is the type of service.
+  *@param serviceName is the name of the service to register.
+  *@param cleanup is called to clean up either the current service, or all services of this
+  * type, if no other active service exists.  May be null, in which case no cleanup is done.
+  */
+  @Override
+  public void registerServiceBeginServiceActivity(String serviceType, String serviceName, IServiceCleanup cleanup)
+    throws ManifoldCFException
+  {
+    try
+    {
+      ZooKeeperConnection connection = pool.grab();
+      try
+      {
+        enterServiceRegistryLock(connection, serviceType);
+        try
+        {
+          String activePath = buildServiceTypeActivePath(serviceType, serviceName);
+          if (connection.checkNodeExists(activePath))
+            throw new ManifoldCFException("Service '"+serviceName+"' of type '"+serviceType+"' is already active");
+          // First, see where we stand.
+          // We need to find out (a) whether our service is already registered; (b) how many
+          // registered services there are; (c) whether there are other active services.
+          // No changes are made at this time.
+          String registrationNodePath = buildServiceTypeRegistrationPath(serviceType);
+          List<String> children = connection.getNodeChildren(registrationNodePath);
+          boolean foundService = false;
+          boolean foundActiveService = false;
+          for (String registeredServiceName : children)
+          {
+            if (registeredServiceName.equals(serviceName))
+              foundService = true;
+            if (connection.checkNodeExists(buildServiceTypeActivePath(serviceType, registeredServiceName)))
+              foundActiveService = true;
+          }
+          
+          // Call the appropriate cleanup.  This depends on what is registered and what is active.
+          // If no services were registered at all, no cleanup is needed, just cluster init.
+          // Nothing has been modified yet, so a failure here leaves our service neither
+          // newly registered nor active.
+          boolean unregisterAll = false;
+          if (cleanup != null)
+          {
+            if (children.size() == 0)
+              cleanup.clusterInit();
+            else if (foundService && foundActiveService)
+              cleanup.cleanUpService(serviceName);
+            else if (!foundActiveService)
+            {
+              cleanup.cleanUpAllServices();
+              cleanup.clusterInit();
+              unregisterAll = true;
+            }
+          }
+
+          if (unregisterAll)
+          {
+            // Unregister all other services (since we did a global cleanup).
+            // Our own registration is kept: deleting it here would leave this service
+            // active but unregistered, because the registration step below is skipped
+            // when the service was found registered.
+            for (String registeredServiceName : children)
+            {
+              if (!registeredServiceName.equals(serviceName))
+                connection.deleteChild(registrationNodePath, registeredServiceName);
+            }
+          }
+
+          // Now, register (if needed)
+          if (!foundService)
+          {
+            connection.createChild(registrationNodePath, serviceName);
+          }
+          
+          // Last, set the appropriate active flag
+          connection.createNode(activePath);
+        }
+        finally
+        {
+          leaveServiceRegistryLock(connection);
+        }
+      }
+      finally
+      {
+        pool.release(connection);
+      }
+    }
+    catch (InterruptedException e)
+    {
+      throw new ManifoldCFException(e.getMessage(),e,ManifoldCFException.INTERRUPTED);
+    }
+  }
+  
+  /** Count the active services of a given type.
+  * A registered service is counted when its "active" node exists.  The count is
+  * taken while holding the registry lock for the service type.
+  *@param serviceType is the service type.
+  *@return the number of registered services of that type that are active.
+  */
+  @Override
+  public int countActiveServices(String serviceType)
+    throws ManifoldCFException
+  {
+    try
+    {
+      ZooKeeperConnection zk = pool.grab();
+      try
+      {
+        enterServiceRegistryLock(zk, serviceType);
+        try
+        {
+          int activeTotal = 0;
+          for (String candidate : zk.getNodeChildren(buildServiceTypeRegistrationPath(serviceType)))
+          {
+            String candidateActivePath = buildServiceTypeActivePath(serviceType, candidate);
+            activeTotal += zk.checkNodeExists(candidateActivePath) ? 1 : 0;
+          }
+          return activeTotal;
+        }
+        finally
+        {
+          leaveServiceRegistryLock(zk);
+        }
+      }
+      finally
+      {
+        pool.release(zk);
+      }
+    }
+    catch (InterruptedException interrupted)
+    {
+      throw new ManifoldCFException(interrupted.getMessage(),interrupted,ManifoldCFException.INTERRUPTED);
+    }
+  }
+  
+  /** Clean up one inactive service, if there is one.
+  * Each call handles at most one service that is registered but has no "active"
+  * node; callers are expected to call repeatedly until true is returned.
+  * The registry lock for the service type is held for the whole cleanup, so other
+  * registry operations on this service type wait until it finishes.  Once the
+  * cleanup call returns, the service's registration is removed.
+  *@param serviceType is the service type.
+  *@param cleanup is the object to call to clean up an inactive service.
+  *@return true if no inactive service was found (nothing needed cleaning up),
+  * false if one inactive service was cleaned up and unregistered.
+  */
+  @Override
+  public boolean cleanupInactiveService(String serviceType, IServiceCleanup cleanup)
+    throws ManifoldCFException
+  {
+    try
+    {
+      ZooKeeperConnection zk = pool.grab();
+      try
+      {
+        enterServiceRegistryLock(zk, serviceType);
+        try
+        {
+          String registrationRoot = buildServiceTypeRegistrationPath(serviceType);
+          for (String candidate : zk.getNodeChildren(registrationRoot))
+          {
+            // Skip anything that is still active.
+            if (zk.checkNodeExists(buildServiceTypeActivePath(serviceType, candidate)))
+              continue;
+
+            // First registered-but-inactive service: clean up after it.
+            // Ideally, we should signal at this point that we're cleaning up after it, and
+            // then leave the exclusive lock, so that other activity can take place.  MHL
+            cleanup.cleanUpService(candidate);
+
+            // Unregister the service, and report that work was done.
+            zk.deleteChild(registrationRoot, candidate);
+            return false;
+          }
+          // Every registered service is active.
+          return true;
+        }
+        finally
+        {
+          leaveServiceRegistryLock(zk);
+        }
+      }
+      finally
+      {
+        pool.release(zk);
+      }
+    }
+    catch (InterruptedException interrupted)
+    {
+      throw new ManifoldCFException(interrupted.getMessage(),interrupted,ManifoldCFException.INTERRUPTED);
+    }
+  }
+
+  /** End service activity.
+  * This operation exits the "active" zone for the service by deleting the service's
+  * "active" node while holding the registry lock for the service type.
+  *@param serviceType is the type of service.
+  *@param serviceName is the name of the service to exit.
+  */
+  @Override
+  public void endServiceActivity(String serviceType, String serviceName)
+    throws ManifoldCFException
+  {
+    try
+    {
+      ZooKeeperConnection zk = pool.grab();
+      try
+      {
+        enterServiceRegistryLock(zk, serviceType);
+        try
+        {
+          String activeNodePath = buildServiceTypeActivePath(serviceType, serviceName);
+          zk.deleteNode(activeNodePath);
+        }
+        finally
+        {
+          leaveServiceRegistryLock(zk);
+        }
+      }
+      finally
+      {
+        pool.release(zk);
+      }
+    }
+    catch (InterruptedException interrupted)
+    {
+      throw new ManifoldCFException(interrupted.getMessage(),interrupted,ManifoldCFException.INTERRUPTED);
+    }
+  }
+    
+  /** Check whether a service is active or not.
+  * A service is considered active when its "active" node exists.  The check is made
+  * while holding the registry lock for the service type.
+  *@param serviceType is the type of service.
+  *@param serviceName is the name of the service to check on.
+  *@return true if the service is considered active.
+  */
+  @Override
+  public boolean checkServiceActive(String serviceType, String serviceName)
+    throws ManifoldCFException
+  {
+    try
+    {
+      ZooKeeperConnection connection = pool.grab();
+      try
+      {
+        enterServiceRegistryLock(connection, serviceType);
+        try
+        {
+          // Same activity test the other registry methods use: does the active node exist?
+          return connection.checkNodeExists(buildServiceTypeActivePath(serviceType, serviceName));
+        }
+        finally
+        {
+          leaveServiceRegistryLock(connection);
+        }
+      }
+      finally
+      {
+        pool.release(connection);
+      }
+    }
+    catch (InterruptedException e)
+    {
+      throw new ManifoldCFException(e.getMessage(),e,ManifoldCFException.INTERRUPTED);
+    }
+  }
+
+  /** Enter service registry lock */
+  protected void enterServiceRegistryLock(ZooKeeperConnection connection, String serviceType)
+    throws ManifoldCFException, InterruptedException
+  {
+    while (true)
+    {
+      if (connection.obtainWriteLockNoWait(buildServiceTypeLockPath(serviceType)))
+        return;
+      ManifoldCF.sleep(100L);
+    }
+  }
+  
+  /** Leave service registry lock.
+  * Releases the lock held on the given connection.
+  *@param connection is the connection whose lock is to be released.
+  */
+  protected void leaveServiceRegistryLock(ZooKeeperConnection connection)
+    throws ManifoldCFException, InterruptedException
+  {
+    connection.releaseLock();
+  }
+  
+  /** Build the zk path of the registry lock for a specific service type.
+  *@param serviceType is the service type.
+  *@return the lock path prefix followed by the service type.
+  */
+  protected static String buildServiceTypeLockPath(String serviceType)
+  {
+    StringBuilder lockPath = new StringBuilder(SERVICETYPE_LOCK_PATH_PREFIX);
+    lockPath.append(serviceType);
+    return lockPath.toString();
+  }
+  
+  /** Build the zk path of the "active" node for a specific service of a specific type.
+  *@param serviceType is the service type.
+  *@param serviceName is the service name.
+  *@return the active path prefix followed by the service type, a dash, and the service name.
+  */
+  protected static String buildServiceTypeActivePath(String serviceType, String serviceName)
+  {
+    StringBuilder activePath = new StringBuilder(SERVICETYPE_ACTIVE_PATH_PREFIX);
+    activePath.append(serviceType).append("-").append(serviceName);
+    return activePath.toString();
+  }
+  
+  /** Build the zk path of the registration node for a specific service type.
+  *@param serviceType is the service type.
+  *@return the registration path prefix followed by the service type.
+  */
+  protected static String buildServiceTypeRegistrationPath(String serviceType)
+  {
+    StringBuilder registrationPath = new StringBuilder(SERVICETYPE_REGISTER_PATH_PREFIX);
+    registrationPath.append(serviceType);
+    return registrationPath.toString();
+  }
+  
+  // Shared configuration
+
   /** Get the current shared configuration.  This configuration is available in common among
all nodes,
   * and thus must not be accessed through here for the purpose of finding configuration data
that is specific to any one
   * specific node.



Mime
View raw message