Author: ningjiang
Date: Sun Apr 20 05:48:45 2008
New Revision: 649917
URL: http://svn.apache.org/viewvc?rev=649917&view=rev
Log:
Fixed the Checkstyle (CS) errors in the trunk
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/dataformat/StringDataFormat.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/StringDataFormatTest.java
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/DeferredRequestReplyMap.java
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/Requestor.java
activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRouteRequestReplyTest.java
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java?rev=649917&r1=649916&r2=649917&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java
(original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java
Sun Apr 20 05:48:45 2008
@@ -872,21 +872,22 @@
proceed = ((InterceptType) this).getProceed();
}
if (proceed == null) {
- for (ProcessorType node = parent; node != null; node = node.getParent()) {
+ for (ProcessorType node = parent; node != null; node = node.getParent()) {
if (node instanceof InterceptType) {
- InterceptType intercept = (InterceptType) node;
+ InterceptType intercept = (InterceptType)node;
proceed = intercept.getProceed();
break;
}
}
}
-
+
if (this instanceof InterceptType) {
- proceed = ((InterceptType) this).getProceed();
+ proceed = ((InterceptType)this).getProceed();
}
-
+
if (proceed == null) {
- throw new IllegalArgumentException("Cannot use proceed() without being within an intercept() block");
+ throw new IllegalArgumentException(
"Cannot use proceed() without being within an intercept() block");
}
addOutput(proceed);
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/dataformat/StringDataFormat.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/dataformat/StringDataFormat.java?rev=649917&r1=649916&r2=649917&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/dataformat/StringDataFormat.java
(original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/dataformat/StringDataFormat.java
Sun Apr 20 05:48:45 2008
@@ -16,10 +16,10 @@
*/
package org.apache.camel.model.dataformat;
-import javax.xml.bind.annotation.XmlRootElement;
-import javax.xml.bind.annotation.XmlAttribute;
-import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlRootElement;
import org.apache.camel.impl.RouteContext;
import org.apache.camel.spi.DataFormat;
@@ -48,5 +48,5 @@
public void setCharset(String charset) {
this.charset = charset;
}
-
-}
\ No newline at end of file
+
+}
Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/StringDataFormatTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/StringDataFormatTest.java?rev=649917&r1=649916&r2=649917&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/StringDataFormatTest.java
(original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/StringDataFormatTest.java
Sun Apr 20 05:48:45 2008
@@ -16,16 +16,16 @@
*/
package org.apache.camel.impl;
-import java.io.InputStream;
import java.io.ByteArrayInputStream;
+import java.io.InputStream;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelTemplate;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.TestSupport;
-import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
/**
* Unit test of the string data format.
Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java?rev=649917&r1=649916&r2=649917&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
(original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
Sun Apr 20 05:48:45 2008
@@ -163,7 +163,7 @@
if (endpoint.getConfiguration().isUseMessageIDAsCorrelationID()) {
String messageID = exchange.getIn().getHeader("JMSMessageID", String.class);
- reply.setJMSCorrelationID(messageID);
+ reply.setJMSCorrelationID(messageID);
} else {
String correlationID = message.getJMSCorrelationID();
if (correlationID != null) {
Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java?rev=649917&r1=649916&r2=649917&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
(original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
Sun Apr 20 05:48:45 2008
@@ -64,7 +64,7 @@
}
public JmsBinding(JmsEndpoint endpoint) {
- this.endpoint = endpoint;
+ this.endpoint = endpoint;
}
/**
* Extracts the body from the JMS message
@@ -125,7 +125,7 @@
public Message makeJmsMessage(Exchange exchange, org.apache.camel.Message camelMessage,
Session session)
throws JMSException {
Message answer = null;
- boolean alwaysCopy = (endpoint != null) ? endpoint.getConfiguration().isAlwaysCopyMessage()
: false;
+ boolean alwaysCopy = (endpoint != null) ? endpoint.getConfiguration().isAlwaysCopyMessage()
: false;
if (!alwaysCopy && camelMessage instanceof JmsMessage) {
JmsMessage jmsMessage = (JmsMessage)camelMessage;
answer = jmsMessage.getJmsMessage();
Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java?rev=649917&r1=649916&r2=649917&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
(original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
Sun Apr 20 05:48:45 2008
@@ -242,9 +242,9 @@
}
public void setAlwaysCopyMessage(boolean alwaysCopyMessage) {
- getConfiguration().setAlwaysCopyMessage(alwaysCopyMessage);
+ getConfiguration().setAlwaysCopyMessage(alwaysCopyMessage);
}
-
+
public void setUseMessageIDAsCorrelationID(boolean useMessageIDAsCorrelationID) {
getConfiguration().setUseMessageIDAsCorrelationID(useMessageIDAsCorrelationID);
}
Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java?rev=649917&r1=649916&r2=649917&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
(original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
Sun Apr 20 05:48:45 2008
@@ -58,11 +58,11 @@
* @version $Revision$
*/
public class JmsConfiguration implements Cloneable {
- private static final transient Log LOG = LogFactory.getLog(JmsConfiguration.class);
protected static final String TRANSACTED = "TRANSACTED";
protected static final String CLIENT_ACKNOWLEDGE = "CLIENT_ACKNOWLEDGE";
protected static final String AUTO_ACKNOWLEDGE = "AUTO_ACKNOWLEDGE";
protected static final String DUPS_OK_ACKNOWLEDGE = "DUPS_OK_ACKNOWLEDGE";
+ private static final transient Log LOG = LogFactory.getLog(JmsConfiguration.class);
private JmsOperations jmsOperations;
private DestinationResolver destinationResolver;
private ConnectionFactory connectionFactory;
@@ -110,8 +110,8 @@
private boolean disableReplyTo;
private boolean eagerLoadingOfProperties;
// Always make a JMS message copy when it's passed to Producer
- private boolean alwaysCopyMessage = false;
- private boolean useMessageIDAsCorrelationID = false;
+ private boolean alwaysCopyMessage;
+ private boolean useMessageIDAsCorrelationID;
public JmsConfiguration() {
}
@@ -144,21 +144,21 @@
}
return answer;
}
-
+
public static interface MessageSentCallback {
- public void sent(Message message);
+ void sent(Message message);
}
-
+
public static class CamelJmsTemplate extends JmsTemplate {
private JmsConfiguration config;
-
+
public CamelJmsTemplate(JmsConfiguration config, ConnectionFactory connectionFactory)
{
super(connectionFactory);
this.config = config;
}
-
- public void send(final String destinationName,
- final MessageCreator messageCreator,
+
+ public void send(final String destinationName,
+ final MessageCreator messageCreator,
final MessageSentCallback callback) throws JmsException {
execute(new SessionCallback() {
public Object doInJms(Session session) throws JMSException {
@@ -177,8 +177,7 @@
// Transacted session created by this template -> commit.
JmsUtils.commitIfNecessary(session);
}
- }
- finally {
+ } finally {
JmsUtils.closeMessageProducer(producer);
}
if (message != null && callback != null) {
@@ -206,53 +205,53 @@
}
}
producer.send(message, message.getJMSDeliveryMode(), message.getJMSPriority(),
ttl);
- }
- else {
+ } else {
super.doSend(producer, message);
}
}
}
-
+
public static class CamelJmsTeemplate102 extends JmsTemplate102 {
private JmsConfiguration config;
-
+
public CamelJmsTeemplate102(JmsConfiguration config, ConnectionFactory connectionFactory,
boolean pubSubDomain) {
super(connectionFactory, pubSubDomain);
this.config = config;
}
-
- public void send(final String destinationName,
- final MessageCreator messageCreator,
+
+ public void send(final String destinationName,
+ final MessageCreator messageCreator,
final MessageSentCallback callback) throws JmsException {
- execute(new SessionCallback() {
- public Object doInJms(Session session) throws JMSException {
- Destination destination = resolveDestinationName(session, destinationName);
- Assert.notNull(messageCreator, "MessageCreator must not be null");
- MessageProducer producer = createProducer(session, destination);
- Message message = null;
- try {
- message = messageCreator.createMessage(session);
- if (logger.isDebugEnabled()) {
- logger.debug("Sending created message: " + message);
- }
- doSend(producer, message);
- // Check commit - avoid commit call within a JTA transaction.
- if (session.getTransacted() && isSessionLocallyTransacted(session))
{
- // Transacted session created by this template -> commit.
- JmsUtils.commitIfNecessary(session);
- }
- }
- finally {
- JmsUtils.closeMessageProducer(producer);
- }
- if (message != null && callback != null) {
- callback.sent(message);
- }
- return null;
- }
- }, false);
+ execute(new SessionCallback() {
+ public Object doInJms(Session session) throws JMSException {
+ Destination destination = resolveDestinationName(session, destinationName);
+ Assert.notNull(messageCreator, "MessageCreator must not be null");
+ MessageProducer producer = createProducer(session, destination);
+ Message message = null;
+ try {
+ message = messageCreator.createMessage(session);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Sending created message: " + message);
+ }
+ doSend(producer, message);
+ // Check commit - avoid commit call within a JTA
+ // transaction.
+ if (session.getTransacted() && isSessionLocallyTransacted(session))
{
+ // Transacted session created by this template ->
+ // commit.
+ JmsUtils.commitIfNecessary(session);
+ }
+ } finally {
+ JmsUtils.closeMessageProducer(producer);
+ }
+ if (message != null && callback != null) {
+ callback.sent(message);
+ }
+ return null;
+ }
+ }, false);
}
-
+
/**
* Override so we can support preserving the Qos settings that have
* been set on the message.
@@ -270,19 +269,18 @@
}
}
if (isPubSubDomain()) {
- ((TopicPublisher) producer).publish(message, message.getJMSDeliveryMode(),
+ ((TopicPublisher) producer).publish(message, message.getJMSDeliveryMode(),
message.getJMSPriority(), ttl);
} else {
- ((QueueSender) producer).send(message, message.getJMSDeliveryMode(),
+ ((QueueSender) producer).send(message, message.getJMSDeliveryMode(),
message.getJMSPriority(), ttl);
}
- }
- else {
+ } else {
super.doSend(producer, message);
}
}
}
-
+
public JmsOperations createInOnlyTemplate(boolean pubSubDomain, String destination) {
if (jmsOperations != null) {
@@ -291,9 +289,9 @@
ConnectionFactory factory = getTemplateConnectionFactory();
- JmsTemplate template = useVersion102 ?
- new CamelJmsTeemplate102(this, factory, pubSubDomain)
- : new CamelJmsTemplate(this, factory);
+ JmsTemplate template = useVersion102
+ ? new CamelJmsTeemplate102(this, factory, pubSubDomain)
+ : new CamelJmsTemplate(this, factory);
template.setPubSubDomain(pubSubDomain);
if (destinationResolver != null) {
@@ -641,7 +639,7 @@
public void setMessageTimestampEnabled(boolean messageTimestampEnabled) {
this.messageTimestampEnabled = messageTimestampEnabled;
}
-
+
public int getPriority() {
return priority;
}
@@ -870,10 +868,10 @@
if (isEagerLoadingOfProperties()) {
listener.setEagerLoadingOfProperties(true);
}
- // REVISIT: We really ought to change the model and let JmsProducer
+ // REVISIT: We really ought to change the model and let JmsProducer
// and JmsConsumer have their own JmsConfiguration instance
- // This way producer's and consumer's QoS can differ and be
- // independently configured
+ // This way producer's and consumer's QoS can differ and be
+ // independently configured
JmsOperations operations = listener.getTemplate();
if (operations instanceof JmsTemplate) {
JmsTemplate template = (JmsTemplate)operations;
@@ -971,19 +969,19 @@
}
- public boolean isAlwaysCopyMessage() {
- return alwaysCopyMessage;
- }
-
- public void setAlwaysCopyMessage(boolean alwaysCopyMessage) {
- this.alwaysCopyMessage = alwaysCopyMessage;
- }
-
- public boolean isUseMessageIDAsCorrelationID() {
- return useMessageIDAsCorrelationID;
- }
-
- public void setUseMessageIDAsCorrelationID(boolean useMessageIDAsCorrelationID) {
- this.useMessageIDAsCorrelationID = useMessageIDAsCorrelationID;
- }
+ public boolean isAlwaysCopyMessage() {
+ return alwaysCopyMessage;
+ }
+
+ public void setAlwaysCopyMessage(boolean alwaysCopyMessage) {
+ this.alwaysCopyMessage = alwaysCopyMessage;
+ }
+
+ public boolean isUseMessageIDAsCorrelationID() {
+ return useMessageIDAsCorrelationID;
+ }
+
+ public void setUseMessageIDAsCorrelationID(boolean useMessageIDAsCorrelationID) {
+ this.useMessageIDAsCorrelationID = useMessageIDAsCorrelationID;
+ }
}
Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java?rev=649917&r1=649916&r2=649917&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
(original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
Sun Apr 20 05:48:45 2008
@@ -29,20 +29,16 @@
import org.apache.camel.ExchangeTimedOutException;
import org.apache.camel.RuntimeExchangeException;
import org.apache.camel.component.jms.JmsConfiguration.CamelJmsTemplate;
-import org.apache.camel.component.jms.JmsConfiguration.MessageSentCallback;
import org.apache.camel.component.jms.requestor.DeferredRequestReplyMap;
-import org.apache.camel.component.jms.requestor.FailedToProcessResponse;
-import org.apache.camel.component.jms.requestor.Requestor;
import org.apache.camel.component.jms.requestor.DeferredRequestReplyMap.DeferredMessageSentCallback;
+import org.apache.camel.component.jms.requestor.Requestor;
import org.apache.camel.impl.DefaultProducer;
import org.apache.camel.util.Out;
import org.apache.camel.util.UuidGenerator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.jms.core.JmsOperations;
-import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
-import org.springframework.jms.core.SessionCallback;
/**
* @version $Revision$
@@ -63,7 +59,7 @@
public long getRequestTimeout() {
return endpoint.getRequestTimeout();
}
-
+
protected void doStart() throws Exception {
super.doStart();
deferredRequestReplyMap = endpoint.getRequestor().getDeferredRequestReplyMap(this);
@@ -93,36 +89,34 @@
final boolean msgIdAsCorrId = endpoint.getConfiguration().isUseMessageIDAsCorrelationID();
String correlationId = in.getHeader("JMSCorrelationID", String.class);
-
+
if (correlationId == null && !msgIdAsCorrId) {
in.setHeader("JMSCorrelationID", getUuidGenerator().generateId());
}
-
+
final Out<FutureTask> futureHolder = new Out<FutureTask>();
- final DeferredMessageSentCallback callback = (msgIdAsCorrId) ?
- deferredRequestReplyMap.createDeferredMessageSentCallback() : null;
+ final DeferredMessageSentCallback callback = msgIdAsCorrId ? deferredRequestReplyMap.createDeferredMessageSentCallback()
: null;
final CamelJmsTemplate template = (CamelJmsTemplate)getInOutTemplate();
template.send(endpoint.getDestination(), new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
Message message = endpoint.getBinding().makeJmsMessage(exchange, in,
session);
message.setJMSReplyTo(replyTo);
-
+
FutureTask future = null;
- future = (!msgIdAsCorrId) ?
- requestor.getReceiveFuture(message.getJMSCorrelationID(),
- endpoint.getRequestTimeout())
- : requestor.getReceiveFuture(callback);
+ future = (!msgIdAsCorrId)
+ ? requestor.getReceiveFuture(message.getJMSCorrelationID(), endpoint
+ .getRequestTimeout()) : requestor.getReceiveFuture(callback);
futureHolder.set(future);
-
+
if (LOG.isDebugEnabled()) {
LOG.debug(endpoint + " sending JMS message: " + message);
}
return message;
}
}, callback);
-
+
// lets wait and return the response
long requestTimeout = endpoint.getRequestTimeout();
try {
@@ -147,9 +141,8 @@
if (correlationId != null) {
message.setJMSCorrelationID(correlationId);
exchange.getOut(false).setHeader("JMSCorrelationID", correlationId);
- }
- }
- else {
+ }
+ } else {
// lets set a timed out exception
exchange.setException(new ExchangeTimedOutException(exchange, requestTimeout));
}
Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/DeferredRequestReplyMap.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/DeferredRequestReplyMap.java?rev=649917&r1=649916&r2=649917&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/DeferredRequestReplyMap.java
(original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/DeferredRequestReplyMap.java
Sun Apr 20 05:48:45 2008
@@ -21,8 +21,8 @@
import javax.jms.JMSException;
import javax.jms.Message;
-import org.apache.camel.component.jms.JmsProducer;
import org.apache.camel.component.jms.JmsConfiguration.MessageSentCallback;
+import org.apache.camel.component.jms.JmsProducer;
import org.apache.camel.util.TimeoutMap;
import org.apache.camel.util.UuidGenerator;
import org.apache.commons.logging.Log;
@@ -39,29 +39,29 @@
private DeferredRequestReplyMap map;
private String transitionalID;
private Object monitor;
-
+
public DeferredMessageSentCallback(DeferredRequestReplyMap map, UuidGenerator uuidGenerator,
Object monitor) {
transitionalID = uuidGenerator.generateId();
this.map = map;
this.monitor = monitor;
}
-
+
public DeferredRequestReplyMap getDeferredRequestReplyMap() {
return map;
}
-
+
public String getID() {
return transitionalID;
}
-
+
public void sent(Message message) {
map.processDeferredReplies(monitor, getID(), message);
}
}
-
- public DeferredRequestReplyMap(Requestor requestor,
- JmsProducer producer,
- TimeoutMap deferredRequestMap,
+
+ public DeferredRequestReplyMap(Requestor requestor,
+ JmsProducer producer,
+ TimeoutMap deferredRequestMap,
TimeoutMap deferredReplyMap) {
this.requestor = requestor;
this.producer = producer;
@@ -76,18 +76,18 @@
public DeferredMessageSentCallback createDeferredMessageSentCallback() {
return new DeferredMessageSentCallback(this, getUuidGenerator(), requestor);
}
-
+
public void put(DeferredMessageSentCallback callback, FutureTask futureTask) {
deferredRequestMap.put(callback.getID(), futureTask, getRequestTimeout());
}
public void processDeferredRequests(String correlationID, Message inMessage) {
- processDeferredRequests(requestor, deferredRequestMap, deferredReplyMap,
+ processDeferredRequests(requestor, deferredRequestMap, deferredReplyMap,
correlationID, requestor.getMaxRequestTimeout(), inMessage);
}
- public static void processDeferredRequests(Object monitor,
- TimeoutMap requestMap,
+ public static void processDeferredRequests(Object monitor,
+ TimeoutMap requestMap,
TimeoutMap replyMap,
String correlationID,
long timeout,
@@ -114,7 +114,7 @@
}
}
}
-
+
public void processDeferredReplies(Object monitor, String transitionalID, Message outMessage)
{
synchronized (monitor) {
try {
@@ -125,7 +125,7 @@
deferredRequestMap.remove(transitionalID);
String correlationID = outMessage.getJMSMessageID();
Object in = deferredReplyMap.get(correlationID);
-
+
if (in != null && in instanceof Message) {
Message inMessage = (Message)in;
if (handler instanceof ReplyHandler) {
@@ -147,7 +147,7 @@
}
}
}
-
+
protected UuidGenerator getUuidGenerator() {
return producer.getUuidGenerator();
}
Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/Requestor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/Requestor.java?rev=649917&r1=649916&r2=649917&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/Requestor.java
(original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/Requestor.java
Sun Apr 20 05:48:45 2008
@@ -20,7 +20,6 @@
import java.util.Map;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
@@ -31,12 +30,12 @@
import javax.jms.TemporaryQueue;
import org.apache.camel.component.jms.JmsConfiguration;
+import org.apache.camel.component.jms.JmsProducer;
+import org.apache.camel.component.jms.requestor.DeferredRequestReplyMap.DeferredMessageSentCallback;
import org.apache.camel.impl.ServiceSupport;
import org.apache.camel.util.DefaultTimeoutMap;
import org.apache.camel.util.TimeoutMap;
import org.apache.camel.util.UuidGenerator;
-import org.apache.camel.component.jms.JmsProducer;
-import org.apache.camel.component.jms.requestor.DeferredRequestReplyMap.DeferredMessageSentCallback;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.core.task.TaskExecutor;
@@ -50,6 +49,7 @@
*/
public class Requestor extends ServiceSupport implements MessageListener {
private static final transient Log LOG = LogFactory.getLog(Requestor.class);
+ private static UuidGenerator uuidGenerator;
private final JmsConfiguration configuration;
private ScheduledExecutorService executorService;
private AbstractMessageListenerContainer listenerContainer;
@@ -59,8 +59,8 @@
private TimeoutMap deferredReplyMap;
private Destination replyTo;
private long maxRequestTimeout = -1;
- private static UuidGenerator uuidGenerator;
-
+
+
public Requestor(JmsConfiguration configuration, ScheduledExecutorService executorService)
{
this.configuration = configuration;
this.executorService = executorService;
@@ -69,11 +69,11 @@
deferredRequestMap = new DefaultTimeoutMap(executorService, configuration.getRequestMapPurgePollTimeMillis());
deferredReplyMap = new DefaultTimeoutMap(executorService, configuration.getRequestMapPurgePollTimeMillis());
}
-
+
public synchronized DeferredRequestReplyMap getDeferredRequestReplyMap(JmsProducer producer)
{
DeferredRequestReplyMap map = producerDeferredRequestReplyMap.get(producer);
if (map == null) {
- map = new DeferredRequestReplyMap(this, producer, deferredRequestMap, deferredReplyMap);
+ map = new DeferredRequestReplyMap(this, producer, deferredRequestMap, deferredReplyMap);
producerDeferredRequestReplyMap.put(producer, map);
}
if (maxRequestTimeout == -1) {
@@ -83,7 +83,7 @@
}
return map;
}
-
+
public synchronized void removeDeferredRequestReplyMap(JmsProducer producer) {
producerDeferredRequestReplyMap.remove(producer);
if (maxRequestTimeout == producer.getRequestTimeout()) {
@@ -96,19 +96,19 @@
maxRequestTimeout = max;
}
}
-
+
public synchronized long getMaxRequestTimeout() {
return maxRequestTimeout;
}
-
+
public TimeoutMap getRequestMap() {
return requestMap;
}
-
+
public TimeoutMap getDeferredRequestMap() {
return deferredRequestMap;
}
-
+
public TimeoutMap getDeferredReplyMap() {
return deferredReplyMap;
}
@@ -148,14 +148,14 @@
}
} else {
DeferredRequestReplyMap.processDeferredRequests(
- this, deferredRequestMap, deferredReplyMap,
+ this, deferredRequestMap, deferredReplyMap,
correlationID, getMaxRequestTimeout(), message);
}
} catch (JMSException e) {
throw new FailedToProcessResponse(message, e);
}
}
-
+
public AbstractMessageListenerContainer getListenerContainer() {
if (listenerContainer == null) {
@@ -225,7 +225,7 @@
}
return answer;
}
-
+
public static synchronized UuidGenerator getUuidGenerator() {
if (uuidGenerator == null) {
uuidGenerator = new UuidGenerator();
Modified: activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRouteRequestReplyTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRouteRequestReplyTest.java?rev=649917&r1=649916&r2=649917&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRouteRequestReplyTest.java
(original)
+++ activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRouteRequestReplyTest.java
Sun Apr 20 05:48:45 2008
@@ -51,9 +51,9 @@
protected static AtomicBoolean inited = new AtomicBoolean(false);
protected static Map<String, ContextBuilder> contextBuilders = new HashMap<String,
ContextBuilder>();
protected static Map<String, RouteBuilder> routeBuilders = new HashMap<String,
RouteBuilder>();
-
+
private interface ContextBuilder {
- public CamelContext buildContext(CamelContext context) throws Exception;
+ CamelContext buildContext(CamelContext context) throws Exception;
}
public static class SingleNodeDeadEndRouteBuilder extends RouteBuilder {
@@ -103,10 +103,10 @@
protected static void init() {
if (inited.compareAndSet(false, true)) {
-
+
ContextBuilder contextBuilderMessageID = new ContextBuilder() {
public CamelContext buildContext(CamelContext context) throws Exception {
- ConnectionFactory connectionFactory =
+ ConnectionFactory connectionFactory =
new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
JmsComponent jmsComponent = jmsComponentClientAcknowledge(connectionFactory);
jmsComponent.setUseMessageIDAsCorrelationID(true);
@@ -115,10 +115,10 @@
return context;
}
};
-
+
ContextBuilder contextBuilderCorrelationID = new ContextBuilder() {
public CamelContext buildContext(CamelContext context) throws Exception {
- ConnectionFactory connectionFactory =
+ ConnectionFactory connectionFactory =
new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
JmsComponent jmsComponent = jmsComponentClientAcknowledge(connectionFactory);
jmsComponent.setUseMessageIDAsCorrelationID(false);
@@ -127,10 +127,10 @@
return context;
}
};
-
+
ContextBuilder contextBuilderCorrelationIDDiffComp = new ContextBuilder() {
public CamelContext buildContext(CamelContext context) throws Exception {
- ConnectionFactory connectionFactory =
+ ConnectionFactory connectionFactory =
new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
JmsComponent jmsComponent = jmsComponentClientAcknowledge(connectionFactory);
jmsComponent.setUseMessageIDAsCorrelationID(false);
@@ -143,8 +143,8 @@
return context;
}
};
-
-
+
+
contextBuilders.put("testUseMessageIDAsCorrelationID", contextBuilderMessageID);
contextBuilders.put("testUseCorrelationID", contextBuilderCorrelationID);
contextBuilders.put("testUseMessageIDAsCorrelationIDMultiNode", contextBuilderMessageID);
@@ -152,7 +152,7 @@
contextBuilders.put("testUseCorrelationIDMultiNodeDiffComponents", contextBuilderCorrelationIDDiffComp);
contextBuilders.put("testUseMessageIDAsCorrelationIDTimeout", contextBuilderMessageID);
contextBuilders.put("testUseCorrelationIDTimeout", contextBuilderMessageID);
-
+
routeBuilders.put("testUseMessageIDAsCorrelationID", new SingleNodeRouteBuilder());
routeBuilders.put("testUseCorrelationID", new SingleNodeRouteBuilder());
routeBuilders.put("testUseMessageIDAsCorrelationIDMultiNode", new MultiNodeRouteBuilder());
@@ -162,26 +162,26 @@
routeBuilders.put("testUseCorrelationIDTimeout", new SingleNodeDeadEndRouteBuilder());
}
}
-
+
public class Task extends Thread {
private AtomicInteger counter;
private boolean ok = true;
private String message = "";
-
+
public Task(AtomicInteger counter) {
this.counter = counter;
}
-
+
public void run() {
for (int i = 0; i < maxCalls; i++) {
int callId = counter.incrementAndGet();
Object reply = template.requestBody(endpoingUriA, request + "-" + callId);
if (!reply.equals(expectedReply + "-" + callId)) {
ok = false;
- message = "Unexpected reply. Expected: '" + expectedReply + "-" + callId
- + "'; Received: '" + reply +"'";
+ message = "Unexpected reply. Expected: '" + expectedReply + "-" + callId
+ + "'; Received: '" + reply + "'";
}
- }
+ }
}
public void assertSuccess() {
assertTrue(message, ok);
@@ -201,9 +201,9 @@
public void testUseCorrelationID() throws Exception {
runRequestReplyThreaded();
}
-
+
public void testUseMessageIDAsCorrelationIDMultiNode() throws Exception {
- runRequestReplyThreaded();
+ runRequestReplyThreaded();
}
public void testUseCorrelationIDTimeout() throws Exception {
@@ -223,28 +223,28 @@
Thread.sleep(c.getConfiguration().getRequestMapPurgePollTimeMillis());
assertTrue(c.getRequestor().getDeferredRequestMap().size() == 0);
}
-
+
public void testUseCorrelationIDMultiNodeDiffComponents() throws Exception {
runRequestReplyThreaded();
}
-
+
/*
* REVISIT: This currently fails because there is a single instance of Requestor per
JmsComponent
* which shares requestMap amongst JmsProducers. This is a problem in case where the
same correlationID
* value travels between nodes serviced by the same JmsComponent:
* client -> producer1 -> corrId -> consumer1 -> producer2 -> corrId ->
consumer
* producer1 (Bum! @) <- corrId <- consumer1 <- producer2 <- corrId <-
reply
- *
+ *
* @ - The request entry for corrId was already removed from JmsProducer shared requestMap
- *
+ *
* Possible ways to solve this: Each JmsProducer gets its own replyTo destination
- *
+ *
public void testUseCorrelationIDMultiNode() throws Exception {
runRequestReplyThreaded();
}
*/
-
+
protected void runRequestReplyThreaded() throws Exception {
final AtomicInteger counter = new AtomicInteger(-1);
Task[] tasks = new Task[maxTasks];
@@ -258,7 +258,7 @@
tasks[i].assertSuccess();
}
}
-
+
protected CamelContext createCamelContext() throws Exception {
CamelContext camelContext = super.createCamelContext();
return contextBuilders.get(getName()).buildContext(camelContext);
|