camel-users mailing list archives

From Bruce Elmore <belm...@qualcomm.com>
Subject RE: Handling Bean exceptions
Date Fri, 06 Jun 2008 13:49:20 GMT

I've taken this a step further and implemented what is (IMO) a more
consistent solution.

First, I wanted the exception preserved and sent back on the JMS reply
queue. This requires a change to the JmsBinding.makeJmsMessage method (the
same change as shown before). This is arguably a kludge because I'm passing
the exception back in the body of the JMS message. I could have serialized
it and returned it in a header instead, but I wondered whether returning a
large header would be a good idea. Note that the client may still be unable
to deserialize the exception on the other side. I'm open to suggestions on
this one. I was thinking about changing it to return an
InvocationTargetException, passing exception.toString() to its constructor.

public class MyJmsBinding extends JmsBinding {

	public MyJmsBinding() {
	}

	@Override
	public Message makeJmsMessage(Exchange exchange,
		org.apache.camel.Message camelMessage, Session session) throws JMSException
	{
		Message answer = null;
		if (camelMessage instanceof JmsMessage) {
			JmsMessage jmsMessage = (JmsMessage) camelMessage;
			answer = jmsMessage.getJmsMessage();
		}
		if (answer == null) {
			if (exchange.getException() != null) {
				// the exchange failed: send the exception itself as the reply body
				answer = createJmsMessage(exchange.getException(), session,
					exchange.getContext());
			} else {
				answer = createJmsMessage(camelMessage.getBody(), session,
					exchange.getContext());
			}
			
			appendJmsProperties(answer, exchange, camelMessage);
		}
		return answer;
	}
}
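
On the deserialization worry mentioned above, one option is to flatten the exception into JDK-only types before it goes into the reply body. The following is only a minimal sketch under assumptions: `PortableExceptions`, `flatten` and `roundTrip` are hypothetical names, not Camel APIs, and the round trip merely imitates what Java serialization does to an object message body. Note that InvocationTargetException has no String-only constructor, so the sketch passes a null target along with the message.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationTargetException;

// Hypothetical helper, not part of Camel.
final class PortableExceptions {

    private PortableExceptions() {
    }

    // Flattens any Throwable into an exception built only from JDK types,
    // so the receiving side does not need the original class on its classpath.
    static InvocationTargetException flatten(Throwable original) {
        // No target is attached; the original's toString() becomes the message.
        InvocationTargetException portable =
                new InvocationTargetException(null, original.toString());
        // Keep the original stack trace for diagnostics.
        portable.setStackTrace(original.getStackTrace());
        return portable;
    }

    // Serializes and deserializes a value, imitating the trip through a reply body.
    static Object roundTrip(Object value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }
}
```

The trade-off is that the client loses the original exception type and can only inspect the message text and stack trace.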

--------------------

Now, I needed to change the JmsProducer's process method to set the
exception field on the Exchange if the object returned is a Throwable.

public class MyJmsProducer extends org.apache.camel.component.jms.JmsProducer {
	private static final transient Log LOG = LogFactory.getLog(JmsProducer.class);
	private JmsEndpoint endpoint;
	
	public MyJmsProducer(JmsEndpoint endpoint) {
		super(endpoint);
		
		this.endpoint = endpoint;
	}
	
	@Override
    public void process(final Exchange exchange) {
        final org.apache.camel.Message in = exchange.getIn();

        if (exchange.getPattern().isOutCapable()) {
            // create a temporary queue and consumer for responses...
            // note due to JMS transaction semantics we cannot use a single
            // transaction for sending the request and receiving the response
            Requestor requestor;
            try {
                requestor = endpoint.getRequestor();
            } catch (Exception e) {
                throw new RuntimeExchangeException(e, exchange);
            }

            final Destination replyTo = requestor.getReplyTo();

            String correlationId = in.getHeader("JMSCorrelationID", String.class);
            if (correlationId == null) {
                correlationId = getUuidGenerator().generateId();
                in.setHeader("JMSCorrelationID", correlationId);
            }

            // lets register the future object before we try send just in case
            long requestTimeout = endpoint.getRequestTimeout();
            FutureTask future = requestor.getReceiveFuture(correlationId, requestTimeout);

            getInOutTemplate().send(endpoint.getDestination(), new MessageCreator() {
                public Message createMessage(Session session) throws JMSException {
                    Message message = endpoint.getBinding().makeJmsMessage(exchange, in, session);
                    message.setJMSReplyTo(replyTo);

                    if (LOG.isDebugEnabled()) {
                        LOG.debug(endpoint + " sending JMS message: " + message);
                    }
                    return message;
                }
            });

            // lets wait and return the response
            try {
                Message message = null;
                try {
                    if (requestTimeout < 0) {
                        message = (Message)future.get();
                    } else {
                        message = (Message)future.get(requestTimeout, TimeUnit.MILLISECONDS);
                    }
                } catch (InterruptedException e) {
                    // restore the interrupt flag so callers can still see it
                    Thread.currentThread().interrupt();
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Future interrupted: " + e, e);
                    }
                } catch (TimeoutException e) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Future timed out: " + e, e);
                    }
                }
                if (message != null) {
                    Object object = endpoint.getBinding().extractBodyFromJms(exchange, message);

                    // a Throwable body means the remote side failed, so record it
                    // as the exchange's exception instead of as the reply
                    if (object instanceof Throwable) {
                        exchange.setException((Throwable) object);
                    } else {
                        exchange.setOut(new JmsMessage(message, endpoint.getBinding()));
                    }
                } else {
                    // lets set a timed out exception
                    exchange.setException(new ExchangeTimedOutException(exchange, requestTimeout));
                }
            } catch (Exception e) {
                exchange.setException(e);
            }
        } else {
            getInOnlyTemplate().send(endpoint.getDestination(), new MessageCreator() {
                public Message createMessage(Session session) throws JMSException {
                    Message message = endpoint.getBinding().makeJmsMessage(exchange, in, session);
                    if (LOG.isDebugEnabled()) {
                        LOG.debug(endpoint + " sending JMS message: " + message);
                    }
                    return message;
                }
            });
        }
    }
}
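
Stripped of the JMS plumbing, the interesting part of the producer above is: wait for the reply with an optional timeout, then treat a Throwable reply as a failure rather than as a body. Here is a self-contained sketch of just that decision, using only java.util.concurrent. `ReplyOutcome` and `ReplyClassifier` are hypothetical stand-ins for the Exchange and the producer, not Camel classes. Unlike the producer above, the sketch reports interruption and timeout as distinct failures instead of folding both into a timed-out exception.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for the two Exchange fields the producer sets.
final class ReplyOutcome {
    final Object body;          // set when the reply is a normal payload
    final Throwable exception;  // set when the reply is, or caused, a failure

    private ReplyOutcome(Object body, Throwable exception) {
        this.body = body;
        this.exception = exception;
    }

    static ReplyOutcome succeeded(Object body) {
        return new ReplyOutcome(body, null);
    }

    static ReplyOutcome failed(Throwable exception) {
        return new ReplyOutcome(null, exception);
    }
}

// Hypothetical helper, not part of Camel.
final class ReplyClassifier {

    private ReplyClassifier() {
    }

    // Waits for the reply; a negative timeout means wait indefinitely.
    static ReplyOutcome await(Future<?> future, long timeoutMillis) {
        Object reply;
        try {
            if (timeoutMillis < 0) {
                reply = future.get();
            } else {
                reply = future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag before reporting the failure.
            Thread.currentThread().interrupt();
            return ReplyOutcome.failed(e);
        } catch (TimeoutException e) {
            return ReplyOutcome.failed(e);
        } catch (ExecutionException e) {
            return ReplyOutcome.failed(e.getCause() != null ? e.getCause() : e);
        }
        // A Throwable payload means the remote side failed.
        if (reply instanceof Throwable) {
            return ReplyOutcome.failed((Throwable) reply);
        }
        return ReplyOutcome.succeeded(reply);
    }
}
```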

-----------------------------------

Unfortunately, I couldn't just inject these classes using my Spring config.
Instead, I had to subclass several other classes. 

The JmsEndpoint and JmsQueueEndpoint are responsible for creating the
JmsProducer, so I subclassed those. Also, note that JmsEndpoint creates the
JmsBinding.

public class MyJmsEndpoint extends JmsEndpoint {

	public MyJmsEndpoint(String uri, JmsComponent component, String destination,
		boolean pubSubDomain, JmsConfiguration configuration)
	{
		super(uri, component, destination, pubSubDomain, configuration);

		this.setBinding(new MyJmsBinding());
	}

	@Override
    public JmsProducer createProducer() throws Exception {
        return new MyJmsProducer(this);
    }
}

----------------------

public class MyJmsQueueEndpoint extends JmsQueueEndpoint {

	public MyJmsQueueEndpoint(String uri, JmsComponent component, String destination,
		JmsConfiguration configuration, QueueBrowseStrategy queueBrowseStrategy)
	{
		super(uri, component, destination, configuration, queueBrowseStrategy);
		
		this.setBinding(new MyJmsBinding());
	}

	@Override
    public JmsProducer createProducer() throws Exception {
        return new MyJmsProducer(this);
    }
}

------------------------------------

The JmsComponent creates the endpoints, so I subclassed that as well.

public class MyJmsComponent extends JmsComponent {

	@Override
    protected Endpoint<JmsExchange> createEndpoint(String uri, String remaining,
            Map parameters) throws Exception {

        boolean pubSubDomain = false;
        if (remaining.startsWith(QUEUE_PREFIX)) {
            pubSubDomain = false;
            remaining = removeStartingCharacters(remaining.substring(QUEUE_PREFIX.length()), '/');
        } else if (remaining.startsWith(TOPIC_PREFIX)) {
            pubSubDomain = true;
            remaining = removeStartingCharacters(remaining.substring(TOPIC_PREFIX.length()), '/');
        }

        final String subject = convertPathToActualDestination(remaining);

        // lets make sure we copy the configuration as each endpoint can
        // customize its own version
        JmsConfiguration newConfiguration = getConfiguration().copy();
        JmsEndpoint endpoint;
        QueueBrowseStrategy strategy = getQueueBrowseStrategy();
        if (pubSubDomain || strategy == null) {
            endpoint = new MyJmsEndpoint(uri, this, subject, pubSubDomain, newConfiguration);
        } else {
            endpoint = new MyJmsQueueEndpoint(uri, this, subject, newConfiguration, strategy);
        }

        String selector = (String)parameters.remove("selector");
        if (selector != null) {
            endpoint.setSelector(selector);
        }
        setProperties(endpoint.getConfiguration(), parameters);
        return endpoint;
    }
}

--------------------------

That resulted in behavior that seems consistent with the direct transport
and with how other JMS failures (such as a receive timeout) behave. But I
still didn't like having to check the Exchange object to determine whether
an exception occurred. I subclassed CamelTemplate to fix that issue.

public class MyCamelTemplate<E extends Exchange> extends CamelTemplate<E> {

	public MyCamelTemplate(CamelContext context) {
		super(context);
	}

	@Override
	protected Object extractResultBody(E result) {
		if (result.getException() != null) {
			throw new RuntimeException(result.getException());
		}

		return super.extractResultBody(result);
	}
}
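
One possible refinement of the template above: always wrapping in RuntimeException hides unchecked exceptions behind an extra layer, so callers have to dig through getCause(). A small sketch of an alternative, where `ResultBodies.bodyOrThrow` is a hypothetical helper and not part of Camel: rethrow unchecked failures as they are and wrap only checked ones.

```java
// Hypothetical helper, not part of Camel.
final class ResultBodies {

    private ResultBodies() {
    }

    // Returns the body when there is no failure; otherwise throws.
    static Object bodyOrThrow(Object body, Throwable failure) {
        if (failure == null) {
            return body;
        }
        // Unchecked failures are rethrown unchanged so callers can catch
        // the original type directly.
        if (failure instanceof RuntimeException) {
            throw (RuntimeException) failure;
        }
        if (failure instanceof Error) {
            throw (Error) failure;
        }
        // Checked exceptions cannot be thrown from this signature, so wrap them.
        throw new RuntimeException(failure);
    }
}
```

Inside extractResultBody this would be called with the exchange's exception and the value returned by the superclass.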

Please let me know your thoughts on this. Thanks!
-- 
View this message in context: http://www.nabble.com/Handling-Bean-exceptions-tp17671948s22882p17692630.html
Sent from the Camel - Users mailing list archive at Nabble.com.

