manifoldcf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kwri...@apache.org
Subject svn commit: r1544423 - in /manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager: BaseLockManager.java FileLockManager.java
Date Fri, 22 Nov 2013 07:07:49 GMT
Author: kwright
Date: Fri Nov 22 07:07:49 2013
New Revision: 1544423

URL: http://svn.apache.org/r1544423
Log:
Implement file node handling, using standard primitives for extensibility.

Modified:
    manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/BaseLockManager.java
    manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/FileLockManager.java

Modified: manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/BaseLockManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/BaseLockManager.java?rev=1544423&r1=1544422&r2=1544423&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/BaseLockManager.java
(original)
+++ manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/BaseLockManager.java
Fri Nov 22 07:07:49 2013
@@ -68,6 +68,34 @@ public class BaseLockManager implements 
   {
   }
 
+  // Node synchronization
+  
+  // The node synchronization model involves keeping track of active agents entities, so
that other entities
+  // can perform any necessary cleanup if one of the agents processes goes away unexpectedly.
 There is a
+  // registration primitive (which can fail if the same guid is used as is already registered
and active), a
+  // shutdown primitive (which makes a process id go inactive), and various inspection primitives.
+  // Note well: The transient activity cycle MUST be done within a single thread using a
single
+  // ILockManager implementation - much like a lock.
+  
+  // This implementation of the node infrastructure uses other primitives implemented by
the lock
+  // manager for the implementation.  Specifically, instead of synchronizers, we use a write
lock
+  // to prevent conflicts, and we use flags to determine whether a service is active or not.
 The
+  // tricky thing, though, is the global registry - which must be able to list its contents.
 To acheive
+  // that, we use data with a counter scheme; if the data is not found, it's presumed we
are at the
+  // end of the list.
+  //
+  // By building on other primitives in this way, the same implementation will suffice for
many derived
+  // lockmanager implementations - although ZooKeeper will want a native form.
+
+  /** The global write lock to control sync */
+  protected final static String serviceLock = "_SERVICELOCK_";
+  /** A data name prefix, followed by the service type, and then followed by "_" and the
instance number */
+  protected final static String serviceListPrefix = "_SERVICELIST_";
+  /** A flag prefix, followed by the service type, and then followed by "_" and the service
name */
+  protected final static String servicePrefix = "_SERVICE_";
+  /** A flag prefix, followed by the service type, and then followed by "_" and the service
name */
+  protected final static String activePrefix = "_ACTIVE_";
+
   /** Register a service and begin service activity.
   * This atomic operation creates a permanent registration entry for a service.
   * If the permanent registration entry already exists, this method will not create it or
@@ -83,25 +111,50 @@ public class BaseLockManager implements 
   public void registerServiceBeginServiceActivity(String serviceType, String serviceName)
     throws ManifoldCFException
   {
-    // Use services to lock both services and active services
-    synchronized (services)
+    enterWriteLock(serviceLock);
+    try
     {
-      Set<String> typedActiveServices = activeServices.get(serviceType);
-      if (typedActiveServices != null && typedActiveServices.contains(serviceName))
-        throw new ManifoldCFException("There is already an active service of type '"+serviceType+"'
called '"+serviceName+"'");
-      if (typedActiveServices == null)
-      {
-        typedActiveServices = new HashSet<String>();
-        activeServices.put(serviceType, typedActiveServices);
-      }
-      typedActiveServices.add(serviceName);
-      Set<String> typedRegisteredServices = services.get(serviceType);
-      if (typedRegisteredServices == null)
+      // First, do an active check
+      String serviceActiveFlag = makeActiveServiceFlagName(serviceType, serviceName);
+      if (checkGlobalFlag(serviceActiveFlag))
+        throw new ManifoldCFException("Service '"+serviceName+"' of type '"+serviceType+"'
is already active");
+      // First, register the service
+      int i = 0;
+      while (true)
       {
-        typedRegisteredServices = new HashSet<String>();
-        services.put(serviceType, typedRegisteredServices);
+        String resourceName = buildServiceListEntry(serviceType, i);
+        String x = readServiceName(resourceName);
+        if (x == null)
+        {
+          writeServiceName(resourceName, serviceName);
+          try
+          {
+            setGlobalFlag(makeRegisteredServiceFlagName(serviceType, serviceName));
+          }
+          catch (Throwable e)
+          {
+            writeServiceName(resourceName, null);
+            if (e instanceof Error)
+              throw (Error)e;
+            if (e instanceof RuntimeException)
+              throw (RuntimeException)e;
+            if (e instanceof ManifoldCFException)
+              throw (ManifoldCFException)e;
+            else
+              throw new RuntimeException("Unknown exception of type: "+e.getClass().getName()+":
"+e.getMessage(),e);
+          }
+          break;
+        }
+        if (x.equals(serviceName))
+          break;
+        i++;
       }
-      typedRegisteredServices.add(serviceName);
+      // Now, set the appropriate active flag
+      setGlobalFlag(serviceActiveFlag);
+    }
+    finally
+    {
+      leaveWriteLock(serviceLock);
     }
   }
   
@@ -115,14 +168,55 @@ public class BaseLockManager implements 
   public void unregisterService(String serviceType, String serviceName)
     throws ManifoldCFException
   {
-    synchronized (services)
+    enterWriteLock(serviceLock);
+    try
     {
-      Set<String> typedActiveServices = activeServices.get(serviceType);
-      if (typedActiveServices != null && typedActiveServices.contains(serviceName))
-        throw new ManifoldCFException("Cannot unregister an active service; type '"+serviceType+"'
name '"+serviceName+"'");
-      Set<String> typedRegisteredServices = services.get(serviceType);
-      if (typedRegisteredServices != null)
-        typedRegisteredServices.remove(serviceName);
+      // First, do an active check
+      String serviceActiveFlag = makeActiveServiceFlagName(serviceType, serviceName);
+      if (checkGlobalFlag(serviceActiveFlag))
+        throw new ManifoldCFException("Service '"+serviceName+"' of type '"+serviceType+"'
is still active; can't unregister");
+      String serviceRegisteredFlag = makeRegisteredServiceFlagName(serviceType, serviceName);
+      clearGlobalFlag(serviceRegisteredFlag);
+      int i = 0;
+      String removalEntry = null;
+      String lastEntry = null;
+      String lastService = null;
+      while (true)
+      {
+        String resourceName = buildServiceListEntry(serviceType, i);
+        String x = readServiceName(resourceName);
+        if (x == null)
+        {
+          if (lastEntry != null)
+            writeServiceName(lastEntry, null);
+          try
+          {
+            if (removalEntry != null && !lastEntry.equals(removalEntry))
+              writeServiceName(removalEntry, lastService);
+          }
+          catch (Throwable e)
+          {
+            writeServiceName(lastEntry, lastService);
+            if (e instanceof Error)
+              throw (Error)e;
+            if (e instanceof RuntimeException)
+              throw (RuntimeException)e;
+            if (e instanceof ManifoldCFException)
+              throw (ManifoldCFException)e;
+            throw new RuntimeException("Unknown exception of type "+e.getClass().getName()+":
"+e.getMessage(),e);
+          }
+          break;
+        }
+        lastEntry = resourceName;
+        lastService = x;
+        if (x.equals(serviceName))
+          removalEntry = resourceName;
+        i++;
+      }
+    }
+    finally
+    {
+      leaveWriteLock(serviceLock);
     }
   }
     
@@ -134,21 +228,35 @@ public class BaseLockManager implements 
   public String[] getRegisteredServices(String serviceType)
     throws ManifoldCFException
   {
-    synchronized (services)
+    enterWriteLock(serviceLock);
+    try
     {
-      Set<String> typedRegisteredServices = services.get(serviceType);
-      if (typedRegisteredServices == null)
-        return new String[0];
-      String[] rval = new String[typedRegisteredServices.size()];
       int i = 0;
-      for (String s : typedRegisteredServices)
+      List<String> services = new ArrayList<String>();
+      while (true)
+      {
+        String resourceName = buildServiceListEntry(serviceType, i);
+        String x = readServiceName(resourceName);
+        if (x == null)
+          break;
+        services.add(x);
+        i++;
+      }
+      String[] rval = new String[services.size()];
+      i = 0;
+      for (String x : services)
       {
-        rval[i++] = s;
+        rval[i++] = x;
       }
       return rval;
     }
+    finally
+    {
+      leaveWriteLock(serviceLock);
+    }
   }
-    
+  
+  
   /** End service activity.
   * This operation exits the "active" zone for the service.  This must take place using the
same ILockManager
   * object that was used to registerServiceBeginServiceActivity() - which implies that it
is the same thread.
@@ -159,12 +267,17 @@ public class BaseLockManager implements 
   public void endServiceActivity(String serviceType, String serviceName)
     throws ManifoldCFException
   {
-    synchronized (services)
+    enterWriteLock(serviceLock);
+    try
     {
-      Set<String> typedActiveServices = activeServices.get(serviceType);
-      if (typedActiveServices == null || !typedActiveServices.contains(serviceName))
-        throw new ManifoldCFException("Service of type '"+serviceType+"', name '"+serviceName+"'
is not currently active");
-      typedActiveServices.remove(serviceName);
+      String serviceActiveFlag = makeActiveServiceFlagName(serviceType, serviceName);
+      if (!checkGlobalFlag(serviceActiveFlag))
+        throw new ManifoldCFException("Service '"+serviceName+"' of type '"+serviceType+"
is not active");
+      clearGlobalFlag(serviceActiveFlag);
+    }
+    finally
+    {
+      leaveWriteLock(serviceLock);
     }
   }
     
@@ -179,16 +292,59 @@ public class BaseLockManager implements 
   public boolean checkServiceActive(String serviceType, String serviceName)
     throws ManifoldCFException
   {
-    synchronized (services)
+    enterWriteLock(serviceLock);
+    try
     {
-      Set<String> typedRegisteredServices = services.get(serviceType);
-      if (typedRegisteredServices == null || !typedRegisteredServices.contains(serviceName))
-        throw new ManifoldCFException("Service of type '"+serviceType+"' name '"+serviceName+"'
does not exist");
-      Set<String> typedActiveServices = activeServices.get(serviceType);
-      if (typedActiveServices == null)
-        return false;
-      return typedActiveServices.contains(serviceName);
+      return checkGlobalFlag(makeActiveServiceFlagName(serviceType, serviceName));
     }
+    finally
+    {
+      leaveWriteLock(serviceLock);
+    }
+  }
+
+  protected static String makeActiveServiceFlagName(String serviceType, String serviceName)
+  {
+    return activePrefix + serviceType + "_" + serviceName;
+  }
+  
+  protected static String makeRegisteredServiceFlagName(String serviceType, String serviceName)
+  {
+    return servicePrefix + serviceType + "_" + serviceName;
+  }
+
+  protected String readServiceName(String resourceName)
+    throws ManifoldCFException
+  {
+    byte[] bytes = readData(resourceName);
+    if (bytes == null)
+      return null;
+    try
+    {
+      return new String(bytes, "utf-8");
+    }
+    catch (UnsupportedEncodingException e)
+    {
+      throw new RuntimeException("Unsupported encoding: "+e.getMessage(),e);
+    }
+  }
+  
+  protected void writeServiceName(String resourceName, String serviceName)
+    throws ManifoldCFException
+  {
+    try
+    {
+      writeData(resourceName, (serviceName==null)?null:serviceName.getBytes("utf-8"));
+    }
+    catch (UnsupportedEncodingException e)
+    {
+      throw new RuntimeException("Unsupported encoding: "+e.getMessage(),e);
+    }
+  }
+  
+  protected static String buildServiceListEntry(String serviceType, int i)
+  {
+    return serviceListPrefix + serviceType + "_" + i;
   }
 
   /** Get the current shared configuration.  This configuration is available in common among
all nodes,

Modified: manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/FileLockManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/FileLockManager.java?rev=1544423&r1=1544422&r2=1544423&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/FileLockManager.java
(original)
+++ manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/lockmanager/FileLockManager.java
Fri Nov 22 07:07:49 2013
@@ -78,7 +78,7 @@ public class FileLockManager extends Bas
   {
     return "flag-"+flagName;
   }
-  
+    
   /** Raise a flag.  Use this method to assert a condition, or send a global signal.  The
flag will be reset when the
   * entire system is restarted.
   *@param flagName is the name of the flag to set.



Mime
View raw message