Return-Path: Delivered-To: apmail-incubator-uima-commits-archive@locus.apache.org Received: (qmail 26944 invoked from network); 22 Oct 2008 16:22:41 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 22 Oct 2008 16:22:41 -0000 Received: (qmail 88466 invoked by uid 500); 22 Oct 2008 16:22:43 -0000 Delivered-To: apmail-incubator-uima-commits-archive@incubator.apache.org Received: (qmail 88451 invoked by uid 500); 22 Oct 2008 16:22:43 -0000 Mailing-List: contact uima-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: uima-dev@incubator.apache.org Delivered-To: mailing list uima-commits@incubator.apache.org Received: (qmail 88442 invoked by uid 99); 22 Oct 2008 16:22:43 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 22 Oct 2008 09:22:43 -0700 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 22 Oct 2008 16:21:30 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 8B5482388979; Wed, 22 Oct 2008 09:22:05 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r707119 - in /incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq: JmsEndpointConnection_impl.java JmsInputChannel.java JmsOutputChannel.java UimaDefaultMessageListenerContainer.java Date: Wed, 22 Oct 2008 16:22:05 -0000 To: uima-commits@incubator.apache.org From: eae@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20081022162209.8B5482388979@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: eae Date: Wed Oct 22 09:22:04 2008 New Revision: 707119 URL: http://svn.apache.org/viewvc?rev=707119&view=rev Log: (empty) Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java?rev=707119&r1=707118&r2=707119&view=diff ============================================================================== --- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java (original) +++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java Wed Oct 22 09:22:04 2008 @@ -45,6 +45,8 @@ import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.pool.PooledConnectionFactory; import org.apache.uima.UIMAFramework; +import org.apache.uima.aae.InputChannel; +import org.apache.uima.aae.controller.AggregateAnalysisEngineController; import org.apache.uima.aae.controller.AnalysisEngineController; import org.apache.uima.aae.controller.Endpoint; import org.apache.uima.aae.error.AsynchAEException; @@ -53,6 +55,7 @@ import org.apache.uima.aae.message.AsynchAEMessage; import org.apache.uima.adapter.jms.JmsConstants; import org.apache.uima.util.Level; +import org.springframework.jms.JmsException; import org.springframework.util.Assert; public class JmsEndpointConnection_impl implements ConsumerListener @@ -93,6 +96,10 @@ private boolean isReplyEndpoint; + private volatile boolean failed = false; + + private Object recoveryMux = new Object(); + public JmsEndpointConnection_impl(Map aConnectionMap, Endpoint anEndpoint) { connectionMap = aConnectionMap; @@ -218,6 +225,9 @@ } catch ( Exception e) { + if ( e instanceof JMSException ) { + handleJmsException( (JMSException)e ); + } throw new AsynchAEException(e); } @@ -471,83 +481,133 @@ { String destinationName = ""; - Exception lastException = null; - // Send a message to the destination. Retry 10 times if unable to send. - // After 10 tries give up and throw exception - for (int i = 0; i < 10; i++) + try { - try - { - stopTimer(); + stopTimer(); - if (conn == null || producerSession == null || !((ActiveMQSession) producerSession).isRunning()) - { - UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "send", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_open_connection_to_endpoint__FINE", new Object[] { getEndpoint() }); - openChannel(); + if ( failed || conn == null || producerSession == null || !((ActiveMQSession) producerSession).isRunning()) + { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "send", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_open_connection_to_endpoint__FINE", new Object[] { getEndpoint() }); + openChannel(); + // The connection has been successful. Now check if we need to create a new listener + // and a temp queue to receive replies. A new listener will be created only if the + // endpoint for the delegate is marked as FAILED. This will be the case if the listener + // on the reply queue for the endpoint has failed. + synchronized( recoveryMux ) { + if ( controller instanceof AggregateAnalysisEngineController ) { + // Using the queue name lookup the delegate key + String key = ((AggregateAnalysisEngineController)controller).lookUpDelegateKey(delegateEndpoint.getEndpoint()); + if ( key != null && destination != null && !isReplyEndpoint ) { + // The aggregate has a master list of endpoints which are typically cloned during processing + // This object uses a copy of the master. When a listener fails, the status of the master + // endpoint is changed. To check the status, fetch the master endpoint, check its status + // and if marked as FAILED, create a new listener, a new temp queue and override this + // object endpoint copy destination property. It *IS* a new replyTo temp queue. + Endpoint masterEndpoint = ((AggregateAnalysisEngineController)controller).lookUpEndpoint(key, false); + if ( masterEndpoint.getStatus() == Endpoint.FAILED ) { + // Create a new Listener Object to receive replies + createListener(key); + destination = (Destination)masterEndpoint.getDestination(); + delegateEndpoint.setDestination(destination); + // Override the reply destination. A new listener has been created along with a new temp queue for replies. + aMessage.setJMSReplyTo(destination); + } + } + } } - // Send a reply to a queue provided by the client - if ( isReplyEndpoint && delegateEndpoint.getDestination() != null ) + } + // Send a reply to a queue provided by the client + if ( isReplyEndpoint && delegateEndpoint.getDestination() != null ) + { + destinationName = ((ActiveMQDestination)delegateEndpoint.getDestination()).getPhysicalName(); + if ( UIMAFramework.getLogger().isLoggable(Level.FINE)) { - destinationName = ((ActiveMQDestination)delegateEndpoint.getDestination()).getPhysicalName(); - if ( UIMAFramework.getLogger().isLoggable(Level.FINE)) - { - UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "send", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_msg_to_endpoint__FINE", new Object[] {destinationName }); - } - synchronized(producer) - { - producer.send((Destination)delegateEndpoint.getDestination(), aMessage); - } + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "send", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_msg_to_endpoint__FINE", new Object[] {destinationName }); } - else + synchronized(producer) { - destinationName = ((ActiveMQQueue) producer.getDestination()).getPhysicalName(); - if ( UIMAFramework.getLogger().isLoggable(Level.FINE)) - { - UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "send", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_msg_to_endpoint__FINE", new Object[] {destinationName }); - } - synchronized(producer) - { - producer.send(aMessage); - } + producer.send((Destination)delegateEndpoint.getDestination(), aMessage); } + } + else + { + destinationName = ((ActiveMQQueue) producer.getDestination()).getPhysicalName(); + if ( UIMAFramework.getLogger().isLoggable(Level.FINE)) + { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "send", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_msg_to_endpoint__FINE", new Object[] {destinationName }); + } + synchronized(producer) + { + producer.send(aMessage); + } + } if (startTimer) { startTimer(connectionCreationTimestamp); } - lastException = null; return true; - } - catch ( Exception e) - { - lastException = e; - // If the controller has been stopped no need to send messages - if ( controller.isStopped()) - { - return true; - } - else - { - e.printStackTrace(); - UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "send", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_not_ableto_send_msg_INFO", new Object[] { controller.getComponentName(), destinationName, i+1, 10 }); - } - } - try + } + catch ( Exception e) + { + // If the controller has been stopped no need to send messages + if ( controller.isStopped()) { - wait(50); + return true; } - catch ( Exception ex) + else { + if ( e instanceof JMSException ) { + handleJmsException( (JMSException)e ); + } + + e.printStackTrace(); + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "send", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_exception__WARNING", new Object[] { controller.getComponentName(), e}); } } - if ( lastException != null ) - { - UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "send", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_exception__WARNING", new Object[] { controller.getComponentName(), lastException}); - } stopTimer(); return false; } + /** + * This method is called during recovery of failed connection. It is only called if the endpoint + * associated with a given delegate is marked as FAILED. It is marked that way when a listener + * attached to the reply queue fails. This method creates a new listener and a new temp queue. + * + * @param delegateKey + * @throws Exception + */ + private void createListener(String delegateKey) throws Exception { + if ( controller instanceof AggregateAnalysisEngineController ) { + // Fetch an InputChannel that handles messages for a given delegate + InputChannel iC = controller.getReplyInputChannel(delegateKey); + // Create a new Listener, new Temp Queue and associate the listener with the Input Channel + iC.createListener(delegateKey); + } + } + + private synchronized void handleJmsException( JMSException ex) { + if ( failed ) { + return; // Already marked failed + } + failed = true; +/* + try { + if ( controller instanceof AggregateAnalysisEngineController ) { + String delegateKey = ((AggregateAnalysisEngineController)controller).lookUpDelegateKey(delegateEndpoint.getEndpoint()); + if ( delegateEndpoint.getDestination() != null ) { + InputChannel iC = ((AggregateAnalysisEngineController)controller).getInputChannel(delegateEndpoint.getDestination().toString()); + iC.destroyListener(delegateEndpoint.getDestination().toString(), delegateKey); + } else { + InputChannel iC = ((AggregateAnalysisEngineController)controller).getInputChannel(delegateEndpoint.getEndpoint()); + iC.destroyListener(delegateEndpoint.getEndpoint(), delegateKey); + } + } + } catch( Exception e) { + e.printStackTrace(); + } +*/ + } public void onConsumerEvent(ConsumerEvent arg0) { if (controller != null) Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java?rev=707119&r1=707118&r2=707119&view=diff ============================================================================== --- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java (original) +++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java Wed Oct 22 09:22:04 2008 @@ -22,22 +22,21 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; -import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; -import mx4j.tools.adaptor.http.GetAttributeCommandProcessor; - import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.ActiveMQSession; import org.apache.activemq.command.ActiveMQMessage; import org.apache.uima.UIMAFramework; import org.apache.uima.aae.InputChannel; import org.apache.uima.aae.controller.AggregateAnalysisEngineController; import org.apache.uima.aae.controller.AnalysisEngineController; +import org.apache.uima.aae.controller.Endpoint; +import org.apache.uima.aae.controller.Endpoint_impl; import org.apache.uima.aae.controller.PrimitiveAnalysisEngineController; import org.apache.uima.aae.error.InvalidMessageException; import org.apache.uima.aae.handler.Handler; @@ -94,6 +93,9 @@ private Object mux = new Object(); + private ConcurrentHashMap failedListenerMap = + new ConcurrentHashMap(); + public AnalysisEngineController getController() { return controller; @@ -236,7 +238,8 @@ if ( command != AsynchAEMessage.Process && command != AsynchAEMessage.GetMeta && command != AsynchAEMessage.ReleaseCAS && - command != AsynchAEMessage.Stop && + command != AsynchAEMessage.Stop && + command != AsynchAEMessage.Ping && command != AsynchAEMessage.CollectionProcessComplete ) { UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), @@ -270,7 +273,8 @@ int command = aMessage.getIntProperty(AsynchAEMessage.Command); if ( command == AsynchAEMessage.GetMeta || command == AsynchAEMessage.CollectionProcessComplete || - command == AsynchAEMessage.Stop || + command == AsynchAEMessage.Stop || + command == AsynchAEMessage.Ping || command == AsynchAEMessage.ReleaseCAS) { // Payload not included in GetMeta Request @@ -316,7 +320,7 @@ // Shutting down return true; } - if ( command == AsynchAEMessage.Process && msgType == AsynchAEMessage.Response ) + if ( command == AsynchAEMessage.Process && msgType == AsynchAEMessage.Response ) { String casReferenceId = aMessage.getStringProperty(AsynchAEMessage.CasReference); if (!getController().getInProcessCache().entryExists(casReferenceId)) @@ -404,8 +408,10 @@ return "CollectionProcessComplete"; case AsynchAEMessage.ReleaseCAS: return "ReleaseCAS"; - case AsynchAEMessage.Stop: - return "Stop"; + case AsynchAEMessage.Stop: + return "Stop"; + case AsynchAEMessage.Ping: + return "Ping"; } } @@ -706,7 +712,11 @@ public String getInputQueueName() { if ( messageListener != null ) - return messageListener.getDestinationName();//getEndpointName(); + if ( messageListener.getDestination() != null ) { + return messageListener.getDestination().toString(); + } else { + return messageListener.getDestinationName();//getEndpointName(); + } else { return ""; @@ -777,4 +787,103 @@ { return messageListener.getConcurrentConsumers(); } + + public void createListener( String aDelegateKey ) throws Exception { + + UimaDefaultMessageListenerContainer failedListener = + failedListenerMap.get(aDelegateKey); + UimaDefaultMessageListenerContainer newListener = + new UimaDefaultMessageListenerContainer(); + newListener.setConnectionFactory(failedListener.getConnectionFactory()); + newListener.setMessageListener(this); + newListener.setController(getController()); + + TempDestinationResolver resolver = new TempDestinationResolver(); + resolver.setConnectionFactory((ActiveMQConnectionFactory)failedListener.getConnectionFactory()); + newListener.setDestinationResolver(resolver); + + org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor executor = new org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor(); + executor.setCorePoolSize(failedListener.getConcurrentConsumers()); + executor.setMaxPoolSize(failedListener.getMaxConcurrentConsumers()); + executor.setQueueCapacity(failedListener.getMaxConcurrentConsumers()); + executor.initialize(); + + newListener.setTaskExecutor(executor); + newListener.initialize(); + newListener.start(); + // Wait until the resolver plugs in the destination + while (newListener.getDestination() == null) { + synchronized( newListener ) { + + System.out.println(".... Waiting For Destination ..."); + newListener.wait(100); + } + } + newListener.afterPropertiesSet(); + // Get the endpoint object for a given delegate key from the Aggregate + Endpoint endpoint = ((AggregateAnalysisEngineController)getController()).lookUpEndpoint(aDelegateKey, false); + endpoint.setStatus(Endpoint.OK); + // Override the reply destination. + endpoint.setDestination(newListener.getDestination()); + Object clone = ((Endpoint_impl) endpoint).clone(); + newListener.setTargetEndpoint((Endpoint)clone); + } + + private UimaDefaultMessageListenerContainer getListenerForEndpoint( String anEndpointName ) { + for( int i=0; i < listenerContainerList.size(); i++ ) { + UimaDefaultMessageListenerContainer mListener = + (UimaDefaultMessageListenerContainer) listenerContainerList.get(i); + if ( mListener.getDestination() != null && mListener.getDestination().toString().equals( anEndpointName)) { + return mListener; + } + } + return null; + } + public void destroyListener( String anEndpointName, String aDelegateKey ) { + final UimaDefaultMessageListenerContainer mListener = + getListenerForEndpoint(anEndpointName); + if ( mListener == null ) { + System.out.println("--- Listener For Endpoint: "+aDelegateKey+" Not Found"); + return; + } + + try { +// if ( messageListener.getDestination().toString().equals( anEndpointName)) { + System.out.println("++++ Stopping Listener ..."); + mListener.stop(); + System.out.println("++++ Destroying Listener ..."); + new Thread() { + public void run() { + mListener.destroy(); + } + }; + while( mListener.isRunning()); + System.out.println("++++ Listener on Queue:"+anEndpointName+" Has Been Stopped..."); + Endpoint endpoint = ((AggregateAnalysisEngineController)getController()).lookUpEndpoint(aDelegateKey, false); + endpoint.setStatus(Endpoint.FAILED); + if ( mListener.getConnectionFactory() != null) { + if ( getController() instanceof AggregateAnalysisEngineController ) { + if ( !failedListenerMap.containsKey(aDelegateKey )) { + failedListenerMap.put( aDelegateKey, mListener); + listenerContainerList.remove(mListener); + System.out.println("++++ Saving Connection Factory"); + } + } + } + //} + } catch( Exception e) { + e.printStackTrace(); + } + } + public boolean isFailed(String aDelegateKey) { + return failedListenerMap.containsKey(aDelegateKey); + } + public boolean isListenerForDestination( String anEndpointName) { + UimaDefaultMessageListenerContainer mListener = + getListenerForEndpoint(anEndpointName); + if ( mListener == null ) { + return false; + } + return true; + } } Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java?rev=707119&r1=707118&r2=707119&view=diff ============================================================================== --- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java (original) +++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java Wed Oct 22 09:22:04 2008 @@ -559,8 +559,6 @@ "sendRequest", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_serialized_cas__FINEST", new Object[] { getAnalysisEngineController().getComponentName(), anEndpoint.getEndpoint(),aCasReferenceId,serializedCAS }); } - - // Send process request to remote delegate and start timeout timer sendCasToRemoteEndpoint(true, serializedCAS, null, aCasReferenceId, anEndpoint, true, 0); } else { Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java?rev=707119&r1=707118&r2=707119&view=diff ============================================================================== --- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java (original) +++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java Wed Oct 22 09:22:04 2008 @@ -23,19 +23,21 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.Map; import javax.jms.Destination; import javax.jms.ExceptionListener; import javax.jms.JMSException; +import javax.jms.TemporaryQueue; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQDestination; import org.apache.uima.UIMAFramework; +import org.apache.uima.aae.InputChannel; import org.apache.uima.aae.UIMAEE_Constants; import org.apache.uima.aae.controller.AggregateAnalysisEngineController; import org.apache.uima.aae.controller.AnalysisEngineController; import org.apache.uima.aae.controller.Endpoint; -import org.apache.uima.aae.controller.Endpoint_impl; import org.apache.uima.aae.error.ErrorHandler; import org.apache.uima.aae.error.Threshold; import org.apache.uima.aae.error.handler.GetMetaErrorHandler; @@ -51,14 +53,16 @@ private static final Class CLASS_NAME = UimaDefaultMessageListenerContainer.class; private String destinationName=""; private Endpoint endpoint; - private boolean freeCasQueueListener; + private volatile boolean freeCasQueueListener; private AnalysisEngineController controller; - private int retryCount = 2; + private volatile boolean failed = false; + private Object mux = new Object(); public UimaDefaultMessageListenerContainer() { super(); - setRecoveryInterval(60000); + setRecoveryInterval(5); +// setRecoveryInterval(60000); setAcceptMessagesWhileStopping(false); setExceptionListener(this); } @@ -71,6 +75,11 @@ { controller = aController; } + /** + * + * @param t + * @return + */ private boolean disableListener( Throwable t) { System.out.println(t.toString()); @@ -79,119 +88,202 @@ return true; return false; } + /** + * Stops this Listener + */ + private synchronized void handleListenerFailure() { + try { + if ( controller instanceof AggregateAnalysisEngineController ) { + String delegateKey = ((AggregateAnalysisEngineController)controller).lookUpDelegateKey(endpoint.getEndpoint()); + if ( endpoint.getDestination() != null ) { + InputChannel iC = ((AggregateAnalysisEngineController)controller).getInputChannel(endpoint.getDestination().toString()); + iC.destroyListener(endpoint.getDestination().toString(), delegateKey); + } else { + InputChannel iC = ((AggregateAnalysisEngineController)controller).getInputChannel(endpoint.getEndpoint()); + iC.destroyListener(endpoint.getEndpoint(), delegateKey); + } + } + } catch( Exception e) { + e.printStackTrace(); + } + } + /** + * Handles failure on a temp queue + * @param t + */ + private void handleTempQueueFailure(Throwable t) { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(), + "handleTempQueueFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_jms_listener_failed_WARNING", + new Object[] { endpoint.getDestination(), getBrokerUrl(), t }); + // Check if the failure is due to the failed connection. Spring (and ActiveMQ) dont seem to provide + // the cause. Just the top level IllegalStateException with a text message. This is what we need to + // check for. + if ( t instanceof javax.jms.IllegalStateException && t.getMessage().equals("The Consumer is closed")) { + if ( controller != null && controller instanceof AggregateAnalysisEngineController ) { + String delegateKey = ((AggregateAnalysisEngineController)controller).lookUpDelegateKey(endpoint.getEndpoint()); + try { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, this.getClass().getName(), + "handleTempQueueFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_stopping_listener_INFO", + new Object[] { controller.getComponentName(), endpoint.getDestination(),delegateKey }); + // Stop current listener + handleListenerFailure(); + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, this.getClass().getName(), + "handleTempQueueFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_stopped_listener_INFO", + new Object[] { controller.getComponentName(), endpoint.getDestination() }); + } catch ( Exception e ) { + e.printStackTrace(); + } + } + } else if ( disableListener(t)) { + handleQueueFailure(t); +// terminate(t); + } + } + + private ErrorHandler fetchGetMetaErrorHandler() { + ErrorHandler handler = null; + Iterator it = controller.getErrorHandlerChain().iterator(); + // Find the error handler for GetMeta in the Error Handler List provided in the + // deployment descriptor + while ( it.hasNext() ) + { + handler = (ErrorHandler)it.next(); + if ( handler instanceof GetMetaErrorHandler ) + { + return handler; + } + } + return null; + } + /** + * Handles failures on non-temp queues + * @param t + */ + private void handleQueueFailure(Throwable t) { + final String endpointName = + (getDestination() == null) ? "" : ((ActiveMQDestination)getDestination()).getPhysicalName(); + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(), + "handleQueueFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_jms_listener_failed_WARNING", + new Object[] { endpointName, getBrokerUrl(), t }); + boolean terminate = true; + // Check if the failure is severe enough to disable this listener. Whether or not this listener is actully + // disabled depends on the action associated with GetMeta Error Handler. If GetMeta Error Handler is + // configured to terminate the service on failure, this listener will be terminated and the entire service + // will be stopped. + if ( disableListener(t) ) { + endpoint.setReplyDestinationFailed(); + // If this is a listener attached to the Aggregate Controller, use GetMeta Error + // Thresholds defined to determine what to do next after failure. Either terminate + // the service or disable the delegate with which this listener is associated with + if ( controller != null && controller instanceof AggregateAnalysisEngineController ) + { + ErrorHandler handler = fetchGetMetaErrorHandler(); + // Fetch a Map containing thresholds for GetMeta for each delegate. + Map thresholds = handler.getEndpointThresholdMap(); + // Lookup delegate's key using delegate's endpoint name + String delegateKey = ((AggregateAnalysisEngineController)controller).lookUpDelegateKey(endpoint.getEndpoint()); + // If the delegate has a threshold defined on GetMeta apply Action defined + if ( delegateKey != null && thresholds.containsKey(delegateKey)) + { + // Fetch the Threshold object containing error configuration + Threshold threshold = (Threshold) thresholds.get(delegateKey); + // Check if the delegate needs to be disabled + if (threshold.getAction().equalsIgnoreCase(ErrorHandler.DISABLE)) { + // The disable delegate method takes a list of delegates + List list = new ArrayList(); + // Add the delegate to disable to the list + list.add(delegateKey); + try { + System.out.println(">>>> Controller:"+controller.getComponentName()+" Disabling Listener On Queue:"+endpoint.getEndpoint()+". Component's "+delegateKey+" Broker:"+getBrokerUrl()+" is Invalid"); + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, this.getClass().getName(), + "handleQueueFailure", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_disabled_delegate_bad_broker__INFO", + new Object[] { controller.getComponentName(), delegateKey, getBrokerUrl() }); + // Remove the delegate from the routing table. + ((AggregateAnalysisEngineController) controller).disableDelegates(list); + terminate = false; //just disable the delegate and continue + } catch (Exception e) { + e.printStackTrace(); + terminate = true; + } + } + } + } + } + System.out.println("****** Unable To Connect Listener To Broker:"+getBrokerUrl()); + System.out.println("****** Closing Listener on Queue:"+endpoint.getEndpoint()); + setRecoveryInterval(0); + + // Spin a shutdown thread to terminate listener. + new Thread() { + public void run() + { + try + { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(), + "handleQueueFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_disable_listener__WARNING", + new Object[] { endpointName, getBrokerUrl() }); + + shutdown(); + } + catch( Exception e) { e.printStackTrace();} + } + }.start(); + + if ( terminate ) + { + terminate(t); + } + + } + /** + * This method is called by Spring when a listener fails + */ protected void handleListenerSetupFailure( Throwable t, boolean alreadyHandled ) { - if ( !(t instanceof javax.jms.IllegalStateException ) ) - { - t.printStackTrace(); - final String endpointName = - (getDestination() == null) ? "" : ((ActiveMQDestination)getDestination()).getPhysicalName(); - - UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(), - "handleListenerSetupFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_jms_listener_failed_WARNING", - new Object[] { endpointName, getBrokerUrl(), t }); - - boolean terminate = true; - if ( disableListener(t) ) - { - if ( endpoint != null ) - { - endpoint.setReplyDestinationFailed(); - // If this is a listener attached to the Aggregate Controller, use GetMeta Error - // Thresholds defined to determine what to do next after failure. Either terminate - // the service or disable the delegate with which this listener is associated with - if ( controller != null && controller instanceof AggregateAnalysisEngineController ) - { - ErrorHandler handler = null; - Iterator it = controller.getErrorHandlerChain().iterator(); - // Find the error handler for GetMeta in the Error Handler List provided in the - // deployment descriptor - while ( it.hasNext() ) - { - handler = (ErrorHandler)it.next(); - if ( handler instanceof GetMetaErrorHandler ) - { - break; - } - } - // Fetch a Map containing thresholds for GetMeta for each delegate. - java.util.Map thresholds = handler.getEndpointThresholdMap(); - // Lookup delegate's key using delegate's endpoint name - String delegateKey = ((AggregateAnalysisEngineController)controller).lookUpDelegateKey(endpoint.getEndpoint()); - // If the delegate has a threshold defined on GetMeta apply Action defined - if ( delegateKey != null && thresholds.containsKey(delegateKey)) - { - // Fetcg the Threshold object containing error configuration - Threshold threshold = (Threshold) thresholds.get(delegateKey); - // Check if the delegate needs to be disabled - if (threshold.getAction().equalsIgnoreCase(ErrorHandler.DISABLE)) { - // The disable delegate method takes a list of delegates - List list = new ArrayList(); - // Add the delegate to disable to the list - list.add(delegateKey); - try { - System.out.println(">>>> Controller:"+controller.getComponentName()+" Disabling Listener On Queue:"+endpoint.getEndpoint()+". Component's "+delegateKey+" Broker:"+getBrokerUrl()+" is Invalid"); - UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, this.getClass().getName(), - "handleListenerSetupFailure", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_disabled_delegate_bad_broker__INFO", - new Object[] { controller.getComponentName(), delegateKey, getBrokerUrl() }); - // Remove the delegate from the routing table. - ((AggregateAnalysisEngineController) controller).disableDelegates(list); - } catch (Exception e) { - e.printStackTrace(); - } - terminate = false; //just disable the delegate and continue - } - } - } - } - - System.out.println("****** Unable To Connect Listener To Broker:"+getBrokerUrl()); - if ( endpoint != null ) - { - System.out.println("****** Closing Listener on Queue:"+endpoint.getEndpoint()); - } - setRecoveryInterval(0); - - // Spin a shutdown thread to terminate listener. - new Thread() { - public void run() - { - try - { - UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(), - "handleListenerSetupFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_disable_listener__WARNING", - new Object[] { endpointName, getBrokerUrl() }); - - shutdown(); - } - catch( Exception e) { e.printStackTrace();} - } - }.start(); - - if ( terminate ) - { - // **************************************** - // terminate the service - // **************************************** - System.out.println(">>>> Terminating Controller:"+controller.getComponentName()+" Unable To Initialize Listener Due to Invalid Broker URL:"+getBrokerUrl()); - UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(), - "handleListenerSetupFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_terminate_service_dueto_bad_broker__WARNING", - new Object[] { controller.getComponentName(), getBrokerUrl() }); - controller.stop(); - controller.notifyListenersWithInitializationStatus(new ResourceInitializationException(t)); - } - } - else - { - super.handleListenerSetupFailure(t, false); - } - } - - + // If controller is stopping not need to recover the connection + if ( controller != null && controller.isStopped()) { + return; + } + t.printStackTrace(); + + + if ( endpoint == null ) { + + super.handleListenerSetupFailure(t, false); + terminate(t); + return; + } + + synchronized( mux ) { + if ( !failed ) { + // Check if this listener is attached to a temp queue. If so, this is a listener + // on a reply queue. Handle temp queue listener failure differently than an + // input queue listener. + if ( endpoint.isTempReplyDestination()) { + handleTempQueueFailure(t); + } else { + // Handle non-temp queue failure + handleQueueFailure(t); + } + } + failed = true; + } + } + + private void terminate(Throwable t) { + // **************************************** + // terminate the service + // **************************************** + System.out.println(">>>> Terminating Controller:"+controller.getComponentName()+" Unable To Initialize Listener Due to Invalid Broker URL:"+getBrokerUrl()); + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(), + "terminate", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_terminate_service_dueto_bad_broker__WARNING", + new Object[] { controller.getComponentName(), getBrokerUrl() }); + controller.stop(); + controller.notifyListenersWithInitializationStatus(new ResourceInitializationException(t)); } - protected void handleListenerException( Throwable t ) { + t.printStackTrace(); String endpointName = (getDestination() == null) ? "" : ((ActiveMQDestination)getDestination()).getPhysicalName(); @@ -277,6 +369,13 @@ if ( endpoint != null) { endpoint.setDestination(aDestination); + if ( aDestination instanceof TemporaryQueue ) { + endpoint.setTempReplyDestination(true); + System.out.println("Resolver Plugged In a Temp Queue:"+aDestination); + if ( getMessageListener() != null && getMessageListener() instanceof InputChannel ) { + ((JmsInputChannel)getMessageListener()).setListenerContainer(this); + } + } endpoint.setServerURI(getBrokerUrl()); } } @@ -287,7 +386,8 @@ public void onException(JMSException arg0) { - String endpointName = + arg0.printStackTrace(); + String endpointName = (getDestination() == null) ? "" : ((ActiveMQDestination)getDestination()).getPhysicalName(); UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(),