Return-Path: X-Original-To: apmail-manifoldcf-commits-archive@www.apache.org Delivered-To: apmail-manifoldcf-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 876CF10E34 for ; Mon, 2 Dec 2013 11:28:04 +0000 (UTC) Received: (qmail 59697 invoked by uid 500); 2 Dec 2013 11:28:03 -0000 Delivered-To: apmail-manifoldcf-commits-archive@manifoldcf.apache.org Received: (qmail 59636 invoked by uid 500); 2 Dec 2013 11:28:02 -0000 Mailing-List: contact commits-help@manifoldcf.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@manifoldcf.apache.org Delivered-To: mailing list commits@manifoldcf.apache.org Received: (qmail 59629 invoked by uid 99); 2 Dec 2013 11:28:01 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 02 Dec 2013 11:28:01 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 02 Dec 2013 11:27:57 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 4D9A02388AB8; Mon, 2 Dec 2013 11:27:37 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1546965 [2/2] - in /manifoldcf/trunk: ./ framework/ framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/ framework/agents/src/main/java/org/apache/manifoldcf/agents/system/ framework/pull-agent/src/main/java/org/apache/m... Date: Mon, 02 Dec 2013 11:27:36 -0000 To: commits@manifoldcf.apache.org From: kwright@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20131202112737.4D9A02388AB8@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/CrawlerAgent.java URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/CrawlerAgent.java?rev=1546965&r1=1546964&r2=1546965&view=diff ============================================================================== --- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/CrawlerAgent.java (original) +++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/CrawlerAgent.java Mon Dec 2 11:27:35 2013 @@ -31,7 +31,6 @@ public class CrawlerAgent implements IAg // Thread objects. // These get filled in as threads are created. - protected InitializationThread initializationThread = null; protected JobStartThread jobStartThread = null; protected StufferThread stufferThread = null; protected FinisherThread finisherThread = null; @@ -140,13 +139,20 @@ public class CrawlerAgent implements IAg * Call this method to clean up dangling persistent state when a cluster is just starting * to come up. This method CANNOT be called when there are any active agents * processes at all. + *@param processID is the current process ID. */ @Override - public void cleanUpAgentData(IThreadContext threadContext) + public void cleanUpAllAgentData(IThreadContext threadContext, String currentProcessID) throws ManifoldCFException { IJobManager jobManager = JobManagerFactory.make(threadContext); jobManager.cleanupProcessData(); + // What kind of reprioritization should be done here? + // Answer: since we basically keep everything in the database now, the only kind of reprioritization we need + // to take care of are dangling ones that won't get done because the process that was doing them went + // away. BUT: somebody may have blown away lock info, in which case we won't know anything at all. + // So we do everything in that case. + ManifoldCF.resetAllDocumentPriorities(threadContext,System.currentTimeMillis(),currentProcessID); } /** Cleanup after agents process. @@ -154,14 +160,58 @@ public class CrawlerAgent implements IAg * This method CANNOT be called when the agent is active, but it can * be called at any time and by any process in order to guarantee that a terminated * agent does not block other agents from completing their tasks. - *@param processID is the process ID of the agent to clean up after. + *@param currentProcessID is the current process ID. + *@param cleanupProcessID is the process ID of the agent to clean up after. */ @Override - public void cleanUpAgentData(IThreadContext threadContext, String processID) + public void cleanUpAgentData(IThreadContext threadContext, String currentProcessID, String cleanupProcessID) throws ManifoldCFException { IJobManager jobManager = JobManagerFactory.make(threadContext); - jobManager.cleanupProcessData(processID); + jobManager.cleanupProcessData(cleanupProcessID); + ReprioritizationTracker rt = new ReprioritizationTracker(threadContext); + String reproID = rt.isSpecifiedProcessReprioritizing(cleanupProcessID); + if (reproID != null) + { + // We have to take over the prioritization for the process, which apparently died + // in the middle. + IRepositoryConnectionManager connectionManager = RepositoryConnectionManagerFactory.make(threadContext); + + // Reprioritize all documents in the jobqueue, 1000 at a time + + Map connectionMap = new HashMap(); + Map jobDescriptionMap = new HashMap(); + + // Do the 'not yet processed' documents only. Documents that are queued for reprocessing will be assigned + // new priorities. Already processed documents won't. This guarantees that our bins are appropriate for current thread + // activity. + // In order for this to be the correct functionality, ALL reseeding and requeuing operations MUST reset the associated document + // priorities. + while (true) + { + long startTime = System.currentTimeMillis(); + + Long currentTimeValue = rt.checkReprioritizationInProgress(); + if (currentTimeValue == null) + { + // Some other process or thread superceded us. + return; + } + long updateTime = currentTimeValue.longValue(); + + DocumentDescription[] docs = jobManager.getNextNotYetProcessedReprioritizationDocuments(updateTime, 10000); + if (docs.length == 0) + break; + + // Calculate new priorities for all these documents + ManifoldCF.writeDocumentPriorities(threadContext,connectionManager,jobManager,docs,connectionMap,jobDescriptionMap, + rt,updateTime); + + Logging.threads.debug("Reprioritized "+Integer.toString(docs.length)+" not-yet-processed documents in "+new Long(System.currentTimeMillis()-startTime)+" ms"); + } + + rt.doneReprioritization(reproID); + } } /** Start the agent. This method should spin up the agent threads, and @@ -277,14 +327,14 @@ public class CrawlerAgent implements IAg docCleanupResetManager = new DocCleanupResetManager(documentCleanupQueue,processID); jobStartThread = new JobStartThread(processID); - startupThread = new StartupThread(queueTracker,new StartupResetManager(processID),processID); + startupThread = new StartupThread(new StartupResetManager(processID),processID); startDeleteThread = new StartDeleteThread(new DeleteStartupResetManager(processID),processID); finisherThread = new FinisherThread(processID); notificationThread = new JobNotificationThread(new NotificationResetManager(processID),processID); jobDeleteThread = new JobDeleteThread(processID); stufferThread = new StufferThread(documentQueue,numWorkerThreads,workerResetManager,queueTracker,blockingDocuments,lowWaterFactor,stuffAmtFactor,processID); expireStufferThread = new ExpireStufferThread(expireQueue,numExpireThreads,workerResetManager,processID); - setPriorityThread = new SetPriorityThread(queueTracker,numWorkerThreads,blockingDocuments,processID); + setPriorityThread = new SetPriorityThread(numWorkerThreads,blockingDocuments,processID); historyCleanupThread = new HistoryCleanupThread(processID); workerThreads = new WorkerThread[numWorkerThreads]; @@ -299,7 +349,7 @@ public class CrawlerAgent implements IAg i = 0; while (i < numExpireThreads) { - expireThreads[i] = new ExpireThread(Integer.toString(i),expireQueue,queueTracker,workerResetManager,processID); + expireThreads[i] = new ExpireThread(Integer.toString(i),expireQueue,workerResetManager,processID); i++; } @@ -317,143 +367,61 @@ public class CrawlerAgent implements IAg i = 0; while (i < numCleanupThreads) { - cleanupThreads[i] = new DocumentCleanupThread(Integer.toString(i),documentCleanupQueue,queueTracker,docCleanupResetManager,processID); + cleanupThreads[i] = new DocumentCleanupThread(Integer.toString(i),documentCleanupQueue,docCleanupResetManager,processID); i++; } - jobResetThread = new JobResetThread(queueTracker,processID); - seedingThread = new SeedingThread(queueTracker,new SeedingResetManager(processID),processID); + jobResetThread = new JobResetThread(processID); + seedingThread = new SeedingThread(new SeedingResetManager(processID),processID); idleCleanupThread = new IdleCleanupThread(processID); - initializationThread = new InitializationThread(queueTracker); - // Start the initialization thread. This does the initialization work and starts all the other threads when that's done. It then exits. - initializationThread.start(); - Logging.root.info("Pull-agent started"); - } + // Start all the threads + jobStartThread.start(); + startupThread.start(); + startDeleteThread.start(); + finisherThread.start(); + notificationThread.start(); + jobDeleteThread.start(); + stufferThread.start(); + expireStufferThread.start(); + setPriorityThread.start(); + historyCleanupThread.start(); - protected class InitializationThread extends Thread - { - - protected final QueueTracker queueTracker; - - public InitializationThread(QueueTracker queueTracker) + i = 0; + while (i < numWorkerThreads) { - super(); - this.queueTracker = queueTracker; - setName("Initialization thread"); - setDaemon(true); + workerThreads[i].start(); + i++; } - public void run() + i = 0; + while (i < numExpireThreads) { - int i; - - try - { - IThreadContext threadContext = ThreadContextFactory.make(); - - // First, get a job manager - IJobManager jobManager = JobManagerFactory.make(threadContext); - IRepositoryConnectionManager mgr = RepositoryConnectionManagerFactory.make(threadContext); - - /* No longer needed, because IAgents specifically initializes/cleans up. - - Logging.threads.debug("Agents process starting initialization..."); - - // Call the database to get it ready - jobManager.prepareForStart(); - */ - - Logging.threads.debug("Agents process reprioritizing documents..."); - - Map connectionMap = new HashMap(); - Map jobDescriptionMap = new HashMap(); - // Reprioritize all documents in the jobqueue, 1000 at a time - long currentTime = System.currentTimeMillis(); - - // Do the 'not yet processed' documents only. Documents that are queued for reprocessing will be assigned - // new priorities. Already processed documents won't. This guarantees that our bins are appropriate for current thread - // activity. - // In order for this to be the correct functionality, ALL reseeding and requeuing operations MUST reset the associated document - // priorities. - while (true) - { - long startTime = System.currentTimeMillis(); - - DocumentDescription[] docs = jobManager.getNextNotYetProcessedReprioritizationDocuments(currentTime, 10000); - if (docs.length == 0) - break; - - // Calculate new priorities for all these documents - ManifoldCF.writeDocumentPriorities(threadContext,mgr,jobManager,docs,connectionMap,jobDescriptionMap, - queueTracker,currentTime); - - Logging.threads.debug("Reprioritized "+Integer.toString(docs.length)+" not-yet-processed documents in "+new Long(System.currentTimeMillis()-startTime)+" ms"); - } - - Logging.threads.debug("Agents process initialization complete!"); - - // Start all the threads - jobStartThread.start(); - startupThread.start(); - startDeleteThread.start(); - finisherThread.start(); - notificationThread.start(); - jobDeleteThread.start(); - stufferThread.start(); - expireStufferThread.start(); - setPriorityThread.start(); - historyCleanupThread.start(); - - i = 0; - while (i < numWorkerThreads) - { - workerThreads[i].start(); - i++; - } + expireThreads[i].start(); + i++; + } - i = 0; - while (i < numExpireThreads) - { - expireThreads[i].start(); - i++; - } + cleanupStufferThread.start(); + i = 0; + while (i < numCleanupThreads) + { + cleanupThreads[i].start(); + i++; + } - cleanupStufferThread.start(); - i = 0; - while (i < numCleanupThreads) - { - cleanupThreads[i].start(); - i++; - } + deleteStufferThread.start(); + i = 0; + while (i < numDeleteThreads) + { + deleteThreads[i].start(); + i++; + } - deleteStufferThread.start(); - i = 0; - while (i < numDeleteThreads) - { - deleteThreads[i].start(); - i++; - } + jobResetThread.start(); + seedingThread.start(); + idleCleanupThread.start(); - jobResetThread.start(); - seedingThread.start(); - idleCleanupThread.start(); - // exit! - } - catch (Throwable e) - { - // Severe error on initialization - if (e instanceof ManifoldCFException) - { - // Deal with interrupted exception gracefully, because it means somebody is trying to shut us down before we got started. - if (((ManifoldCFException)e).getErrorCode() == ManifoldCFException.INTERRUPTED) - return; - } - System.err.println("agents process could not start - shutting down"); - Logging.threads.fatal("Startup initialization error tossed: "+e.getMessage(),e); - System.exit(-300); - } - } + Logging.root.info("Pull-agent started"); } /** Stop the system. @@ -462,7 +430,7 @@ public class CrawlerAgent implements IAg throws ManifoldCFException { Logging.root.info("Shutting down pull-agent..."); - while (initializationThread != null || jobDeleteThread != null || startupThread != null || startDeleteThread != null || + while (jobDeleteThread != null || startupThread != null || startDeleteThread != null || jobStartThread != null || stufferThread != null || finisherThread != null || notificationThread != null || workerThreads != null || expireStufferThread != null || expireThreads != null || deleteStufferThread != null || deleteThreads != null || @@ -472,10 +440,6 @@ public class CrawlerAgent implements IAg // Send an interrupt to all threads that are still there. // In theory, this only needs to be done once. In practice, I have seen cases where the thread loses track of the fact that it has been // interrupted (which may be a JVM bug - who knows?), but in any case there's no harm in doing it again. - if (initializationThread != null) - { - initializationThread.interrupt(); - } if (historyCleanupThread != null) { historyCleanupThread.interrupt(); @@ -587,11 +551,6 @@ public class CrawlerAgent implements IAg } // Check to see which died. - if (initializationThread != null) - { - if (!initializationThread.isAlive()) - initializationThread = null; - } if (historyCleanupThread != null) { if (!historyCleanupThread.isAlive()) Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java?rev=1546965&r1=1546964&r2=1546965&view=diff ============================================================================== --- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java (original) +++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java Mon Dec 2 11:27:35 2013 @@ -50,8 +50,6 @@ public class DocumentCleanupThread exten protected final DocumentCleanupQueue documentCleanupQueue; /** Delete thread pool reset manager */ protected final DocCleanupResetManager resetManager; - /** Queue tracker */ - protected final QueueTracker queueTracker; /** Process ID */ protected final String processID; @@ -59,13 +57,12 @@ public class DocumentCleanupThread exten *@param id is the worker thread id. */ public DocumentCleanupThread(String id, DocumentCleanupQueue documentCleanupQueue, - QueueTracker queueTracker, DocCleanupResetManager resetManager, String processID) + DocCleanupResetManager resetManager, String processID) throws ManifoldCFException { super(); this.id = id; this.documentCleanupQueue = documentCleanupQueue; - this.queueTracker = queueTracker; this.resetManager = resetManager; this.processID = processID; setName("Document cleanup thread '"+id+"'"); @@ -83,6 +80,7 @@ public class DocumentCleanupThread exten IIncrementalIngester ingester = IncrementalIngesterFactory.make(threadContext); IJobManager jobManager = JobManagerFactory.make(threadContext); IRepositoryConnectionManager connMgr = RepositoryConnectionManagerFactory.make(threadContext); + ReprioritizationTracker rt = new ReprioritizationTracker(threadContext); // Loop while (true) @@ -239,7 +237,7 @@ public class DocumentCleanupThread exten DocumentDescription[] requeueCandidates = jobManager.markDocumentCleanedUp(jobID,legalLinkTypes,ddd,hopcountMethod); // Use the common method for doing the requeuing ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,requeueCandidates, - connector,connection,queueTracker,currentTime); + connector,connection,rt,currentTime); // Finally, completed expiration of the document. dqd.setProcessed(); } Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireThread.java URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireThread.java?rev=1546965&r1=1546964&r2=1546965&view=diff ============================================================================== --- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireThread.java (original) +++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireThread.java Mon Dec 2 11:27:35 2013 @@ -39,22 +39,19 @@ public class ExpireThread extends Thread protected final DocumentCleanupQueue documentQueue; /** Worker thread pool reset manager */ protected final WorkerResetManager resetManager; - /** Queue tracker */ - protected final QueueTracker queueTracker; /** Process ID */ protected final String processID; /** Constructor. *@param id is the expire thread id. */ - public ExpireThread(String id, DocumentCleanupQueue documentQueue, QueueTracker queueTracker, WorkerResetManager resetManager, String processID) + public ExpireThread(String id, DocumentCleanupQueue documentQueue, WorkerResetManager resetManager, String processID) throws ManifoldCFException { super(); this.id = id; this.documentQueue = documentQueue; this.resetManager = resetManager; - this.queueTracker = queueTracker; this.processID = processID; setName("Expiration thread '"+id+"'"); setDaemon(true); @@ -73,6 +70,7 @@ public class ExpireThread extends Thread IIncrementalIngester ingester = IncrementalIngesterFactory.make(threadContext); IJobManager jobManager = JobManagerFactory.make(threadContext); IRepositoryConnectionManager connMgr = RepositoryConnectionManagerFactory.make(threadContext); + ReprioritizationTracker rt = new ReprioritizationTracker(threadContext); // Loop while (true) @@ -242,7 +240,7 @@ public class ExpireThread extends Thread DocumentDescription[] requeueCandidates = jobManager.markDocumentExpired(jobID,legalLinkTypes,ddd,hopcountMethod); // Use the common method for doing the requeuing ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,requeueCandidates, - connector,connection,queueTracker,currentTime); + connector,connection,rt,currentTime); // Finally, completed expiration of the document. dqd.setProcessed(); } Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/JobResetThread.java URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/JobResetThread.java?rev=1546965&r1=1546964&r2=1546965&view=diff ============================================================================== --- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/JobResetThread.java (original) +++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/JobResetThread.java Mon Dec 2 11:27:35 2013 @@ -33,20 +33,17 @@ public class JobResetThread extends Thre public static final String _rcsid = "@(#)$Id: JobResetThread.java 991295 2010-08-31 19:12:14Z kwright $"; // Local data - /** Queue tracker */ - protected final QueueTracker queueTracker; /** Process ID */ protected final String processID; /** Constructor. */ - public JobResetThread(QueueTracker queueTracker, String processID) + public JobResetThread(String processID) throws ManifoldCFException { super(); setName("Job reset thread"); setDaemon(true); - this.queueTracker = queueTracker; this.processID = processID; } @@ -109,7 +106,7 @@ public class JobResetThread extends Thre { Logging.threads.debug("Job reset thread reprioritizing documents..."); - ManifoldCF.resetAllDocumentPriorities(threadContext,queueTracker,currentTime); + ManifoldCF.resetAllDocumentPriorities(threadContext,currentTime,processID); Logging.threads.debug("Job reset thread done reprioritizing documents."); Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java?rev=1546965&r1=1546964&r2=1546965&view=diff ============================================================================== --- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java (original) +++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java Mon Dec 2 11:27:35 2013 @@ -519,10 +519,12 @@ public class ManifoldCF extends org.apac IConnectorManager repConnMgr = ConnectorManagerFactory.make(threadcontext); IRepositoryConnectionManager repCon = RepositoryConnectionManagerFactory.make(threadcontext); IJobManager jobManager = JobManagerFactory.make(threadcontext); + IBinManager binManager = BinManagerFactory.make(threadcontext); org.apache.manifoldcf.authorities.system.ManifoldCF.installSystemTables(threadcontext); repConnMgr.install(); repCon.install(); jobManager.install(); + binManager.install(); } /** Uninstall all the crawler system tables. @@ -534,6 +536,8 @@ public class ManifoldCF extends org.apac IConnectorManager repConnMgr = ConnectorManagerFactory.make(threadcontext); IRepositoryConnectionManager repCon = RepositoryConnectionManagerFactory.make(threadcontext); IJobManager jobManager = JobManagerFactory.make(threadcontext); + IBinManager binManager = BinManagerFactory.make(threadcontext); + binManager.deinstall(); jobManager.deinstall(); repCon.deinstall(); repConnMgr.deinstall(); @@ -839,13 +843,14 @@ public class ManifoldCF extends org.apac /** Requeue documents due to carrydown. */ - public static void requeueDocumentsDueToCarrydown(IJobManager jobManager, DocumentDescription[] requeueCandidates, - IRepositoryConnector connector, IRepositoryConnection connection, QueueTracker queueTracker, long currentTime) + public static void requeueDocumentsDueToCarrydown(IJobManager jobManager, + DocumentDescription[] requeueCandidates, + IRepositoryConnector connector, IRepositoryConnection connection, ReprioritizationTracker rt, long currentTime) throws ManifoldCFException { // A list of document descriptions from finishDocuments() above represents those documents that may need to be requeued, for the // reason that carrydown information for those documents has changed. In order to requeue, we need to calculate document priorities, however. - double[] docPriorities = new double[requeueCandidates.length]; + IPriorityCalculator[] docPriorities = new IPriorityCalculator[requeueCandidates.length]; String[][] binNames = new String[requeueCandidates.length][]; int q = 0; while (q < requeueCandidates.length) @@ -853,27 +858,12 @@ public class ManifoldCF extends org.apac DocumentDescription dd = requeueCandidates[q]; String[] bins = calculateBins(connector,dd.getDocumentIdentifier()); binNames[q] = bins; - docPriorities[q] = queueTracker.calculatePriority(bins,connection); - if (Logging.scheduling.isDebugEnabled()) - Logging.scheduling.debug("Document '"+dd.getDocumentIdentifier()+" given priority "+new Double(docPriorities[q]).toString()); + docPriorities[q] = new PriorityCalculator(rt,connection,bins); q++; } // Now, requeue the documents with the new priorities - boolean[] trackerNote = jobManager.carrydownChangeDocumentMultiple(requeueCandidates,currentTime,docPriorities); - - // Free the unused priorities. - // Inform queuetracker about what we used and what we didn't - q = 0; - while (q < trackerNote.length) - { - if (trackerNote[q] == false) - { - String[] bins = binNames[q]; - queueTracker.notePriorityNotUsed(bins,connection,docPriorities[q]); - } - q++; - } + jobManager.carrydownChangeDocumentMultiple(requeueCandidates,currentTime,docPriorities); } /** Stuff colons so we can't have conflicts. */ @@ -917,64 +907,54 @@ public class ManifoldCF extends org.apac return connector.getBinNames(documentIdentifier); } - protected final static String resetDocPrioritiesLock = "_RESETPRIORITIES_"; - /** Reset all (active) document priorities. This operation may occur due to various externally-triggered * events, such a job abort, pause, resume, wait, or unwait. */ - public static void resetAllDocumentPriorities(IThreadContext threadContext, QueueTracker queueTracker, long currentTime) + public static void resetAllDocumentPriorities(IThreadContext threadContext, long currentTime, String processID) throws ManifoldCFException { ILockManager lockManager = LockManagerFactory.make(threadContext); IJobManager jobManager = JobManagerFactory.make(threadContext); IRepositoryConnectionManager connectionManager = RepositoryConnectionManagerFactory.make(threadContext); - - // Only one thread allowed at a time - lockManager.enterWriteLock(resetDocPrioritiesLock); - try - { - // Reset the queue tracker - queueTracker.beginReset(); - // Perform the reprioritization, for all active documents in active jobs. During this time, - // it is safe to have other threads assign new priorities to documents, but it is NOT safe - // for other threads to attempt to change the minimum priority level. The queuetracker object - // will therefore block that from occurring, until the reset is complete. - try - { - // Reprioritize all documents in the jobqueue, 1000 at a time + ReprioritizationTracker rt = new ReprioritizationTracker(threadContext); - Map connectionMap = new HashMap(); - Map jobDescriptionMap = new HashMap(); + String reproID = IDFactory.make(threadContext); - // Do the 'not yet processed' documents only. Documents that are queued for reprocessing will be assigned - // new priorities. Already processed documents won't. This guarantees that our bins are appropriate for current thread - // activity. - // In order for this to be the correct functionality, ALL reseeding and requeuing operations MUST reset the associated document - // priorities. - while (true) - { - long startTime = System.currentTimeMillis(); + rt.startReprioritization(System.currentTimeMillis(),processID,reproID); + // Reprioritize all documents in the jobqueue, 1000 at a time - DocumentDescription[] docs = jobManager.getNextNotYetProcessedReprioritizationDocuments(currentTime, 10000); - if (docs.length == 0) - break; - - // Calculate new priorities for all these documents - writeDocumentPriorities(threadContext,connectionManager,jobManager,docs,connectionMap,jobDescriptionMap, - queueTracker,currentTime); + Map connectionMap = new HashMap(); + Map jobDescriptionMap = new HashMap(); + + // Do the 'not yet processed' documents only. Documents that are queued for reprocessing will be assigned + // new priorities. Already processed documents won't. This guarantees that our bins are appropriate for current thread + // activity. + // In order for this to be the correct functionality, ALL reseeding and requeuing operations MUST reset the associated document + // priorities. + while (true) + { + long startTime = System.currentTimeMillis(); - Logging.threads.debug("Reprioritized "+Integer.toString(docs.length)+" not-yet-processed documents in "+new Long(System.currentTimeMillis()-startTime)+" ms"); - } - } - finally + Long currentTimeValue = rt.checkReprioritizationInProgress(); + if (currentTimeValue == null) { - queueTracker.endReset(); + // Some other process or thread superceded us. + return; } + long updateTime = currentTimeValue.longValue(); + + DocumentDescription[] docs = jobManager.getNextNotYetProcessedReprioritizationDocuments(updateTime, 10000); + if (docs.length == 0) + break; + + // Calculate new priorities for all these documents + writeDocumentPriorities(threadContext,connectionManager,jobManager,docs,connectionMap,jobDescriptionMap, + rt,updateTime); + + Logging.threads.debug("Reprioritized "+Integer.toString(docs.length)+" not-yet-processed documents in "+new Long(System.currentTimeMillis()-startTime)+" ms"); } - finally - { - lockManager.leaveWriteLock(resetDocPrioritiesLock); - } + + rt.doneReprioritization(reproID); } /** Write a set of document priorities, based on the current queue tracker. @@ -982,14 +962,14 @@ public class ManifoldCF extends org.apac public static void writeDocumentPriorities(IThreadContext threadContext, IRepositoryConnectionManager mgr, IJobManager jobManager, DocumentDescription[] descs, Map connectionMap, Map jobDescriptionMap, - QueueTracker queueTracker, long currentTime) + ReprioritizationTracker rt, long currentTime) throws ManifoldCFException { if (Logging.scheduling.isDebugEnabled()) Logging.scheduling.debug("Reprioritizing "+Integer.toString(descs.length)+" documents"); - double[] priorities = new double[descs.length]; + IPriorityCalculator[] priorities = new IPriorityCalculator[descs.length]; // Go through the documents and calculate the priorities int i = 0; @@ -1029,9 +1009,7 @@ public class ManifoldCF extends org.apac RepositoryConnectorFactory.release(connector); } - priorities[i] = queueTracker.calculatePriority(binNames,connection); - if (Logging.scheduling.isDebugEnabled()) - Logging.scheduling.debug("Document '"+dd.getDocumentIdentifier()+"' given priority "+new Double(priorities[i]).toString()); + priorities[i] = new PriorityCalculator(rt,connection,binNames); i++; } Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SeedingActivity.java URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SeedingActivity.java?rev=1546965&r1=1546964&r2=1546965&view=diff ============================================================================== --- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SeedingActivity.java (original) +++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SeedingActivity.java Mon Dec 2 11:27:35 2013 @@ -40,7 +40,7 @@ public class SeedingActivity implements protected final String connectionName; protected final IRepositoryConnectionManager connManager; protected final IJobManager jobManager; - protected final QueueTracker queueTracker; + protected final ReprioritizationTracker rt; protected final IRepositoryConnection connection; protected final IRepositoryConnector connector; protected final Long jobID; @@ -57,15 +57,16 @@ public class SeedingActivity implements /** Constructor. */ - public SeedingActivity(String connectionName, IRepositoryConnectionManager connManager, IJobManager jobManager, - QueueTracker queueTracker, IRepositoryConnection connection, IRepositoryConnector connector, + public SeedingActivity(String connectionName, IRepositoryConnectionManager connManager, + IJobManager jobManager, + ReprioritizationTracker rt, IRepositoryConnection connection, IRepositoryConnector connector, Long jobID, String[] legalLinkTypes, boolean overrideSchedule, int hopcountMethod, String processID) { this.processID = processID; this.connectionName = connectionName; this.connManager = connManager; this.jobManager = jobManager; - this.queueTracker = queueTracker; + this.rt = rt; this.connection = connection; this.connector = connector; this.jobID = jobID; @@ -215,39 +216,22 @@ public class SeedingActivity implements { // First, prioritize the documents using the queue tracker long prioritizationTime = System.currentTimeMillis(); - double[] docPriorities = new double[docIDHashes.length]; - String[][] binNames = new String[docIDHashes.length][]; + IPriorityCalculator[] docPriorities = new IPriorityCalculator[docIDHashes.length]; int i = 0; while (i < docIDHashes.length) { // Calculate desired document priority based on current queuetracker status. String[] bins = connector.getBinNames(docIDs[i]); - - binNames[i] = bins; - docPriorities[i] = queueTracker.calculatePriority(bins,connection); - if (Logging.scheduling.isDebugEnabled()) - Logging.scheduling.debug("Giving document '"+docIDs[i]+"' priority "+new Double(docPriorities[i]).toString()); + docPriorities[i] = new PriorityCalculator(rt,connection,bins); i++; } - boolean[] trackerNote = jobManager.addDocumentsInitial(processID, + jobManager.addDocumentsInitial(processID, jobID,legalLinkTypes,docIDHashes,docIDs,overrideSchedule,hopcountMethod, prioritizationTime,docPriorities,prereqEventNames); - // Inform queuetracker about what we used and what we didn't - int j = 0; - while (j < trackerNote.length) - { - if (trackerNote[j] == false) - { - String[] bins = binNames[j]; - queueTracker.notePriorityNotUsed(bins,connection,docPriorities[j]); - } - j++; - } - } /** Check whether current job is still active. Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SeedingThread.java URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SeedingThread.java?rev=1546965&r1=1546964&r2=1546965&view=diff ============================================================================== --- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SeedingThread.java (original) +++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SeedingThread.java Mon Dec 2 11:27:35 2013 @@ -38,8 +38,6 @@ public class SeedingThread extends Threa // Local data /** Seeding reset manager */ protected final SeedingResetManager resetManager; - /** Queue tracker */ - protected final QueueTracker queueTracker; /** Process ID */ protected final String processID; @@ -48,14 +46,13 @@ public class SeedingThread extends Threa /** Constructor. */ - public SeedingThread(QueueTracker queueTracker, SeedingResetManager resetManager, String processID) + public SeedingThread(SeedingResetManager resetManager, String processID) throws ManifoldCFException { super(); setName("Seeding thread"); setDaemon(true); this.resetManager = resetManager; - this.queueTracker = queueTracker; this.processID = processID; } @@ -69,6 +66,7 @@ public class SeedingThread extends Threa IThreadContext threadContext = ThreadContextFactory.make(); IJobManager jobManager = JobManagerFactory.make(threadContext); IRepositoryConnectionManager connectionMgr = RepositoryConnectionManagerFactory.make(threadContext); + ReprioritizationTracker rt = new ReprioritizationTracker(threadContext); IDBInterface database = DBInterfaceFactory.make(threadContext, ManifoldCF.getMasterDatabaseName(), @@ -147,7 +145,8 @@ public class SeedingThread extends Threa try { - SeedingActivity activity = new SeedingActivity(connection.getName(),connectionMgr,jobManager,queueTracker, + SeedingActivity activity = new SeedingActivity(connection.getName(),connectionMgr, + jobManager,rt, connection,connector,jobID,legalLinkTypes,false,hopcountMethod,processID); if (Logging.threads.isDebugEnabled()) Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SetPriorityThread.java URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SetPriorityThread.java?rev=1546965&r1=1546964&r2=1546965&view=diff ============================================================================== --- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SetPriorityThread.java (original) +++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SetPriorityThread.java Mon Dec 2 11:27:35 2013 @@ -38,8 +38,6 @@ public class SetPriorityThread extends T public static final String _rcsid = "@(#)$Id: SetPriorityThread.java 988245 2010-08-23 18:39:35Z kwright $"; // Local data - /** This is the queue tracker object. */ - protected final QueueTracker queueTracker; /** This is the number of documents per cycle */ protected final int cycleCount; /** The blocking documents object */ @@ -48,13 +46,11 @@ public class SetPriorityThread extends T protected final String processID; /** Constructor. - *@param qt is the queue tracker object. */ - public SetPriorityThread(QueueTracker qt, int workerThreadCount, BlockingDocuments blockingDocuments, String processID) + public SetPriorityThread(int workerThreadCount, BlockingDocuments blockingDocuments, String processID) throws ManifoldCFException { super(); - this.queueTracker = qt; this.blockingDocuments = blockingDocuments; this.processID = processID; cycleCount = workerThreadCount * 10; @@ -72,7 +68,8 @@ public class SetPriorityThread extends T IThreadContext threadContext = ThreadContextFactory.make(); IRepositoryConnectionManager mgr = RepositoryConnectionManagerFactory.make(threadContext); IJobManager jobManager = JobManagerFactory.make(threadContext); - + ReprioritizationTracker rt = new ReprioritizationTracker(threadContext); + Logging.threads.debug("Set priority thread coming up"); // Job description map (local) - designed to improve performance. @@ -129,7 +126,8 @@ public class SetPriorityThread extends T DocumentDescription desc = blockingDocuments.getBlockingDocument(); if (desc != null) { - ManifoldCF.writeDocumentPriorities(threadContext,mgr,jobManager,new DocumentDescription[]{desc},connectionMap,jobDescriptionMap,queueTracker,currentTime); + ManifoldCF.writeDocumentPriorities(threadContext,mgr,jobManager, + new DocumentDescription[]{desc},connectionMap,jobDescriptionMap,rt,currentTime); processedCount++; continue; } Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StartupThread.java URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StartupThread.java?rev=1546965&r1=1546964&r2=1546965&view=diff ============================================================================== --- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StartupThread.java (original) +++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StartupThread.java Mon Dec 2 11:27:35 2013 @@ -33,8 +33,6 @@ public class StartupThread extends Threa public static final String _rcsid = "@(#)$Id: StartupThread.java 988245 2010-08-23 18:39:35Z kwright $"; // Local data - /** Queue tracker */ - protected final QueueTracker queueTracker; /** Process ID */ protected final String processID; /** Reset manager */ @@ -42,13 +40,12 @@ public class StartupThread extends Threa /** Constructor. */ - public StartupThread(QueueTracker queueTracker, StartupResetManager resetManager, String processID) + public StartupThread(StartupResetManager resetManager, String processID) throws ManifoldCFException { super(); setName("Startup thread"); setDaemon(true); - this.queueTracker = queueTracker; this.resetManager = resetManager; this.processID = processID; } @@ -63,6 +60,7 @@ public class StartupThread extends Threa IThreadContext threadContext = ThreadContextFactory.make(); IJobManager jobManager = JobManagerFactory.make(threadContext); IRepositoryConnectionManager connectionMgr = RepositoryConnectionManagerFactory.make(threadContext); + ReprioritizationTracker rt = new ReprioritizationTracker(threadContext); IDBInterface database = DBInterfaceFactory.make(threadContext, ManifoldCF.getMasterDatabaseName(), @@ -151,7 +149,8 @@ public class StartupThread extends Threa try { - SeedingActivity activity = new SeedingActivity(connection.getName(),connectionMgr,jobManager,queueTracker, + SeedingActivity activity = new SeedingActivity(connection.getName(),connectionMgr, + jobManager,rt, connection,connector,jobID,legalLinkTypes,true,hopcountMethod,processID); if (Logging.threads.isDebugEnabled()) Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StufferThread.java URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StufferThread.java?rev=1546965&r1=1546964&r2=1546965&view=diff ============================================================================== --- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StufferThread.java (original) +++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StufferThread.java Mon Dec 2 11:27:35 2013 @@ -87,6 +87,7 @@ public class StufferThread extends Threa IRepositoryConnectionManager mgr = RepositoryConnectionManagerFactory.make(threadContext); IIncrementalIngester ingester = IncrementalIngesterFactory.make(threadContext); IJobManager jobManager = JobManagerFactory.make(threadContext); + ReprioritizationTracker rt = new ReprioritizationTracker(threadContext); Logging.threads.debug("Stuffer thread: Low water mark is "+Integer.toString(lowWaterMark)+"; amount per stuffing is "+Integer.toString(stuffAmt)); @@ -166,7 +167,7 @@ public class StufferThread extends Threa if (Thread.currentThread().isInterrupted()) throw new ManifoldCFException("Interrupted",ManifoldCFException.INTERRUPTED); - queueTracker.assessMinimumDepth(depthStatistics.getBins()); + rt.assessMinimumDepth(depthStatistics.getBins()); // Set the last time to be the current time lastTime = currentTime; Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java?rev=1546965&r1=1546964&r2=1546965&view=diff ============================================================================== --- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java (original) +++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java Mon Dec 2 11:27:35 2013 @@ -74,8 +74,10 @@ public class WorkerThread extends Thread IThreadContext threadContext = ThreadContextFactory.make(); IIncrementalIngester ingester = IncrementalIngesterFactory.make(threadContext); IJobManager jobManager = JobManagerFactory.make(threadContext); + IBinManager binManager = BinManagerFactory.make(threadContext); IRepositoryConnectionManager connMgr = RepositoryConnectionManagerFactory.make(threadContext); IOutputConnectionManager outputMgr = OutputConnectionManagerFactory.make(threadContext); + ReprioritizationTracker rt = new ReprioritizationTracker(threadContext); List fetchList = new ArrayList(); Map versionMap = new HashMap(); @@ -527,7 +529,7 @@ public class WorkerThread extends Thread // First, make the things we will need for all subsequent steps. ProcessActivity activity = new ProcessActivity(processID, - threadContext,queueTracker,jobManager,ingester, + threadContext,rt,jobManager,ingester, currentTime,job,connection,connector,connMgr,legalLinkTypes,ingestLogger,abortSet,outputVersion,newParameterVersion); try { @@ -569,7 +571,8 @@ public class WorkerThread extends Thread // "Finish" the documents (removing unneeded carrydown info, etc.) DocumentDescription[] requeueCandidates = jobManager.finishDocuments(job.getID(),legalLinkTypes,processIDHashes,job.getHopcountMode()); - ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,requeueCandidates,connector,connection,queueTracker,currentTime); + ManifoldCF.requeueDocumentsDueToCarrydown(jobManager, + requeueCandidates,connector,connection,rt,currentTime); if (Logging.threads.isDebugEnabled()) Logging.threads.debug("Worker thread done processing "+Integer.toString(processIDs.length)+" documents"); @@ -819,12 +822,12 @@ public class WorkerThread extends Thread // Now, handle the delete list processDeleteLists(outputName,connector,connection,jobManager, deleteList,ingester, - job.getID(),legalLinkTypes,ingestLogger,job.getHopcountMode(),queueTracker,currentTime); + job.getID(),legalLinkTypes,ingestLogger,job.getHopcountMode(),rt,currentTime); // Handle hopcount removal processHopcountRemovalLists(outputName,connector,connection,jobManager, hopcountremoveList,ingester, - job.getID(),legalLinkTypes,ingestLogger,job.getHopcountMode(),queueTracker,currentTime); + job.getID(),legalLinkTypes,ingestLogger,job.getHopcountMode(),rt,currentTime); } finally @@ -975,17 +978,18 @@ public class WorkerThread extends Thread * documents from the index should they be already present. */ protected static void processHopcountRemovalLists(String outputName, IRepositoryConnector connector, - IRepositoryConnection connection, IJobManager jobManager, List hopcountremoveList, + IRepositoryConnection connection, IJobManager jobManager, + List hopcountremoveList, IIncrementalIngester ingester, Long jobID, String[] legalLinkTypes, OutputActivity ingestLogger, - int hopcountMethod, QueueTracker queueTracker, long currentTime) + int hopcountMethod, ReprioritizationTracker rt, long currentTime) throws ManifoldCFException { // Remove from index hopcountremoveList = removeFromIndex(outputName,connection.getName(),jobManager,hopcountremoveList,ingester,ingestLogger); // Mark as 'hopcountremoved' in the job queue processJobQueueHopcountRemovals(hopcountremoveList,connector,connection, - jobManager,jobID,legalLinkTypes,hopcountMethod,queueTracker,currentTime); + jobManager,jobID,legalLinkTypes,hopcountMethod,rt,currentTime); } /** Clear specified documents out of the job queue and from the appliance. @@ -996,17 +1000,18 @@ public class WorkerThread extends Thread *@param ingesterDeleteList is a list of document id's to delete. */ protected static void processDeleteLists(String outputName, IRepositoryConnector connector, - IRepositoryConnection connection, IJobManager jobManager, List deleteList, + IRepositoryConnection connection, IJobManager jobManager, + List deleteList, IIncrementalIngester ingester, Long jobID, String[] legalLinkTypes, OutputActivity ingestLogger, - int hopcountMethod, QueueTracker queueTracker, long currentTime) + int hopcountMethod, ReprioritizationTracker rt, long currentTime) throws ManifoldCFException { // Remove from index deleteList = removeFromIndex(outputName,connection.getName(),jobManager,deleteList,ingester,ingestLogger); // Delete from the job queue processJobQueueDeletions(deleteList,connector,connection, - jobManager,jobID,legalLinkTypes,hopcountMethod,queueTracker,currentTime); + jobManager,jobID,legalLinkTypes,hopcountMethod,rt,currentTime); } /** Remove a specified set of documents from the index. @@ -1086,7 +1091,7 @@ public class WorkerThread extends Thread */ protected static void processJobQueueDeletions(List jobmanagerDeleteList, IRepositoryConnector connector, IRepositoryConnection connection, IJobManager jobManager, - Long jobID, String[] legalLinkTypes, int hopcountMethod, QueueTracker queueTracker, long currentTime) + Long jobID, String[] legalLinkTypes, int hopcountMethod, ReprioritizationTracker rt, long currentTime) throws ManifoldCFException { // Now, do the document queue cleanup for deletions. @@ -1103,7 +1108,8 @@ public class WorkerThread extends Thread DocumentDescription[] requeueCandidates = jobManager.markDocumentDeletedMultiple(jobID,legalLinkTypes,deleteDescriptions,hopcountMethod); // Requeue those documents that had carrydown data modifications - ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,requeueCandidates,connector,connection,queueTracker,currentTime); + ManifoldCF.requeueDocumentsDueToCarrydown(jobManager, + requeueCandidates,connector,connection,rt,currentTime); // Mark all these as done for (int i = 0; i < jobmanagerDeleteList.size(); i++) @@ -1118,7 +1124,7 @@ public class WorkerThread extends Thread */ protected static void processJobQueueHopcountRemovals(List jobmanagerRemovalList, IRepositoryConnector connector, IRepositoryConnection connection, IJobManager jobManager, - Long jobID, String[] legalLinkTypes, int hopcountMethod, QueueTracker queueTracker, long currentTime) + Long jobID, String[] legalLinkTypes, int hopcountMethod, ReprioritizationTracker rt, long currentTime) throws ManifoldCFException { // Now, do the document queue cleanup for deletions. @@ -1135,7 +1141,8 @@ public class WorkerThread extends Thread DocumentDescription[] requeueCandidates = jobManager.markDocumentHopcountRemovalMultiple(jobID,legalLinkTypes,removalDescriptions,hopcountMethod); // Requeue those documents that had carrydown data modifications - ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,requeueCandidates,connector,connection,queueTracker,currentTime); + ManifoldCF.requeueDocumentsDueToCarrydown(jobManager, + requeueCandidates,connector,connection,rt,currentTime); // Mark all these as done for (int i = 0; i < jobmanagerRemovalList.size(); i++) @@ -1440,7 +1447,7 @@ public class WorkerThread extends Thread protected final IRepositoryConnectionManager connMgr; protected final String[] legalLinkTypes; protected final OutputActivity ingestLogger; - protected final QueueTracker queueTracker; + protected final ReprioritizationTracker rt; protected final HashMap abortSet; protected final String outputVersion; protected final String parameterVersion; @@ -1461,13 +1468,16 @@ public class WorkerThread extends Thread *@param jobManager is the job manager *@param ingester is the ingester */ - public ProcessActivity(String processID, IThreadContext threadContext, QueueTracker queueTracker, IJobManager jobManager, IIncrementalIngester ingester, - long currentTime, IJobDescription job, IRepositoryConnection connection, IRepositoryConnector connector, IRepositoryConnectionManager connMgr, - String[] legalLinkTypes, OutputActivity ingestLogger, HashMap abortSet, String outputVersion, String parameterVersion) + public ProcessActivity(String processID, IThreadContext threadContext, + ReprioritizationTracker rt, IJobManager jobManager, + IIncrementalIngester ingester, long currentTime, + IJobDescription job, IRepositoryConnection connection, IRepositoryConnector connector, + IRepositoryConnectionManager connMgr, String[] legalLinkTypes, OutputActivity ingestLogger, + HashMap abortSet, String outputVersion, String parameterVersion) { this.processID = processID; this.threadContext = threadContext; - this.queueTracker = queueTracker; + this.rt = rt; this.jobManager = jobManager; this.ingester = ingester; this.currentTime = currentTime; @@ -1989,16 +1999,15 @@ public class WorkerThread extends Thread String[] docidHashes = new String[set.size()]; String[] docids = new String[set.size()]; - double[] priorities = new double[set.size()]; - String[][] binNames = new String[set.size()][]; + IPriorityCalculator[] priorities = new IPriorityCalculator[set.size()]; String[][] dataNames = new String[docids.length][]; Object[][][] dataValues = new Object[docids.length][][]; String[][] eventNames = new String[docids.length][]; long currentTime = System.currentTimeMillis(); - int j = 0; - while (j < docidHashes.length) + rt.clearPreloadRequests(); + for (int j = 0; j < docidHashes.length; j++) { DocumentReference dr = (DocumentReference)set.get(j); docidHashes[j] = dr.getLocalIdentifierHash(); @@ -2009,35 +2018,17 @@ public class WorkerThread extends Thread // Calculate desired document priority based on current queuetracker status. String[] bins = ManifoldCF.calculateBins(connector,dr.getLocalIdentifier()); - - - binNames[j] = bins; - priorities[j] = queueTracker.calculatePriority(bins,connection); - if (Logging.scheduling.isDebugEnabled()) - Logging.scheduling.debug("Assigning '"+docids[j]+"' priority "+new Double(priorities[j]).toString()); - - // No longer used; the functionality is folded atomically into calculatePriority above: - //queueTracker.notePrioritySet(currentTime,job.getID(),bins,connection); - - j++; + PriorityCalculator p = new PriorityCalculator(rt,connection,bins); + priorities[j] = p; + p.makePreloadRequest(); } + rt.preloadBinValues(); - boolean[] trackerNote = jobManager.addDocuments(processID, + jobManager.addDocuments(processID, job.getID(),legalLinkTypes,docidHashes,docids,db.getParentIdentifierHash(),db.getLinkType(),job.getHopcountMode(), dataNames,dataValues,currentTime,priorities,eventNames); - - // Inform queuetracker about what we used and what we didn't - j = 0; - while (j < trackerNote.length) - { - if (trackerNote[j] == false) - { - String[] bins = binNames[j]; - queueTracker.notePriorityNotUsed(bins,connection,priorities[j]); - } - j++; - } - + + rt.clearPreloadedValues(); } discard(); Modified: manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/BaseITDerby.java URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/BaseITDerby.java?rev=1546965&r1=1546964&r2=1546965&view=diff ============================================================================== --- manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/BaseITDerby.java (original) +++ manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/BaseITDerby.java Mon Dec 2 11:27:35 2013 @@ -100,6 +100,7 @@ public class BaseITDerby extends Connect /** Construct a command url. */ protected String makeAPIURL(String command) + throws Exception { return mcfInstance.makeAPIURL(command); } Modified: manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/BaseITHSQLDB.java URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/BaseITHSQLDB.java?rev=1546965&r1=1546964&r2=1546965&view=diff ============================================================================== --- manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/BaseITHSQLDB.java (original) +++ manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/BaseITHSQLDB.java Mon Dec 2 11:27:35 2013 @@ -44,6 +44,12 @@ public class BaseITHSQLDB extends Connec mcfInstance = new ManifoldCFInstance(singleWar); } + public BaseITHSQLDB(boolean singleWar, boolean webapps) + { + super(); + mcfInstance = new ManifoldCFInstance(singleWar, webapps); + } + // Basic job support protected void waitJobInactiveNative(IJobManager jobManager, Long jobID, long maxTime) @@ -101,6 +107,7 @@ public class BaseITHSQLDB extends Connec /** Construct a command url. */ protected String makeAPIURL(String command) + throws Exception { return mcfInstance.makeAPIURL(command); } Modified: manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/BaseITMySQL.java URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/BaseITMySQL.java?rev=1546965&r1=1546964&r2=1546965&view=diff ============================================================================== --- manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/BaseITMySQL.java (original) +++ manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/BaseITMySQL.java Mon Dec 2 11:27:35 2013 @@ -101,6 +101,7 @@ public class BaseITMySQL extends Connect /** Construct a command url. */ protected String makeAPIURL(String command) + throws Exception { return mcfInstance.makeAPIURL(command); } Modified: manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/BaseITPostgresql.java URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/BaseITPostgresql.java?rev=1546965&r1=1546964&r2=1546965&view=diff ============================================================================== --- manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/BaseITPostgresql.java (original) +++ manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/BaseITPostgresql.java Mon Dec 2 11:27:35 2013 @@ -101,6 +101,7 @@ public class BaseITPostgresql extends Co /** Construct a command url. */ protected String makeAPIURL(String command) + throws Exception { return mcfInstance.makeAPIURL(command); } Modified: manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/ManifoldCFInstance.java URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/ManifoldCFInstance.java?rev=1546965&r1=1546964&r2=1546965&view=diff ============================================================================== --- manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/ManifoldCFInstance.java (original) +++ manifoldcf/trunk/framework/pull-agent/src/test/java/org/apache/manifoldcf/crawler/tests/ManifoldCFInstance.java Mon Dec 2 11:27:35 2013 @@ -56,6 +56,7 @@ import org.apache.http.entity.ContentTyp /** Tests that run the "agents daemon" should be derived from this */ public class ManifoldCFInstance { + protected boolean webapps = true; protected boolean singleWar = false; protected int testPort = 8346; @@ -68,16 +69,27 @@ public class ManifoldCFInstance public ManifoldCFInstance(boolean singleWar) { - this(8346,singleWar); + this(8346,singleWar,true); + } + + public ManifoldCFInstance(boolean singleWar, boolean webapps) + { + this(8346,singleWar,webapps); } public ManifoldCFInstance(int testPort) { - this(testPort,false); + this(testPort,false,true); } public ManifoldCFInstance(int testPort, boolean singleWar) { + this(testPort,singleWar,true); + } + + public ManifoldCFInstance(int testPort, boolean singleWar, boolean webapps) + { + this.webapps = webapps; this.testPort = testPort; this.singleWar = singleWar; } @@ -277,11 +289,17 @@ public class ManifoldCFInstance /** Construct a command url. */ public String makeAPIURL(String command) + throws Exception { - if (singleWar) - return "http://localhost:"+Integer.toString(testPort)+"/mcf/api/json/"+command; + if (webapps) + { + if (singleWar) + return "http://localhost:"+Integer.toString(testPort)+"/mcf/api/json/"+command; + else + return "http://localhost:"+Integer.toString(testPort)+"/mcf-api-service/json/"+command; + } else - return "http://localhost:"+Integer.toString(testPort)+"/mcf-api-service/json/"+command; + throw new Exception("No API servlet running"); } public static String convertToString(HttpResponse httpResponse) @@ -495,53 +513,64 @@ public class ManifoldCFInstance public void start() throws Exception { - // Start jetty - server = new Server( testPort ); - server.setStopAtShutdown( true ); - // Initialize the servlets ContextHandlerCollection contexts = new ContextHandlerCollection(); - server.setHandler(contexts); + if (webapps) + { + // Start jetty + server = new Server( testPort ); + server.setStopAtShutdown( true ); + // Initialize the servlets + server.setHandler(contexts); + } if (singleWar) { - // Start the single combined war - String combinedWarPath = "../../framework/build/war-proprietary/mcf-combined-service.war"; - if (System.getProperty("combinedWarPath") != null) - combinedWarPath = System.getProperty("combinedWarPath"); - - // Initialize the servlet - WebAppContext lcfCombined = new WebAppContext(combinedWarPath,"/mcf"); - // This will cause jetty to ignore all of the framework and jdbc jars in the war, which is what we want. - lcfCombined.setParentLoaderPriority(true); - contexts.addHandler(lcfCombined); - server.start(); + if (webapps) + { + // Start the single combined war + String combinedWarPath = "../../framework/build/war-proprietary/mcf-combined-service.war"; + if (System.getProperty("combinedWarPath") != null) + combinedWarPath = System.getProperty("combinedWarPath"); + + // Initialize the servlet + WebAppContext lcfCombined = new WebAppContext(combinedWarPath,"/mcf"); + // This will cause jetty to ignore all of the framework and jdbc jars in the war, which is what we want. + lcfCombined.setParentLoaderPriority(true); + contexts.addHandler(lcfCombined); + server.start(); + } + else + throw new Exception("Can't run singleWar without webapps"); } else { - String crawlerWarPath = "../../framework/build/war-proprietary/mcf-crawler-ui.war"; - String authorityserviceWarPath = "../../framework/build/war-proprietary/mcf-authority-service.war"; - String apiWarPath = "../../framework/build/war-proprietary/mcf-api-service.war"; - - if (System.getProperty("crawlerWarPath") != null) - crawlerWarPath = System.getProperty("crawlerWarPath"); - if (System.getProperty("authorityserviceWarPath") != null) - authorityserviceWarPath = System.getProperty("authorityserviceWarPath"); - if (System.getProperty("apiWarPath") != null) - apiWarPath = System.getProperty("apiWarPath"); - - // Initialize the servlets - WebAppContext lcfCrawlerUI = new WebAppContext(crawlerWarPath,"/mcf-crawler-ui"); - // This will cause jetty to ignore all of the framework and jdbc jars in the war, which is what we want. - lcfCrawlerUI.setParentLoaderPriority(true); - contexts.addHandler(lcfCrawlerUI); - WebAppContext lcfAuthorityService = new WebAppContext(authorityserviceWarPath,"/mcf-authority-service"); - // This will cause jetty to ignore all of the framework and jdbc jars in the war, which is what we want. - lcfAuthorityService.setParentLoaderPriority(true); - contexts.addHandler(lcfAuthorityService); - WebAppContext lcfApi = new WebAppContext(apiWarPath,"/mcf-api-service"); - lcfApi.setParentLoaderPriority(true); - contexts.addHandler(lcfApi); - server.start(); + if (webapps) + { + String crawlerWarPath = "../../framework/build/war-proprietary/mcf-crawler-ui.war"; + String authorityserviceWarPath = "../../framework/build/war-proprietary/mcf-authority-service.war"; + String apiWarPath = "../../framework/build/war-proprietary/mcf-api-service.war"; + + if (System.getProperty("crawlerWarPath") != null) + crawlerWarPath = System.getProperty("crawlerWarPath"); + if (System.getProperty("authorityserviceWarPath") != null) + authorityserviceWarPath = System.getProperty("authorityserviceWarPath"); + if (System.getProperty("apiWarPath") != null) + apiWarPath = System.getProperty("apiWarPath"); + + // Initialize the servlets + WebAppContext lcfCrawlerUI = new WebAppContext(crawlerWarPath,"/mcf-crawler-ui"); + // This will cause jetty to ignore all of the framework and jdbc jars in the war, which is what we want. + lcfCrawlerUI.setParentLoaderPriority(true); + contexts.addHandler(lcfCrawlerUI); + WebAppContext lcfAuthorityService = new WebAppContext(authorityserviceWarPath,"/mcf-authority-service"); + // This will cause jetty to ignore all of the framework and jdbc jars in the war, which is what we want. + lcfAuthorityService.setParentLoaderPriority(true); + contexts.addHandler(lcfAuthorityService); + WebAppContext lcfApi = new WebAppContext(apiWarPath,"/mcf-api-service"); + lcfApi.setParentLoaderPriority(true); + contexts.addHandler(lcfApi); + server.start(); + } // If all worked, then we can start the daemon. // Clear the agents shutdown signal.