manifoldcf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kwri...@apache.org
Subject svn commit: r1546392 - in /manifoldcf/branches/CONNECTORS-781/framework: agents/src/main/java/org/apache/manifoldcf/agents/interfaces/ agents/src/main/java/org/apache/manifoldcf/agents/system/ pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/
Date Thu, 28 Nov 2013 16:08:14 GMT
Author: kwright
Date: Thu Nov 28 16:08:13 2013
New Revision: 1546392

URL: http://svn.apache.org/r1546392
Log:
Rearrange when reprioritization happens

Modified:
    manifoldcf/branches/CONNECTORS-781/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IAgent.java
    manifoldcf/branches/CONNECTORS-781/framework/agents/src/main/java/org/apache/manifoldcf/agents/system/ManifoldCF.java
    manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/CrawlerAgent.java
    manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ReprioritizationTracker.java

Modified: manifoldcf/branches/CONNECTORS-781/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IAgent.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-781/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IAgent.java?rev=1546392&r1=1546391&r2=1546392&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-781/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IAgent.java (original)
+++ manifoldcf/branches/CONNECTORS-781/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IAgent.java Thu Nov 28 16:08:13 2013
@@ -70,8 +70,9 @@ public interface IAgent
   * Call this method to clean up dangling persistent state when a cluster is just starting
   * to come up.  This method CANNOT be called when there are any active agents
   * processes at all.
+  *@param currentProcessID is the current process ID.
   */
-  public void cleanUpAgentData(IThreadContext threadContext)
+  public void cleanUpAllAgentData(IThreadContext threadContext, String currentProcessID)
     throws ManifoldCFException;
   
   /** Cleanup after agents process.
@@ -79,9 +80,10 @@ public interface IAgent
   * This method CANNOT be called when the agent is active, but it can
   * be called at any time and by any process in order to guarantee that a terminated
   * agent does not block other agents from completing their tasks.
-  *@param processID is the process ID of the agent to clean up after.
+  *@param currentProcessID is the current process ID.
+  *@param cleanupProcessID is the process ID of the agent to clean up after.
   */
-  public void cleanUpAgentData(IThreadContext threadContext, String processID)
+  public void cleanUpAgentData(IThreadContext threadContext, String currentProcessID, String cleanupProcessID)
     throws ManifoldCFException;
 
   /** Start the agent.  This method should spin up the agent threads, and

Modified: manifoldcf/branches/CONNECTORS-781/framework/agents/src/main/java/org/apache/manifoldcf/agents/system/ManifoldCF.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-781/framework/agents/src/main/java/org/apache/manifoldcf/agents/system/ManifoldCF.java?rev=1546392&r1=1546391&r2=1546392&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-781/framework/agents/src/main/java/org/apache/manifoldcf/agents/system/ManifoldCF.java (original)
+++ manifoldcf/branches/CONNECTORS-781/framework/agents/src/main/java/org/apache/manifoldcf/agents/system/ManifoldCF.java Thu Nov 28 16:08:13 2013
@@ -235,7 +235,7 @@ public class ManifoldCF extends org.apac
           {
             // Throw a lock, so that cleanup processes and startup processes don't collide.
             String serviceType = getAgentsClassServiceType(className);
-            lockManager.registerServiceBeginServiceActivity(serviceType, processID, new CleanupAgent(threadContext, agent));
+            lockManager.registerServiceBeginServiceActivity(serviceType, processID, new CleanupAgent(threadContext, agent, processID));
             // There is a potential race condition where the agent has been started but hasn't yet appeared in runningHash.
             // But having runningHash be the synchronizer for this activity will prevent any problems.
             agent.startAgent(threadContext, processID);
@@ -286,7 +286,7 @@ public class ManifoldCF extends org.apac
       for (String agentsClass : runningHash.keySet())
       {
         IAgent agent = runningHash.get(agentsClass);
-        IServiceCleanup cleanup = new CleanupAgent(threadContext, agent);
+        IServiceCleanup cleanup = new CleanupAgent(threadContext, agent, processID);
         String agentsClassServiceType = getAgentsClassServiceType(agentsClass);
         while (!lockManager.cleanupInactiveService(agentsClassServiceType, cleanup))
         {
@@ -325,11 +325,13 @@ public class ManifoldCF extends org.apac
   {
     protected final IAgent agent;
     protected final IThreadContext threadContext;
-    
-    public CleanupAgent(IThreadContext threadContext, IAgent agent)
+    protected final String processID;
+
+    public CleanupAgent(IThreadContext threadContext, IAgent agent, String processID)
     {
       this.agent = agent;
       this.threadContext = threadContext;
+      this.processID = processID;
     }
     
     /** Clean up after the specified service.  This method will block any startup of the specified
@@ -340,7 +342,7 @@ public class ManifoldCF extends org.apac
     public void cleanUpService(String serviceName)
       throws ManifoldCFException
     {
-      agent.cleanUpAgentData(threadContext, serviceName);
+      agent.cleanUpAgentData(threadContext, processID, serviceName);
     }
 
     /** Clean up after ALL services of the type on the cluster.
@@ -349,7 +351,7 @@ public class ManifoldCF extends org.apac
     public void cleanUpAllServices()
       throws ManifoldCFException
     {
-      agent.cleanUpAgentData(threadContext);
+      agent.cleanUpAllAgentData(threadContext, processID);
     }
     
     /** Perform cluster initialization - that is, whatever is needed presuming that the
@@ -360,8 +362,7 @@ public class ManifoldCF extends org.apac
     public void clusterInit()
       throws ManifoldCFException
     {
-      // MHL - we really want a separate clusterInit in agents
-      agent.cleanUpAgentData(threadContext);
+      agent.clusterInit(threadContext);
     }
 
   }

Modified: manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/CrawlerAgent.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/CrawlerAgent.java?rev=1546392&r1=1546391&r2=1546392&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/CrawlerAgent.java (original)
+++ manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/CrawlerAgent.java Thu Nov 28 16:08:13 2013
@@ -140,13 +140,18 @@ public class CrawlerAgent implements IAg
   * Call this method to clean up dangling persistent state when a cluster is just starting
   * to come up.  This method CANNOT be called when there are any active agents
   * processes at all.
+  *@param currentProcessID is the current process ID.
   */
   @Override
-  public void cleanUpAgentData(IThreadContext threadContext)
+  public void cleanUpAllAgentData(IThreadContext threadContext, String currentProcessID)
     throws ManifoldCFException
   {
     IJobManager jobManager = JobManagerFactory.make(threadContext);
     jobManager.cleanupProcessData();
+    // What kind of reprioritization should be done here?
+    // There may be a dangling reprioritization, or even worse, someone may have blown
+    // away the lockmanager info.  So we need to do a complete reprioritization.
+    ManifoldCF.resetAllDocumentPriorities(threadContext,System.currentTimeMillis(),currentProcessID);
   }
   
   /** Cleanup after agents process.
@@ -154,14 +159,58 @@ public class CrawlerAgent implements IAg
   * This method CANNOT be called when the agent is active, but it can
   * be called at any time and by any process in order to guarantee that a terminated
   * agent does not block other agents from completing their tasks.
-  *@param processID is the process ID of the agent to clean up after.
+  *@param currentProcessID is the current process ID.
+  *@param cleanupProcessID is the process ID of the agent to clean up after.
   */
   @Override
-  public void cleanUpAgentData(IThreadContext threadContext, String processID)
+  public void cleanUpAgentData(IThreadContext threadContext, String currentProcessID, String cleanupProcessID)
     throws ManifoldCFException
   {
     IJobManager jobManager = JobManagerFactory.make(threadContext);
-    jobManager.cleanupProcessData(processID);
+    jobManager.cleanupProcessData(cleanupProcessID);
+    ReprioritizationTracker rt = new ReprioritizationTracker(threadContext);
+    String reproID = rt.isSpecifiedProcessReprioritizing(cleanupProcessID);
+    if (reproID != null)
+    {
+      // We have to take over the prioritization for the process, which apparently died
+      // in the middle.
+      IRepositoryConnectionManager connectionManager = RepositoryConnectionManagerFactory.make(threadContext);
+
+      // Reprioritize all documents in the jobqueue, 10000 at a time
+
+      Map<String,IRepositoryConnection> connectionMap = new HashMap<String,IRepositoryConnection>();
+      Map<Long,IJobDescription> jobDescriptionMap = new HashMap<Long,IJobDescription>();
+      
+      // Do the 'not yet processed' documents only.  Documents that are queued for reprocessing will be assigned
+      // new priorities.  Already processed documents won't.  This guarantees that our bins are appropriate for current thread
+      // activity.
+      // In order for this to be the correct functionality, ALL reseeding and requeuing operations MUST reset the associated document
+      // priorities.
+      // priorities.
+      while (true)
+      {
+        long startTime = System.currentTimeMillis();
+
+        Long currentTimeValue = rt.checkReprioritizationInProgress();
+        if (currentTimeValue == null)
+        {
+          // Some other process or thread superseded us.
+          return;
+        }
+        long updateTime = currentTimeValue.longValue();
+        
+        DocumentDescription[] docs = jobManager.getNextNotYetProcessedReprioritizationDocuments(updateTime, 10000);
+        if (docs.length == 0)
+          break;
+
+        // Calculate new priorities for all these documents
+        ManifoldCF.writeDocumentPriorities(threadContext,connectionManager,jobManager,docs,connectionMap,jobDescriptionMap,
+          rt,updateTime);
+
+        Logging.threads.debug("Reprioritized "+Integer.toString(docs.length)+" not-yet-processed documents in "+new Long(System.currentTimeMillis()-startTime)+" ms");
+      }
+      
+      rt.doneReprioritization(reproID);
+    }
   }
 
   /** Start the agent.  This method should spin up the agent threads, and
@@ -361,13 +410,13 @@ public class CrawlerAgent implements IAg
 
         // Call the database to get it ready
         jobManager.prepareForStart();
-        */
         
         Logging.threads.debug("Agents process reprioritizing documents...");
         // This needs to be moved to cleanup method(s)
         ManifoldCF.resetAllDocumentPriorities(threadContext,System.currentTimeMillis(),processID);
         Logging.threads.debug("Agents process initialization complete!");
-
+        */
+        
         // Start all the threads
         jobStartThread.start();
         startupThread.start();

Modified: manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ReprioritizationTracker.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ReprioritizationTracker.java?rev=1546392&r1=1546391&r2=1546392&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ReprioritizationTracker.java (original)
+++ manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ReprioritizationTracker.java Thu Nov 28 16:08:13 2013
@@ -154,10 +154,9 @@ public class ReprioritizationTracker
   
   /** Check if the specified processID is the one performing reprioritization.
   *@param processID is the process ID to check.
-  *@return true if the specified processID is declared as being responsible for the
-  * current reprioritization, false if no prioritization in progress or a different process is involved.
+  *@return the repro ID if the processID is confirmed to be the one performing reprioritization, or null otherwise.
   */
-  public boolean isSpecifiedProcessReprioritizing(String processID)
+  public String isSpecifiedProcessReprioritizing(String processID)
     throws ManifoldCFException
   {
     lockManager.enterWriteLock(trackerWriteLock);
@@ -166,7 +165,9 @@ public class ReprioritizationTracker
       Long currentTime = readTime();
       String currentProcessID = readProcessID();
       String currentReproID = readReproID();
-      return (currentTime != null && currentProcessID != null && currentReproID != null && currentProcessID.equals(processID));
+      if (currentTime != null && currentProcessID != null && currentReproID != null && currentProcessID.equals(processID))
+        return currentReproID;
+      return null;
     }
     finally
     {



Mime
View raw message