manifoldcf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kwri...@apache.org
Subject svn commit: r1546376 - in /manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler: interfaces/ system/
Date Thu, 28 Nov 2013 14:39:28 GMT
Author: kwright
Date: Thu Nov 28 14:39:27 2013
New Revision: 1546376

URL: http://svn.apache.org/r1546376
Log:
Hook up global minimum thread depth

Modified:
    manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/QueueTracker.java
    manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/CrawlerAgent.java
    manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java
    manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireThread.java
    manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/JobResetThread.java
    manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java
    manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/PriorityCalculator.java
    manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ReprioritizationTracker.java
    manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SeedingActivity.java
    manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SeedingThread.java
    manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SetPriorityThread.java
    manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StartupThread.java
    manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StufferThread.java
    manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java

Modified: manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/QueueTracker.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/QueueTracker.java?rev=1546376&r1=1546375&r2=1546376&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/QueueTracker.java (original)
+++ manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/QueueTracker.java Thu Nov 28 14:39:27 2013
@@ -67,45 +67,11 @@ public class QueueTracker
   /** These are the bin counts for active threads */
   protected final Map<String,BinCount> activeBinCounts = new HashMap<String,BinCount>();
 
-  /** The locker for the minimum depth calculation */
-  protected final Integer minimumDepthLock = new Integer(0);
-  
-  /** The "minimum depth" - which is the smallest bin count of the last document queued.  This helps guarantee that documents that are
-  * newly discovered don't wind up with high priority, but instead wind up about the same as the currently active document priority. */
-  protected double currentMinimumDepth = 0.0;
-
-  /** This flag, when set, indicates that a reset is in progress, so queuetracker bincount updates are ignored. */
-  protected boolean resetInProgress = false;
-
-
   /** Constructor */
   public QueueTracker()
   {
   }
 
-  /** Reset the queue tracker.
-  * This occurs ONLY when we are about to reprioritize all active documents.  It does not affect the portion of the queue tracker that
-  * tracks the active queue.
-  */
-  public void beginReset()
-  {
-    synchronized (minimumDepthLock)
-    {
-      currentMinimumDepth = 0.0;
-      resetInProgress = true;
-    }
-
-  }
-
-  /** Finish the reset operation */
-  public void endReset()
-  {
-    synchronized (minimumDepthLock)
-    {
-      resetInProgress = false;
-    }
-  }
-
   /** Add an access record to the queue tracker.  This happens when a document
   * is added to the in-memory queue, and allows us to keep track of that particular event so
   * we can schedule in a way that meets our distribution goals.
@@ -209,55 +175,6 @@ public class QueueTracker
     }
   }
 
-  /** Assess the current minimum depth.
-  * This method is called to provide to the QueueTracker information about the priorities of the documents being currently
-  * queued.  It is the case that it is unoptimal to assign document priorities that are fundamentally higher than this value,
-  * because then the new documents will be preferentially queued, and the goal of distributing documents across bins will not be
-  * adequately met.
-  *@param binNamesSet is the current set of priorities we see on the queuing operation.
-  */
-  public void assessMinimumDepth(Double[] binNamesSet)
-  {
-    synchronized (minimumDepthLock)
-    {
-      // Ignore all numbers until reset is complete
-      if (!resetInProgress)
-      {
-        //Logging.scheduling.debug("In assessMinimumDepth");
-        int j = 0;
-        double newMinPriority = Double.MAX_VALUE;
-        while (j < binNamesSet.length)
-        {
-          Double binValue = binNamesSet[j++];
-          if (binValue.doubleValue() < newMinPriority)
-            newMinPriority = binValue.doubleValue();
-        }
-
-        if (newMinPriority != Double.MAX_VALUE)
-        {
-          // Convert minPriority to minDepth.
-          // Note that this calculation does not take into account anything having to do with connection rates, throttling,
-          // or other adjustment factors.  It allows us only to obtain the "raw" minimum depth: the depth without any
-          // adjustments.
-          double newMinDepth = Math.exp(newMinPriority)-1.0;
-
-          if (newMinDepth > currentMinimumDepth)
-          {
-            currentMinimumDepth = newMinDepth;
-            if (Logging.scheduling.isDebugEnabled())
-              Logging.scheduling.debug("Setting new minimum depth value to "+new Double(currentMinimumDepth).toString());
-          }
-          else
-          {
-            if (newMinDepth < currentMinimumDepth && Logging.scheduling.isDebugEnabled())
-              Logging.scheduling.debug("Minimum depth value seems to have been set too high too early! currentMin = "+new Double(currentMinimumDepth).toString()+"; queue value = "+new Double(newMinDepth).toString());
-          }
-        }
-      }
-    }
-
-  }
-
 
   /** Note that we have completed processing of a document with a given set of bins.
   * This method gets called when a Worker Thread has finished with a document.
@@ -341,16 +258,6 @@ public class QueueTracker
     return rval;
   }
 
-  /** Get the minimum depth.
-  */
-  public double getMinimumDepth()
-  {
-    synchronized (minimumDepthLock)
-    {
-      return currentMinimumDepth;
-    }
-  }
-  
 
   /** This is the class which allows a mutable integer count value to be saved in the bincount table.
   */

Modified: manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/CrawlerAgent.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/CrawlerAgent.java?rev=1546376&r1=1546375&r2=1546376&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/CrawlerAgent.java (original)
+++ manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/CrawlerAgent.java Thu Nov 28 14:39:27 2013
@@ -277,14 +277,14 @@ public class CrawlerAgent implements IAg
     docCleanupResetManager = new DocCleanupResetManager(documentCleanupQueue,processID);
 
     jobStartThread = new JobStartThread(processID);
-    startupThread = new StartupThread(queueTracker,new StartupResetManager(processID),processID);
+    startupThread = new StartupThread(new StartupResetManager(processID),processID);
     startDeleteThread = new StartDeleteThread(new DeleteStartupResetManager(processID),processID);
     finisherThread = new FinisherThread(processID);
     notificationThread = new JobNotificationThread(new NotificationResetManager(processID),processID);
     jobDeleteThread = new JobDeleteThread(processID);
     stufferThread = new StufferThread(documentQueue,numWorkerThreads,workerResetManager,queueTracker,blockingDocuments,lowWaterFactor,stuffAmtFactor,processID);
     expireStufferThread = new ExpireStufferThread(expireQueue,numExpireThreads,workerResetManager,processID);
-    setPriorityThread = new SetPriorityThread(queueTracker,numWorkerThreads,blockingDocuments,processID);
+    setPriorityThread = new SetPriorityThread(numWorkerThreads,blockingDocuments,processID);
     historyCleanupThread = new HistoryCleanupThread(processID);
 
     workerThreads = new WorkerThread[numWorkerThreads];
@@ -299,7 +299,7 @@ public class CrawlerAgent implements IAg
     i = 0;
     while (i < numExpireThreads)
     {
-      expireThreads[i] = new ExpireThread(Integer.toString(i),expireQueue,queueTracker,workerResetManager,processID);
+      expireThreads[i] = new ExpireThread(Integer.toString(i),expireQueue,workerResetManager,processID);
       i++;
     }
 
@@ -317,12 +317,12 @@ public class CrawlerAgent implements IAg
     i = 0;
     while (i < numCleanupThreads)
     {
-      cleanupThreads[i] = new DocumentCleanupThread(Integer.toString(i),documentCleanupQueue,queueTracker,docCleanupResetManager,processID);
+      cleanupThreads[i] = new DocumentCleanupThread(Integer.toString(i),documentCleanupQueue,docCleanupResetManager,processID);
       i++;
     }
 
-    jobResetThread = new JobResetThread(queueTracker,processID);
-    seedingThread = new SeedingThread(queueTracker,new SeedingResetManager(processID),processID);
+    jobResetThread = new JobResetThread(processID);
+    seedingThread = new SeedingThread(new SeedingResetManager(processID),processID);
     idleCleanupThread = new IdleCleanupThread(processID);
 
     initializationThread = new InitializationThread(queueTracker);
@@ -356,6 +356,7 @@ public class CrawlerAgent implements IAg
         IJobManager jobManager = JobManagerFactory.make(threadContext);
         IBinManager binManager = BinManagerFactory.make(threadContext);
         IRepositoryConnectionManager mgr = RepositoryConnectionManagerFactory.make(threadContext);
+        ReprioritizationTracker rt = new ReprioritizationTracker(threadContext);
 
         /* No longer needed, because IAgents specifically initializes/cleans up.
         
@@ -386,9 +387,9 @@ public class CrawlerAgent implements IAg
             break;
 
           // Calculate new priorities for all these documents
-          ManifoldCF.writeDocumentPriorities(threadContext,mgr,jobManager,binManager,
+          ManifoldCF.writeDocumentPriorities(threadContext,mgr,jobManager,
             docs,connectionMap,jobDescriptionMap,
-            queueTracker,currentTime);
+            rt,currentTime);
 
           Logging.threads.debug("Reprioritized "+Integer.toString(docs.length)+" not-yet-processed documents in "+new Long(System.currentTimeMillis()-startTime)+" ms");
         }

Modified: manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java?rev=1546376&r1=1546375&r2=1546376&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java (original)
+++ manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java Thu Nov 28 14:39:27 2013
@@ -50,8 +50,6 @@ public class DocumentCleanupThread exten
   protected final DocumentCleanupQueue documentCleanupQueue;
   /** Delete thread pool reset manager */
   protected final DocCleanupResetManager resetManager;
-  /** Queue tracker */
-  protected final QueueTracker queueTracker;
   /** Process ID */
   protected final String processID;
 
@@ -59,13 +57,12 @@ public class DocumentCleanupThread exten
   *@param id is the worker thread id.
   */
   public DocumentCleanupThread(String id, DocumentCleanupQueue documentCleanupQueue,
-    QueueTracker queueTracker, DocCleanupResetManager resetManager, String processID)
+    DocCleanupResetManager resetManager, String processID)
     throws ManifoldCFException
   {
     super();
     this.id = id;
     this.documentCleanupQueue = documentCleanupQueue;
-    this.queueTracker = queueTracker;
     this.resetManager = resetManager;
     this.processID = processID;
     setName("Document cleanup thread '"+id+"'");
@@ -82,8 +79,8 @@ public class DocumentCleanupThread exten
       IThreadContext threadContext = ThreadContextFactory.make();
       IIncrementalIngester ingester = IncrementalIngesterFactory.make(threadContext);
       IJobManager jobManager = JobManagerFactory.make(threadContext);
-      IBinManager binManager = BinManagerFactory.make(threadContext);
       IRepositoryConnectionManager connMgr = RepositoryConnectionManagerFactory.make(threadContext);
+      ReprioritizationTracker rt = new ReprioritizationTracker(threadContext);
 
       // Loop
       while (true)
@@ -239,8 +236,8 @@ public class DocumentCleanupThread exten
                   String[] legalLinkTypes = (String[])arrayRelationshipTypes.get(k);
                   DocumentDescription[] requeueCandidates = jobManager.markDocumentCleanedUp(jobID,legalLinkTypes,ddd,hopcountMethod);
                   // Use the common method for doing the requeuing
-                  ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,binManager, requeueCandidates,
-                    connector,connection,queueTracker,currentTime);
+                  ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,requeueCandidates,
+                    connector,connection,rt,currentTime);
                   // Finally, completed expiration of the document.
                   dqd.setProcessed();
                 }

Modified: manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireThread.java?rev=1546376&r1=1546375&r2=1546376&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireThread.java (original)
+++ manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireThread.java Thu Nov 28 14:39:27 2013
@@ -39,22 +39,19 @@ public class ExpireThread extends Thread
   protected final DocumentCleanupQueue documentQueue;
   /** Worker thread pool reset manager */
   protected final WorkerResetManager resetManager;
-  /** Queue tracker */
-  protected final QueueTracker queueTracker;
   /** Process ID */
   protected final String processID;
   
   /** Constructor.
   *@param id is the expire thread id.
   */
-  public ExpireThread(String id, DocumentCleanupQueue documentQueue, QueueTracker queueTracker, WorkerResetManager resetManager, String processID)
+  public ExpireThread(String id, DocumentCleanupQueue documentQueue, WorkerResetManager resetManager, String processID)
     throws ManifoldCFException
   {
     super();
     this.id = id;
     this.documentQueue = documentQueue;
     this.resetManager = resetManager;
-    this.queueTracker = queueTracker;
     this.processID = processID;
     setName("Expiration thread '"+id+"'");
     setDaemon(true);
@@ -72,8 +69,8 @@ public class ExpireThread extends Thread
       IThreadContext threadContext = ThreadContextFactory.make();
       IIncrementalIngester ingester = IncrementalIngesterFactory.make(threadContext);
       IJobManager jobManager = JobManagerFactory.make(threadContext);
-      IBinManager binManager = BinManagerFactory.make(threadContext);
       IRepositoryConnectionManager connMgr = RepositoryConnectionManagerFactory.make(threadContext);
+      ReprioritizationTracker rt = new ReprioritizationTracker(threadContext);
 
       // Loop
       while (true)
@@ -242,8 +239,8 @@ public class ExpireThread extends Thread
                   String[] legalLinkTypes = (String[])arrayRelationshipTypes.get(k);
                   DocumentDescription[] requeueCandidates = jobManager.markDocumentExpired(jobID,legalLinkTypes,ddd,hopcountMethod);
                   // Use the common method for doing the requeuing
-                  ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,binManager,requeueCandidates,
-                    connector,connection,queueTracker,currentTime);
+                  ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,requeueCandidates,
+                    connector,connection,rt,currentTime);
                   // Finally, completed expiration of the document.
                   dqd.setProcessed();
                 }

Modified: manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/JobResetThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/JobResetThread.java?rev=1546376&r1=1546375&r2=1546376&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/JobResetThread.java (original)
+++ manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/JobResetThread.java Thu Nov 28 14:39:27 2013
@@ -33,20 +33,17 @@ public class JobResetThread extends Thre
   public static final String _rcsid = "@(#)$Id: JobResetThread.java 991295 2010-08-31 19:12:14Z kwright $";
 
   // Local data
-  /** Queue tracker */
-  protected final QueueTracker queueTracker;
   /** Process ID */
   protected final String processID;
 
   /** Constructor.
   */
-  public JobResetThread(QueueTracker queueTracker, String processID)
+  public JobResetThread(String processID)
     throws ManifoldCFException
   {
     super();
     setName("Job reset thread");
     setDaemon(true);
-    this.queueTracker = queueTracker;
     this.processID = processID;
   }
 
@@ -109,7 +106,7 @@ public class JobResetThread extends Thre
           {
             Logging.threads.debug("Job reset thread reprioritizing documents...");
 
-            ManifoldCF.resetAllDocumentPriorities(threadContext,queueTracker,currentTime);
+            ManifoldCF.resetAllDocumentPriorities(threadContext,currentTime,processID);
             
             Logging.threads.debug("Job reset thread done reprioritizing documents.");
 

Modified: manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java?rev=1546376&r1=1546375&r2=1546376&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java (original)
+++ manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java Thu Nov 28 14:39:27 2013
@@ -843,9 +843,9 @@ public class ManifoldCF extends org.apac
   
   /** Requeue documents due to carrydown.
   */
-  public static void requeueDocumentsDueToCarrydown(IJobManager jobManager, IBinManager binManager,
+  public static void requeueDocumentsDueToCarrydown(IJobManager jobManager,
     DocumentDescription[] requeueCandidates,
-    IRepositoryConnector connector, IRepositoryConnection connection, QueueTracker queueTracker, long currentTime)
+    IRepositoryConnector connector, IRepositoryConnection connection, ReprioritizationTracker rt, long currentTime)
     throws ManifoldCFException
   {
     // A list of document descriptions from finishDocuments() above represents those documents that may need to be requeued, for the
@@ -858,7 +858,7 @@ public class ManifoldCF extends org.apac
       DocumentDescription dd = requeueCandidates[q];
       String[] bins = calculateBins(connector,dd.getDocumentIdentifier());
       binNames[q] = bins;
-      docPriorities[q] = new PriorityCalculator(queueTracker,connection,bins,binManager);
+      docPriorities[q] = new PriorityCalculator(rt,connection,bins);
       q++;
     }
 
@@ -907,76 +907,62 @@ public class ManifoldCF extends org.apac
     return connector.getBinNames(documentIdentifier);
   }
 
-  protected final static String resetDocPrioritiesLock = "_RESETPRIORITIES_";
-  
   /** Reset all (active) document priorities.  This operation may occur due to various externally-triggered
   * events, such a job abort, pause, resume, wait, or unwait.
   */
-  public static void resetAllDocumentPriorities(IThreadContext threadContext, QueueTracker queueTracker, long currentTime)
+  public static void resetAllDocumentPriorities(IThreadContext threadContext, long currentTime, String processID)
     throws ManifoldCFException
   {
     ILockManager lockManager = LockManagerFactory.make(threadContext);
     IJobManager jobManager = JobManagerFactory.make(threadContext);
-    IBinManager binManager = BinManagerFactory.make(threadContext);
     IRepositoryConnectionManager connectionManager = RepositoryConnectionManagerFactory.make(threadContext);
-    
-    // Only one thread allowed at a time
-    lockManager.enterWriteLock(resetDocPrioritiesLock);
-    try
-    {
-      // Reset the queue tracker
-      queueTracker.beginReset();
-      // Perform the reprioritization, for all active documents in active jobs.  During this time,
-      // it is safe to have other threads assign new priorities to documents, but it is NOT safe
-      // for other threads to attempt to change the minimum priority level.  The queuetracker object
-      // will therefore block that from occurring, until the reset is complete.
-      try
-      {
-        // Does this need to be in a transaction??? Can it fail and require retries?
-        binManager.reset();
-
-        // Reprioritize all documents in the jobqueue, 1000 at a time
+    ReprioritizationTracker rt = new ReprioritizationTracker(threadContext);
 
-        Map<String,IRepositoryConnection> connectionMap = new HashMap<String,IRepositoryConnection>();
-        Map<Long,IJobDescription> jobDescriptionMap = new HashMap<Long,IJobDescription>();
+    String reproID = IDFactory.make(threadContext);
 
-        // Do the 'not yet processed' documents only.  Documents that are queued for reprocessing will be assigned
-        // new priorities.  Already processed documents won't.  This guarantees that our bins are appropriate for current thread
-        // activity.
-        // In order for this to be the correct functionality, ALL reseeding and requeuing operations MUST reset the associated document
-        // priorities.
-        while (true)
-        {
-          long startTime = System.currentTimeMillis();
+    rt.startReprioritization(System.currentTimeMillis(),processID,reproID);
+    // Reprioritize all documents in the jobqueue, 1000 at a time
 
-          DocumentDescription[] docs = jobManager.getNextNotYetProcessedReprioritizationDocuments(currentTime, 10000);
-          if (docs.length == 0)
-            break;
-
-          // Calculate new priorities for all these documents
-          writeDocumentPriorities(threadContext,connectionManager,jobManager,binManager,docs,connectionMap,jobDescriptionMap,
-            queueTracker,currentTime);
+    Map<String,IRepositoryConnection> connectionMap = new HashMap<String,IRepositoryConnection>();
+    Map<Long,IJobDescription> jobDescriptionMap = new HashMap<Long,IJobDescription>();
+    
+    // Do the 'not yet processed' documents only.  Documents that are queued for reprocessing will be assigned
+    // new priorities.  Already processed documents won't.  This guarantees that our bins are appropriate for current thread
+    // activity.
+    // In order for this to be the correct functionality, ALL reseeding and requeuing operations MUST reset the associated document
+    // priorities.
+    while (true)
+    {
+      long startTime = System.currentTimeMillis();
 
-          Logging.threads.debug("Reprioritized "+Integer.toString(docs.length)+" not-yet-processed documents in "+new Long(System.currentTimeMillis()-startTime)+" ms");
-        }
-      }
-      finally
+      Long currentTimeValue = rt.checkReprioritizationInProgress();
+      if (currentTimeValue == null)
       {
-        queueTracker.endReset();
+        // Some other process or thread superceded us.
+        return;
       }
+      long updateTime = currentTimeValue.longValue();
+      
+      DocumentDescription[] docs = jobManager.getNextNotYetProcessedReprioritizationDocuments(updateTime, 10000);
+      if (docs.length == 0)
+        break;
+
+      // Calculate new priorities for all these documents
+      writeDocumentPriorities(threadContext,connectionManager,jobManager,docs,connectionMap,jobDescriptionMap,
+        rt,updateTime);
+
+      Logging.threads.debug("Reprioritized "+Integer.toString(docs.length)+" not-yet-processed documents in "+new Long(System.currentTimeMillis()-startTime)+" ms");
     }
-    finally
-    {
-      lockManager.leaveWriteLock(resetDocPrioritiesLock);
-    }
+    
+    rt.doneReprioritization(reproID);
   }
   
   /** Write a set of document priorities, based on the current queue tracker.
   */
   public static void writeDocumentPriorities(IThreadContext threadContext, IRepositoryConnectionManager mgr,
-    IJobManager jobManager, IBinManager binManager, DocumentDescription[] descs,
+    IJobManager jobManager, DocumentDescription[] descs,
     Map<String,IRepositoryConnection> connectionMap, Map<Long,IJobDescription> jobDescriptionMap,
-    QueueTracker queueTracker, long currentTime)
+    ReprioritizationTracker rt, long currentTime)
     throws ManifoldCFException
   {
     if (Logging.scheduling.isDebugEnabled())
@@ -1023,7 +1009,7 @@ public class ManifoldCF extends org.apac
         RepositoryConnectorFactory.release(connector);
       }
 
-      priorities[i] = new PriorityCalculator(queueTracker,connection,binNames,binManager);
+      priorities[i] = new PriorityCalculator(rt,connection,binNames);
 
       i++;
     }

Modified: manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/PriorityCalculator.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/PriorityCalculator.java?rev=1546376&r1=1546375&r2=1546376&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/PriorityCalculator.java (original)
+++ manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/PriorityCalculator.java Thu Nov 28 14:39:27 2013
@@ -35,18 +35,16 @@ public class PriorityCalculator implemen
   */
   private final static double minMsPerFetch = 50.0;
 
-  protected final QueueTracker queueTracker;
   protected final IRepositoryConnection connection;
   protected final String[] binNames;
-  protected final IBinManager binManager;
+  protected final ReprioritizationTracker rt;
   
   /** Constructor. */
-  public PriorityCalculator(QueueTracker queueTracker, IRepositoryConnection connection, String[] documentBins, IBinManager binManager)
+  public PriorityCalculator(ReprioritizationTracker rt, IRepositoryConnection connection, String[] documentBins)
   {
-    this.queueTracker = queueTracker;
     this.connection = connection;
     this.binNames = documentBins;
-    this.binManager = binManager;
+    this.rt = rt;
   }
 
   /** Calculate a document priority value.  Priorities are reversed, and in log space, so that
@@ -93,7 +91,7 @@ public class PriorityCalculator implemen
     double[] weightedMinimumDepths = new double[binNames.length];
 
     // Before calculating priority, calculate some factors that will allow us to determine the proper starting value for a bin.
-    double currentMinimumDepth = queueTracker.getMinimumDepth();
+    double currentMinimumDepth = rt.getMinimumDepth();
 
     // First thing to do is to reset the bin values based on the current minimum.
     for (int i = 0; i < binNames.length; i++)
@@ -121,7 +119,7 @@ public class PriorityCalculator implemen
       double binCountScaleFactor = binCountScaleFactors[i];
       double weightedMinimumDepth = weightedMinimumDepths[i];
 
-      double thisCount = binManager.getIncrementBinValue(binName,weightedMinimumDepth);
+      double thisCount = rt.getIncrementBinValue(binName,weightedMinimumDepth);
       double adjustedCount;
       // Use the scale factor already calculated above to yield a priority that is adjusted for the fetch rate.
       if (binCountScaleFactor == Double.POSITIVE_INFINITY)

Modified: manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ReprioritizationTracker.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ReprioritizationTracker.java?rev=1546376&r1=1546375&r2=1546376&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ReprioritizationTracker.java (original)
+++ manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ReprioritizationTracker.java Thu Nov 28 14:39:27 2013
@@ -35,6 +35,7 @@ public class ReprioritizationTracker
 
   protected final static String trackerWriteLock = "_REPR_TRACKER_LOCK_";
   protected final static String trackerProcessIDResource = "_REPR_TRACKER_PID_";
+  protected final static String trackerReproIDResource = "_REPR_TRACKER_RID_";
   protected final static String trackerTimestampResource = "_REPR_TIMESTAMP_";
   protected final static String trackerMinimumDepthResource = "_REPR_MINDEPTH_";
   
@@ -55,8 +56,9 @@ public class ReprioritizationTracker
   *@param prioritizationTime is the timestamp of the prioritization.
   *@param processID is the process ID of the process performing/waiting for the prioritization
   * to complete.
+  *@param reproID is the reprocessing thread ID
   */
-  public void startReprioritization(long prioritizationTime, String processID)
+  public void startReprioritization(long prioritizationTime, String processID, String reproID)
     throws ManifoldCFException
   {
     lockManager.enterWriteLock(trackerWriteLock);
@@ -72,6 +74,7 @@ public class ReprioritizationTracker
       }
       writeTime(new Long(prioritizationTime));
       writeProcessID(processID);
+      writeReproID(reproID);
       try
       {
         binManager.reset();
@@ -80,6 +83,7 @@ public class ReprioritizationTracker
       {
         writeTime(null);
         writeProcessID(null);
+        writeReproID(null);
         if (e instanceof Error)
           throw (Error)e;
         else if (e instanceof RuntimeException)
@@ -110,7 +114,8 @@ public class ReprioritizationTracker
     {
       Long currentTime = readTime();
       String currentProcessID = readProcessID();
-      if (currentTime == null || currentProcessID == null)
+      String currentReproID = readReproID();
+      if (currentTime == null || currentProcessID == null || currentReproID == null)
         return null;
       return currentTime;
     }
@@ -124,7 +129,7 @@ public class ReprioritizationTracker
   * only if the processID matches the one that started the current reprioritization.
   *@param processID is the process ID of the process completing the prioritization.
   */
-  public void doneReprioritization(String processID)
+  public void doneReprioritization(String reproID)
     throws ManifoldCFException
   {
     lockManager.enterWriteLock(trackerWriteLock);
@@ -132,11 +137,13 @@ public class ReprioritizationTracker
     {
       Long currentTime = readTime();
       String currentProcessID = readProcessID();
-      if (currentTime != null && currentProcessID != null && currentProcessID.equals(processID))
+      String currentReproID = readReproID();
+      if (currentTime != null && currentProcessID != null && currentReproID != null && currentReproID.equals(reproID))
       {
         // Null out the fields
         writeTime(null);
         writeProcessID(null);
+        writeReproID(null);
       }
     }
     finally
@@ -158,7 +165,8 @@ public class ReprioritizationTracker
     {
       Long currentTime = readTime();
       String currentProcessID = readProcessID();
-      return (currentTime != null && currentProcessID != null && currentProcessID.equals(processID));
+      String currentReproID = readReproID();
+      return (currentTime != null && currentProcessID != null && currentReproID != null && currentProcessID.equals(processID));
     }
     finally
     {
@@ -238,6 +246,17 @@ public class ReprioritizationTracker
     }
   }
   
+  /** Get a bin value.
+  *@param binName is the bin name.
+  *@param weightedMinimumDepth is the minimum depth to use.
+  *@return the bin value.
+  */
+  public double getIncrementBinValue(String binName, double weightedMinimumDepth)
+    throws ManifoldCFException
+  {
+    return binManager.getIncrementBinValue(binName, weightedMinimumDepth);
+  }
+  
   // Protected methods
   
   /** Read time.
@@ -327,6 +346,47 @@ public class ReprioritizationTracker
     }
   }
 
+  /** Read repriotization ID.
+  *@return reproID, or null if none.
+  */
+  protected String readReproID()
+    throws ManifoldCFException
+  {
+    byte[] reproIDData = lockManager.readData(trackerReproIDResource);
+    if (reproIDData == null)
+      return null;
+    try
+    {
+      return new String(reproIDData, "utf-8");
+    }
+    catch (UnsupportedEncodingException e)
+    {
+      throw new RuntimeException(e.getMessage(),e);
+    }
+  }
+  
+  /** Write repro ID.
+  *@param reproID is the repro ID to write.
+  */
+  protected void writeReproID(String reproID)
+    throws ManifoldCFException
+  {
+    if (reproID == null)
+      lockManager.writeData(trackerReproIDResource, null);
+    else
+    {
+      try
+      {
+        byte[] reproIDData = reproID.getBytes("utf-8");
+        lockManager.writeData(trackerReproIDResource, reproIDData);
+      }
+      catch (UnsupportedEncodingException e)
+      {
+        throw new RuntimeException(e.getMessage(),e);
+      }
+    }
+  }
+
   /** Read minimum depth.
   *@return the minimum depth.
   */

Modified: manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SeedingActivity.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SeedingActivity.java?rev=1546376&r1=1546375&r2=1546376&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SeedingActivity.java (original)
+++ manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SeedingActivity.java Thu Nov 28 14:39:27 2013
@@ -40,8 +40,7 @@ public class SeedingActivity implements 
   protected final String connectionName;
   protected final IRepositoryConnectionManager connManager;
   protected final IJobManager jobManager;
-  protected final IBinManager binManager;
-  protected final QueueTracker queueTracker;
+  protected final ReprioritizationTracker rt;
   protected final IRepositoryConnection connection;
   protected final IRepositoryConnector connector;
   protected final Long jobID;
@@ -59,16 +58,15 @@ public class SeedingActivity implements 
   /** Constructor.
   */
   public SeedingActivity(String connectionName, IRepositoryConnectionManager connManager,
-    IJobManager jobManager, IBinManager binManager,
-    QueueTracker queueTracker, IRepositoryConnection connection, IRepositoryConnector connector,
+    IJobManager jobManager,
+    ReprioritizationTracker rt, IRepositoryConnection connection, IRepositoryConnector connector,
     Long jobID, String[] legalLinkTypes, boolean overrideSchedule, int hopcountMethod, String processID)
   {
     this.processID = processID;
     this.connectionName = connectionName;
     this.connManager = connManager;
     this.jobManager = jobManager;
-    this.binManager = binManager;
-    this.queueTracker = queueTracker;
+    this.rt = rt;
     this.connection = connection;
     this.connector = connector;
     this.jobID = jobID;
@@ -225,7 +223,7 @@ public class SeedingActivity implements 
     {
       // Calculate desired document priority based on current queuetracker status.
       String[] bins = connector.getBinNames(docIDs[i]);
-      docPriorities[i] = new PriorityCalculator(queueTracker,connection,bins,binManager);
+      docPriorities[i] = new PriorityCalculator(rt,connection,bins);
 
       i++;
     }

Modified: manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SeedingThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SeedingThread.java?rev=1546376&r1=1546375&r2=1546376&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SeedingThread.java (original)
+++ manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SeedingThread.java Thu Nov 28 14:39:27 2013
@@ -38,8 +38,6 @@ public class SeedingThread extends Threa
   // Local data
   /** Seeding reset manager */
   protected final SeedingResetManager resetManager;
-  /** Queue tracker */
-  protected final QueueTracker queueTracker;
   /** Process ID */
   protected final String processID;
 
@@ -48,14 +46,13 @@ public class SeedingThread extends Threa
 
   /** Constructor.
   */
-  public SeedingThread(QueueTracker queueTracker, SeedingResetManager resetManager, String processID)
+  public SeedingThread(SeedingResetManager resetManager, String processID)
     throws ManifoldCFException
   {
     super();
     setName("Seeding thread");
     setDaemon(true);
     this.resetManager = resetManager;
-    this.queueTracker = queueTracker;
     this.processID = processID;
   }
 
@@ -68,8 +65,8 @@ public class SeedingThread extends Threa
       // Create a thread context object.
       IThreadContext threadContext = ThreadContextFactory.make();
       IJobManager jobManager = JobManagerFactory.make(threadContext);
-      IBinManager binManager = BinManagerFactory.make(threadContext);
       IRepositoryConnectionManager connectionMgr = RepositoryConnectionManagerFactory.make(threadContext);
+      ReprioritizationTracker rt = new ReprioritizationTracker(threadContext);
 
       IDBInterface database = DBInterfaceFactory.make(threadContext,
         ManifoldCF.getMasterDatabaseName(),
@@ -149,7 +146,7 @@ public class SeedingThread extends Threa
                   {
 
                     SeedingActivity activity = new SeedingActivity(connection.getName(),connectionMgr,
-                      jobManager,binManager,queueTracker,
+                      jobManager,rt,
                       connection,connector,jobID,legalLinkTypes,false,hopcountMethod,processID);
 
                     if (Logging.threads.isDebugEnabled())

Modified: manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SetPriorityThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SetPriorityThread.java?rev=1546376&r1=1546375&r2=1546376&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SetPriorityThread.java (original)
+++ manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SetPriorityThread.java Thu Nov 28 14:39:27 2013
@@ -38,8 +38,6 @@ public class SetPriorityThread extends T
   public static final String _rcsid = "@(#)$Id: SetPriorityThread.java 988245 2010-08-23 18:39:35Z kwright $";
 
   // Local data
-  /** This is the queue tracker object. */
-  protected final QueueTracker queueTracker;
   /** This is the number of documents per cycle */
   protected final int cycleCount;
   /** The blocking documents object */
@@ -48,13 +46,11 @@ public class SetPriorityThread extends T
   protected final String processID;
 
   /** Constructor.
-  *@param qt is the queue tracker object.
   */
-  public SetPriorityThread(QueueTracker qt, int workerThreadCount, BlockingDocuments blockingDocuments, String processID)
+  public SetPriorityThread(int workerThreadCount, BlockingDocuments blockingDocuments, String processID)
     throws ManifoldCFException
   {
     super();
-    this.queueTracker = qt;
     this.blockingDocuments = blockingDocuments;
     this.processID = processID;
     cycleCount = workerThreadCount * 10;
@@ -72,8 +68,8 @@ public class SetPriorityThread extends T
       IThreadContext threadContext = ThreadContextFactory.make();
       IRepositoryConnectionManager mgr = RepositoryConnectionManagerFactory.make(threadContext);
       IJobManager jobManager = JobManagerFactory.make(threadContext);
-      IBinManager binManager = BinManagerFactory.make(threadContext);
-
+      ReprioritizationTracker rt = new ReprioritizationTracker(threadContext);
+      
       Logging.threads.debug("Set priority thread coming up");
 
       // Job description map (local) - designed to improve performance.
@@ -130,8 +126,8 @@ public class SetPriorityThread extends T
             DocumentDescription desc = blockingDocuments.getBlockingDocument();
             if (desc != null)
             {
-              ManifoldCF.writeDocumentPriorities(threadContext,mgr,jobManager,binManager,
-                new DocumentDescription[]{desc},connectionMap,jobDescriptionMap,queueTracker,currentTime);
+              ManifoldCF.writeDocumentPriorities(threadContext,mgr,jobManager,
+                new DocumentDescription[]{desc},connectionMap,jobDescriptionMap,rt,currentTime);
               processedCount++;
               continue;
             }

Modified: manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StartupThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StartupThread.java?rev=1546376&r1=1546375&r2=1546376&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StartupThread.java (original)
+++ manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StartupThread.java Thu Nov 28 14:39:27 2013
@@ -33,8 +33,6 @@ public class StartupThread extends Threa
   public static final String _rcsid = "@(#)$Id: StartupThread.java 988245 2010-08-23 18:39:35Z kwright $";
 
   // Local data
-  /** Queue tracker */
-  protected final QueueTracker queueTracker;
   /** Process ID */
   protected final String processID;
   /** Reset manager */
@@ -42,13 +40,12 @@ public class StartupThread extends Threa
   
   /** Constructor.
   */
-  public StartupThread(QueueTracker queueTracker, StartupResetManager resetManager, String processID)
+  public StartupThread(StartupResetManager resetManager, String processID)
     throws ManifoldCFException
   {
     super();
     setName("Startup thread");
     setDaemon(true);
-    this.queueTracker = queueTracker;
     this.resetManager = resetManager;
     this.processID = processID;
   }
@@ -62,8 +59,8 @@ public class StartupThread extends Threa
       // Create a thread context object.
       IThreadContext threadContext = ThreadContextFactory.make();
       IJobManager jobManager = JobManagerFactory.make(threadContext);
-      IBinManager binManager = BinManagerFactory.make(threadContext);
       IRepositoryConnectionManager connectionMgr = RepositoryConnectionManagerFactory.make(threadContext);
+      ReprioritizationTracker rt = new ReprioritizationTracker(threadContext);
 
       IDBInterface database = DBInterfaceFactory.make(threadContext,
         ManifoldCF.getMasterDatabaseName(),
@@ -153,7 +150,7 @@ public class StartupThread extends Threa
                   try
                   {
                     SeedingActivity activity = new SeedingActivity(connection.getName(),connectionMgr,
-                      jobManager,binManager,queueTracker,
+                      jobManager,rt,
                       connection,connector,jobID,legalLinkTypes,true,hopcountMethod,processID);
 
                     if (Logging.threads.isDebugEnabled())

Modified: manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StufferThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StufferThread.java?rev=1546376&r1=1546375&r2=1546376&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StufferThread.java (original)
+++ manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StufferThread.java Thu Nov 28 14:39:27 2013
@@ -87,6 +87,7 @@ public class StufferThread extends Threa
       IRepositoryConnectionManager mgr = RepositoryConnectionManagerFactory.make(threadContext);
       IIncrementalIngester ingester = IncrementalIngesterFactory.make(threadContext);
       IJobManager jobManager = JobManagerFactory.make(threadContext);
+      ReprioritizationTracker rt = new ReprioritizationTracker(threadContext);
 
       Logging.threads.debug("Stuffer thread: Low water mark is "+Integer.toString(lowWaterMark)+"; amount per stuffing is "+Integer.toString(stuffAmt));
 
@@ -166,7 +167,7 @@ public class StufferThread extends Threa
           if (Thread.currentThread().isInterrupted())
             throw new ManifoldCFException("Interrupted",ManifoldCFException.INTERRUPTED);
 
-          queueTracker.assessMinimumDepth(depthStatistics.getBins());
+          rt.assessMinimumDepth(depthStatistics.getBins());
 
           // Set the last time to be the current time
           lastTime = currentTime;

Modified: manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java?rev=1546376&r1=1546375&r2=1546376&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java (original)
+++ manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java Thu Nov 28 14:39:27 2013
@@ -77,6 +77,7 @@ public class WorkerThread extends Thread
       IBinManager binManager = BinManagerFactory.make(threadContext);
       IRepositoryConnectionManager connMgr = RepositoryConnectionManagerFactory.make(threadContext);
       IOutputConnectionManager outputMgr = OutputConnectionManagerFactory.make(threadContext);
+      ReprioritizationTracker rt = new ReprioritizationTracker(threadContext);
 
       List<DocumentToProcess> fetchList = new ArrayList<DocumentToProcess>();
       Map<String,String> versionMap = new HashMap<String,String>();
@@ -528,7 +529,7 @@ public class WorkerThread extends Thread
 
                         // First, make the things we will need for all subsequent steps.
                         ProcessActivity activity = new ProcessActivity(processID,
-                          threadContext,queueTracker,jobManager,binManager,ingester,
+                          threadContext,rt,jobManager,ingester,
                           currentTime,job,connection,connector,connMgr,legalLinkTypes,ingestLogger,abortSet,outputVersion,newParameterVersion);
                         try
                         {
@@ -570,8 +571,8 @@ public class WorkerThread extends Thread
                               // "Finish" the documents (removing unneeded carrydown info, etc.)
                               DocumentDescription[] requeueCandidates = jobManager.finishDocuments(job.getID(),legalLinkTypes,processIDHashes,job.getHopcountMode());
 
-                              ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,binManager,
-                                requeueCandidates,connector,connection,queueTracker,currentTime);
+                              ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,
+                                requeueCandidates,connector,connection,rt,currentTime);
 
                               if (Logging.threads.isDebugEnabled())
                                 Logging.threads.debug("Worker thread done processing "+Integer.toString(processIDs.length)+" documents");
@@ -819,14 +820,14 @@ public class WorkerThread extends Thread
                   }
                   
                   // Now, handle the delete list
-                  processDeleteLists(outputName,connector,connection,jobManager,binManager,
+                  processDeleteLists(outputName,connector,connection,jobManager,
                     deleteList,ingester,
-                    job.getID(),legalLinkTypes,ingestLogger,job.getHopcountMode(),queueTracker,currentTime);
+                    job.getID(),legalLinkTypes,ingestLogger,job.getHopcountMode(),rt,currentTime);
 
                   // Handle hopcount removal
-                  processHopcountRemovalLists(outputName,connector,connection,jobManager, binManager,
+                  processHopcountRemovalLists(outputName,connector,connection,jobManager,
                     hopcountremoveList,ingester,
-                    job.getID(),legalLinkTypes,ingestLogger,job.getHopcountMode(),queueTracker,currentTime);
+                    job.getID(),legalLinkTypes,ingestLogger,job.getHopcountMode(),rt,currentTime);
 
                 }
                 finally
@@ -977,18 +978,18 @@ public class WorkerThread extends Thread
   * documents from the index should they be already present.
   */
   protected static void processHopcountRemovalLists(String outputName, IRepositoryConnector connector,
-    IRepositoryConnection connection, IJobManager jobManager, IBinManager binManager,
+    IRepositoryConnection connection, IJobManager jobManager,
     List<QueuedDocument> hopcountremoveList,
     IIncrementalIngester ingester,
     Long jobID, String[] legalLinkTypes, OutputActivity ingestLogger,
-    int hopcountMethod, QueueTracker queueTracker, long currentTime)
+    int hopcountMethod, ReprioritizationTracker rt, long currentTime)
     throws ManifoldCFException
   {
     // Remove from index
     hopcountremoveList = removeFromIndex(outputName,connection.getName(),jobManager,hopcountremoveList,ingester,ingestLogger);
     // Mark as 'hopcountremoved' in the job queue
     processJobQueueHopcountRemovals(hopcountremoveList,connector,connection,
-      jobManager,binManager,jobID,legalLinkTypes,hopcountMethod,queueTracker,currentTime);
+      jobManager,jobID,legalLinkTypes,hopcountMethod,rt,currentTime);
   }
 
   /** Clear specified documents out of the job queue and from the appliance.
@@ -999,18 +1000,18 @@ public class WorkerThread extends Thread
   *@param ingesterDeleteList is a list of document id's to delete.
   */
   protected static void processDeleteLists(String outputName, IRepositoryConnector connector,
-    IRepositoryConnection connection, IJobManager jobManager, IBinManager binManager,
+    IRepositoryConnection connection, IJobManager jobManager,
     List<QueuedDocument> deleteList,
     IIncrementalIngester ingester,
     Long jobID, String[] legalLinkTypes, OutputActivity ingestLogger,
-    int hopcountMethod, QueueTracker queueTracker, long currentTime)
+    int hopcountMethod, ReprioritizationTracker rt, long currentTime)
     throws ManifoldCFException
   {
     // Remove from index
     deleteList = removeFromIndex(outputName,connection.getName(),jobManager,deleteList,ingester,ingestLogger);
     // Delete from the job queue
     processJobQueueDeletions(deleteList,connector,connection,
-      jobManager,binManager,jobID,legalLinkTypes,hopcountMethod,queueTracker,currentTime);
+      jobManager,jobID,legalLinkTypes,hopcountMethod,rt,currentTime);
   }
 
   /** Remove a specified set of documents from the index.
@@ -1089,8 +1090,8 @@ public class WorkerThread extends Thread
   /** Process job queue deletions.  Either the indexer has already been updated, or it is not necessary to update it.
   */
   protected static void processJobQueueDeletions(List<QueuedDocument> jobmanagerDeleteList,
-    IRepositoryConnector connector, IRepositoryConnection connection, IJobManager jobManager, IBinManager binManager,
-    Long jobID, String[] legalLinkTypes, int hopcountMethod, QueueTracker queueTracker, long currentTime)
+    IRepositoryConnector connector, IRepositoryConnection connection, IJobManager jobManager,
+    Long jobID, String[] legalLinkTypes, int hopcountMethod, ReprioritizationTracker rt, long currentTime)
     throws ManifoldCFException
   {
     // Now, do the document queue cleanup for deletions.
@@ -1107,8 +1108,8 @@ public class WorkerThread extends Thread
       DocumentDescription[] requeueCandidates = jobManager.markDocumentDeletedMultiple(jobID,legalLinkTypes,deleteDescriptions,hopcountMethod);
 
       // Requeue those documents that had carrydown data modifications
-      ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,binManager,
-        requeueCandidates,connector,connection,queueTracker,currentTime);
+      ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,
+        requeueCandidates,connector,connection,rt,currentTime);
 
       // Mark all these as done
       for (int i = 0; i < jobmanagerDeleteList.size(); i++)
@@ -1122,8 +1123,8 @@ public class WorkerThread extends Thread
   /** Process job queue hopcount removals.  All indexer updates have already taken place.
   */
   protected static void processJobQueueHopcountRemovals(List<QueuedDocument> jobmanagerRemovalList,
-    IRepositoryConnector connector, IRepositoryConnection connection, IJobManager jobManager, IBinManager binManager,
-    Long jobID, String[] legalLinkTypes, int hopcountMethod, QueueTracker queueTracker, long currentTime)
+    IRepositoryConnector connector, IRepositoryConnection connection, IJobManager jobManager,
+    Long jobID, String[] legalLinkTypes, int hopcountMethod, ReprioritizationTracker rt, long currentTime)
     throws ManifoldCFException
   {
     // Now, do the document queue cleanup for deletions.
@@ -1140,8 +1141,8 @@ public class WorkerThread extends Thread
       DocumentDescription[] requeueCandidates = jobManager.markDocumentHopcountRemovalMultiple(jobID,legalLinkTypes,removalDescriptions,hopcountMethod);
 
       // Requeue those documents that had carrydown data modifications
-      ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,binManager,
-        requeueCandidates,connector,connection,queueTracker,currentTime);
+      ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,
+        requeueCandidates,connector,connection,rt,currentTime);
 
       // Mark all these as done
       for (int i = 0; i < jobmanagerRemovalList.size(); i++)
@@ -1438,7 +1439,6 @@ public class WorkerThread extends Thread
     protected final String processID;
     protected final IThreadContext threadContext;
     protected final IJobManager jobManager;
-    protected final IBinManager binManager;
     protected final IIncrementalIngester ingester;
     protected final long currentTime;
     protected final IJobDescription job;
@@ -1447,7 +1447,7 @@ public class WorkerThread extends Thread
     protected final IRepositoryConnectionManager connMgr;
     protected final String[] legalLinkTypes;
     protected final OutputActivity ingestLogger;
-    protected final QueueTracker queueTracker;
+    protected final ReprioritizationTracker rt;
     protected final HashMap abortSet;
     protected final String outputVersion;
     protected final String parameterVersion;
@@ -1469,7 +1469,7 @@ public class WorkerThread extends Thread
     *@param ingester is the ingester
     */
     public ProcessActivity(String processID, IThreadContext threadContext,
-      QueueTracker queueTracker, IJobManager jobManager, IBinManager binManager,
+      ReprioritizationTracker rt, IJobManager jobManager,
       IIncrementalIngester ingester, long currentTime,
       IJobDescription job, IRepositoryConnection connection, IRepositoryConnector connector,
       IRepositoryConnectionManager connMgr, String[] legalLinkTypes, OutputActivity ingestLogger,
@@ -1477,9 +1477,8 @@ public class WorkerThread extends Thread
     {
       this.processID = processID;
       this.threadContext = threadContext;
-      this.queueTracker = queueTracker;
+      this.rt = rt;
       this.jobManager = jobManager;
-      this.binManager = binManager;
       this.ingester = ingester;
       this.currentTime = currentTime;
       this.job = job;
@@ -2019,7 +2018,7 @@ public class WorkerThread extends Thread
 
           // Calculate desired document priority based on current queuetracker status.
           String[] bins = ManifoldCF.calculateBins(connector,dr.getLocalIdentifier());
-          priorities[j] = new PriorityCalculator(queueTracker,connection,bins,binManager);
+          priorities[j] = new PriorityCalculator(rt,connection,bins);
 
           // No longer used; the functionality is folded atomically into calculatePriority above:
           //queueTracker.notePrioritySet(currentTime,job.getID(),bins,connection);



Mime
View raw message