manifoldcf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kwri...@apache.org
Subject svn commit: r1546965 [2/2] - in /manifoldcf/trunk: ./ framework/ framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/ framework/agents/src/main/java/org/apache/manifoldcf/agents/system/ framework/pull-agent/src/main/java/org/apache/m...
Date Mon, 02 Dec 2013 11:27:36 GMT
Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/CrawlerAgent.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/CrawlerAgent.java?rev=1546965&r1=1546964&r2=1546965&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/CrawlerAgent.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/CrawlerAgent.java Mon Dec  2 11:27:35 2013
@@ -31,7 +31,6 @@ public class CrawlerAgent implements IAg
 
   // Thread objects.
   // These get filled in as threads are created.
-  protected InitializationThread initializationThread = null;
   protected JobStartThread jobStartThread = null;
   protected StufferThread stufferThread = null;
   protected FinisherThread finisherThread = null;
@@ -140,13 +139,20 @@ public class CrawlerAgent implements IAg
   * Call this method to clean up dangling persistent state when a cluster is just starting
   * to come up.  This method CANNOT be called when there are any active agents
   * processes at all.
+  *@param currentProcessID is the current process ID.
   */
   @Override
-  public void cleanUpAgentData(IThreadContext threadContext)
+  public void cleanUpAllAgentData(IThreadContext threadContext, String currentProcessID)
     throws ManifoldCFException
   {
     IJobManager jobManager = JobManagerFactory.make(threadContext);
     jobManager.cleanupProcessData();
+    // What kind of reprioritization should be done here?
+    // Answer: since we basically keep everything in the database now, the only kind of reprioritization we need
+    // to take care of are dangling ones that won't get done because the process that was doing them went
+    // away.  BUT: somebody may have blown away lock info, in which case we won't know anything at all.
+    // So we do everything in that case.
+    ManifoldCF.resetAllDocumentPriorities(threadContext,System.currentTimeMillis(),currentProcessID);
   }
   
   /** Cleanup after agents process.
@@ -154,14 +160,58 @@ public class CrawlerAgent implements IAg
   * This method CANNOT be called when the agent is active, but it can
   * be called at any time and by any process in order to guarantee that a terminated
   * agent does not block other agents from completing their tasks.
-  *@param processID is the process ID of the agent to clean up after.
+  *@param currentProcessID is the current process ID.
+  *@param cleanupProcessID is the process ID of the agent to clean up after.
   */
   @Override
-  public void cleanUpAgentData(IThreadContext threadContext, String processID)
+  public void cleanUpAgentData(IThreadContext threadContext, String currentProcessID, String cleanupProcessID)
     throws ManifoldCFException
   {
     IJobManager jobManager = JobManagerFactory.make(threadContext);
-    jobManager.cleanupProcessData(processID);
+    jobManager.cleanupProcessData(cleanupProcessID);
+    ReprioritizationTracker rt = new ReprioritizationTracker(threadContext);
+    String reproID = rt.isSpecifiedProcessReprioritizing(cleanupProcessID);
+    if (reproID != null)
+    {
+      // We have to take over the prioritization for the process, which apparently died
+      // in the middle.
+      IRepositoryConnectionManager connectionManager = RepositoryConnectionManagerFactory.make(threadContext);
+
+      // Reprioritize all documents in the jobqueue, 10000 at a time
+
+      Map<String,IRepositoryConnection> connectionMap = new HashMap<String,IRepositoryConnection>();
+      Map<Long,IJobDescription> jobDescriptionMap = new HashMap<Long,IJobDescription>();
+      
+      // Do the 'not yet processed' documents only.  Documents that are queued for reprocessing will be assigned
+      // new priorities.  Already processed documents won't.  This guarantees that our bins are appropriate for current thread
+      // activity.
+      // In order for this to be the correct functionality, ALL reseeding and requeuing operations MUST reset the associated document
+      // priorities.
+      while (true)
+      {
+        long startTime = System.currentTimeMillis();
+
+        Long currentTimeValue = rt.checkReprioritizationInProgress();
+        if (currentTimeValue == null)
+        {
+          // Some other process or thread superseded us.
+          return;
+        }
+        long updateTime = currentTimeValue.longValue();
+        
+        DocumentDescription[] docs = jobManager.getNextNotYetProcessedReprioritizationDocuments(updateTime, 10000);
+        if (docs.length == 0)
+          break;
+
+        // Calculate new priorities for all these documents
+        ManifoldCF.writeDocumentPriorities(threadContext,connectionManager,jobManager,docs,connectionMap,jobDescriptionMap,
+          rt,updateTime);
+
+        Logging.threads.debug("Reprioritized "+Integer.toString(docs.length)+" not-yet-processed documents in "+new Long(System.currentTimeMillis()-startTime)+" ms");
+      }
+      
+      rt.doneReprioritization(reproID);
+    }
   }
 
   /** Start the agent.  This method should spin up the agent threads, and
@@ -277,14 +327,14 @@ public class CrawlerAgent implements IAg
     docCleanupResetManager = new DocCleanupResetManager(documentCleanupQueue,processID);
 
     jobStartThread = new JobStartThread(processID);
-    startupThread = new StartupThread(queueTracker,new StartupResetManager(processID),processID);
+    startupThread = new StartupThread(new StartupResetManager(processID),processID);
     startDeleteThread = new StartDeleteThread(new DeleteStartupResetManager(processID),processID);
     finisherThread = new FinisherThread(processID);
     notificationThread = new JobNotificationThread(new NotificationResetManager(processID),processID);
     jobDeleteThread = new JobDeleteThread(processID);
     stufferThread = new StufferThread(documentQueue,numWorkerThreads,workerResetManager,queueTracker,blockingDocuments,lowWaterFactor,stuffAmtFactor,processID);
     expireStufferThread = new ExpireStufferThread(expireQueue,numExpireThreads,workerResetManager,processID);
-    setPriorityThread = new SetPriorityThread(queueTracker,numWorkerThreads,blockingDocuments,processID);
+    setPriorityThread = new SetPriorityThread(numWorkerThreads,blockingDocuments,processID);
     historyCleanupThread = new HistoryCleanupThread(processID);
 
     workerThreads = new WorkerThread[numWorkerThreads];
@@ -299,7 +349,7 @@ public class CrawlerAgent implements IAg
     i = 0;
     while (i < numExpireThreads)
     {
-      expireThreads[i] = new ExpireThread(Integer.toString(i),expireQueue,queueTracker,workerResetManager,processID);
+      expireThreads[i] = new ExpireThread(Integer.toString(i),expireQueue,workerResetManager,processID);
       i++;
     }
 
@@ -317,143 +367,61 @@ public class CrawlerAgent implements IAg
     i = 0;
     while (i < numCleanupThreads)
     {
-      cleanupThreads[i] = new DocumentCleanupThread(Integer.toString(i),documentCleanupQueue,queueTracker,docCleanupResetManager,processID);
+      cleanupThreads[i] = new DocumentCleanupThread(Integer.toString(i),documentCleanupQueue,docCleanupResetManager,processID);
       i++;
     }
 
-    jobResetThread = new JobResetThread(queueTracker,processID);
-    seedingThread = new SeedingThread(queueTracker,new SeedingResetManager(processID),processID);
+    jobResetThread = new JobResetThread(processID);
+    seedingThread = new SeedingThread(new SeedingResetManager(processID),processID);
     idleCleanupThread = new IdleCleanupThread(processID);
 
-    initializationThread = new InitializationThread(queueTracker);
-    // Start the initialization thread.  This does the initialization work and starts all the other threads when that's done.  It then exits.
-    initializationThread.start();
-    Logging.root.info("Pull-agent started");
-  }
+    // Start all the threads
+    jobStartThread.start();
+    startupThread.start();
+    startDeleteThread.start();
+    finisherThread.start();
+    notificationThread.start();
+    jobDeleteThread.start();
+    stufferThread.start();
+    expireStufferThread.start();
+    setPriorityThread.start();
+    historyCleanupThread.start();
 
-  protected class InitializationThread extends Thread
-  {
-
-    protected final QueueTracker queueTracker;
-
-    public InitializationThread(QueueTracker queueTracker)
+    i = 0;
+    while (i < numWorkerThreads)
     {
-      super();
-      this.queueTracker = queueTracker;
-      setName("Initialization thread");
-      setDaemon(true);
+      workerThreads[i].start();
+      i++;
     }
 
-    public void run()
+    i = 0;
+    while (i < numExpireThreads)
     {
-      int i;
-
-      try
-      {
-        IThreadContext threadContext = ThreadContextFactory.make();
-
-        // First, get a job manager
-        IJobManager jobManager = JobManagerFactory.make(threadContext);
-        IRepositoryConnectionManager mgr = RepositoryConnectionManagerFactory.make(threadContext);
-
-        /* No longer needed, because IAgents specifically initializes/cleans up.
-        
-        Logging.threads.debug("Agents process starting initialization...");
-
-        // Call the database to get it ready
-        jobManager.prepareForStart();
-        */
-        
-        Logging.threads.debug("Agents process reprioritizing documents...");
-
-        Map<String,IRepositoryConnection> connectionMap = new HashMap<String,IRepositoryConnection>();
-        Map<Long,IJobDescription> jobDescriptionMap = new HashMap<Long,IJobDescription>();
-        // Reprioritize all documents in the jobqueue, 1000 at a time
-        long currentTime = System.currentTimeMillis();
-
-        // Do the 'not yet processed' documents only.  Documents that are queued for reprocessing will be assigned
-        // new priorities.  Already processed documents won't.  This guarantees that our bins are appropriate for current thread
-        // activity.
-        // In order for this to be the correct functionality, ALL reseeding and requeuing operations MUST reset the associated document
-        // priorities.
-        while (true)
-        {
-          long startTime = System.currentTimeMillis();
-
-          DocumentDescription[] docs = jobManager.getNextNotYetProcessedReprioritizationDocuments(currentTime, 10000);
-          if (docs.length == 0)
-            break;
-
-          // Calculate new priorities for all these documents
-          ManifoldCF.writeDocumentPriorities(threadContext,mgr,jobManager,docs,connectionMap,jobDescriptionMap,
-            queueTracker,currentTime);
-
-          Logging.threads.debug("Reprioritized "+Integer.toString(docs.length)+" not-yet-processed documents in "+new Long(System.currentTimeMillis()-startTime)+" ms");
-        }
-
-        Logging.threads.debug("Agents process initialization complete!");
-
-        // Start all the threads
-        jobStartThread.start();
-        startupThread.start();
-        startDeleteThread.start();
-        finisherThread.start();
-        notificationThread.start();
-        jobDeleteThread.start();
-        stufferThread.start();
-        expireStufferThread.start();
-        setPriorityThread.start();
-        historyCleanupThread.start();
-
-        i = 0;
-        while (i < numWorkerThreads)
-        {
-          workerThreads[i].start();
-          i++;
-        }
+      expireThreads[i].start();
+      i++;
+    }
 
-        i = 0;
-        while (i < numExpireThreads)
-        {
-          expireThreads[i].start();
-          i++;
-        }
+    cleanupStufferThread.start();
+    i = 0;
+    while (i < numCleanupThreads)
+    {
+      cleanupThreads[i].start();
+      i++;
+    }
 
-        cleanupStufferThread.start();
-        i = 0;
-        while (i < numCleanupThreads)
-        {
-          cleanupThreads[i].start();
-          i++;
-        }
+    deleteStufferThread.start();
+    i = 0;
+    while (i < numDeleteThreads)
+    {
+      deleteThreads[i].start();
+      i++;
+    }
 
-        deleteStufferThread.start();
-        i = 0;
-        while (i < numDeleteThreads)
-        {
-          deleteThreads[i].start();
-          i++;
-        }
+    jobResetThread.start();
+    seedingThread.start();
+    idleCleanupThread.start();
 
-        jobResetThread.start();
-        seedingThread.start();
-        idleCleanupThread.start();
-        // exit!
-      }
-      catch (Throwable e)
-      {
-        // Severe error on initialization
-        if (e instanceof ManifoldCFException)
-        {
-          // Deal with interrupted exception gracefully, because it means somebody is trying to shut us down before we got started.
-          if (((ManifoldCFException)e).getErrorCode() == ManifoldCFException.INTERRUPTED)
-            return;
-        }
-        System.err.println("agents process could not start - shutting down");
-        Logging.threads.fatal("Startup initialization error tossed: "+e.getMessage(),e);
-        System.exit(-300);
-      }
-    }
+    Logging.root.info("Pull-agent started");
   }
 
   /** Stop the system.
@@ -462,7 +430,7 @@ public class CrawlerAgent implements IAg
     throws ManifoldCFException
   {
     Logging.root.info("Shutting down pull-agent...");
-    while (initializationThread != null || jobDeleteThread != null || startupThread != null || startDeleteThread != null ||
+    while (jobDeleteThread != null || startupThread != null || startDeleteThread != null ||
       jobStartThread != null || stufferThread != null ||
       finisherThread != null || notificationThread != null || workerThreads != null || expireStufferThread != null || expireThreads != null ||
       deleteStufferThread != null || deleteThreads != null ||
@@ -472,10 +440,6 @@ public class CrawlerAgent implements IAg
       // Send an interrupt to all threads that are still there.
       // In theory, this only needs to be done once.  In practice, I have seen cases where the thread loses track of the fact that it has been
       // interrupted (which may be a JVM bug - who knows?), but in any case there's no harm in doing it again.
-      if (initializationThread != null)
-      {
-        initializationThread.interrupt();
-      }
       if (historyCleanupThread != null)
       {
         historyCleanupThread.interrupt();
@@ -587,11 +551,6 @@ public class CrawlerAgent implements IAg
       }
 
       // Check to see which died.
-      if (initializationThread != null)
-      {
-        if (!initializationThread.isAlive())
-          initializationThread = null;
-      }
       if (historyCleanupThread != null)
       {
         if (!historyCleanupThread.isAlive())

Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java?rev=1546965&r1=1546964&r2=1546965&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java Mon Dec  2 11:27:35 2013
@@ -50,8 +50,6 @@ public class DocumentCleanupThread exten
   protected final DocumentCleanupQueue documentCleanupQueue;
   /** Delete thread pool reset manager */
   protected final DocCleanupResetManager resetManager;
-  /** Queue tracker */
-  protected final QueueTracker queueTracker;
   /** Process ID */
   protected final String processID;
 
@@ -59,13 +57,12 @@ public class DocumentCleanupThread exten
   *@param id is the worker thread id.
   */
   public DocumentCleanupThread(String id, DocumentCleanupQueue documentCleanupQueue,
-    QueueTracker queueTracker, DocCleanupResetManager resetManager, String processID)
+    DocCleanupResetManager resetManager, String processID)
     throws ManifoldCFException
   {
     super();
     this.id = id;
     this.documentCleanupQueue = documentCleanupQueue;
-    this.queueTracker = queueTracker;
     this.resetManager = resetManager;
     this.processID = processID;
     setName("Document cleanup thread '"+id+"'");
@@ -83,6 +80,7 @@ public class DocumentCleanupThread exten
       IIncrementalIngester ingester = IncrementalIngesterFactory.make(threadContext);
       IJobManager jobManager = JobManagerFactory.make(threadContext);
       IRepositoryConnectionManager connMgr = RepositoryConnectionManagerFactory.make(threadContext);
+      ReprioritizationTracker rt = new ReprioritizationTracker(threadContext);
 
       // Loop
       while (true)
@@ -239,7 +237,7 @@ public class DocumentCleanupThread exten
                   DocumentDescription[] requeueCandidates = jobManager.markDocumentCleanedUp(jobID,legalLinkTypes,ddd,hopcountMethod);
                   // Use the common method for doing the requeuing
                   ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,requeueCandidates,
-                    connector,connection,queueTracker,currentTime);
+                    connector,connection,rt,currentTime);
                   // Finally, completed expiration of the document.
                   dqd.setProcessed();
                 }

Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireThread.java?rev=1546965&r1=1546964&r2=1546965&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireThread.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireThread.java Mon Dec  2 11:27:35 2013
@@ -39,22 +39,19 @@ public class ExpireThread extends Thread
   protected final DocumentCleanupQueue documentQueue;
   /** Worker thread pool reset manager */
   protected final WorkerResetManager resetManager;
-  /** Queue tracker */
-  protected final QueueTracker queueTracker;
   /** Process ID */
   protected final String processID;
   
   /** Constructor.
   *@param id is the expire thread id.
   */
-  public ExpireThread(String id, DocumentCleanupQueue documentQueue, QueueTracker queueTracker, WorkerResetManager resetManager, String processID)
+  public ExpireThread(String id, DocumentCleanupQueue documentQueue, WorkerResetManager resetManager, String processID)
     throws ManifoldCFException
   {
     super();
     this.id = id;
     this.documentQueue = documentQueue;
     this.resetManager = resetManager;
-    this.queueTracker = queueTracker;
     this.processID = processID;
     setName("Expiration thread '"+id+"'");
     setDaemon(true);
@@ -73,6 +70,7 @@ public class ExpireThread extends Thread
       IIncrementalIngester ingester = IncrementalIngesterFactory.make(threadContext);
       IJobManager jobManager = JobManagerFactory.make(threadContext);
       IRepositoryConnectionManager connMgr = RepositoryConnectionManagerFactory.make(threadContext);
+      ReprioritizationTracker rt = new ReprioritizationTracker(threadContext);
 
       // Loop
       while (true)
@@ -242,7 +240,7 @@ public class ExpireThread extends Thread
                   DocumentDescription[] requeueCandidates = jobManager.markDocumentExpired(jobID,legalLinkTypes,ddd,hopcountMethod);
                   // Use the common method for doing the requeuing
                   ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,requeueCandidates,
-                    connector,connection,queueTracker,currentTime);
+                    connector,connection,rt,currentTime);
                   // Finally, completed expiration of the document.
                   dqd.setProcessed();
                 }

Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/JobResetThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/JobResetThread.java?rev=1546965&r1=1546964&r2=1546965&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/JobResetThread.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/JobResetThread.java Mon Dec  2 11:27:35 2013
@@ -33,20 +33,17 @@ public class JobResetThread extends Thre
   public static final String _rcsid = "@(#)$Id: JobResetThread.java 991295 2010-08-31 19:12:14Z kwright $";
 
   // Local data
-  /** Queue tracker */
-  protected final QueueTracker queueTracker;
   /** Process ID */
   protected final String processID;
 
   /** Constructor.
   */
-  public JobResetThread(QueueTracker queueTracker, String processID)
+  public JobResetThread(String processID)
     throws ManifoldCFException
   {
     super();
     setName("Job reset thread");
     setDaemon(true);
-    this.queueTracker = queueTracker;
     this.processID = processID;
   }
 
@@ -109,7 +106,7 @@ public class JobResetThread extends Thre
           {
             Logging.threads.debug("Job reset thread reprioritizing documents...");
 
-            ManifoldCF.resetAllDocumentPriorities(threadContext,queueTracker,currentTime);
+            ManifoldCF.resetAllDocumentPriorities(threadContext,currentTime,processID);
             
             Logging.threads.debug("Job reset thread done reprioritizing documents.");
 

Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java?rev=1546965&r1=1546964&r2=1546965&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java Mon Dec  2 11:27:35 2013
@@ -519,10 +519,12 @@ public class ManifoldCF extends org.apac
     IConnectorManager repConnMgr = ConnectorManagerFactory.make(threadcontext);
     IRepositoryConnectionManager repCon = RepositoryConnectionManagerFactory.make(threadcontext);
     IJobManager jobManager = JobManagerFactory.make(threadcontext);
+    IBinManager binManager = BinManagerFactory.make(threadcontext);
     org.apache.manifoldcf.authorities.system.ManifoldCF.installSystemTables(threadcontext);
     repConnMgr.install();
     repCon.install();
     jobManager.install();
+    binManager.install();
   }
 
   /** Uninstall all the crawler system tables.
@@ -534,6 +536,8 @@ public class ManifoldCF extends org.apac
     IConnectorManager repConnMgr = ConnectorManagerFactory.make(threadcontext);
     IRepositoryConnectionManager repCon = RepositoryConnectionManagerFactory.make(threadcontext);
     IJobManager jobManager = JobManagerFactory.make(threadcontext);
+    IBinManager binManager = BinManagerFactory.make(threadcontext);
+    binManager.deinstall();
     jobManager.deinstall();
     repCon.deinstall();
     repConnMgr.deinstall();
@@ -839,13 +843,14 @@ public class ManifoldCF extends org.apac
   
   /** Requeue documents due to carrydown.
   */
-  public static void requeueDocumentsDueToCarrydown(IJobManager jobManager, DocumentDescription[] requeueCandidates,
-    IRepositoryConnector connector, IRepositoryConnection connection, QueueTracker queueTracker, long currentTime)
+  public static void requeueDocumentsDueToCarrydown(IJobManager jobManager,
+    DocumentDescription[] requeueCandidates,
+    IRepositoryConnector connector, IRepositoryConnection connection, ReprioritizationTracker rt, long currentTime)
     throws ManifoldCFException
   {
     // A list of document descriptions from finishDocuments() above represents those documents that may need to be requeued, for the
     // reason that carrydown information for those documents has changed.  In order to requeue, we need to calculate document priorities, however.
-    double[] docPriorities = new double[requeueCandidates.length];
+    IPriorityCalculator[] docPriorities = new IPriorityCalculator[requeueCandidates.length];
     String[][] binNames = new String[requeueCandidates.length][];
     int q = 0;
     while (q < requeueCandidates.length)
@@ -853,27 +858,12 @@ public class ManifoldCF extends org.apac
       DocumentDescription dd = requeueCandidates[q];
       String[] bins = calculateBins(connector,dd.getDocumentIdentifier());
       binNames[q] = bins;
-      docPriorities[q] = queueTracker.calculatePriority(bins,connection);
-      if (Logging.scheduling.isDebugEnabled())
-        Logging.scheduling.debug("Document '"+dd.getDocumentIdentifier()+" given priority "+new Double(docPriorities[q]).toString());
+      docPriorities[q] = new PriorityCalculator(rt,connection,bins);
       q++;
     }
 
     // Now, requeue the documents with the new priorities
-    boolean[] trackerNote = jobManager.carrydownChangeDocumentMultiple(requeueCandidates,currentTime,docPriorities);
-
-    // Free the unused priorities.
-    // Inform queuetracker about what we used and what we didn't
-    q = 0;
-    while (q < trackerNote.length)
-    {
-      if (trackerNote[q] == false)
-      {
-        String[] bins = binNames[q];
-        queueTracker.notePriorityNotUsed(bins,connection,docPriorities[q]);
-      }
-      q++;
-    }
+    jobManager.carrydownChangeDocumentMultiple(requeueCandidates,currentTime,docPriorities);
   }
 
   /** Stuff colons so we can't have conflicts. */
@@ -917,64 +907,54 @@ public class ManifoldCF extends org.apac
     return connector.getBinNames(documentIdentifier);
   }
 
-  protected final static String resetDocPrioritiesLock = "_RESETPRIORITIES_";
-  
   /** Reset all (active) document priorities.  This operation may occur due to various externally-triggered
   * events, such a job abort, pause, resume, wait, or unwait.
   */
-  public static void resetAllDocumentPriorities(IThreadContext threadContext, QueueTracker queueTracker, long currentTime)
+  public static void resetAllDocumentPriorities(IThreadContext threadContext, long currentTime, String processID)
     throws ManifoldCFException
   {
     ILockManager lockManager = LockManagerFactory.make(threadContext);
     IJobManager jobManager = JobManagerFactory.make(threadContext);
     IRepositoryConnectionManager connectionManager = RepositoryConnectionManagerFactory.make(threadContext);
-    
-    // Only one thread allowed at a time
-    lockManager.enterWriteLock(resetDocPrioritiesLock);
-    try
-    {
-      // Reset the queue tracker
-      queueTracker.beginReset();
-      // Perform the reprioritization, for all active documents in active jobs.  During this time,
-      // it is safe to have other threads assign new priorities to documents, but it is NOT safe
-      // for other threads to attempt to change the minimum priority level.  The queuetracker object
-      // will therefore block that from occurring, until the reset is complete.
-      try
-      {
-        // Reprioritize all documents in the jobqueue, 1000 at a time
+    ReprioritizationTracker rt = new ReprioritizationTracker(threadContext);
 
-        Map<String,IRepositoryConnection> connectionMap = new HashMap<String,IRepositoryConnection>();
-        Map<Long,IJobDescription> jobDescriptionMap = new HashMap<Long,IJobDescription>();
+    String reproID = IDFactory.make(threadContext);
 
-        // Do the 'not yet processed' documents only.  Documents that are queued for reprocessing will be assigned
-        // new priorities.  Already processed documents won't.  This guarantees that our bins are appropriate for current thread
-        // activity.
-        // In order for this to be the correct functionality, ALL reseeding and requeuing operations MUST reset the associated document
-        // priorities.
-        while (true)
-        {
-          long startTime = System.currentTimeMillis();
+    rt.startReprioritization(System.currentTimeMillis(),processID,reproID);
+    // Reprioritize all documents in the jobqueue, 10000 at a time
 
-          DocumentDescription[] docs = jobManager.getNextNotYetProcessedReprioritizationDocuments(currentTime, 10000);
-          if (docs.length == 0)
-            break;
-
-          // Calculate new priorities for all these documents
-          writeDocumentPriorities(threadContext,connectionManager,jobManager,docs,connectionMap,jobDescriptionMap,
-            queueTracker,currentTime);
+    Map<String,IRepositoryConnection> connectionMap = new HashMap<String,IRepositoryConnection>();
+    Map<Long,IJobDescription> jobDescriptionMap = new HashMap<Long,IJobDescription>();
+    
+    // Do the 'not yet processed' documents only.  Documents that are queued for reprocessing will be assigned
+    // new priorities.  Already processed documents won't.  This guarantees that our bins are appropriate for current thread
+    // activity.
+    // In order for this to be the correct functionality, ALL reseeding and requeuing operations MUST reset the associated document
+    // priorities.
+    while (true)
+    {
+      long startTime = System.currentTimeMillis();
 
-          Logging.threads.debug("Reprioritized "+Integer.toString(docs.length)+" not-yet-processed documents in "+new Long(System.currentTimeMillis()-startTime)+" ms");
-        }
-      }
-      finally
+      Long currentTimeValue = rt.checkReprioritizationInProgress();
+      if (currentTimeValue == null)
       {
-        queueTracker.endReset();
+        // Some other process or thread superseded us.
+        return;
       }
+      long updateTime = currentTimeValue.longValue();
+      
+      DocumentDescription[] docs = jobManager.getNextNotYetProcessedReprioritizationDocuments(updateTime, 10000);
+      if (docs.length == 0)
+        break;
+
+      // Calculate new priorities for all these documents
+      writeDocumentPriorities(threadContext,connectionManager,jobManager,docs,connectionMap,jobDescriptionMap,
+        rt,updateTime);
+
+      Logging.threads.debug("Reprioritized "+Integer.toString(docs.length)+" not-yet-processed documents in "+new Long(System.currentTimeMillis()-startTime)+" ms");
     }
-    finally
-    {
-      lockManager.leaveWriteLock(resetDocPrioritiesLock);
-    }
+    
+    rt.doneReprioritization(reproID);
   }
   
   /** Write a set of document priorities, based on the current queue tracker.
@@ -982,14 +962,14 @@ public class ManifoldCF extends org.apac
   public static void writeDocumentPriorities(IThreadContext threadContext, IRepositoryConnectionManager mgr,
     IJobManager jobManager, DocumentDescription[] descs,
     Map<String,IRepositoryConnection> connectionMap, Map<Long,IJobDescription> jobDescriptionMap,
-    QueueTracker queueTracker, long currentTime)
+    ReprioritizationTracker rt, long currentTime)
     throws ManifoldCFException
   {
     if (Logging.scheduling.isDebugEnabled())
       Logging.scheduling.debug("Reprioritizing "+Integer.toString(descs.length)+" documents");
 
 
-    double[] priorities = new double[descs.length];
+    IPriorityCalculator[] priorities = new IPriorityCalculator[descs.length];
 
     // Go through the documents and calculate the priorities
     int i = 0;
@@ -1029,9 +1009,7 @@ public class ManifoldCF extends org.apac
         RepositoryConnectorFactory.release(connector);
       }
 
-      priorities[i] = queueTracker.calculatePriority(binNames,connection);
-      if (Logging.scheduling.isDebugEnabled())
-        Logging.scheduling.debug("Document '"+dd.getDocumentIdentifier()+"' given priority "+new Double(priorities[i]).toString());
+      priorities[i] = new PriorityCalculator(rt,connection,binNames);
 
       i++;
     }

Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SeedingActivity.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SeedingActivity.java?rev=1546965&r1=1546964&r2=1546965&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SeedingActivity.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SeedingActivity.java Mon Dec  2 11:27:35 2013
@@ -40,7 +40,7 @@ public class SeedingActivity implements 
   protected final String connectionName;
   protected final IRepositoryConnectionManager connManager;
   protected final IJobManager jobManager;
-  protected final QueueTracker queueTracker;
+  protected final ReprioritizationTracker rt;
   protected final IRepositoryConnection connection;
   protected final IRepositoryConnector connector;
   protected final Long jobID;
@@ -57,15 +57,16 @@ public class SeedingActivity implements 
 
   /** Constructor.
   */
-  public SeedingActivity(String connectionName, IRepositoryConnectionManager connManager, IJobManager jobManager,
-    QueueTracker queueTracker, IRepositoryConnection connection, IRepositoryConnector connector,
+  public SeedingActivity(String connectionName, IRepositoryConnectionManager connManager,
+    IJobManager jobManager,
+    ReprioritizationTracker rt, IRepositoryConnection connection, IRepositoryConnector connector,
     Long jobID, String[] legalLinkTypes, boolean overrideSchedule, int hopcountMethod, String processID)
   {
     this.processID = processID;
     this.connectionName = connectionName;
     this.connManager = connManager;
     this.jobManager = jobManager;
-    this.queueTracker = queueTracker;
+    this.rt = rt;
     this.connection = connection;
     this.connector = connector;
     this.jobID = jobID;
@@ -215,39 +216,22 @@ public class SeedingActivity implements 
   {
     // First, prioritize the documents using the queue tracker
     long prioritizationTime = System.currentTimeMillis();
-    double[] docPriorities = new double[docIDHashes.length];
-    String[][] binNames = new String[docIDHashes.length][];
+    IPriorityCalculator[] docPriorities = new IPriorityCalculator[docIDHashes.length];
 
     int i = 0;
     while (i < docIDHashes.length)
     {
       // Calculate desired document priority based on current queuetracker status.
       String[] bins = connector.getBinNames(docIDs[i]);
-
-      binNames[i] = bins;
-      docPriorities[i] = queueTracker.calculatePriority(bins,connection);
-      if (Logging.scheduling.isDebugEnabled())
-        Logging.scheduling.debug("Giving document '"+docIDs[i]+"' priority "+new Double(docPriorities[i]).toString());
+      docPriorities[i] = new PriorityCalculator(rt,connection,bins);
 
       i++;
     }
 
-    boolean[] trackerNote = jobManager.addDocumentsInitial(processID,
+    jobManager.addDocumentsInitial(processID,
       jobID,legalLinkTypes,docIDHashes,docIDs,overrideSchedule,hopcountMethod,
       prioritizationTime,docPriorities,prereqEventNames);
 
-    // Inform queuetracker about what we used and what we didn't
-    int j = 0;
-    while (j < trackerNote.length)
-    {
-      if (trackerNote[j] == false)
-      {
-        String[] bins = binNames[j];
-        queueTracker.notePriorityNotUsed(bins,connection,docPriorities[j]);
-      }
-      j++;
-    }
-
   }
 
   /** Check whether current job is still active.

Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SeedingThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SeedingThread.java?rev=1546965&r1=1546964&r2=1546965&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SeedingThread.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SeedingThread.java Mon Dec  2 11:27:35 2013
@@ -38,8 +38,6 @@ public class SeedingThread extends Threa
   // Local data
   /** Seeding reset manager */
   protected final SeedingResetManager resetManager;
-  /** Queue tracker */
-  protected final QueueTracker queueTracker;
   /** Process ID */
   protected final String processID;
 
@@ -48,14 +46,13 @@ public class SeedingThread extends Threa
 
   /** Constructor.
   */
-  public SeedingThread(QueueTracker queueTracker, SeedingResetManager resetManager, String processID)
+  public SeedingThread(SeedingResetManager resetManager, String processID)
     throws ManifoldCFException
   {
     super();
     setName("Seeding thread");
     setDaemon(true);
     this.resetManager = resetManager;
-    this.queueTracker = queueTracker;
     this.processID = processID;
   }
 
@@ -69,6 +66,7 @@ public class SeedingThread extends Threa
       IThreadContext threadContext = ThreadContextFactory.make();
       IJobManager jobManager = JobManagerFactory.make(threadContext);
       IRepositoryConnectionManager connectionMgr = RepositoryConnectionManagerFactory.make(threadContext);
+      ReprioritizationTracker rt = new ReprioritizationTracker(threadContext);
 
       IDBInterface database = DBInterfaceFactory.make(threadContext,
         ManifoldCF.getMasterDatabaseName(),
@@ -147,7 +145,8 @@ public class SeedingThread extends Threa
                   try
                   {
 
-                    SeedingActivity activity = new SeedingActivity(connection.getName(),connectionMgr,jobManager,queueTracker,
+                    SeedingActivity activity = new SeedingActivity(connection.getName(),connectionMgr,
+                      jobManager,rt,
                       connection,connector,jobID,legalLinkTypes,false,hopcountMethod,processID);
 
                     if (Logging.threads.isDebugEnabled())

Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SetPriorityThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SetPriorityThread.java?rev=1546965&r1=1546964&r2=1546965&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SetPriorityThread.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SetPriorityThread.java Mon Dec  2 11:27:35 2013
@@ -38,8 +38,6 @@ public class SetPriorityThread extends T
   public static final String _rcsid = "@(#)$Id: SetPriorityThread.java 988245 2010-08-23 18:39:35Z kwright $";
 
   // Local data
-  /** This is the queue tracker object. */
-  protected final QueueTracker queueTracker;
   /** This is the number of documents per cycle */
   protected final int cycleCount;
   /** The blocking documents object */
@@ -48,13 +46,11 @@ public class SetPriorityThread extends T
   protected final String processID;
 
   /** Constructor.
-  *@param qt is the queue tracker object.
   */
-  public SetPriorityThread(QueueTracker qt, int workerThreadCount, BlockingDocuments blockingDocuments, String processID)
+  public SetPriorityThread(int workerThreadCount, BlockingDocuments blockingDocuments, String processID)
     throws ManifoldCFException
   {
     super();
-    this.queueTracker = qt;
     this.blockingDocuments = blockingDocuments;
     this.processID = processID;
     cycleCount = workerThreadCount * 10;
@@ -72,7 +68,8 @@ public class SetPriorityThread extends T
       IThreadContext threadContext = ThreadContextFactory.make();
       IRepositoryConnectionManager mgr = RepositoryConnectionManagerFactory.make(threadContext);
       IJobManager jobManager = JobManagerFactory.make(threadContext);
-
+      ReprioritizationTracker rt = new ReprioritizationTracker(threadContext);
+      
       Logging.threads.debug("Set priority thread coming up");
 
       // Job description map (local) - designed to improve performance.
@@ -129,7 +126,8 @@ public class SetPriorityThread extends T
             DocumentDescription desc = blockingDocuments.getBlockingDocument();
             if (desc != null)
             {
-              ManifoldCF.writeDocumentPriorities(threadContext,mgr,jobManager,new DocumentDescription[]{desc},connectionMap,jobDescriptionMap,queueTracker,currentTime);
+              ManifoldCF.writeDocumentPriorities(threadContext,mgr,jobManager,
+                new DocumentDescription[]{desc},connectionMap,jobDescriptionMap,rt,currentTime);
               processedCount++;
               continue;
             }

Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StartupThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StartupThread.java?rev=1546965&r1=1546964&r2=1546965&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StartupThread.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StartupThread.java Mon Dec  2 11:27:35 2013
@@ -33,8 +33,6 @@ public class StartupThread extends Threa
   public static final String _rcsid = "@(#)$Id: StartupThread.java 988245 2010-08-23 18:39:35Z kwright $";
 
   // Local data
-  /** Queue tracker */
-  protected final QueueTracker queueTracker;
   /** Process ID */
   protected final String processID;
   /** Reset manager */
@@ -42,13 +40,12 @@ public class StartupThread extends Threa
   
   /** Constructor.
   */
-  public StartupThread(QueueTracker queueTracker, StartupResetManager resetManager, String processID)
+  public StartupThread(StartupResetManager resetManager, String processID)
     throws ManifoldCFException
   {
     super();
     setName("Startup thread");
     setDaemon(true);
-    this.queueTracker = queueTracker;
     this.resetManager = resetManager;
     this.processID = processID;
   }
@@ -63,6 +60,7 @@ public class StartupThread extends Threa
       IThreadContext threadContext = ThreadContextFactory.make();
       IJobManager jobManager = JobManagerFactory.make(threadContext);
       IRepositoryConnectionManager connectionMgr = RepositoryConnectionManagerFactory.make(threadContext);
+      ReprioritizationTracker rt = new ReprioritizationTracker(threadContext);
 
       IDBInterface database = DBInterfaceFactory.make(threadContext,
         ManifoldCF.getMasterDatabaseName(),
@@ -151,7 +149,8 @@ public class StartupThread extends Threa
 
                   try
                   {
-                    SeedingActivity activity = new SeedingActivity(connection.getName(),connectionMgr,jobManager,queueTracker,
+                    SeedingActivity activity = new SeedingActivity(connection.getName(),connectionMgr,
+                      jobManager,rt,
                       connection,connector,jobID,legalLinkTypes,true,hopcountMethod,processID);
 
                     if (Logging.threads.isDebugEnabled())

Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StufferThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StufferThread.java?rev=1546965&r1=1546964&r2=1546965&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StufferThread.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StufferThread.java Mon Dec  2 11:27:35 2013
@@ -87,6 +87,7 @@ public class StufferThread extends Threa
       IRepositoryConnectionManager mgr = RepositoryConnectionManagerFactory.make(threadContext);
       IIncrementalIngester ingester = IncrementalIngesterFactory.make(threadContext);
       IJobManager jobManager = JobManagerFactory.make(threadContext);
+      ReprioritizationTracker rt = new ReprioritizationTracker(threadContext);
 
       Logging.threads.debug("Stuffer thread: Low water mark is "+Integer.toString(lowWaterMark)+"; amount per stuffing is "+Integer.toString(stuffAmt));
 
@@ -166,7 +167,7 @@ public class StufferThread extends Threa
           if (Thread.currentThread().isInterrupted())
             throw new ManifoldCFException("Interrupted",ManifoldCFException.INTERRUPTED);
 
-          queueTracker.assessMinimumDepth(depthStatistics.getBins());
+          rt.assessMinimumDepth(depthStatistics.getBins());
 
           // Set the last time to be the current time
           lastTime = currentTime;

Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java?rev=1546965&r1=1546964&r2=1546965&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java Mon Dec  2 11:27:35 2013
@@ -74,8 +74,10 @@ public class WorkerThread extends Thread
       IThreadContext threadContext = ThreadContextFactory.make();
       IIncrementalIngester ingester = IncrementalIngesterFactory.make(threadContext);
       IJobManager jobManager = JobManagerFactory.make(threadContext);
+      IBinManager binManager = BinManagerFactory.make(threadContext);
       IRepositoryConnectionManager connMgr = RepositoryConnectionManagerFactory.make(threadContext);
       IOutputConnectionManager outputMgr = OutputConnectionManagerFactory.make(threadContext);
+      ReprioritizationTracker rt = new ReprioritizationTracker(threadContext);
 
       List<DocumentToProcess> fetchList = new ArrayList<DocumentToProcess>();
       Map<String,String> versionMap = new HashMap<String,String>();
@@ -527,7 +529,7 @@ public class WorkerThread extends Thread
 
                         // First, make the things we will need for all subsequent steps.
                         ProcessActivity activity = new ProcessActivity(processID,
-                          threadContext,queueTracker,jobManager,ingester,
+                          threadContext,rt,jobManager,ingester,
                           currentTime,job,connection,connector,connMgr,legalLinkTypes,ingestLogger,abortSet,outputVersion,newParameterVersion);
                         try
                         {
@@ -569,7 +571,8 @@ public class WorkerThread extends Thread
                               // "Finish" the documents (removing unneeded carrydown info, etc.)
                               DocumentDescription[] requeueCandidates = jobManager.finishDocuments(job.getID(),legalLinkTypes,processIDHashes,job.getHopcountMode());
 
-                              ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,requeueCandidates,connector,connection,queueTracker,currentTime);
+                              ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,
+                                requeueCandidates,connector,connection,rt,currentTime);
 
                               if (Logging.threads.isDebugEnabled())
                                 Logging.threads.debug("Worker thread done processing "+Integer.toString(processIDs.length)+" documents");
@@ -819,12 +822,12 @@ public class WorkerThread extends Thread
                   // Now, handle the delete list
                   processDeleteLists(outputName,connector,connection,jobManager,
                     deleteList,ingester,
-                    job.getID(),legalLinkTypes,ingestLogger,job.getHopcountMode(),queueTracker,currentTime);
+                    job.getID(),legalLinkTypes,ingestLogger,job.getHopcountMode(),rt,currentTime);
 
                   // Handle hopcount removal
                   processHopcountRemovalLists(outputName,connector,connection,jobManager,
                     hopcountremoveList,ingester,
-                    job.getID(),legalLinkTypes,ingestLogger,job.getHopcountMode(),queueTracker,currentTime);
+                    job.getID(),legalLinkTypes,ingestLogger,job.getHopcountMode(),rt,currentTime);
 
                 }
                 finally
@@ -975,17 +978,18 @@ public class WorkerThread extends Thread
   * documents from the index should they be already present.
   */
   protected static void processHopcountRemovalLists(String outputName, IRepositoryConnector connector,
-    IRepositoryConnection connection, IJobManager jobManager, List<QueuedDocument> hopcountremoveList,
+    IRepositoryConnection connection, IJobManager jobManager,
+    List<QueuedDocument> hopcountremoveList,
     IIncrementalIngester ingester,
     Long jobID, String[] legalLinkTypes, OutputActivity ingestLogger,
-    int hopcountMethod, QueueTracker queueTracker, long currentTime)
+    int hopcountMethod, ReprioritizationTracker rt, long currentTime)
     throws ManifoldCFException
   {
     // Remove from index
     hopcountremoveList = removeFromIndex(outputName,connection.getName(),jobManager,hopcountremoveList,ingester,ingestLogger);
     // Mark as 'hopcountremoved' in the job queue
     processJobQueueHopcountRemovals(hopcountremoveList,connector,connection,
-      jobManager,jobID,legalLinkTypes,hopcountMethod,queueTracker,currentTime);
+      jobManager,jobID,legalLinkTypes,hopcountMethod,rt,currentTime);
   }
 
   /** Clear specified documents out of the job queue and from the appliance.
@@ -996,17 +1000,18 @@ public class WorkerThread extends Thread
   *@param ingesterDeleteList is a list of document id's to delete.
   */
   protected static void processDeleteLists(String outputName, IRepositoryConnector connector,
-    IRepositoryConnection connection, IJobManager jobManager, List<QueuedDocument> deleteList,
+    IRepositoryConnection connection, IJobManager jobManager,
+    List<QueuedDocument> deleteList,
     IIncrementalIngester ingester,
     Long jobID, String[] legalLinkTypes, OutputActivity ingestLogger,
-    int hopcountMethod, QueueTracker queueTracker, long currentTime)
+    int hopcountMethod, ReprioritizationTracker rt, long currentTime)
     throws ManifoldCFException
   {
     // Remove from index
     deleteList = removeFromIndex(outputName,connection.getName(),jobManager,deleteList,ingester,ingestLogger);
     // Delete from the job queue
     processJobQueueDeletions(deleteList,connector,connection,
-      jobManager,jobID,legalLinkTypes,hopcountMethod,queueTracker,currentTime);
+      jobManager,jobID,legalLinkTypes,hopcountMethod,rt,currentTime);
   }
 
   /** Remove a specified set of documents from the index.
@@ -1086,7 +1091,7 @@ public class WorkerThread extends Thread
   */
   protected static void processJobQueueDeletions(List<QueuedDocument> jobmanagerDeleteList,
     IRepositoryConnector connector, IRepositoryConnection connection, IJobManager jobManager,
-    Long jobID, String[] legalLinkTypes, int hopcountMethod, QueueTracker queueTracker, long currentTime)
+    Long jobID, String[] legalLinkTypes, int hopcountMethod, ReprioritizationTracker rt, long currentTime)
     throws ManifoldCFException
   {
     // Now, do the document queue cleanup for deletions.
@@ -1103,7 +1108,8 @@ public class WorkerThread extends Thread
       DocumentDescription[] requeueCandidates = jobManager.markDocumentDeletedMultiple(jobID,legalLinkTypes,deleteDescriptions,hopcountMethod);
 
       // Requeue those documents that had carrydown data modifications
-      ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,requeueCandidates,connector,connection,queueTracker,currentTime);
+      ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,
+        requeueCandidates,connector,connection,rt,currentTime);
 
       // Mark all these as done
       for (int i = 0; i < jobmanagerDeleteList.size(); i++)
@@ -1118,7 +1124,7 @@ public class WorkerThread extends Thread
   */
   protected static void processJobQueueHopcountRemovals(List<QueuedDocument> jobmanagerRemovalList,
     IRepositoryConnector connector, IRepositoryConnection connection, IJobManager jobManager,
-    Long jobID, String[] legalLinkTypes, int hopcountMethod, QueueTracker queueTracker, long currentTime)
+    Long jobID, String[] legalLinkTypes, int hopcountMethod, ReprioritizationTracker rt, long currentTime)
     throws ManifoldCFException
   {
     // Now, do the document queue cleanup for deletions.
@@ -1135,7 +1141,8 @@ public class WorkerThread extends Thread
       DocumentDescription[] requeueCandidates = jobManager.markDocumentHopcountRemovalMultiple(jobID,legalLinkTypes,removalDescriptions,hopcountMethod);
 
       // Requeue those documents that had carrydown data modifications
-      ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,requeueCandidates,connector,connection,queueTracker,currentTime);
+      ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,
+        requeueCandidates,connector,connection,rt,currentTime);
 
       // Mark all these as done
       for (int i = 0; i < jobmanagerRemovalList.size(); i++)
@@ -1440,7 +1447,7 @@ public class WorkerThread extends Thread
     protected final IRepositoryConnectionManager connMgr;
     protected final String[] legalLinkTypes;
     protected final OutputActivity ingestLogger;
-    protected final QueueTracker queueTracker;
+    protected final ReprioritizationTracker rt;
     protected final HashMap abortSet;
     protected final String outputVersion;
     protected final String parameterVersion;
@@ -1461,13 +1468,16 @@ public class WorkerThread extends Thread
     *@param jobManager is the job manager
     *@param ingester is the ingester
     */
-    public ProcessActivity(String processID, IThreadContext threadContext, QueueTracker queueTracker, IJobManager jobManager, IIncrementalIngester ingester,
-      long currentTime, IJobDescription job, IRepositoryConnection connection, IRepositoryConnector connector, IRepositoryConnectionManager connMgr,
-      String[] legalLinkTypes, OutputActivity ingestLogger, HashMap abortSet, String outputVersion, String parameterVersion)
+    public ProcessActivity(String processID, IThreadContext threadContext,
+      ReprioritizationTracker rt, IJobManager jobManager,
+      IIncrementalIngester ingester, long currentTime,
+      IJobDescription job, IRepositoryConnection connection, IRepositoryConnector connector,
+      IRepositoryConnectionManager connMgr, String[] legalLinkTypes, OutputActivity ingestLogger,
+      HashMap abortSet, String outputVersion, String parameterVersion)
     {
       this.processID = processID;
       this.threadContext = threadContext;
-      this.queueTracker = queueTracker;
+      this.rt = rt;
       this.jobManager = jobManager;
       this.ingester = ingester;
       this.currentTime = currentTime;
@@ -1989,16 +1999,15 @@ public class WorkerThread extends Thread
 
         String[] docidHashes = new String[set.size()];
         String[] docids = new String[set.size()];
-        double[] priorities = new double[set.size()];
-        String[][] binNames = new String[set.size()][];
+        IPriorityCalculator[] priorities = new IPriorityCalculator[set.size()];
         String[][] dataNames = new String[docids.length][];
         Object[][][] dataValues = new Object[docids.length][][];
         String[][] eventNames = new String[docids.length][];
 
         long currentTime = System.currentTimeMillis();
 
-        int j = 0;
-        while (j < docidHashes.length)
+        rt.clearPreloadRequests();
+        for (int j = 0; j < docidHashes.length; j++)
         {
           DocumentReference dr = (DocumentReference)set.get(j);
           docidHashes[j] = dr.getLocalIdentifierHash();
@@ -2009,35 +2018,17 @@ public class WorkerThread extends Thread
 
           // Calculate desired document priority based on current queuetracker status.
           String[] bins = ManifoldCF.calculateBins(connector,dr.getLocalIdentifier());
-
-
-          binNames[j] = bins;
-          priorities[j] = queueTracker.calculatePriority(bins,connection);
-          if (Logging.scheduling.isDebugEnabled())
-            Logging.scheduling.debug("Assigning '"+docids[j]+"' priority "+new Double(priorities[j]).toString());
-
-          // No longer used; the functionality is folded atomically into calculatePriority above:
-          //queueTracker.notePrioritySet(currentTime,job.getID(),bins,connection);
-
-          j++;
+          PriorityCalculator p = new PriorityCalculator(rt,connection,bins);
+          priorities[j] = p;
+          p.makePreloadRequest();
         }
+        rt.preloadBinValues();
 
-        boolean[] trackerNote = jobManager.addDocuments(processID,
+        jobManager.addDocuments(processID,
           job.getID(),legalLinkTypes,docidHashes,docids,db.getParentIdentifierHash(),db.getLinkType(),job.getHopcountMode(),
           dataNames,dataValues,currentTime,priorities,eventNames);
-
-        // Inform queuetracker about what we used and what we didn't
-        j = 0;
-        while (j < trackerNote.length)
-        {
-          if (trackerNote[j] == false)
-          {
-            String[] bins = binNames[j];
-            queueTracker.notePriorityNotUsed(bins,connection,priorities[j]);
-          }
-          j++;
-        }
-
+        
+        rt.clearPreloadedValues();
       }
 
       discard();

Modified: manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/BaseITDerby.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/BaseITDerby.java?rev=1546965&r1=1546964&r2=1546965&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/BaseITDerby.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/BaseITDerby.java Mon Dec  2 11:27:35 2013
@@ -100,6 +100,7 @@ public class BaseITDerby extends Connect
   /** Construct a command url.
   */
   protected String makeAPIURL(String command)
+    throws Exception
   {
     return mcfInstance.makeAPIURL(command);
   }

Modified: manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/BaseITHSQLDB.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/BaseITHSQLDB.java?rev=1546965&r1=1546964&r2=1546965&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/BaseITHSQLDB.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/BaseITHSQLDB.java Mon Dec  2 11:27:35 2013
@@ -44,6 +44,12 @@ public class BaseITHSQLDB extends Connec
     mcfInstance = new ManifoldCFInstance(singleWar);
   }
   
+  public BaseITHSQLDB(boolean singleWar, boolean webapps)
+  {
+    super();
+    mcfInstance = new ManifoldCFInstance(singleWar, webapps);
+  }
+  
   // Basic job support
   
   protected void waitJobInactiveNative(IJobManager jobManager, Long jobID, long maxTime)
@@ -101,6 +107,7 @@ public class BaseITHSQLDB extends Connec
   /** Construct a command url.
   */
   protected String makeAPIURL(String command)
+    throws Exception
   {
     return mcfInstance.makeAPIURL(command);
   }

Modified: manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/BaseITMySQL.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/BaseITMySQL.java?rev=1546965&r1=1546964&r2=1546965&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/BaseITMySQL.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/BaseITMySQL.java Mon Dec  2 11:27:35 2013
@@ -101,6 +101,7 @@ public class BaseITMySQL extends Connect
   /** Construct a command url.
   */
   protected String makeAPIURL(String command)
+    throws Exception
   {
     return mcfInstance.makeAPIURL(command);
   }

Modified: manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/BaseITPostgresql.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/BaseITPostgresql.java?rev=1546965&r1=1546964&r2=1546965&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/BaseITPostgresql.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/BaseITPostgresql.java Mon Dec  2 11:27:35 2013
@@ -101,6 +101,7 @@ public class BaseITPostgresql extends Co
   /** Construct a command url.
   */
   protected String makeAPIURL(String command)
+    throws Exception
   {
     return mcfInstance.makeAPIURL(command);
   }

Modified: manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/ManifoldCFInstance.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/ManifoldCFInstance.java?rev=1546965&r1=1546964&r2=1546965&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/ManifoldCFInstance.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/ManifoldCFInstance.java Mon Dec  2 11:27:35 2013
@@ -56,6 +56,7 @@ import org.apache.http.entity.ContentTyp
 /** Tests that run the "agents daemon" should be derived from this */
 public class ManifoldCFInstance
 {
+  protected boolean webapps = true;
   protected boolean singleWar = false;
   protected int testPort = 8346;
   
@@ -68,16 +69,27 @@ public class ManifoldCFInstance
   
   public ManifoldCFInstance(boolean singleWar)
   {
-    this(8346,singleWar);
+    this(8346,singleWar,true);
+  }
+
+  public ManifoldCFInstance(boolean singleWar, boolean webapps)
+  {
+    this(8346,singleWar,webapps);
   }
   
   public ManifoldCFInstance(int testPort)
   {
-    this(testPort,false);
+    this(testPort,false,true);
   }
 
   public ManifoldCFInstance(int testPort, boolean singleWar)
   {
+    this(testPort,singleWar,true);
+  }
+
+  public ManifoldCFInstance(int testPort, boolean singleWar, boolean webapps)
+  {
+    this.webapps = webapps;
     this.testPort = testPort;
     this.singleWar = singleWar;
   }
@@ -277,11 +289,17 @@ public class ManifoldCFInstance
   /** Construct a command url.
   */
   public String makeAPIURL(String command)
+    throws Exception
   {
-    if (singleWar)
-      return "http://localhost:"+Integer.toString(testPort)+"/mcf/api/json/"+command;
+    if (webapps)
+    {
+      if (singleWar)
+        return "http://localhost:"+Integer.toString(testPort)+"/mcf/api/json/"+command;
+      else
+        return "http://localhost:"+Integer.toString(testPort)+"/mcf-api-service/json/"+command;
+    }
     else
-      return "http://localhost:"+Integer.toString(testPort)+"/mcf-api-service/json/"+command;
+      throw new Exception("No API servlet running");
   }
 
   public static String convertToString(HttpResponse httpResponse)
@@ -495,53 +513,64 @@ public class ManifoldCFInstance
   public void start()
     throws Exception
   {
-    // Start jetty
-    server = new Server( testPort );    
-    server.setStopAtShutdown( true );
-    // Initialize the servlets
     ContextHandlerCollection contexts = new ContextHandlerCollection();
-    server.setHandler(contexts);
+    if (webapps)
+    {
+      // Start jetty
+      server = new Server( testPort );    
+      server.setStopAtShutdown( true );
+      // Initialize the servlets
+      server.setHandler(contexts);
+    }
 
     if (singleWar)
     {
-      // Start the single combined war
-      String combinedWarPath = "../../framework/build/war-proprietary/mcf-combined-service.war";
-      if (System.getProperty("combinedWarPath") != null)
-        combinedWarPath = System.getProperty("combinedWarPath");
-      
-      // Initialize the servlet
-      WebAppContext lcfCombined = new WebAppContext(combinedWarPath,"/mcf");
-      // This will cause jetty to ignore all of the framework and jdbc jars in the war, which is what we want.
-      lcfCombined.setParentLoaderPriority(true);
-      contexts.addHandler(lcfCombined);
-      server.start();
+      if (webapps)
+      {
+        // Start the single combined war
+        String combinedWarPath = "../../framework/build/war-proprietary/mcf-combined-service.war";
+        if (System.getProperty("combinedWarPath") != null)
+          combinedWarPath = System.getProperty("combinedWarPath");
+        
+        // Initialize the servlet
+        WebAppContext lcfCombined = new WebAppContext(combinedWarPath,"/mcf");
+        // This will cause jetty to ignore all of the framework and jdbc jars in the war, which is what we want.
+        lcfCombined.setParentLoaderPriority(true);
+        contexts.addHandler(lcfCombined);
+        server.start();
+      }
+      else
+        throw new Exception("Can't run singleWar without webapps");
     }
     else
     {
-      String crawlerWarPath = "../../framework/build/war-proprietary/mcf-crawler-ui.war";
-      String authorityserviceWarPath = "../../framework/build/war-proprietary/mcf-authority-service.war";
-      String apiWarPath = "../../framework/build/war-proprietary/mcf-api-service.war";
-
-      if (System.getProperty("crawlerWarPath") != null)
-          crawlerWarPath = System.getProperty("crawlerWarPath");
-      if (System.getProperty("authorityserviceWarPath") != null)
-          authorityserviceWarPath = System.getProperty("authorityserviceWarPath");
-      if (System.getProperty("apiWarPath") != null)
-          apiWarPath = System.getProperty("apiWarPath");
-
-      // Initialize the servlets
-      WebAppContext lcfCrawlerUI = new WebAppContext(crawlerWarPath,"/mcf-crawler-ui");
-      // This will cause jetty to ignore all of the framework and jdbc jars in the war, which is what we want.
-      lcfCrawlerUI.setParentLoaderPriority(true);
-      contexts.addHandler(lcfCrawlerUI);
-      WebAppContext lcfAuthorityService = new WebAppContext(authorityserviceWarPath,"/mcf-authority-service");
-      // This will cause jetty to ignore all of the framework and jdbc jars in the war, which is what we want.
-      lcfAuthorityService.setParentLoaderPriority(true);
-      contexts.addHandler(lcfAuthorityService);
-      WebAppContext lcfApi = new WebAppContext(apiWarPath,"/mcf-api-service");
-      lcfApi.setParentLoaderPriority(true);
-      contexts.addHandler(lcfApi);
-      server.start();
+      if (webapps)
+      {
+        String crawlerWarPath = "../../framework/build/war-proprietary/mcf-crawler-ui.war";
+        String authorityserviceWarPath = "../../framework/build/war-proprietary/mcf-authority-service.war";
+        String apiWarPath = "../../framework/build/war-proprietary/mcf-api-service.war";
+
+        if (System.getProperty("crawlerWarPath") != null)
+            crawlerWarPath = System.getProperty("crawlerWarPath");
+        if (System.getProperty("authorityserviceWarPath") != null)
+            authorityserviceWarPath = System.getProperty("authorityserviceWarPath");
+        if (System.getProperty("apiWarPath") != null)
+            apiWarPath = System.getProperty("apiWarPath");
+
+        // Initialize the servlets
+        WebAppContext lcfCrawlerUI = new WebAppContext(crawlerWarPath,"/mcf-crawler-ui");
+        // This will cause jetty to ignore all of the framework and jdbc jars in the war, which is what we want.
+        lcfCrawlerUI.setParentLoaderPriority(true);
+        contexts.addHandler(lcfCrawlerUI);
+        WebAppContext lcfAuthorityService = new WebAppContext(authorityserviceWarPath,"/mcf-authority-service");
+        // This will cause jetty to ignore all of the framework and jdbc jars in the war, which is what we want.
+        lcfAuthorityService.setParentLoaderPriority(true);
+        contexts.addHandler(lcfAuthorityService);
+        WebAppContext lcfApi = new WebAppContext(apiWarPath,"/mcf-api-service");
+        lcfApi.setParentLoaderPriority(true);
+        contexts.addHandler(lcfApi);
+        server.start();
+      }
 
       // If all worked, then we can start the daemon.
       // Clear the agents shutdown signal.



Mime
View raw message