activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject [1/3] git commit: Polished camel broker component
Date Sat, 02 Nov 2013 18:35:10 GMT
Updated Branches:
  refs/heads/trunk 6e49ef3a6 -> 1ad6bd59e


Polished camel broker component


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/5469d806
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/5469d806
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/5469d806

Branch: refs/heads/trunk
Commit: 5469d806e8a8a94f774528fb65d6c52352ebce6f
Parents: 6e49ef3
Author: Claus Ibsen <claus.ibsen@gmail.com>
Authored: Sat Nov 2 19:27:00 2013 +0100
Committer: Claus Ibsen <claus.ibsen@gmail.com>
Committed: Sat Nov 2 19:27:00 2013 +0100

----------------------------------------------------------------------
 .../camel/component/broker/BrokerComponent.java |  3 +-
 .../component/broker/BrokerConfiguration.java   |  2 -
 .../camel/component/broker/BrokerConsumer.java  | 10 +++--
 .../camel/component/broker/BrokerEndpoint.java  |  7 +---
 .../component/broker/BrokerJmsMessage.java      | 12 +++---
 .../camel/component/broker/BrokerProducer.java  | 39 ++++++--------------
 6 files changed, 25 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/5469d806/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerComponent.java
----------------------------------------------------------------------
diff --git a/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerComponent.java b/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerComponent.java
index 3fce9ae..16edd30 100644
--- a/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerComponent.java
+++ b/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerComponent.java
@@ -57,13 +57,12 @@ public class BrokerComponent extends UriEndpointComponent implements EndpointCom
             remaining = removeStartingCharacters(remaining.substring(JmsConfiguration.TEMP_TOPIC_PREFIX.length()), '/');
         }
 
-
         ActiveMQDestination destination = ActiveMQDestination.createDestination(remaining, destinationType);
         BrokerEndpoint brokerEndpoint = new BrokerEndpoint(uri, this, destination, brokerConfiguration);
+        setProperties(brokerEndpoint, parameters);
         return brokerEndpoint;
     }
 
-
     @Override
     public List<String> completeEndpointPath(ComponentConfiguration componentConfiguration, String completionText) {
         String brokerName = String.valueOf(componentConfiguration.getParameter("brokerName"));

http://git-wip-us.apache.org/repos/asf/activemq/blob/5469d806/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerConfiguration.java
----------------------------------------------------------------------
diff --git a/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerConfiguration.java b/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerConfiguration.java
index 583720e..6609533 100644
--- a/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerConfiguration.java
+++ b/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerConfiguration.java
@@ -23,7 +23,6 @@ public class BrokerConfiguration {
     @UriParam
     private String brokerName = "";
 
-
     public String getBrokerName() {
         return brokerName;
     }
@@ -32,5 +31,4 @@ public class BrokerConfiguration {
         this.brokerName = brokerName;
     }
 
-
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/5469d806/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerConsumer.java
----------------------------------------------------------------------
diff --git a/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerConsumer.java b/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerConsumer.java
index 39b25e2..1dc52ae 100644
--- a/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerConsumer.java
+++ b/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerConsumer.java
@@ -25,11 +25,8 @@ import org.apache.camel.ExchangePattern;
 import org.apache.camel.Processor;
 import org.apache.camel.component.jms.JmsBinding;
 import org.apache.camel.impl.DefaultConsumer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class BrokerConsumer extends DefaultConsumer implements MessageInterceptor {
-    protected final transient Logger logger = LoggerFactory.getLogger(BrokerConsumer.class);
     private final JmsBinding jmsBinding = new JmsBinding();
 
     public BrokerConsumer(Endpoint endpoint, Processor processor) {
@@ -58,7 +55,12 @@ public class BrokerConsumer extends DefaultConsumer implements MessageIntercepto
         try {
             getProcessor().process(exchange);
         } catch (Exception e) {
-            logger.error("Failed to process " + exchange, e);
+            exchange.setException(e);
+        }
+
+        if (exchange.getException() != null) {
+            getExceptionHandler().handleException("Error processing intercepted message: " + message, exchange, exchange.getException());
         }
     }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/5469d806/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerEndpoint.java
----------------------------------------------------------------------
diff --git a/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerEndpoint.java b/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerEndpoint.java
index e327669..55e7f7b 100644
--- a/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerEndpoint.java
+++ b/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerEndpoint.java
@@ -38,15 +38,13 @@ import org.apache.camel.util.UnsafeUriCharactersEncoder;
 
 @ManagedResource(description = "Managed Camel Broker Endpoint")
 @UriEndpoint(scheme = "broker", consumerClass = BrokerConsumer.class)
-
 public class BrokerEndpoint extends DefaultEndpoint implements MultipleConsumersSupport, Service {
+
     static final String PRODUCER_BROKER_EXCHANGE = "producerBrokerExchange";
 
     @UriParam
     private final BrokerConfiguration configuration;
     private MessageInterceptorRegistry messageInterceptorRegistry;
-
-
     @UriPath
     private final ActiveMQDestination destination;
     private List<MessageInterceptor> messageInterceptorList = new CopyOnWriteArrayList<MessageInterceptor>();
@@ -70,7 +68,6 @@ public class BrokerEndpoint extends DefaultEndpoint implements MultipleConsumers
         return consumer;
     }
 
-
     @Override
     public boolean isSingleton() {
         return false;
@@ -85,7 +82,6 @@ public class BrokerEndpoint extends DefaultEndpoint implements MultipleConsumers
         return destination;
     }
 
-
     @Override
     protected void doStart() throws Exception {
         super.doStart();
@@ -111,7 +107,6 @@ public class BrokerEndpoint extends DefaultEndpoint implements MultipleConsumers
 
     protected void removeMessageInterceptor(MessageInterceptor messageInterceptor) {
         messageInterceptorRegistry.removeMessageInterceptor(destination, messageInterceptor);
-
     }
 
     protected void inject(ProducerBrokerExchange producerBrokerExchange, Message message) throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq/blob/5469d806/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerJmsMessage.java
----------------------------------------------------------------------
diff --git a/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerJmsMessage.java b/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerJmsMessage.java
index 02dcabe..f77e97b 100644
--- a/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerJmsMessage.java
+++ b/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerJmsMessage.java
@@ -17,11 +17,14 @@
 package org.apache.activemq.camel.component.broker;
 
 import javax.jms.Message;
+
 import org.apache.camel.component.jms.JmsBinding;
 import org.apache.camel.component.jms.JmsMessage;
+import org.apache.camel.component.jms.JmsMessageHelper;
 import org.apache.camel.util.ObjectHelper;
 
 public class BrokerJmsMessage extends JmsMessage {
+
     public BrokerJmsMessage(Message jmsMessage, JmsBinding binding) {
         super(jmsMessage, binding);
     }
@@ -29,12 +32,10 @@ public class BrokerJmsMessage extends JmsMessage {
     @Override
     public String toString() {
         if (getJmsMessage() != null) {
-            try {
-                return "BrokerJmsMessage[JMSMessageID: " + getJmsMessage().getJMSMessageID();
-            } catch (Exception e) {
-            }
+            return "BrokerJmsMessage[JMSMessageID: " + JmsMessageHelper.getJMSMessageID(getJmsMessage());
+        } else {
+            return "BrokerJmsMessage@" + ObjectHelper.getIdentityHashCode(this);
         }
-        return "BrokerJmsMessage@" + ObjectHelper.getIdentityHashCode(this);
     }
 
     @Override
@@ -45,7 +46,6 @@ public class BrokerJmsMessage extends JmsMessage {
         }
     }
 
-
     @Override
     public BrokerJmsMessage newInstance() {
         return new BrokerJmsMessage(null, getBinding());

http://git-wip-us.apache.org/repos/asf/activemq/blob/5469d806/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerProducer.java
----------------------------------------------------------------------
diff --git a/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerProducer.java b/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerProducer.java
index c12fbee..fcf1256 100644
--- a/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerProducer.java
+++ b/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerProducer.java
@@ -17,10 +17,8 @@
 package org.apache.activemq.camel.component.broker;
 
 import java.util.Map;
-import java.util.concurrent.RejectedExecutionException;
 
 import org.apache.activemq.broker.ProducerBrokerExchange;
-import org.apache.activemq.camel.converter.ActiveMQMessageConverter;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
@@ -30,7 +28,6 @@ import org.apache.camel.converter.ObjectConverter;
 import org.apache.camel.impl.DefaultAsyncProducer;
 
 public class BrokerProducer extends DefaultAsyncProducer {
-    private final ActiveMQMessageConverter activeMQConverter = new ActiveMQMessageConverter();
     private final BrokerEndpoint brokerEndpoint;
 
     public BrokerProducer(BrokerEndpoint endpoint) {
@@ -38,24 +35,12 @@ public class BrokerProducer extends DefaultAsyncProducer {
         brokerEndpoint = endpoint;
     }
 
-
     @Override
     public boolean process(Exchange exchange, AsyncCallback callback) {
-        // deny processing if we are not started
-        if (!isRunAllowed()) {
-            if (exchange.getException() == null) {
-                exchange.setException(new RejectedExecutionException());
-            }
-            // we cannot process so invoke callback
-            callback.done(true);
-            return true;
-        }
-
         try {
             //In the middle of the broker - InOut doesn't make any sense
             //so we do in only
             return processInOnly(exchange, callback);
-
         } catch (Throwable e) {
             // must catch exception to ensure callback is invoked as expected
             // to let Camel error handling deal with this
@@ -74,8 +59,6 @@ public class BrokerProducer extends DefaultAsyncProducer {
                 ProducerBrokerExchange producerBrokerExchange = (ProducerBrokerExchange) exchange.getProperty(BrokerEndpoint.PRODUCER_BROKER_EXCHANGE);
 
                 brokerEndpoint.inject(producerBrokerExchange, message);
-
-
             }
         } catch (Exception e) {
             exchange.setException(e);
@@ -85,34 +68,34 @@ public class BrokerProducer extends DefaultAsyncProducer {
     }
 
     private ActiveMQMessage getMessage(Exchange exchange) throws Exception {
-        ActiveMQMessage result = null;
-        Message camelMesssage = null;
+        ActiveMQMessage result;
+        Message camelMessage;
         if (exchange.hasOut()) {
-            camelMesssage = exchange.getOut();
+            camelMessage = exchange.getOut();
         } else {
-            camelMesssage = exchange.getIn();
+            camelMessage = exchange.getIn();
         }
 
-        Map<String, Object> headers = camelMesssage.getHeaders();
+        Map<String, Object> headers = camelMessage.getHeaders();
 
         /**
          * We purposely don't want to support injecting messages half-way through
          * broker processing - use the activemq camel component for that - but
          * we will support changing message headers and destinations
          */
-        if (camelMesssage instanceof JmsMessage) {
-            JmsMessage jmsMessage = (JmsMessage) camelMesssage;
+        if (camelMessage instanceof JmsMessage) {
+            JmsMessage jmsMessage = (JmsMessage) camelMessage;
             if (jmsMessage.getJmsMessage() instanceof ActiveMQMessage) {
                 result = (ActiveMQMessage) jmsMessage.getJmsMessage();
                 //lets apply any new message headers
                 setJmsHeaders(result, headers);
             } else {
-
-                throw new IllegalStateException("not the original message from the broker " + jmsMessage.getJmsMessage());
+                throw new IllegalStateException("Not the original message from the broker " + jmsMessage.getJmsMessage());
             }
         } else {
-            throw new IllegalStateException("not the original message from the broker " + camelMesssage);
+            throw new IllegalStateException("Not the original message from the broker " + camelMessage);
         }
+
         return result;
     }
 
@@ -154,6 +137,6 @@ public class BrokerProducer extends DefaultAsyncProducer {
                 }
             }
         }
-
     }
+
 }


Mime
View raw message