uima-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cwik...@apache.org
Subject svn commit: r1196753 - in /uima/uima-as/trunk: uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/ uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/ uimaj-as-jms/src/...
Date Wed, 02 Nov 2011 18:42:11 GMT
Author: cwiklik
Date: Wed Nov  2 18:42:11 2011
New Revision: 1196753

URL: http://svn.apache.org/viewvc?rev=1196753&view=rev
Log:
UIMA-1435 Modified client side of UIMA AS to manage multiple connections to brokers. Previously
only one static connection per jvm was allowed.  

Modified:
    uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java
    uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
    uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java
    uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java
    uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java
    uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
    uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties

Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java?rev=1196753&r1=1196752&r2=1196753&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java
(original)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java
Wed Nov  2 18:42:11 2011
@@ -157,7 +157,7 @@ public class ActiveMQMessageSender exten
   public MessageProducer getMessageProducer() {
     if ( engine.running && engine.producerInitialized == false  ) {
       try {
-        setConnection(engine.sharedConnection.getConnection());
+        setConnection(engine.lookupConnection(getBrokerURL()).getConnection());
         initializeProducer();
         engine.producerInitialized = true;
       } catch( Exception e) {

Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java?rev=1196753&r1=1196752&r2=1196753&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
(original)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
Wed Nov  2 18:42:11 2011
@@ -30,7 +30,6 @@ import java.util.concurrent.Semaphore;
 import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.Destination;
-import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
@@ -38,18 +37,13 @@ import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 import javax.management.ObjectName;
-import javax.naming.Context;
 import javax.naming.InitialContext;
 
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.ActiveMQMessageConsumer;
 import org.apache.activemq.ActiveMQPrefetchPolicy;
-import org.apache.activemq.ActiveMQSession;
-import org.apache.activemq.RedeliveryPolicy;
 import org.apache.activemq.command.ActiveMQBytesMessage;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQTempDestination;
 import org.apache.activemq.command.ActiveMQTextMessage;
 import org.apache.uima.UIMAFramework;
 import org.apache.uima.UIMA_IllegalArgumentException;
@@ -64,7 +58,6 @@ import org.apache.uima.aae.controller.Co
 import org.apache.uima.aae.controller.ControllerLifecycle;
 import org.apache.uima.aae.controller.Endpoint;
 import org.apache.uima.aae.controller.UimacppServiceController;
-import org.apache.uima.aae.delegate.Delegate;
 import org.apache.uima.aae.delegate.Delegate.DelegateEntry;
 import org.apache.uima.aae.error.AsynchAEException;
 import org.apache.uima.aae.error.UimaASMetaRequestTimeout;
@@ -74,9 +67,7 @@ import org.apache.uima.aae.message.UIMAM
 import org.apache.uima.adapter.jms.JmsConstants;
 import org.apache.uima.adapter.jms.activemq.SpringContainerDeployer;
 import org.apache.uima.adapter.jms.activemq.UimaEEAdminSpringContext;
-import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngineCommon_impl.ClientRequest;
 import org.apache.uima.adapter.jms.service.Dd2spring;
-import org.apache.uima.analysis_engine.AnalysisEngineDescription;
 import org.apache.uima.analysis_engine.metadata.AnalysisEngineMetaData;
 import org.apache.uima.cas.CAS;
 import org.apache.uima.impl.UimaVersion;
@@ -98,8 +89,6 @@ public class BaseUIMAAsynchronousEngine_
 
   private MessageProducer producer;
 
-  private String brokerURI = null;
-
   private Session session = null;
 
   private Session consumerSession = null;
@@ -137,6 +126,7 @@ public class BaseUIMAAsynchronousEngine_
             "UIMA-AS version " + UIMAFramework.getVersionString());
   }
 
+
   protected TextMessage createTextMessage() throws ResourceInitializationException {
     return new ActiveMQTextMessage();
   }
@@ -232,7 +222,8 @@ public class BaseUIMAAsynchronousEngine_
 
   }
   private void stopConnection() {
-    if (sharedConnection != null) {
+	SharedConnection sharedConnection;
+    if ((sharedConnection = lookupConnection(brokerURI)) != null) {
       // Remove a client from registry
       sharedConnection.unregisterClient(this);
       // The destroy method closes the JMS connection when
@@ -277,8 +268,8 @@ public class BaseUIMAAsynchronousEngine_
 					sharedConnectionSemaphore.acquire();
 					stopConnection();
 				} catch (InterruptedException ex) {
-				  // Force connection stop
-          stopConnection();
+				    // Force connection stop
+                    stopConnection();
 					if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(
 							Level.WARNING)) {
 						UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING,
@@ -330,6 +321,7 @@ public class BaseUIMAAsynchronousEngine_
 
 	private boolean connectionClosedOrInvalid() {
 		synchronized (connectionMux) {
+			SharedConnection sharedConnection = lookupConnection(brokerURI);
 			if (sharedConnection == null
 					|| sharedConnection.getConnection() == null
 					|| ((ActiveMQConnection) sharedConnection.getConnection())
@@ -344,11 +336,13 @@ public class BaseUIMAAsynchronousEngine_
 		return false;
 	}
 
-	protected void createSharedConnection(String aBrokerURI) throws Exception {
+	protected SharedConnection createSharedConnection(String aBrokerURI) throws Exception {
+		SharedConnection sharedConnection = null;
 		synchronized (connectionMux) {
 			try {
 				// Acquire global static semaphore
 				sharedConnectionSemaphore.acquire();
+				sharedConnection = lookupConnection(aBrokerURI);
 				// check the state of a connection
 				if (connectionClosedOrInvalid()) {
 					if (sharedConnection != null
@@ -373,6 +367,8 @@ public class BaseUIMAAsynchronousEngine_
 					sharedConnection = new SharedConnection(
 							new ActiveMQConnectionFactory(aBrokerURI),
 							aBrokerURI);
+
+					sharedConnections.put( aBrokerURI, sharedConnection);
 					// Add AMQ specific connection validator
 					sharedConnection
 							.setConnectionValidator(connectionValidator);
@@ -399,6 +395,7 @@ public class BaseUIMAAsynchronousEngine_
 			}
 
 		}
+		return sharedConnection;
 	}
 
   private void addPrefetch(ActiveMQConnection aConnection) {
@@ -407,13 +404,13 @@ public class BaseUIMAAsynchronousEngine_
     ((ActiveMQConnection) aConnection).setPrefetchPolicy(prefetchPolicy);
   }
 
-  private void validateConnection(String aBrokerURI) throws Exception {
+  private SharedConnection validateConnection(String aBrokerURI) throws Exception {
     // checks if a sharedConnection exists and if not creates a new one
-    createSharedConnection(aBrokerURI);
+    return createSharedConnection(aBrokerURI);
   }
 
   protected Session getSession(String aBrokerURI) throws Exception {
-    validateConnection(aBrokerURI);
+	SharedConnection sharedConnection = validateConnection(aBrokerURI);
     return getSession(sharedConnection.getConnection());
   }
 
@@ -423,7 +420,7 @@ public class BaseUIMAAsynchronousEngine_
   }
 
   protected MessageProducer lookupProducerForEndpoint(Endpoint anEndpoint) throws Exception
{
-    if (sharedConnection == null || producerSession == null) {
+    if (lookupConnection(brokerURI) == null || producerSession == null) {
       throw new ResourceInitializationException();
     }
     Destination dest = producerSession.createQueue(anEndpoint.getEndpoint());
@@ -432,7 +429,7 @@ public class BaseUIMAAsynchronousEngine_
 
   protected void initializeProducer(String aBrokerURI, String aQueueName) throws Exception
{
     // Check if a sharedConnection exists. If not it creates one
-    createSharedConnection(aBrokerURI);
+    SharedConnection sharedConnection = createSharedConnection(aBrokerURI);
 	synchronized (connectionMux) {
 	    initializeProducer(aBrokerURI, aQueueName, sharedConnection.getConnection());
 	}
@@ -503,7 +500,7 @@ public class BaseUIMAAsynchronousEngine_
    * @throws Exception
    */
   protected void initializeConsumer(String aBrokerURI) throws Exception {
-    createSharedConnection(aBrokerURI);
+	SharedConnection  sharedConnection = createSharedConnection(aBrokerURI);
 	synchronized (connectionMux) {
 	    initializeConsumer(aBrokerURI, sharedConnection.getConnection());
 	}
@@ -698,6 +695,7 @@ public class BaseUIMAAsynchronousEngine_
       // prevent a race condition.
       createSharedConnection(brokerURI);
   	  synchronized (connectionMux) {
+        SharedConnection sharedConnection = lookupConnection(brokerURI);
         // Reuse existing JMS connection if available
         if (sharedConnection != null) {
           initializeProducer(brokerURI, endpoint, sharedConnection.getConnection());
@@ -876,7 +874,7 @@ public class BaseUIMAAsynchronousEngine_
    * 
    */
   public void undeploy(String aSpringContainerId, int stop_level) throws Exception {
-    if (aSpringContainerId == null) {
+    if (aSpringContainerId == null  ) {
       return;
     }
 

Modified: uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java?rev=1196753&r1=1196752&r2=1196753&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java
(original)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java
Wed Nov  2 18:42:11 2011
@@ -29,6 +29,9 @@ import java.io.Reader;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import javax.jms.Connection;
 import javax.jms.Message;
@@ -40,6 +43,7 @@ import junit.framework.Assert;
 import org.apache.activemq.ActiveMQMessageConsumer;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.commons.collections.functors.NotNullPredicate;
 import org.apache.uima.UIMAFramework;
 import org.apache.uima.UIMA_IllegalStateException;
 import org.apache.uima.aae.UimaClassFactory;
@@ -47,6 +51,7 @@ import org.apache.uima.aae.client.UimaAS
 import org.apache.uima.aae.client.UimaAsBaseCallbackListener;
 import org.apache.uima.aae.client.UimaAsynchronousEngine;
 import org.apache.uima.aae.controller.Endpoint;
+import org.apache.uima.aae.error.MessageTimeoutException;
 import org.apache.uima.aae.error.ServiceShutdownException;
 import org.apache.uima.adapter.jms.JmsConstants;
 import org.apache.uima.adapter.jms.activemq.JmsOutputChannel;
@@ -65,7 +70,10 @@ import org.apache.uima.resource.Resource
 import org.apache.uima.resource.ResourceProcessException;
 import org.apache.uima.resource.ResourceSpecifier;
 import org.apache.uima.resource.metadata.ProcessingResourceMetaData;
+import org.apache.uima.resourceSpecifier.factory.DeploymentDescriptorFactory;
+import org.apache.uima.resourceSpecifier.factory.UimaASDeploymentDescriptor;
 import org.apache.uima.util.XMLInputSource;
+import org.josql.expressions.IsNullExpression;
 
 public class TestUimaASExtended extends BaseTestSupport {
 
@@ -91,6 +99,81 @@ public class TestUimaASExtended extends 
             + System.getProperty("file.separator") + "bin" + System.getProperty("file.separator")
             + "dd2spring.xsl");
   }
+  public void testMultipleSyncClientsWithMultipleBrokers() throws Exception  {
+	    System.out.println("-------------- testMultipleSyncClientsWithMultipleBrokers -------------");
+	    
+	    class RunnableClient implements Runnable {
+	    	String brokerURL;
+	    	BaseTestSupport testSupport;
+            BaseUIMAAsynchronousEngine_impl uimaAsEngine;
+	    	
+	    	RunnableClient(String brokerURL,BaseTestSupport testSupport) {
+	    		this.brokerURL = brokerURL;
+	    		this.testSupport = testSupport;
+	    	}
+	    	public void initialize(String dd, String serviceEndpoint) throws Exception {
+	    		uimaAsEngine = new BaseUIMAAsynchronousEngine_impl();
+	            // Deploy Uima AS Primitive Service
+	            deployService(uimaAsEngine, dd);
+
+	    		@SuppressWarnings("unchecked")
+			  Map<String, Object> appCtx = buildContext(brokerURL, serviceEndpoint);
+		  	  appCtx.put(UimaAsynchronousEngine.Timeout, 1100);
+		  	  appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100);
+		  	  testSupport.initialize(uimaAsEngine, appCtx);
+		  	  waitUntilInitialized();
+	    	}
+			public void run() {
+				try {
+		            for (int i = 0; i < 1000; i++) {
+			              CAS cas = uimaAsEngine.getCAS();
+			              cas.setDocumentText("Some Text");
+			              System.out.println("UIMA AS Client#"+ Thread.currentThread().getId()+" Sending
CAS#"+(i + 1) + " Request to a Service Managed by Broker:"+brokerURL);
+			              try {
+				                uimaAsEngine.sendAndReceiveCAS(cas);
+			              } catch( Exception e) {
+			            	  e.printStackTrace();
+			              } finally {
+			                cas.release();
+			              }
+			            }
+			            System.out.println("Thread:"+Thread.currentThread().getId()+" Completed run()");
+			            uimaAsEngine.stop();
+				} catch( Exception e) {
+					e.printStackTrace();
+				}
+
+			}
+	    	
+	    }
+	    
+	    ExecutorService executor = Executors.newCachedThreadPool();
+
+	    //	change broker URl in system properties
+	    System.setProperty("BrokerURL", broker.getMasterConnectorURI().toString());
+	    
+	    RunnableClient client1 = 
+	    		new RunnableClient(broker.getMasterConnectorURI(), this);
+	    client1.initialize(relativePath + "/Deploy_NoOpAnnotatorWithPlaceholder.xml", "NoOpAnnotatorQueue");
+
+	    final BrokerService broker2 = setupSecondaryBroker(true);
+
+	    //	change broker URl in system properties
+	    System.setProperty("BrokerURL", broker2.getConnectorByName(DEFAULT_BROKER_URL_KEY_2).getUri().toString());
+
+	    RunnableClient client2 = 
+	    		new RunnableClient(broker2.getConnectorByName(DEFAULT_BROKER_URL_KEY_2).getUri().toString(),
this);
+	    client2.initialize(relativePath + "/Deploy_NoOpAnnotatorWithPlaceholder.xml", "NoOpAnnotatorQueue");
+
+	    Future<?> f1 = executor.submit(client1);
+	    Future<?> f2 = executor.submit(client2);
+	    f1.get();
+	    f2.get();
+	    executor.shutdownNow();
+	    broker2.stop();
+	    broker.stop();
+	}
+  
   /**
    * Tests service quiesce and stop support. This test sets a CasPool to 1 to send just one
CAS at a
    * time. After the first CAS is sent, a thread is started with a timer to expire before
the reply
@@ -120,6 +203,7 @@ public class TestUimaASExtended extends 
     spinShutdownThread(eeUimaEngine, 5000, containers, SpringContainerDeployer.QUIESCE_AND_STOP);
     runTest(appCtx, eeUimaEngine, String.valueOf(broker.getMasterConnectorURI()),
             "TopLevelTaeQueue", 3, EXCEPTION_LATCH);
+    eeUimaEngine.stop();
   }
 
   public void testStopNow() throws Exception {
@@ -1243,8 +1327,16 @@ public class TestUimaASExtended extends 
     System.setProperty(JmsConstants.SessionTimeoutOverride, "2500000");
     deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml");
     deployService(eeUimaEngine, relativePath + "/Deploy_AggregateAnnotator.xml");
-    runTest(null, eeUimaEngine, String.valueOf(broker.getMasterConnectorURI()), "TopLevelTaeQueue",
-            1, PROCESS_LATCH);
+    
+    Map<String, Object> appCtx = buildContext(String.valueOf(broker.getMasterConnectorURI()),
+            "TopLevelTaeQueue");
+    appCtx.put(UimaAsynchronousEngine.Timeout, 0);
+    appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 0);
+    
+    addExceptionToignore(org.apache.uima.aae.error.UimaEEServiceException.class); 
+    
+    runTest(appCtx, eeUimaEngine, String.valueOf(broker.getMasterConnectorURI()), "TopLevelTaeQueue",
+            10, PROCESS_LATCH);
   }
 
   public void testDeployAggregateServiceWithDelegateTimeoutAndContinueOnError() throws Exception
{

Modified: uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java?rev=1196753&r1=1196752&r2=1196753&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java
(original)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java
Wed Nov  2 18:42:11 2011
@@ -164,7 +164,7 @@ public abstract class BaseTestSupport ex
     return false;
   }
 
-  protected void initialize(BaseUIMAAsynchronousEngine_impl eeUimaEngine, Map<String,
Object> appCtx)
+  public void initialize(BaseUIMAAsynchronousEngine_impl eeUimaEngine, Map<String, Object>
appCtx)
           throws Exception {
     eeUimaEngine.addStatusCallbackListener(listener);
     eeUimaEngine.initialize(appCtx);

Modified: uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java?rev=1196753&r1=1196752&r2=1196753&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java
(original)
+++ uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java
Wed Nov  2 18:42:11 2011
@@ -37,6 +37,7 @@ import org.apache.uima.aae.message.Async
 import org.apache.uima.aae.message.UimaMessageValidator;
 import org.apache.uima.adapter.jms.JmsConstants;
 import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngineCommon_impl.ClientRequest;
+import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngineCommon_impl.SharedConnection;
 import org.apache.uima.adapter.jms.message.PendingMessage;
 import org.apache.uima.cas.CAS;
 import org.apache.uima.jms.error.handler.BrokerConnectionException;
@@ -135,7 +136,7 @@ public abstract class BaseMessageSender 
    * that it will not be sent to the destination due to the fact the broker connection is
down.
    */
   private boolean reject(PendingMessage pm ) {
-    return reject(pm, new BrokerConnectionException("Unable To Deliver Message To Destination.
Connection To Broker "+engine.sharedConnection.getBroker()+" Has Been Lost"));
+    return reject(pm, new BrokerConnectionException("Unable To Deliver Message To Destination.
Connection To Broker "+engine.getBrokerURI()+" Has Been Lost"));
   }
   
   /**
@@ -146,7 +147,8 @@ public abstract class BaseMessageSender 
     boolean rejectRequest = false;
     //  If the connection to a broker was lost, notify the client
     //  and reject the request unless this is getMeta Ping request.
-    if ( !engine.sharedConnection.isConnectionValid() ) {
+    SharedConnection sharedConnection = engine.lookupConnection(engine.getBrokerURI());
+    if ( sharedConnection != null && !sharedConnection.isConnectionValid() ) {
       String messageKind = "";
       if (pm.getMessageType() == AsynchAEMessage.GetMeta ) {
         messageKind = "GetMeta";
@@ -265,6 +267,9 @@ public abstract class BaseMessageSender 
       //  special case that we dont reject even though the broker connection has been lost.
It is allow to
       //  fall through and will be sent as soon as the connection is recovered.
       boolean rejectRequest = reject(pm);
+      if ( !engine.running) {
+    	  break;
+      }
       //  blocks until the connection is re-established with a broker
       engine.recoverSharedConnectionIfClosed();
       //  get the producer initialized from a valid connection

Modified: uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java?rev=1196753&r1=1196752&r2=1196753&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
(original)
+++ uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
Wed Nov  2 18:42:11 2011
@@ -76,7 +76,6 @@ import org.apache.uima.aae.monitor.stati
 import org.apache.uima.adapter.jms.ConnectionValidator;
 import org.apache.uima.adapter.jms.JmsConstants;
 import org.apache.uima.adapter.jms.message.PendingMessage;
-import org.apache.uima.analysis_engine.AnalysisEngineProcessException;
 import org.apache.uima.cas.CAS;
 import org.apache.uima.cas.impl.AllowPreexistingFS;
 import org.apache.uima.cas.impl.XmiSerializationSharedData;
@@ -101,7 +100,9 @@ public abstract class BaseUIMAAsynchrono
   public enum ClientState {INITIALIZING, RUNNING, FAILED, RECONNECTING, STOPPING, STOPPED};
   
   protected ClientState state = ClientState.INITIALIZING;
-  
+
+  protected String brokerURI = null;
+
   protected static final String SHADOW_CAS_POOL = "ShadowCasPool";
 
   protected static final int MetadataTimeout = 1;
@@ -200,7 +201,10 @@ public abstract class BaseUIMAAsynchrono
 
   protected volatile boolean producerInitialized;
 
-  protected static SharedConnection sharedConnection = null;
+  protected static ConcurrentHashMap<String, SharedConnection> sharedConnections =
+		  new ConcurrentHashMap<String, SharedConnection>();
+  
+  //protected static SharedConnection sharedConnection = null;
 
   protected Thread shutdownHookThread = null;
 
@@ -239,6 +243,15 @@ public abstract class BaseUIMAAsynchrono
   
   abstract protected void initializeConsumer(String aBrokerURI, Connection connection) throws
Exception;
 
+  //abstract protected  String getBrokerURI();
+
+  protected void setBrokeryURI(String brokerURI ) {
+	  this.brokerURI = brokerURI;
+  }
+  protected String getBrokerURI() {
+	  return brokerURI;
+  }
+  
   public void addStatusCallbackListener(UimaAsBaseCallbackListener aListener) {
     if (running) {
 	   throw new UIMA_IllegalStateException(JmsConstants.JMS_LOG_RESOURCE_BUNDLE,"UIMAJMS_listener_added_after_initialize__WARNING",
new Object[]{});
@@ -830,6 +843,8 @@ public abstract class BaseUIMAAsynchrono
 
         // The sendCAS() method is synchronized no need to synchronize the code below
         if (serviceDelegate.getState() == Delegate.TIMEOUT_STATE ) {
+          SharedConnection sharedConnection = lookupConnection(getBrokerURI());
+
           //  Send Ping to service as getMeta request
           if ( !serviceDelegate.isAwaitingPingReply() && sharedConnection.isOpen()
) {
             serviceDelegate.setAwaitingPingReply();
@@ -861,7 +876,7 @@ public abstract class BaseUIMAAsynchrono
             return casReferenceId;
           } else {
             if ( !requestToCache.isSynchronousInvocation() ) {
-              Exception exception = new BrokerConnectionException("Unable To Deliver CAS:"+requestToCache.getCasReferenceId()+"
To Destination. Connection To Broker "+sharedConnection.getBroker()+" Has Been Lost");
+              Exception exception = new BrokerConnectionException("Unable To Deliver CAS:"+requestToCache.getCasReferenceId()+"
To Destination. Connection To Broker "+getBrokerURI()+" Has Been Lost");
               handleException(exception, requestToCache.getCasReferenceId(), null, requestToCache,
true);
               return casReferenceId;
             } else {
@@ -875,7 +890,7 @@ public abstract class BaseUIMAAsynchrono
             }
           }
         }
-
+        SharedConnection sharedConnection = lookupConnection(getBrokerURI());
         if ( !sharedConnection.isOpen() ) {
           if (requestToCache != null && !requestToCache.isSynchronousInvocation()
&& aCAS != null ) {
             aCAS.release();
@@ -926,7 +941,10 @@ public abstract class BaseUIMAAsynchrono
    * 
    */
   public synchronized String sendCAS(CAS aCAS) throws ResourceProcessException {
-    return this.sendCAS(aCAS, produceNewClientRequestObject());
+    if ( !running ) {
+    	throw new ResourceProcessException(new UimaEEServiceException("Uima AS Client Has Been
Stopped. Rejecting Request to Process CAS"));
+    }
+	  return this.sendCAS(aCAS, produceNewClientRequestObject());
   }
 
   /**
@@ -1188,8 +1206,45 @@ public abstract class BaseUIMAAsynchrono
     	      String nodeIP = message.getStringProperty(AsynchAEMessage.ServerIP);
     	      String pid = message.getStringProperty(AsynchAEMessage.UimaASProcessPID);
     	      if ( casReferenceId != null && nodeIP != null && pid != null)
{
-    	        onBeforeProcessCAS(status,nodeIP, pid);
-    	      }
+    	    	  if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+                      UIMAFramework.getLogger(CLASS_NAME).logrb(
+                              Level.FINE,
+                              CLASS_NAME.getName(),
+                              "handleServiceInfo",
+                              JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                              "UIMAJMS_calling_onBeforeProcessCAS_FINE",
+                              new Object[] {
+                            	  casReferenceId,
+                             	 String.valueOf(casCachedRequest.getCAS().hashCode())
+                              });
+                  }
+    	    	  onBeforeProcessCAS(status,nodeIP, pid);
+    	    	  if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+                      UIMAFramework.getLogger(CLASS_NAME).logrb(
+                              Level.FINE,
+                              CLASS_NAME.getName(),
+                              "handleServiceInfo",
+                              JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                              "UIMAJMS_completed_onBeforeProcessCAS_FINE",
+                              new Object[] {
+                            	  casReferenceId,
+                             	 String.valueOf(casCachedRequest.getCAS().hashCode())
+                              });
+                  }
+    	     } else {
+                 UIMAFramework.getLogger(CLASS_NAME).logrb(
+                         Level.INFO,
+                         CLASS_NAME.getName(),
+                         "handleServiceInfo",
+                         JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                         "UIMAJMS_skipping_onBeforeProcessCAS_INFO",
+                         new Object[] {
+                       	  casReferenceId,
+                        	 String.valueOf(casCachedRequest.getCAS().hashCode()),
+                        	 nodeIP, pid 
+                         });
+    	    	 
+    	     }
     	}
     } catch( Exception e) {
       UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
@@ -2561,7 +2616,8 @@ public abstract class BaseUIMAAsynchrono
   }
 
   public boolean connectionOpen() {
-    if ( sharedConnection != null ) {
+	SharedConnection sharedConnection;
+    if ( (sharedConnection = lookupConnection(getBrokerURI())) != null ) {
       return sharedConnection.isConnectionValid();
     }
     return false;
@@ -2571,7 +2627,9 @@ public abstract class BaseUIMAAsynchrono
    * when the client is stopped or the connection is recovered.
    */
   public boolean recoverSharedConnectionIfClosed() {
+	SharedConnection sharedConnection;
     if ( !connectionOpen() ) {
+      sharedConnection = lookupConnection(getBrokerURI());
       while ( running ) {
         //  blocks until connection is refreshed 
         try {
@@ -2611,6 +2669,14 @@ public abstract class BaseUIMAAsynchrono
   protected void setReleaseCASMessage(TextMessage msg, String aCasReferenceId) throws Exception
{
   }
 
+  
+  protected SharedConnection lookupConnection(String brokerUrl) {
+	  if ( sharedConnections.containsKey(brokerUrl) ) {
+		  return sharedConnections.get(brokerUrl);
+	  }
+	  return null;
+  }
+  
   // This class is used to share JMS Connection by all instances of UIMA AS
   // client deployed in the same JVM.
   public static class SharedConnection {
@@ -2798,6 +2864,10 @@ public abstract class BaseUIMAAsynchrono
      * @return
      */
     public boolean destroy() {
+    	return destroy(false);
+    }
+    
+    public boolean destroy(boolean doShutdown) {
       synchronized(destroyMux) {
         //  Check if all clients have terminated and only than stop the shared connection
         if (getClientCount() == 0 && connection != null

Modified: uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties?rev=1196753&r1=1196752&r2=1196753&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties (original)
+++ uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties Wed
Nov  2 18:42:11 2011
@@ -219,5 +219,6 @@ UIMAJMS_cas_dispatched__INFO= \t>>>>>>> 
 UIMAJMS_cas_reply_rcvd_FINE = \t<<<<<<< UIMA AS Client Received Reply
For CAS:{0} Hashcode:{1}
 UIMAJMS_cas_added_to_pending_FINE = UIMA AS Dispatch Thread Added CAS:{0} Hashcode:{1} To
Outstanding List. Current List:\n\n{2}\n\n
 UIMAJMS_cas_submitted_FINE=UIMA AS sendAndReceive Received CAS:{0} HashCode:{1} For Processing
- Forwarding to sendCAS() on Thread:{2}
-UIMAJMS_calling_onBeforeProcessCAS_INFO = UIMA AS Client Calling onBeforeMessageProcess For
CAS:{0} Hashcode:{1}
-UIMAJMS_completed_onBeforeProcessCAS_INFO = UIMA AS Client Completed onBeforeMessageProcess
For CAS:{0} Hashcode:{1}
+UIMAJMS_calling_onBeforeProcessCAS_FINE = UIMA AS Client Calling onBeforeMessageProcess For
CAS:{0} Hashcode:{1}
+UIMAJMS_completed_onBeforeProcessCAS_FINE = UIMA AS Client Completed onBeforeMessageProcess
For CAS:{0} Hashcode:{1}
+UIMAJMS_skipping_onBeforeProcessCAS_INFO= UIMA AS Client Not Calling onBeforeMessageProcess
For CAS:{0} Hashcode:{1}. Invalid state: Node: {2} IP: {3}
\ No newline at end of file



Mime
View raw message