manifoldcf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kwri...@apache.org
Subject svn commit: r1624729 - /manifoldcf/trunk/connectors/jdbc/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/jdbc/JDBCConnector.java
Date Sat, 13 Sep 2014 12:37:22 GMT
Author: kwright
Date: Sat Sep 13 12:37:22 2014
New Revision: 1624729

URL: http://svn.apache.org/r1624729
Log:
Rework JDBC connector.  Part of CONNECTORS-977.

Modified:
    manifoldcf/trunk/connectors/jdbc/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/jdbc/JDBCConnector.java

Modified: manifoldcf/trunk/connectors/jdbc/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/jdbc/JDBCConnector.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/connectors/jdbc/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/jdbc/JDBCConnector.java?rev=1624729&r1=1624728&r2=1624729&view=diff
==============================================================================
--- manifoldcf/trunk/connectors/jdbc/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/jdbc/JDBCConnector.java	(original)
+++ manifoldcf/trunk/connectors/jdbc/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/jdbc/JDBCConnector.java	Sat Sep 13 12:37:22 2014
@@ -36,36 +36,7 @@ import javax.sql.*;
 import java.io.*;
 import java.util.*;
 
-/** This interface describes an instance of a connection between a repository and ManifoldCF's
-* standard "pull" ingestion agent.
-*
-* Each instance of this interface is used in only one thread at a time.  Connection Pooling
-* on these kinds of objects is performed by the factory which instantiates repository connectors
-* from symbolic names and config parameters, and is pooled by these parameters.  That is, a pooled connector
-* handle is used only if all the connection parameters for the handle match.
-*
-* Implementers of this interface should provide a default constructor which has this signature:
-*
-* xxx();
-*
-* Connectors are either configured or not.  If configured, they will persist in a pool, and be
-* reused multiple times.  Certain methods of a connector may be called before the connector is
-* configured.  This includes basically all methods that permit inspection of the connector's
-* capabilities.  The complete list is:
-*
-*
-* The purpose of the repository connector is to allow documents to be fetched from the repository.
-*
-* Each repository connector describes a set of documents that are known only to that connector.
-* It therefore establishes a space of document identifiers.  Each connector will only ever be
-* asked to deal with identifiers that have in some way originated from the connector.
-*
-* Documents are fetched in three stages.  First, the getDocuments() method is called in the connector
-* implementation.  This returns a set of document identifiers.  The document identifiers are used to
-* obtain the current document version strings in the second stage, using the getDocumentVersions() method.
-* The last stage is processDocuments(), which queues up any additional documents needed, and also ingests.
-* This method will not be called if the document version seems to indicate that no document change took
-* place.
+/** JDBC repository connector.
 */
 public class JDBCConnector extends org.apache.manifoldcf.crawler.connectors.BaseRepositoryConnector
 {
@@ -313,31 +284,27 @@ public class JDBCConnector extends org.a
     return new Long(seedTime).toString();
   }
 
-  /** Get document versions given an array of document identifiers.
-  * This method is called for EVERY document that is considered. It is
-  * therefore important to perform as little work as possible here.
-  *@param documentIdentifiers is the array of local document identifiers, as understood by this connector.
-  *@param oldVersions is the corresponding array of version strings that have been saved for the document identifiers.
-  *   A null value indicates that this is a first-time fetch, while an empty string indicates that the previous document
-  *   had an empty version string.
-  *@param activities is the interface this method should use to perform whatever framework actions are desired.
-  *@param spec is the current document specification for the current job.  If there is a dependency on this
-  * specification, then the version string should include the pertinent data, so that reingestion will occur
-  * when the specification changes.  This is primarily useful for metadata.
+  /** Process a set of documents.
+  * This is the method that should cause each document to be fetched, processed, and the results either added
+  * to the queue of documents for the current job, and/or entered into the incremental ingestion manager.
+  * The document specification allows this class to filter what is done based on the job.
+  * The connector will be connected before this method can be called.
+  *@param documentIdentifiers is the set of document identifiers to process.
+  *@param statuses are the currently-stored document versions for each document in the set of document identifiers
+  * passed in above.
+  *@param activities is the interface this method should use to queue up new document references
+  * and ingest documents.
   *@param jobMode is an integer describing how the job is being run, whether continuous or once-only.
   *@param usesDefaultAuthority will be true only if the authority in use for these documents is the default one.
-  *@return the corresponding version strings, with null in the places where the document no longer exists.
-  * Empty version strings indicate that there is no versioning ability for the corresponding document, and the document
-  * will always be processed.
   */
   @Override
-  public String[] getDocumentVersions(String[] documentIdentifiers, String[] oldVersions,
IVersionActivity activities,
-    DocumentSpecification spec, int jobMode, boolean usesDefaultAuthority)
+  public void processDocuments(String[] documentIdentifiers, IExistingVersions statuses,
Specification spec,
+    IProcessActivity activities, int jobMode, boolean usesDefaultAuthority)
     throws ManifoldCFException, ServiceInterruption
   {
-    getSession();
     TableSpec ts = new TableSpec(spec);
-    String[] acls = getAcls(spec);
+    
+    String[] acls = ts.getAcls();
     // Sort these,
     java.util.Arrays.sort(acls);
 
@@ -355,125 +322,122 @@ public class JDBCConnector extends org.a
       {
         versionsReturned[i++] = "";
       }
-
-      return versionsReturned;
     }
-
-    // If there IS a versions query, do it.  First set up the variables, then do the substitution.
-    VariableMap vm = new VariableMap();
-    addConstant(vm,JDBCConstants.idReturnVariable,JDBCConstants.idReturnColumnName);
-    addConstant(vm,JDBCConstants.versionReturnVariable,JDBCConstants.versionReturnColumnName);
-    if (!addIDList(vm,JDBCConstants.idListVariable,documentIdentifiers,null))
-      return new String[0];
-
-    // Do the substitution
-    ArrayList paramList = new ArrayList();
-    StringBuilder sb = new StringBuilder();
-    substituteQuery(ts.versionQuery,vm,sb,paramList);
-
-    // Now, build a result return, and a hash table so we can correlate the returned values
with the place to put them.
-    // We presume that if the row is missing, the document is gone.
-    Map map = new HashMap();
-    int j = 0;
-    while (j < documentIdentifiers.length)
+    else
     {
-      map.put(documentIdentifiers[j],new Integer(j));
-      versionsReturned[j] = "";
-      j++;
-    }
+      // If there IS a versions query, do it.  First set up the variables, then do the substitution.
+      VariableMap vm = new VariableMap();
+      addConstant(vm,JDBCConstants.idReturnVariable,JDBCConstants.idReturnColumnName);
+      addConstant(vm,JDBCConstants.versionReturnVariable,JDBCConstants.versionReturnColumnName);
+      if (addIDList(vm,JDBCConstants.idListVariable,documentIdentifiers,null))
+      {
+        // Do the substitution
+        ArrayList paramList = new ArrayList();
+        StringBuilder sb = new StringBuilder();
+        substituteQuery(ts.versionQuery,vm,sb,paramList);
+
+        // Now, build a result return, and a hash table so we can correlate the returned values with the place to put them.
+        // We presume that if the row is missing, the document is gone.
+        Map<String,Integer> map = new HashMap<String,Integer>();
+        int j = 0;
+        while (j < documentIdentifiers.length)
+        {
+          map.put(documentIdentifiers[j],new Integer(j));
+          versionsReturned[j] = "";
+          j++;
+        }
 
-    // Fire off the query!
-    IDynamicResultSet result;
-    String queryText = sb.toString();
-    long startTime = System.currentTimeMillis();
-    // Get a dynamic resultset.  Contract for dynamic resultset is that if
-    // one is returned, it MUST be closed, or a connection will leak.
-    try
-    {
-      result = connection.executeUncachedQuery(queryText,paramList,-1);
-    }
-    catch (ManifoldCFException e)
-    {
-      // If failure, record the failure.
-      activities.recordActivity(new Long(startTime), ACTIVITY_EXTERNAL_QUERY, null,
-        createQueryString(queryText,paramList), "ERROR", e.getMessage(), null);
-      throw e;
-    }
-    try
-    {
-      // If success, record that too.
-      activities.recordActivity(new Long(startTime), ACTIVITY_EXTERNAL_QUERY, null,
-        createQueryString(queryText,paramList), "OK", null, null);
-      // Now, go through resultset
-      while (true)
-      {
-        IDynamicResultRow row = result.getNextRow();
-        if (row == null)
-          break;
+        // Fire off the query!
+        getSession();
+        IDynamicResultSet result;
+        String queryText = sb.toString();
+        long startTime = System.currentTimeMillis();
+        // Get a dynamic resultset.  Contract for dynamic resultset is that if
+        // one is returned, it MUST be closed, or a connection will leak.
         try
         {
-          Object o = row.getValue(JDBCConstants.idReturnColumnName);
-          if (o == null)
-            throw new ManifoldCFException("Bad version query; doesn't return $(IDCOLUMN)
column.  Try using quotes around $(IDCOLUMN) variable, e.g. \"$(IDCOLUMN)\".");
-          String idValue = JDBCConnection.readAsString(o);
-          o = row.getValue(JDBCConstants.versionReturnColumnName);
-          String versionValue;
-          // Null version is OK; make it a ""
-          if (o == null)
-            versionValue = "";
-          else
+          result = connection.executeUncachedQuery(queryText,paramList,-1);
+        }
+        catch (ManifoldCFException e)
+        {
+          // If failure, record the failure.
+          activities.recordActivity(new Long(startTime), ACTIVITY_EXTERNAL_QUERY, null,
+            createQueryString(queryText,paramList), "ERROR", e.getMessage(), null);
+          throw e;
+        }
+        try
+        {
+          // If success, record that too.
+          activities.recordActivity(new Long(startTime), ACTIVITY_EXTERNAL_QUERY, null,
+            createQueryString(queryText,paramList), "OK", null, null);
+          // Now, go through resultset
+          while (true)
           {
-            // A real version string!  Any acls must be added to the front, if they are present...
-            sb = new StringBuilder();
-            packList(sb,acls,'+');
-            if (acls.length > 0)
+            IDynamicResultRow row = result.getNextRow();
+            if (row == null)
+              break;
+            try
             {
-              sb.append('+');
-              pack(sb,defaultAuthorityDenyToken,'+');
-            }
-            else
-              sb.append('-');
+              Object o = row.getValue(JDBCConstants.idReturnColumnName);
+              if (o == null)
+                throw new ManifoldCFException("Bad version query; doesn't return $(IDCOLUMN)
column.  Try using quotes around $(IDCOLUMN) variable, e.g. \"$(IDCOLUMN)\".");
+              String idValue = JDBCConnection.readAsString(o);
+              o = row.getValue(JDBCConstants.versionReturnColumnName);
+              String versionValue;
+              // Null version is OK; make it a ""
+              if (o == null)
+                versionValue = "";
+              else
+              {
+                // A real version string!  Any acls must be added to the front, if they are present...
+                sb = new StringBuilder();
+                packList(sb,acls,'+');
+                if (acls.length > 0)
+                {
+                  sb.append('+');
+                  pack(sb,defaultAuthorityDenyToken,'+');
+                }
+                else
+                  sb.append('-');
 
-            sb.append(JDBCConnection.readAsString(o)).append("=").append(ts.dataQuery);
-            versionValue = sb.toString();
+                sb.append(JDBCConnection.readAsString(o)).append("=").append(ts.dataQuery);
+                versionValue = sb.toString();
+              }
+              // Versions that are "", when processed, will have their acls fetched at that time...
+              versionsReturned[map.get(idValue).intValue()] = versionValue;
+            }
+            finally
+            {
+              row.close();
+            }
           }
-          // Versions that are "", when processed, will have their acls fetched at that time...
-          versionsReturned[((Integer)map.get(idValue)).intValue()] = versionValue;
         }
         finally
         {
-          row.close();
+          result.close();
         }
       }
     }
-    finally
-    {
-      result.close();
+    
+    // Delete the documents that had no version, and work only on ones that did
+    Set<String> fetchDocuments = new HashSet<String>();
+    Map<String,String> map = new HashMap<String,String>();
+    for (int i = 0; i < documentIdentifiers.length; i++)
+    {
+      String documentIdentifier = documentIdentifiers[i];
+      String versionValue = versionsReturned[i];
+      if (versionValue == null)
+      {
+        activities.deleteDocument(documentIdentifier);
+        continue;
+      }
+      if (versionValue.length() == 0 || activities.checkDocumentNeedsReindexing(documentIdentifier,versionValue))
+      {
+        fetchDocuments.add(documentIdentifier);
+        map.put(documentIdentifier,versionValue);
+      }
     }
-
-    return versionsReturned;
-  }
-
-  /** Process a set of documents.
-  * This is the method that should cause each document to be fetched, processed, and the results either added
-  * to the queue of documents for the current job, and/or entered into the incremental ingestion manager.
-  * The document specification allows this class to filter what is done based on the job.
-  *@param documentIdentifiers is the set of document identifiers to process.
-  *@param versions is the corresponding document versions to process, as returned by getDocumentVersions() above.
-  *       The implementation may choose to ignore this parameter and always process the current version.
-  *@param activities is the interface this method should use to queue up new document references
-  * and ingest documents.
-  *@param spec is the document specification.
-  *@param scanOnly is an array corresponding to the document identifiers.  It is set to true to indicate when the processing
-  * should only find other references, and should not actually call the ingestion methods.
-  */
-  @Override
-  public void processDocuments(String[] documentIdentifiers, String[] versions, IProcessActivity
activities, DocumentSpecification spec, boolean[] scanOnly)
-    throws ManifoldCFException, ServiceInterruption
-  {
-    getSession();
-    TableSpec ts = new TableSpec(spec);
-
+    
     // For all the documents not marked "scan only", form a query and pick up the contents.
     // If the contents is not found, then explicitly call the delete action method.
     VariableMap vm = new VariableMap();
@@ -481,7 +445,7 @@ public class JDBCConnector extends org.a
     addConstant(vm,JDBCConstants.urlReturnVariable,JDBCConstants.urlReturnColumnName);
     addConstant(vm,JDBCConstants.dataReturnVariable,JDBCConstants.dataReturnColumnName);
     addConstant(vm,JDBCConstants.contentTypeReturnVariable,JDBCConstants.contentTypeReturnColumnName);
-    if (!addIDList(vm,JDBCConstants.idListVariable,documentIdentifiers,scanOnly))
+    if (!addIDList(vm,JDBCConstants.idListVariable,documentIdentifiers,fetchDocuments))
       return;
 
     // Do the substitution
@@ -489,23 +453,8 @@ public class JDBCConnector extends org.a
     StringBuilder sb = new StringBuilder();
     substituteQuery(ts.dataQuery,vm,sb,paramList);
 
-    int i;
-
-    // Build a map of versions we are allowed to ingest
-    Map map = new HashMap();
-    i = 0;
-    while (i < documentIdentifiers.length)
-    {
-      if (!scanOnly[i])
-      {
-        // Version strings at this point should never be null; the CF interprets nulls as
-        // meaning that delete must occur.  Empty strings are possible though.
-        map.put(documentIdentifiers[i],versions[i]);
-      }
-      i++;
-    }
-
     // Execute the query
+    getSession();
     IDynamicResultSet result;
     String queryText = sb.toString();
     long startTime = System.currentTimeMillis();
@@ -539,7 +488,7 @@ public class JDBCConnector extends org.a
           if (o == null)
             throw new ManifoldCFException("Bad document query; doesn't return $(IDCOLUMN)
column.  Try using quotes around $(IDCOLUMN) variable, e.g. \"$(IDCOLUMN)\".");
           String id = JDBCConnection.readAsString(o);
-          String version = (String)map.get(id);
+          String version = map.get(id);
           if (version != null)
           {
             // This document was marked as "not scan only", so we expect to find it.
@@ -592,7 +541,7 @@ public class JDBCConnector extends org.a
                       else
                         rd.setMimeType(contentType);
                       
-                      applyAccessTokens(rd,version,spec);
+                      applyAccessTokens(rd,ts);
                       applyMetadata(rd,row);
 
                       BinaryInput bi = (BinaryInput)contents;
@@ -634,7 +583,7 @@ public class JDBCConnector extends org.a
                       else
                         rd.setMimeType(contentType);
                       
-                      applyAccessTokens(rd,version,spec);
+                      applyAccessTokens(rd,ts);
                       applyMetadata(rd,row);
 
                       CharacterInput ci = (CharacterInput)contents;
@@ -680,7 +629,7 @@ public class JDBCConnector extends org.a
                         else
                           rd.setMimeType(contentType);
                         
-                        applyAccessTokens(rd,version,spec);
+                        applyAccessTokens(rd,ts);
                         applyMetadata(rd,row);
 
                         InputStream is = new ByteArrayInputStream(bytes);
@@ -705,16 +654,28 @@ public class JDBCConnector extends org.a
                     }
                   }
                   else
+                  {
                     Logging.connectors.warn("JDBC: Document '"+id+"' excluded because of
mime type - skipping");
+                    activities.noDocument(id,version);
+                  }
                 }
                 else
+                {
                   Logging.connectors.warn("JDBC: Document '"+id+"' seems to have null data
- skipping");
+                  activities.noDocument(id,version);
+                }
               }
               else
+              {
                 Logging.connectors.warn("JDBC: Document '"+id+"' has an illegal url: '"+url+"'
- skipping");
+                activities.noDocument(id,version);
+              }
             }
             else
+            {
               Logging.connectors.warn("JDBC: Document '"+id+"' has a null url - skipping");
+              activities.noDocument(id,version);
+            }
           }
         }
         finally
@@ -722,28 +683,28 @@ public class JDBCConnector extends org.a
           row.close();
         }
       }
-      // Now, go through the original id's, and see which ones are still in the map.  These
-      // did not appear in the result and are presumed to be gone from the database, and thus must be deleted.
-      i = 0;
-      while (i < documentIdentifiers.length)
-      {
-        if (!scanOnly[i])
-        {
-          String documentIdentifier = documentIdentifiers[i];
-          if (map.get(documentIdentifier) != null)
-          {
-            // This means we did not see it (or data for it) in the result set.  Delete it!
-            activities.deleteDocument(documentIdentifier);
-          }
-        }
-        i++;
-      }
 
     }
     finally
     {
       result.close();
     }
+    
+    // Now, go through the original id's, and see which ones are still in the map.  These
+    // did not appear in the result and are presumed to be gone from the database, and thus must be deleted.
+    for (String documentIdentifier : documentIdentifiers)
+    {
+      if (fetchDocuments.contains(documentIdentifier))
+      {
+        String documentVersion = map.get(documentIdentifier);
+        if (documentVersion != null)
+        {
+          // This means we did not see it (or data for it) in the result set.  Delete it!
+          activities.noDocument(documentIdentifier,documentVersion);
+        }
+      }
+    }
+
   }
   
   // UI support methods.
@@ -1563,48 +1524,16 @@ public class JDBCConnector extends org.a
   *@param version is the version string.
   *@param spec is the document specification.
   */
-  protected void applyAccessTokens(RepositoryDocument rd, String version, DocumentSpecification
spec)
+  protected void applyAccessTokens(RepositoryDocument rd, TableSpec ts)
     throws ManifoldCFException
   {
-    // Set up any acls
-    String[] accessAcls = null;
-    String[] denyAcls = null;
-
-    if (version.length() == 0)
-    {
-      // Version is empty string, therefore acl information must be gathered from spec
-      String[] specAcls = getAcls(spec);
-      accessAcls = specAcls;
-      if (specAcls.length != 0)
-        denyAcls = new String[]{defaultAuthorityDenyToken};
-      else
-        denyAcls = new String[0];
-    }
+    String[] accessAcls = ts.getAcls();
+    String[] denyAcls;
+    if (accessAcls.length == 0)
+      denyAcls = new String[0];
     else
-    {
-      // Unpack access tokens and the deny token too
-      ArrayList acls = new ArrayList();
-      StringBuilder denyAclBuffer = new StringBuilder();
-      int startPos = unpackList(acls,version,0,'+');
-      if (startPos < version.length() && version.charAt(startPos++) == '+')
-      {
-        startPos = unpack(denyAclBuffer,version,startPos,'+');
-      }
-      // Turn into acls and add into description
-      accessAcls = new String[acls.size()];
-      int j = 0;
-      while (j < accessAcls.length)
-      {
-        accessAcls[j] = (String)acls.get(j);
-        j++;
-      }
-      // Deny acl too
-      if (denyAclBuffer.length() > 0)
-      {
-        denyAcls = new String[]{denyAclBuffer.toString()};
-      }
-    }
-
+      denyAcls = new String[]{defaultAuthorityDenyToken};
+      
     rd.setSecurity(RepositoryDocument.SECURITY_TYPE_DOCUMENT,accessAcls,denyAcls);
 
   }
@@ -1649,24 +1578,21 @@ public class JDBCConnector extends org.a
 
   /** Build an idlist variable, and add it to the specified variable map.
   */
-  protected static boolean addIDList(VariableMap map, String varName, String[] documentIdentifiers,
boolean[] scanOnly)
+  protected static boolean addIDList(VariableMap map, String varName, String[] documentIdentifiers,
Set<String> fetchDocuments)
   {
     ArrayList params = new ArrayList();
     StringBuilder sb = new StringBuilder(" (");
-    int i = 0;
     int k = 0;
-    while (i < documentIdentifiers.length)
+    for (String documentIdentifier : documentIdentifiers)
     {
-      if (scanOnly == null || !scanOnly[i])
+      if (fetchDocuments == null || fetchDocuments.contains(documentIdentifier))
       {
         if (k > 0)
           sb.append(",");
-        String documentIdentifier = documentIdentifiers[i];
         sb.append("?");
         params.add(documentIdentifier);
         k++;
       }
-      i++;
     }
     sb.append(") ");
     map.addVariable(varName,sb.toString(),params);
@@ -1846,16 +1772,19 @@ public class JDBCConnector extends org.a
   */
   protected static class TableSpec
   {
-    public String idQuery;
-    public String versionQuery;
-    public String dataQuery;
+    public final String idQuery;
+    public final String versionQuery;
+    public final String dataQuery;
+    public final Set<String> aclMap = new HashSet<String>();
 
     public TableSpec(Specification ds)
     {
-      int i = 0;
-      while (i < ds.getChildCount())
+      String idQuery = null;
+      String versionQuery = null;
+      String dataQuery = null;
+      for (int i = 0; i < ds.getChildCount(); i++)
       {
-        SpecificationNode sn = ds.getChild(i++);
+        SpecificationNode sn = ds.getChild(i);
         if (sn.getType().equals(JDBCConstants.idQueryNode))
         {
           idQuery = sn.getValue();
@@ -1874,8 +1803,26 @@ public class JDBCConnector extends org.a
           if (dataQuery == null)
             dataQuery = "";
         }
+        else if (sn.getType().equals("access"))
+        {
+          String token = sn.getAttributeValue("token");
+          aclMap.add(token);
+        }
       }
+      this.idQuery = idQuery;
+      this.versionQuery = versionQuery;
+      this.dataQuery = dataQuery;
+    }
 
+    public String[] getAcls()
+    {
+      String[] rval = new String[aclMap.size()];
+      int i = 0;
+      for (String token : aclMap)
+      {
+        rval[i++] = token;
+      }
+      return rval;
     }
 
   }



Mime
View raw message