uima-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e..@apache.org
Subject svn commit: r708926 - in /incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main: java/org/apache/uima/aae/ java/org/apache/uima/aae/controller/ resources/
Date Wed, 29 Oct 2008 15:52:37 GMT
Author: eae
Date: Wed Oct 29 08:52:36 2008
New Revision: 708926

URL: http://svn.apache.org/viewvc?rev=708926&view=rev
Log:
UIMA-1191 apply core and activemq patch files

Modified:
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/resources/uimaee_messages.properties

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java?rev=708926&r1=708925&r2=708926&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java
(original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java
Wed Oct 29 08:52:36 2008
@@ -425,11 +425,11 @@
 	public void addEndpoint( Endpoint anEndpoint, String aCasReferenceId)
 	{
 		CacheEntry casRefEntry = getEntry(aCasReferenceId);
-		casRefEntry.addEndpoint(anEndpoint);
-		
-		
+		if ( !casRefEntry.getEndpointMap().containsKey(anEndpoint.getEndpoint())) {
+	    casRefEntry.addEndpoint(anEndpoint);
+		}
 	}
-
+	
 	public Endpoint getEndpoint(String anEndpointName, String aCasReferenceId)
 	{
 		CacheEntry casRefEntry = getEntry(aCasReferenceId);

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java?rev=708926&r1=708925&r2=708926&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
(original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
Wed Oct 29 08:52:36 2008
@@ -627,6 +627,26 @@
     super.stop();
 	}
 
+	private void stopListener( String key, Endpoint endpoint ) throws Exception {
+    //  Stop the Listener on endpoint that has been disabled
+    InputChannel iC = null;
+    String destName = null;
+    if ( endpoint.getDestination() != null ) {
+      System.out.println("Controller:"+getComponentName()+"-Stopping Listener Thread on Endpoint:"+endpoint.getDestination());
         
+      destName = endpoint.getDestination().toString();
+      iC =  getInputChannel(destName);
+    } else {
+       System.out.println("Controller:"+getComponentName()+"-Stopping Listener Thread on Endpoint:"+endpoint.getReplyToEndpoint());
+        destName = endpoint.getReplyToEndpoint();
+       iC = getInputChannel(destName);
+    }
+    if ( iC != null ) {
+      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, getClass().getName(), "stopListener", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_stopping_listener__INFO", new Object[] { getComponentName(),  destName, key });
+      }
+      iC.destroyListener(destName, key);
+    }
+	}
 	public void disableDelegates(List aDelegateList) throws AsynchAEException
 	{
 		try
@@ -635,35 +655,31 @@
 			while (it.hasNext())
 			{
 				String key = (String) it.next();
-				System.out.println("Controller:"+getName()+ " Disabling Delegate:"+key+" Due to Excessive Errors");
-        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
-          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, getClass().getName(), "disableDelegates", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_removing_endpoint_from_map__INFO", new Object[] { getName(), key });
-        }
-				//	Change state of the delegate
-				ServiceInfo sf = getDelegateServiceInfo( key );
-				if ( sf != null )
-				{
-					sf.setState("Disabled");
-				}
-				Endpoint endpoint = lookUpEndpoint(key, false);
-				synchronized( destinationMap )
-				{
-					destinationMap.remove(key);
-				}
-				synchronized( disabledDelegateList )
+        Endpoint endpoint = lookUpEndpoint(key, false);
+				//  if the the current delegate is remote, destroy its listener
+        if ( endpoint != null && endpoint.isRemote() )
 				{
-					disabledDelegateList.add(key);
-				}
-				if ( endpoint != null && 
-				     endpoint.isRemote() && 
-				     getUimaEEAdminContext() != null && 
-				     endpoint.getReplyToEndpoint() != null &&
-				     endpoint.getReplyToEndpoint().trim().length() > 0)
-				{
-					System.out.println("Controller:"+getComponentName()+"-Stopping Listener Thread on Endpoint:"+endpoint.getReplyToEndpoint());
				
-					//	Stop the Listener on endpoint that has been disabled
-					getUimaEEAdminContext().stopListener(endpoint.getReplyToEndpoint());
+          stopListener( key, endpoint);
 				}
+        System.out.println("Controller:"+getComponentName()+ " Disabling Delegate:"+key+" Due to Excessive Errors");
+        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, getClass().getName(), "disableDelegates", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_removing_endpoint_from_map__INFO", new Object[] { getComponentName(), key });
+        }
+        //  Change state of the delegate
+        ServiceInfo sf = getDelegateServiceInfo( key );
+        if ( sf != null )
+        {
+          sf.setState("Disabled");
+        }
+        synchronized( destinationMap )
+        {
+          destinationMap.remove(key);
+        }
+        synchronized( disabledDelegateList )
+        {
+          disabledDelegateList.add(key);
+        }
+
 			}
 			if (flowControllerContainer != null)
 			{
@@ -676,7 +692,6 @@
 				}
 				catch( Exception ex)
 				{
-					//ex.printStackTrace();
 	         if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
 	           UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "disableDelegates", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", new Object[] { ex });
 	         }
@@ -1297,7 +1312,7 @@
 	      else 
 	      {
 	        
-	        if ( cacheEntry.isSubordinate())
+	        if ( cacheEntry.isSubordinate() )
 	        {
 	            cacheEntry.setWaitingForRelease(true);
           }

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java?rev=708926&r1=708925&r2=708926&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
(original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
Wed Oct 29 08:52:36 2008
@@ -948,11 +948,13 @@
 
 				if ( anEndpoint != null )
 				{
+				  Endpoint endpoint = null;
 					List list = new ArrayList();
 					String key = "";
-					if ( ((AggregateAnalysisEngineController)this).lookUpEndpoint(anEndpoint, false) == null )
+					if ( (endpoint = ((AggregateAnalysisEngineController)this).lookUpEndpoint(anEndpoint, false)) == null )
 					{
 						key = ((AggregateAnalysisEngineController)this).lookUpDelegateKey(anEndpoint);
+						endpoint = ((AggregateAnalysisEngineController)this).lookUpEndpoint(key, false);
 						list.add(key);
 					}
 					else
@@ -960,8 +962,29 @@
 						key = anEndpoint;
 						list.add(anEndpoint);
 					}
-					
-					((AggregateAnalysisEngineController)this).disableDelegates(list);
+          ((AggregateAnalysisEngineController)this).disableDelegates(list);
+
+          if ( endpoint != null ) {
+	          try {
+	            // Fetch all Cas entries currently awaiting reply from the delegate that was just
+	            // disabled. In case the delegate was in a parallel step, we need to adjust the
+	            // the number of responses received to account for the failed delegate. This
+	            // enables the processing to continue.
+	            CacheEntry[] entries = getInProcessCache().getCacheEntriesForEndpoint(endpoint.getEndpoint());
+	            if ( entries != null ) {
+	              for ( int i=0; i < entries.length; i++ ) {
+	                // Check if this is a parallel step
+	                int parallelDelegateCount = entries[i].getNumberOfParallelDelegates();
+	                // Check if all delegated responded
+	                if ( parallelDelegateCount > 1 && entries[i].howManyDelegatesResponded() < parallelDelegateCount) {
+	                  // increment responders 
+	                  entries[i].incrementHowManyDelegatesResponded();
+	                }
+	              }
+	            }
+	          } catch( Exception e) {
+	          }
+					}
 	         if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
 	           UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
 		                "handleAction", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_disabled_delegate_INFO",

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/resources/uimaee_messages.properties
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/resources/uimaee_messages.properties?rev=708926&r1=708925&r2=708926&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/resources/uimaee_messages.properties
(original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/resources/uimaee_messages.properties
Wed Oct 29 08:52:36 2008
@@ -169,3 +169,4 @@
 UIMAEE_show_remote_delegate_serialization_INFO = >>> Controller: {0} Configured To Serialize CASes To Remote Delegate: {1} Using {2} Serialization
 UIMAEE_delegate_in_parallel_step_not_remote_WARNING = >>> Controller: {0} Delegate: {1} Not Remote But Defined In Parallel Step. Only Remote Delegates Can Be In Parallel Step.
 UIMAEE_move_to_single_step_list__FINE = Controller: {0} Moved Delegate: {1} To Single Step List From Parallel Step List. The Delegate is Co-located. Only Remote Delegates Can Be in Parallel Step. CAS id: {2} 
+UIMAEE_stopping_listener__INFO = >>> Controller: {0} Stopping Listener on Queue: {1} For Delegate: {2}



Mime
View raw message