manifoldcf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kwri...@apache.org
Subject svn commit: r1372225 [2/2] - in /manifoldcf/trunk: ./ framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/ framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/ framework/pull-agent/src/main/java/org/apache/mani...
Date Mon, 13 Aug 2012 00:44:06 GMT
Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java?rev=1372225&r1=1372224&r2=1372225&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java Mon Aug 13 00:44:05 2012
@@ -78,25 +78,23 @@ public class WorkerThread extends Thread
       List<QueuedDocument> finishList = new ArrayList<QueuedDocument>();
       Map<String,Integer> idHashIndexMap = new HashMap<String,Integer>();
 
-      // This is the place we accumulate documents to be deleted via the ingester; each member is a
-      // document ID string.
-      List<String> ingesterDeleteList = new ArrayList<String>();
-      List<String> ingesterDeleteListUnhashed = new ArrayList<String>();
-
       // This is where we accumulate the document QueuedDocuments to be deleted from the job queue.
-      // These MUST be cleaned up after the ingester delete is called, in order to guarantee
-      // cleanup happens!  Also note that there will be items in here that are NOT in the ingester
-      // delete list...
-      List<QueuedDocument> jobmanagerDeleteList = new ArrayList<QueuedDocument>();
+      List<QueuedDocument> deleteList = new ArrayList<QueuedDocument>();
 
+      // This is where we accumulate documents that need to be placed in the HOPCOUNTREMOVED
+      // state
+      List<QueuedDocument> hopcountremoveList = new ArrayList<QueuedDocument>();
+      
+      // This is where we accumulate documents that need to be rescanned
+      List<QueuedDocument> rescanList = new ArrayList<QueuedDocument>();
+      
       // This is where we store document ID strings of documents that need to be noted as having
       // been checked.
       List<String> ingesterCheckList = new ArrayList<String>();
 
-      // Temporary list
-      List<String> tempIDHashList = new ArrayList<String>();
-      List<String> tempIDList = new ArrayList<String>();
-
+      // Service interruption thrown with "abort on fail".
+      ManifoldCFException abortOnFail = null;
+      
       // Loop
       while (true)
       {
@@ -147,25 +145,30 @@ public class WorkerThread extends Thread
             
             OutputActivity ingestLogger = new OutputActivity(connectionName,connMgr,outputName);
 
-            // Put together document id's into an array, and versions into a map
-            String[] docIDHashArray = new String[qds.getCount()];
-            String[] docIDArray = new String[qds.getCount()];
-            int i = 0;
-            while (i < docIDArray.length)
+            // The flow through this section of the code is as follows.
+            // (1) We start with a list of documents
+            // (2) We attempt to do various things to these documents
+            // (3) Based on what happens, and what errors we get, we progressively move documents out of the main list
+            //     and into secondary lists that will be all treated in the same way
+            
+            // First, initialize the active document set to contain everything.
+            List<QueuedDocument> activeDocuments = new ArrayList<QueuedDocument>(qds.getCount());
+            
+            for (int i = 0; i < qds.getCount(); i++)
             {
               QueuedDocument qd = qds.getDocument(i);
-              docIDHashArray[i] = qd.getDocumentDescription().getDocumentIdentifierHash();
-              docIDArray[i] = qd.getDocumentDescription().getDocumentIdentifier();
-              i++;
+              activeDocuments.add(qd);
             }
 
+            // Clear out all of our disposition lists
             fetchList.clear();
             finishList.clear();
             versionMap.clear();
-            ingesterDeleteList.clear();
-            ingesterDeleteListUnhashed.clear();
-            jobmanagerDeleteList.clear();
+            deleteList.clear();
             ingesterCheckList.clear();
+            hopcountremoveList.clear();
+            rescanList.clear(); //                  jobManager.resetDocument(dd,0L,IJobManager.ACTION_RESCAN,-1L,-1);
+            abortOnFail = null;
 
             // Keep track of the starting processing time, for statistics calculation
             long processingStartTime = System.currentTimeMillis();
@@ -175,100 +178,79 @@ public class WorkerThread extends Thread
             {
               long currentTime = System.currentTimeMillis();
 
-              // Do the hopcount checks, if any.  This will iteratively reduce the viable list of
-              // document identifiers in need of having their versions fetched.
-              String[] legalLinkTypes = RepositoryConnectorFactory.getRelationshipTypes(threadContext,connection.getClassName());
-              // If this came back null, it means that there is no underlying implementation available, so treat this like a kind of service interruption.
-              if (legalLinkTypes == null)
+              if (Logging.threads.isDebugEnabled())
+                Logging.threads.debug("Worker thread starting document count is "+Integer.toString(activeDocuments.size()));
+
+              // Get the legal link types.  This is needed for later hopcount checking.
+              String[] legalLinkTypes = null;
+              if (activeDocuments.size() > 0)
               {
-                i = 0;
-                while (i < qds.getCount())
+                legalLinkTypes = RepositoryConnectorFactory.getRelationshipTypes(threadContext,connection.getClassName());
+                // If this came back null, it means that there is no underlying implementation available, so treat this like a kind of service interruption.
+                if (legalLinkTypes == null)
                 {
-                  QueuedDocument qd = qds.getDocument(i++);
-                  DocumentDescription dd = qd.getDocumentDescription();
-
-                  jobManager.resetDocument(dd,0L,IJobManager.ACTION_RESCAN,-1L,-1);
-                  qd.setProcessed();
+                  // Failure here puts all remaining documents into rescan list
+                  moveList(activeDocuments,rescanList);
                 }
               }
-              else
+
+              if (Logging.threads.isDebugEnabled())
+                Logging.threads.debug(" Post-linktype document count is "+Integer.toString(activeDocuments.size()));
+              
+              // Do the hopcount checks, if any.  This will iteratively reduce the viable list of
+              // document identifiers in need of having their versions fetched.
+              if (legalLinkTypes != null && activeDocuments.size() > 0)
               {
-                // This is where to put the version answers.
-                String[] currentVersions = new String[docIDHashArray.length];
-                // This is a map of docIDHash to index, so I know where to put the versions as they
-                // come back from multiple sources.
-                idHashIndexMap.clear();
-                int z = 0;
-                while (z < docIDHashArray.length)
+                // Set up the current ID array
+                String[] currentDocIDHashArray = new String[activeDocuments.size()];
+                for (int i = 0; i < currentDocIDHashArray.length; i++)
                 {
-                  idHashIndexMap.put(docIDHashArray[z],new Integer(z));
-                  z++;
+                  currentDocIDHashArray[i] = activeDocuments.get(i).getDocumentDescription().getDocumentIdentifierHash();
                 }
-
-                String[] currentDocIDHashArray = docIDHashArray;
-                String[] currentDocIDArray = docIDArray;
-
                 Map filterMap = job.getHopCountFilters();
                 Iterator filterIter = filterMap.keySet().iterator();
+                // Array to accumulate hopcount results for all link types
+                boolean[] overallResults = new boolean[currentDocIDHashArray.length];
+                for (int i = 0; i < overallResults.length; i++)
+                {
+                  overallResults[i] = true;
+                }
+                // Calculate the hopcount result for each link type, and fold it in.
                 while (filterIter.hasNext())
                 {
                   String linkType = (String)filterIter.next();
                   int maxHop = (int)((Long)filterMap.get(linkType)).longValue();
                   boolean[] results = jobManager.findHopCounts(job.getID(),legalLinkTypes,currentDocIDHashArray,linkType,
                     maxHop,job.getHopcountMode());
-                  tempIDHashList.clear();
-                  tempIDList.clear();
-                  z = 0;
-                  while (z < currentDocIDHashArray.length)
-                  {
-                    if (results[z] == false)
-                    {
-                      // Chuck it
-                      currentVersions[idHashIndexMap.get(currentDocIDHashArray[z]).intValue()] = null;
-                    }
-                    else
-                    {
-                      tempIDHashList.add(currentDocIDHashArray[z]);
-                      tempIDList.add(currentDocIDArray[z]);
-                    }
-                    z++;
-                  }
-
-                  currentDocIDHashArray = new String[tempIDHashList.size()];
-                  currentDocIDArray = new String[tempIDList.size()];
-                  z = 0;
-                  while (z < currentDocIDHashArray.length)
+                  for (int i = 0; i < results.length; i++)
                   {
-                    currentDocIDHashArray[z] = tempIDHashList.get(z);
-                    currentDocIDArray[z] = tempIDList.get(z);
-                    z++;
+                    overallResults[i] = overallResults[i] && results[i];
                   }
                 }
-
-                if (Logging.threads.isDebugEnabled())
-                  Logging.threads.debug("Worker thread post-hopcount pruned document count is "+Integer.toString(currentDocIDHashArray.length));
-
-                // Set up the old version string array
-                String[] oldVersionStringArray = new String[currentDocIDHashArray.length];
-                z = 0;
-                while (z < oldVersionStringArray.length)
+                // Move all documents to the appropriate list
+                List<QueuedDocument> newActiveSet = new ArrayList<QueuedDocument>(activeDocuments.size());
+                for (int i = 0; i < overallResults.length; i++)
                 {
-                  String idHashValue = currentDocIDHashArray[z];
-                  QueuedDocument qd = qds.getDocument(idHashIndexMap.get(idHashValue).intValue());
-                  DocumentIngestStatus dis = qd.getLastIngestedStatus();
-                  if (dis == null)
-                    oldVersionStringArray[z] = null;
+                  if (overallResults[i] == false)
+                  {
+                    hopcountremoveList.add(activeDocuments.get(i));
+                  }
                   else
                   {
-                    oldVersionStringArray[z] = dis.getDocumentVersion();
-                    if (oldVersionStringArray[z] == null)
-                      oldVersionStringArray[z] = "";
+                    newActiveSet.add(activeDocuments.get(i));
                   }
-                  z++;
                 }
+                activeDocuments = newActiveSet;
+              }
 
-                // Grab a connector handle
-                IRepositoryConnector connector = RepositoryConnectorFactory.grab(threadContext,
+              if (Logging.threads.isDebugEnabled())
+                Logging.threads.debug(" Post-hopcount pruned document count is "+Integer.toString(activeDocuments.size()));
+              
+              // From here on down we need a connector instance, so get one.
+              IRepositoryConnector connector = null;
+              if (activeDocuments.size() > 0 || hopcountremoveList.size() > 0)
+              {
+                connector = RepositoryConnectorFactory.grab(threadContext,
                   connection.getClassName(),
                   connection.getConfigParams(),
                   connection.getMaxConnections());
@@ -278,52 +260,66 @@ public class WorkerThread extends Thread
                 // must be requeued for immediate reprocessing.  When the rest of the world figures out that the job that owns this
                 // document is in fact unable to function, we'll stop getting such documents handed to us, because the state of the
                 // job will be changed.
+
                 if (connector == null)
                 {
-                  i = 0;
-                  while (i < qds.getCount())
-                  {
-                    QueuedDocument qd = qds.getDocument(i++);
-                    DocumentDescription dd = qd.getDocumentDescription();
-
-                    jobManager.resetDocument(dd,0L,IJobManager.ACTION_RESCAN,-1L,-1);
-                    qd.setProcessed();
-                  }
+                  // Failure here puts all remaining documents into rescan list
+                  moveList(activeDocuments,rescanList);
+                  moveList(hopcountremoveList,rescanList);
                 }
-                else
+              }
+              
+              if (connector != null)
+              {
+                // Open try/finally block to free the connector instance no matter what
+                try
                 {
-                  // System.out.println(" Got a connector in thread "+id);
-                  try
+                  // Check for interruption before we start fetching
+                  if (Thread.currentThread().isInterrupted())
+                    throw new ManifoldCFException("Interrupted",ManifoldCFException.INTERRUPTED);
+
+                  if (activeDocuments.size() > 0)
                   {
+                    // === Fetch document versions ===
+                    String[] currentDocIDHashArray = new String[activeDocuments.size()];
+                    String[] currentDocIDArray = new String[activeDocuments.size()];
+                    String[] oldVersionStringArray = new String[activeDocuments.size()];
 
-                    if (Thread.currentThread().isInterrupted())
-                      throw new ManifoldCFException("Interrupted",ManifoldCFException.INTERRUPTED);
+                    for (int i = 0; i < activeDocuments.size(); i++)
+                    {
+                      QueuedDocument qd = activeDocuments.get(i);
+                      currentDocIDHashArray[i] = qd.getDocumentDescription().getDocumentIdentifierHash();
+                      currentDocIDArray[i] = qd.getDocumentDescription().getDocumentIdentifier();
+                      DocumentIngestStatus dis = qd.getLastIngestedStatus();
+                      if (dis == null)
+                        oldVersionStringArray[i] = null;
+                      else
+                      {
+                        oldVersionStringArray[i] = dis.getDocumentVersion();
+                        if (oldVersionStringArray[i] == null)
+                          oldVersionStringArray[i] = "";
+                      }
+                    }
 
                     // Get the output version string.
                     String outputVersion = ingester.getOutputDescription(outputName,outputSpec);
                       
                     HashMap abortSet = new HashMap();
-                    ProcessActivity activity;
                     VersionActivity versionActivity = new VersionActivity(connectionName,connMgr,jobManager,job,ingester,abortSet,outputVersion);
 
                     String aclAuthority = connection.getACLAuthority();
                     boolean isDefaultAuthority = (aclAuthority == null || aclAuthority.length() == 0);
 
-                    // Fetch documents (if we need to)
+                    if (Logging.threads.isDebugEnabled())
+                      Logging.threads.debug("Worker thread getting versions for "+Integer.toString(currentDocIDArray.length)+" documents");
+
+                    // === Fetch documents ===
+                    // We start by getting the document version string.
+                    String[] newVersionStringArray = null;
                     try
                     {
-                      if (Logging.threads.isDebugEnabled())
-                        Logging.threads.debug("Worker thread getting versions for "+Integer.toString(currentDocIDArray.length)+" documents");
-
-                      String[] newCurrentVersions = connector.getDocumentVersions(currentDocIDArray,oldVersionStringArray,
+                      newVersionStringArray = connector.getDocumentVersions(currentDocIDArray,oldVersionStringArray,
                         versionActivity,spec,jobType,isDefaultAuthority);
-                      z = 0;
-                      while (z < currentDocIDHashArray.length)
-                      {
-                        currentVersions[((Integer)idHashIndexMap.get(currentDocIDHashArray[z])).intValue()] =
-                          newCurrentVersions[z];
-                        z++;
-                      }
 
                       if (Logging.threads.isDebugEnabled())
                         Logging.threads.debug("Worker thread done getting versions for "+Integer.toString(currentDocIDArray.length)+" documents");
@@ -341,10 +337,10 @@ public class WorkerThread extends Thread
 
                       // Mark the current documents to be recrawled at the
                       // time specified, with the proper error handling.
-                      i = 0;
-                      while (i < qds.getCount())
+                      List<QueuedDocument> newActiveList = new ArrayList<QueuedDocument>(activeDocuments.size());
+                      for (int i = 0; i < activeDocuments.size(); i++)
                       {
-                        QueuedDocument qd = qds.getDocument(i++);
+                        QueuedDocument qd = activeDocuments.get(i);
                         DocumentDescription dd = qd.getDocumentDescription();
                         // If either we are going to be requeuing beyond the fail time, OR
                         // the number of retries available has hit 0, THEN we treat this
@@ -354,495 +350,473 @@ public class WorkerThread extends Thread
                         {
                           // Treat this as a hard failure.
                           if (e.isAbortOnFail())
-                            throw new ManifoldCFException("Repeated service interruptions - failure getting document version"+((e.getCause()!=null)?": "+e.getCause().getMessage():""),e.getCause());
+                          {
+                            abortOnFail = new ManifoldCFException("Repeated service interruptions - failure getting document version"+((e.getCause()!=null)?": "+e.getCause().getMessage():""),e.getCause());
+                            rescanList.add(qd);
+                          }
                           // We want this particular document to be not included in the
                           // reprocessing.  Therefore, we do the same thing as we would
                           // if we got back a null version.
-                          DocumentIngestStatus oldDocStatus = qd.getLastIngestedStatus();
-                          String documentIDHash = dd.getDocumentIdentifierHash();
-                          // See if we need to delete
-                          if (oldDocStatus != null)
-                          {
-                            // Queue up to issue deletion
-                            ingesterDeleteList.add(documentIDHash);
-                            ingesterDeleteListUnhashed.add(dd.getDocumentIdentifier());
-                          }
-                          jobmanagerDeleteList.add(qd);
+                          deleteList.add(qd);
                         }
                         else
                         {
+                          // Retry this document according to the parameters provided.
                           jobManager.resetDocument(dd,e.getRetryTime(),
                             IJobManager.ACTION_RESCAN,e.getFailTime(),e.getFailRetryCount());
                           qd.setProcessed();
                         }
                       }
-
-                      processDeleteLists(outputName,connector,connection,jobManager,
-                        jobmanagerDeleteList,ingester,ingesterDeleteList,ingesterDeleteListUnhashed,
-                        job.getID(),legalLinkTypes,ingestLogger,job.getHopcountMode(),queueTracker,currentTime);
-
-                      // Done processing this set of documents; they've either been
-                      // requeued, deleted, or an exception has been thrown...
-                      continue;
+                      
+                      // All active documents have been removed from the list
+                      activeDocuments.clear();
+                      
                     }
 
-                    // Organize what we need for document status comparison, and get it into a canonical form.
-                    String newOutputVersion = outputVersion;
-                    if (newOutputVersion == null)
-                      newOutputVersion = "";
-
-                    try
+                    // If version fetch was successful, the go on to processing phase
+                    if (newVersionStringArray != null)
                     {
-                      // Loop through documents now, and amass what we need to fetch.
-                      // We also need to tally: (1) what needs to be marked as deleted via
-                      //   jobManager.markDocumentDeleted();
-                      // (2) what needs to be noted as a deletion to ingester
-                      // (3) what needs to be noted as a check for the ingester
-                      i = 0;
-                      while (i < currentVersions.length)
+                      // This try{ } is for releasing document versions at the connector level.
+                      try
                       {
-                        QueuedDocument qd = qds.getDocument(i);
-                        DocumentDescription dd = qd.getDocumentDescription();
-                        // If this document was aborted, then treat it specially; we never go on to fetch it, for one thing.
-                        if (abortSet.get(dd.getDocumentIdentifier()) != null)
-                        {
-                          // Special treatment for aborted documents.
-                          // We ignore the returned version string completely, since it's presumed that processing was not completed for this doc.
-                          // We want to give up immediately on this one, and just requeue it for immediate reprocessing (pending its prereqs being all met).
-                          // Add to the finish list, so it gets requeued.  Because the document is already marked as aborted, this should be enough to cause an
-                          // unconditional requeue.
-                          finishList.add(qd);
-                        }
-                        else
+                        // Organize what we need for document status comparison, and get it into a canonical form.
+                        String newOutputVersion = outputVersion;
+                        if (newOutputVersion == null)
+                          newOutputVersion = "";
+
+                        // Loop through documents now, and amass what we need to fetch.
+                        // We also need to tally: (1) what needs to be marked as deleted via
+                        //   jobManager.markDocumentDeleted();
+                        // (2) what needs to be noted as a deletion to ingester
+                        // (3) what needs to be noted as a check for the ingester
+                        for (int i = 0; i < activeDocuments.size(); i++)
                         {
-                          DocumentIngestStatus oldDocStatus = qd.getLastIngestedStatus();
-                          String documentIDHash = dd.getDocumentIdentifierHash();
-                          String newDocVersion = currentVersions[i];
-                          String newAuthorityName = aclAuthority;
-                          if (newAuthorityName == null)
-                            newAuthorityName = "";
-
-                          versionMap.put(dd.getDocumentIdentifierHash(),newDocVersion);
-
-
-                          if (newDocVersion == null)
+                          QueuedDocument qd = activeDocuments.get(i);
+                          DocumentDescription dd = qd.getDocumentDescription();
+                          // If this document was aborted, then treat it specially; we never go on to fetch it, for one thing.
+                          if (abortSet.get(dd.getDocumentIdentifier()) != null)
                           {
-                            // See if we need to delete
-                            if (oldDocStatus != null)
-                            {
-                              // Queue up to issue deletion
-                              ingesterDeleteList.add(documentIDHash);
-                              ingesterDeleteListUnhashed.add(dd.getDocumentIdentifier());
-                            }
-                            // We always add to the jobqueue delete list regardless.
-                            jobmanagerDeleteList.add(qd);
+                            // Special treatment for aborted documents.
+                            // We ignore the returned version string completely, since it's presumed that processing was not completed for this doc.
+                            // We want to give up immediately on this one, and just requeue it for immediate reprocessing (pending its prereqs being all met).
+                            // Add to the finish list, so it gets requeued.  Because the document is already marked as aborted, this should be enough to cause an
+                            // unconditional requeue.
+                            finishList.add(qd);
                           }
                           else
                           {
-                            // Not getting deleted, so we must do the finish processing (i.e. conditionally requeue), so note that.
-                            finishList.add(qd);
+                            DocumentIngestStatus oldDocStatus = qd.getLastIngestedStatus();
+                            String documentIDHash = dd.getDocumentIdentifierHash();
+                            String newDocVersion = newVersionStringArray[i];
+                            String newAuthorityName = aclAuthority;
+                            if (newAuthorityName == null)
+                              newAuthorityName = "";
 
-                            // See if we need to add, or update.
-                            boolean allowIngest = false;
-                            if (oldDocStatus == null)
+                            versionMap.put(dd.getDocumentIdentifierHash(),newDocVersion);
+
+                            if (newDocVersion == null)
                             {
-                              // Add
-                              allowIngest = true;
-                              // Fall through to allow the processing
+                              deleteList.add(qd);
                             }
                             else
                             {
-                              // Update.  There are two possibilities here.  (1) the same version
-                              // that was there before is there now (which may mean a rescan),
-                              // or (2) there are different versions (which ALWAYS means a rescan).
-                              String oldDocVersion = oldDocStatus.getDocumentVersion();
-                              if (oldDocVersion == null)
-                                oldDocVersion = "";
-                              String oldAuthorityName = oldDocStatus.getDocumentAuthorityNameString();
-                              if (oldAuthorityName == null)
-                                oldAuthorityName = "";
-                              String oldOutputVersion = oldDocStatus.getOutputVersion();
-                              if (oldOutputVersion == null)
-                                oldOutputVersion = "";
+                              // Not getting deleted, so we must do the finish processing (i.e. conditionally requeue), so note that.
+                              finishList.add(qd);
 
-                              // Start the comparison processing
-                              if (newDocVersion.length() == 0)
+                              // See if we need to add, or update.
+                              boolean allowIngest = false;
+                              if (oldDocStatus == null)
                               {
-                                // Always reingest
+                                // Add
                                 allowIngest = true;
+                                // Fall through to allow the processing
                               }
-                              else if (oldDocVersion.equals(newDocVersion) &&
-                                oldAuthorityName.equals(newAuthorityName) &&
-                                oldOutputVersion.equals(newOutputVersion))
+                              else
                               {
-                                // The old logic was as follows:
-                                //
-                                // If the crawl is an incremental crawl, then we do NOT add this
-                                // document to the fetch list, even for scanning and no ingestion.
-                                // But we *do* add it, scan only, if this was a "full crawl".
-                                //
-                                // Apparently this was designed to prevent a document that had
-                                // already been processed and had queued stuff from causing deletions
-                                // under 'full scan' conditions, because those child documents would
-                                // not be requeued then.  This contrasts with the incremental case,
-                                // where we really don't want to refetch the document simply to find
-                                // children - or do we?  The connector has to make that decision, it
-                                // seems to me.  If it's the kind of document that might have children,
-                                // then rescanning is warranted under ANY conditions; if it's not,
-                                // then the connector can decide to just do nothing.
-                                //
-                                // For the kinds of connectors where all documents have children,
-                                // preventing the fetch is not likely to help much.  These kinds of
-                                // connectors (rss and web) depend on the document checksum to
-                                // determine version anyway, so the document is fetched regardless.
-                                // At least we prevent the ingestion.
+                                // Update.  There are two possibilities here.  (1) the same version
+                                // that was there before is there now (which may mean a rescan),
+                                // or (2) there are different versions (which ALWAYS means a rescan).
+                                String oldDocVersion = oldDocStatus.getDocumentVersion();
+                                if (oldDocVersion == null)
+                                  oldDocVersion = "";
+                                String oldAuthorityName = oldDocStatus.getDocumentAuthorityNameString();
+                                if (oldAuthorityName == null)
+                                  oldAuthorityName = "";
+                                String oldOutputVersion = oldDocStatus.getOutputVersion();
+                                if (oldOutputVersion == null)
+                                  oldOutputVersion = "";
 
-                                // Fall through to allow the scanning, but not the ingest
+                                // Start the comparison processing
+                                if (newDocVersion.length() == 0)
+                                {
+                                  // Always reingest
+                                  allowIngest = true;
+                                }
+                                else if (oldDocVersion.equals(newDocVersion) &&
+                                  oldAuthorityName.equals(newAuthorityName) &&
+                                  oldOutputVersion.equals(newOutputVersion))
+                                {
+                                  // The old logic was as follows:
+                                  //
+                                  // If the crawl is an incremental crawl, then we do NOT add this
+                                  // document to the fetch list, even for scanning and no ingestion.
+                                  // But we *do* add it, scan only, if this was a "full crawl".
+                                  //
+                                  // Apparently this was designed to prevent a document that had
+                                  // already been processed and had queued stuff from causing deletions
+                                  // under 'full scan' conditions, because those child documents would
+                                  // not be requeued then.  This contrasts with the incremental case,
+                                  // where we really don't want to refetch the document simply to find
+                                  // children - or do we?  The connector has to make that decision, it
+                                  // seems to me.  If it's the kind of document that might have children,
+                                  // then rescanning is warranted under ANY conditions; if it's not,
+                                  // then the connector can decide to just do nothing.
+                                  //
+                                  // For the kinds of connectors where all documents have children,
+                                  // preventing the fetch is not likely to help much.  These kinds of
+                                  // connectors (rss and web) depend on the document checksum to
+                                  // determine version anyway, so the document is fetched regardless.
+                                  // At least we prevent the ingestion.
+
+                                  // Fall through to allow the scanning, but not the ingest
+                                }
+                                else
+                                  allowIngest = true;
                               }
-                              else
-                                allowIngest = true;
-                            }
 
-                            fetchList.add(new DocumentToProcess(qd,!allowIngest));
-                            if (!allowIngest)
-                              ingesterCheckList.add(documentIDHash);
+                              fetchList.add(new DocumentToProcess(qd,!allowIngest));
+                              if (!allowIngest)
+                                ingesterCheckList.add(documentIDHash);
+                            }
                           }
-                        }
-
-                        // Next version string!
-                        i++;
-                      }
 
+                        }
+                        activeDocuments.clear();
 
-                      // Done figuring out what to process.  Now, we need to process it.
+                        // We are done transferring the activeDocuments entries to the other lists for processing.
+                        // Those lists will all need to be processed, but the processList is special because it
+                        // must be processed in the same context as the version fetch.
 
-                      // Clear the documents out of the queue
-                      processDeleteLists(outputName,connector,connection,jobManager,
-                        jobmanagerDeleteList,ingester,ingesterDeleteList,ingesterDeleteListUnhashed,
-                        job.getID(),legalLinkTypes,ingestLogger,job.getHopcountMode(),queueTracker,currentTime);
-
-                      // First, make the things we will need.
-                      activity = new ProcessActivity(threadContext,queueTracker,jobManager,ingester,
-                        currentTime,job,connection,connector,connMgr,legalLinkTypes,ingestLogger,abortSet,outputVersion);
-                      try
-                      {
-                        // Fetchlist contains what we need to process.
-                        if (fetchList.size() > 0)
+                        // Note the documents that have been checked but not reingested.  This should happen BEFORE we need
+                        // the statistics (which are calculated during the finishlist step below)
+                        if (ingesterCheckList.size() > 0)
                         {
-
-
-                          // Build a list of id's and flags
-                          String[] processIDs = new String[fetchList.size()];
-                          String[] processIDHashes = new String[fetchList.size()];
-                          String[] versions = new String[fetchList.size()];
-                          boolean[] scanOnly = new boolean[fetchList.size()];
-
-                          i = 0;
-                          while (i < fetchList.size())
+                          String[] checkClasses = new String[ingesterCheckList.size()];
+                          String[] checkIDs = new String[ingesterCheckList.size()];
+                          for (int i = 0; i < checkIDs.length; i++)
                           {
-                            DocumentToProcess dToP = fetchList.get(i);
-                            DocumentDescription dd = dToP.getDocument().getDocumentDescription();
-                            processIDs[i] = dd.getDocumentIdentifier();
-                            processIDHashes[i] = dd.getDocumentIdentifierHash();
-                            versions[i] = versionMap.get(dd.getDocumentIdentifierHash());
-                            scanOnly[i] = dToP.getScanOnly();
-                            i++;
+                            checkClasses[i] = connectionName;
+                            checkIDs[i] = ingesterCheckList.get(i);
                           }
+                          ingester.documentCheckMultiple(outputName,checkClasses,checkIDs,currentTime);
+                        }
 
-                          // Now, process in bulk
-                          try
+                        // First, make the things we will need for all subsequent steps.
+                        ProcessActivity activity = new ProcessActivity(threadContext,queueTracker,jobManager,ingester,
+                          currentTime,job,connection,connector,connMgr,legalLinkTypes,ingestLogger,abortSet,outputVersion);
+                        try
+                        {
+
+                          // Fetchlist contains what we need to process.
+                          if (fetchList.size() > 0)
                           {
+                            // Build a list of id's and flags
+                            String[] processIDs = new String[fetchList.size()];
+                            String[] processIDHashes = new String[fetchList.size()];
+                            String[] versions = new String[fetchList.size()];
+                            boolean[] scanOnly = new boolean[fetchList.size()];
+
+                            for (int i = 0; i < fetchList.size(); i++)
+                            {
+                              DocumentToProcess dToP = fetchList.get(i);
+                              DocumentDescription dd = dToP.getDocument().getDocumentDescription();
+                              processIDs[i] = dd.getDocumentIdentifier();
+                              processIDHashes[i] = dd.getDocumentIdentifierHash();
+                              versions[i] = versionMap.get(dd.getDocumentIdentifierHash());
+                              scanOnly[i] = dToP.getScanOnly();
+                            }
+
                             if (Thread.currentThread().isInterrupted())
                               throw new ManifoldCFException("Interrupted",ManifoldCFException.INTERRUPTED);
 
                             if (Logging.threads.isDebugEnabled())
                               Logging.threads.debug("Worker thread about to process "+Integer.toString(processIDs.length)+" documents");
 
-                            connector.processDocuments(processIDs,versions,activity,job.getSpecification(),scanOnly,jobType);
+                            // Now, process in bulk
+                            try
+                            {
 
-                            // Flush remaining references into the database!
-                            activity.flush();
+                              connector.processDocuments(processIDs,versions,activity,job.getSpecification(),scanOnly,jobType);
 
-                            // "Finish" the documents (removing unneeded carrydown info, etc.)
-                            DocumentDescription[] requeueCandidates = jobManager.finishDocuments(job.getID(),legalLinkTypes,processIDHashes,job.getHopcountMode());
+                              // Flush remaining references into the database!
+                              activity.flush();
 
-                            ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,requeueCandidates,connector,connection,queueTracker,currentTime);
+                              // "Finish" the documents (removing unneeded carrydown info, etc.)
+                              DocumentDescription[] requeueCandidates = jobManager.finishDocuments(job.getID(),legalLinkTypes,processIDHashes,job.getHopcountMode());
 
-                            if (Logging.threads.isDebugEnabled())
-                              Logging.threads.debug("Worker thread done processing "+Integer.toString(processIDs.length)+" documents");
+                              ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,requeueCandidates,connector,connection,queueTracker,currentTime);
 
-                          }
-                          catch (ServiceInterruption e)
-                          {
-                            // This service interruption could have resulted
-                            // after some or all of the documents ingested.
-                            // They will therefore need to go into the PENDINGPURGATORY
-                            // state.
-
-                            Logging.jobs.warn("Service interruption reported for job "+
-                              job.getID()+" connection '"+job.getConnectionName()+"': "+
-                              e.getMessage());
-
-                            // Mark the current documents to be recrawled in the
-                            // time specified, except for the ones beyond their limits.
-                            // Those will either be deleted, or an exception will be thrown that
-                            // will abort the current job.
-
-                            ingesterDeleteList.clear();
-                            ingesterDeleteListUnhashed.clear();
-                            jobmanagerDeleteList.clear();
-                            ArrayList requeueList = new ArrayList();
+                              if (Logging.threads.isDebugEnabled())
+                                Logging.threads.debug("Worker thread done processing "+Integer.toString(processIDs.length)+" documents");
 
-                            i = 0;
-                            while (i < finishList.size())
+                            }
+                            catch (ServiceInterruption e)
                             {
-                              QueuedDocument qd = finishList.get(i++);
-                              DocumentDescription dd = qd.getDocumentDescription();
-                              if (dd.getFailTime() != -1L && dd.getFailTime() < e.getRetryTime() ||
-                                dd.getFailRetryCount() == 0)
+                              // This service interruption could have resulted
+                              // after some or all of the documents ingested.
+                              // They will therefore need to go into the PENDINGPURGATORY
+                              // state.
+
+                              Logging.jobs.warn("Service interruption reported for job "+
+                                job.getID()+" connection '"+job.getConnectionName()+"': "+
+                                e.getMessage());
+
+                              // Mark the current documents to be recrawled in the
+                              // time specified, except for the ones beyond their limits.
+                              // Those will either be deleted, or an exception will be thrown that
+                              // will abort the current job.
+
+                              deleteList.clear();
+                              ArrayList requeueList = new ArrayList();
+
+                              for (int i = 0; i < finishList.size(); i++)
                               {
-                                // Treat this as a hard failure.
-                                if (e.isAbortOnFail())
-                                  throw new ManifoldCFException("Repeated service interruptions - failure processing document"+((e.getCause()!=null)?": "+e.getCause().getMessage():""),e.getCause());
-                                // We want this particular document to be not included in the
-                                // reprocessing.  Therefore, we do the same thing as we would
-                                // if we got back a null version.
-                                DocumentIngestStatus oldDocStatus = qd.getLastIngestedStatus();
-                                String documentIDHash = dd.getDocumentIdentifierHash();
-                                // See if we need to delete
-                                if (oldDocStatus != null)
+                                QueuedDocument qd = finishList.get(i);
+                                DocumentDescription dd = qd.getDocumentDescription();
+                                if (dd.getFailTime() != -1L && dd.getFailTime() < e.getRetryTime() ||
+                                  dd.getFailRetryCount() == 0)
                                 {
-                                  // Queue up to issue deletion
-                                  ingesterDeleteList.add(documentIDHash);
-                                  ingesterDeleteListUnhashed.add(dd.getDocumentIdentifier());
+                                  // Treat this as a hard failure.
+                                  if (e.isAbortOnFail())
+                                  {
+                                    abortOnFail = new ManifoldCFException("Repeated service interruptions - failure processing document"+((e.getCause()!=null)?": "+e.getCause().getMessage():""),e.getCause());
+                                    rescanList.add(qd);
+                                  }
+                                  // We want this particular document to be not included in the
+                                  // reprocessing.  Therefore, we do the same thing as we would
+                                  // if we got back a null version.
+                                  deleteList.add(qd);
+                                }
+                                else
+                                {
+                                  requeueList.add(qd);
                                 }
-                                jobmanagerDeleteList.add(qd);
-                              }
-                              else
-                              {
-                                requeueList.add(qd);
                               }
-                            }
-
-                            // Requeue the documents we've identified
-                            requeueDocuments(jobManager,requeueList,e.getRetryTime(),e.getFailTime(),
-                              e.getFailRetryCount());
-
-                            // Process the deletions too.
-                            processDeleteLists(outputName,connector,connection,jobManager,
-                              jobmanagerDeleteList,ingester,ingesterDeleteList,ingesterDeleteListUnhashed,
-                              job.getID(),legalLinkTypes,ingestLogger,job.getHopcountMode(),queueTracker,currentTime);
 
-                            continue;
-                          }
-                        }
+                              // Requeue the documents we've identified
+                              requeueDocuments(jobManager,requeueList,e.getRetryTime(),e.getFailTime(),
+                                e.getFailRetryCount());
 
-                        // Note the documents that have been checked but not reingested.  This should happen BEFORE we need
-                        // the statistics (which are calculated during the finishlist step below)
-                        if (ingesterCheckList.size() > 0)
-                        {
-                          String[] checkClasses = new String[ingesterCheckList.size()];
-                          String[] checkIDs = new String[ingesterCheckList.size()];
-                          i = 0;
-                          while (i < checkIDs.length)
-                          {
-                            checkClasses[i] = connectionName;
-                            checkIDs[i] = ingesterCheckList.get(i);
-                            i++;
-                          }
-                          ingester.documentCheckMultiple(outputName,checkClasses,checkIDs,currentTime);
-                        }
+                            }
+                          } // End of fetching
 
-                        // Go through the finish list and either mark completed, or requeue, depending on the kind of job this is.
-                        if (finishList.size() > 0)
-                        {
-                          // In both job types, we have to go through the finishList to figure out what to do with the documents.
-                          // In the case of a document that was aborted, we must requeue it for immediate reprocessing in BOTH job types.
-                          switch (job.getType())
+                          if (finishList.size() > 0)
                           {
-                          case IJobDescription.TYPE_CONTINUOUS:
+                            // In both job types, we have to go through the finishList to figure out what to do with the documents.
+                            // In the case of a document that was aborted, we must requeue it for immediate reprocessing in BOTH job types.
+                            switch (job.getType())
                             {
-                              // We need to populate timeArray
-                              String[] timeIDClasses = new String[finishList.size()];
-                              String[] timeIDHashes = new String[finishList.size()];
-                              i = 0;
-                              while (i < timeIDHashes.length)
-                              {
-                                QueuedDocument qd = (QueuedDocument)finishList.get(i);
-                                DocumentDescription dd = qd.getDocumentDescription();
-                                String documentIDHash = dd.getDocumentIdentifierHash();
-                                timeIDClasses[i] = connectionName;
-                                timeIDHashes[i] = documentIDHash;
-                                i++;
-                              }
-                              long[] timeArray = ingester.getDocumentUpdateIntervalMultiple(outputName,timeIDClasses,timeIDHashes);
-                              Long[] recheckTimeArray = new Long[timeArray.length];
-                              int[] actionArray = new int[timeArray.length];
-                              DocumentDescription[] recrawlDocs = new DocumentDescription[finishList.size()];
-                              i = 0;
-                              while (i < finishList.size())
+                            case IJobDescription.TYPE_CONTINUOUS:
                               {
-                                QueuedDocument qd = finishList.get(i);
-                                recrawlDocs[i] = qd.getDocumentDescription();
-                                String documentID = recrawlDocs[i].getDocumentIdentifier();
-
-                                // If aborted due to sequencing issue, then requeue for reprocessing immediately, ignoring everything else.
-                                boolean wasAborted = abortSet.get(documentID) != null;
-                                if (wasAborted)
+                                // We need to populate timeArray
+                                String[] timeIDClasses = new String[finishList.size()];
+                                String[] timeIDHashes = new String[finishList.size()];
+                                for (int i = 0; i < timeIDHashes.length; i++)
                                 {
-                                  // Requeue for immediate reprocessing
-                                  if (Logging.scheduling.isDebugEnabled())
-                                    Logging.scheduling.debug("Document '"+documentID+"' will be RESCANNED as soon as prerequisites are met");
-
-                                  actionArray[i] = IJobManager.ACTION_RESCAN;
-                                  recheckTimeArray[i] = new Long(0L);     // Must not use null; that means 'never'.
+                                  QueuedDocument qd = (QueuedDocument)finishList.get(i);
+                                  DocumentDescription dd = qd.getDocumentDescription();
+                                  String documentIDHash = dd.getDocumentIdentifierHash();
+                                  timeIDClasses[i] = connectionName;
+                                  timeIDHashes[i] = documentIDHash;
                                 }
-                                else
+                                long[] timeArray = ingester.getDocumentUpdateIntervalMultiple(outputName,timeIDClasses,timeIDHashes);
+                                Long[] recheckTimeArray = new Long[timeArray.length];
+                                int[] actionArray = new int[timeArray.length];
+                                DocumentDescription[] recrawlDocs = new DocumentDescription[finishList.size()];
+                                for (int i = 0; i < finishList.size(); i++)
                                 {
-                                  // Calculate the next time to run, or time to expire.
-
-                                  // For run time, the formula is to calculate the running avg interval between changes,
-                                  // add an additional interval (which comes from the job description),
-                                  // and add that to the current time.
-                                  // One caveat: we really want to calculate the interval from the last
-                                  // time change was detected, but this is not implemented yet.
-                                  long timeAmt = timeArray[i];
-                                  // null value indicates never to schedule
-
-                                  Long recrawlTime = activity.calculateDocumentRescheduleTime(currentTime,timeAmt,documentID);
-                                  Long expireTime = activity.calculateDocumentExpireTime(currentTime,documentID);
-
-
-                                  // Merge the two times together.  We decide on the action based on the action with the lowest time.
-                                  if (expireTime == null || (recrawlTime != null && recrawlTime.longValue() < expireTime.longValue()))
+                                  QueuedDocument qd = finishList.get(i);
+                                  recrawlDocs[i] = qd.getDocumentDescription();
+                                  String documentID = recrawlDocs[i].getDocumentIdentifier();
+
+                                  // If aborted due to sequencing issue, then requeue for reprocessing immediately, ignoring everything else.
+                                  boolean wasAborted = abortSet.get(documentID) != null;
+                                  if (wasAborted)
                                   {
+                                    // Requeue for immediate reprocessing
                                     if (Logging.scheduling.isDebugEnabled())
-                                      Logging.scheduling.debug("Document '"+documentID+"' will be RESCANNED at "+recrawlTime.toString());
-                                    recheckTimeArray[i] = recrawlTime;
+                                      Logging.scheduling.debug("Document '"+documentID+"' will be RESCANNED as soon as prerequisites are met");
+
                                     actionArray[i] = IJobManager.ACTION_RESCAN;
-                                  }
-                                  else if (recrawlTime == null || (expireTime != null && recrawlTime.longValue() > expireTime.longValue()))
-                                  {
-                                    if (Logging.scheduling.isDebugEnabled())
-                                      Logging.scheduling.debug("Document '"+documentID+"' will be REMOVED at "+expireTime.toString());
-                                    recheckTimeArray[i] = expireTime;
-                                    actionArray[i] = IJobManager.ACTION_REMOVE;
+                                    recheckTimeArray[i] = new Long(0L);     // Must not use null; that means 'never'.
                                   }
                                   else
                                   {
-                                    // Default activity if conflict will be rescan
-                                    if (Logging.scheduling.isDebugEnabled() && recrawlTime != null)
-                                      Logging.scheduling.debug("Document '"+documentID+"' will be RESCANNED at "+recrawlTime.toString());
-                                    recheckTimeArray[i] = recrawlTime;
-                                    actionArray[i] = IJobManager.ACTION_RESCAN;
+                                    // Calculate the next time to run, or time to expire.
+
+                                    // For run time, the formula is to calculate the running avg interval between changes,
+                                    // add an additional interval (which comes from the job description),
+                                    // and add that to the current time.
+                                    // One caveat: we really want to calculate the interval from the last
+                                    // time change was detected, but this is not implemented yet.
+                                    long timeAmt = timeArray[i];
+                                    // null value indicates never to schedule
+
+                                    Long recrawlTime = activity.calculateDocumentRescheduleTime(currentTime,timeAmt,documentID);
+                                    Long expireTime = activity.calculateDocumentExpireTime(currentTime,documentID);
+
+
+                                    // Merge the two times together.  We decide on the action based on the action with the lowest time.
+                                    if (expireTime == null || (recrawlTime != null && recrawlTime.longValue() < expireTime.longValue()))
+                                    {
+                                      if (Logging.scheduling.isDebugEnabled())
+                                        Logging.scheduling.debug("Document '"+documentID+"' will be RESCANNED at "+recrawlTime.toString());
+                                      recheckTimeArray[i] = recrawlTime;
+                                      actionArray[i] = IJobManager.ACTION_RESCAN;
+                                    }
+                                    else if (recrawlTime == null || (expireTime != null && recrawlTime.longValue() > expireTime.longValue()))
+                                    {
+                                      if (Logging.scheduling.isDebugEnabled())
+                                        Logging.scheduling.debug("Document '"+documentID+"' will be REMOVED at "+expireTime.toString());
+                                      recheckTimeArray[i] = expireTime;
+                                      actionArray[i] = IJobManager.ACTION_REMOVE;
+                                    }
+                                    else
+                                    {
+                                      // Default activity if conflict will be rescan
+                                      if (Logging.scheduling.isDebugEnabled() && recrawlTime != null)
+                                        Logging.scheduling.debug("Document '"+documentID+"' will be RESCANNED at "+recrawlTime.toString());
+                                      recheckTimeArray[i] = recrawlTime;
+                                      actionArray[i] = IJobManager.ACTION_RESCAN;
+                                    }
                                   }
                                 }
-                                i++;
-                              }
 
-                              jobManager.requeueDocumentMultiple(recrawlDocs,recheckTimeArray,actionArray);
+                                jobManager.requeueDocumentMultiple(recrawlDocs,recheckTimeArray,actionArray);
 
-                            }
-                            break;
-                          case IJobDescription.TYPE_SPECIFIED:
-                            {
-                              // Separate the ones we actually finished from the ones we need to requeue because they were aborted
-                              List<DocumentDescription> completedList = new ArrayList<DocumentDescription>();
-                              List<DocumentDescription> abortedList = new ArrayList<DocumentDescription>();
-                              i = 0;
-                              while (i < finishList.size())
+                              }
+                              break;
+                            case IJobDescription.TYPE_SPECIFIED:
                               {
-                                QueuedDocument qd = finishList.get(i++);
-                                DocumentDescription dd = qd.getDocumentDescription();
-                                if (abortSet.get(dd.getDocumentIdentifier()) != null)
-                                {
-                                  // The document was aborted, so put it into the abortedList
-                                  abortedList.add(dd);
-                                }
-                                else
+                                // Separate the ones we actually finished from the ones we need to requeue because they were aborted
+                                List<DocumentDescription> completedList = new ArrayList<DocumentDescription>();
+                                List<DocumentDescription> abortedList = new ArrayList<DocumentDescription>();
+                                for (int i = 0; i < finishList.size(); i++)
                                 {
-                                  // The document was completed.
-                                  completedList.add(dd);
+                                  QueuedDocument qd = finishList.get(i);
+                                  DocumentDescription dd = qd.getDocumentDescription();
+                                  if (abortSet.get(dd.getDocumentIdentifier()) != null)
+                                  {
+                                    // The document was aborted, so put it into the abortedList
+                                    abortedList.add(dd);
+                                  }
+                                  else
+                                  {
+                                    // The document was completed.
+                                    completedList.add(dd);
+                                  }
                                 }
-                              }
 
-                              // Requeue the ones that must be repeated
-                              if (abortedList.size() > 0)
-                              {
-                                DocumentDescription[] docDescriptions = new DocumentDescription[abortedList.size()];
-                                Long[] recheckTimeArray = new Long[docDescriptions.length];
-                                int[] actionArray = new int[docDescriptions.length];
-                                i = 0;
-                                while (i < docDescriptions.length)
+                                // Requeue the ones that must be repeated
+                                if (abortedList.size() > 0)
                                 {
-                                  docDescriptions[i] = abortedList.get(i);
-                                  recheckTimeArray[i] = new Long(0L);
-                                  actionArray[i] = IJobManager.ACTION_RESCAN;
-                                  i++;
-                                }
+                                  DocumentDescription[] docDescriptions = new DocumentDescription[abortedList.size()];
+                                  Long[] recheckTimeArray = new Long[docDescriptions.length];
+                                  int[] actionArray = new int[docDescriptions.length];
+                                  for (int i = 0; i < docDescriptions.length; i++)
+                                  {
+                                    docDescriptions[i] = abortedList.get(i);
+                                    recheckTimeArray[i] = new Long(0L);
+                                    actionArray[i] = IJobManager.ACTION_RESCAN;
+                                  }
 
-                                jobManager.requeueDocumentMultiple(docDescriptions,recheckTimeArray,actionArray);
-                              }
+                                  jobManager.requeueDocumentMultiple(docDescriptions,recheckTimeArray,actionArray);
+                                }
 
-                              // Mark the ones completed that were actually completed.
-                              if (completedList.size() > 0)
-                              {
-                                DocumentDescription[] docDescriptions = new DocumentDescription[completedList.size()];
-                                i = 0;
-                                while (i < docDescriptions.length)
+                                // Mark the ones completed that were actually completed.
+                                if (completedList.size() > 0)
                                 {
-                                  docDescriptions[i] = (DocumentDescription)completedList.get(i);
-                                  i++;
-                                }
+                                  DocumentDescription[] docDescriptions = new DocumentDescription[completedList.size()];
+                                  for (int i = 0; i < docDescriptions.length; i++)
+                                  {
+                                    docDescriptions[i] = (DocumentDescription)completedList.get(i);
+                                  }
 
-                                jobManager.markDocumentCompletedMultiple(docDescriptions);
+                                  jobManager.markDocumentCompletedMultiple(docDescriptions);
+                                }
                               }
+                              break;
+                            default:
+                              throw new ManifoldCFException("Unexpected value for job type: '"+Integer.toString(job.getType())+"'");
                             }
-                            break;
-                          default:
-                            throw new ManifoldCFException("Unexpected value for job type: '"+Integer.toString(job.getType())+"'");
-                          }
 
-                          // Finally, if we're still alive, mark everything as "processed".
-                          i = 0;
-                          while (i < finishList.size())
-                          {
-                            QueuedDocument qd = finishList.get(i++);
-                            qd.setProcessed();
-                          }
+                            // Finally, if we're still alive, mark everything as "processed".
+                            for (int i = 0; i < finishList.size(); i++)
+                            {
+                              QueuedDocument qd = finishList.get(i);
+                              qd.setProcessed();
+                            }
 
+                          }
+                        
+                        }
+                        finally
+                        {
+                          // Make sure we don't leave any dangling carrydown files
+                          activity.discard();
                         }
+                          
+                        // Successful processing of the set
+                        // We count 'get version' time in the average, so even if we decide not to process a doc
+                        // it still counts.
+                        queueTracker.noteConnectionPerformance(qds.getCount(),connectionName,System.currentTimeMillis() - processingStartTime);
+
                       }
                       finally
                       {
-                        // Make sure we don't leave any dangling carrydown files
-                        activity.discard();
+                        // Release any document temporary storage held by the connector
+                        connector.releaseDocumentVersions(currentDocIDArray,newVersionStringArray);
                       }
-
-                      // Successful processing of the set
-                      // We count 'get version' time in the average, so even if we decide not to process a doc
-                      // it still counts.
-                      queueTracker.noteConnectionPerformance(qds.getCount(),connectionName,System.currentTimeMillis() - processingStartTime);
-
-                    }
-                    finally
-                    {
-                      // Release any document temporary storage held by the connector
-                      connector.releaseDocumentVersions(docIDArray,currentVersions);
+                    
                     }
                   }
-                  finally
-                  {
-                    RepositoryConnectorFactory.release(connector);
-                  }
+                  
+                  // Now, handle the delete list
+                  processDeleteLists(outputName,connector,connection,jobManager,
+                    deleteList,ingester,
+                    job.getID(),legalLinkTypes,ingestLogger,job.getHopcountMode(),queueTracker,currentTime);
+
+                  // Handle hopcount removal
+                  processHopcountRemovalLists(outputName,connector,connection,jobManager,
+                    hopcountremoveList,ingester,
+                    job.getID(),legalLinkTypes,ingestLogger,job.getHopcountMode(),queueTracker,currentTime);
+
+                }
+                finally
+                {
+                  RepositoryConnectorFactory.release(connector);
                 }
+              
               }
+              
+              // Handle rescanning
+              for (int i = 0; i < rescanList.size(); i++)
+              {
+                QueuedDocument qd = rescanList.get(i);
+                jobManager.resetDocument(qd.getDocumentDescription(),0L,IJobManager.ACTION_RESCAN,-1L,-1);
+                qd.setProcessed();
+              }
+                
             }
             finally
             {
               // Note termination of processing of these documents in the overlap calculator
               qds.endProcessing(queueTracker);
             }
+            
+            if (abortOnFail != null)
+              throw abortOnFail;
+            
           }
           catch (ManifoldCFException e)
           {
@@ -948,34 +922,89 @@ public class WorkerThread extends Thread
     return true;
   }
 
+  protected static void moveList(List<QueuedDocument> sourceList, List<QueuedDocument> targetList)
+  {
+    for (int i = 0; i < sourceList.size(); i++)
+    {
+      targetList.add(sourceList.get(i));
+    }
+    sourceList.clear();
+  }
+
+  /** Mark specified documents as 'hopcount removed', and remove them from the
+  * index.  Documents in this state are presumed to have:
+  * (a) nothing in the index
+  * (b) no intrinsic links for which they are the origin
+  * In order to guarantee this situation, this method must be capable of doing much
+  * of what the deletion method must do.  Specifically, it should be capable of deleting
+  * documents from the index should they be already present.
+  */
+  protected static void processHopcountRemovalLists(String outputName, IRepositoryConnector connector,
+    IRepositoryConnection connection, IJobManager jobManager, List<QueuedDocument> hopcountremoveList,
+    IIncrementalIngester ingester,
+    Long jobID, String[] legalLinkTypes, OutputActivity ingestLogger,
+    int hopcountMethod, QueueTracker queueTracker, long currentTime)
+    throws ManifoldCFException
+  {
+    // Remove from index
+    hopcountremoveList = removeFromIndex(outputName,connection.getName(),jobManager,hopcountremoveList,ingester,ingestLogger);
+    // Mark as 'hopcountremoved' in the job queue
+    processJobQueueHopcountRemovals(hopcountremoveList,connector,connection,
+      jobManager,jobID,legalLinkTypes,hopcountMethod,queueTracker,currentTime);
+  }
+
   /** Clear specified documents out of the job queue and from the appliance.
   *@param outputName is the output connection name.
   *@param jobManager is the job manager.
-  *@param jobmanagerDeleteList is a list of QueuedDocument objects to clean out.
+  *@param deleteList is a list of QueuedDocument objects to clean out.
   *@param ingester is the handle to the incremental ingestion API control object.
   *@param ingesterDeleteList is a list of document id's to delete.
   */
   protected static void processDeleteLists(String outputName, IRepositoryConnector connector,
-    IRepositoryConnection connection, IJobManager jobManager, List<QueuedDocument> jobmanagerDeleteList,
-    IIncrementalIngester ingester, List<String> ingesterDeleteList, List<String> ingesterDeleteListUnhashed,
+    IRepositoryConnection connection, IJobManager jobManager, List<QueuedDocument> deleteList,
+    IIncrementalIngester ingester,
     Long jobID, String[] legalLinkTypes, OutputActivity ingestLogger,
     int hopcountMethod, QueueTracker queueTracker, long currentTime)
     throws ManifoldCFException
   {
-    String connectionName = connection.getName();
+    // Remove from index
+    deleteList = removeFromIndex(outputName,connection.getName(),jobManager,deleteList,ingester,ingestLogger);
+    // Delete from the job queue
+    processJobQueueDeletions(deleteList,connector,connection,
+      jobManager,jobID,legalLinkTypes,hopcountMethod,queueTracker,currentTime);
+  }
 
+  /** Remove a specified set of documents from the index.
+  *@return the list of documents whose state needs to be updated in jobqueue.
+  */
+  protected static List<QueuedDocument> removeFromIndex(String outputName,
+    String connectionName, IJobManager jobManager, List<QueuedDocument> deleteList, 
+    IIncrementalIngester ingester, OutputActivity ingestLogger)
+    throws ManifoldCFException
+  {
+    List<String> ingesterDeleteList = new ArrayList<String>(deleteList.size());
+    for (int i = 0; i < deleteList.size(); i++)
+    {
+      QueuedDocument qd = deleteList.get(i);
+      DocumentIngestStatus oldDocStatus = qd.getLastIngestedStatus();
+      // See if we need to delete from index
+      if (oldDocStatus != null)
+      {
+        // Queue up to issue deletion
+        ingesterDeleteList.add(qd.getDocumentDescription().getDocumentIdentifierHash());
+      }
+    }
+    
     // First, do the ingester delete list.  This guarantees that if the ingestion system is down, this operation will be handled atomically.
     if (ingesterDeleteList.size() > 0)
     {
       String[] deleteClasses = new String[ingesterDeleteList.size()];
       String[] deleteIDs = new String[ingesterDeleteList.size()];
 
-      int i = 0;
-      while (i < ingesterDeleteList.size())
+      for (int i = 0; i < ingesterDeleteList.size(); i++)
       {
         deleteClasses[i] = connectionName;
         deleteIDs[i] = ingesterDeleteList.get(i);
-        i++;
       }
       
       // Try to delete the documents via the output connection.
@@ -987,24 +1016,24 @@ public class WorkerThread extends Thread
       {
         // It looks like the output connection is not currently functioning, so we need to requeue instead of deleting
         // those documents that could not be removed.
-        List<QueuedDocument> newJobmanagerDeleteList = new ArrayList<QueuedDocument>();
+        List<QueuedDocument> newDeleteList = new ArrayList<QueuedDocument>();
         List<QueuedDocument> newRequeueList = new ArrayList<QueuedDocument>();
         
-        Map<String,String> ingesterMap = new HashMap<String,String>();
+        Set<String> ingesterSet = new HashSet<String>();
         for (int j = 0 ; j < ingesterDeleteList.size() ; j++)
         {
           String id = ingesterDeleteList.get(j);
-          ingesterMap.put(id,id);
+          ingesterSet.add(id);
         }
-        for (int j = 0 ; j < jobmanagerDeleteList.size() ; j++)
+        for (int j = 0 ; j < deleteList.size() ; j++)
         {
-          QueuedDocument qd = jobmanagerDeleteList.get(j);
+          QueuedDocument qd = deleteList.get(j);
           DocumentDescription dd = qd.getDocumentDescription();
-          String documentIdentifier = dd.getDocumentIdentifier();
-          if (ingesterMap.get(documentIdentifier) != null)
+          String documentIdentifierHash = dd.getDocumentIdentifierHash();
+          if (ingesterSet.contains(documentIdentifierHash))
             newRequeueList.add(qd);
           else
-            newJobmanagerDeleteList.add(qd);
+            newDeleteList.add(qd);
         }
 
         // Requeue those that are supposed to be requeued
@@ -1012,13 +1041,10 @@ public class WorkerThread extends Thread
           e.getFailRetryCount());
         
         // Process the ones that are just new job queue changes
-        jobmanagerDeleteList = newJobmanagerDeleteList;
+        deleteList = newDeleteList;
       }
     }
-    
-    // Delete from the job queue
-    processJobQueueDeletions(jobmanagerDeleteList,connector,connection,
-      jobManager,jobID,legalLinkTypes,hopcountMethod,queueTracker,currentTime);
+    return deleteList;
   }
   
   /** Process job queue deletions.  Either the indexer has already been updated, or it is not necessary to update it.
@@ -1032,12 +1058,10 @@ public class WorkerThread extends Thread
     if (jobmanagerDeleteList.size() > 0)
     {
       DocumentDescription[] deleteDescriptions = new DocumentDescription[jobmanagerDeleteList.size()];
-      int i = 0;
-      while (i < deleteDescriptions.length)
+      for (int i = 0; i < deleteDescriptions.length; i++)
       {
         QueuedDocument qd = jobmanagerDeleteList.get(i);
         deleteDescriptions[i] = qd.getDocumentDescription();
-        i++;
       }
 
       // Do the actual work.
@@ -1047,10 +1071,41 @@ public class WorkerThread extends Thread
       ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,requeueCandidates,connector,connection,queueTracker,currentTime);
 
       // Mark all these as done
-      i = 0;
-      while (i < jobmanagerDeleteList.size())
+      for (int i = 0; i < jobmanagerDeleteList.size(); i++)
+      {
+        QueuedDocument qd = jobmanagerDeleteList.get(i);
+        qd.setProcessed();
+      }
+    }
+  }
+
+  /** Process job queue hopcount removals.  All indexer updates have already taken place.
+  */
+  protected static void processJobQueueHopcountRemovals(List<QueuedDocument> jobmanagerRemovalList,
+    IRepositoryConnector connector, IRepositoryConnection connection, IJobManager jobManager,
+    Long jobID, String[] legalLinkTypes, int hopcountMethod, QueueTracker queueTracker, long currentTime)
+    throws ManifoldCFException
+  {
+    // Now, do the document queue cleanup for deletions.
+    if (jobmanagerRemovalList.size() > 0)
+    {
+      DocumentDescription[] removalDescriptions = new DocumentDescription[jobmanagerRemovalList.size()];
+      for (int i = 0; i < removalDescriptions.length; i++)
+      {
+        QueuedDocument qd = jobmanagerRemovalList.get(i);
+        removalDescriptions[i] = qd.getDocumentDescription();
+      }
+
+      // Do the actual work.
+      DocumentDescription[] requeueCandidates = jobManager.markDocumentHopcountRemovalMultiple(jobID,legalLinkTypes,removalDescriptions,hopcountMethod);
+
+      // Requeue those documents that had carrydown data modifications
+      ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,requeueCandidates,connector,connection,queueTracker,currentTime);
+
+      // Mark all these as done
+      for (int i = 0; i < jobmanagerRemovalList.size(); i++)
       {
-        QueuedDocument qd = jobmanagerDeleteList.get(i++);
+        QueuedDocument qd = jobmanagerRemovalList.get(i);
         qd.setProcessed();
       }
     }

Modified: manifoldcf/trunk/tests/filesystem/src/test/java/org/apache/manifoldcf/filesystem_tests/HopcountTester.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/tests/filesystem/src/test/java/org/apache/manifoldcf/filesystem_tests/HopcountTester.java?rev=1372225&r1=1372224&r2=1372225&view=diff
==============================================================================
--- manifoldcf/trunk/tests/filesystem/src/test/java/org/apache/manifoldcf/filesystem_tests/HopcountTester.java (original)
+++ manifoldcf/trunk/tests/filesystem/src/test/java/org/apache/manifoldcf/filesystem_tests/HopcountTester.java Mon Aug 13 00:44:05 2012
@@ -134,7 +134,7 @@ public class HopcountTester
     // But the max hopcount is 2, so one file will be left behind, so the count should be 6, not 7.
     if (status.getDocumentsProcessed() != 6)
       throw new ManifoldCFException("Wrong number of documents processed - expected 6, saw "+new Long(status.getDocumentsProcessed()).toString());
-
+      
     // Now, delete the job.
     jobManager.deleteJob(job.getID());
     instance.waitJobDeletedNative(jobManager,job.getID(), 120000L);

Modified: manifoldcf/trunk/tests/webcrawler/src/test/java/org/apache/manifoldcf/webcrawler_tests/BigCrawlTester.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/tests/webcrawler/src/test/java/org/apache/manifoldcf/webcrawler_tests/BigCrawlTester.java?rev=1372225&r1=1372224&r2=1372225&view=diff
==============================================================================
--- manifoldcf/trunk/tests/webcrawler/src/test/java/org/apache/manifoldcf/webcrawler_tests/BigCrawlTester.java (original)
+++ manifoldcf/trunk/tests/webcrawler/src/test/java/org/apache/manifoldcf/webcrawler_tests/BigCrawlTester.java Mon Aug 13 00:44:05 2012
@@ -80,8 +80,8 @@ public class BigCrawlTester
     job.setType(job.TYPE_SPECIFIED);
     job.setStartMethod(job.START_DISABLE);
     job.setHopcountMode(job.HOPCOUNT_ACCURATE);
-    job.addHopCountFilter("link",new Long(3));
-    job.addHopCountFilter("redirect",new Long(2));
+    job.addHopCountFilter("link",new Long(2));
+    //job.addHopCountFilter("redirect",new Long(2));
 
     // Now, set up the document specification.
     DocumentSpecification ds = job.getSpecification();
@@ -120,9 +120,18 @@ public class BigCrawlTester
     // Check to be sure we actually processed the right number of documents.
     JobStatus status = jobManager.getStatus(job.getID());
     // Four levels deep from 10 site seeds: Each site seed has 1 + 10 + 100 + 1000 = 1111 documents, so 10 has 11110.
-    if (status.getDocumentsProcessed() != 11110)
-      throw new ManifoldCFException("Wrong number of documents processed - expected 11110, saw "+new Long(status.getDocumentsProcessed()).toString());
-      
+    if (status.getDocumentsProcessed() != 1110)
+    {
+      System.err.println("Sleeping for database inspection");
+      while (true)
+      {
+        if (1 < 0)
+          break;
+        Thread.sleep(10000L);
+      }
+      throw new ManifoldCFException("Wrong number of documents processed - expected 1110, saw "+new Long(status.getDocumentsProcessed()).toString());
+    }
+    
     // Now, delete the job.
     jobManager.deleteJob(job.getID());
     instance.waitJobDeletedNative(jobManager,job.getID(),18000000L);

Modified: manifoldcf/trunk/tests/webcrawler/src/test/java/org/apache/manifoldcf/webcrawler_tests/MockWebService.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/tests/webcrawler/src/test/java/org/apache/manifoldcf/webcrawler_tests/MockWebService.java?rev=1372225&r1=1372224&r2=1372225&view=diff
==============================================================================
--- manifoldcf/trunk/tests/webcrawler/src/test/java/org/apache/manifoldcf/webcrawler_tests/MockWebService.java (original)
+++ manifoldcf/trunk/tests/webcrawler/src/test/java/org/apache/manifoldcf/webcrawler_tests/MockWebService.java Mon Aug 13 00:44:05 2012
@@ -136,11 +136,16 @@ public class MockWebService
 	generateLink(res,site,parentLevel,parentItem);
       }
       
-      // Generate links to direct children
-      for (int i = 0; i < docsPerLevel; i++)
+      // Temporary: Prevent links to children deeper than a certain level; this is to help
+      // the debug process
+      if (theLevel < 9)
       {
-        int docNumber = i + theItem * docsPerLevel;
-        generateLink(res,site,theLevel+1,docNumber);
+        // Generate links to direct children
+        for (int i = 0; i < docsPerLevel; i++)
+        {
+          int docNumber = i + theItem * docsPerLevel;
+          generateLink(res,site,theLevel+1,docNumber);
+        }
       }
       
       // Generate some limited cross-links to other items at this level



Mime
View raw message