manifoldcf-commits mailing list archives

From kwri...@apache.org
Subject svn commit: r1624449 - /manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/hdfs/HDFSRepositoryConnector.java
Date Fri, 12 Sep 2014 05:35:57 GMT
Author: kwright
Date: Fri Sep 12 05:35:56 2014
New Revision: 1624449

URL: http://svn.apache.org/r1624449
Log:
Upgrade HDFS output connector.  Part of CONNECTORS-977.
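
For orientation: this change retires the two-phase getDocumentVersions()/processDocuments() pair in favor of the single incremental processDocuments() entry point. The following is a minimal sketch of the control flow the new method adopts, using the interface and method names visible in the diff below (IExistingVersions, IProcessActivity, checkDocumentNeedsReindexing, noDocument, deleteDocument); computeVersionString() and ingestFile() are hypothetical stand-in helpers, so treat this as an illustration of the pattern rather than the connector's actual code.

  // Sketch only: the shape of the new single-pass, incremental contract.
  @Override
  public void processDocuments(String[] documentIdentifiers, IExistingVersions statuses,
    Specification spec, IProcessActivity activities, int jobMode, boolean usesDefaultAuthority)
    throws ManifoldCFException, ServiceInterruption {
    for (String documentIdentifier : documentIdentifiers) {
      FileStatus fileStatus = getObject(new Path(documentIdentifier));
      if (fileStatus == null) {
        // Gone from HDFS: delete it from the index right away.
        activities.deleteDocument(documentIdentifier);
        continue;
      }
      String versionString = computeVersionString(fileStatus);  // hypothetical helper
      if (!activities.checkDocumentNeedsReindexing(documentIdentifier, versionString))
        continue;  // version unchanged since the last crawl; skip all further work
      if (fileStatus.isDirectory()) {
        // Directories are never ingested; just queue their children.
        FileStatus[] children = getChildren(fileStatus.getPath());
        if (children == null)
          continue;
        for (FileStatus child : children)
          activities.addDocumentReference(child.getPath().toString(),
            documentIdentifier, RELATIONSHIP_CHILD);
      } else if (!checkIngest(session.getUri().toString(), fileStatus, spec)) {
        // Excluded by the job specification: record the version, index nothing.
        activities.noDocument(documentIdentifier, versionString);
      } else {
        ingestFile(documentIdentifier, versionString, activities);  // hypothetical helper
      }
    }
  }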

Modified:
    manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/hdfs/HDFSRepositoryConnector.java

Modified: manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/hdfs/HDFSRepositoryConnector.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/hdfs/HDFSRepositoryConnector.java?rev=1624449&r1=1624448&r2=1624449&view=diff
==============================================================================
--- manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/hdfs/HDFSRepositoryConnector.java (original)
+++ manifoldcf/trunk/connectors/hdfs/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/hdfs/HDFSRepositoryConnector.java Fri Sep 12 05:35:56 2014
@@ -311,211 +311,170 @@ public class HDFSRepositoryConnector ext
     return "";
   }
 
-  /** Get document versions given an array of document identifiers.
-   * This method is called for EVERY document that is considered. It is therefore important to perform
-   * as little work as possible here.
-   * The connector will be connected before this method can be called.
-   *@param documentIdentifiers is the array of local document identifiers, as understood by this connector.
-   *@param oldVersions is the corresponding array of version strings that have been saved for the document identifiers.
-   *   A null value indicates that this is a first-time fetch, while an empty string indicates that the previous document
-   *   had an empty version string.
-   *@param activities is the interface this method should use to perform whatever framework actions are desired.
-   *@param spec is the current document specification for the current job.  If there is a dependency on this
-   * specification, then the version string should include the pertinent data, so that reingestion will occur
-   * when the specification changes.  This is primarily useful for metadata.
-   *@param jobMode is an integer describing how the job is being run, whether continuous or once-only.
-   *@param usesDefaultAuthority will be true only if the authority in use for these documents is the default one.
-   *@return the corresponding version strings, with null in the places where the document no longer exists.
-   * Empty version strings indicate that there is no versioning ability for the corresponding document, and the document
-   * will always be processed.
-   */
-  public String[] getDocumentVersions(String[] documentIdentifiers, String[] oldVersions, IVersionActivity activities,
-    DocumentSpecification spec, int jobMode, boolean usesDefaultAuthority)
-    throws ManifoldCFException, ServiceInterruption
-  {
-    String[] rval = new String[documentIdentifiers.length];
-    for (int i = 0; i < rval.length; i++) {
-      String documentIdentifier = documentIdentifiers[i];
+  /** Process a set of documents.
+  * This is the method that should cause each document to be fetched, processed, and the results either added
+  * to the queue of documents for the current job, and/or entered into the incremental ingestion manager.
+  * The document specification allows this class to filter what is done based on the job.
+  * The connector will be connected before this method can be called.
+  *@param documentIdentifiers is the set of document identifiers to process.
+  *@param statuses are the currently-stored document versions for each document in the set of document identifiers
+  * passed in above.
+  *@param activities is the interface this method should use to queue up new document references
+  * and ingest documents.
+  *@param jobMode is an integer describing how the job is being run, whether continuous or once-only.
+  *@param usesDefaultAuthority will be true only if the authority in use for these documents is the default one.
+  */
+  @Override
+  public void processDocuments(String[] documentIdentifiers, IExistingVersions statuses, Specification spec,
+    IProcessActivity activities, int jobMode, boolean usesDefaultAuthority)
+    throws ManifoldCFException, ServiceInterruption {
+      
+    for (String documentIdentifier : documentIdentifiers) {
+      
+      String versionString;
       
       FileStatus fileStatus = getObject(new Path(documentIdentifier));
       if (fileStatus != null) {
-        if (fileStatus.isDirectory()) {
+        
+        boolean isDirectory = fileStatus.isDirectory();
+        
+        if (isDirectory) {
           // If HDFS directory modify dates are transitive, as they are on Unix,
           // then getting the modify date of the current version is sufficient
           // to detect any downstream changes we need to be aware of.
           // (If this turns out to be a bad assumption, this should simply set rval[i] ="").
           long lastModified = fileStatus.getModificationTime();
-          rval[i] = new Long(lastModified).toString();
-        } else {
-          long fileLength = fileStatus.getLen();
-          if (activities.checkLengthIndexable(fileLength)) {
-            long lastModified = fileStatus.getModificationTime();
-            StringBuilder sb = new StringBuilder();
-            // Check if the path is to be converted.  We record that info in the version string so that we'll reindex documents whose
-            // URI's change.
-            String nameNode = nameNodeProtocol + "://" + nameNodeHost + ":" + nameNodePort;
-            String convertPath = findConvertPath(nameNode, spec, fileStatus.getPath());
-            if (convertPath != null)
-            {
-              // Record the path.
-              sb.append("+");
-              pack(sb,convertPath,'+');
+          versionString = new Long(lastModified).toString();
+          
+          if (activities.checkDocumentNeedsReindexing(documentIdentifier,versionString)) {
+            // Process directory!
+            String entityReference = documentIdentifier;
+            FileStatus[] fileStatuses = getChildren(fileStatus.getPath());
+            if (fileStatuses == null) {
+              continue;
+            }
+            for (int j = 0; j < fileStatuses.length; j++) {
+              FileStatus fs = fileStatuses[j++];
+              String canonicalPath = fs.getPath().toString();
+              if (checkInclude(session.getUri().toString(),fs,canonicalPath,spec)) {
+                activities.addDocumentReference(canonicalPath,documentIdentifier,RELATIONSHIP_CHILD);
+              }
             }
-            else
-              sb.append("-");
-            sb.append(new Long(lastModified).toString()).append(":").append(new Long(fileLength).toString());
-            rval[i] = sb.toString();
-          } else {
-            rval[i] = null;
           }
-        }
-      } else {
-        rval[i] = null;
-      }
-    }
-    
-    return rval;
-  }
-
-
-  /** Process a set of documents.
-   * This is the method that should cause each document to be fetched, processed, and the results either added
-   * to the queue of documents for the current job, and/or entered into the incremental ingestion manager.
-   * The document specification allows this class to filter what is done based on the job.
-   *@param documentIdentifiers is the set of document identifiers to process.
-   *@param activities is the interface this method should use to queue up new document references
-   * and ingest documents.
-   *@param spec is the document specification.
-   *@param scanOnly is an array corresponding to the document identifiers.  It is set to true to indicate when the processing
-   * should only find other references, and should not actually call the ingestion methods.
-   */
-  @Override
-  public void processDocuments(String[] documentIdentifiers, String[] versions, IProcessActivity activities, DocumentSpecification spec, boolean[] scanOnly)
-    throws ManifoldCFException, ServiceInterruption {
-    for (int i = 0; i < documentIdentifiers.length; i++) {
-      String version = versions[i];
-      String documentIdentifier = documentIdentifiers[i];
-        
-      if (Logging.connectors.isDebugEnabled()) {
-        Logging.connectors.debug("HDFS: Processing document identifier '" + documentIdentifier + "'");
-      }
-      FileStatus fileStatus = getObject(new Path(documentIdentifier));
-        
-      if (fileStatus == null) {
-        // It is no longer there , so delete right away
-        activities.deleteDocument(documentIdentifier);
-        continue;
-      }
-        
-      if (fileStatus.isDirectory()) {
-        /*
-          * Queue up stuff for directory
-          */
-        String entityReference = documentIdentifier;
-        FileStatus[] fileStatuses = getChildren(fileStatus.getPath());
-        if (fileStatuses == null) {
-          continue;
-        }
-        for (int j = 0; j < fileStatuses.length; j++) {
-          FileStatus fs = fileStatuses[j++];
-          String canonicalPath = fs.getPath().toString();
-          if (checkInclude(session.getUri().toString(),fs,canonicalPath,spec)) {
-            activities.addDocumentReference(canonicalPath,documentIdentifier,RELATIONSHIP_CHILD);
+        } else {
+          long lastModified = fileStatus.getModificationTime();
+          StringBuilder sb = new StringBuilder();
+          // Check if the path is to be converted.  We record that info in the version string so that we'll reindex documents whose
+          // URI's change.
+          String nameNode = nameNodeProtocol + "://" + nameNodeHost + ":" + nameNodePort;
+          String convertPath = findConvertPath(nameNode, spec, fileStatus.getPath());
+          if (convertPath != null)
+          {
+            // Record the path.
+            sb.append("+");
+            pack(sb,convertPath,'+');
           }
-        }
-      } else {
-        if (scanOnly[i])
-          continue;
-        if (!checkIngest(session.getUri().toString(),fileStatus,spec))
-          continue;
-
-        // Get the WGet conversion path out of the version string
-        String convertPath = null;
-        if (version.length() > 0 && version.startsWith("+"))
-        {
-          StringBuilder unpack = new StringBuilder();
-          unpack(unpack, version, 1, '+');
-          convertPath = unpack.toString();
-        }
+          else
+            sb.append("-");
+          sb.append(new Long(lastModified).toString());
+          versionString = sb.toString();
+          
+          if (activities.checkDocumentNeedsReindexing(documentIdentifier,versionString)) {
+            // Process file!
+            if (!checkIngest(session.getUri().toString(),fileStatus,spec)) {
+              activities.noDocument(documentIdentifier,versionString);
+              continue;
+            }
 
-        // It is a file to be indexed.
-        
-        // Prepare the metadata part of RepositoryDocument
-        RepositoryDocument data = new RepositoryDocument();
+            long fileLength = fileStatus.getLen();
+            if (!activities.checkLengthIndexable(fileLength)) {
+              activities.noDocument(documentIdentifier,versionString);
+              continue;
+            }
 
-        data.setFileName(fileStatus.getPath().getName());
-        data.setMimeType(mapExtensionToMimeType(fileStatus.getPath().getName()));
-        data.setModifiedDate(new Date(fileStatus.getModificationTime()));
-
-        String uri;
-        if (convertPath != null) {
-          uri = convertToWGETURI(convertPath);
-        } else {
-          uri = fileStatus.getPath().toUri().toString();
-        }
-        data.addField("uri",uri);
+            // It is a file to be indexed.
+            
+            // Prepare the metadata part of RepositoryDocument
+            RepositoryDocument data = new RepositoryDocument();
+
+            data.setFileName(fileStatus.getPath().getName());
+            data.setMimeType(mapExtensionToMimeType(fileStatus.getPath().getName()));
+            data.setModifiedDate(new Date(fileStatus.getModificationTime()));
+
+            String uri;
+            if (convertPath != null) {
+              uri = convertToWGETURI(convertPath);
+            } else {
+              uri = fileStatus.getPath().toUri().toString();
+            }
+            data.addField("uri",uri);
 
-        // We will record document fetch as an activity
-        long startTime = System.currentTimeMillis();
-        String errorCode = "FAILED";
-        String errorDesc = StringUtils.EMPTY;
-        long fileSize = 0;
+            // We will record document fetch as an activity
+            long startTime = System.currentTimeMillis();
+            String errorCode = "FAILED";
+            String errorDesc = StringUtils.EMPTY;
+            long fileSize = 0;
 
-        try {
-          BackgroundStreamThread t = new BackgroundStreamThread(getSession(),new Path(documentIdentifier));
-          try {
-            t.start();
-            boolean wasInterrupted = false;
             try {
-              InputStream is = t.getSafeInputStream();
+              BackgroundStreamThread t = new BackgroundStreamThread(getSession(),new Path(documentIdentifier));
               try {
-                data.setBinary(is, fileSize);
-                activities.ingestDocumentWithException(documentIdentifier,version,uri,data);
-              } finally {
-                is.close();
-              }
-            } catch (java.net.SocketTimeoutException e) {
-              throw e;
-            } catch (InterruptedIOException e) {
-              wasInterrupted = true;
-              throw e;
-            } catch (ManifoldCFException e) {
-              if (e.getErrorCode() == ManifoldCFException.INTERRUPTED) {
-                wasInterrupted = true;
+                t.start();
+                boolean wasInterrupted = false;
+                try {
+                  InputStream is = t.getSafeInputStream();
+                  try {
+                    data.setBinary(is, fileSize);
+                    activities.ingestDocumentWithException(documentIdentifier,versionString,uri,data);
+                  } finally {
+                    is.close();
+                  }
+                } catch (java.net.SocketTimeoutException e) {
+                  throw e;
+                } catch (InterruptedIOException e) {
+                  wasInterrupted = true;
+                  throw e;
+                } catch (ManifoldCFException e) {
+                  if (e.getErrorCode() == ManifoldCFException.INTERRUPTED) {
+                    wasInterrupted = true;
+                  }
+                  throw e;
+                } finally {
+                  if (!wasInterrupted) {
+                    // This does a join
+                    t.finishUp();
+                  }
+                }
+
+                // No errors.  Record the fact that we made it.
+                errorCode = "OK";
+                // Length we did in bytes
+                fileSize = fileStatus.getLen();
+
+              } catch (InterruptedException e) {
+                // We were interrupted out of the join, most likely.  Before we abandon the thread,
+                // send a courtesy interrupt.
+                t.interrupt();
+                throw new ManifoldCFException("Interrupted: " + e.getMessage(), e, ManifoldCFException.INTERRUPTED);
+              } catch (java.net.SocketTimeoutException e) {
+                errorCode = "IO ERROR";
+                errorDesc = e.getMessage();
+                handleIOException(e);
+              } catch (InterruptedIOException e) {
+                t.interrupt();
+                throw new ManifoldCFException("Interrupted: " + e.getMessage(), e, ManifoldCFException.INTERRUPTED);
+              } catch (IOException e) {
+                errorCode = "IO ERROR";
+                errorDesc = e.getMessage();
+                handleIOException(e);
               }
-              throw e;
             } finally {
-              if (!wasInterrupted) {
-                // This does a join
-                t.finishUp();
-              }
+              activities.recordActivity(new Long(startTime),ACTIVITY_READ,new Long(fileSize),documentIdentifier,errorCode,errorDesc,null);
             }
-
-            // No errors.  Record the fact that we made it.
-            errorCode = "OK";
-            // Length we did in bytes
-            fileSize = fileStatus.getLen();
-
-          } catch (InterruptedException e) {
-            // We were interrupted out of the join, most likely.  Before we abandon the thread,
-            // send a courtesy interrupt.
-            t.interrupt();
-            throw new ManifoldCFException("Interrupted: " + e.getMessage(), e, ManifoldCFException.INTERRUPTED);
-          } catch (java.net.SocketTimeoutException e) {
-            errorCode = "IO ERROR";
-            errorDesc = e.getMessage();
-            handleIOException(e);
-          } catch (InterruptedIOException e) {
-            t.interrupt();
-            throw new ManifoldCFException("Interrupted: " + e.getMessage(), e, ManifoldCFException.INTERRUPTED);
-          } catch (IOException e) {
-            errorCode = "IO ERROR";
-            errorDesc = e.getMessage();
-            handleIOException(e);
           }
-        } finally {
-          activities.recordActivity(new Long(startTime),ACTIVITY_READ,new Long(fileSize),documentIdentifier,errorCode,errorDesc,null);
         }
+      } else {
+        activities.deleteDocument(documentIdentifier);
+        continue;
       }
     }
   }
@@ -1377,7 +1336,7 @@ public class HDFSRepositoryConnector ext
   *@param documentIdentifier is the document identifier.
   *@return the part of the path to be converted, or null.
   */
-  protected static String findConvertPath(String nameNode, DocumentSpecification spec, Path theFile)
+  protected static String findConvertPath(String nameNode, Specification spec, Path theFile)
   {
     String fullpath = theFile.toString();
     for (int j = 0; j < spec.getChildCount(); j++)
@@ -1417,7 +1376,7 @@ public class HDFSRepositoryConnector ext
    *@param documentSpecification is the specification.
    *@return true if it should be included.
    */
-  protected static boolean checkInclude(String nameNode, FileStatus fileStatus, String fileName, DocumentSpecification documentSpecification)
+  protected static boolean checkInclude(String nameNode, FileStatus fileStatus, String fileName, Specification documentSpecification)
     throws ManifoldCFException
   {
     if (Logging.connectors.isDebugEnabled())
@@ -1528,7 +1487,7 @@ public class HDFSRepositoryConnector ext
    *@param file is the file.
    *@param documentSpecification is the specification.
    */
-  protected static boolean checkIngest(String nameNode, FileStatus fileStatus, DocumentSpecification documentSpecification)
+  protected static boolean checkIngest(String nameNode, FileStatus fileStatus, Specification documentSpecification)
     throws ManifoldCFException
   {
    // Since the only exclusions at this point are not based on file contents, this is a no-op.
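
A note on the version-string format in the file branch above: the leading "+" or "-" records whether a WGet conversion path applies, and when it does, the packed path is embedded so that a change in URI mapping forces reindexing; the modification time follows. (Unlike the old getDocumentVersions(), the new string no longer appends the file length.) Below is a small, self-contained illustration of that encoding; pack() here is a simplified stand-in for ManifoldCF's helper, whose real escaping rules may differ.

public class VersionStringDemo {
  // Simplified stand-in for ManifoldCF's pack(): escape the delimiter,
  // then terminate the field with it.
  static void pack(StringBuilder sb, String value, char delim) {
    for (int i = 0; i < value.length(); i++) {
      char c = value.charAt(i);
      if (c == delim || c == '\\')
        sb.append('\\');
      sb.append(c);
    }
    sb.append(delim);
  }

  // Mirrors the branch above: "+" plus packed convert path, or "-" alone,
  // followed by the modification time.
  static String buildVersion(String convertPath, long lastModified) {
    StringBuilder sb = new StringBuilder();
    if (convertPath != null) {
      sb.append('+');
      pack(sb, convertPath, '+');
    } else {
      sb.append('-');
    }
    sb.append(Long.toString(lastModified));
    return sb.toString();
  }

  public static void main(String[] args) {
    System.out.println(buildVersion("path/to/convert", 1410500156000L)); // +path/to/convert+1410500156000
    System.out.println(buildVersion(null, 1410500156000L));              // -1410500156000
  }
}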


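The ingestion block retained above also keeps the connector's interrupt discipline around BackgroundStreamThread: an interrupt surfaces as ManifoldCFException.INTERRUPTED after a courtesy t.interrupt(), finishUp() joins the worker only when no interrupt occurred, and recordActivity() sits in a finally so every fetch attempt is logged. A condensed sketch of that shape, with the SocketTimeoutException and ManifoldCFException branches trimmed for brevity (handleIOException() and BackgroundStreamThread are the connector's own):

    // Condensed from the block above; not the complete error handling.
    long startTime = System.currentTimeMillis();
    String errorCode = "FAILED";
    String errorDesc = StringUtils.EMPTY;
    long fileSize = 0;
    try {
      BackgroundStreamThread t = new BackgroundStreamThread(getSession(), new Path(documentIdentifier));
      try {
        t.start();
        boolean wasInterrupted = false;
        try (InputStream is = t.getSafeInputStream()) {
          data.setBinary(is, fileSize);  // as in the diff, fileSize is still 0 here
          activities.ingestDocumentWithException(documentIdentifier, versionString, uri, data);
        } catch (InterruptedIOException e) {
          wasInterrupted = true;  // never join a thread after an interrupt
          throw e;
        } finally {
          if (!wasInterrupted)
            t.finishUp();  // does a join on the background thread
        }
        errorCode = "OK";
        fileSize = fileStatus.getLen();
      } catch (InterruptedException e) {
        // Interrupted out of the join; send a courtesy interrupt before abandoning.
        t.interrupt();
        throw new ManifoldCFException("Interrupted: " + e.getMessage(), e, ManifoldCFException.INTERRUPTED);
      } catch (IOException e) {
        errorCode = "IO ERROR";
        errorDesc = e.getMessage();
        handleIOException(e);
      }
    } finally {
      // Record the fetch attempt whether it succeeded or failed.
      activities.recordActivity(new Long(startTime), ACTIVITY_READ, new Long(fileSize),
        documentIdentifier, errorCode, errorDesc, null);
    }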
