uima-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cwik...@apache.org
Subject svn commit: r736712 - in /incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main: java/org/apache/uima/adapter/jms/client/ resources/
Date Thu, 22 Jan 2009 17:02:42 GMT
Author: cwiklik
Date: Thu Jan 22 09:02:41 2009
New Revision: 736712

URL: http://svn.apache.org/viewvc?rev=736712&view=rev
Log:
UIMA-1127 Modified to use GetMeta as Ping message on service Timeout

Modified:
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/ClientServiceDelegate.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java?rev=736712&r1=736711&r2=736712&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java	(original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java	Thu Jan 22 09:02:41 2009
@@ -235,13 +235,12 @@
 						//	configuration. Most JMS Providers create a special dead-letter queue
 						//	where all expired messages are placed. NOTE: In ActiveMQ expired msgs in the DLQ

 						//	are not auto evicted yet and accumulate taking up memory.
-						
 						long timeoutValue = cacheEntry.getProcessTimeout();
 
 						if ( timeoutValue > 0 )
 						{
-							// Set msg expiration time
-							producer.setTimeToLive(timeoutValue);
+							// Set high time to live value
+              producer.setTimeToLive(10*timeoutValue);
 						}
 						if (  pm.getMessageType() == AsynchAEMessage.Process )
 						{
@@ -261,19 +260,6 @@
 			handleException(e, destination);
 		}
 	}
-	private void setMsgExpiration( PendingMessage aPm, String aCommandTimeoutKey)
-	{
-		if ( producer != null && aPm.containsKey(aCommandTimeoutKey))
-		{
-			long timeToLive = Long.parseLong((String)aPm.get(aCommandTimeoutKey));
-			try
-			{
-				producer.setTimeToLive(timeToLive);
-			}
-			catch( Exception e){}
-		}
-		
-	}
 	private void initializeMessage( PendingMessage aPm, Message anOutgoingMessage)
 	throws Exception
 	{
@@ -282,7 +268,6 @@
 		{
 		case AsynchAEMessage.GetMeta:
 			engine.setMetaRequestMessage(anOutgoingMessage);
-			setMsgExpiration(aPm, UimaAsynchronousEngine.GetMetaTimeout);
 			break;
 			
 		case AsynchAEMessage.Process:
@@ -304,7 +289,6 @@
 			
 		case AsynchAEMessage.CollectionProcessComplete:
 			engine.setCPCMessage(anOutgoingMessage);
-			setMsgExpiration(aPm, UimaAsynchronousEngine.CpcTimeout);
 			break;
 		}
 	}

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java?rev=736712&r1=736711&r2=736712&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java	(original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java	Thu Jan 22 09:02:41 2009
@@ -28,6 +28,7 @@
 import java.util.Properties;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 
 import javax.jms.BytesMessage;
@@ -53,10 +54,12 @@
 import org.apache.uima.aae.client.UimaASProcessStatusImpl;
 import org.apache.uima.aae.client.UimaASStatusCallbackListener;
 import org.apache.uima.aae.client.UimaAsynchronousEngine;
+import org.apache.uima.aae.error.AsynchAEException;
 import org.apache.uima.aae.error.InvalidMessageException;
 import org.apache.uima.aae.error.ServiceShutdownException;
 import org.apache.uima.aae.error.UimaASCollectionProcessCompleteTimeout;
 import org.apache.uima.aae.error.UimaASMetaRequestTimeout;
+import org.apache.uima.aae.error.UimaASPingTimeout;
 import org.apache.uima.aae.error.UimaASProcessCasTimeout;
 import org.apache.uima.aae.error.UimaEEServiceException;
 import org.apache.uima.aae.jmx.UimaASClientInfo;
@@ -94,6 +97,8 @@
 	protected static final int CpCTimeout = 2;
 
 	protected static final int ProcessTimeout = 3;
+	
+	protected static final int PingTimeout = 4;
 
 	protected volatile boolean initialized;
 
@@ -123,9 +128,11 @@
 
 	protected UIDGenerator idGenerator = new UIDGenerator();
 
-	protected ConcurrentHashMap clientCache = new ConcurrentHashMap();
+	protected ConcurrentHashMap<String,ClientRequest> clientCache = 
+	  new ConcurrentHashMap<String,ClientRequest>();
 	
-	protected ConcurrentHashMap threadMonitorMap = new ConcurrentHashMap();
+	protected ConcurrentHashMap<Long, ThreadMonitor> threadMonitorMap = 
+	  new ConcurrentHashMap<Long, ThreadMonitor>();
 
 	//	Default timeout for ProcessCas requests
 	protected int processTimeout = 0;
@@ -165,6 +172,8 @@
 
   protected ClientServiceDelegate serviceDelegate = null;
   
+  private Object stopMux = new Object();
+  
 	protected List pendingMessageList = new ArrayList();
 	protected volatile boolean producerInitialized;
 	abstract public String getEndPointName() throws Exception;
@@ -204,7 +213,6 @@
 	protected String serializeCAS(CAS aCAS,  XmiSerializationSharedData serSharedData) throws
Exception
 	{
 		return uimaSerializer.serializeCasToXmi(aCAS, serSharedData);
-
 	}
 	
 	protected String serializeCAS(CAS aCAS) throws Exception
@@ -264,7 +272,6 @@
 			PendingMessage msg = new PendingMessage(AsynchAEMessage.CollectionProcessComplete);
 			if (cpcTimeout > 0)
 			{
-				
 				requestToCache.startTimer();
 				msg.put(UimaAsynchronousEngine.CpcTimeout, String.valueOf(cpcTimeout));
 			}
@@ -292,7 +299,6 @@
 					((UimaASStatusCallbackListener) listeners.get(i)).collectionProcessComplete(null);
 				}
 			}
-			
 		}
 		catch (Exception e)
 		{
@@ -300,81 +306,87 @@
 		}
 	}
 
-	public synchronized void stop()
+	public void stop()
 	{
-    if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
-      UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "stop",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_stopping_as_client_INFO", new Object[] {});
-    }
-		if (!running)
-		{
-			return;
-		}
-
-		running = false;
-		uimaSerializer.reset();
-		try
-		{
-			//	Unblock threads
-			if( threadMonitorMap.size() > 0 )
-			{
-				Iterator it = threadMonitorMap.keySet().iterator();
-				while( it.hasNext() )
-				{
-					long key = ((Long)it.next()).longValue();
-					ThreadMonitor threadMonitor = 
-						(ThreadMonitor)threadMonitorMap.get(key);
-					if ( threadMonitor == null || threadMonitor.getMonitor() == null)
-					{
-						continue;
-					}
-					synchronized( threadMonitor.getMonitor())
-					{
-						threadMonitor.setWasSignaled();
-						threadMonitor.getMonitor().notifyAll();
-					}
-				}
-			}
-			
-			synchronized(endOfCollectionMonitor)
-			{
-				receivedCpcReply = true;
-				endOfCollectionMonitor.notifyAll();
-			}
-			synchronized(metadataReplyMonitor)
-			{
-				receivedMetaReply = true;
-				metadataReplyMonitor.notifyAll();
-			}
-      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
-        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "stop",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_stopped_as_client_INFO", new Object[] {});
-      }
-      for (Iterator i = springContainerRegistry.entrySet().iterator(); i.hasNext();)
-			{
-				Map.Entry entry = (Map.Entry) i.next();
-				Object key = entry.getKey();
-				undeploy((String) key);
-			}
-			asynchManager = null;
-			springContainerRegistry.clear();
-			listeners.clear();
-			clientCache.clear();
-		}
-		catch (Exception e)
-		{
-			e.printStackTrace();
-		}
-		finally
-		{
-			synchronized(this)
-			{
-				try
-				{
-					wait(2000); // Let asynch shutdown threads to stop
-				}
-				catch( Exception e) {}
-			}
+	  synchronized( stopMux ) {
+	    if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
+	      UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "stop",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_stopping_as_client_INFO", new Object[] {});
+	    }
+	    if (!running)
+	    {
+	      return;
+	    }
+
+	    running = false;
+	    uimaSerializer.reset();
+	    try
+	    {
+	      //  Unblock threads
+	      if( threadMonitorMap.size() > 0 )
+	      {
+	        Iterator it = threadMonitorMap.keySet().iterator();
+	        while( it.hasNext() )
+	        {
+	          long key = ((Long)it.next()).longValue();
+	          ThreadMonitor threadMonitor = 
+	            (ThreadMonitor)threadMonitorMap.get(key);
+	          if ( threadMonitor == null || threadMonitor.getMonitor() == null)
+	          {
+	            continue;
+	          }
+	          synchronized( threadMonitor.getMonitor())
+	          {
+	            threadMonitor.setWasSignaled();
+	            threadMonitor.getMonitor().notifyAll();
+	          }
+	        }
+	      }
+        synchronized (cpcGate)
+        {
+           howManySent = 0;
+           cpcGate.notifyAll();
+        }
+	      synchronized(endOfCollectionMonitor)
+	      {
+	        receivedCpcReply = true;
+	        endOfCollectionMonitor.notifyAll();
+	      }
+	      synchronized(metadataReplyMonitor)
+	      {
+	        receivedMetaReply = true;
+	        metadataReplyMonitor.notifyAll();
+	      }
+	      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
+	        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "stop",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_stopped_as_client_INFO", new Object[] {});
+	      }
+	      for (Iterator i = springContainerRegistry.entrySet().iterator(); i.hasNext();)
+	      {
+	        Map.Entry entry = (Map.Entry) i.next();
+	        Object key = entry.getKey();
+	        undeploy((String) key);
+	      }
+	      asynchManager = null;
+	      springContainerRegistry.clear();
+	      listeners.clear();
+	      clientCache.clear();
+	    }
+	    catch (Exception e)
+	    {
+	      e.printStackTrace();
+	    }
+	    finally
+	    {
+	      synchronized(this)
+	      {
+	        try
+	        {
+	          wait(2000); // Let asynch shutdown threads to stop
+	        }
+	        catch( Exception e) {}
+	      }
 
-		}
+	    }
+	  }
 	}
 
 	public CAS getCAS() throws Exception
@@ -382,7 +394,10 @@
     if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
       UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "getCAS",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_request_for_cas_FINEST", new Object[] {});
     }
-    if (!initialized || !running)
+    if ( !running ) {
+      throw new RuntimeException("Uima AS Client Is Stopping");
+    }
+    if (!initialized )
 		{
 			throw new ResourceInitializationException();
 		}
@@ -426,7 +441,7 @@
 		clientCache.put(uniqueIdentifier, requestToCache);
 		if (metadataTimeout > 0)
 		{
-			requestToCache.startTimer();
+		  serviceDelegate.startGetMetaRequestTimer();
 			msg.put(UimaAsynchronousEngine.GetMetaTimeout, String.valueOf(metadataTimeout));
 		}
 		synchronized( pendingMessageList )
@@ -536,7 +551,6 @@
         }
         return null;
 			}
-
 			PendingMessage msg = new PendingMessage(AsynchAEMessage.Process);
 			long t1 = System.nanoTime();
 			if ( serializationStrategy.equals("xmi")) {
@@ -566,19 +580,28 @@
 			requestToCache.setEndpoint(getEndPointName());
 			requestToCache.setProcessTimeout(processTimeout);
 			requestToCache.setThreadId(Thread.currentThread().getId());
-            requestToCache.clearTimeoutException();
-
+      requestToCache.clearTimeoutException();
 
 			clientCache.put(casReferenceId, requestToCache);
 
-			if (processTimeout > 0)
-			{
-				//requestToCache.startTimer();
-				//  Adds CAS Id to the list of CAS pending reply. It starts timer
-			  //  if the list is empty
-			}
-      serviceDelegate.addCasToOutstandingList(casReferenceId);
-
+      //  Check delegate's state before sending it a CAS. The delegate
+      //  may have previously timed out and the client is in a process of pinging
+      //  the delegate to check its availability. While the delegate
+      //  is in this state, delay CASes by placing them on a list of
+      //  CASes pending dispatch. Once the ping reply is received all
+      //  delayed CASes will be dispatched to the delegate.
+      if ( !delayCasIfDelegateInTimedOutState( casReferenceId) ) {
+        //  The delegate state is normal, add the CAS Id to the list
+        //  of CASes sent to the delegate.
+        serviceDelegate.addCasToOutstandingList(casReferenceId);
+      } else {
+        //  CAS was added to the list of CASes pending dispatch. The service
+        //  has previously timed out. A Ping message was dispatched to test
+        //  service availability. When the Ping reply is received ALL CASes
+        //  from the list of CASes pending dispatch will be sent to the 
+        //  delegate.
+        return casReferenceId;
+      }
       synchronized( pendingMessageList )
 			{
 				pendingMessageList.add(msg);
@@ -597,6 +620,24 @@
 		return casReferenceId;
 
 	}
+	
+  /**
+   * Checks the state of a delegate to see if it is in TIMEOUT State.
+   * If it is, push the CAS id onto a list of CASes pending dispatch.
+   * The delegate is in a questionable state and the aggregate sends
+   * a ping message to check delegate's availability. If the delegate
+   * responds to the ping, all CASes in the pending dispatch list will
+   * be immediately dispatched.
+  **/
+  public boolean delayCasIfDelegateInTimedOutState( String aCasReferenceId ) throws AsynchAEException {
+    if (serviceDelegate != null && serviceDelegate.getState() == Delegate.TIMEOUT_STATE ) {
+      // Add CAS id to the list of delayed CASes.
+      serviceDelegate.addCasToPendingDispatchList(aCasReferenceId);
+      return true;
+    }
+    return false;  // Cas Not Delayed
+  }
+	
 	private ClientRequest produceNewClientRequestObject()
 	{
 		String casReferenceId = idGenerator.nextId();
@@ -657,7 +698,48 @@
 	 */
 	protected void handleMetadataReply(Message message) throws Exception
 	{
-		cancelTimer(uniqueIdentifier);
+		serviceDelegate.cancelDelegateTimer();
+		serviceDelegate.setState(Delegate.OK_STATE);
+
+		//  Check if this is a reply for Ping sent in response to a timeout
+		if ( serviceDelegate.isAwaitingPingReply() ) {
+      serviceDelegate.resetAwaitingPingReply();
+      String casReferenceId = null;
+      //  
+      if ( serviceDelegate.isSynchronousAPI() ) {
+        //  Synchronous API used for sending outgoing messages.
+        //  Notify ALL sending threads. A send thread may have
+        //  added a CAS to the list of pending CASes due to
+        //  a timeout and subsequently entered a wait state
+        //  waiting for this notification. The CAS this thread
+        //  was trying to deliver will be taken off the pending
+        //  dispatch list and send to the service.
+        ThreadMonitor threadMonitor = null;
+        Iterator it = threadMonitorMap.entrySet().iterator();
+        while( it.hasNext() ) {
+          threadMonitor = ((Entry<Long, ThreadMonitor>)it.next()).getValue();
+          synchronized(threadMonitor.getMonitor()) {
+            //  Awake the send thread
+            threadMonitor.getMonitor().notifyAll();
+          }
+        }
+      } else {
+        //  Asynch API used for sending outgoing messages.
+        //  If there are delayed CASes in the delegate's list of CASes
+        //  pending dispatch, send them all to the delegate now.
+        while( serviceDelegate.getState() == Delegate.OK_STATE && 
+                ( casReferenceId = serviceDelegate.removeOldestFromPendingDispatchList())
!= null ) {
+          ClientRequest cachedRequest = (ClientRequest)clientCache.get(casReferenceId);
+          sendCAS(cachedRequest.getCAS(), cachedRequest);
+        }
+      }
+      if ( serviceDelegate.getCasPendingReplyListSize() > 0) {
+        serviceDelegate.restartTimerForOldestCasInOutstandingList();
+      }
+      //  Handled Ping reply
+      return;
+		}
+    //cancelTimer(uniqueIdentifier);
 		int payload = ((Integer) message.getIntProperty(AsynchAEMessage.Payload)).intValue();
 
 		if (AsynchAEMessage.Exception == payload)
@@ -707,6 +789,7 @@
 			switch( aCommand )
 			{
 			case AsynchAEMessage.GetMeta:
+      case AsynchAEMessage.Ping:
 				statCL.initializationComplete(aStatus);
 				break;
 				
@@ -714,7 +797,7 @@
 				statCL.collectionProcessComplete(aStatus);
 				break;
 				
-			case AsynchAEMessage.Process:
+      case AsynchAEMessage.Process:
 				statCL.entityProcessComplete(aCAS, aStatus);
 				break;
 			}
@@ -846,12 +929,6 @@
 			//	is sent to a service.
 			cachedRequest.setTimeWaitingForReply(System.nanoTime() - cachedRequest.getCASDepartureTime());
 			
-			//	Cancel the timer
-//			try
-//			{
-//				cancelTimer(casReferenceId);
-//			}
-//			catch( Exception e) {}
 			//	If the CAS was sent from a synchronous API sendAndReceive(), wake up the thread that
 			//	sent the CAS and process the reply
 			if ( cachedRequest.isSynchronousInvocation() )
@@ -1328,76 +1405,91 @@
 	}
 	/**
 	 * This is a synchronous method which sends a message to a destination and blocks waiting for
-	 * reply. 
+	 * a reply. 
 	 */
-	public String sendAndReceiveCAS(CAS aCAS, ProcessTrace pt) throws ResourceProcessException
-	{
-		if ( !running )
-		{
-			throw new ResourceProcessException( new Exception("Uima EE Client Not In Running State"));
-		}
-		String casReferenceId = null;
-			// keep handle to CAS, we'll deserialize into this same CAS later
-			sendAndReceiveCAS = aCAS;
-			
-			ThreadMonitor threadMonitor = null;
-			
-			if ( threadMonitorMap.containsKey(Thread.currentThread().getId()))
-			{
-				threadMonitor = (ThreadMonitor) threadMonitorMap.get(Thread.currentThread().getId());
-			}
-			else
-			{
-				threadMonitor = new ThreadMonitor( Thread.currentThread().getId() );
-				threadMonitorMap.put(Thread.currentThread().getId(), threadMonitor );
-			}
-			
-			ClientRequest cachedRequest = produceNewClientRequestObject();
-			cachedRequest.setSynchronousInvocation();
-			// send CAS. This call does not block. Instead we will block the sending thread below.
-			casReferenceId = sendCAS(aCAS, cachedRequest);
-			if ( threadMonitor != null && threadMonitor.getMonitor() != null)
-			{
-			//	Block here waiting for reply
-			synchronized (threadMonitor.getMonitor())
-			{
-				//	Block sending thread until a reply is received
-				while (!threadMonitor.wasSignaled && running)
-				{
-					try
-					{
-						threadMonitor.getMonitor().wait();
-					}
-					catch (InterruptedException e)
-					{
-					}
-				}
- }
-			}
-			try
-			{
-				// check if timeout exception
-				if (cachedRequest.isTimeoutException()) {
-					throw new ResourceProcessException(new UimaASProcessCasTimeout());
-				}
-				//	Process reply in the sending thread
-				Message message = cachedRequest.getMessage();
-				deserializeAndCompleteProcessingReply( casReferenceId, message, cachedRequest, pt, false
);
-			}
-			catch( ResourceProcessException rpe )
-			{
-				throw rpe;
-			}
-			catch( Exception e )
-			{
-				throw new ResourceProcessException(e);
-			}
-			finally
-			{
-				threadMonitor.reset();
-			}
-			return casReferenceId;
-	}
+	public String sendAndReceiveCAS(CAS aCAS, ProcessTrace pt) throws ResourceProcessException
{
+    if (!running) {
+      throw new ResourceProcessException(new Exception("Uima EE Client Not In Running State"));
+    }
+    if ( !serviceDelegate.isSynchronousAPI() ) {
+      //  Change the flag to indicate synchronous invocation.
+      //  This info will be needed to handle Ping replies.
+      //  Different code is used for handling PING replies for 
+      //  sync and async API.
+      serviceDelegate.setSynchronousAPI();
+    }
+    String casReferenceId = null;
+    // keep handle to CAS, we'll deserialize into this same CAS later
+    sendAndReceiveCAS = aCAS;
+
+    ThreadMonitor threadMonitor = null;
+
+    if (threadMonitorMap.containsKey(Thread.currentThread().getId())) {
+      threadMonitor = (ThreadMonitor) threadMonitorMap.get(Thread.currentThread().getId());
+    } else {
+      threadMonitor = new ThreadMonitor(Thread.currentThread().getId());
+      threadMonitorMap.put(Thread.currentThread().getId(), threadMonitor);
+    }
+
+    ClientRequest cachedRequest = produceNewClientRequestObject();
+    cachedRequest.setSynchronousInvocation();
+    // send CAS. This call does not block. Instead we will block the sending thread below.
+    casReferenceId = sendCAS(aCAS, cachedRequest);
+    if (threadMonitor != null && threadMonitor.getMonitor() != null) {
+      // Block here waiting for reply
+      synchronized (threadMonitor.getMonitor()) {
+        //  Block sending thread until a reply is received. The thread
+        //  will be signaled either when a reply to the request just
+        //  sent is received OR a Ping reply was received. The latter
+        //  is necessary to allow handling of CASes delayed due to
+        //  a timeout. A previous request timed out and the service
+        //  state was changed to TIMEDOUT. While the service is in this
+        //  state all sending threads add outstanding CASes to the list
+        //  of CASes pending dispatch and each waits until the state
+        //  of the service changes to OK. The state is changed to OK
+        //  when the client receives a reply to a PING request. When
+        //  the Ping reply comes, the client will signal this thread.
+        //  The thread checks the list of CASes pending dispatch trying
+        //  to find an entry that matches ID of the CAS previously 
+        //  delayed. If the CAS is found in the delayed list, it will 
+        //  be removed from the list and send to the service for 
+        //  processing. The 'wasSignaled' flag is only set when the  
+        //  CAS reply is received. Ping reply logic does not change
+        //  this flag.
+        while (!threadMonitor.wasSignaled && running) {
+          try {
+            threadMonitor.getMonitor().wait();
+            //  Send thread was awoken by either process reply or ping reply 
+            //  If there service is in the ok state and the CAS is in the
+            //  list of CASes pending dispatch, remove the CAS from the list
+            //  and send it to the service.
+            if ( running && serviceDelegate.getState() == Delegate.OK_STATE &&

+                 serviceDelegate.removeCasFromPendingDispatchList(casReferenceId)) {
+              sendCAS(aCAS, cachedRequest);
+            }
+          } catch (InterruptedException e) {
+          }
+        }
+      }
+    }
+    try {
+      // check if timeout exception
+      if (cachedRequest.isTimeoutException()) {
+        throw new ResourceProcessException(new UimaASProcessCasTimeout());
+      }
+      // Process reply in the send thread
+      Message message = cachedRequest.getMessage();
+      deserializeAndCompleteProcessingReply(casReferenceId, message, cachedRequest, pt, false);
+    } catch (ResourceProcessException rpe) {
+      throw rpe;
+    } catch (Exception e) {
+      throw new ResourceProcessException(e);
+    } finally {
+      //  reset 'wasSignaled' flag
+      threadMonitor.reset();
+    }
+    return casReferenceId;
+  }
 	private void deserializeAndCompleteProcessingReply( String casReferenceId, Message message,
ClientRequest cachedRequest, ProcessTrace pt, boolean doNotify ) throws Exception
 	{
 		int payload = ((Integer) message.getIntProperty(AsynchAEMessage.Payload)).intValue();
@@ -1451,6 +1543,26 @@
 				metadataReplyMonitor.notifyAll();
 			}
 			break;
+    case (PingTimeout):
+      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
+        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "notifyOnTimout",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_meta_timeout_INFO", new Object[] { anEndpoint
});
+      }
+      status.addEventStatus("Ping", "Failed", new UimaASPingTimeout());
+      notifyListeners(null, status, AsynchAEMessage.Ping);
+      //  The main thread could be stuck waiting for a CAS. Grab any CAS in the
+      //  client cache and release it so that we can shutdown.
+      if ( !clientCache.isEmpty()) {
+        ClientRequest anyCasRequest = clientCache.elements().nextElement();
+        if ( anyCasRequest.getCAS() != null ) {
+          anyCasRequest.getCAS().release();
+        }
+      }
+      synchronized (metadataReplyMonitor)
+      {
+        abort = true;
+        metadataReplyMonitor.notifyAll();
+      }
+      break;
 		case (CpCTimeout):
       if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
         UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "notifyOnTimout",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_cpc_timeout_INFO", new Object[] { anEndpoint
});
@@ -1898,7 +2010,7 @@
 	{
 		private long threadId;
 		private Object monitor = new Object();
-		private boolean wasSignaled = false;
+		private volatile boolean wasSignaled = false;
 		public ThreadMonitor( long aThreadId )
 		{
 			threadId = aThreadId;
@@ -1924,6 +2036,8 @@
 			return wasSignaled;
 		}
 	}
+	
+
 	/**
 	 * Called when the producer thread is fully initialized
 	 */

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/ClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/ClientServiceDelegate.java?rev=736712&r1=736711&r2=736712&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/ClientServiceDelegate.java	(original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/ClientServiceDelegate.java	Thu Jan 22 09:02:41 2009
@@ -18,17 +18,30 @@
  */
 package org.apache.uima.adapter.jms.client;
 
+import org.apache.uima.UIMAFramework;
+import org.apache.uima.aae.UIMAEE_Constants;
 import org.apache.uima.aae.delegate.Delegate;
 import org.apache.uima.aae.error.ErrorContext;
 import org.apache.uima.aae.error.MessageTimeoutException;
 import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.adapter.jms.JmsConstants;
 import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngineCommon_impl.ClientRequest;
 import org.apache.uima.cas.CAS;
+import org.apache.uima.util.Level;
 
 public class ClientServiceDelegate extends Delegate {
+  private static final Class CLASS_NAME = ClientServiceDelegate.class;
 
   private BaseUIMAAsynchronousEngineCommon_impl clientUimaAsEngine;
   private String applicationName = "UimaAsClient";
+  private volatile boolean usesSynchronousAPI;
+  
+  public boolean isSynchronousAPI() {
+    return usesSynchronousAPI;
+  }
+  public void setSynchronousAPI() {
+    this.usesSynchronousAPI = true;;
+  }
   public ClientServiceDelegate(String serviceName, String anApplicationName,  BaseUIMAAsynchronousEngineCommon_impl
engine ) {
     super.delegateKey = serviceName;
     clientUimaAsEngine = engine;
@@ -40,23 +53,69 @@
     return applicationName;
   }
 
-  public void handleError(Exception e, ErrorContext errorContext) {
-    String casReferenceId = (String)errorContext.get(AsynchAEMessage.CasReference);
-    ClientRequest cachedRequest = (ClientRequest)clientUimaAsEngine.clientCache.get(casReferenceId);
+  public synchronized void handleError(Exception e, ErrorContext errorContext) {
+    String casReferenceId = null;
     CAS cas = null;
+    ClientRequest cachedRequest = null;
+
+    int command = ((Integer)errorContext.get(AsynchAEMessage.Command)).intValue();
     try {
-       if (cachedRequest.isRemote()) {
-         cas = cachedRequest.getCAS();
-       }
-       if ( e instanceof MessageTimeoutException) {
-         
-         //  Notifies Listeners and removes ClientRequest instance from the client cache
-         clientUimaAsEngine.notifyOnTimout(cas, clientUimaAsEngine.getEndPointName(), BaseUIMAAsynchronousEngineCommon_impl.ProcessTimeout,
casReferenceId);
-         clientUimaAsEngine.clientSideJmxStats.incrementProcessTimeoutErrorCount();
-       }
+      if ( e instanceof MessageTimeoutException) {
+        switch( command ) {
+          case AsynchAEMessage.Process:
+            System.out.println(">>>>> Client Process Timed Out");
+            clientUimaAsEngine.clientSideJmxStats.incrementProcessTimeoutErrorCount();
+            casReferenceId = (String)errorContext.get(AsynchAEMessage.CasReference);
+            if ( casReferenceId != null ) {
+              cachedRequest =(ClientRequest)clientUimaAsEngine.clientCache.get(casReferenceId);
+              if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE) &&
+                      getEndpoint() != null )  {
+                UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, getClass().getName(),
"handleError", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_process_timeout_INFO", new
Object[] { getEndpoint().getEndpoint() });
+              }        
+              if (cachedRequest != null && cachedRequest.isRemote()) {
+                cas = cachedRequest.getCAS();
+              }
+            }
+            if ( !isAwaitingPingReply()) {
+              setAwaitingPingReply();
+              System.out.println(">>>>> Client Sending Ping");
+              // Send PING Request to check delegate's availability
+              clientUimaAsEngine.sendMetaRequest();
+              if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+                UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
"handleError", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_client_sending_ping__FINE",
+                  new Object[] { getKey() });
+              }
+            }
+            break;
+            
+          case AsynchAEMessage.GetMeta:
+               if ( isAwaitingPingReply() ) {
+                 System.out.println(">>>>> Client Ping Timedout");
+                 clientUimaAsEngine.notifyOnTimout(cas, clientUimaAsEngine.getEndPointName(),
BaseUIMAAsynchronousEngineCommon_impl.PingTimeout, casReferenceId);
+               } else {
+                 //  Notifies Listeners and removes ClientRequest instance from the client cache
+                 clientUimaAsEngine.notifyOnTimout(cas, clientUimaAsEngine.getEndPointName(),
BaseUIMAAsynchronousEngineCommon_impl.MetadataTimeout, casReferenceId);
+                 clientUimaAsEngine.clientSideJmxStats.incrementMetaTimeoutErrorCount();
+               }
+               if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+                 UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
"handleError", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_meta_timeout_INFO",
+                   new Object[] { getKey() });
+               }
+               System.out.println("Stopping Uima AS Client API. Service Not Responding To a Ping.");
+               clientUimaAsEngine.stop();
+            break;
+              
+          case AsynchAEMessage.CollectionProcessComplete:
+            
+            break;
+        }
+      }
     }
     catch (Exception ex) {
-        ex.printStackTrace();
+      ex.printStackTrace();
+      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(), "handleError",
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", new Object[] { ex });
+      }        
     }
     //  Dont release the CAS if synchronous API was used
     if (cas != null && !cachedRequest.isSynchronousInvocation())

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties?rev=736712&r1=736711&r2=736712&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties	(original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties	Thu Jan 22 09:02:41 2009
@@ -157,3 +157,4 @@
 UIMAJMS_stopped_listener_INFO = >>>> Controller: {0} - Listener On Queue: {1} Stopped 
 UIMAJMS_send_failed_deleted_queue_INFO = >>>> Controller: {0} - Failed to Send Message To Queue: {1}. The Queue Has Been Deleted. 
 UIMAJMS_msg_size__FINE = Controller: {0} Sending {1} Message of Type: {2} to Endpoint: {3} Message Size: {4}
+UIMAJMS_client_sending_ping__FINE = Uima AS Client Sent PING Message To Service: {0} 
\ No newline at end of file



Mime
View raw message