manifoldcf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kwri...@apache.org
Subject svn commit: r1542743 - in /manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler: interfaces/QueueTracker.java system/ManifoldCF.java
Date Sun, 17 Nov 2013 14:58:26 GMT
Author: kwright
Date: Sun Nov 17 14:58:26 2013
New Revision: 1542743

URL: http://svn.apache.org/r1542743
Log:
Put in another write lock, and also refactor to make use of Java 1.5 constructs.

Modified:
    manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/QueueTracker.java
    manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java

Modified: manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/QueueTracker.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/QueueTracker.java?rev=1542743&r1=1542742&r2=1542743&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/QueueTracker.java
(original)
+++ manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/QueueTracker.java
Sun Nov 17 14:58:26 2013
@@ -58,18 +58,18 @@ public class QueueTracker
   protected final static double binReductionFactor = 1.0;
 
   /** These are the accumulated performance averages for all connections etc. */
-  protected PerformanceStatistics performanceStatistics = new PerformanceStatistics();
+  protected final PerformanceStatistics performanceStatistics = new PerformanceStatistics();
 
   /** These are the bin counts for a prioritization pass.
   * This hash table is keyed by bin, and contains DoubleBinCount objects as values */
-  protected HashMap binCounts = new HashMap();
+  protected final Map<String,DoubleBinCount> binCounts = new HashMap<String,DoubleBinCount>();
 
   /** These are the bin counts for tracking the documents that are on
   * the active queue, but are not being processed yet */
-  protected HashMap queuedBinCounts = new HashMap();
+  protected final Map<String,BinCount> queuedBinCounts = new HashMap<String,BinCount>();
 
   /** These are the bin counts for active threads */
-  protected HashMap activeBinCounts = new HashMap();
+  protected final Map<String,BinCount> activeBinCounts = new HashMap<String,BinCount>();
 
   /** The "minimum depth" - which is the smallest bin count of the last document queued.
 This helps guarantee that documents that are
   * newly discovered don't wind up with high priority, but instead wind up about the same
as the currently active document priority. */
@@ -79,10 +79,11 @@ public class QueueTracker
   protected boolean resetInProgress = false;
 
   /** This hash table is keyed by PriorityKey objects, and contains ArrayList objects containing
Doubles, in sorted order. */
-  protected HashMap availablePriorities = new HashMap();
+  protected final Map<PriorityKey,List<Double>> availablePriorities = new HashMap<PriorityKey,List<Double>>();
 
-  /** This hash table is keyed by a String (which is the bin name), and contains a HashMap
of PriorityKey objects containing that String as a bin */
-  protected HashMap binDependencies = new HashMap();
+  /** This hash table is keyed by a String (which is the bin name), and contains a Set of
PriorityKey objects containing that
+  * String as a bin */
+  protected final Map<String,Set<PriorityKey>> binDependencies = new HashMap<String,Set<PriorityKey>>();
 
 
   /** Constructor */
@@ -140,7 +141,7 @@ public class QueueTracker
       String binName = binNames[i++];
       synchronized (queuedBinCounts)
       {
-        BinCount value = (BinCount)queuedBinCounts.get(binName);
+        BinCount value = queuedBinCounts.get(binName);
         if (value == null)
         {
           value = new BinCount();
@@ -163,10 +164,10 @@ public class QueueTracker
     PriorityKey pk = new PriorityKey(binNames);
     synchronized (binCounts)
     {
-      ArrayList value = (ArrayList)availablePriorities.get(pk);
+      List<Double> value = availablePriorities.get(pk);
       if (value == null)
       {
-        value = new ArrayList();
+        value = new ArrayList<Double>();
         availablePriorities.put(pk,value);
       }
       // Use bisection lookup to file the current priority so that highest priority is at
the end (0.0), and lowest is at the beginning
@@ -195,13 +196,13 @@ public class QueueTracker
       while (i < binNames.length)
       {
         String binName = binNames[i++];
-        HashMap hm = (HashMap)binDependencies.get(binName);
+        Set<PriorityKey> hm = binDependencies.get(binName);
         if (hm == null)
         {
-          hm = new HashMap();
+          hm = new HashSet<PriorityKey>();
           binDependencies.put(binName,hm);
         }
-        hm.put(pk,pk);
+        hm.add(pk);
       }
     }
   }
@@ -251,7 +252,7 @@ public class QueueTracker
       // Increment queued bin count for this bin.
       synchronized (queuedBinCounts)
       {
-        BinCount value = (BinCount)queuedBinCounts.get(binName);
+        BinCount value = queuedBinCounts.get(binName);
         if (value != null)
         {
           if (value.decrement())
@@ -262,7 +263,7 @@ public class QueueTracker
       // Decrement active bin count for this bin.
       synchronized (activeBinCounts)
       {
-        BinCount value = (BinCount)activeBinCounts.get(binName);
+        BinCount value = activeBinCounts.get(binName);
         if (value == null)
         {
           value = new BinCount();
@@ -347,7 +348,7 @@ public class QueueTracker
       String binName = binNames[i++];
       synchronized (activeBinCounts)
       {
-        BinCount value = (BinCount)activeBinCounts.get(binName);
+        BinCount value = activeBinCounts.get(binName);
         if (value != null)
         {
           if (value.decrement())
@@ -380,7 +381,7 @@ public class QueueTracker
       int count = 0;
       synchronized (activeBinCounts)
       {
-        BinCount value = (BinCount)activeBinCounts.get(binName);
+        BinCount value = activeBinCounts.get(binName);
         if (value != null)
           count = value.getValue();
       }
@@ -473,7 +474,7 @@ public class QueueTracker
         binCountScaleFactors[i] = binCountScaleFactor;
 
         double thisCount = 0.0;
-        DoubleBinCount bc = (DoubleBinCount)binCounts.get(binName);
+        DoubleBinCount bc = binCounts.get(binName);
         if (bc != null)
         {
           thisCount = bc.getValue();
@@ -488,13 +489,11 @@ public class QueueTracker
             Logging.scheduling.debug("Resetting value of bin '"+binName+"' to "+new Double(weightedMinimumDepth).toString()+"(scale
factor is "+new Double(binCountScaleFactor)+")");
 
           // Clear available priorities that depend on this bin
-          HashMap hm = (HashMap)binDependencies.get(binName);
+          Set<PriorityKey> hm = binDependencies.get(binName);
           if (hm != null)
           {
-            Iterator iter = hm.keySet().iterator();
-            while (iter.hasNext())
+            for (PriorityKey pk : hm)
             {
-              PriorityKey pk = (PriorityKey)iter.next();
               availablePriorities.remove(pk);
             }
             binDependencies.remove(binName);
@@ -515,7 +514,7 @@ public class QueueTracker
       double returnValue;
 
       PriorityKey pk2 = new PriorityKey(binNames);
-      ArrayList queuedvalue = (ArrayList)availablePriorities.get(pk2);
+      List<Double> queuedvalue = availablePriorities.get(pk2);
       if (queuedvalue != null && queuedvalue.size() > 0)
       {
         // There's a saved value on the queue, which was calculated but not assigned earlier.
 We use these values preferentially.
@@ -526,7 +525,7 @@ public class QueueTracker
           while (i < binNames.length)
           {
             String binName = binNames[i++];
-            HashMap hm = (HashMap)binDependencies.get(binName);
+            Set<PriorityKey> hm = binDependencies.get(binName);
             if (hm != null)
             {
               hm.remove(pk2);
@@ -551,7 +550,7 @@ public class QueueTracker
           double binCountScaleFactor = binCountScaleFactors[i];
 
           double thisCount = 0.0;
-          DoubleBinCount bc = (DoubleBinCount)binCounts.get(binName);
+          DoubleBinCount bc = binCounts.get(binName);
           if (bc != null)
             thisCount = bc.getValue();
 
@@ -579,7 +578,7 @@ public class QueueTracker
         while (j < binNames.length)
         {
           String binName = binNames[j];
-          DoubleBinCount bc = (DoubleBinCount)binCounts.get(binName);
+          DoubleBinCount bc = binCounts.get(binName);
           if (bc == null)
           {
             bc = new DoubleBinCount();

Modified: manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java?rev=1542743&r1=1542742&r2=1542743&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java
(original)
+++ manifoldcf/branches/CONNECTORS-781/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java
Sun Nov 17 14:58:26 2013
@@ -721,8 +721,8 @@ public class ManifoldCF extends org.apac
 
         Logging.threads.debug("Agents process reprioritizing documents...");
 
-        HashMap connectionMap = new HashMap();
-        HashMap jobDescriptionMap = new HashMap();
+        Map<String,IRepositoryConnection> connectionMap = new HashMap<String,IRepositoryConnection>();
+        Map<Long,IJobDescription> jobDescriptionMap = new HashMap<Long,IJobDescription>();
         // Reprioritize all documents in the jobqueue, 1000 at a time
         long currentTime = System.currentTimeMillis();
 
@@ -740,7 +740,8 @@ public class ManifoldCF extends org.apac
             break;
 
           // Calculate new priorities for all these documents
-          writeDocumentPriorities(threadContext,mgr,jobManager,docs,connectionMap,jobDescriptionMap,queueTracker,currentTime);
+          writeDocumentPriorities(threadContext,mgr,jobManager,docs,connectionMap,jobDescriptionMap,
+            queueTracker,currentTime);
 
           Logging.threads.debug("Reprioritized "+Integer.toString(docs.length)+" not-yet-processed
documents in "+new Long(System.currentTimeMillis()-startTime)+" ms");
         }
@@ -1480,56 +1481,72 @@ public class ManifoldCF extends org.apac
     return connector.getBinNames(documentIdentifier);
   }
 
+  protected final static String resetDocPrioritiesLock = "_RESETPRIORITIES_";
+  
   /** Reset all (active) document priorities.  This operation may occur due to various externally-triggered
   * events, such a job abort, pause, resume, wait, or unwait.
   */
   public static void resetAllDocumentPriorities(IThreadContext threadContext, QueueTracker
queueTracker, long currentTime)
     throws ManifoldCFException
   {
+    ILockManager lockManager = LockManagerFactory.make(threadContext);
     IJobManager jobManager = JobManagerFactory.make(threadContext);
     IRepositoryConnectionManager connectionManager = RepositoryConnectionManagerFactory.make(threadContext);
     
-    // Reset the queue tracker
-    queueTracker.beginReset();
-    // Perform the reprioritization, for all active documents in active jobs.  During this
time,
-    // it is safe to have other threads assign new priorities to documents, but it is NOT
safe
-    // for other threads to attempt to change the minimum priority level.  The queuetracker
object
-    // will therefore block that from occurring, until the reset is complete.
+    // Only one thread allowed at a time
+    lockManager.enterWriteLock(resetDocPrioritiesLock);
     try
     {
-      // Reprioritize all documents in the jobqueue, 1000 at a time
+      // Reset the queue tracker
+      queueTracker.beginReset();
+      // Perform the reprioritization, for all active documents in active jobs.  During this
time,
+      // it is safe to have other threads assign new priorities to documents, but it is NOT
safe
+      // for other threads to attempt to change the minimum priority level.  The queuetracker
object
+      // will therefore block that from occurring, until the reset is complete.
+      try
+      {
+        // Reprioritize all documents in the jobqueue, 1000 at a time
 
-      HashMap connectionMap = new HashMap();
-      HashMap jobDescriptionMap = new HashMap();
+        Map<String,IRepositoryConnection> connectionMap = new HashMap<String,IRepositoryConnection>();
+        Map<Long,IJobDescription> jobDescriptionMap = new HashMap<Long,IJobDescription>();
 
-      // Do the 'not yet processed' documents only.  Documents that are queued for reprocessing
will be assigned
-      // new priorities.  Already processed documents won't.  This guarantees that our bins
are appropriate for current thread
-      // activity.
-      // In order for this to be the correct functionality, ALL reseeding and requeuing operations
MUST reset the associated document
-      // priorities.
-      while (true)
-      {
-        long startTime = System.currentTimeMillis();
+        // Do the 'not yet processed' documents only.  Documents that are queued for reprocessing
will be assigned
+        // new priorities.  Already processed documents won't.  This guarantees that our
bins are appropriate for current thread
+        // activity.
+        // In order for this to be the correct functionality, ALL reseeding and requeuing
operations MUST reset the associated document
+        // priorities.
+        while (true)
+        {
+          long startTime = System.currentTimeMillis();
 
-        DocumentDescription[] docs = jobManager.getNextNotYetProcessedReprioritizationDocuments(currentTime,
10000);
-        if (docs.length == 0)
-          break;
+          DocumentDescription[] docs = jobManager.getNextNotYetProcessedReprioritizationDocuments(currentTime,
10000);
+          if (docs.length == 0)
+            break;
 
-        // Calculate new priorities for all these documents
-        writeDocumentPriorities(threadContext,connectionManager,jobManager,docs,connectionMap,jobDescriptionMap,queueTracker,currentTime);
+          // Calculate new priorities for all these documents
+          writeDocumentPriorities(threadContext,connectionManager,jobManager,docs,connectionMap,jobDescriptionMap,
+            queueTracker,currentTime);
 
-        Logging.threads.debug("Reprioritized "+Integer.toString(docs.length)+" not-yet-processed
documents in "+new Long(System.currentTimeMillis()-startTime)+" ms");
+          Logging.threads.debug("Reprioritized "+Integer.toString(docs.length)+" not-yet-processed
documents in "+new Long(System.currentTimeMillis()-startTime)+" ms");
+        }
+      }
+      finally
+      {
+        queueTracker.endReset();
       }
     }
     finally
     {
-      queueTracker.endReset();
+      lockManager.leaveWriteLock(resetDocPrioritiesLock);
     }
   }
   
   /** Write a set of document priorities, based on the current queue tracker.
   */
-  public static void writeDocumentPriorities(IThreadContext threadContext, IRepositoryConnectionManager
mgr, IJobManager jobManager, DocumentDescription[] descs, HashMap connectionMap, HashMap jobDescriptionMap,
QueueTracker queueTracker, long currentTime)
+  public static void writeDocumentPriorities(IThreadContext threadContext, IRepositoryConnectionManager
mgr,
+    IJobManager jobManager, DocumentDescription[] descs,
+    Map<String,IRepositoryConnection> connectionMap, Map<Long,IJobDescription>
jobDescriptionMap,
+    QueueTracker queueTracker, long currentTime)
     throws ManifoldCFException
   {
     if (Logging.scheduling.isDebugEnabled())
@@ -1543,14 +1560,14 @@ public class ManifoldCF extends org.apac
     while (i < descs.length)
     {
       DocumentDescription dd = descs[i];
-      IJobDescription job = (IJobDescription)jobDescriptionMap.get(dd.getJobID());
+      IJobDescription job = jobDescriptionMap.get(dd.getJobID());
       if (job == null)
       {
         job = jobManager.load(dd.getJobID(),true);
         jobDescriptionMap.put(dd.getJobID(),job);
       }
       String connectionName = job.getConnectionName();
-      IRepositoryConnection connection = (IRepositoryConnection)connectionMap.get(connectionName);
+      IRepositoryConnection connection = connectionMap.get(connectionName);
       if (connection == null)
       {
         connection = mgr.load(connectionName);



Mime
View raw message