uima-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sc...@apache.org
Subject svn commit: r703282 - in /incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main: java/org/apache/uima/adapter/jms/client/ java/org/apache/uima/adapter/jms/message/ resources/
Date Thu, 09 Oct 2008 22:33:26 GMT
Author: schor
Date: Thu Oct  9 15:33:26 2008
New Revision: 703282

URL: http://svn.apache.org/viewvc?rev=703282&view=rev
Log:
[UIMA-1196] apply patch supporting binary serialization

Modified:
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/MessageSender.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/message/JmsMessageContext.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java?rev=703282&r1=703281&r2=703282&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java	(original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java	Thu Oct  9 15:33:26 2008
@@ -206,7 +206,14 @@
 
 			try {
 				//	Request JMS Message from the concrete implementation
-				TextMessage message = createTextMessage();
+			  Message message = null;
+			  if ( engine.getSerializationStrategy().equals("xmi")) {
+			    message = createTextMessage();
+			  } else {
+          message = createBytesMessage();
+			  }
+				 
+			  
 				initializeMessage( pm, message );
 				UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST,
 						CLASS_NAME.getName(), "run",
@@ -266,7 +273,7 @@
 		}
 		
 	}
-	private void initializeMessage( PendingMessage aPm, TextMessage anOutgoingMessage)
+	private void initializeMessage( PendingMessage aPm, Message anOutgoingMessage)
 	throws Exception
 	{
 		//	Populate message properties based on outgoing message type
@@ -280,9 +287,16 @@
 		case AsynchAEMessage.Process:
 			String casReferenceId =
 				(String)aPm.get(AsynchAEMessage.CasReference);
-			String serializedCAS = 
-				(String) aPm.get(AsynchAEMessage.CAS);
-			engine.setCASMessage(casReferenceId, serializedCAS, anOutgoingMessage);
+			if ( engine.getSerializationStrategy().equals("xmi")) {
+	      String serializedCAS = 
+	        (String) aPm.get(AsynchAEMessage.CAS);
+	      engine.setCASMessage(casReferenceId, serializedCAS, anOutgoingMessage);
+			} else {
+		     byte[] serializedCAS = 
+		        (byte[]) aPm.get(AsynchAEMessage.CAS);
+		      engine.setCASMessage(casReferenceId, serializedCAS, anOutgoingMessage);
+
+			}
 			//	Message Expiration for Process is added in the main run() loop
 			//	right before the message is dispatched to the AS Service
 			break;

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java?rev=703282&r1=703281&r2=703282&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java	(original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java	Thu Oct  9 15:33:26 2008
@@ -30,6 +30,7 @@
 import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
 
+import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.Destination;
 import javax.jms.JMSException;
@@ -154,17 +155,21 @@
 
 	protected MessageConsumer consumer = null;
 
+	protected String serializationStrategy = "xmi";
+	
 	protected UimaASClientInfoMBean clientSideJmxStats =
 		new UimaASClientInfo();
 	
 	protected List pendingMessageList = new ArrayList();
 	protected volatile boolean producerInitialized;
 	abstract public String getEndPointName() throws Exception;
-	abstract protected TextMessage createTextMessage() throws Exception;
-	abstract protected void setMetaRequestMessage(TextMessage msg) throws Exception;
-	abstract protected void setCASMessage(String casReferenceId, CAS aCAS, TextMessage msg) throws Exception;
-	abstract protected void setCASMessage(String casReferenceId, String aSerializedCAS, TextMessage msg) throws Exception;
-	abstract public void setCPCMessage(TextMessage msg) throws Exception;
+  abstract protected TextMessage createTextMessage() throws Exception;
+  abstract protected BytesMessage createBytesMessage() throws Exception;
+	abstract protected void setMetaRequestMessage(Message msg) throws Exception;
+	abstract protected void setCASMessage(String casReferenceId, CAS aCAS,Message msg) throws Exception;
+  abstract protected void setCASMessage(String casReferenceId, String aSerializedCAS, Message msg) throws Exception;
+  abstract protected void setCASMessage(String casReferenceId, byte[] aSerializedCAS, Message msg) throws Exception;
+	abstract public void setCPCMessage(Message msg) throws Exception;
 	abstract public void initialize(Map anApplicationContext) throws ResourceInitializationException;
 	abstract protected void cleanup() throws Exception;
 	abstract public String deploy(String[] aDeploymentDescriptorList, Map anApplicationContext) throws Exception;
@@ -175,7 +180,14 @@
 	    listeners.add(aListener);
 	}
 
+	public String getSerializationStrategy()
+	{
+	  return serializationStrategy;
+	}
 	
+  protected void setSerializationStrategy(String aSerializationStrategy) {
+    serializationStrategy = aSerializationStrategy;
+  }
 	/**
 	 * Serializes a given CAS. 
 	 * 
@@ -530,10 +542,28 @@
 
 			PendingMessage msg = new PendingMessage(AsynchAEMessage.Process);
 			long t1 = System.nanoTime();
-			XmiSerializationSharedData serSharedData = new XmiSerializationSharedData();
-			String serializedCAS = serializeCAS(aCAS, serSharedData);
-			requestToCache.setSerializationTime(System.nanoTime()-t1);
-			msg.put( AsynchAEMessage.CAS, serializedCAS);
+			if ( serializationStrategy.equals("xmi")) {
+	      XmiSerializationSharedData serSharedData = new XmiSerializationSharedData();
+	      String serializedCAS = serializeCAS(aCAS, serSharedData);
+	      msg.put( AsynchAEMessage.CAS, serializedCAS);
+	      if (remoteService)
+	      {
+	        requestToCache.setCAS(aCAS);
+	        //  Store the serialized CAS in case the timeout occurs and need to send the 
+	        //  the offending CAS to listeners for reporting
+	        requestToCache.setCAS(serializedCAS);
+	        requestToCache.setXmiSerializationSharedData(serSharedData);
+	      }
+			} else {
+        byte[] serializedCAS = UimaSerializer.serializeCasToBinary(aCAS);
+        msg.put( AsynchAEMessage.CAS, serializedCAS);
+        if (remoteService)
+        {
+          requestToCache.setCAS(aCAS);
+        }
+			}
+			  
+      requestToCache.setSerializationTime(System.nanoTime()-t1);
 			msg.put( AsynchAEMessage.CasReference, casReferenceId);
 			requestToCache.setIsRemote(remoteService);
 			requestToCache.setEndpoint(getEndPointName());
@@ -541,14 +571,6 @@
 			requestToCache.setThreadId(Thread.currentThread().getId());
             requestToCache.clearTimeoutException();
 
-			if (remoteService)
-			{
-				requestToCache.setCAS(aCAS);
-				//	Store the serialized CAS in case the timeout occurs and need to send the 
-				//	the offending CAS to listeners for reporting
-				requestToCache.setCAS(serializedCAS);
-				requestToCache.setXmiSerializationSharedData(serSharedData);
-			}
 
 			clientCache.put(casReferenceId, requestToCache);
 
@@ -793,9 +815,11 @@
 		{
 			return;
 		}
-		
-		UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "handleProcessReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_handling_process_reply_FINEST",
-				new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom), message.getStringProperty(AsynchAEMessage.CasReference), message.toString()+((TextMessage) message).getText() });
+		if ( message instanceof TextMessage ) {
+	    UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "handleProcessReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_handling_process_reply_FINEST",
+	            new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom), message.getStringProperty(AsynchAEMessage.CasReference), message.toString()+((TextMessage) message).getText() });
+		}
+		  
 
 		//	Fetch entry from the client cache for a cas id returned from the service
 		//	The client cache maintains an entry for every outstanding CAS sent to the
@@ -885,7 +909,19 @@
 				} 
 			}
 		}
-		CAS cas = deserializeCAS(((TextMessage) message).getText(), SHADOW_CAS_POOL );
+		CAS cas =  null;
+		if ( message instanceof TextMessage )
+		{
+	    cas = deserializeCAS(((TextMessage) message).getText(), SHADOW_CAS_POOL );
+		} 
+		else
+		{
+      long bodyLength = ((BytesMessage) message).getBodyLength();
+      byte[] serializedCas = new byte[(int)bodyLength];
+      ((BytesMessage) message).readBytes(serializedCas);
+      cas = deserializeCAS(serializedCas, SHADOW_CAS_POOL);
+
+		}
 		completeProcessingReply(cas, casReferenceId, payload, true, message, inputCasCachedRequest, null);
 	}
 
@@ -911,6 +947,10 @@
 			clientSideJmxStats.incrementProcessErrorCount();
 		}
 		Exception exception = retrieveExceptionFormMessage(message);
+		
+		
+		exception.printStackTrace();
+		
 		receivedCpcReply = true; // change state as if the CPC reply came in. This is done to prevent a hang on CPC request 
 		synchronized(endOfCollectionMonitor)
 		{
@@ -946,7 +986,7 @@
 	private void completeProcessingReply( CAS cas, String casReferenceId, int payload, boolean doNotify, Message message, ClientRequest cachedRequest, ProcessTrace pt  )
 	throws Exception
 	{
-		if (AsynchAEMessage.XMIPayload == payload || AsynchAEMessage.CASRefID == payload)
+		if (AsynchAEMessage.XMIPayload == payload || AsynchAEMessage.BinaryPayload == payload || AsynchAEMessage.CASRefID == payload)
 		{
 			if ( pt == null )
 			{
@@ -1122,6 +1162,12 @@
 		return aCAS;
 	}
 
+  private CAS deserialize(byte[] binaryData, ClientRequest cachedRequest) throws Exception
+  {
+    CAS cas = cachedRequest.getCAS();
+    UimaSerializer.deserializeCasFromBinary(binaryData, cas);
+    return cas;
+  }
 	
 	protected CAS deserializeCAS(String aSerializedCAS, ClientRequest cachedRequest) throws Exception
 	{
@@ -1129,7 +1175,20 @@
 		return deserialize(aSerializedCAS, cas);
 	}
 
-	protected CAS deserializeCAS(String aSerializedCAS, ClientRequest cachedRequest, boolean deltaCas) throws Exception
+  protected CAS deserializeCAS(byte[] aSerializedCAS, ClientRequest cachedRequest) throws Exception
+  {
+    CAS cas = cachedRequest.getCAS();
+    UimaSerializer.deserializeCasFromBinary(aSerializedCAS, cas);
+    return cas;
+  }
+
+  protected CAS deserializeCAS(byte[] aSerializedCAS, CAS aCas) throws Exception
+  {
+    UimaSerializer.deserializeCasFromBinary(aSerializedCAS, aCas);
+    return aCas;
+  }
+
+  protected CAS deserializeCAS(String aSerializedCAS, ClientRequest cachedRequest, boolean deltaCas) throws Exception
 	{
 		CAS cas = cachedRequest.getCAS();
 		return deserialize(aSerializedCAS, cas, cachedRequest.getXmiSerializationSharedData(), deltaCas);
@@ -1139,6 +1198,13 @@
 		CAS cas = asynchManager.getNewCas(aCasPoolName);
 		return deserialize(aSerializedCAS, cas);
 	}
+  protected CAS deserializeCAS(byte[] aSerializedCAS, String aCasPoolName ) throws Exception
+  {
+    CAS cas = asynchManager.getNewCas(aCasPoolName);
+    UimaSerializer.deserializeCasFromBinary(aSerializedCAS, cas);
+    return cas;
+  }
+
 	/**
 	 * Listener method receiving JMS Messages from the response queue.
 	 * 
@@ -1272,7 +1338,15 @@
 			if (message.propertyExists(AsynchAEMessage.SentDeltaCas)) {
 				deltaCas = message.getBooleanProperty(AsynchAEMessage.SentDeltaCas);
 			}
-			CAS cas = deserializeCAS(((TextMessage) message).getText(), cachedRequest, deltaCas);
+			CAS cas = null;
+			if ( message instanceof TextMessage ) {
+	      cas = deserializeCAS(((TextMessage) message).getText(), cachedRequest, deltaCas);
+			} else {
+			  long bodyLength = ((BytesMessage) message).getBodyLength();
+			  byte[] serializedCas = new byte[(int)bodyLength];
+			  ((BytesMessage) message).readBytes(serializedCas);
+		    cas = deserializeCAS(serializedCas, cachedRequest);
+			}
 			cachedRequest.setDeserializationTime(System.nanoTime() - t1);
 			completeProcessingReply( cas, casReferenceId, payload, doNotify,  message, cachedRequest, pt);
 		}
@@ -1426,6 +1500,11 @@
 		private long processErrorCount;
 		
 		private XmiSerializationSharedData sharedData;
+
+    private byte[] binaryCas = null;
+    
+    private volatile boolean isBinaryCas = false;
+
 		
 		public long getMetaTimeoutErrorCount() {
 			return metaTimeoutErrorCount;
@@ -1553,6 +1632,11 @@
 			serializedCAS = aSerializedCAS;
 			isSerializedCAS = true;
 		}
+		
+		public void setBinaryCAS(byte[] aBinaryCas) {
+		  binaryCas = aBinaryCas;
+		  isBinaryCas = true;
+		}
 
 		public void startTimer()
 		{
@@ -1585,7 +1669,11 @@
 						{
 							if (isRemote)
 							{
-								cas = deserializeCAS(serializedCAS, _clientReqRef);
+							  if ( isBinaryCas ) {
+							    cas = deserialize(binaryCas, _clientReqRef);
+							  } else {
+	                cas = deserializeCAS(serializedCAS, _clientReqRef);
+							  }
 							}
 							else
 							{

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/MessageSender.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/MessageSender.java?rev=703282&r1=703281&r2=703282&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/MessageSender.java	(original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/MessageSender.java	Thu Oct  9 15:33:26 2008
@@ -19,6 +19,7 @@
 
 package org.apache.uima.adapter.jms.client;
 
+import javax.jms.BytesMessage;
 import javax.jms.Destination;
 import javax.jms.MessageProducer;
 import javax.jms.TextMessage;
@@ -37,4 +38,5 @@
 	
 	public MessageProducer getMessageProducer(Destination destination) throws Exception;
 
+  public BytesMessage createBytesMessage() throws Exception;
 }

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/message/JmsMessageContext.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/message/JmsMessageContext.java?rev=703282&r1=703281&r2=703282&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/message/JmsMessageContext.java	(original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/message/JmsMessageContext.java	Thu Oct  9 15:33:26 2008
@@ -58,6 +58,13 @@
 		message = aMessage;
 		try
 		{
+		  if ( aMessage instanceof BytesMessage )
+		  {
+		    endpoint.setSerializer("binary");
+		  } else if ( aMessage instanceof TextMessage ) {
+        endpoint.setSerializer("xmi");
+		  }
+		    
 			String msgFrom = (String)aMessage.getStringProperty(AsynchAEMessage.MessageFrom); 
 			if ( msgFrom != null )
 			{

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties?rev=703282&r1=703281&r2=703282&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties	(original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties	Thu Oct  9 15:33:26 2008
@@ -44,7 +44,7 @@
 UIMAJMS_init_jms_consumer_INFO = Initializing JMS Message Consumer. Broker: {0} Queue Name: {1}
 UIMAJMS_init_as1_CONFIG = Initializing Asynchronous Client. Broker URI:{0} Receive Window:{1} Cas Pool Size: {2} Timeout: {3}
 UIMAJMS_aborting_as_WARNING = Aborting Asynchronous Client Due to: {0}
-UIMAJMS_as_initialized_CONFIG = Asynchronous Client Has Been Initialized. Ready To Process.
+UIMAJMS_as_initialized__INFO = Asynchronous Client Has Been Initialized. Serialization Strategy: [{0}] Ready To Process.
 UIMAJMS_as_meta_request_sent_FINEST = Sending Metadata Request to Queue: {0} Broker: {1}
 UIMAJMS_blocking_on_semaphore_FINEST = Blocking On Semaphore: {0}
 UIMAJMS_done_blocking_on_semaphore_FINEST = Finished Blocking On Semaphore: {0}
@@ -127,6 +127,7 @@
 UIMAJMS_connection_opened_to_endpoint__FINE = Controller: {0} New Connection Opened To Endpoint:{1} Managed By Broker: {2}
 UIMAJMS_reusing_existing_connection__FINE = Controller: {0} Reusing Cached Connection To Endpoint:{1} Managed By Broker: {2}
 UIMAJMS_sending_serialized_cas__FINEST= Controller: {0} Sending Serialized CAS to Endpoint: {1} CAS Reference Id: {2} \n\t Serialized CAS: {3}
+UIMAJMS_sending_binary_cas__FINEST= Controller: {0} Sending Binary CAS to Endpoint: {1} CAS Reference Id: {2} \n\t Serialized CAS: {3}
 UIMAJMS_received_process_reply_FINEST = Uima EE Client Received CAS Process Reply From: {0}

 UIMAJMS_received_meta_reply_FINEST = Uima EE Client Received GetMeta Reply From: {0} 
 UIMAJMS_received_cpc_reply_FINEST = Uima EE Client Received CPC Reply From: {0} 
@@ -151,3 +152,4 @@
 UIMAJMS_activated_fcq__CONFIG = &gt;&gt;&gt;&gt; Cas Multiplier Controller: {0} Activated Listener To Receive Free CAS Notifications From Clients - Temp Queue Name: {1}
 UIMAJMS_msg_processed__FINE = ++++ Controller: {0} Message Processing Completed [ CAS:{1} ]. Waiting For Next Message ...
 UIMAJMS_new_msg_in__FINEST = &gt;&gt;&gt;&gt; Controller: {0} New Message Received From: {1} Command: {2} Type: {3} CasId: {4}
+UIMAJMS_invalid_serializer__WARNING = Controller: {0} Unable to Serialize CAS. Invalid Serializer: {1} For Endpoint: {2}
\ No newline at end of file



Mime
View raw message