manifoldcf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kwri...@apache.org
Subject svn commit: r1601324 - in /manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler: interfaces/IJobManager.java jobs/JobManager.java jobs/Jobs.java system/AssessmentThread.java system/CrawlerAgent.java
Date Mon, 09 Jun 2014 08:47:33 GMT
Author: kwright
Date: Mon Jun  9 08:47:32 2014
New Revision: 1601324

URL: http://svn.apache.org/r1601324
Log:
Add assessment thread, and revamp how registration/deregistration works

Added:
    manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/AssessmentThread.java
  (with props)
Modified:
    manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java
    manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
    manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java
    manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/CrawlerAgent.java

Modified: manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java?rev=1601324&r1=1601323&r2=1601324&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java (original)
+++ manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java Mon Jun  9 08:47:32 2014
@@ -978,6 +978,17 @@ public interface IJobManager
   public void noteTransformationConnectorRegistration(String[] connectionNames)
     throws ManifoldCFException;
 
+  /** Note a change in transformation connection configuration.
+  * This method will be called whenever a connection's configuration is modified.
+  */
+  public void noteTransformationConnectionChange(String connectionName)
+    throws ManifoldCFException;
+
+  /** Assess jobs marked to be in need of assessment for connector status changes.
+  */
+  public void assessMarkedJobs()
+    throws ManifoldCFException;
+
   /** Delete jobs in need of being deleted (which are marked "ready for delete").
   * This method is meant to be called periodically to perform delete processing on jobs.
   */

Modified: manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java?rev=1601324&r1=1601323&r2=1601324&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java (original)
+++ manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java Mon Jun  9 08:47:32 2014
@@ -641,6 +641,24 @@ public class JobManager implements IJobM
     jobs.noteOutputConnectionChange(connectionName);
   }
 
+  /** Note a change in transformation connection configuration.
+  * This method will be called whenever a connection's configuration is modified.
+  */
+  @Override
+  public void noteTransformationConnectionChange(String connectionName)
+    throws ManifoldCFException
+  {
+    jobs.noteTransformationConnectionChange(connectionName);
+  }
+
+  /** Assess jobs marked to be in need of assessment for connector status changes.
+  */
+  public void assessMarkedJobs()
+    throws ManifoldCFException
+  {
+    // MHL
+  }
+
   /** Load a sorted list of job descriptions.
   *@return the list, sorted by description.
   */

Modified: manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java?rev=1601324&r1=1601323&r2=1601324&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java (original)
+++ manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java Mon Jun  9 08:47:32 2014
@@ -878,12 +878,19 @@ public class Jobs extends org.apache.man
 
               IResultRow row = set.getRow(0);
 
-              boolean isSame = true;
-
               // Determine whether we need to reset the scan time for documents.
               // Basically, any change to job parameters that could affect ingestion should clear isSame so that we
               // relook at all the documents, not just the recent ones.
 
+              boolean isSame = pipelineManager.compareRows(id,jobDescription);
+              if (!isSame)
+              {
+                int currentStatus = stringToStatus((String)row.getValue(statusField));
+                if (currentStatus == STATUS_ACTIVE || currentStatus == STATUS_ACTIVESEEDING ||
+                  currentStatus == STATUS_ACTIVE_UNINSTALLED || currentStatus == STATUS_ACTIVESEEDING_UNINSTALLED)
+                  values.put(assessmentStateField,assessmentStateToString(ASSESSMENT_UNKNOWN));
+              }
+
               if (isSame)
               {
                 String oldOutputSpecXML = (String)row.getValue(outputSpecField);
@@ -899,9 +906,6 @@ public class Jobs extends org.apache.man
               }
 
               if (isSame)
-                isSame = pipelineManager.compareRows(id,jobDescription);
-
-              if (isSame)
                 isSame = hopFilterManager.compareRows(id,jobDescription);
 
               if (isSame)
@@ -1308,6 +1312,25 @@ public class Jobs extends org.apache.man
     performUpdate(map,"WHERE "+query,list,null);
   }
 
+  /** Invalidate current state with respect to installed connectors.
+  */
+  public void invalidateCurrentState(Long jobID, int oldStatusValue)
+    throws ManifoldCFException
+  {
+    // If we are in a state that cares about the connector state, then we have to signal we need an assessment.
+    if (oldStatusValue == STATUS_ACTIVE || oldStatusValue == STATUS_ACTIVESEEDING ||
+      oldStatusValue == STATUS_ACTIVE_UNINSTALLED || oldStatusValue == STATUS_ACTIVESEEDING_UNINSTALLED)
+    {
+      // Assessment state is not cached, so no cache invalidation needed
+      ArrayList list = new ArrayList();
+      String query = buildConjunctionClause(list,new ClauseDescription[]{
+        new UnitaryClause(idField,jobID)});
+      HashMap map = new HashMap();
+      map.put(assessmentStateField,assessmentStateToString(ASSESSMENT_UNKNOWN));
+      performUpdate(map,"WHERE "+query,list,null);
+    }
+  }
+  
   /** Signal to a job that an underlying transformation connector has gone away.
   *@param jobID is the identifier of the job.
   *@param oldStatusValue is the current status value for the job.
@@ -1315,7 +1338,7 @@ public class Jobs extends org.apache.man
   public void noteTransformationConnectorDeregistration(Long jobID, int oldStatusValue)
     throws ManifoldCFException
   {
-    // MHL
+    deregisterConnector(jobID,oldStatusValue);
   }
 
   /** Signal to a job that an underlying transformation connector has been registered.
@@ -1325,7 +1348,7 @@ public class Jobs extends org.apache.man
   public void noteTransformationConnectorRegistration(Long jobID, int oldStatusValue)
     throws ManifoldCFException
   {
-    // MHL
+    invalidateCurrentState(jobID,oldStatusValue);
   }
 
   /** Signal to a job that its underlying output connector has gone away.
@@ -1335,6 +1358,10 @@ public class Jobs extends org.apache.man
   public void noteOutputConnectorDeregistration(Long jobID, int oldStatusValue)
     throws ManifoldCFException
   {
+    deregisterConnector(jobID,oldStatusValue);
+  }
+  
+  /* ???
     int newStatusValue;
     // The following states are special, in that when the underlying connector goes away, the jobs
     // in such states are switched away to something else.  There are TWO reasons that a state may be in
@@ -1375,7 +1402,8 @@ public class Jobs extends org.apache.man
       new UnitaryClause(idField,jobID)});
     performUpdate(newValues,"WHERE "+query,list,invKey);
   }
-
+  */
+  
   /** Signal to a job that its underlying output connector has returned.
   *@param jobID is the identifier of the job.
   *@param oldStatusValue is the current status value for the job.
@@ -1383,6 +1411,10 @@ public class Jobs extends org.apache.man
   public void noteOutputConnectorRegistration(Long jobID, int oldStatusValue)
     throws ManifoldCFException
   {
+    invalidateCurrentState(jobID,oldStatusValue);
+  }
+  
+  /*
     int newStatusValue;
     // The following states are special, in that when the underlying connector returns, the jobs
     // in such states are switched back to their connector-installed value.
@@ -1419,7 +1451,8 @@ public class Jobs extends org.apache.man
       new UnitaryClause(idField,jobID)});
     performUpdate(newValues,"WHERE "+query,list,invKey);
   }
-
+  */
+  
   /** Signal to a job that its underlying connector has gone away.
   *@param jobID is the identifier of the job.
   *@param oldStatusValue is the current status value for the job.
@@ -1427,6 +1460,10 @@ public class Jobs extends org.apache.man
   public void noteConnectorDeregistration(Long jobID, int oldStatusValue)
     throws ManifoldCFException
   {
+    deregisterConnector(jobID,oldStatusValue);
+  }
+  
+  /*
     int newStatusValue;
     // The following states are special, in that when the underlying connector goes away, the jobs
     // in such states are switched away to something else.  There are TWO reasons that a state may be in
@@ -1464,7 +1501,8 @@ public class Jobs extends org.apache.man
       new UnitaryClause(idField,jobID)});
     performUpdate(newValues,"WHERE "+query,list,invKey);
   }
-
+  */
+  
   /** Signal to a job that its underlying connector has returned.
   *@param jobID is the identifier of the job.
   *@param oldStatusValue is the current status value for the job.
@@ -1472,6 +1510,10 @@ public class Jobs extends org.apache.man
   public void noteConnectorRegistration(Long jobID, int oldStatusValue)
     throws ManifoldCFException
   {
+    invalidateCurrentState(jobID,oldStatusValue);
+  }
+  
+  /*
     int newStatusValue;
     // The following states are special, in that when the underlying connector returns, the jobs
     // in such states are switched back to their connector-installed value.
@@ -1505,7 +1547,45 @@ public class Jobs extends org.apache.man
       new UnitaryClause(idField,jobID)});
     performUpdate(newValues,"WHERE "+query,list,invKey);
   }
+  */
+  
+  protected void deregisterConnector(Long jobID, int oldStatusValue)
+    throws ManifoldCFException
+  {
+    // Make sure we transition to an uninstalled state, if we aren't there already.
+    int newStatusValue;
+    switch (oldStatusValue)
+    {
+    case STATUS_ACTIVE:
+      newStatusValue = STATUS_ACTIVE_UNINSTALLED;
+      break;
+    case STATUS_ACTIVESEEDING:
+      newStatusValue = STATUS_ACTIVESEEDING_UNINSTALLED;
+      break;
+    case STATUS_ACTIVE_NOOUTPUT:
+      newStatusValue = STATUS_ACTIVE_UNINSTALLED;
+      break;
+    case STATUS_ACTIVESEEDING_NOOUTPUT:
+      newStatusValue = STATUS_ACTIVESEEDING_UNINSTALLED;
+      break;
+    default:
+      newStatusValue = oldStatusValue;
+      break;
+    }
+    if (newStatusValue == oldStatusValue)
+      return;
+
+    StringSet invKey = new StringSet(getJobStatusKey());
 
+    HashMap newValues = new HashMap();
+    newValues.put(statusField,statusToString(newStatusValue));
+    newValues.put(assessmentStateField,assessmentStateToString(ASSESSMENT_KNOWN));
+    ArrayList list = new ArrayList();
+    String query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(idField,jobID)});
+    performUpdate(newValues,"WHERE "+query,list,invKey);
+  }
+  
   /** Note a change in connection configuration.
   * This method will be called whenever a connection's configuration is modified, or when an external repository change
   * is signalled.
@@ -1537,6 +1617,22 @@ public class Jobs extends org.apache.man
       new UnitaryClause(outputNameField,connectionName)});
     performUpdate(newValues,"WHERE "+query,list,null);
   }
+
+  /** Note a change in transformation connection configuration.
+  * This method will be called whenever a connection's configuration is modified.
+  */
+  public void noteTransformationConnectionChange(String connectionName)
+    throws ManifoldCFException
+  {
+    // No cache keys need invalidation, since we're changing the start time, not the status.
+    HashMap newValues = new HashMap();
+    newValues.put(lastCheckTimeField,null);
+    ArrayList list = new ArrayList();
+    String query = buildConjunctionClause(list,new ClauseDescription[]{
+      new JoinClause(getTableName()+"."+idField,pipelineManager.ownerIDField),
+      new UnitaryClause(pipelineManager.transformationNameField,connectionName)});
+    performUpdate(newValues,"WHERE EXISTS(SELECT 'x' FROM "+pipelineManager.getTableName()+" WHERE "+query+")",list,null);
+  }
   
   /** Check whether a job's status indicates that it is in ACTIVE or ACTIVESEEDING state.
   */

Added: manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/AssessmentThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/AssessmentThread.java?rev=1601324&view=auto
==============================================================================
--- manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/AssessmentThread.java (added)
+++ manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/AssessmentThread.java Mon Jun  9 08:47:32 2014
@@ -0,0 +1,128 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.manifoldcf.crawler.system;
+
+import org.apache.manifoldcf.core.interfaces.*;
+import org.apache.manifoldcf.agents.interfaces.*;
+import org.apache.manifoldcf.crawler.interfaces.*;
+import org.apache.manifoldcf.crawler.system.Logging;
+import java.util.*;
+import java.lang.reflect.*;
+
+/** This thread periodically checks for jobs whose assessment state is marked
+* "UNKNOWN", and assesses those jobs to make the proper state transitions occur.
+*/
+public class AssessmentThread extends Thread
+{
+  public static final String _rcsid = "@(#)$Id$";
+
+  // Local data
+  /** Process ID */
+  protected final String processID;
+
+  /** Constructor.
+  */
+  public AssessmentThread(String processID)
+    throws ManifoldCFException
+  {
+    super();
+    this.processID = processID;
+    setName("Assessment thread");
+    setDaemon(true);
+  }
+
+  public void run()
+  {
+    Logging.threads.debug("Start up assessment thread");
+    try
+    {
+      // Create a thread context object.
+      IThreadContext threadContext = ThreadContextFactory.make();
+      IJobManager jobManager = JobManagerFactory.make(threadContext);
+      // Loop
+      while (true)
+      {
+        // Do another try/catch around everything in the loop
+        try
+        {
+          // Do the assessment
+          jobManager.assessMarkedJobs();
+          
+          // Sleep for the retry interval.
+          ManifoldCF.sleep(5000L);
+        }
+        catch (ManifoldCFException e)
+        {
+          if (e.getErrorCode() == ManifoldCFException.INTERRUPTED)
+            break;
+
+          if (e.getErrorCode() == ManifoldCFException.DATABASE_CONNECTION_ERROR)
+          {
+            Logging.threads.error("Assessment thread aborting and restarting due to database connection reset: "+e.getMessage(),e);
+            try
+            {
+              // Give the database a chance to catch up/wake up
+              ManifoldCF.sleep(10000L);
+            }
+            catch (InterruptedException se)
+            {
+              break;
+            }
+            continue;
+          }
+
+          // Log it, but keep the thread alive
+          Logging.threads.error("Exception tossed: "+e.getMessage(),e);
+
+          if (e.getErrorCode() == ManifoldCFException.SETUP_ERROR)
+          {
+            // Shut the whole system down!
+            System.exit(1);
+          }
+
+        }
+        catch (InterruptedException e)
+        {
+          // We're supposed to quit
+          break;
+        }
+        catch (OutOfMemoryError e)
+        {
+          System.err.println("agents process ran out of memory - shutting down");
+          e.printStackTrace(System.err);
+          System.exit(-200);
+        }
+        catch (Throwable e)
+        {
+          // A more severe error - but stay alive
+          Logging.threads.fatal("Error tossed: "+e.getMessage(),e);
+        }
+      }
+    }
+    catch (Throwable e)
+    {
+      // Severe error on initialization
+      System.err.println("agents process could not start - shutting down");
+      Logging.threads.fatal("AssessmentThread initialization error tossed: "+e.getMessage(),e);
+      System.exit(-300);
+    }
+
+  }
+
+}

Propchange: manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/AssessmentThread.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/AssessmentThread.java
------------------------------------------------------------------------------
    svn:keywords = Id

Modified: manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/CrawlerAgent.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/CrawlerAgent.java?rev=1601324&r1=1601323&r2=1601324&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/CrawlerAgent.java (original)
+++ manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/CrawlerAgent.java Mon Jun  9 08:47:32 2014
@@ -50,7 +50,8 @@ public class CrawlerAgent implements IAg
   protected IdleCleanupThread idleCleanupThread = null;
   protected SetPriorityThread setPriorityThread = null;
   protected HistoryCleanupThread historyCleanupThread = null;
-
+  protected AssessmentThread assessmentThread = null;
+  
   // Reset managers
   /** Worker thread pool reset manager */
   protected WorkerResetManager workerResetManager = null;
@@ -326,7 +327,9 @@ public class CrawlerAgent implements IAg
   public void noteTransformationConnectionChange(IThreadContext threadContext, String connectionName)
     throws ManifoldCFException
   {
-    // MHL
+    // Notify job manager
+    IJobManager jobManager = JobManagerFactory.make(threadContext);
+    jobManager.noteTransformationConnectionChange(connectionName);
   }
 
   /** Start everything.
@@ -420,6 +423,7 @@ public class CrawlerAgent implements IAg
     jobResetThread = new JobResetThread(processID);
     seedingThread = new SeedingThread(new SeedingResetManager(processID),processID);
     idleCleanupThread = new IdleCleanupThread(processID);
+    assessmentThread = new AssessmentThread(processID);
 
     // Start all the threads
     jobStartThread.start();
@@ -466,6 +470,7 @@ public class CrawlerAgent implements IAg
     jobResetThread.start();
     seedingThread.start();
     idleCleanupThread.start();
+    assessmentThread.start();
 
     Logging.root.info("Pull-agent started");
   }
@@ -481,7 +486,7 @@ public class CrawlerAgent implements IAg
       finisherThread != null || notificationThread != null || workerThreads != null || expireStufferThread != null || expireThreads != null ||
       deleteStufferThread != null || deleteThreads != null ||
       cleanupStufferThread != null || cleanupThreads != null ||
-      jobResetThread != null || seedingThread != null || idleCleanupThread != null || setPriorityThread != null || historyCleanupThread != null)
+      jobResetThread != null || seedingThread != null || idleCleanupThread != null || assessmentThread != null || setPriorityThread != null || historyCleanupThread != null)
     {
       // Send an interrupt to all threads that are still there.
       // In theory, this only needs to be done once.  In practice, I have seen cases where the thread loses track of the fact that it has been
@@ -586,6 +591,10 @@ public class CrawlerAgent implements IAg
       {
         idleCleanupThread.interrupt();
       }
+      if (assessmentThread != null)
+      {
+        assessmentThread.interrupt();
+      }
 
       // Now, wait for all threads to die.
       try
@@ -751,6 +760,11 @@ public class CrawlerAgent implements IAg
         if (!idleCleanupThread.isAlive())
           idleCleanupThread = null;
       }
+      if (assessmentThread != null)
+      {
+        if (!assessmentThread.isAlive())
+          assessmentThread = null;
+      }
     }
 
     // Threads are down; release connectors



Mime
View raw message