manifoldcf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kwri...@apache.org
Subject svn commit: r1602679 - in /manifoldcf/branches/CONNECTORS-962/framework: agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/ agents/src/main/java/org/apache/manifoldcf/agents/interfaces/ pull-agent/src/main/java/org/apache/manifoldcf/c...
Date Sun, 15 Jun 2014 11:40:53 GMT
Author: kwright
Date: Sun Jun 15 11:40:52 2014
New Revision: 1602679

URL: http://svn.apache.org/r1602679
Log:
Debug the thing

Modified:
    manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java
    manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java
    manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java
    manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java

Modified: manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java?rev=1602679&r1=1602678&r2=1602679&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java (original)
+++ manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java Sun Jun 15 11:40:52 2014
@@ -238,6 +238,7 @@ public class IncrementalIngester extends
   *@return true if the mimeType is indexable.
   */
   @Override
+  @Deprecated
   public boolean checkMimeTypeIndexable(String outputConnectionName, String outputDescription, String mimeType)
     throws ManifoldCFException, ServiceInterruption
   {
@@ -282,6 +283,7 @@ public class IncrementalIngester extends
   *@return true if the local file is indexable.
   */
   @Override
+  @Deprecated
   public boolean checkDocumentIndexable(String outputConnectionName, String outputDescription, File localFile)
     throws ManifoldCFException, ServiceInterruption
   {
@@ -326,6 +328,7 @@ public class IncrementalIngester extends
   *@return true if the file is indexable.
   */
   @Override
+  @Deprecated
   public boolean checkLengthIndexable(String outputConnectionName, String outputDescription, long length)
     throws ManifoldCFException, ServiceInterruption
   {
@@ -371,6 +374,7 @@ public class IncrementalIngester extends
   *@return true if the file is indexable.
   */
   @Override
+  @Deprecated
   public boolean checkURLIndexable(String outputConnectionName, String outputDescription, String url)
     throws ManifoldCFException, ServiceInterruption
   {
@@ -715,31 +719,111 @@ public class IncrementalIngester extends
   {
     String docKey = makeKey(identifierClass,identifierHash);
 
-    // MHL - move to somewhere else
-    //if (Logging.ingest.isDebugEnabled())
-    //{
-    //  Logging.ingest.debug("Recording document '"+docKey+"' for output connection '"+outputConnectionName+"'");
-    //}
-
-    /* This should invoke the pipeline to do the recording...
-    MHL
-    // With a null document URI, this can't throw either ServiceInterruption or IOException
-    try
+    String[] outputConnectionNames = extractOutputConnectionNames(pipelineSpecificationBasic);
+    IOutputConnection[] outputConnections = connectionManager.loadMultiple(outputConnectionNames);
+
+    if (Logging.ingest.isDebugEnabled())
     {
-      performIngestion(new ITransformationConnection[0],new String[0],
-        connectionManager.load(outputConnectionName),null,
-        docKey,documentVersion,null,null,null,
-        null,
-        null,
-        recordTime,
-        null,
-        activities);
+      Logging.ingest.debug("Recording document '"+docKey+"' for output connections '"+outputConnectionNames+"'");
     }
-    catch (IOException e)
+
+    for (int k = 0; k < outputConnectionNames.length; k++)
     {
-      throw new RuntimeException("Unexpected IOException thrown: "+e.getMessage(),e);
+      String outputConnectionName = outputConnectionNames[k];
+      IOutputConnection connection = outputConnections[k];
+
+      String oldURI = null;
+      String oldURIHash = null;
+      String oldOutputVersion = null;
+
+      // Repeat if needed
+      while (true)
+      {
+        long sleepAmt = 0L;
+        try
+        {
+          // See what uri was used before for this doc, if any
+          ArrayList list = new ArrayList();
+          String query = buildConjunctionClause(list,new ClauseDescription[]{
+            new UnitaryClause(docKeyField,docKey),
+            new UnitaryClause(outputConnNameField,outputConnectionName)});
+            
+          IResultSet set = performQuery("SELECT "+docURIField+","+uriHashField+","+lastOutputVersionField+" FROM "+getTableName()+
+            " WHERE "+query,list,null,null);
+
+          if (set.getRowCount() > 0)
+          {
+            IResultRow row = set.getRow(0);
+            oldURI = (String)row.getValue(docURIField);
+            oldURIHash = (String)row.getValue(uriHashField);
+            oldOutputVersion = (String)row.getValue(lastOutputVersionField);
+          }
+          
+          break;
+        }
+        catch (ManifoldCFException e)
+        {
+          // Look for deadlock and retry if so
+          if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
+          {
+            if (Logging.perf.isDebugEnabled())
+              Logging.perf.debug("Aborted select looking for status: "+e.getMessage());
+            sleepAmt = getSleepAmt();
+            continue;
+          }
+          throw e;
+        }
+        finally
+        {
+          sleepFor(sleepAmt);
+        }
+      }
+
+      // If uri hashes collide, then we must be sure to eliminate only the *correct* records from the table, or we will leave
+      // dangling documents around.  So, all uri searches and comparisons MUST compare the actual uri as well.
+
+      // But, since we need to insure that any given URI is only worked on by one thread at a time, use critical sections
+      // to block the rare case that multiple threads try to work on the same URI.
+      
+      String[] lockArray = computeLockArray(null,oldURI,outputConnectionName);
+      lockManager.enterLocks(null,null,lockArray);
+      try
+      {
+
+        ArrayList list = new ArrayList();
+        
+        if (oldURI != null)
+        {
+          IOutputConnector connector = outputConnectorPool.grab(connection);
+          if (connector == null)
+            // The connector is not installed; treat this as a service interruption.
+            throw new ServiceInterruption("Output connector not installed",0L);
+          try
+          {
+            connector.removeDocument(oldURI,oldOutputVersion,new OutputRemoveActivitiesWrapper(activities,outputConnectionName));
+          }
+          finally
+          {
+            outputConnectorPool.release(connection,connector);
+          }
+          // Delete all records from the database that match the old URI, except for THIS record.
+          list.clear();
+          String query = buildConjunctionClause(list,new ClauseDescription[]{
+            new UnitaryClause(uriHashField,"=",oldURIHash),
+            new UnitaryClause(outputConnNameField,outputConnectionName)});
+          list.add(docKey);
+          performDelete("WHERE "+query+" AND "+docKeyField+"!=?",list,null);
+        }
+
+        // If we get here, it means we are noting that the document was examined, but that no change was required.  This is signaled
+        // to noteDocumentIngest by having the null documentURI.
+        noteDocumentIngest(outputConnectionName,docKey,documentVersion,null,null,null,null,recordTime,null,null);
+      }
+      finally
+      {
+        lockManager.leaveLocks(null,null,lockArray);
+      }
     }
-    */
   }
 
   /** Ingest a document.
@@ -955,11 +1039,10 @@ public class IncrementalIngester extends
     
     String docKey = makeKey(identifierClass,identifierHash);
 
-    // MHL - move to the appropriate place
-    //if (Logging.ingest.isDebugEnabled())
-    //{
-    //  Logging.ingest.debug("Ingesting document '"+docKey+"' into output connection '"+outputConnectionName+"'");
-    //}
+    if (Logging.ingest.isDebugEnabled())
+    {
+      Logging.ingest.debug("Ingesting document '"+docKey+"' into output connections '"+extractOutputConnectionNames(pipelineSpecificationWithVersions.getPipelineSpecification().getBasicPipelineSpecification())+"'");
+    }
 
     // Set indexing date
     document.setIndexingDate(new Date());
@@ -1769,7 +1852,7 @@ public class IncrementalIngester extends
       new MultiClause(outputConnNameField,outputConnectionNames)});
       
     // Get the primary records associated with this hash value
-    IResultSet set = performQuery("SELECT "+idField+","+outputConnNameField+","+docKeyField+","+lastVersionField+","+lastOutputVersionField+","+authorityNameField+","+forcedParamsField+
+    IResultSet set = performQuery("SELECT "+idField+","+outputConnNameField+","+docKeyField+","+lastVersionField+","+lastOutputVersionField+","+authorityNameField+","+forcedParamsField+","+lastTransformationVersionField+
       " FROM "+getTableName()+" WHERE "+query,newList,null,null);
 
     // Now, go through the original request once more, this time building the result
@@ -2850,7 +2933,8 @@ public class IncrementalIngester extends
         String oldTransformationVersion = fullSpec.getOutputTransformationVersionString(i);
         String oldAuthorityName = fullSpec.getAuthorityNameString(i);
 
-        String newTransformationVersion = null;
+        // Compute the transformation version string.  Must always be computed if we're going to reindex, since we save it.
+        String newTransformationVersion = computePackedTransformationVersion(pipelineSpec,outputStage);
         
         boolean needToReindex = (oldDocumentVersion == null);
         if (needToReindex == false)
@@ -2862,8 +2946,6 @@ public class IncrementalIngester extends
         }
         if (needToReindex == false)
         {
-          // Compute the transformation version string
-          newTransformationVersion = computePackedTransformationVersion(pipelineSpec,outputStage);
           needToReindex = (!oldTransformationVersion.equals(newTransformationVersion));
         }
 

Modified: manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java?rev=1602679&r1=1602678&r2=1602679&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java (original)
+++ manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java Sun Jun 15 11:40:52 2014
@@ -101,6 +101,7 @@ public interface IIncrementalIngester
   *@param mimeType is the mime type to check.
   *@return true if the mimeType is indexable.
   */
+  @Deprecated
   public boolean checkMimeTypeIndexable(String outputConnectionName, String outputDescription, String mimeType)
     throws ManifoldCFException, ServiceInterruption;
 
@@ -122,6 +123,7 @@ public interface IIncrementalIngester
   *@param localFile is the local file to check.
   *@return true if the local file is indexable.
   */
+  @Deprecated
   public boolean checkDocumentIndexable(String outputConnectionName, String outputDescription, File localFile)
     throws ManifoldCFException, ServiceInterruption;
 
@@ -145,6 +147,7 @@ public interface IIncrementalIngester
   *@param length is the length of the document.
   *@return true if the file is indexable.
   */
+  @Deprecated
   public boolean checkLengthIndexable(String outputConnectionName, String outputDescription, long length)
     throws ManifoldCFException, ServiceInterruption;
 
@@ -168,6 +171,7 @@ public interface IIncrementalIngester
   *@param url is the url of the document.
   *@return true if the file is indexable.
   */
+  @Deprecated
   public boolean checkURLIndexable(String outputConnectionName, String outputDescription, String url)
     throws ManifoldCFException, ServiceInterruption;
 

Modified: manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java?rev=1602679&r1=1602678&r2=1602679&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java (original)
+++ manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java Sun Jun 15 11:40:52 2014
@@ -792,6 +792,7 @@ public class Jobs extends org.apache.man
       scheduleManager.deleteRows(id);
       hopFilterManager.deleteRows(id);
       forcedParamManager.deleteRows(id);
+      pipelineManager.deleteRows(id);
       ArrayList params = new ArrayList();
       String query = buildConjunctionClause(params,new ClauseDescription[]{
         new UnitaryClause(idField,id)});

Modified: manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java?rev=1602679&r1=1602678&r2=1602679&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java (original)
+++ manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java Sun Jun 15 11:40:52 2014
@@ -1749,7 +1749,7 @@ public class WorkerThread extends Thread
       String documentIdentifierHash = ManifoldCF.hash(documentIdentifier);
       ingester.documentRecord(
         pipelineSpecification.getBasicPipelineSpecification(),
-        documentIdentifier,documentIdentifierHash,
+        connectionName,documentIdentifierHash,
         version,currentTime,ingestLogger);
     }
 



Mime
View raw message