manifoldcf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kwri...@apache.org
Subject svn commit: r1600689 - /manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java
Date Thu, 05 Jun 2014 15:56:55 GMT
Author: kwright
Date: Thu Jun  5 15:56:55 2014
New Revision: 1600689

URL: http://svn.apache.org/r1600689
Log:
Propagate transformation stuff into VersionActivities

Modified:
    manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java

Modified: manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java?rev=1600689&r1=1600688&r2=1600689&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java (original)
+++ manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java Thu Jun  5 15:56:55 2014
@@ -145,6 +145,14 @@ public class WorkerThread extends Thread
             // Universal data, from the job
             String connectionName = job.getConnectionName();
             String outputName = job.getOutputConnectionName();
+            int pipelineCount = job.countPipelineStages();
+            String[] transformationNames = new String[pipelineCount];
+            OutputSpecification[] transformationSpecifications = new OutputSpecification[pipelineCount];
+            for (int k = 0; k < pipelineCount; k++)
+            {
+              transformationNames[k] = job.getPipelineStageConnectionName(k);
+              transformationSpecifications[k] = job.getPipelineStageSpecification(k);
+            }
             String newParameterVersion = packParameters(job.getForcedMetadata());
             DocumentSpecification spec = job.getSpecification();
             OutputSpecification outputSpec = job.getOutputSpecification();
@@ -309,9 +317,11 @@ public class WorkerThread extends Thread
 
                     // Get the output version string.
                     String outputVersion = ingester.getOutputDescription(outputName,outputSpec);
-                      
-                    HashMap abortSet = new HashMap();
-                    VersionActivity versionActivity = new VersionActivity(processID,connectionName,connMgr,jobManager,job,ingester,abortSet,outputVersion);
+                    // Get the transformation version strings.
+                    String[] transformationVersions = ingester.getTransformationDescriptions(transformationNames,transformationSpecifications);
+                    
+                    Set<String> abortSet = new HashSet<String>();
+                    VersionActivity versionActivity = new VersionActivity(job.getID(),processID,connectionName,outputName,transformationNames,connMgr,jobManager,ingester,abortSet,outputVersion,transformationVersions);
 
                     String aclAuthority = connection.getACLAuthority();
                     boolean isDefaultAuthority = (aclAuthority == null || aclAuthority.length() == 0);
@@ -405,7 +415,7 @@ public class WorkerThread extends Thread
                           QueuedDocument qd = activeDocuments.get(i);
                           DocumentDescription dd = qd.getDocumentDescription();
                           // If this document was aborted, then treat it specially; we never go on to fetch it, for one thing.
-                          if (abortSet.get(dd.getDocumentIdentifier()) != null)
+                          if (abortSet.contains(dd.getDocumentIdentifier()))
                           {
                             // Special treatment for aborted documents.
                             // We ignore the returned version string completely, since it's presumed that processing was not completed for this doc.
@@ -677,7 +687,7 @@ public class WorkerThread extends Thread
                                   String documentID = recrawlDocs[i].getDocumentIdentifier();
 
                                   // If aborted due to sequencing issue, then requeue for reprocessing immediately, ignoring everything else.
-                                  boolean wasAborted = abortSet.get(documentID) != null;
+                                  boolean wasAborted = abortSet.contains(documentID);
                                   if (wasAborted)
                                   {
                                     // Requeue for immediate reprocessing
@@ -742,7 +752,7 @@ public class WorkerThread extends Thread
                                 {
                                   QueuedDocument qd = finishList.get(i);
                                   DocumentDescription dd = qd.getDocumentDescription();
-                                  if (abortSet.get(dd.getDocumentIdentifier()) != null)
+                                  if (abortSet.contains(dd.getDocumentIdentifier()))
                                   {
                                     // The document was aborted, so put it into the abortedList
                                     abortedList.add(dd);
@@ -1238,59 +1248,74 @@ public class WorkerThread extends Thread
   */
   protected static class VersionActivity implements IVersionActivity
   {
+    protected final Long jobID;
     protected final String processID;
     protected final String connectionName;
+    protected final String outputConnectionName;
+    protected final String[] transformationConnectionNames;
     protected final IRepositoryConnectionManager connMgr;
     protected final IJobManager jobManager;
-    protected final IJobDescription job;
     protected final IIncrementalIngester ingester;
-    protected final HashMap abortSet;
+    protected final Set<String> abortSet;
     protected final String outputVersion;
+    protected final String[] transformationVersions;
 
     /** Constructor.
     */
-    public VersionActivity(String processID, String connectionName, IRepositoryConnectionManager connMgr,
-      IJobManager jobManager, IJobDescription job, IIncrementalIngester ingester, HashMap abortSet,
-      String outputVersion)
+    public VersionActivity(Long jobID, String processID,
+      String connectionName, String outputConnectionName,
+      String[] transformationConnectionNames,
+      IRepositoryConnectionManager connMgr,
+      IJobManager jobManager, IIncrementalIngester ingester, Set<String> abortSet,
+      String outputVersion, String[] transformationVersions)
     {
+      this.jobID = jobID;
       this.processID = processID;
       this.connectionName = connectionName;
+      this.outputConnectionName = outputConnectionName;
+      this.transformationConnectionNames = transformationConnectionNames;
       this.connMgr = connMgr;
       this.jobManager = jobManager;
-      this.job = job;
       this.ingester = ingester;
       this.abortSet = abortSet;
       this.outputVersion = outputVersion;
+      this.transformationVersions = transformationVersions;
     }
 
     /** Check whether a mime type is indexable by the currently specified output connector.
     *@param mimeType is the mime type to check, not including any character set specification.
     *@return true if the mime type is indexable.
     */
+    @Override
     public boolean checkMimeTypeIndexable(String mimeType)
       throws ManifoldCFException, ServiceInterruption
     {
-      return ingester.checkMimeTypeIndexable(job.getOutputConnectionName(),outputVersion,mimeType);
+      // MHL
+      return ingester.checkMimeTypeIndexable(outputConnectionName,outputVersion,mimeType);
     }
 
     /** Check whether a document is indexable by the currently specified output connector.
     *@param localFile is the local copy of the file to check.
     *@return true if the document is indexable.
     */
+    @Override
     public boolean checkDocumentIndexable(File localFile)
       throws ManifoldCFException, ServiceInterruption
     {
-      return ingester.checkDocumentIndexable(job.getOutputConnectionName(),outputVersion,localFile);
+      // MHL
+      return ingester.checkDocumentIndexable(outputConnectionName,outputVersion,localFile);
     }
 
     /** Check whether a document of a specified length is indexable by the currently specified output connector.
     *@param length is the length to check.
     *@return true if the document is indexable.
     */
+    @Override
     public boolean checkLengthIndexable(long length)
       throws ManifoldCFException, ServiceInterruption
     {
-      return ingester.checkLengthIndexable(job.getOutputConnectionName(),outputVersion,length);
+      // MHL
+      return ingester.checkLengthIndexable(outputConnectionName,outputVersion,length);
     }
 
     /** Pre-determine whether a document's URL is indexable by this connector.  This method is used by participating repository connectors
@@ -1298,10 +1323,12 @@ public class WorkerThread extends Thread
     *@param url is the URL of the document.
     *@return true if the file is indexable.
     */
+    @Override
     public boolean checkURLIndexable(String url)
       throws ManifoldCFException, ServiceInterruption
     {
-      return ingester.checkURLIndexable(job.getOutputConnectionName(),outputVersion,url);
+      // MHL
+      return ingester.checkURLIndexable(outputConnectionName,outputVersion,url);
     }
 
     /** Record time-stamped information about the activity of the connector.
@@ -1320,6 +1347,7 @@ public class WorkerThread extends Thread
     *       described in the resultCode field.  This field is not meant to be queried on.  May be null.
     *@param childIdentifiers is a set of child entity identifiers associated with this activity.  May be null.
     */
+    @Override
     public void recordActivity(Long startTime, String activityType, Long dataSize,
       String entityIdentifier, String resultCode, String resultDescription, String[] childIdentifiers)
       throws ManifoldCFException
@@ -1333,10 +1361,11 @@ public class WorkerThread extends Thread
     *@param dataName is the name of the data items to retrieve.
     *@return an array containing the unique data values passed from ALL parents.  Note that these are in no particular order, and there will not be any duplicates.
     */
+    @Override
     public String[] retrieveParentData(String localIdentifier, String dataName)
       throws ManifoldCFException
     {
-      return jobManager.retrieveParentData(job.getID(),ManifoldCF.hash(localIdentifier),dataName);
+      return jobManager.retrieveParentData(jobID,ManifoldCF.hash(localIdentifier),dataName);
     }
 
     /** Retrieve data passed from parents to a specified child document.
@@ -1344,10 +1373,11 @@ public class WorkerThread extends Thread
     *@param dataName is the name of the data items to retrieve.
     *@return an array containing the unique data values passed from ALL parents.  Note that these are in no particular order, and there will not be any duplicates.
     */
+    @Override
     public CharacterInput[] retrieveParentDataAsFiles(String localIdentifier, String dataName)
       throws ManifoldCFException
     {
-      return jobManager.retrieveParentDataAsFiles(job.getID(),ManifoldCF.hash(localIdentifier),dataName);
+      return jobManager.retrieveParentDataAsFiles(jobID,ManifoldCF.hash(localIdentifier),dataName);
     }
 
     /** Check whether current job is still active.
@@ -1355,10 +1385,11 @@ public class WorkerThread extends Thread
     * itself being aborted.  If the connector should abort, this method will raise a properly-formed ServiceInterruption, which if thrown to the
     * caller, will signal that the current versioning activity remains incomplete and must be retried when the job is resumed.
     */
+    @Override
     public void checkJobStillActive()
       throws ManifoldCFException, ServiceInterruption
     {
-      if (jobManager.checkJobActive(job.getID()) == false)
+      if (jobManager.checkJobActive(jobID) == false)
         throw new ServiceInterruption("Job no longer active",System.currentTimeMillis(),true);
     }
 
@@ -1369,6 +1400,7 @@ public class WorkerThread extends Thread
     *@param eventName is the event name.
     *@return false if the event is already in the "pending" state.
     */
+    @Override
     public boolean beginEventSequence(String eventName)
       throws ManifoldCFException
     {
@@ -1382,6 +1414,7 @@ public class WorkerThread extends Thread
     * the sole right to complete it.  Otherwise, race conditions can develop which would be difficult to diagnose.
     *@param eventName is the event name.
     */
+    @Override
     public void completeEventSequence(String eventName)
       throws ManifoldCFException
     {
@@ -1394,17 +1427,19 @@ public class WorkerThread extends Thread
     * presumed that the reason for the requeue is because of sequencing issues synchronized around an underlying event.
     *@param localIdentifier is the document identifier to requeue
     */
+    @Override
     public void retryDocumentProcessing(String localIdentifier)
       throws ManifoldCFException
     {
       // Accumulate aborts
-      abortSet.put(localIdentifier,localIdentifier);
+      abortSet.add(localIdentifier);
     }
 
     /** Create a global string from a simple string.
     *@param simpleString is the simple string.
     *@return a global string.
     */
+    @Override
     public String createGlobalString(String simpleString)
     {
       return ManifoldCF.createGlobalString(simpleString);
@@ -1414,6 +1449,7 @@ public class WorkerThread extends Thread
     *@param simpleString is the simple string.
     *@return a connection-specific string.
     */
+    @Override
     public String createConnectionSpecificString(String simpleString)
     {
       return ManifoldCF.createConnectionSpecificString(connectionName,simpleString);
@@ -1423,9 +1459,10 @@ public class WorkerThread extends Thread
     *@param simpleString is the simple string.
     *@return a job-specific string.
     */
+    @Override
     public String createJobSpecificString(String simpleString)
     {
-      return ManifoldCF.createJobSpecificString(job.getID(),simpleString);
+      return ManifoldCF.createJobSpecificString(jobID,simpleString);
     }
 
   }
@@ -1447,7 +1484,7 @@ public class WorkerThread extends Thread
     protected final String[] legalLinkTypes;
     protected final OutputActivity ingestLogger;
     protected final IReprioritizationTracker rt;
-    protected final HashMap abortSet;
+    protected final Set<String> abortSet;
     protected final String outputVersion;
     protected final String parameterVersion;
     
@@ -1472,7 +1509,7 @@ public class WorkerThread extends Thread
       IIncrementalIngester ingester, long currentTime,
       IJobDescription job, IRepositoryConnection connection, IRepositoryConnector connector,
       IRepositoryConnectionManager connMgr, String[] legalLinkTypes, OutputActivity ingestLogger,
-      HashMap abortSet, String outputVersion, String parameterVersion)
+      Set<String> abortSet, String outputVersion, String parameterVersion)
     {
       this.processID = processID;
       this.threadContext = threadContext;
@@ -2085,7 +2122,7 @@ public class WorkerThread extends Thread
       throws ManifoldCFException
     {
       // Accumulate aborts
-      abortSet.put(localIdentifier,localIdentifier);
+      abortSet.add(localIdentifier);
     }
 
     /** Check whether a mime type is indexable by the currently specified output connector.



Mime
View raw message