manifoldcf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kwri...@apache.org
Subject svn commit: r1624236 - /manifoldcf/trunk/connectors/generic/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/generic/GenericConnector.java
Date Thu, 11 Sep 2014 09:32:09 GMT
Author: kwright
Date: Thu Sep 11 09:32:08 2014
New Revision: 1624236

URL: http://svn.apache.org/r1624236
Log:
Upgrade generic connector.  Part of CONNECTORS-977.

Modified:
    manifoldcf/trunk/connectors/generic/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/generic/GenericConnector.java

Modified: manifoldcf/trunk/connectors/generic/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/generic/GenericConnector.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/connectors/generic/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/generic/GenericConnector.java?rev=1624236&r1=1624235&r2=1624236&view=diff
==============================================================================
--- manifoldcf/trunk/connectors/generic/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/generic/GenericConnector.java (original)
+++ manifoldcf/trunk/connectors/generic/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/generic/GenericConnector.java Thu Sep 11 09:32:08 2014
@@ -258,8 +258,8 @@ public class GenericConnector extends Ba
   }
 
   @Override
-  public String[] getDocumentVersions(String[] documentIdentifiers, String[] oldVersions, IVersionActivity activities,
-    DocumentSpecification spec, int jobType, boolean usesDefaultAuthority)
+  public void processDocuments(String[] documentIdentifiers, IExistingVersions statuses, Specification spec,
+    IProcessActivity activities, int jobMode, boolean usesDefaultAuthority)
     throws ManifoldCFException, ServiceInterruption {
 
     // Forced acls
@@ -280,208 +280,180 @@ public class GenericConnector extends Ba
     HttpClient client = getClient();
     StringBuilder url = new StringBuilder(genericEntryPoint);
 
-      url.append("?").append(ACTION_PARAM_NAME).append("=").append(ACTION_ITEMS);
-      for (int i = 0; i < documentIdentifiers.length; i++) {
-        url.append("&id[]=").append(URLEncoder.encode(documentIdentifiers[i]));
-      }
-      for (int i = 0; i < spec.getChildCount(); i++) {
-        SpecificationNode sn = spec.getChild(i);
-        if (sn.getType().equals("param")) {
-          String paramName = sn.getAttributeValue("name");
-          String paramValue = sn.getValue();
-          url.append("&").append(URLEncoder.encode(paramName)).append("=").append(URLEncoder.encode(paramValue));
-        }
-      }
-    try {
-      DocumentVersionThread versioningThread = new DocumentVersionThread(client, url.toString(), documentIdentifiers, genericAuthMode, rights, documentCache);
-      versioningThread.start();
-      versioningThread.join();
-      if (versioningThread.getException() != null) {
-        Throwable thr = versioningThread.getException();
-        if (thr instanceof ManifoldCFException) {
-          if (((ManifoldCFException) thr).getErrorCode() == ManifoldCFException.INTERRUPTED) {
-            throw new InterruptedException(thr.getMessage());
-          }
-          throw (ManifoldCFException) thr;
-        } else if (thr instanceof ServiceInterruption) {
-          throw (ServiceInterruption) thr;
-        } else if (thr instanceof IOException) {
-          handleIOException((IOException) thr);
-        } else if (thr instanceof RuntimeException) {
-          throw (RuntimeException) thr;
-        }
-        throw new ManifoldCFException("getDocumentVersions error: " + thr.getMessage(), thr);
-      }
-      return versioningThread.getVersions();
-    } catch (InterruptedException ex) {
-      throw new ManifoldCFException(ex.getMessage(), ex, ManifoldCFException.INTERRUPTED);
+    url.append("?").append(ACTION_PARAM_NAME).append("=").append(ACTION_ITEMS);
+    for (int i = 0; i < documentIdentifiers.length; i++) {
+      url.append("&id[]=").append(URLEncoder.encode(documentIdentifiers[i]));
     }
-  }
-
-  @Override
-  public void processDocuments(String[] documentIdentifiers, String[] versions, IProcessActivity activities,
-    DocumentSpecification spec, boolean[] scanOnly, int jobType)
-    throws ManifoldCFException, ServiceInterruption {
-
-    // Forced acls
-    String[] acls = getAcls(spec);
-
-    String genericAuthMode = "provided";
     for (int i = 0; i < spec.getChildCount(); i++) {
       SpecificationNode sn = spec.getChild(i);
-      if (sn.getType().equals("genericAuthMode")) {
-        genericAuthMode = sn.getValue();
-        break;
+      if (sn.getType().equals("param")) {
+        String paramName = sn.getAttributeValue("name");
+        String paramValue = sn.getValue();
+        url.append("&").append(URLEncoder.encode(paramName)).append("=").append(URLEncoder.encode(paramValue));
       }
     }
-
-    HttpClient client = getClient();
-    for (int i = 0; i < documentIdentifiers.length; i++) {
-      activities.checkJobStillActive();
-
-      Item item = documentCache.get(documentIdentifiers[i]);
-      if (item == null) {
-        throw new ManifoldCFException("processDocuments error - no cache entry for: " + documentIdentifiers[i]);
+    
+    String[] versions = null;
+    try {
+      DocumentVersionThread versioningThread = new DocumentVersionThread(client, url.toString(), documentIdentifiers, genericAuthMode, rights, documentCache);
+      versioningThread.start();
+      try {
+        versions = versioningThread.finishUp();
+      } catch (IOException ex) {
+        handleIOException((IOException)ex);
+      } catch (InterruptedException ex) {
+        throw new ManifoldCFException(ex.getMessage(), ex, ManifoldCFException.INTERRUPTED);
       }
-
-      if (item.related != null) {
-        for (String rel : item.related) {
-          activities.addDocumentReference(rel, documentIdentifiers[i], RELATIONSHIP_RELATED);
+      
+      // Figure out which ones we need to process, and which we should delete
+      for (int i = 0; i < documentIdentifiers.length; i++) {
+        String documentIdentifier = documentIdentifiers[i];
+        String versionString = versions[i];
+        if (versionString == null) {
+          activities.deleteDocument(documentIdentifier);
+          continue;
+        }
+        Item item = documentCache.get(documentIdentifier);
+        if (item == null) {
+          throw new ManifoldCFException("processDocuments error - no cache entry for: " + documentIdentifier);
         }
-      }
-      if (scanOnly[i]) {
-        continue;
-      }
 
-      RepositoryDocument doc = new RepositoryDocument();
-      if (item.mimeType != null) {
-        doc.setMimeType(item.mimeType);
-      }
-      if (item.created != null) {
-        doc.setCreatedDate(item.created);
-      }
-      if (item.updated != null) {
-        doc.setModifiedDate(item.updated);
-      }
-      if (item.fileName != null) {
-        doc.setFileName(item.fileName);
-      }
-      if (item.metadata != null) {
-        HashMap<String, List<String>> meta = new HashMap<String, List<String>>();
-        for (Meta m : item.metadata) {
-          if (meta.containsKey(m.name)) {
-            meta.get(m.name).add(m.value);
-          } else {
-            List<String> list = new ArrayList<String>(1);
-            list.add(m.value);
-            meta.put(m.name, list);
+        if (item.related != null) {
+          for (String rel : item.related) {
+            activities.addDocumentReference(rel, documentIdentifier, RELATIONSHIP_RELATED);
           }
         }
-        for (String name : meta.keySet()) {
-          List<String> values = meta.get(name);
-          if (values.size() > 1) {
-            String[] svals = new String[values.size()];
-            for (int j = 0; j < values.size(); j++) {
-              svals[j] = values.get(j);
-            }
-            doc.addField(name, svals);
-          } else {
-            doc.addField(name, values.get(0));
+        if (versionString.length() == 0 || activities.checkDocumentNeedsReindexing(documentIdentifier,versionString)) {
+          
+          // Process the document
+          RepositoryDocument doc = new RepositoryDocument();
+          if (item.mimeType != null) {
+            doc.setMimeType(item.mimeType);
           }
-        }
-      }
-      if ("provided".equals(genericAuthMode)) {
-        if (item.auth != null) {
-          String[] acl = new String[item.auth.size()];
-          for (int j = 0; j < item.auth.size(); j++) {
-            acl[j] = item.auth.get(j);
+          if (item.created != null) {
+            doc.setCreatedDate(item.created);
           }
-          doc.setSecurity(RepositoryDocument.SECURITY_TYPE_DOCUMENT,acl,new String[]{defaultAuthorityDenyToken});
-        }
-      } else {
-        if (acls.length > 0) {
-          doc.setSecurity(RepositoryDocument.SECURITY_TYPE_DOCUMENT,acls,new String[]{defaultAuthorityDenyToken});
-        }
-      }
-      if (item.content != null) {
-        try {
-          byte[] content = item.content.getBytes(StandardCharsets.UTF_8);
-          ByteArrayInputStream is = new ByteArrayInputStream(content);
-          try {
-            doc.setBinary(is, content.length);
-            activities.ingestDocumentWithException(documentIdentifiers[i], versions[i], item.url, doc);
-            is.close();
-          } finally {
-            is.close();
+          if (item.updated != null) {
+            doc.setModifiedDate(item.updated);
           }
-        } catch (IOException ex) {
-          handleIOException(ex);
-        }
-      } else {
-        StringBuilder url = new StringBuilder(genericEntryPoint);
-
-          url.append("?").append(ACTION_PARAM_NAME).append("=").append(ACTION_ITEM);
-          url.append("&id=").append(URLEncoder.encode(documentIdentifiers[i]));
-          for (int j = 0; j < spec.getChildCount(); j++) {
-            SpecificationNode sn = spec.getChild(j);
-            if (sn.getType().equals("param")) {
-              String paramName = sn.getAttributeValue("name");
-              String paramValue = sn.getValue();
-              url.append("&").append(URLEncoder.encode(paramName)).append("=").append(URLEncoder.encode(paramValue));
+          if (item.fileName != null) {
+            doc.setFileName(item.fileName);
+          }
+          if (item.metadata != null) {
+            HashMap<String, List<String>> meta = new HashMap<String, List<String>>();
+            for (Meta m : item.metadata) {
+              if (meta.containsKey(m.name)) {
+                meta.get(m.name).add(m.value);
+              } else {
+                List<String> list = new ArrayList<String>(1);
+                list.add(m.value);
+                meta.put(m.name, list);
+              }
+            }
+            for (String name : meta.keySet()) {
+              List<String> values = meta.get(name);
+              if (values.size() > 1) {
+                String[] svals = new String[values.size()];
+                for (int j = 0; j < values.size(); j++) {
+                  svals[j] = values.get(j);
+                }
+                doc.addField(name, svals);
+              } else {
+                doc.addField(name, values.get(0));
+              }
             }
           }
-
-
-        ExecuteProcessThread t = new ExecuteProcessThread(client, url.toString());
-        try {
-          t.start();
-          boolean wasInterrupted = false;
-          try {
-            InputStream is = t.getSafeInputStream();
-            long fileLength = t.getStreamLength();
+          if ("provided".equals(genericAuthMode)) {
+            if (item.auth != null) {
+              String[] acl = new String[item.auth.size()];
+              for (int j = 0; j < item.auth.size(); j++) {
+                acl[j] = item.auth.get(j);
+              }
+              doc.setSecurity(RepositoryDocument.SECURITY_TYPE_DOCUMENT,acl,new String[]{defaultAuthorityDenyToken});
+            }
+          } else {
+            if (acls.length > 0) {
+              doc.setSecurity(RepositoryDocument.SECURITY_TYPE_DOCUMENT,acls,new String[]{defaultAuthorityDenyToken});
+            }
+          }
+          if (item.content != null) {
             try {
-              // Can only index while background thread is running!
-              doc.setBinary(is, fileLength);
-              activities.ingestDocumentWithException(documentIdentifiers[i], versions[i], item.url, doc);
-            } finally {
-              is.close();
+              byte[] content = item.content.getBytes(StandardCharsets.UTF_8);
+              ByteArrayInputStream is = new ByteArrayInputStream(content);
+              try {
+                doc.setBinary(is, content.length);
+                activities.ingestDocumentWithException(documentIdentifier, versionString, item.url, doc);
+                is.close();
+              } finally {
+                is.close();
+              }
+            } catch (IOException ex) {
+              handleIOException(ex);
             }
-          } catch (ManifoldCFException e) {
-            if (e.getErrorCode() == ManifoldCFException.INTERRUPTED) {
-              wasInterrupted = true;
+          } else {
+            url = new StringBuilder(genericEntryPoint);
+
+            url.append("?").append(ACTION_PARAM_NAME).append("=").append(ACTION_ITEM);
+            url.append("&id=").append(URLEncoder.encode(documentIdentifier));
+            for (int j = 0; j < spec.getChildCount(); j++) {
+              SpecificationNode sn = spec.getChild(j);
+              if (sn.getType().equals("param")) {
+                String paramName = sn.getAttributeValue("name");
+                String paramValue = sn.getValue();
+                url.append("&").append(URLEncoder.encode(paramName)).append("=").append(URLEncoder.encode(paramValue));
+              }
             }
-            throw e;
-          } catch (java.net.SocketTimeoutException e) {
-            throw e;
-          } catch (InterruptedIOException e) {
-            wasInterrupted = true;
-            throw e;
-          } finally {
-            if (!wasInterrupted) {
-              t.finishUp();
+
+
+            ExecuteProcessThread t = new ExecuteProcessThread(client, url.toString());
+            try {
+              t.start();
+              boolean wasInterrupted = false;
+              try {
+                InputStream is = t.getSafeInputStream();
+                long fileLength = t.getStreamLength();
+                try {
+                  // Can only index while background thread is running!
+                  doc.setBinary(is, fileLength);
+                  activities.ingestDocumentWithException(documentIdentifier, versionString, item.url, doc);
+                } finally {
+                  is.close();
+                }
+              } catch (ManifoldCFException e) {
+                if (e.getErrorCode() == ManifoldCFException.INTERRUPTED) {
+                  wasInterrupted = true;
+                }
+                throw e;
+              } catch (java.net.SocketTimeoutException e) {
+                throw e;
+              } catch (InterruptedIOException e) {
+                wasInterrupted = true;
+                throw e;
+              } finally {
+                if (!wasInterrupted) {
+                  t.finishUp();
+                }
+              }
+            } catch (InterruptedException e) {
+              t.interrupt();
+              throw new ManifoldCFException("Interrupted: " + e.getMessage(), e, ManifoldCFException.INTERRUPTED);
+            } catch (InterruptedIOException e) {
+              t.interrupt();
+              throw new ManifoldCFException("Interrupted: " + e.getMessage(), e, ManifoldCFException.INTERRUPTED);
+            } catch (IOException e) {
+              handleIOException(e);
             }
           }
-        } catch (InterruptedException e) {
-          t.interrupt();
-          throw new ManifoldCFException("Interrupted: " + e.getMessage(), e, ManifoldCFException.INTERRUPTED);
-        } catch (InterruptedIOException e) {
-          t.interrupt();
-          throw new ManifoldCFException("Interrupted: " + e.getMessage(), e, ManifoldCFException.INTERRUPTED);
-        } catch (IOException e) {
-          handleIOException(e);
         }
       }
-    }
-  }
-
-  @Override
-  public void releaseDocumentVersions(String[] documentIdentifiers, String[] versions) throws ManifoldCFException {
-    for (int i = 0; i < documentIdentifiers.length; i++) {
-      if (documentCache.containsKey(documentIdentifiers[i])) {
-        documentCache.remove(documentIdentifiers[i]);
+      
+    } finally {
+      for (String documentIdentifier : documentIdentifiers) {
+        if (documentCache.containsKey(documentIdentifier)) {
+          documentCache.remove(documentIdentifier);
+        }
       }
     }
-    super.releaseDocumentVersions(documentIdentifiers, versions);
   }
 
   @Override
@@ -1022,7 +994,7 @@ public class GenericConnector extends Ba
     return true;
   }
 
-  protected static String[] getAcls(DocumentSpecification spec) {
+  protected static String[] getAcls(Specification spec) {
     HashMap map = new HashMap();
     int i = 0;
     while (i < spec.getChildCount()) {
@@ -1115,9 +1087,9 @@ public class GenericConnector extends Ba
 
   protected static class ExecuteSeedingThread extends Thread {
 
-    protected HttpClient client;
+    protected final HttpClient client;
 
-    protected String url;
+    protected final String url;
 
     protected final XThreadStringBuffer seedBuffer;
 
@@ -1193,21 +1165,21 @@ public class GenericConnector extends Ba
 
   protected static class DocumentVersionThread extends Thread {
 
-    protected HttpClient client;
+    protected final HttpClient client;
 
-    protected String url;
+    protected final String url;
 
     protected Throwable exception = null;
 
-    protected String[] versions;
+    protected final String[] versions;
 
-    protected ConcurrentHashMap<String, Item> documentCache;
+    protected final ConcurrentHashMap<String, Item> documentCache;
 
-    protected String[] documentIdentifiers;
+    protected final String[] documentIdentifiers;
 
-    protected String genericAuthMode;
+    protected final String genericAuthMode;
 
-    protected String defaultRights;
+    protected final String defaultRights;
 
     public DocumentVersionThread(HttpClient client, String url, String[] documentIdentifiers, String genericAuthMode, String defaultRights, ConcurrentHashMap<String, Item> documentCache) {
       super();
@@ -1265,20 +1237,33 @@ public class GenericConnector extends Ba
       }
     }
 
-    public Throwable getException() {
-      return exception;
-    }
-
-    public String[] getVersions() {
+    public String[] finishUp()
+      throws ManifoldCFException, ServiceInterruption, IOException, InterruptedException {
+      join();
+      Throwable thr = exception;
+      if (thr != null) {
+        if (thr instanceof ManifoldCFException) {
+          throw (ManifoldCFException) thr;
+        } else if (thr instanceof ServiceInterruption) {
+          throw (ServiceInterruption) thr;
+        } else if (thr instanceof IOException) {
+          throw (IOException) thr;
+        } else if (thr instanceof RuntimeException) {
+          throw (RuntimeException) thr;
+        } else if (thr instanceof Error) {
+          throw (Error) thr;
+        }
+        throw new ManifoldCFException("getDocumentVersions error: " + thr.getMessage(), thr);
+      }
       return versions;
     }
   }
 
   protected static class ExecuteProcessThread extends Thread {
 
-    protected HttpClient client;
+    protected final HttpClient client;
 
-    protected String url;
+    protected final String url;
 
     protected Throwable exception = null;
 
@@ -1328,7 +1313,7 @@ public class GenericConnector extends Ba
       }
     }
 
-    public InputStream getSafeInputStream() throws InterruptedException, IOException {
+    public InputStream getSafeInputStream() throws InterruptedException, IOException, ManifoldCFException {
       while (true) {
         synchronized (this) {
           if (exception != null) {
@@ -1343,7 +1328,7 @@ public class GenericConnector extends Ba
       }
     }
 
-    public long getStreamLength() throws IOException, InterruptedException {
+    public long getStreamLength() throws IOException, InterruptedException, ManifoldCFException {
       while (true) {
         synchronized (this) {
           if (exception != null) {
@@ -1359,11 +1344,13 @@ public class GenericConnector extends Ba
     }
 
     protected synchronized void checkException(Throwable exception)
-      throws IOException {
+      throws IOException, ManifoldCFException {
       if (exception != null) {
         Throwable e = exception;
         if (e instanceof IOException) {
           throw (IOException) e;
+        } else if (e instanceof ManifoldCFException) {
+          throw (ManifoldCFException) e;
         } else if (e instanceof RuntimeException) {
           throw (RuntimeException) e;
         } else if (e instanceof Error) {
@@ -1375,7 +1362,7 @@ public class GenericConnector extends Ba
     }
 
     public void finishUp()
-      throws InterruptedException, IOException {
+      throws InterruptedException, IOException, ManifoldCFException {
       // This will be called during the finally
       // block in the case where all is well (and
       // the stream completed) and in the case where



Mime
View raw message