manifoldcf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kwri...@apache.org
Subject svn commit: r1610713 [2/2] - in /manifoldcf/trunk: ./ connectors/documentum/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/DCTM/ connectors/filenet/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/filenet/ connectors/h...
Date Tue, 15 Jul 2014 14:25:40 GMT
Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java?rev=1610713&r1=1610712&r2=1610713&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java Tue Jul 15 14:25:39 2014
@@ -5188,6 +5188,85 @@ public class JobManager implements IJobM
       new Object[][][]{dataValues},currentTime,new IPriorityCalculator[]{priority},new String[][]{prereqEventNames});
   }
 
+  /** Undo the addition of child documents to the queue, for a set of documents.
+  * This method is called at the end of document processing, to back out any incomplete additions to the queue, and restore
+  * the status quo ante prior to the incomplete additions.  Call this method instead of finishDocuments() if the
+  * addition of documents was not completed.
+  *<p>
+  * Carrydown records for the given parents are always reverted.  Hopcount parent records are reverted as well,
+  * but only when legalLinkTypes is non-empty.  Both reverts run inside one serialized transaction, so they are
+  * committed or rolled back together.  If the database aborts that transaction, it is rolled back and the whole
+  * transaction is retried after a backoff; any other failure is rolled back and rethrown.
+  *<p>
+  * The retry loop has no upper bound: it keeps retrying for as long as the database keeps reporting
+  * transaction aborts.
+  *@param jobID is the job identifier.
+  *@param legalLinkTypes is the set of legal link types that this connector generates.
+  *@param parentIdentifierHashes are the hashes of the document identifiers for whom child link extraction just took place.
+  *  If empty, this method does nothing.
+  *@throws ManifoldCFException on any database error other than a transaction abort.
+  */
+  @Override
+  public void revertDocuments(Long jobID, String[] legalLinkTypes,
+    String[] parentIdentifierHashes)
+    throws ManifoldCFException
+  {
+    if (parentIdentifierHashes.length == 0)
+      return;
+
+    // Hopcount parent records are only reverted when the connector declares legal link types.
+    boolean revertHopcount = (legalLinkTypes.length > 0);
+
+    // Each pass of this loop runs one serialized transaction.  A transaction abort reported by the
+    // database causes a rollback, a backoff, and another pass.
+    while (true)
+    {
+      // Backoff applied in the finally block; it stays zero unless the transaction was aborted.
+      long sleepAmt = 0L;
+      // Both reverts share this transaction so they commit or roll back together.
+      database.beginTransaction(database.TRANSACTION_SERIALIZED);
+      try
+      {
+        // Revert carrydown records
+        carryDown.revertRecords(jobID,parentIdentifierHashes);
+        // Revert hopcount parent records, when applicable
+        if (revertHopcount)
+          hopCount.revertParents(jobID,parentIdentifierHashes);
+        database.performCommit();
+        break;
+      }
+      catch (ManifoldCFException e)
+      {
+        database.signalRollback();
+        if (e.getErrorCode() == ManifoldCFException.DATABASE_TRANSACTION_ABORT)
+        {
+          // The database gave up on this transaction; wait, then start over.
+          sleepAmt = getRandomAmount();
+          continue;
+        }
+        // Any other database error is rethrown without retrying.
+        throw e;
+      }
+      // Anything else is rolled back and propagated unchanged.
+      catch (Error e)
+      {
+        database.signalRollback();
+        throw e;
+      }
+      catch (RuntimeException e)
+      {
+        database.signalRollback();
+        throw e;
+      }
+      finally
+      {
+        // Always close the transaction, then wait out any backoff before the next pass.
+        database.endTransaction();
+        sleepFor(sleepAmt);
+      }
+    }
+  }
+
   /** Complete adding child documents to the queue, for a set of documents.
   * This method is called at the end of document processing, to help the hopcount tracking engine do its bookkeeping.
   *@param jobID is the job identifier.
@@ -5240,6 +5319,11 @@ public class JobManager implements IJobM
           database.signalRollback();
           throw e;
         }
+        catch (RuntimeException e)
+        {
+          database.signalRollback();
+          throw e;
+        }
         finally
         {
           database.endTransaction();
@@ -5299,6 +5383,11 @@ public class JobManager implements IJobM
           database.signalRollback();
           throw e;
         }
+        catch (RuntimeException e)
+        {
+          database.signalRollback();
+          throw e;
+        }
         finally
         {
           database.endTransaction();

Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java?rev=1610713&r1=1610712&r2=1610713&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java Tue Jul 15 14:25:39 2014
@@ -80,9 +80,8 @@ public class WorkerThread extends Thread
 
       IRepositoryConnectorPool repositoryConnectorPool = RepositoryConnectorPoolFactory.make(threadContext);
       
-      List<DocumentToProcess> fetchList = new ArrayList<DocumentToProcess>();
+      // This is the set of documents that we will either be marking as complete, or requeued, depending on the kind of crawl.
       List<QueuedDocument> finishList = new ArrayList<QueuedDocument>();
-      Map<String,Integer> idHashIndexMap = new HashMap<String,Integer>();
 
       // This is where we accumulate the document QueuedDocuments to be deleted from the job queue.
       List<QueuedDocument> deleteList = new ArrayList<QueuedDocument>();
@@ -172,7 +171,6 @@ public class WorkerThread extends Thread
             }
 
             // Clear out all of our disposition lists
-            fetchList.clear();
             finishList.clear();
             deleteList.clear();
             ingesterCheckList.clear();
@@ -284,503 +282,375 @@ public class WorkerThread extends Thread
                   // Check for interruption before we start fetching
                   if (Thread.currentThread().isInterrupted())
                     throw new ManifoldCFException("Interrupted",ManifoldCFException.INTERRUPTED);
-
-                  if (activeDocuments.size() > 0)
+                  
+                  // We need first to assemble an IPipelineSpecificationWithVersions object for each document we're going to process.
+                  // We put this in a map so it can be looked up by document identifier.
+                  // Create a full PipelineSpecification, including description strings.  (This is per-job still, but can throw ServiceInterruptions, so we do it in here.)
+                  IPipelineSpecification pipelineSpecification;
+                  try
                   {
-                    // === Fetch document versions ===
-                    String[] currentDocIDHashArray = new String[activeDocuments.size()];
-                    String[] currentDocIDArray = new String[activeDocuments.size()];
-                    // We used to feed the old document version back to the repository connector so that it could
-                    // make decisions about whether to fetch, or just to call documentRecord().  The problem in a
-                    // multi-output world is that we may have had an error, and successfully output a document to
-                    // some outputs but not to others.  But we do this in a specific order.  It should be always safe
-                    // to get the document version from the *last* output in the sequence.  The problem is, we need
-                    // to be able to figure out what that is, and it is currently an implementation detail of
-                    // IncrementalIngester.  We solve this by allowing IncrementalIngester to make the decision.
-
-                    String[] oldVersionStringArray = new String[activeDocuments.size()];
+                    pipelineSpecification = new PipelineSpecification(pipelineSpecificationBasic,job,ingester);
+                  }
+                  catch (ServiceInterruption e)
+                  {
+                    // Handle service interruption from pipeline
+                    if (!e.jobInactiveAbort())
+                      Logging.jobs.warn("Service interruption reported for job "+
+                      job.getID()+" connection '"+job.getConnectionName()+"': "+
+                      e.getMessage());
+
+                    // All documents get requeued, because we never got far enough to make distinctions.  All we have to decide
+                    // is whether to requeue or abort.
+                    List<QueuedDocument> requeueList = new ArrayList<QueuedDocument>();
 
-                    for (int i = 0; i < activeDocuments.size(); i++)
+                    for (QueuedDocument qd : activeDocuments)
                     {
-                      QueuedDocument qd = activeDocuments.get(i);
-                      currentDocIDHashArray[i] = qd.getDocumentDescription().getDocumentIdentifierHash();
-                      currentDocIDArray[i] = qd.getDocumentDescription().getDocumentIdentifier();
-                      DocumentIngestStatus dis = qd.getLastIngestedStatus(lastIndexedOutputConnectionName);
-                      if (dis == null)
-                        oldVersionStringArray[i] = null;
+                      DocumentDescription dd = qd.getDocumentDescription();
+                      // Check for hard failure.  But no hard failure possible of it's a job inactive abort.
+                      if (!e.jobInactiveAbort() && (dd.getFailTime() != -1L && dd.getFailTime() < e.getRetryTime() ||
+                        dd.getFailRetryCount() == 0))
+                      {
+                        // Treat this as a hard failure.
+                        if (e.isAbortOnFail())
+                        {
+                          rescanList.add(qd);
+                          abortOnFail = new ManifoldCFException("Repeated service interruptions - failure processing document"+((e.getCause()!=null)?": "+e.getCause().getMessage():""),e.getCause());
+                        }
+                        else
+                        {
+                          requeueList.add(qd);
+                        }
+                      }
                       else
                       {
-                        oldVersionStringArray[i] = dis.getDocumentVersion();
-                        if (oldVersionStringArray[i] == null)
-                          oldVersionStringArray[i] = "";
+                        requeueList.add(qd);
                       }
                     }
+                      
+                    requeueDocuments(jobManager,requeueList,e.getRetryTime(),e.getFailTime(),
+                      e.getFailRetryCount());
+                      
+                    activeDocuments.clear();
+                    pipelineSpecification = null;
+                  }
 
-                    // Create a full PipelineSpecification, including description strings.  (This is per-job still, but can throw ServiceInterruptions, so we do it in here.)
-                    IPipelineSpecification pipelineSpecification = new PipelineSpecification(pipelineSpecificationBasic,job,ingester);
+                  if (activeDocuments.size() > 0)
+                  {
                     
-                    Set<String> abortSet = new HashSet<String>();
-                    VersionActivity versionActivity = new VersionActivity(job.getID(),processID,connectionName,pipelineSpecification,connMgr,jobManager,ingester,abortSet,ingestLogger);
-
+                    // **** New worker thread code starts here!!! ****
+                    
+                    IExistingVersions existingVersions = new ExistingVersions(lastIndexedOutputConnectionName,activeDocuments);
                     String aclAuthority = connection.getACLAuthority();
                     if (aclAuthority == null)
                       aclAuthority = "";
                     boolean isDefaultAuthority = (aclAuthority.length() == 0);
 
-                    if (Logging.threads.isDebugEnabled())
-                      Logging.threads.debug("Worker thread getting versions for "+Integer.toString(currentDocIDArray.length)+" documents");
-
-                    // === Fetch documents ===
-                    // We start by getting the document version string.
-                    DocumentVersions documentVersions = new DocumentVersions();
-                    boolean successfulVersions = false;
-                    try
+                    // Build the processActivity object
+                    
+                    
+                    Map<String,IPipelineSpecificationWithVersions> fetchPipelineSpecifications = new HashMap<String,IPipelineSpecificationWithVersions>();
+                    String[] documentIDs = new String[activeDocuments.size()];
+                    int k = 0;
+                    for (QueuedDocument qd : activeDocuments)
                     {
-                      connector.getDocumentVersions(documentVersions,currentDocIDArray,oldVersionStringArray,
-                        versionActivity,spec,jobType,isDefaultAuthority);
-                      successfulVersions = true;
-                      
-                      if (Logging.threads.isDebugEnabled())
-                        Logging.threads.debug("Worker thread done getting versions for "+Integer.toString(currentDocIDArray.length)+" documents");
-
+                      fetchPipelineSpecifications.put(qd.getDocumentDescription().getDocumentIdentifierHash(),
+                        new PipelineSpecificationWithVersions(pipelineSpecification,qd));
+                      documentIDs[k++] = qd.getDocumentDescription().getDocumentIdentifier();
                     }
-                    catch (ServiceInterruption e)
+                    
+                    ProcessActivity activity = new ProcessActivity(job.getID(),processID,
+                      threadContext,rt,jobManager,ingester,
+                      connectionName,pipelineSpecification,
+                      fetchPipelineSpecifications,
+                      currentTime,
+                      job.getExpiration(),
+                      job.getForcedMetadata(),
+                      job.getInterval(),
+                      job.getMaxInterval(),
+                      job.getHopcountMode(),
+                      connection,connector,connMgr,legalLinkTypes,ingestLogger,
+                      newParameterVersion);
+                    try
                     {
-                      // This service interruption comes from a point where we
-                      // know that no documents were ingested.
-                      // Therefore, active -> pending and activepurgatory -> pendingpurgatory
+                      if (Logging.threads.isDebugEnabled())
+                        Logging.threads.debug("Worker thread about to process "+Integer.toString(documentIDs.length)+" documents");
 
-                      if (!e.jobInactiveAbort())
+                      // Now, process in bulk -- catching and handling ServiceInterruptions
+                      ServiceInterruption serviceInterruption = null;
+                      try
                       {
-                        Logging.jobs.warn("Pre-ingest service interruption reported for job "+
+                        connector.processDocuments(documentIDs,existingVersions,job.getSpecification(),activity,jobType,isDefaultAuthority);
+                      }
+                      catch (ServiceInterruption e)
+                      {
+                        serviceInterruption = e;
+                        if (!e.jobInactiveAbort())
+                          Logging.jobs.warn("Service interruption reported for job "+
                           job.getID()+" connection '"+job.getConnectionName()+"': "+
                           e.getMessage());
                       }
 
-                      if (!e.jobInactiveAbort() && e.isAbortOnFail())
-                        abortOnFail = new ManifoldCFException("Repeated service interruptions - failure getting document version"+((e.getCause()!=null)?": "+e.getCause().getMessage():""),e.getCause());
-                        
-                      // Mark the current documents to be recrawled at the
-                      // time specified, with the proper error handling.
-                      List<QueuedDocument> newActiveList = new ArrayList<QueuedDocument>(activeDocuments.size());
-                      for (int i = 0; i < activeDocuments.size(); i++)
+                      // Flush remaining references into the database!
+                      activity.flush();
+
+                      if (Logging.threads.isDebugEnabled())
+                        Logging.threads.debug("Worker thread done processing "+Integer.toString(documentIDs.length)+" documents");
+
+                      // Either way, handle the documents we were supposed to process.  But if there was a service interruption,
+                      // and the disposition of the document was unclear, then the document will need to be requeued instead of handled normally.
+                      List<QueuedDocument> requeueList = new ArrayList<QueuedDocument>();
+
+                      for (QueuedDocument qd : activeDocuments)
                       {
-                        QueuedDocument qd = activeDocuments.get(i);
-                        DocumentDescription dd = qd.getDocumentDescription();
-                        // If either we are going to be requeuing beyond the fail time, OR
-                        // the number of retries available has hit 0, THEN we treat this
-                        // as either an "ignore" or a hard error.
-                        if (!e.jobInactiveAbort() && (dd.getFailTime() != -1L && dd.getFailTime() < e.getRetryTime() ||
-                          dd.getFailRetryCount() == 0))
+                        // If this document was aborted, then treat it specially.
+                        if (activity.wasDocumentAborted(qd.getDocumentDescription().getDocumentIdentifier()))
                         {
-                          // Treat this as a hard failure.
-                          if (e.isAbortOnFail())
-                          {
-                            rescanList.add(qd);
-                          }
-                          // We want this particular document to be not included in the
-                          // reprocessing.  Therefore, we do the same thing as we would
-                          // if we got back a null version.
-                          deleteList.add(qd);
+                          // Special treatment for aborted documents.
+                          // We ignore the returned version string completely, since it's presumed that processing was not completed for this doc.
+                          // We want to give up immediately on this one, and just requeue it for immediate reprocessing (pending its prereqs being all met).
+                          // Add to the finish list, so it gets requeued.  Because the document is already marked as aborted, this should be enough to cause an
+                          // unconditional requeue.
+                          finishList.add(qd);
                         }
-                        else
+                        else if (activity.wasDocumentDeleted(qd.getDocumentDescription().getDocumentIdentifier()))
                         {
-                          // Retry this document according to the parameters provided.
-                          jobManager.resetDocument(dd,e.getRetryTime(),
-                            IJobManager.ACTION_RESCAN,e.getFailTime(),e.getFailRetryCount());
-                          qd.setProcessed();
+                          deleteList.add(qd);
                         }
-                      }
-                      
-                      // All active documents have been removed from the list
-                      activeDocuments.clear();
-                      
-                    }
-
-                    // If version fetch was successful, the go on to processing phase
-                    if (successfulVersions)
-                    {
-                      // This try{ } is for releasing document versions at the connector level.
-                      try
-                      {
-
-                        // Loop through documents now, and amass what we need to fetch.
-                        // We also need to tally: (1) what needs to be marked as deleted via
-                        //   jobManager.markDocumentDeleted();
-                        // (2) what needs to be noted as a deletion to ingester
-                        // (3) what needs to be noted as a check for the ingester
-                        for (int i = 0; i < activeDocuments.size(); i++)
+                        else if (serviceInterruption != null)
                         {
-                          QueuedDocument qd = activeDocuments.get(i);
+                          // Service interruption has precedence over unchanged, because we might have been interrupted while scanning the document
+                          // for references
                           DocumentDescription dd = qd.getDocumentDescription();
-                          // If this document was aborted, then treat it specially; we never go on to fetch it, for one thing.
-                          if (abortSet.contains(dd.getDocumentIdentifier()))
+                          // Check for hard failure.  But no hard failure possible of it's a job inactive abort.
+                          if (!serviceInterruption.jobInactiveAbort() && (dd.getFailTime() != -1L && dd.getFailTime() < serviceInterruption.getRetryTime() ||
+                            dd.getFailRetryCount() == 0))
                           {
-                            // Special treatment for aborted documents.
-                            // We ignore the returned version string completely, since it's presumed that processing was not completed for this doc.
-                            // We want to give up immediately on this one, and just requeue it for immediate reprocessing (pending its prereqs being all met).
-                            // Add to the finish list, so it gets requeued.  Because the document is already marked as aborted, this should be enough to cause an
-                            // unconditional requeue.
-                            finishList.add(qd);
-                          }
-                          else
-                          {
-                            // Compare against old version.
-                            // We call the incremental ingester to make the decision for us as to whether we refetch a document or not.
-                            
-                            String documentIDHash = dd.getDocumentIdentifierHash();
-                            VersionContext newDocContext = documentVersions.getDocumentVersion(dd.getDocumentIdentifier());
-
-                            if (newDocContext == null)
+                            // Treat this as a hard failure.
+                            if (serviceInterruption.isAbortOnFail())
                             {
-                              deleteList.add(qd);
+                              // Make sure that the job aborts.
+                              abortOnFail = new ManifoldCFException("Repeated service interruptions - failure processing document"+((serviceInterruption.getCause()!=null)?": "+serviceInterruption.getCause().getMessage():""),serviceInterruption.getCause());
+                              rescanList.add(qd);
                             }
                             else
                             {
-                              // Not getting deleted, so we must do the finish processing (i.e. conditionally requeue), so note that.
-                              finishList.add(qd);
-
-                              // See if we need to add, or update.
-                              IPipelineSpecificationWithVersions specWithVersions = new PipelineSpecificationWithVersions(pipelineSpecification,qd);
-                              boolean allowIngest = ingester.checkFetchDocument(specWithVersions,
-                                newDocContext.getVersionString(),
-                                newParameterVersion,
-                                aclAuthority);
-
-                              fetchList.add(new DocumentToProcess(qd,!allowIngest));
-                              if (!allowIngest)
-                                ingesterCheckList.add(documentIDHash);
+                              // Skip the document, rather than failing.
+                              // We want this particular document to be not included in the
+                              // reprocessing.  Therefore, we do the same thing as we would
+                              // if we got back a null version.
+                              deleteList.add(qd);
                             }
                           }
-
-                        }
-                        activeDocuments.clear();
-
-                        // We are done transfering activeDocuments documents to the other lists for processing.
-                        // Those lists will all need to be processed, but the processList is special because it
-                        // must be processed in the same context as the version fetch.
-
-                        // Note the documents that have been checked but not reingested.  This should happen BEFORE we need
-                        // the statistics (which are calculated during the finishlist step below)
-                        if (ingesterCheckList.size() > 0)
-                        {
-                          String[] checkClasses = new String[ingesterCheckList.size()];
-                          String[] checkIDs = new String[ingesterCheckList.size()];
-                          for (int i = 0; i < checkIDs.length; i++)
+                          else
                           {
-                            checkClasses[i] = connectionName;
-                            checkIDs[i] = ingesterCheckList.get(i);
+                            // Not a hard failure.  Requeue.
+                            requeueList.add(qd);
                           }
-                          ingester.documentCheckMultiple(pipelineSpecificationBasic,checkClasses,checkIDs,currentTime);
                         }
+                        else if (activity.wasDocumentUnchanged(qd.getDocumentDescription().getDocumentIdentifier()))
+                        {
+                          finishList.add(qd);
+                          ingesterCheckList.add(qd.getDocumentDescription().getDocumentIdentifierHash());
+                        }
+                        else
+                        {
+                          // All documents not specifically called out above are simply finished, since we know they haven't been deleted.
+                          finishList.add(qd);
+                        }
+                      }
 
-                        // First, make the things we will need for all subsequent steps.
-                        // We need first to assemble an IPipelineSpecificationWithVersions object for each document we're going to process.
-                        // We put this in a map so it can be looked up by document identifier.
-                        Map<String,IPipelineSpecificationWithVersions> fetchPipelineSpecifications = new HashMap<String,IPipelineSpecificationWithVersions>();
-                        for (int i = 0; i < fetchList.size(); i++)
+                      if (serviceInterruption != null)
+                      {
+                        // Requeue the documents we've identified as needing to be repeated
+                        requeueDocuments(jobManager,requeueList,serviceInterruption.getRetryTime(),serviceInterruption.getFailTime(),
+                          serviceInterruption.getFailRetryCount());
+                      }
+                      
+                      // Note the documents that have been checked but not reingested.  This should happen BEFORE we need
+                      // the statistics (which are calculated during the finishlist step below)
+                      if (ingesterCheckList.size() > 0)
+                      {
+                        String[] checkClasses = new String[ingesterCheckList.size()];
+                        String[] checkIDs = new String[ingesterCheckList.size()];
+                        for (int i = 0; i < checkIDs.length; i++)
                         {
-                          QueuedDocument qd = fetchList.get(i).getDocument();
-                          fetchPipelineSpecifications.put(qd.getDocumentDescription().getDocumentIdentifierHash(),
-                            new PipelineSpecificationWithVersions(pipelineSpecification,qd));
+                          checkClasses[i] = connectionName;
+                          checkIDs[i] = ingesterCheckList.get(i);
                         }
-                        
-                        ProcessActivity activity = new ProcessActivity(job.getID(),processID,
-                          threadContext,rt,jobManager,ingester,
-                          connectionName,pipelineSpecification,
-                          fetchPipelineSpecifications,
-                          currentTime,
-                          job.getExpiration(),
-                          job.getForcedMetadata(),
-                          job.getInterval(),
-                          job.getMaxInterval(),
-                          job.getHopcountMode(),
-                          connection,connector,connMgr,legalLinkTypes,ingestLogger,abortSet,
-                          newParameterVersion);
-                        try
+                        ingester.documentCheckMultiple(pipelineSpecificationBasic,checkClasses,checkIDs,currentTime);
+                      }
+
+                      // Process the finish list!
+                      if (finishList.size() > 0)
+                      {
+                        // "Finish" the documents (removing unneeded carrydown info, and compute hopcounts).
+                        // This can ONLY be done on fully-completed documents; everything else should be left in a dangling
+                        // state (which we know is OK because it will be fixed the next time the document is attempted).
+                        String[] documentIDHashes = new String[finishList.size()];
+                        k = 0;
+                        for (QueuedDocument qd : finishList)
                         {
+                          documentIDHashes[k++] = qd.getDocumentDescription().getDocumentIdentifierHash();
+                        }
+                        DocumentDescription[] requeueCandidates = jobManager.finishDocuments(job.getID(),legalLinkTypes,documentIDHashes,job.getHopcountMode());
+                        ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,requeueCandidates,connector,connection,rt,currentTime);
 
-                          // Finishlist and Fetchlist are parallel.  Fetchlist contains what we need to process.
-                          if (fetchList.size() > 0)
+                        // In both job types, we have to go through the finishList to figure out what to do with the documents.
+                        // In the case of a document that was aborted, we must requeue it for immediate reprocessing in BOTH job types.
+                        switch (job.getType())
+                        {
+                        case IJobDescription.TYPE_CONTINUOUS:
                           {
-                            // Build a list of id's and flags
-                            String[] processIDs = new String[fetchList.size()];
-                            String[] processIDHashes = new String[fetchList.size()];
-                            boolean[] scanOnly = new boolean[fetchList.size()];
-
-                            for (int i = 0; i < fetchList.size(); i++)
+                            // We need to populate timeArray
+                            String[] timeIDClasses = new String[finishList.size()];
+                            String[] timeIDHashes = new String[finishList.size()];
+                            for (int i = 0; i < timeIDHashes.length; i++)
                             {
-                              DocumentToProcess dToP = fetchList.get(i);
-                              DocumentDescription dd = dToP.getDocument().getDocumentDescription();
-                              processIDs[i] = dd.getDocumentIdentifier();
-                              processIDHashes[i] = dd.getDocumentIdentifierHash();
-                              scanOnly[i] = dToP.getScanOnly();
+                              QueuedDocument qd = (QueuedDocument)finishList.get(i);
+                              DocumentDescription dd = qd.getDocumentDescription();
+                              String documentIDHash = dd.getDocumentIdentifierHash();
+                              timeIDClasses[i] = connectionName;
+                              timeIDHashes[i] = documentIDHash;
                             }
-
-                            if (Thread.currentThread().isInterrupted())
-                              throw new ManifoldCFException("Interrupted",ManifoldCFException.INTERRUPTED);
-
-                            if (Logging.threads.isDebugEnabled())
-                              Logging.threads.debug("Worker thread about to process "+Integer.toString(processIDs.length)+" documents");
-
-                            // Now, process in bulk
-                            try
+                            long[] timeArray = ingester.getDocumentUpdateIntervalMultiple(pipelineSpecificationBasic,timeIDClasses,timeIDHashes);
+                            Long[] recheckTimeArray = new Long[timeArray.length];
+                            int[] actionArray = new int[timeArray.length];
+                            DocumentDescription[] recrawlDocs = new DocumentDescription[finishList.size()];
+                            for (int i = 0; i < finishList.size(); i++)
                             {
+                              QueuedDocument qd = finishList.get(i);
+                              recrawlDocs[i] = qd.getDocumentDescription();
+                              String documentID = recrawlDocs[i].getDocumentIdentifier();
 
-                              connector.processDocuments(processIDs,documentVersions,activity,scanOnly,jobType);
-
-                              // Flush remaining references into the database!
-                              activity.flush();
-
-                              // "Finish" the documents (removing unneeded carrydown info, etc.)
-                              DocumentDescription[] requeueCandidates = jobManager.finishDocuments(job.getID(),legalLinkTypes,processIDHashes,job.getHopcountMode());
+                              // If aborted due to sequencing issue, then requeue for reprocessing immediately, ignoring everything else.
+                              boolean wasAborted = activity.wasDocumentAborted(documentID);
+                              if (wasAborted)
+                              {
+                                // Requeue for immediate reprocessing
+                                if (Logging.scheduling.isDebugEnabled())
+                                  Logging.scheduling.debug("Document '"+documentID+"' will be RESCANNED as soon as prerequisites are met");
 
-                              ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,
-                                requeueCandidates,connector,connection,rt,currentTime);
+                                actionArray[i] = IJobManager.ACTION_RESCAN;
+                                recheckTimeArray[i] = new Long(0L);     // Must not use null; that means 'never'.
+                              }
+                              else
+                              {
+                                // Calculate the next time to run, or time to expire.
 
-                              if (Logging.threads.isDebugEnabled())
-                                Logging.threads.debug("Worker thread done processing "+Integer.toString(processIDs.length)+" documents");
+                                // For run time, the formula is to calculate the running avg interval between changes,
+                                // add an additional interval (which comes from the job description),
+                                // and add that to the current time.
+                                // One caveat: we really want to calculate the interval from the last
+                                // time change was detected, but this is not implemented yet.
+                                long timeAmt = timeArray[i];
+                                // null value indicates never to schedule
 
-                            }
-                            catch (ServiceInterruption e)
-                            {
-                              // This service interruption could have resulted
-                              // after some or all of the documents ingested.
-                              // They will therefore need to go into the PENDINGPURGATORY
-                              // state.
-
-                              if (!e.jobInactiveAbort())
-                                Logging.jobs.warn("Service interruption reported for job "+
-                                  job.getID()+" connection '"+job.getConnectionName()+"': "+
-                                  e.getMessage());
-
-                              if (!e.jobInactiveAbort() && e.isAbortOnFail())
-                                abortOnFail = new ManifoldCFException("Repeated service interruptions - failure processing document"+((e.getCause()!=null)?": "+e.getCause().getMessage():""),e.getCause());
-
-                              // Mark the current documents to be recrawled in the
-                              // time specified, except for the ones beyond their limits.
-                              // Those will either be deleted, or an exception will be thrown that
-                              // will abort the current job.
+                                Long recrawlTime = activity.calculateDocumentRescheduleTime(currentTime,timeAmt,documentID);
+                                Long expireTime = activity.calculateDocumentExpireTime(currentTime,documentID);
 
-                              deleteList.clear();
-                              ArrayList requeueList = new ArrayList();
 
-                              Set<String> fetchDocuments = new HashSet<String>();
-                              for (int i = 0; i < fetchList.size(); i++)
-                              {
-                                fetchDocuments.add(fetchList.get(i).getDocument().getDocumentDescription().getDocumentIdentifierHash());
-                              }
-                              List<QueuedDocument> newFinishList = new ArrayList<QueuedDocument>();
-                              for (int i = 0; i < finishList.size(); i++)
-                              {
-                                QueuedDocument qd = finishList.get(i);
-                                if (fetchDocuments.contains(qd.getDocumentDescription().getDocumentIdentifierHash()))
+                                // Merge the two times together.  We decide on the action based on the action with the lowest time.
+                                if (expireTime == null || (recrawlTime != null && recrawlTime.longValue() < expireTime.longValue()))
                                 {
-                                  DocumentDescription dd = qd.getDocumentDescription();
-                                  // Check for hard failure.  But no hard failure possible of it's a job inactive abort.
-                                  if (!e.jobInactiveAbort() && (dd.getFailTime() != -1L && dd.getFailTime() < e.getRetryTime() ||
-                                    dd.getFailRetryCount() == 0))
-                                  {
-                                    // Treat this as a hard failure.
-                                    if (e.isAbortOnFail())
-                                    {
-                                      rescanList.add(qd);
-                                    }
-                                    else
-                                    {
-                                      // We want this particular document to be not included in the
-                                      // reprocessing.  Therefore, we do the same thing as we would
-                                      // if we got back a null version.
-                                      deleteList.add(qd);
-                                    }
-                                  }
-                                  else
-                                  {
-                                    requeueList.add(qd);
-                                  }
+                                  if (Logging.scheduling.isDebugEnabled())
+                                    Logging.scheduling.debug("Document '"+documentID+"' will be RESCANNED at "+recrawlTime.toString());
+                                  recheckTimeArray[i] = recrawlTime;
+                                  actionArray[i] = IJobManager.ACTION_RESCAN;
+                                }
+                                else if (recrawlTime == null || (expireTime != null && recrawlTime.longValue() > expireTime.longValue()))
+                                {
+                                  if (Logging.scheduling.isDebugEnabled())
+                                    Logging.scheduling.debug("Document '"+documentID+"' will be REMOVED at "+expireTime.toString());
+                                  recheckTimeArray[i] = expireTime;
+                                  actionArray[i] = IJobManager.ACTION_REMOVE;
                                 }
                                 else
-                                  newFinishList.add(qd);
+                                {
+                                  // Default activity if conflict will be rescan
+                                  if (Logging.scheduling.isDebugEnabled() && recrawlTime != null)
+                                    Logging.scheduling.debug("Document '"+documentID+"' will be RESCANNED at "+recrawlTime.toString());
+                                  recheckTimeArray[i] = recrawlTime;
+                                  actionArray[i] = IJobManager.ACTION_RESCAN;
+                                }
                               }
-
-                              // Requeue the documents we've identified
-                              requeueDocuments(jobManager,requeueList,e.getRetryTime(),e.getFailTime(),
-                                e.getFailRetryCount());
-
-                              // We've disposed of all the documents, so finishlist is now clear
-                              finishList = newFinishList;
                             }
-                          } // End of fetching
 
-                          if (finishList.size() > 0)
+                            jobManager.requeueDocumentMultiple(recrawlDocs,recheckTimeArray,actionArray);
+
+                          }
+                          break;
+                        case IJobDescription.TYPE_SPECIFIED:
                           {
-                            // In both job types, we have to go through the finishList to figure out what to do with the documents.
-                            // In the case of a document that was aborted, we must requeue it for immediate reprocessing in BOTH job types.
-                            switch (job.getType())
+                            // Separate the ones we actually finished from the ones we need to requeue because they were aborted
+                            List<DocumentDescription> completedList = new ArrayList<DocumentDescription>();
+                            List<DocumentDescription> abortedList = new ArrayList<DocumentDescription>();
+                            for (int i = 0; i < finishList.size(); i++)
                             {
-                            case IJobDescription.TYPE_CONTINUOUS:
+                              QueuedDocument qd = finishList.get(i);
+                              DocumentDescription dd = qd.getDocumentDescription();
+                              if (activity.wasDocumentAborted(dd.getDocumentIdentifier()))
                               {
-                                // We need to populate timeArray
-                                String[] timeIDClasses = new String[finishList.size()];
-                                String[] timeIDHashes = new String[finishList.size()];
-                                for (int i = 0; i < timeIDHashes.length; i++)
-                                {
-                                  QueuedDocument qd = (QueuedDocument)finishList.get(i);
-                                  DocumentDescription dd = qd.getDocumentDescription();
-                                  String documentIDHash = dd.getDocumentIdentifierHash();
-                                  timeIDClasses[i] = connectionName;
-                                  timeIDHashes[i] = documentIDHash;
-                                }
-                                long[] timeArray = ingester.getDocumentUpdateIntervalMultiple(pipelineSpecificationBasic,timeIDClasses,timeIDHashes);
-                                Long[] recheckTimeArray = new Long[timeArray.length];
-                                int[] actionArray = new int[timeArray.length];
-                                DocumentDescription[] recrawlDocs = new DocumentDescription[finishList.size()];
-                                for (int i = 0; i < finishList.size(); i++)
-                                {
-                                  QueuedDocument qd = finishList.get(i);
-                                  recrawlDocs[i] = qd.getDocumentDescription();
-                                  String documentID = recrawlDocs[i].getDocumentIdentifier();
-
-                                  // If aborted due to sequencing issue, then requeue for reprocessing immediately, ignoring everything else.
-                                  boolean wasAborted = abortSet.contains(documentID);
-                                  if (wasAborted)
-                                  {
-                                    // Requeue for immediate reprocessing
-                                    if (Logging.scheduling.isDebugEnabled())
-                                      Logging.scheduling.debug("Document '"+documentID+"' will be RESCANNED as soon as prerequisites are met");
-
-                                    actionArray[i] = IJobManager.ACTION_RESCAN;
-                                    recheckTimeArray[i] = new Long(0L);     // Must not use null; that means 'never'.
-                                  }
-                                  else
-                                  {
-                                    // Calculate the next time to run, or time to expire.
-
-                                    // For run time, the formula is to calculate the running avg interval between changes,
-                                    // add an additional interval (which comes from the job description),
-                                    // and add that to the current time.
-                                    // One caveat: we really want to calculate the interval from the last
-                                    // time change was detected, but this is not implemented yet.
-                                    long timeAmt = timeArray[i];
-                                    // null value indicates never to schedule
-
-                                    Long recrawlTime = activity.calculateDocumentRescheduleTime(currentTime,timeAmt,documentID);
-                                    Long expireTime = activity.calculateDocumentExpireTime(currentTime,documentID);
-
-
-                                    // Merge the two times together.  We decide on the action based on the action with the lowest time.
-                                    if (expireTime == null || (recrawlTime != null && recrawlTime.longValue() < expireTime.longValue()))
-                                    {
-                                      if (Logging.scheduling.isDebugEnabled())
-                                        Logging.scheduling.debug("Document '"+documentID+"' will be RESCANNED at "+recrawlTime.toString());
-                                      recheckTimeArray[i] = recrawlTime;
-                                      actionArray[i] = IJobManager.ACTION_RESCAN;
-                                    }
-                                    else if (recrawlTime == null || (expireTime != null && recrawlTime.longValue() > expireTime.longValue()))
-                                    {
-                                      if (Logging.scheduling.isDebugEnabled())
-                                        Logging.scheduling.debug("Document '"+documentID+"' will be REMOVED at "+expireTime.toString());
-                                      recheckTimeArray[i] = expireTime;
-                                      actionArray[i] = IJobManager.ACTION_REMOVE;
-                                    }
-                                    else
-                                    {
-                                      // Default activity if conflict will be rescan
-                                      if (Logging.scheduling.isDebugEnabled() && recrawlTime != null)
-                                        Logging.scheduling.debug("Document '"+documentID+"' will be RESCANNED at "+recrawlTime.toString());
-                                      recheckTimeArray[i] = recrawlTime;
-                                      actionArray[i] = IJobManager.ACTION_RESCAN;
-                                    }
-                                  }
-                                }
-
-                                jobManager.requeueDocumentMultiple(recrawlDocs,recheckTimeArray,actionArray);
-
+                                // The document was aborted, so put it into the abortedList
+                                abortedList.add(dd);
                               }
-                              break;
-                            case IJobDescription.TYPE_SPECIFIED:
+                              else
                               {
-                                // Separate the ones we actually finished from the ones we need to requeue because they were aborted
-                                List<DocumentDescription> completedList = new ArrayList<DocumentDescription>();
-                                List<DocumentDescription> abortedList = new ArrayList<DocumentDescription>();
-                                for (int i = 0; i < finishList.size(); i++)
-                                {
-                                  QueuedDocument qd = finishList.get(i);
-                                  DocumentDescription dd = qd.getDocumentDescription();
-                                  if (abortSet.contains(dd.getDocumentIdentifier()))
-                                  {
-                                    // The document was aborted, so put it into the abortedList
-                                    abortedList.add(dd);
-                                  }
-                                  else
-                                  {
-                                    // The document was completed.
-                                    completedList.add(dd);
-                                  }
-                                }
-
-                                // Requeue the ones that must be repeated
-                                if (abortedList.size() > 0)
-                                {
-                                  DocumentDescription[] docDescriptions = new DocumentDescription[abortedList.size()];
-                                  Long[] recheckTimeArray = new Long[docDescriptions.length];
-                                  int[] actionArray = new int[docDescriptions.length];
-                                  for (int i = 0; i < docDescriptions.length; i++)
-                                  {
-                                    docDescriptions[i] = abortedList.get(i);
-                                    recheckTimeArray[i] = new Long(0L);
-                                    actionArray[i] = IJobManager.ACTION_RESCAN;
-                                  }
-
-                                  jobManager.requeueDocumentMultiple(docDescriptions,recheckTimeArray,actionArray);
-                                }
-
-                                // Mark the ones completed that were actually completed.
-                                if (completedList.size() > 0)
-                                {
-                                  DocumentDescription[] docDescriptions = new DocumentDescription[completedList.size()];
-                                  for (int i = 0; i < docDescriptions.length; i++)
-                                  {
-                                    docDescriptions[i] = (DocumentDescription)completedList.get(i);
-                                  }
-
-                                  jobManager.markDocumentCompletedMultiple(docDescriptions);
-                                }
+                                // The document was completed.
+                                completedList.add(dd);
                               }
-                              break;
-                            default:
-                              throw new ManifoldCFException("Unexpected value for job type: '"+Integer.toString(job.getType())+"'");
                             }
 
-                            // Finally, if we're still alive, mark everything as "processed".
-                            for (int i = 0; i < finishList.size(); i++)
+                            // Requeue the ones that must be repeated
+                            if (abortedList.size() > 0)
                             {
-                              QueuedDocument qd = finishList.get(i);
-                              qd.setProcessed();
+                              DocumentDescription[] docDescriptions = new DocumentDescription[abortedList.size()];
+                              Long[] recheckTimeArray = new Long[docDescriptions.length];
+                              int[] actionArray = new int[docDescriptions.length];
+                              for (int i = 0; i < docDescriptions.length; i++)
+                              {
+                                docDescriptions[i] = abortedList.get(i);
+                                recheckTimeArray[i] = new Long(0L);
+                                actionArray[i] = IJobManager.ACTION_RESCAN;
+                              }
+
+                              jobManager.requeueDocumentMultiple(docDescriptions,recheckTimeArray,actionArray);
                             }
 
+                            // Mark the ones completed that were actually completed.
+                            if (completedList.size() > 0)
+                            {
+                              DocumentDescription[] docDescriptions = new DocumentDescription[completedList.size()];
+                              for (int i = 0; i < docDescriptions.length; i++)
+                              {
+                                docDescriptions[i] = (DocumentDescription)completedList.get(i);
+                              }
+
+                              jobManager.markDocumentCompletedMultiple(docDescriptions);
+                            }
                           }
-                        
+                          break;
+                        default:
+                          throw new ManifoldCFException("Unexpected value for job type: '"+Integer.toString(job.getType())+"'");
                         }
-                        finally
+
+                        // Finally, if we're still alive, mark everything we finished as "processed".
+                        for (QueuedDocument qd : finishList)
                         {
-                          // Make sure we don't leave any dangling carrydown files
-                          activity.discard();
+                          qd.setProcessed();
                         }
-                          
-                        // Successful processing of the set
-                        // We count 'get version' time in the average, so even if we decide not to process a doc
-                        // it still counts.
-                        queueTracker.noteConnectionPerformance(qds.getCount(),connectionName,System.currentTimeMillis() - processingStartTime);
-
                       }
-                      finally
-                      {
-                        // Release any document temporary storage held by the connector
-                        connector.releaseDocumentVersions(currentDocIDArray,documentVersions);
-                      }
-                    
                     }
+                    finally
+                    {
+                      // Make sure we don't leave any dangling carrydown files
+                      activity.discard();
+                    }
+                    
+                    // Successful processing of the set
+                    // We count 'get version' time in the average, so even if we decide not to process a doc
+                    // it still counts.
+                    queueTracker.noteConnectionPerformance(qds.getCount(),connectionName,System.currentTimeMillis() - processingStartTime);
+
                   }
                   
                   // Now, handle the delete list
@@ -1110,9 +980,8 @@ public class WorkerThread extends Thread
         requeueCandidates,connector,connection,rt,currentTime);
 
       // Mark all these as done
-      for (int i = 0; i < jobmanagerRemovalList.size(); i++)
+      for (QueuedDocument qd : jobmanagerRemovalList)
       {
-        QueuedDocument qd = jobmanagerRemovalList.get(i);
         qd.setProcessed();
       }
     }
@@ -1132,34 +1001,22 @@ public class WorkerThread extends Thread
     {
       DocumentDescription[] requeueDocs = new DocumentDescription[requeueList.size()];
 
-      int i = 0;
-      while (i < requeueDocs.length)
+      for (int i = 0; i < requeueDocs.length; i++)
       {
         QueuedDocument qd = requeueList.get(i);
         DocumentDescription dd = qd.getDocumentDescription();
         requeueDocs[i] = dd;
-        i++;
       }
 
       jobManager.resetDocumentMultiple(requeueDocs,retryTime,IJobManager.ACTION_RESCAN,failTime,failCount);
 
-      i = 0;
-      while (i < requeueList.size())
+      for (QueuedDocument qd : requeueList)
       {
-        QueuedDocument qd = requeueList.get(i++);
         qd.setProcessed();
       }
     }
   }
 
-  protected static String packTransformations(String[] transformationNames, String[] transformationDescriptionStrings)
-  {
-    StringBuilder sb = new StringBuilder();
-    packList(sb,transformationNames,'+');
-    packList(sb,transformationDescriptionStrings,'!');
-    return sb.toString();
-  }
-  
   /** Another stuffer for packing lists of variable length */
   protected static void packList(StringBuilder output, String[] values, char delimiter)
   {
@@ -1219,231 +1076,6 @@ public class WorkerThread extends Thread
 
   // Nested classes
 
-  /** Version activity class wraps access to activity history.
-  */
-  protected static class VersionActivity implements IVersionActivity
-  {
-    protected final Long jobID;
-    protected final String processID;
-    protected final String connectionName;
-    protected final IPipelineSpecification pipelineSpecification;
-    protected final IRepositoryConnectionManager connMgr;
-    protected final IJobManager jobManager;
-    protected final IIncrementalIngester ingester;
-    protected final Set<String> abortSet;
-    protected final CheckActivity checkActivity;
-    /** Constructor.
-    */
-    public VersionActivity(Long jobID, String processID,
-      String connectionName, IPipelineSpecification pipelineSpecification,
-      IRepositoryConnectionManager connMgr,
-      IJobManager jobManager, IIncrementalIngester ingester, Set<String> abortSet,
-      CheckActivity checkActivity)
-    {
-      this.jobID = jobID;
-      this.processID = processID;
-      this.connectionName = connectionName;
-      this.pipelineSpecification = pipelineSpecification;
-      this.connMgr = connMgr;
-      this.jobManager = jobManager;
-      this.ingester = ingester;
-      this.abortSet = abortSet;
-      this.checkActivity = checkActivity;
-    }
-
-    /** Check whether a mime type is indexable by the currently specified output connector.
-    *@param mimeType is the mime type to check, not including any character set specification.
-    *@return true if the mime type is indexable.
-    */
-    @Override
-    public boolean checkMimeTypeIndexable(String mimeType)
-      throws ManifoldCFException, ServiceInterruption
-    {
-      return ingester.checkMimeTypeIndexable(
-        pipelineSpecification,
-        mimeType,
-        checkActivity);
-    }
-
-    /** Check whether a document is indexable by the currently specified output connector.
-    *@param localFile is the local copy of the file to check.
-    *@return true if the document is indexable.
-    */
-    @Override
-    public boolean checkDocumentIndexable(File localFile)
-      throws ManifoldCFException, ServiceInterruption
-    {
-      return ingester.checkDocumentIndexable(
-        pipelineSpecification,
-        localFile,
-        checkActivity);
-    }
-
-    /** Check whether a document of a specified length is indexable by the currently specified output connector.
-    *@param length is the length to check.
-    *@return true if the document is indexable.
-    */
-    @Override
-    public boolean checkLengthIndexable(long length)
-      throws ManifoldCFException, ServiceInterruption
-    {
-      return ingester.checkLengthIndexable(
-        pipelineSpecification,
-        length,
-        checkActivity);
-    }
-
-    /** Pre-determine whether a document's URL is indexable by this connector.  This method is used by participating repository connectors
-    * to help filter out documents that are not worth indexing.
-    *@param url is the URL of the document.
-    *@return true if the file is indexable.
-    */
-    @Override
-    public boolean checkURLIndexable(String url)
-      throws ManifoldCFException, ServiceInterruption
-    {
-      return ingester.checkURLIndexable(
-        pipelineSpecification,
-        url,
-        checkActivity);
-    }
-
-    /** Record time-stamped information about the activity of the connector.
-    *@param startTime is either null or the time since the start of epoch in milliseconds (Jan 1, 1970).  Every
-    *       activity has an associated time; the startTime field records when the activity began.  A null value
-    *       indicates that the start time and the finishing time are the same.
-    *@param activityType is a string which is fully interpretable only in the context of the connector involved, which is
-    *       used to categorize what kind of activity is being recorded.  For example, a web connector might record a
-    *       "fetch document" activity.  Cannot be null.
-    *@param dataSize is the number of bytes of data involved in the activity, or null if not applicable.
-    *@param entityIdentifier is a (possibly long) string which identifies the object involved in the history record.
-    *       The interpretation of this field will differ from connector to connector.  May be null.
-    *@param resultCode contains a terse description of the result of the activity.  The description is limited in
-    *       size to 255 characters, and can be interpreted only in the context of the current connector.  May be null.
-    *@param resultDescription is a (possibly long) human-readable string which adds detail, if required, to the result
-    *       described in the resultCode field.  This field is not meant to be queried on.  May be null.
-    *@param childIdentifiers is a set of child entity identifiers associated with this activity.  May be null.
-    */
-    @Override
-    public void recordActivity(Long startTime, String activityType, Long dataSize,
-      String entityIdentifier, String resultCode, String resultDescription, String[] childIdentifiers)
-      throws ManifoldCFException
-    {
-      connMgr.recordHistory(connectionName,startTime,activityType,dataSize,entityIdentifier,resultCode,
-        resultDescription,childIdentifiers);
-    }
-
-    /** Retrieve data passed from parents to a specified child document.
-    *@param localIdentifier is the document identifier of the document we want the recorded data for.
-    *@param dataName is the name of the data items to retrieve.
-    *@return an array containing the unique data values passed from ALL parents.  Note that these are in no particular order, and there will not be any duplicates.
-    */
-    @Override
-    public String[] retrieveParentData(String localIdentifier, String dataName)
-      throws ManifoldCFException
-    {
-      return jobManager.retrieveParentData(jobID,ManifoldCF.hash(localIdentifier),dataName);
-    }
-
-    /** Retrieve data passed from parents to a specified child document.
-    *@param localIdentifier is the document identifier of the document we want the recorded data for.
-    *@param dataName is the name of the data items to retrieve.
-    *@return an array containing the unique data values passed from ALL parents.  Note that these are in no particular order, and there will not be any duplicates.
-    */
-    @Override
-    public CharacterInput[] retrieveParentDataAsFiles(String localIdentifier, String dataName)
-      throws ManifoldCFException
-    {
-      return jobManager.retrieveParentDataAsFiles(jobID,ManifoldCF.hash(localIdentifier),dataName);
-    }
-
-    /** Check whether current job is still active.
-    * This method is provided to allow an individual connector that needs to wait on some long-term condition to give up waiting due to the job
-    * itself being aborted.  If the connector should abort, this method will raise a properly-formed ServiceInterruption, which if thrown to the
-    * caller, will signal that the current versioning activity remains incomplete and must be retried when the job is resumed.
-    */
-    @Override
-    public void checkJobStillActive()
-      throws ManifoldCFException, ServiceInterruption
-    {
-      if (jobManager.checkJobActive(jobID) == false)
-        throw new ServiceInterruption("Job no longer active",System.currentTimeMillis(),true);
-    }
-
-    /** Begin an event sequence.
-    * This method should be called by a connector when a sequencing event should enter the "pending" state.  If the event is already in that state,
-    * this method will return false, otherwise true.  The connector has the responsibility of appropriately managing sequencing given the response
-    * status.
-    *@param eventName is the event name.
-    *@return false if the event is already in the "pending" state.
-    */
-    @Override
-    public boolean beginEventSequence(String eventName)
-      throws ManifoldCFException
-    {
-      return jobManager.beginEventSequence(processID,eventName);
-    }
-
-    /** Complete an event sequence.
-    * This method should be called to signal that an event is no longer in the "pending" state.  This can mean that the prerequisite processing is
-    * completed, but it can also mean that prerequisite processing was aborted or cannot be completed.
-    * Note well: This method should not be called unless the connector is CERTAIN that an event is in progress, and that the current thread has
-    * the sole right to complete it.  Otherwise, race conditions can develop which would be difficult to diagnose.
-    *@param eventName is the event name.
-    */
-    @Override
-    public void completeEventSequence(String eventName)
-      throws ManifoldCFException
-    {
-      jobManager.completeEventSequence(eventName);
-    }
-
-    /** Abort processing a document (for sequencing reasons).
-    * This method should be called in order to cause the specified document to be requeued for later processing.  While this is similar in some respects
-    * to the semantics of a ServiceInterruption, it is applicable to only one document at a time, and also does not specify any delay period, since it is
-    * presumed that the reason for the requeue is because of sequencing issues synchronized around an underlying event.
-    *@param localIdentifier is the document identifier to requeue
-    */
-    @Override
-    public void retryDocumentProcessing(String localIdentifier)
-      throws ManifoldCFException
-    {
-      // Accumulate aborts
-      abortSet.add(localIdentifier);
-    }
-
-    /** Create a global string from a simple string.
-    *@param simpleString is the simple string.
-    *@return a global string.
-    */
-    @Override
-    public String createGlobalString(String simpleString)
-    {
-      return ManifoldCF.createGlobalString(simpleString);
-    }
-
-    /** Create a connection-specific string from a simple string.
-    *@param simpleString is the simple string.
-    *@return a connection-specific string.
-    */
-    @Override
-    public String createConnectionSpecificString(String simpleString)
-    {
-      return ManifoldCF.createConnectionSpecificString(connectionName,simpleString);
-    }
-
-    /** Create a job-based string from a simple string.
-    *@param simpleString is the simple string.
-    *@return a job-specific string.
-    */
-    @Override
-    public String createJobSpecificString(String simpleString)
-    {
-      return ManifoldCF.createJobSpecificString(jobID,simpleString);
-    }
-
-  }
-
   /** Process activity class wraps access to the ingester and job queue.
   */
   protected static class ProcessActivity implements IProcessActivity
@@ -1469,7 +1101,6 @@ public class WorkerThread extends Thread
     protected final String[] legalLinkTypes;
     protected final OutputActivity ingestLogger;
     protected final IReprioritizationTracker rt;
-    protected final Set<String> abortSet;
     protected final String parameterVersion;
     
     // We submit references in bulk, because that's way more efficient.
@@ -1484,6 +1115,15 @@ public class WorkerThread extends Thread
     // Origination times
     protected final Map<String,Long> originationTimes = new HashMap<String,Long>();
 
+    // Whether the document was aborted or not
+    protected final Set<String> abortSet = new HashSet<String>();
+
+    // Whether the document was checked or not
+    protected final Set<String> documentCheckedSet = new HashSet<String>();
+    
+    // Whether document was deleted
+    protected final Set<String> documentDeletedSet = new HashSet<String>();
+    
     /** Constructor.
     *@param jobManager is the job manager
     *@param ingester is the ingester
@@ -1503,7 +1143,6 @@ public class WorkerThread extends Thread
       int hopcountMode,
       IRepositoryConnection connection, IRepositoryConnector connector,
       IRepositoryConnectionManager connMgr, String[] legalLinkTypes, OutputActivity ingestLogger,
-      Set<String> abortSet,
       String parameterVersion)
     {
       this.jobID = jobID;
@@ -1526,7 +1165,6 @@ public class WorkerThread extends Thread
       this.connMgr = connMgr;
       this.legalLinkTypes = legalLinkTypes;
       this.ingestLogger = ingestLogger;
-      this.abortSet = abortSet;
       this.parameterVersion = parameterVersion;
     }
 
@@ -1541,6 +1179,27 @@ public class WorkerThread extends Thread
       referenceList.clear();
     }
 
+    /** Check whether a document (and its version string) was unchanged or not.
+    */
+    public boolean wasDocumentUnchanged(String documentIdentifier)
+    {
+      return documentCheckedSet.contains(documentIdentifier);
+    }
+    
+    /** Check whether document was deleted or not.
+    */
+    public boolean wasDocumentDeleted(String documentIdentifier)
+    {
+      return documentDeletedSet.contains(documentIdentifier);
+    }
+    
+    /** Check whether a document was aborted or not.
+    */
+    public boolean wasDocumentAborted(String documentIdentifier)
+    {
+      return abortSet.contains(documentIdentifier);
+    }
+    
     /** Add a document description to the current job's queue.
     *@param localIdentifier is the local document identifier to add (for the connector that
     * fetched the document).
@@ -1642,6 +1301,23 @@ public class WorkerThread extends Thread
       existingDr.addPrerequisiteEvents(prereqEventNames);
     }
 
+    /** Check if a document needs to be reindexed, based on a computed version string.
+    * Call this method to determine whether reindexing is necessary.  Pass in a newly-computed version
+    * string.  This method will return "true" if the document needs to be re-indexed.
+    *@param documentIdentifier is the document identifier.
+    *@param newVersionString is the newly-computed version string.
+    *@return true if the document needs to be reindexed.
+    */
+    @Override
+    public boolean checkDocumentNeedsReindexing(String documentIdentifier,
+      String newVersionString)
+      throws ManifoldCFException
+    {
+      String documentIdentifierHash = ManifoldCF.hash(documentIdentifier);
+      IPipelineSpecificationWithVersions spec = fetchPipelineSpecifications.get(documentIdentifierHash);
+      return ingester.checkFetchDocument(spec,newVersionString,parameterVersion,connection.getACLAuthority());
+    }
+
     /** Add a document description to the current job's queue.
     *@param localIdentifier is the local document identifier to add (for the connector that
     * fetched the document).
@@ -1733,20 +1409,32 @@ public class WorkerThread extends Thread
       return jobManager.retrieveParentDataAsFiles(jobID,ManifoldCF.hash(localIdentifier),dataName);
     }
 
+    /** Note the fact that a document exists but is unchanged, and nothing further
+    * needs to be done to it.
+    * Call this method if it is determined that the document in question is identical to
+    * the formerly indexed document, AND when the version string for the document
+    * has not changed either.
+    */
+    @Override
+    public void noteUnchangedDocument(String documentIdentifier)
+      throws ManifoldCFException
+    {
+      documentCheckedSet.add(documentIdentifier);
+    }
+
     /** Record a document version, but don't ingest it.
-    * ServiceInterruption is thrown if this action must be rescheduled.
     *@param documentIdentifier is the document identifier.
     *@param version is the document version.
     */
     @Override
     public void recordDocument(String documentIdentifier, String version)
-      throws ManifoldCFException, ServiceInterruption
+      throws ManifoldCFException
     {
       String documentIdentifierHash = ManifoldCF.hash(documentIdentifier);
       ingester.documentRecord(
         pipelineSpecification.getBasicPipelineSpecification(),
         connectionName,documentIdentifierHash,
-        version,currentTime,ingestLogger);
+        version,currentTime);
     }
 
     /** Ingest the current document.
@@ -1821,21 +1509,20 @@ public class WorkerThread extends Thread
         data,currentTime,
         documentURI,
         ingestLogger);
+      
     }
 
-    /** Delete the current document from the search engine index, while keeping track of the version information
+    /** Remove the specified document from the search engine index, while keeping track of the version information
     * for it (to reduce churn).
     *@param documentIdentifier is the document's local identifier.
-    *@param version is the version of the document, as reported by the getDocumentVersions() method of the
-    *       corresponding repository connector.
+    *@param version is the version string to be recorded for the document.
     */
-    @Override
-    public void deleteDocument(String documentIdentifier, String version)
+    public void noDocument(String documentIdentifier, String version)
       throws ManifoldCFException, ServiceInterruption
     {
-      if (version.length() == 0)
-        deleteDocument(documentIdentifier);
-      else
+      // Special interpretation for empty version string; treat as if the document doesn't exist
+      // (by ignoring it and allowing it to be deleted later)
+      if (version.length() > 0)
       {
         try
         {
@@ -1847,22 +1534,35 @@ public class WorkerThread extends Thread
           throw new IllegalStateException("IngestDocumentWithException threw an illegal IOException: "+e.getMessage(),e);
         }
       }
+
     }
 
-    /** Delete the current document from the search engine index.  This method does NOT keep track of version
-    * information for the document and thus can lead to "churn", whereby the same document is queued, versioned,
-    * and removed on subsequent crawls.  It therefore should be considered to be deprecated, in favor of
-    * deleteDocument(String localIdentifier, String version).
+    /** Delete the current document from the search engine index, while keeping track of the version information
+    * for it (to reduce churn).
+    * Use noDocument() above instead.
     *@param documentIdentifier is the document's local identifier.
+    *@param version is the version string to be recorded for the document.
     */
     @Override
-    public void deleteDocument(String documentIdentifier)
+    @Deprecated
+    public void deleteDocument(String documentIdentifier, String version)
       throws ManifoldCFException, ServiceInterruption
     {
-      String documentIdentifierHash = ManifoldCF.hash(documentIdentifier);
-      ingester.documentDelete(pipelineSpecification.getBasicPipelineSpecification(),
-        connectionName,documentIdentifierHash,
-        ingestLogger);
+      noDocument(documentIdentifier,version);
+    }
+
+    /** Delete the specified document from the search engine index, and from the status table.  This
+    *  method does NOT keep track of version
+    * information for the document and thus can lead to "churn", whereby the same document is queued, processed,
+    * and removed on subsequent crawls.  It is therefore preferable to use noDocument() instead,
+    * in any case where the same decision will need to be made over and over.
+    *@param documentIdentifier is the document's identifier.
+    */
+    @Override
+    public void deleteDocument(String documentIdentifier)
+      throws ManifoldCFException
+    {
+      documentDeletedSet.add(documentIdentifier);
     }
 
     /** Override the schedule for the next time a document is crawled.
@@ -2574,6 +2274,38 @@ public class WorkerThread extends Thread
     
   }
   
+  /** The implementation of the IExistingVersions interface.
+  */
+  protected static class ExistingVersions implements IExistingVersions
+  {
+    protected final Map<String,QueuedDocument> map;
+    protected final String lastOutputConnectionName;
+    
+    public ExistingVersions(String lastOutputConnectionName, List<QueuedDocument> list)
+    {
+      this.lastOutputConnectionName = lastOutputConnectionName;
+      this.map = new HashMap<String,QueuedDocument>();
+      for (QueuedDocument qd : list)
+      {
+        map.put(qd.getDocumentDescription().getDocumentIdentifier(),qd);
+      }
+    }
+    
+    /** Retrieve an existing version string given a document identifier.
+    *@param documentIdentifier is the document identifier.
+    *@return the document version string, or null if the document was never previously indexed.
+    */
+    public String getIndexedVersionString(String documentIdentifier)
+    {
+      QueuedDocument qd = map.get(documentIdentifier);
+      DocumentIngestStatus status = qd.getLastIngestedStatus(lastOutputConnectionName);
+      if (status == null)
+        return null;
+      return status.getDocumentVersion();
+    }
+
+  }
+  
   /** The ingest logger class */
   protected static class OutputActivity extends CheckActivity implements IOutputActivity
   {



Mime
View raw message