incubator-connectors-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kwri...@apache.org
Subject svn commit: r991491 - /incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/system/JobNotificationThread.java
Date Wed, 01 Sep 2010 09:48:37 GMT
Author: kwright
Date: Wed Sep  1 09:48:37 2010
New Revision: 991491

URL: http://svn.apache.org/viewvc?rev=991491&view=rev
Log:
Change notification to handle the case of multiple jobs for the same connection finishing
simultaneously.  Only one notification needs to be sent to the connection in that case.  Part
of CONNECTORS-41.

Modified:
    incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/system/JobNotificationThread.java

Modified: incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/system/JobNotificationThread.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/system/JobNotificationThread.java?rev=991491&r1=991490&r2=991491&view=diff
==============================================================================
--- incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/system/JobNotificationThread.java (original)
+++ incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/system/JobNotificationThread.java Wed Sep  1 09:48:37 2010
@@ -59,6 +59,8 @@ public class JobNotificationThread exten
         {
           Long[] jobsNeedingNotification = jobManager.getJobsReadyForInactivity();
           
+          HashMap connectionNames = new HashMap();
+          
           int k = 0;
           while (k < jobsNeedingNotification.length)
           {
@@ -68,48 +70,77 @@ public class JobNotificationThread exten
             {
               // Get the connection name
               String connectionName = job.getOutputConnectionName();
-              IOutputConnection connection = connectionManager.load(connectionName);
-              if (connection != null)
+              connectionNames.put(connectionName,connectionName);
+            }
+          }
+          
+          // Attempt to notify the specified connections
+          HashMap notifiedConnections = new HashMap();
+          
+          Iterator iter = connectionNames.keySet().iterator();
+          while (iter.hasNext())
+          {
+            String connectionName = (String)iter.next();
+            
+            IOutputConnection connection = connectionManager.load(connectionName);
+            if (connection != null)
+            {
+              // Grab an appropriate connection instance
+              IOutputConnector connector = OutputConnectorFactory.grab(threadContext,connection.getClassName(),connection.getConfigParams(),connection.getMaxConnections());
+              if (connector != null)
               {
-                // Grab an appropriate connection instance
-                IOutputConnector connector = OutputConnectorFactory.grab(threadContext,connection.getClassName(),connection.getConfigParams(),connection.getMaxConnections());
-                if (connector != null)
+                try
                 {
+                  // Do the notification itself
                   try
                   {
-                    // Do the notification itself
-                    try
-                    {
-                      connector.noteJobComplete();
-                    }
-                    catch (ServiceInterruption e)
-                    {
-                      Logging.threads.warn("Service interruption notifying connection - retrying: "+e.getMessage(),e);
-                      continue;
-                    }
-                    catch (ACFException e)
-                    {
-                      if (e.getErrorCode() == ACFException.INTERRUPTED)
-                        throw e;
-                      if (e.getErrorCode() == ACFException.DATABASE_CONNECTION_ERROR)
-                        throw e;
-                      if (e.getErrorCode() == ACFException.SETUP_ERROR)
-                        throw e;
-                      // Nothing special; report the error and keep going.
-                      Logging.threads.error(e.getMessage(),e);
-                      continue;
-                    }
-                    // When done, put the job into the Inactive state.
-                    jobManager.inactivateJob(jobID);
+                    connector.noteJobComplete();
+                    notifiedConnections.put(connectionName,connectionName);
                   }
-                  finally
+                  catch (ServiceInterruption e)
                   {
-                    OutputConnectorFactory.release(connector);
+                    Logging.threads.warn("Service interruption notifying connection - retrying: "+e.getMessage(),e);
+                    continue;
                   }
+                  catch (ACFException e)
+                  {
+                    if (e.getErrorCode() == ACFException.INTERRUPTED)
+                      throw e;
+                    if (e.getErrorCode() == ACFException.DATABASE_CONNECTION_ERROR)
+                      throw e;
+                    if (e.getErrorCode() == ACFException.SETUP_ERROR)
+                      throw e;
+                    // Nothing special; report the error and keep going.
+                    Logging.threads.error(e.getMessage(),e);
+                    continue;
+                  }
+                }
+                finally
+                {
+                  OutputConnectorFactory.release(connector);
                 }
               }
             }
           }
+          
+          // Go through jobs again, and put the notified ones into the inactive state.
+          k = 0;
+          while (k < jobsNeedingNotification.length)
+          {
+            Long jobID = jobsNeedingNotification[k++];
+            IJobDescription job = jobManager.load(jobID,true);
+            if (job != null)
+            {
+              // Get the connection name
+              String connectionName = job.getOutputConnectionName();
+              if (notifiedConnections.get(connectionName) != null)
+              {
+                // When done, put the job into the Inactive state.  Otherwise, the notification will be retried until it succeeds.
+                jobManager.inactivateJob(jobID);
+              }
+            }
+          }
+
           ACF.sleep(10000L);
         }
         catch (ACFException e)



Mime
View raw message