manifoldcf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kwri...@apache.org
Subject svn commit: r1626228 [4/10] - in /manifoldcf/branches/dev_1x: ./ connectors/alfresco/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/alfresco/ connectors/cmis/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/cmis/ conne...
Date Fri, 19 Sep 2014 14:22:28 GMT
Modified: manifoldcf/branches/dev_1x/connectors/generic/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/generic/GenericConnector.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/dev_1x/connectors/generic/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/generic/GenericConnector.java?rev=1626228&r1=1626227&r2=1626228&view=diff
==============================================================================
--- manifoldcf/branches/dev_1x/connectors/generic/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/generic/GenericConnector.java (original)
+++ manifoldcf/branches/dev_1x/connectors/generic/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/generic/GenericConnector.java Fri Sep 19 14:22:27 2014
@@ -248,8 +248,8 @@ public class GenericConnector extends Ba
   }
 
   @Override
-  public String[] getDocumentVersions(String[] documentIdentifiers, String[] oldVersions, IVersionActivity activities,
-    DocumentSpecification spec, int jobType, boolean usesDefaultAuthority)
+  public void processDocuments(String[] documentIdentifiers, IExistingVersions statuses, Specification spec,
+    IProcessActivity activities, int jobMode, boolean usesDefaultAuthority)
     throws ManifoldCFException, ServiceInterruption {
 
     // Forced acls
@@ -270,208 +270,180 @@ public class GenericConnector extends Ba
     HttpClient client = getClient();
     StringBuilder url = new StringBuilder(genericEntryPoint);
 
-      url.append("?").append(ACTION_PARAM_NAME).append("=").append(ACTION_ITEMS);
-      for (int i = 0; i < documentIdentifiers.length; i++) {
-        url.append("&id[]=").append(URLEncoder.encode(documentIdentifiers[i]));
-      }
-      for (int i = 0; i < spec.getChildCount(); i++) {
-        SpecificationNode sn = spec.getChild(i);
-        if (sn.getType().equals("param")) {
-          String paramName = sn.getAttributeValue("name");
-          String paramValue = sn.getValue();
-          url.append("&").append(URLEncoder.encode(paramName)).append("=").append(URLEncoder.encode(paramValue));
-        }
-      }
-    try {
-      DocumentVersionThread versioningThread = new DocumentVersionThread(client, url.toString(), documentIdentifiers, genericAuthMode, rights, documentCache);
-      versioningThread.start();
-      versioningThread.join();
-      if (versioningThread.getException() != null) {
-        Throwable thr = versioningThread.getException();
-        if (thr instanceof ManifoldCFException) {
-          if (((ManifoldCFException) thr).getErrorCode() == ManifoldCFException.INTERRUPTED) {
-            throw new InterruptedException(thr.getMessage());
-          }
-          throw (ManifoldCFException) thr;
-        } else if (thr instanceof ServiceInterruption) {
-          throw (ServiceInterruption) thr;
-        } else if (thr instanceof IOException) {
-          handleIOException((IOException) thr);
-        } else if (thr instanceof RuntimeException) {
-          throw (RuntimeException) thr;
-        }
-        throw new ManifoldCFException("getDocumentVersions error: " + thr.getMessage(), thr);
-      }
-      return versioningThread.getVersions();
-    } catch (InterruptedException ex) {
-      throw new ManifoldCFException(ex.getMessage(), ex, ManifoldCFException.INTERRUPTED);
+    url.append("?").append(ACTION_PARAM_NAME).append("=").append(ACTION_ITEMS);
+    for (int i = 0; i < documentIdentifiers.length; i++) {
+      url.append("&id[]=").append(URLEncoder.encode(documentIdentifiers[i]));
     }
-  }
-
-  @Override
-  public void processDocuments(String[] documentIdentifiers, String[] versions, IProcessActivity activities,
-    DocumentSpecification spec, boolean[] scanOnly, int jobType)
-    throws ManifoldCFException, ServiceInterruption {
-
-    // Forced acls
-    String[] acls = getAcls(spec);
-
-    String genericAuthMode = "provided";
     for (int i = 0; i < spec.getChildCount(); i++) {
       SpecificationNode sn = spec.getChild(i);
-      if (sn.getType().equals("genericAuthMode")) {
-        genericAuthMode = sn.getValue();
-        break;
+      if (sn.getType().equals("param")) {
+        String paramName = sn.getAttributeValue("name");
+        String paramValue = sn.getValue();
+        url.append("&").append(URLEncoder.encode(paramName)).append("=").append(URLEncoder.encode(paramValue));
       }
     }
-
-    HttpClient client = getClient();
-    for (int i = 0; i < documentIdentifiers.length; i++) {
-      activities.checkJobStillActive();
-
-      Item item = documentCache.get(documentIdentifiers[i]);
-      if (item == null) {
-        throw new ManifoldCFException("processDocuments error - no cache entry for: " + documentIdentifiers[i]);
+    
+    String[] versions = null;
+    try {
+      DocumentVersionThread versioningThread = new DocumentVersionThread(client, url.toString(), documentIdentifiers, genericAuthMode, rights, documentCache);
+      versioningThread.start();
+      try {
+        versions = versioningThread.finishUp();
+      } catch (IOException ex) {
+        handleIOException((IOException)ex);
+      } catch (InterruptedException ex) {
+        throw new ManifoldCFException(ex.getMessage(), ex, ManifoldCFException.INTERRUPTED);
       }
-
-      if (item.related != null) {
-        for (String rel : item.related) {
-          activities.addDocumentReference(rel, documentIdentifiers[i], RELATIONSHIP_RELATED);
+      
+      // Figure out which ones we need to process, and which we should delete
+      for (int i = 0; i < documentIdentifiers.length; i++) {
+        String documentIdentifier = documentIdentifiers[i];
+        String versionString = versions[i];
+        if (versionString == null) {
+          activities.deleteDocument(documentIdentifier);
+          continue;
+        }
+        Item item = documentCache.get(documentIdentifier);
+        if (item == null) {
+          throw new ManifoldCFException("processDocuments error - no cache entry for: " + documentIdentifier);
         }
-      }
-      if (scanOnly[i]) {
-        continue;
-      }
 
-      RepositoryDocument doc = new RepositoryDocument();
-      if (item.mimeType != null) {
-        doc.setMimeType(item.mimeType);
-      }
-      if (item.created != null) {
-        doc.setCreatedDate(item.created);
-      }
-      if (item.updated != null) {
-        doc.setModifiedDate(item.updated);
-      }
-      if (item.fileName != null) {
-        doc.setFileName(item.fileName);
-      }
-      if (item.metadata != null) {
-        HashMap<String, List<String>> meta = new HashMap<String, List<String>>();
-        for (Meta m : item.metadata) {
-          if (meta.containsKey(m.name)) {
-            meta.get(m.name).add(m.value);
-          } else {
-            List<String> list = new ArrayList<String>(1);
-            list.add(m.value);
-            meta.put(m.name, list);
+        if (item.related != null) {
+          for (String rel : item.related) {
+            activities.addDocumentReference(rel, documentIdentifier, RELATIONSHIP_RELATED);
           }
         }
-        for (String name : meta.keySet()) {
-          List<String> values = meta.get(name);
-          if (values.size() > 1) {
-            String[] svals = new String[values.size()];
-            for (int j = 0; j < values.size(); j++) {
-              svals[j] = values.get(j);
-            }
-            doc.addField(name, svals);
-          } else {
-            doc.addField(name, values.get(0));
+        if (versionString.length() == 0 || activities.checkDocumentNeedsReindexing(documentIdentifier,versionString)) {
+          
+          // Process the document
+          RepositoryDocument doc = new RepositoryDocument();
+          if (item.mimeType != null) {
+            doc.setMimeType(item.mimeType);
           }
-        }
-      }
-      if ("provided".equals(genericAuthMode)) {
-        if (item.auth != null) {
-          String[] acl = new String[item.auth.size()];
-          for (int j = 0; j < item.auth.size(); j++) {
-            acl[j] = item.auth.get(j);
+          if (item.created != null) {
+            doc.setCreatedDate(item.created);
           }
-          doc.setSecurity(RepositoryDocument.SECURITY_TYPE_DOCUMENT,acl,new String[]{defaultAuthorityDenyToken});
-        }
-      } else {
-        if (acls.length > 0) {
-          doc.setSecurity(RepositoryDocument.SECURITY_TYPE_DOCUMENT,acls,new String[]{defaultAuthorityDenyToken});
-        }
-      }
-      if (item.content != null) {
-        try {
-          byte[] content = item.content.getBytes(StandardCharsets.UTF_8);
-          ByteArrayInputStream is = new ByteArrayInputStream(content);
-          try {
-            doc.setBinary(is, content.length);
-            activities.ingestDocumentWithException(documentIdentifiers[i], versions[i], item.url, doc);
-            is.close();
-          } finally {
-            is.close();
+          if (item.updated != null) {
+            doc.setModifiedDate(item.updated);
           }
-        } catch (IOException ex) {
-          handleIOException(ex);
-        }
-      } else {
-        StringBuilder url = new StringBuilder(genericEntryPoint);
-
-          url.append("?").append(ACTION_PARAM_NAME).append("=").append(ACTION_ITEM);
-          url.append("&id=").append(URLEncoder.encode(documentIdentifiers[i]));
-          for (int j = 0; j < spec.getChildCount(); j++) {
-            SpecificationNode sn = spec.getChild(j);
-            if (sn.getType().equals("param")) {
-              String paramName = sn.getAttributeValue("name");
-              String paramValue = sn.getValue();
-              url.append("&").append(URLEncoder.encode(paramName)).append("=").append(URLEncoder.encode(paramValue));
+          if (item.fileName != null) {
+            doc.setFileName(item.fileName);
+          }
+          if (item.metadata != null) {
+            HashMap<String, List<String>> meta = new HashMap<String, List<String>>();
+            for (Meta m : item.metadata) {
+              if (meta.containsKey(m.name)) {
+                meta.get(m.name).add(m.value);
+              } else {
+                List<String> list = new ArrayList<String>(1);
+                list.add(m.value);
+                meta.put(m.name, list);
+              }
+            }
+            for (String name : meta.keySet()) {
+              List<String> values = meta.get(name);
+              if (values.size() > 1) {
+                String[] svals = new String[values.size()];
+                for (int j = 0; j < values.size(); j++) {
+                  svals[j] = values.get(j);
+                }
+                doc.addField(name, svals);
+              } else {
+                doc.addField(name, values.get(0));
+              }
             }
           }
-
-
-        ExecuteProcessThread t = new ExecuteProcessThread(client, url.toString());
-        try {
-          t.start();
-          boolean wasInterrupted = false;
-          try {
-            InputStream is = t.getSafeInputStream();
-            long fileLength = t.getStreamLength();
+          if ("provided".equals(genericAuthMode)) {
+            if (item.auth != null) {
+              String[] acl = new String[item.auth.size()];
+              for (int j = 0; j < item.auth.size(); j++) {
+                acl[j] = item.auth.get(j);
+              }
+              doc.setSecurity(RepositoryDocument.SECURITY_TYPE_DOCUMENT,acl,new String[]{defaultAuthorityDenyToken});
+            }
+          } else {
+            if (acls.length > 0) {
+              doc.setSecurity(RepositoryDocument.SECURITY_TYPE_DOCUMENT,acls,new String[]{defaultAuthorityDenyToken});
+            }
+          }
+          if (item.content != null) {
             try {
-              // Can only index while background thread is running!
-              doc.setBinary(is, fileLength);
-              activities.ingestDocumentWithException(documentIdentifiers[i], versions[i], item.url, doc);
-            } finally {
-              is.close();
+              byte[] content = item.content.getBytes(StandardCharsets.UTF_8);
+              ByteArrayInputStream is = new ByteArrayInputStream(content);
+              try {
+                doc.setBinary(is, content.length);
+                activities.ingestDocumentWithException(documentIdentifier, versionString, item.url, doc);
+                is.close();
+              } finally {
+                is.close();
+              }
+            } catch (IOException ex) {
+              handleIOException(ex);
             }
-          } catch (ManifoldCFException e) {
-            if (e.getErrorCode() == ManifoldCFException.INTERRUPTED) {
-              wasInterrupted = true;
+          } else {
+            url = new StringBuilder(genericEntryPoint);
+
+            url.append("?").append(ACTION_PARAM_NAME).append("=").append(ACTION_ITEM);
+            url.append("&id=").append(URLEncoder.encode(documentIdentifier));
+            for (int j = 0; j < spec.getChildCount(); j++) {
+              SpecificationNode sn = spec.getChild(j);
+              if (sn.getType().equals("param")) {
+                String paramName = sn.getAttributeValue("name");
+                String paramValue = sn.getValue();
+                url.append("&").append(URLEncoder.encode(paramName)).append("=").append(URLEncoder.encode(paramValue));
+              }
             }
-            throw e;
-          } catch (java.net.SocketTimeoutException e) {
-            throw e;
-          } catch (InterruptedIOException e) {
-            wasInterrupted = true;
-            throw e;
-          } finally {
-            if (!wasInterrupted) {
-              t.finishUp();
+
+
+            ExecuteProcessThread t = new ExecuteProcessThread(client, url.toString());
+            try {
+              t.start();
+              boolean wasInterrupted = false;
+              try {
+                InputStream is = t.getSafeInputStream();
+                long fileLength = t.getStreamLength();
+                try {
+                  // Can only index while background thread is running!
+                  doc.setBinary(is, fileLength);
+                  activities.ingestDocumentWithException(documentIdentifier, versionString, item.url, doc);
+                } finally {
+                  is.close();
+                }
+              } catch (ManifoldCFException e) {
+                if (e.getErrorCode() == ManifoldCFException.INTERRUPTED) {
+                  wasInterrupted = true;
+                }
+                throw e;
+              } catch (java.net.SocketTimeoutException e) {
+                throw e;
+              } catch (InterruptedIOException e) {
+                wasInterrupted = true;
+                throw e;
+              } finally {
+                if (!wasInterrupted) {
+                  t.finishUp();
+                }
+              }
+            } catch (InterruptedException e) {
+              t.interrupt();
+              throw new ManifoldCFException("Interrupted: " + e.getMessage(), e, ManifoldCFException.INTERRUPTED);
+            } catch (InterruptedIOException e) {
+              t.interrupt();
+              throw new ManifoldCFException("Interrupted: " + e.getMessage(), e, ManifoldCFException.INTERRUPTED);
+            } catch (IOException e) {
+              handleIOException(e);
             }
           }
-        } catch (InterruptedException e) {
-          t.interrupt();
-          throw new ManifoldCFException("Interrupted: " + e.getMessage(), e, ManifoldCFException.INTERRUPTED);
-        } catch (InterruptedIOException e) {
-          t.interrupt();
-          throw new ManifoldCFException("Interrupted: " + e.getMessage(), e, ManifoldCFException.INTERRUPTED);
-        } catch (IOException e) {
-          handleIOException(e);
         }
       }
-    }
-  }
-
-  @Override
-  public void releaseDocumentVersions(String[] documentIdentifiers, String[] versions) throws ManifoldCFException {
-    for (int i = 0; i < documentIdentifiers.length; i++) {
-      if (documentCache.containsKey(documentIdentifiers[i])) {
-        documentCache.remove(documentIdentifiers[i]);
+      
+    } finally {
+      for (String documentIdentifier : documentIdentifiers) {
+        if (documentCache.containsKey(documentIdentifier)) {
+          documentCache.remove(documentIdentifier);
+        }
       }
     }
-    super.releaseDocumentVersions(documentIdentifiers, versions);
   }
 
   @Override
@@ -1034,7 +1006,7 @@ public class GenericConnector extends Ba
     return true;
   }
 
-  protected static String[] getAcls(DocumentSpecification spec) {
+  protected static String[] getAcls(Specification spec) {
     HashMap map = new HashMap();
     int i = 0;
     while (i < spec.getChildCount()) {
@@ -1127,9 +1099,9 @@ public class GenericConnector extends Ba
 
   protected static class ExecuteSeedingThread extends Thread {
 
-    protected HttpClient client;
+    protected final HttpClient client;
 
-    protected String url;
+    protected final String url;
 
     protected final XThreadStringBuffer seedBuffer;
 
@@ -1205,21 +1177,21 @@ public class GenericConnector extends Ba
 
   protected static class DocumentVersionThread extends Thread {
 
-    protected HttpClient client;
+    protected final HttpClient client;
 
-    protected String url;
+    protected final String url;
 
     protected Throwable exception = null;
 
-    protected String[] versions;
+    protected final String[] versions;
 
-    protected ConcurrentHashMap<String, Item> documentCache;
+    protected final ConcurrentHashMap<String, Item> documentCache;
 
-    protected String[] documentIdentifiers;
+    protected final String[] documentIdentifiers;
 
-    protected String genericAuthMode;
+    protected final String genericAuthMode;
 
-    protected String defaultRights;
+    protected final String defaultRights;
 
     public DocumentVersionThread(HttpClient client, String url, String[] documentIdentifiers, String genericAuthMode, String defaultRights, ConcurrentHashMap<String, Item> documentCache) {
       super();
@@ -1277,20 +1249,33 @@ public class GenericConnector extends Ba
       }
     }
 
-    public Throwable getException() {
-      return exception;
-    }
-
-    public String[] getVersions() {
+    public String[] finishUp()
+      throws ManifoldCFException, ServiceInterruption, IOException, InterruptedException {
+      join();
+      Throwable thr = exception;
+      if (thr != null) {
+        if (thr instanceof ManifoldCFException) {
+          throw (ManifoldCFException) thr;
+        } else if (thr instanceof ServiceInterruption) {
+          throw (ServiceInterruption) thr;
+        } else if (thr instanceof IOException) {
+          throw (IOException) thr;
+        } else if (thr instanceof RuntimeException) {
+          throw (RuntimeException) thr;
+        } else if (thr instanceof Error) {
+          throw (Error) thr;
+        }
+        throw new ManifoldCFException("getDocumentVersions error: " + thr.getMessage(), thr);
+      }
       return versions;
     }
   }
 
   protected static class ExecuteProcessThread extends Thread {
 
-    protected HttpClient client;
+    protected final HttpClient client;
 
-    protected String url;
+    protected final String url;
 
     protected Throwable exception = null;
 
@@ -1340,7 +1325,7 @@ public class GenericConnector extends Ba
       }
     }
 
-    public InputStream getSafeInputStream() throws InterruptedException, IOException {
+    public InputStream getSafeInputStream() throws InterruptedException, IOException, ManifoldCFException {
       while (true) {
         synchronized (this) {
           if (exception != null) {
@@ -1355,7 +1340,7 @@ public class GenericConnector extends Ba
       }
     }
 
-    public long getStreamLength() throws IOException, InterruptedException {
+    public long getStreamLength() throws IOException, InterruptedException, ManifoldCFException {
       while (true) {
         synchronized (this) {
           if (exception != null) {
@@ -1371,11 +1356,13 @@ public class GenericConnector extends Ba
     }
 
     protected synchronized void checkException(Throwable exception)
-      throws IOException {
+      throws IOException, ManifoldCFException {
       if (exception != null) {
         Throwable e = exception;
         if (e instanceof IOException) {
           throw (IOException) e;
+        } else if (e instanceof ManifoldCFException) {
+          throw (ManifoldCFException) e;
         } else if (e instanceof RuntimeException) {
           throw (RuntimeException) e;
         } else if (e instanceof Error) {
@@ -1387,7 +1374,7 @@ public class GenericConnector extends Ba
     }
 
     public void finishUp()
-      throws InterruptedException, IOException {
+      throws InterruptedException, IOException, ManifoldCFException {
       // This will be called during the finally
       // block in the case where all is well (and
       // the stream completed) and in the case where

Modified: manifoldcf/branches/dev_1x/connectors/googledrive/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/googledrive/GoogleDriveRepositoryConnector.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/dev_1x/connectors/googledrive/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/googledrive/GoogleDriveRepositoryConnector.java?rev=1626228&r1=1626227&r2=1626228&view=diff
==============================================================================
--- manifoldcf/branches/dev_1x/connectors/googledrive/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/googledrive/GoogleDriveRepositoryConnector.java (original)
+++ manifoldcf/branches/dev_1x/connectors/googledrive/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/googledrive/GoogleDriveRepositoryConnector.java Fri Sep 19 14:22:27 2014
@@ -50,6 +50,7 @@ import org.apache.manifoldcf.core.interf
 import org.apache.manifoldcf.crawler.interfaces.DocumentSpecification;
 import org.apache.manifoldcf.crawler.interfaces.IProcessActivity;
 import org.apache.manifoldcf.crawler.interfaces.ISeedingActivity;
+import org.apache.manifoldcf.crawler.interfaces.IExistingVersions;
 import org.apache.log4j.Logger;
 
 import com.google.api.services.drive.model.File;
@@ -986,116 +987,136 @@ public class GoogleDriveRepositoryConnec
     }
   }
   
-  /**
-   * Process a set of documents. This is the method that should cause each
-   * document to be fetched, processed, and the results either added to the
-   * queue of documents for the current job, and/or entered into the
-   * incremental ingestion manager. The document specification allows this
-   * class to filter what is done based on the job.
-   *
-   * @param documentIdentifiers is the set of document identifiers to process.
-   * @param versions is the corresponding document versions to process, as
-   * returned by getDocumentVersions() above. The implementation may choose to
-   * ignore this parameter and always process the current version.
-   * @param activities is the interface this method should use to queue up new
-   * document references and ingest documents.
-   * @param spec is the document specification.
-   * @param scanOnly is an array corresponding to the document identifiers. It
-   * is set to true to indicate when the processing should only find other
-   * references, and should not actually call the ingestion methods.
-   * @param jobMode is an integer describing how the job is being run, whether
-   * continuous or once-only.
-   */
-  @SuppressWarnings("unchecked")
+  /** Process a set of documents.
+  * This is the method that should cause each document to be fetched, processed, and the results either added
+  * to the queue of documents for the current job, and/or entered into the incremental ingestion manager.
+  * The document specification allows this class to filter what is done based on the job.
+  * The connector will be connected before this method can be called.
+  *@param documentIdentifiers is the set of document identifiers to process.
+  *@param statuses are the currently-stored document versions for each document in the set of document identifiers
+  * passed in above.
+  *@param activities is the interface this method should use to queue up new document references
+  * and ingest documents.
+  *@param jobMode is an integer describing how the job is being run, whether continuous or once-only.
+  *@param usesDefaultAuthority will be true only if the authority in use for these documents is the default one.
+  */
   @Override
-  public void processDocuments(String[] documentIdentifiers, String[] versions,
-      IProcessActivity activities, DocumentSpecification spec,
-      boolean[] scanOnly) throws ManifoldCFException, ServiceInterruption {
-
-    Logging.connectors.debug("GOOGLEDRIVE: Inside processDocuments");
-
-    for (int i = 0; i < documentIdentifiers.length; i++) {
-      // MHL for access tokens
-      long startTime = System.currentTimeMillis();
-      String errorCode = "FAILED";
-      String errorDesc = StringUtils.EMPTY;
-      Long fileSize = null;
-      boolean doLog = false;
-      String nodeId = documentIdentifiers[i];
-      String version = versions[i];
+  public void processDocuments(String[] documentIdentifiers, IExistingVersions statuses, Specification spec,
+    IProcessActivity activities, int jobMode, boolean usesDefaultAuthority)
+    throws ManifoldCFException, ServiceInterruption {
       
-      try {
-        if (Logging.connectors.isDebugEnabled()) {
-          Logging.connectors.debug("GOOGLEDRIVE: Processing document identifier '"
-              + nodeId + "'");
-        }
+    // Forced acls
+    String[] acls = getAcls(spec);
+    // Sort it,
+    java.util.Arrays.sort(acls);
 
-        File googleFile = getObject(nodeId);
-        if (googleFile == null || (googleFile.containsKey("explicitlyTrashed") && googleFile.getExplicitlyTrashed())) {
-          //its deleted, move on
-          continue;
-        }
+    for (String documentIdentifier : documentIdentifiers) {
+      File googleFile = getObject(documentIdentifier);
+      String versionString;
+      
+      if (googleFile == null || (googleFile.containsKey("explicitlyTrashed") && googleFile.getExplicitlyTrashed())) {
+        //it's deleted, move on
+        activities.deleteDocument(documentIdentifier);
+        continue;
+      }
 
+      if (!isDir(googleFile)) {
+        String rev = googleFile.getModifiedDate().toStringRfc3339();
+        if (StringUtils.isNotEmpty(rev)) {
+          StringBuilder sb = new StringBuilder();
 
-        if (Logging.connectors.isDebugEnabled()) {
-          Logging.connectors.debug("GOOGLEDRIVE: have this file:\t" + googleFile.getTitle());
+          // Acls
+          packList(sb,acls,'+');
+          if (acls.length > 0) {
+            sb.append('+');
+            pack(sb,defaultAuthorityDenyToken,'+');
+          }
+          else
+            sb.append('-');
+
+          sb.append(rev);
+          versionString = sb.toString();
+        } else {
+          //a google document that doesn't contain versioning information will NEVER be processed.
+          // I don't know what this means, and whether it can ever occur.
+          activities.deleteDocument(documentIdentifier);
+          continue;
         }
+      } else {
+        //a google folder will always be processed
+        versionString = StringUtils.EMPTY;
+      }
 
-        if ("application/vnd.google-apps.folder".equals(googleFile.getMimeType())) {
-          //if directory add its children
+      if (versionString.length() == 0 || activities.checkDocumentNeedsReindexing(documentIdentifier,versionString)) {
+        long startTime = System.currentTimeMillis();
+        String errorCode = "FAILED";
+        String errorDesc = StringUtils.EMPTY;
+        Long fileSize = null;
+        boolean doLog = false;
+        String nodeId = documentIdentifier;
+        String version = versionString;
 
+        try {
           if (Logging.connectors.isDebugEnabled()) {
-            Logging.connectors.debug("GOOGLEDRIVE: its a directory");
+            Logging.connectors.debug("GOOGLEDRIVE: Processing document identifier '"
+                + nodeId + "'");
+            Logging.connectors.debug("GOOGLEDRIVE: have this file:\t" + googleFile.getTitle());
           }
 
-          // adding all the children + subdirs for a folder
+          if ("application/vnd.google-apps.folder".equals(googleFile.getMimeType())) {
+            //if directory add its children
+
+            if (Logging.connectors.isDebugEnabled()) {
+              Logging.connectors.debug("GOOGLEDRIVE: its a directory");
+            }
+
+            // adding all the children + subdirs for a folder
 
-          getSession();
-          GetChildrenThread t = new GetChildrenThread(nodeId);
-          try {
-            t.start();
-            boolean wasInterrupted = false;
+            getSession();
+            GetChildrenThread t = new GetChildrenThread(nodeId);
             try {
-              XThreadStringBuffer childBuffer = t.getBuffer();
-              // Pick up the paths, and add them to the activities, before we join with the child thread.
-              while (true) {
-                // The only kind of exceptions this can throw are going to shut the process down.
-                String child = childBuffer.fetch();
-                if (child ==  null)
-                  break;
-                // Add the pageID to the queue
-                activities.addDocumentReference(child, nodeId, RELATIONSHIP_CHILD);
+              t.start();
+              boolean wasInterrupted = false;
+              try {
+                XThreadStringBuffer childBuffer = t.getBuffer();
+                // Pick up the paths, and add them to the activities, before we join with the child thread.
+                while (true) {
+                  // The only kind of exceptions this can throw are going to shut the process down.
+                  String child = childBuffer.fetch();
+                  if (child ==  null)
+                    break;
+                  // Add the pageID to the queue
+                  activities.addDocumentReference(child, nodeId, RELATIONSHIP_CHILD);
+                }
+              } catch (InterruptedException e) {
+                wasInterrupted = true;
+                throw e;
+              } catch (ManifoldCFException e) {
+                if (e.getErrorCode() == ManifoldCFException.INTERRUPTED)
+                  wasInterrupted = true;
+                throw e;
+              } finally {
+                if (!wasInterrupted)
+                  t.finishUp();
               }
             } catch (InterruptedException e) {
-              wasInterrupted = true;
-              throw e;
-            } catch (ManifoldCFException e) {
-              if (e.getErrorCode() == ManifoldCFException.INTERRUPTED)
-                wasInterrupted = true;
-              throw e;
-            } finally {
-              if (!wasInterrupted)
-                t.finishUp();
+              t.interrupt();
+              throw new ManifoldCFException("Interrupted: " + e.getMessage(), e,
+                ManifoldCFException.INTERRUPTED);
+            } catch (java.net.SocketTimeoutException e) {
+              Logging.connectors.warn("GOOGLEDRIVE: Socket timeout adding child documents: " + e.getMessage(), e);
+              handleIOException(e);
+            } catch (InterruptedIOException e) {
+              t.interrupt();
+              throw new ManifoldCFException("Interrupted: " + e.getMessage(), e,
+                ManifoldCFException.INTERRUPTED);
+            } catch (IOException e) {
+              Logging.connectors.warn("GOOGLEDRIVE: Error adding child documents: " + e.getMessage(), e);
+              handleIOException(e);
             }
-          } catch (InterruptedException e) {
-            t.interrupt();
-            throw new ManifoldCFException("Interrupted: " + e.getMessage(), e,
-              ManifoldCFException.INTERRUPTED);
-          } catch (java.net.SocketTimeoutException e) {
-            Logging.connectors.warn("GOOGLEDRIVE: Socket timeout adding child documents: " + e.getMessage(), e);
-            handleIOException(e);
-          } catch (InterruptedIOException e) {
-            t.interrupt();
-            throw new ManifoldCFException("Interrupted: " + e.getMessage(), e,
-              ManifoldCFException.INTERRUPTED);
-          } catch (IOException e) {
-            Logging.connectors.warn("GOOGLEDRIVE: Error adding child documents: " + e.getMessage(), e);
-            handleIOException(e);
-          }
 
-        } else {
-          // its a file
-          if (!scanOnly[i]) {
+          } else {
+            // its a file
             doLog = true;
 
             if (Logging.connectors.isDebugEnabled()) {
@@ -1114,28 +1135,16 @@ public class GoogleDriveRepositoryConnec
             Long fileLength = Objects.firstNonNull(googleFile.getFileSize(), 0L);
             if (fileLength != null) {
 
-              // Unpack the version string
-              ArrayList acls = new ArrayList();
-              StringBuilder denyAclBuffer = new StringBuilder();
-              int index = unpackList(acls,version,0,'+');
-              if (index < version.length() && version.charAt(index++) == '+') {
-                index = unpack(denyAclBuffer,version,index,'+');
-              }
-
-              //otherwise process
               RepositoryDocument rd = new RepositoryDocument();
 
-              // Turn into acls and add into description
-              String[] aclArray = new String[acls.size()];
-              for (int j = 0; j < aclArray.length; j++) {
-                aclArray[j] = (String)acls.get(j);
-              }
-              rd.setSecurityACL(RepositoryDocument.SECURITY_TYPE_DOCUMENT,aclArray);
-              if (denyAclBuffer.length() > 0) {
-                String[] denyAclArray = new String[]{denyAclBuffer.toString()};
-                rd.setSecurityDenyACL(RepositoryDocument.SECURITY_TYPE_DOCUMENT,denyAclArray);
+              if (acls != null) {
+                rd.setSecurityACL(RepositoryDocument.SECURITY_TYPE_DOCUMENT,acls);
+                if (acls.length > 0) {
+                  String[] denyAclArray = new String[]{defaultAuthorityDenyToken};
+                  rd.setSecurityDenyACL(RepositoryDocument.SECURITY_TYPE_DOCUMENT,denyAclArray);
+                }
               }
-
+              
               // Now do standard stuff
               String mimeType = googleFile.getMimeType();
               DateTime createdDate = googleFile.getCreatedDate();
@@ -1214,15 +1223,16 @@ public class GoogleDriveRepositoryConnec
               errorDesc = "Document "+nodeId+" had no length; skipping";
             }
           }
+        } finally {
+          if (doLog)
+            activities.recordActivity(new Long(startTime), ACTIVITY_READ,
+              fileSize, nodeId, errorCode, errorDesc, null);
         }
-      } finally {
-        if (doLog)
-          activities.recordActivity(new Long(startTime), ACTIVITY_READ,
-            fileSize, nodeId, errorCode, errorDesc, null);
       }
     }
+    
   }
-
+  
   protected class DocumentReadingThread extends Thread {
 
     protected Throwable exception = null;
@@ -1329,70 +1339,11 @@ public class GoogleDriveRepositoryConnec
     }
   }
 
-  /**
-   * The short version of getDocumentVersions. Get document versions given an
-   * array of document identifiers. This method is called for EVERY document
-   * that is considered. It is therefore important to perform as little work
-   * as possible here.
-   *
-   * @param documentIdentifiers is the array of local document identifiers, as
-   * understood by this connector.
-   * @param spec is the current document specification for the current job. If
-   * there is a dependency on this specification, then the version string
-   * should include the pertinent data, so that reingestion will occur when
-   * the specification changes. This is primarily useful for metadata.
-   * @return the corresponding version strings, with null in the places where
-   * the document no longer exists. Empty version strings indicate that there
-   * is no versioning ability for the corresponding document, and the document
-   * will always be processed.
-   */
-  @Override
-  public String[] getDocumentVersions(String[] documentIdentifiers,
-      DocumentSpecification spec) throws ManifoldCFException,
-      ServiceInterruption {
-
-    // Forced acls
-    String[] acls = getAcls(spec);
-    // Sort it,
-    java.util.Arrays.sort(acls);
-
-    String[] rval = new String[documentIdentifiers.length];
-    for (int i = 0; i < rval.length; i++) {
-      File googleFile = getObject(documentIdentifiers[i]);
-      if (!isDir(googleFile)) {
-        String rev = googleFile.getModifiedDate().toStringRfc3339();
-        if (StringUtils.isNotEmpty(rev)) {
-          StringBuilder sb = new StringBuilder();
-
-          // Acls
-          packList(sb,acls,'+');
-          if (acls.length > 0) {
-            sb.append('+');
-            pack(sb,defaultAuthorityDenyToken,'+');
-          }
-          else
-            sb.append('-');
-
-          sb.append(rev);
-          rval[i] = sb.toString();
-        } else {
-          //a google document that doesn't contain versioning information will NEVER be processed.
-          // I don't know what this means, and whether it can ever occur.
-          rval[i] = null;
-        }
-      } else {
-        //a google folder will always be processed
-        rval[i] = StringUtils.EMPTY;
-      }
-    }
-    return rval;
-  }
-
   /** Grab forced acl out of document specification.
   *@param spec is the document specification.
   *@return the acls.
   */
-  protected static String[] getAcls(DocumentSpecification spec) {
+  protected static String[] getAcls(Specification spec) {
     Set<String> map = new HashSet<String>();
     for (int i = 0; i < spec.getChildCount(); i++) {
       SpecificationNode sn = spec.getChild(i);

Modified: manifoldcf/branches/dev_1x/connectors/gridfs/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/gridfs/GridFSRepositoryConnector.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/dev_1x/connectors/gridfs/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/gridfs/GridFSRepositoryConnector.java?rev=1626228&r1=1626227&r2=1626228&view=diff
==============================================================================
--- manifoldcf/branches/dev_1x/connectors/gridfs/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/gridfs/GridFSRepositoryConnector.java (original)
+++ manifoldcf/branches/dev_1x/connectors/gridfs/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/gridfs/GridFSRepositoryConnector.java Fri Sep 19 14:22:27 2014
@@ -44,10 +44,12 @@ import org.apache.manifoldcf.core.interf
 import org.apache.manifoldcf.core.interfaces.IPostParameters;
 import org.apache.manifoldcf.core.interfaces.IThreadContext;
 import org.apache.manifoldcf.core.interfaces.ManifoldCFException;
+import org.apache.manifoldcf.core.interfaces.Specification;
 import org.apache.manifoldcf.crawler.connectors.BaseRepositoryConnector;
 import org.apache.manifoldcf.crawler.interfaces.DocumentSpecification;
 import org.apache.manifoldcf.crawler.interfaces.IProcessActivity;
 import org.apache.manifoldcf.crawler.interfaces.ISeedingActivity;
+import org.apache.manifoldcf.crawler.interfaces.IExistingVersions;
 import org.apache.manifoldcf.crawler.system.Logging;
 import org.bson.types.ObjectId;
 
@@ -379,69 +381,67 @@ public class GridFSRepositoryConnector e
         }
     }
 
-    /**
-     * Process a set of documents. This is the method that should cause each
-     * document to be fetched, processed, and the results either added to the
-     * queue of documents for the current job, and/or entered into the
-     * incremental ingestion manager. The document specification allows this
-     * class to filter what is done based on the job. The connector will be
-     * connected before this method can be called.
-     *
-     * @param documentIdentifiers is the set of document identifiers to process.
-     * @param versions is the corresponding document versions to process, as
-     * returned by getDocumentVersions() above. The implementation may choose to
-     * ignore this parameter and always process the current version.
-     * @param activities is the interface this method should use to queue up new
-     * document references and ingest documents.
-     * @param spec is the document specification.
-     * @param scanOnly is an array corresponding to the document identifiers. It
-     * is set to true to indicate when the processing should only find other
-     * references, and should not actually call the ingestion methods.
-     * @throws org.apache.manifoldcf.core.interfaces.ManifoldCFException
-     * @throws org.apache.manifoldcf.agents.interfaces.ServiceInterruption
-     */
-    @Override
-    public void processDocuments(String[] documentIdentifiers, String[] versions,
-            IProcessActivity activities, DocumentSpecification spec,
-            boolean[] scanOnly) throws ManifoldCFException, ServiceInterruption {
-        if (Logging.connectors.isDebugEnabled()) {
-            Logging.connectors.debug("GridFS: Inside processDocuments");
-        }
-        int i = 0;
-        while (i < documentIdentifiers.length) {
-            long startTime = System.currentTimeMillis();
-            String errorCode = "OK";
-            String errorDesc = null;
-            String _id = documentIdentifiers[i];
-            String version = versions[i];
+    /** Process a set of documents.
+    * This is the method that should cause each document to be fetched, processed, and the results either added
+    * to the queue of documents for the current job, and/or entered into the incremental ingestion manager.
+    * The document specification allows this class to filter what is done based on the job.
+    * The connector will be connected before this method can be called.
+    *@param documentIdentifiers is the set of document identifiers to process.
+    *@param statuses are the currently-stored document versions for each document in the set of document identifiers
+    * passed in above.
+    *@param activities is the interface this method should use to queue up new document references
+    * and ingest documents.
+    *@param jobMode is an integer describing how the job is being run, whether continuous or once-only.
+    *@param usesDefaultAuthority will be true only if the authority in use for these documents is the default one.
+    */
+    @Override
+    public void processDocuments(String[] documentIdentifiers, IExistingVersions statuses, Specification spec,
+      IProcessActivity activities, int jobMode, boolean usesDefaultAuthority)
+      throws ManifoldCFException, ServiceInterruption {
+        
+        for (String documentIdentifier : documentIdentifiers) {
+          
+            String versionString;
+            GridFS gfs;
+            GridFSDBFile document;
+          
             getSession();
-            GridFS gfs = new GridFS(session, bucket);
-
-            RepositoryDocument rd = new RepositoryDocument();
-            if (Logging.connectors.isDebugEnabled()) {
-                Logging.connectors.debug("GridFS: Processing document _id = " + _id);
-            }
-
-            GridFSDBFile document = gfs.findOne(new ObjectId(_id));
-
+            String _id = documentIdentifier;
+            gfs = new GridFS(session, bucket);
+            document = gfs.findOne(new ObjectId(_id));
             if (document == null) {
-                activities.deleteDocument(_id);
-                i++;
+                activities.deleteDocument(documentIdentifier);
                 continue;
+            } else {
+                DBObject metadata = document.getMetaData();
+                versionString = document.getMD5() + "+" + (metadata != null
+                        ? Integer.toString(metadata.hashCode())
+                        : StringUtils.EMPTY);
             }
+            
+            if (versionString.length() == 0 || activities.checkDocumentNeedsReindexing(documentIdentifier,versionString)) {
+                long startTime = System.currentTimeMillis();
+                String errorCode = "OK";
+                String errorDesc = null;
+                String version = versionString;
 
-            DBObject metadata = document.getMetaData();
-            if (metadata == null) {
-                Logging.connectors.warn("GridFS: Document " + _id + " has a null metadata - skipping.");
-                i++;
-                continue;
-            }
+                RepositoryDocument rd = new RepositoryDocument();
+
+                if (Logging.connectors.isDebugEnabled()) {
+                    Logging.connectors.debug("GridFS: Processing document _id = " + _id);
+                }
+
+                DBObject metadata = document.getMetaData();
+                if (metadata == null) {
+                    Logging.connectors.warn("GridFS: Document " + _id + " has a null metadata - skipping.");
+                    activities.noDocument(_id,version);
+                    continue;
+                }
 
-            String urlValue = document.getMetaData().get(this.url) == null
-                    ? StringUtils.EMPTY
-                    : document.getMetaData().get(this.url).toString();
-            if (!StringUtils.isEmpty(urlValue)) {
-                if (!scanOnly[i]) {
+                String urlValue = document.getMetaData().get(this.url) == null
+                        ? StringUtils.EMPTY
+                        : document.getMetaData().get(this.url).toString();
+                if (!StringUtils.isEmpty(urlValue)) {
                     boolean validURL;
                     try {
                         new java.net.URI(urlValue);
@@ -507,17 +507,16 @@ public class GridFSRepositoryConnector e
                                 fileLenght, _id, errorCode, errorDesc, null);
                     } else {
                         Logging.connectors.warn("GridFS: Document " + _id + " has a invalid URL: " + urlValue + " - skipping.");
+                        activities.noDocument(_id,version);
                     }
                 } else {
-                    if (Logging.connectors.isDebugEnabled()) {
-                        Logging.connectors.debug("GridFS: Document " + _id + " wasn't fetched because has still same version.");
-                    }
+                    Logging.connectors.warn("GridFS: Document " + _id + " has a null URL - skipping.");
+                    activities.noDocument(_id,version);
                 }
-            } else {
-                Logging.connectors.warn("GridFS: Document " + _id + " has a null URL - skipping.");
+              
             }
-            i++;
         }
+
     }
 
     protected static void handleIOException(IOException e) throws ManifoldCFException, ServiceInterruption {
@@ -529,51 +528,6 @@ public class GridFSRepositoryConnector e
     }
     
     /**
-     * Get document versions given an array of document identifiers. This method
-     * is called for EVERY document that is considered. It is therefore
-     * important to perform as little work as possible here. The connector will
-     * be connected before this method can be called.
-     *
-     * @param documentIdentifiers is the array of local document identifiers, as
-     * understood by this connector.
-     * @param spec is the current document specification for the current job. If
-     * there is a dependency on this specification, then the version string
-     * should include the pertinent data, so that reingestion will occur when
-     * the specification changes. This is primarily useful for metadata.
-     * @return the corresponding version strings, with null in the places where
-     * the document no longer exists. Empty version strings indicate that there
-     * is no versioning ability for the corresponding document, and the document
-     * will always be processed.
-     * @throws org.apache.manifoldcf.core.interfaces.ManifoldCFException
-     * @throws org.apache.manifoldcf.agents.interfaces.ServiceInterruption
-     */
-    @Override
-    public String[] getDocumentVersions(String[] documentIdentifiers,
-            DocumentSpecification spec) throws ManifoldCFException, ServiceInterruption {
-        if (Logging.connectors.isDebugEnabled()) {
-            Logging.connectors.debug("GridFS: Inside getDocumentVersions");
-        }
-        String[] versions = new String[documentIdentifiers.length];
-        getSession();
-        int i = 0;
-        while (i < versions.length) {
-            String _id = documentIdentifiers[i];
-            GridFS gridfs = new GridFS(session, bucket);
-            GridFSDBFile document = gridfs.findOne(new ObjectId(_id));
-            if (document == null) {
-                versions[i] = null;
-            } else {
-                DBObject metadata = document.getMetaData();
-                versions[i] = document.getMD5() + "+" + metadata != null
-                        ? Integer.toString(metadata.hashCode())
-                        : StringUtils.EMPTY;
-            }
-            i++;
-        }
-        return versions;
-    }
-
-    /**
      * Output the configuration header section. This method is called in the
      * head section of the connector's configuration page. Its purpose is to add
      * the required tabs to the list, and to output any javascript methods that

Modified: manifoldcf/branches/dev_1x/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/hdfs/HDFSRepositoryConnector.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/dev_1x/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/hdfs/HDFSRepositoryConnector.java?rev=1626228&r1=1626227&r2=1626228&view=diff
==============================================================================
--- manifoldcf/branches/dev_1x/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/hdfs/HDFSRepositoryConnector.java (original)
+++ manifoldcf/branches/dev_1x/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/hdfs/HDFSRepositoryConnector.java Fri Sep 19 14:22:27 2014
@@ -321,211 +321,170 @@ public class HDFSRepositoryConnector ext
     }
   }
 
-  /** Get document versions given an array of document identifiers.
-   * This method is called for EVERY document that is considered. It is therefore important to perform
-   * as little work as possible here.
-   * The connector will be connected before this method can be called.
-   *@param documentIdentifiers is the array of local document identifiers, as understood by this connector.
-   *@param oldVersions is the corresponding array of version strings that have been saved for the document identifiers.
-   *   A null value indicates that this is a first-time fetch, while an empty string indicates that the previous document
-   *   had an empty version string.
-   *@param activities is the interface this method should use to perform whatever framework actions are desired.
-   *@param spec is the current document specification for the current job.  If there is a dependency on this
-   * specification, then the version string should include the pertinent data, so that reingestion will occur
-   * when the specification changes.  This is primarily useful for metadata.
-   *@param jobMode is an integer describing how the job is being run, whether continuous or once-only.
-   *@param usesDefaultAuthority will be true only if the authority in use for these documents is the default one.
-   *@return the corresponding version strings, with null in the places where the document no longer exists.
-   * Empty version strings indicate that there is no versioning ability for the corresponding document, and the document
-   * will always be processed.
-   */
-  public String[] getDocumentVersions(String[] documentIdentifiers, String[] oldVersions, IVersionActivity activities,
-    DocumentSpecification spec, int jobMode, boolean usesDefaultAuthority)
-    throws ManifoldCFException, ServiceInterruption
-  {
-    String[] rval = new String[documentIdentifiers.length];
-    for (int i = 0; i < rval.length; i++) {
-      String documentIdentifier = documentIdentifiers[i];
+  /** Process a set of documents.
+  * This is the method that should cause each document to be fetched, processed, and the results either added
+  * to the queue of documents for the current job, and/or entered into the incremental ingestion manager.
+  * The document specification allows this class to filter what is done based on the job.
+  * The connector will be connected before this method can be called.
+  *@param documentIdentifiers is the set of document identifiers to process.
+  *@param statuses are the currently-stored document versions for each document in the set of document identifiers
+  * passed in above.
+  *@param activities is the interface this method should use to queue up new document references
+  * and ingest documents.
+  *@param jobMode is an integer describing how the job is being run, whether continuous or once-only.
+  *@param usesDefaultAuthority will be true only if the authority in use for these documents is the default one.
+  */
+  @Override
+  public void processDocuments(String[] documentIdentifiers, IExistingVersions statuses, Specification spec,
+    IProcessActivity activities, int jobMode, boolean usesDefaultAuthority)
+    throws ManifoldCFException, ServiceInterruption {
+      
+    for (String documentIdentifier : documentIdentifiers) {
+      
+      String versionString;
       
       FileStatus fileStatus = getObject(new Path(documentIdentifier));
       if (fileStatus != null) {
-        if (fileStatus.isDirectory()) {
+        
+        boolean isDirectory = fileStatus.isDirectory();
+        
+        if (isDirectory) {
           // If HDFS directory modify dates are transitive, as they are on Unix,
           // then getting the modify date of the current version is sufficient
           // to detect any downstream changes we need to be aware of.
           // (If this turns out to be a bad assumption, this should simply set rval[i] ="").
           long lastModified = fileStatus.getModificationTime();
-          rval[i] = new Long(lastModified).toString();
-        } else {
-          long fileLength = fileStatus.getLen();
-          if (activities.checkLengthIndexable(fileLength)) {
-            long lastModified = fileStatus.getModificationTime();
-            StringBuilder sb = new StringBuilder();
-            // Check if the path is to be converted.  We record that info in the version string so that we'll reindex documents whose
-            // URI's change.
-            String nameNode = nameNodeProtocol + "://" + nameNodeHost + ":" + nameNodePort;
-            String convertPath = findConvertPath(nameNode, spec, fileStatus.getPath());
-            if (convertPath != null)
-            {
-              // Record the path.
-              sb.append("+");
-              pack(sb,convertPath,'+');
+          versionString = new Long(lastModified).toString();
+          
+          if (activities.checkDocumentNeedsReindexing(documentIdentifier,versionString)) {
+            // Process directory!
+            String entityReference = documentIdentifier;
+            FileStatus[] fileStatuses = getChildren(fileStatus.getPath());
+            if (fileStatuses == null) {
+              continue;
+            }
+            for (int j = 0; j < fileStatuses.length; j++) {
+              FileStatus fs = fileStatuses[j];
+              String canonicalPath = fs.getPath().toString();
+              if (checkInclude(session.getUri().toString(),fs,canonicalPath,spec)) {
+                activities.addDocumentReference(canonicalPath,documentIdentifier,RELATIONSHIP_CHILD);
+              }
             }
-            else
-              sb.append("-");
-            sb.append(new Long(lastModified).toString()).append(":").append(new Long(fileLength).toString());
-            rval[i] = sb.toString();
-          } else {
-            rval[i] = null;
           }
-        }
-      } else {
-        rval[i] = null;
-      }
-    }
-    
-    return rval;
-  }
-
-
-  /** Process a set of documents.
-   * This is the method that should cause each document to be fetched, processed, and the results either added
-   * to the queue of documents for the current job, and/or entered into the incremental ingestion manager.
-   * The document specification allows this class to filter what is done based on the job.
-   *@param documentIdentifiers is the set of document identifiers to process.
-   *@param activities is the interface this method should use to queue up new document references
-   * and ingest documents.
-   *@param spec is the document specification.
-   *@param scanOnly is an array corresponding to the document identifiers.  It is set to true to indicate when the processing
-   * should only find other references, and should not actually call the ingestion methods.
-   */
-  @Override
-  public void processDocuments(String[] documentIdentifiers, String[] versions, IProcessActivity activities, DocumentSpecification spec, boolean[] scanOnly)
-    throws ManifoldCFException, ServiceInterruption {
-    for (int i = 0; i < documentIdentifiers.length; i++) {
-      String version = versions[i];
-      String documentIdentifier = documentIdentifiers[i];
-        
-      if (Logging.connectors.isDebugEnabled()) {
-        Logging.connectors.debug("HDFS: Processing document identifier '" + documentIdentifier + "'");
-      }
-      FileStatus fileStatus = getObject(new Path(documentIdentifier));
-        
-      if (fileStatus == null) {
-        // It is no longer there , so delete right away
-        activities.deleteDocument(documentIdentifier);
-        continue;
-      }
-        
-      if (fileStatus.isDirectory()) {
-        /*
-          * Queue up stuff for directory
-          */
-        String entityReference = documentIdentifier;
-        FileStatus[] fileStatuses = getChildren(fileStatus.getPath());
-        if (fileStatuses == null) {
-          continue;
-        }
-        for (int j = 0; j < fileStatuses.length; j++) {
-          FileStatus fs = fileStatuses[j++];
-          String canonicalPath = fs.getPath().toString();
-          if (checkInclude(session.getUri().toString(),fs,canonicalPath,spec)) {
-            activities.addDocumentReference(canonicalPath,documentIdentifier,RELATIONSHIP_CHILD);
+        } else {
+          long lastModified = fileStatus.getModificationTime();
+          StringBuilder sb = new StringBuilder();
+          // Check if the path is to be converted.  We record that info in the version string so that we'll reindex documents whose
+          // URI's change.
+          String nameNode = nameNodeProtocol + "://" + nameNodeHost + ":" + nameNodePort;
+          String convertPath = findConvertPath(nameNode, spec, fileStatus.getPath());
+          if (convertPath != null)
+          {
+            // Record the path.
+            sb.append("+");
+            pack(sb,convertPath,'+');
           }
-        }
-      } else {
-        if (scanOnly[i])
-          continue;
-        if (!checkIngest(session.getUri().toString(),fileStatus,spec))
-          continue;
-
-        // Get the WGet conversion path out of the version string
-        String convertPath = null;
-        if (version.length() > 0 && version.startsWith("+"))
-        {
-          StringBuilder unpack = new StringBuilder();
-          unpack(unpack, version, 1, '+');
-          convertPath = unpack.toString();
-        }
+          else
+            sb.append("-");
+          sb.append(new Long(lastModified).toString());
+          versionString = sb.toString();
+          
+          if (activities.checkDocumentNeedsReindexing(documentIdentifier,versionString)) {
+            // Process file!
+            if (!checkIngest(session.getUri().toString(),fileStatus,spec)) {
+              activities.noDocument(documentIdentifier,versionString);
+              continue;
+            }
 
-        // It is a file to be indexed.
-        
-        // Prepare the metadata part of RepositoryDocument
-        RepositoryDocument data = new RepositoryDocument();
+            long fileLength = fileStatus.getLen();
+            if (!activities.checkLengthIndexable(fileLength)) {
+              activities.noDocument(documentIdentifier,versionString);
+              continue;
+            }
 
-        data.setFileName(fileStatus.getPath().getName());
-        data.setMimeType(mapExtensionToMimeType(fileStatus.getPath().getName()));
-        data.setModifiedDate(new Date(fileStatus.getModificationTime()));
-
-        String uri;
-        if (convertPath != null) {
-          uri = convertToWGETURI(convertPath);
-        } else {
-          uri = fileStatus.getPath().toUri().toString();
-        }
-        data.addField("uri",uri);
+            // It is a file to be indexed.
+            
+            // Prepare the metadata part of RepositoryDocument
+            RepositoryDocument data = new RepositoryDocument();
+
+            data.setFileName(fileStatus.getPath().getName());
+            data.setMimeType(mapExtensionToMimeType(fileStatus.getPath().getName()));
+            data.setModifiedDate(new Date(fileStatus.getModificationTime()));
+
+            String uri;
+            if (convertPath != null) {
+              uri = convertToWGETURI(convertPath);
+            } else {
+              uri = fileStatus.getPath().toUri().toString();
+            }
+            data.addField("uri",uri);
 
-        // We will record document fetch as an activity
-        long startTime = System.currentTimeMillis();
-        String errorCode = "FAILED";
-        String errorDesc = StringUtils.EMPTY;
-        long fileSize = 0;
+            // We will record document fetch as an activity
+            long startTime = System.currentTimeMillis();
+            String errorCode = "FAILED";
+            String errorDesc = StringUtils.EMPTY;
+            long fileSize = 0;
 
-        try {
-          BackgroundStreamThread t = new BackgroundStreamThread(getSession(),new Path(documentIdentifier));
-          try {
-            t.start();
-            boolean wasInterrupted = false;
             try {
-              InputStream is = t.getSafeInputStream();
+              BackgroundStreamThread t = new BackgroundStreamThread(getSession(),new Path(documentIdentifier));
               try {
-                data.setBinary(is, fileSize);
-                activities.ingestDocumentWithException(documentIdentifier,version,uri,data);
-              } finally {
-                is.close();
-              }
-            } catch (java.net.SocketTimeoutException e) {
-              throw e;
-            } catch (InterruptedIOException e) {
-              wasInterrupted = true;
-              throw e;
-            } catch (ManifoldCFException e) {
-              if (e.getErrorCode() == ManifoldCFException.INTERRUPTED) {
-                wasInterrupted = true;
+                t.start();
+                boolean wasInterrupted = false;
+                try {
+                  InputStream is = t.getSafeInputStream();
+                  try {
+                    data.setBinary(is, fileSize);
+                    activities.ingestDocumentWithException(documentIdentifier,versionString,uri,data);
+                  } finally {
+                    is.close();
+                  }
+                } catch (java.net.SocketTimeoutException e) {
+                  throw e;
+                } catch (InterruptedIOException e) {
+                  wasInterrupted = true;
+                  throw e;
+                } catch (ManifoldCFException e) {
+                  if (e.getErrorCode() == ManifoldCFException.INTERRUPTED) {
+                    wasInterrupted = true;
+                  }
+                  throw e;
+                } finally {
+                  if (!wasInterrupted) {
+                    // This does a join
+                    t.finishUp();
+                  }
+                }
+
+                // No errors.  Record the fact that we made it.
+                errorCode = "OK";
+                // Length we did in bytes
+                fileSize = fileStatus.getLen();
+
+              } catch (InterruptedException e) {
+                // We were interrupted out of the join, most likely.  Before we abandon the thread,
+                // send a courtesy interrupt.
+                t.interrupt();
+                throw new ManifoldCFException("Interrupted: " + e.getMessage(), e, ManifoldCFException.INTERRUPTED);
+              } catch (java.net.SocketTimeoutException e) {
+                errorCode = "IO ERROR";
+                errorDesc = e.getMessage();
+                handleIOException(e);
+              } catch (InterruptedIOException e) {
+                t.interrupt();
+                throw new ManifoldCFException("Interrupted: " + e.getMessage(), e, ManifoldCFException.INTERRUPTED);
+              } catch (IOException e) {
+                errorCode = "IO ERROR";
+                errorDesc = e.getMessage();
+                handleIOException(e);
               }
-              throw e;
             } finally {
-              if (!wasInterrupted) {
-                // This does a join
-                t.finishUp();
-              }
+              activities.recordActivity(new Long(startTime),ACTIVITY_READ,new Long(fileSize),documentIdentifier,errorCode,errorDesc,null);
             }
-
-            // No errors.  Record the fact that we made it.
-            errorCode = "OK";
-            // Length we did in bytes
-            fileSize = fileStatus.getLen();
-
-          } catch (InterruptedException e) {
-            // We were interrupted out of the join, most likely.  Before we abandon the thread,
-            // send a courtesy interrupt.
-            t.interrupt();
-            throw new ManifoldCFException("Interrupted: " + e.getMessage(), e, ManifoldCFException.INTERRUPTED);
-          } catch (java.net.SocketTimeoutException e) {
-            errorCode = "IO ERROR";
-            errorDesc = e.getMessage();
-            handleIOException(e);
-          } catch (InterruptedIOException e) {
-            t.interrupt();
-            throw new ManifoldCFException("Interrupted: " + e.getMessage(), e, ManifoldCFException.INTERRUPTED);
-          } catch (IOException e) {
-            errorCode = "IO ERROR";
-            errorDesc = e.getMessage();
-            handleIOException(e);
           }
-        } finally {
-          activities.recordActivity(new Long(startTime),ACTIVITY_READ,new Long(fileSize),documentIdentifier,errorCode,errorDesc,null);
         }
+      } else {
+        activities.deleteDocument(documentIdentifier);
+        continue;
       }
     }
   }
@@ -1409,7 +1368,7 @@ public class HDFSRepositoryConnector ext
   *@param documentIdentifier is the document identifier.
   *@return the part of the path to be converted, or null.
   */
-  protected static String findConvertPath(String nameNode, DocumentSpecification spec, Path theFile)
+  protected static String findConvertPath(String nameNode, Specification spec, Path theFile)
   {
     String fullpath = theFile.toString();
     for (int j = 0; j < spec.getChildCount(); j++)
@@ -1449,7 +1408,7 @@ public class HDFSRepositoryConnector ext
    *@param documentSpecification is the specification.
    *@return true if it should be included.
    */
-  protected static boolean checkInclude(String nameNode, FileStatus fileStatus, String fileName, DocumentSpecification documentSpecification)
+  protected static boolean checkInclude(String nameNode, FileStatus fileStatus, String fileName, Specification documentSpecification)
     throws ManifoldCFException
   {
     if (Logging.connectors.isDebugEnabled())
@@ -1560,7 +1519,7 @@ public class HDFSRepositoryConnector ext
    *@param file is the file.
    *@param documentSpecification is the specification.
    */
-  protected static boolean checkIngest(String nameNode, FileStatus fileStatus, DocumentSpecification documentSpecification)
+  protected static boolean checkIngest(String nameNode, FileStatus fileStatus, Specification documentSpecification)
     throws ManifoldCFException
   {
     // Since the only exclusions at this point are not based on file contents, this is a no-op.



Mime
View raw message