Return-Path: Delivered-To: apmail-incubator-uima-commits-archive@locus.apache.org Received: (qmail 94598 invoked from network); 9 Oct 2008 22:29:06 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 9 Oct 2008 22:29:06 -0000 Received: (qmail 14343 invoked by uid 500); 9 Oct 2008 22:29:05 -0000 Delivered-To: apmail-incubator-uima-commits-archive@incubator.apache.org Received: (qmail 14316 invoked by uid 500); 9 Oct 2008 22:29:05 -0000 Mailing-List: contact uima-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: uima-dev@incubator.apache.org Delivered-To: mailing list uima-commits@incubator.apache.org Received: (qmail 14307 invoked by uid 99); 9 Oct 2008 22:29:05 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 09 Oct 2008 15:29:05 -0700 X-ASF-Spam-Status: No, hits=-1999.9 required=10.0 tests=ALL_TRUSTED,DNS_FROM_SECURITYSAGE X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 09 Oct 2008 22:27:57 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 9C3BB23888A6; Thu, 9 Oct 2008 15:28:33 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r703279 - in /incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms: activemq/ client/ Date: Thu, 09 Oct 2008 22:28:31 -0000 To: uima-commits@incubator.apache.org From: schor@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20081009222833.9C3BB23888A6@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: schor Date: Thu Oct 9 15:28:27 2008 New Revision: 703279 URL: http://svn.apache.org/viewvc?rev=703279&view=rev Log: [UIMA-1196] apply patch to support binary serialization Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java?rev=703279&r1=703278&r2=703279&view=diff ============================================================================== --- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java (original) +++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java Thu Oct 9 15:28:27 2008 @@ -24,6 +24,7 @@ import java.util.Timer; import java.util.TimerTask; +import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Destination; @@ -357,6 +358,37 @@ } throw new AsynchAEException(new InvalidMessageException("Unable to produce Message Object")); } + public BytesMessage produceByteMessage() throws AsynchAEException + { + Assert.notNull(producerSession); + boolean done = false; + int retryCount = 4; + while (retryCount > 0) + { + try + { + retryCount--; + return producerSession.createBytesMessage(); + } + catch ( javax.jms.IllegalStateException e) + { + try + { + open(); + } + catch ( ServiceShutdownException ex) + { + ex.printStackTrace(); + } + + } + catch ( Exception e) + { + throw new AsynchAEException(e); + } + } + throw new AsynchAEException(new InvalidMessageException("Unable to produce BytesMessage Object")); + } public ObjectMessage produceObjectMessage() throws AsynchAEException { Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java?rev=703279&r1=703278&r2=703279&view=diff ============================================================================== --- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java (original) +++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java Thu Oct 9 15:28:27 2008 @@ -282,7 +282,8 @@ { int payload = aMessage.getIntProperty(AsynchAEMessage.Payload); if ( payload != AsynchAEMessage.XMIPayload && - payload != AsynchAEMessage.CASRefID && + payload != AsynchAEMessage.BinaryPayload && + payload != AsynchAEMessage.CASRefID && payload != AsynchAEMessage.Exception && payload != AsynchAEMessage.Metadata ) @@ -414,6 +415,8 @@ { case AsynchAEMessage.XMIPayload: return "XMIPayload"; + case AsynchAEMessage.BinaryPayload: + return "BinaryPayload"; case AsynchAEMessage.CASRefID: return "CASRefID"; case AsynchAEMessage.Metadata: Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java?rev=703279&r1=703278&r2=703279&view=diff ============================================================================== --- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java (original) +++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java Thu Oct 9 15:28:27 2008 @@ -29,6 +29,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; +import javax.jms.BytesMessage; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; @@ -549,17 +550,33 @@ { if (anEndpoint.isRemote()) { - String serializedCAS = getSerializedCasAndReleaseIt(false, aCasReferenceId,anEndpoint, anEndpoint.isRetryEnabled()); - if ( UIMAFramework.getLogger().isLoggable(Level.FINEST) ) - { - UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), - "sendRequest", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_serialized_cas__FINEST", - new Object[] { getAnalysisEngineController().getComponentName(), anEndpoint.getEndpoint(),aCasReferenceId,serializedCAS }); - } - - - // Send process request to remote delegate and start timeout timer - sendCasToRemoteEndpoint(true, serializedCAS, null, aCasReferenceId, anEndpoint, true, 0); + if ( anEndpoint.getSerializer().equals("xmi")) { + String serializedCAS = getSerializedCasAndReleaseIt(false, aCasReferenceId,anEndpoint, anEndpoint.isRetryEnabled()); + if ( UIMAFramework.getLogger().isLoggable(Level.FINEST) ) + { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), + "sendRequest", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_serialized_cas__FINEST", + new Object[] { getAnalysisEngineController().getComponentName(), anEndpoint.getEndpoint(),aCasReferenceId,serializedCAS }); + } + + + // Send process request to remote delegate and start timeout timer + sendCasToRemoteEndpoint(true, serializedCAS, null, aCasReferenceId, anEndpoint, true, 0); + } else { + byte[] serializedCAS = getBinaryCasAndReleaseIt(false, aCasReferenceId,anEndpoint, anEndpoint.isRetryEnabled()); + if ( UIMAFramework.getLogger().isLoggable(Level.FINEST) ) + { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), + "sendRequest", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_binary_cas__FINEST", + new Object[] { getAnalysisEngineController().getComponentName(), anEndpoint.getEndpoint(),aCasReferenceId,serializedCAS }); + } + + + // Send process request to remote delegate and start timeout timer + sendCasToRemoteEndpoint(true, serializedCAS, null, aCasReferenceId, anEndpoint, true, 0); + + } + } else { @@ -594,6 +611,9 @@ try { boolean cacheSerializedCas = endpointRetryEnabled(endpoints); + // The default serialization strategy for parallel step is xmi. + // Binary serialization doesnt support merge. + endpoints[0].setSerializer("xmi"); // Serialize CAS using serializer defined in the first endpoint. All endpoints in the parallel Flow // must use the same format (either XCAS or XMI) @@ -614,8 +634,9 @@ } - - + // The default serialization strategy for parallel step is xmi. + // Binary serialization doesnt support merge. + endpoints[i].setSerializer("xmi"); sendCasToRemoteEndpoint(true, serializedCAS, null, aCasReferenceId, endpoints[i], true, 0); } @@ -678,9 +699,15 @@ anEndpoint.setReplyEndpoint(true); if ( anEndpoint.isRemote() ) { - // Serializes CAS and releases it back to CAS Pool - String serializedCAS = getSerializedCas(true, entry.getCasReferenceId(), anEndpoint, anEndpoint.isRetryEnabled()); - sendCasToRemoteEndpoint(false, serializedCAS, entry, anEndpoint, false); + if ( anEndpoint.getSerializer().equals("xmi")) { + // Serializes CAS and releases it back to CAS Pool + String serializedCAS = getSerializedCas(true, entry.getCasReferenceId(), anEndpoint, anEndpoint.isRetryEnabled()); + sendCasToRemoteEndpoint(false, serializedCAS, entry, anEndpoint, false); + } else { + byte[] binaryCas = getBinaryCas(true, entry.getCasReferenceId(), anEndpoint, anEndpoint.isRetryEnabled()); + sendCasToRemoteEndpoint(false, binaryCas, entry, anEndpoint, false); + } + } else { @@ -789,9 +816,22 @@ new Object[] { anEndpoint.getEndpoint(), aCasReferenceId }); if ( anEndpoint.isRemote() ) { - - String serializedCAS = getSerializedCas(true, aCasReferenceId, anEndpoint, false); - sendCasToRemoteEndpoint(false, serializedCAS, null, aCasReferenceId, anEndpoint, false, 0); +// String serializedCAS = getSerializedCas(true, aCasReferenceId, anEndpoint, false); +// sendCasToRemoteEndpoint(false, serializedCAS, null, aCasReferenceId, anEndpoint, false, 0); + + if ( anEndpoint.getSerializer().equals("xmi")) { + // Serializes CAS and releases it back to CAS Pool + String serializedCAS = getSerializedCas(true, aCasReferenceId, anEndpoint, false); + sendCasToRemoteEndpoint(false, serializedCAS, null, aCasReferenceId, anEndpoint, false, 0); + } else { + CacheEntry entry = getAnalysisEngineController().getInProcessCache().getCacheEntryForCAS(aCasReferenceId); + byte[] binaryCas = getBinaryCas(true, entry.getCasReferenceId(), anEndpoint, anEndpoint.isRetryEnabled()); + sendCasToRemoteEndpoint(false, binaryCas, entry, anEndpoint, false); + } + + + + } else { @@ -976,7 +1016,43 @@ } } - + private byte[] getBinaryCas( boolean isReply, String aCasReferenceId, Endpoint anEndpoint, boolean cacheSerializedCas ) throws Exception + { + CAS cas = null; + try + { + byte[] serializedCAS = null; + // Using Cas reference Id retrieve CAS from the shared Cash + cas = getAnalysisEngineController().getInProcessCache().getCasByReference(aCasReferenceId); + ServicePerformance casStats = getAnalysisEngineController().getCasStatistics(aCasReferenceId); + CacheEntry entry = getAnalysisEngineController().getInProcessCache().getCacheEntryForCAS(aCasReferenceId); + long t1 = getAnalysisEngineController().getCpuTime(); + // Serialize CAS for remote Delegates + String serializer = anEndpoint.getSerializer(); + if ( serializer.equals("binary")) { + serializedCAS = UimaSerializer.serializeCasToBinary(cas); + } else { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), + "getBinaryCas", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_invalid_serializer__WARNING", + new Object[] { getAnalysisEngineController().getName(),serializer, anEndpoint.getEndpoint()}); + throw new UimaEEServiceException("Invalid Serializer:"+serializer+" For Endpoint:"+anEndpoint.getEndpoint()); + } + long timeToSerializeCas = getAnalysisEngineController().getCpuTime()-t1; + + getAnalysisEngineController().incrementSerializationTime(timeToSerializeCas); + + entry.incrementTimeToSerializeCAS(timeToSerializeCas); + casStats.incrementCasSerializationTime(timeToSerializeCas); + getAnalysisEngineController().getServicePerformance(). + incrementCasSerializationTime(timeToSerializeCas); + return serializedCAS; + } + catch( Exception e) + { + throw new AsynchAEException(e); + } + + } private String getSerializedCas( boolean isReply, String aCasReferenceId, Endpoint anEndpoint, boolean cacheSerializedCas ) throws Exception { @@ -1022,12 +1098,56 @@ throw new AsynchAEException(e); } } + private byte[] getSerializedBinaryCas( boolean isReply, String aCasReferenceId, Endpoint anEndpoint, boolean cacheSerializedCas ) throws Exception + { + CAS cas = null; + try + { + byte[] serializedCAS = null; + // Using Cas reference Id retrieve CAS from the shared Cash + cas = getAnalysisEngineController().getInProcessCache().getCasByReference(aCasReferenceId); + ServicePerformance casStats = getAnalysisEngineController().getCasStatistics(aCasReferenceId); + CacheEntry entry = getAnalysisEngineController().getInProcessCache().getCacheEntryForCAS(aCasReferenceId); + long t1 = getAnalysisEngineController().getCpuTime(); + serializedCAS = UimaSerializer.serializeCasToBinary(cas); + long timeToSerializeCas = getAnalysisEngineController().getCpuTime()-t1; + + getAnalysisEngineController().incrementSerializationTime(timeToSerializeCas); + + entry.incrementTimeToSerializeCAS(timeToSerializeCas); + casStats.incrementCasSerializationTime(timeToSerializeCas); + getAnalysisEngineController().getServicePerformance(). + incrementCasSerializationTime(timeToSerializeCas); + return serializedCAS; + } + catch( Exception e) + { + throw new AsynchAEException(e); + } + } + private byte[] getBinaryCasAndReleaseIt( boolean isReply, String aCasReferenceId, Endpoint anEndpoint, boolean cacheSerializedCas ) throws Exception + { + try + { + return getBinaryCas(isReply, aCasReferenceId, anEndpoint, cacheSerializedCas); + } + catch( Exception e) + { + throw new AsynchAEException(e); + } + finally + { + if ( getAnalysisEngineController() instanceof PrimitiveAnalysisEngineController && anEndpoint.isRemote() ) + { + getAnalysisEngineController().dropCAS(aCasReferenceId, true ); + } + } + } private String getSerializedCasAndReleaseIt( boolean isReply, String aCasReferenceId, Endpoint anEndpoint, boolean cacheSerializedCas ) throws Exception { - CAS cas = null; try { return getSerializedCas(isReply, aCasReferenceId, anEndpoint, cacheSerializedCas); @@ -1056,7 +1176,7 @@ } return false; } - private void populateStats( TextMessage aTextMessage, Endpoint anEndpoint, String aCasReferenceId, int anAdminCommand, boolean isRequest) throws Exception + private void populateStats( Message aTextMessage, Endpoint anEndpoint, String aCasReferenceId, int anAdminCommand, boolean isRequest) throws Exception { if ( anEndpoint.isFinal() ) { @@ -1433,6 +1553,128 @@ } } + + private void sendCasToRemoteEndpoint( boolean isRequest, byte[] aSerializedCAS, String anInputCasReferenceId, String aCasReferenceId, Endpoint anEndpoint, boolean startTimer, long sequence) + throws AsynchAEException, ServiceShutdownException + { + + try + { + if ( aborting ) + { + return; + } + + // Get the connection object for a given endpoint + JmsEndpointConnection_impl endpointConnection = getEndpointConnection(anEndpoint); + // Create empty JMS Text Message + BytesMessage tm = endpointConnection.produceByteMessage(); + tm.writeBytes(aSerializedCAS); + tm.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.BinaryPayload); + // Add Cas Reference Id to the outgoing JMS Header + tm.setStringProperty(AsynchAEMessage.CasReference, aCasReferenceId); + // Add common properties to the JMS Header + if ( isRequest == true ) + { + populateHeaderWithRequestContext(tm, anEndpoint, AsynchAEMessage.Process); + } + else + { + populateHeaderWithResponseContext(tm, anEndpoint, AsynchAEMessage.Process); + CacheEntry entry = this.getCacheEntry(aCasReferenceId); + tm.setBooleanProperty(AsynchAEMessage.SentDeltaCas, entry.sentDeltaCas()); + } + // The following is true when the analytic is a CAS Multiplier + if ( sequence > 0 && !isRequest ) + { + // Override MessageType set in the populateHeaderWithContext above. + // Make the reply message look like a request. This message will contain a new CAS + // produced by the CAS Multiplier. The client will treat this CAS + // differently from the input CAS. + tm.setIntProperty( AsynchAEMessage.MessageType, AsynchAEMessage.Request); + tm.setStringProperty(AsynchAEMessage.InputCasReference, anInputCasReferenceId); + // Add a sequence number assigned to this CAS by the controller + tm.setLongProperty(AsynchAEMessage.CasSequence, sequence); + isRequest = true; + if ( freeCASTempQueue != null ) + { + // Attach a temp queue to the outgoing message. This a queue where + // Free CAS notifications need to be sent from the client + tm.setJMSReplyTo(freeCASTempQueue); + } + if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE) ) + { + CacheEntry cacheEntry = getCacheEntry(aCasReferenceId); + if ( cacheEntry != null ) + { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), + "sendCasToRemoteEndpoint", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_send_cas_to_collocated_service_detail__FINE", + new Object[] {getAnalysisEngineController().getComponentName(),"Remote", anEndpoint.getEndpoint(), aCasReferenceId, anInputCasReferenceId, cacheEntry.getInputCasReferenceId() }); + } + } + } + + // Add stats + populateStats(tm, anEndpoint, aCasReferenceId, AsynchAEMessage.Process, isRequest); + if ( startTimer) + { + // Start a timer for this request. The amount of time to wait + // for response is provided in configuration for the endpoint + anEndpoint.startProcessRequestTimer(aCasReferenceId); + } + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), + "sendCasToRemoteEndpoint", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_new_msg_to_remote_FINE", + new Object[] {getAnalysisEngineController().getName(),endpointConnection.getServerUri(), endpointConnection.getEndpoint() }); + + // By default start a timer associated with a connection to the endpoint. Once a connection is established with an + // endpoint it is cached and reused for subsequent messaging. If the connection is not used within a given interval + // the timer silently expires and closes the connection. This mechanism is similar to what Web Server does when + // managing sessions. In case when we want the remote delegate to respond to a temporary queue, which is implied + // by anEndpoint.getDestination != null, we dont start the timer. + boolean startConnectionTimer = true; + + if ( anEndpoint.getDestination() != null || !isRequest ) + { + startConnectionTimer = false; + } + // ---------------------------------------------------- + // Send Request Messsage to the Endpoint + // ---------------------------------------------------- + endpointConnection.send(tm, startConnectionTimer); +// if ( getAnalysisEngineController().isTopLevelComponent() ) +// { +// getAnalysisEngineController().getInProcessCache().dumpContents(getAnalysisEngineController().getComponentName()); +// } + if ( !isRequest ) + { + addIdleTime(tm); + } + } + catch( JMSException e) + { + // Unable to establish connection to the endpoint. Logit and continue + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), + "sendCasToRemoteDelegate", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_unable_to_connect__INFO", + new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint()}); + + } + + catch( ServiceShutdownException e) + { + throw e; + } + catch( AsynchAEException e) + { + throw e; + } + catch( Exception e) + { + throw new AsynchAEException(e); + } + + } + + private void sendCasToRemoteEndpoint( boolean isRequest, String aSerializedCAS, CacheEntry entry, Endpoint anEndpoint, boolean startTimer ) throws AsynchAEException, ServiceShutdownException @@ -1561,6 +1803,125 @@ } + + private void sendCasToRemoteEndpoint( boolean isRequest, byte[] aSerializedCAS, CacheEntry entry, Endpoint anEndpoint, boolean startTimer ) + throws AsynchAEException, ServiceShutdownException + { + + try + { + if ( aborting ) + { + return; + } + // Get the connection object for a given endpoint + JmsEndpointConnection_impl endpointConnection = getEndpointConnection(anEndpoint); + // Create empty JMS Text Message + BytesMessage tm = endpointConnection.produceByteMessage(); + tm.writeBytes(aSerializedCAS); + tm.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.BinaryPayload); + // Add Cas Reference Id to the outgoing JMS Header + tm.setStringProperty(AsynchAEMessage.CasReference, entry.getCasReferenceId()); + // Add common properties to the JMS Header + if ( isRequest == true ) + { + populateHeaderWithRequestContext(tm, anEndpoint, AsynchAEMessage.Process); + } + else + { + populateHeaderWithResponseContext(tm, anEndpoint, AsynchAEMessage.Process); +// tm.setBooleanProperty(AsynchAEMessage.SentDeltaCas, entry.sentDeltaCas()); + } + // The following is true when the analytic is a CAS Multiplier + if ( entry.isSubordinate() && !isRequest ) + { + // Override MessageType set in the populateHeaderWithContext above. + // Make the reply message look like a request. This message will contain a new CAS + // produced by the CAS Multiplier. The client will treat this CAS + // differently from the input CAS. + tm.setIntProperty( AsynchAEMessage.MessageType, AsynchAEMessage.Request); + + isRequest = true; + // Save the id of the parent CAS + tm.setStringProperty(AsynchAEMessage.InputCasReference, getTopParentCasReferenceId(entry.getCasReferenceId())); + // Add a sequence number assigned to this CAS by the controller + tm.setLongProperty(AsynchAEMessage.CasSequence, entry.getCasSequence()); + // If this is a Cas Multiplier, add a reference to a special queue where + // the client sends Free Cas Notifications + if ( freeCASTempQueue != null ) + { + // Attach a temp queue to the outgoing message. This is a queue where + // Free CAS notifications need to be sent from the client + tm.setJMSReplyTo(freeCASTempQueue); + } + if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE) ) + { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), + "sendCasToRemoteEndpoint", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_send_cas_to_collocated_service_detail__FINE", + new Object[] {getAnalysisEngineController().getComponentName(),"Remote", anEndpoint.getEndpoint(), entry.getCasReferenceId(), entry.getInputCasReferenceId(), entry.getInputCasReferenceId() }); + } + } + + // Add stats + populateStats(tm, anEndpoint, entry.getCasReferenceId(), AsynchAEMessage.Process, isRequest); + if ( startTimer) + { + // Start a timer for this request. The amount of time to wait + // for response is provided in configuration for the endpoint + anEndpoint.startProcessRequestTimer(entry.getCasReferenceId()); + } + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), + "sendCasToRemoteEndpoint", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_new_msg_to_remote_FINE", + new Object[] {getAnalysisEngineController().getName(),endpointConnection.getServerUri(), endpointConnection.getEndpoint() }); + + // By default start a timer associated with a connection to the endpoint. Once a connection is established with an + // endpoint it is cached and reused for subsequent messaging. If the connection is not used within a given interval + // the timer silently expires and closes the connection. This mechanism is similar to what Web Server does when + // managing sessions. In case when we want the remote delegate to respond to a temporary queue, which is implied + // by anEndpoint.getDestination != null, we dont start the timer. + boolean startConnectionTimer = true; + + if ( anEndpoint.getDestination() != null || !isRequest ) + { + startConnectionTimer = false; + } + + // ---------------------------------------------------- + // Send Request Messsage to the Endpoint + // ---------------------------------------------------- + endpointConnection.send(tm, startConnectionTimer); + + if ( !isRequest ) + { + addIdleTime(tm); + } + } + catch( JMSException e) + { + // Unable to establish connection to the endpoint. Logit and continue + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), + "sendCasToRemoteDelegate", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_unable_to_connect__INFO", + new Object[] { getAnalysisEngineController().getName(), anEndpoint.getEndpoint()}); + + } + + catch( ServiceShutdownException e) + { + throw e; + } + catch( AsynchAEException e) + { + throw e; + } + catch( Exception e) + { + throw new AsynchAEException(e); + } + + } + + + private String getTopParentCasReferenceId( String casReferenceId ) throws Exception { if ( !getAnalysisEngineController().getInProcessCache().entryExists(casReferenceId) ) Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java?rev=703279&r1=703278&r2=703279&view=diff ============================================================================== --- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java (original) +++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java Thu Oct 9 15:28:27 2008 @@ -20,6 +20,7 @@ import java.util.List; +import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Destination; @@ -103,6 +104,14 @@ } return session.createTextMessage(); } + public BytesMessage createBytesMessage() throws Exception + { + if ( session == null ) + { + throw new JMSException("Unable To Create JMS BytesMessage. Reason: JMS Session Not Initialized"); + } + return session.createBytesMessage(); + } /** * Cleanup any jms resources used by the worker thread */ Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java?rev=703279&r1=703278&r2=703279&view=diff ============================================================================== --- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java (original) +++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java Thu Oct 9 15:28:27 2008 @@ -23,8 +23,10 @@ import java.util.Map; import java.util.Properties; +import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.Destination; +import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Queue; @@ -34,6 +36,7 @@ import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.RedeliveryPolicy; +import org.apache.activemq.command.ActiveMQBytesMessage; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQTextMessage; import org.apache.uima.UIMAFramework; @@ -95,6 +98,10 @@ { return new ActiveMQTextMessage(); } + protected BytesMessage createBytesMessage() throws ResourceInitializationException + { + return new ActiveMQBytesMessage(); + } /** * Called at the end of collectionProcessingComplete - WAS closes receiving @@ -118,7 +125,7 @@ throw new ResourceProcessException(e); } } - protected void setMetaRequestMessage(TextMessage msg) throws Exception + protected void setMetaRequestMessage(Message msg) throws Exception { msg.setStringProperty(AsynchAEMessage.MessageFrom, consumerDestination.getQueueName()); @@ -126,16 +133,18 @@ msg.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Request); msg.setIntProperty(AsynchAEMessage.Command, AsynchAEMessage.GetMeta); msg.setJMSReplyTo(consumerDestination); - ((ActiveMQTextMessage) msg).setText(""); + if ( msg instanceof TextMessage ) { + ((ActiveMQTextMessage) msg).setText(""); + } } /** * Initialize JMS Message with properties relevant to Process CAS request. */ - protected void setCASMessage(String aCasReferenceId, CAS aCAS, TextMessage msg) throws ResourceProcessException + protected void setCASMessage(String aCasReferenceId, CAS aCAS, Message msg) throws ResourceProcessException { try{ - setCommonProperties(aCasReferenceId, msg); - msg.setText(serializeCAS(aCAS)); + setCommonProperties(aCasReferenceId, msg, "xmi"); + ((TextMessage)msg).setText(serializeCAS(aCAS)); } catch (Exception e) { @@ -143,18 +152,30 @@ } } - protected void setCASMessage(String aCasReferenceId, String aSerializedCAS, TextMessage msg) throws ResourceProcessException + protected void setCASMessage(String aCasReferenceId, String aSerializedCAS, Message msg) throws ResourceProcessException { try{ - setCommonProperties(aCasReferenceId, msg); - msg.setText(aSerializedCAS); + setCommonProperties(aCasReferenceId, msg, "xmi"); + ((TextMessage)msg).setText(aSerializedCAS); } catch (Exception e) { throw new ResourceProcessException(e); } } - protected void setCommonProperties( String aCasReferenceId, TextMessage msg) throws ResourceProcessException + protected void setCASMessage(String aCasReferenceId, byte[] aSerializedCAS, Message msg) throws ResourceProcessException + { + try{ + setCommonProperties(aCasReferenceId, msg, "binary"); + ((BytesMessage)msg).writeBytes(aSerializedCAS); + } + catch (Exception e) + { + throw new ResourceProcessException(e); + } + } + + protected void setCommonProperties( String aCasReferenceId, Message msg, String aSerializationStrategy) throws ResourceProcessException { try{ msg.setStringProperty(AsynchAEMessage.MessageFrom, consumerDestination.getQueueName()); @@ -163,7 +184,13 @@ msg.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Request); msg.setIntProperty(AsynchAEMessage.Command, AsynchAEMessage.Process); msg.setStringProperty(AsynchAEMessage.CasReference, aCasReferenceId); - msg.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.XMIPayload); + + if ( aSerializationStrategy.equals("binary")) { + msg.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.BinaryPayload); + } else if ( aSerializationStrategy.equals("xmi")) { + msg.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.XMIPayload); + } + msg.setBooleanProperty(AsynchAEMessage.AcceptsDeltaCas, true); msg.setJMSReplyTo(consumerDestination); @@ -230,7 +257,7 @@ } } - public void setCPCMessage(TextMessage msg) throws Exception + public void setCPCMessage(Message msg) throws Exception { msg.setStringProperty(AsynchAEMessage.MessageFrom, consumerDestination.getQueueName()); msg.setStringProperty(UIMAMessage.ServerURI, brokerURI); @@ -239,7 +266,9 @@ msg.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.None); msg.setBooleanProperty(AsynchAEMessage.RemoveEndpoint, true); msg.setJMSReplyTo(consumerDestination); - msg.setText(""); + if ( msg instanceof TextMessage ) { + ((TextMessage)msg).setText(""); + } } protected Connection getConnection( String aBrokerURI ) throws Exception { @@ -422,6 +451,10 @@ { applicationName = (String) anApplicationContext.get(UimaAsynchronousEngine.ApplicationName); } + if (anApplicationContext.containsKey(UimaAsynchronousEngine.SerializationStrategy)) + { + super.serializationStrategy = (String) anApplicationContext.get(UimaAsynchronousEngine.SerializationStrategy); + } UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(), "initialize", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_init_uimaee_client__CONFIG", new Object[] { brokerURI, 0, casPoolSize, processTimeout, metadataTimeout, cpcTimeout }); @@ -479,7 +512,9 @@ { throw new ResourceInitializationException(e); } - UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(), "initialize", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_as_initialized_CONFIG", new Object[] { "" }); + if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "initialize", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_as_initialized__INFO", new Object[] { super.serializationStrategy }); + } } /**